Cleanup code and rename variables
authorDavid Goulet <dgoulet@efficios.com>
Tue, 16 Oct 2012 15:03:16 +0000 (11:03 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 16:49:01 +0000 (12:49 -0400)
No behavior altered. Only code cleanup and renaming of the consumer poll
pipe to "consumer_data_pipe".

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 35df86dd81c922217a6dbf628af93fd79d5b3432..fb0d7d05afb182992c97c789dcdd54544bb2fcea 100644 (file)
@@ -281,16 +281,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        iter.iter.node = &stream->node.node;
        ret = lttng_ht_del(ht, &iter);
        assert(!ret);
-
        rcu_read_unlock();
 
-       if (consumer_data.stream_count <= 0) {
-               goto end;
-       }
+       assert(consumer_data.stream_count > 0);
        consumer_data.stream_count--;
-       if (!stream) {
-               goto end;
-       }
+
        if (stream->out_fd >= 0) {
                ret = close(stream->out_fd);
                if (ret) {
@@ -820,10 +815,10 @@ static int consumer_update_poll_array(
        rcu_read_unlock();
 
        /*
-        * Insert the consumer_poll_pipe at the end of the array and don't
+        * Insert the consumer_data_pipe at the end of the array and don't
         * increment i so nb_fd is the number of real FD.
         */
-       (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+       (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
        (*pollfd)[i].events = POLLIN | POLLPRI;
        return i;
 }
@@ -1020,21 +1015,21 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        ctx->on_recv_stream = recv_stream;
        ctx->on_update_stream = update_stream;
 
-       ret = pipe(ctx->consumer_poll_pipe);
+       ret = pipe(ctx->consumer_data_pipe);
        if (ret < 0) {
                PERROR("Error creating poll pipe");
                goto error_poll_pipe;
        }
 
        /* set read end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+       ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
                PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
        }
 
        /* set write end of the pipe to non-blocking */
-       ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+       ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
        if (ret < 0) {
                PERROR("fcntl O_NONBLOCK");
                goto error_poll_fcntl;
@@ -1082,7 +1077,7 @@ error_quit_pipe:
        for (i = 0; i < 2; i++) {
                int err;
 
-               err = close(ctx->consumer_poll_pipe[i]);
+               err = close(ctx->consumer_data_pipe[i]);
                if (err) {
                        PERROR("close");
                }
@@ -1112,11 +1107,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        if (ret) {
                PERROR("close");
        }
-       ret = close(ctx->consumer_poll_pipe[0]);
+       ret = close(ctx->consumer_data_pipe[0]);
        if (ret) {
                PERROR("close");
        }
-       ret = close(ctx->consumer_poll_pipe[1]);
+       ret = close(ctx->consumer_data_pipe[1]);
        if (ret) {
                PERROR("close");
        }
@@ -1981,7 +1976,7 @@ void *consumer_thread_data_poll(void *data)
                                local_stream = NULL;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_poll_pipe */
+                       /* allocate for all fds + 1 for the consumer_data_pipe */
                        pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
@@ -1989,7 +1984,7 @@ void *consumer_thread_data_poll(void *data)
                                goto end;
                        }
 
-                       /* allocate for all fds + 1 for the consumer_poll_pipe */
+                       /* allocate for all fds + 1 for the consumer_data_pipe */
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
                                        sizeof(struct lttng_consumer_stream));
                        if (local_stream == NULL) {
@@ -2035,17 +2030,17 @@ void *consumer_thread_data_poll(void *data)
                }
 
                /*
-                * If the consumer_poll_pipe triggered poll go directly to the
+                * If the consumer_data_pipe triggered poll go directly to the
                 * beginning of the loop to update the array. We want to prioritize
                 * array update over low-priority reads.
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
                        size_t pipe_readlen;
 
-                       DBG("consumer_poll_pipe wake up");
+                       DBG("consumer_data_pipe wake up");
                        /* Consume 1 byte of pipe data */
                        do {
-                               pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+                               pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
                                                sizeof(new_stream));
                        } while (pipe_readlen == -1 && errno == EINTR);
 
@@ -2296,7 +2291,7 @@ end:
        do {
                struct lttng_consumer_stream *null_stream = NULL;
 
-               ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+               ret = write(ctx->consumer_data_pipe[1], &null_stream,
                                sizeof(null_stream));
        } while (ret < 0 && errno == EINTR);
 
index 6bce96d96946abf263b9bad0bff0d6ff98d8dc49..058bd695b9f6ee41c317e204f97d632c95efe294 100644 (file)
@@ -230,8 +230,8 @@ struct lttng_consumer_local_data {
        /* communication with splice */
        int consumer_thread_pipe[2];
        int consumer_splice_metadata_pipe[2];
-       /* pipe to wake the poll thread when necessary */
-       int consumer_poll_pipe[2];
+       /* Data stream poll thread pipe. To transfer data stream to the thread */
+       int consumer_data_pipe[2];
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
index 1d725c2318b74029feb878289cfdfde55fa082fd..c762934ff92a55569b9323852589157e3e4ceb81 100644 (file)
@@ -138,7 +138,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        }
        case LTTNG_CONSUMER_ADD_STREAM:
        {
-               int fd;
+               int fd, stream_pipe;
                struct consumer_relayd_sock_pair *relayd = NULL;
                struct lttng_consumer_stream *new_stream;
                int alloc_ret = 0;
@@ -224,30 +224,26 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
-               /* Send stream to the metadata thread */
+               /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       do {
-                               ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
-                                               sizeof(new_stream));
-                       } while (ret < 0 && errno == EINTR);
-                       if (ret < 0) {
-                               PERROR("write metadata pipe");
-                               consumer_del_stream(new_stream, NULL);
-                               goto end_nosignal;
-                       }
+                       stream_pipe = ctx->consumer_metadata_pipe[1];
                } else {
-                       do {
-                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
-                                               sizeof(new_stream));
-                       } while (ret < 0 && errno == EINTR);
-                       if (ret < 0) {
-                               PERROR("write data pipe");
-                               consumer_del_stream(new_stream, NULL);
-                               goto end_nosignal;
-                       }
+                       stream_pipe = ctx->consumer_data_pipe[1];
+               }
+
+               do {
+                       ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("Consumer write %s stream to pipe %d",
+                                       new_stream->metadata_flag ? "metadata" : "data",
+                                       stream_pipe);
+                       consumer_del_stream(new_stream, NULL);
+                       goto end_nosignal;
                }
 
-               DBG("Kernel consumer_add_stream (%d)", fd);
+               DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
+                               msg.u.stream.path_name, fd, new_stream->relayd_stream_id);
                break;
        }
        case LTTNG_CONSUMER_UPDATE_STREAM:
index 718887971abb7b280565613b0b08643e8232304b..5886d89b598cab2f4d54aad2868e3172343eaac4 100644 (file)
@@ -171,7 +171,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        case LTTNG_CONSUMER_ADD_STREAM:
        {
                struct lttng_consumer_stream *new_stream;
-               int fds[2];
+               int fds[2], stream_pipe;
                size_t nb_fd = 2;
                struct consumer_relayd_sock_pair *relayd = NULL;
                int alloc_ret = 0;
@@ -253,30 +253,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
-               /* Send stream to the metadata thread */
+               /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       do {
-                               ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
-                                               sizeof(new_stream));
-                       } while (ret < 0 && errno == EINTR);
-                       if (ret < 0) {
-                               PERROR("write metadata pipe");
-                               consumer_del_metadata_stream(new_stream, NULL);
-                               goto end_nosignal;
-                       }
+                       stream_pipe = ctx->consumer_metadata_pipe[1];
                } else {
-                       do {
-                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
-                                               sizeof(new_stream));
-                       } while (ret < 0 && errno == EINTR);
-                       if (ret < 0) {
-                               PERROR("write data pipe");
-                               consumer_del_stream(new_stream, NULL);
-                               goto end_nosignal;
-                       }
+                       stream_pipe = ctx->consumer_data_pipe[1];
+               }
+
+               do {
+                       ret = write(stream_pipe, &new_stream, sizeof(new_stream));
+               } while (ret < 0 && errno == EINTR);
+               if (ret < 0) {
+                       PERROR("Consumer write %s stream to pipe %d",
+                                       new_stream->metadata_flag ? "metadata" : "data",
+                                       stream_pipe);
+                       consumer_del_stream(new_stream, NULL);
+                       goto end_nosignal;
                }
 
-               DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
+               DBG("UST consumer ADD_STREAM %s (%d,%d) with relayd id %" PRIu64,
                                msg.u.stream.path_name, fds[0], fds[1],
                                new_stream->relayd_stream_id);
                break;
This page took 0.038575 seconds and 4 git commands to generate.