Fix: get the stream_id when generating live beacons
[lttng-tools.git] / src / common / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer-stream.h>
30 #include <lttng/ust-ctl.h>
31
32 #include "consumer-timer.h"
33 #include "consumer-testpoint.h"
34 #include "ust-consumer/ust-consumer.h"
35
36 static struct timer_signal_data timer_signal = {
37 .tid = 0,
38 .setup_done = 0,
39 .qs_done = 0,
40 .lock = PTHREAD_MUTEX_INITIALIZER,
41 };
42
43 /*
44 * Set custom signal mask to current thread.
45 */
46 static void setmask(sigset_t *mask)
47 {
48 int ret;
49
50 ret = sigemptyset(mask);
51 if (ret) {
52 PERROR("sigemptyset");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
55 if (ret) {
56 PERROR("sigaddset switch");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
59 if (ret) {
60 PERROR("sigaddset teardown");
61 }
62 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
63 if (ret) {
64 PERROR("sigaddset live");
65 }
66 }
67
68 /*
69 * Execute action on a timer switch.
70 *
71 * Beware: metadata_switch_timer() should *never* take a mutex also held
72 * while consumer_timer_switch_stop() is called. It would result in
73 * deadlocks.
74 */
75 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
76 int sig, siginfo_t *si, void *uc)
77 {
78 int ret;
79 struct lttng_consumer_channel *channel;
80
81 channel = si->si_value.sival_ptr;
82 assert(channel);
83
84 if (channel->switch_timer_error) {
85 return;
86 }
87
88 DBG("Switch timer for channel %" PRIu64, channel->key);
89 switch (ctx->type) {
90 case LTTNG_CONSUMER32_UST:
91 case LTTNG_CONSUMER64_UST:
92 /*
93 * Locks taken by lttng_ustconsumer_request_metadata():
94 * - metadata_socket_lock
95 * - Calling lttng_ustconsumer_recv_metadata():
96 * - channel->metadata_cache->lock
97 * - Calling consumer_metadata_cache_flushed():
98 * - channel->timer_lock
99 * - channel->metadata_cache->lock
100 *
101 * Ensure that neither consumer_data.lock nor
102 * channel->lock are taken within this function, since
103 * they are held while consumer_timer_switch_stop() is
104 * called.
105 */
106 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
107 if (ret < 0) {
108 channel->switch_timer_error = 1;
109 }
110 break;
111 case LTTNG_CONSUMER_KERNEL:
112 case LTTNG_CONSUMER_UNKNOWN:
113 assert(0);
114 break;
115 }
116 }
117
118 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
119 uint64_t stream_id)
120 {
121 int ret;
122 struct ctf_packet_index index;
123
124 memset(&index, 0, sizeof(index));
125 index.stream_id = htobe64(stream_id);
126 index.timestamp_end = htobe64(ts);
127 ret = consumer_stream_write_index(stream, &index);
128 if (ret < 0) {
129 goto error;
130 }
131
132 error:
133 return ret;
134 }
135
136 static int check_kernel_stream(struct lttng_consumer_stream *stream)
137 {
138 uint64_t ts, stream_id;
139 int ret;
140
141 /*
142 * While holding the stream mutex, try to take a snapshot, if it
143 * succeeds, it means that data is ready to be sent, just let the data
144 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
145 * means that there is no data to read after the flush, so we can
146 * safely send the empty index.
147 */
148 pthread_mutex_lock(&stream->lock);
149 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
150 if (ret < 0) {
151 ERR("Failed to get the current timestamp");
152 goto error_unlock;
153 }
154 ret = kernctl_buffer_flush(stream->wait_fd);
155 if (ret < 0) {
156 ERR("Failed to flush kernel stream");
157 goto error_unlock;
158 }
159 ret = kernctl_snapshot(stream->wait_fd);
160 if (ret < 0) {
161 if (errno != EAGAIN && errno != ENODATA) {
162 PERROR("live timer kernel snapshot");
163 ret = -1;
164 goto error_unlock;
165 }
166 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
167 if (ret < 0) {
168 PERROR("kernctl_get_stream_id");
169 goto error_unlock;
170 }
171 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
172 ret = send_empty_index(stream, ts, stream_id);
173 if (ret < 0) {
174 goto error_unlock;
175 }
176 }
177 ret = 0;
178
179 error_unlock:
180 pthread_mutex_unlock(&stream->lock);
181 return ret;
182 }
183
184 static int check_ust_stream(struct lttng_consumer_stream *stream)
185 {
186 uint64_t ts, stream_id;
187 int ret;
188
189 assert(stream);
190 assert(stream->ustream);
191 /*
192 * While holding the stream mutex, try to take a snapshot, if it
193 * succeeds, it means that data is ready to be sent, just let the data
194 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
195 * means that there is no data to read after the flush, so we can
196 * safely send the empty index.
197 */
198 pthread_mutex_lock(&stream->lock);
199 ret = cds_lfht_is_node_deleted(&stream->node.node);
200 if (ret) {
201 goto error_unlock;
202 }
203
204 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
205 if (ret < 0) {
206 ERR("Failed to get the current timestamp");
207 goto error_unlock;
208 }
209 lttng_ustconsumer_flush_buffer(stream, 1);
210 ret = lttng_ustconsumer_take_snapshot(stream);
211 if (ret < 0) {
212 if (ret != -EAGAIN) {
213 ERR("Taking UST snapshot");
214 ret = -1;
215 goto error_unlock;
216 }
217 ret = ustctl_get_stream_id(stream->ustream, &stream_id);
218 if (ret < 0) {
219 PERROR("ustctl_get_stream_id");
220 goto error_unlock;
221 }
222 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
223 ret = send_empty_index(stream, ts, stream_id);
224 if (ret < 0) {
225 goto error_unlock;
226 }
227 }
228 ret = 0;
229
230 error_unlock:
231 pthread_mutex_unlock(&stream->lock);
232 return ret;
233 }
234
235 /*
236 * Execute action on a live timer
237 */
238 static void live_timer(struct lttng_consumer_local_data *ctx,
239 int sig, siginfo_t *si, void *uc)
240 {
241 int ret;
242 struct lttng_consumer_channel *channel;
243 struct lttng_consumer_stream *stream;
244 struct lttng_ht *ht;
245 struct lttng_ht_iter iter;
246
247 channel = si->si_value.sival_ptr;
248 assert(channel);
249
250 if (channel->switch_timer_error) {
251 goto error;
252 }
253 ht = consumer_data.stream_per_chan_id_ht;
254
255 DBG("Live timer for channel %" PRIu64, channel->key);
256
257 rcu_read_lock();
258 switch (ctx->type) {
259 case LTTNG_CONSUMER32_UST:
260 case LTTNG_CONSUMER64_UST:
261 cds_lfht_for_each_entry_duplicate(ht->ht,
262 ht->hash_fct(&channel->key, lttng_ht_seed),
263 ht->match_fct, &channel->key, &iter.iter,
264 stream, node_channel_id.node) {
265 ret = check_ust_stream(stream);
266 if (ret < 0) {
267 goto error_unlock;
268 }
269 }
270 break;
271 case LTTNG_CONSUMER_KERNEL:
272 cds_lfht_for_each_entry_duplicate(ht->ht,
273 ht->hash_fct(&channel->key, lttng_ht_seed),
274 ht->match_fct, &channel->key, &iter.iter,
275 stream, node_channel_id.node) {
276 ret = check_kernel_stream(stream);
277 if (ret < 0) {
278 goto error_unlock;
279 }
280 }
281 break;
282 case LTTNG_CONSUMER_UNKNOWN:
283 assert(0);
284 break;
285 }
286
287 error_unlock:
288 rcu_read_unlock();
289
290 error:
291 return;
292 }
293
294 static
295 void consumer_timer_signal_thread_qs(unsigned int signr)
296 {
297 sigset_t pending_set;
298 int ret;
299
300 /*
301 * We need to be the only thread interacting with the thread
302 * that manages signals for teardown synchronization.
303 */
304 pthread_mutex_lock(&timer_signal.lock);
305
306 /* Ensure we don't have any signal queued for this channel. */
307 for (;;) {
308 ret = sigemptyset(&pending_set);
309 if (ret == -1) {
310 PERROR("sigemptyset");
311 }
312 ret = sigpending(&pending_set);
313 if (ret == -1) {
314 PERROR("sigpending");
315 }
316 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
317 break;
318 }
319 caa_cpu_relax();
320 }
321
322 /*
323 * From this point, no new signal handler will be fired that would try to
324 * access "chan". However, we still need to wait for any currently
325 * executing handler to complete.
326 */
327 cmm_smp_mb();
328 CMM_STORE_SHARED(timer_signal.qs_done, 0);
329 cmm_smp_mb();
330
331 /*
332 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
333 * up.
334 */
335 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
336
337 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
338 caa_cpu_relax();
339 }
340 cmm_smp_mb();
341
342 pthread_mutex_unlock(&timer_signal.lock);
343 }
344
345 /*
346 * Set the timer for periodical metadata flush.
347 */
348 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
349 unsigned int switch_timer_interval)
350 {
351 int ret;
352 struct sigevent sev;
353 struct itimerspec its;
354
355 assert(channel);
356 assert(channel->key);
357
358 if (switch_timer_interval == 0) {
359 return;
360 }
361
362 sev.sigev_notify = SIGEV_SIGNAL;
363 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
364 sev.sigev_value.sival_ptr = channel;
365 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
366 if (ret == -1) {
367 PERROR("timer_create");
368 }
369 channel->switch_timer_enabled = 1;
370
371 its.it_value.tv_sec = switch_timer_interval / 1000000;
372 its.it_value.tv_nsec = switch_timer_interval % 1000000;
373 its.it_interval.tv_sec = its.it_value.tv_sec;
374 its.it_interval.tv_nsec = its.it_value.tv_nsec;
375
376 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
377 if (ret == -1) {
378 PERROR("timer_settime");
379 }
380 }
381
382 /*
383 * Stop and delete timer.
384 */
385 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
386 {
387 int ret;
388
389 assert(channel);
390
391 ret = timer_delete(channel->switch_timer);
392 if (ret == -1) {
393 PERROR("timer_delete");
394 }
395
396 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
397
398 channel->switch_timer = 0;
399 channel->switch_timer_enabled = 0;
400 }
401
402 /*
403 * Set the timer for the live mode.
404 */
405 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
406 int live_timer_interval)
407 {
408 int ret;
409 struct sigevent sev;
410 struct itimerspec its;
411
412 assert(channel);
413 assert(channel->key);
414
415 if (live_timer_interval <= 0) {
416 return;
417 }
418
419 sev.sigev_notify = SIGEV_SIGNAL;
420 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
421 sev.sigev_value.sival_ptr = channel;
422 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
423 if (ret == -1) {
424 PERROR("timer_create");
425 }
426 channel->live_timer_enabled = 1;
427
428 its.it_value.tv_sec = live_timer_interval / 1000000;
429 its.it_value.tv_nsec = live_timer_interval % 1000000;
430 its.it_interval.tv_sec = its.it_value.tv_sec;
431 its.it_interval.tv_nsec = its.it_value.tv_nsec;
432
433 ret = timer_settime(channel->live_timer, 0, &its, NULL);
434 if (ret == -1) {
435 PERROR("timer_settime");
436 }
437 }
438
439 /*
440 * Stop and delete timer.
441 */
442 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
443 {
444 int ret;
445
446 assert(channel);
447
448 ret = timer_delete(channel->live_timer);
449 if (ret == -1) {
450 PERROR("timer_delete");
451 }
452
453 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
454
455 channel->live_timer = 0;
456 channel->live_timer_enabled = 0;
457 }
458
459 /*
460 * Block the RT signals for the entire process. It must be called from the
461 * consumer main before creating the threads
462 */
463 void consumer_signal_init(void)
464 {
465 int ret;
466 sigset_t mask;
467
468 /* Block signal for entire process, so only our thread processes it. */
469 setmask(&mask);
470 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
471 if (ret) {
472 errno = ret;
473 PERROR("pthread_sigmask");
474 }
475 }
476
477 /*
478 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
479 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
480 */
481 void *consumer_timer_thread(void *data)
482 {
483 int signr;
484 sigset_t mask;
485 siginfo_t info;
486 struct lttng_consumer_local_data *ctx = data;
487
488 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
489
490 if (testpoint(consumerd_thread_metadata_timer)) {
491 goto error_testpoint;
492 }
493
494 health_code_update();
495
496 /* Only self thread will receive signal mask. */
497 setmask(&mask);
498 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
499
500 while (1) {
501 health_code_update();
502
503 health_poll_entry();
504 signr = sigwaitinfo(&mask, &info);
505 health_poll_exit();
506 if (signr == -1) {
507 if (errno != EINTR) {
508 PERROR("sigwaitinfo");
509 }
510 continue;
511 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
512 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
513 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
514 cmm_smp_mb();
515 CMM_STORE_SHARED(timer_signal.qs_done, 1);
516 cmm_smp_mb();
517 DBG("Signal timer metadata thread teardown");
518 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
519 live_timer(ctx, info.si_signo, &info, NULL);
520 } else {
521 ERR("Unexpected signal %d\n", info.si_signo);
522 }
523 }
524
525 error_testpoint:
526 /* Only reached in testpoint error */
527 health_error();
528 health_unregister(health_consumerd);
529
530 /* Never return */
531 return NULL;
532 }
This page took 0.045408 seconds and 4 git commands to generate.