Fix: consumerd: incorrect clear logging statement
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
331744e3 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
331744e3 6 *
331744e3
JD
7 */
8
6c1c0768 9#define _LGPL_SOURCE
331744e3
JD
10#include <assert.h>
11#include <inttypes.h>
12#include <signal.h>
13
51a9e1c7 14#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 15#include <common/common.h>
f263b7fd 16#include <common/compat/endian.h>
d3e2ba59
JD
17#include <common/kernel-ctl/kernel-ctl.h>
18#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
19#include <common/consumer/consumer-stream.h>
20#include <common/consumer/consumer-timer.h>
21#include <common/consumer/consumer-testpoint.h>
22#include <common/ust-consumer/ust-consumer.h>
331744e3 23
e9404c27
JG
24typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
25typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
26 unsigned long *consumed);
27typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
28 unsigned long *produced);
29
2b8f8754
MD
30static struct timer_signal_data timer_signal = {
31 .tid = 0,
32 .setup_done = 0,
33 .qs_done = 0,
34 .lock = PTHREAD_MUTEX_INITIALIZER,
35};
331744e3
JD
36
37/*
38 * Set custom signal mask to current thread.
39 */
40static void setmask(sigset_t *mask)
41{
42 int ret;
43
44 ret = sigemptyset(mask);
45 if (ret) {
46 PERROR("sigemptyset");
47 }
48 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
49 if (ret) {
d3e2ba59 50 PERROR("sigaddset switch");
331744e3
JD
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
53 if (ret) {
d3e2ba59
JD
54 PERROR("sigaddset teardown");
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
57 if (ret) {
58 PERROR("sigaddset live");
331744e3 59 }
e9404c27
JG
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
61 if (ret) {
62 PERROR("sigaddset monitor");
63 }
13675d0e
MD
64 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
65 if (ret) {
66 PERROR("sigaddset exit");
67 }
331744e3
JD
68}
69
e9404c27
JG
70static int channel_monitor_pipe = -1;
71
331744e3
JD
72/*
73 * Execute action on a timer switch.
d98a47c7
MD
74 *
75 * Beware: metadata_switch_timer() should *never* take a mutex also held
76 * while consumer_timer_switch_stop() is called. It would result in
77 * deadlocks.
331744e3
JD
78 */
79static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 80 siginfo_t *si)
331744e3
JD
81{
82 int ret;
83 struct lttng_consumer_channel *channel;
84
85 channel = si->si_value.sival_ptr;
86 assert(channel);
87
4419b4fb
MD
88 if (channel->switch_timer_error) {
89 return;
90 }
91
331744e3
JD
92 DBG("Switch timer for channel %" PRIu64, channel->key);
93 switch (ctx->type) {
94 case LTTNG_CONSUMER32_UST:
95 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
96 /*
97 * Locks taken by lttng_ustconsumer_request_metadata():
98 * - metadata_socket_lock
99 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 100 * - channel->metadata_cache->lock
4fa3dc0e 101 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
102 * - channel->timer_lock
103 * - channel->metadata_cache->lock
4fa3dc0e 104 *
5e41ebe1
MD
105 * Ensure that neither consumer_data.lock nor
106 * channel->lock are taken within this function, since
107 * they are held while consumer_timer_switch_stop() is
108 * called.
4fa3dc0e 109 */
94d49140 110 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 111 if (ret < 0) {
4419b4fb 112 channel->switch_timer_error = 1;
331744e3
JD
113 }
114 break;
115 case LTTNG_CONSUMER_KERNEL:
116 case LTTNG_CONSUMER_UNKNOWN:
117 assert(0);
118 break;
119 }
120}
121
528f2ffa
JD
122static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
123 uint64_t stream_id)
d3e2ba59
JD
124{
125 int ret;
50adc264 126 struct ctf_packet_index index;
d3e2ba59
JD
127
128 memset(&index, 0, sizeof(index));
528f2ffa 129 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
130 index.timestamp_end = htobe64(ts);
131 ret = consumer_stream_write_index(stream, &index);
132 if (ret < 0) {
133 goto error;
134 }
135
136error:
137 return ret;
138}
139
c585821b 140int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 141{
528f2ffa 142 uint64_t ts, stream_id;
d3e2ba59
JD
143 int ret;
144
d3e2ba59
JD
145 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
146 if (ret < 0) {
147 ERR("Failed to get the current timestamp");
c585821b 148 goto end;
d3e2ba59
JD
149 }
150 ret = kernctl_buffer_flush(stream->wait_fd);
151 if (ret < 0) {
152 ERR("Failed to flush kernel stream");
c585821b 153 goto end;
d3e2ba59
JD
154 }
155 ret = kernctl_snapshot(stream->wait_fd);
156 if (ret < 0) {
32af2c95 157 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 158 PERROR("live timer kernel snapshot");
d3e2ba59 159 ret = -1;
c585821b 160 goto end;
d3e2ba59 161 }
528f2ffa
JD
162 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
163 if (ret < 0) {
164 PERROR("kernctl_get_stream_id");
c585821b 165 goto end;
528f2ffa 166 }
d3e2ba59 167 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 168 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 169 if (ret < 0) {
c585821b 170 goto end;
d3e2ba59
JD
171 }
172 }
173 ret = 0;
c585821b 174end:
d3e2ba59
JD
175 return ret;
176}
177
c585821b 178static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 179{
d3e2ba59
JD
180 int ret;
181
d3e2ba59
JD
182 /*
183 * While holding the stream mutex, try to take a snapshot, if it
184 * succeeds, it means that data is ready to be sent, just let the data
185 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
186 * means that there is no data to read after the flush, so we can
187 * safely send the empty index.
c585821b
MD
188 *
189 * Doing a trylock and checking if waiting on metadata if
190 * trylock fails. Bail out of the stream is indeed waiting for
191 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 192 */
c585821b
MD
193 for (;;) {
194 ret = pthread_mutex_trylock(&stream->lock);
195 switch (ret) {
196 case 0:
197 break; /* We have the lock. */
198 case EBUSY:
199 pthread_mutex_lock(&stream->metadata_timer_lock);
200 if (stream->waiting_on_metadata) {
201 ret = 0;
202 stream->missed_metadata_flush = true;
203 pthread_mutex_unlock(&stream->metadata_timer_lock);
204 goto end; /* Bail out. */
205 }
206 pthread_mutex_unlock(&stream->metadata_timer_lock);
207 /* Try again. */
208 caa_cpu_relax();
209 continue;
210 default:
211 ERR("Unexpected pthread_mutex_trylock error %d", ret);
212 ret = -1;
213 goto end;
214 }
215 break;
216 }
217 ret = consumer_flush_kernel_index(stream);
218 pthread_mutex_unlock(&stream->lock);
219end:
220 return ret;
221}
222
223int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
224{
225 uint64_t ts, stream_id;
226 int ret;
227
94d49140
JD
228 ret = cds_lfht_is_node_deleted(&stream->node.node);
229 if (ret) {
c585821b 230 goto end;
94d49140
JD
231 }
232
84a182ce 233 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
234 if (ret < 0) {
235 ERR("Failed to get the current timestamp");
c585821b 236 goto end;
d3e2ba59 237 }
84a182ce
DG
238 lttng_ustconsumer_flush_buffer(stream, 1);
239 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 240 if (ret < 0) {
94d49140 241 if (ret != -EAGAIN) {
d3e2ba59
JD
242 ERR("Taking UST snapshot");
243 ret = -1;
c585821b 244 goto end;
d3e2ba59 245 }
70190e1c 246 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
247 if (ret < 0) {
248 PERROR("ustctl_get_stream_id");
c585821b 249 goto end;
528f2ffa 250 }
d3e2ba59 251 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 252 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 253 if (ret < 0) {
c585821b 254 goto end;
d3e2ba59
JD
255 }
256 }
257 ret = 0;
c585821b
MD
258end:
259 return ret;
260}
d3e2ba59 261
c585821b
MD
262static int check_ust_stream(struct lttng_consumer_stream *stream)
263{
264 int ret;
265
266 assert(stream);
267 assert(stream->ustream);
268 /*
269 * While holding the stream mutex, try to take a snapshot, if it
270 * succeeds, it means that data is ready to be sent, just let the data
271 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
272 * means that there is no data to read after the flush, so we can
273 * safely send the empty index.
274 *
275 * Doing a trylock and checking if waiting on metadata if
276 * trylock fails. Bail out of the stream is indeed waiting for
277 * metadata to be pushed. Busy wait on trylock otherwise.
278 */
279 for (;;) {
280 ret = pthread_mutex_trylock(&stream->lock);
281 switch (ret) {
282 case 0:
283 break; /* We have the lock. */
284 case EBUSY:
285 pthread_mutex_lock(&stream->metadata_timer_lock);
286 if (stream->waiting_on_metadata) {
287 ret = 0;
288 stream->missed_metadata_flush = true;
289 pthread_mutex_unlock(&stream->metadata_timer_lock);
290 goto end; /* Bail out. */
291 }
292 pthread_mutex_unlock(&stream->metadata_timer_lock);
293 /* Try again. */
294 caa_cpu_relax();
295 continue;
296 default:
297 ERR("Unexpected pthread_mutex_trylock error %d", ret);
298 ret = -1;
299 goto end;
300 }
301 break;
302 }
303 ret = consumer_flush_ust_index(stream);
d3e2ba59 304 pthread_mutex_unlock(&stream->lock);
c585821b 305end:
d3e2ba59
JD
306 return ret;
307}
308
309/*
310 * Execute action on a live timer
311 */
312static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 313 siginfo_t *si)
d3e2ba59
JD
314{
315 int ret;
316 struct lttng_consumer_channel *channel;
317 struct lttng_consumer_stream *stream;
318 struct lttng_ht *ht;
319 struct lttng_ht_iter iter;
320
321 channel = si->si_value.sival_ptr;
322 assert(channel);
323
324 if (channel->switch_timer_error) {
325 goto error;
326 }
327 ht = consumer_data.stream_per_chan_id_ht;
328
329 DBG("Live timer for channel %" PRIu64, channel->key);
330
331 rcu_read_lock();
332 switch (ctx->type) {
333 case LTTNG_CONSUMER32_UST:
334 case LTTNG_CONSUMER64_UST:
335 cds_lfht_for_each_entry_duplicate(ht->ht,
336 ht->hash_fct(&channel->key, lttng_ht_seed),
337 ht->match_fct, &channel->key, &iter.iter,
338 stream, node_channel_id.node) {
339 ret = check_ust_stream(stream);
340 if (ret < 0) {
341 goto error_unlock;
342 }
343 }
344 break;
345 case LTTNG_CONSUMER_KERNEL:
346 cds_lfht_for_each_entry_duplicate(ht->ht,
347 ht->hash_fct(&channel->key, lttng_ht_seed),
348 ht->match_fct, &channel->key, &iter.iter,
349 stream, node_channel_id.node) {
350 ret = check_kernel_stream(stream);
351 if (ret < 0) {
352 goto error_unlock;
353 }
354 }
355 break;
356 case LTTNG_CONSUMER_UNKNOWN:
357 assert(0);
358 break;
359 }
360
361error_unlock:
362 rcu_read_unlock();
363
364error:
365 return;
366}
367
2b8f8754
MD
368static
369void consumer_timer_signal_thread_qs(unsigned int signr)
370{
371 sigset_t pending_set;
372 int ret;
373
374 /*
375 * We need to be the only thread interacting with the thread
376 * that manages signals for teardown synchronization.
377 */
378 pthread_mutex_lock(&timer_signal.lock);
379
380 /* Ensure we don't have any signal queued for this channel. */
381 for (;;) {
382 ret = sigemptyset(&pending_set);
383 if (ret == -1) {
384 PERROR("sigemptyset");
385 }
386 ret = sigpending(&pending_set);
387 if (ret == -1) {
388 PERROR("sigpending");
389 }
f05a6b6d 390 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
391 break;
392 }
393 caa_cpu_relax();
394 }
395
396 /*
397 * From this point, no new signal handler will be fired that would try to
398 * access "chan". However, we still need to wait for any currently
399 * executing handler to complete.
400 */
401 cmm_smp_mb();
402 CMM_STORE_SHARED(timer_signal.qs_done, 0);
403 cmm_smp_mb();
404
405 /*
406 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
407 * up.
408 */
409 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
410
411 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
412 caa_cpu_relax();
413 }
414 cmm_smp_mb();
415
416 pthread_mutex_unlock(&timer_signal.lock);
417}
418
331744e3 419/*
e9404c27
JG
420 * Start a timer channel timer which will fire at a given interval
421 * (timer_interval_us)and fire a given signal (signal).
422 *
423 * Returns a negative value on error, 0 if a timer was created, and
424 * a positive value if no timer was created (not an error).
331744e3 425 */
e9404c27
JG
426static
427int consumer_channel_timer_start(timer_t *timer_id,
428 struct lttng_consumer_channel *channel,
429 unsigned int timer_interval_us, int signal)
331744e3 430{
e9404c27 431 int ret = 0, delete_ret;
331744e3
JD
432 struct sigevent sev;
433 struct itimerspec its;
434
435 assert(channel);
436 assert(channel->key);
437
e9404c27
JG
438 if (timer_interval_us == 0) {
439 /* No creation needed; not an error. */
440 ret = 1;
441 goto end;
331744e3
JD
442 }
443
444 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 445 sev.sigev_signo = signal;
331744e3 446 sev.sigev_value.sival_ptr = channel;
e9404c27 447 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
448 if (ret == -1) {
449 PERROR("timer_create");
e9404c27 450 goto end;
331744e3 451 }
331744e3 452
e9404c27
JG
453 its.it_value.tv_sec = timer_interval_us / 1000000;
454 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
455 its.it_interval.tv_sec = its.it_value.tv_sec;
456 its.it_interval.tv_nsec = its.it_value.tv_nsec;
457
e9404c27 458 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
459 if (ret == -1) {
460 PERROR("timer_settime");
e9404c27
JG
461 goto error_destroy_timer;
462 }
463end:
464 return ret;
465error_destroy_timer:
466 delete_ret = timer_delete(*timer_id);
467 if (delete_ret == -1) {
468 PERROR("timer_delete");
469 }
470 goto end;
471}
472
473static
474int consumer_channel_timer_stop(timer_t *timer_id, int signal)
475{
476 int ret = 0;
477
478 ret = timer_delete(*timer_id);
479 if (ret == -1) {
480 PERROR("timer_delete");
481 goto end;
331744e3 482 }
e9404c27
JG
483
484 consumer_timer_signal_thread_qs(signal);
485 *timer_id = 0;
486end:
487 return ret;
488}
489
490/*
491 * Set the channel's switch timer.
492 */
493void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
494 unsigned int switch_timer_interval_us)
495{
496 int ret;
497
498 assert(channel);
499 assert(channel->key);
500
501 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
502 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
503
504 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
505}
506
507/*
e9404c27 508 * Stop and delete the channel's switch timer.
331744e3
JD
509 */
510void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
511{
512 int ret;
331744e3
JD
513
514 assert(channel);
515
e9404c27
JG
516 ret = consumer_channel_timer_stop(&channel->switch_timer,
517 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 518 if (ret == -1) {
e9404c27 519 ERR("Failed to stop switch timer");
331744e3
JD
520 }
521
2b8f8754 522 channel->switch_timer_enabled = 0;
331744e3
JD
523}
524
d3e2ba59 525/*
e9404c27 526 * Set the channel's live timer.
d3e2ba59
JD
527 */
528void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 529 unsigned int live_timer_interval_us)
d3e2ba59
JD
530{
531 int ret;
d3e2ba59
JD
532
533 assert(channel);
534 assert(channel->key);
535
e9404c27
JG
536 ret = consumer_channel_timer_start(&channel->live_timer, channel,
537 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 538
e9404c27
JG
539 channel->live_timer_enabled = !!(ret == 0);
540}
d3e2ba59 541
e9404c27
JG
542/*
543 * Stop and delete the channel's live timer.
544 */
545void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
546{
547 int ret;
548
549 assert(channel);
d3e2ba59 550
e9404c27
JG
551 ret = consumer_channel_timer_stop(&channel->live_timer,
552 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 553 if (ret == -1) {
e9404c27 554 ERR("Failed to stop live timer");
d3e2ba59 555 }
e9404c27
JG
556
557 channel->live_timer_enabled = 0;
d3e2ba59
JD
558}
559
560/*
e9404c27
JG
561 * Set the channel's monitoring timer.
562 *
563 * Returns a negative value on error, 0 if a timer was created, and
564 * a positive value if no timer was created (not an error).
d3e2ba59 565 */
e9404c27
JG
566int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
567 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
568{
569 int ret;
570
571 assert(channel);
e9404c27
JG
572 assert(channel->key);
573 assert(!channel->monitor_timer_enabled);
d3e2ba59 574
e9404c27
JG
575 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
576 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
577 channel->monitor_timer_enabled = !!(ret == 0);
578 return ret;
579}
580
581/*
582 * Stop and delete the channel's monitoring timer.
583 */
584int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
585{
586 int ret;
587
588 assert(channel);
589 assert(channel->monitor_timer_enabled);
590
591 ret = consumer_channel_timer_stop(&channel->monitor_timer,
592 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 593 if (ret == -1) {
e9404c27
JG
594 ERR("Failed to stop live timer");
595 goto end;
d3e2ba59
JD
596 }
597
e9404c27
JG
598 channel->monitor_timer_enabled = 0;
599end:
600 return ret;
d3e2ba59
JD
601}
602
331744e3
JD
603/*
604 * Block the RT signals for the entire process. It must be called from the
605 * consumer main before creating the threads
606 */
73664f81 607int consumer_signal_init(void)
331744e3
JD
608{
609 int ret;
610 sigset_t mask;
611
612 /* Block signal for entire process, so only our thread processes it. */
613 setmask(&mask);
614 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
615 if (ret) {
616 errno = ret;
617 PERROR("pthread_sigmask");
73664f81 618 return -1;
331744e3 619 }
73664f81 620 return 0;
331744e3
JD
621}
622
e9404c27
JG
623static
624int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 625 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
626 sample_positions_cb sample, get_consumed_cb get_consumed,
627 get_produced_cb get_produced)
628{
23bc9bb5 629 int ret = 0;
e9404c27
JG
630 struct lttng_ht_iter iter;
631 struct lttng_consumer_stream *stream;
632 bool empty_channel = true;
633 uint64_t high = 0, low = UINT64_MAX;
634 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
635
e8360425
JD
636 *_total_consumed = 0;
637
e9404c27
JG
638 rcu_read_lock();
639
640 cds_lfht_for_each_entry_duplicate(ht->ht,
641 ht->hash_fct(&channel->key, lttng_ht_seed),
642 ht->match_fct, &channel->key,
643 &iter.iter, stream, node_channel_id.node) {
644 unsigned long produced, consumed, usage;
645
646 empty_channel = false;
647
648 pthread_mutex_lock(&stream->lock);
649 if (cds_lfht_is_node_deleted(&stream->node.node)) {
650 goto next;
651 }
652
653 ret = sample(stream);
654 if (ret) {
655 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
656 pthread_mutex_unlock(&stream->lock);
657 goto end;
658 }
659 ret = get_consumed(stream, &consumed);
660 if (ret) {
661 ERR("Failed to get buffer consumed position in monitor timer");
662 pthread_mutex_unlock(&stream->lock);
663 goto end;
664 }
665 ret = get_produced(stream, &produced);
666 if (ret) {
667 ERR("Failed to get buffer produced position in monitor timer");
668 pthread_mutex_unlock(&stream->lock);
669 goto end;
670 }
671
672 usage = produced - consumed;
673 high = (usage > high) ? usage : high;
674 low = (usage < low) ? usage : low;
e8360425
JD
675
676 /*
677 * We don't use consumed here for 2 reasons:
678 * - output_written takes into account the padding written in the
679 * tracefiles when we stop the session;
680 * - the consumed position is not the accurate representation of what
681 * was extracted from a buffer in overwrite mode.
682 */
683 *_total_consumed += stream->output_written;
e9404c27
JG
684 next:
685 pthread_mutex_unlock(&stream->lock);
686 }
687
688 *_highest_use = high;
689 *_lowest_use = low;
690end:
691 rcu_read_unlock();
692 if (empty_channel) {
693 ret = -1;
694 }
695 return ret;
696}
697
698/*
699 * Execute action on a monitor timer.
700 */
701static
5704917f 702void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
703{
704 int ret;
705 int channel_monitor_pipe =
706 consumer_timer_thread_get_channel_monitor_pipe();
707 struct lttcomm_consumer_channel_monitor_msg msg = {
708 .key = channel->key,
709 };
710 sample_positions_cb sample;
711 get_consumed_cb get_consumed;
712 get_produced_cb get_produced;
ef4b086e 713 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27
JG
714
715 assert(channel);
e9404c27
JG
716
717 if (channel_monitor_pipe < 0) {
873dda4e 718 return;
e9404c27
JG
719 }
720
721 switch (consumer_data.type) {
722 case LTTNG_CONSUMER_KERNEL:
723 sample = lttng_kconsumer_sample_snapshot_positions;
724 get_consumed = lttng_kconsumer_get_consumed_snapshot;
725 get_produced = lttng_kconsumer_get_produced_snapshot;
726 break;
727 case LTTNG_CONSUMER32_UST:
728 case LTTNG_CONSUMER64_UST:
729 sample = lttng_ustconsumer_sample_snapshot_positions;
730 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
731 get_produced = lttng_ustconsumer_get_produced_snapshot;
732 break;
733 default:
734 abort();
735 }
736
ef4b086e
JG
737 ret = sample_channel_positions(channel, &highest, &lowest,
738 &total_consumed, sample, get_consumed, get_produced);
e9404c27 739 if (ret) {
873dda4e 740 return;
e9404c27 741 }
ef4b086e
JG
742 msg.highest = highest;
743 msg.lowest = lowest;
744 msg.total_consumed = total_consumed;
e9404c27
JG
745
746 /*
747 * Writes performed here are assumed to be atomic which is only
748 * guaranteed for sizes < than PIPE_BUF.
749 */
750 assert(sizeof(msg) <= PIPE_BUF);
751
752 do {
753 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
754 } while (ret == -1 && errno == EINTR);
755 if (ret == -1) {
756 if (errno == EAGAIN) {
757 /* Not an error, the sample is merely dropped. */
758 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
759 channel->key);
760 } else {
761 PERROR("write to the channel monitor pipe");
762 }
763 } else {
764 DBG("Sent channel monitoring sample for channel key %" PRIu64
765 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
766 channel->key, msg.highest, msg.lowest);
767 }
e9404c27
JG
768}
769
770int consumer_timer_thread_get_channel_monitor_pipe(void)
771{
772 return uatomic_read(&channel_monitor_pipe);
773}
774
775int consumer_timer_thread_set_channel_monitor_pipe(int fd)
776{
777 int ret;
778
779 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
780 if (ret != -1) {
781 ret = -1;
782 goto end;
783 }
784 ret = 0;
785end:
786 return ret;
787}
788
331744e3 789/*
d3e2ba59 790 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 791 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 792 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 793 */
d3e2ba59 794void *consumer_timer_thread(void *data)
331744e3
JD
795{
796 int signr;
797 sigset_t mask;
798 siginfo_t info;
799 struct lttng_consumer_local_data *ctx = data;
800
8a9acb74
MD
801 rcu_register_thread();
802
1fc79fb4
MD
803 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
804
2d57de81
MD
805 if (testpoint(consumerd_thread_metadata_timer)) {
806 goto error_testpoint;
807 }
808
9ce5646a
MD
809 health_code_update();
810
331744e3
JD
811 /* Only self thread will receive signal mask. */
812 setmask(&mask);
813 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
814
815 while (1) {
9ce5646a
MD
816 health_code_update();
817
818 health_poll_entry();
331744e3 819 signr = sigwaitinfo(&mask, &info);
9ce5646a 820 health_poll_exit();
e9404c27
JG
821
822 /*
823 * NOTE: cascading conditions are used instead of a switch case
824 * since the use of SIGRTMIN in the definition of the signals'
825 * values prevents the reduction to an integer constant.
826 */
331744e3
JD
827 if (signr == -1) {
828 if (errno != EINTR) {
829 PERROR("sigwaitinfo");
830 }
831 continue;
832 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 833 metadata_switch_timer(ctx, &info);
331744e3
JD
834 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
835 cmm_smp_mb();
836 CMM_STORE_SHARED(timer_signal.qs_done, 1);
837 cmm_smp_mb();
838 DBG("Signal timer metadata thread teardown");
d3e2ba59 839 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 840 live_timer(ctx, &info);
e9404c27
JG
841 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
842 struct lttng_consumer_channel *channel;
843
844 channel = info.si_value.sival_ptr;
5704917f 845 monitor_timer(channel);
13675d0e
MD
846 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
847 assert(CMM_LOAD_SHARED(consumer_quit));
848 goto end;
331744e3
JD
849 } else {
850 ERR("Unexpected signal %d\n", info.si_signo);
851 }
852 }
853
2d57de81
MD
854error_testpoint:
855 /* Only reached in testpoint error */
856 health_error();
13675d0e 857end:
1fc79fb4 858 health_unregister(health_consumerd);
8a9acb74 859 rcu_unregister_thread();
331744e3
JD
860 return NULL;
861}
This page took 0.084964 seconds and 4 git commands to generate.