Implement shm object table
author    Mathieu Desnoyers <mathieu.desnoyers@efficios.com>  Sat, 20 Aug 2011 00:03:53 +0000 (20:03 -0400)
committer Mathieu Desnoyers <mathieu.desnoyers@efficios.com>  Sat, 20 Aug 2011 00:03:53 +0000 (20:03 -0400)
... for multi-process access to the same maps, with safe "pointers".

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
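
The new files shm.c, shm_internal.h and shm_types.h carry the object table itself, but their contents are not part of this excerpt. The following is a hedged reconstruction of the "safe pointer" scheme, inferred only from how the rest of the diff uses DECLARE_SHMP(), shmp(), set_shmp() and the _ref.index/_ref.offset fields: a pointer stored in shared memory is a (table index, offset) pair that each process resolves through its own shm_handle, with bounds checks, so no raw address ever crosses a process boundary. Exact field names and layouts below are assumptions.

#include <stddef.h>
#include <sys/types.h>	/* ssize_t */

struct shm_ref {
	volatile ssize_t index;		/* index into the shm object table */
	volatile ssize_t offset;	/* offset within that shm object */
};

/* A "pointer" living in shared memory is stored as a shm_ref. */
#define DECLARE_SHMP(type, name)			\
	union {						\
		struct shm_ref _ref;			\
		type *_type;	/* compile-time type tag, never stored */ \
	} name

struct shm_object {
	char *memory_map;		/* base of this process's own mapping */
	size_t memory_map_size;
};

struct shm_object_table {
	size_t allocated_len;
	struct shm_object objects[];
};

struct shm_handle {
	struct shm_object_table *table;	/* per-process view of the objects */
};

/* Resolve a reference through the local handle, refusing bad indexes/offsets. */
static inline void *_shmp(struct shm_handle *handle, struct shm_ref *ref)
{
	struct shm_object *obj;

	if (ref->index < 0
	    || (size_t) ref->index >= handle->table->allocated_len)
		return NULL;
	obj = &handle->table->objects[ref->index];
	if (ref->offset < 0 || (size_t) ref->offset >= obj->memory_map_size)
		return NULL;
	return &obj->memory_map[ref->offset];
}

#define shmp(handle, field)	\
	((__typeof__((field)._type)) _shmp(handle, &(field)._ref))

/* Assumes zalloc_shm() returns a struct shm_ref by value, as the
 * set_shmp(bufb->array, zalloc_shm(...)) calls below suggest. */
#define set_shmp(field, ref_value)	((field)._ref = (ref_value))

Because resolution happens against the local mapping, two processes can map the same objects at different addresses and still share every ring buffer structure.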
21 files changed:
include/ust/lttng-events.h
include/ust/lttng-tracepoint-event.h
include/ust/ringbuffer-config.h
libringbuffer/Makefile.am
libringbuffer/backend.h
libringbuffer/backend_internal.h
libringbuffer/backend_types.h
libringbuffer/frontend.h
libringbuffer/frontend_api.h
libringbuffer/frontend_internal.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_abi.c
libringbuffer/ring_buffer_backend.c
libringbuffer/ring_buffer_frontend.c
libringbuffer/shm.c [new file with mode: 0644]
libringbuffer/shm.h
libringbuffer/shm_internal.h [new file with mode: 0644]
libringbuffer/shm_types.h [new file with mode: 0644]
libust/ltt-events.c
libust/ltt-ring-buffer-client.h
libust/ltt-ring-buffer-metadata-client.h

diff --git a/include/ust/lttng-events.h b/include/ust/lttng-events.h
index 8f1c98d32eed261964cb9616aa072032a86c04ca..31938327059c09ab218d92eed4c0cae0ba84fcf4 100644
@@ -195,17 +195,20 @@ struct ltt_event {
 };
 
 struct channel;
+struct shm_handle;
 
 struct ltt_channel_ops {
-       struct shm_handle *(*channel_create)(const char *name,
+       struct ltt_channel *(*channel_create)(const char *name,
                                struct ltt_channel *ltt_chan,
                                void *buf_addr,
                                size_t subbuf_size, size_t num_subbuf,
                                unsigned int switch_timer_interval,
                                unsigned int read_timer_interval);
-       void (*channel_destroy)(struct shm_handle *handle);
-       struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan);
-       void (*buffer_read_close)(struct lib_ring_buffer *buf);
+       void (*channel_destroy)(struct ltt_channel *ltt_chan);
+       struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan,
+                               struct shm_handle *handle);
+       void (*buffer_read_close)(struct lib_ring_buffer *buf,
+                               struct shm_handle *handle);
        int (*event_reserve)(struct lib_ring_buffer_ctx *ctx,
                             uint32_t event_id);
        void (*event_commit)(struct lib_ring_buffer_ctx *ctx);
@@ -216,7 +219,8 @@ struct ltt_channel_ops {
         * packet. Note that the size returned is only a hint, since it
         * may change due to concurrent writes.
         */
-       size_t (*packet_avail_size)(struct channel *chan);
+       size_t (*packet_avail_size)(struct channel *chan,
+                                   struct shm_handle *handle);
        //wait_queue_head_t *(*get_reader_wait_queue)(struct channel *chan);
        //wait_queue_head_t *(*get_hp_wait_queue)(struct channel *chan);
        int (*is_finalized)(struct channel *chan);
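
The ops table above now threads the shm_handle through every reader-side call. A hedged sketch of a caller, assuming (as the lttng-tracepoint-event.h hunk below does with __chan->chan and __chan->handle) that struct ltt_channel carries both the channel and its handle; ltt_chan is a hypothetical variable:

	struct lib_ring_buffer *buf;

	/* Readers must present the handle that maps the channel's shm objects. */
	buf = ltt_chan->ops->buffer_read_open(ltt_chan->chan, ltt_chan->handle);
	if (buf) {
		/* ... consume events ... */
		ltt_chan->ops->buffer_read_close(buf, ltt_chan->handle);
	}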
diff --git a/include/ust/lttng-tracepoint-event.h b/include/ust/lttng-tracepoint-event.h
index de145ebe5b2d2a270a60f24371a7e31fb2813652..1642cf180c1356424a6aab8771b899574abfd77a 100644
@@ -480,7 +480,7 @@ static void __event_probe__##_name(void *__data, _proto)                  \
        __event_len = __event_get_size__##_name(__dynamic_len, _args);        \
        __event_align = __event_get_align__##_name(_args);                    \
        lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len,    \
-                                __event_align, -1);                          \
+                                __event_align, -1, __chan->handle);          \
        __ret = __chan->ops->event_reserve(&ctx, __event->id);                \
        if (__ret < 0)                                                        \
                return;                                                       \
@@ -511,7 +511,7 @@ static void __event_probe__##_name(void *__data)                          \
        __event_len = __event_get_size__##_name(__dynamic_len);               \
        __event_align = __event_get_align__##_name();                         \
        lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len,    \
-                                __event_align, -1);                          \
+                                __event_align, -1, __chan->handle);          \
        __ret = __chan->ops->event_reserve(&ctx, __event->id);                \
        if (__ret < 0)                                                        \
                return;                                                       \
diff --git a/include/ust/ringbuffer-config.h b/include/ust/ringbuffer-config.h
index 80503205a9ad0ee90b7494f01824ca335c53eab2..15fee52d26bb13f8da9bf6de3069e3ba73281b79 100644
@@ -20,6 +20,7 @@ struct lib_ring_buffer;
 struct channel;
 struct lib_ring_buffer_config;
 struct lib_ring_buffer_ctx;
+struct shm_handle *handle;
 
 /*
  * Ring buffer client callbacks. Only used by slow path, never on fast path.
@@ -40,20 +41,25 @@ struct lib_ring_buffer_client_cb {
        /* Slow path only, at subbuffer switch */
        size_t (*subbuffer_header_size) (void);
        void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx);
+                             unsigned int subbuf_idx,
+                             struct shm_handle *handle);
        void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc,
-                           unsigned int subbuf_idx, unsigned long data_size);
+                           unsigned int subbuf_idx, unsigned long data_size,
+                           struct shm_handle *handle);
 
        /* Optional callbacks (can be set to NULL) */
 
        /* Called at buffer creation/finalize */
        int (*buffer_create) (struct lib_ring_buffer *buf, void *priv,
-                             int cpu, const char *name);
+                             int cpu, const char *name,
+                             struct shm_handle *handle);
        /*
         * Clients should guarantee that no new reader handle can be opened
         * after finalize.
         */
-       void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu);
+       void (*buffer_finalize) (struct lib_ring_buffer *buf,
+                                void *priv, int cpu,
+                                struct shm_handle *handle);
 
        /*
         * Extract header length, payload length and timestamp from event
@@ -63,7 +69,8 @@ struct lib_ring_buffer_client_cb {
        void (*record_get) (const struct lib_ring_buffer_config *config,
                            struct channel *chan, struct lib_ring_buffer *buf,
                            size_t offset, size_t *header_len,
-                           size_t *payload_len, u64 *timestamp);
+                           size_t *payload_len, u64 *timestamp,
+                           struct shm_handle *handle);
 };
 
 /*
@@ -165,6 +172,7 @@ struct lib_ring_buffer_ctx {
        /* input received by lib_ring_buffer_reserve(), saved here. */
        struct channel *chan;           /* channel */
        void *priv;                     /* client private data */
+       struct shm_handle *handle;      /* shared-memory handle */
        size_t data_size;               /* size of payload */
        int largest_align;              /*
                                         * alignment of the largest element
@@ -202,7 +210,7 @@ static inline
 void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx,
                              struct channel *chan, void *priv,
                              size_t data_size, int largest_align,
-                             int cpu)
+                             int cpu, struct shm_handle *handle)
 {
        ctx->chan = chan;
        ctx->priv = priv;
@@ -210,6 +218,7 @@ void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx,
        ctx->largest_align = largest_align;
        ctx->cpu = cpu;
        ctx->rflags = 0;
+       ctx->handle = handle;
 }
 
 /*
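
Clients fill lib_ring_buffer_client_cb with functions matching the widened prototypes above. A minimal, hypothetical buffer_begin stub (client_buffer_begin is an illustrative name, not from this commit) shows the pattern of resolving shared-memory references through the received handle:

	static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
					unsigned int subbuf_idx,
					struct shm_handle *handle)
	{
		struct channel *chan = shmp(handle, buf->backend.chan);

		/* ... write the sub-buffer (packet) header using chan, handle ... */
	}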
diff --git a/libringbuffer/Makefile.am b/libringbuffer/Makefile.am
index ffa37017c44b077d40049649e180db8490e251e5..2fe76816b4964d850cb6251272a337c0bb01d617 100644
@@ -7,6 +7,7 @@ noinst_HEADERS = \
        smp.h
 
 libringbuffer_la_SOURCES = \
+       shm.c \
        smp.c \
        ring_buffer_backend.c \
        ring_buffer_frontend.c
diff --git a/libringbuffer/backend.h b/libringbuffer/backend.h
index 1bd61109ca0096e2685b864324449b5760dc3128..e26045af2c7318f8bff93813cd9beaae8085db81 100644
 /* Ring buffer backend access (read/write) */
 
 extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
-                                  size_t offset, void *dest, size_t len);
+                                  size_t offset, void *dest, size_t len,
+                                  struct shm_handle *handle);
 
 extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
-                                    size_t offset, void *dest, size_t len);
+                                    size_t offset, void *dest, size_t len,
+                                    struct shm_handle *handle);
 
 /*
  * Return the address where a given offset is located.
@@ -40,10 +42,12 @@ extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
  */
 extern void *
 lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
-                              size_t offset);
+                              size_t offset,
+                              struct shm_handle *handle);
 extern void *
 lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
-                                   size_t offset);
+                                   size_t offset,
+                                   struct shm_handle *handle);
 
 /**
  * lib_ring_buffer_write - write data to a buffer backend
@@ -64,16 +68,17 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
 {
        struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
        struct channel_backend *chanb = &ctx->chan->backend;
+       struct shm_handle *handle = ctx->handle;
        size_t sbidx;
        size_t offset = ctx->buf_offset;
-       struct lib_ring_buffer_backend_pages *rpages;
+       struct lib_ring_buffer_backend_pages_shmp *rpages;
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
        sbidx = offset >> chanb->subbuf_size_order;
-       id = shmp(bufb->buf_wsb)[sbidx].id;
+       id = shmp(handle, bufb->buf_wsb)[sbidx].id;
        sb_bindex = subbuffer_id_get_index(config, id);
-       rpages = shmp(bufb->array)[sb_bindex];
+       rpages = &shmp(handle, bufb->array)[sb_bindex];
        CHAN_WARN_ON(ctx->chan,
                     config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
@@ -83,7 +88,7 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
         */
        CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        lib_ring_buffer_do_copy(config,
-                               shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)),
+                               shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)),
                                src, len);
        ctx->buf_offset += len;
 }
@@ -96,24 +101,25 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
 static inline
 unsigned long lib_ring_buffer_get_records_unread(
                                const struct lib_ring_buffer_config *config,
-                               struct lib_ring_buffer *buf)
+                               struct lib_ring_buffer *buf,
+                               struct shm_handle *handle)
 {
        struct lib_ring_buffer_backend *bufb = &buf->backend;
-       struct lib_ring_buffer_backend_pages *pages;
+       struct lib_ring_buffer_backend_pages_shmp *pages;
        unsigned long records_unread = 0, sb_bindex, id;
        unsigned int i;
 
-       for (i = 0; i < shmp(bufb->chan)->backend.num_subbuf; i++) {
-               id = shmp(bufb->buf_wsb)[i].id;
+       for (i = 0; i < shmp(handle, bufb->chan)->backend.num_subbuf; i++) {
+               id = shmp(handle, bufb->buf_wsb)[i].id;
                sb_bindex = subbuffer_id_get_index(config, id);
-               pages = shmp(bufb->array)[sb_bindex];
-               records_unread += v_read(config, &pages->records_unread);
+               pages = &shmp(handle, bufb->array)[sb_bindex];
+               records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread);
        }
        if (config->mode == RING_BUFFER_OVERWRITE) {
                id = bufb->buf_rsb.id;
                sb_bindex = subbuffer_id_get_index(config, id);
-               pages = shmp(bufb->array)[sb_bindex];
-               records_unread += v_read(config, &pages->records_unread);
+               pages = &shmp(handle, bufb->array)[sb_bindex];
+               records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread);
        }
        return records_unread;
 }
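
The nested shmp() calls that now appear throughout (e.g. in the lib_ring_buffer_write() hunk above) resolve two references in sequence: the array slot holds a reference to the backend-pages descriptor, which in turn holds a reference to the actual sub-buffer memory. An unpacked equivalent of that copy destination, using only names from the hunk:

	struct lib_ring_buffer_backend_pages_shmp *rpages;
	struct lib_ring_buffer_backend_pages *pages;
	char *dest;

	rpages = &shmp(handle, bufb->array)[sb_bindex];	/* slot in ref array */
	pages = shmp(handle, rpages->shmp);		/* ref -> pages descriptor */
	dest = shmp(handle, pages->p)			/* ref -> mapped memory */
		+ (offset & ~(chanb->subbuf_size - 1));
	lib_ring_buffer_do_copy(config, dest, src, len);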
diff --git a/libringbuffer/backend_internal.h b/libringbuffer/backend_internal.h
index 061924fcfa5091d32a76a8949c37d7fae49db03c..30e32098af0de0832461dcdfb3df31c5f1b54ff8 100644
 
 int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
                                   struct channel_backend *chan, int cpu,
-                                  struct shm_header *shm_header);
+                                  struct shm_handle *handle,
+                                  struct shm_object *shmobj);
 void channel_backend_unregister_notifiers(struct channel_backend *chanb);
 void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb);
 int channel_backend_init(struct channel_backend *chanb,
                         const char *name,
                         const struct lib_ring_buffer_config *config,
                         void *priv, size_t subbuf_size,
-                        size_t num_subbuf, struct shm_header *shm_header);
-void channel_backend_free(struct channel_backend *chanb);
+                        size_t num_subbuf, struct shm_handle *handle);
+void channel_backend_free(struct channel_backend *chanb,
+                         struct shm_handle *handle);
 
-void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb);
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb,
+                                  struct shm_handle *handle);
 void channel_backend_reset(struct channel_backend *chanb);
 
 int lib_ring_buffer_backend_init(void);
@@ -183,12 +186,12 @@ int subbuffer_id_check_index(const struct lib_ring_buffer_config *config,
 static inline
 void subbuffer_count_record(const struct lib_ring_buffer_config *config,
                            struct lib_ring_buffer_backend *bufb,
-                           unsigned long idx)
+                           unsigned long idx, struct shm_handle *handle)
 {
        unsigned long sb_bindex;
 
-       sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
-       v_inc(config, &shmp(bufb->array)[sb_bindex]->records_commit);
+       sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+       v_inc(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit);
 }
 
 /*
@@ -197,15 +200,16 @@ void subbuffer_count_record(const struct lib_ring_buffer_config *config,
  */
 static inline
 void subbuffer_consume_record(const struct lib_ring_buffer_config *config,
-                             struct lib_ring_buffer_backend *bufb)
+                             struct lib_ring_buffer_backend *bufb,
+                             struct shm_handle *handle)
 {
        unsigned long sb_bindex;
 
        sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
-       CHAN_WARN_ON(bufb->chan,
-                    !v_read(config, &shmp(bufb->array)[sb_bindex]->records_unread));
+       CHAN_WARN_ON(shmp(handle, bufb->chan),
+                    !v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread));
        /* Non-atomic decrement protected by exclusive subbuffer access */
-       _v_dec(config, &shmp(bufb->array)[sb_bindex]->records_unread);
+       _v_dec(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread);
        v_inc(config, &bufb->records_read);
 }
 
@@ -213,12 +217,13 @@ static inline
 unsigned long subbuffer_get_records_count(
                                const struct lib_ring_buffer_config *config,
                                struct lib_ring_buffer_backend *bufb,
-                               unsigned long idx)
+                               unsigned long idx,
+                               struct shm_handle *handle)
 {
        unsigned long sb_bindex;
 
-       sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
-       return v_read(config, &shmp(bufb->array)[sb_bindex]->records_commit);
+       sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+       return v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit);
 }
 
 /*
@@ -231,17 +236,18 @@ static inline
 unsigned long subbuffer_count_records_overrun(
                                const struct lib_ring_buffer_config *config,
                                struct lib_ring_buffer_backend *bufb,
-                               unsigned long idx)
+                               unsigned long idx,
+                               struct shm_handle *handle)
 {
-       struct lib_ring_buffer_backend_pages *pages;
+       struct lib_ring_buffer_backend_pages_shmp *pages;
        unsigned long overruns, sb_bindex;
 
-       sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
-       pages = shmp(bufb->array)[sb_bindex];
-       overruns = v_read(config, &pages->records_unread);
-       v_set(config, &pages->records_unread,
-             v_read(config, &pages->records_commit));
-       v_set(config, &pages->records_commit, 0);
+       sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+       pages = &shmp(handle, bufb->array)[sb_bindex];
+       overruns = v_read(config, &shmp(handle, pages->shmp)->records_unread);
+       v_set(config, &shmp(handle, pages->shmp)->records_unread,
+             v_read(config, &shmp(handle, pages->shmp)->records_commit));
+       v_set(config, &shmp(handle, pages->shmp)->records_commit, 0);
 
        return overruns;
 }
@@ -250,41 +256,44 @@ static inline
 void subbuffer_set_data_size(const struct lib_ring_buffer_config *config,
                             struct lib_ring_buffer_backend *bufb,
                             unsigned long idx,
-                            unsigned long data_size)
+                            unsigned long data_size,
+                            struct shm_handle *handle)
 {
-       struct lib_ring_buffer_backend_pages *pages;
+       struct lib_ring_buffer_backend_pages_shmp *pages;
        unsigned long sb_bindex;
 
-       sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
-       pages = shmp(bufb->array)[sb_bindex];
-       pages->data_size = data_size;
+       sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+       pages = &shmp(handle, bufb->array)[sb_bindex];
+       shmp(handle, pages->shmp)->data_size = data_size;
 }
 
 static inline
 unsigned long subbuffer_get_read_data_size(
                                const struct lib_ring_buffer_config *config,
-                               struct lib_ring_buffer_backend *bufb)
+                               struct lib_ring_buffer_backend *bufb,
+                               struct shm_handle *handle)
 {
-       struct lib_ring_buffer_backend_pages *pages;
+       struct lib_ring_buffer_backend_pages_shmp *pages;
        unsigned long sb_bindex;
 
        sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
-       pages = shmp(bufb->array)[sb_bindex];
-       return pages->data_size;
+       pages = &shmp(handle, bufb->array)[sb_bindex];
+       return shmp(handle, pages->shmp)->data_size;
 }
 
 static inline
 unsigned long subbuffer_get_data_size(
                                const struct lib_ring_buffer_config *config,
                                struct lib_ring_buffer_backend *bufb,
-                               unsigned long idx)
+                               unsigned long idx,
+                               struct shm_handle *handle)
 {
-       struct lib_ring_buffer_backend_pages *pages;
+       struct lib_ring_buffer_backend_pages_shmp *pages;
        unsigned long sb_bindex;
 
-       sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
-       pages = shmp(bufb->array)[sb_bindex];
-       return pages->data_size;
+       sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+       pages = &shmp(handle, bufb->array)[sb_bindex];
+       return shmp(handle, pages->shmp)->data_size;
 }
 
 /**
@@ -294,7 +303,8 @@ unsigned long subbuffer_get_data_size(
 static inline
 void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
                                 struct lib_ring_buffer_backend *bufb,
-                                unsigned long idx)
+                                unsigned long idx,
+                                struct shm_handle *handle)
 {
        unsigned long id, new_id;
 
@@ -305,7 +315,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
         * Performing a volatile access to read the sb_pages, because we want to
         * read a coherent version of the pointer and the associated noref flag.
         */
-       id = CMM_ACCESS_ONCE(shmp(bufb->buf_wsb)[idx].id);
+       id = CMM_ACCESS_ONCE(shmp(handle, bufb->buf_wsb)[idx].id);
        for (;;) {
                /* This check is called on the fast path for each record. */
                if (likely(!subbuffer_id_is_noref(config, id))) {
@@ -319,7 +329,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
                }
                new_id = id;
                subbuffer_id_clear_noref(config, &new_id);
-               new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[idx].id, id, new_id);
+               new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[idx].id, id, new_id);
                if (likely(new_id == id))
                        break;
                id = new_id;
@@ -333,7 +343,8 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
 static inline
 void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config,
                                      struct lib_ring_buffer_backend *bufb,
-                                     unsigned long idx, unsigned long offset)
+                                     unsigned long idx, unsigned long offset,
+                                     struct shm_handle *handle)
 {
        if (config->mode != RING_BUFFER_OVERWRITE)
                return;
@@ -349,14 +360,14 @@ void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *confi
         * subbuffer_set_noref() uses a volatile store to deal with concurrent
         * readers of the noref flag.
         */
-       CHAN_WARN_ON(bufb->chan,
-                    subbuffer_id_is_noref(config, shmp(bufb->buf_wsb)[idx].id));
+       CHAN_WARN_ON(shmp(handle, bufb->chan),
+                    subbuffer_id_is_noref(config, shmp(handle, bufb->buf_wsb)[idx].id));
        /*
         * Memory barrier that ensures counter stores are ordered before set
         * noref and offset.
         */
        cmm_smp_mb();
-       subbuffer_id_set_noref_offset(config, &shmp(bufb->buf_wsb)[idx].id, offset);
+       subbuffer_id_set_noref_offset(config, &shmp(handle, bufb->buf_wsb)[idx].id, offset);
 }
 
 /**
@@ -367,7 +378,8 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config,
                         struct lib_ring_buffer_backend *bufb,
                         struct channel_backend *chanb,
                         unsigned long consumed_idx,
-                        unsigned long consumed_count)
+                        unsigned long consumed_count,
+                        struct shm_handle *handle)
 {
        unsigned long old_id, new_id;
 
@@ -378,7 +390,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config,
                 * old_wpage, because the value read will be confirmed by the
                 * following cmpxchg().
                 */
-               old_id = shmp(bufb->buf_wsb)[consumed_idx].id;
+               old_id = shmp(handle, bufb->buf_wsb)[consumed_idx].id;
                if (unlikely(!subbuffer_id_is_noref(config, old_id)))
                        return -EAGAIN;
                /*
@@ -388,18 +400,18 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config,
                if (unlikely(!subbuffer_id_compare_offset(config, old_id,
                                                          consumed_count)))
                        return -EAGAIN;
-               CHAN_WARN_ON(bufb->chan,
+               CHAN_WARN_ON(shmp(handle, bufb->chan),
                             !subbuffer_id_is_noref(config, bufb->buf_rsb.id));
                subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
                                              consumed_count);
-               new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[consumed_idx].id, old_id,
+               new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[consumed_idx].id, old_id,
                                 bufb->buf_rsb.id);
                if (unlikely(old_id != new_id))
                        return -EAGAIN;
                bufb->buf_rsb.id = new_id;
        } else {
                /* No page exchange, use the writer page directly */
-               bufb->buf_rsb.id = shmp(bufb->buf_wsb)[consumed_idx].id;
+               bufb->buf_rsb.id = shmp(handle, bufb->buf_wsb)[consumed_idx].id;
        }
        return 0;
 }
diff --git a/libringbuffer/backend_types.h b/libringbuffer/backend_types.h
index 3bc36ba81ee896fbd295e5973b6f9cbe052cdbfe..51dcfbf9a7a9238776d2392d66e6ed03659e7bc1 100644
@@ -11,7 +11,7 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
-#include "shm.h"
+#include "shm_internal.h"
 
 struct lib_ring_buffer_backend_pages {
        unsigned long mmap_offset;      /* offset of the subbuffer in mmap */
@@ -32,6 +32,10 @@ struct lib_ring_buffer_backend_subbuffer {
 struct channel;
 struct lib_ring_buffer;
 
+struct lib_ring_buffer_backend_pages_shmp {
+       DECLARE_SHMP(struct lib_ring_buffer_backend_pages, shmp);
+};
+
 struct lib_ring_buffer_backend {
        /* Array of ring_buffer_backend_subbuffer for writer */
        DECLARE_SHMP(struct lib_ring_buffer_backend_subbuffer, buf_wsb);
@@ -41,7 +45,7 @@ struct lib_ring_buffer_backend {
         * Pointer array of backend pages, for whole buffer.
         * Indexed by ring_buffer_backend_subbuffer identifier (id) index.
         */
-       DECLARE_SHMP(struct lib_ring_buffer_backend_pages *, array);
+       DECLARE_SHMP(struct lib_ring_buffer_backend_pages_shmp, array);
        DECLARE_SHMP(char, memory_map); /* memory mapping */
 
        DECLARE_SHMP(struct channel, chan);     /* Associated channel */
@@ -50,6 +54,10 @@ struct lib_ring_buffer_backend {
        unsigned int allocated:1;       /* Bool: is buffer allocated ? */
 };
 
+struct lib_ring_buffer_shmp {
+       DECLARE_SHMP(struct lib_ring_buffer, shmp); /* Channel per-cpu buffers */
+};
+
 struct channel_backend {
        unsigned long buf_size;         /* Size of the buffer */
        unsigned long subbuf_size;      /* Sub-buffer size */
@@ -60,12 +68,12 @@ struct channel_backend {
                                         */
        unsigned int buf_size_order;    /* Order of buffer size */
        int extra_reader_sb:1;          /* Bool: has extra reader subbuffer */
-       DECLARE_SHMP(struct lib_ring_buffer, buf); /* Channel per-cpu buffers */
        unsigned long num_subbuf;       /* Number of sub-buffers for writer */
        u64 start_tsc;                  /* Channel creation TSC value */
        void *priv;                     /* Client-specific information */
        const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
        char name[NAME_MAX];            /* Channel name */
+       struct lib_ring_buffer_shmp buf[];
 };
 
 #endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */
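
The buf[] flexible array member above stores one shm reference wrapper per possible CPU, so a per-CPU buffer is now resolved through the handle rather than read as a raw pointer; the frontend_api.h hunk below does exactly this:

	struct lib_ring_buffer *buf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		buf = shmp(handle, chan->backend.buf[cpu].shmp);
	else
		buf = shmp(handle, chan->backend.buf[0].shmp);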
diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h
index 5e5b5cc101f48ed5788c61b391fc12d7579526d5..2385d392e68f985c9061e02ee1f6690692b3ff09 100644
@@ -50,7 +50,7 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
  * channel.
  */
 extern
-void *channel_destroy(struct shm_handle *handle);
+void *channel_destroy(struct channel *chan, struct shm_handle *handle);
 
 
 /* Buffer read operations */
@@ -66,48 +66,59 @@ void *channel_destroy(struct shm_handle *handle);
 
 extern struct lib_ring_buffer *channel_get_ring_buffer(
                                const struct lib_ring_buffer_config *config,
-                               struct channel *chan, int cpu);
-extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf);
-extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf);
+                               struct channel *chan, int cpu,
+                               struct shm_handle *handle);
+extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
+                                    struct shm_handle *handle);
+extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
+                                        struct shm_handle *handle);
 
 /*
  * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
  */
 extern int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
                                    unsigned long *consumed,
-                                   unsigned long *produced);
+                                   unsigned long *produced,
+                                   struct shm_handle *handle);
 extern void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
-                                         unsigned long consumed_new);
+                                         unsigned long consumed_new,
+                                         struct shm_handle *handle);
 
 extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
-                                     unsigned long consumed);
-extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
+                                     unsigned long consumed,
+                                     struct shm_handle *handle);
+extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
+                                      struct shm_handle *handle);
 
 /*
  * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
  * to read sub-buffers sequentially.
  */
-static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf)
+static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf,
+                                                 struct shm_handle *handle)
 {
        int ret;
 
        ret = lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
-                                      &buf->prod_snapshot);
+                                      &buf->prod_snapshot, handle);
        if (ret)
                return ret;
-       ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot);
+       ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot, handle);
        return ret;
 }
 
-static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf)
+static inline
+void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf,
+                                    struct shm_handle *handle)
 {
-       lib_ring_buffer_put_subbuf(buf);
+       lib_ring_buffer_put_subbuf(buf, handle);
        lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot,
-                                                   shmp(buf->backend.chan)));
+                                                       shmp(handle, buf->backend.chan)), handle);
 }
 
 extern void channel_reset(struct channel *chan);
-extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf);
+extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf,
+                                 struct shm_handle *handle);
 
 static inline
 unsigned long lib_ring_buffer_get_offset(const struct lib_ring_buffer_config *config,
@@ -154,9 +165,10 @@ int lib_ring_buffer_channel_is_disabled(const struct channel *chan)
 static inline
 unsigned long lib_ring_buffer_get_read_data_size(
                                const struct lib_ring_buffer_config *config,
-                               struct lib_ring_buffer *buf)
+                               struct lib_ring_buffer *buf,
+                               struct shm_handle *handle)
 {
-       return subbuffer_get_read_data_size(config, &buf->backend);
+       return subbuffer_get_read_data_size(config, &buf->backend, handle);
 }
 
 static inline
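
A hedged sketch of the read sequence these helpers support (snapshot, get, consume, put), assuming the caller already holds buf, config and handle; lib_ring_buffer_read_offset_address() is declared in backend.h above:

	static void drain_buffer(const struct lib_ring_buffer_config *config,
				 struct lib_ring_buffer *buf,
				 struct shm_handle *handle)
	{
		size_t len;
		void *ptr;

		for (;;) {
			if (lib_ring_buffer_get_next_subbuf(buf, handle))
				break;	/* no full sub-buffer ready */
			len = lib_ring_buffer_get_read_data_size(config, buf, handle);
			ptr = lib_ring_buffer_read_offset_address(&buf->backend, 0,
								  handle);
			/* ... consume len bytes at ptr ... */
			lib_ring_buffer_put_next_subbuf(buf, handle);
		}
	}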
diff --git a/libringbuffer/frontend_api.h b/libringbuffer/frontend_api.h
index 75146e60bef33a23937f71d27e254b486ae1f7c9..1d8e29493655da9709cec47909167177a7c1bcc6 100644
@@ -146,6 +146,7 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config,
                            struct lib_ring_buffer_ctx *ctx)
 {
        struct channel *chan = ctx->chan;
+       struct shm_handle *handle = ctx->handle;
        struct lib_ring_buffer *buf;
        unsigned long o_begin, o_end, o_old;
        size_t before_hdr_pad = 0;
@@ -154,9 +155,9 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config,
                return -EAGAIN;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               buf = &shmp(chan->backend.buf)[ctx->cpu];
+               buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
        else
-               buf = shmp(chan->backend.buf);
+               buf = shmp(handle, chan->backend.buf[0].shmp);
        if (uatomic_read(&buf->record_disabled))
                return -EAGAIN;
        ctx->buf = buf;
@@ -189,7 +190,7 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config,
         * Clear noref flag for this subbuffer.
         */
        lib_ring_buffer_clear_noref(config, &ctx->buf->backend,
-                               subbuf_index(o_end - 1, chan));
+                               subbuf_index(o_end - 1, chan), handle);
 
        ctx->pre_offset = o_begin;
        ctx->buf_offset = o_begin + before_hdr_pad;
@@ -214,9 +215,10 @@ slow_path:
  */
 static inline
 void lib_ring_buffer_switch(const struct lib_ring_buffer_config *config,
-                           struct lib_ring_buffer *buf, enum switch_mode mode)
+                           struct lib_ring_buffer *buf, enum switch_mode mode,
+                           struct shm_handle *handle)
 {
-       lib_ring_buffer_switch_slow(buf, mode);
+       lib_ring_buffer_switch_slow(buf, mode, handle);
 }
 
 /* See ring_buffer_frontend_api.h for lib_ring_buffer_reserve(). */
@@ -234,6 +236,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
                            const struct lib_ring_buffer_ctx *ctx)
 {
        struct channel *chan = ctx->chan;
+       struct shm_handle *handle = ctx->handle;
        struct lib_ring_buffer *buf = ctx->buf;
        unsigned long offset_end = ctx->buf_offset;
        unsigned long endidx = subbuf_index(offset_end - 1, chan);
@@ -242,7 +245,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
        /*
         * Must count record before incrementing the commit count.
         */
-       subbuffer_count_record(config, &buf->backend, endidx);
+       subbuffer_count_record(config, &buf->backend, endidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -250,7 +253,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
         */
        cmm_smp_wmb();
 
-       v_add(config, ctx->slot_size, &shmp(buf->commit_hot)[endidx].cc);
+       v_add(config, ctx->slot_size, &shmp(handle, buf->commit_hot)[endidx].cc);
 
        /*
         * commit count read can race with concurrent OOO commit count updates.
@@ -270,17 +273,17 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config,
         *   count reaches back the reserve offset for a specific sub-buffer,
         *   which is completely independent of the order.
         */
-       commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc);
 
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
-                                     commit_count, endidx);
+                                     commit_count, endidx, handle);
        /*
         * Update used size at each commit. It's needed only for extracting
         * ring_buffer buffers from vmcore, after crash.
         */
        lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
                                             ctx->buf_offset, commit_count,
-                                        ctx->slot_size);
+                                            ctx->slot_size, handle);
 }
 
 /**
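
Putting the reserve/commit fast path together, a hedged writer-side sketch; payload, cpu and priv are placeholders, but every call matches a prototype changed by this commit. The ctx now carries the handle for the whole reserve/commit cycle, so the fast path never looks it up again:

	static int write_record(const struct lib_ring_buffer_config *config,
				struct channel *chan, void *priv, int cpu,
				struct shm_handle *handle, uint64_t payload)
	{
		struct lib_ring_buffer_ctx ctx;
		int ret;

		lib_ring_buffer_ctx_init(&ctx, chan, priv, sizeof(payload),
					 __alignof__(payload), cpu, handle);
		ret = lib_ring_buffer_reserve(config, &ctx);
		if (ret)
			return ret;	/* e.g. -EAGAIN when disabled */
		lib_ring_buffer_write(config, &ctx, &payload, sizeof(payload));
		lib_ring_buffer_commit(config, &ctx);
		return 0;
	}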
diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h
index 7c28788a5758468e86b72db958dfaed3f09e2bee..73b44726df2fe9f1c4baaee731418e8fcead28f0 100644
@@ -144,7 +144,8 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx);
 
 extern
 void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
-                                enum switch_mode mode);
+                                enum switch_mode mode,
+                                struct shm_handle *handle);
 
 /* Buffer write helpers */
 
@@ -180,22 +181,24 @@ static inline
 void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
                                          struct lib_ring_buffer *buf,
                                          unsigned long commit_count,
-                                         unsigned long idx)
+                                         unsigned long idx,
+                                         struct shm_handle *handle)
 {
        if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
-               v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count);
+               v_set(config, &shmp(handle, buf->commit_hot)[idx].seq, commit_count);
 }
 
 static inline
 int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
                                 struct lib_ring_buffer *buf,
-                                struct channel *chan)
+                                struct channel *chan,
+                                struct shm_handle *handle)
 {
        unsigned long consumed_old, consumed_idx, commit_count, write_offset;
 
        consumed_old = uatomic_read(&buf->consumed);
        consumed_idx = subbuf_index(consumed_old, chan);
-       commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
+       commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb);
        /*
         * No memory barrier here, since we are only interested
         * in a statistically correct polling result. The next poll will
@@ -239,9 +242,10 @@ int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config,
 static inline
 unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
                                            struct lib_ring_buffer *buf,
-                                           unsigned long idx)
+                                           unsigned long idx,
+                                           struct shm_handle *handle)
 {
-       return subbuffer_get_data_size(config, &buf->backend, idx);
+       return subbuffer_get_data_size(config, &buf->backend, idx, handle);
 }
 
 /*
@@ -252,7 +256,8 @@ unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config
 static inline
 int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
                                      struct lib_ring_buffer *buf,
-                                     struct channel *chan)
+                                     struct channel *chan,
+                                     struct shm_handle *handle)
 {
        unsigned long offset, idx, commit_count;
 
@@ -270,7 +275,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi
        do {
                offset = v_read(config, &buf->offset);
                idx = subbuf_index(offset, chan);
-               commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc);
+               commit_count = v_read(config, &shmp(handle, buf->commit_hot)[idx].cc);
        } while (offset != v_read(config, &buf->offset));
 
        return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
@@ -283,7 +288,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                                   struct channel *chan,
                                   unsigned long offset,
                                   unsigned long commit_count,
-                                  unsigned long idx)
+                                  unsigned long idx,
+                                  struct shm_handle *handle)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -318,7 +324,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                 * The subbuffer size is least 2 bytes (minimum size: 1 page).
                 * This guarantees that old_commit_count + 1 != commit_count.
                 */
-               if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb,
+               if (likely(v_cmpxchg(config, &shmp(handle, buf->commit_cold)[idx].cc_sb,
                                         old_commit_count, old_commit_count + 1)
                           == old_commit_count)) {
                        /*
@@ -330,17 +336,20 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                        tsc = config->cb.ring_buffer_clock_read(chan);
                        v_add(config,
                              subbuffer_get_records_count(config,
-                                                         &buf->backend, idx),
+                                                         &buf->backend,
+                                                         idx, handle),
                              &buf->records_count);
                        v_add(config,
                              subbuffer_count_records_overrun(config,
                                                              &buf->backend,
-                                                             idx),
+                                                             idx, handle),
                              &buf->records_overrun);
                        config->cb.buffer_end(buf, tsc, idx,
                                              lib_ring_buffer_get_data_size(config,
                                                                        buf,
-                                                                       idx));
+                                                                       idx,
+                                                                       handle),
+                                             handle);
 
                        /*
                         * Set noref flag and offset for this subbuffer id.
@@ -348,7 +357,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                         * are ordered before set noref and offset.
                         */
                        lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
-                                                        buf_trunc_val(offset, chan));
+                                                        buf_trunc_val(offset, chan), handle);
 
                        /*
                         * Order set_noref and record counter updates before the
@@ -358,17 +367,17 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                         */
                        cmm_smp_mb();
                        /* End of exclusive subbuffer access */
-                       v_set(config, &shmp(buf->commit_cold)[idx].cc_sb,
+                       v_set(config, &shmp(handle, buf->commit_cold)[idx].cc_sb,
                              commit_count);
                        lib_ring_buffer_vmcore_check_deliver(config, buf,
-                                                        commit_count, idx);
+                                                commit_count, idx, handle);
 
                        /*
                         * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
                         */
                        if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
                            && uatomic_read(&buf->active_readers)
-                           && lib_ring_buffer_poll_deliver(config, buf, chan)) {
+                           && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
                                //wake_up_interruptible(&buf->read_wait);
                                //wake_up_interruptible(&chan->read_wait);
                        }
@@ -392,7 +401,8 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c
                                          unsigned long idx,
                                          unsigned long buf_offset,
                                          unsigned long commit_count,
-                                         size_t slot_size)
+                                         size_t slot_size,
+                                         struct shm_handle *handle)
 {
        unsigned long offset, commit_seq_old;
 
@@ -410,16 +420,18 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c
        if (unlikely(subbuf_offset(offset - commit_count, chan)))
                return;
 
-       commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq);
+       commit_seq_old = v_read(config, &shmp(handle, buf->commit_hot)[idx].seq);
        while ((long) (commit_seq_old - commit_count) < 0)
-               commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq,
+               commit_seq_old = v_cmpxchg(config, &shmp(handle, buf->commit_hot)[idx].seq,
                                           commit_seq_old, commit_count);
 }
 
 extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                                  struct channel_backend *chanb, int cpu,
-                                 struct shm_header *shm_header);
-extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
+                                 struct shm_handle *handle,
+                                 struct shm_object *shmobj);
+extern void lib_ring_buffer_free(struct lib_ring_buffer *buf,
+                                struct shm_handle *handle);
 
 /* Keep track of trap nesting inside ring buffer code */
 extern __thread unsigned int lib_ring_buffer_nesting;
diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h
index 28179dfc58b8ebc31c75b65115127d9fa73e79a3..974f782e0dc86b3ed3903ec861ede6754504fbd9 100644
@@ -26,7 +26,7 @@
 #include <ust/usterr-signal-safe.h>
 #include <ust/ringbuffer-config.h>
 #include "backend_types.h"
-#include "shm.h"
+#include "shm_internal.h"
 
 /*
  * A switch is done during tracing or as a final flush after tracing (so it
@@ -50,7 +50,6 @@ struct channel {
        unsigned long read_timer_interval;      /* Reader wakeup (jiffies) */
        //wait_queue_head_t read_wait;          /* reader wait queue */
        int finalized;                          /* Has channel been finalized */
-       DECLARE_SHMP(struct shm_header, shm_header);
 } ____cacheline_aligned;
 
 /* Per-subbuffer commit counters used on the hot path */
diff --git a/libringbuffer/ring_buffer_abi.c b/libringbuffer/ring_buffer_abi.c
index fbf6df537f20de015467a5c416189b31710e0009..3279ccd9ffc8b6e624aa564b722c8f32bfec5b01 100644
@@ -241,7 +241,7 @@ long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long ar
                                 arg);
        }
        case RING_BUFFER_FLUSH:
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
                return 0;
        default:
                return -ENOIOCTLCMD;
@@ -354,7 +354,7 @@ long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
                return put_ulong(read_offset, arg);
        }
        case RING_BUFFER_FLUSH:
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
                return 0;
        default:
                return -ENOIOCTLCMD;
diff --git a/libringbuffer/ring_buffer_backend.c b/libringbuffer/ring_buffer_backend.c
index 3a5e6a9b4972cc7389453bea722bf4a6831fa5dc..badde80f0b4c40cea03ec04a63e99e201fd0a65d 100644
@@ -29,9 +29,10 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
                                     struct lib_ring_buffer_backend *bufb,
                                     size_t size, size_t num_subbuf,
                                     int extra_reader_sb,
-                                    struct shm_header *shm_header)
+                                    struct shm_handle *handle,
+                                    struct shm_object *shmobj)
 {
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        unsigned long subbuf_size, mmap_offset = 0;
        unsigned long num_subbuf_alloc;
        unsigned long i;
@@ -42,43 +43,42 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
        if (extra_reader_sb)
                num_subbuf_alloc++;
 
-       /* Align the entire buffer backend data on PAGE_SIZE */
-       align_shm(shm_header, PAGE_SIZE);
-       set_shmp(bufb->array, zalloc_shm(shm_header,
-                       sizeof(*bufb->array) * num_subbuf_alloc));
-       if (unlikely(!shmp(bufb->array)))
+       align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
+       set_shmp(bufb->array, zalloc_shm(shmobj,
+                       sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc));
+       if (unlikely(!shmp(handle, bufb->array)))
                goto array_error;
 
        /*
         * This is the largest element (the buffer pages) which needs to
         * be aligned on PAGE_SIZE.
         */
-       align_shm(shm_header, PAGE_SIZE);
-       set_shmp(bufb->memory_map, zalloc_shm(shm_header,
+       align_shm(shmobj, PAGE_SIZE);
+       set_shmp(bufb->memory_map, zalloc_shm(shmobj,
                        subbuf_size * num_subbuf_alloc));
-       if (unlikely(!shmp(bufb->memory_map)))
+       if (unlikely(!shmp(handle, bufb->memory_map)))
                goto memory_map_error;
 
        /* Allocate backend pages array elements */
        for (i = 0; i < num_subbuf_alloc; i++) {
-               align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_pages));
-               set_shmp(bufb->array[i],
-                       zalloc_shm(shm_header,
+               align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages));
+               set_shmp(shmp(handle, bufb->array)[i].shmp,
+                       zalloc_shm(shmobj,
                                sizeof(struct lib_ring_buffer_backend_pages)));
-               if (!shmp(bufb->array[i]))
+               if (!shmp(handle, shmp(handle, bufb->array)[i].shmp))
                        goto free_array;
        }
 
        /* Allocate write-side subbuffer table */
-       align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_subbuffer));
-       bufb->buf_wsb = zalloc_shm(shm_header,
+       align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_subbuffer));
+       set_shmp(bufb->buf_wsb, zalloc_shm(shmobj,
                                sizeof(struct lib_ring_buffer_backend_subbuffer)
-                               * num_subbuf);
-       if (unlikely(!shmp(bufb->buf_wsb)))
+                               * num_subbuf));
+       if (unlikely(!shmp(handle, bufb->buf_wsb)))
                goto free_array;
 
        for (i = 0; i < num_subbuf; i++)
-               shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
+               shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
 
        /* Assign read-side subbuffer table */
        if (extra_reader_sb)
@@ -89,19 +89,19 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
 
        /* Assign pages to page index */
        for (i = 0; i < num_subbuf_alloc; i++) {
-               set_shmp(shmp(bufb->array)[i]->p,
-                        &shmp(bufb->memory_map)[i * subbuf_size]);
+               struct shm_ref ref;
+
+               ref.index = bufb->memory_map._ref.index;
+               ref.offset = bufb->memory_map._ref.offset;
+               ref.offset += i * subbuf_size;
+
+               set_shmp(shmp(handle, shmp(handle, bufb->array)[i].shmp)->p,
+                        ref);
                if (config->output == RING_BUFFER_MMAP) {
-                       shmp(bufb->array)[i]->mmap_offset = mmap_offset;
+                       shmp(handle, shmp(handle, bufb->array)[i].shmp)->mmap_offset = mmap_offset;
                        mmap_offset += subbuf_size;
                }
        }
-       /*
-        * Align the end of each buffer backend data on PAGE_SIZE, to
-        * behave like an array which contains elements that need to be
-        * aligned on PAGE_SIZE.
-        */
-       align_shm(shm_header, PAGE_SIZE);
 
        return 0;
 
@@ -115,17 +115,18 @@ array_error:
 
 int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
                                   struct channel_backend *chanb, int cpu,
-                                  struct shm_header *shm_header)
+                                  struct shm_handle *handle,
+                                  struct shm_object *shmobj)
 {
        const struct lib_ring_buffer_config *config = chanb->config;
 
-       set_shmp(&bufb->chan, caa_container_of(chanb, struct channel, backend));
+       set_shmp(bufb->chan, handle->chan._ref);
        bufb->cpu = cpu;
 
        return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
                                                chanb->num_subbuf,
                                                chanb->extra_reader_sb,
-                                               shm_header);
+                                               handle, shmobj);
 }
 
 void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
@@ -136,9 +137,10 @@ void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
        bufb->allocated = 0;
 }
 
-void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb,
+                                  struct shm_handle *handle)
 {
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long num_subbuf_alloc;
        unsigned int i;
@@ -148,7 +150,7 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
                num_subbuf_alloc++;
 
        for (i = 0; i < chanb->num_subbuf; i++)
-               shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
+               shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
        if (chanb->extra_reader_sb)
                bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
                                                num_subbuf_alloc - 1);
@@ -157,9 +159,9 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
 
        for (i = 0; i < num_subbuf_alloc; i++) {
                /* Don't reset mmap_offset */
-               v_set(config, &shmp(bufb->array)[i]->records_commit, 0);
-               v_set(config, &shmp(bufb->array)[i]->records_unread, 0);
-               shmp(bufb->array)[i]->data_size = 0;
+               v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_commit, 0);
+               v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_unread, 0);
+               shmp(handle, shmp(handle, bufb->array)[i].shmp)->data_size = 0;
                /* Don't reset backend page and virt addresses */
        }
        /* Don't reset num_pages_per_subbuf, cpu, allocated */
@@ -192,7 +194,7 @@ void channel_backend_reset(struct channel_backend *chanb)
  * @parent: dentry of parent directory, %NULL for root directory
  * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
  * @num_subbuf: number of sub-buffers (power of 2)
- * @shm_header: shared memory header
+ * @handle: shared memory handle
  *
  * Returns channel pointer if successful, %NULL otherwise.
  *
@@ -206,11 +208,12 @@ int channel_backend_init(struct channel_backend *chanb,
                         const char *name,
                         const struct lib_ring_buffer_config *config,
                         void *priv, size_t subbuf_size, size_t num_subbuf,
-                        struct shm_header *shm_header)
+                        struct shm_handle *handle)
 {
        struct channel *chan = caa_container_of(chanb, struct channel, backend);
        unsigned int i;
        int ret;
+       size_t shmsize = 0, num_subbuf_alloc;
 
        if (!name)
                return -EPERM;
@@ -245,39 +248,58 @@ int channel_backend_init(struct channel_backend *chanb,
        chanb->name[NAME_MAX - 1] = '\0';
        chanb->config = config;
 
+       /* Per-cpu buffer size: control (prior to backend) */
+       shmsize = offset_align(shmsize, __alignof__(struct lib_ring_buffer));
+       shmsize += sizeof(struct lib_ring_buffer);
+
+       /* Per-cpu buffer size: backend */
+       /* num_subbuf + 1 is the worst case */
+       num_subbuf_alloc = num_subbuf + 1;
+       shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
+       shmsize += sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc;
+       shmsize += offset_align(shmsize, PAGE_SIZE);
+       shmsize += subbuf_size * num_subbuf_alloc;
+       shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages));
+       shmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
+       shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
+       shmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
+       /* Per-cpu buffer size: control (after backend) */
+       shmsize += offset_align(shmsize, __alignof__(struct commit_counters_hot));
+       shmsize += sizeof(struct commit_counters_hot) * num_subbuf;
+       shmsize += offset_align(shmsize, __alignof__(struct commit_counters_cold));
+       shmsize += sizeof(struct commit_counters_cold) * num_subbuf;
+
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                struct lib_ring_buffer *buf;
-               size_t alloc_size;
-
-               /* Allocating the buffer per-cpu structures */
-               align_shm(shm_header, __alignof__(struct lib_ring_buffer));
-               alloc_size = sizeof(struct lib_ring_buffer);
-               buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus());
-               if (!buf)
-                       goto end;
-               set_shmp(chanb->buf, buf);
-
                /*
                 * We need to allocate for all possible cpus.
                 */
                for_each_possible_cpu(i) {
-                       ret = lib_ring_buffer_create(&shmp(chanb->buf)[i],
-                                                    chanb, i, shm_header);
+                       struct shm_object *shmobj;
+
+                       shmobj = shm_object_table_append(handle->table, shmsize);
+                       if (!shmobj)
+                               goto end;
+                       align_shm(shmobj, __alignof__(struct lib_ring_buffer));
+                       set_shmp(chanb->buf[i].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
+                       buf = shmp(handle, chanb->buf[i].shmp);
+                       if (!buf)
+                               goto end;
+                       ret = lib_ring_buffer_create(buf, chanb, i,
+                                       handle, shmobj);
                        if (ret)
                                goto free_bufs; /* cpu hotplug locked */
                }
        } else {
+               struct shm_object *shmobj;
                struct lib_ring_buffer *buf;
-               size_t alloc_size;
 
-               align_shm(shm_header, __alignof__(struct lib_ring_buffer));
-               alloc_size = sizeof(struct lib_ring_buffer);
-               buf = zalloc_shm(shm_header, alloc_size);
+               shmobj = shm_object_table_append(handle->table, shmsize);
+               if (!shmobj)
+                       goto end;
+               align_shm(shmobj, __alignof__(struct lib_ring_buffer));
+               set_shmp(chanb->buf[0].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
+               buf = shmp(handle, chanb->buf[0].shmp);
                if (!buf)
                        goto end;
-               set_shmp(chanb->buf, buf);
-               ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1,
-                                            shm_header);
+               ret = lib_ring_buffer_create(buf, chanb, -1,
+                                       handle, shmobj);
                if (ret)
                        goto free_bufs;
        }
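
The shmsize computation in this hunk repeatedly pads the running size to the next alignment boundary before accounting for each array. A stand-alone equivalent of offset_align() as used here (a sketch, assuming power-of-two alignments, which __alignof__ and PAGE_SIZE guarantee):

	#include <assert.h>
	#include <stddef.h>

	/* Padding needed to bring "offset" up to the next "alignment" boundary. */
	static inline
	size_t offset_align_sketch(size_t offset, size_t alignment)
	{
		return (alignment - (offset & (alignment - 1))) & (alignment - 1);
	}

	int main(void)
	{
		assert(offset_align_sketch(13, 8) == 3);	/* 13 + 3 == 16 */
		assert(offset_align_sketch(16, 8) == 0);	/* already aligned */
		return 0;
	}
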
@@ -288,11 +310,11 @@ int channel_backend_init(struct channel_backend *chanb,
 free_bufs:
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(i) {
-                       struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];
+                       struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);
 
                        if (!buf->backend.allocated)
                                continue;
-                       lib_ring_buffer_free(buf);
+                       lib_ring_buffer_free(buf, handle);
                }
        }
        /* We only free the buffer data upon shm teardown */
@@ -306,24 +328,25 @@ end:
  *
  * Destroy all channel buffers and frees the channel.
  */
-void channel_backend_free(struct channel_backend *chanb)
+void channel_backend_free(struct channel_backend *chanb,
+                         struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned int i;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(i) {
-                       struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];
+                       struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);
 
                        if (!buf->backend.allocated)
                                continue;
-                       lib_ring_buffer_free(buf);
+                       lib_ring_buffer_free(buf, handle);
                }
        } else {
-               struct lib_ring_buffer *buf = shmp(chanb->buf);
+               struct lib_ring_buffer *buf = shmp(handle, chanb->buf[0].shmp);
 
                CHAN_WARN_ON(chanb, !buf->backend.allocated);
-               lib_ring_buffer_free(buf);
+               lib_ring_buffer_free(buf, handle);
        }
        /* We only free the buffer data upon shm teardown */
 }
@@ -339,12 +362,12 @@ void channel_backend_free(struct channel_backend *chanb)
  * Returns the length copied.
  */
 size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
-                           void *dest, size_t len)
+                           void *dest, size_t len, struct shm_handle *handle)
 {
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        ssize_t orig_len;
-       struct lib_ring_buffer_backend_pages *rpages;
+       struct lib_ring_buffer_backend_pages_shmp *rpages;
        unsigned long sb_bindex, id;
 
        orig_len = len;
@@ -354,7 +377,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
                return 0;
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
-       rpages = shmp(bufb->array)[sb_bindex];
+       rpages = &shmp(handle, bufb->array)[sb_bindex];
        /*
         * Underlying layer should never ask for reads across
         * subbuffers.
@@ -362,7 +385,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
        CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
-       memcpy(dest, shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), len);
+       memcpy(dest, shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)), len);
        return orig_len;
 }
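
With the handle threaded through, a reader copies event data out exactly as before, but always names its own mapping. A hedged usage fragment (buf, read_offset and handle are placeholders owned by the caller, and the 256-byte size is arbitrary):

	char payload[256];
	size_t copied;

	/* Copy up to sizeof(payload) bytes from the held read subbuffer. */
	copied = lib_ring_buffer_read(&buf->backend, read_offset,
				      payload, sizeof(payload), handle);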
 
@@ -377,20 +400,20 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
  * Should be protected by get_subbuf/put_subbuf.
  */
 int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
-                             void *dest, size_t len)
+                             void *dest, size_t len, struct shm_handle *handle)
 {
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        ssize_t string_len, orig_offset;
        char *str;
-       struct lib_ring_buffer_backend_pages *rpages;
+       struct lib_ring_buffer_backend_pages_shmp *rpages;
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
        orig_offset = offset;
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
-       rpages = shmp(bufb->array)[sb_bindex];
+       rpages = &shmp(handle, bufb->array)[sb_bindex];
        /*
         * Underlying layer should never ask for reads across
         * subbuffers.
@@ -398,7 +421,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse
        CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
-       str = (char *)shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+       str = (char *)shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
        string_len = strnlen(str, len);
        if (dest && len) {
                memcpy(dest, str, string_len);
@@ -418,20 +441,21 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse
  * as long as the write is never bigger than a page size.
  */
 void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
-                                         size_t offset)
+                                         size_t offset,
+                                         struct shm_handle *handle)
 {
-       struct lib_ring_buffer_backend_pages *rpages;
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct lib_ring_buffer_backend_pages_shmp *rpages;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
-       rpages = shmp(bufb->array)[sb_bindex];
+       rpages = &shmp(handle, bufb->array)[sb_bindex];
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
-       return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+       return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
 }
 
 /**
@@ -445,20 +469,21 @@ void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
  * address, as long as the write is never bigger than a page size.
  */
 void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
-                                    size_t offset)
+                                    size_t offset,
+                                    struct shm_handle *handle)
 {
        size_t sbidx;
-       struct lib_ring_buffer_backend_pages *rpages;
-       struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+       struct lib_ring_buffer_backend_pages_shmp *rpages;
+       struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
        const struct lib_ring_buffer_config *config = chanb->config;
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
        sbidx = offset >> chanb->subbuf_size_order;
-       id = shmp(bufb->buf_wsb)[sbidx].id;
+       id = shmp(handle, bufb->buf_wsb)[sbidx].id;
        sb_bindex = subbuffer_id_get_index(config, id);
-       rpages = shmp(bufb->array)[sb_bindex];
+       rpages = &shmp(handle, bufb->array)[sb_bindex];
        CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, id));
-       return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+       return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
 }
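
Because the backend pages array itself lives in shared memory and stores shm references rather than pointers, resolving element i takes two hops — hence the nested shmp() calls throughout this file. A hypothetical helper capturing the pattern (name and NULL handling assumed):

	static inline
	struct lib_ring_buffer_backend_pages *
	get_backend_pages(struct shm_handle *handle,
			  struct lib_ring_buffer_backend *bufb, size_t i)
	{
		struct lib_ring_buffer_backend_pages_shmp *array;

		array = shmp(handle, bufb->array);	/* hop 1: the reference array */
		if (!array)
			return NULL;
		return shmp(handle, array[i].shmp);	/* hop 2: the i-th pages struct */
	}
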
index b87e554e0c547b29ca0182500660a1393792bf42..6cd869c38c9e839c77aca6b322eb46fc7f0af3fc 100644 (file)
@@ -85,16 +85,18 @@ __thread unsigned int lib_ring_buffer_nesting;
 
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
-                                 struct lib_ring_buffer *buf, int cpu);
+                                 struct lib_ring_buffer *buf, int cpu,
+                                 struct shm_handle *handle);
 
 /*
  * Must be called under cpu hotplug protection.
  */
-void lib_ring_buffer_free(struct lib_ring_buffer *buf)
+void lib_ring_buffer_free(struct lib_ring_buffer *buf,
+                         struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
 
-       lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
+       lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle);
        /* buf->commit_hot will be freed by shm teardown */
        /* buf->commit_cold will be freed by shm teardown */
 
@@ -110,9 +112,10 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf)
  * should not be using the iterator concurrently with reset. The previous
  * current iterator record is reset.
  */
-void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
+void lib_ring_buffer_reset(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned int i;
 
@@ -122,14 +125,14 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
         */
        v_set(config, &buf->offset, 0);
        for (i = 0; i < chan->backend.num_subbuf; i++) {
-               v_set(config, &shmp(buf->commit_hot)[i].cc, 0);
-               v_set(config, &shmp(buf->commit_hot)[i].seq, 0);
-               v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0);
+               v_set(config, &shmp(handle, buf->commit_hot)[i].cc, 0);
+               v_set(config, &shmp(handle, buf->commit_hot)[i].seq, 0);
+               v_set(config, &shmp(handle, buf->commit_cold)[i].cc_sb, 0);
        }
        uatomic_set(&buf->consumed, 0);
        uatomic_set(&buf->record_disabled, 0);
        v_set(config, &buf->last_tsc, 0);
-       lib_ring_buffer_backend_reset(&buf->backend);
+       lib_ring_buffer_backend_reset(&buf->backend, handle);
        /* Don't reset number of active readers */
        v_set(config, &buf->records_lost_full, 0);
        v_set(config, &buf->records_lost_wrap, 0);
@@ -166,7 +169,8 @@ void channel_reset(struct channel *chan)
  */
 int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                           struct channel_backend *chanb, int cpu,
-                          struct shm_header *shm_header)
+                          struct shm_handle *handle,
+                          struct shm_object *shmobj)
 {
        const struct lib_ring_buffer_config *config = chanb->config;
        struct channel *chan = caa_container_of(chanb, struct channel, backend);
@@ -181,26 +185,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                return 0;
 
        ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
-                       cpu, shm_header);
+                       cpu, handle, shmobj);
        if (ret)
                return ret;
 
-       align_shm(shm_header,
-               max(__alignof__(struct commit_counters_hot),
-                   __alignof__(struct commit_counters_cold)));
-       set_shmp(&buf->commit_hot,
-                zalloc_shm(shm_header,
-                       sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
-       if (!shmp(buf->commit_hot)) {
+       align_shm(shmobj, __alignof__(struct commit_counters_hot));
+       set_shmp(buf->commit_hot,
+                zalloc_shm(shmobj,
+                       sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
+       if (!shmp(handle, buf->commit_hot)) {
                ret = -ENOMEM;
                goto free_chanbuf;
        }
 
-       align_shm(shm_header, __alignof__(struct commit_counters_cold));
-       set_shmp(&buf->commit_cold,
-                zalloc_shm(shm_header,
-                       sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
-       if (!shmp(buf->commit_cold)) {
+       align_shm(shmobj, __alignof__(struct commit_counters_cold));
+       set_shmp(buf->commit_cold,
+                zalloc_shm(shmobj,
+                       sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
+       if (!shmp(handle, buf->commit_cold)) {
                ret = -ENOMEM;
                goto free_commit;
        }
@@ -214,13 +216,13 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
         */
        subbuf_header_size = config->cb.subbuffer_header_size();
        v_set(config, &buf->offset, subbuf_header_size);
-       subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id);
-       tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan));
-       config->cb.buffer_begin(buf, tsc, 0);
-       v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc);
+       subbuffer_id_clear_noref(config, &shmp(handle, buf->backend.buf_wsb)[0].id);
+       tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
+       config->cb.buffer_begin(buf, tsc, 0, handle);
+       v_add(config, subbuf_header_size, &shmp(handle, buf->commit_hot)[0].cc);
 
        if (config->cb.buffer_create) {
-               ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
+               ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
                if (ret)
                        goto free_init;
        }
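
align_shm() and zalloc_shm(), used above to carve the commit counter arrays out of the per-buffer shm object, amount to a bump allocator inside one object. A sketch of plausible semantics (assumed for illustration; the real definitions live in the new shm.h, and the obj->index field is hypothetical):

	/* Pad the object's bump pointer to "align" (power of 2 assumed). */
	static void align_shm(struct shm_object *obj, size_t align)
	{
		size_t offset = obj->allocated_len;

		obj->allocated_len += offset_align(offset, align);
	}

	/* Reserve "len" bytes; shm memory is zero-filled by ftruncate(). */
	static struct shm_ref zalloc_shm(struct shm_object *obj, size_t len)
	{
		struct shm_ref ref = {
			.index = obj->index,		/* which object */
			.offset = obj->allocated_len,	/* where inside it */
		};

		assert(obj->allocated_len + len <= obj->memory_map_size);
		obj->allocated_len += len;
		return ref;
	}
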
@@ -237,17 +239,18 @@ free_chanbuf:
        return ret;
 }
 
+#if 0
 static void switch_buffer_timer(unsigned long data)
 {
        struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
 
        /*
         * Only flush buffers periodically if readers are active.
         */
        if (uatomic_read(&buf->active_readers))
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
 
        //TODO timers
        //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
@@ -257,10 +260,12 @@ static void switch_buffer_timer(unsigned long data)
        //      mod_timer(&buf->switch_timer,
        //                jiffies + chan->switch_timer_interval);
 }
+#endif //0
 
-static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
 
        if (!chan->switch_timer_interval || buf->switch_timer_enabled)
@@ -277,9 +282,10 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
        buf->switch_timer_enabled = 1;
 }
 
-static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
 
        if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
                return;
@@ -289,13 +295,14 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
        buf->switch_timer_enabled = 0;
 }
 
+#if 0
 /*
  * Polling timer to check the channels for data.
  */
 static void read_buffer_timer(unsigned long data)
 {
        struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
 
        CHAN_WARN_ON(chan, !buf->backend.allocated);
@@ -315,10 +322,12 @@ static void read_buffer_timer(unsigned long data)
        //      mod_timer(&buf->read_timer,
        //                jiffies + chan->read_timer_interval);
 }
+#endif //0
 
-static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
@@ -339,9 +348,10 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
        buf->read_timer_enabled = 1;
 }
 
-static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
@@ -355,7 +365,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
         * do one more check to catch data that has been written in the last
         * timer period.
         */
-       if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
+       if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
                //TODO
                //wake_up_interruptible(&buf->read_wait);
                //wake_up_interruptible(&chan->read_wait);
@@ -363,45 +373,36 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
        buf->read_timer_enabled = 0;
 }
 
-static void channel_unregister_notifiers(struct channel *chan)
+static void channel_unregister_notifiers(struct channel *chan,
+                          struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        int cpu;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(cpu) {
-                       struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
+                       struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
 
-                       lib_ring_buffer_stop_switch_timer(buf);
-                       lib_ring_buffer_stop_read_timer(buf);
+                       lib_ring_buffer_stop_switch_timer(buf, handle);
+                       lib_ring_buffer_stop_read_timer(buf, handle);
                }
        } else {
-               struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+               struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
 
-               lib_ring_buffer_stop_switch_timer(buf);
-               lib_ring_buffer_stop_read_timer(buf);
+               lib_ring_buffer_stop_switch_timer(buf, handle);
+               lib_ring_buffer_stop_read_timer(buf, handle);
        }
        //channel_backend_unregister_notifiers(&chan->backend);
 }
 
-static void channel_free(struct shm_handle *handle)
+static void channel_free(struct channel *chan, struct shm_handle *handle)
 {
-       struct shm_header *header = handle->header;
-       struct channel *chan = shmp(header->chan);
        int ret;
 
-       channel_backend_free(&chan->backend);
+       channel_backend_free(&chan->backend, handle);
        /* chan is freed by shm teardown */
-       ret = munmap(header, header->shm_size);
-       if (ret) {
-               PERROR("umnmap");
-               assert(0);
-       }
-       ret = close(handle->shmfd);
-       if (ret) {
-               PERROR("close");
-               assert(0);
-       }
+       shm_object_table_destroy(handle->table);
+       free(handle);
 }
 
 /**
@@ -428,12 +429,11 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval)
 {
-       int ret, cpu, shmfd;
+       int ret, cpu;
+       size_t shmsize;
        struct channel *chan;
-       size_t shmsize, bufshmsize, bufshmalign;
-       struct shm_header *shm_header;
-       unsigned long num_subbuf_alloc;
        struct shm_handle *handle;
+       struct shm_object *shmobj;
 
        if (lib_ring_buffer_check_config(config, switch_timer_interval,
                                         read_timer_interval))
@@ -443,88 +443,28 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
        if (!handle)
                return NULL;
 
-       /* Calculate the shm allocation layout */
-       shmsize = sizeof(struct shm_header);
-       shmsize += offset_align(shmsize, __alignof__(struct channel));
-       shmsize += sizeof(struct channel);
-
-       /* Per-cpu buffer size: control (prior to backend) */
-       shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
-       bufshmsize = sizeof(struct lib_ring_buffer);
-       shmsize += bufshmsize * num_possible_cpus();
-
-       /* Per-cpu buffer size: backend */
-       shmsize += offset_align(shmsize, PAGE_SIZE);
-       /* num_subbuf + 1 is the worse case */
-       num_subbuf_alloc = num_subbuf + 1;
-       bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
-       bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
-       bufshmsize += subbuf_size * num_subbuf_alloc;
-       bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
-       bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
-       bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
-       bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
-       bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
-       shmsize += bufshmsize * num_possible_cpus();
-
-       /* Per-cpu buffer size: control (after backend) */
-       shmsize += offset_align(shmsize,
-                       max(__alignof__(struct commit_counters_hot),
-                           __alignof__(struct commit_counters_cold)));
-       bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
-       bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
-       bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
-       shmsize += bufshmsize * num_possible_cpus();
-
-       /*
-        * Allocate shm, and immediately unlink its shm oject, keeping
-        * only the file descriptor as a reference to the object. If it
-        * already exists (caused by short race window during which the
-        * global object exists in a concurrent shm_open), simply retry.
-        */
-       do {
-               shmfd = shm_open("/ust-shm-tmp",
-                                 O_CREAT | O_EXCL | O_RDWR, 0700);
-       } while (shmfd < 0 && errno == EEXIST);
-       if (shmfd < 0) {
-               PERROR("shm_open");
-               goto error_shm_open;
-       }
-       ret = shm_unlink("/ust-shm-tmp");
-       if (ret) {
-               PERROR("shm_unlink");
-               goto error_unlink;
-       }
-       ret = ftruncate(shmfd, shmsize);
-       if (ret) {
-               PERROR("ftruncate");
-               goto error_ftruncate;
-       }
+       /* Allocate table for channel + per-cpu buffers */
+       handle->table = shm_object_table_create(1 + num_possible_cpus());
+       if (!handle->table)
+               goto error_table_alloc;
 
-       shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
-                         MAP_SHARED, shmfd, 0);
-       if (shm_header == MAP_FAILED) {
-               PERROR("mmap");
-               goto error_mmap;
-       }
-
-       shm_header->magic = SHM_MAGIC;
-       shm_header->major = SHM_MAJOR;
-       shm_header->major = SHM_MINOR;
-       shm_header->bits_per_long = CAA_BITS_PER_LONG;
-       shm_header->shm_size = shmsize;
-       shm_header->shm_allocated = sizeof(struct shm_header);
+       /* Calculate the shm allocation layout */
+       shmsize = sizeof(struct channel);
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+               shmsize += sizeof(struct lib_ring_buffer_shmp) * num_possible_cpus();
+       else
+               shmsize += sizeof(struct lib_ring_buffer_shmp);
 
-       align_shm(shm_header, __alignof__(struct channel));
-       chan = zalloc_shm(shm_header, sizeof(struct channel));
+       shmobj = shm_object_table_append(handle->table, shmsize);
+       if (!shmobj)
+               goto error_append;
+       set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel)));
+       chan = shmp(handle, handle->chan);
        if (!chan)
-               goto destroy_shmem;
-       set_shmp(shm_header->chan, chan);
+               goto error_append;
 
        ret = channel_backend_init(&chan->backend, name, config, priv,
-                                  subbuf_size, num_subbuf, shm_header);
+                                  subbuf_size, num_subbuf, handle);
        if (ret)
-               goto destroy_shmem;
+               goto error_backend_init;
 
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
        //TODO
@@ -541,44 +481,31 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
                 * In that off case, we need to allocate for all possible cpus.
                 */
                for_each_possible_cpu(cpu) {
-                       struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
-                       lib_ring_buffer_start_switch_timer(buf);
-                       lib_ring_buffer_start_read_timer(buf);
+                       struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
+                       lib_ring_buffer_start_switch_timer(buf, handle);
+                       lib_ring_buffer_start_read_timer(buf, handle);
                }
        } else {
-               struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+               struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
 
-               lib_ring_buffer_start_switch_timer(buf);
-               lib_ring_buffer_start_read_timer(buf);
+               lib_ring_buffer_start_switch_timer(buf, handle);
+               lib_ring_buffer_start_read_timer(buf, handle);
        }
 
-       handle->header = shm_header;
-       handle->shmfd = shmfd;
        return handle;
 
-destroy_shmem:
-       ret = munmap(shm_header, shmsize);
-       if (ret) {
-               PERROR("umnmap");
-               assert(0);
-       }
-error_mmap:
-error_ftruncate:
-error_unlink:
-       ret = close(shmfd);
-       if (ret) {
-               PERROR("close");
-               assert(0);
-       }
-error_shm_open:
+error_backend_init:
+error_append:
+       shm_object_table_destroy(handle->table);
+error_table_alloc:
        free(handle);
        return NULL;
 }
 
 static
-void channel_release(struct shm_handle *handle)
+void channel_release(struct channel *chan, struct shm_handle *handle)
 {
-       channel_free(handle);
+       channel_free(chan, handle);
 }
 
 /**
@@ -592,26 +519,25 @@ void channel_release(struct shm_handle *handle)
  * They should release their handle at that point.  Returns the private
  * data pointer.
  */
-void *channel_destroy(struct shm_handle *handle)
+void *channel_destroy(struct channel *chan, struct shm_handle *handle)
 {
-       struct shm_header *header = handle->header;
-       struct channel *chan = shmp(header->chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        void *priv;
        int cpu;
 
-       channel_unregister_notifiers(chan);
+       channel_unregister_notifiers(chan, handle);
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_channel_cpu(cpu, chan) {
-                       struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
+                       struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
 
                        if (config->cb.buffer_finalize)
                                config->cb.buffer_finalize(buf,
                                                           chan->backend.priv,
-                                                          cpu);
+                                                          cpu, handle);
                        if (buf->backend.allocated)
-                               lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                               lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
+                                               handle);
                        /*
                         * Perform flush before writing to finalized.
                         */
@@ -620,12 +546,13 @@ void *channel_destroy(struct shm_handle *handle)
                        //wake_up_interruptible(&buf->read_wait);
                }
        } else {
-               struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+               struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
 
                if (config->cb.buffer_finalize)
-                       config->cb.buffer_finalize(buf, chan->backend.priv, -1);
+                       config->cb.buffer_finalize(buf, chan->backend.priv, -1, handle);
                if (buf->backend.allocated)
-                       lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                       lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
+                                               handle);
                /*
                 * Perform flush before writing to finalized.
                 */
@@ -640,24 +567,26 @@ void *channel_destroy(struct shm_handle *handle)
         * sessiond/consumer are keeping a reference on the shm file
         * descriptor directly. No need to refcount.
         */
-       channel_release(handle);
-       priv = chan->backend.priv;
+       priv = chan->backend.priv;
+       channel_release(chan, handle);
        return priv;
 }
 
 struct lib_ring_buffer *channel_get_ring_buffer(
                                        const struct lib_ring_buffer_config *config,
-                                       struct channel *chan, int cpu)
+                                       struct channel *chan, int cpu,
+                                       struct shm_handle *handle)
 {
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
-               return shmp(chan->backend.buf);
+               return shmp(handle, chan->backend.buf[0].shmp);
        else
-               return &shmp(chan->backend.buf)[cpu];
+               return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
+int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
+                             struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
 
        if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
                return -EBUSY;
@@ -665,9 +594,10 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
        return 0;
 }
 
-void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
+void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
+                                 struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
 
        CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
        cmm_smp_mb();
@@ -685,9 +615,10 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
  */
 
 int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
-                            unsigned long *consumed, unsigned long *produced)
+                            unsigned long *consumed, unsigned long *produced,
+                            struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long consumed_cur, write_offset;
        int finalized;
@@ -738,10 +669,11 @@ nodata:
  * @consumed_new: new consumed count value
  */
 void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
-                                  unsigned long consumed_new)
+                                  unsigned long consumed_new,
+                                  struct shm_handle *handle)
 {
        struct lib_ring_buffer_backend *bufb = &buf->backend;
-       struct channel *chan = shmp(bufb->chan);
+       struct channel *chan = shmp(handle, bufb->chan);
        unsigned long consumed;
 
        CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
@@ -766,9 +698,10 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
  * data to read at consumed position, or 0 if the get operation succeeds.
  */
 int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
-                              unsigned long consumed)
+                              unsigned long consumed,
+                              struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
        int ret;
@@ -782,7 +715,7 @@ retry:
        cmm_smp_rmb();
        consumed_cur = uatomic_read(&buf->consumed);
        consumed_idx = subbuf_index(consumed, chan);
-       commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
+       commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb);
        /*
         * Make sure we read the commit count before reading the buffer
         * data and the write offset. Correct consumed offset ordering
@@ -832,7 +765,8 @@ retry:
         * looking for matches the one contained in the subbuffer id.
         */
        ret = update_read_sb_index(config, &buf->backend, &chan->backend,
-                                  consumed_idx, buf_trunc_val(consumed, chan));
+                                  consumed_idx, buf_trunc_val(consumed, chan),
+                                  handle);
        if (ret)
                goto retry;
        subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
@@ -857,10 +791,11 @@ nodata:
  * lib_ring_buffer_put_subbuf - release exclusive subbuffer access
  * @buf: ring buffer
  */
-void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
+void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
+                               struct shm_handle *handle)
 {
        struct lib_ring_buffer_backend *bufb = &buf->backend;
-       struct channel *chan = shmp(bufb->chan);
+       struct channel *chan = shmp(handle, bufb->chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long read_sb_bindex, consumed_idx, consumed;
 
@@ -885,9 +820,9 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
         */
        read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
        v_add(config, v_read(config,
-                            &shmp(bufb->array)[read_sb_bindex]->records_unread),
+                            &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread),
              &bufb->records_read);
-       v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0);
+       v_set(config, &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread, 0);
        CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
                     && subbuffer_id_is_noref(config, bufb->buf_rsb.id));
        subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
@@ -902,7 +837,8 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
         */
        consumed_idx = subbuf_index(consumed, chan);
        update_read_sb_index(config, &buf->backend, &chan->backend,
-                            consumed_idx, buf_trunc_val(consumed, chan));
+                            consumed_idx, buf_trunc_val(consumed, chan),
+                            handle);
        /*
         * update_read_sb_index return value ignored. Don't exchange sub-buffer
         * if the writer concurrently updated it.
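
Taken together, snapshot/get/put suggest the following consumer-side loop, shown as a hedged sketch (the data-copy step is elided, and the advance-by-one-subbuffer policy is an assumption):

	unsigned long consumed, produced;

	if (lib_ring_buffer_snapshot(buf, &consumed, &produced, handle))
		return;			/* no data available right now */
	while (consumed < produced) {
		if (lib_ring_buffer_get_subbuf(buf, consumed, handle))
			break;		/* next subbuffer not fully committed */
		/* ... copy it out, e.g. with lib_ring_buffer_read() ... */
		lib_ring_buffer_put_subbuf(buf, handle);
		consumed = subbuf_align(consumed, chan);
		lib_ring_buffer_move_consumer(buf, consumed, handle);
	}
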
@@ -917,14 +853,15 @@ static
 void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
                                            struct channel *chan,
                                            unsigned long cons_offset,
-                                           int cpu)
+                                           int cpu,
+                                           struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long cons_idx, commit_count, commit_count_sb;
 
        cons_idx = subbuf_index(cons_offset, chan);
-       commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc);
-       commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[cons_idx].cc);
+       commit_count_sb = v_read(config, &shmp(handle, buf->commit_cold)[cons_idx].cc_sb);
 
        if (subbuf_offset(commit_count, chan) != 0)
                ERRMSG("ring buffer %s, cpu %d: "
@@ -942,7 +879,8 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
 static
 void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
                                         struct channel *chan,
-                                        void *priv, int cpu)
+                                        void *priv, int cpu,
+                                        struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long write_offset, cons_offset;
@@ -972,12 +910,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
                     - cons_offset) > 0;
             cons_offset = subbuf_align(cons_offset, chan))
                lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
-                                                      cpu);
+                                                      cpu, handle);
 }
 
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
-                                 struct lib_ring_buffer *buf, int cpu)
+                                 struct lib_ring_buffer *buf, int cpu,
+                                 struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        void *priv = chan->backend.priv;
@@ -999,7 +938,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
                       v_read(config, &buf->records_lost_wrap),
                       v_read(config, &buf->records_lost_big));
 
-       lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
+       lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
 }
 
 /*
@@ -1011,13 +950,14 @@ static
 void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
                                      struct channel *chan,
                                      struct switch_offsets *offsets,
-                                     u64 tsc)
+                                     u64 tsc,
+                                     struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old, chan);
        unsigned long commit_count;
 
-       config->cb.buffer_begin(buf, tsc, oldidx);
+       config->cb.buffer_begin(buf, tsc, oldidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1025,14 +965,15 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
         */
        cmm_smp_wmb();
        v_add(config, config->cb.subbuffer_header_size(),
-             &shmp(buf->commit_hot)[oldidx].cc);
-       commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
+             &shmp(handle, buf->commit_hot)[oldidx].cc);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
-                                     commit_count, oldidx);
+                                     commit_count, oldidx, handle);
        lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
                                             offsets->old, commit_count,
-                                            config->cb.subbuffer_header_size());
+                                            config->cb.subbuffer_header_size(),
+                                            handle);
 }
 
 /*
@@ -1047,7 +988,8 @@ static
 void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
                                    struct channel *chan,
                                    struct switch_offsets *offsets,
-                                   u64 tsc)
+                                   u64 tsc,
+                                   struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
@@ -1055,20 +997,21 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
-       subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
+       subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
+                               handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
         * determine that the subbuffer is full.
         */
        cmm_smp_wmb();
-       v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc);
-       commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
+       v_add(config, padding_size, &shmp(handle, buf->commit_hot)[oldidx].cc);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
-                                     commit_count, oldidx);
+                                     commit_count, oldidx, handle);
        lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
                                             offsets->old, commit_count,
-                                            padding_size);
+                                            padding_size, handle);
 }
 
 /*
@@ -1082,13 +1025,14 @@ static
 void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
                                      struct channel *chan,
                                      struct switch_offsets *offsets,
-                                     u64 tsc)
+                                     u64 tsc,
+                                     struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long beginidx = subbuf_index(offsets->begin, chan);
        unsigned long commit_count;
 
-       config->cb.buffer_begin(buf, tsc, beginidx);
+       config->cb.buffer_begin(buf, tsc, beginidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1096,14 +1040,15 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
         */
        cmm_smp_wmb();
        v_add(config, config->cb.subbuffer_header_size(),
-             &shmp(buf->commit_hot)[beginidx].cc);
-       commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc);
+             &shmp(handle, buf->commit_hot)[beginidx].cc);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[beginidx].cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
-                                     commit_count, beginidx);
+                                     commit_count, beginidx, handle);
        lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
                                             offsets->begin, commit_count,
-                                            config->cb.subbuffer_header_size());
+                                            config->cb.subbuffer_header_size(),
+                                            handle);
 }
 
 /*
@@ -1114,9 +1059,10 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
  */
 static
 void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
-                                           struct channel *chan,
-                                           struct switch_offsets *offsets,
-                                           u64 tsc)
+                                   struct channel *chan,
+                                   struct switch_offsets *offsets,
+                                   u64 tsc,
+                                   struct shm_handle *handle)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
        unsigned long endidx = subbuf_index(offsets->end - 1, chan);
@@ -1124,20 +1070,21 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
 
        data_size = subbuf_offset(offsets->end - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
-       subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+       subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
+                               handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
         * determine that the subbuffer is full.
         */
        cmm_smp_wmb();
-       v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc);
-       commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
+       v_add(config, padding_size, &shmp(handle, buf->commit_hot)[endidx].cc);
+       commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
-                                 commit_count, endidx);
+                                 commit_count, endidx, handle);
        lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
                                             offsets->end, commit_count,
-                                            padding_size);
+                                            padding_size, handle);
 }
 
 /*
@@ -1208,9 +1155,10 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
  * operations, this function must be called from the CPU which owns the buffer
  * for a ACTIVE flush.
  */
-void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
+void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode,
+                                struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        struct switch_offsets offsets;
        unsigned long oldidx;
@@ -1242,20 +1190,20 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m
        lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
 
        oldidx = subbuf_index(offsets.old, chan);
-       lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
+       lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
 
        /*
         * May need to populate header start on SWITCH_FLUSH.
         */
        if (offsets.switch_old_start) {
-               lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
+               lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
                offsets.old += config->cb.subbuffer_header_size();
        }
 
        /*
         * Switch old subbuffer.
         */
-       lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
+       lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
 }
 
 /*
@@ -1272,6 +1220,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct lib_ring_buffer_ctx *ctx)
 {
        const struct lib_ring_buffer_config *config = chan->backend.config;
+       struct shm_handle *handle = ctx->handle;
        unsigned long reserve_commit_diff;
 
        offsets->begin = v_read(config, &buf->offset);
@@ -1321,7 +1270,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                  (buf_trunc(offsets->begin, chan)
                   >> chan->backend.num_subbuf_order)
                  - ((unsigned long) v_read(config,
-                                           &shmp(buf->commit_cold)[sb_index].cc_sb)
+                                           &shmp(handle, buf->commit_cold)[sb_index].cc_sb)
                     & chan->commit_count_mask);
                if (likely(reserve_commit_diff == 0)) {
                        /* Next subbuffer not being written to. */
@@ -1406,15 +1355,16 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
 int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
 {
        struct channel *chan = ctx->chan;
+       struct shm_handle *handle = ctx->handle;
        const struct lib_ring_buffer_config *config = chan->backend.config;
        struct lib_ring_buffer *buf;
        struct switch_offsets offsets;
        int ret;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               buf = &shmp(chan->backend.buf)[ctx->cpu];
+               buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
        else
-               buf = shmp(chan->backend.buf);
+               buf = shmp(handle, chan->backend.buf[0].shmp);
        ctx->buf = buf;
 
        offsets.size = 0;
@@ -1445,25 +1395,27 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
         * Clear noref flag for this subbuffer.
         */
        lib_ring_buffer_clear_noref(config, &buf->backend,
-                                   subbuf_index(offsets.end - 1, chan));
+                                   subbuf_index(offsets.end - 1, chan),
+                                   handle);
 
        /*
         * Switch old subbuffer if needed.
         */
        if (unlikely(offsets.switch_old_end)) {
                lib_ring_buffer_clear_noref(config, &buf->backend,
-                                           subbuf_index(offsets.old - 1, chan));
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
+                                           subbuf_index(offsets.old - 1, chan),
+                                           handle);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
 
        if (unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
 
        ctx->slot_size = offsets.size;
        ctx->pre_offset = offsets.begin;
diff --git a/libringbuffer/shm.c b/libringbuffer/shm.c
new file mode 100644 (file)
index 0000000..8f158e2
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * libringbuffer/shm.c
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "shm.h"
+#include <unistd.h>
+#include <fcntl.h>     /* For O_* constants */
+#include <sys/mman.h>
+#include <sys/stat.h>  /* For mode constants */
+#include <errno.h>
+#include <assert.h>
+#include <ust/align.h>
+
+struct shm_object_table *shm_object_table_create(size_t max_nb_obj)
+{
+       struct shm_object_table *table;
+
+       table = zmalloc(sizeof(struct shm_object_table) +
+                       max_nb_obj * sizeof(table->objects[0]));
+       if (!table)
+               return NULL;
+       table->size = max_nb_obj;
+       return table;
+}
+
+struct shm_object *shm_object_table_append(struct shm_object_table *table,
+                                          size_t memory_map_size)
+{
+       int shmfd, waitfd[2], ret, i;
+       struct shm_object *obj;
+       char *memory_map;
+
+       if (table->allocated_len >= table->size)
+               return NULL;
+       obj = &table->objects[table->allocated_len];
+       obj->index = table->allocated_len++;
+
+       /* wait_fd: create pipe */
+       ret = pipe(waitfd);
+       if (ret < 0) {
+               PERROR("pipe");
+               goto error_pipe;
+       }
+       for (i = 0; i < 2; i++) {
+               ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC);
+               if (ret < 0) {
+                       PERROR("fcntl");
+                       goto error_fcntl;
+               }
+       }
+       obj->wait_fd[0] = waitfd[0];
+       obj->wait_fd[1] = waitfd[1];
+
+       /* shm_fd: create shm */
+
+       /*
+        * Allocate shm, and immediately unlink its shm object, keeping
+        * only the file descriptor as a reference to the object. If it
+        * already exists (caused by a short race window during which the
+        * global object exists due to a concurrent shm_open), simply retry.
+        * We specifically do _not_ use the / at the beginning of the
+        * pathname so that some OS implementations can keep it local to
+        * the process (POSIX leaves this implementation-defined).
+        */
+       do {
+               shmfd = shm_open("ust-shm-tmp",
+                                O_CREAT | O_EXCL | O_RDWR, 0700);
+       } while (shmfd < 0 && errno == EEXIST);
+       if (shmfd < 0) {
+               PERROR("shm_open");
+               goto error_shm_open;
+       }
+       ret = shm_unlink("ust-shm-tmp");
+       if (ret) {
+               PERROR("shm_unlink");
+               goto error_unlink;
+       }
+       ret = ftruncate(shmfd, memory_map_size);
+       if (ret) {
+               PERROR("ftruncate");
+               goto error_ftruncate;
+       }
+       obj->shm_fd = shmfd;
+
+       /* memory_map: mmap */
+       memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
+                         MAP_SHARED, shmfd, 0);
+       if (memory_map == MAP_FAILED) {
+               PERROR("mmap");
+               goto error_mmap;
+       }
+       obj->memory_map = memory_map;
+       obj->memory_map_size = memory_map_size;
+       obj->allocated_len = 0;
+       return obj;
+
+error_mmap:
+error_ftruncate:
+error_unlink:
+       ret = close(shmfd);
+       if (ret) {
+               PERROR("close");
+               assert(0);
+       }
+error_shm_open:
+error_fcntl:
+       for (i = 0; i < 2; i++) {
+               ret = close(waitfd[i]);
+               if (ret) {
+                       PERROR("close");
+                       assert(0);
+               }
+       }
+error_pipe:
+       table->allocated_len--;
+       return NULL;
+}
+
+static
+void shmp_object_destroy(struct shm_object *obj)
+{
+       int ret, i;
+
+       ret = munmap(obj->memory_map, obj->memory_map_size);
+       if (ret) {
+               PERROR("munmap");
+               assert(0);
+       }
+       ret = close(obj->shm_fd);
+       if (ret) {
+               PERROR("close");
+               assert(0);
+       }
+       for (i = 0; i < 2; i++) {
+               ret = close(obj->wait_fd[i]);
+               if (ret) {
+                       PERROR("close");
+                       assert(0);
+               }
+       }
+}
+
+void shm_object_table_destroy(struct shm_object_table *table)
+{
+       int i;
+
+       for (i = 0; i < table->allocated_len; i++)
+               shmp_object_destroy(&table->objects[i]);
+       free(table);
+}
+
+/*
+ * zalloc_shm - allocate memory within a shm object.
+ *
+ * Shared memory is already zeroed by ftruncate.
+ * *NOT* multithread-safe (should be protected by mutex).
+ * Returns a -1, -1 tuple on error.
+ */
+struct shm_ref zalloc_shm(struct shm_object *obj, size_t len)
+{
+       struct shm_ref ref;
+       struct shm_ref shm_ref_error = { -1, -1 };
+
+       if (obj->memory_map_size - obj->allocated_len < len)
+               return shm_ref_error;
+       ref.index = obj->index;
+       ref.offset = obj->allocated_len;
+       obj->allocated_len += len;
+       return ref;
+}
+
+void align_shm(struct shm_object *obj, size_t align)
+{
+       size_t offset_len = offset_align(obj->allocated_len, align);
+       obj->allocated_len += offset_len;
+}
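
A minimal usage sketch of the allocator above (illustrative only, not part
of the patch; the table capacity, mapping size, and allocation size are
made-up values):

	#include <stdint.h>
	#include "shm.h"

	static int shm_alloc_example(void)
	{
		struct shm_object_table *table;
		struct shm_object *obj;
		struct shm_ref ref;

		table = shm_object_table_create(1);	/* room for 1 object */
		if (!table)
			return -1;
		obj = shm_object_table_append(table, 4096);	/* one 4 kB map */
		if (!obj)
			goto error;
		align_shm(obj, __alignof__(uint64_t));	/* align next slot */
		ref = zalloc_shm(obj, sizeof(uint64_t));	/* zeroed slot */
		if (ref.index < 0)			/* (-1, -1) on error */
			goto error;
		/* ref is a stable (index, offset) pair, shareable across
		 * processes mapping the same objects. */
		shm_object_table_destroy(table);
		return 0;
	error:
		shm_object_table_destroy(table);
		return -1;
	}
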
index 71624e3999a3974ec4b11ea4c2c5045bc29dbaf1..d370375b6929cf913ed568e3e665ead996f515ec 100644 (file)
 #include <stdint.h>
 #include <ust/usterr-signal-safe.h>
 #include "ust/core.h"
+#include "shm_types.h"
 
 #define SHM_MAGIC      0x54335433
 #define SHM_MAJOR      0
 #define SHM_MINOR      1
 
 /*
- * Defining a max shm offset, for debugging purposes.
+ * Pointer dereferencing. We don't trust the shm_ref, so we validate
+ * both the index and the offset against known boundaries.
  */
-#if (CAA_BITS_PER_LONG == 32)
-/* Define the maximum shared memory size to 128MB on 32-bit machines */
-#define MAX_SHM_SIZE   134217728
-#else
-/* Define the maximum shared memory size to 8GB on 64-bit machines */
-#define MAX_SHM_SIZE   8589934592
-#endif
-
-#define DECLARE_SHMP(type, name)       type *****name
-
-struct shm_header {
-       uint32_t magic;
-       uint8_t major;
-       uint8_t minor;
-       uint8_t bits_per_long;
-       size_t shm_size, shm_allocated;
-
-       DECLARE_SHMP(struct channel, chan);
-};
-
-struct shm_handle {
-       struct shm_header *header;      /* beginning of mapping */
-       int shmfd;                      /* process-local file descriptor */
-};
-
-#define shmp(shm_offset)               \
-       ((__typeof__(****(shm_offset))) (((char *) &(shm_offset)) + (ptrdiff_t) (shm_offset)))
-
-#define _shmp_abs(a)   ((a < 0) ? -(a) : (a))
-
-static inline
-void _set_shmp(ptrdiff_t *shm_offset, void *ptr)
-{
-       *shm_offset = (((char *) ptr) - ((char *) shm_offset));
-       assert(_shmp_abs(*shm_offset) < MAX_SHM_SIZE);
-}
-
-#define set_shmp(shm_offset, ptr)      \
-       _set_shmp((ptrdiff_t *) ****(shm_offset), ptr)
-
-/* Shared memory is already zeroed by shmget */
-/* *NOT* multithread-safe (should be protected by mutex) */
 static inline
-void *zalloc_shm(struct shm_header *shm_header, size_t len)
+char *_shmp(struct shm_object_table *table, struct shm_ref *ref)
 {
-       void *ret;
+       struct shm_object *obj;
+       size_t index, offset;
 
-       if (shm_header->shm_size - shm_header->shm_allocated < len)
+       index = (size_t) ref->index;
+       if (unlikely(index >= table->allocated_len))
+               return NULL;
+       obj = &table->objects[index];
+       offset = (size_t) ref->offset;
+       if (unlikely(offset >= obj->memory_map_size))
                return NULL;
-       ret = (char *) shm_header + shm_header->shm_allocated;
-       shm_header->shm_allocated += len;
-       return ret;
+       return &obj->memory_map[offset];
 }
 
+#define shmp(handle, ref)                                              \
+       ({                                                              \
+               __typeof__((ref)._type) ____ptr_ret;                    \
+               ____ptr_ret = (__typeof__(____ptr_ret)) _shmp((handle)->table, &(ref)._ref);    \
+               ____ptr_ret;                                            \
+       })
+
 static inline
-void align_shm(struct shm_header *shm_header, size_t align)
+void _set_shmp(struct shm_ref *ref, struct shm_ref src)
 {
-       size_t offset_len = offset_align(shm_header->shm_allocated, align);
-       shm_header->shm_allocated += offset_len;
+       *ref = src;
 }
 
+#define set_shmp(ref, src)     _set_shmp(&(ref)._ref, src)
+
+struct shm_object_table *shm_object_table_create(size_t max_nb_obj);
+void shm_object_table_destroy(struct shm_object_table *table);
+struct shm_object *shm_object_table_append(struct shm_object_table *table,
+                                          size_t memory_map_size);
+
+/*
+ * zalloc_shm - allocate memory within a shm object.
+ *
+ * Shared memory is already zeroed by ftruncate.
+ * *NOT* multithread-safe (should be protected by mutex).
+ * Returns a -1, -1 tuple on error.
+ */
+struct shm_ref zalloc_shm(struct shm_object *obj, size_t len);
+void align_shm(struct shm_object *obj, size_t align);
+
 #endif /* _LIBRINGBUFFER_SHM_H */
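
A sketch of how the typed accessors compose (hypothetical structure and
field names; assumes obj was appended to the table behind handle):

	struct counter {
		long count;
	};

	struct owner {
		DECLARE_SHMP(struct counter, ctr);	/* typed (index, offset) ref */
	};

	static void owner_init(struct owner *o, struct shm_object *obj)
	{
		/* Store the reference; only the two ssize_t fields are written. */
		set_shmp(o->ctr, zalloc_shm(obj, sizeof(struct counter)));
	}

	static long owner_read(struct shm_handle *handle, struct owner *o)
	{
		/* _shmp() bound-checks index and offset; NULL on bad input. */
		struct counter *c = shmp(handle, o->ctr);

		return c ? c->count : -1;
	}
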
diff --git a/libringbuffer/shm_internal.h b/libringbuffer/shm_internal.h
new file mode 100644 (file)
index 0000000..77f345d
--- /dev/null
@@ -0,0 +1,23 @@
+#ifndef _LIBRINGBUFFER_SHM_INTERNAL_H
+#define _LIBRINGBUFFER_SHM_INTERNAL_H
+
+/*
+ * libringbuffer/shm_internal.h
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <sys/types.h> /* for ssize_t */
+
+struct shm_ref {
+       volatile ssize_t index;         /* within the object table */
+       volatile ssize_t offset;        /* within the object */
+};
+
+#define DECLARE_SHMP(type, name)       \
+       union {                         \
+               struct shm_ref _ref;    \
+               type *_type;            \
+       } name
+
+#endif /* _LIBRINGBUFFER_SHM_INTERNAL_H */
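
Note that the _type member of the union is never written at run time: it
only lets shmp() recover the pointee type through __typeof__. Expanded by
hand, DECLARE_SHMP(struct channel, chan) yields:

	union {
		struct shm_ref _ref;	/* the (index, offset) pair actually shared */
		struct channel *_type;	/* compile-time type tag, never stored to */
	} chan;
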
diff --git a/libringbuffer/shm_types.h b/libringbuffer/shm_types.h
new file mode 100644 (file)
index 0000000..e92c0af
--- /dev/null
@@ -0,0 +1,43 @@
+#ifndef _LIBRINGBUFFER_SHM_TYPES_H
+#define _LIBRINGBUFFER_SHM_TYPES_H
+
+/*
+ * libringbuffer/shm_types.h
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <stddef.h>    /* for size_t */
+#include <stdint.h>
+#include "shm_internal.h"
+
+struct channel;
+
+struct shm_object {
+       size_t index;   /* within the object table */
+       int shm_fd;     /* shm fd */
+       int wait_fd[2]; /* fd for wait/wakeup */
+       char *memory_map;
+       size_t memory_map_size;
+       size_t allocated_len;
+};
+
+struct shm_object_table {
+       size_t size;
+       size_t allocated_len;
+       struct shm_object objects[];
+};
+
+struct shm_handle {
+       struct shm_object_table *table;
+       DECLARE_SHMP(struct channel, chan);
+       /*
+        * In the consumer, chan points to a shadow copy, validated upon
+        * reception. The chan object is overridden in the consumer to
+        * point to this shadow copy.
+        */
+       struct channel *shadow_chan;
+};
+
+#endif /* _LIBRINGBUFFER_SHM_TYPES_H */
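
The wait_fd pipe gives each object a blocking wait/wakeup channel next to
its mapping. A hypothetical sketch of that protocol (the one-byte token
and EINTR handling are assumptions, not defined by this patch):

	#include <errno.h>
	#include <unistd.h>

	static void shm_object_wakeup(struct shm_object *obj)
	{
		char dummy = 0;
		ssize_t ret;

		do {	/* wake whoever blocks on the read end */
			ret = write(obj->wait_fd[1], &dummy, 1);
		} while (ret < 0 && errno == EINTR);
	}

	static void shm_object_wait(struct shm_object *obj)
	{
		char dummy;
		ssize_t ret;

		do {	/* block until the tracer writes a byte */
			ret = read(obj->wait_fd[0], &dummy, 1);
		} while (ret < 0 && errno == EINTR);
	}
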
index ece959b3e88c8237f0bf542fb50cf7f57871b5e6..8f9b7ed26410248176ec6174ce5101790bb3be23 100644 (file)
@@ -221,10 +221,9 @@ struct ltt_channel *ltt_channel_create(struct ltt_session *session,
         * headers. Therefore the "chan" information used as input
         * should be already accessible.
         */
-       chan->handle = transport->ops.channel_create("[lttng]", chan, buf_addr,
+       transport->ops.channel_create("[lttng]", chan, buf_addr,
                        subbuf_size, num_subbuf, switch_timer_interval,
                        read_timer_interval);
-       chan->chan = shmp(chan->handle->header->chan);
        if (!chan->chan)
                goto create_error;
        chan->enabled = 1;
@@ -248,7 +247,7 @@ active:
 static
 void _ltt_channel_destroy(struct ltt_channel *chan)
 {
-       chan->ops->channel_destroy(chan->handle);
+       chan->ops->channel_destroy(chan);
        cds_list_del(&chan->list);
        lttng_destroy_context(chan->ctx);
        free(chan);
@@ -387,10 +386,10 @@ int lttng_metadata_printf(struct ltt_session *session,
 
        for (pos = 0; pos < len; pos += reserve_len) {
                reserve_len = min_t(size_t,
-                               chan->ops->packet_avail_size(chan->chan),
+                               chan->ops->packet_avail_size(chan->chan, chan->handle),
                                len - pos);
                lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len,
-                                        sizeof(char), -1);
+                                        sizeof(char), -1, chan->handle);
                /*
                 * We don't care about metadata buffer's records lost
                 * count, because we always retry here. Report error if
index dce34aa24597d35548069aa03e582cd2bbabf34b..d71d38616fe0fa3d52f0f2952ba8d32264b40e11 100644 (file)
@@ -293,13 +293,15 @@ static size_t client_packet_header_size(void)
 }
 
 static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
-                               unsigned int subbuf_idx)
+                               unsigned int subbuf_idx,
+                               struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        struct packet_header *header =
                (struct packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
-                               subbuf_idx * chan->backend.subbuf_size);
+                               subbuf_idx * chan->backend.subbuf_size,
+                               handle);
        struct ltt_channel *ltt_chan = channel_get_private(chan);
        struct ltt_session *session = ltt_chan->session;
 
@@ -319,13 +321,15 @@ static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
  * subbuffer. data_size is between 1 and subbuf_size.
  */
 static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx, unsigned long data_size)
+                             unsigned int subbuf_idx, unsigned long data_size,
+                             struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        struct packet_header *header =
                (struct packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
-                               subbuf_idx * chan->backend.subbuf_size);
+                               subbuf_idx * chan->backend.subbuf_size,
+                               handle);
        unsigned long records_lost = 0;
 
        header->ctx.timestamp_end = tsc;
@@ -338,12 +342,12 @@ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
 }
 
 static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
-                               int cpu, const char *name)
+                               int cpu, const char *name, struct shm_handle *handle)
 {
        return 0;
 }
 
-static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
+static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu, struct shm_handle *handle)
 {
 }
 
@@ -368,41 +372,45 @@ static const struct lib_ring_buffer_config client_config = {
 };
 
 static
-struct shm_handle *_channel_create(const char *name,
+struct ltt_channel *_channel_create(const char *name,
                                struct ltt_channel *ltt_chan, void *buf_addr,
                                size_t subbuf_size, size_t num_subbuf,
                                unsigned int switch_timer_interval,
                                unsigned int read_timer_interval)
 {
-       return channel_create(&client_config, name, ltt_chan, buf_addr,
+       ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr,
                              subbuf_size, num_subbuf, switch_timer_interval,
                              read_timer_interval);
+       ltt_chan->chan = shmp(ltt_chan->handle, ltt_chan->handle->chan);
+       return ltt_chan;
 }
 
 static
-void ltt_channel_destroy(struct shm_handle *handle)
+void ltt_channel_destroy(struct ltt_channel *ltt_chan)
 {
-       channel_destroy(handle);
+       channel_destroy(ltt_chan->chan, ltt_chan->handle);
 }
 
 static
-struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
+struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
+                                            struct shm_handle *handle)
 {
        struct lib_ring_buffer *buf;
        int cpu;
 
        for_each_channel_cpu(cpu, chan) {
-               buf = channel_get_ring_buffer(&client_config, chan, cpu);
-               if (!lib_ring_buffer_open_read(buf))
+               buf = channel_get_ring_buffer(&client_config, chan, cpu, handle);
+               if (!lib_ring_buffer_open_read(buf, handle))
                        return buf;
        }
        return NULL;
 }
 
 static
-void ltt_buffer_read_close(struct lib_ring_buffer *buf)
+void ltt_buffer_read_close(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       lib_ring_buffer_release_read(buf);
+       lib_ring_buffer_release_read(buf, handle);
 }
 
 static
index be9e4fb6781f4803ebc5de5ab3c5c1b23cf43ac7..88716a26f6b7d36548a50d96fafcb4790be2376a 100644 (file)
@@ -76,13 +76,15 @@ static size_t client_packet_header_size(void)
 }
 
 static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
-                               unsigned int subbuf_idx)
+                               unsigned int subbuf_idx,
+                               struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        struct metadata_packet_header *header =
                (struct metadata_packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
-                               subbuf_idx * chan->backend.subbuf_size);
+                               subbuf_idx * chan->backend.subbuf_size,
+                               handle);
        struct ltt_channel *ltt_chan = channel_get_private(chan);
        struct ltt_session *session = ltt_chan->session;
 
@@ -101,13 +103,15 @@ static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
  * subbuffer. data_size is between 1 and subbuf_size.
  */
 static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
-                             unsigned int subbuf_idx, unsigned long data_size)
+                             unsigned int subbuf_idx, unsigned long data_size,
+                             struct shm_handle *handle)
 {
-       struct channel *chan = shmp(buf->backend.chan);
+       struct channel *chan = shmp(handle, buf->backend.chan);
        struct metadata_packet_header *header =
                (struct metadata_packet_header *)
                        lib_ring_buffer_offset_address(&buf->backend,
-                               subbuf_idx * chan->backend.subbuf_size);
+                               subbuf_idx * chan->backend.subbuf_size,
+                               handle);
        unsigned long records_lost = 0;
 
        header->content_size = data_size * CHAR_BIT;            /* in bits */
@@ -119,12 +123,15 @@ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
 }
 
 static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
-                               int cpu, const char *name)
+                               int cpu, const char *name,
+                               struct shm_handle *handle)
 {
        return 0;
 }
 
-static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
+static void client_buffer_finalize(struct lib_ring_buffer *buf,
+                                  void *priv, int cpu,
+                                  struct shm_handle *handle)
 {
 }
 
@@ -149,38 +156,42 @@ static const struct lib_ring_buffer_config client_config = {
 };
 
 static
-struct shm_handle *_channel_create(const char *name,
+struct ltt_channel *_channel_create(const char *name,
                                struct ltt_channel *ltt_chan, void *buf_addr,
                                size_t subbuf_size, size_t num_subbuf,
                                unsigned int switch_timer_interval,
                                unsigned int read_timer_interval)
 {
-       return channel_create(&client_config, name, ltt_chan, buf_addr,
+       ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr,
                              subbuf_size, num_subbuf, switch_timer_interval,
                              read_timer_interval);
+       ltt_chan->chan = shmp(ltt_chan->handle, ltt_chan->handle->chan);
+       return ltt_chan;
 }
 
 static
-void ltt_channel_destroy(struct shm_handle *handle)
+void ltt_channel_destroy(struct ltt_channel *ltt_chan)
 {
-       channel_destroy(handle);
+       channel_destroy(ltt_chan->chan, ltt_chan->handle);
 }
 
 static
-struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
+struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
+                                            struct shm_handle *handle)
 {
        struct lib_ring_buffer *buf;
 
-       buf = channel_get_ring_buffer(&client_config, chan, 0);
-       if (!lib_ring_buffer_open_read(buf))
+       buf = channel_get_ring_buffer(&client_config, chan, 0, handle);
+       if (!lib_ring_buffer_open_read(buf, handle))
                return buf;
        return NULL;
 }
 
 static
-void ltt_buffer_read_close(struct lib_ring_buffer *buf)
+void ltt_buffer_read_close(struct lib_ring_buffer *buf,
+                          struct shm_handle *handle)
 {
-       lib_ring_buffer_release_read(buf);
+       lib_ring_buffer_release_read(buf, handle);
 }
 
 static
@@ -203,13 +214,13 @@ void ltt_event_write(struct lib_ring_buffer_ctx *ctx, const void *src,
 }
 
 static
-size_t ltt_packet_avail_size(struct channel *chan)
+size_t ltt_packet_avail_size(struct channel *chan, struct shm_handle *handle)
 {
        unsigned long o_begin;
        struct lib_ring_buffer *buf;
 
-       buf = shmp(chan->backend.buf);  /* Only for global buffer ! */
+       buf = shmp(handle, chan->backend.buf[0].shmp);  /* Only for global buffer ! */
        o_begin = v_read(&client_config, &buf->offset);
        if (subbuf_offset(o_begin, chan) != 0) {
                return chan->backend.subbuf_size - subbuf_offset(o_begin, chan);