2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
5 * SPDX-License-Identifier: GPL-2.0-only
14 #include <bin/lttng-consumerd/health-consumerd.h>
15 #include <common/common.h>
16 #include <common/compat/endian.h>
17 #include <common/kernel-ctl/kernel-ctl.h>
18 #include <common/kernel-consumer/kernel-consumer.h>
19 #include <common/consumer/consumer-stream.h>
20 #include <common/consumer/consumer-timer.h>
21 #include <common/consumer/consumer-testpoint.h>
22 #include <common/ust-consumer/ust-consumer.h>
/*
 * Domain-specific callbacks used when sampling a channel's buffer positions.
 * Each returns 0 on success; any other value is treated as a failure by the
 * monitor timer code that invokes them.
 */

/* Take a snapshot of the buffer positions of `stream`. */
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
/* Store the consumed position of the last snapshot of `stream` in `consumed`. */
typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
		unsigned long *consumed);
/* Store the produced position of the last snapshot of `stream` in `produced`. */
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
		unsigned long *produced);
30 static struct timer_signal_data timer_signal
= {
34 .lock
= PTHREAD_MUTEX_INITIALIZER
,
38 * Set custom signal mask to current thread.
40 static void setmask(sigset_t
*mask
)
44 ret
= sigemptyset(mask
);
46 PERROR("sigemptyset");
48 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_SWITCH
);
50 PERROR("sigaddset switch");
52 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_TEARDOWN
);
54 PERROR("sigaddset teardown");
56 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_LIVE
);
58 PERROR("sigaddset live");
60 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_MONITOR
);
62 PERROR("sigaddset monitor");
64 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_EXIT
);
66 PERROR("sigaddset exit");
/*
 * File descriptor on which channel monitoring samples are written.
 * -1 means that no pipe has been set yet.
 */
static int channel_monitor_pipe = -1;
73 * Execute action on a timer switch.
75 * Beware: metadata_switch_timer() should *never* take a mutex also held
76 * while consumer_timer_switch_stop() is called. It would result in
79 static void metadata_switch_timer(struct lttng_consumer_local_data
*ctx
,
83 struct lttng_consumer_channel
*channel
;
85 channel
= si
->si_value
.sival_ptr
;
88 if (channel
->switch_timer_error
) {
92 DBG("Switch timer for channel %" PRIu64
, channel
->key
);
94 case LTTNG_CONSUMER32_UST
:
95 case LTTNG_CONSUMER64_UST
:
97 * Locks taken by lttng_ustconsumer_request_metadata():
98 * - metadata_socket_lock
99 * - Calling lttng_ustconsumer_recv_metadata():
100 * - channel->metadata_cache->lock
101 * - Calling consumer_metadata_cache_flushed():
102 * - channel->timer_lock
103 * - channel->metadata_cache->lock
105 * Ensure that neither consumer_data.lock nor
106 * channel->lock are taken within this function, since
107 * they are held while consumer_timer_switch_stop() is
110 ret
= lttng_ustconsumer_request_metadata(ctx
, channel
, 1, 1);
112 channel
->switch_timer_error
= 1;
115 case LTTNG_CONSUMER_KERNEL
:
116 case LTTNG_CONSUMER_UNKNOWN
:
122 static int send_empty_index(struct lttng_consumer_stream
*stream
, uint64_t ts
,
126 struct ctf_packet_index index
;
128 memset(&index
, 0, sizeof(index
));
129 index
.stream_id
= htobe64(stream_id
);
130 index
.timestamp_end
= htobe64(ts
);
131 ret
= consumer_stream_write_index(stream
, &index
);
140 int consumer_flush_kernel_index(struct lttng_consumer_stream
*stream
)
142 uint64_t ts
, stream_id
;
145 ret
= kernctl_get_current_timestamp(stream
->wait_fd
, &ts
);
147 ERR("Failed to get the current timestamp");
150 ret
= kernctl_buffer_flush(stream
->wait_fd
);
152 ERR("Failed to flush kernel stream");
155 ret
= kernctl_snapshot(stream
->wait_fd
);
157 if (ret
!= -EAGAIN
&& ret
!= -ENODATA
) {
158 PERROR("live timer kernel snapshot");
162 ret
= kernctl_get_stream_id(stream
->wait_fd
, &stream_id
);
164 PERROR("kernctl_get_stream_id");
167 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
168 ret
= send_empty_index(stream
, ts
, stream_id
);
178 static int check_kernel_stream(struct lttng_consumer_stream
*stream
)
183 * While holding the stream mutex, try to take a snapshot, if it
184 * succeeds, it means that data is ready to be sent, just let the data
185 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
186 * means that there is no data to read after the flush, so we can
187 * safely send the empty index.
189 * Doing a trylock and checking if waiting on metadata if
190 * trylock fails. Bail out of the stream is indeed waiting for
191 * metadata to be pushed. Busy wait on trylock otherwise.
194 ret
= pthread_mutex_trylock(&stream
->lock
);
197 break; /* We have the lock. */
199 pthread_mutex_lock(&stream
->metadata_timer_lock
);
200 if (stream
->waiting_on_metadata
) {
202 stream
->missed_metadata_flush
= true;
203 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
204 goto end
; /* Bail out. */
206 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
211 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
217 ret
= consumer_flush_kernel_index(stream
);
218 pthread_mutex_unlock(&stream
->lock
);
223 int consumer_flush_ust_index(struct lttng_consumer_stream
*stream
)
225 uint64_t ts
, stream_id
;
228 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
233 ret
= lttng_ustconsumer_get_current_timestamp(stream
, &ts
);
235 ERR("Failed to get the current timestamp");
238 lttng_ustconsumer_flush_buffer(stream
, 1);
239 ret
= lttng_ustconsumer_take_snapshot(stream
);
241 if (ret
!= -EAGAIN
) {
242 ERR("Taking UST snapshot");
246 ret
= lttng_ustconsumer_get_stream_id(stream
, &stream_id
);
248 PERROR("ustctl_get_stream_id");
251 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
252 ret
= send_empty_index(stream
, ts
, stream_id
);
262 static int check_ust_stream(struct lttng_consumer_stream
*stream
)
267 assert(stream
->ustream
);
269 * While holding the stream mutex, try to take a snapshot, if it
270 * succeeds, it means that data is ready to be sent, just let the data
271 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
272 * means that there is no data to read after the flush, so we can
273 * safely send the empty index.
275 * Doing a trylock and checking if waiting on metadata if
276 * trylock fails. Bail out of the stream is indeed waiting for
277 * metadata to be pushed. Busy wait on trylock otherwise.
280 ret
= pthread_mutex_trylock(&stream
->lock
);
283 break; /* We have the lock. */
285 pthread_mutex_lock(&stream
->metadata_timer_lock
);
286 if (stream
->waiting_on_metadata
) {
288 stream
->missed_metadata_flush
= true;
289 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
290 goto end
; /* Bail out. */
292 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
297 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
303 ret
= consumer_flush_ust_index(stream
);
304 pthread_mutex_unlock(&stream
->lock
);
310 * Execute action on a live timer
312 static void live_timer(struct lttng_consumer_local_data
*ctx
,
316 struct lttng_consumer_channel
*channel
;
317 struct lttng_consumer_stream
*stream
;
319 struct lttng_ht_iter iter
;
321 channel
= si
->si_value
.sival_ptr
;
324 if (channel
->switch_timer_error
) {
327 ht
= consumer_data
.stream_per_chan_id_ht
;
329 DBG("Live timer for channel %" PRIu64
, channel
->key
);
333 case LTTNG_CONSUMER32_UST
:
334 case LTTNG_CONSUMER64_UST
:
335 cds_lfht_for_each_entry_duplicate(ht
->ht
,
336 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
337 ht
->match_fct
, &channel
->key
, &iter
.iter
,
338 stream
, node_channel_id
.node
) {
339 ret
= check_ust_stream(stream
);
345 case LTTNG_CONSUMER_KERNEL
:
346 cds_lfht_for_each_entry_duplicate(ht
->ht
,
347 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
348 ht
->match_fct
, &channel
->key
, &iter
.iter
,
349 stream
, node_channel_id
.node
) {
350 ret
= check_kernel_stream(stream
);
356 case LTTNG_CONSUMER_UNKNOWN
:
369 void consumer_timer_signal_thread_qs(unsigned int signr
)
371 sigset_t pending_set
;
375 * We need to be the only thread interacting with the thread
376 * that manages signals for teardown synchronization.
378 pthread_mutex_lock(&timer_signal
.lock
);
380 /* Ensure we don't have any signal queued for this channel. */
382 ret
= sigemptyset(&pending_set
);
384 PERROR("sigemptyset");
386 ret
= sigpending(&pending_set
);
388 PERROR("sigpending");
390 if (!sigismember(&pending_set
, signr
)) {
397 * From this point, no new signal handler will be fired that would try to
398 * access "chan". However, we still need to wait for any currently
399 * executing handler to complete.
402 CMM_STORE_SHARED(timer_signal
.qs_done
, 0);
406 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
409 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN
);
411 while (!CMM_LOAD_SHARED(timer_signal
.qs_done
)) {
416 pthread_mutex_unlock(&timer_signal
.lock
);
420 * Start a timer channel timer which will fire at a given interval
421 * (timer_interval_us)and fire a given signal (signal).
423 * Returns a negative value on error, 0 if a timer was created, and
424 * a positive value if no timer was created (not an error).
427 int consumer_channel_timer_start(timer_t
*timer_id
,
428 struct lttng_consumer_channel
*channel
,
429 unsigned int timer_interval_us
, int signal
)
431 int ret
= 0, delete_ret
;
433 struct itimerspec its
;
436 assert(channel
->key
);
438 if (timer_interval_us
== 0) {
439 /* No creation needed; not an error. */
444 sev
.sigev_notify
= SIGEV_SIGNAL
;
445 sev
.sigev_signo
= signal
;
446 sev
.sigev_value
.sival_ptr
= channel
;
447 ret
= timer_create(CLOCKID
, &sev
, timer_id
);
449 PERROR("timer_create");
453 its
.it_value
.tv_sec
= timer_interval_us
/ 1000000;
454 its
.it_value
.tv_nsec
= (timer_interval_us
% 1000000) * 1000;
455 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
456 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
458 ret
= timer_settime(*timer_id
, 0, &its
, NULL
);
460 PERROR("timer_settime");
461 goto error_destroy_timer
;
466 delete_ret
= timer_delete(*timer_id
);
467 if (delete_ret
== -1) {
468 PERROR("timer_delete");
474 int consumer_channel_timer_stop(timer_t
*timer_id
, int signal
)
478 ret
= timer_delete(*timer_id
);
480 PERROR("timer_delete");
484 consumer_timer_signal_thread_qs(signal
);
491 * Set the channel's switch timer.
493 void consumer_timer_switch_start(struct lttng_consumer_channel
*channel
,
494 unsigned int switch_timer_interval_us
)
499 assert(channel
->key
);
501 ret
= consumer_channel_timer_start(&channel
->switch_timer
, channel
,
502 switch_timer_interval_us
, LTTNG_CONSUMER_SIG_SWITCH
);
504 channel
->switch_timer_enabled
= !!(ret
== 0);
508 * Stop and delete the channel's switch timer.
510 void consumer_timer_switch_stop(struct lttng_consumer_channel
*channel
)
516 ret
= consumer_channel_timer_stop(&channel
->switch_timer
,
517 LTTNG_CONSUMER_SIG_SWITCH
);
519 ERR("Failed to stop switch timer");
522 channel
->switch_timer_enabled
= 0;
526 * Set the channel's live timer.
528 void consumer_timer_live_start(struct lttng_consumer_channel
*channel
,
529 unsigned int live_timer_interval_us
)
534 assert(channel
->key
);
536 ret
= consumer_channel_timer_start(&channel
->live_timer
, channel
,
537 live_timer_interval_us
, LTTNG_CONSUMER_SIG_LIVE
);
539 channel
->live_timer_enabled
= !!(ret
== 0);
543 * Stop and delete the channel's live timer.
545 void consumer_timer_live_stop(struct lttng_consumer_channel
*channel
)
551 ret
= consumer_channel_timer_stop(&channel
->live_timer
,
552 LTTNG_CONSUMER_SIG_LIVE
);
554 ERR("Failed to stop live timer");
557 channel
->live_timer_enabled
= 0;
561 * Set the channel's monitoring timer.
563 * Returns a negative value on error, 0 if a timer was created, and
564 * a positive value if no timer was created (not an error).
566 int consumer_timer_monitor_start(struct lttng_consumer_channel
*channel
,
567 unsigned int monitor_timer_interval_us
)
572 assert(channel
->key
);
573 assert(!channel
->monitor_timer_enabled
);
575 ret
= consumer_channel_timer_start(&channel
->monitor_timer
, channel
,
576 monitor_timer_interval_us
, LTTNG_CONSUMER_SIG_MONITOR
);
577 channel
->monitor_timer_enabled
= !!(ret
== 0);
582 * Stop and delete the channel's monitoring timer.
584 int consumer_timer_monitor_stop(struct lttng_consumer_channel
*channel
)
589 assert(channel
->monitor_timer_enabled
);
591 ret
= consumer_channel_timer_stop(&channel
->monitor_timer
,
592 LTTNG_CONSUMER_SIG_MONITOR
);
594 ERR("Failed to stop live timer");
598 channel
->monitor_timer_enabled
= 0;
604 * Block the RT signals for the entire process. It must be called from the
605 * consumer main before creating the threads
607 int consumer_signal_init(void)
612 /* Block signal for entire process, so only our thread processes it. */
614 ret
= pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
617 PERROR("pthread_sigmask");
624 int sample_channel_positions(struct lttng_consumer_channel
*channel
,
625 uint64_t *_highest_use
, uint64_t *_lowest_use
, uint64_t *_total_consumed
,
626 sample_positions_cb sample
, get_consumed_cb get_consumed
,
627 get_produced_cb get_produced
)
630 struct lttng_ht_iter iter
;
631 struct lttng_consumer_stream
*stream
;
632 bool empty_channel
= true;
633 uint64_t high
= 0, low
= UINT64_MAX
;
634 struct lttng_ht
*ht
= consumer_data
.stream_per_chan_id_ht
;
636 *_total_consumed
= 0;
640 cds_lfht_for_each_entry_duplicate(ht
->ht
,
641 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
642 ht
->match_fct
, &channel
->key
,
643 &iter
.iter
, stream
, node_channel_id
.node
) {
644 unsigned long produced
, consumed
, usage
;
646 empty_channel
= false;
648 pthread_mutex_lock(&stream
->lock
);
649 if (cds_lfht_is_node_deleted(&stream
->node
.node
)) {
653 ret
= sample(stream
);
655 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret
);
656 pthread_mutex_unlock(&stream
->lock
);
659 ret
= get_consumed(stream
, &consumed
);
661 ERR("Failed to get buffer consumed position in monitor timer");
662 pthread_mutex_unlock(&stream
->lock
);
665 ret
= get_produced(stream
, &produced
);
667 ERR("Failed to get buffer produced position in monitor timer");
668 pthread_mutex_unlock(&stream
->lock
);
672 usage
= produced
- consumed
;
673 high
= (usage
> high
) ? usage
: high
;
674 low
= (usage
< low
) ? usage
: low
;
677 * We don't use consumed here for 2 reasons:
678 * - output_written takes into account the padding written in the
679 * tracefiles when we stop the session;
680 * - the consumed position is not the accurate representation of what
681 * was extracted from a buffer in overwrite mode.
683 *_total_consumed
+= stream
->output_written
;
685 pthread_mutex_unlock(&stream
->lock
);
688 *_highest_use
= high
;
699 * Execute action on a monitor timer.
702 void monitor_timer(struct lttng_consumer_channel
*channel
)
705 int channel_monitor_pipe
=
706 consumer_timer_thread_get_channel_monitor_pipe();
707 struct lttcomm_consumer_channel_monitor_msg msg
= {
710 sample_positions_cb sample
;
711 get_consumed_cb get_consumed
;
712 get_produced_cb get_produced
;
713 uint64_t lowest
= 0, highest
= 0, total_consumed
= 0;
717 if (channel_monitor_pipe
< 0) {
721 switch (consumer_data
.type
) {
722 case LTTNG_CONSUMER_KERNEL
:
723 sample
= lttng_kconsumer_sample_snapshot_positions
;
724 get_consumed
= lttng_kconsumer_get_consumed_snapshot
;
725 get_produced
= lttng_kconsumer_get_produced_snapshot
;
727 case LTTNG_CONSUMER32_UST
:
728 case LTTNG_CONSUMER64_UST
:
729 sample
= lttng_ustconsumer_sample_snapshot_positions
;
730 get_consumed
= lttng_ustconsumer_get_consumed_snapshot
;
731 get_produced
= lttng_ustconsumer_get_produced_snapshot
;
737 ret
= sample_channel_positions(channel
, &highest
, &lowest
,
738 &total_consumed
, sample
, get_consumed
, get_produced
);
742 msg
.highest
= highest
;
744 msg
.total_consumed
= total_consumed
;
747 * Writes performed here are assumed to be atomic which is only
748 * guaranteed for sizes < than PIPE_BUF.
750 assert(sizeof(msg
) <= PIPE_BUF
);
753 ret
= write(channel_monitor_pipe
, &msg
, sizeof(msg
));
754 } while (ret
== -1 && errno
== EINTR
);
756 if (errno
== EAGAIN
) {
757 /* Not an error, the sample is merely dropped. */
758 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64
,
761 PERROR("write to the channel monitor pipe");
764 DBG("Sent channel monitoring sample for channel key %" PRIu64
765 ", (highest = %" PRIu64
", lowest = %"PRIu64
")",
766 channel
->key
, msg
.highest
, msg
.lowest
);
770 int consumer_timer_thread_get_channel_monitor_pipe(void)
772 return uatomic_read(&channel_monitor_pipe
);
775 int consumer_timer_thread_set_channel_monitor_pipe(int fd
)
779 ret
= uatomic_cmpxchg(&channel_monitor_pipe
, -1, fd
);
790 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
791 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
792 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
794 void *consumer_timer_thread(void *data
)
799 struct lttng_consumer_local_data
*ctx
= data
;
801 rcu_register_thread();
803 health_register(health_consumerd
, HEALTH_CONSUMERD_TYPE_METADATA_TIMER
);
805 if (testpoint(consumerd_thread_metadata_timer
)) {
806 goto error_testpoint
;
809 health_code_update();
811 /* Only self thread will receive signal mask. */
813 CMM_STORE_SHARED(timer_signal
.tid
, pthread_self());
816 health_code_update();
819 signr
= sigwaitinfo(&mask
, &info
);
823 * NOTE: cascading conditions are used instead of a switch case
824 * since the use of SIGRTMIN in the definition of the signals'
825 * values prevents the reduction to an integer constant.
828 if (errno
!= EINTR
) {
829 PERROR("sigwaitinfo");
832 } else if (signr
== LTTNG_CONSUMER_SIG_SWITCH
) {
833 metadata_switch_timer(ctx
, &info
);
834 } else if (signr
== LTTNG_CONSUMER_SIG_TEARDOWN
) {
836 CMM_STORE_SHARED(timer_signal
.qs_done
, 1);
838 DBG("Signal timer metadata thread teardown");
839 } else if (signr
== LTTNG_CONSUMER_SIG_LIVE
) {
840 live_timer(ctx
, &info
);
841 } else if (signr
== LTTNG_CONSUMER_SIG_MONITOR
) {
842 struct lttng_consumer_channel
*channel
;
844 channel
= info
.si_value
.sival_ptr
;
845 monitor_timer(channel
);
846 } else if (signr
== LTTNG_CONSUMER_SIG_EXIT
) {
847 assert(CMM_LOAD_SHARED(consumer_quit
));
850 ERR("Unexpected signal %d\n", info
.si_signo
);
855 /* Only reached in testpoint error */
858 health_unregister(health_consumerd
);
859 rcu_unregister_thread();