From: Mathieu Desnoyers Date: Mon, 11 Mar 2013 14:12:40 +0000 (-0400) Subject: Implement read timer (for RT) X-Git-Tag: v2.2.0-rc1~51 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=34a91bdb42a2a3b01b687ab5e1ba7638401e6dfc;p=lttng-ust.git Implement read timer (for RT) Signed-off-by: Mathieu Desnoyers --- diff --git a/include/lttng/ust-events.h b/include/lttng/ust-events.h index 363fcb5c..2ffadde1 100644 --- a/include/lttng/ust-events.h +++ b/include/lttng/ust-events.h @@ -54,6 +54,8 @@ enum lttng_client_types { LTTNG_CLIENT_METADATA = 0, LTTNG_CLIENT_DISCARD = 1, LTTNG_CLIENT_OVERWRITE = 2, + LTTNG_CLIENT_DISCARD_RT = 3, + LTTNG_CLIENT_OVERWRITE_RT = 4, LTTNG_NR_CLIENT_TYPES, }; diff --git a/liblttng-ust-ctl/ustctl.c b/liblttng-ust-ctl/ustctl.c index 18bf614b..67aae682 100644 --- a/liblttng-ust-ctl/ustctl.c +++ b/liblttng-ust-ctl/ustctl.c @@ -915,8 +915,19 @@ struct ustctl_consumer_channel * switch (attr->type) { case LTTNG_UST_CHAN_PER_CPU: if (attr->output == LTTNG_UST_MMAP) { - transport_name = attr->overwrite ? - "relay-overwrite-mmap" : "relay-discard-mmap"; + if (attr->overwrite) { + if (attr->read_timer_interval == 0) { + transport_name = "relay-overwrite-mmap"; + } else { + transport_name = "relay-overwrite-rt-mmap"; + } + } else { + if (attr->read_timer_interval == 0) { + transport_name = "relay-discard-mmap"; + } else { + transport_name = "relay-discard-rt-mmap"; + } + } } else { return NULL; } @@ -1758,14 +1769,18 @@ void ustctl_init(void) init_usterr(); lttng_ring_buffer_metadata_client_init(); lttng_ring_buffer_client_overwrite_init(); + lttng_ring_buffer_client_overwrite_rt_init(); lttng_ring_buffer_client_discard_init(); + lttng_ring_buffer_client_discard_rt_init(); lib_ringbuffer_signal_init(); } static __attribute__((destructor)) void ustctl_exit(void) { + lttng_ring_buffer_client_discard_rt_exit(); lttng_ring_buffer_client_discard_exit(); + lttng_ring_buffer_client_overwrite_rt_exit(); lttng_ring_buffer_client_overwrite_exit(); lttng_ring_buffer_metadata_client_exit(); } diff --git a/liblttng-ust/Makefile.am b/liblttng-ust/Makefile.am index a5076c6e..cc105d7d 100644 --- a/liblttng-ust/Makefile.am +++ b/liblttng-ust/Makefile.am @@ -47,7 +47,9 @@ liblttng_ust_support_la_SOURCES = \ ust-core.c \ lttng-ring-buffer-client.h \ lttng-ring-buffer-client-discard.c \ + lttng-ring-buffer-client-discard-rt.c \ lttng-ring-buffer-client-overwrite.c \ + lttng-ring-buffer-client-overwrite-rt.c \ lttng-ring-buffer-metadata-client.h \ lttng-ring-buffer-metadata-client.c diff --git a/liblttng-ust/lttng-ring-buffer-client-discard-rt.c b/liblttng-ust/lttng-ring-buffer-client-discard-rt.c new file mode 100644 index 00000000..301328f9 --- /dev/null +++ b/liblttng-ust/lttng-ring-buffer-client-discard-rt.c @@ -0,0 +1,35 @@ +/* + * lttng-ring-buffer-client-discard-rt.c + * + * LTTng lib ring buffer client (discard mode) for RT. + * + * Copyright (C) 2010-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#define _GNU_SOURCE +#include "lttng-tracer.h" + +#define RING_BUFFER_MODE_TEMPLATE RING_BUFFER_DISCARD +#define RING_BUFFER_MODE_TEMPLATE_STRING "discard-rt" +#define RING_BUFFER_MODE_TEMPLATE_INIT \ + lttng_ring_buffer_client_discard_rt_init +#define RING_BUFFER_MODE_TEMPLATE_EXIT \ + lttng_ring_buffer_client_discard_rt_exit +#define LTTNG_CLIENT_TYPE LTTNG_CLIENT_DISCARD_RT +#define LTTNG_CLIENT_CALLBACKS lttng_client_callbacks_discard_rt +#define LTTNG_CLIENT_WAKEUP RING_BUFFER_WAKEUP_BY_TIMER +#include "lttng-ring-buffer-client.h" diff --git a/liblttng-ust/lttng-ring-buffer-client-discard.c b/liblttng-ust/lttng-ring-buffer-client-discard.c index 0463864b..2e469b7a 100644 --- a/liblttng-ust/lttng-ring-buffer-client-discard.c +++ b/liblttng-ust/lttng-ring-buffer-client-discard.c @@ -31,4 +31,5 @@ lttng_ring_buffer_client_discard_exit #define LTTNG_CLIENT_TYPE LTTNG_CLIENT_DISCARD #define LTTNG_CLIENT_CALLBACKS lttng_client_callbacks_discard +#define LTTNG_CLIENT_WAKEUP RING_BUFFER_WAKEUP_BY_WRITER #include "lttng-ring-buffer-client.h" diff --git a/liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c b/liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c new file mode 100644 index 00000000..b4c5fe57 --- /dev/null +++ b/liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c @@ -0,0 +1,35 @@ +/* + * lttng-ring-buffer-client-overwrite-rt.c + * + * LTTng lib ring buffer client (overwrite mode). + * + * Copyright (C) 2010-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#define _GNU_SOURCE +#include "lttng-tracer.h" + +#define RING_BUFFER_MODE_TEMPLATE RING_BUFFER_OVERWRITE +#define RING_BUFFER_MODE_TEMPLATE_STRING "overwrite-rt" +#define RING_BUFFER_MODE_TEMPLATE_INIT \ + lttng_ring_buffer_client_overwrite_rt_init +#define RING_BUFFER_MODE_TEMPLATE_EXIT \ + lttng_ring_buffer_client_overwrite_rt_exit +#define LTTNG_CLIENT_TYPE LTTNG_CLIENT_OVERWRITE_RT +#define LTTNG_CLIENT_CALLBACKS lttng_client_callbacks_overwrite_rt +#define LTTNG_CLIENT_WAKEUP RING_BUFFER_WAKEUP_BY_TIMER +#include "lttng-ring-buffer-client.h" diff --git a/liblttng-ust/lttng-ring-buffer-client-overwrite.c b/liblttng-ust/lttng-ring-buffer-client-overwrite.c index 3d6fdabf..e9cbc89d 100644 --- a/liblttng-ust/lttng-ring-buffer-client-overwrite.c +++ b/liblttng-ust/lttng-ring-buffer-client-overwrite.c @@ -31,4 +31,5 @@ lttng_ring_buffer_client_overwrite_exit #define LTTNG_CLIENT_TYPE LTTNG_CLIENT_OVERWRITE #define LTTNG_CLIENT_CALLBACKS lttng_client_callbacks_overwrite +#define LTTNG_CLIENT_WAKEUP RING_BUFFER_WAKEUP_BY_WRITER #include "lttng-ring-buffer-client.h" diff --git a/liblttng-ust/lttng-ring-buffer-client.h b/liblttng-ust/lttng-ring-buffer-client.h index 8c531cbf..a67c65ae 100644 --- a/liblttng-ust/lttng-ring-buffer-client.h +++ b/liblttng-ust/lttng-ring-buffer-client.h @@ -402,7 +402,7 @@ static const struct lttng_ust_lib_ring_buffer_config client_config = { .output = RING_BUFFER_MMAP, .oops = RING_BUFFER_OOPS_CONSISTENCY, .ipi = RING_BUFFER_NO_IPI_BARRIER, - .wakeup = RING_BUFFER_WAKEUP_BY_WRITER, + .wakeup = LTTNG_CLIENT_WAKEUP, .client_type = LTTNG_CLIENT_TYPE, }; @@ -556,12 +556,14 @@ static struct lttng_transport lttng_relay_transport = { void RING_BUFFER_MODE_TEMPLATE_INIT(void) { - DBG("LTT : ltt ring buffer client init\n"); + DBG("LTT : ltt ring buffer client \"%s\" init\n", + "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap"); lttng_transport_register(<tng_relay_transport); } void RING_BUFFER_MODE_TEMPLATE_EXIT(void) { - DBG("LTT : ltt ring buffer client exit\n"); + DBG("LTT : ltt ring buffer client \"%s\" exit\n", + "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap"); lttng_transport_unregister(<tng_relay_transport); } diff --git a/liblttng-ust/lttng-ring-buffer-metadata-client.h b/liblttng-ust/lttng-ring-buffer-metadata-client.h index 1e1b8a76..5c82187d 100644 --- a/liblttng-ust/lttng-ring-buffer-metadata-client.h +++ b/liblttng-ust/lttng-ring-buffer-metadata-client.h @@ -307,12 +307,14 @@ static struct lttng_transport lttng_relay_transport = { void RING_BUFFER_MODE_TEMPLATE_INIT(void) { - DBG("LTT : ltt ring buffer client init\n"); + DBG("LTT : ltt ring buffer client \"%s\" init\n", + "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap"); lttng_transport_register(<tng_relay_transport); } void RING_BUFFER_MODE_TEMPLATE_EXIT(void) { - DBG("LTT : ltt ring buffer client exit\n"); + DBG("LTT : ltt ring buffer client \"%s\" exit\n", + "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap"); lttng_transport_unregister(<tng_relay_transport); } diff --git a/liblttng-ust/lttng-ust-abi.c b/liblttng-ust/lttng-ust-abi.c index f26ab5c4..a852aaef 100644 --- a/liblttng-ust/lttng-ust-abi.c +++ b/liblttng-ust/lttng-ust-abi.c @@ -446,8 +446,19 @@ int lttng_abi_map_channel(int session_objd, switch (type) { case LTTNG_UST_CHAN_PER_CPU: if (config->output == RING_BUFFER_MMAP) { - transport_name = config->mode == RING_BUFFER_OVERWRITE ? - "relay-overwrite-mmap" : "relay-discard-mmap"; + if (config->mode == RING_BUFFER_OVERWRITE) { + if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER) { + transport_name = "relay-overwrite-mmap"; + } else { + transport_name = "relay-overwrite-rt-mmap"; + } + } else { + if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER) { + transport_name = "relay-discard-mmap"; + } else { + transport_name = "relay-discard-rt-mmap"; + } + } } else { ret = -EINVAL; goto notransport; diff --git a/liblttng-ust/lttng-ust-comm.c b/liblttng-ust/lttng-ust-comm.c index d91f4bd2..18763c45 100644 --- a/liblttng-ust/lttng-ust-comm.c +++ b/liblttng-ust/lttng-ust-comm.c @@ -178,10 +178,14 @@ static const char *str_timeout; static int got_timeout_env; extern void lttng_ring_buffer_client_overwrite_init(void); +extern void lttng_ring_buffer_client_overwrite_rt_init(void); extern void lttng_ring_buffer_client_discard_init(void); +extern void lttng_ring_buffer_client_discard_rt_init(void); extern void lttng_ring_buffer_metadata_client_init(void); extern void lttng_ring_buffer_client_overwrite_exit(void); +extern void lttng_ring_buffer_client_overwrite_rt_exit(void); extern void lttng_ring_buffer_client_discard_exit(void); +extern void lttng_ring_buffer_client_discard_rt_exit(void); extern void lttng_ring_buffer_metadata_client_exit(void); /* @@ -1116,7 +1120,9 @@ void __attribute__((constructor)) lttng_ust_init(void) init_tracepoint(); lttng_ring_buffer_metadata_client_init(); lttng_ring_buffer_client_overwrite_init(); + lttng_ring_buffer_client_overwrite_rt_init(); lttng_ring_buffer_client_discard_init(); + lttng_ring_buffer_client_discard_rt_init(); timeout_mode = get_constructor_timeout(&constructor_timeout); @@ -1219,7 +1225,9 @@ void lttng_ust_cleanup(int exiting) */ lttng_ust_abi_exit(); lttng_ust_events_exit(); + lttng_ring_buffer_client_discard_rt_exit(); lttng_ring_buffer_client_discard_exit(); + lttng_ring_buffer_client_overwrite_rt_exit(); lttng_ring_buffer_client_overwrite_exit(); lttng_ring_buffer_metadata_client_exit(); exit_tracepoint(); diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index a96746dc..77a43129 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -300,6 +300,71 @@ int lib_ring_buffer_reserve_committed(const struct lttng_ust_lib_ring_buffer_con - (commit_count & chan->commit_count_mask) == 0); } +static inline +void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) +{ + int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref); + sigset_t sigpipe_set, pending_set, old_set; + int ret, sigpipe_was_pending = 0; + + if (wakeup_fd < 0) + return; + + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the consumer), the consumer should perform + * the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) check if there is data in the buffer. + * 3) wait on the pipe (poll). + * + * Discard the SIGPIPE from write(), not disturbing any SIGPIPE + * that might be already pending. If a bogus SIGPIPE is sent to + * the entire process concurrently by a malicious user, it may + * be simply discarded. + */ + ret = sigemptyset(&pending_set); + assert(!ret); + /* + * sigpending returns the mask of signals that are _both_ + * blocked for the thread _and_ pending for either the thread or + * the entire process. + */ + ret = sigpending(&pending_set); + assert(!ret); + sigpipe_was_pending = sigismember(&pending_set, SIGPIPE); + /* + * If sigpipe was pending, it means it was already blocked, so + * no need to block it. + */ + if (!sigpipe_was_pending) { + ret = sigemptyset(&sigpipe_set); + assert(!ret); + ret = sigaddset(&sigpipe_set, SIGPIPE); + assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set); + assert(!ret); + } + do { + ret = write(wakeup_fd, "", 1); + } while (ret == -1L && errno == EINTR); + if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) { + struct timespec timeout = { 0, 0 }; + do { + ret = sigtimedwait(&sigpipe_set, NULL, + &timeout); + } while (ret == -1L && errno == EINTR); + } + if (!sigpipe_was_pending) { + ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL); + assert(!ret); + } +} + static inline void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer *buf, @@ -396,78 +461,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER && uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { - int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref); - - if (wakeup_fd >= 0) { - sigset_t sigpipe_set, pending_set, old_set; - int ret, sigpipe_was_pending = 0; - - /* - * Wake-up the other end by - * writing a null byte in the - * pipe (non-blocking). - * Important note: Because - * writing into the pipe is - * non-blocking (and therefore - * we allow dropping wakeup - * data, as long as there is - * wakeup data present in the - * pipe buffer to wake up the - * consumer), the consumer - * should perform the following - * sequence for waiting: - * 1) empty the pipe (reads). - * 2) check if there is data in - * the buffer. - * 3) wait on the pipe (poll). - * - * Discard the SIGPIPE from write(), not - * disturbing any SIGPIPE that might be - * already pending. If a bogus SIGPIPE - * is sent to the entire process - * concurrently by a malicious user, it - * may be simply discarded. - */ - ret = sigemptyset(&pending_set); - assert(!ret); - /* - * sigpending returns the mask - * of signals that are _both_ - * blocked for the thread _and_ - * pending for either the thread - * or the entire process. - */ - ret = sigpending(&pending_set); - assert(!ret); - sigpipe_was_pending = sigismember(&pending_set, SIGPIPE); - /* - * If sigpipe was pending, it - * means it was already blocked, - * so no need to block it. - */ - if (!sigpipe_was_pending) { - ret = sigemptyset(&sigpipe_set); - assert(!ret); - ret = sigaddset(&sigpipe_set, SIGPIPE); - assert(!ret); - ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set); - assert(!ret); - } - do { - ret = write(wakeup_fd, "", 1); - } while (ret == -1L && errno == EINTR); - if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) { - struct timespec timeout = { 0, 0 }; - do { - ret = sigtimedwait(&sigpipe_set, NULL, - &timeout); - } while (ret == -1L && errno == EINTR); - } - if (!sigpipe_was_pending) { - ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL); - assert(!ret); - } - } + lib_ring_buffer_wakeup(buf, handle); } } } diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index c23fdd28..70e7bb7d 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -63,8 +63,9 @@ struct channel { int switch_timer_enabled; unsigned long read_timer_interval; /* Reader wakeup (us) */ - //timer_t read_timer; - //wait_queue_head_t read_wait; /* reader wait queue */ + timer_t read_timer; + int read_timer_enabled; + int finalized; /* Has channel been finalized */ size_t priv_data_offset; unsigned int nr_streams; /* Number of streams */ @@ -128,9 +129,7 @@ struct lttng_ust_lib_ring_buffer { unsigned long get_subbuf_consumed; /* Read-side consumed */ unsigned long prod_snapshot; /* Producer count snapshot */ unsigned long cons_snapshot; /* Consumer count snapshot */ - unsigned int get_subbuf:1, /* Sub-buffer being held by reader */ - switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */ - read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */ + unsigned int get_subbuf:1; /* Sub-buffer being held by reader */ /* shmp pointer to self */ DECLARE_SHMP(struct lttng_ust_lib_ring_buffer, self); char padding[RB_RING_BUFFER_PADDING]; diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index d0e94668..6504f0e6 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -80,8 +80,9 @@ /* Print DBG() messages about events lost only every 1048576 hits */ #define DBG_PRINT_NR_LOST (1UL << 20) -#define LTTNG_UST_RB_SIG SIGRTMIN -#define LTTNG_UST_RB_SIG_TEARDOWN SIGRTMIN + 1 +#define LTTNG_UST_RB_SIG_FLUSH SIGRTMIN +#define LTTNG_UST_RB_SIG_READ SIGRTMIN + 1 +#define LTTNG_UST_RB_SIG_TEARDOWN SIGRTMIN + 2 #define CLOCKID CLOCK_MONOTONIC /* @@ -282,7 +283,7 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) handle = chan->handle; config = &chan->backend.config; - DBG("Timer for channel %p\n", chan); + DBG("Switch timer for channel %p\n", chan); /* * Only flush buffers periodically if readers are active. @@ -300,14 +301,64 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - if (uatomic_read(&buf->active_readers)) - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, - chan->handle); + if (uatomic_read(&buf->active_readers)) + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, + chan->handle); } pthread_mutex_unlock(&wakeup_fd_mutex); return; } +static +void lib_ring_buffer_channel_do_read(struct channel *chan) +{ + const struct lttng_ust_lib_ring_buffer_config *config; + struct lttng_ust_shm_handle *handle; + int cpu; + + handle = chan->handle; + config = &chan->backend.config; + + /* + * Only flush buffers periodically if readers are active. + */ + pthread_mutex_lock(&wakeup_fd_mutex); + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { + for_each_possible_cpu(cpu) { + struct lttng_ust_lib_ring_buffer *buf = + shmp(handle, chan->backend.buf[cpu].shmp); + + if (uatomic_read(&buf->active_readers) + && lib_ring_buffer_poll_deliver(config, buf, + chan, handle)) { + lib_ring_buffer_wakeup(buf, handle); + } + } + } else { + struct lttng_ust_lib_ring_buffer *buf = + shmp(handle, chan->backend.buf[0].shmp); + + if (uatomic_read(&buf->active_readers) + && lib_ring_buffer_poll_deliver(config, buf, + chan, handle)) { + lib_ring_buffer_wakeup(buf, handle); + } + } + pthread_mutex_unlock(&wakeup_fd_mutex); +} + +static +void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc) +{ + struct channel *chan; + + assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self()); + chan = si->si_value.sival_ptr; + DBG("Read timer for channel %p\n", chan); + lib_ring_buffer_channel_do_read(chan); + return; +} + static void rb_setmask(sigset_t *mask) { @@ -317,7 +368,11 @@ void rb_setmask(sigset_t *mask) if (ret) { PERROR("sigemptyset"); } - ret = sigaddset(mask, LTTNG_UST_RB_SIG); + ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH); + if (ret) { + PERROR("sigaddset"); + } + ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ); if (ret) { PERROR("sigaddset"); } @@ -341,12 +396,16 @@ void *sig_thread(void *arg) for (;;) { signr = sigwaitinfo(&mask, &info); if (signr == -1) { - PERROR("sigwaitinfo"); + if (errno != EINTR) + PERROR("sigwaitinfo"); continue; } - if (signr == LTTNG_UST_RB_SIG) { + if (signr == LTTNG_UST_RB_SIG_FLUSH) { lib_ring_buffer_channel_switch_timer(info.si_signo, &info, NULL); + } else if (signr == LTTNG_UST_RB_SIG_READ) { + lib_ring_buffer_channel_read_timer(info.si_signo, + &info, NULL); } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) { cmm_smp_mb(); CMM_STORE_SHARED(timer_signal.qs_done, 1); @@ -402,7 +461,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) lib_ring_buffer_setup_timer_thread(); sev.sigev_notify = SIGEV_SIGNAL; - sev.sigev_signo = LTTNG_UST_RB_SIG; + sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH; sev.sigev_value.sival_ptr = chan; ret = timer_create(CLOCKID, &sev, &chan->switch_timer); if (ret == -1) { @@ -427,7 +486,7 @@ static void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) { sigset_t pending_set; - int sig_is_pending, ret; + int ret; if (!chan->switch_timer_interval || !chan->switch_timer_enabled) return; @@ -449,8 +508,7 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) if (ret == -1) { PERROR("sigpending"); } - sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG); - if (!sig_is_pending) + if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH)) break; caa_cpu_relax(); } @@ -478,82 +536,108 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) chan->switch_timer_enabled = 0; } -#if 0 /* - * Polling timer to check the channels for data. + * Called with ust_lock() held. */ -static void read_buffer_timer(unsigned long data) +static +void lib_ring_buffer_channel_read_timer_start(struct channel *chan) { - struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data; - struct channel *chan = shmp(handle, buf->backend.chan); const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + struct sigevent sev; + struct itimerspec its; + int ret; - CHAN_WARN_ON(chan, !buf->backend.allocated); + if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER + || !chan->read_timer_interval || chan->read_timer_enabled) + return; - if (uatomic_read(&buf->active_readers)) - && lib_ring_buffer_poll_deliver(config, buf, chan)) { - //TODO - //wake_up_interruptible(&buf->read_wait); - //wake_up_interruptible(&chan->read_wait); - } + chan->read_timer_enabled = 1; - //TODO - //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - // mod_timer_pinned(&buf->read_timer, - // jiffies + chan->read_timer_interval); - //else - // mod_timer(&buf->read_timer, - // jiffies + chan->read_timer_interval); -} -#endif //0 + lib_ring_buffer_setup_timer_thread(); -static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) -{ - struct channel *chan = shmp(handle, buf->backend.chan); - const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = LTTNG_UST_RB_SIG_READ; + sev.sigev_value.sival_ptr = chan; + ret = timer_create(CLOCKID, &sev, &chan->read_timer); + if (ret == -1) { + PERROR("timer_create"); + } - if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER - || !chan->read_timer_interval - || buf->read_timer_enabled) - return; + its.it_value.tv_sec = chan->read_timer_interval / 1000000; + its.it_value.tv_nsec = chan->read_timer_interval % 1000000; + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; - //TODO - //init_timer(&buf->read_timer); - //buf->read_timer.function = read_buffer_timer; - //buf->read_timer.expires = jiffies + chan->read_timer_interval; - //buf->read_timer.data = (unsigned long)buf; - - //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - // add_timer_on(&buf->read_timer, buf->backend.cpu); - //else - // add_timer(&buf->read_timer); - buf->read_timer_enabled = 1; + ret = timer_settime(chan->read_timer, 0, &its, NULL); + if (ret == -1) { + PERROR("timer_settime"); + } } -static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) +/* + * Called with ust_lock() held. + */ +static +void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) { - struct channel *chan = shmp(handle, buf->backend.chan); const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; + sigset_t pending_set; + int ret; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER - || !chan->read_timer_interval - || !buf->read_timer_enabled) + || !chan->read_timer_interval || !chan->read_timer_enabled) return; - //TODO - //del_timer_sync(&buf->read_timer); + ret = timer_delete(chan->read_timer); + if (ret == -1) { + PERROR("timer_delete"); + } + /* * do one more check to catch data that has been written in the last * timer period. */ - if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { - //TODO - //wake_up_interruptible(&buf->read_wait); - //wake_up_interruptible(&chan->read_wait); + lib_ring_buffer_channel_do_read(chan); + + + /* + * Ensure we don't have any signal queued for this channel. + */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ)) + break; + caa_cpu_relax(); } - buf->read_timer_enabled = 0; + + /* + * From this point, no new signal handler will be fired that + * would try to access "chan". However, we still need to wait + * for any currently executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management + * thread wakes up. + */ + kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) + caa_cpu_relax(); + cmm_smp_mb(); + + chan->read_timer = 0; + chan->read_timer_enabled = 0; } static void channel_unregister_notifiers(struct channel *chan, @@ -563,18 +647,7 @@ static void channel_unregister_notifiers(struct channel *chan, int cpu; lib_ring_buffer_channel_switch_timer_stop(chan); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - for_each_possible_cpu(cpu) { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - - lib_ring_buffer_stop_read_timer(buf, handle); - } - } else { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - - lib_ring_buffer_stop_read_timer(buf, handle); - } - //channel_backend_unregister_notifiers(&chan->backend); + lib_ring_buffer_channel_read_timer_stop(chan); } static void channel_print_errors(struct channel *chan, @@ -708,29 +781,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff chan->handle = handle; chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); - chan->switch_timer_interval = switch_timer_interval; - - //TODO - //chan->read_timer_interval = read_timer_interval; - //init_waitqueue_head(&chan->read_wait); - //init_waitqueue_head(&chan->hp_wait); + chan->switch_timer_interval = switch_timer_interval; + chan->read_timer_interval = read_timer_interval; lib_ring_buffer_channel_switch_timer_start(chan); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - /* - * In case of non-hotplug cpu, if the ring-buffer is allocated - * in early initcall, it will not be notified of secondary cpus. - * In that off case, we need to allocate for all possible cpus. - */ - for_each_possible_cpu(cpu) { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - lib_ring_buffer_start_read_timer(buf, handle); - } - } else { - struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); + lib_ring_buffer_channel_read_timer_start(chan); - lib_ring_buffer_start_read_timer(buf, handle); - } return handle; error_backend_init: