Cygwin: Fix handling of wait pipe hangup by properly detecting EOF
authorChristian Babeux <christian.babeux@efficios.com>
Mon, 3 Dec 2012 02:09:39 +0000 (21:09 -0500)
committerChristian Babeux <christian.babeux@efficios.com>
Fri, 7 Dec 2012 20:17:55 +0000 (15:17 -0500)
On Linux, the POLLHUP poll(3) event is used to signal that the other end
of a pipe has been disconnected. Due to poor wording in the Single UNIX
Specification, differents UNIX implementation signal the EOF with
conflicting poll events [1].

This is the case on Cygwin. A pipe close sends the POLLIN poll(3) event.
The actual consumer implementation sees this has a wakeup for data
ready to be consumed. The current hangup handling leads to infinite looping
in the consumer because the hangup is never detected and the POLLIN event
is never cleared.

To fix this issue, the consumer must read on the pipe, check for EOF
(read(3) shall return 0 to indicate EOF) and proceed to force the POLLHUP
poll(3) event if this is indeed the case.

[1] - http://www.greenend.org.uk/rjk/tech/poll.html

Signed-off-by: Christian Babeux <christian.babeux@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index 024ee17853c03c053dfa0ae4382a7e3763346b1f..c511494356369f59727467d835ef08e82bc99a19 100644 (file)
@@ -1046,30 +1046,45 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        continue;
                }
 
-               /* Take care of high priority channels first. */
+
+               /* Check if each pipe has data. hack for cygwin. */
                for (i = 0; i < nb_fd; i++) {
-                       if (pollfd[i].revents & POLLPRI) {
-                               ssize_t len;
+                       if ((pollfd[i].revents & POLLIN) ||
+                                       local_stream[i]->hangup_flush_done) {
+                               int check_ret;
 
-                               DBG("Urgent read on fd %d", pollfd[i].fd);
-                               high_prio = 1;
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
-                               /* it's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
-                                       goto end;
-                               } else if (len > 0) {
-                                       local_stream[i]->data_read = 1;
+                               check_ret = lttng_consumer_check_pipe(local_stream[i], ctx);
+                               if (check_ret != 0) {
+                                       pollfd[i].revents |= POLLHUP;
                                }
                        }
                }
 
-               /*
-                * If we read high prio channel in this loop, try again
-                * for more high prio data.
-                */
-               if (high_prio) {
-                       continue;
-               }
+               /* Take care of high priority channels first. */
+               /* for (i = 0; i < nb_fd; i++) { */
+               /*      DBG("!!! POLL FLAGS: %d", pollfd[i].revents); */
+               /*      if (pollfd[i].revents & POLLPRI) { */
+               /*              ssize_t len; */
+
+               /*              DBG("Urgent read on fd %d", pollfd[i].fd); */
+               /*              high_prio = 1; */
+               /*              len = ctx->on_buffer_ready(local_stream[i], ctx); */
+               /*              /\* it's ok to have an unavailable sub-buffer *\/ */
+               /*              if (len < 0 && len != -EAGAIN) { */
+               /*                      goto end; */
+               /*              } else if (len > 0) { */
+               /*                      local_stream[i]->data_read = 1; */
+               /*              } */
+               /*      } */
+               /* } */
+
+               /* /\* */
+               /*  * If we read high prio channel in this loop, try again */
+               /*  * for more high prio data. */
+               /*  *\/ */
+               /* if (high_prio) { */
+               /*      continue; */
+               /* } */
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
@@ -1285,6 +1300,23 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 }
 
+int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               assert(0);
+               return -ENOSYS;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               return lttng_ustconsumer_check_pipe(stream, ctx);
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               return -ENOSYS;
+       }
+}
+
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
 {
        switch (consumer_data.type) {
index 6ac781605c7284ecf7319e2b10fa0c3920b441b3..6953c0b38bc7c9dc2be9564889de5fb66f9bc522 100644 (file)
@@ -314,5 +314,7 @@ extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
+int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx);
 
 #endif /* _LTTNG_CONSUMER_H */
index 97b890bf25340ba885f65a5c394d74de187126dd..9d624966bf6ae76f942401c5c9b8b1205bdaef54 100644 (file)
@@ -414,6 +414,31 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
 }
 
 
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       ssize_t readlen;
+       char dummy;
+
+       DBG("In check_pipe (wait_fd: %d, stream key: %d)\n",
+               stream->wait_fd, stream->key);
+
+       /* We consume the 1 byte written into the wait_fd by UST */
+       if (!stream->hangup_flush_done) {
+               do {
+                       readlen = read(stream->wait_fd, &dummy, 1);
+               } while (readlen == -1 && errno == EINTR);
+               if (readlen == -1) {
+                       return -1;      /* error */
+               }
+               DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
+               if (readlen == 0)
+                       return 1;       /* POLLHUP */
+       }
+       return 0;       /* no error nor HUP */
+
+}
+
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -437,6 +462,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ret = readlen;
                        goto end;
                }
+               DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
        }
 
        buf = stream->buf;
index c07377f8ea38711cfbd72951982ff173640ff683..044a10d823075aa324e29bfc7b1be40428bd9d3c 100644 (file)
@@ -72,6 +72,9 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
 
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx);
+
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 static inline
@@ -153,6 +156,13 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
 {
 }
 
+static inline
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       return -ENOSYS;
+}
+
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTTNG_USTCONSUMER_H */
This page took 0.030472 seconds and 4 git commands to generate.