#include "backend.h"
#include "frontend.h"
#include "smp.h"
+#include "shm.h"
/**
* lib_ring_buffer_backend_allocate - allocate a channel buffer
if (extra_reader_sb)
num_subbuf_alloc++;
+ /* Align the entire buffer backend data on PAGE_SIZE */
+ align_shm(shm_header, PAGE_SIZE);
set_shmp(bufb->array, zalloc_shm(shm_header,
sizeof(*bufb->array) * num_subbuf_alloc));
if (unlikely(!shmp(bufb->array)))
goto array_error;
+ /*
+ * This is the largest element (the buffer pages) which needs to
+ * be aligned on PAGE_SIZE.
+ */
+ align_shm(shm_header, PAGE_SIZE);
set_shmp(bufb->memory_map, zalloc_shm(shm_header,
subbuf_size * num_subbuf_alloc));
if (unlikely(!shmp(bufb->memory_map)))
/* Allocate backend pages array elements */
for (i = 0; i < num_subbuf_alloc; i++) {
+ align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_pages));
set_shmp(bufb->array[i],
zalloc_shm(shm_header,
- sizeof(struct lib_ring_buffer_backend_pages) +
- subbuf_size));
+ sizeof(struct lib_ring_buffer_backend_pages)));
if (!shmp(bufb->array[i]))
goto free_array;
}
/* Allocate write-side subbuffer table */
+ align_shm(shm_header, __alignof__(struct lib_ring_buffer_backend_subbuffer));
bufb->buf_wsb = zalloc_shm(shm_header,
sizeof(struct lib_ring_buffer_backend_subbuffer)
* num_subbuf);
mmap_offset += subbuf_size;
}
}
+	/*
+	 * Align the end of each buffer backend's data on PAGE_SIZE, so
+	 * the per-buffer backends behave like an array of elements that
+	 * each need to be aligned on PAGE_SIZE.
+	 */
+ align_shm(shm_header, PAGE_SIZE);
return 0;
size_t alloc_size;
/* Allocating the buffer per-cpu structures */
+ align_shm(shm_header, __alignof__(struct lib_ring_buffer));
alloc_size = sizeof(struct lib_ring_buffer);
buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus());
if (!buf)
struct lib_ring_buffer *buf;
size_t alloc_size;
+ align_shm(shm_header, __alignof__(struct lib_ring_buffer));
alloc_size = sizeof(struct lib_ring_buffer);
- chanb->buf = zmalloc(sizeof(struct lib_ring_buffer));
buf = zalloc_shm(shm_header, alloc_size);
if (!buf)
goto end;
*/
#include <sys/types.h>
-#include <sys/shm.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
#include <urcu/compiler.h>
#include <urcu/ref.h>
#include "frontend.h"
#include "shm.h"
+#ifndef max
+#define max(a, b) ((a) > (b) ? (a) : (b))
+#endif
+
/*
* Internal structure representing offsets to use at a sub-buffer switch.
*/
struct channel *chan = shmp(buf->backend.chan);
lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
- free(shmp(buf->commit_hot));
- free(shmp(buf->commit_cold));
+ /* buf->commit_hot will be freed by shm teardown */
+ /* buf->commit_cold will be freed by shm teardown */
lib_ring_buffer_backend_free(&buf->backend);
}
if (ret)
return ret;
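+	/*
+	 * Align on the strictest alignment of the two commit counter
+	 * types; this matches the layout computed in channel_create().
+	 */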
+ align_shm(shm_header,
+ max(__alignof__(struct commit_counters_hot),
+ __alignof__(struct commit_counters_cold)));
set_shmp(&buf->commit_hot,
zalloc_shm(shm_header,
sizeof(*buf->commit_hot) * chan->backend.num_subbuf));
goto free_chanbuf;
}
+ align_shm(shm_header, __alignof__(struct commit_counters_cold));
set_shmp(&buf->commit_cold,
zalloc_shm(shm_header,
sizeof(*buf->commit_cold) * chan->backend.num_subbuf));
channel_backend_unregister_notifiers(&chan->backend);
}
-static void channel_free(struct channel *chan)
+static void channel_free(struct shm_handle *handle)
{
+ struct shm_header *header = handle->header;
+ struct channel *chan = shmp(header->chan);
+ int ret;
+
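+	/*
+	 * The channel and its buffers all live within the single shm
+	 * mapping, so unmapping it and closing the shm file descriptor
+	 * releases the channel memory.
+	 */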
channel_backend_free(&chan->backend);
- free(chan);
+ /* chan is freed by shm teardown */
+ ret = munmap(header, header->shm_size);
+ if (ret) {
+		PERROR("munmap");
+ assert(0);
+ }
+ ret = close(handle->shmfd);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
}
/**
* padding to let readers get those sub-buffers.
* Used for live streaming.
* @read_timer_interval: Time interval (in us) to wake up pending readers.
- * @shmid: shared memory ID (output)
+ * @shmfd: shared memory file descriptor (output, needs to be closed by
+ * the caller)
*
* Holds cpu hotplug.
* Returns NULL on failure.
*/
-struct channel *channel_create(const struct lib_ring_buffer_config *config,
+struct shm_handle *channel_create(const struct lib_ring_buffer_config *config,
const char *name, void *priv, void *buf_addr,
size_t subbuf_size,
size_t num_subbuf, unsigned int switch_timer_interval,
- unsigned int read_timer_interval,
- int *shmid)
+ unsigned int read_timer_interval)
{
- int ret, cpu;
+ int ret, cpu, shmfd;
struct channel *chan;
- size_t shmsize, bufshmsize;
+ size_t shmsize, bufshmsize, bufshmalign;
struct shm_header *shm_header;
unsigned long num_subbuf_alloc;
+ struct shm_handle *handle;
if (lib_ring_buffer_check_config(config, switch_timer_interval,
read_timer_interval))
return NULL;
+ handle = zmalloc(sizeof(struct shm_handle));
+ if (!handle)
+ return NULL;
+
/* Calculate the shm allocation layout */
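+	/*
+	 * Note: this layout computation must stay in sync with the
+	 * align_shm()/zalloc_shm() sequence used when the structures
+	 * are actually allocated from the shm area.
+	 */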
shmsize = sizeof(struct shm_header);
+ shmsize += offset_align(shmsize, __alignof__(struct channel));
shmsize += sizeof(struct channel);
/* Per-cpu buffer size: control (prior to backend) */
+ shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer));
bufshmsize = sizeof(struct lib_ring_buffer);
shmsize += bufshmsize * num_possible_cpus();
/* Per-cpu buffer size: backend */
+ shmsize += offset_align(shmsize, PAGE_SIZE);
	/* num_subbuf + 1 is the worst case */
num_subbuf_alloc = num_subbuf + 1;
bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc;
- bufshmsize += subbuf_size * (num_subbuf_alloc);
- bufshmsize += (sizeof(struct lib_ring_buffer_backend_pages) + subbuf_size) * num_subbuf_alloc;
+ bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
+ bufshmsize += subbuf_size * num_subbuf_alloc;
+ bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
+ bufshmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
+ bufshmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
+ bufshmsize += offset_align(bufshmsize, PAGE_SIZE);
shmsize += bufshmsize * num_possible_cpus();
/* Per-cpu buffer size: control (after backend) */
- bufshmsize += sizeof(struct commit_counters_hot) * num_subbuf;
+ shmsize += offset_align(shmsize,
+ max(__alignof__(struct commit_counters_hot),
+ __alignof__(struct commit_counters_cold)));
+ bufshmsize = sizeof(struct commit_counters_hot) * num_subbuf;
+ bufshmsize += offset_align(bufshmsize, __alignof__(struct commit_counters_cold));
bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf;
+ shmsize += bufshmsize * num_possible_cpus();
- /* Allocate shm */
- *shmid = shmget(getpid(), shmsize, IPC_CREAT | IPC_EXCL | 0700);
- if (*shmid < 0) {
- if (errno == EINVAL)
- ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased.");
- else
- PERROR("shmget");
- return NULL;
+	/*
+	 * Allocate shm, and immediately unlink its shm object, keeping
+	 * only the file descriptor as a reference to the object. If it
+	 * already exists (short race window during which a concurrent
+	 * shm_open() created the same global object), simply retry.
+	 */
+ do {
+ shmfd = shm_open("/ust-shm-tmp",
+ O_CREAT | O_EXCL | O_RDWR, 0700);
+ } while (shmfd < 0 && errno == EEXIST);
+ if (shmfd < 0) {
+ PERROR("shm_open");
+ goto error_shm_open;
}
-
- shm_header = shmat(*shmid, NULL, 0);
- if (shm_header == (void *) -1) {
- perror("shmat");
- goto destroy_shmem;
+ ret = shm_unlink("/ust-shm-tmp");
+ if (ret) {
+ PERROR("shm_unlink");
+ goto error_unlink;
+ }
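+	/* Size the (initially empty) shm object to the full layout size. */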
+ ret = ftruncate(shmfd, shmsize);
+ if (ret) {
+ PERROR("ftruncate");
+ goto error_ftruncate;
}
- /* Already mark the shared memory for destruction. This will occur only
- * when all users have detached.
- */
- ret = shmctl(*shmid, IPC_RMID, NULL);
- if (ret == -1) {
- perror("shmctl");
- goto destroy_shmem;
+ shm_header = mmap(NULL, shmsize, PROT_READ | PROT_WRITE,
+ MAP_SHARED, shmfd, 0);
+ if (shm_header == MAP_FAILED) {
+ PERROR("mmap");
+ goto error_mmap;
}
shm_header->magic = SHM_MAGIC;
shm_header->shm_size = shmsize;
shm_header->shm_allocated = sizeof(struct shm_header);
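+	/*
+	 * shm_allocated is the running offset consumed by align_shm()
+	 * and zalloc_shm(); the header itself is already accounted for.
+	 */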
+ align_shm(shm_header, __alignof__(struct channel));
chan = zalloc_shm(shm_header, sizeof(struct channel));
if (!chan)
goto destroy_shmem;
//TODO
//chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
//chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
- urcu_ref_init(&chan->ref);
//TODO
//init_waitqueue_head(&chan->read_wait);
//init_waitqueue_head(&chan->hp_wait);
lib_ring_buffer_start_read_timer(buf);
}
- return chan;
+ handle->header = shm_header;
+ handle->shmfd = shmfd;
+ return handle;
destroy_shmem:
- ret = shmctl(*shmid, IPC_RMID, NULL);
- if (ret == -1) {
- perror("shmctl");
+ ret = munmap(shm_header, shmsize);
+ if (ret) {
+		PERROR("munmap");
+ assert(0);
}
+error_mmap:
+error_ftruncate:
+error_unlink:
+ ret = close(shmfd);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+error_shm_open:
+ free(handle);
return NULL;
}
static
-void channel_release(struct urcu_ref *ref)
+void channel_release(struct shm_handle *handle)
{
- struct channel *chan = caa_container_of(ref, struct channel, ref);
- channel_free(chan);
+ channel_free(handle);
}
/**
* @chan: channel to destroy
*
* Holds cpu hotplug.
- * Call "destroy" callback, finalize channels, wait for readers to release their
- * reference, then destroy ring buffer data. Note that when readers have
- * completed data consumption of finalized channels, get_subbuf() will return
- * -ENODATA. They should release their handle at that point.
- * Returns the private data pointer.
+ * Call "destroy" callback, finalize channels, then tear down the
+ * channel shared memory mapping. Note that when readers have completed
+ * data consumption of finalized channels, get_subbuf() will return
+ * -ENODATA. They should release their handle at that point. Returns
+ * the private data pointer.
*/
-void *channel_destroy(struct channel *chan)
+void *channel_destroy(struct shm_handle *handle)
{
- int cpu;
+ struct shm_header *header = handle->header;
+ struct channel *chan = shmp(header->chan);
const struct lib_ring_buffer_config *config = chan->backend.config;
void *priv;
+ int cpu;
channel_unregister_notifiers(chan);
CMM_ACCESS_ONCE(chan->finalized) = 1;
//wake_up_interruptible(&chan->hp_wait);
//wake_up_interruptible(&chan->read_wait);
- urcu_ref_put(&chan->ref, channel_release);
+	/*
+	 * sessiond/consumer are keeping a reference on the shm file
+	 * descriptor directly. No need to refcount.
+	 */
+	/* Fetch the private data before the shm mapping is torn down. */
+	priv = chan->backend.priv;
+	channel_release(handle);
-	priv = chan->backend.priv;
	return priv;
}
if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
return -EBUSY;
- urcu_ref_get(&chan->ref);
cmm_smp_mb();
return 0;
}
CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
cmm_smp_mb();
uatomic_dec(&buf->active_readers);
- urcu_ref_put(&chan->ref, channel_release);
}
/**