Update lib ring buffer for external consumer
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 23 Sep 2011 15:39:44 +0000 (11:39 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 23 Sep 2011 15:39:44 +0000 (11:39 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
include/Makefile.am
libringbuffer/frontend.h
libringbuffer/frontend_internal.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_frontend.c
libringbuffer/shm.c
libust/ltt-ring-buffer-client.h
libust/ltt-ring-buffer-metadata-client.h
libust/lttng-ust-abi.c
libust/lttng-ust-comm.c

index dcec467bac4287e5df9a2d8c479175b585e8ee16..df20225ad8ff6e39d2de2babd48c8daff14c1ffc 100644 (file)
@@ -10,9 +10,10 @@ nobase_include_HEADERS = \
        ust/ringbuffer-abi.h \
        ust/lttng-tracer.h \
        ust/usterr-signal-safe.h \
-       ust/core.h \
+       ust/config.h \
        ust/share.h \
-       ust/ust.h
+       ust/ust.h \
+       ust/core.h
 
 # note: usterr-signal-safe.h, core.h and share.h need namespace cleanup.
 
@@ -21,15 +22,15 @@ noinst_HEADERS = \
        usterr.h \
        ust_snprintf.h \
        ust/compat.h \
-       ust/config.h \
        ust/marker-internal.h \
        ust/tracepoint-internal.h \
        ust/clock.h \
        ust/probe-internal.h \
-       ust/processor.h \
        ust/kcompat/kcompat.h \
        ust/kcompat/jhash.h \
        ust/kcompat/compiler.h \
        ust/kcompat/types.h \
        ust/stringify.h \
-       ust/wait.h
+       ust/wait.h \
+       ust/ringbuffer-config.h \
+       ust/processor.h
index a88a400395cde93aeeecdea3f6dcfbcb39167bce..9fa0f04d2a67d0bf6e1b46a997755e8fc16ff64e 100644 (file)
@@ -62,7 +62,8 @@ int channel_handle_add_stream(struct shm_handle *handle,
  * channel.
  */
 extern
-void *channel_destroy(struct channel *chan, struct shm_handle *handle);
+void *channel_destroy(struct channel *chan, struct shm_handle *handle,
+               int shadow);
 
 
 /* Buffer read operations */
@@ -83,9 +84,11 @@ extern struct lib_ring_buffer *channel_get_ring_buffer(
                                int *shm_fd, int *wait_fd,
                                uint64_t *memory_map_size);
 extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
-                                    struct shm_handle *handle);
+                                    struct shm_handle *handle,
+                                    int shadow);
 extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
-                                        struct shm_handle *handle);
+                                        struct shm_handle *handle,
+                                        int shadow);
 
 /*
  * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
index 1fb08dfdec90d3f25e5f7ce85f39c010386badf4..a6b96c1a098ab089199cbc9d06f59549bcc4328f 100644 (file)
@@ -376,7 +376,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                         * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
                         */
                        if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
-                           && uatomic_read(&buf->active_readers)
+                           && (uatomic_read(&buf->active_readers)
+                               || uatomic_read(&buf->active_shadow_readers))
                            && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
                                int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
 
index ad7848c831627cd87a009db5a1c9e29c41a1f841..76ad7ff5beece42cd492a2f7da27d8301ea29336 100644 (file)
@@ -87,6 +87,7 @@ struct lib_ring_buffer {
                                         * Active readers count
                                         * standard atomic access (shared)
                                         */
+       long active_shadow_readers;
                                        /* Dropped records */
        union v_atomic records_lost_full;       /* Buffer full */
        union v_atomic records_lost_wrap;       /* Nested wrap-around */
index 5d87782ee321e79d51e8164d0f80087369872b06..5e6d4df267d2c48d9d4c0bfa44b33202574dcea0 100644 (file)
@@ -249,7 +249,7 @@ static void switch_buffer_timer(unsigned long data)
        /*
         * Only flush buffers periodically if readers are active.
         */
-       if (uatomic_read(&buf->active_readers))
+       if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
                lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
 
        //TODO timers
@@ -307,7 +307,7 @@ static void read_buffer_timer(unsigned long data)
 
        CHAN_WARN_ON(chan, !buf->backend.allocated);
 
-       if (uatomic_read(&buf->active_readers)
+       if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
            && lib_ring_buffer_poll_deliver(config, buf, chan)) {
                //TODO
                //wake_up_interruptible(&buf->read_wait);
@@ -395,11 +395,13 @@ static void channel_unregister_notifiers(struct channel *chan,
        //channel_backend_unregister_notifiers(&chan->backend);
 }
 
-static void channel_free(struct channel *chan, struct shm_handle *handle)
+static void channel_free(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
        int ret;
 
-       channel_backend_free(&chan->backend, handle);
+       if (!shadow)
+               channel_backend_free(&chan->backend, handle);
        /* chan is freed by shm teardown */
        shm_object_table_destroy(handle->table);
        free(handle);
@@ -550,9 +552,10 @@ int channel_handle_add_stream(struct shm_handle *handle,
 }
 
 static
-void channel_release(struct channel *chan, struct shm_handle *handle)
+void channel_release(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
-       channel_free(chan, handle);
+       channel_free(chan, handle, shadow);
 }
 
 /**
@@ -566,12 +569,18 @@ void channel_release(struct channel *chan, struct shm_handle *handle)
  * They should release their handle at that point.  Returns the private
  * data pointer.
  */
-void *channel_destroy(struct channel *chan, struct shm_handle *handle)
+void *channel_destroy(struct channel *chan, struct shm_handle *handle,
+               int shadow)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        void *priv;
        int cpu;
 
+       if (shadow) {
+               channel_release(chan, handle, shadow);
+               return NULL;
+       }
+
        channel_unregister_notifiers(chan, handle);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
@@ -615,7 +624,7 @@ void *channel_destroy(struct channel *chan, struct shm_handle *handle)
         * descriptor directly. No need to refcount.
         */
        priv = chan->backend.priv;
-       channel_release(chan, handle);
+       channel_release(chan, handle, shadow);
        return priv;
 }
 
@@ -642,10 +651,17 @@ struct lib_ring_buffer *channel_get_ring_buffer(
 }
 
 int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
-                             struct shm_handle *handle)
+                             struct shm_handle *handle,
+                             int shadow)
 {
        struct channel *chan = shmp(handle, buf->backend.chan);
 
+       if (shadow) {
+               if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
+                       return -EBUSY;
+               cmm_smp_mb();
+               return 0;
+       }
        if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
                return -EBUSY;
        cmm_smp_mb();
@@ -653,10 +669,17 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
 }
 
 void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
-                                 struct shm_handle *handle)
+                                 struct shm_handle *handle,
+                                 int shadow)
 {
        struct channel *chan = shmp(handle, buf->backend.chan);
 
+       if (shadow) {
+               CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
+               cmm_smp_mb();
+               uatomic_dec(&buf->active_shadow_readers);
+               return;
+       }
        CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
        cmm_smp_mb();
        uatomic_dec(&buf->active_readers);
@@ -734,7 +757,8 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
        struct channel *chan = shmp(handle, bufb->chan);
        unsigned long consumed;
 
-       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+                       && uatomic_read(&buf->active_shadow_readers) != 1);
 
        /*
         * Only push the consumed value forward.
@@ -857,7 +881,8 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long read_sb_bindex, consumed_idx, consumed;
 
-       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
+       CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
+                       && uatomic_read(&buf->active_shadow_readers) != 1);
 
        if (!buf->get_subbuf) {
                /*
index ae63b004fd50723cd9c73511049bb8d042928f59..fd0919fac3b75ad057e9c0aed87efd676847a76a 100644 (file)
@@ -214,6 +214,8 @@ void shmp_object_destroy(struct shm_object *obj)
                assert(0);
        }
        for (i = 0; i < 2; i++) {
+               if (obj->wait_fd[i] < 0)
+                       continue;
                ret = close(obj->wait_fd[i]);
                if (ret) {
                        PERROR("close");
index f3a1a27aaca49629e5831de7b3a2156993f68cd3..5f5c31588356f7f78631112eb4e34d004c576c9a 100644 (file)
@@ -397,7 +397,7 @@ struct ltt_channel *_channel_create(const char *name,
 static
 void ltt_channel_destroy(struct ltt_channel *ltt_chan)
 {
-       channel_destroy(ltt_chan->chan, ltt_chan->handle);
+       channel_destroy(ltt_chan->chan, ltt_chan->handle, 0);
 }
 
 static
@@ -413,7 +413,7 @@ struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
                buf = channel_get_ring_buffer(&client_config, chan,
                                cpu, handle, shm_fd, wait_fd,
                                memory_map_size);
-               if (!lib_ring_buffer_open_read(buf, handle))
+               if (!lib_ring_buffer_open_read(buf, handle, 0))
                        return buf;
        }
        return NULL;
@@ -423,7 +423,7 @@ static
 void ltt_buffer_read_close(struct lib_ring_buffer *buf,
                           struct shm_handle *handle)
 {
-       lib_ring_buffer_release_read(buf, handle);
+       lib_ring_buffer_release_read(buf, handle, 0);
 }
 
 static
index fa79485c98634d4892ef45c9199a0764add004f6..0102860ed64b8d6af3a1b49f6251b2c21e353839 100644 (file)
@@ -177,7 +177,7 @@ struct ltt_channel *_channel_create(const char *name,
 static
 void ltt_channel_destroy(struct ltt_channel *ltt_chan)
 {
-       channel_destroy(ltt_chan->chan, ltt_chan->handle);
+       channel_destroy(ltt_chan->chan, ltt_chan->handle, 0);
 }
 
 static
@@ -190,7 +190,7 @@ struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
 
        buf = channel_get_ring_buffer(&client_config, chan,
                        0, handle, shm_fd, wait_fd, memory_map_size);
-       if (!lib_ring_buffer_open_read(buf, handle))
+       if (!lib_ring_buffer_open_read(buf, handle, 0))
                return buf;
        return NULL;
 }
@@ -199,7 +199,7 @@ static
 void ltt_buffer_read_close(struct lib_ring_buffer *buf,
                           struct shm_handle *handle)
 {
-       lib_ring_buffer_release_read(buf, handle);
+       lib_ring_buffer_release_read(buf, handle, 0);
 }
 
 static
index cea8c30bffe8d29e39d4792bbfd6032b62c21d4c..16566c876664df8ae6a9e812ea126d2aa652ed77 100644 (file)
@@ -179,13 +179,8 @@ void objd_table_destroy(void)
 
        for (i = 0; i < objd_table.allocated_len; i++) {
                struct obj *obj = _objd_get(i);
-               const struct objd_ops *ops;
 
-               if (!obj)
-                       continue;
-               ops = obj->u.s.ops;
-               if (ops->release)
-                       ops->release(i);
+               (void) objd_unref(i);
        }
        free(objd_table.array);
        objd_table.array = NULL;
@@ -754,6 +749,7 @@ int lttng_rb_release(int objd)
                buf = priv->buf;
                channel = priv->ltt_chan;
                free(priv);
+               channel->ops->buffer_read_close(buf, channel->handle);
 
                return objd_unref(channel->objd);
        }
index 2e7e1a3515abd1f6b46024364ddb1747057e8a4f..858e92c9b513c3bfb79a844e37d1cc315c1b968b 100644 (file)
@@ -294,6 +294,7 @@ end:
                shm_fd = lum->u.stream.shm_fd;
                wait_fd = lum->u.stream.wait_fd;
                break;
+       case LTTNG_UST_METADATA:
        case LTTNG_UST_CHANNEL:
                lur.u.channel.memory_map_size = lum->u.channel.memory_map_size;
                shm_fd = lum->u.channel.shm_fd;
@@ -306,7 +307,9 @@ end:
                goto error;
        }
 
-       if ((lum->cmd == LTTNG_UST_STREAM || lum->cmd == LTTNG_UST_CHANNEL)
+       if ((lum->cmd == LTTNG_UST_STREAM
+            || lum->cmd == LTTNG_UST_CHANNEL
+            || lum->cmd == LTTNG_UST_METADATA)
                        && lur.ret_code == LTTCOMM_OK) {
                /* we also need to send the file descriptors. */
                ret = lttcomm_send_fds_unix_sock(sock,
This page took 0.031649 seconds and 4 git commands to generate.