From 1d4981969313da002983ca979bd85c95493f7316 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Fri, 19 Aug 2011 20:03:53 -0400 Subject: [PATCH] Implement shm object table .. for multi-process accesses to the same maps, with safe "pointers". Signed-off-by: Mathieu Desnoyers --- include/ust/lttng-events.h | 14 +- include/ust/lttng-tracepoint-event.h | 4 +- include/ust/ringbuffer-config.h | 21 +- libringbuffer/Makefile.am | 1 + libringbuffer/backend.h | 38 +- libringbuffer/backend_internal.h | 108 +++--- libringbuffer/backend_types.h | 14 +- libringbuffer/frontend.h | 46 ++- libringbuffer/frontend_api.h | 23 +- libringbuffer/frontend_internal.h | 58 +-- libringbuffer/frontend_types.h | 3 +- libringbuffer/ring_buffer_abi.c | 4 +- libringbuffer/ring_buffer_backend.c | 197 +++++----- libringbuffer/ring_buffer_frontend.c | 440 ++++++++++------------- libringbuffer/shm.c | 177 +++++++++ libringbuffer/shm.h | 91 ++--- libringbuffer/shm_internal.h | 23 ++ libringbuffer/shm_types.h | 43 +++ libust/ltt-events.c | 9 +- libust/ltt-ring-buffer-client.h | 42 ++- libust/ltt-ring-buffer-metadata-client.h | 49 ++- 21 files changed, 848 insertions(+), 557 deletions(-) create mode 100644 libringbuffer/shm.c create mode 100644 libringbuffer/shm_internal.h create mode 100644 libringbuffer/shm_types.h diff --git a/include/ust/lttng-events.h b/include/ust/lttng-events.h index 8f1c98d3..31938327 100644 --- a/include/ust/lttng-events.h +++ b/include/ust/lttng-events.h @@ -195,17 +195,20 @@ struct ltt_event { }; struct channel; +struct shm_handle; struct ltt_channel_ops { - struct shm_handle *(*channel_create)(const char *name, + struct ltt_channel *(*channel_create)(const char *name, struct ltt_channel *ltt_chan, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval); - void (*channel_destroy)(struct shm_handle *handle); - struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan); - void (*buffer_read_close)(struct lib_ring_buffer *buf); + void (*channel_destroy)(struct ltt_channel *ltt_chan); + struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan, + struct shm_handle *handle); + void (*buffer_read_close)(struct lib_ring_buffer *buf, + struct shm_handle *handle); int (*event_reserve)(struct lib_ring_buffer_ctx *ctx, uint32_t event_id); void (*event_commit)(struct lib_ring_buffer_ctx *ctx); @@ -216,7 +219,8 @@ struct ltt_channel_ops { * packet. Note that the size returned is only a hint, since it * may change due to concurrent writes. */ - size_t (*packet_avail_size)(struct channel *chan); + size_t (*packet_avail_size)(struct channel *chan, + struct shm_handle *handle); //wait_queue_head_t *(*get_reader_wait_queue)(struct channel *chan); //wait_queue_head_t *(*get_hp_wait_queue)(struct channel *chan); int (*is_finalized)(struct channel *chan); diff --git a/include/ust/lttng-tracepoint-event.h b/include/ust/lttng-tracepoint-event.h index de145ebe..1642cf18 100644 --- a/include/ust/lttng-tracepoint-event.h +++ b/include/ust/lttng-tracepoint-event.h @@ -480,7 +480,7 @@ static void __event_probe__##_name(void *__data, _proto) \ __event_len = __event_get_size__##_name(__dynamic_len, _args); \ __event_align = __event_get_align__##_name(_args); \ lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len, \ - __event_align, -1); \ + __event_align, -1, __chan->handle); \ __ret = __chan->ops->event_reserve(&ctx, __event->id); \ if (__ret < 0) \ return; \ @@ -511,7 +511,7 @@ static void __event_probe__##_name(void *__data) \ __event_len = __event_get_size__##_name(__dynamic_len); \ __event_align = __event_get_align__##_name(); \ lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len, \ - __event_align, -1); \ + __event_align, -1, __chan->handle); \ __ret = __chan->ops->event_reserve(&ctx, __event->id); \ if (__ret < 0) \ return; \ diff --git a/include/ust/ringbuffer-config.h b/include/ust/ringbuffer-config.h index 80503205..15fee52d 100644 --- a/include/ust/ringbuffer-config.h +++ b/include/ust/ringbuffer-config.h @@ -20,6 +20,7 @@ struct lib_ring_buffer; struct channel; struct lib_ring_buffer_config; struct lib_ring_buffer_ctx; +struct shm_handle *handle; /* * Ring buffer client callbacks. Only used by slow path, never on fast path. @@ -40,20 +41,25 @@ struct lib_ring_buffer_client_cb { /* Slow path only, at subbuffer switch */ size_t (*subbuffer_header_size) (void); void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx); + unsigned int subbuf_idx, + struct shm_handle *handle); void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size); + unsigned int subbuf_idx, unsigned long data_size, + struct shm_handle *handle); /* Optional callbacks (can be set to NULL) */ /* Called at buffer creation/finalize */ int (*buffer_create) (struct lib_ring_buffer *buf, void *priv, - int cpu, const char *name); + int cpu, const char *name, + struct shm_handle *handle); /* * Clients should guarantee that no new reader handle can be opened * after finalize. */ - void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu); + void (*buffer_finalize) (struct lib_ring_buffer *buf, + void *priv, int cpu, + struct shm_handle *handle); /* * Extract header length, payload length and timestamp from event @@ -63,7 +69,8 @@ struct lib_ring_buffer_client_cb { void (*record_get) (const struct lib_ring_buffer_config *config, struct channel *chan, struct lib_ring_buffer *buf, size_t offset, size_t *header_len, - size_t *payload_len, u64 *timestamp); + size_t *payload_len, u64 *timestamp, + struct shm_handle *handle); }; /* @@ -165,6 +172,7 @@ struct lib_ring_buffer_ctx { /* input received by lib_ring_buffer_reserve(), saved here. */ struct channel *chan; /* channel */ void *priv; /* client private data */ + struct shm_handle *handle; /* shared-memory handle */ size_t data_size; /* size of payload */ int largest_align; /* * alignment of the largest element @@ -202,7 +210,7 @@ static inline void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx, struct channel *chan, void *priv, size_t data_size, int largest_align, - int cpu) + int cpu, struct shm_handle *handle) { ctx->chan = chan; ctx->priv = priv; @@ -210,6 +218,7 @@ void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx, ctx->largest_align = largest_align; ctx->cpu = cpu; ctx->rflags = 0; + ctx->handle = handle; } /* diff --git a/libringbuffer/Makefile.am b/libringbuffer/Makefile.am index ffa37017..2fe76816 100644 --- a/libringbuffer/Makefile.am +++ b/libringbuffer/Makefile.am @@ -7,6 +7,7 @@ noinst_HEADERS = \ smp.h libringbuffer_la_SOURCES = \ + shm.c \ smp.c \ ring_buffer_backend.c \ ring_buffer_frontend.c diff --git a/libringbuffer/backend.h b/libringbuffer/backend.h index 1bd61109..e26045af 100644 --- a/libringbuffer/backend.h +++ b/libringbuffer/backend.h @@ -27,10 +27,12 @@ /* Ring buffer backend access (read/write) */ extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, - size_t offset, void *dest, size_t len); + size_t offset, void *dest, size_t len, + struct shm_handle *handle); extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, - size_t offset, void *dest, size_t len); + size_t offset, void *dest, size_t len, + struct shm_handle *handle); /* * Return the address where a given offset is located. @@ -40,10 +42,12 @@ extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, */ extern void * lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb, - size_t offset); + size_t offset, + struct shm_handle *handle); extern void * lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb, - size_t offset); + size_t offset, + struct shm_handle *handle); /** * lib_ring_buffer_write - write data to a buffer backend @@ -64,16 +68,17 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config, { struct lib_ring_buffer_backend *bufb = &ctx->buf->backend; struct channel_backend *chanb = &ctx->chan->backend; + struct shm_handle *handle = ctx->handle; size_t sbidx; size_t offset = ctx->buf_offset; - struct lib_ring_buffer_backend_pages *rpages; + struct lib_ring_buffer_backend_pages_shmp *rpages; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - id = shmp(bufb->buf_wsb)[sbidx].id; + id = shmp(handle, bufb->buf_wsb)[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = shmp(bufb->array)[sb_bindex]; + rpages = &shmp(handle, bufb->array)[sb_bindex]; CHAN_WARN_ON(ctx->chan, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); @@ -83,7 +88,7 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config, */ CHAN_WARN_ON(chanb, offset >= chanb->buf_size); lib_ring_buffer_do_copy(config, - shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), + shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)), src, len); ctx->buf_offset += len; } @@ -96,24 +101,25 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config, static inline unsigned long lib_ring_buffer_get_records_unread( const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf) + struct lib_ring_buffer *buf, + struct shm_handle *handle) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct lib_ring_buffer_backend_pages *pages; + struct lib_ring_buffer_backend_pages_shmp *pages; unsigned long records_unread = 0, sb_bindex, id; unsigned int i; - for (i = 0; i < shmp(bufb->chan)->backend.num_subbuf; i++) { - id = shmp(bufb->buf_wsb)[i].id; + for (i = 0; i < shmp(handle, bufb->chan)->backend.num_subbuf; i++) { + id = shmp(handle, bufb->buf_wsb)[i].id; sb_bindex = subbuffer_id_get_index(config, id); - pages = shmp(bufb->array)[sb_bindex]; - records_unread += v_read(config, &pages->records_unread); + pages = &shmp(handle, bufb->array)[sb_bindex]; + records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread); } if (config->mode == RING_BUFFER_OVERWRITE) { id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - pages = shmp(bufb->array)[sb_bindex]; - records_unread += v_read(config, &pages->records_unread); + pages = &shmp(handle, bufb->array)[sb_bindex]; + records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread); } return records_unread; } diff --git a/libringbuffer/backend_internal.h b/libringbuffer/backend_internal.h index 061924fc..30e32098 100644 --- a/libringbuffer/backend_internal.h +++ b/libringbuffer/backend_internal.h @@ -25,17 +25,20 @@ int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb, struct channel_backend *chan, int cpu, - struct shm_header *shm_header); + struct shm_handle *handle, + struct shm_object *shmobj); void channel_backend_unregister_notifiers(struct channel_backend *chanb); void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb); int channel_backend_init(struct channel_backend *chanb, const char *name, const struct lib_ring_buffer_config *config, void *priv, size_t subbuf_size, - size_t num_subbuf, struct shm_header *shm_header); -void channel_backend_free(struct channel_backend *chanb); + size_t num_subbuf, struct shm_handle *handle); +void channel_backend_free(struct channel_backend *chanb, + struct shm_handle *handle); -void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb); +void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb, + struct shm_handle *handle); void channel_backend_reset(struct channel_backend *chanb); int lib_ring_buffer_backend_init(void); @@ -183,12 +186,12 @@ int subbuffer_id_check_index(const struct lib_ring_buffer_config *config, static inline void subbuffer_count_record(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx) + unsigned long idx, struct shm_handle *handle) { unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); - v_inc(config, &shmp(bufb->array)[sb_bindex]->records_commit); + sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id); + v_inc(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit); } /* @@ -197,15 +200,16 @@ void subbuffer_count_record(const struct lib_ring_buffer_config *config, */ static inline void subbuffer_consume_record(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer_backend *bufb) + struct lib_ring_buffer_backend *bufb, + struct shm_handle *handle) { unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); - CHAN_WARN_ON(bufb->chan, - !v_read(config, &shmp(bufb->array)[sb_bindex]->records_unread)); + CHAN_WARN_ON(shmp(handle, bufb->chan), + !v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread)); /* Non-atomic decrement protected by exclusive subbuffer access */ - _v_dec(config, &shmp(bufb->array)[sb_bindex]->records_unread); + _v_dec(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread); v_inc(config, &bufb->records_read); } @@ -213,12 +217,13 @@ static inline unsigned long subbuffer_get_records_count( const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); - return v_read(config, &shmp(bufb->array)[sb_bindex]->records_commit); + sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id); + return v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit); } /* @@ -231,17 +236,18 @@ static inline unsigned long subbuffer_count_records_overrun( const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { - struct lib_ring_buffer_backend_pages *pages; + struct lib_ring_buffer_backend_pages_shmp *pages; unsigned long overruns, sb_bindex; - sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); - pages = shmp(bufb->array)[sb_bindex]; - overruns = v_read(config, &pages->records_unread); - v_set(config, &pages->records_unread, - v_read(config, &pages->records_commit)); - v_set(config, &pages->records_commit, 0); + sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id); + pages = &shmp(handle, bufb->array)[sb_bindex]; + overruns = v_read(config, &shmp(handle, pages->shmp)->records_unread); + v_set(config, &shmp(handle, pages->shmp)->records_unread, + v_read(config, &shmp(handle, pages->shmp)->records_commit)); + v_set(config, &shmp(handle, pages->shmp)->records_commit, 0); return overruns; } @@ -250,41 +256,44 @@ static inline void subbuffer_set_data_size(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, unsigned long idx, - unsigned long data_size) + unsigned long data_size, + struct shm_handle *handle) { - struct lib_ring_buffer_backend_pages *pages; + struct lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); - pages = shmp(bufb->array)[sb_bindex]; - pages->data_size = data_size; + sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id); + pages = &shmp(handle, bufb->array)[sb_bindex]; + shmp(handle, pages->shmp)->data_size = data_size; } static inline unsigned long subbuffer_get_read_data_size( const struct lib_ring_buffer_config *config, - struct lib_ring_buffer_backend *bufb) + struct lib_ring_buffer_backend *bufb, + struct shm_handle *handle) { - struct lib_ring_buffer_backend_pages *pages; + struct lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); - pages = shmp(bufb->array)[sb_bindex]; - return pages->data_size; + pages = &shmp(handle, bufb->array)[sb_bindex]; + return shmp(handle, pages->shmp)->data_size; } static inline unsigned long subbuffer_get_data_size( const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { - struct lib_ring_buffer_backend_pages *pages; + struct lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); - pages = shmp(bufb->array)[sb_bindex]; - return pages->data_size; + sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id); + pages = &shmp(handle, bufb->array)[sb_bindex]; + return shmp(handle, pages->shmp)->data_size; } /** @@ -294,7 +303,8 @@ unsigned long subbuffer_get_data_size( static inline void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { unsigned long id, new_id; @@ -305,7 +315,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, * Performing a volatile access to read the sb_pages, because we want to * read a coherent version of the pointer and the associated noref flag. */ - id = CMM_ACCESS_ONCE(shmp(bufb->buf_wsb)[idx].id); + id = CMM_ACCESS_ONCE(shmp(handle, bufb->buf_wsb)[idx].id); for (;;) { /* This check is called on the fast path for each record. */ if (likely(!subbuffer_id_is_noref(config, id))) { @@ -319,7 +329,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, } new_id = id; subbuffer_id_clear_noref(config, &new_id); - new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[idx].id, id, new_id); + new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[idx].id, id, new_id); if (likely(new_id == id)) break; id = new_id; @@ -333,7 +343,8 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, static inline void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, - unsigned long idx, unsigned long offset) + unsigned long idx, unsigned long offset, + struct shm_handle *handle) { if (config->mode != RING_BUFFER_OVERWRITE) return; @@ -349,14 +360,14 @@ void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *confi * subbuffer_set_noref() uses a volatile store to deal with concurrent * readers of the noref flag. */ - CHAN_WARN_ON(bufb->chan, - subbuffer_id_is_noref(config, shmp(bufb->buf_wsb)[idx].id)); + CHAN_WARN_ON(shmp(handle, bufb->chan), + subbuffer_id_is_noref(config, shmp(handle, bufb->buf_wsb)[idx].id)); /* * Memory barrier that ensures counter stores are ordered before set * noref and offset. */ cmm_smp_mb(); - subbuffer_id_set_noref_offset(config, &shmp(bufb->buf_wsb)[idx].id, offset); + subbuffer_id_set_noref_offset(config, &shmp(handle, bufb->buf_wsb)[idx].id, offset); } /** @@ -367,7 +378,8 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, struct channel_backend *chanb, unsigned long consumed_idx, - unsigned long consumed_count) + unsigned long consumed_count, + struct shm_handle *handle) { unsigned long old_id, new_id; @@ -378,7 +390,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, * old_wpage, because the value read will be confirmed by the * following cmpxchg(). */ - old_id = shmp(bufb->buf_wsb)[consumed_idx].id; + old_id = shmp(handle, bufb->buf_wsb)[consumed_idx].id; if (unlikely(!subbuffer_id_is_noref(config, old_id))) return -EAGAIN; /* @@ -388,18 +400,18 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, if (unlikely(!subbuffer_id_compare_offset(config, old_id, consumed_count))) return -EAGAIN; - CHAN_WARN_ON(bufb->chan, + CHAN_WARN_ON(shmp(handle, bufb->chan), !subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id, consumed_count); - new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[consumed_idx].id, old_id, + new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[consumed_idx].id, old_id, bufb->buf_rsb.id); if (unlikely(old_id != new_id)) return -EAGAIN; bufb->buf_rsb.id = new_id; } else { /* No page exchange, use the writer page directly */ - bufb->buf_rsb.id = shmp(bufb->buf_wsb)[consumed_idx].id; + bufb->buf_rsb.id = shmp(handle, bufb->buf_wsb)[consumed_idx].id; } return 0; } diff --git a/libringbuffer/backend_types.h b/libringbuffer/backend_types.h index 3bc36ba8..51dcfbf9 100644 --- a/libringbuffer/backend_types.h +++ b/libringbuffer/backend_types.h @@ -11,7 +11,7 @@ * Dual LGPL v2.1/GPL v2 license. */ -#include "shm.h" +#include "shm_internal.h" struct lib_ring_buffer_backend_pages { unsigned long mmap_offset; /* offset of the subbuffer in mmap */ @@ -32,6 +32,10 @@ struct lib_ring_buffer_backend_subbuffer { struct channel; struct lib_ring_buffer; +struct lib_ring_buffer_backend_pages_shmp { + DECLARE_SHMP(struct lib_ring_buffer_backend_pages, shmp); +}; + struct lib_ring_buffer_backend { /* Array of ring_buffer_backend_subbuffer for writer */ DECLARE_SHMP(struct lib_ring_buffer_backend_subbuffer, buf_wsb); @@ -41,7 +45,7 @@ struct lib_ring_buffer_backend { * Pointer array of backend pages, for whole buffer. * Indexed by ring_buffer_backend_subbuffer identifier (id) index. */ - DECLARE_SHMP(struct lib_ring_buffer_backend_pages *, array); + DECLARE_SHMP(struct lib_ring_buffer_backend_pages_shmp, array); DECLARE_SHMP(char, memory_map); /* memory mapping */ DECLARE_SHMP(struct channel, chan); /* Associated channel */ @@ -50,6 +54,10 @@ struct lib_ring_buffer_backend { unsigned int allocated:1; /* Bool: is buffer allocated ? */ }; +struct lib_ring_buffer_shmp { + DECLARE_SHMP(struct lib_ring_buffer, shmp); /* Channel per-cpu buffers */ +}; + struct channel_backend { unsigned long buf_size; /* Size of the buffer */ unsigned long subbuf_size; /* Sub-buffer size */ @@ -60,12 +68,12 @@ struct channel_backend { */ unsigned int buf_size_order; /* Order of buffer size */ int extra_reader_sb:1; /* Bool: has extra reader subbuffer */ - DECLARE_SHMP(struct lib_ring_buffer, buf); /* Channel per-cpu buffers */ unsigned long num_subbuf; /* Number of sub-buffers for writer */ u64 start_tsc; /* Channel creation TSC value */ void *priv; /* Client-specific information */ const struct lib_ring_buffer_config *config; /* Ring buffer configuration */ char name[NAME_MAX]; /* Channel name */ + struct lib_ring_buffer_shmp buf[]; }; #endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */ diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index 5e5b5cc1..2385d392 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -50,7 +50,7 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, * channel. */ extern -void *channel_destroy(struct shm_handle *handle); +void *channel_destroy(struct channel *chan, struct shm_handle *handle); /* Buffer read operations */ @@ -66,48 +66,59 @@ void *channel_destroy(struct shm_handle *handle); extern struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, - struct channel *chan, int cpu); -extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf); -extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf); + struct channel *chan, int cpu, + struct shm_handle *handle); +extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, + struct shm_handle *handle); +extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, + struct shm_handle *handle); /* * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer. */ extern int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long *consumed, - unsigned long *produced); + unsigned long *produced, + struct shm_handle *handle); extern void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, - unsigned long consumed_new); + unsigned long consumed_new, + struct shm_handle *handle); extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, - unsigned long consumed); -extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf); + unsigned long consumed, + struct shm_handle *handle); +extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf, + struct shm_handle *handle); /* * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers * to read sub-buffers sequentially. */ -static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf) +static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf, + struct shm_handle *handle) { int ret; ret = lib_ring_buffer_snapshot(buf, &buf->cons_snapshot, - &buf->prod_snapshot); + &buf->prod_snapshot, handle); if (ret) return ret; - ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot); + ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot, handle); return ret; } -static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf) +static inline +void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - lib_ring_buffer_put_subbuf(buf); + lib_ring_buffer_put_subbuf(buf, handle); lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot, - shmp(buf->backend.chan))); + shmp(handle, buf->backend.chan)), handle); } extern void channel_reset(struct channel *chan); -extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf); +extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf, + struct shm_handle *handle); static inline unsigned long lib_ring_buffer_get_offset(const struct lib_ring_buffer_config *config, @@ -154,9 +165,10 @@ int lib_ring_buffer_channel_is_disabled(const struct channel *chan) static inline unsigned long lib_ring_buffer_get_read_data_size( const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf) + struct lib_ring_buffer *buf, + struct shm_handle *handle) { - return subbuffer_get_read_data_size(config, &buf->backend); + return subbuffer_get_read_data_size(config, &buf->backend, handle); } static inline diff --git a/libringbuffer/frontend_api.h b/libringbuffer/frontend_api.h index 75146e60..1d8e2949 100644 --- a/libringbuffer/frontend_api.h +++ b/libringbuffer/frontend_api.h @@ -146,6 +146,7 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; + struct shm_handle *handle = ctx->handle; struct lib_ring_buffer *buf; unsigned long o_begin, o_end, o_old; size_t before_hdr_pad = 0; @@ -154,9 +155,9 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config, return -EAGAIN; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = &shmp(chan->backend.buf)[ctx->cpu]; + buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp); else - buf = shmp(chan->backend.buf); + buf = shmp(handle, chan->backend.buf[0].shmp); if (uatomic_read(&buf->record_disabled)) return -EAGAIN; ctx->buf = buf; @@ -189,7 +190,7 @@ int lib_ring_buffer_reserve(const struct lib_ring_buffer_config *config, * Clear noref flag for this subbuffer. */ lib_ring_buffer_clear_noref(config, &ctx->buf->backend, - subbuf_index(o_end - 1, chan)); + subbuf_index(o_end - 1, chan), handle); ctx->pre_offset = o_begin; ctx->buf_offset = o_begin + before_hdr_pad; @@ -214,9 +215,10 @@ slow_path: */ static inline void lib_ring_buffer_switch(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, enum switch_mode mode) + struct lib_ring_buffer *buf, enum switch_mode mode, + struct shm_handle *handle) { - lib_ring_buffer_switch_slow(buf, mode); + lib_ring_buffer_switch_slow(buf, mode, handle); } /* See ring_buffer_frontend_api.h for lib_ring_buffer_reserve(). */ @@ -234,6 +236,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config, const struct lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; + struct shm_handle *handle = ctx->handle; struct lib_ring_buffer *buf = ctx->buf; unsigned long offset_end = ctx->buf_offset; unsigned long endidx = subbuf_index(offset_end - 1, chan); @@ -242,7 +245,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config, /* * Must count record before incrementing the commit count. */ - subbuffer_count_record(config, &buf->backend, endidx); + subbuffer_count_record(config, &buf->backend, endidx, handle); /* * Order all writes to buffer before the commit count update that will @@ -250,7 +253,7 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config, */ cmm_smp_wmb(); - v_add(config, ctx->slot_size, &shmp(buf->commit_hot)[endidx].cc); + v_add(config, ctx->slot_size, &shmp(handle, buf->commit_hot)[endidx].cc); /* * commit count read can race with concurrent OOO commit count updates. @@ -270,17 +273,17 @@ void lib_ring_buffer_commit(const struct lib_ring_buffer_config *config, * count reaches back the reserve offset for a specific sub-buffer, * which is completely independent of the order. */ - commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1, - commit_count, endidx); + commit_count, endidx, handle); /* * Update used size at each commit. It's needed only for extracting * ring_buffer buffers from vmcore, after crash. */ lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, ctx->buf_offset, commit_count, - ctx->slot_size); + ctx->slot_size, handle); } /** diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index 7c28788a..73b44726 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -144,7 +144,8 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx); extern void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, - enum switch_mode mode); + enum switch_mode mode, + struct shm_handle *handle); /* Buffer write helpers */ @@ -180,22 +181,24 @@ static inline void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, unsigned long commit_count, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { if (config->oops == RING_BUFFER_OOPS_CONSISTENCY) - v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count); + v_set(config, &shmp(handle, buf->commit_hot)[idx].seq, commit_count); } static inline int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, - struct channel *chan) + struct channel *chan, + struct shm_handle *handle) { unsigned long consumed_old, consumed_idx, commit_count, write_offset; consumed_old = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed_old, chan); - commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb); /* * No memory barrier here, since we are only interested * in a statistically correct polling result. The next poll will @@ -239,9 +242,10 @@ int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config, static inline unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { - return subbuffer_get_data_size(config, &buf->backend, idx); + return subbuffer_get_data_size(config, &buf->backend, idx, handle); } /* @@ -252,7 +256,8 @@ unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config static inline int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config, struct lib_ring_buffer *buf, - struct channel *chan) + struct channel *chan, + struct shm_handle *handle) { unsigned long offset, idx, commit_count; @@ -270,7 +275,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi do { offset = v_read(config, &buf->offset); idx = subbuf_index(offset, chan); - commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[idx].cc); } while (offset != v_read(config, &buf->offset)); return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) @@ -283,7 +288,8 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, struct channel *chan, unsigned long offset, unsigned long commit_count, - unsigned long idx) + unsigned long idx, + struct shm_handle *handle) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; @@ -318,7 +324,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * The subbuffer size is least 2 bytes (minimum size: 1 page). * This guarantees that old_commit_count + 1 != commit_count. */ - if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb, + if (likely(v_cmpxchg(config, &shmp(handle, buf->commit_cold)[idx].cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { /* @@ -330,17 +336,20 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, tsc = config->cb.ring_buffer_clock_read(chan); v_add(config, subbuffer_get_records_count(config, - &buf->backend, idx), + &buf->backend, + idx, handle), &buf->records_count); v_add(config, subbuffer_count_records_overrun(config, &buf->backend, - idx), + idx, handle), &buf->records_overrun); config->cb.buffer_end(buf, tsc, idx, lib_ring_buffer_get_data_size(config, buf, - idx)); + idx, + handle), + handle); /* * Set noref flag and offset for this subbuffer id. @@ -348,7 +357,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * are ordered before set noref and offset. */ lib_ring_buffer_set_noref_offset(config, &buf->backend, idx, - buf_trunc_val(offset, chan)); + buf_trunc_val(offset, chan), handle); /* * Order set_noref and record counter updates before the @@ -358,17 +367,17 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, */ cmm_smp_mb(); /* End of exclusive subbuffer access */ - v_set(config, &shmp(buf->commit_cold)[idx].cc_sb, + v_set(config, &shmp(handle, buf->commit_cold)[idx].cc_sb, commit_count); lib_ring_buffer_vmcore_check_deliver(config, buf, - commit_count, idx); + commit_count, idx, handle); /* * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. */ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER && uatomic_read(&buf->active_readers) - && lib_ring_buffer_poll_deliver(config, buf, chan)) { + && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { //wake_up_interruptible(&buf->read_wait); //wake_up_interruptible(&chan->read_wait); } @@ -392,7 +401,8 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c unsigned long idx, unsigned long buf_offset, unsigned long commit_count, - size_t slot_size) + size_t slot_size, + struct shm_handle *handle) { unsigned long offset, commit_seq_old; @@ -410,16 +420,18 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c if (unlikely(subbuf_offset(offset - commit_count, chan))) return; - commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq); + commit_seq_old = v_read(config, &shmp(handle, buf->commit_hot)[idx].seq); while ((long) (commit_seq_old - commit_count) < 0) - commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq, + commit_seq_old = v_cmpxchg(config, &shmp(handle, buf->commit_hot)[idx].seq, commit_seq_old, commit_count); } extern int lib_ring_buffer_create(struct lib_ring_buffer *buf, struct channel_backend *chanb, int cpu, - struct shm_header *shm_header); -extern void lib_ring_buffer_free(struct lib_ring_buffer *buf); + struct shm_handle *handle, + struct shm_object *shmobj); +extern void lib_ring_buffer_free(struct lib_ring_buffer *buf, + struct shm_handle *handle); /* Keep track of trap nesting inside ring buffer code */ extern __thread unsigned int lib_ring_buffer_nesting; diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index 28179dfc..974f782e 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -26,7 +26,7 @@ #include #include #include "backend_types.h" -#include "shm.h" +#include "shm_internal.h" /* * A switch is done during tracing or as a final flush after tracing (so it @@ -50,7 +50,6 @@ struct channel { unsigned long read_timer_interval; /* Reader wakeup (jiffies) */ //wait_queue_head_t read_wait; /* reader wait queue */ int finalized; /* Has channel been finalized */ - DECLARE_SHMP(struct shm_header, shm_header); } ____cacheline_aligned; /* Per-subbuffer commit counters used on the hot path */ diff --git a/libringbuffer/ring_buffer_abi.c b/libringbuffer/ring_buffer_abi.c index fbf6df53..3279ccd9 100644 --- a/libringbuffer/ring_buffer_abi.c +++ b/libringbuffer/ring_buffer_abi.c @@ -241,7 +241,7 @@ long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long ar arg); } case RING_BUFFER_FLUSH: - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle); return 0; default: return -ENOIOCTLCMD; @@ -354,7 +354,7 @@ long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd, return put_ulong(read_offset, arg); } case RING_BUFFER_FLUSH: - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle); return 0; default: return -ENOIOCTLCMD; diff --git a/libringbuffer/ring_buffer_backend.c b/libringbuffer/ring_buffer_backend.c index 3a5e6a9b..badde80f 100644 --- a/libringbuffer/ring_buffer_backend.c +++ b/libringbuffer/ring_buffer_backend.c @@ -29,9 +29,10 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config struct lib_ring_buffer_backend *bufb, size_t size, size_t num_subbuf, int extra_reader_sb, - struct shm_header *shm_header) + struct shm_handle *handle, + struct shm_object *shmobj) { - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; unsigned long subbuf_size, mmap_offset = 0; unsigned long num_subbuf_alloc; unsigned long i; @@ -42,43 +43,42 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config if (extra_reader_sb) num_subbuf_alloc++; - /* Align the entire buffer backend data on PAGE_SIZE */ - align_shm(shm_header, PAGE_SIZE); - set_shmp(bufb->array, zalloc_shm(shm_header, - sizeof(*bufb->array) * num_subbuf_alloc)); - if (unlikely(!shmp(bufb->array))) + align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages_shmp)); + set_shmp(bufb->array, zalloc_shm(shmobj, + sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc)); + if (unlikely(!shmp(handle, bufb->array))) goto array_error; /* * This is the largest element (the buffer pages) which needs to * be aligned on PAGE_SIZE. */ - align_shm(shm_header, PAGE_SIZE); - set_shmp(bufb->memory_map, zalloc_shm(shm_header, + align_shm(shmobj, PAGE_SIZE); + set_shmp(bufb->memory_map, zalloc_shm(shmobj, subbuf_size * num_subbuf_alloc)); - if (unlikely(!shmp(bufb->memory_map))) + if (unlikely(!shmp(handle, bufb->memory_map))) goto memory_map_error; /* Allocate backend pages array elements */ for (i = 0; i < num_subbuf_alloc; i++) { - align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_pages)); - set_shmp(bufb->array[i], - zalloc_shm(shm_header, + align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages)); + set_shmp(shmp(handle, bufb->array)[i].shmp, + zalloc_shm(shmobj, sizeof(struct lib_ring_buffer_backend_pages))); - if (!shmp(bufb->array[i])) + if (!shmp(handle, shmp(handle, bufb->array)[i].shmp)) goto free_array; } /* Allocate write-side subbuffer table */ - align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_subbuffer)); - bufb->buf_wsb = zalloc_shm(shm_header, + align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_subbuffer)); + set_shmp(bufb->buf_wsb, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer_backend_subbuffer) - * num_subbuf); - if (unlikely(!shmp(bufb->buf_wsb))) + * num_subbuf)); + if (unlikely(!shmp(handle, bufb->buf_wsb))) goto free_array; for (i = 0; i < num_subbuf; i++) - shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); + shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); /* Assign read-side subbuffer table */ if (extra_reader_sb) @@ -89,19 +89,19 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config /* Assign pages to page index */ for (i = 0; i < num_subbuf_alloc; i++) { - set_shmp(shmp(bufb->array)[i]->p, - &shmp(bufb->memory_map)[i * subbuf_size]); + struct shm_ref ref; + + ref.index = bufb->memory_map._ref.index; + ref.offset = bufb->memory_map._ref.offset; + ref.offset += i * subbuf_size; + + set_shmp(shmp(handle, shmp(handle, bufb->array)[i].shmp)->p, + ref); if (config->output == RING_BUFFER_MMAP) { - shmp(bufb->array)[i]->mmap_offset = mmap_offset; + shmp(handle, shmp(handle, bufb->array)[i].shmp)->mmap_offset = mmap_offset; mmap_offset += subbuf_size; } } - /* - * Align the end of each buffer backend data on PAGE_SIZE, to - * behave like an array which contains elements that need to be - * aligned on PAGE_SIZE. - */ - align_shm(shm_header, PAGE_SIZE); return 0; @@ -115,17 +115,18 @@ array_error: int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb, struct channel_backend *chanb, int cpu, - struct shm_header *shm_header) + struct shm_handle *handle, + struct shm_object *shmobj) { const struct lib_ring_buffer_config *config = chanb->config; - set_shmp(&bufb->chan, caa_container_of(chanb, struct channel, backend)); + set_shmp(bufb->chan, handle->chan._ref); bufb->cpu = cpu; return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size, chanb->num_subbuf, chanb->extra_reader_sb, - shm_header); + handle, shmobj); } void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb) @@ -136,9 +137,10 @@ void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb) bufb->allocated = 0; } -void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) +void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb, + struct shm_handle *handle) { - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long num_subbuf_alloc; unsigned int i; @@ -148,7 +150,7 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) num_subbuf_alloc++; for (i = 0; i < chanb->num_subbuf; i++) - shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); + shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); if (chanb->extra_reader_sb) bufb->buf_rsb.id = subbuffer_id(config, 0, 1, num_subbuf_alloc - 1); @@ -157,9 +159,9 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) for (i = 0; i < num_subbuf_alloc; i++) { /* Don't reset mmap_offset */ - v_set(config, &shmp(bufb->array)[i]->records_commit, 0); - v_set(config, &shmp(bufb->array)[i]->records_unread, 0); - shmp(bufb->array)[i]->data_size = 0; + v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_commit, 0); + v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_unread, 0); + shmp(handle, shmp(handle, bufb->array)[i].shmp)->data_size = 0; /* Don't reset backend page and virt addresses */ } /* Don't reset num_pages_per_subbuf, cpu, allocated */ @@ -192,7 +194,7 @@ void channel_backend_reset(struct channel_backend *chanb) * @parent: dentry of parent directory, %NULL for root directory * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2) * @num_subbuf: number of sub-buffers (power of 2) - * @shm_header: shared memory header + * @shm_handle: shared memory handle * * Returns channel pointer if successful, %NULL otherwise. * @@ -206,11 +208,12 @@ int channel_backend_init(struct channel_backend *chanb, const char *name, const struct lib_ring_buffer_config *config, void *priv, size_t subbuf_size, size_t num_subbuf, - struct shm_header *shm_header) + struct shm_handle *handle) { struct channel *chan = caa_container_of(chanb, struct channel, backend); unsigned int i; int ret; + size_t shmsize = 0, bufshmsize = 0, num_subbuf_alloc; if (!name) return -EPERM; @@ -245,39 +248,58 @@ int channel_backend_init(struct channel_backend *chanb, chanb->name[NAME_MAX - 1] = '\0'; chanb->config = config; + /* Per-cpu buffer size: control (prior to backend) */ + shmsize = offset_align(shmsize, __alignof__(struct lib_ring_buffer)); + shmsize += sizeof(struct lib_ring_buffer); + + /* Per-cpu buffer size: backend */ + /* num_subbuf + 1 is the worse case */ + num_subbuf_alloc = num_subbuf + 1; + shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages_shmp)); + shmsize += sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc; + shmsize += offset_align(bufshmsize, PAGE_SIZE); + shmsize += subbuf_size * num_subbuf_alloc; + shmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages)); + shmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc; + shmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer)); + shmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf; + /* Per-cpu buffer size: control (after backend) */ + shmsize += offset_align(shmsize, __alignof__(struct commit_counters_hot)); + shmsize += sizeof(struct commit_counters_hot) * num_subbuf; + shmsize += offset_align(shmsize, __alignof__(struct commit_counters_cold)); + shmsize += sizeof(struct commit_counters_cold) * num_subbuf; + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { struct lib_ring_buffer *buf; - size_t alloc_size; - - /* Allocating the buffer per-cpu structures */ - align_shm(shm_header, __alignof__(struct lib_ring_buffer)); - alloc_size = sizeof(struct lib_ring_buffer); - buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus()); - if (!buf) - goto end; - set_shmp(chanb->buf, buf); - /* * We need to allocate for all possible cpus. */ for_each_possible_cpu(i) { - ret = lib_ring_buffer_create(&shmp(chanb->buf)[i], - chanb, i, shm_header); + struct shm_object *shmobj; + + shmobj = shm_object_table_append(handle->table, shmsize); + align_shm(shmobj, __alignof__(struct lib_ring_buffer)); + set_shmp(chanb->buf[i].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer))); + buf = shmp(handle, chanb->buf[i].shmp); + if (!buf) + goto end; + ret = lib_ring_buffer_create(buf, chanb, i, + handle, shmobj); if (ret) goto free_bufs; /* cpu hotplug locked */ } } else { + struct shm_object *shmobj; struct lib_ring_buffer *buf; - size_t alloc_size; - align_shm(shm_header, __alignof__(struct lib_ring_buffer)); - alloc_size = sizeof(struct lib_ring_buffer); - buf = zalloc_shm(shm_header, alloc_size); + shmobj = shm_object_table_append(handle->table, shmsize); + align_shm(shmobj, __alignof__(struct lib_ring_buffer)); + set_shmp(chanb->buf[0].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer))); + buf = shmp(handle, chanb->buf[0].shmp); if (!buf) goto end; - set_shmp(chanb->buf, buf); - ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1, - shm_header); + ret = lib_ring_buffer_create(buf, chanb, -1, + handle, shmobj); if (ret) goto free_bufs; } @@ -288,11 +310,11 @@ int channel_backend_init(struct channel_backend *chanb, free_bufs: if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(i) { - struct lib_ring_buffer *buf = &shmp(chanb->buf)[i]; + struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp); if (!buf->backend.allocated) continue; - lib_ring_buffer_free(buf); + lib_ring_buffer_free(buf, handle); } } /* We only free the buffer data upon shm teardown */ @@ -306,24 +328,25 @@ end: * * Destroy all channel buffers and frees the channel. */ -void channel_backend_free(struct channel_backend *chanb) +void channel_backend_free(struct channel_backend *chanb, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chanb->config; unsigned int i; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(i) { - struct lib_ring_buffer *buf = &shmp(chanb->buf)[i]; + struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp); if (!buf->backend.allocated) continue; - lib_ring_buffer_free(buf); + lib_ring_buffer_free(buf, handle); } } else { - struct lib_ring_buffer *buf = shmp(chanb->buf); + struct lib_ring_buffer *buf = shmp(handle, chanb->buf[0].shmp); CHAN_WARN_ON(chanb, !buf->backend.allocated); - lib_ring_buffer_free(buf); + lib_ring_buffer_free(buf, handle); } /* We only free the buffer data upon shm teardown */ } @@ -339,12 +362,12 @@ void channel_backend_free(struct channel_backend *chanb) * Returns the length copied. */ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, - void *dest, size_t len) + void *dest, size_t len, struct shm_handle *handle) { - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; ssize_t orig_len; - struct lib_ring_buffer_backend_pages *rpages; + struct lib_ring_buffer_backend_pages_shmp *rpages; unsigned long sb_bindex, id; orig_len = len; @@ -354,7 +377,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, return 0; id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = shmp(bufb->array)[sb_bindex]; + rpages = &shmp(handle, bufb->array)[sb_bindex]; /* * Underlying layer should never ask for reads across * subbuffers. @@ -362,7 +385,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, CHAN_WARN_ON(chanb, offset >= chanb->buf_size); CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - memcpy(dest, shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), len); + memcpy(dest, shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)), len); return orig_len; } @@ -377,20 +400,20 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, * Should be protected by get_subbuf/put_subbuf. */ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset, - void *dest, size_t len) + void *dest, size_t len, struct shm_handle *handle) { - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; ssize_t string_len, orig_offset; char *str; - struct lib_ring_buffer_backend_pages *rpages; + struct lib_ring_buffer_backend_pages_shmp *rpages; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; orig_offset = offset; id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = shmp(bufb->array)[sb_bindex]; + rpages = &shmp(handle, bufb->array)[sb_bindex]; /* * Underlying layer should never ask for reads across * subbuffers. @@ -398,7 +421,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse CHAN_WARN_ON(chanb, offset >= chanb->buf_size); CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - str = (char *)shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); + str = (char *)shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)); string_len = strnlen(str, len); if (dest && len) { memcpy(dest, str, string_len); @@ -418,20 +441,21 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse * as long as the write is never bigger than a page size. */ void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb, - size_t offset) + size_t offset, + struct shm_handle *handle) { - struct lib_ring_buffer_backend_pages *rpages; - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct lib_ring_buffer_backend_pages_shmp *rpages; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = shmp(bufb->array)[sb_bindex]; + rpages = &shmp(handle, bufb->array)[sb_bindex]; CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); + return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)); } /** @@ -445,20 +469,21 @@ void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb, * address, as long as the write is never bigger than a page size. */ void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb, - size_t offset) + size_t offset, + struct shm_handle *handle) { size_t sbidx; - struct lib_ring_buffer_backend_pages *rpages; - struct channel_backend *chanb = &shmp(bufb->chan)->backend; + struct lib_ring_buffer_backend_pages_shmp *rpages; + struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - id = shmp(bufb->buf_wsb)[sbidx].id; + id = shmp(handle, bufb->buf_wsb)[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = shmp(bufb->array)[sb_bindex]; + rpages = &shmp(handle, bufb->array)[sb_bindex]; CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); + return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)); } diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index b87e554e..6cd869c3 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -85,16 +85,18 @@ __thread unsigned int lib_ring_buffer_nesting; static void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu); + struct lib_ring_buffer *buf, int cpu, + struct shm_handle *handle); /* * Must be called under cpu hotplug protection. */ -void lib_ring_buffer_free(struct lib_ring_buffer *buf) +void lib_ring_buffer_free(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); - lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); + lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle); /* buf->commit_hot will be freed by shm teardown */ /* buf->commit_cold will be freed by shm teardown */ @@ -110,9 +112,10 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) * should not be using the iterator concurrently with reset. The previous * current iterator record is reset. */ -void lib_ring_buffer_reset(struct lib_ring_buffer *buf) +void lib_ring_buffer_reset(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned int i; @@ -122,14 +125,14 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) */ v_set(config, &buf->offset, 0); for (i = 0; i < chan->backend.num_subbuf; i++) { - v_set(config, &shmp(buf->commit_hot)[i].cc, 0); - v_set(config, &shmp(buf->commit_hot)[i].seq, 0); - v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0); + v_set(config, &shmp(handle, buf->commit_hot)[i].cc, 0); + v_set(config, &shmp(handle, buf->commit_hot)[i].seq, 0); + v_set(config, &shmp(handle, buf->commit_cold)[i].cc_sb, 0); } uatomic_set(&buf->consumed, 0); uatomic_set(&buf->record_disabled, 0); v_set(config, &buf->last_tsc, 0); - lib_ring_buffer_backend_reset(&buf->backend); + lib_ring_buffer_backend_reset(&buf->backend, handle); /* Don't reset number of active readers */ v_set(config, &buf->records_lost_full, 0); v_set(config, &buf->records_lost_wrap, 0); @@ -166,7 +169,8 @@ void channel_reset(struct channel *chan) */ int lib_ring_buffer_create(struct lib_ring_buffer *buf, struct channel_backend *chanb, int cpu, - struct shm_header *shm_header) + struct shm_handle *handle, + struct shm_object *shmobj) { const struct lib_ring_buffer_config *config = chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); @@ -181,26 +185,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, return 0; ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, - cpu, shm_header); + cpu, handle, shmobj); if (ret) return ret; - align_shm(shm_header, - max(__alignof__(struct commit_counters_hot), - __alignof__(struct commit_counters_cold))); - set_shmp(&buf->commit_hot, - zalloc_shm(shm_header, - sizeof(*buf->commit_hot) * chan->backend.num_subbuf)); - if (!shmp(buf->commit_hot)) { + align_shm(shmobj, __alignof__(struct commit_counters_hot)); + set_shmp(buf->commit_hot, + zalloc_shm(shmobj, + sizeof(struct commit_counters_hot) * chan->backend.num_subbuf)); + if (!shmp(handle, buf->commit_hot)) { ret = -ENOMEM; goto free_chanbuf; } - align_shm(shm_header, __alignof__(struct commit_counters_cold)); - set_shmp(&buf->commit_cold, - zalloc_shm(shm_header, - sizeof(*buf->commit_cold) * chan->backend.num_subbuf)); - if (!shmp(buf->commit_cold)) { + align_shm(shmobj, __alignof__(struct commit_counters_cold)); + set_shmp(buf->commit_cold, + zalloc_shm(shmobj, + sizeof(struct commit_counters_cold) * chan->backend.num_subbuf)); + if (!shmp(handle, buf->commit_cold)) { ret = -ENOMEM; goto free_commit; } @@ -214,13 +216,13 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, */ subbuf_header_size = config->cb.subbuffer_header_size(); v_set(config, &buf->offset, subbuf_header_size); - subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id); - tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan)); - config->cb.buffer_begin(buf, tsc, 0); - v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc); + subbuffer_id_clear_noref(config, &shmp(handle, buf->backend.buf_wsb)[0].id); + tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan)); + config->cb.buffer_begin(buf, tsc, 0, handle); + v_add(config, subbuf_header_size, &shmp(handle, buf->commit_hot)[0].cc); if (config->cb.buffer_create) { - ret = config->cb.buffer_create(buf, priv, cpu, chanb->name); + ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle); if (ret) goto free_init; } @@ -237,17 +239,18 @@ free_chanbuf: return ret; } +#if 0 static void switch_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; /* * Only flush buffers periodically if readers are active. */ if (uatomic_read(&buf->active_readers)) - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle); //TODO timers //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) @@ -257,10 +260,12 @@ static void switch_buffer_timer(unsigned long data) // mod_timer(&buf->switch_timer, // jiffies + chan->switch_timer_interval); } +#endif //0 -static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) @@ -277,9 +282,10 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) buf->switch_timer_enabled = 1; } -static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); if (!chan->switch_timer_interval || !buf->switch_timer_enabled) return; @@ -289,13 +295,14 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) buf->switch_timer_enabled = 0; } +#if 0 /* * Polling timer to check the channels for data. */ static void read_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); @@ -315,10 +322,12 @@ static void read_buffer_timer(unsigned long data) // mod_timer(&buf->read_timer, // jiffies + chan->read_timer_interval); } +#endif //0 -static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -339,9 +348,10 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) buf->read_timer_enabled = 1; } -static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) +static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -355,7 +365,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) * do one more check to catch data that has been written in the last * timer period. */ - if (lib_ring_buffer_poll_deliver(config, buf, chan)) { + if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { //TODO //wake_up_interruptible(&buf->read_wait); //wake_up_interruptible(&chan->read_wait); @@ -363,45 +373,36 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) buf->read_timer_enabled = 0; } -static void channel_unregister_notifiers(struct channel *chan) +static void channel_unregister_notifiers(struct channel *chan, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; int cpu; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); + lib_ring_buffer_stop_switch_timer(buf, handle); + lib_ring_buffer_stop_read_timer(buf, handle); } } else { - struct lib_ring_buffer *buf = shmp(chan->backend.buf); + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); + lib_ring_buffer_stop_switch_timer(buf, handle); + lib_ring_buffer_stop_read_timer(buf, handle); } //channel_backend_unregister_notifiers(&chan->backend); } -static void channel_free(struct shm_handle *handle) +static void channel_free(struct channel *chan, struct shm_handle *handle) { - struct shm_header *header = handle->header; - struct channel *chan = shmp(header->chan); int ret; - channel_backend_free(&chan->backend); + channel_backend_free(&chan->backend, handle); /* chan is freed by shm teardown */ - ret = munmap(header, header->shm_size); - if (ret) { - PERROR("umnmap"); - assert(0); - } - ret = close(handle->shmfd); - if (ret) { - PERROR("close"); - assert(0); - } + shm_object_table_destroy(handle->table); + free(handle); } /** @@ -428,12 +429,11 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval) { - int ret, cpu, shmfd; + int ret, cpu; + size_t shmsize; struct channel *chan; - size_t shmsize, bufshmsize, bufshmalign; - struct shm_header *shm_header; - unsigned long num_subbuf_alloc; struct shm_handle *handle; + struct shm_object *shmobj; if (lib_ring_buffer_check_config(config, switch_timer_interval, read_timer_interval)) @@ -443,88 +443,28 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, if (!handle) return NULL; - /* Calculate the shm allocation layout */ - shmsize = sizeof(struct shm_header); - shmsize += offset_align(shmsize, __alignof__(struct channel)); - shmsize += sizeof(struct channel); - - /* Per-cpu buffer size: control (prior to backend) */ - shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer)); - bufshmsize = sizeof(struct lib_ring_buffer); - shmsize += bufshmsize * num_possible_cpus(); - - /* Per-cpu buffer size: backend */ - shmsize += offset_align(shmsize, PAGE_SIZE); - /* num_subbuf + 1 is the worse case */ - num_subbuf_alloc = num_subbuf + 1; - bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc; - bufshmsize += offset_align(bufshmsize, PAGE_SIZE); - bufshmsize += subbuf_size * num_subbuf_alloc; - bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages)); - bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc; - bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer)); - bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf; - bufshmsize += offset_align(bufshmsize, PAGE_SIZE); - shmsize += bufshmsize * num_possible_cpus(); - - /* Per-cpu buffer size: control (after backend) */ - shmsize += offset_align(shmsize, - max(__alignof__(struct commit_counters_hot), - __alignof__(struct commit_counters_cold))); - bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf; - bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold)); - bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf; - shmsize += bufshmsize * num_possible_cpus(); - - /* - * Allocate shm, and immediately unlink its shm oject, keeping - * only the file descriptor as a reference to the object. If it - * already exists (caused by short race window during which the - * global object exists in a concurrent shm_open), simply retry. - */ - do { - shmfd = shm_open("/ust-shm-tmp", - O_CREAT | O_EXCL | O_RDWR, 0700); - } while (shmfd < 0 && errno == EEXIST); - if (shmfd < 0) { - PERROR("shm_open"); - goto error_shm_open; - } - ret = shm_unlink("/ust-shm-tmp"); - if (ret) { - PERROR("shm_unlink"); - goto error_unlink; - } - ret = ftruncate(shmfd, shmsize); - if (ret) { - PERROR("ftruncate"); - goto error_ftruncate; - } + /* Allocate table for channel + per-cpu buffers */ + handle->table = shm_object_table_create(1 + num_possible_cpus()); + if (!handle->table) + goto error_table_alloc; - shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE, - MAP_SHARED, shmfd, 0); - if (shm_header == MAP_FAILED) { - PERROR("mmap"); - goto error_mmap; - } - - shm_header->magic = SHM_MAGIC; - shm_header->major = SHM_MAJOR; - shm_header->major = SHM_MINOR; - shm_header->bits_per_long = CAA_BITS_PER_LONG; - shm_header->shm_size = shmsize; - shm_header->shm_allocated = sizeof(struct shm_header); + /* Calculate the shm allocation layout */ + shmsize = sizeof(struct channel); + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + shmsize += sizeof(struct lib_ring_buffer_shmp) * num_possible_cpus(); + else + shmsize += sizeof(struct lib_ring_buffer_shmp); - align_shm(shm_header, __alignof__(struct channel)); - chan = zalloc_shm(shm_header, sizeof(struct channel)); + shmobj = shm_object_table_append(handle->table, shmsize); + set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel))); + chan = shmp(handle, handle->chan); if (!chan) - goto destroy_shmem; - set_shmp(shm_header->chan, chan); + goto error_append; ret = channel_backend_init(&chan->backend, name, config, priv, - subbuf_size, num_subbuf, shm_header); + subbuf_size, num_subbuf, handle); if (ret) - goto destroy_shmem; + goto error_backend_init; chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); //TODO @@ -541,44 +481,31 @@ struct shm_handle *channel_create(const struct lib_ring_buffer_config *config, * In that off case, we need to allocate for all possible cpus. */ for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); + lib_ring_buffer_start_switch_timer(buf, handle); + lib_ring_buffer_start_read_timer(buf, handle); } } else { - struct lib_ring_buffer *buf = shmp(chan->backend.buf); + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); + lib_ring_buffer_start_switch_timer(buf, handle); + lib_ring_buffer_start_read_timer(buf, handle); } - handle->header = shm_header; - handle->shmfd = shmfd; return handle; -destroy_shmem: - ret = munmap(shm_header, shmsize); - if (ret) { - PERROR("umnmap"); - assert(0); - } -error_mmap: -error_ftruncate: -error_unlink: - ret = close(shmfd); - if (ret) { - PERROR("close"); - assert(0); - } -error_shm_open: +error_backend_init: +error_append: + shm_object_table_destroy(handle->table); +error_table_alloc: free(handle); return NULL; } static -void channel_release(struct shm_handle *handle) +void channel_release(struct channel *chan, struct shm_handle *handle) { - channel_free(handle); + channel_free(chan, handle); } /** @@ -592,26 +519,25 @@ void channel_release(struct shm_handle *handle) * They should release their handle at that point. Returns the private * data pointer. */ -void *channel_destroy(struct shm_handle *handle) +void *channel_destroy(struct channel *chan, struct shm_handle *handle) { - struct shm_header *header = handle->header; - struct channel *chan = shmp(header->chan); const struct lib_ring_buffer_config *config = chan->backend.config; void *priv; int cpu; - channel_unregister_notifiers(chan); + channel_unregister_notifiers(chan, handle); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp); if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, - cpu); + cpu, handle); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, + handle); /* * Perform flush before writing to finalized. */ @@ -620,12 +546,13 @@ void *channel_destroy(struct shm_handle *handle) //wake_up_interruptible(&buf->read_wait); } } else { - struct lib_ring_buffer *buf = shmp(chan->backend.buf); + struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp); if (config->cb.buffer_finalize) - config->cb.buffer_finalize(buf, chan->backend.priv, -1); + config->cb.buffer_finalize(buf, chan->backend.priv, -1, handle); if (buf->backend.allocated) - lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH); + lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH, + handle); /* * Perform flush before writing to finalized. */ @@ -640,24 +567,26 @@ void *channel_destroy(struct shm_handle *handle) * sessiond/consumer are keeping a reference on the shm file * descriptor directly. No need to refcount. */ - channel_release(handle); + channel_release(chan, handle); priv = chan->backend.priv; return priv; } struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, - struct channel *chan, int cpu) + struct channel *chan, int cpu, + struct shm_handle *handle) { if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) - return shmp(chan->backend.buf); + return shmp(handle, chan->backend.buf[0].shmp); else - return &shmp(chan->backend.buf)[cpu]; + return shmp(handle, chan->backend.buf[cpu].shmp); } -int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) +int lib_ring_buffer_open_read(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0) return -EBUSY; @@ -665,9 +594,10 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) return 0; } -void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) +void lib_ring_buffer_release_read(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); cmm_smp_mb(); @@ -685,9 +615,10 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) */ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, - unsigned long *consumed, unsigned long *produced) + unsigned long *consumed, unsigned long *produced, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; @@ -738,10 +669,11 @@ nodata: * @consumed_new: new consumed count value */ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, - unsigned long consumed_new) + unsigned long consumed_new, + struct shm_handle *handle) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = shmp(bufb->chan); + struct channel *chan = shmp(handle, bufb->chan); unsigned long consumed; CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); @@ -766,9 +698,10 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, * data to read at consumed position, or 0 if the get operation succeeds. */ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, - unsigned long consumed) + unsigned long consumed, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; @@ -782,7 +715,7 @@ retry: cmm_smp_rmb(); consumed_cur = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed, chan); - commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb); /* * Make sure we read the commit count before reading the buffer * data and the write offset. Correct consumed offset ordering @@ -832,7 +765,8 @@ retry: * looking for matches the one contained in the subbuffer id. */ ret = update_read_sb_index(config, &buf->backend, &chan->backend, - consumed_idx, buf_trunc_val(consumed, chan)); + consumed_idx, buf_trunc_val(consumed, chan), + handle); if (ret) goto retry; subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id); @@ -857,10 +791,11 @@ nodata: * lib_ring_buffer_put_subbuf - release exclusive subbuffer access * @buf: ring buffer */ -void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) +void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf, + struct shm_handle *handle) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = shmp(bufb->chan); + struct channel *chan = shmp(handle, bufb->chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; @@ -885,9 +820,9 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) */ read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); v_add(config, v_read(config, - &shmp(bufb->array)[read_sb_bindex]->records_unread), + &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread), &bufb->records_read); - v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0); + v_set(config, &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread, 0); CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref(config, &bufb->buf_rsb.id); @@ -902,7 +837,8 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) */ consumed_idx = subbuf_index(consumed, chan); update_read_sb_index(config, &buf->backend, &chan->backend, - consumed_idx, buf_trunc_val(consumed, chan)); + consumed_idx, buf_trunc_val(consumed, chan), + handle); /* * update_read_sb_index return value ignored. Don't exchange sub-buffer * if the writer concurrently updated it. @@ -917,14 +853,15 @@ static void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, struct channel *chan, unsigned long cons_offset, - int cpu) + int cpu, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); - commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc); - commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[cons_idx].cc); + commit_count_sb = v_read(config, &shmp(handle, buf->commit_cold)[cons_idx].cc_sb); if (subbuf_offset(commit_count, chan) != 0) ERRMSG("ring buffer %s, cpu %d: " @@ -942,7 +879,8 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, static void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, struct channel *chan, - void *priv, int cpu) + void *priv, int cpu, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long write_offset, cons_offset; @@ -972,12 +910,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, - cons_offset) > 0; cons_offset = subbuf_align(cons_offset, chan)) lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset, - cpu); + cpu, handle); } static void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu) + struct lib_ring_buffer *buf, int cpu, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; void *priv = chan->backend.priv; @@ -999,7 +938,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, v_read(config, &buf->records_lost_wrap), v_read(config, &buf->records_lost_big)); - lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu); + lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle); } /* @@ -1011,13 +950,14 @@ static void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc) + u64 tsc, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old, chan); unsigned long commit_count; - config->cb.buffer_begin(buf, tsc, oldidx); + config->cb.buffer_begin(buf, tsc, oldidx, handle); /* * Order all writes to buffer before the commit count update that will @@ -1025,14 +965,15 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, */ cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &shmp(buf->commit_hot)[oldidx].cc); - commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); + &shmp(handle, buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, - commit_count, oldidx); + commit_count, oldidx, handle); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, offsets->old, commit_count, - config->cb.subbuffer_header_size()); + config->cb.subbuffer_header_size(), + handle); } /* @@ -1047,7 +988,8 @@ static void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc) + u64 tsc, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long oldidx = subbuf_index(offsets->old - 1, chan); @@ -1055,20 +997,21 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, data_size = subbuf_offset(offsets->old - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; - subbuffer_set_data_size(config, &buf->backend, oldidx, data_size); + subbuffer_set_data_size(config, &buf->backend, oldidx, data_size, + handle); /* * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ cmm_smp_wmb(); - v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc); - commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); + v_add(config, padding_size, &shmp(handle, buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, - commit_count, oldidx); + commit_count, oldidx, handle); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, offsets->old, commit_count, - padding_size); + padding_size, handle); } /* @@ -1082,13 +1025,14 @@ static void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - u64 tsc) + u64 tsc, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long beginidx = subbuf_index(offsets->begin, chan); unsigned long commit_count; - config->cb.buffer_begin(buf, tsc, beginidx); + config->cb.buffer_begin(buf, tsc, beginidx, handle); /* * Order all writes to buffer before the commit count update that will @@ -1096,14 +1040,15 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, */ cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &shmp(buf->commit_hot)[beginidx].cc); - commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc); + &shmp(handle, buf->commit_hot)[beginidx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[beginidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, - commit_count, beginidx); + commit_count, beginidx, handle); lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx, offsets->begin, commit_count, - config->cb.subbuffer_header_size()); + config->cb.subbuffer_header_size(), + handle); } /* @@ -1114,9 +1059,10 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, */ static void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, - struct channel *chan, - struct switch_offsets *offsets, - u64 tsc) + struct channel *chan, + struct switch_offsets *offsets, + u64 tsc, + struct shm_handle *handle) { const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long endidx = subbuf_index(offsets->end - 1, chan); @@ -1124,20 +1070,21 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, data_size = subbuf_offset(offsets->end - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; - subbuffer_set_data_size(config, &buf->backend, endidx, data_size); + subbuffer_set_data_size(config, &buf->backend, endidx, data_size, + handle); /* * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ cmm_smp_wmb(); - v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc); - commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc); + v_add(config, padding_size, &shmp(handle, buf->commit_hot)[endidx].cc); + commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, - commit_count, endidx); + commit_count, endidx, handle); lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, offsets->end, commit_count, - padding_size); + padding_size, handle); } /* @@ -1208,9 +1155,10 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * operations, this function must be called from the CPU which owns the buffer * for a ACTIVE flush. */ -void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) +void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; @@ -1242,20 +1190,20 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old); oldidx = subbuf_index(offsets.old, chan); - lib_ring_buffer_clear_noref(config, &buf->backend, oldidx); + lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle); /* * May need to populate header start on SWITCH_FLUSH. */ if (offsets.switch_old_start) { - lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle); offsets.old += config->cb.subbuffer_header_size(); } /* * Switch old subbuffer. */ - lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle); } /* @@ -1272,6 +1220,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, struct lib_ring_buffer_ctx *ctx) { const struct lib_ring_buffer_config *config = chan->backend.config; + struct shm_handle *handle = ctx->handle; unsigned long reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); @@ -1321,7 +1270,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - ((unsigned long) v_read(config, - &shmp(buf->commit_cold)[sb_index].cc_sb) + &shmp(handle, buf->commit_cold)[sb_index].cc_sb) & chan->commit_count_mask); if (likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ @@ -1406,15 +1355,16 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) { struct channel *chan = ctx->chan; + struct shm_handle *handle = ctx->handle; const struct lib_ring_buffer_config *config = chan->backend.config; struct lib_ring_buffer *buf; struct switch_offsets offsets; int ret; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = &shmp(chan->backend.buf)[ctx->cpu]; + buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp); else - buf = shmp(chan->backend.buf); + buf = shmp(handle, chan->backend.buf[0].shmp); ctx->buf = buf; offsets.size = 0; @@ -1445,25 +1395,27 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) * Clear noref flag for this subbuffer. */ lib_ring_buffer_clear_noref(config, &buf->backend, - subbuf_index(offsets.end - 1, chan)); + subbuf_index(offsets.end - 1, chan), + handle); /* * Switch old subbuffer if needed. */ if (unlikely(offsets.switch_old_end)) { lib_ring_buffer_clear_noref(config, &buf->backend, - subbuf_index(offsets.old - 1, chan)); - lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc); + subbuf_index(offsets.old - 1, chan), + handle); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle); } /* * Populate new subbuffer. */ if (unlikely(offsets.switch_new_start)) - lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc); + lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle); if (unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc); + lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle); ctx->slot_size = offsets.size; ctx->pre_offset = offsets.begin; diff --git a/libringbuffer/shm.c b/libringbuffer/shm.c new file mode 100644 index 00000000..8f158e2f --- /dev/null +++ b/libringbuffer/shm.c @@ -0,0 +1,177 @@ +/* + * libringbuffer/shm.c + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include "shm.h" +#include +#include +#include +#include /* For mode constants */ +#include /* For O_* constants */ +#include +#include + +struct shm_object_table *shm_object_table_create(size_t max_nb_obj) +{ + struct shm_object_table *table; + + table = zmalloc(sizeof(struct shm_object_table) + + max_nb_obj * sizeof(table->objects[0])); + table->size = max_nb_obj; + return table; +} + +struct shm_object *shm_object_table_append(struct shm_object_table *table, + size_t memory_map_size) +{ + int shmfd, waitfd[2], ret, i; + struct shm_object *obj; + char *memory_map; + + if (table->allocated_len >= table->size) + return NULL; + obj = &table->objects[table->allocated_len++]; + + /* wait_fd: create pipe */ + ret = pipe(waitfd); + if (ret < 0) { + PERROR("pipe"); + goto error_pipe; + } + for (i = 0; i < 2; i++) { + ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC); + if (ret < 0) { + PERROR("fcntl"); + goto error_fcntl; + } + } + *obj->wait_fd = *waitfd; + + /* shm_fd: create shm */ + + /* + * Allocate shm, and immediately unlink its shm oject, keeping + * only the file descriptor as a reference to the object. If it + * already exists (caused by short race window during which the + * global object exists in a concurrent shm_open), simply retry. + * We specifically do _not_ use the / at the beginning of the + * pathname so that some OS implementations can keep it local to + * the process (POSIX leaves this implementation-defined). + */ + do { + shmfd = shm_open("ust-shm-tmp", + O_CREAT | O_EXCL | O_RDWR, 0700); + } while (shmfd < 0 && errno == EEXIST); + if (shmfd < 0) { + PERROR("shm_open"); + goto error_shm_open; + } + ret = shm_unlink("ust-shm-tmp"); + if (ret) { + PERROR("shm_unlink"); + goto error_unlink; + } + ret = ftruncate(shmfd, memory_map_size); + if (ret) { + PERROR("ftruncate"); + goto error_ftruncate; + } + obj->shm_fd = shmfd; + + /* memory_map: mmap */ + memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE, + MAP_SHARED, shmfd, 0); + if (memory_map == MAP_FAILED) { + PERROR("mmap"); + goto error_mmap; + } + obj->memory_map = memory_map; + obj->memory_map_size = memory_map_size; + obj->allocated_len = 0; + return obj; + +error_mmap: +error_ftruncate: +error_unlink: + ret = close(shmfd); + if (ret) { + PERROR("close"); + assert(0); + } +error_shm_open: +error_fcntl: + for (i = 0; i < 2; i++) { + ret = close(waitfd[i]); + if (ret) { + PERROR("close"); + assert(0); + } + } +error_pipe: + free(obj); + return NULL; + +} + +static +void shmp_object_destroy(struct shm_object *obj) +{ + int ret, i; + + ret = munmap(obj->memory_map, obj->memory_map_size); + if (ret) { + PERROR("umnmap"); + assert(0); + } + ret = close(obj->shm_fd); + if (ret) { + PERROR("close"); + assert(0); + } + for (i = 0; i < 2; i++) { + ret = close(obj->wait_fd[i]); + if (ret) { + PERROR("close"); + assert(0); + } + } +} + +void shm_object_table_destroy(struct shm_object_table *table) +{ + int i; + + for (i = 0; i < table->allocated_len; i++) + shmp_object_destroy(&table->objects[i]); + free(table); +} + +/* + * zalloc_shm - allocate memory within a shm object. + * + * Shared memory is already zeroed by shmget. + * *NOT* multithread-safe (should be protected by mutex). + * Returns a -1, -1 tuple on error. + */ +struct shm_ref zalloc_shm(struct shm_object *obj, size_t len) +{ + struct shm_ref ref; + struct shm_ref shm_ref_error = { -1, -1 }; + + if (obj->memory_map_size - obj->allocated_len < len) + return shm_ref_error; + ref.index = obj->index; + ref.offset = obj->allocated_len; + obj->allocated_len += len; + return ref; +} + +void align_shm(struct shm_object *obj, size_t align) +{ + size_t offset_len = offset_align(obj->allocated_len, align); + obj->allocated_len += offset_len; +} diff --git a/libringbuffer/shm.h b/libringbuffer/shm.h index 71624e39..d370375b 100644 --- a/libringbuffer/shm.h +++ b/libringbuffer/shm.h @@ -12,73 +12,60 @@ #include #include #include "ust/core.h" +#include "shm_types.h" #define SHM_MAGIC 0x54335433 #define SHM_MAJOR 0 #define SHM_MINOR 1 /* - * Defining a max shm offset, for debugging purposes. + * Pointer dereferencing. We don't trust the shm_ref, so we validate + * both the index and offset with known boundaries. */ -#if (CAA_BITS_PER_LONG == 32) -/* Define the maximum shared memory size to 128MB on 32-bit machines */ -#define MAX_SHM_SIZE 134217728 -#else -/* Define the maximum shared memory size to 8GB on 64-bit machines */ -#define MAX_SHM_SIZE 8589934592 -#endif - -#define DECLARE_SHMP(type, name) type *****name - -struct shm_header { - uint32_t magic; - uint8_t major; - uint8_t minor; - uint8_t bits_per_long; - size_t shm_size, shm_allocated; - - DECLARE_SHMP(struct channel, chan); -}; - -struct shm_handle { - struct shm_header *header; /* beginning of mapping */ - int shmfd; /* process-local file descriptor */ -}; - -#define shmp(shm_offset) \ - ((__typeof__(****(shm_offset))) (((char *) &(shm_offset)) + (ptrdiff_t) (shm_offset))) - -#define _shmp_abs(a) ((a < 0) ? -(a) : (a)) - -static inline -void _set_shmp(ptrdiff_t *shm_offset, void *ptr) -{ - *shm_offset = (((char *) ptr) - ((char *) shm_offset)); - assert(_shmp_abs(*shm_offset) < MAX_SHM_SIZE); -} - -#define set_shmp(shm_offset, ptr) \ - _set_shmp((ptrdiff_t *) ****(shm_offset), ptr) - -/* Shared memory is already zeroed by shmget */ -/* *NOT* multithread-safe (should be protected by mutex) */ static inline -void *zalloc_shm(struct shm_header *shm_header, size_t len) +char *_shmp(struct shm_object_table *table, struct shm_ref *ref) { - void *ret; + struct shm_object *obj; + size_t index, offset; - if (shm_header->shm_size - shm_header->shm_allocated < len) + index = (size_t) ref->index; + if (unlikely(index >= table->allocated_len)) + return NULL; + obj = &table->objects[index]; + offset = (size_t) ref->offset; + if (unlikely(offset >= obj->memory_map_size)) return NULL; - ret = (char *) shm_header + shm_header->shm_allocated; - shm_header->shm_allocated += len; - return ret; + return &obj->memory_map[offset]; } +#define shmp(handle, ref) \ + ({ \ + __typeof__((ref)._type) ____ptr_ret; \ + ____ptr_ret = (__typeof__(____ptr_ret)) _shmp((handle)->table, &(ref)._ref); \ + ____ptr_ret; \ + }) + static inline -void align_shm(struct shm_header *shm_header, size_t align) +void _set_shmp(struct shm_ref *ref, struct shm_ref src) { - size_t offset_len = offset_align(shm_header->shm_allocated, align); - shm_header->shm_allocated += offset_len; + *ref = src; } +#define set_shmp(ref, src) _set_shmp(&(ref)._ref, src) + +struct shm_object_table *shm_object_table_create(size_t max_nb_obj); +void shm_object_table_destroy(struct shm_object_table *table); +struct shm_object *shm_object_table_append(struct shm_object_table *table, + size_t memory_map_size); + +/* + * zalloc_shm - allocate memory within a shm object. + * + * Shared memory is already zeroed by shmget. + * *NOT* multithread-safe (should be protected by mutex). + * Returns a -1, -1 tuple on error. + */ +struct shm_ref zalloc_shm(struct shm_object *obj, size_t len); +void align_shm(struct shm_object *obj, size_t align); + #endif /* _LIBRINGBUFFER_SHM_H */ diff --git a/libringbuffer/shm_internal.h b/libringbuffer/shm_internal.h new file mode 100644 index 00000000..77f345d3 --- /dev/null +++ b/libringbuffer/shm_internal.h @@ -0,0 +1,23 @@ +#ifndef _LIBRINGBUFFER_SHM_INTERNAL_H +#define _LIBRINGBUFFER_SHM_INTERNAL_H + +/* + * libringbuffer/shm_internal.h + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +struct shm_ref { + volatile ssize_t index; /* within the object table */ + volatile ssize_t offset; /* within the object */ +}; + +#define DECLARE_SHMP(type, name) \ + union { \ + struct shm_ref _ref; \ + type *_type; \ + } name + +#endif /* _LIBRINGBUFFER_SHM_INTERNAL_H */ diff --git a/libringbuffer/shm_types.h b/libringbuffer/shm_types.h new file mode 100644 index 00000000..e92c0af0 --- /dev/null +++ b/libringbuffer/shm_types.h @@ -0,0 +1,43 @@ +#ifndef _LIBRINGBUFFER_SHM_TYPES_H +#define _LIBRINGBUFFER_SHM_TYPES_H + +/* + * libringbuffer/shm_types.h + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include "shm_internal.h" + +struct channel; + +struct shm_object { + size_t index; /* within the object table */ + int shm_fd; /* shm fd */ + int wait_fd[2]; /* fd for wait/wakeup */ + char *memory_map; + size_t memory_map_size; + size_t allocated_len; +}; + +struct shm_object_table { + size_t size; + size_t allocated_len; + struct shm_object objects[]; +}; + +struct shm_handle { + struct shm_object_table *table; + DECLARE_SHMP(struct channel, chan); + /* + * In the consumer, chan points to a shadow copy, validated upon + * reception. The chan object is overridden in the consumer to + * point to this shadow copy. + */ + struct channel *shadow_chan; +}; + +#endif /* _LIBRINGBUFFER_SHM_TYPES_H */ diff --git a/libust/ltt-events.c b/libust/ltt-events.c index ece959b3..8f9b7ed2 100644 --- a/libust/ltt-events.c +++ b/libust/ltt-events.c @@ -221,10 +221,9 @@ struct ltt_channel *ltt_channel_create(struct ltt_session *session, * headers. Therefore the "chan" information used as input * should be already accessible. */ - chan->handle = transport->ops.channel_create("[lttng]", chan, buf_addr, + transport->ops.channel_create("[lttng]", chan, buf_addr, subbuf_size, num_subbuf, switch_timer_interval, read_timer_interval); - chan->chan = shmp(chan->handle->header->chan); if (!chan->chan) goto create_error; chan->enabled = 1; @@ -248,7 +247,7 @@ active: static void _ltt_channel_destroy(struct ltt_channel *chan) { - chan->ops->channel_destroy(chan->handle); + chan->ops->channel_destroy(chan); cds_list_del(&chan->list); lttng_destroy_context(chan->ctx); free(chan); @@ -387,10 +386,10 @@ int lttng_metadata_printf(struct ltt_session *session, for (pos = 0; pos < len; pos += reserve_len) { reserve_len = min_t(size_t, - chan->ops->packet_avail_size(chan->chan), + chan->ops->packet_avail_size(chan->chan, chan->handle), len - pos); lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len, - sizeof(char), -1); + sizeof(char), -1, chan->handle); /* * We don't care about metadata buffer's records lost * count, because we always retry here. Report error if diff --git a/libust/ltt-ring-buffer-client.h b/libust/ltt-ring-buffer-client.h index dce34aa2..d71d3861 100644 --- a/libust/ltt-ring-buffer-client.h +++ b/libust/ltt-ring-buffer-client.h @@ -293,13 +293,15 @@ static size_t client_packet_header_size(void) } static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx) + unsigned int subbuf_idx, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); struct packet_header *header = (struct packet_header *) lib_ring_buffer_offset_address(&buf->backend, - subbuf_idx * chan->backend.subbuf_size); + subbuf_idx * chan->backend.subbuf_size, + handle); struct ltt_channel *ltt_chan = channel_get_private(chan); struct ltt_session *session = ltt_chan->session; @@ -319,13 +321,15 @@ static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc, * subbuffer. data_size is between 1 and subbuf_size. */ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size) + unsigned int subbuf_idx, unsigned long data_size, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); struct packet_header *header = (struct packet_header *) lib_ring_buffer_offset_address(&buf->backend, - subbuf_idx * chan->backend.subbuf_size); + subbuf_idx * chan->backend.subbuf_size, + handle); unsigned long records_lost = 0; header->ctx.timestamp_end = tsc; @@ -338,12 +342,12 @@ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc, } static int client_buffer_create(struct lib_ring_buffer *buf, void *priv, - int cpu, const char *name) + int cpu, const char *name, struct shm_handle *handle) { return 0; } -static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu) +static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu, struct shm_handle *handle) { } @@ -368,41 +372,45 @@ static const struct lib_ring_buffer_config client_config = { }; static -struct shm_handle *_channel_create(const char *name, +struct ltt_channel *_channel_create(const char *name, struct ltt_channel *ltt_chan, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval) { - return channel_create(&client_config, name, ltt_chan, buf_addr, + ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr, subbuf_size, num_subbuf, switch_timer_interval, read_timer_interval); + ltt_chan->chan = shmp(handle, handle->chan); + return ltt_chan; } static -void ltt_channel_destroy(struct shm_handle *handle) +void ltt_channel_destroy(struct ltt_channel *ltt_chan) { - channel_destroy(handle); + channel_destroy(ltt_chan->chan, ltt_chan->handle); } static -struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan) +struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan, + struct shm_handle *handle) { struct lib_ring_buffer *buf; int cpu; for_each_channel_cpu(cpu, chan) { - buf = channel_get_ring_buffer(&client_config, chan, cpu); - if (!lib_ring_buffer_open_read(buf)) + buf = channel_get_ring_buffer(&client_config, chan, cpu, handle); + if (!lib_ring_buffer_open_read(buf, handle)) return buf; } return NULL; } static -void ltt_buffer_read_close(struct lib_ring_buffer *buf) +void ltt_buffer_read_close(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - lib_ring_buffer_release_read(buf); + lib_ring_buffer_release_read(buf, handle); } static diff --git a/libust/ltt-ring-buffer-metadata-client.h b/libust/ltt-ring-buffer-metadata-client.h index be9e4fb6..88716a26 100644 --- a/libust/ltt-ring-buffer-metadata-client.h +++ b/libust/ltt-ring-buffer-metadata-client.h @@ -76,13 +76,15 @@ static size_t client_packet_header_size(void) } static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx) + unsigned int subbuf_idx, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); struct metadata_packet_header *header = (struct metadata_packet_header *) lib_ring_buffer_offset_address(&buf->backend, - subbuf_idx * chan->backend.subbuf_size); + subbuf_idx * chan->backend.subbuf_size, + handle); struct ltt_channel *ltt_chan = channel_get_private(chan); struct ltt_session *session = ltt_chan->session; @@ -101,13 +103,15 @@ static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc, * subbuffer. data_size is between 1 and subbuf_size. */ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc, - unsigned int subbuf_idx, unsigned long data_size) + unsigned int subbuf_idx, unsigned long data_size, + struct shm_handle *handle) { - struct channel *chan = shmp(buf->backend.chan); + struct channel *chan = shmp(handle, buf->backend.chan); struct metadata_packet_header *header = (struct metadata_packet_header *) lib_ring_buffer_offset_address(&buf->backend, - subbuf_idx * chan->backend.subbuf_size); + subbuf_idx * chan->backend.subbuf_size, + handle); unsigned long records_lost = 0; header->content_size = data_size * CHAR_BIT; /* in bits */ @@ -119,12 +123,15 @@ static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc, } static int client_buffer_create(struct lib_ring_buffer *buf, void *priv, - int cpu, const char *name) + int cpu, const char *name, + struct shm_handle *handle) { return 0; } -static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu) +static void client_buffer_finalize(struct lib_ring_buffer *buf, + void *priv, int cpu, + struct shm_handle *handle) { } @@ -149,38 +156,42 @@ static const struct lib_ring_buffer_config client_config = { }; static -struct shm_handle *_channel_create(const char *name, +struct ltt_channel *_channel_create(const char *name, struct ltt_channel *ltt_chan, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval) { - return channel_create(&client_config, name, ltt_chan, buf_addr, + ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr, subbuf_size, num_subbuf, switch_timer_interval, read_timer_interval); + ltt_chan->chan = shmp(handle, handle->chan); + return ltt_chan; } static -void ltt_channel_destroy(struct shm_handle *handle) +void ltt_channel_destroy(struct ltt_channel *ltt_chan) { - channel_destroy(handle); + channel_destroy(ltt_chan->chan, ltt_chan->handle); } static -struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan) +struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan, + struct shm_handle *handle) { struct lib_ring_buffer *buf; - buf = channel_get_ring_buffer(&client_config, chan, 0); - if (!lib_ring_buffer_open_read(buf)) + buf = channel_get_ring_buffer(&client_config, chan, 0, handle); + if (!lib_ring_buffer_open_read(buf, handle)) return buf; return NULL; } static -void ltt_buffer_read_close(struct lib_ring_buffer *buf) +void ltt_buffer_read_close(struct lib_ring_buffer *buf, + struct shm_handle *handle) { - lib_ring_buffer_release_read(buf); + lib_ring_buffer_release_read(buf, handle); } static @@ -203,13 +214,13 @@ void ltt_event_write(struct lib_ring_buffer_ctx *ctx, const void *src, } static -size_t ltt_packet_avail_size(struct channel *chan) +size_t ltt_packet_avail_size(struct channel *chan, struct shm_handle *handle) { unsigned long o_begin; struct lib_ring_buffer *buf; - buf = shmp(chan->backend.buf); /* Only for global buffer ! */ + buf = shmp(handle, chan->backend.buf[0].shmp); /* Only for global buffer ! */ o_begin = v_read(&client_config, &buf->offset); if (subbuf_offset(o_begin, chan) != 0) { return chan->backend.subbuf_size - subbuf_offset(o_begin, chan); -- 2.34.1