int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)
{
- ssize_t ret;
+ int ret_func;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttcomm_consumer_msg msg;
health_code_update();
- ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
- if (ret != sizeof(msg)) {
- if (ret > 0) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
- ret = -1;
+ {
+ ssize_t ret_recv;
+
+ ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
+ if (ret_recv != sizeof(msg)) {
+ if (ret_recv > 0) {
+ lttng_consumer_send_error(ctx,
+ LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
+ ret_recv = -1;
+ }
+ return ret_recv;
}
- return ret;
}
health_code_update();
case LTTNG_CONSUMER_ADD_CHANNEL:
{
struct lttng_consumer_channel *new_channel;
- int ret_recv;
+ int ret_send_status, ret_add_channel;
const uint64_t chunk_id = msg.u.channel.chunk_id.value;
health_code_update();
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
health_code_update();
if (ctx->on_recv_channel != NULL) {
- ret_recv = ctx->on_recv_channel(new_channel);
- if (ret_recv == 0) {
- ret = consumer_add_channel(new_channel, ctx);
- } else if (ret_recv < 0) {
+ int ret_recv_channel =
+ ctx->on_recv_channel(new_channel);
+ if (ret_recv_channel == 0) {
+ ret_add_channel = consumer_add_channel(
+ new_channel, ctx);
+ } else if (ret_recv_channel < 0) {
goto end_nosignal;
}
} else {
- ret = consumer_add_channel(new_channel, ctx);
+ ret_add_channel =
+ consumer_add_channel(new_channel, ctx);
}
- if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
+ !ret_add_channel) {
int monitor_start_ret;
DBG("Consumer starting monitor timer");
ERR("Starting channel monitoring timer failed");
goto end_nosignal;
}
-
}
health_code_update();
/* If we received an error in add_channel, we need to report it. */
- if (ret < 0) {
- ret = consumer_send_status_msg(sock, ret);
- if (ret < 0) {
+ if (ret_add_channel < 0) {
+ ret_send_status = consumer_send_status_msg(
+ sock, ret_add_channel);
+ if (ret_send_status < 0) {
goto error_fatal;
}
goto end_nosignal;
struct lttng_consumer_stream *new_stream;
struct lttng_consumer_channel *channel;
int alloc_ret = 0;
+ int ret_send_status, ret_poll, ret_get_max_subbuf_size;
+ ssize_t ret_pipe_write, ret_recv;
/*
* Get stream's channel reference. Needed when adding the stream to the
health_code_update();
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_add_stream_fatal;
}
/* Blocking call */
health_poll_entry();
- ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret) {
+ if (ret_poll) {
goto error_add_stream_fatal;
}
health_code_update();
/* Get stream file descriptor from socket */
- ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
- if (ret != sizeof(fd)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
+ if (ret_recv != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret_func = ret_recv;
goto end;
}
* above recv() failed, the session daemon is notified through the
* error socket and the teardown is eventually done.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_add_stream_nosignal;
}
}
new_stream->wait_fd = fd;
- ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
- &new_stream->max_sb_size);
- if (ret < 0) {
+ ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
+ new_stream->wait_fd, &new_stream->max_sb_size);
+ if (ret_get_max_subbuf_size < 0) {
pthread_mutex_unlock(&channel->lock);
ERR("Failed to get kernel maximal subbuffer size");
goto error_add_stream_nosignal;
pthread_mutex_lock(&new_stream->lock);
if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
+ int ret_recv_stream = ctx->on_recv_stream(new_stream);
+ if (ret_recv_stream < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
/* Send stream to relayd if the stream has an ID. */
if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = consumer_send_relayd_stream(new_stream,
- new_stream->chan->pathname);
- if (ret < 0) {
+ int ret_send_relayd_stream;
+
+ ret_send_relayd_stream = consumer_send_relayd_stream(
+ new_stream, new_stream->chan->pathname);
+ if (ret_send_relayd_stream < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
consumer_stream_free(new_stream);
* to send the "streams_sent" command to relayd.
*/
if (channel->streams_sent_to_relayd) {
- ret = consumer_send_relayd_streams_sent(
- new_stream->net_seq_idx);
- if (ret < 0) {
+ int ret_send_relayd_streams_sent;
+
+ ret_send_relayd_streams_sent =
+ consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret_send_relayd_streams_sent < 0) {
pthread_mutex_unlock(&new_stream->lock);
pthread_mutex_unlock(&channel->lock);
goto error_add_stream_nosignal;
health_code_update();
- ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
- if (ret < 0) {
+ ret_pipe_write = lttng_pipe_write(
+ stream_pipe, &new_stream, sizeof(new_stream));
+ if (ret_pipe_write < 0) {
ERR("Consumer write %s stream to pipe %d",
new_stream->metadata_flag ? "metadata" : "data",
lttng_pipe_get_writefd(stream_pipe));
case LTTNG_CONSUMER_STREAMS_SENT:
{
struct lttng_consumer_channel *channel;
+ int ret_send_status;
/*
* Get stream's channel reference. Needed when adding the stream to the
/*
* Send status code to session daemon.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0 ||
+ ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
/* Somehow, the session daemon is not responding anymore. */
goto error_streams_sent_nosignal;
}
health_code_update();
/* Send stream to relayd if the stream has an ID. */
if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
- ret = consumer_send_relayd_streams_sent(
+ int ret_send_relay_streams;
+
+ ret_send_relay_streams = consumer_send_relayd_streams_sent(
msg.u.sent_streams.net_seq_idx);
- if (ret < 0) {
+ if (ret_send_relay_streams < 0) {
goto error_streams_sent_nosignal;
}
channel->streams_sent_to_relayd = true;
{
uint64_t index = msg.u.destroy_relayd.net_seq_idx;
struct consumer_relayd_sock_pair *relayd;
+ int ret_send_status;
DBG("Kernel consumer destroying relayd %" PRIu64, index);
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
}
case LTTNG_CONSUMER_DATA_PENDING:
{
- int32_t ret;
+ int32_t ret_data_pending;
uint64_t id = msg.u.data_pending.session_id;
+ ssize_t ret_send;
DBG("Kernel consumer data pending command for id %" PRIu64, id);
- ret = consumer_data_pending(id);
+ ret_data_pending = consumer_data_pending(id);
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
- if (ret < 0) {
+ ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
+ sizeof(ret_data_pending));
+ if (ret_send < 0) {
PERROR("send data pending ret code");
goto error_fatal;
}
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.snapshot_channel.key;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
} else {
pthread_mutex_lock(&channel->lock);
if (msg.u.snapshot_channel.metadata == 1) {
- ret = lttng_kconsumer_snapshot_metadata(channel, key,
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_metadata(
+ channel, key,
msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id, ctx);
- if (ret < 0) {
+ msg.u.snapshot_channel.relayd_id,
+ ctx);
+ if (ret_snapshot < 0) {
ERR("Snapshot metadata failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
}
} else {
- ret = lttng_kconsumer_snapshot_channel(channel, key,
+ int ret_snapshot;
+
+ ret_snapshot = lttng_kconsumer_snapshot_channel(
+ channel, key,
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
- msg.u.snapshot_channel.nb_packets_per_stream,
+ msg.u.snapshot_channel
+ .nb_packets_per_stream,
ctx);
- if (ret < 0) {
+ if (ret_snapshot < 0) {
ERR("Snapshot channel failed");
ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
}
}
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
{
uint64_t key = msg.u.destroy_channel.key;
struct lttng_consumer_channel *channel;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_destroy_channel;
}
case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
{
int channel_monitor_pipe;
+ int ret_send_status, ret_set_channel_monitor_pipe;
+ ssize_t ret_recv;
ret_code = LTTCOMM_CONSUMERD_SUCCESS;
/* Successfully received the command's type. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
goto error_fatal;
}
- ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
- 1);
- if (ret != sizeof(channel_monitor_pipe)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &channel_monitor_pipe, 1);
+ if (ret_recv != sizeof(channel_monitor_pipe)) {
ERR("Failed to receive channel monitor pipe");
goto error_fatal;
}
DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
- ret = consumer_timer_thread_set_channel_monitor_pipe(
- channel_monitor_pipe);
- if (!ret) {
+ ret_set_channel_monitor_pipe =
+ consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret_set_channel_monitor_pipe) {
int flags;
+ int ret_fcntl;
ret_code = LTTCOMM_CONSUMERD_SUCCESS;
/* Set the pipe as non-blocking. */
- ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
- if (ret == -1) {
+ ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret_fcntl == -1) {
PERROR("fcntl get flags of the channel monitoring pipe");
goto error_fatal;
}
- flags = ret;
+ flags = ret_fcntl;
- ret = fcntl(channel_monitor_pipe, F_SETFL,
+ ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
flags | O_NONBLOCK);
- if (ret == -1) {
+ if (ret_fcntl == -1) {
PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
goto error_fatal;
}
} else {
ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
goto error_fatal;
}
break;
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.rotate_channel.key;
+ int ret_send_status;
DBG("Consumer rotate channel %" PRIu64, key);
/*
* Sample the rotate position of all the streams in this channel.
*/
- ret = lttng_consumer_rotate_channel(channel, key,
+ int ret_rotate_channel;
+
+ ret_rotate_channel = lttng_consumer_rotate_channel(
+ channel, key,
msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata,
- ctx);
- if (ret < 0) {
+ msg.u.rotate_channel.metadata, ctx);
+ if (ret_rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
}
health_code_update();
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_rotate_channel;
}
if (channel) {
/* Rotate the streams that are ready right now. */
- ret = lttng_consumer_rotate_ready_streams(
+ int ret_rotate;
+
+ ret_rotate = lttng_consumer_rotate_ready_streams(
channel, key, ctx);
- if (ret < 0) {
+ if (ret_rotate < 0) {
ERR("Rotate ready streams failed");
}
}
{
struct lttng_consumer_channel *channel;
uint64_t key = msg.u.clear_channel.key;
+ int ret_send_status;
channel = consumer_find_channel(key);
if (!channel) {
DBG("Channel %" PRIu64 " not found", key);
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
} else {
- ret = lttng_consumer_clear_channel(channel);
- if (ret) {
+ int ret_clear_channel;
+
+ ret_clear_channel =
+ lttng_consumer_clear_channel(channel);
+ if (ret_clear_channel) {
ERR("Clear channel failed");
- ret_code = ret;
+ ret_code = ret_clear_channel;
}
health_code_update();
}
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
}
case LTTNG_CONSUMER_INIT:
{
+ int ret_send_status;
+
ret_code = lttng_consumer_init_command(ctx,
msg.u.init.sessiond_uuid);
health_code_update();
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
*/
if (is_local_trace) {
int chunk_dirfd;
+ int ret_send_status;
+ ssize_t ret_recv;
/* Acnowledge the reception of the command. */
- ret = consumer_send_status_msg(sock,
- LTTCOMM_CONSUMERD_SUCCESS);
- if (ret < 0) {
+ ret_send_status = consumer_send_status_msg(
+ sock, LTTCOMM_CONSUMERD_SUCCESS);
+ if (ret_send_status < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
- ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
- if (ret != sizeof(chunk_dirfd)) {
+ ret_recv = lttcomm_recv_fds_unix_sock(
+ sock, &chunk_dirfd, 1);
+ if (ret_recv != sizeof(chunk_dirfd)) {
ERR("Failed to receive trace chunk directory file descriptor");
goto error_fatal;
}
msg.u.close_trace_chunk.relayd_id.value;
struct lttcomm_consumer_close_trace_chunk_reply reply;
char path[LTTNG_PATH_MAX];
+ ssize_t ret_send;
ret_code = lttng_consumer_close_trace_chunk(
msg.u.close_trace_chunk.relayd_id.is_set ?
NULL, path);
reply.ret_code = ret_code;
reply.path_length = strlen(path) + 1;
- ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
- if (ret != sizeof(reply)) {
+ ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
+ if (ret_send != sizeof(reply)) {
goto error_fatal;
}
- ret = lttcomm_send_unix_sock(sock, path, reply.path_length);
- if (ret != reply.path_length) {
+ ret_send = lttcomm_send_unix_sock(
+ sock, path, reply.path_length);
+ if (ret_send != reply.path_length) {
goto error_fatal;
}
goto end_nosignal;
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
*/
- ret = 1;
+ ret_func = 1;
goto end;
error_fatal:
/* This will issue a consumer stop. */
- ret = -1;
+ ret_func = -1;
goto end;
end_msg_sessiond:
/*
* the caller because the session daemon socket management is done
* elsewhere. Returning a negative code or 0 will shutdown the consumer.
*/
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- goto error_fatal;
+ {
+ int ret_send_status;
+
+ ret_send_status = consumer_send_status_msg(sock, ret_code);
+ if (ret_send_status < 0) {
+ goto error_fatal;
+ }
}
- ret = 1;
+
+ ret_func = 1;
+
end:
health_code_update();
rcu_read_unlock();
- return ret;
+ return ret_func;
}
/*