Fix: make ust consumer posix compliant for poll flags
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 1 Mar 2012 03:30:17 +0000 (22:30 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 1 Mar 2012 03:30:17 +0000 (22:30 -0500)
poll flags can have both POLLHUP and POLLIN set, which is not expected
by the consumer. Do not depend on this implementation-specific behavior.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-consumer/kernel-consumer.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index 2e5ec5c35de08459d0b77b2d224f14ae90632bb3..ae59b6b602909a2752dc142d857dbe1fb503c703 100644 (file)
@@ -646,7 +646,7 @@ void lttng_consumer_sync_trace_file(
  */
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
-               int (*buffer_ready)(struct lttng_consumer_stream *stream,
+               ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
@@ -734,7 +734,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
  *
  * Returns the number of bytes written
  */
-int lttng_consumer_on_read_subbuffer_mmap(
+ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -757,7 +757,7 @@ int lttng_consumer_on_read_subbuffer_mmap(
  *
  * Returns the number of bytes spliced.
  */
-int lttng_consumer_on_read_subbuffer_splice(
+ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -903,6 +903,10 @@ void *lttng_consumer_thread_poll_fds(void *data)
                }
                pthread_mutex_unlock(&consumer_data.lock);
 
+               /* No FDs and consumer_quit, consumer_cleanup the thread */
+               if (nb_fd == 0 && consumer_quit == 1) {
+                       goto end;
+               }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
@@ -923,11 +927,6 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        goto end;
                }
 
-               /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
-                       goto end;
-               }
-
                /*
                 * If the consumer_poll_pipe triggered poll go
                 * directly to the beginning of the loop to update the
@@ -946,69 +945,90 @@ void *lttng_consumer_thread_poll_fds(void *data)
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
                        if (pollfd[i].revents & POLLPRI) {
+                               ssize_t len;
+
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               ret = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx);
                                /* it's ok to have an unavailable sub-buffer */
-                               if (ret == EAGAIN) {
-                                       ret = 0;
+                               if (len < 0 && len != -EAGAIN) {
+                                       goto end;
+                               } else if (len > 0) {
+                                       local_stream[i]->data_read = 1;
                                }
-                       } else if (pollfd[i].revents & POLLERR) {
-                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
-                       } else if (pollfd[i].revents & POLLNVAL) {
-                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
-                       } else if ((pollfd[i].revents & POLLHUP) &&
-                                       !(pollfd[i].revents & POLLIN)) {
-                               if (consumer_data.type == LTTNG_CONSUMER32_UST
-                                               || consumer_data.type == LTTNG_CONSUMER64_UST) {
-                                       DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
-                                               pollfd[i].fd);
-                                       if (!local_stream[i]->hangup_flush_done) {
-                                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
-                                               /* read after flush */
-                                               do {
-                                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                               } while (ret == EAGAIN);
-                                       }
-                               } else {
-                                       DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
-                               }
-                               rcu_read_lock();
-                               consumer_del_stream_rcu(&local_stream[i]->node.head);
-                               rcu_read_unlock();
-                               num_hup++;
                        }
                }
 
-               /* If every buffer FD has hung up, we end the read loop here */
-               if (nb_fd > 0 && num_hup == nb_fd) {
-                       DBG("every buffer FD has hung up\n");
-                       if (consumer_quit == 1) {
-                               goto end;
-                       }
+               /*
+                * If we read high prio channel in this loop, try again
+                * for more high prio data.
+                */
+               if (high_prio) {
                        continue;
                }
 
                /* Take care of low priority channels. */
-               if (high_prio == 0) {
-                       for (i = 0; i < nb_fd; i++) {
-                               if (pollfd[i].revents & POLLIN) {
-                                       DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = ctx->on_buffer_ready(local_stream[i], ctx);
-                                       /* it's ok to have an unavailable subbuffer */
-                                       if (ret == EAGAIN) {
-                                               ret = 0;
-                                       }
+               for (i = 0; i < nb_fd; i++) {
+                       if ((pollfd[i].revents & POLLIN) ||
+                                       local_stream[i]->hangup_flush_done) {
+                               ssize_t len;
+
+                               assert(!(pollfd[i].revents & POLLERR));
+                               assert(!(pollfd[i].revents & POLLNVAL));
+                               DBG("Normal read on fd %d", pollfd[i].fd);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               /* it's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN) {
+                                       goto end;
+                               } else if (len > 0) {
+                                       local_stream[i]->data_read = 1;
+                               }
+                       }
+               }
+
+               /* Handle hangup and errors */
+               for (i = 0; i < nb_fd; i++) {
+                       if (!local_stream[i]->hangup_flush_done
+                                       && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
+                                       && (consumer_data.type == LTTNG_CONSUMER32_UST
+                                               || consumer_data.type == LTTNG_CONSUMER64_UST)) {
+                               DBG("fd %d is hup|err|nval. Attempting flush and read.",
+                                       pollfd[i].fd);
+                               lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+                               /* Attempt read again, for the data we just flushed. */
+                               local_stream[i]->data_read = 1;
+                       }
+                       /*
+                        * If the poll flag is HUP/ERR/NVAL and we have
+                        * read no data in this pass, we can remove the
+                        * stream from its hash table.
+                        */
+                       if ((pollfd[i].revents & POLLHUP)) {
+                               DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
+                               }
+                       } else if (pollfd[i].revents & POLLERR) {
+                               ERR("Error returned in polling fd %d.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
+                               }
+                       } else if (pollfd[i].revents & POLLNVAL) {
+                               ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+                               if (!local_stream[i]->data_read) {
+                                       rcu_read_lock();
+                                       consumer_del_stream_rcu(&local_stream[i]->node.head);
+                                       rcu_read_unlock();
+                                       num_hup++;
                                }
                        }
+                       local_stream[i]->data_read = 0;
                }
        }
 end:
@@ -1140,7 +1160,7 @@ end:
        return NULL;
 }
 
-int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        switch (consumer_data.type) {
index 35a72b50f6078e333f1fe754de5f183417b71767..71ae39903a8477c341aff3c322acf6602f049604 100644 (file)
@@ -114,6 +114,7 @@ struct lttng_consumer_stream {
        /* For UST */
        struct lttng_ust_lib_ring_buffer *buf;
        int cpu;
+       int data_read;
        int hangup_flush_done;
        /* UID/GID of the user owning the session to which stream belongs */
        uid_t uid;
@@ -125,8 +126,11 @@ struct lttng_consumer_stream {
  * process.
  */
 struct lttng_consumer_local_data {
-       /* function to call when data is available on a buffer */
-       int (*on_buffer_ready)(struct lttng_consumer_stream *stream,
+       /*
+        * Function to call when data is available on a buffer.
+        * Returns the number of bytes read, or negative error value.
+        */
+       ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
                        struct lttng_consumer_local_data *ctx);
        /*
         * function to call when we receive a new channel, it receives a
@@ -285,16 +289,16 @@ int consumer_add_channel(struct lttng_consumer_channel *channel);
 
 extern struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
-               int (*buffer_ready)(struct lttng_consumer_stream *stream,
+               ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(int sessiond_key, uint32_t state));
 extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
-extern int lttng_consumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
-extern int lttng_consumer_on_read_subbuffer_splice(
+extern ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
 extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
@@ -308,7 +312,7 @@ extern void *lttng_consumer_thread_receive_fds(void *data);
 extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
-int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
index c1ba1405ea478931a9cd3dee4ed57399ec2c5cd7..608265f9627af4291148fe4444e2dbd036883277 100644 (file)
@@ -45,12 +45,12 @@ extern volatile int consumer_quit;
  *
  * Returns the number of bytes written
  */
-int lttng_kconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
        unsigned long mmap_offset;
-       long ret = 0;
+       ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
        int outfd = stream->out_fd;
@@ -91,11 +91,11 @@ end:
  *
  * Returns the number of bytes spliced.
  */
-int lttng_kconsumer_on_read_subbuffer_splice(
+ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
-       long ret = 0;
+       ssize_t ret = 0;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
        int fd = stream->wait_fd;
@@ -308,12 +308,12 @@ end_nosignal:
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
-int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len;
        int err;
-       long ret = 0;
+       ssize_t ret = 0;
        int infd = stream->wait_fd;
 
        DBG("In read_subbuffer (infd : %d)", infd);
index 6e820044bad2ea6618786ff4673bd009f405d199..f4f46169de2a15bf9548df2d8f554a43681ceef9 100644 (file)
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile.
  *
- * Returns the number of bytes written.
+ * Returns the number of bytes written, or negative value on error.
  */
-extern int lttng_kconsumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
 
 /*
  * Splice the data from the ring buffer to the tracefile.
  *
- * Returns the number of bytes spliced.
+ * Returns the number of bytes spliced, or negative error value on
+ * error.
  */
-extern int lttng_kconsumer_on_read_subbuffer_splice(
+extern ssize_t lttng_kconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
 
@@ -62,7 +63,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
 
-int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx);
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 
index 113682384ae44d829d03af00b1378d25687a4a39..c8ba084606180790b51fc7d11ba8ea5075f019c4 100644 (file)
@@ -43,9 +43,9 @@ extern volatile int consumer_quit;
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile.
  *
- * Returns the number of bytes written
+ * Returns the number of bytes written, else negative value on error.
  */
-int lttng_ustconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -90,7 +90,7 @@ end:
  *
  * Returns the number of bytes spliced.
  */
-int lttng_ustconsumer_on_read_subbuffer_splice(
+ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
index 76b4eeece093b5804bfe51e4e33a0928a9370649..0089ddf17ab250efa064b539a148cc956cf1034a 100644 (file)
 /*
  * Mmap the ring buffer, read it and write the data to the tracefile.
  *
- * Returns the number of bytes written.
+ * Returns the number of bytes written, else negative value on error.
  */
-extern int lttng_ustconsumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
 
 /* Not implemented */
-extern int lttng_ustconsumer_on_read_subbuffer_splice(
+extern ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len);
 
@@ -76,7 +76,7 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
 static inline
-int lttng_ustconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len)
 {
@@ -84,7 +84,7 @@ int lttng_ustconsumer_on_read_subbuffer_mmap(
 }
 
 static inline
-int lttng_ustconsumer_on_read_subbuffer_splice(
+ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *uststream, unsigned long len)
 {
This page took 0.034515 seconds and 4 git commands to generate.