continue;
}
- /* Take care of high priority channels first. */
+
+ /* Check if each pipe has data. hack for cygwin. */
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents & POLLPRI) {
- ssize_t len;
+ if ((pollfd[i].revents & POLLIN) ||
+ local_stream[i]->hangup_flush_done) {
+ int check_ret;
- DBG("Urgent read on fd %d", pollfd[i].fd);
- high_prio = 1;
- len = ctx->on_buffer_ready(local_stream[i], ctx);
- /* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
- goto end;
- } else if (len > 0) {
- local_stream[i]->data_read = 1;
+ check_ret = lttng_consumer_check_pipe(local_stream[i], ctx);
+ if (check_ret != 0) {
+ pollfd[i].revents |= POLLHUP;
}
}
}
- /*
- * If we read high prio channel in this loop, try again
- * for more high prio data.
- */
- if (high_prio) {
- continue;
- }
+ /* Take care of high priority channels first. */
+ /* for (i = 0; i < nb_fd; i++) { */
+ /* DBG("!!! POLL FLAGS: %d", pollfd[i].revents); */
+ /* if (pollfd[i].revents & POLLPRI) { */
+ /* ssize_t len; */
+
+ /* DBG("Urgent read on fd %d", pollfd[i].fd); */
+ /* high_prio = 1; */
+ /* len = ctx->on_buffer_ready(local_stream[i], ctx); */
+ /* /\* it's ok to have an unavailable sub-buffer *\/ */
+ /* if (len < 0 && len != -EAGAIN) { */
+ /* goto end; */
+ /* } else if (len > 0) { */
+ /* local_stream[i]->data_read = 1; */
+ /* } */
+ /* } */
+ /* } */
+
+ /* /\* */
+ /* * If we read high prio channel in this loop, try again */
+ /* * for more high prio data. */
+ /* *\/ */
+ /* if (high_prio) { */
+ /* continue; */
+ /* } */
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
}
}
+int lttng_consumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ assert(0);
+ return -ENOSYS;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ return lttng_ustconsumer_check_pipe(stream, ctx);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
{
switch (consumer_data.type) {
}
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ ssize_t readlen;
+ char dummy;
+
+ DBG("In check_pipe (wait_fd: %d, stream key: %d)\n",
+ stream->wait_fd, stream->key);
+
+ /* We consume the 1 byte written into the wait_fd by UST */
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == EINTR);
+ if (readlen == -1) {
+ return -1; /* error */
+ }
+ DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
+ if (readlen == 0)
+ return 1; /* POLLHUP */
+ }
+ return 0; /* no error nor HUP */
+
+}
+
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ret = readlen;
goto end;
}
+ DBG("Read %zu byte from pipe: %c\n", readlen, dummy);
}
buf = stream->buf;
void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx);
+
#else /* HAVE_LIBLTTNG_UST_CTL */
static inline
{
}
+static inline
+int lttng_ustconsumer_check_pipe(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ return -ENOSYS;
+}
+
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */