Implement read timer (for RT)
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 11 Mar 2013 14:12:40 +0000 (10:12 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 11 Mar 2013 14:12:40 +0000 (10:12 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
14 files changed:
include/lttng/ust-events.h
liblttng-ust-ctl/ustctl.c
liblttng-ust/Makefile.am
liblttng-ust/lttng-ring-buffer-client-discard-rt.c [new file with mode: 0644]
liblttng-ust/lttng-ring-buffer-client-discard.c
liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c [new file with mode: 0644]
liblttng-ust/lttng-ring-buffer-client-overwrite.c
liblttng-ust/lttng-ring-buffer-client.h
liblttng-ust/lttng-ring-buffer-metadata-client.h
liblttng-ust/lttng-ust-abi.c
liblttng-ust/lttng-ust-comm.c
libringbuffer/frontend_internal.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_frontend.c

index 363fcb5cae6ce228175034e669b358040c4ad4d4..2ffadde1c82b38b82c51a240db213a53949c6905 100644 (file)
@@ -54,6 +54,8 @@ enum lttng_client_types {
        LTTNG_CLIENT_METADATA = 0,
        LTTNG_CLIENT_DISCARD = 1,
        LTTNG_CLIENT_OVERWRITE = 2,
+       LTTNG_CLIENT_DISCARD_RT = 3,
+       LTTNG_CLIENT_OVERWRITE_RT = 4,
        LTTNG_NR_CLIENT_TYPES,
 };
 
index 18bf614babdd9db292243f4aee0d48761699b5f4..67aae68294fdfcf64782fe291568e14f5b3d3544 100644 (file)
@@ -915,8 +915,19 @@ struct ustctl_consumer_channel *
        switch (attr->type) {
        case LTTNG_UST_CHAN_PER_CPU:
                if (attr->output == LTTNG_UST_MMAP) {
-                       transport_name = attr->overwrite ?
-                               "relay-overwrite-mmap" : "relay-discard-mmap";
+                       if (attr->overwrite) {
+                               if (attr->read_timer_interval == 0) {
+                                       transport_name = "relay-overwrite-mmap";
+                               } else {
+                                       transport_name = "relay-overwrite-rt-mmap";
+                               }
+                       } else {
+                               if (attr->read_timer_interval == 0) {
+                                       transport_name = "relay-discard-mmap";
+                               } else {
+                                       transport_name = "relay-discard-rt-mmap";
+                               }
+                       }
                } else {
                        return NULL;
                }
@@ -1758,14 +1769,18 @@ void ustctl_init(void)
        init_usterr();
        lttng_ring_buffer_metadata_client_init();
        lttng_ring_buffer_client_overwrite_init();
+       lttng_ring_buffer_client_overwrite_rt_init();
        lttng_ring_buffer_client_discard_init();
+       lttng_ring_buffer_client_discard_rt_init();
        lib_ringbuffer_signal_init();
 }
 
 static __attribute__((destructor))
 void ustctl_exit(void)
 {
+       lttng_ring_buffer_client_discard_rt_exit();
        lttng_ring_buffer_client_discard_exit();
+       lttng_ring_buffer_client_overwrite_rt_exit();
        lttng_ring_buffer_client_overwrite_exit();
        lttng_ring_buffer_metadata_client_exit();
 }
index a5076c6ec7f560fcf755cd20fb1562a3d3ba3410..cc105d7d319b975093bc3f0a784c487f511c77f8 100644 (file)
@@ -47,7 +47,9 @@ liblttng_ust_support_la_SOURCES = \
        ust-core.c \
        lttng-ring-buffer-client.h \
        lttng-ring-buffer-client-discard.c \
+       lttng-ring-buffer-client-discard-rt.c \
        lttng-ring-buffer-client-overwrite.c \
+       lttng-ring-buffer-client-overwrite-rt.c \
        lttng-ring-buffer-metadata-client.h \
        lttng-ring-buffer-metadata-client.c
 
diff --git a/liblttng-ust/lttng-ring-buffer-client-discard-rt.c b/liblttng-ust/lttng-ring-buffer-client-discard-rt.c
new file mode 100644 (file)
index 0000000..301328f
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * lttng-ring-buffer-client-discard-rt.c
+ *
+ * LTTng lib ring buffer client (discard mode) for RT.
+ *
+ * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; only
+ * version 2.1 of the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#define _GNU_SOURCE
+#include "lttng-tracer.h"
+
+#define RING_BUFFER_MODE_TEMPLATE              RING_BUFFER_DISCARD
+#define RING_BUFFER_MODE_TEMPLATE_STRING       "discard-rt"
+#define RING_BUFFER_MODE_TEMPLATE_INIT \
+       lttng_ring_buffer_client_discard_rt_init
+#define RING_BUFFER_MODE_TEMPLATE_EXIT \
+       lttng_ring_buffer_client_discard_rt_exit
+#define LTTNG_CLIENT_TYPE                      LTTNG_CLIENT_DISCARD_RT
+#define LTTNG_CLIENT_CALLBACKS                 lttng_client_callbacks_discard_rt
+#define LTTNG_CLIENT_WAKEUP                    RING_BUFFER_WAKEUP_BY_TIMER
+#include "lttng-ring-buffer-client.h"
index 0463864b6c00d26966725018c50108f8b4f777d9..2e469b7a6b79f67e0486cd4d62236a16297a5ae9 100644 (file)
@@ -31,4 +31,5 @@
        lttng_ring_buffer_client_discard_exit
 #define LTTNG_CLIENT_TYPE                      LTTNG_CLIENT_DISCARD
 #define LTTNG_CLIENT_CALLBACKS                 lttng_client_callbacks_discard
+#define LTTNG_CLIENT_WAKEUP                    RING_BUFFER_WAKEUP_BY_WRITER
 #include "lttng-ring-buffer-client.h"
diff --git a/liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c b/liblttng-ust/lttng-ring-buffer-client-overwrite-rt.c
new file mode 100644 (file)
index 0000000..b4c5fe5
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * lttng-ring-buffer-client-overwrite-rt.c
+ *
+ * LTTng lib ring buffer client (overwrite mode).
+ *
+ * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; only
+ * version 2.1 of the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#define _GNU_SOURCE
+#include "lttng-tracer.h"
+
+#define RING_BUFFER_MODE_TEMPLATE              RING_BUFFER_OVERWRITE
+#define RING_BUFFER_MODE_TEMPLATE_STRING       "overwrite-rt"
+#define RING_BUFFER_MODE_TEMPLATE_INIT \
+       lttng_ring_buffer_client_overwrite_rt_init
+#define RING_BUFFER_MODE_TEMPLATE_EXIT \
+       lttng_ring_buffer_client_overwrite_rt_exit
+#define LTTNG_CLIENT_TYPE                      LTTNG_CLIENT_OVERWRITE_RT
+#define LTTNG_CLIENT_CALLBACKS                 lttng_client_callbacks_overwrite_rt
+#define LTTNG_CLIENT_WAKEUP                    RING_BUFFER_WAKEUP_BY_TIMER
+#include "lttng-ring-buffer-client.h"
index 3d6fdabf79e08c8977c8b90b485bfe339d4cb0b1..e9cbc89d90005d17ab9a58c41916b6548f4e98d9 100644 (file)
@@ -31,4 +31,5 @@
        lttng_ring_buffer_client_overwrite_exit
 #define LTTNG_CLIENT_TYPE                      LTTNG_CLIENT_OVERWRITE
 #define LTTNG_CLIENT_CALLBACKS                 lttng_client_callbacks_overwrite
+#define LTTNG_CLIENT_WAKEUP                    RING_BUFFER_WAKEUP_BY_WRITER
 #include "lttng-ring-buffer-client.h"
index 8c531cbf36999b6e73a64de8a033199053ce9c98..a67c65ae3a3cb87d37831d9e5e86e5fb1090d327 100644 (file)
@@ -402,7 +402,7 @@ static const struct lttng_ust_lib_ring_buffer_config client_config = {
        .output = RING_BUFFER_MMAP,
        .oops = RING_BUFFER_OOPS_CONSISTENCY,
        .ipi = RING_BUFFER_NO_IPI_BARRIER,
-       .wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+       .wakeup = LTTNG_CLIENT_WAKEUP,
        .client_type = LTTNG_CLIENT_TYPE,
 };
 
@@ -556,12 +556,14 @@ static struct lttng_transport lttng_relay_transport = {
 
 void RING_BUFFER_MODE_TEMPLATE_INIT(void)
 {
-       DBG("LTT : ltt ring buffer client init\n");
+       DBG("LTT : ltt ring buffer client \"%s\" init\n",
+               "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap");
        lttng_transport_register(&lttng_relay_transport);
 }
 
 void RING_BUFFER_MODE_TEMPLATE_EXIT(void)
 {
-       DBG("LTT : ltt ring buffer client exit\n");
+       DBG("LTT : ltt ring buffer client \"%s\" exit\n",
+               "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap");
        lttng_transport_unregister(&lttng_relay_transport);
 }
index 1e1b8a766b63b3e8b82fd5597eaad675a095f463..5c82187d4311d9f496b188d6e54c735dbbbf86f0 100644 (file)
@@ -307,12 +307,14 @@ static struct lttng_transport lttng_relay_transport = {
 
 void RING_BUFFER_MODE_TEMPLATE_INIT(void)
 {
-       DBG("LTT : ltt ring buffer client init\n");
+       DBG("LTT : ltt ring buffer client \"%s\" init\n",
+               "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap");
        lttng_transport_register(&lttng_relay_transport);
 }
 
 void RING_BUFFER_MODE_TEMPLATE_EXIT(void)
 {
-       DBG("LTT : ltt ring buffer client exit\n");
+       DBG("LTT : ltt ring buffer client \"%s\" exit\n",
+               "relay-" RING_BUFFER_MODE_TEMPLATE_STRING "-mmap");
        lttng_transport_unregister(&lttng_relay_transport);
 }
index f26ab5c42907f6a71aab27620e7ecbea10465e07..a852aaef99752789ee66cb6a017485c8604e43b6 100644 (file)
@@ -446,8 +446,19 @@ int lttng_abi_map_channel(int session_objd,
        switch (type) {
        case LTTNG_UST_CHAN_PER_CPU:
                if (config->output == RING_BUFFER_MMAP) {
-                       transport_name = config->mode == RING_BUFFER_OVERWRITE ?
-                               "relay-overwrite-mmap" : "relay-discard-mmap";
+                       if (config->mode == RING_BUFFER_OVERWRITE) {
+                               if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER) {
+                                       transport_name = "relay-overwrite-mmap";
+                               } else {
+                                       transport_name = "relay-overwrite-rt-mmap";
+                               }
+                       } else {
+                               if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER) {
+                                       transport_name = "relay-discard-mmap";
+                               } else {
+                                       transport_name = "relay-discard-rt-mmap";
+                               }
+                       }
                } else {
                        ret = -EINVAL;
                        goto notransport;
index d91f4bd255a993fc1f8699a3deed4f1cf8a6dba9..18763c45fd3a9c0e499f50ddf657b2b3996c0f29 100644 (file)
@@ -178,10 +178,14 @@ static const char *str_timeout;
 static int got_timeout_env;
 
 extern void lttng_ring_buffer_client_overwrite_init(void);
+extern void lttng_ring_buffer_client_overwrite_rt_init(void);
 extern void lttng_ring_buffer_client_discard_init(void);
+extern void lttng_ring_buffer_client_discard_rt_init(void);
 extern void lttng_ring_buffer_metadata_client_init(void);
 extern void lttng_ring_buffer_client_overwrite_exit(void);
+extern void lttng_ring_buffer_client_overwrite_rt_exit(void);
 extern void lttng_ring_buffer_client_discard_exit(void);
+extern void lttng_ring_buffer_client_discard_rt_exit(void);
 extern void lttng_ring_buffer_metadata_client_exit(void);
 
 /*
@@ -1116,7 +1120,9 @@ void __attribute__((constructor)) lttng_ust_init(void)
        init_tracepoint();
        lttng_ring_buffer_metadata_client_init();
        lttng_ring_buffer_client_overwrite_init();
+       lttng_ring_buffer_client_overwrite_rt_init();
        lttng_ring_buffer_client_discard_init();
+       lttng_ring_buffer_client_discard_rt_init();
 
        timeout_mode = get_constructor_timeout(&constructor_timeout);
 
@@ -1219,7 +1225,9 @@ void lttng_ust_cleanup(int exiting)
         */
        lttng_ust_abi_exit();
        lttng_ust_events_exit();
+       lttng_ring_buffer_client_discard_rt_exit();
        lttng_ring_buffer_client_discard_exit();
+       lttng_ring_buffer_client_overwrite_rt_exit();
        lttng_ring_buffer_client_overwrite_exit();
        lttng_ring_buffer_metadata_client_exit();
        exit_tracepoint();
index a96746dcc53533910e76baaa15ab28e0133fd566..77a431295a71b495d72fff3de74c31b8aff761eb 100644 (file)
@@ -300,6 +300,71 @@ int lib_ring_buffer_reserve_committed(const struct lttng_ust_lib_ring_buffer_con
                     - (commit_count & chan->commit_count_mask) == 0);
 }
 
+static inline
+void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf,
+               struct lttng_ust_shm_handle *handle)
+{
+       int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
+       sigset_t sigpipe_set, pending_set, old_set;
+       int ret, sigpipe_was_pending = 0;
+
+       if (wakeup_fd < 0)
+               return;
+
+       /*
+        * Wake-up the other end by writing a null byte in the pipe
+        * (non-blocking).  Important note: Because writing into the
+        * pipe is non-blocking (and therefore we allow dropping wakeup
+        * data, as long as there is wakeup data present in the pipe
+        * buffer to wake up the consumer), the consumer should perform
+        * the following sequence for waiting:
+        * 1) empty the pipe (reads).
+        * 2) check if there is data in the buffer.
+        * 3) wait on the pipe (poll).
+        *
+        * Discard the SIGPIPE from write(), not disturbing any SIGPIPE
+        * that might be already pending. If a bogus SIGPIPE is sent to
+        * the entire process concurrently by a malicious user, it may
+        * be simply discarded.
+        */
+       ret = sigemptyset(&pending_set);
+       assert(!ret);
+       /*
+        * sigpending returns the mask of signals that are _both_
+        * blocked for the thread _and_ pending for either the thread or
+        * the entire process.
+        */
+       ret = sigpending(&pending_set);
+       assert(!ret);
+       sigpipe_was_pending = sigismember(&pending_set, SIGPIPE);
+       /*
+        * If sigpipe was pending, it means it was already blocked, so
+        * no need to block it.
+        */
+       if (!sigpipe_was_pending) {
+               ret = sigemptyset(&sigpipe_set);
+               assert(!ret);
+               ret = sigaddset(&sigpipe_set, SIGPIPE);
+               assert(!ret);
+               ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set);
+               assert(!ret);
+       }
+       do {
+               ret = write(wakeup_fd, "", 1);
+       } while (ret == -1L && errno == EINTR);
+       if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) {
+               struct timespec timeout = { 0, 0 };
+               do {
+                       ret = sigtimedwait(&sigpipe_set, NULL,
+                               &timeout);
+               } while (ret == -1L && errno == EINTR);
+       }
+       if (!sigpipe_was_pending) {
+               ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL);
+               assert(!ret);
+       }
+}
+
 static inline
 void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config,
                                   struct lttng_ust_lib_ring_buffer *buf,
@@ -396,78 +461,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
                        if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
                            && uatomic_read(&buf->active_readers)
                            && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-                               int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
-
-                               if (wakeup_fd >= 0) {
-                                       sigset_t sigpipe_set, pending_set, old_set;
-                                       int ret, sigpipe_was_pending = 0;
-
-                                       /*
-                                        * Wake-up the other end by
-                                        * writing a null byte in the
-                                        * pipe (non-blocking).
-                                        * Important note: Because
-                                        * writing into the pipe is
-                                        * non-blocking (and therefore
-                                        * we allow dropping wakeup
-                                        * data, as long as there is
-                                        * wakeup data present in the
-                                        * pipe buffer to wake up the
-                                        * consumer), the consumer
-                                        * should perform the following
-                                        * sequence for waiting:
-                                        * 1) empty the pipe (reads).
-                                        * 2) check if there is data in
-                                        *    the buffer.
-                                        * 3) wait on the pipe (poll).
-                                        *
-                                        * Discard the SIGPIPE from write(), not
-                                        * disturbing any SIGPIPE that might be
-                                        * already pending. If a bogus SIGPIPE
-                                        * is sent to the entire process
-                                        * concurrently by a malicious user, it
-                                        * may be simply discarded.
-                                        */
-                                       ret = sigemptyset(&pending_set);
-                                       assert(!ret);
-                                       /*
-                                        * sigpending returns the mask
-                                        * of signals that are _both_
-                                        * blocked for the thread _and_
-                                        * pending for either the thread
-                                        * or the entire process.
-                                        */
-                                       ret = sigpending(&pending_set);
-                                       assert(!ret);
-                                       sigpipe_was_pending = sigismember(&pending_set, SIGPIPE);
-                                       /*
-                                        * If sigpipe was pending, it
-                                        * means it was already blocked,
-                                        * so no need to block it.
-                                        */
-                                       if (!sigpipe_was_pending) {
-                                               ret = sigemptyset(&sigpipe_set);
-                                               assert(!ret);
-                                               ret = sigaddset(&sigpipe_set, SIGPIPE);
-                                               assert(!ret);
-                                               ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set);
-                                               assert(!ret);
-                                       }
-                                       do {
-                                               ret = write(wakeup_fd, "", 1);
-                                       } while (ret == -1L && errno == EINTR);
-                                       if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) {
-                                               struct timespec timeout = { 0, 0 };
-                                               do {
-                                                       ret = sigtimedwait(&sigpipe_set, NULL,
-                                                               &timeout);
-                                               } while (ret == -1L && errno == EINTR);
-                                       }
-                                       if (!sigpipe_was_pending) {
-                                               ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL);
-                                               assert(!ret);
-                                       }
-                               }
+                               lib_ring_buffer_wakeup(buf, handle);
                        }
                }
        }
index c23fdd2841e85f808e6505455205c67231be1aac..70e7bb7deb7f17de588f3743ed3430ce69174ece 100644 (file)
@@ -63,8 +63,9 @@ struct channel {
        int switch_timer_enabled;
 
        unsigned long read_timer_interval;      /* Reader wakeup (us) */
-       //timer_t read_timer;
-       //wait_queue_head_t read_wait;          /* reader wait queue */
+       timer_t read_timer;
+       int read_timer_enabled;
+
        int finalized;                          /* Has channel been finalized */
        size_t priv_data_offset;
        unsigned int nr_streams;                /* Number of streams */
@@ -128,9 +129,7 @@ struct lttng_ust_lib_ring_buffer {
        unsigned long get_subbuf_consumed;      /* Read-side consumed */
        unsigned long prod_snapshot;    /* Producer count snapshot */
        unsigned long cons_snapshot;    /* Consumer count snapshot */
-       unsigned int get_subbuf:1,      /* Sub-buffer being held by reader */
-               switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */
-               read_timer_enabled:1;   /* Protected by ring_buffer_nohz_lock */
+       unsigned int get_subbuf:1;      /* Sub-buffer being held by reader */
        /* shmp pointer to self */
        DECLARE_SHMP(struct lttng_ust_lib_ring_buffer, self);
        char padding[RB_RING_BUFFER_PADDING];
index d0e9466810cc0cb45b7ef51e40a67c08e367cf33..6504f0e64cee59a07344703d8c402a96c375f5b1 100644 (file)
@@ -80,8 +80,9 @@
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
 
-#define LTTNG_UST_RB_SIG               SIGRTMIN
-#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_FLUSH         SIGRTMIN
+#define LTTNG_UST_RB_SIG_READ          SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 2
 #define CLOCKID                CLOCK_MONOTONIC
 
 /*
@@ -282,7 +283,7 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
        handle = chan->handle;
        config = &chan->backend.config;
 
-       DBG("Timer for channel %p\n", chan);
+       DBG("Switch timer for channel %p\n", chan);
 
        /*
         * Only flush buffers periodically if readers are active.
@@ -300,14 +301,64 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
                struct lttng_ust_lib_ring_buffer *buf =
                        shmp(handle, chan->backend.buf[0].shmp);
 
-                       if (uatomic_read(&buf->active_readers))
-                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                                       chan->handle);
+               if (uatomic_read(&buf->active_readers))
+                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                               chan->handle);
        }
        pthread_mutex_unlock(&wakeup_fd_mutex);
        return;
 }
 
+static
+void lib_ring_buffer_channel_do_read(struct channel *chan)
+{
+       const struct lttng_ust_lib_ring_buffer_config *config;
+       struct lttng_ust_shm_handle *handle;
+       int cpu;
+
+       handle = chan->handle;
+       config = &chan->backend.config;
+
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               for_each_possible_cpu(cpu) {
+                       struct lttng_ust_lib_ring_buffer *buf =
+                               shmp(handle, chan->backend.buf[cpu].shmp);
+
+                       if (uatomic_read(&buf->active_readers)
+                           && lib_ring_buffer_poll_deliver(config, buf,
+                                       chan, handle)) {
+                               lib_ring_buffer_wakeup(buf, handle);
+                       }
+               }
+       } else {
+               struct lttng_ust_lib_ring_buffer *buf =
+                       shmp(handle, chan->backend.buf[0].shmp);
+
+               if (uatomic_read(&buf->active_readers)
+                   && lib_ring_buffer_poll_deliver(config, buf,
+                               chan, handle)) {
+                       lib_ring_buffer_wakeup(buf, handle);
+               }
+       }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+}
+
+static
+void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+{
+       struct channel *chan;
+
+       assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+       chan = si->si_value.sival_ptr;
+       DBG("Read timer for channel %p\n", chan);
+       lib_ring_buffer_channel_do_read(chan);
+       return;
+}
+
 static
 void rb_setmask(sigset_t *mask)
 {
@@ -317,7 +368,11 @@ void rb_setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigemptyset");
        }
-       ret = sigaddset(mask, LTTNG_UST_RB_SIG);
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
        if (ret) {
                PERROR("sigaddset");
        }
@@ -341,12 +396,16 @@ void *sig_thread(void *arg)
        for (;;) {
                signr = sigwaitinfo(&mask, &info);
                if (signr == -1) {
-                       PERROR("sigwaitinfo");
+                       if (errno != EINTR)
+                               PERROR("sigwaitinfo");
                        continue;
                }
-               if (signr == LTTNG_UST_RB_SIG) {
+               if (signr == LTTNG_UST_RB_SIG_FLUSH) {
                        lib_ring_buffer_channel_switch_timer(info.si_signo,
                                        &info, NULL);
+               } else if (signr == LTTNG_UST_RB_SIG_READ) {
+                       lib_ring_buffer_channel_read_timer(info.si_signo,
+                                       &info, NULL);
                } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
@@ -402,7 +461,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        lib_ring_buffer_setup_timer_thread();
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_UST_RB_SIG;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
        ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
        if (ret == -1) {
@@ -427,7 +486,7 @@ static
 void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
 {
        sigset_t pending_set;
-       int sig_is_pending, ret;
+       int ret;
 
        if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
                return;
@@ -449,8 +508,7 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
                if (ret == -1) {
                        PERROR("sigpending");
                }
-               sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG);
-               if (!sig_is_pending)
+               if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH))
                        break;
                caa_cpu_relax();
        }
@@ -478,82 +536,108 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
        chan->switch_timer_enabled = 0;
 }
 
-#if 0
 /*
- * Polling timer to check the channels for data.
+ * Called with ust_lock() held.
  */
-static void read_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
 {
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       struct sigevent sev;
+       struct itimerspec its;
+       int ret;
 
-       CHAN_WARN_ON(chan, !buf->backend.allocated);
+       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
+                       || !chan->read_timer_interval || chan->read_timer_enabled)
+               return;
 
-       if (uatomic_read(&buf->active_readers))
-           && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
+       chan->read_timer_enabled = 1;
 
-       //TODO
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->read_timer,
-       //                       jiffies + chan->read_timer_interval);
-       //else
-       //      mod_timer(&buf->read_timer,
-       //                jiffies + chan->read_timer_interval);
-}
-#endif //0
+       lib_ring_buffer_setup_timer_thread();
 
-static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
-{
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
+       sev.sigev_value.sival_ptr = chan;
+       ret = timer_create(CLOCKID, &sev, &chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
 
-       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || buf->read_timer_enabled)
-               return;
+       its.it_value.tv_sec = chan->read_timer_interval / 1000000;
+       its.it_value.tv_nsec = chan->read_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       //TODO
-       //init_timer(&buf->read_timer);
-       //buf->read_timer.function = read_buffer_timer;
-       //buf->read_timer.expires = jiffies + chan->read_timer_interval;
-       //buf->read_timer.data = (unsigned long)buf;
-
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      add_timer_on(&buf->read_timer, buf->backend.cpu);
-       //else
-       //      add_timer(&buf->read_timer);
-       buf->read_timer_enabled = 1;
+       ret = timer_settime(chan->read_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+       }
 }
 
-static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sigset_t pending_set;
+       int ret;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || !buf->read_timer_enabled)
+                       || !chan->read_timer_interval || !chan->read_timer_enabled)
                return;
 
-       //TODO
-       //del_timer_sync(&buf->read_timer);
+       ret = timer_delete(chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
        /*
         * do one more check to catch data that has been written in the last
         * timer period.
         */
-       if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
+       lib_ring_buffer_channel_do_read(chan);
+
+
+       /*
+        * Ensure we don't have any signal queued for this channel.
+        */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ))
+                       break;
+               caa_cpu_relax();
        }
-       buf->read_timer_enabled = 0;
+
+       /*
+        * From this point, no new signal handler will be fired that
+        * would try to access "chan". However, we still need to wait
+        * for any currently executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+        * thread wakes up.
+        */
+       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+               caa_cpu_relax();
+       cmm_smp_mb();
+
+       chan->read_timer = 0;
+       chan->read_timer_enabled = 0;
 }
 
 static void channel_unregister_notifiers(struct channel *chan,
@@ -563,18 +647,7 @@ static void channel_unregister_notifiers(struct channel *chan,
        int cpu;
 
        lib_ring_buffer_channel_switch_timer_stop(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
-                       lib_ring_buffer_stop_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
-               lib_ring_buffer_stop_read_timer(buf, handle);
-       }
-       //channel_backend_unregister_notifiers(&chan->backend);
+       lib_ring_buffer_channel_read_timer_stop(chan);
 }
 
 static void channel_print_errors(struct channel *chan,
@@ -708,29 +781,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        chan->handle = handle;
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
-       chan->switch_timer_interval = switch_timer_interval;
-
-       //TODO
-       //chan->read_timer_interval = read_timer_interval;
-       //init_waitqueue_head(&chan->read_wait);
-       //init_waitqueue_head(&chan->hp_wait);
 
+       chan->switch_timer_interval = switch_timer_interval;
+       chan->read_timer_interval = read_timer_interval;
        lib_ring_buffer_channel_switch_timer_start(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-                       lib_ring_buffer_start_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
+       lib_ring_buffer_channel_read_timer_start(chan);
 
-               lib_ring_buffer_start_read_timer(buf, handle);
-       }
        return handle;
 
 error_backend_init:
This page took 0.045389 seconds and 4 git commands to generate.