/*
 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
 *                      David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
24 #include <common/common.h>
25 #include <common/kernel-ctl/kernel-ctl.h>
26 #include <common/kernel-consumer/kernel-consumer.h>
27 #include <common/consumer-stream.h>
29 #include "consumer-timer.h"
30 #include "ust-consumer/ust-consumer.h"
32 static struct timer_signal_data timer_signal
= {
36 .lock
= PTHREAD_MUTEX_INITIALIZER
,
40 * Set custom signal mask to current thread.
42 static void setmask(sigset_t
*mask
)
46 ret
= sigemptyset(mask
);
48 PERROR("sigemptyset");
50 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_SWITCH
);
52 PERROR("sigaddset switch");
54 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_TEARDOWN
);
56 PERROR("sigaddset teardown");
58 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_LIVE
);
60 PERROR("sigaddset live");
65 * Execute action on a timer switch.
67 * Beware: metadata_switch_timer() should *never* take a mutex also held
68 * while consumer_timer_switch_stop() is called. It would result in
71 static void metadata_switch_timer(struct lttng_consumer_local_data
*ctx
,
72 int sig
, siginfo_t
*si
, void *uc
)
75 struct lttng_consumer_channel
*channel
;
77 channel
= si
->si_value
.sival_ptr
;
80 if (channel
->switch_timer_error
) {
84 DBG("Switch timer for channel %" PRIu64
, channel
->key
);
86 case LTTNG_CONSUMER32_UST
:
87 case LTTNG_CONSUMER64_UST
:
89 * Locks taken by lttng_ustconsumer_request_metadata():
90 * - metadata_socket_lock
91 * - Calling lttng_ustconsumer_recv_metadata():
92 * - channel->metadata_cache->lock
93 * - Calling consumer_metadata_cache_flushed():
94 * - channel->timer_lock
95 * - channel->metadata_cache->lock
97 * Ensure that neither consumer_data.lock nor
98 * channel->lock are taken within this function, since
99 * they are held while consumer_timer_switch_stop() is
102 ret
= lttng_ustconsumer_request_metadata(ctx
, channel
, 1, 1);
104 channel
->switch_timer_error
= 1;
107 case LTTNG_CONSUMER_KERNEL
:
108 case LTTNG_CONSUMER_UNKNOWN
:
114 static int send_empty_index(struct lttng_consumer_stream
*stream
, uint64_t ts
)
117 struct lttng_packet_index index
;
119 memset(&index
, 0, sizeof(index
));
120 index
.timestamp_end
= htobe64(ts
);
121 ret
= consumer_stream_write_index(stream
, &index
);
130 static int check_kernel_stream(struct lttng_consumer_stream
*stream
)
136 * While holding the stream mutex, try to take a snapshot, if it
137 * succeeds, it means that data is ready to be sent, just let the data
138 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
139 * means that there is no data to read after the flush, so we can
140 * safely send the empty index.
142 pthread_mutex_lock(&stream
->lock
);
143 ret
= kernctl_get_current_timestamp(stream
->wait_fd
, &ts
);
145 ERR("Failed to get the current timestamp");
148 ret
= kernctl_buffer_flush(stream
->wait_fd
);
150 ERR("Failed to flush kernel stream");
153 ret
= kernctl_snapshot(stream
->wait_fd
);
155 if (errno
!= EAGAIN
) {
156 ERR("Taking kernel snapshot");
160 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
161 ret
= send_empty_index(stream
, ts
);
169 pthread_mutex_unlock(&stream
->lock
);
173 static int check_ust_stream(struct lttng_consumer_stream
*stream
)
179 assert(stream
->ustream
);
181 * While holding the stream mutex, try to take a snapshot, if it
182 * succeeds, it means that data is ready to be sent, just let the data
183 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
184 * means that there is no data to read after the flush, so we can
185 * safely send the empty index.
187 pthread_mutex_lock(&stream
->lock
);
188 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
193 ret
= lttng_ustconsumer_get_current_timestamp(stream
, &ts
);
195 ERR("Failed to get the current timestamp");
198 lttng_ustconsumer_flush_buffer(stream
, 1);
199 ret
= lttng_ustconsumer_take_snapshot(stream
);
201 if (ret
!= -EAGAIN
) {
202 ERR("Taking UST snapshot");
206 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
207 ret
= send_empty_index(stream
, ts
);
215 pthread_mutex_unlock(&stream
->lock
);
220 * Execute action on a live timer
222 static void live_timer(struct lttng_consumer_local_data
*ctx
,
223 int sig
, siginfo_t
*si
, void *uc
)
226 struct lttng_consumer_channel
*channel
;
227 struct lttng_consumer_stream
*stream
;
229 struct lttng_ht_iter iter
;
231 channel
= si
->si_value
.sival_ptr
;
234 if (channel
->switch_timer_error
) {
237 ht
= consumer_data
.stream_per_chan_id_ht
;
239 DBG("Live timer for channel %" PRIu64
, channel
->key
);
243 case LTTNG_CONSUMER32_UST
:
244 case LTTNG_CONSUMER64_UST
:
245 cds_lfht_for_each_entry_duplicate(ht
->ht
,
246 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
247 ht
->match_fct
, &channel
->key
, &iter
.iter
,
248 stream
, node_channel_id
.node
) {
249 ret
= check_ust_stream(stream
);
255 case LTTNG_CONSUMER_KERNEL
:
256 cds_lfht_for_each_entry_duplicate(ht
->ht
,
257 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
258 ht
->match_fct
, &channel
->key
, &iter
.iter
,
259 stream
, node_channel_id
.node
) {
260 ret
= check_kernel_stream(stream
);
266 case LTTNG_CONSUMER_UNKNOWN
:
279 void consumer_timer_signal_thread_qs(unsigned int signr
)
281 sigset_t pending_set
;
285 * We need to be the only thread interacting with the thread
286 * that manages signals for teardown synchronization.
288 pthread_mutex_lock(&timer_signal
.lock
);
290 /* Ensure we don't have any signal queued for this channel. */
292 ret
= sigemptyset(&pending_set
);
294 PERROR("sigemptyset");
296 ret
= sigpending(&pending_set
);
298 PERROR("sigpending");
300 if (!sigismember(&pending_set
, LTTNG_CONSUMER_SIG_SWITCH
)) {
307 * From this point, no new signal handler will be fired that would try to
308 * access "chan". However, we still need to wait for any currently
309 * executing handler to complete.
312 CMM_STORE_SHARED(timer_signal
.qs_done
, 0);
316 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
319 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN
);
321 while (!CMM_LOAD_SHARED(timer_signal
.qs_done
)) {
326 pthread_mutex_unlock(&timer_signal
.lock
);
330 * Set the timer for periodical metadata flush.
332 void consumer_timer_switch_start(struct lttng_consumer_channel
*channel
,
333 unsigned int switch_timer_interval
)
337 struct itimerspec its
;
340 assert(channel
->key
);
342 if (switch_timer_interval
== 0) {
346 sev
.sigev_notify
= SIGEV_SIGNAL
;
347 sev
.sigev_signo
= LTTNG_CONSUMER_SIG_SWITCH
;
348 sev
.sigev_value
.sival_ptr
= channel
;
349 ret
= timer_create(CLOCKID
, &sev
, &channel
->switch_timer
);
351 PERROR("timer_create");
353 channel
->switch_timer_enabled
= 1;
355 its
.it_value
.tv_sec
= switch_timer_interval
/ 1000000;
356 its
.it_value
.tv_nsec
= switch_timer_interval
% 1000000;
357 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
358 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
360 ret
= timer_settime(channel
->switch_timer
, 0, &its
, NULL
);
362 PERROR("timer_settime");
367 * Stop and delete timer.
369 void consumer_timer_switch_stop(struct lttng_consumer_channel
*channel
)
375 ret
= timer_delete(channel
->switch_timer
);
377 PERROR("timer_delete");
380 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH
);
382 channel
->switch_timer
= 0;
383 channel
->switch_timer_enabled
= 0;
387 * Set the timer for the live mode.
389 void consumer_timer_live_start(struct lttng_consumer_channel
*channel
,
390 int live_timer_interval
)
394 struct itimerspec its
;
397 assert(channel
->key
);
399 if (live_timer_interval
== 0) {
403 sev
.sigev_notify
= SIGEV_SIGNAL
;
404 sev
.sigev_signo
= LTTNG_CONSUMER_SIG_LIVE
;
405 sev
.sigev_value
.sival_ptr
= channel
;
406 ret
= timer_create(CLOCKID
, &sev
, &channel
->live_timer
);
408 PERROR("timer_create");
410 channel
->live_timer_enabled
= 1;
412 its
.it_value
.tv_sec
= live_timer_interval
/ 1000000;
413 its
.it_value
.tv_nsec
= live_timer_interval
% 1000000;
414 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
415 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
417 ret
= timer_settime(channel
->live_timer
, 0, &its
, NULL
);
419 PERROR("timer_settime");
424 * Stop and delete timer.
426 void consumer_timer_live_stop(struct lttng_consumer_channel
*channel
)
432 ret
= timer_delete(channel
->live_timer
);
434 PERROR("timer_delete");
437 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE
);
439 channel
->live_timer
= 0;
440 channel
->live_timer_enabled
= 0;
444 * Block the RT signals for the entire process. It must be called from the
445 * consumer main before creating the threads
447 void consumer_signal_init(void)
452 /* Block signal for entire process, so only our thread processes it. */
454 ret
= pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
457 PERROR("pthread_sigmask");
462 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
463 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
465 void *consumer_timer_thread(void *data
)
470 struct lttng_consumer_local_data
*ctx
= data
;
472 /* Only self thread will receive signal mask. */
474 CMM_STORE_SHARED(timer_signal
.tid
, pthread_self());
477 signr
= sigwaitinfo(&mask
, &info
);
479 if (errno
!= EINTR
) {
480 PERROR("sigwaitinfo");
483 } else if (signr
== LTTNG_CONSUMER_SIG_SWITCH
) {
484 metadata_switch_timer(ctx
, info
.si_signo
, &info
, NULL
);
485 } else if (signr
== LTTNG_CONSUMER_SIG_TEARDOWN
) {
487 CMM_STORE_SHARED(timer_signal
.qs_done
, 1);
489 DBG("Signal timer metadata thread teardown");
490 } else if (signr
== LTTNG_CONSUMER_SIG_LIVE
) {
491 live_timer(ctx
, info
.si_signo
, &info
, NULL
);
493 ERR("Unexpected signal %d\n", info
.si_signo
);