Fix: consumer fd recv thread should write into non-blocking pipe
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 12 Apr 2012 01:57:31 +0000 (21:57 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Mon, 16 Apr 2012 15:27:27 +0000 (11:27 -0400)
Writing into a blocking pipe will cause the writer thread to block on
the poll fds thread when the pipe is full. Given that we would like to
batch stream array reallocation as much as possible, this wakeup should
not block.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 2604f3d060574c1c7677047fcc16581a5c54818d..ee5575262ec5667d448007a8ab82eec0607e19c9 100644 (file)
@@ -736,6 +736,20 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                goto error_poll_pipe;
        }
 
+       /* set read end of the pipe to non-blocking */
+       ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto error_poll_fcntl;
+       }
+
+       /* set write end of the pipe to non-blocking */
+       ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("fcntl O_NONBLOCK");
+               goto error_poll_fcntl;
+       }
+
        ret = pipe(ctx->consumer_should_quit);
        if (ret < 0) {
                perror("Error creating recv pipe");
@@ -760,6 +774,7 @@ error_thread_pipe:
                        PERROR("close");
                }
        }
+error_poll_fcntl:
 error_quit_pipe:
        for (i = 0; i < 2; i++) {
                int err;
@@ -933,8 +948,6 @@ void *lttng_consumer_thread_poll_fds(void *data)
        struct lttng_consumer_stream **local_stream = NULL;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
-       char tmp;
-       int tmp2;
        struct lttng_consumer_local_data *ctx = data;
 
        rcu_register_thread();
@@ -1019,11 +1032,14 @@ void *lttng_consumer_thread_poll_fds(void *data)
                 * low-priority reads.
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
+                       size_t pipe_readlen;
+                       char tmp;
+
                        DBG("consumer_poll_pipe wake up");
-                       tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
-                       if (tmp2 < 0) {
-                               perror("read consumer poll");
-                       }
+                       /* Consume 1 byte of pipe data */
+                       do {
+                               pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+                       } while (pipe_readlen == -1 && errno == EINTR);
                        continue;
                }
 
@@ -1228,11 +1244,20 @@ end:
         */
        consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
 
-       /* wake up the polling thread */
-       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
-       if (ret < 0) {
-               perror("poll pipe write");
-       }
+       /*
+        * Wake-up the other end by writing a null byte in the pipe
+        * (non-blocking). Important note: Because writing into the
+        * pipe is non-blocking (and therefore we allow dropping wakeup
+        * data, as long as there is wakeup data present in the pipe
+        * buffer to wake up the other end), the other end should
+        * perform the following sequence for waiting:
+        * 1) empty the pipe (reads).
+        * 2) perform update operation.
+        * 3) wait on the pipe (poll).
+        */
+       do {
+               ret = write(ctx->consumer_poll_pipe[1], "", 1);
+       } while (ret == -1UL && errno == EINTR);
        rcu_unregister_thread();
        return NULL;
 }
index 9abee1de8ad5d3bb01efdf2b04ada6270811de18..bbc31f8e3be14bdb3927c87a1aabb40e28e777b5 100644 (file)
@@ -295,11 +295,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                break;
        }
 end:
-       /* signal the poll thread */
-       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
-       if (ret < 0) {
-               perror("write consumer poll");
-       }
+       /*
+        * Wake-up the other end by writing a null byte in the pipe
+        * (non-blocking). Important note: Because writing into the
+        * pipe is non-blocking (and therefore we allow dropping wakeup
+        * data, as long as there is wakeup data present in the pipe
+        * buffer to wake up the other end), the other end should
+        * perform the following sequence for waiting:
+        * 1) empty the pipe (reads).
+        * 2) perform update operation.
+        * 3) wait on the pipe (poll).
+        */
+       do {
+               ret = write(ctx->consumer_poll_pipe[1], "", 1);
+       } while (ret == -1UL && errno == EINTR);
 end_nosignal:
        return 0;
 }
index bfdb7e968fe9aa77ba9ad922e8fb6bf8748f4353..2b55fd4637ead2b056fcafecc47ef5504d7c5df5 100644 (file)
@@ -257,11 +257,20 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                break;
        }
 end:
-       /* signal the poll thread */
-       ret = write(ctx->consumer_poll_pipe[1], "4", 1);
-       if (ret < 0) {
-               PERROR("write consumer poll");
-       }
+       /*
+        * Wake-up the other end by writing a null byte in the pipe
+        * (non-blocking). Important note: Because writing into the
+        * pipe is non-blocking (and therefore we allow dropping wakeup
+        * data, as long as there is wakeup data present in the pipe
+        * buffer to wake up the other end), the other end should
+        * perform the following sequence for waiting:
+        * 1) empty the pipe (reads).
+        * 2) perform update operation.
+        * 3) wait on the pipe (poll).
+        */
+       do {
+               ret = write(ctx->consumer_poll_pipe[1], "", 1);
+       } while (ret == -1UL && errno == EINTR);
 end_nosignal:
        return 0;
 }
This page took 0.029537 seconds and 4 git commands to generate.