struct lttng_consumer_stream *stream = NULL;
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0) {
return NULL;
+ }
rcu_read_lock();
struct lttng_consumer_channel *channel = NULL;
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0) {
return NULL;
+ }
rcu_read_lock();
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
if (ret != 0) {
- perror("munmap");
+ PERROR("munmap");
}
}
break;
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
- perror("malloc struct lttng_consumer_stream");
+ PERROR("malloc struct lttng_consumer_stream");
*alloc_ret = -ENOMEM;
- return NULL;
+ goto end;
}
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
stream->chan = consumer_find_channel(channel_key);
if (!stream->chan) {
*alloc_ret = -ENOENT;
+ ERR("Unable to find channel for stream %d", stream_key);
goto error;
}
stream->chan->refcount++;
error:
free(stream);
+end:
return NULL;
}
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
- perror("munmap");
+ PERROR("munmap");
}
}
if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
- perror("malloc struct lttng_consumer_channel");
+ PERROR("malloc struct lttng_consumer_channel");
goto end;
}
channel->key = channel_key;
if (errno == EINTR) {
goto restart;
}
- perror("Poll error");
+ PERROR("Poll error");
goto exit;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
- perror("write consumer quit");
+ PERROR("write consumer quit");
}
}
ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
- perror("allocating context");
+ PERROR("allocating context");
goto error;
}
ret = pipe(ctx->consumer_poll_pipe);
if (ret < 0) {
- perror("Error creating poll pipe");
+ PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
- perror("Error creating recv pipe");
+ PERROR("Error creating recv pipe");
goto error_quit_pipe;
}
ret = pipe(ctx->consumer_thread_pipe);
if (ret < 0) {
- perror("Error creating thread pipe");
+ PERROR("Error creating thread pipe");
goto error_thread_pipe;
}
/* allocate for all fds + 1 for the consumer_poll_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
- perror("pollfd malloc");
+ PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
- perror("local_stream malloc");
+ PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
if (errno == EINTR) {
goto restart;
}
- perror("Poll error");
+ PERROR("Poll error");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto end;
}
}
ret = fcntl(sock, F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto end;
}
return ret;
}
- DBG("consumer_add_stream chan %d stream %d",
- msg.u.stream.channel_key,
- msg.u.stream.stream_key);
+ DBG("Consumer command ADD_STREAM chan %d stream %d",
+ msg.u.stream.channel_key, msg.u.stream.stream_key);
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
obj.wait_fd = stream->wait_fd;
obj.memory_map_size = stream->mmap_len;
ret = ustctl_add_stream(stream->chan->handle, &obj);
- if (ret)
+ if (ret) {
+ ERR("UST ctl add_stream failed with ret %d", ret);
return ret;
+ }
+
stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
- if (!stream->buf)
+ if (!stream->buf) {
+ ERR("UST ctl open_stream_read failed");
return -EBUSY;
+ }
+
/* ustctl_open_stream_read has closed the shm fd. */
stream->wait_fd_is_copy = 1;
stream->shm_fd = -1;
stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
if (!stream->mmap_base) {
+ ERR("UST ctl get_mmap_base failed");
return -EINVAL;
}