enum lttng_ust_output output; /* splice, mmap */
uint32_t chan_id; /* channel ID */
unsigned char uuid[LTTNG_UST_UUID_LEN]; /* Trace session unique ID */
- char shm_path[PATH_MAX]; /* Shared memory path */
} LTTNG_PACKED;
/*
struct ustctl_consumer_stream;
struct ustctl_consumer_channel_attr;
+int ustctl_get_nr_stream_per_channel(void);
+
struct ustctl_consumer_channel *
- ustctl_create_channel(struct ustctl_consumer_channel_attr *attr);
+ ustctl_create_channel(struct ustctl_consumer_channel_attr *attr,
+ const int *stream_fds, int nr_stream_fds);
/*
* Each stream created needs to be destroyed before calling
* ustctl_destroy_channel().
unsigned int read_timer_interval,
unsigned char *uuid,
uint32_t chan_id,
- const char *shm_path);
+ const int *stream_fds, int nr_stream_fds);
void (*channel_destroy)(struct lttng_channel *chan);
union {
void *_deprecated1;
/* Buffer operations */
+int ustctl_get_nr_stream_per_channel(void)
+{
+ return num_possible_cpus();
+}
+
struct ustctl_consumer_channel *
- ustctl_create_channel(struct ustctl_consumer_channel_attr *attr)
+ ustctl_create_channel(struct ustctl_consumer_channel_attr *attr,
+ const int *stream_fds, int nr_stream_fds)
{
struct ustctl_consumer_channel *chan;
const char *transport_name;
attr->switch_timer_interval,
attr->read_timer_interval,
attr->uuid, attr->chan_id,
- attr->shm_path[0] == '\0' ? NULL : attr->shm_path);
+ stream_fds, nr_stream_fds);
if (!chan->chan) {
goto chan_error;
}
unsigned int read_timer_interval,
unsigned char *uuid,
uint32_t chan_id,
- const char *shm_path)
+ const int *stream_fds, int nr_stream_fds)
{
struct lttng_channel chan_priv_init;
struct lttng_ust_shm_handle *handle;
&chan_priv_init,
buf_addr, subbuf_size, num_subbuf,
switch_timer_interval, read_timer_interval,
- shm_path);
+ stream_fds, nr_stream_fds);
if (!handle)
return NULL;
lttng_chan = priv;
unsigned int read_timer_interval,
unsigned char *uuid,
uint32_t chan_id,
- const char *shm_path)
+ const int *stream_fds, int nr_stream_fds)
{
struct lttng_channel chan_priv_init;
struct lttng_ust_shm_handle *handle;
&chan_priv_init,
buf_addr, subbuf_size, num_subbuf,
switch_timer_interval, read_timer_interval,
- shm_path);
+ stream_fds, nr_stream_fds);
if (!handle)
return NULL;
lttng_chan = priv;
const struct lttng_ust_lib_ring_buffer_config *config,
size_t subbuf_size,
size_t num_subbuf, struct lttng_ust_shm_handle *handle,
- const char *shm_path);
+ const int *stream_fds);
void channel_backend_free(struct channel_backend *chanb,
struct lttng_ust_shm_handle *handle);
size_t subbuf_size, size_t num_subbuf,
unsigned int switch_timer_interval,
unsigned int read_timer_interval,
- const char *shm_path);
+ const int *stream_fds, int nr_stream_fds);
/*
* channel_destroy finalizes all channel's buffers, waits for readers to
#include "smp.h"
#include "shm.h"
-#define UINT_MAX_STR_LEN 11 /* includes \0 */
-
/**
* lib_ring_buffer_backend_allocate - allocate a channel buffer
* @config: ring buffer instance configuration
* @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
* @num_subbuf: number of sub-buffers (power of 2)
* @lttng_ust_shm_handle: shared memory handle
- * @shm_path: shared memory files path
+ * @stream_fds: stream file descriptors.
*
* Returns channel pointer if successful, %NULL otherwise.
*
const struct lttng_ust_lib_ring_buffer_config *config,
size_t subbuf_size, size_t num_subbuf,
struct lttng_ust_shm_handle *handle,
- const char *shm_path)
+ const int *stream_fds)
{
struct channel *chan = caa_container_of(chanb, struct channel, backend);
unsigned int i;
*/
for_each_possible_cpu(i) {
struct shm_object *shmobj;
- char shm_buf_path[PATH_MAX];
-
- if (shm_path) {
- char cpu_nr[UINT_MAX_STR_LEN]; /* unsigned int max len */
-
- strncpy(shm_buf_path, shm_path, PATH_MAX);
- shm_buf_path[PATH_MAX - 1] = '\0';
- ret = snprintf(cpu_nr, UINT_MAX_STR_LEN, "%u", i);
- if (ret != 1)
- goto end;
- strncat(shm_buf_path, cpu_nr,
- PATH_MAX - strlen(shm_buf_path) - 1);
- }
+
shmobj = shm_object_table_alloc(handle->table, shmsize,
- SHM_OBJECT_SHM, shm_path ? shm_buf_path : NULL);
+ SHM_OBJECT_SHM, stream_fds[i]);
if (!shmobj)
goto end;
align_shm(shmobj, __alignof__(struct lttng_ust_lib_ring_buffer));
struct lttng_ust_lib_ring_buffer *buf;
shmobj = shm_object_table_alloc(handle->table, shmsize,
- SHM_OBJECT_SHM, shm_path);
+ SHM_OBJECT_SHM, stream_fds[0]);
if (!shmobj)
goto end;
align_shm(shmobj, __alignof__(struct lttng_ust_lib_ring_buffer));
* padding to let readers get those sub-buffers.
* Used for live streaming.
* @read_timer_interval: Time interval (in us) to wake up pending readers.
- * @shm_path: Shared memory files path.
+ * @stream_fds: array of stream file descriptors.
+ * @nr_stream_fds: number of file descriptors in array.
*
* Holds cpu hotplug.
* Returns NULL on failure.
void *buf_addr, size_t subbuf_size,
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval,
- const char *shm_path)
+ const int *stream_fds, int nr_stream_fds)
{
int ret;
size_t shmsize, chansize;
else
nr_streams = 1;
+ if (nr_stream_fds != nr_streams)
+ return NULL;
+
if (lib_ring_buffer_check_config(config, switch_timer_interval,
read_timer_interval))
return NULL;
/* Allocate normal memory for channel (not shared) */
shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM,
- NULL);
+ -1);
if (!shmobj)
goto error_append;
/* struct channel is at object 0, offset 0 (hardcoded) */
ret = channel_backend_init(&chan->backend, name, config,
subbuf_size, num_subbuf, handle,
- shm_path);
+ stream_fds);
if (ret)
goto error_backend_init;
return table;
}
-static
-int create_posix_shm(void)
-{
- char tmp_name[NAME_MAX] = "/ust-shm-tmp-XXXXXX";
- int shmfd, ret;
-
- /*
- * Allocate shm, and immediately unlink its shm oject, keeping
- * only the file descriptor as a reference to the object. If it
- * already exists (caused by short race window during which the
- * global object exists in a concurrent shm_open), simply retry.
- * We specifically do _not_ use the / at the beginning of the
- * pathname so that some OS implementations can keep it local to
- * the process (POSIX leaves this implementation-defined).
- */
- do {
- /*
- * Using mktemp filename with O_CREAT | O_EXCL open
- * flags.
- */
- (void) mktemp(tmp_name);
- if (tmp_name[0] == '\0') {
- PERROR("mktemp");
- goto error_shm_open;
- }
- shmfd = shm_open(tmp_name,
- O_CREAT | O_EXCL | O_RDWR, 0700);
- } while (shmfd < 0 && (errno == EEXIST || errno == EACCES));
- if (shmfd < 0) {
- PERROR("shm_open");
- goto error_shm_open;
- }
- ret = shm_unlink(tmp_name);
- if (ret < 0 && errno != ENOENT) {
- PERROR("shm_unlink");
- goto error_shm_release;
- }
- return shmfd;
-
-error_shm_release:
- ret = close(shmfd);
- if (ret) {
- PERROR("close");
- assert(0);
- }
-error_shm_open:
- return -1;
-}
-
-static
-int create_shared_file(const char *shm_path)
-{
- return open(shm_path, O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
-}
-
static
struct shm_object *_shm_object_table_alloc_shm(struct shm_object_table *table,
size_t memory_map_size,
- const char *shm_path)
+ int stream_fd)
{
- int shmfd, waitfd[2], ret, i, sigblocked = 0;
+ int shmfd, waitfd[2], ret, i;
struct shm_object *obj;
char *memory_map;
- sigset_t all_sigs, orig_sigs;
+ if (stream_fd < 0)
+ return NULL;
if (table->allocated_len >= table->size)
return NULL;
obj = &table->objects[table->allocated_len];
}
memcpy(obj->wait_fd, waitfd, sizeof(waitfd));
- /* shm_fd: create shm */
-
- /*
- * Theoretically, we could leak a shm if the application crashes
- * between open and unlink. Disable signals on this thread for
- * increased safety against this scenario.
- */
- sigfillset(&all_sigs);
- ret = pthread_sigmask(SIG_BLOCK, &all_sigs, &orig_sigs);
- if (ret == -1) {
- PERROR("pthread_sigmask");
- goto error_pthread_sigmask;
- }
- sigblocked = 1;
-
+ /* create shm */
- if (!shm_path) {
- obj->shm_path[0] = '\0';
- shmfd = create_posix_shm();
- } else {
- strncpy(obj->shm_path, shm_path,
- sizeof(obj->shm_path));
- obj->shm_path[sizeof(obj->shm_path) - 1] = '\0';
-
- /* Path should already exist, but could fail. */
- shmfd = create_shared_file(shm_path);
- }
- if (shmfd < 0)
- goto error_shm_open;
-
- sigblocked = 0;
- ret = pthread_sigmask(SIG_SETMASK, &orig_sigs, NULL);
- if (ret == -1) {
- PERROR("pthread_sigmask");
- goto error_sigmask_release;
- }
+ shmfd = stream_fd;
ret = zero_file(shmfd, memory_map_size);
if (ret) {
PERROR("zero_file");
PERROR("ftruncate");
goto error_ftruncate;
}
+ obj->shm_fd_ownership = 0;
obj->shm_fd = shmfd;
/* memory_map: mmap */
error_mmap:
error_ftruncate:
error_zero_file:
-error_sigmask_release:
- ret = close(shmfd);
- if (ret) {
- PERROR("close");
- assert(0);
- }
- if (shm_path) {
- ret = unlink(shm_path);
- if (ret) {
- PERROR("ret");
- }
- }
-error_shm_open:
- if (sigblocked) {
- ret = pthread_sigmask(SIG_SETMASK, &orig_sigs, NULL);
- if (ret == -1) {
- PERROR("pthread_sigmask");
- }
- }
-error_pthread_sigmask:
error_fcntl:
for (i = 0; i < 2; i++) {
ret = close(waitfd[i]);
/* no shm_fd */
obj->shm_fd = -1;
+ obj->shm_fd_ownership = 0;
obj->type = SHM_OBJECT_MEM;
obj->memory_map = memory_map;
struct shm_object *shm_object_table_alloc(struct shm_object_table *table,
size_t memory_map_size,
enum shm_object_type type,
- const char *shm_path)
+ int stream_fd)
{
switch (type) {
case SHM_OBJECT_SHM:
return _shm_object_table_alloc_shm(table, memory_map_size,
- shm_path);
+ stream_fd);
case SHM_OBJECT_MEM:
return _shm_object_table_alloc_mem(table, memory_map_size);
default:
obj->wait_fd[0] = -1; /* read end is unset */
obj->wait_fd[1] = wakeup_fd;
obj->shm_fd = shm_fd;
+ obj->shm_fd_ownership = 1;
ret = fcntl(obj->wait_fd[1], F_SETFD, FD_CLOEXEC);
if (ret < 0) {
obj->wait_fd[0] = -1; /* read end is unset */
obj->wait_fd[1] = wakeup_fd;
obj->shm_fd = -1;
+ obj->shm_fd_ownership = 0;
ret = fcntl(obj->wait_fd[1], F_SETFD, FD_CLOEXEC);
if (ret < 0) {
PERROR("umnmap");
assert(0);
}
- ret = close(obj->shm_fd);
- if (ret) {
- PERROR("close");
- assert(0);
- }
- if (obj->shm_path[0]) {
- ret = unlink(obj->shm_path);
+ if (obj->shm_fd_ownership) {
+ ret = close(obj->shm_fd);
if (ret) {
- PERROR("ret");
+ PERROR("close");
+ assert(0);
}
}
for (i = 0; i < 2; i++) {
struct shm_object *shm_object_table_alloc(struct shm_object_table *table,
size_t memory_map_size,
enum shm_object_type type,
- const char *shm_path);
+ const int stream_fd);
struct shm_object *shm_object_table_append_shm(struct shm_object_table *table,
int shm_fd, int wakeup_fd, uint32_t stream_nr,
size_t memory_map_size);
char *memory_map;
size_t memory_map_size;
uint64_t allocated_len;
- char shm_path[PATH_MAX];
+ int shm_fd_ownership;
};
struct shm_object_table {