struct {
void *data;
enum lttng_ust_chan_type type;
+ int wakeup_fd;
} channel;
struct {
int shm_fd;
union ust_args {
struct {
void *chan_data;
+ int wakeup_fd;
} channel;
struct {
int shm_fd;
int ustctl_send_channel_to_sessiond(int sock,
struct ustctl_consumer_channel *channel);
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *consumer_chan);
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *consumer_chan);
int ustctl_write_metadata_to_channel(
struct ustctl_consumer_channel *channel,
struct ustctl_consumer_stream *stream);
int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream);
int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream);
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream);
/* Create/destroy stream buffers for read */
struct ustctl_consumer_stream *
int cpu);
void ustctl_destroy_stream(struct ustctl_consumer_stream *stream);
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream);
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream);
-
/* For mmap mode, readable without "get" operation */
int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream,
unsigned long *len);
int ustcomm_recv_fd(int sock);
ssize_t ustcomm_recv_channel_from_sessiond(int sock,
- void **chan_data, uint64_t len);
+ void **chan_data, uint64_t len, int *wakeup_fd);
int ustcomm_recv_stream_from_sessiond(int sock,
uint64_t *memory_map_size,
int *shm_fd, int *wakeup_fd);
* expected var_len.
*/
ssize_t ustcomm_recv_channel_from_sessiond(int sock,
- void **_chan_data, uint64_t var_len)
+ void **_chan_data, uint64_t var_len,
+ int *_wakeup_fd)
{
void *chan_data;
- ssize_t len;
+ ssize_t len, nr_fd;
+ int wakeup_fd;
if (var_len > LTTNG_UST_CHANNEL_DATA_MAX_LEN) {
len = -EINVAL;
if (len != var_len) {
goto error_recv;
}
+ /* recv wakeup fd */
+ nr_fd = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1);
+ if (nr_fd <= 0) {
+ if (nr_fd < 0) {
+ len = nr_fd;
+ goto error_recv;
+ } else {
+ len = -EIO;
+ goto error_recv;
+ }
+ }
+ *_wakeup_fd = wakeup_fd;
*_chan_data = chan_data;
return len;
/* initial attributes */
struct ustctl_consumer_channel_attr attr;
+ int wait_fd; /* monitor close() */
+ int wakeup_fd; /* monitor close() */
};
/*
switch (data->type) {
case LTTNG_UST_OBJECT_TYPE_CHANNEL:
+ if (data->u.channel.wakeup_fd >= 0) {
+ ret = close(data->u.channel.wakeup_fd);
+ if (ret < 0) {
+ ret = -errno;
+ return ret;
+ }
+ }
free(data->u.channel.data);
break;
case LTTNG_UST_OBJECT_TYPE_STREAM:
enum lttng_ust_chan_type type,
void *data,
uint64_t size,
+ int wakeup_fd,
int send_fd_only)
{
ssize_t len;
return -EIO;
}
+ /* Send wakeup fd */
+ len = ustcomm_send_fds_unix_sock(sock, &wakeup_fd, 1);
+ if (len <= 0) {
+ if (len < 0)
+ return len;
+ else
+ return -EIO;
+ }
return 0;
}
{
struct lttng_ust_object_data *channel_data;
ssize_t len;
+ int wakeup_fd;
int ret;
channel_data = zmalloc(sizeof(*channel_data));
ret = -EINVAL;
goto error_recv_data;
}
-
+ /* recv wakeup fd */
+ len = ustcomm_recv_fds_unix_sock(sock, &wakeup_fd, 1);
+ if (len <= 0) {
+ if (len < 0) {
+ ret = len;
+ goto error_recv_data;
+ } else {
+ ret = -EIO;
+ goto error_recv_data;
+ }
+ }
+ channel_data->u.channel.wakeup_fd = wakeup_fd;
*_channel_data = channel_data;
return 0;
channel_data->u.channel.type,
channel_data->u.channel.data,
channel_data->size,
+ channel_data->u.channel.wakeup_fd,
1);
if (ret)
return ret;
channel->attr.type,
table->objects[0].memory_map,
table->objects[0].memory_map_size,
+ channel->wakeup_fd,
0);
}
return ret;
}
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+
+ chan = consumer_chan->chan->chan;
+ return ring_buffer_channel_close_wait_fd(&chan->backend.config,
+ chan, chan->handle);
+}
+
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+
+ chan = consumer_chan->chan->chan;
+ return ring_buffer_channel_close_wakeup_fd(&chan->backend.config,
+ chan, chan->handle);
+}
+
int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream)
{
struct channel *chan;
chan = stream->chan->chan->chan;
- return ring_buffer_close_wait_fd(&chan->backend.config,
+ return ring_buffer_stream_close_wait_fd(&chan->backend.config,
chan, stream->handle, stream->cpu);
}
struct channel *chan;
chan = stream->chan->chan->chan;
- return ring_buffer_close_wakeup_fd(&chan->backend.config,
+ return ring_buffer_stream_close_wakeup_fd(&chan->backend.config,
chan, stream->handle, stream->cpu);
}
free(stream);
}
-int ustctl_get_wait_fd(struct ustctl_consumer_stream *stream)
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wait_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wakeup_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream)
{
struct lttng_ust_lib_ring_buffer *buf;
struct ustctl_consumer_channel *consumer_chan;
return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref);
}
-int ustctl_get_wakeup_fd(struct ustctl_consumer_stream *stream)
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream)
{
struct lttng_ust_lib_ring_buffer *buf;
struct ustctl_consumer_channel *consumer_chan;
struct channel *chan;
struct lttng_ust_lib_ring_buffer_config *config;
void *chan_data;
+ int wakeup_fd;
uint64_t len;
int ret;
enum lttng_ust_chan_type type;
chan_data = uargs->channel.chan_data;
+ wakeup_fd = uargs->channel.wakeup_fd;
len = ust_chan->len;
type = ust_chan->type;
case LTTNG_UST_CHAN_PER_CPU:
break;
default:
- return -EINVAL;
+ ret = -EINVAL;
+ goto invalid;
}
if (session->been_active) {
goto active; /* Refuse to add channel to active session */
}
- channel_handle = channel_handle_create(chan_data, len);
+ channel_handle = channel_handle_create(chan_data, len, wakeup_fd);
if (!channel_handle) {
ret = -EINVAL;
goto handle_error;
objd_ref(session_objd);
return chan_objd;
+ /* error path after channel was created */
objd_error:
notransport:
free(lttng_chan);
alloc_error:
channel_destroy(chan, channel_handle, 0);
+ return ret;
+
+ /*
+ * error path before channel creation (owning chan_data and
+ * wakeup_fd).
+ */
handle_error:
active:
+invalid:
+ {
+ int close_ret;
+
+ close_ret = close(wakeup_fd);
+ if (close_ret) {
+ PERROR("close");
+ }
+ }
+ free(chan_data);
return ret;
}
case LTTNG_UST_CHANNEL:
{
void *chan_data;
+ int wakeup_fd;
len = ustcomm_recv_channel_from_sessiond(sock,
- &chan_data, lum->u.channel.len);
+ &chan_data, lum->u.channel.len,
+ &wakeup_fd);
switch (len) {
case 0: /* orderly shutdown */
ret = 0;
}
}
args.channel.chan_data = chan_data;
+ args.channel.wakeup_fd = wakeup_fd;
if (ops->cmd)
ret = ops->cmd(lum->handle, lum->cmd,
(unsigned long) &lum->u,
int *wakeup_fd,
uint64_t *memory_map_size);
extern
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle);
+extern
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle);
+extern
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu);
extern
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu);
}
struct lttng_ust_shm_handle *channel_handle_create(void *data,
- uint64_t memory_map_size)
+ uint64_t memory_map_size,
+ int wakeup_fd)
{
struct lttng_ust_shm_handle *handle;
struct shm_object *object;
goto error_table_alloc;
/* Add channel object */
object = shm_object_table_append_mem(handle->table, data,
- memory_map_size);
+ memory_map_size, wakeup_fd);
if (!object)
goto error_table_object;
/* struct channel is at object 0, offset 0 (hardcoded) */
return shmp(handle, chan->backend.buf[cpu].shmp);
}
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct shm_ref *ref;
+
+ ref = &handle->chan._ref;
+ return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
return shm_close_wait_fd(handle, ref);
}
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
struct channel *chan,
struct lttng_ust_shm_handle *handle,
int cpu)
{
struct shm_object *obj;
void *memory_map;
+ int waitfd[2], i, ret;
if (table->allocated_len >= table->size)
return NULL;
if (!memory_map)
goto alloc_error;
- obj->wait_fd[0] = -1;
- obj->wait_fd[1] = -1;
+ /* wait_fd: create pipe */
+ ret = pipe(waitfd);
+ if (ret < 0) {
+ PERROR("pipe");
+ goto error_pipe;
+ }
+ for (i = 0; i < 2; i++) {
+ ret = fcntl(waitfd[i], F_SETFD, FD_CLOEXEC);
+ if (ret < 0) {
+ PERROR("fcntl");
+ goto error_fcntl;
+ }
+ }
+ /* The write end of the pipe needs to be non-blocking */
+ ret = fcntl(waitfd[1], F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ PERROR("fcntl");
+ goto error_fcntl;
+ }
+ memcpy(obj->wait_fd, waitfd, sizeof(waitfd));
+
+ /* no shm_fd */
obj->shm_fd = -1;
obj->type = SHM_OBJECT_MEM;
return obj;
+error_fcntl:
+ for (i = 0; i < 2; i++) {
+ ret = close(waitfd[i]);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+ }
+error_pipe:
+ free(memory_map);
alloc_error:
return NULL;
}
* Passing ownership of mem to object.
*/
struct shm_object *shm_object_table_append_mem(struct shm_object_table *table,
- void *mem, size_t memory_map_size)
+ void *mem, size_t memory_map_size, int wakeup_fd)
{
struct shm_object *obj;
+ int ret;
if (table->allocated_len >= table->size)
return NULL;
obj = &table->objects[table->allocated_len];
- obj->wait_fd[0] = -1;
- obj->wait_fd[1] = -1;
+ obj->wait_fd[0] = -1; /* read end is unset */
+ obj->wait_fd[1] = wakeup_fd;
obj->shm_fd = -1;
+ ret = fcntl(obj->wait_fd[1], F_SETFD, FD_CLOEXEC);
+ if (ret < 0) {
+ PERROR("fcntl");
+ goto error_fcntl;
+ }
+ /* The write end of the pipe needs to be non-blocking */
+ ret = fcntl(obj->wait_fd[1], F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ PERROR("fcntl");
+ goto error_fcntl;
+ }
+
obj->type = SHM_OBJECT_MEM;
obj->memory_map = mem;
obj->memory_map_size = memory_map_size;
obj->index = table->allocated_len++;
return obj;
+
+error_fcntl:
+ return NULL;
}
static
break;
}
case SHM_OBJECT_MEM:
+ {
+ int ret, i;
+
+ for (i = 0; i < 2; i++) {
+ if (obj->wait_fd[i] < 0)
+ continue;
+ ret = close(obj->wait_fd[i]);
+ if (ret) {
+ PERROR("close");
+ assert(0);
+ }
+ }
free(obj->memory_map);
break;
+ }
default:
assert(0);
}
/* channel_handle_create - for UST. */
extern
struct lttng_ust_shm_handle *channel_handle_create(void *data,
- uint64_t memory_map_size);
+ uint64_t memory_map_size, int wakeup_fd);
/* channel_handle_add_stream - for UST. */
extern
int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
size_t memory_map_size);
/* mem ownership is passed to shm_object_table_append_mem(). */
struct shm_object *shm_object_table_append_mem(struct shm_object_table *table,
- void *mem, size_t memory_map_size);
+ void *mem, size_t memory_map_size, int wakeup_fd);
void shm_object_table_destroy(struct shm_object_table *table);
/*