Fix: relayd vs consumerd compatibility
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
2b8f8754
MD
34static struct timer_signal_data timer_signal = {
35 .tid = 0,
36 .setup_done = 0,
37 .qs_done = 0,
38 .lock = PTHREAD_MUTEX_INITIALIZER,
39};
331744e3
JD
40
41/*
42 * Set custom signal mask to current thread.
43 */
44static void setmask(sigset_t *mask)
45{
46 int ret;
47
48 ret = sigemptyset(mask);
49 if (ret) {
50 PERROR("sigemptyset");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
53 if (ret) {
d3e2ba59 54 PERROR("sigaddset switch");
331744e3
JD
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
57 if (ret) {
d3e2ba59
JD
58 PERROR("sigaddset teardown");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
61 if (ret) {
62 PERROR("sigaddset live");
331744e3
JD
63 }
64}
65
66/*
67 * Execute action on a timer switch.
d98a47c7
MD
68 *
69 * Beware: metadata_switch_timer() should *never* take a mutex also held
70 * while consumer_timer_switch_stop() is called. It would result in
71 * deadlocks.
331744e3
JD
72 */
73static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
74 int sig, siginfo_t *si, void *uc)
75{
76 int ret;
77 struct lttng_consumer_channel *channel;
78
79 channel = si->si_value.sival_ptr;
80 assert(channel);
81
4419b4fb
MD
82 if (channel->switch_timer_error) {
83 return;
84 }
85
331744e3
JD
86 DBG("Switch timer for channel %" PRIu64, channel->key);
87 switch (ctx->type) {
88 case LTTNG_CONSUMER32_UST:
89 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
90 /*
91 * Locks taken by lttng_ustconsumer_request_metadata():
92 * - metadata_socket_lock
93 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 94 * - channel->metadata_cache->lock
4fa3dc0e 95 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
96 * - channel->timer_lock
97 * - channel->metadata_cache->lock
4fa3dc0e 98 *
5e41ebe1
MD
99 * Ensure that neither consumer_data.lock nor
100 * channel->lock are taken within this function, since
101 * they are held while consumer_timer_switch_stop() is
102 * called.
4fa3dc0e 103 */
94d49140 104 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 105 if (ret < 0) {
4419b4fb 106 channel->switch_timer_error = 1;
331744e3
JD
107 }
108 break;
109 case LTTNG_CONSUMER_KERNEL:
110 case LTTNG_CONSUMER_UNKNOWN:
111 assert(0);
112 break;
113 }
114}
115
528f2ffa
JD
116static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
117 uint64_t stream_id)
d3e2ba59
JD
118{
119 int ret;
50adc264 120 struct ctf_packet_index index;
d3e2ba59
JD
121
122 memset(&index, 0, sizeof(index));
528f2ffa 123 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
124 index.timestamp_end = htobe64(ts);
125 ret = consumer_stream_write_index(stream, &index);
126 if (ret < 0) {
127 goto error;
128 }
129
130error:
131 return ret;
132}
133
c585821b 134int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 135{
528f2ffa 136 uint64_t ts, stream_id;
d3e2ba59
JD
137 int ret;
138
d3e2ba59
JD
139 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
140 if (ret < 0) {
141 ERR("Failed to get the current timestamp");
c585821b 142 goto end;
d3e2ba59
JD
143 }
144 ret = kernctl_buffer_flush(stream->wait_fd);
145 if (ret < 0) {
146 ERR("Failed to flush kernel stream");
c585821b 147 goto end;
d3e2ba59
JD
148 }
149 ret = kernctl_snapshot(stream->wait_fd);
150 if (ret < 0) {
32af2c95 151 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 152 PERROR("live timer kernel snapshot");
d3e2ba59 153 ret = -1;
c585821b 154 goto end;
d3e2ba59 155 }
528f2ffa
JD
156 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
157 if (ret < 0) {
158 PERROR("kernctl_get_stream_id");
c585821b 159 goto end;
528f2ffa 160 }
d3e2ba59 161 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 162 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 163 if (ret < 0) {
c585821b 164 goto end;
d3e2ba59
JD
165 }
166 }
167 ret = 0;
c585821b 168end:
d3e2ba59
JD
169 return ret;
170}
171
c585821b 172static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 173{
d3e2ba59
JD
174 int ret;
175
d3e2ba59
JD
176 /*
177 * While holding the stream mutex, try to take a snapshot, if it
178 * succeeds, it means that data is ready to be sent, just let the data
179 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
180 * means that there is no data to read after the flush, so we can
181 * safely send the empty index.
c585821b
MD
182 *
183 * Doing a trylock and checking if waiting on metadata if
184 * trylock fails. Bail out of the stream is indeed waiting for
185 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 186 */
c585821b
MD
187 for (;;) {
188 ret = pthread_mutex_trylock(&stream->lock);
189 switch (ret) {
190 case 0:
191 break; /* We have the lock. */
192 case EBUSY:
193 pthread_mutex_lock(&stream->metadata_timer_lock);
194 if (stream->waiting_on_metadata) {
195 ret = 0;
196 stream->missed_metadata_flush = true;
197 pthread_mutex_unlock(&stream->metadata_timer_lock);
198 goto end; /* Bail out. */
199 }
200 pthread_mutex_unlock(&stream->metadata_timer_lock);
201 /* Try again. */
202 caa_cpu_relax();
203 continue;
204 default:
205 ERR("Unexpected pthread_mutex_trylock error %d", ret);
206 ret = -1;
207 goto end;
208 }
209 break;
210 }
211 ret = consumer_flush_kernel_index(stream);
212 pthread_mutex_unlock(&stream->lock);
213end:
214 return ret;
215}
216
217int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
218{
219 uint64_t ts, stream_id;
220 int ret;
221
94d49140
JD
222 ret = cds_lfht_is_node_deleted(&stream->node.node);
223 if (ret) {
c585821b 224 goto end;
94d49140
JD
225 }
226
84a182ce 227 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
228 if (ret < 0) {
229 ERR("Failed to get the current timestamp");
c585821b 230 goto end;
d3e2ba59 231 }
84a182ce
DG
232 lttng_ustconsumer_flush_buffer(stream, 1);
233 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 234 if (ret < 0) {
94d49140 235 if (ret != -EAGAIN) {
d3e2ba59
JD
236 ERR("Taking UST snapshot");
237 ret = -1;
c585821b 238 goto end;
d3e2ba59 239 }
70190e1c 240 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
241 if (ret < 0) {
242 PERROR("ustctl_get_stream_id");
c585821b 243 goto end;
528f2ffa 244 }
d3e2ba59 245 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 246 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 247 if (ret < 0) {
c585821b 248 goto end;
d3e2ba59
JD
249 }
250 }
251 ret = 0;
c585821b
MD
252end:
253 return ret;
254}
d3e2ba59 255
c585821b
MD
256static int check_ust_stream(struct lttng_consumer_stream *stream)
257{
258 int ret;
259
260 assert(stream);
261 assert(stream->ustream);
262 /*
263 * While holding the stream mutex, try to take a snapshot, if it
264 * succeeds, it means that data is ready to be sent, just let the data
265 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
266 * means that there is no data to read after the flush, so we can
267 * safely send the empty index.
268 *
269 * Doing a trylock and checking if waiting on metadata if
270 * trylock fails. Bail out of the stream is indeed waiting for
271 * metadata to be pushed. Busy wait on trylock otherwise.
272 */
273 for (;;) {
274 ret = pthread_mutex_trylock(&stream->lock);
275 switch (ret) {
276 case 0:
277 break; /* We have the lock. */
278 case EBUSY:
279 pthread_mutex_lock(&stream->metadata_timer_lock);
280 if (stream->waiting_on_metadata) {
281 ret = 0;
282 stream->missed_metadata_flush = true;
283 pthread_mutex_unlock(&stream->metadata_timer_lock);
284 goto end; /* Bail out. */
285 }
286 pthread_mutex_unlock(&stream->metadata_timer_lock);
287 /* Try again. */
288 caa_cpu_relax();
289 continue;
290 default:
291 ERR("Unexpected pthread_mutex_trylock error %d", ret);
292 ret = -1;
293 goto end;
294 }
295 break;
296 }
297 ret = consumer_flush_ust_index(stream);
d3e2ba59 298 pthread_mutex_unlock(&stream->lock);
c585821b 299end:
d3e2ba59
JD
300 return ret;
301}
302
303/*
304 * Execute action on a live timer
305 */
306static void live_timer(struct lttng_consumer_local_data *ctx,
307 int sig, siginfo_t *si, void *uc)
308{
309 int ret;
310 struct lttng_consumer_channel *channel;
311 struct lttng_consumer_stream *stream;
312 struct lttng_ht *ht;
313 struct lttng_ht_iter iter;
314
315 channel = si->si_value.sival_ptr;
316 assert(channel);
317
318 if (channel->switch_timer_error) {
319 goto error;
320 }
321 ht = consumer_data.stream_per_chan_id_ht;
322
323 DBG("Live timer for channel %" PRIu64, channel->key);
324
325 rcu_read_lock();
326 switch (ctx->type) {
327 case LTTNG_CONSUMER32_UST:
328 case LTTNG_CONSUMER64_UST:
329 cds_lfht_for_each_entry_duplicate(ht->ht,
330 ht->hash_fct(&channel->key, lttng_ht_seed),
331 ht->match_fct, &channel->key, &iter.iter,
332 stream, node_channel_id.node) {
333 ret = check_ust_stream(stream);
334 if (ret < 0) {
335 goto error_unlock;
336 }
337 }
338 break;
339 case LTTNG_CONSUMER_KERNEL:
340 cds_lfht_for_each_entry_duplicate(ht->ht,
341 ht->hash_fct(&channel->key, lttng_ht_seed),
342 ht->match_fct, &channel->key, &iter.iter,
343 stream, node_channel_id.node) {
344 ret = check_kernel_stream(stream);
345 if (ret < 0) {
346 goto error_unlock;
347 }
348 }
349 break;
350 case LTTNG_CONSUMER_UNKNOWN:
351 assert(0);
352 break;
353 }
354
355error_unlock:
356 rcu_read_unlock();
357
358error:
359 return;
360}
361
2b8f8754
MD
362static
363void consumer_timer_signal_thread_qs(unsigned int signr)
364{
365 sigset_t pending_set;
366 int ret;
367
368 /*
369 * We need to be the only thread interacting with the thread
370 * that manages signals for teardown synchronization.
371 */
372 pthread_mutex_lock(&timer_signal.lock);
373
374 /* Ensure we don't have any signal queued for this channel. */
375 for (;;) {
376 ret = sigemptyset(&pending_set);
377 if (ret == -1) {
378 PERROR("sigemptyset");
379 }
380 ret = sigpending(&pending_set);
381 if (ret == -1) {
382 PERROR("sigpending");
383 }
384 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
385 break;
386 }
387 caa_cpu_relax();
388 }
389
390 /*
391 * From this point, no new signal handler will be fired that would try to
392 * access "chan". However, we still need to wait for any currently
393 * executing handler to complete.
394 */
395 cmm_smp_mb();
396 CMM_STORE_SHARED(timer_signal.qs_done, 0);
397 cmm_smp_mb();
398
399 /*
400 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
401 * up.
402 */
403 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
404
405 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
406 caa_cpu_relax();
407 }
408 cmm_smp_mb();
409
410 pthread_mutex_unlock(&timer_signal.lock);
411}
412
331744e3
JD
413/*
414 * Set the timer for periodical metadata flush.
415 */
416void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
417 unsigned int switch_timer_interval)
418{
419 int ret;
420 struct sigevent sev;
421 struct itimerspec its;
422
423 assert(channel);
424 assert(channel->key);
425
426 if (switch_timer_interval == 0) {
427 return;
428 }
429
430 sev.sigev_notify = SIGEV_SIGNAL;
431 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
432 sev.sigev_value.sival_ptr = channel;
433 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
434 if (ret == -1) {
435 PERROR("timer_create");
436 }
437 channel->switch_timer_enabled = 1;
438
439 its.it_value.tv_sec = switch_timer_interval / 1000000;
69f60d21 440 its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
331744e3
JD
441 its.it_interval.tv_sec = its.it_value.tv_sec;
442 its.it_interval.tv_nsec = its.it_value.tv_nsec;
443
444 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
445 if (ret == -1) {
446 PERROR("timer_settime");
447 }
448}
449
450/*
451 * Stop and delete timer.
452 */
453void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
454{
455 int ret;
331744e3
JD
456
457 assert(channel);
458
459 ret = timer_delete(channel->switch_timer);
460 if (ret == -1) {
461 PERROR("timer_delete");
462 }
463
2b8f8754 464 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 465
2b8f8754
MD
466 channel->switch_timer = 0;
467 channel->switch_timer_enabled = 0;
331744e3
JD
468}
469
d3e2ba59
JD
470/*
471 * Set the timer for the live mode.
472 */
473void consumer_timer_live_start(struct lttng_consumer_channel *channel,
474 int live_timer_interval)
475{
476 int ret;
477 struct sigevent sev;
478 struct itimerspec its;
479
480 assert(channel);
481 assert(channel->key);
482
fac41e72 483 if (live_timer_interval <= 0) {
d3e2ba59
JD
484 return;
485 }
486
487 sev.sigev_notify = SIGEV_SIGNAL;
488 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
489 sev.sigev_value.sival_ptr = channel;
490 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
491 if (ret == -1) {
492 PERROR("timer_create");
493 }
494 channel->live_timer_enabled = 1;
495
496 its.it_value.tv_sec = live_timer_interval / 1000000;
69f60d21 497 its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
d3e2ba59
JD
498 its.it_interval.tv_sec = its.it_value.tv_sec;
499 its.it_interval.tv_nsec = its.it_value.tv_nsec;
500
501 ret = timer_settime(channel->live_timer, 0, &its, NULL);
502 if (ret == -1) {
503 PERROR("timer_settime");
504 }
505}
506
507/*
508 * Stop and delete timer.
509 */
510void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
511{
512 int ret;
513
514 assert(channel);
515
516 ret = timer_delete(channel->live_timer);
517 if (ret == -1) {
518 PERROR("timer_delete");
519 }
520
521 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
522
523 channel->live_timer = 0;
524 channel->live_timer_enabled = 0;
525}
526
331744e3
JD
527/*
528 * Block the RT signals for the entire process. It must be called from the
529 * consumer main before creating the threads
530 */
73664f81 531int consumer_signal_init(void)
331744e3
JD
532{
533 int ret;
534 sigset_t mask;
535
536 /* Block signal for entire process, so only our thread processes it. */
537 setmask(&mask);
538 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
539 if (ret) {
540 errno = ret;
541 PERROR("pthread_sigmask");
73664f81 542 return -1;
331744e3 543 }
73664f81 544 return 0;
331744e3
JD
545}
546
547/*
d3e2ba59
JD
548 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
549 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 550 */
d3e2ba59 551void *consumer_timer_thread(void *data)
331744e3
JD
552{
553 int signr;
554 sigset_t mask;
555 siginfo_t info;
556 struct lttng_consumer_local_data *ctx = data;
557
8a9acb74
MD
558 rcu_register_thread();
559
1fc79fb4
MD
560 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
561
2d57de81
MD
562 if (testpoint(consumerd_thread_metadata_timer)) {
563 goto error_testpoint;
564 }
565
9ce5646a
MD
566 health_code_update();
567
331744e3
JD
568 /* Only self thread will receive signal mask. */
569 setmask(&mask);
570 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
571
572 while (1) {
9ce5646a
MD
573 health_code_update();
574
575 health_poll_entry();
331744e3 576 signr = sigwaitinfo(&mask, &info);
9ce5646a 577 health_poll_exit();
331744e3
JD
578 if (signr == -1) {
579 if (errno != EINTR) {
580 PERROR("sigwaitinfo");
581 }
582 continue;
583 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
584 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
585 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
586 cmm_smp_mb();
587 CMM_STORE_SHARED(timer_signal.qs_done, 1);
588 cmm_smp_mb();
589 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
590 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
591 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
592 } else {
593 ERR("Unexpected signal %d\n", info.si_signo);
594 }
595 }
596
2d57de81
MD
597error_testpoint:
598 /* Only reached in testpoint error */
599 health_error();
1fc79fb4
MD
600 health_unregister(health_consumerd);
601
8a9acb74
MD
602 rcu_unregister_thread();
603
1fc79fb4 604 /* Never return */
331744e3
JD
605 return NULL;
606}
This page took 0.065707 seconds and 4 git commands to generate.