iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
-
rcu_read_unlock();
- if (consumer_data.stream_count <= 0) {
- goto end;
- }
+ assert(consumer_data.stream_count > 0);
consumer_data.stream_count--;
- if (!stream) {
- goto end;
- }
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
rcu_read_unlock();
/*
- * Insert the consumer_poll_pipe at the end of the array and don't
+ * Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+ (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_poll_pipe);
+ ret = pipe(ctx->consumer_data_pipe);
if (ret < 0) {
PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
for (i = 0; i < 2; i++) {
int err;
- err = close(ctx->consumer_poll_pipe[i]);
+ err = close(ctx->consumer_data_pipe[i]);
if (err) {
PERROR("close");
}
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[0]);
+ ret = close(ctx->consumer_data_pipe[0]);
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[1]);
+ ret = close(ctx->consumer_data_pipe[1]);
if (ret) {
PERROR("close");
}
local_stream = NULL;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
goto end;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
}
/*
- * If the consumer_poll_pipe triggered poll go directly to the
+ * If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- DBG("consumer_poll_pipe wake up");
+ DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
do {
struct lttng_consumer_stream *null_stream = NULL;
- ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ ret = write(ctx->consumer_data_pipe[1], &null_stream,
sizeof(null_stream));
} while (ret < 0 && errno == EINTR);
/* communication with splice */
int consumer_thread_pipe[2];
int consumer_splice_metadata_pipe[2];
- /* pipe to wake the poll thread when necessary */
- int consumer_poll_pipe[2];
+ /* Data stream poll thread pipe. To transfer data stream to the thread */
+ int consumer_data_pipe[2];
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
}
case LTTNG_CONSUMER_ADD_STREAM:
{
- int fd;
+ int fd, stream_pipe;
struct consumer_relayd_sock_pair *relayd = NULL;
struct lttng_consumer_stream *new_stream;
int alloc_ret = 0;
}
}
- /* Send stream to the metadata thread */
+ /* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- do {
- ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata pipe");
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- do {
- ret = write(ctx->consumer_poll_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write data pipe");
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_data_pipe[1];
+ }
+
+ do {
+ ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ stream_pipe);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
}
- DBG("Kernel consumer_add_stream (%d)", fd);
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
+ msg.u.stream.path_name, fd, new_stream->relayd_stream_id);
break;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
case LTTNG_CONSUMER_ADD_STREAM:
{
struct lttng_consumer_stream *new_stream;
- int fds[2];
+ int fds[2], stream_pipe;
size_t nb_fd = 2;
struct consumer_relayd_sock_pair *relayd = NULL;
int alloc_ret = 0;
}
}
- /* Send stream to the metadata thread */
+ /* Get the right pipe where the stream will be sent. */
if (new_stream->metadata_flag) {
- do {
- ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata pipe");
- consumer_del_metadata_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_metadata_pipe[1];
} else {
- do {
- ret = write(ctx->consumer_poll_pipe[1], &new_stream,
- sizeof(new_stream));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write data pipe");
- consumer_del_stream(new_stream, NULL);
- goto end_nosignal;
- }
+ stream_pipe = ctx->consumer_data_pipe[1];
+ }
+
+ do {
+ ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("Consumer write %s stream to pipe %d",
+ new_stream->metadata_flag ? "metadata" : "data",
+ stream_pipe);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
}
- DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
+ DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64,
msg.u.stream.path_name, fds[0], fds[1],
new_stream->relayd_stream_id);
break;