iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
-
rcu_read_unlock();
- if (consumer_data.stream_count <= 0) {
- goto end;
- }
+ assert(consumer_data.stream_count > 0);
consumer_data.stream_count--;
- if (!stream) {
- goto end;
- }
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
rcu_read_unlock();
/*
- * Insert the consumer_poll_pipe at the end of the array and don't
+ * Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+ (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_poll_pipe);
+ ret = pipe(ctx->consumer_data_pipe);
if (ret < 0) {
PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
for (i = 0; i < 2; i++) {
int err;
- err = close(ctx->consumer_poll_pipe[i]);
+ err = close(ctx->consumer_data_pipe[i]);
if (err) {
PERROR("close");
}
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[0]);
+ ret = close(ctx->consumer_data_pipe[0]);
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[1]);
+ ret = close(ctx->consumer_data_pipe[1]);
if (ret) {
PERROR("close");
}
local_stream = NULL;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
goto end;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
}
/*
- * If the consumer_poll_pipe triggered poll go directly to the
+ * If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- DBG("consumer_poll_pipe wake up");
+ DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
do {
struct lttng_consumer_stream *null_stream = NULL;
- ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ ret = write(ctx->consumer_data_pipe[1], &null_stream,
sizeof(null_stream));
} while (ret < 0 && errno == EINTR);