2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
/*
 * Callback signatures used by sample_channel_positions() so that a single
 * sampling loop can be driven by different implementations: monitor_timer()
 * assigns either lttng_kconsumer_* or lttng_ustconsumer_* functions to
 * variables of these types.
 */

/* Called with the stream only; the caller stores its int result in `ret`. */
34 typedef int (*sample_positions_cb
)(struct lttng_consumer_stream
*stream
);
/* Called with the stream and an out-parameter receiving the consumed position. */
35 typedef int (*get_consumed_cb
)(struct lttng_consumer_stream
*stream
,
36 unsigned long *consumed
);
/* Called with the stream and an out-parameter receiving the produced position. */
37 typedef int (*get_produced_cb
)(struct lttng_consumer_stream
*stream
,
38 unsigned long *produced
);
/*
 * File-scope state shared between the timer thread and the callers of
 * consumer_timer_signal_thread_qs(): `lock` is held around the teardown
 * handshake, and the `tid` and `qs_done` members are accessed elsewhere in
 * this file through CMM_STORE_SHARED()/CMM_LOAD_SHARED().
 * NOTE(review): only part of the initializer is visible in this listing.
 */
40 static struct timer_signal_data timer_signal
= {
44 .lock
= PTHREAD_MUTEX_INITIALIZER
,
48 * Set custom signal mask to current thread.
/*
 * Populate `mask` with the signals handled by the consumer timer thread.
 *
 * The set is emptied first, then the five LTTNG_CONSUMER_SIG_* signals
 * (SWITCH, TEARDOWN, LIVE, MONITOR, EXIT) are added one at a time. Each call
 * has a matching PERROR() diagnostic and the function returns nothing.
 *
 * NOTE(review): the conditions guarding the PERROR() calls are not visible
 * in this listing - presumably a check on `ret`; confirm against the file.
 */
50 static void setmask(sigset_t
*mask
)
/* Start from an empty set so only the signals added below are members. */
54 ret
= sigemptyset(mask
);
56 PERROR("sigemptyset");
/* Switch timer signal. */
58 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_SWITCH
);
60 PERROR("sigaddset switch");
/* Teardown signal, sent by consumer_timer_signal_thread_qs(). */
62 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_TEARDOWN
);
64 PERROR("sigaddset teardown");
/* Live timer signal. */
66 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_LIVE
);
68 PERROR("sigaddset live");
/* Monitor timer signal. */
70 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_MONITOR
);
72 PERROR("sigaddset monitor");
/* Exit signal. */
74 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_EXIT
);
76 PERROR("sigaddset exit");
/*
 * File descriptor of the channel monitor pipe; -1 means "not set". It is
 * read with uatomic_read() and installed with uatomic_cmpxchg(..., -1, fd)
 * by the consumer_timer_thread_{get,set}_channel_monitor_pipe() accessors.
 */
80 static int channel_monitor_pipe
= -1;
83 * Execute action on a timer switch.
85 * Beware: metadata_switch_timer() should *never* take a mutex also held
86 * while consumer_timer_switch_stop() is called. It would result in
/*
 * Switch-timer action for one channel.
 *
 * The channel is taken from si->si_value.sival_ptr, i.e. the pointer that
 * consumer_channel_timer_start() stores in sev.sigev_value.sival_ptr. For
 * the two UST consumer types, metadata is requested through
 * lttng_ustconsumer_request_metadata(ctx, channel, 1, 1). The KERNEL and
 * UNKNOWN consumer types have their own case labels.
 *
 * NOTE(review): several lines (the second parameter, the switch header and
 * the conditions around switch_timer_error) are missing from this listing;
 * confirm the exact control flow against the real file.
 */
89 static void metadata_switch_timer(struct lttng_consumer_local_data
*ctx
,
93 struct lttng_consumer_channel
*channel
;
/* Recover the channel attached to the timer's signal value. */
95 channel
= si
->si_value
.sival_ptr
;
/* The error flag set further down is checked before doing any work. */
98 if (channel
->switch_timer_error
) {
102 DBG("Switch timer for channel %" PRIu64
, channel
->key
);
/* Both UST consumer flavours share the metadata request below. */
104 case LTTNG_CONSUMER32_UST
:
105 case LTTNG_CONSUMER64_UST
:
107 * Locks taken by lttng_ustconsumer_request_metadata():
108 * - metadata_socket_lock
109 * - Calling lttng_ustconsumer_recv_metadata():
110 * - channel->metadata_cache->lock
111 * - Calling consumer_metadata_cache_flushed():
112 * - channel->timer_lock
113 * - channel->metadata_cache->lock
115 * Ensure that neither consumer_data.lock nor
116 * channel->lock are taken within this function, since
117 * they are held while consumer_timer_switch_stop() is
120 ret
= lttng_ustconsumer_request_metadata(ctx
, channel
, 1, 1);
/* Record the failure on the channel (guarding condition not visible here). */
122 channel
->switch_timer_error
= 1;
125 case LTTNG_CONSUMER_KERNEL
:
126 case LTTNG_CONSUMER_UNKNOWN
:
/*
 * Write an "empty" packet index for `stream`.
 *
 * A ctf_packet_index is zeroed, then only stream_id and timestamp_end are
 * filled in, both converted with htobe64(), and the result is handed to
 * consumer_stream_write_index(), whose result is stored in `ret`.
 *
 * NOTE(review): the parameter supplying `stream_id` and the return statement
 * are not visible in this listing.
 */
132 static int send_empty_index(struct lttng_consumer_stream
*stream
, uint64_t ts
,
136 struct ctf_packet_index index
;
/* Every field other than the two set below stays zero. */
138 memset(&index
, 0, sizeof(index
));
139 index
.stream_id
= htobe64(stream_id
);
140 index
.timestamp_end
= htobe64(ts
);
141 ret
= consumer_stream_write_index(stream
, &index
);
/*
 * Flush a kernel stream and, when it holds no data, send an empty index.
 *
 * Visible sequence, all on stream->wait_fd: read the current timestamp,
 * flush the buffer, take a snapshot, then fetch the stream id and call
 * send_empty_index(stream, ts, stream_id). A snapshot result of -EAGAIN or
 * -ENODATA is not reported through PERROR().
 *
 * NOTE(review): the conditions linking these steps and the return paths are
 * missing from this listing; confirm them against the real file.
 */
150 int consumer_flush_kernel_index(struct lttng_consumer_stream
*stream
)
152 uint64_t ts
, stream_id
;
/* Timestamp later used as timestamp_end of the empty index. */
155 ret
= kernctl_get_current_timestamp(stream
->wait_fd
, &ts
);
157 ERR("Failed to get the current timestamp");
160 ret
= kernctl_buffer_flush(stream
->wait_fd
);
162 ERR("Failed to flush kernel stream");
165 ret
= kernctl_snapshot(stream
->wait_fd
);
/* -EAGAIN and -ENODATA are the two results excluded from error reporting. */
167 if (ret
!= -EAGAIN
&& ret
!= -ENODATA
) {
168 PERROR("live timer kernel snapshot");
172 ret
= kernctl_get_stream_id(stream
->wait_fd
, &stream_id
);
174 PERROR("kernctl_get_stream_id");
177 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
178 ret
= send_empty_index(stream
, ts
, stream_id
);
/*
 * Live-timer check for one kernel stream.
 *
 * The stream lock is acquired with pthread_mutex_trylock(). When that does
 * not succeed, stream->waiting_on_metadata is inspected under
 * metadata_timer_lock: if set, missed_metadata_flush is set to true and the
 * function jumps to `end`. Once the stream lock is held,
 * consumer_flush_kernel_index() is called and the lock is released.
 *
 * NOTE(review): the loop construct and the tests on the trylock result are
 * not visible in this listing.
 */
188 static int check_kernel_stream(struct lttng_consumer_stream
*stream
)
193 * While holding the stream mutex, try to take a snapshot, if it
194 * succeeds, it means that data is ready to be sent, just let the data
195 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
196 * means that there is no data to read after the flush, so we can
197 * safely send the empty index.
199 * Doing a trylock and checking if waiting on metadata if
200 * trylock fails. Bail out of the stream is indeed waiting for
201 * metadata to be pushed. Busy wait on trylock otherwise.
204 ret
= pthread_mutex_trylock(&stream
->lock
);
207 break; /* We have the lock. */
/* waiting_on_metadata is only read while holding metadata_timer_lock. */
209 pthread_mutex_lock(&stream
->metadata_timer_lock
);
210 if (stream
->waiting_on_metadata
) {
/* Record the skipped flush on the stream before bailing out. */
212 stream
->missed_metadata_flush
= true;
213 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
214 goto end
; /* Bail out. */
216 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
221 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
/* Stream lock held: flush and possibly send the empty index. */
227 ret
= consumer_flush_kernel_index(stream
);
228 pthread_mutex_unlock(&stream
->lock
);
/*
 * Flush a UST stream and, when it holds no data, send an empty index.
 *
 * Visible sequence: test whether the stream's hash-table node was deleted,
 * read the current timestamp, flush the buffer with
 * lttng_ustconsumer_flush_buffer(stream, 1), take a snapshot, then fetch the
 * stream id and call send_empty_index(stream, ts, stream_id). A snapshot
 * result of -EAGAIN is not reported through ERR().
 *
 * NOTE(review): the conditions linking these steps and the return paths are
 * missing from this listing; confirm them against the real file.
 */
233 int consumer_flush_ust_index(struct lttng_consumer_stream
*stream
)
235 uint64_t ts
, stream_id
;
/* Deleted-node check comes first, before touching the buffer. */
238 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
243 ret
= lttng_ustconsumer_get_current_timestamp(stream
, &ts
);
245 ERR("Failed to get the current timestamp");
248 lttng_ustconsumer_flush_buffer(stream
, 1);
249 ret
= lttng_ustconsumer_take_snapshot(stream
);
/* -EAGAIN is the one snapshot result excluded from error reporting. */
251 if (ret
!= -EAGAIN
) {
252 ERR("Taking UST snapshot");
256 ret
= lttng_ustconsumer_get_stream_id(stream
, &stream_id
);
258 PERROR("ustctl_get_stream_id");
261 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
262 ret
= send_empty_index(stream
, ts
, stream_id
);
/*
 * Live-timer check for one UST stream.
 *
 * Asserts that stream->ustream is set, then follows the same locking scheme
 * as check_kernel_stream(): pthread_mutex_trylock() on the stream lock, and
 * when that does not succeed, inspect waiting_on_metadata under
 * metadata_timer_lock (setting missed_metadata_flush and jumping to `end`
 * when it is set). Once the stream lock is held, consumer_flush_ust_index()
 * is called and the lock is released.
 *
 * NOTE(review): the loop construct and the tests on the trylock result are
 * not visible in this listing.
 */
272 static int check_ust_stream(struct lttng_consumer_stream
*stream
)
277 assert(stream
->ustream
);
279 * While holding the stream mutex, try to take a snapshot, if it
280 * succeeds, it means that data is ready to be sent, just let the data
281 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
282 * means that there is no data to read after the flush, so we can
283 * safely send the empty index.
285 * Doing a trylock and checking if waiting on metadata if
286 * trylock fails. Bail out of the stream is indeed waiting for
287 * metadata to be pushed. Busy wait on trylock otherwise.
290 ret
= pthread_mutex_trylock(&stream
->lock
);
293 break; /* We have the lock. */
/* waiting_on_metadata is only read while holding metadata_timer_lock. */
295 pthread_mutex_lock(&stream
->metadata_timer_lock
);
296 if (stream
->waiting_on_metadata
) {
/* Record the skipped flush on the stream before bailing out. */
298 stream
->missed_metadata_flush
= true;
299 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
300 goto end
; /* Bail out. */
302 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
307 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
/* Stream lock held: flush and possibly send the empty index. */
313 ret
= consumer_flush_ust_index(stream
);
314 pthread_mutex_unlock(&stream
->lock
);
320 * Execute action on a live timer
/*
 * Live-timer action for one channel.
 *
 * The channel comes from si->si_value.sival_ptr. Every stream stored under
 * channel->key in consumer_data.stream_per_chan_id_ht is visited with
 * cds_lfht_for_each_entry_duplicate(); UST consumer types call
 * check_ust_stream() on each stream, the kernel consumer type calls
 * check_kernel_stream().
 *
 * NOTE(review): the flag tested below is switch_timer_error, not a
 * live-timer specific one - confirm this is intended. The switch header,
 * the handling of `ret` and any RCU read-side locking are not visible in
 * this listing.
 */
322 static void live_timer(struct lttng_consumer_local_data
*ctx
,
326 struct lttng_consumer_channel
*channel
;
327 struct lttng_consumer_stream
*stream
;
329 struct lttng_ht_iter iter
;
/* Recover the channel attached to the timer's signal value. */
331 channel
= si
->si_value
.sival_ptr
;
334 if (channel
->switch_timer_error
) {
/* Streams are looked up by channel key in this table. */
337 ht
= consumer_data
.stream_per_chan_id_ht
;
339 DBG("Live timer for channel %" PRIu64
, channel
->key
);
343 case LTTNG_CONSUMER32_UST
:
344 case LTTNG_CONSUMER64_UST
:
/* Walk all entries sharing the channel key. */
345 cds_lfht_for_each_entry_duplicate(ht
->ht
,
346 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
347 ht
->match_fct
, &channel
->key
, &iter
.iter
,
348 stream
, node_channel_id
.node
) {
349 ret
= check_ust_stream(stream
);
355 case LTTNG_CONSUMER_KERNEL
:
/* Same walk as above, with the kernel-specific check. */
356 cds_lfht_for_each_entry_duplicate(ht
->ht
,
357 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
358 ht
->match_fct
, &channel
->key
, &iter
.iter
,
359 stream
, node_channel_id
.node
) {
360 ret
= check_kernel_stream(stream
);
366 case LTTNG_CONSUMER_UNKNOWN
:
/*
 * Synchronize with the timer thread so that no handler for `signr` is
 * pending or still running when this function returns.
 *
 * Visible steps, all under timer_signal.lock:
 *  1. query the pending signal set with sigpending() and test `signr` with
 *     sigismember();
 *  2. store 0 into timer_signal.qs_done;
 *  3. send LTTNG_CONSUMER_SIG_TEARDOWN to this process with kill(getpid(), ...);
 *  4. wait until timer_signal.qs_done reads non-zero (the timer thread
 *     stores 1 into it when it receives the teardown signal).
 *
 * NOTE(review): the enclosing loops, the loop bodies and any memory barriers
 * are not visible in this listing; statement order matters here, so confirm
 * against the real file before changing anything. sigismember() can also
 * return -1 on error, which the `!` test treats like "still pending".
 */
379 void consumer_timer_signal_thread_qs(unsigned int signr
)
381 sigset_t pending_set
;
385 * We need to be the only thread interacting with the thread
386 * that manages signals for teardown synchronization.
388 pthread_mutex_lock(&timer_signal
.lock
);
390 /* Ensure we don't have any signal queued for this channel. */
392 ret
= sigemptyset(&pending_set
);
394 PERROR("sigemptyset");
396 ret
= sigpending(&pending_set
);
398 PERROR("sigpending");
/* True once `signr` is no longer a member of the pending set. */
400 if (!sigismember(&pending_set
, signr
)) {
407 * From this point, no new signal handler will be fired that would try to
408 * access "chan". However, we still need to wait for any currently
409 * executing handler to complete.
/* Reset the handshake flag before asking the timer thread to acknowledge. */
412 CMM_STORE_SHARED(timer_signal
.qs_done
, 0);
416 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
419 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN
);
/* Wait for the timer thread's acknowledgement. */
421 while (!CMM_LOAD_SHARED(timer_signal
.qs_done
)) {
426 pthread_mutex_unlock(&timer_signal
.lock
);
430 * Start a channel timer which will fire at a given interval
431 * (timer_interval_us) and deliver a given signal (signal).
433 * Returns a negative value on error, 0 if a timer was created, and
434 * a positive value if no timer was created (not an error).
/*
 * Create and arm a periodic timer for `channel`.
 *
 * timer_id:          receives the timer created by timer_create().
 * channel:           stored in sev.sigev_value.sival_ptr, so that the signal
 *                    handler side can recover it from the siginfo.
 * timer_interval_us: period in microseconds; 0 means no timer is created.
 * signal:            signal number delivered on expiry (SIGEV_SIGNAL).
 *
 * The interval is split into seconds and nanoseconds and used both as the
 * initial expiry and as the repeat interval. When timer_settime() fails the
 * code jumps to error_destroy_timer, where the timer is deleted.
 *
 * NOTE(review): the declaration of `sev`, the error tests and the return
 * statements are not visible in this listing.
 */
437 int consumer_channel_timer_start(timer_t
*timer_id
,
438 struct lttng_consumer_channel
*channel
,
439 unsigned int timer_interval_us
, int signal
)
441 int ret
= 0, delete_ret
;
443 struct itimerspec its
;
446 assert(channel
->key
);
448 if (timer_interval_us
== 0) {
449 /* No creation needed; not an error. */
/* Deliver `signal` on expiry, carrying the channel pointer as its value. */
454 sev
.sigev_notify
= SIGEV_SIGNAL
;
455 sev
.sigev_signo
= signal
;
456 sev
.sigev_value
.sival_ptr
= channel
;
457 ret
= timer_create(CLOCKID
, &sev
, timer_id
);
459 PERROR("timer_create");
/* Microseconds -> whole seconds plus remaining nanoseconds. */
463 its
.it_value
.tv_sec
= timer_interval_us
/ 1000000;
464 its
.it_value
.tv_nsec
= (timer_interval_us
% 1000000) * 1000;
/* Periodic timer: repeat interval equals the initial expiry. */
465 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
466 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
468 ret
= timer_settime(*timer_id
, 0, &its
, NULL
);
470 PERROR("timer_settime");
471 goto error_destroy_timer
;
/* Cleanup path: delete the timer created above. */
476 delete_ret
= timer_delete(*timer_id
);
477 if (delete_ret
== -1) {
478 PERROR("timer_delete");
/*
 * Delete the timer referenced by `timer_id`, then call
 * consumer_timer_signal_thread_qs(signal) to synchronize with the timer
 * thread for that signal.
 *
 * NOTE(review): the test on timer_delete()'s result and the return value are
 * not visible in this listing.
 */
484 int consumer_channel_timer_stop(timer_t
*timer_id
, int signal
)
488 ret
= timer_delete(*timer_id
);
490 PERROR("timer_delete");
494 consumer_timer_signal_thread_qs(signal
);
501 * Set the channel's switch timer.
/*
 * Start the channel's switch timer with LTTNG_CONSUMER_SIG_SWITCH at the
 * given interval (microseconds), and record in switch_timer_enabled whether
 * a timer was actually created (start helper returned 0).
 */
503 void consumer_timer_switch_start(struct lttng_consumer_channel
*channel
,
504 unsigned int switch_timer_interval_us
)
509 assert(channel
->key
);
511 ret
= consumer_channel_timer_start(&channel
->switch_timer
, channel
,
512 switch_timer_interval_us
, LTTNG_CONSUMER_SIG_SWITCH
);
/* Only a return value of exactly 0 marks the timer as enabled. */
514 channel
->switch_timer_enabled
= !!(ret
== 0);
518 * Stop and delete the channel's switch timer.
/*
 * Stop the channel's switch timer through consumer_channel_timer_stop() with
 * LTTNG_CONSUMER_SIG_SWITCH, and clear switch_timer_enabled.
 *
 * NOTE(review): the condition guarding the ERR() call is not visible here.
 */
520 void consumer_timer_switch_stop(struct lttng_consumer_channel
*channel
)
526 ret
= consumer_channel_timer_stop(&channel
->switch_timer
,
527 LTTNG_CONSUMER_SIG_SWITCH
);
529 ERR("Failed to stop switch timer");
532 channel
->switch_timer_enabled
= 0;
536 * Set the channel's live timer.
/*
 * Start the channel's live timer with LTTNG_CONSUMER_SIG_LIVE at the given
 * interval (microseconds), and record in live_timer_enabled whether a timer
 * was actually created (start helper returned 0).
 */
538 void consumer_timer_live_start(struct lttng_consumer_channel
*channel
,
539 unsigned int live_timer_interval_us
)
544 assert(channel
->key
);
546 ret
= consumer_channel_timer_start(&channel
->live_timer
, channel
,
547 live_timer_interval_us
, LTTNG_CONSUMER_SIG_LIVE
);
/* Only a return value of exactly 0 marks the timer as enabled. */
549 channel
->live_timer_enabled
= !!(ret
== 0);
553 * Stop and delete the channel's live timer.
/*
 * Stop the channel's live timer through consumer_channel_timer_stop() with
 * LTTNG_CONSUMER_SIG_LIVE, and clear live_timer_enabled.
 *
 * NOTE(review): the condition guarding the ERR() call is not visible here.
 */
555 void consumer_timer_live_stop(struct lttng_consumer_channel
*channel
)
561 ret
= consumer_channel_timer_stop(&channel
->live_timer
,
562 LTTNG_CONSUMER_SIG_LIVE
);
564 ERR("Failed to stop live timer");
567 channel
->live_timer_enabled
= 0;
571 * Set the channel's monitoring timer.
573 * Returns a negative value on error, 0 if a timer was created, and
574 * a positive value if no timer was created (not an error).
/*
 * Start the channel's monitor timer with LTTNG_CONSUMER_SIG_MONITOR at the
 * given interval (microseconds).
 *
 * Asserts that the channel has a non-zero key and that its monitor timer is
 * not already enabled. monitor_timer_enabled is set according to whether
 * consumer_channel_timer_start() returned 0.
 */
576 int consumer_timer_monitor_start(struct lttng_consumer_channel
*channel
,
577 unsigned int monitor_timer_interval_us
)
582 assert(channel
->key
);
/* Starting an already-enabled monitor timer is a caller error. */
583 assert(!channel
->monitor_timer_enabled
);
585 ret
= consumer_channel_timer_start(&channel
->monitor_timer
, channel
,
586 monitor_timer_interval_us
, LTTNG_CONSUMER_SIG_MONITOR
);
/* Only a return value of exactly 0 marks the timer as enabled. */
587 channel
->monitor_timer_enabled
= !!(ret
== 0);
592 * Stop and delete the channel's monitoring timer.
/*
 * Stop the channel's monitor timer through consumer_channel_timer_stop()
 * with LTTNG_CONSUMER_SIG_MONITOR, and clear monitor_timer_enabled.
 *
 * Asserts that the monitor timer is currently enabled.
 *
 * NOTE(review): the condition guarding the ERR() call and the return
 * statement are not visible in this listing.
 */
594 int consumer_timer_monitor_stop(struct lttng_consumer_channel
*channel
)
/* Stopping a monitor timer that was never enabled is a caller error. */
599 assert(channel
->monitor_timer_enabled
);
601 ret
= consumer_channel_timer_stop(&channel
->monitor_timer
,
602 LTTNG_CONSUMER_SIG_MONITOR
);
/* The diagnostic names the monitor timer (it previously said "live timer"). */
604 ERR("Failed to stop monitor timer");
608 channel
->monitor_timer_enabled
= 0;
614 * Block the RT signals for the entire process. It must be called from the
615 * consumer main before creating the threads
/*
 * Block the signals in `mask` for the calling thread with
 * pthread_sigmask(SIG_BLOCK, ...).
 *
 * NOTE(review): the declaration and filling of `mask` are not visible in
 * this listing - presumably done through setmask(); confirm.
 */
617 int consumer_signal_init(void)
622 /* Block signal for entire process, so only our thread processes it. */
624 ret
= pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
627 PERROR("pthread_sigmask");
/*
 * Sample buffer usage over all streams of `channel`.
 *
 * channel:         channel whose key selects the streams in
 *                  consumer_data.stream_per_chan_id_ht.
 * _highest_use:    receives the largest (produced - consumed) seen.
 * _lowest_use:     NOTE(review): presumably receives the smallest value;
 *                  its assignment is not visible in this listing.
 * _total_consumed: reset to 0, then incremented by stream->output_written
 *                  for each sampled stream.
 * sample, get_consumed, get_produced: callbacks invoked, in that order, on
 *                  each stream while holding stream->lock.
 *
 * `empty_channel` starts true and is cleared as soon as one stream is
 * visited. On each visible callback failure path the stream lock is released
 * after the ERR() message.
 *
 * NOTE(review): the failure tests, the handling of deleted nodes and the
 * return statements are not visible in this listing.
 */
634 int sample_channel_positions(struct lttng_consumer_channel
*channel
,
635 uint64_t *_highest_use
, uint64_t *_lowest_use
, uint64_t *_total_consumed
,
636 sample_positions_cb sample
, get_consumed_cb get_consumed
,
637 get_produced_cb get_produced
)
640 struct lttng_ht_iter iter
;
641 struct lttng_consumer_stream
*stream
;
642 bool empty_channel
= true;
/* Running maximum starts at 0, running minimum at the largest value. */
643 uint64_t high
= 0, low
= UINT64_MAX
;
644 struct lttng_ht
*ht
= consumer_data
.stream_per_chan_id_ht
;
646 *_total_consumed
= 0;
/* Walk all entries sharing the channel key. */
650 cds_lfht_for_each_entry_duplicate(ht
->ht
,
651 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
652 ht
->match_fct
, &channel
->key
,
653 &iter
.iter
, stream
, node_channel_id
.node
) {
654 unsigned long produced
, consumed
, usage
;
656 empty_channel
= false;
/* Positions are sampled and read under the stream lock. */
658 pthread_mutex_lock(&stream
->lock
);
659 if (cds_lfht_is_node_deleted(&stream
->node
.node
)) {
663 ret
= sample(stream
);
665 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret
);
666 pthread_mutex_unlock(&stream
->lock
);
669 ret
= get_consumed(stream
, &consumed
);
671 ERR("Failed to get buffer consumed position in monitor timer");
672 pthread_mutex_unlock(&stream
->lock
);
675 ret
= get_produced(stream
, &produced
);
677 ERR("Failed to get buffer produced position in monitor timer");
678 pthread_mutex_unlock(&stream
->lock
);
/* Difference between the produced and consumed positions for this stream. */
682 usage
= produced
- consumed
;
683 high
= (usage
> high
) ? usage
: high
;
684 low
= (usage
< low
) ? usage
: low
;
687 * We don't use consumed here for 2 reasons:
688 * - output_written takes into account the padding written in the
689 * tracefiles when we stop the session;
690 * - the consumed position is not the accurate representation of what
691 * was extracted from a buffer in overwrite mode.
693 *_total_consumed
+= stream
->output_written
;
695 pthread_mutex_unlock(&stream
->lock
);
698 *_highest_use
= high
;
709 * Execute action on a monitor timer.
/*
 * Monitor-timer action for one channel.
 *
 * The monitor pipe descriptor is fetched through
 * consumer_timer_thread_get_channel_monitor_pipe() and compared against 0.
 * The three sampling callbacks are chosen from consumer_data.type (kernel or
 * UST implementations) and passed to sample_channel_positions(). The
 * resulting highest/total_consumed values are copied into `msg`, which is
 * written to the pipe, retrying while write() fails with EINTR. EAGAIN is
 * logged as a dropped sample rather than an error.
 *
 * NOTE(review): the local `channel_monitor_pipe` shadows the file-scope
 * variable of the same name. The initializer of `msg`, the assignment of
 * msg.lowest and the tests on `ret` are not visible in this listing.
 */
712 void monitor_timer(struct lttng_consumer_channel
*channel
)
715 int channel_monitor_pipe
=
716 consumer_timer_thread_get_channel_monitor_pipe();
717 struct lttcomm_consumer_channel_monitor_msg msg
= {
720 sample_positions_cb sample
;
721 get_consumed_cb get_consumed
;
722 get_produced_cb get_produced
;
723 uint64_t lowest
= 0, highest
= 0, total_consumed
= 0;
/* A negative descriptor means no pipe has been installed. */
727 if (channel_monitor_pipe
< 0) {
/* Select the callback set matching the consumer type. */
731 switch (consumer_data
.type
) {
732 case LTTNG_CONSUMER_KERNEL
:
733 sample
= lttng_kconsumer_sample_snapshot_positions
;
734 get_consumed
= lttng_kconsumer_get_consumed_snapshot
;
735 get_produced
= lttng_kconsumer_get_produced_snapshot
;
737 case LTTNG_CONSUMER32_UST
:
738 case LTTNG_CONSUMER64_UST
:
739 sample
= lttng_ustconsumer_sample_snapshot_positions
;
740 get_consumed
= lttng_ustconsumer_get_consumed_snapshot
;
741 get_produced
= lttng_ustconsumer_get_produced_snapshot
;
747 ret
= sample_channel_positions(channel
, &highest
, &lowest
,
748 &total_consumed
, sample
, get_consumed
, get_produced
);
752 msg
.highest
= highest
;
754 msg
.total_consumed
= total_consumed
;
757 * Writes performed here are assumed to be atomic which is only
758 * guaranteed for sizes < than PIPE_BUF.
760 assert(sizeof(msg
) <= PIPE_BUF
);
/* Retry the write for as long as it is interrupted by a signal. */
763 ret
= write(channel_monitor_pipe
, &msg
, sizeof(msg
));
764 } while (ret
== -1 && errno
== EINTR
);
766 if (errno
== EAGAIN
) {
767 /* Not an error, the sample is merely dropped. */
768 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64
,
771 PERROR("write to the channel monitor pipe");
774 DBG("Sent channel monitoring sample for channel key %" PRIu64
775 ", (highest = %" PRIu64
", lowest = %"PRIu64
")",
776 channel
->key
, msg
.highest
, msg
.lowest
);
/*
 * Return the current channel monitor pipe descriptor, read atomically with
 * uatomic_read(). The variable is initialised to -1.
 */
780 int consumer_timer_thread_get_channel_monitor_pipe(void)
782 return uatomic_read(&channel_monitor_pipe
);
/*
 * Install `fd` as the channel monitor pipe with a compare-and-exchange that
 * only succeeds when the current value is -1, so an already installed
 * descriptor is not overwritten.
 *
 * NOTE(review): how the cmpxchg result is turned into the return value is
 * not visible in this listing.
 */
785 int consumer_timer_thread_set_channel_monitor_pipe(int fd
)
789 ret
= uatomic_cmpxchg(&channel_monitor_pipe
, -1, fd
);
800 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
801 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE,
802 * LTTNG_CONSUMER_SIG_MONITOR, and LTTNG_CONSUMER_SIG_EXIT.
/*
 * Entry point of the consumer timer thread.
 *
 * data: the lttng_consumer_local_data context, forwarded to the switch and
 *       live timer handlers.
 *
 * The thread registers with RCU and the health subsystem, publishes its
 * thread id in timer_signal.tid, then waits for signals with sigwaitinfo()
 * and dispatches on the signal number:
 *   - LTTNG_CONSUMER_SIG_SWITCH   -> metadata_switch_timer()
 *   - LTTNG_CONSUMER_SIG_TEARDOWN -> store 1 into timer_signal.qs_done, which
 *                                    consumer_timer_signal_thread_qs() waits on
 *   - LTTNG_CONSUMER_SIG_LIVE     -> live_timer()
 *   - LTTNG_CONSUMER_SIG_MONITOR  -> monitor_timer() on the channel carried in
 *                                    info.si_value.sival_ptr
 *   - LTTNG_CONSUMER_SIG_EXIT     -> asserts that consumer_quit is set
 * Any other signal is reported with ERR(). On the way out the thread
 * unregisters from the health subsystem and from RCU.
 *
 * NOTE(review): the loop construct, the mask set-up, the sigwaitinfo()
 * failure test and the return statement are not visible in this listing.
 */
804 void *consumer_timer_thread(void *data
)
809 struct lttng_consumer_local_data
*ctx
= data
;
811 rcu_register_thread();
813 health_register(health_consumerd
, HEALTH_CONSUMERD_TYPE_METADATA_TIMER
);
815 if (testpoint(consumerd_thread_metadata_timer
)) {
816 goto error_testpoint
;
819 health_code_update();
821 /* Only self thread will receive signal mask. */
/* Publish this thread's id in the shared timer_signal state. */
823 CMM_STORE_SHARED(timer_signal
.tid
, pthread_self());
826 health_code_update();
/* Wait for one of the signals in `mask`; details land in `info`. */
829 signr
= sigwaitinfo(&mask
, &info
);
833 * NOTE: cascading conditions are used instead of a switch case
834 * since the use of SIGRTMIN in the definition of the signals'
835 * values prevents the reduction to an integer constant.
/* An interrupted wait (EINTR) is not reported as an error. */
838 if (errno
!= EINTR
) {
839 PERROR("sigwaitinfo");
842 } else if (signr
== LTTNG_CONSUMER_SIG_SWITCH
) {
843 metadata_switch_timer(ctx
, &info
);
844 } else if (signr
== LTTNG_CONSUMER_SIG_TEARDOWN
) {
/* Acknowledge the teardown handshake. */
846 CMM_STORE_SHARED(timer_signal
.qs_done
, 1);
848 DBG("Signal timer metadata thread teardown");
849 } else if (signr
== LTTNG_CONSUMER_SIG_LIVE
) {
850 live_timer(ctx
, &info
);
851 } else if (signr
== LTTNG_CONSUMER_SIG_MONITOR
) {
852 struct lttng_consumer_channel
*channel
;
/* The channel pointer travels in the signal value. */
854 channel
= info
.si_value
.sival_ptr
;
855 monitor_timer(channel
);
856 } else if (signr
== LTTNG_CONSUMER_SIG_EXIT
) {
857 assert(CMM_LOAD_SHARED(consumer_quit
));
860 ERR("Unexpected signal %d\n", info
.si_signo
);
865 /* Only reached in testpoint error */
868 health_unregister(health_consumerd
);
869 rcu_unregister_thread();