Change consumer_metadata_pipe to be a lttng_pipe
authorDavid Goulet <dgoulet@efficios.com>
Tue, 14 May 2013 15:27:26 +0000 (11:27 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Wed, 22 May 2013 14:41:37 +0000 (10:41 -0400)
The read() call in the metadata thread is also changed to use the lttng
pipe read wrapper.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index d1b7ba29b22b91551f790fc2ed514f4b1508ba73..044a504cec512443039e15eac186df07e841f896 100644 (file)
@@ -76,21 +76,6 @@ volatile int consumer_quit;
 static struct lttng_ht *metadata_ht;
 static struct lttng_ht *data_ht;
 
-/*
- * Notify a thread pipe to poll back again. This usually means that some global
- * state has changed so we just send back the thread in a poll wait call.
- */
-static void notify_thread_pipe(int wpipe)
-{
-       int ret;
-
-       do {
-               struct lttng_consumer_stream *null_stream = NULL;
-
-               ret = write(wpipe, &null_stream, sizeof(null_stream));
-       } while (ret < 0 && errno == EINTR);
-}
-
 /*
  * Notify a thread lttng pipe to poll back again. This usually means that some
  * global state has changed so we just send back the thread in a poll wait
@@ -423,7 +408,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         */
        if (ctx) {
                notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
        }
 }
 
@@ -1203,8 +1188,8 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_channel_pipe;
        }
 
-       ret = utils_create_pipe(ctx->consumer_metadata_pipe);
-       if (ret < 0) {
+       ctx->consumer_metadata_pipe = lttng_pipe_open(0);
+       if (!ctx->consumer_metadata_pipe) {
                goto error_metadata_pipe;
        }
 
@@ -1216,7 +1201,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
        return ctx;
 
 error_splice_pipe:
-       utils_close_pipe(ctx->consumer_metadata_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 error_metadata_pipe:
        utils_close_pipe(ctx->consumer_channel_pipe);
 error_channel_pipe:
@@ -1251,6 +1236,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
        utils_close_pipe(ctx->consumer_thread_pipe);
        utils_close_pipe(ctx->consumer_channel_pipe);
        lttng_pipe_destroy(ctx->consumer_data_pipe);
+       lttng_pipe_destroy(ctx->consumer_metadata_pipe);
        utils_close_pipe(ctx->consumer_should_quit);
        utils_close_pipe(ctx->consumer_splice_metadata_pipe);
 
@@ -2131,7 +2117,8 @@ void *consumer_thread_metadata_poll(void *data)
                goto end_poll;
        }
 
-       ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN);
+       ret = lttng_poll_add(&events,
+                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN);
        if (ret < 0) {
                goto end;
        }
@@ -2169,30 +2156,26 @@ restart:
                                continue;
                        }
 
-                       if (pollfd == ctx->consumer_metadata_pipe[0]) {
+                       if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
                                        /*
                                         * Remove the pipe from the poll set and continue the loop
                                         * since their might be data to consume.
                                         */
-                                       lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
-                                       ret = close(ctx->consumer_metadata_pipe[0]);
-                                       if (ret < 0) {
-                                               PERROR("close metadata pipe");
-                                       }
+                                       lttng_poll_del(&events,
+                                                       lttng_pipe_get_readfd(ctx->consumer_metadata_pipe));
+                                       lttng_pipe_read_close(ctx->consumer_metadata_pipe);
                                        continue;
                                } else if (revents & LPOLLIN) {
-                                       do {
-                                               /* Get the stream pointer received */
-                                               ret = read(pollfd, &stream, sizeof(stream));
-                                       } while (ret < 0 && errno == EINTR);
-                                       if (ret < 0 ||
-                                                       ret < sizeof(struct lttng_consumer_stream *)) {
-                                               PERROR("read metadata stream");
+                                       ssize_t pipe_len;
+
+                                       pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
+                                                       &stream, sizeof(stream));
+                                       if (pipe_len < 0) {
+                                               ERR("read metadata stream, ret: %ld", pipe_len);
                                                /*
-                                                * Let's continue here and hope we can still work
-                                                * without stopping the consumer. XXX: Should we?
+                                                * Continue here to handle the rest of the streams.
                                                 */
                                                continue;
                                        }
@@ -2540,10 +2523,7 @@ end:
         * only tracked fd in the poll set. The thread will take care of closing
         * the read side.
         */
-       ret = close(ctx->consumer_metadata_pipe[1]);
-       if (ret < 0) {
-               PERROR("close data pipe");
-       }
+       (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
 
        destroy_data_stream_ht(data_ht);
 
index 91039e8e956563c3f99d3de6f9cd0ca7dbb98251..3726fd1e6a2b19b3ca35623243524fd8e608fda9 100644 (file)
@@ -351,7 +351,7 @@ struct lttng_consumer_local_data {
        /* to let the signal handler wake up the fd receiver thread */
        int consumer_should_quit[2];
        /* Metadata poll thread pipe. Transfer metadata stream to it */
-       int consumer_metadata_pipe[2];
+       struct lttng_pipe *consumer_metadata_pipe;
 };
 
 /*
index d8aec492f774a4c3a2fb1b6bcc0bf3f64a587b3e..f23fc9c5711f18101e1b7b3fb6cd7245ea378cee 100644 (file)
@@ -288,7 +288,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
-                       stream_pipe = ctx->consumer_metadata_pipe[1];
+                       stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
                } else {
                        stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
                }
index 8bc69006b0e1f30744ea41524e95d7ed1af3de0b..3731cbb26b527c4deb1621de5bf3f84051007a20 100644 (file)
@@ -189,7 +189,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
 
        /* Get the right pipe where the stream will be sent. */
        if (stream->metadata_flag) {
-               stream_pipe = ctx->consumer_metadata_pipe[1];
+               stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe);
        } else {
                stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe);
        }
This page took 0.030197 seconds and 4 git commands to generate.