struct lttng_consumer_stream *stream;
int ret;
- stream = malloc(sizeof(*stream));
+ stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
perror("malloc struct lttng_consumer_stream");
goto end;
consumer_data.type == type);
consumer_data.type = type;
- ctx = malloc(sizeof(struct lttng_consumer_local_data));
+ ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
perror("allocating context");
goto error;
int tmp2;
struct lttng_consumer_local_data *ctx = data;
- local_stream = malloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
while (1) {
high_prio = 0;
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- pollfd = malloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
+ pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
- local_stream = malloc((consumer_data.stream_count + 1) *
+ local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
perror("local_stream malloc");
pollfd[i].fd);
if (!local_stream[i]->hangup_flush_done) {
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
- /* try reading after flush */
- ret = ctx->on_buffer_ready(local_stream[i], ctx);
- /* it's ok to have an unavailable sub-buffer */
- if (ret == EAGAIN) {
- ret = 0;
- }
+ /* read after flush */
+ do {
+ ret = ctx->on_buffer_ready(local_stream[i], ctx);
+ } while (ret == EAGAIN);
}
} else {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
{
ustctl_flush_buffer(stream->chan->handle, stream->buf, 0);
+ stream->hangup_flush_done = 1;
}
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
stream->wait_fd, stream->key);
/* We can consume the 1 byte written into the wait_fd by UST */
- do {
- readlen = read(stream->wait_fd, &dummy, 1);
- } while (readlen == -1 && errno == -EINTR);
- if (readlen == -1) {
- ret = readlen;
- goto end;
+ if (!stream->hangup_flush_done) {
+ do {
+ readlen = read(stream->wait_fd, &dummy, 1);
+ } while (readlen == -1 && errno == -EINTR);
+ if (readlen == -1) {
+ ret = readlen;
+ goto end;
+ }
}
buf = stream->buf;
/* Get the next subbuffer */
err = ustctl_get_next_subbuf(handle, buf);
if (err != 0) {
- ret = errno;
+ ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
assert(stream->output == LTTNG_EVENT_MMAP);
/* read the used subbuffer size */
err = ustctl_get_padded_subbuf_size(handle, buf, &len);
- if (err != 0) {
- ret = errno;
- perror("Getting sub-buffer len failed.");
- goto end;
- }
+ assert(err == 0);
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
if (ret < 0) {
ERR("Error writing to tracefile");
}
err = ustctl_put_next_subbuf(handle, buf);
- if (err != 0) {
- ret = errno;
- if (errno == EFAULT) {
- perror("Error in unreserving sub buffer\n");
- } else if (errno == EIO) {
- /* Should never happen with newer LTTng versions */
- perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
- }
- goto end;
- }
+ assert(err == 0);
end:
return ret;
}