/* For UST */
struct lttng_ust_lib_ring_buffer *buf;
int cpu;
+ int hangup_flush_done;
};
/*
struct lttng_consumer_local_data *ctx);
int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
+
#else /* HAVE_LIBLTTNG_UST_CTL */
static inline
return -ENOSYS;
}
+static inline
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+}
+
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */
num_hup++;
} else if ((pollfd[i].revents & POLLHUP) &&
!(pollfd[i].revents & POLLIN)) {
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ if (consumer_data.type == LTTNG_CONSUMER_UST) {
+ DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
+ pollfd[i].fd);
+ if (!local_stream[i]->hangup_flush_done) {
+ lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+ /* try reading after flush */
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (ret == EAGAIN) {
+ ret = 0;
+ }
+ }
+ } else {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ }
consumer_del_stream(local_stream[i]);
num_hup++;
}
msg.u.channel.mmap_len,
msg.u.channel.max_sb_size);
if (new_channel == NULL) {
+ fprintf(stderr, "AAAAA\n");
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
}
msg.u.stream.output,
msg.u.stream.path_name);
if (new_stream == NULL) {
+ fprintf(stderr, "BBBBBB\n");
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
goto end;
}
return 0;
}
+void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
+{
+ ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+}
+
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
{
ustctl_unmap_channel(chan->handle);