From 50f8ae690312d8f824fb9c9875b0a07f4a2547b6 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 16 Oct 2012 11:03:16 -0400 Subject: [PATCH] Cleanup code and rename variables No behavior altered. Only code cleanup and renaming of the consumer poll pipe to "consumer_data_pipe". Signed-off-by: David Goulet --- src/common/consumer.c | 37 +++++++++---------- src/common/consumer.h | 4 +-- src/common/kernel-consumer/kernel-consumer.c | 38 +++++++++----------- src/common/ust-consumer/ust-consumer.c | 37 +++++++++---------- 4 files changed, 51 insertions(+), 65 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index 35df86dd8..fb0d7d05a 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -281,16 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream, iter.iter.node = &stream->node.node; ret = lttng_ht_del(ht, &iter); assert(!ret); - rcu_read_unlock(); - if (consumer_data.stream_count <= 0) { - goto end; - } + assert(consumer_data.stream_count > 0); consumer_data.stream_count--; - if (!stream) { - goto end; - } + if (stream->out_fd >= 0) { ret = close(stream->out_fd); if (ret) { @@ -820,10 +815,10 @@ static int consumer_update_poll_array( rcu_read_unlock(); /* - * Insert the consumer_poll_pipe at the end of the array and don't + * Insert the consumer_data_pipe at the end of the array and don't * increment i so nb_fd is the number of real FD. */ - (*pollfd)[i].fd = ctx->consumer_poll_pipe[0]; + (*pollfd)[i].fd = ctx->consumer_data_pipe[0]; (*pollfd)[i].events = POLLIN | POLLPRI; return i; } @@ -1020,21 +1015,21 @@ struct lttng_consumer_local_data *lttng_consumer_create( ctx->on_recv_stream = recv_stream; ctx->on_update_stream = update_stream; - ret = pipe(ctx->consumer_poll_pipe); + ret = pipe(ctx->consumer_data_pipe); if (ret < 0) { PERROR("Error creating poll pipe"); goto error_poll_pipe; } /* set read end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; } /* set write end of the pipe to non-blocking */ - ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK); + ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK); if (ret < 0) { PERROR("fcntl O_NONBLOCK"); goto error_poll_fcntl; @@ -1082,7 +1077,7 @@ error_quit_pipe: for (i = 0; i < 2; i++) { int err; - err = close(ctx->consumer_poll_pipe[i]); + err = close(ctx->consumer_data_pipe[i]); if (err) { PERROR("close"); } @@ -1112,11 +1107,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[0]); + ret = close(ctx->consumer_data_pipe[0]); if (ret) { PERROR("close"); } - ret = close(ctx->consumer_poll_pipe[1]); + ret = close(ctx->consumer_data_pipe[1]); if (ret) { PERROR("close"); } @@ -1981,7 +1976,7 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); @@ -1989,7 +1984,7 @@ void *consumer_thread_data_poll(void *data) goto end; } - /* allocate for all fds + 1 for the consumer_poll_pipe */ + /* allocate for all fds + 1 for the consumer_data_pipe */ local_stream = zmalloc((consumer_data.stream_count + 1) * sizeof(struct lttng_consumer_stream)); if (local_stream == NULL) { @@ -2035,17 +2030,17 @@ void *consumer_thread_data_poll(void *data) } /* - * If the consumer_poll_pipe triggered poll go directly to the + * If the consumer_data_pipe triggered poll go directly to the * beginning of the loop to update the array. We want to prioritize * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; - DBG("consumer_poll_pipe wake up"); + DBG("consumer_data_pipe wake up"); /* Consume 1 byte of pipe data */ do { - pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream, + pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream, sizeof(new_stream)); } while (pipe_readlen == -1 && errno == EINTR); @@ -2296,7 +2291,7 @@ end: do { struct lttng_consumer_stream *null_stream = NULL; - ret = write(ctx->consumer_poll_pipe[1], &null_stream, + ret = write(ctx->consumer_data_pipe[1], &null_stream, sizeof(null_stream)); } while (ret < 0 && errno == EINTR); diff --git a/src/common/consumer.h b/src/common/consumer.h index 6bce96d96..058bd695b 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -230,8 +230,8 @@ struct lttng_consumer_local_data { /* communication with splice */ int consumer_thread_pipe[2]; int consumer_splice_metadata_pipe[2]; - /* pipe to wake the poll thread when necessary */ - int consumer_poll_pipe[2]; + /* Data stream poll thread pipe. To transfer data stream to the thread */ + int consumer_data_pipe[2]; /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 1d725c231..c762934ff 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -138,7 +138,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - int fd; + int fd, stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; int alloc_ret = 0; @@ -224,30 +224,26 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } - /* Send stream to the metadata thread */ + /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - do { - ret = write(ctx->consumer_metadata_pipe[1], &new_stream, - sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata pipe"); - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - do { - ret = write(ctx->consumer_poll_pipe[1], &new_stream, - sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write data pipe"); - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_data_pipe[1]; + } + + do { + ret = write(stream_pipe, &new_stream, sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + stream_pipe); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - DBG("Kernel consumer_add_stream (%d)", fd); + DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, + msg.u.stream.path_name, fd, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 718887971..5886d89b5 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -171,7 +171,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_STREAM: { struct lttng_consumer_stream *new_stream; - int fds[2]; + int fds[2], stream_pipe; size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; int alloc_ret = 0; @@ -253,30 +253,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } - /* Send stream to the metadata thread */ + /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - do { - ret = write(ctx->consumer_metadata_pipe[1], &new_stream, - sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write metadata pipe"); - consumer_del_metadata_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - do { - ret = write(ctx->consumer_poll_pipe[1], &new_stream, - sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - PERROR("write data pipe"); - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } + stream_pipe = ctx->consumer_data_pipe[1]; + } + + do { + ret = write(stream_pipe, &new_stream, sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + stream_pipe); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64, + DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64, msg.u.stream.path_name, fds[0], fds[1], new_stream->relayd_stream_id); break; -- 2.34.1