Fix: memory/fd leak when cleaning streams in channel
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
d3e2ba59
JD
26#include <common/kernel-ctl/kernel-ctl.h>
27#include <common/kernel-consumer/kernel-consumer.h>
28#include <common/consumer-stream.h>
331744e3
JD
29
30#include "consumer-timer.h"
2d57de81 31#include "consumer-testpoint.h"
331744e3
JD
32#include "ust-consumer/ust-consumer.h"
33
2b8f8754
MD
34static struct timer_signal_data timer_signal = {
35 .tid = 0,
36 .setup_done = 0,
37 .qs_done = 0,
38 .lock = PTHREAD_MUTEX_INITIALIZER,
39};
331744e3
JD
40
41/*
42 * Set custom signal mask to current thread.
43 */
44static void setmask(sigset_t *mask)
45{
46 int ret;
47
48 ret = sigemptyset(mask);
49 if (ret) {
50 PERROR("sigemptyset");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
53 if (ret) {
d3e2ba59 54 PERROR("sigaddset switch");
331744e3
JD
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
57 if (ret) {
d3e2ba59
JD
58 PERROR("sigaddset teardown");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
61 if (ret) {
62 PERROR("sigaddset live");
331744e3
JD
63 }
64}
65
66/*
67 * Execute action on a timer switch.
d98a47c7
MD
68 *
69 * Beware: metadata_switch_timer() should *never* take a mutex also held
70 * while consumer_timer_switch_stop() is called. It would result in
71 * deadlocks.
331744e3
JD
72 */
73static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
74 int sig, siginfo_t *si, void *uc)
75{
76 int ret;
77 struct lttng_consumer_channel *channel;
78
79 channel = si->si_value.sival_ptr;
80 assert(channel);
81
4419b4fb
MD
82 if (channel->switch_timer_error) {
83 return;
84 }
85
331744e3
JD
86 DBG("Switch timer for channel %" PRIu64, channel->key);
87 switch (ctx->type) {
88 case LTTNG_CONSUMER32_UST:
89 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
90 /*
91 * Locks taken by lttng_ustconsumer_request_metadata():
92 * - metadata_socket_lock
93 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 94 * - channel->metadata_cache->lock
4fa3dc0e 95 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
96 * - channel->timer_lock
97 * - channel->metadata_cache->lock
4fa3dc0e 98 *
5e41ebe1
MD
99 * Ensure that neither consumer_data.lock nor
100 * channel->lock are taken within this function, since
101 * they are held while consumer_timer_switch_stop() is
102 * called.
4fa3dc0e 103 */
94d49140 104 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 105 if (ret < 0) {
4419b4fb 106 channel->switch_timer_error = 1;
331744e3
JD
107 }
108 break;
109 case LTTNG_CONSUMER_KERNEL:
110 case LTTNG_CONSUMER_UNKNOWN:
111 assert(0);
112 break;
113 }
114}
115
d3e2ba59
JD
116static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
117{
118 int ret;
50adc264 119 struct ctf_packet_index index;
d3e2ba59
JD
120
121 memset(&index, 0, sizeof(index));
122 index.timestamp_end = htobe64(ts);
123 ret = consumer_stream_write_index(stream, &index);
124 if (ret < 0) {
125 goto error;
126 }
127
128error:
129 return ret;
130}
131
132static int check_kernel_stream(struct lttng_consumer_stream *stream)
133{
134 uint64_t ts;
135 int ret;
136
137 /*
138 * While holding the stream mutex, try to take a snapshot, if it
139 * succeeds, it means that data is ready to be sent, just let the data
140 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
141 * means that there is no data to read after the flush, so we can
142 * safely send the empty index.
143 */
144 pthread_mutex_lock(&stream->lock);
145 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
146 if (ret < 0) {
147 ERR("Failed to get the current timestamp");
148 goto error_unlock;
149 }
150 ret = kernctl_buffer_flush(stream->wait_fd);
151 if (ret < 0) {
152 ERR("Failed to flush kernel stream");
153 goto error_unlock;
154 }
155 ret = kernctl_snapshot(stream->wait_fd);
156 if (ret < 0) {
157 if (errno != EAGAIN) {
158 ERR("Taking kernel snapshot");
159 ret = -1;
160 goto error_unlock;
161 }
162 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
163 ret = send_empty_index(stream, ts);
164 if (ret < 0) {
165 goto error_unlock;
166 }
167 }
168 ret = 0;
169
170error_unlock:
171 pthread_mutex_unlock(&stream->lock);
172 return ret;
173}
174
175static int check_ust_stream(struct lttng_consumer_stream *stream)
176{
177 uint64_t ts;
178 int ret;
179
180 assert(stream);
181 assert(stream->ustream);
182 /*
183 * While holding the stream mutex, try to take a snapshot, if it
184 * succeeds, it means that data is ready to be sent, just let the data
185 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
186 * means that there is no data to read after the flush, so we can
187 * safely send the empty index.
188 */
189 pthread_mutex_lock(&stream->lock);
94d49140
JD
190 ret = cds_lfht_is_node_deleted(&stream->node.node);
191 if (ret) {
192 goto error_unlock;
193 }
194
84a182ce 195 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
196 if (ret < 0) {
197 ERR("Failed to get the current timestamp");
198 goto error_unlock;
199 }
84a182ce
DG
200 lttng_ustconsumer_flush_buffer(stream, 1);
201 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 202 if (ret < 0) {
94d49140 203 if (ret != -EAGAIN) {
d3e2ba59
JD
204 ERR("Taking UST snapshot");
205 ret = -1;
206 goto error_unlock;
207 }
208 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
209 ret = send_empty_index(stream, ts);
210 if (ret < 0) {
211 goto error_unlock;
212 }
213 }
214 ret = 0;
215
216error_unlock:
217 pthread_mutex_unlock(&stream->lock);
218 return ret;
219}
220
221/*
222 * Execute action on a live timer
223 */
224static void live_timer(struct lttng_consumer_local_data *ctx,
225 int sig, siginfo_t *si, void *uc)
226{
227 int ret;
228 struct lttng_consumer_channel *channel;
229 struct lttng_consumer_stream *stream;
230 struct lttng_ht *ht;
231 struct lttng_ht_iter iter;
232
233 channel = si->si_value.sival_ptr;
234 assert(channel);
235
236 if (channel->switch_timer_error) {
237 goto error;
238 }
239 ht = consumer_data.stream_per_chan_id_ht;
240
241 DBG("Live timer for channel %" PRIu64, channel->key);
242
243 rcu_read_lock();
244 switch (ctx->type) {
245 case LTTNG_CONSUMER32_UST:
246 case LTTNG_CONSUMER64_UST:
247 cds_lfht_for_each_entry_duplicate(ht->ht,
248 ht->hash_fct(&channel->key, lttng_ht_seed),
249 ht->match_fct, &channel->key, &iter.iter,
250 stream, node_channel_id.node) {
251 ret = check_ust_stream(stream);
252 if (ret < 0) {
253 goto error_unlock;
254 }
255 }
256 break;
257 case LTTNG_CONSUMER_KERNEL:
258 cds_lfht_for_each_entry_duplicate(ht->ht,
259 ht->hash_fct(&channel->key, lttng_ht_seed),
260 ht->match_fct, &channel->key, &iter.iter,
261 stream, node_channel_id.node) {
262 ret = check_kernel_stream(stream);
263 if (ret < 0) {
264 goto error_unlock;
265 }
266 }
267 break;
268 case LTTNG_CONSUMER_UNKNOWN:
269 assert(0);
270 break;
271 }
272
273error_unlock:
274 rcu_read_unlock();
275
276error:
277 return;
278}
279
2b8f8754
MD
280static
281void consumer_timer_signal_thread_qs(unsigned int signr)
282{
283 sigset_t pending_set;
284 int ret;
285
286 /*
287 * We need to be the only thread interacting with the thread
288 * that manages signals for teardown synchronization.
289 */
290 pthread_mutex_lock(&timer_signal.lock);
291
292 /* Ensure we don't have any signal queued for this channel. */
293 for (;;) {
294 ret = sigemptyset(&pending_set);
295 if (ret == -1) {
296 PERROR("sigemptyset");
297 }
298 ret = sigpending(&pending_set);
299 if (ret == -1) {
300 PERROR("sigpending");
301 }
302 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
303 break;
304 }
305 caa_cpu_relax();
306 }
307
308 /*
309 * From this point, no new signal handler will be fired that would try to
310 * access "chan". However, we still need to wait for any currently
311 * executing handler to complete.
312 */
313 cmm_smp_mb();
314 CMM_STORE_SHARED(timer_signal.qs_done, 0);
315 cmm_smp_mb();
316
317 /*
318 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
319 * up.
320 */
321 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
322
323 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
324 caa_cpu_relax();
325 }
326 cmm_smp_mb();
327
328 pthread_mutex_unlock(&timer_signal.lock);
329}
330
331744e3
JD
331/*
332 * Set the timer for periodical metadata flush.
333 */
334void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
335 unsigned int switch_timer_interval)
336{
337 int ret;
338 struct sigevent sev;
339 struct itimerspec its;
340
341 assert(channel);
342 assert(channel->key);
343
344 if (switch_timer_interval == 0) {
345 return;
346 }
347
348 sev.sigev_notify = SIGEV_SIGNAL;
349 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
350 sev.sigev_value.sival_ptr = channel;
351 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
352 if (ret == -1) {
353 PERROR("timer_create");
354 }
355 channel->switch_timer_enabled = 1;
356
357 its.it_value.tv_sec = switch_timer_interval / 1000000;
358 its.it_value.tv_nsec = switch_timer_interval % 1000000;
359 its.it_interval.tv_sec = its.it_value.tv_sec;
360 its.it_interval.tv_nsec = its.it_value.tv_nsec;
361
362 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
363 if (ret == -1) {
364 PERROR("timer_settime");
365 }
366}
367
368/*
369 * Stop and delete timer.
370 */
371void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
372{
373 int ret;
331744e3
JD
374
375 assert(channel);
376
377 ret = timer_delete(channel->switch_timer);
378 if (ret == -1) {
379 PERROR("timer_delete");
380 }
381
2b8f8754 382 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 383
2b8f8754
MD
384 channel->switch_timer = 0;
385 channel->switch_timer_enabled = 0;
331744e3
JD
386}
387
d3e2ba59
JD
388/*
389 * Set the timer for the live mode.
390 */
391void consumer_timer_live_start(struct lttng_consumer_channel *channel,
392 int live_timer_interval)
393{
394 int ret;
395 struct sigevent sev;
396 struct itimerspec its;
397
398 assert(channel);
399 assert(channel->key);
400
fac41e72 401 if (live_timer_interval <= 0) {
d3e2ba59
JD
402 return;
403 }
404
405 sev.sigev_notify = SIGEV_SIGNAL;
406 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
407 sev.sigev_value.sival_ptr = channel;
408 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
409 if (ret == -1) {
410 PERROR("timer_create");
411 }
412 channel->live_timer_enabled = 1;
413
414 its.it_value.tv_sec = live_timer_interval / 1000000;
415 its.it_value.tv_nsec = live_timer_interval % 1000000;
416 its.it_interval.tv_sec = its.it_value.tv_sec;
417 its.it_interval.tv_nsec = its.it_value.tv_nsec;
418
419 ret = timer_settime(channel->live_timer, 0, &its, NULL);
420 if (ret == -1) {
421 PERROR("timer_settime");
422 }
423}
424
425/*
426 * Stop and delete timer.
427 */
428void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
429{
430 int ret;
431
432 assert(channel);
433
434 ret = timer_delete(channel->live_timer);
435 if (ret == -1) {
436 PERROR("timer_delete");
437 }
438
439 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
440
441 channel->live_timer = 0;
442 channel->live_timer_enabled = 0;
443}
444
331744e3
JD
445/*
446 * Block the RT signals for the entire process. It must be called from the
447 * consumer main before creating the threads
448 */
449void consumer_signal_init(void)
450{
451 int ret;
452 sigset_t mask;
453
454 /* Block signal for entire process, so only our thread processes it. */
455 setmask(&mask);
456 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
457 if (ret) {
458 errno = ret;
459 PERROR("pthread_sigmask");
460 }
461}
462
463/*
d3e2ba59
JD
464 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
465 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 466 */
d3e2ba59 467void *consumer_timer_thread(void *data)
331744e3
JD
468{
469 int signr;
470 sigset_t mask;
471 siginfo_t info;
472 struct lttng_consumer_local_data *ctx = data;
473
1fc79fb4
MD
474 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
475
2d57de81
MD
476 if (testpoint(consumerd_thread_metadata_timer)) {
477 goto error_testpoint;
478 }
479
9ce5646a
MD
480 health_code_update();
481
331744e3
JD
482 /* Only self thread will receive signal mask. */
483 setmask(&mask);
484 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
485
486 while (1) {
9ce5646a
MD
487 health_code_update();
488
489 health_poll_entry();
331744e3 490 signr = sigwaitinfo(&mask, &info);
9ce5646a 491 health_poll_exit();
331744e3
JD
492 if (signr == -1) {
493 if (errno != EINTR) {
494 PERROR("sigwaitinfo");
495 }
496 continue;
497 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
498 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
499 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
500 cmm_smp_mb();
501 CMM_STORE_SHARED(timer_signal.qs_done, 1);
502 cmm_smp_mb();
503 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
504 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
505 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
506 } else {
507 ERR("Unexpected signal %d\n", info.si_signo);
508 }
509 }
510
2d57de81
MD
511error_testpoint:
512 /* Only reached in testpoint error */
513 health_error();
1fc79fb4
MD
514 health_unregister(health_consumerd);
515
516 /* Never return */
331744e3
JD
517 return NULL;
518}
This page took 0.064635 seconds and 4 git commands to generate.