Also fix consumer handling of poll fd: a mask should be used.
Also fix UST stream output passed to the consumer (the fix is a hack
currently, left a FIXME).
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
/* Must be a power of 2 */
#define DEFAULT_UST_CHANNEL_SUBBUF_NUM 4
/* See lttng-ust.h enum lttng_ust_output */
-#define DEFAULT_UST_CHANNEL_OUTPUT LTTNG_UST_MMAP
+#define DEFAULT_UST_CHANNEL_OUTPUT LTTNG_EVENT_MMAP
/*
* Default timeout value for the sem_timedwait() call. Blocking forever is not
*/
struct lttng_consumer_local_data {
/* function to call when data is available on a buffer */
- int (*on_buffer_ready)(struct lttng_consumer_stream *stream);
+ int (*on_buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx);
/*
* function to call when we receive a new channel, it receives a
* newly allocated channel, depending on the return code of this
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream),
+ int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(int sessiond_key, uint32_t state));
extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx);
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
+
#endif /* _LTTNG_CONSUMER_H */
int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
+
+int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx);
+int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
+
#endif /* _LTTNG_KCONSUMER_H */
extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream);
extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx);
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
+
#else /* HAVE_LIBLTTNG_UST_CTL */
static inline
{
}
+static inline
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ return -ENOSYS;
+}
+
+static inline
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ return -ENOSYS;
+}
+
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTTNG_USTCONSUMER_H */
*/
struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
- int (*buffer_ready)(struct lttng_consumer_stream *stream),
+ int (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(int stream_key, uint32_t state))
goto end;
}
- /* No FDs and consumer_quit, kconsumer_cleanup the thread */
+ /* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
goto end;
}
* array. We want to prioritize array update over
* low-priority reads.
*/
- if (pollfd[nb_fd].revents == POLLIN) {
+ if (pollfd[nb_fd].revents & POLLIN) {
DBG("consumer_poll_pipe wake up");
tmp2 = read(ctx->consumer_poll_pipe[0], &tmp, 1);
if (tmp2 < 0) {
- perror("read kconsumer poll");
+ perror("read consumer poll");
}
continue;
}
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
- switch(pollfd[i].revents) {
- case POLLERR:
+ if (pollfd[i].revents & POLLPRI) {
+ DBG("Urgent read on fd %d", pollfd[i].fd);
+ high_prio = 1;
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ /* it's ok to have an unavailable sub-buffer */
+ if (ret == EAGAIN) {
+ ret = 0;
+ }
+ } else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLHUP:
- DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
+ } else if (pollfd[i].revents & POLLNVAL) {
+ ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLNVAL:
- ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
+ } else if ((pollfd[i].revents & POLLHUP &&
+ !(pollfd[i].revents & POLLIN))) {
+ DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
consumer_del_stream(local_stream[i]);
num_hup++;
- break;
- case POLLPRI:
- DBG("Urgent read on fd %d", pollfd[i].fd);
- high_prio = 1;
- ret = ctx->on_buffer_ready(local_stream[i]);
- /* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
- }
- break;
}
}
/* Take care of low priority channels. */
if (high_prio == 0) {
for (i = 0; i < nb_fd; i++) {
- if (pollfd[i].revents == POLLIN) {
+ if (pollfd[i].revents & POLLIN) {
DBG("Normal read on fd %d", pollfd[i].fd);
- ret = ctx->on_buffer_ready(local_stream[i]);
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable subbuffer */
if (ret == EAGAIN) {
ret = 0;
}
return NULL;
}
+
+int lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_read_subbuffer(stream, ctx);
+ case LTTNG_CONSUMER_UST:
+ return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
+
+int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ return lttng_kconsumer_on_recv_stream(stream);
+ case LTTNG_CONSUMER_UST:
+ return lttng_ustconsumer_on_recv_stream(stream);
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ return -ENOSYS;
+ }
+}
end_nosignal:
return 0;
}
+
+/*
+ * Consume data on a file descriptor and write it on a trace file.
+ */
+int lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ int infd = stream->wait_fd;
+
+ DBG("In read_subbuffer (infd : %d)", infd);
+ /* Get the next subbuffer */
+ err = kernctl_get_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+
+ switch (stream->output) {
+ case LTTNG_EVENT_SPLICE:
+ /* read the whole subbuffer */
+ err = kernctl_get_padded_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+
+ /* splice the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error splicing to tracefile");
+ }
+ break;
+ case LTTNG_EVENT_MMAP:
+ /* read the used subbuffer size */
+ err = kernctl_get_padded_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ break;
+ default:
+ ERR("Unknown output method");
+ ret = -1;
+ }
+
+ err = kernctl_put_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ if (errno == EFAULT) {
+ perror("Error in unreserving sub buffer\n");
+ } else if (errno == EIO) {
+ /* Should never happen with newer LTTng versions */
+ perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (stream->path_name != NULL) {
+ ret = open(stream->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (ret < 0) {
+ ERR("Opening %s", stream->path_name);
+ perror("open");
+ goto error;
+ }
+ stream->out_fd = ret;
+ }
+
+ if (stream->output == LTTNG_EVENT_MMAP) {
+ /* get the len of the mmap region */
+ unsigned long mmap_len;
+
+ ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
+ if (ret != 0) {
+ ret = errno;
+ perror("kernctl_get_mmap_len");
+ goto error_close_fd;
+ }
+ stream->mmap_len = (size_t) mmap_len;
+
+ stream->mmap_base = mmap(NULL, stream->mmap_len,
+ PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
+ if (stream->mmap_base == MAP_FAILED) {
+ perror("Error mmaping");
+ ret = -1;
+ goto error_close_fd;
+ }
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error_close_fd:
+ {
+ int err;
+
+ err = close(stream->out_fd);
+ assert(!err);
+ }
+error:
+ return ret;
+}
+
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
+ assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
{
ustctl_close_stream_read(stream->chan->handle, stream->buf);
}
+
+
+int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ struct lttng_ust_shm_handle *handle;
+ struct lttng_ust_lib_ring_buffer *buf;
+ char dummy;
+ ssize_t readlen;
+
+ DBG("In read_subbuffer (wait_fd: %d, stream key: %d)",
+ stream->wait_fd, stream->key);
+
+ /* We can consume the 1 byte written into the wait_fd by UST */
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == -EINTR);
+ if (readlen == -1) {
+ ret = readlen;
+ goto end;
+ }
+
+ buf = stream->buf;
+ handle = stream->chan->handle;
+ /* Get the next subbuffer */
+ err = ustctl_get_next_subbuf(handle, buf);
+ if (err != 0) {
+ ret = errno;
+ /*
+ * This is a debug message even for single-threaded consumer,
+ * because poll() have more relaxed criterions than get subbuf,
+ * so get_subbuf may fail for short race windows where poll()
+ * would issue wakeups.
+ */
+ DBG("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+ assert(stream->output == LTTNG_EVENT_MMAP);
+ /* read the used subbuffer size */
+ err = ustctl_get_padded_subbuf_size(handle, buf, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ err = ustctl_put_next_subbuf(handle, buf);
+ if (err != 0) {
+ ret = errno;
+ if (errno == EFAULT) {
+ perror("Error in unreserving sub buffer\n");
+ } else if (errno == EIO) {
+ /* Should never happen with newer LTTng versions */
+ perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ goto end;
+ }
+end:
+ return ret;
+}
+
+int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ /* Opening the tracefile in write mode */
+ if (stream->path_name != NULL) {
+ ret = open(stream->path_name,
+ O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+ if (ret < 0) {
+ ERR("Opening %s", stream->path_name);
+ perror("open");
+ goto error;
+ }
+ stream->out_fd = ret;
+ }
+
+ /* we return 0 to let the library handle the FD internally */
+ return 0;
+
+error:
+ return ret;
+}
}
}
-/*
- * Consume data on a file descriptor and write it on a trace file.
- */
-static int read_subbuffer(struct lttng_consumer_stream *stream)
-{
- unsigned long len;
- int err;
- long ret = 0;
- int infd = stream->wait_fd;
-
- DBG("In read_subbuffer (infd : %d)", infd);
- /* Get the next subbuffer */
- err = kernctl_get_next_subbuf(infd);
- if (err != 0) {
- ret = errno;
- /*
- * This is a debug message even for single-threaded consumer,
- * because poll() have more relaxed criterions than get subbuf,
- * so get_subbuf may fail for short race windows where poll()
- * would issue wakeups.
- */
- DBG("Reserving sub buffer failed (everything is normal, "
- "it is due to concurrency)");
- goto end;
- }
-
- switch (stream->output) {
- case LTTNG_EVENT_SPLICE:
- /* read the whole subbuffer */
- err = kernctl_get_padded_subbuf_size(infd, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
-
- /* splice the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
- if (ret < 0) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error splicing to tracefile");
- }
- break;
- case LTTNG_EVENT_MMAP:
- /* read the used subbuffer size */
- err = kernctl_get_padded_subbuf_size(infd, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
- /* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
- if (ret < 0) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error writing to tracefile");
- }
- break;
- default:
- ERR("Unknown output method");
- ret = -1;
- }
-
- err = kernctl_put_next_subbuf(infd);
- if (err != 0) {
- ret = errno;
- if (errno == EFAULT) {
- perror("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
- /* Should never happen with newer LTTng versions */
- perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
- }
- goto end;
- }
-
-end:
- return ret;
-}
-
-static int on_recv_stream(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- /* Opening the tracefile in write mode */
- if (stream->path_name != NULL) {
- ret = open(stream->path_name,
- O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
- if (ret < 0) {
- ERR("Opening %s", stream->path_name);
- perror("open");
- goto error;
- }
- stream->out_fd = ret;
- }
-
- if (stream->output == LTTNG_EVENT_MMAP) {
- /* get the len of the mmap region */
- unsigned long mmap_len;
-
- ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
- if (ret != 0) {
- ret = errno;
- perror("kernctl_get_mmap_len");
- goto error_close_fd;
- }
- stream->mmap_len = (size_t) mmap_len;
-
- stream->mmap_base = mmap(NULL, stream->mmap_len,
- PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
- if (stream->mmap_base == MAP_FAILED) {
- perror("Error mmaping");
- ret = -1;
- goto error_close_fd;
- }
- }
-
- /* we return 0 to let the library handle the FD internally */
- return 0;
-
-error_close_fd:
- {
- int err;
-
- err = close(stream->out_fd);
- assert(!err);
- }
-error:
- return ret;
-}
-
/*
* main
*/
USTCONSUMERD_CMD_SOCK_PATH);
}
/* create the consumer instance with and assign the callbacks */
- ctx = lttng_consumer_create(opt_type, read_subbuffer, NULL, on_recv_stream, NULL);
+ ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer,
+ NULL, lttng_consumer_on_recv_stream, NULL);
if (ctx == NULL) {
goto error;
}
lum->attr.num_subbuf = DEFAULT_METADATA_SUBBUF_NUM;
lum->attr.switch_timer_interval = DEFAULT_CHANNEL_SWITCH_TIMER;
lum->attr.read_timer_interval = DEFAULT_CHANNEL_READ_TIMER;
- lum->attr.output = DEFAULT_UST_CHANNEL_OUTPUT;
+ lum->attr.output = LTTNG_UST_MMAP;
lum->handle = -1;
/* Set metadata trace path */
lum.u.stream.channel_key = uchan->obj->shm_fd;
lum.u.stream.stream_key = stream->obj->shm_fd;
lum.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM;
- lum.u.stream.output = uchan->attr.output;
+ /*
+ * FIXME Hack alert! we force MMAP for now. Mixup
+ * between EVENT and UST enums elsewhere.
+ */
+ lum.u.stream.output = DEFAULT_UST_CHANNEL_OUTPUT;
lum.u.stream.mmap_len = stream->obj->memory_map_size;
strncpy(lum.u.stream.path_name, stream->pathname, PATH_MAX - 1);
lum.u.stream.path_name[PATH_MAX - 1] = '\0';