.. for multi-process accesses to the same maps, with safe "pointers".
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
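
For reviewers, here is a minimal sketch of the "safe pointer" scheme that the new shmp(handle, ...) accessors imply: a reference is stored as an (object index, offset) pair and resolved through a per-process shm object table, so the same reference is valid in every process that maps the buffers. Structure layouts and names below (objects[], memory_map, shm_deref()) are illustrative assumptions, not the actual contents of shm.h/shm_internal.h.

#include <stddef.h>

/*
 * Illustrative sketch only -- NOT the actual shm.h/shm_internal.h code.
 * A shared-memory "pointer" is an (object index, offset) pair that each
 * process resolves against its own object table, so the same reference
 * works across the tracer and consumer mappings of the shared memory.
 */
struct shm_ref {
	unsigned int index;		/* index into the handle's object table */
	unsigned long offset;		/* offset within that shm object */
};

struct shm_object {			/* assumed layout */
	char *memory_map;		/* base of this process's mapping */
	size_t memory_map_size;
};

struct shm_object_table {		/* assumed layout */
	size_t allocated_len;
	struct shm_object objects[];
};

struct shm_handle {			/* assumed layout */
	struct shm_object_table *table;
};

/* What a shmp(handle, ref)-style dereference boils down to in this sketch. */
static inline void *shm_deref(struct shm_handle *handle, struct shm_ref ref)
{
	struct shm_object *obj;

	if (ref.index >= handle->table->allocated_len)
		return NULL;		/* unknown object: fail safely */
	obj = &handle->table->objects[ref.index];
	if (ref.offset >= obj->memory_map_size)
		return NULL;		/* out-of-bounds offset: fail safely */
	return obj->memory_map + ref.offset;
}
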
};
struct channel;
+struct shm_handle;
struct ltt_channel_ops {
- struct shm_handle *(*channel_create)(const char *name,
+ struct ltt_channel *(*channel_create)(const char *name,
struct ltt_channel *ltt_chan,
void *buf_addr,
size_t subbuf_size, size_t num_subbuf,
unsigned int switch_timer_interval,
unsigned int read_timer_interval);
- void (*channel_destroy)(struct shm_handle *handle);
- struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan);
- void (*buffer_read_close)(struct lib_ring_buffer *buf);
+ void (*channel_destroy)(struct ltt_channel *ltt_chan);
+ struct lib_ring_buffer *(*buffer_read_open)(struct channel *chan,
+ struct shm_handle *handle);
+ void (*buffer_read_close)(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
int (*event_reserve)(struct lib_ring_buffer_ctx *ctx,
uint32_t event_id);
void (*event_commit)(struct lib_ring_buffer_ctx *ctx);
* packet. Note that the size returned is only a hint, since it
* may change due to concurrent writes.
*/
- size_t (*packet_avail_size)(struct channel *chan);
+ size_t (*packet_avail_size)(struct channel *chan,
+ struct shm_handle *handle);
//wait_queue_head_t *(*get_reader_wait_queue)(struct channel *chan);
//wait_queue_head_t *(*get_hp_wait_queue)(struct channel *chan);
int (*is_finalized)(struct channel *chan);
__event_len = __event_get_size__##_name(__dynamic_len, _args); \
__event_align = __event_get_align__##_name(_args); \
lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len, \
- __event_align, -1); \
+ __event_align, -1, __chan->handle); \
__ret = __chan->ops->event_reserve(&ctx, __event->id); \
if (__ret < 0) \
return; \
__event_len = __event_get_size__##_name(__dynamic_len); \
__event_align = __event_get_align__##_name(); \
lib_ring_buffer_ctx_init(&ctx, __chan->chan, __event, __event_len, \
- __event_align, -1); \
+ __event_align, -1, __chan->handle); \
__ret = __chan->ops->event_reserve(&ctx, __event->id); \
if (__ret < 0) \
return; \
struct channel;
struct lib_ring_buffer_config;
struct lib_ring_buffer_ctx;
+struct shm_handle;
/*
* Ring buffer client callbacks. Only used by slow path, never on fast path.
/* Slow path only, at subbuffer switch */
size_t (*subbuffer_header_size) (void);
void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx);
+ unsigned int subbuf_idx,
+ struct shm_handle *handle);
void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx, unsigned long data_size);
+ unsigned int subbuf_idx, unsigned long data_size,
+ struct shm_handle *handle);
/* Optional callbacks (can be set to NULL) */
/* Called at buffer creation/finalize */
int (*buffer_create) (struct lib_ring_buffer *buf, void *priv,
- int cpu, const char *name);
+ int cpu, const char *name,
+ struct shm_handle *handle);
/*
* Clients should guarantee that no new reader handle can be opened
* after finalize.
*/
- void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu);
+ void (*buffer_finalize) (struct lib_ring_buffer *buf,
+ void *priv, int cpu,
+ struct shm_handle *handle);
/*
* Extract header length, payload length and timestamp from event
void (*record_get) (const struct lib_ring_buffer_config *config,
struct channel *chan, struct lib_ring_buffer *buf,
size_t offset, size_t *header_len,
- size_t *payload_len, u64 *timestamp);
+ size_t *payload_len, u64 *timestamp,
+ struct shm_handle *handle);
};
/*
/* input received by lib_ring_buffer_reserve(), saved here. */
struct channel *chan; /* channel */
void *priv; /* client private data */
+ struct shm_handle *handle; /* shared-memory handle */
size_t data_size; /* size of payload */
int largest_align; /*
* alignment of the largest element
void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx,
struct channel *chan, void *priv,
size_t data_size, int largest_align,
- int cpu)
+ int cpu, struct shm_handle *handle)
{
ctx->chan = chan;
ctx->priv = priv;
ctx->largest_align = largest_align;
ctx->cpu = cpu;
ctx->rflags = 0;
+ ctx->handle = handle;
}
/*
smp.h
libringbuffer_la_SOURCES = \
+ shm.c \
smp.c \
ring_buffer_backend.c \
ring_buffer_frontend.c
/* Ring buffer backend access (read/write) */
extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
- size_t offset, void *dest, size_t len);
+ size_t offset, void *dest, size_t len,
+ struct shm_handle *handle);
extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
- size_t offset, void *dest, size_t len);
+ size_t offset, void *dest, size_t len,
+ struct shm_handle *handle);
/*
* Return the address where a given offset is located.
*/
extern void *
lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
- size_t offset);
+ size_t offset,
+ struct shm_handle *handle);
extern void *
lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
- size_t offset);
+ size_t offset,
+ struct shm_handle *handle);
/**
* lib_ring_buffer_write - write data to a buffer backend
{
struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
struct channel_backend *chanb = &ctx->chan->backend;
+ struct shm_handle *handle = ctx->handle;
size_t sbidx;
size_t offset = ctx->buf_offset;
- struct lib_ring_buffer_backend_pages *rpages;
+ struct lib_ring_buffer_backend_pages_shmp *rpages;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
- id = shmp(bufb->buf_wsb)[sbidx].id;
+ id = shmp(handle, bufb->buf_wsb)[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
- rpages = shmp(bufb->array)[sb_bindex];
+ rpages = &shmp(handle, bufb->array)[sb_bindex];
CHAN_WARN_ON(ctx->chan,
config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
*/
CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
lib_ring_buffer_do_copy(config,
- shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)),
+ shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)),
src, len);
ctx->buf_offset += len;
}
static inline
unsigned long lib_ring_buffer_get_records_unread(
const struct lib_ring_buffer_config *config,
- struct lib_ring_buffer *buf)
+ struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
struct lib_ring_buffer_backend *bufb = &buf->backend;
- struct lib_ring_buffer_backend_pages *pages;
+ struct lib_ring_buffer_backend_pages_shmp *pages;
unsigned long records_unread = 0, sb_bindex, id;
unsigned int i;
- for (i = 0; i < shmp(bufb->chan)->backend.num_subbuf; i++) {
- id = shmp(bufb->buf_wsb)[i].id;
+ for (i = 0; i < shmp(handle, bufb->chan)->backend.num_subbuf; i++) {
+ id = shmp(handle, bufb->buf_wsb)[i].id;
sb_bindex = subbuffer_id_get_index(config, id);
- pages = shmp(bufb->array)[sb_bindex];
- records_unread += v_read(config, &pages->records_unread);
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread);
}
if (config->mode == RING_BUFFER_OVERWRITE) {
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
- pages = shmp(bufb->array)[sb_bindex];
- records_unread += v_read(config, &pages->records_unread);
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ records_unread += v_read(config, &shmp(handle, pages->shmp)->records_unread);
}
return records_unread;
}
int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
struct channel_backend *chan, int cpu,
- struct shm_header *shm_header);
+ struct shm_handle *handle,
+ struct shm_object *shmobj);
void channel_backend_unregister_notifiers(struct channel_backend *chanb);
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb);
int channel_backend_init(struct channel_backend *chanb,
const char *name,
const struct lib_ring_buffer_config *config,
void *priv, size_t subbuf_size,
- size_t num_subbuf, struct shm_header *shm_header);
-void channel_backend_free(struct channel_backend *chanb);
+ size_t num_subbuf, struct shm_handle *handle);
+void channel_backend_free(struct channel_backend *chanb,
+ struct shm_handle *handle);
-void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb);
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb,
+ struct shm_handle *handle);
void channel_backend_reset(struct channel_backend *chanb);
int lib_ring_buffer_backend_init(void);
static inline
void subbuffer_count_record(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx)
+ unsigned long idx, struct shm_handle *handle)
{
unsigned long sb_bindex;
- sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
- v_inc(config, &shmp(bufb->array)[sb_bindex]->records_commit);
+ sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+ v_inc(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit);
}
/*
*/
static inline
void subbuffer_consume_record(const struct lib_ring_buffer_config *config,
- struct lib_ring_buffer_backend *bufb)
+ struct lib_ring_buffer_backend *bufb,
+ struct shm_handle *handle)
{
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
- CHAN_WARN_ON(bufb->chan,
- !v_read(config, &shmp(bufb->array)[sb_bindex]->records_unread));
+ CHAN_WARN_ON(shmp(handle, bufb->chan),
+ !v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread));
/* Non-atomic decrement protected by exclusive subbuffer access */
- _v_dec(config, &shmp(bufb->array)[sb_bindex]->records_unread);
+ _v_dec(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_unread);
v_inc(config, &bufb->records_read);
}
unsigned long subbuffer_get_records_count(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
unsigned long sb_bindex;
- sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
- return v_read(config, &shmp(bufb->array)[sb_bindex]->records_commit);
+ sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+ return v_read(config, &shmp(handle, (shmp(handle, bufb->array)[sb_bindex]).shmp)->records_commit);
}
/*
unsigned long subbuffer_count_records_overrun(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
- struct lib_ring_buffer_backend_pages *pages;
+ struct lib_ring_buffer_backend_pages_shmp *pages;
unsigned long overruns, sb_bindex;
- sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
- pages = shmp(bufb->array)[sb_bindex];
- overruns = v_read(config, &pages->records_unread);
- v_set(config, &pages->records_unread,
- v_read(config, &pages->records_commit));
- v_set(config, &pages->records_commit, 0);
+ sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ overruns = v_read(config, &shmp(handle, pages->shmp)->records_unread);
+ v_set(config, &shmp(handle, pages->shmp)->records_unread,
+ v_read(config, &shmp(handle, pages->shmp)->records_commit));
+ v_set(config, &shmp(handle, pages->shmp)->records_commit, 0);
return overruns;
}
void subbuffer_set_data_size(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
unsigned long idx,
- unsigned long data_size)
+ unsigned long data_size,
+ struct shm_handle *handle)
{
- struct lib_ring_buffer_backend_pages *pages;
+ struct lib_ring_buffer_backend_pages_shmp *pages;
unsigned long sb_bindex;
- sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
- pages = shmp(bufb->array)[sb_bindex];
- pages->data_size = data_size;
+ sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ shmp(handle, pages->shmp)->data_size = data_size;
}
static inline
unsigned long subbuffer_get_read_data_size(
const struct lib_ring_buffer_config *config,
- struct lib_ring_buffer_backend *bufb)
+ struct lib_ring_buffer_backend *bufb,
+ struct shm_handle *handle)
{
- struct lib_ring_buffer_backend_pages *pages;
+ struct lib_ring_buffer_backend_pages_shmp *pages;
unsigned long sb_bindex;
sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
- pages = shmp(bufb->array)[sb_bindex];
- return pages->data_size;
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ return shmp(handle, pages->shmp)->data_size;
}
static inline
unsigned long subbuffer_get_data_size(
const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
- struct lib_ring_buffer_backend_pages *pages;
+ struct lib_ring_buffer_backend_pages_shmp *pages;
unsigned long sb_bindex;
- sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id);
- pages = shmp(bufb->array)[sb_bindex];
- return pages->data_size;
+ sb_bindex = subbuffer_id_get_index(config, shmp(handle, bufb->buf_wsb)[idx].id);
+ pages = &shmp(handle, bufb->array)[sb_bindex];
+ return shmp(handle, pages->shmp)->data_size;
}
/**
static inline
void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
unsigned long id, new_id;
* Performing a volatile access to read the sb_pages, because we want to
* read a coherent version of the pointer and the associated noref flag.
*/
- id = CMM_ACCESS_ONCE(shmp(bufb->buf_wsb)[idx].id);
+ id = CMM_ACCESS_ONCE(shmp(handle, bufb->buf_wsb)[idx].id);
for (;;) {
/* This check is called on the fast path for each record. */
if (likely(!subbuffer_id_is_noref(config, id))) {
}
new_id = id;
subbuffer_id_clear_noref(config, &new_id);
- new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[idx].id, id, new_id);
+ new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[idx].id, id, new_id);
if (likely(new_id == id))
break;
id = new_id;
static inline
void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer_backend *bufb,
- unsigned long idx, unsigned long offset)
+ unsigned long idx, unsigned long offset,
+ struct shm_handle *handle)
{
if (config->mode != RING_BUFFER_OVERWRITE)
return;
* subbuffer_set_noref() uses a volatile store to deal with concurrent
* readers of the noref flag.
*/
- CHAN_WARN_ON(bufb->chan,
- subbuffer_id_is_noref(config, shmp(bufb->buf_wsb)[idx].id));
+ CHAN_WARN_ON(shmp(handle, bufb->chan),
+ subbuffer_id_is_noref(config, shmp(handle, bufb->buf_wsb)[idx].id));
/*
* Memory barrier that ensures counter stores are ordered before set
* noref and offset.
*/
cmm_smp_mb();
- subbuffer_id_set_noref_offset(config, &shmp(bufb->buf_wsb)[idx].id, offset);
+ subbuffer_id_set_noref_offset(config, &shmp(handle, bufb->buf_wsb)[idx].id, offset);
}
/**
struct lib_ring_buffer_backend *bufb,
struct channel_backend *chanb,
unsigned long consumed_idx,
- unsigned long consumed_count)
+ unsigned long consumed_count,
+ struct shm_handle *handle)
{
unsigned long old_id, new_id;
* old_wpage, because the value read will be confirmed by the
* following cmpxchg().
*/
- old_id = shmp(bufb->buf_wsb)[consumed_idx].id;
+ old_id = shmp(handle, bufb->buf_wsb)[consumed_idx].id;
if (unlikely(!subbuffer_id_is_noref(config, old_id)))
return -EAGAIN;
/*
if (unlikely(!subbuffer_id_compare_offset(config, old_id,
consumed_count)))
return -EAGAIN;
- CHAN_WARN_ON(bufb->chan,
+ CHAN_WARN_ON(shmp(handle, bufb->chan),
!subbuffer_id_is_noref(config, bufb->buf_rsb.id));
subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
consumed_count);
- new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[consumed_idx].id, old_id,
+ new_id = uatomic_cmpxchg(&shmp(handle, bufb->buf_wsb)[consumed_idx].id, old_id,
bufb->buf_rsb.id);
if (unlikely(old_id != new_id))
return -EAGAIN;
bufb->buf_rsb.id = new_id;
} else {
/* No page exchange, use the writer page directly */
- bufb->buf_rsb.id = shmp(bufb->buf_wsb)[consumed_idx].id;
+ bufb->buf_rsb.id = shmp(handle, bufb->buf_wsb)[consumed_idx].id;
}
return 0;
}
* Dual LGPL v2.1/GPL v2 license.
*/
-#include "shm.h"
+#include "shm_internal.h"
struct lib_ring_buffer_backend_pages {
unsigned long mmap_offset; /* offset of the subbuffer in mmap */
struct channel;
struct lib_ring_buffer;
+struct lib_ring_buffer_backend_pages_shmp {
+ DECLARE_SHMP(struct lib_ring_buffer_backend_pages, shmp);
+};
+
struct lib_ring_buffer_backend {
/* Array of ring_buffer_backend_subbuffer for writer */
DECLARE_SHMP(struct lib_ring_buffer_backend_subbuffer, buf_wsb);
* Pointer array of backend pages, for whole buffer.
* Indexed by ring_buffer_backend_subbuffer identifier (id) index.
*/
- DECLARE_SHMP(struct lib_ring_buffer_backend_pages *, array);
+ DECLARE_SHMP(struct lib_ring_buffer_backend_pages_shmp, array);
DECLARE_SHMP(char, memory_map); /* memory mapping */
DECLARE_SHMP(struct channel, chan); /* Associated channel */
unsigned int allocated:1; /* Bool: is buffer allocated ? */
};
+struct lib_ring_buffer_shmp {
+ DECLARE_SHMP(struct lib_ring_buffer, shmp); /* Channel per-cpu buffers */
+};
+
struct channel_backend {
unsigned long buf_size; /* Size of the buffer */
unsigned long subbuf_size; /* Sub-buffer size */
*/
unsigned int buf_size_order; /* Order of buffer size */
int extra_reader_sb:1; /* Bool: has extra reader subbuffer */
- DECLARE_SHMP(struct lib_ring_buffer, buf); /* Channel per-cpu buffers */
unsigned long num_subbuf; /* Number of sub-buffers for writer */
u64 start_tsc; /* Channel creation TSC value */
void *priv; /* Client-specific information */
const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
char name[NAME_MAX]; /* Channel name */
+ struct lib_ring_buffer_shmp buf[];
};
#endif /* _LINUX_RING_BUFFER_BACKEND_TYPES_H */
* channel.
*/
extern
-void *channel_destroy(struct shm_handle *handle);
+void *channel_destroy(struct channel *chan, struct shm_handle *handle);
/* Buffer read operations */
extern struct lib_ring_buffer *channel_get_ring_buffer(
const struct lib_ring_buffer_config *config,
- struct channel *chan, int cpu);
-extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf);
-extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf);
+ struct channel *chan, int cpu,
+ struct shm_handle *handle);
+extern int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
+extern void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
/*
* Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
*/
extern int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
unsigned long *consumed,
- unsigned long *produced);
+ unsigned long *produced,
+ struct shm_handle *handle);
extern void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
- unsigned long consumed_new);
+ unsigned long consumed_new,
+ struct shm_handle *handle);
extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
- unsigned long consumed);
-extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
+ unsigned long consumed,
+ struct shm_handle *handle);
+extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
/*
* lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
* to read sub-buffers sequentially.
*/
-static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf)
+static inline int lib_ring_buffer_get_next_subbuf(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
int ret;
ret = lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
- &buf->prod_snapshot);
+ &buf->prod_snapshot, handle);
if (ret)
return ret;
- ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot);
+ ret = lib_ring_buffer_get_subbuf(buf, buf->cons_snapshot, handle);
return ret;
}
-static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf)
+static inline
+void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- lib_ring_buffer_put_subbuf(buf);
+ lib_ring_buffer_put_subbuf(buf, handle);
lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot,
- shmp(buf->backend.chan)));
+ shmp(handle, buf->backend.chan)), handle);
}
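
As a usage note for the handle-threaded read API above, a minimal consumer-side sketch could look as follows; chan, cpu and handle are assumed to come from elsewhere (e.g. channel_create()), and error handling plus the actual data consumption are reduced to placeholders.

/*
 * Sketch: sequential consumer-side read loop with the per-process shm
 * handle threaded through every call.
 */
static void drain_buffer_sketch(const struct lib_ring_buffer_config *config,
				struct channel *chan, int cpu,
				struct shm_handle *handle)
{
	struct lib_ring_buffer *buf;
	unsigned long len;

	buf = channel_get_ring_buffer(config, chan, cpu, handle);
	if (!buf || lib_ring_buffer_open_read(buf, handle))
		return;

	while (!lib_ring_buffer_get_next_subbuf(buf, handle)) {
		len = lib_ring_buffer_get_read_data_size(config, buf, handle);
		/*
		 * Consume 'len' bytes of the current read-side sub-buffer
		 * here, e.g. by copying them out with lib_ring_buffer_read().
		 */
		(void) len;
		lib_ring_buffer_put_next_subbuf(buf, handle);
	}
	lib_ring_buffer_release_read(buf, handle);
}
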
extern void channel_reset(struct channel *chan);
-extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf);
+extern void lib_ring_buffer_reset(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
static inline
unsigned long lib_ring_buffer_get_offset(const struct lib_ring_buffer_config *config,
static inline
unsigned long lib_ring_buffer_get_read_data_size(
const struct lib_ring_buffer_config *config,
- struct lib_ring_buffer *buf)
+ struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- return subbuffer_get_read_data_size(config, &buf->backend);
+ return subbuffer_get_read_data_size(config, &buf->backend, handle);
}
static inline
struct lib_ring_buffer_ctx *ctx)
{
struct channel *chan = ctx->chan;
+ struct shm_handle *handle = ctx->handle;
struct lib_ring_buffer *buf;
unsigned long o_begin, o_end, o_old;
size_t before_hdr_pad = 0;
return -EAGAIN;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- buf = &shmp(chan->backend.buf)[ctx->cpu];
+ buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
else
- buf = shmp(chan->backend.buf);
+ buf = shmp(handle, chan->backend.buf[0].shmp);
if (uatomic_read(&buf->record_disabled))
return -EAGAIN;
ctx->buf = buf;
* Clear noref flag for this subbuffer.
*/
lib_ring_buffer_clear_noref(config, &ctx->buf->backend,
- subbuf_index(o_end - 1, chan));
+ subbuf_index(o_end - 1, chan), handle);
ctx->pre_offset = o_begin;
ctx->buf_offset = o_begin + before_hdr_pad;
*/
static inline
void lib_ring_buffer_switch(const struct lib_ring_buffer_config *config,
- struct lib_ring_buffer *buf, enum switch_mode mode)
+ struct lib_ring_buffer *buf, enum switch_mode mode,
+ struct shm_handle *handle)
{
- lib_ring_buffer_switch_slow(buf, mode);
+ lib_ring_buffer_switch_slow(buf, mode, handle);
}
/* See ring_buffer_frontend_api.h for lib_ring_buffer_reserve(). */
const struct lib_ring_buffer_ctx *ctx)
{
struct channel *chan = ctx->chan;
+ struct shm_handle *handle = ctx->handle;
struct lib_ring_buffer *buf = ctx->buf;
unsigned long offset_end = ctx->buf_offset;
unsigned long endidx = subbuf_index(offset_end - 1, chan);
/*
* Must count record before incrementing the commit count.
*/
- subbuffer_count_record(config, &buf->backend, endidx);
+ subbuffer_count_record(config, &buf->backend, endidx, handle);
/*
* Order all writes to buffer before the commit count update that will
*/
cmm_smp_wmb();
- v_add(config, ctx->slot_size, &shmp(buf->commit_hot)[endidx].cc);
+ v_add(config, ctx->slot_size, &shmp(handle, buf->commit_hot)[endidx].cc);
/*
* commit count read can race with concurrent OOO commit count updates.
* count reaches back the reserve offset for a specific sub-buffer,
* which is completely independent of the order.
*/
- commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
- commit_count, endidx);
+ commit_count, endidx, handle);
/*
* Update used size at each commit. It's needed only for extracting
* ring_buffer buffers from vmcore, after crash.
*/
lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
ctx->buf_offset, commit_count,
- ctx->slot_size);
+ ctx->slot_size, handle);
}
/**
extern
void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf,
- enum switch_mode mode);
+ enum switch_mode mode,
+ struct shm_handle *handle);
/* Buffer write helpers */
void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
unsigned long commit_count,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
if (config->oops == RING_BUFFER_OOPS_CONSISTENCY)
- v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count);
+ v_set(config, &shmp(handle, buf->commit_hot)[idx].seq, commit_count);
}
static inline
int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
- struct channel *chan)
+ struct channel *chan,
+ struct shm_handle *handle)
{
unsigned long consumed_old, consumed_idx, commit_count, write_offset;
consumed_old = uatomic_read(&buf->consumed);
consumed_idx = subbuf_index(consumed_old, chan);
- commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
+ commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb);
/*
* No memory barrier here, since we are only interested
* in a statistically correct polling result. The next poll will
static inline
unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
- return subbuffer_get_data_size(config, &buf->backend, idx);
+ return subbuffer_get_data_size(config, &buf->backend, idx, handle);
}
/*
static inline
int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf,
- struct channel *chan)
+ struct channel *chan,
+ struct shm_handle *handle)
{
unsigned long offset, idx, commit_count;
do {
offset = v_read(config, &buf->offset);
idx = subbuf_index(offset, chan);
- commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[idx].cc);
} while (offset != v_read(config, &buf->offset));
return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
struct channel *chan,
unsigned long offset,
unsigned long commit_count,
- unsigned long idx)
+ unsigned long idx,
+ struct shm_handle *handle)
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
* The subbuffer size is at least 2 bytes (minimum size: 1 page).
* This guarantees that old_commit_count + 1 != commit_count.
*/
- if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb,
+ if (likely(v_cmpxchg(config, &shmp(handle, buf->commit_cold)[idx].cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
/*
tsc = config->cb.ring_buffer_clock_read(chan);
v_add(config,
subbuffer_get_records_count(config,
- &buf->backend, idx),
+ &buf->backend,
+ idx, handle),
&buf->records_count);
v_add(config,
subbuffer_count_records_overrun(config,
&buf->backend,
- idx),
+ idx, handle),
&buf->records_overrun);
config->cb.buffer_end(buf, tsc, idx,
lib_ring_buffer_get_data_size(config,
buf,
- idx));
+ idx,
+ handle),
+ handle);
/*
* Set noref flag and offset for this subbuffer id.
* are ordered before set noref and offset.
*/
lib_ring_buffer_set_noref_offset(config, &buf->backend, idx,
- buf_trunc_val(offset, chan));
+ buf_trunc_val(offset, chan), handle);
/*
* Order set_noref and record counter updates before the
*/
cmm_smp_mb();
/* End of exclusive subbuffer access */
- v_set(config, &shmp(buf->commit_cold)[idx].cc_sb,
+ v_set(config, &shmp(handle, buf->commit_cold)[idx].cc_sb,
commit_count);
lib_ring_buffer_vmcore_check_deliver(config, buf,
- commit_count, idx);
+ commit_count, idx, handle);
/*
* RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
*/
if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
&& uatomic_read(&buf->active_readers)
- && lib_ring_buffer_poll_deliver(config, buf, chan)) {
+ && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
//wake_up_interruptible(&buf->read_wait);
//wake_up_interruptible(&chan->read_wait);
}
unsigned long idx,
unsigned long buf_offset,
unsigned long commit_count,
- size_t slot_size)
+ size_t slot_size,
+ struct shm_handle *handle)
{
unsigned long offset, commit_seq_old;
if (unlikely(subbuf_offset(offset - commit_count, chan)))
return;
- commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq);
+ commit_seq_old = v_read(config, &shmp(handle, buf->commit_hot)[idx].seq);
while ((long) (commit_seq_old - commit_count) < 0)
- commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq,
+ commit_seq_old = v_cmpxchg(config, &shmp(handle, buf->commit_hot)[idx].seq,
commit_seq_old, commit_count);
}
extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
struct channel_backend *chanb, int cpu,
- struct shm_header *shm_header);
-extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
+ struct shm_handle *handle,
+ struct shm_object *shmobj);
+extern void lib_ring_buffer_free(struct lib_ring_buffer *buf,
+ struct shm_handle *handle);
/* Keep track of trap nesting inside ring buffer code */
extern __thread unsigned int lib_ring_buffer_nesting;
#include <ust/usterr-signal-safe.h>
#include <ust/ringbuffer-config.h>
#include "backend_types.h"
-#include "shm.h"
+#include "shm_internal.h"
/*
* A switch is done during tracing or as a final flush after tracing (so it
unsigned long read_timer_interval; /* Reader wakeup (jiffies) */
//wait_queue_head_t read_wait; /* reader wait queue */
int finalized; /* Has channel been finalized */
- DECLARE_SHMP(struct shm_header, shm_header);
} ____cacheline_aligned;
/* Per-subbuffer commit counters used on the hot path */
arg);
}
case RING_BUFFER_FLUSH:
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
return 0;
default:
return -ENOIOCTLCMD;
return put_ulong(read_offset, arg);
}
case RING_BUFFER_FLUSH:
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
return 0;
default:
return -ENOIOCTLCMD;
struct lib_ring_buffer_backend *bufb,
size_t size, size_t num_subbuf,
int extra_reader_sb,
- struct shm_header *shm_header)
+ struct shm_handle *handle,
+ struct shm_object *shmobj)
{
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
unsigned long subbuf_size, mmap_offset = 0;
unsigned long num_subbuf_alloc;
unsigned long i;
if (extra_reader_sb)
num_subbuf_alloc++;
- /* Align the entire buffer backend data on PAGE_SIZE */
- align_shm(shm_header, PAGE_SIZE);
- set_shmp(bufb->array, zalloc_shm(shm_header,
- sizeof(*bufb->array) * num_subbuf_alloc));
- if (unlikely(!shmp(bufb->array)))
+ align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
+ set_shmp(bufb->array, zalloc_shm(shmobj,
+ sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc));
+ if (unlikely(!shmp(handle, bufb->array)))
goto array_error;
/*
* This is the largest element (the buffer pages) which needs to
* be aligned on PAGE_SIZE.
*/
- align_shm(shm_header, PAGE_SIZE);
- set_shmp(bufb->memory_map, zalloc_shm(shm_header,
+ align_shm(shmobj, PAGE_SIZE);
+ set_shmp(bufb->memory_map, zalloc_shm(shmobj,
subbuf_size * num_subbuf_alloc));
- if (unlikely(!shmp(bufb->memory_map)))
+ if (unlikely(!shmp(handle, bufb->memory_map)))
goto memory_map_error;
/* Allocate backend pages array elements */
for (i = 0; i < num_subbuf_alloc; i++) {
- align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_pages));
- set_shmp(bufb->array[i],
- zalloc_shm(shm_header,
+ align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages));
+ set_shmp(shmp(handle, bufb->array)[i].shmp,
+ zalloc_shm(shmobj,
sizeof(struct lib_ring_buffer_backend_pages)));
- if (!shmp(bufb->array[i]))
+ if (!shmp(handle, shmp(handle, bufb->array)[i].shmp))
goto free_array;
}
/* Allocate write-side subbuffer table */
- align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_subbuffer));
- bufb->buf_wsb = zalloc_shm(shm_header,
+ align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_subbuffer));
+ set_shmp(bufb->buf_wsb, zalloc_shm(shmobj,
sizeof(struct lib_ring_buffer_backend_subbuffer)
- * num_subbuf);
- if (unlikely(!shmp(bufb->buf_wsb)))
+ * num_subbuf));
+ if (unlikely(!shmp(handle, bufb->buf_wsb)))
goto free_array;
for (i = 0; i < num_subbuf; i++)
- shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
+ shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
/* Assign read-side subbuffer table */
if (extra_reader_sb)
/* Assign pages to page index */
for (i = 0; i < num_subbuf_alloc; i++) {
- set_shmp(shmp(bufb->array)[i]->p,
- &shmp(bufb->memory_map)[i * subbuf_size]);
+ struct shm_ref ref;
+
+ ref.index = bufb->memory_map._ref.index;
+ ref.offset = bufb->memory_map._ref.offset;
+ ref.offset += i * subbuf_size;
+
+ set_shmp(shmp(handle, shmp(handle, bufb->array)[i].shmp)->p,
+ ref);
if (config->output == RING_BUFFER_MMAP) {
- shmp(bufb->array)[i]->mmap_offset = mmap_offset;
+ shmp(handle, shmp(handle, bufb->array)[i].shmp)->mmap_offset = mmap_offset;
mmap_offset += subbuf_size;
}
}
- /*
- * Align the end of each buffer backend data on PAGE_SIZE, to
- * behave like an array which contains elements that need to be
- * aligned on PAGE_SIZE.
- */
- align_shm(shm_header, PAGE_SIZE);
return 0;
int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
struct channel_backend *chanb, int cpu,
- struct shm_header *shm_header)
+ struct shm_handle *handle,
+ struct shm_object *shmobj)
{
const struct lib_ring_buffer_config *config = chanb->config;
- set_shmp(&bufb->chan, caa_container_of(chanb, struct channel, backend));
+ set_shmp(bufb->chan, handle->chan._ref);
bufb->cpu = cpu;
return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
chanb->num_subbuf,
chanb->extra_reader_sb,
- shm_header);
+ handle, shmobj);
}
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
bufb->allocated = 0;
}
-void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb,
+ struct shm_handle *handle)
{
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
const struct lib_ring_buffer_config *config = chanb->config;
unsigned long num_subbuf_alloc;
unsigned int i;
num_subbuf_alloc++;
for (i = 0; i < chanb->num_subbuf; i++)
- shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
+ shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
if (chanb->extra_reader_sb)
bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
num_subbuf_alloc - 1);
for (i = 0; i < num_subbuf_alloc; i++) {
/* Don't reset mmap_offset */
- v_set(config, &shmp(bufb->array)[i]->records_commit, 0);
- v_set(config, &shmp(bufb->array)[i]->records_unread, 0);
- shmp(bufb->array)[i]->data_size = 0;
+ v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_commit, 0);
+ v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_unread, 0);
+ shmp(handle, shmp(handle, bufb->array)[i].shmp)->data_size = 0;
/* Don't reset backend page and virt addresses */
}
/* Don't reset num_pages_per_subbuf, cpu, allocated */
* @parent: dentry of parent directory, %NULL for root directory
* @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
* @num_subbuf: number of sub-buffers (power of 2)
- * @shm_header: shared memory header
+ * @handle: shared memory handle
*
* Returns channel pointer if successful, %NULL otherwise.
*
const char *name,
const struct lib_ring_buffer_config *config,
void *priv, size_t subbuf_size, size_t num_subbuf,
- struct shm_header *shm_header)
+ struct shm_handle *handle)
{
struct channel *chan = caa_container_of(chanb, struct channel, backend);
unsigned int i;
int ret;
+ size_t shmsize = 0, num_subbuf_alloc;
if (!name)
return -EPERM;
chanb->name[NAME_MAX - 1] = '\0';
chanb->config = config;
+ /* Per-cpu buffer size: control (prior to backend) */
+ shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
+ shmsize += sizeof(struct lib_ring_buffer);
+
+ /* Per-cpu buffer size: backend */
+ /* num_subbuf + 1 is the worst case */
+ num_subbuf_alloc = num_subbuf + 1;
+ shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
+ shmsize += sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc;
+ shmsize += offset_align(shmsize, PAGE_SIZE);
+ shmsize += subbuf_size * num_subbuf_alloc;
+ shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages));
+ shmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
+ shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
+ shmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
+ /* Per-cpu buffer size: control (after backend) */
+ shmsize += offset_align(shmsize, __alignof__(struct commit_counters_hot));
+ shmsize += sizeof(struct commit_counters_hot) * num_subbuf;
+ shmsize += offset_align(shmsize, __alignof__(struct commit_counters_cold));
+ shmsize += sizeof(struct commit_counters_cold) * num_subbuf;
+
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
struct lib_ring_buffer *buf;
- size_t alloc_size;
-
- /* Allocating the buffer per-cpu structures */
- align_shm(shm_header, __alignof__(struct lib_ring_buffer));
- alloc_size = sizeof(struct lib_ring_buffer);
- buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus());
- if (!buf)
- goto end;
- set_shmp(chanb->buf, buf);
-
/*
* We need to allocate for all possible cpus.
*/
for_each_possible_cpu(i) {
- ret = lib_ring_buffer_create(&shmp(chanb->buf)[i],
- chanb, i, shm_header);
+ struct shm_object *shmobj;
+
+ shmobj = shm_object_table_append(handle->table, shmsize);
+ align_shm(shmobj, __alignof__(struct lib_ring_buffer));
+ set_shmp(chanb->buf[i].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
+ buf = shmp(handle, chanb->buf[i].shmp);
+ if (!buf)
+ goto end;
+ ret = lib_ring_buffer_create(buf, chanb, i,
+ handle, shmobj);
if (ret)
goto free_bufs; /* cpu hotplug locked */
}
} else {
+ struct shm_object *shmobj;
struct lib_ring_buffer *buf;
- size_t alloc_size;
- align_shm(shm_header, __alignof__(struct lib_ring_buffer));
- alloc_size = sizeof(struct lib_ring_buffer);
- buf = zalloc_shm(shm_header, alloc_size);
+ shmobj = shm_object_table_append(handle->table, shmsize);
+ align_shm(shmobj, __alignof__(struct lib_ring_buffer));
+ set_shmp(chanb->buf[0].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
+ buf = shmp(handle, chanb->buf[0].shmp);
if (!buf)
goto end;
- set_shmp(chanb->buf, buf);
- ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1,
- shm_header);
+ ret = lib_ring_buffer_create(buf, chanb, -1,
+ handle, shmobj);
if (ret)
goto free_bufs;
}
free_bufs:
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
for_each_possible_cpu(i) {
- struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];
+ struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);
if (!buf->backend.allocated)
continue;
- lib_ring_buffer_free(buf);
+ lib_ring_buffer_free(buf, handle);
}
}
/* We only free the buffer data upon shm teardown */
*
* Destroy all channel buffers and frees the channel.
*/
-void channel_backend_free(struct channel_backend *chanb)
+void channel_backend_free(struct channel_backend *chanb,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chanb->config;
unsigned int i;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
for_each_possible_cpu(i) {
- struct lib_ring_buffer *buf = &shmp(chanb->buf)[i];
+ struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);
if (!buf->backend.allocated)
continue;
- lib_ring_buffer_free(buf);
+ lib_ring_buffer_free(buf, handle);
}
} else {
- struct lib_ring_buffer *buf = shmp(chanb->buf);
+ struct lib_ring_buffer *buf = shmp(handle, chanb->buf[0].shmp);
CHAN_WARN_ON(chanb, !buf->backend.allocated);
- lib_ring_buffer_free(buf);
+ lib_ring_buffer_free(buf, handle);
}
/* We only free the buffer data upon shm teardown */
}
* Returns the length copied.
*/
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
- void *dest, size_t len)
+ void *dest, size_t len, struct shm_handle *handle)
{
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
const struct lib_ring_buffer_config *config = chanb->config;
ssize_t orig_len;
- struct lib_ring_buffer_backend_pages *rpages;
+ struct lib_ring_buffer_backend_pages_shmp *rpages;
unsigned long sb_bindex, id;
orig_len = len;
return 0;
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
- rpages = shmp(bufb->array)[sb_bindex];
+ rpages = &shmp(handle, bufb->array)[sb_bindex];
/*
* Underlying layer should never ask for reads across
* subbuffers.
CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
- memcpy(dest, shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), len);
+ memcpy(dest, shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)), len);
return orig_len;
}
* Should be protected by get_subbuf/put_subbuf.
*/
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
- void *dest, size_t len)
+ void *dest, size_t len, struct shm_handle *handle)
{
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
const struct lib_ring_buffer_config *config = chanb->config;
ssize_t string_len, orig_offset;
char *str;
- struct lib_ring_buffer_backend_pages *rpages;
+ struct lib_ring_buffer_backend_pages_shmp *rpages;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
orig_offset = offset;
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
- rpages = shmp(bufb->array)[sb_bindex];
+ rpages = &shmp(handle, bufb->array)[sb_bindex];
/*
* Underlying layer should never ask for reads across
* subbuffers.
CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
- str = (char *)shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+ str = (char *)shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
string_len = strnlen(str, len);
if (dest && len) {
memcpy(dest, str, string_len);
* as long as the write is never bigger than a page size.
*/
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
- size_t offset)
+ size_t offset,
+ struct shm_handle *handle)
{
- struct lib_ring_buffer_backend_pages *rpages;
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct lib_ring_buffer_backend_pages_shmp *rpages;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
const struct lib_ring_buffer_config *config = chanb->config;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
- rpages = shmp(bufb->array)[sb_bindex];
+ rpages = &shmp(handle, bufb->array)[sb_bindex];
CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
- return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+ return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
}
/**
* address, as long as the write is never bigger than a page size.
*/
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
- size_t offset)
+ size_t offset,
+ struct shm_handle *handle)
{
size_t sbidx;
- struct lib_ring_buffer_backend_pages *rpages;
- struct channel_backend *chanb = &shmp(bufb->chan)->backend;
+ struct lib_ring_buffer_backend_pages_shmp *rpages;
+ struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
const struct lib_ring_buffer_config *config = chanb->config;
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
- id = shmp(bufb->buf_wsb)[sbidx].id;
+ id = shmp(handle, bufb->buf_wsb)[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
- rpages = shmp(bufb->array)[sb_bindex];
+ rpages = &shmp(handle, bufb->array)[sb_bindex];
CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, id));
- return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1));
+ return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
}
static
void lib_ring_buffer_print_errors(struct channel *chan,
- struct lib_ring_buffer *buf, int cpu);
+ struct lib_ring_buffer *buf, int cpu,
+ struct shm_handle *handle);
/*
* Must be called under cpu hotplug protection.
*/
-void lib_ring_buffer_free(struct lib_ring_buffer *buf)
+void lib_ring_buffer_free(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
- lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
+ lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle);
/* buf->commit_hot will be freed by shm teardown */
/* buf->commit_cold will be freed by shm teardown */
* should not be using the iterator concurrently with reset. The previous
* current iterator record is reset.
*/
-void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
+void lib_ring_buffer_reset(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned int i;
*/
v_set(config, &buf->offset, 0);
for (i = 0; i < chan->backend.num_subbuf; i++) {
- v_set(config, &shmp(buf->commit_hot)[i].cc, 0);
- v_set(config, &shmp(buf->commit_hot)[i].seq, 0);
- v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0);
+ v_set(config, &shmp(handle, buf->commit_hot)[i].cc, 0);
+ v_set(config, &shmp(handle, buf->commit_hot)[i].seq, 0);
+ v_set(config, &shmp(handle, buf->commit_cold)[i].cc_sb, 0);
}
uatomic_set(&buf->consumed, 0);
uatomic_set(&buf->record_disabled, 0);
v_set(config, &buf->last_tsc, 0);
- lib_ring_buffer_backend_reset(&buf->backend);
+ lib_ring_buffer_backend_reset(&buf->backend, handle);
/* Don't reset number of active readers */
v_set(config, &buf->records_lost_full, 0);
v_set(config, &buf->records_lost_wrap, 0);
*/
int lib_ring_buffer_create(struct lib_ring_buffer *buf,
struct channel_backend *chanb, int cpu,
- struct shm_header *shm_header)
+ struct shm_handle *handle,
+ struct shm_object *shmobj)
{
const struct lib_ring_buffer_config *config = chanb->config;
struct channel *chan = caa_container_of(chanb, struct channel, backend);
return 0;
ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
- cpu, shm_header);
+ cpu, handle, shmobj);
if (ret)
return ret;
- align_shm(shm_header,
- max(__alignof__(struct commit_counters_hot),
- __alignof__(struct commit_counters_cold)));
- set_shmp(&buf->commit_hot,
- zalloc_shm(shm_header,
- sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
- if (!shmp(buf->commit_hot)) {
+ align_shm(shmobj, __alignof__(struct commit_counters_hot));
+ set_shmp(buf->commit_hot,
+ zalloc_shm(shmobj,
+ sizeof(struct commit_counters_hot) * chan->backend.num_subbuf));
+ if (!shmp(handle, buf->commit_hot)) {
ret = -ENOMEM;
goto free_chanbuf;
}
- align_shm(shm_header, __alignof__(struct commit_counters_cold));
- set_shmp(&buf->commit_cold,
- zalloc_shm(shm_header,
- sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
- if (!shmp(buf->commit_cold)) {
+ align_shm(shmobj, __alignof__(struct commit_counters_cold));
+ set_shmp(buf->commit_cold,
+ zalloc_shm(shmobj,
+ sizeof(struct commit_counters_cold) * chan->backend.num_subbuf));
+ if (!shmp(handle, buf->commit_cold)) {
ret = -ENOMEM;
goto free_commit;
}
*/
subbuf_header_size = config->cb.subbuffer_header_size();
v_set(config, &buf->offset, subbuf_header_size);
- subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id);
- tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan));
- config->cb.buffer_begin(buf, tsc, 0);
- v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc);
+ subbuffer_id_clear_noref(config, &shmp(handle, buf->backend.buf_wsb)[0].id);
+ tsc = config->cb.ring_buffer_clock_read(shmp(handle, buf->backend.chan));
+ config->cb.buffer_begin(buf, tsc, 0, handle);
+ v_add(config, subbuf_header_size, &shmp(handle, buf->commit_hot)[0].cc);
if (config->cb.buffer_create) {
- ret = config->cb.buffer_create(buf, priv, cpu, chanb->name);
+ ret = config->cb.buffer_create(buf, priv, cpu, chanb->name, handle);
if (ret)
goto free_init;
}
return ret;
}
+#if 0
static void switch_buffer_timer(unsigned long data)
{
struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
/*
* Only flush buffers periodically if readers are active.
*/
if (uatomic_read(&buf->active_readers))
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
//TODO timers
//if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
// mod_timer(&buf->switch_timer,
// jiffies + chan->switch_timer_interval);
}
+#endif //0
-static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
buf->switch_timer_enabled = 1;
}
-static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
return;
buf->switch_timer_enabled = 0;
}
+#if 0
/*
* Polling timer to check the channels for data.
*/
static void read_buffer_timer(unsigned long data)
{
struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
CHAN_WARN_ON(chan, !buf->backend.allocated);
// mod_timer(&buf->read_timer,
// jiffies + chan->read_timer_interval);
}
+#endif //0
-static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
buf->read_timer_enabled = 1;
}
-static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
+static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
* do one more check to catch data that has been written in the last
* timer period.
*/
- if (lib_ring_buffer_poll_deliver(config, buf, chan)) {
+ if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
//TODO
//wake_up_interruptible(&buf->read_wait);
//wake_up_interruptible(&chan->read_wait);
buf->read_timer_enabled = 0;
}
-static void channel_unregister_notifiers(struct channel *chan)
+static void channel_unregister_notifiers(struct channel *chan,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
int cpu;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+ lib_ring_buffer_stop_switch_timer(buf, handle);
+ lib_ring_buffer_stop_read_timer(buf, handle);
}
} else {
- struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+ lib_ring_buffer_stop_switch_timer(buf, handle);
+ lib_ring_buffer_stop_read_timer(buf, handle);
}
//channel_backend_unregister_notifiers(&chan->backend);
}
-static void channel_free(struct shm_handle *handle)
+static void channel_free(struct channel *chan, struct shm_handle *handle)
{
- struct shm_header *header = handle->header;
- struct channel *chan = shmp(header->chan);
int ret;
- channel_backend_free(&chan->backend);
+ channel_backend_free(&chan->backend, handle);
/* chan is freed by shm teardown */
- ret = munmap(header, header->shm_size);
- if (ret) {
- PERROR("umnmap");
- assert(0);
- }
- ret = close(handle->shmfd);
- if (ret) {
- PERROR("close");
- assert(0);
- }
+ shm_object_table_destroy(handle->table);
+ free(handle);
}
/**
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
- int ret, cpu, shmfd;
+ int ret, cpu;
+ size_t shmsize;
struct channel *chan;
- size_t shmsize, bufshmsize, bufshmalign;
- struct shm_header *shm_header;
- unsigned long num_subbuf_alloc;
struct shm_handle *handle;
+ struct shm_object *shmobj;
if (lib_ring_buffer_check_config(config, switch_timer_interval,
read_timer_interval))
if (!handle)
return NULL;
- /* Calculate the shm allocation layout */
- shmsize = sizeof(struct shm_header);
- shmsize += offset_align(shmsize, __alignof__(struct channel));
- shmsize += sizeof(struct channel);
-
- /* Per-cpu buffer size: control (prior to backend) */
- shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
- bufshmsize = sizeof(struct lib_ring_buffer);
- shmsize += bufshmsize * num_possible_cpus();
-
- /* Per-cpu buffer size: backend */
- shmsize += offset_align(shmsize, PAGE_SIZE);
- /* num_subbuf + 1 is the worse case */
- num_subbuf_alloc = num_subbuf + 1;
- bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
- bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
- bufshmsize += subbuf_size * num_subbuf_alloc;
- bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
- bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
- bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
- bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
- bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
- shmsize += bufshmsize * num_possible_cpus();
-
- /* Per-cpu buffer size: control (after backend) */
- shmsize += offset_align(shmsize,
- max(__alignof__(struct commit_counters_hot),
- __alignof__(struct commit_counters_cold)));
- bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
- bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
- bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
- shmsize += bufshmsize * num_possible_cpus();
-
- /*
- * Allocate shm, and immediately unlink its shm oject, keeping
- * only the file descriptor as a reference to the object. If it
- * already exists (caused by short race window during which the
- * global object exists in a concurrent shm_open), simply retry.
- */
- do {
- shmfd = shm_open("/ust-shm-tmp",
- O_CREAT | O_EXCL | O_RDWR, 0700);
- } while (shmfd < 0 && errno == EEXIST);
- if (shmfd < 0) {
- PERROR("shm_open");
- goto error_shm_open;
- }
- ret = shm_unlink("/ust-shm-tmp");
- if (ret) {
- PERROR("shm_unlink");
- goto error_unlink;
- }
- ret = ftruncate(shmfd, shmsize);
- if (ret) {
- PERROR("ftruncate");
- goto error_ftruncate;
- }
+ /* Allocate table for channel + per-cpu buffers */
+ handle->table = shm_object_table_create(1 + num_possible_cpus());
+ if (!handle->table)
+ goto error_table_alloc;
- shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
- MAP_SHARED, shmfd, 0);
- if (shm_header == MAP_FAILED) {
- PERROR("mmap");
- goto error_mmap;
- }
-
- shm_header->magic = SHM_MAGIC;
- shm_header->major = SHM_MAJOR;
- shm_header->major = SHM_MINOR;
- shm_header->bits_per_long = CAA_BITS_PER_LONG;
- shm_header->shm_size = shmsize;
- shm_header->shm_allocated = sizeof(struct shm_header);
+ /* Calculate the shm allocation layout */
+ shmsize = sizeof(struct channel);
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+ shmsize += sizeof(struct lib_ring_buffer_shmp) * num_possible_cpus();
+ else
+ shmsize += sizeof(struct lib_ring_buffer_shmp);
- align_shm(shm_header, __alignof__(struct channel));
- chan = zalloc_shm(shm_header, sizeof(struct channel));
+	shmobj = shm_object_table_append(handle->table, shmsize);
+	if (!shmobj)
+		goto error_append;
+ set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel)));
+ chan = shmp(handle, handle->chan);
if (!chan)
- goto destroy_shmem;
- set_shmp(shm_header->chan, chan);
+ goto error_append;
ret = channel_backend_init(&chan->backend, name, config, priv,
- subbuf_size, num_subbuf, shm_header);
+ subbuf_size, num_subbuf, handle);
if (ret)
- goto destroy_shmem;
+ goto error_backend_init;
chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
//TODO
* In that off case, we need to allocate for all possible cpus.
*/
for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
+ lib_ring_buffer_start_switch_timer(buf, handle);
+ lib_ring_buffer_start_read_timer(buf, handle);
}
} else {
- struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
+ lib_ring_buffer_start_switch_timer(buf, handle);
+ lib_ring_buffer_start_read_timer(buf, handle);
}
- handle->header = shm_header;
- handle->shmfd = shmfd;
return handle;
-destroy_shmem:
- ret = munmap(shm_header, shmsize);
- if (ret) {
- PERROR("umnmap");
- assert(0);
- }
-error_mmap:
-error_ftruncate:
-error_unlink:
- ret = close(shmfd);
- if (ret) {
- PERROR("close");
- assert(0);
- }
-error_shm_open:
+error_backend_init:
+error_append:
+ shm_object_table_destroy(handle->table);
+error_table_alloc:
free(handle);
return NULL;
}
static
-void channel_release(struct shm_handle *handle)
+void channel_release(struct channel *chan, struct shm_handle *handle)
{
- channel_free(handle);
+ channel_free(chan, handle);
}
/**
* They should release their handle at that point. Returns the private
* data pointer.
*/
-void *channel_destroy(struct shm_handle *handle)
+void *channel_destroy(struct channel *chan, struct shm_handle *handle)
{
- struct shm_header *header = handle->header;
- struct channel *chan = shmp(header->chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
void *priv;
int cpu;
- channel_unregister_notifiers(chan);
+ channel_unregister_notifiers(chan, handle);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
for_each_channel_cpu(cpu, chan) {
- struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu];
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf,
chan->backend.priv,
- cpu);
+ cpu, handle);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
+ handle);
/*
* Perform flush before writing to finalized.
*/
//wake_up_interruptible(&buf->read_wait);
}
} else {
- struct lib_ring_buffer *buf = shmp(chan->backend.buf);
+ struct lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
if (config->cb.buffer_finalize)
- config->cb.buffer_finalize(buf, chan->backend.priv, -1);
+ config->cb.buffer_finalize(buf, chan->backend.priv, -1, handle);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
+ handle);
/*
* Perform flush before writing to finalized.
*/
* sessiond/consumer are keeping a reference on the shm file
* descriptor directly. No need to refcount.
*/
-	channel_release(handle);
-	priv = chan->backend.priv;
+	/* Fetch priv before the channel backing memory is released. */
+	priv = chan->backend.priv;
+	channel_release(chan, handle);
return priv;
}
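
The pattern above repeats throughout the read side: every accessor that used to dereference buffer-internal pointers now receives the shm_handle, so offsets are resolved against the caller's own mapping. A minimal reader-side sketch under these assumptions (not part of the patch; process_subbuffer() is hypothetical and the consumed-offset arithmetic is simplified):

static int example_read_cpu_buffer(const struct lib_ring_buffer_config *config,
				   struct channel *chan, int cpu,
				   struct shm_handle *handle)
{
	struct lib_ring_buffer *buf;
	unsigned long consumed, produced;
	int ret;

	/* Resolve the per-cpu buffer through the caller's mapping. */
	buf = channel_get_ring_buffer(config, chan, cpu, handle);
	if (lib_ring_buffer_open_read(buf, handle))
		return -1;	/* another reader is active */
	ret = lib_ring_buffer_snapshot(buf, &consumed, &produced, handle);
	if (ret)
		goto end;
	while (consumed < produced) {
		ret = lib_ring_buffer_get_subbuf(buf, consumed, handle);
		if (ret)
			break;
		/* process_subbuffer(buf, handle);  -- hypothetical callback */
		lib_ring_buffer_put_subbuf(buf, handle);
		/* Simplified: the real reader aligns to the next sub-buffer. */
		consumed += chan->backend.subbuf_size;
		lib_ring_buffer_move_consumer(buf, consumed, handle);
	}
end:
	lib_ring_buffer_release_read(buf, handle);
	return ret;
}
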
struct lib_ring_buffer *channel_get_ring_buffer(
const struct lib_ring_buffer_config *config,
- struct channel *chan, int cpu)
+ struct channel *chan, int cpu,
+ struct shm_handle *handle)
{
if (config->alloc == RING_BUFFER_ALLOC_GLOBAL)
- return shmp(chan->backend.buf);
+ return shmp(handle, chan->backend.buf[0].shmp);
else
- return &shmp(chan->backend.buf)[cpu];
+ return shmp(handle, chan->backend.buf[cpu].shmp);
}
-int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
+int lib_ring_buffer_open_read(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
return -EBUSY;
return 0;
}
-void lib_ring_buffer_release_read(struct lib_ring_buffer *buf)
+void lib_ring_buffer_release_read(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
cmm_smp_mb();
*/
int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
- unsigned long *consumed, unsigned long *produced)
+ unsigned long *consumed, unsigned long *produced,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long consumed_cur, write_offset;
int finalized;
* @consumed_new: new consumed count value
*/
void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf,
- unsigned long consumed_new)
+ unsigned long consumed_new,
+ struct shm_handle *handle)
{
struct lib_ring_buffer_backend *bufb = &buf->backend;
- struct channel *chan = shmp(bufb->chan);
+ struct channel *chan = shmp(handle, bufb->chan);
unsigned long consumed;
CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
* data to read at consumed position, or 0 if the get operation succeeds.
*/
int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
- unsigned long consumed)
+ unsigned long consumed,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
int ret;
cmm_smp_rmb();
consumed_cur = uatomic_read(&buf->consumed);
consumed_idx = subbuf_index(consumed, chan);
- commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb);
+ commit_count = v_read(config, &shmp(handle, buf->commit_cold)[consumed_idx].cc_sb);
/*
* Make sure we read the commit count before reading the buffer
* data and the write offset. Correct consumed offset ordering
* looking for matches the one contained in the subbuffer id.
*/
ret = update_read_sb_index(config, &buf->backend, &chan->backend,
- consumed_idx, buf_trunc_val(consumed, chan));
+ consumed_idx, buf_trunc_val(consumed, chan),
+ handle);
if (ret)
goto retry;
subbuffer_id_clear_noref(config, &buf->backend.buf_rsb.id);
* lib_ring_buffer_put_subbuf - release exclusive subbuffer access
* @buf: ring buffer
*/
-void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf)
+void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
struct lib_ring_buffer_backend *bufb = &buf->backend;
- struct channel *chan = shmp(bufb->chan);
+ struct channel *chan = shmp(handle, bufb->chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long read_sb_bindex, consumed_idx, consumed;
*/
read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
v_add(config, v_read(config,
- &shmp(bufb->array)[read_sb_bindex]->records_unread),
+ &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread),
&bufb->records_read);
- v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0);
+ v_set(config, &shmp(handle, shmp(handle, bufb->array)[read_sb_bindex].shmp)->records_unread, 0);
CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE
&& subbuffer_id_is_noref(config, bufb->buf_rsb.id));
subbuffer_id_set_noref(config, &bufb->buf_rsb.id);
*/
consumed_idx = subbuf_index(consumed, chan);
update_read_sb_index(config, &buf->backend, &chan->backend,
- consumed_idx, buf_trunc_val(consumed, chan));
+ consumed_idx, buf_trunc_val(consumed, chan),
+ handle);
/*
* update_read_sb_index return value ignored. Don't exchange sub-buffer
* if the writer concurrently updated it.
void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf,
struct channel *chan,
unsigned long cons_offset,
- int cpu)
+ int cpu,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long cons_idx, commit_count, commit_count_sb;
cons_idx = subbuf_index(cons_offset, chan);
- commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc);
- commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[cons_idx].cc);
+ commit_count_sb = v_read(config, &shmp(handle, buf->commit_cold)[cons_idx].cc_sb);
if (subbuf_offset(commit_count, chan) != 0)
ERRMSG("ring buffer %s, cpu %d: "
static
void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
struct channel *chan,
- void *priv, int cpu)
+ void *priv, int cpu,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long write_offset, cons_offset;
- cons_offset) > 0;
cons_offset = subbuf_align(cons_offset, chan))
lib_ring_buffer_print_subbuffer_errors(buf, chan, cons_offset,
- cpu);
+ cpu, handle);
}
static
void lib_ring_buffer_print_errors(struct channel *chan,
- struct lib_ring_buffer *buf, int cpu)
+ struct lib_ring_buffer *buf, int cpu,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
void *priv = chan->backend.priv;
v_read(config, &buf->records_lost_wrap),
v_read(config, &buf->records_lost_big));
- lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu);
+ lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
}
/*
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ u64 tsc,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long oldidx = subbuf_index(offsets->old, chan);
unsigned long commit_count;
- config->cb.buffer_begin(buf, tsc, oldidx);
+ config->cb.buffer_begin(buf, tsc, oldidx, handle);
/*
* Order all writes to buffer before the commit count update that will
*/
cmm_smp_wmb();
v_add(config, config->cb.subbuffer_header_size(),
- &shmp(buf->commit_hot)[oldidx].cc);
- commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
+ &shmp(handle, buf->commit_hot)[oldidx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx);
+ commit_count, oldidx, handle);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
offsets->old, commit_count,
- config->cb.subbuffer_header_size());
+ config->cb.subbuffer_header_size(),
+ handle);
}
/*
void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ u64 tsc,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
data_size = subbuf_offset(offsets->old - 1, chan) + 1;
padding_size = chan->backend.subbuf_size - data_size;
- subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
+ subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
+ handle);
/*
* Order all writes to buffer before the commit count update that will
* determine that the subbuffer is full.
*/
cmm_smp_wmb();
- v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc);
- commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc);
+ v_add(config, padding_size, &shmp(handle, buf->commit_hot)[oldidx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[oldidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx);
+ commit_count, oldidx, handle);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
offsets->old, commit_count,
- padding_size);
+ padding_size, handle);
}
/*
void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- u64 tsc)
+ u64 tsc,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long beginidx = subbuf_index(offsets->begin, chan);
unsigned long commit_count;
- config->cb.buffer_begin(buf, tsc, beginidx);
+ config->cb.buffer_begin(buf, tsc, beginidx, handle);
/*
* Order all writes to buffer before the commit count update that will
*/
cmm_smp_wmb();
v_add(config, config->cb.subbuffer_header_size(),
- &shmp(buf->commit_hot)[beginidx].cc);
- commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc);
+ &shmp(handle, buf->commit_hot)[beginidx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[beginidx].cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx);
+ commit_count, beginidx, handle);
lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
offsets->begin, commit_count,
- config->cb.subbuffer_header_size());
+ config->cb.subbuffer_header_size(),
+ handle);
}
/*
*/
static
void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
- struct channel *chan,
- struct switch_offsets *offsets,
- u64 tsc)
+ struct channel *chan,
+ struct switch_offsets *offsets,
+ u64 tsc,
+ struct shm_handle *handle)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long endidx = subbuf_index(offsets->end - 1, chan);
data_size = subbuf_offset(offsets->end - 1, chan) + 1;
padding_size = chan->backend.subbuf_size - data_size;
- subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+ subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
+ handle);
/*
* Order all writes to buffer before the commit count update that will
* determine that the subbuffer is full.
*/
cmm_smp_wmb();
- v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc);
- commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc);
+ v_add(config, padding_size, &shmp(handle, buf->commit_hot)[endidx].cc);
+ commit_count = v_read(config, &shmp(handle, buf->commit_hot)[endidx].cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1,
- commit_count, endidx);
+ commit_count, endidx, handle);
lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
offsets->end, commit_count,
- padding_size);
+ padding_size, handle);
}
/*
* operations, this function must be called from the CPU which owns the buffer
* for a ACTIVE flush.
*/
-void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode)
+void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
struct switch_offsets offsets;
unsigned long oldidx;
lib_ring_buffer_reserve_push_reader(buf, chan, offsets.old);
oldidx = subbuf_index(offsets.old, chan);
- lib_ring_buffer_clear_noref(config, &buf->backend, oldidx);
+ lib_ring_buffer_clear_noref(config, &buf->backend, oldidx, handle);
/*
* May need to populate header start on SWITCH_FLUSH.
*/
if (offsets.switch_old_start) {
- lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc);
+ lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
offsets.old += config->cb.subbuffer_header_size();
}
/*
* Switch old subbuffer.
*/
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
}
/*
struct lib_ring_buffer_ctx *ctx)
{
const struct lib_ring_buffer_config *config = chan->backend.config;
+ struct shm_handle *handle = ctx->handle;
unsigned long reserve_commit_diff;
offsets->begin = v_read(config, &buf->offset);
(buf_trunc(offsets->begin, chan)
>> chan->backend.num_subbuf_order)
- ((unsigned long) v_read(config,
- &shmp(buf->commit_cold)[sb_index].cc_sb)
+ &shmp(handle, buf->commit_cold)[sb_index].cc_sb)
& chan->commit_count_mask);
if (likely(reserve_commit_diff == 0)) {
/* Next subbuffer not being written to. */
int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
{
struct channel *chan = ctx->chan;
+ struct shm_handle *handle = ctx->handle;
const struct lib_ring_buffer_config *config = chan->backend.config;
struct lib_ring_buffer *buf;
struct switch_offsets offsets;
int ret;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- buf = &shmp(chan->backend.buf)[ctx->cpu];
+ buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
else
- buf = shmp(chan->backend.buf);
+ buf = shmp(handle, chan->backend.buf[0].shmp);
ctx->buf = buf;
offsets.size = 0;
* Clear noref flag for this subbuffer.
*/
lib_ring_buffer_clear_noref(config, &buf->backend,
- subbuf_index(offsets.end - 1, chan));
+ subbuf_index(offsets.end - 1, chan),
+ handle);
/*
* Switch old subbuffer if needed.
*/
if (unlikely(offsets.switch_old_end)) {
lib_ring_buffer_clear_noref(config, &buf->backend,
- subbuf_index(offsets.old - 1, chan));
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc);
+ subbuf_index(offsets.old - 1, chan),
+ handle);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
}
/*
* Populate new subbuffer.
*/
if (unlikely(offsets.switch_new_start))
- lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc);
+ lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
if (unlikely(offsets.switch_new_end))
- lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc);
+ lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
ctx->slot_size = offsets.size;
ctx->pre_offset = offsets.begin;
--- /dev/null
+/*
+ * libringbuffer/shm.c
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "shm.h"
+#include <unistd.h>
+#include <fcntl.h>		/* For O_* constants */
+#include <sys/mman.h>
+#include <sys/stat.h>		/* For mode constants */
+#include <string.h>		/* For memcpy() */
+#include <errno.h>
+#include <assert.h>
+#include <ust/align.h>
+
+struct shm_object_table *shm_object_table_create(size_t max_nb_obj)
+{
+ struct shm_object_table *table;
+
+	table = zmalloc(sizeof(struct shm_object_table) +
+			max_nb_obj * sizeof(table->objects[0]));
+	if (!table)
+		return NULL;
+	table->size = max_nb_obj;
+ return table;
+}
+
+struct shm_object *shm_object_table_append(struct shm_object_table *table,
+ size_t memory_map_size)
+{
+ int shmfd, waitfd[2], ret, i;
+ struct shm_object *obj;
+ char *memory_map;
+
+ if (table->allocated_len >= table->size)
+ return NULL;
+ obj = &table->objects[table->allocated_len++];
+
+ /* wait_fd: create pipe */
+ ret = pipe(waitfd);
+ if (ret < 0) {
+ PERROR("pipe");
+ goto error_pipe;
+ }
+ for (i = 0; i < 2; i++) {
+ ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC);
+ if (ret < 0) {
+ PERROR("fcntl");
+ goto error_fcntl;
+ }
+ }
+	memcpy(obj->wait_fd, waitfd, sizeof(obj->wait_fd));
+
+ /* shm_fd: create shm */
+
+ /*
+	 * Allocate shm, and immediately unlink its shm object, keeping
+ * only the file descriptor as a reference to the object. If it
+ * already exists (caused by short race window during which the
+ * global object exists in a concurrent shm_open), simply retry.
+ * We specifically do _not_ use the / at the beginning of the
+ * pathname so that some OS implementations can keep it local to
+ * the process (POSIX leaves this implementation-defined).
+ */
+ do {
+ shmfd = shm_open("ust-shm-tmp",
+ O_CREAT | O_EXCL | O_RDWR, 0700);
+ } while (shmfd < 0 && errno == EEXIST);
+ if (shmfd < 0) {
+ PERROR("shm_open");
+ goto error_shm_open;
+ }
+ ret = shm_unlink("ust-shm-tmp");
+ if (ret) {
+ PERROR("shm_unlink");
+ goto error_unlink;
+ }
+ ret = ftruncate(shmfd, memory_map_size);
+ if (ret) {
+ PERROR("ftruncate");
+ goto error_ftruncate;
+ }
+ obj->shm_fd = shmfd;
+
+ /* memory_map: mmap */
+ memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
+ MAP_SHARED, shmfd, 0);
+ if (memory_map == MAP_FAILED) {
+ PERROR("mmap");
+ goto error_mmap;
+ }
+ obj->memory_map = memory_map;
+ obj->memory_map_size = memory_map_size;
+	obj->allocated_len = 0;
+	obj->index = table->allocated_len - 1;	/* slot index, used by zalloc_shm() */
+ return obj;
+
+error_mmap:
+error_ftruncate:
+error_unlink:
+ ret = close(shmfd);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+error_shm_open:
+error_fcntl:
+ for (i = 0; i < 2; i++) {
+ ret = close(waitfd[i]);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+ }
+error_pipe:
+	/* obj is a slot inside table->objects[]; release the slot, do not free() it. */
+	table->allocated_len--;
+ return NULL;
+
+}
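
Because the name is unlinked right away, the shm_fd kept in the object is the only reference to the mapping; a cooperating process that receives that descriptor (for instance over a UNIX socket with SCM_RIGHTS) can map the same memory. A sketch of that receiving side, assuming the fd and size are obtained out of band (not part of this patch):

static char *example_map_received_object(int shm_fd, size_t memory_map_size)
{
	char *memory_map;

	/* Map the shared object through the received file descriptor. */
	memory_map = mmap(NULL, memory_map_size, PROT_READ | PROT_WRITE,
			  MAP_SHARED, shm_fd, 0);
	if (memory_map == MAP_FAILED)
		return NULL;
	return memory_map;
}
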
+
+static
+void shmp_object_destroy(struct shm_object *obj)
+{
+ int ret, i;
+
+ ret = munmap(obj->memory_map, obj->memory_map_size);
+ if (ret) {
+ PERROR("umnmap");
+ assert(0);
+ }
+ ret = close(obj->shm_fd);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+ for (i = 0; i < 2; i++) {
+ ret = close(obj->wait_fd[i]);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+ }
+}
+
+void shm_object_table_destroy(struct shm_object_table *table)
+{
+ int i;
+
+ for (i = 0; i < table->allocated_len; i++)
+ shmp_object_destroy(&table->objects[i]);
+ free(table);
+}
+
+/*
+ * zalloc_shm - allocate memory within a shm object.
+ *
+ * Shared memory is already zeroed by the kernel: ftruncate() extends a
+ * freshly created POSIX shm object with zero-filled bytes.
+ * *NOT* multithread-safe (should be protected by mutex).
+ * Returns a -1, -1 tuple on error.
+ */
+struct shm_ref zalloc_shm(struct shm_object *obj, size_t len)
+{
+ struct shm_ref ref;
+ struct shm_ref shm_ref_error = { -1, -1 };
+
+ if (obj->memory_map_size - obj->allocated_len < len)
+ return shm_ref_error;
+ ref.index = obj->index;
+ ref.offset = obj->allocated_len;
+ obj->allocated_len += len;
+ return ref;
+}
+
+void align_shm(struct shm_object *obj, size_t align)
+{
+ size_t offset_len = offset_align(obj->allocated_len, align);
+ obj->allocated_len += offset_len;
+}
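
Taken together, the helpers above are meant to be used in a create/append/carve sequence, with the resulting shm_ref values published through set_shmp() and dereferenced with shmp(). A compact sketch mirroring what channel_create() does (error handling is illustrative, not from the patch):

static int example_publish_channel(struct shm_handle *handle, size_t shmsize)
{
	struct shm_object *shmobj;

	handle->table = shm_object_table_create(1);
	if (!handle->table)
		return -1;
	shmobj = shm_object_table_append(handle->table, shmsize);
	if (!shmobj)
		return -1;
	align_shm(shmobj, __alignof__(struct channel));
	/* zalloc_shm() returns an offset-based reference, not a pointer. */
	set_shmp(handle->chan, zalloc_shm(shmobj, sizeof(struct channel)));
	if (!shmp(handle, handle->chan))
		return -1;
	return 0;
}
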
#include <stdint.h>
#include <ust/usterr-signal-safe.h>
#include "ust/core.h"
+#include "shm_types.h"
#define SHM_MAGIC 0x54335433
#define SHM_MAJOR 0
#define SHM_MINOR 1
/*
- * Defining a max shm offset, for debugging purposes.
+ * Pointer dereferencing. We don't trust the shm_ref, so we validate
+ * both the index and offset with known boundaries.
*/
-#if (CAA_BITS_PER_LONG == 32)
-/* Define the maximum shared memory size to 128MB on 32-bit machines */
-#define MAX_SHM_SIZE 134217728
-#else
-/* Define the maximum shared memory size to 8GB on 64-bit machines */
-#define MAX_SHM_SIZE 8589934592
-#endif
-
-#define DECLARE_SHMP(type, name) type *****name
-
-struct shm_header {
- uint32_t magic;
- uint8_t major;
- uint8_t minor;
- uint8_t bits_per_long;
- size_t shm_size, shm_allocated;
-
- DECLARE_SHMP(struct channel, chan);
-};
-
-struct shm_handle {
- struct shm_header *header; /* beginning of mapping */
- int shmfd; /* process-local file descriptor */
-};
-
-#define shmp(shm_offset) \
- ((__typeof__(****(shm_offset))) (((char *) &(shm_offset)) + (ptrdiff_t) (shm_offset)))
-
-#define _shmp_abs(a) ((a < 0) ? -(a) : (a))
-
-static inline
-void _set_shmp(ptrdiff_t *shm_offset, void *ptr)
-{
- *shm_offset = (((char *) ptr) - ((char *) shm_offset));
- assert(_shmp_abs(*shm_offset) < MAX_SHM_SIZE);
-}
-
-#define set_shmp(shm_offset, ptr) \
- _set_shmp((ptrdiff_t *) ****(shm_offset), ptr)
-
-/* Shared memory is already zeroed by shmget */
-/* *NOT* multithread-safe (should be protected by mutex) */
static inline
-void *zalloc_shm(struct shm_header *shm_header, size_t len)
+char *_shmp(struct shm_object_table *table, struct shm_ref *ref)
{
- void *ret;
+ struct shm_object *obj;
+ size_t index, offset;
- if (shm_header->shm_size - shm_header->shm_allocated < len)
+ index = (size_t) ref->index;
+ if (unlikely(index >= table->allocated_len))
+ return NULL;
+ obj = &table->objects[index];
+ offset = (size_t) ref->offset;
+ if (unlikely(offset >= obj->memory_map_size))
return NULL;
- ret = (char *) shm_header + shm_header->shm_allocated;
- shm_header->shm_allocated += len;
- return ret;
+ return &obj->memory_map[offset];
}
+#define shmp(handle, ref) \
+ ({ \
+ __typeof__((ref)._type) ____ptr_ret; \
+ ____ptr_ret = (__typeof__(____ptr_ret)) _shmp((handle)->table, &(ref)._ref); \
+ ____ptr_ret; \
+ })
+
static inline
-void align_shm(struct shm_header *shm_header, size_t align)
+void _set_shmp(struct shm_ref *ref, struct shm_ref src)
{
- size_t offset_len = offset_align(shm_header->shm_allocated, align);
- shm_header->shm_allocated += offset_len;
+ *ref = src;
}
+#define set_shmp(ref, src) _set_shmp(&(ref)._ref, src)
+
+struct shm_object_table *shm_object_table_create(size_t max_nb_obj);
+void shm_object_table_destroy(struct shm_object_table *table);
+struct shm_object *shm_object_table_append(struct shm_object_table *table,
+ size_t memory_map_size);
+
+/*
+ * zalloc_shm - allocate memory within a shm object.
+ *
+ * Shared memory is already zeroed by the kernel: ftruncate() extends a
+ * freshly created POSIX shm object with zero-filled bytes.
+ * *NOT* multithread-safe (should be protected by mutex).
+ * Returns a -1, -1 tuple on error.
+ */
+struct shm_ref zalloc_shm(struct shm_object *obj, size_t len);
+void align_shm(struct shm_object *obj, size_t align);
+
#endif /* _LIBRINGBUFFER_SHM_H */
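
For reference, here is a self-contained sketch (not in the patch) of how a shm-resident structure would publish a link to another shm-resident structure with DECLARE_SHMP() and follow it with shmp(); struct example_node and its fields are hypothetical:

struct example_node {
	DECLARE_SHMP(struct example_node, next);	/* hypothetical field */
	int payload;
};

static inline int example_next_payload(struct shm_handle *handle,
				       struct example_node *node)
{
	struct example_node *next = shmp(handle, node->next);

	/* shmp() returns NULL when the reference is unset or out of bounds. */
	return next ? next->payload : -1;
}
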
--- /dev/null
+#ifndef _LIBRINGBUFFER_SHM_INTERNAL_H
+#define _LIBRINGBUFFER_SHM_INTERNAL_H
+
+/*
+ * libringbuffer/shm_internal.h
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+struct shm_ref {
+ volatile ssize_t index; /* within the object table */
+ volatile ssize_t offset; /* within the object */
+};
+
+#define DECLARE_SHMP(type, name) \
+ union { \
+ struct shm_ref _ref; \
+ type *_type; \
+ } name
+
+#endif /* _LIBRINGBUFFER_SHM_INTERNAL_H */
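
The union never stores a native pointer: only _ref is written, and _type exists solely so shmp() can recover the pointee type through __typeof__. Hand-expanding the macro for a field declared as DECLARE_SHMP(struct channel, chan) gives roughly the following (for reading only, not part of the patch):

/* Roughly what shmp(handle, handle->chan) expands to. */
static inline struct channel *example_deref_chan(struct shm_handle *handle,
						 struct shm_ref *chan_ref)
{
	return (struct channel *) _shmp(handle->table, chan_ref);
}
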
--- /dev/null
+#ifndef _LIBRINGBUFFER_SHM_TYPES_H
+#define _LIBRINGBUFFER_SHM_TYPES_H
+
+/*
+ * libringbuffer/shm_types.h
+ *
+ * Copyright 2011 (c) - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <stdint.h>
+#include "shm_internal.h"
+
+struct channel;
+
+struct shm_object {
+ size_t index; /* within the object table */
+ int shm_fd; /* shm fd */
+ int wait_fd[2]; /* fd for wait/wakeup */
+ char *memory_map;
+ size_t memory_map_size;
+ size_t allocated_len;
+};
+
+struct shm_object_table {
+ size_t size;
+ size_t allocated_len;
+ struct shm_object objects[];
+};
+
+struct shm_handle {
+ struct shm_object_table *table;
+ DECLARE_SHMP(struct channel, chan);
+ /*
+	 * In the consumer, chan refers to a shadow copy of the channel,
+	 * validated upon reception; the consumer then overrides its chan
+	 * reference to point at this shadow copy.
+ */
+ struct channel *shadow_chan;
+};
+
+#endif /* _LIBRINGBUFFER_SHM_TYPES_H */
* headers. Therefore the "chan" information used as input
* should be already accessible.
*/
- chan->handle = transport->ops.channel_create("[lttng]", chan, buf_addr,
+ transport->ops.channel_create("[lttng]", chan, buf_addr,
subbuf_size, num_subbuf, switch_timer_interval,
read_timer_interval);
- chan->chan = shmp(chan->handle->header->chan);
if (!chan->chan)
goto create_error;
chan->enabled = 1;
static
void _ltt_channel_destroy(struct ltt_channel *chan)
{
- chan->ops->channel_destroy(chan->handle);
+ chan->ops->channel_destroy(chan);
cds_list_del(&chan->list);
lttng_destroy_context(chan->ctx);
free(chan);
for (pos = 0; pos < len; pos += reserve_len) {
reserve_len = min_t(size_t,
- chan->ops->packet_avail_size(chan->chan),
+ chan->ops->packet_avail_size(chan->chan, chan->handle),
len - pos);
lib_ring_buffer_ctx_init(&ctx, chan->chan, NULL, reserve_len,
- sizeof(char), -1);
+ sizeof(char), -1, chan->handle);
/*
* We don't care about metadata buffer's records lost
* count, because we always retry here. Report error if
}
static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx)
+ unsigned int subbuf_idx,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
struct packet_header *header =
(struct packet_header *)
lib_ring_buffer_offset_address(&buf->backend,
- subbuf_idx * chan->backend.subbuf_size);
+ subbuf_idx * chan->backend.subbuf_size,
+ handle);
struct ltt_channel *ltt_chan = channel_get_private(chan);
struct ltt_session *session = ltt_chan->session;
* subbuffer. data_size is between 1 and subbuf_size.
*/
static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx, unsigned long data_size)
+ unsigned int subbuf_idx, unsigned long data_size,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
struct packet_header *header =
(struct packet_header *)
lib_ring_buffer_offset_address(&buf->backend,
- subbuf_idx * chan->backend.subbuf_size);
+ subbuf_idx * chan->backend.subbuf_size,
+ handle);
unsigned long records_lost = 0;
header->ctx.timestamp_end = tsc;
}
static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
- int cpu, const char *name)
+ int cpu, const char *name, struct shm_handle *handle)
{
return 0;
}
-static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
+static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu, struct shm_handle *handle)
{
}
};
static
-struct shm_handle *_channel_create(const char *name,
+struct ltt_channel *_channel_create(const char *name,
struct ltt_channel *ltt_chan, void *buf_addr,
size_t subbuf_size, size_t num_subbuf,
unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
- return channel_create(&client_config, name, ltt_chan, buf_addr,
+ ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr,
subbuf_size, num_subbuf, switch_timer_interval,
read_timer_interval);
+	if (!ltt_chan->handle)
+		return NULL;
+	ltt_chan->chan = shmp(ltt_chan->handle, ltt_chan->handle->chan);
+ return ltt_chan;
}
static
-void ltt_channel_destroy(struct shm_handle *handle)
+void ltt_channel_destroy(struct ltt_channel *ltt_chan)
{
- channel_destroy(handle);
+ channel_destroy(ltt_chan->chan, ltt_chan->handle);
}
static
-struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
+struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
+ struct shm_handle *handle)
{
struct lib_ring_buffer *buf;
int cpu;
for_each_channel_cpu(cpu, chan) {
- buf = channel_get_ring_buffer(&client_config, chan, cpu);
- if (!lib_ring_buffer_open_read(buf))
+ buf = channel_get_ring_buffer(&client_config, chan, cpu, handle);
+ if (!lib_ring_buffer_open_read(buf, handle))
return buf;
}
return NULL;
}
static
-void ltt_buffer_read_close(struct lib_ring_buffer *buf)
+void ltt_buffer_read_close(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- lib_ring_buffer_release_read(buf);
+ lib_ring_buffer_release_read(buf, handle);
}
static
}
static void client_buffer_begin(struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx)
+ unsigned int subbuf_idx,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
struct metadata_packet_header *header =
(struct metadata_packet_header *)
lib_ring_buffer_offset_address(&buf->backend,
- subbuf_idx * chan->backend.subbuf_size);
+ subbuf_idx * chan->backend.subbuf_size,
+ handle);
struct ltt_channel *ltt_chan = channel_get_private(chan);
struct ltt_session *session = ltt_chan->session;
* subbuffer. data_size is between 1 and subbuf_size.
*/
static void client_buffer_end(struct lib_ring_buffer *buf, u64 tsc,
- unsigned int subbuf_idx, unsigned long data_size)
+ unsigned int subbuf_idx, unsigned long data_size,
+ struct shm_handle *handle)
{
- struct channel *chan = shmp(buf->backend.chan);
+ struct channel *chan = shmp(handle, buf->backend.chan);
struct metadata_packet_header *header =
(struct metadata_packet_header *)
lib_ring_buffer_offset_address(&buf->backend,
- subbuf_idx * chan->backend.subbuf_size);
+ subbuf_idx * chan->backend.subbuf_size,
+ handle);
unsigned long records_lost = 0;
header->content_size = data_size * CHAR_BIT; /* in bits */
}
static int client_buffer_create(struct lib_ring_buffer *buf, void *priv,
- int cpu, const char *name)
+ int cpu, const char *name,
+ struct shm_handle *handle)
{
return 0;
}
-static void client_buffer_finalize(struct lib_ring_buffer *buf, void *priv, int cpu)
+static void client_buffer_finalize(struct lib_ring_buffer *buf,
+ void *priv, int cpu,
+ struct shm_handle *handle)
{
}
};
static
-struct shm_handle *_channel_create(const char *name,
+struct ltt_channel *_channel_create(const char *name,
struct ltt_channel *ltt_chan, void *buf_addr,
size_t subbuf_size, size_t num_subbuf,
unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
- return channel_create(&client_config, name, ltt_chan, buf_addr,
+ ltt_chan->handle = channel_create(&client_config, name, ltt_chan, buf_addr,
subbuf_size, num_subbuf, switch_timer_interval,
read_timer_interval);
+	if (!ltt_chan->handle)
+		return NULL;
+	ltt_chan->chan = shmp(ltt_chan->handle, ltt_chan->handle->chan);
+ return ltt_chan;
}
static
-void ltt_channel_destroy(struct shm_handle *handle)
+void ltt_channel_destroy(struct ltt_channel *ltt_chan)
{
- channel_destroy(handle);
+ channel_destroy(ltt_chan->chan, ltt_chan->handle);
}
static
-struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan)
+struct lib_ring_buffer *ltt_buffer_read_open(struct channel *chan,
+ struct shm_handle *handle)
{
struct lib_ring_buffer *buf;
- buf = channel_get_ring_buffer(&client_config, chan, 0);
- if (!lib_ring_buffer_open_read(buf))
+ buf = channel_get_ring_buffer(&client_config, chan, 0, handle);
+ if (!lib_ring_buffer_open_read(buf, handle))
return buf;
return NULL;
}
static
-void ltt_buffer_read_close(struct lib_ring_buffer *buf)
+void ltt_buffer_read_close(struct lib_ring_buffer *buf,
+ struct shm_handle *handle)
{
- lib_ring_buffer_release_read(buf);
+ lib_ring_buffer_release_read(buf, handle);
}
static
}
static
-size_t ltt_packet_avail_size(struct channel *chan)
+size_t ltt_packet_avail_size(struct channel *chan, struct shm_handle *handle)
{
unsigned long o_begin;
struct lib_ring_buffer *buf;
- buf = shmp(chan->backend.buf); /* Only for global buffer ! */
+ buf = shmp(handle, chan->backend.buf[0].shmp); /* Only for global buffer ! */
o_begin = v_read(&client_config, &buf->offset);
if (subbuf_offset(o_begin, chan) != 0) {
return chan->backend.subbuf_size - subbuf_offset(o_begin, chan);