Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / common / consumer / consumer-timer.cpp
... / ...
CommitLineData
1/*
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9#define _LGPL_SOURCE
10#include <common/common.hpp>
11#include <common/compat/endian.hpp>
12#include <common/consumer/consumer-stream.hpp>
13#include <common/consumer/consumer-testpoint.hpp>
14#include <common/consumer/consumer-timer.hpp>
15#include <common/kernel-consumer/kernel-consumer.hpp>
16#include <common/kernel-ctl/kernel-ctl.hpp>
17#include <common/urcu.hpp>
18#include <common/ust-consumer/ust-consumer.hpp>
19
20#include <bin/lttng-consumerd/health-consumerd.hpp>
21#include <inttypes.h>
22#include <signal.h>
23
24using sample_positions_cb = int (*)(struct lttng_consumer_stream *);
25using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
26using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *);
27using flush_index_cb = int (*)(struct lttng_consumer_stream *);
28
29static struct timer_signal_data timer_signal = {
30 .tid = 0,
31 .setup_done = 0,
32 .qs_done = 0,
33 .lock = PTHREAD_MUTEX_INITIALIZER,
34};
35
36/*
37 * Set custom signal mask to current thread.
38 */
39static void setmask(sigset_t *mask)
40{
41 int ret;
42
43 ret = sigemptyset(mask);
44 if (ret) {
45 PERROR("sigemptyset");
46 }
47 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
48 if (ret) {
49 PERROR("sigaddset switch");
50 }
51 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
52 if (ret) {
53 PERROR("sigaddset teardown");
54 }
55 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
56 if (ret) {
57 PERROR("sigaddset live");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
60 if (ret) {
61 PERROR("sigaddset monitor");
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
64 if (ret) {
65 PERROR("sigaddset exit");
66 }
67}
68
69static int the_channel_monitor_pipe = -1;
70
71/*
72 * Execute action on a timer switch.
73 *
74 * Beware: metadata_switch_timer() should *never* take a mutex also held
75 * while consumer_timer_switch_stop() is called. It would result in
76 * deadlocks.
77 */
78static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
79{
80 int ret;
81 struct lttng_consumer_channel *channel;
82
83 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
84 LTTNG_ASSERT(channel);
85 LTTNG_ASSERT(!channel->is_deleted);
86
87 if (channel->switch_timer_error) {
88 return;
89 }
90
91 DBG("Switch timer for channel %" PRIu64, channel->key);
92 switch (ctx->type) {
93 case LTTNG_CONSUMER32_UST:
94 case LTTNG_CONSUMER64_UST:
95 /*
96 * Locks taken by lttng_ustconsumer_request_metadata():
97 * - metadata_socket_lock
98 * - Calling lttng_ustconsumer_recv_metadata():
99 * - channel->metadata_cache->lock
100 * - Calling consumer_wait_metadata_cache_flushed():
101 * - channel->timer_lock
102 * - channel->metadata_cache->lock
103 *
104 * Ensure that neither consumer_data.lock nor
105 * channel->lock are taken within this function, since
106 * they are held while consumer_timer_switch_stop() is
107 * called.
108 */
109 ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
110 if (ret < 0) {
111 channel->switch_timer_error = 1;
112 }
113 break;
114 case LTTNG_CONSUMER_KERNEL:
115 case LTTNG_CONSUMER_UNKNOWN:
116 abort();
117 break;
118 }
119}
120
121static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id)
122{
123 int ret;
124 struct ctf_packet_index index;
125
126 memset(&index, 0, sizeof(index));
127 index.stream_id = htobe64(stream_id);
128 index.timestamp_end = htobe64(ts);
129 ret = consumer_stream_write_index(stream, &index);
130 if (ret < 0) {
131 goto error;
132 }
133
134error:
135 return ret;
136}
137
138int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
139{
140 uint64_t ts, stream_id;
141 int ret;
142
143 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
144 if (ret < 0) {
145 ERR("Failed to get the current timestamp");
146 goto end;
147 }
148 ret = kernctl_buffer_flush(stream->wait_fd);
149 if (ret < 0) {
150 ERR("Failed to flush kernel stream");
151 goto end;
152 }
153 ret = kernctl_snapshot(stream->wait_fd);
154 if (ret < 0) {
155 if (ret != -EAGAIN && ret != -ENODATA) {
156 PERROR("live timer kernel snapshot");
157 ret = -1;
158 goto end;
159 }
160 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
161 if (ret < 0) {
162 PERROR("kernctl_get_stream_id");
163 goto end;
164 }
165 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
166 ret = send_empty_index(stream, ts, stream_id);
167 if (ret < 0) {
168 goto end;
169 }
170 }
171 ret = 0;
172end:
173 return ret;
174}
175
176static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index)
177{
178 int ret;
179
180 /*
181 * While holding the stream mutex, try to take a snapshot, if it
182 * succeeds, it means that data is ready to be sent, just let the data
183 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
184 * means that there is no data to read after the flush, so we can
185 * safely send the empty index.
186 *
187 * Doing a trylock and checking if waiting on metadata if
188 * trylock fails. Bail out of the stream is indeed waiting for
189 * metadata to be pushed. Busy wait on trylock otherwise.
190 */
191 for (;;) {
192 ret = pthread_mutex_trylock(&stream->lock);
193 switch (ret) {
194 case 0:
195 break; /* We have the lock. */
196 case EBUSY:
197 pthread_mutex_lock(&stream->metadata_timer_lock);
198 if (stream->waiting_on_metadata) {
199 ret = 0;
200 stream->missed_metadata_flush = true;
201 pthread_mutex_unlock(&stream->metadata_timer_lock);
202 goto end; /* Bail out. */
203 }
204 pthread_mutex_unlock(&stream->metadata_timer_lock);
205 /* Try again. */
206 caa_cpu_relax();
207 continue;
208 default:
209 ERR("Unexpected pthread_mutex_trylock error %d", ret);
210 ret = -1;
211 goto end;
212 }
213 break;
214 }
215 ret = flush_index(stream);
216 pthread_mutex_unlock(&stream->lock);
217end:
218 return ret;
219}
220
221int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
222{
223 uint64_t ts, stream_id;
224 int ret;
225
226 ret = cds_lfht_is_node_deleted(&stream->node.node);
227 if (ret) {
228 goto end;
229 }
230
231 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
232 if (ret < 0) {
233 ERR("Failed to get the current timestamp");
234 goto end;
235 }
236 ret = lttng_ustconsumer_flush_buffer(stream, 1);
237 if (ret < 0) {
238 ERR("Failed to flush buffer while flushing index");
239 goto end;
240 }
241 ret = lttng_ustconsumer_take_snapshot(stream);
242 if (ret < 0) {
243 if (ret != -EAGAIN) {
244 ERR("Taking UST snapshot");
245 ret = -1;
246 goto end;
247 }
248 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
249 if (ret < 0) {
250 PERROR("lttng_ust_ctl_get_stream_id");
251 goto end;
252 }
253 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
254 ret = send_empty_index(stream, ts, stream_id);
255 if (ret < 0) {
256 goto end;
257 }
258 }
259 ret = 0;
260end:
261 return ret;
262}
263
264/*
265 * Execute action on a live timer
266 */
267static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
268{
269 int ret;
270 const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
271 const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ?
272 consumer_flush_kernel_index :
273 consumer_flush_ust_index;
274
275 auto *channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
276 LTTNG_ASSERT(channel);
277 LTTNG_ASSERT(!channel->is_deleted);
278
279 if (channel->switch_timer_error) {
280 return;
281 }
282
283 DBG("Live timer for channel %" PRIu64, channel->key);
284
285 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
286 lttng_consumer_stream,
287 decltype(lttng_consumer_stream::node_channel_id),
288 &lttng_consumer_stream::node_channel_id,
289 std::uint64_t>(*ht->ht,
290 &channel->key,
291 ht->hash_fct(&channel->key, lttng_ht_seed),
292 ht->match_fct)) {
293 ret = check_stream(stream, flush_index);
294 if (ret < 0) {
295 return;
296 }
297 }
298}
299
300static void consumer_timer_signal_thread_qs(unsigned int signr)
301{
302 sigset_t pending_set;
303 int ret;
304
305 /*
306 * We need to be the only thread interacting with the thread
307 * that manages signals for teardown synchronization.
308 */
309 pthread_mutex_lock(&timer_signal.lock);
310
311 /* Ensure we don't have any signal queued for this channel. */
312 for (;;) {
313 ret = sigemptyset(&pending_set);
314 if (ret == -1) {
315 PERROR("sigemptyset");
316 }
317 ret = sigpending(&pending_set);
318 if (ret == -1) {
319 PERROR("sigpending");
320 }
321 if (!sigismember(&pending_set, signr)) {
322 break;
323 }
324 caa_cpu_relax();
325 }
326
327 /*
328 * From this point, no new signal handler will be fired that would try to
329 * access "chan". However, we still need to wait for any currently
330 * executing handler to complete.
331 */
332 cmm_smp_mb();
333 CMM_STORE_SHARED(timer_signal.qs_done, 0);
334 cmm_smp_mb();
335
336 /*
337 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
338 * up.
339 */
340 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
341
342 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
343 caa_cpu_relax();
344 }
345 cmm_smp_mb();
346
347 pthread_mutex_unlock(&timer_signal.lock);
348}
349
350/*
351 * Start a timer channel timer which will fire at a given interval
352 * (timer_interval_us)and fire a given signal (signal).
353 *
354 * Returns a negative value on error, 0 if a timer was created, and
355 * a positive value if no timer was created (not an error).
356 */
357static int consumer_channel_timer_start(timer_t *timer_id,
358 struct lttng_consumer_channel *channel,
359 unsigned int timer_interval_us,
360 int signal)
361{
362 int ret = 0, delete_ret;
363 struct sigevent sev = {};
364 struct itimerspec its;
365
366 LTTNG_ASSERT(channel);
367 LTTNG_ASSERT(channel->key);
368 LTTNG_ASSERT(!channel->is_deleted);
369
370 if (timer_interval_us == 0) {
371 /* No creation needed; not an error. */
372 ret = 1;
373 goto end;
374 }
375
376 sev.sigev_notify = SIGEV_SIGNAL;
377 sev.sigev_signo = signal;
378 sev.sigev_value.sival_ptr = channel;
379 ret = timer_create(CLOCKID, &sev, timer_id);
380 if (ret == -1) {
381 PERROR("timer_create");
382 goto end;
383 }
384
385 its.it_value.tv_sec = timer_interval_us / 1000000;
386 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
387 its.it_interval.tv_sec = its.it_value.tv_sec;
388 its.it_interval.tv_nsec = its.it_value.tv_nsec;
389
390 ret = timer_settime(*timer_id, 0, &its, nullptr);
391 if (ret == -1) {
392 PERROR("timer_settime");
393 goto error_destroy_timer;
394 }
395end:
396 return ret;
397error_destroy_timer:
398 delete_ret = timer_delete(*timer_id);
399 if (delete_ret == -1) {
400 PERROR("timer_delete");
401 }
402 goto end;
403}
404
405static int consumer_channel_timer_stop(timer_t *timer_id, int signal)
406{
407 int ret = 0;
408
409 ret = timer_delete(*timer_id);
410 if (ret == -1) {
411 PERROR("timer_delete");
412 goto end;
413 }
414
415 consumer_timer_signal_thread_qs(signal);
416 *timer_id = nullptr;
417end:
418 return ret;
419}
420
421/*
422 * Set the channel's switch timer.
423 */
424void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
425 unsigned int switch_timer_interval_us)
426{
427 int ret;
428
429 LTTNG_ASSERT(channel);
430 LTTNG_ASSERT(channel->key);
431 LTTNG_ASSERT(!channel->is_deleted);
432
433 ret = consumer_channel_timer_start(&channel->switch_timer,
434 channel,
435 switch_timer_interval_us,
436 LTTNG_CONSUMER_SIG_SWITCH);
437
438 channel->switch_timer_enabled = !!(ret == 0);
439}
440
441/*
442 * Stop and delete the channel's switch timer.
443 */
444void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
445{
446 int ret;
447
448 LTTNG_ASSERT(channel);
449
450 ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH);
451 if (ret == -1) {
452 ERR("Failed to stop switch timer");
453 }
454
455 channel->switch_timer_enabled = 0;
456}
457
458/*
459 * Set the channel's live timer.
460 */
461void consumer_timer_live_start(struct lttng_consumer_channel *channel,
462 unsigned int live_timer_interval_us)
463{
464 int ret;
465
466 LTTNG_ASSERT(channel);
467 LTTNG_ASSERT(channel->key);
468 LTTNG_ASSERT(!channel->is_deleted);
469
470 ret = consumer_channel_timer_start(
471 &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
472
473 channel->live_timer_enabled = !!(ret == 0);
474}
475
476/*
477 * Stop and delete the channel's live timer.
478 */
479void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
480{
481 int ret;
482
483 LTTNG_ASSERT(channel);
484
485 ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE);
486 if (ret == -1) {
487 ERR("Failed to stop live timer");
488 }
489
490 channel->live_timer_enabled = 0;
491}
492
493/*
494 * Set the channel's monitoring timer.
495 *
496 * Returns a negative value on error, 0 if a timer was created, and
497 * a positive value if no timer was created (not an error).
498 */
499int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
500 unsigned int monitor_timer_interval_us)
501{
502 int ret;
503
504 LTTNG_ASSERT(channel);
505 LTTNG_ASSERT(channel->key);
506 LTTNG_ASSERT(!channel->is_deleted);
507 LTTNG_ASSERT(!channel->monitor_timer_enabled);
508
509 ret = consumer_channel_timer_start(&channel->monitor_timer,
510 channel,
511 monitor_timer_interval_us,
512 LTTNG_CONSUMER_SIG_MONITOR);
513 channel->monitor_timer_enabled = !!(ret == 0);
514 return ret;
515}
516
517/*
518 * Stop and delete the channel's monitoring timer.
519 */
520int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
521{
522 int ret;
523
524 LTTNG_ASSERT(channel);
525 LTTNG_ASSERT(channel->monitor_timer_enabled);
526
527 ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR);
528 if (ret == -1) {
529 ERR("Failed to stop monitor timer");
530 goto end;
531 }
532
533 channel->monitor_timer_enabled = 0;
534end:
535 return ret;
536}
537
538/*
539 * Block the RT signals for the entire process. It must be called from the
540 * consumer main before creating the threads
541 */
542int consumer_signal_init()
543{
544 int ret;
545 sigset_t mask;
546
547 /* Block signal for entire process, so only our thread processes it. */
548 setmask(&mask);
549 ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr);
550 if (ret) {
551 errno = ret;
552 PERROR("pthread_sigmask");
553 return -1;
554 }
555 return 0;
556}
557
558static int sample_channel_positions(struct lttng_consumer_channel *channel,
559 uint64_t *_highest_use,
560 uint64_t *_lowest_use,
561 uint64_t *_total_consumed,
562 sample_positions_cb sample,
563 get_consumed_cb get_consumed,
564 get_produced_cb get_produced)
565{
566 int ret = 0;
567 bool empty_channel = true;
568 uint64_t high = 0, low = UINT64_MAX;
569 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
570
571 *_total_consumed = 0;
572
573 for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
574 lttng_consumer_stream,
575 decltype(lttng_consumer_stream::node_channel_id),
576 &lttng_consumer_stream::node_channel_id,
577 std::uint64_t>(*ht->ht,
578 &channel->key,
579 ht->hash_fct(&channel->key, lttng_ht_seed),
580 ht->match_fct)) {
581 unsigned long produced, consumed, usage;
582
583 empty_channel = false;
584
585 pthread_mutex_lock(&stream->lock);
586 if (cds_lfht_is_node_deleted(&stream->node.node)) {
587 goto next;
588 }
589
590 ret = sample(stream);
591 if (ret) {
592 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
593 ret);
594 pthread_mutex_unlock(&stream->lock);
595 goto end;
596 }
597 ret = get_consumed(stream, &consumed);
598 if (ret) {
599 ERR("Failed to get buffer consumed position in monitor timer");
600 pthread_mutex_unlock(&stream->lock);
601 goto end;
602 }
603 ret = get_produced(stream, &produced);
604 if (ret) {
605 ERR("Failed to get buffer produced position in monitor timer");
606 pthread_mutex_unlock(&stream->lock);
607 goto end;
608 }
609
610 usage = produced - consumed;
611 high = (usage > high) ? usage : high;
612 low = (usage < low) ? usage : low;
613
614 /*
615 * We don't use consumed here for 2 reasons:
616 * - output_written takes into account the padding written in the
617 * tracefiles when we stop the session;
618 * - the consumed position is not the accurate representation of what
619 * was extracted from a buffer in overwrite mode.
620 */
621 *_total_consumed += stream->output_written;
622 next:
623 pthread_mutex_unlock(&stream->lock);
624 }
625
626 *_highest_use = high;
627 *_lowest_use = low;
628end:
629 if (empty_channel) {
630 ret = -1;
631 }
632
633 return ret;
634}
635
636/* Sample and send channel buffering statistics to the session daemon. */
637void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel)
638{
639 int ret;
640 const int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe();
641 struct lttcomm_consumer_channel_monitor_msg msg = {
642 .key = channel->key,
643 .session_id = channel->session_id,
644 .lowest = 0,
645 .highest = 0,
646 .consumed_since_last_sample = 0,
647 };
648 sample_positions_cb sample;
649 get_consumed_cb get_consumed;
650 get_produced_cb get_produced;
651 uint64_t lowest = 0, highest = 0, total_consumed = 0;
652
653 LTTNG_ASSERT(channel);
654
655 if (channel_monitor_pipe < 0) {
656 return;
657 }
658
659 switch (the_consumer_data.type) {
660 case LTTNG_CONSUMER_KERNEL:
661 sample = lttng_kconsumer_sample_snapshot_positions;
662 get_consumed = lttng_kconsumer_get_consumed_snapshot;
663 get_produced = lttng_kconsumer_get_produced_snapshot;
664 break;
665 case LTTNG_CONSUMER32_UST:
666 case LTTNG_CONSUMER64_UST:
667 sample = lttng_ustconsumer_sample_snapshot_positions;
668 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
669 get_produced = lttng_ustconsumer_get_produced_snapshot;
670 break;
671 default:
672 abort();
673 }
674
675 ret = sample_channel_positions(
676 channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced);
677 if (ret) {
678 return;
679 }
680
681 msg.highest = highest;
682 msg.lowest = lowest;
683 msg.consumed_since_last_sample =
684 total_consumed - channel->consumed_size_as_of_last_sample_sent;
685
686 /*
687 * Writes performed here are assumed to be atomic which is only
688 * guaranteed for sizes < than PIPE_BUF.
689 */
690 LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
691
692 do {
693 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
694 } while (ret == -1 && errno == EINTR);
695 if (ret == -1) {
696 if (errno == EAGAIN) {
697 /* Not an error, the sample is merely dropped. */
698 DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
699 channel->key);
700 } else {
701 PERROR("write to the channel monitor pipe");
702 }
703 } else {
704 DBG("Sent channel monitoring sample for channel key %" PRIu64
705 ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
706 channel->key,
707 msg.highest,
708 msg.lowest);
709 channel->consumed_size_as_of_last_sample_sent = total_consumed;
710 }
711}
712
713int consumer_timer_thread_get_channel_monitor_pipe()
714{
715 return uatomic_read(&the_channel_monitor_pipe);
716}
717
718int consumer_timer_thread_set_channel_monitor_pipe(int fd)
719{
720 int ret;
721
722 ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
723 if (ret != -1) {
724 ret = -1;
725 goto end;
726 }
727 ret = 0;
728end:
729 return ret;
730}
731
732/*
733 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
734 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
735 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
736 */
737void *consumer_timer_thread(void *data)
738{
739 int signr;
740 sigset_t mask;
741 siginfo_t info;
742 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
743
744 rcu_register_thread();
745
746 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
747
748 if (testpoint(consumerd_thread_metadata_timer)) {
749 goto error_testpoint;
750 }
751
752 health_code_update();
753
754 /* Only self thread will receive signal mask. */
755 setmask(&mask);
756 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
757
758 while (true) {
759 health_code_update();
760
761 health_poll_entry();
762 signr = sigwaitinfo(&mask, &info);
763 health_poll_exit();
764
765 /*
766 * NOTE: cascading conditions are used instead of a switch case
767 * since the use of SIGRTMIN in the definition of the signals'
768 * values prevents the reduction to an integer constant.
769 */
770 if (signr == -1) {
771 if (errno != EINTR) {
772 PERROR("sigwaitinfo");
773 }
774 continue;
775 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
776 metadata_switch_timer(ctx, &info);
777 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
778 cmm_smp_mb();
779 CMM_STORE_SHARED(timer_signal.qs_done, 1);
780 cmm_smp_mb();
781 DBG("Signal timer metadata thread teardown");
782 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
783 live_timer(ctx, &info);
784 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
785 struct lttng_consumer_channel *channel;
786
787 channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
788 sample_and_send_channel_buffer_stats(channel);
789 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
790 LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
791 goto end;
792 } else {
793 ERR("Unexpected signal %d\n", info.si_signo);
794 }
795 }
796
797error_testpoint:
798 /* Only reached in testpoint error */
799 health_error();
800end:
801 health_unregister(health_consumerd);
802 rcu_unregister_thread();
803 return nullptr;
804}
This page took 0.026393 seconds and 5 git commands to generate.