*/
struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
*
* Returns the number of bytes written
*/
-int lttng_consumer_on_read_subbuffer_mmap(
+ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
*
* Returns the number of bytes spliced.
*/
-int lttng_consumer_on_read_subbuffer_splice(
+ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
}
pthread_mutex_unlock(&consumer_data.lock);
+ /* No FDs and consumer_quit, consumer_cleanup the thread */
+ if (nb_fd == 0 && consumer_quit == 1) {
+ goto end;
+ }
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
goto end;
}
- /* No FDs and consumer_quit, consumer_cleanup the thread */
- if (nb_fd == 0 && consumer_quit == 1) {
- goto end;
- }
-
/*
* If the consumer_poll_pipe triggered poll go
* directly to the beginning of the loop to update the
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
if (pollfd[i].revents & POLLPRI) {
+ ssize_t len;
+
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
- ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
+ if (len < 0 && len != -EAGAIN) {
+ goto end;
+ } else if (len > 0) {
+ local_stream[i]->data_read = 1;
}
- } else if (pollfd[i].revents & POLLERR) {
- ERR("Error returned in polling fd %d.", pollfd[i].fd);
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
- num_hup++;
- } else if (pollfd[i].revents & POLLNVAL) {
- ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
- num_hup++;
- } else if ((pollfd[i].revents & POLLHUP) &&
- !(pollfd[i].revents & POLLIN)) {
- if (consumer_data.type == LTTNG_CONSUMER32_UST
- || consumer_data.type == LTTNG_CONSUMER64_UST) {
- DBG("Polling fd %d tells it has hung up. Attempting flush and read.",
- pollfd[i].fd);
- if (!local_stream[i]->hangup_flush_done) {
- lttng_ustconsumer_on_stream_hangup(local_stream[i]);
- /* read after flush */
- do {
- ret = ctx->on_buffer_ready(local_stream[i], ctx);
- } while (ret == EAGAIN);
- }
- } else {
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- }
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
- num_hup++;
}
}
- /* If every buffer FD has hung up, we end the read loop here */
- if (nb_fd > 0 && num_hup == nb_fd) {
- DBG("every buffer FD has hung up\n");
- if (consumer_quit == 1) {
- goto end;
- }
+ /*
+ * If we read high prio channel in this loop, try again
+ * for more high prio data.
+ */
+ if (high_prio) {
continue;
}
/* Take care of low priority channels. */
- if (high_prio == 0) {
- for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents & POLLIN) {
- DBG("Normal read on fd %d", pollfd[i].fd);
- ret = ctx->on_buffer_ready(local_stream[i], ctx);
- /* it's ok to have an unavailable subbuffer */
- if (ret == EAGAIN) {
- ret = 0;
- }
+ for (i = 0; i < nb_fd; i++) {
+ if ((pollfd[i].revents & POLLIN) ||
+ local_stream[i]->hangup_flush_done) {
+ ssize_t len;
+
+ assert(!(pollfd[i].revents & POLLERR));
+ assert(!(pollfd[i].revents & POLLNVAL));
+ DBG("Normal read on fd %d", pollfd[i].fd);
+ len = ctx->on_buffer_ready(local_stream[i], ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (len < 0 && len != -EAGAIN) {
+ goto end;
+ } else if (len > 0) {
+ local_stream[i]->data_read = 1;
+ }
+ }
+ }
+
+ /* Handle hangup and errors */
+ for (i = 0; i < nb_fd; i++) {
+ if (!local_stream[i]->hangup_flush_done
+ && (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
+ && (consumer_data.type == LTTNG_CONSUMER32_UST
+ || consumer_data.type == LTTNG_CONSUMER64_UST)) {
+ DBG("fd %d is hup|err|nval. Attempting flush and read.",
+ pollfd[i].fd);
+ lttng_ustconsumer_on_stream_hangup(local_stream[i]);
+ /* Attempt read again, for the data we just flushed. */
+ local_stream[i]->data_read = 1;
+ }
+ /*
+ * If the poll flag is HUP/ERR/NVAL and we have
+ * read no data in this pass, we can remove the
+ * stream from its hash table.
+ */
+ if ((pollfd[i].revents & POLLHUP)) {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ if (!local_stream[i]->data_read) {
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
+ num_hup++;
+ }
+ } else if (pollfd[i].revents & POLLERR) {
+ ERR("Error returned in polling fd %d.", pollfd[i].fd);
+ if (!local_stream[i]->data_read) {
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
+ num_hup++;
+ }
+ } else if (pollfd[i].revents & POLLNVAL) {
+ ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+ if (!local_stream[i]->data_read) {
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
+ num_hup++;
}
}
+ local_stream[i]->data_read = 0;
}
}
end:
return NULL;
}
-int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
switch (consumer_data.type) {
/* For UST */
struct lttng_ust_lib_ring_buffer *buf;
int cpu;
+ int data_read;
int hangup_flush_done;
/* UID/GID of the user owning the session to which stream belongs */
uid_t uid;
* process.
*/
struct lttng_consumer_local_data {
- /* function to call when data is available on a buffer */
- int (*on_buffer_ready)(struct lttng_consumer_stream *stream,
+ /*
+ * Function to call when data is available on a buffer.
+ * Returns the number of bytes read, or negative error value.
+ */
+ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
/*
* function to call when we receive a new channel, it receives a
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(int sessiond_key, uint32_t state));
extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
-extern int lttng_consumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
-extern int lttng_consumer_on_read_subbuffer_splice(
+extern ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
-int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
*
* Returns the number of bytes written
*/
-int lttng_kconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
unsigned long mmap_offset;
- long ret = 0;
+ ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
int outfd = stream->out_fd;
*
* Returns the number of bytes spliced.
*/
-int lttng_kconsumer_on_read_subbuffer_splice(
+ssize_t lttng_kconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
- long ret = 0;
+ ssize_t ret = 0;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
int fd = stream->wait_fd;
/*
* Consume data on a file descriptor and write it on a trace file.
*/
-int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
unsigned long len;
int err;
- long ret = 0;
+ ssize_t ret = 0;
int infd = stream->wait_fd;
DBG("In read_subbuffer (infd : %d)", infd);
/*
* Mmap the ring buffer, read it and write the data to the tracefile.
*
- * Returns the number of bytes written.
+ * Returns the number of bytes written, or negative value on error.
*/
-extern int lttng_kconsumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_kconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
/*
* Splice the data from the ring buffer to the tracefile.
*
- * Returns the number of bytes spliced.
+ * Returns the number of bytes spliced, or negative error value on
+ * error.
*/
-extern int lttng_kconsumer_on_read_subbuffer_splice(
+extern ssize_t lttng_kconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
int sock, struct pollfd *consumer_sockpoll);
-int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
/*
* Mmap the ring buffer, read it and write the data to the tracefile.
*
- * Returns the number of bytes written
+ * Returns the number of bytes written, else negative value on error.
*/
-int lttng_ustconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
*
* Returns the number of bytes spliced.
*/
-int lttng_ustconsumer_on_read_subbuffer_splice(
+ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
/*
* Mmap the ring buffer, read it and write the data to the tracefile.
*
- * Returns the number of bytes written.
+ * Returns the number of bytes written, else negative value on error.
*/
-extern int lttng_ustconsumer_on_read_subbuffer_mmap(
+extern ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
/* Not implemented */
-extern int lttng_ustconsumer_on_read_subbuffer_splice(
+extern ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len);
#else /* HAVE_LIBLTTNG_UST_CTL */
static inline
-int lttng_ustconsumer_on_read_subbuffer_mmap(
+ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len)
{
}
static inline
-int lttng_ustconsumer_on_read_subbuffer_splice(
+ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *uststream, unsigned long len)
{