health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
+ health_code_update();
+
/* Only self thread will receive signal mask. */
setmask(&mask);
CMM_STORE_SHARED(timer_signal.tid, pthread_self());
while (1) {
+ health_code_update();
+
+ health_poll_entry();
signr = sigwaitinfo(&mask, &info);
+ health_poll_exit();
if (signr == -1) {
if (errno != EINTR) {
PERROR("sigwaitinfo");
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+ health_code_update();
+
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
DBG("Metadata main loop started");
while (1) {
+ health_code_update();
+
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
restart:
DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Metadata event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* We just flushed the stream now read it. */
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
assert(stream->wait_fd == pollfd);
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+ health_code_update();
+
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
}
while (1) {
+ health_code_update();
+
high_prio = 0;
num_hup = 0;
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
+ health_poll_entry();
num_rdy = poll(pollfd, nb_fd + 1, -1);
+ health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+ health_code_update();
+
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
DBG("Channel main loop started");
while (1) {
+ health_code_update();
+
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
err = 0; /* All is OK */
restart:
DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Channel event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
/* From here, the event is a channel wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
send_node) {
+ health_code_update();
+
cds_list_del(&stream->send_node);
lttng_ustconsumer_del_stream(stream);
uatomic_sub(&stream->chan->refcount, 1);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+ health_code_update();
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
consumer_sockpoll[1].events = POLLIN | POLLPRI;
while (1) {
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_code_update();
+
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto end;
}
DBG("Incoming command on sock");
#include <common/consumer-timer.h>
#include "kernel-consumer.h"
+#include "../../bin/lttng-consumerd/health-consumerd.h"
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
}
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/*
* Lock stream because we are about to change its state.
*/
ssize_t read_len;
unsigned long len, padded_len;
+ health_code_update();
+
DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
}
do {
+ health_code_update();
+
ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
if (ret_read < 0) {
if (ret_read != -EAGAIN) {
enum lttng_error_code ret_code = LTTNG_OK;
struct lttcomm_consumer_msg msg;
+ health_code_update();
+
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
if (ret > 0) {
}
return ret;
}
+
+ health_code_update();
+
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
/*
* Notify the session daemon that the command is completed.
return -ENOENT;
}
+ health_code_update();
+
/* relayd needs RCU read-side protection */
rcu_read_lock();
struct lttng_consumer_channel *new_channel;
int ret_recv;
+ health_code_update();
+
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
+
+ health_code_update();
+
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.session_id, msg.u.channel.pathname,
goto end_nosignal;
};
+ health_code_update();
+
if (ctx->on_recv_channel != NULL) {
ret_recv = ctx->on_recv_channel(new_channel);
if (ret_recv == 0) {
msg.u.channel.live_timer_interval);
}
+ health_code_update();
+
/* If we received an error in add_channel, we need to report it. */
if (ret < 0) {
ret = consumer_send_status_msg(sock, ret);
ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
}
+ health_code_update();
+
/* First send a status message before receiving the fds. */
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto error_fatal;
}
+
+ health_code_update();
+
if (ret_code != LTTNG_OK) {
/* Channel was not found. */
goto end_nosignal;
}
/* Blocking call */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
rcu_read_unlock();
return -EINTR;
}
+ health_code_update();
+
/* Get stream file descriptor from socket */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
return ret;
}
+ health_code_update();
+
/*
* Send status code to session daemon only if the recv works. If the
* above recv() failed, the session daemon is notified through the
goto end_nosignal;
}
+ health_code_update();
+
new_stream = consumer_allocate_stream(channel->key,
fd,
LTTNG_CONSUMER_ACTIVE_STREAM,
*/
new_stream->hangup_flush_done = 0;
+ health_code_update();
+
if (ctx->on_recv_stream) {
ret = ctx->on_recv_stream(new_stream);
if (ret < 0) {
}
}
+ health_code_update();
+
if (new_stream->metadata_flag) {
channel->metadata_stream = new_stream;
}
/* Vitible to other threads */
new_stream->globally_visible = 1;
+ health_code_update();
+
ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
consumer_flag_relayd_for_destroy(relayd);
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
ret = consumer_data_pending(id);
+ health_code_update();
+
/* Send back returned value to session daemon */
ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
if (ret < 0) {
}
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
}
+ health_code_update();
+
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+ health_code_update();
+
/*
* This command should ONLY be issued for channel with streams set in
* no monitor mode.
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
*/
+ health_code_update();
return 1;
error_fatal:
#include <common/index/index.h>
#include "ust-consumer.h"
+#include "../../bin/lttng-consumerd/health-consumerd.h"
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
cds_list_del(&stream->send_node);
ustctl_destroy_stream(stream->ustream);
free(stream);
int wait_fd;
int ust_metadata_pipe[2];
+ health_code_update();
+
if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
if (ret < 0) {
if (channel->relayd_id != (uint64_t) -1ULL) {
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Try to send the stream to the relayd if one is available. */
ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
if (ret < 0) {
/* The channel was sent successfully to the sessiond at this point. */
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Send stream to session daemon. */
ret = send_sessiond_stream(sock, stream);
if (ret < 0) {
/* Send streams to the corresponding thread. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
/* Sending the stream to the thread. */
ret = send_stream_to_thread(stream, ctx);
if (ret < 0) {
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
&channel->key, &iter.iter, stream, node_channel_id.node) {
+
+ health_code_update();
+
ustctl_flush_buffer(stream->ustream, 1);
}
error:
}
assert(!metadata_channel->monitor);
+ health_code_update();
+
/*
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
goto error;
}
+ health_code_update();
+
/*
* The metadata stream is NOT created in no monitor mode when the channel
* is created on a sessiond ask channel command.
}
do {
+ health_code_update();
+
ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
if (ret < 0) {
goto error_stream;
DBG("UST consumer snapshot channel %" PRIu64, key);
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Lock stream because we are about to change its state. */
pthread_mutex_lock(&stream->lock);
stream->net_seq_idx = relayd_id;
ssize_t read_len;
unsigned long len, padded_len;
+ health_code_update();
+
DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
goto end;
}
+ health_code_update();
+
/* Receive metadata string. */
ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
if (ret < 0) {
goto end_free;
}
+ health_code_update();
+
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
}
while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
+
+ health_code_update();
+
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
struct lttcomm_consumer_msg msg;
struct lttng_consumer_channel *channel = NULL;
+ health_code_update();
+
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
DBG("Consumer received unexpected message size %zd (expects %zu)",
}
return ret;
}
+
+ health_code_update();
+
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
/*
* Notify the session daemon that the command is completed.
return -ENOENT;
}
+ health_code_update();
+
/* relayd needs RCU read-side lock */
rcu_read_lock();
goto error_fatal;
};
+ health_code_update();
+
ret = ask_channel(ctx, sock, channel, &attr);
if (ret < 0) {
goto end_channel_error;
msg.u.ask_channel.live_timer_interval);
}
+ health_code_update();
+
/*
* Add the channel to the internal state AFTER all streams were created
* and successfully sent to session daemon. This way, all streams must
goto end_channel_error;
}
+ health_code_update();
+
/*
* Channel and streams are now created. Inform the session daemon that
* everything went well and should wait to receive the channel and
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Send everything to sessiond. */
ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/*
* In no monitor mode, the streams ownership is kept inside the channel
* so don't send them to the data thread.
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(sock, LTTNG_OK);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/* Wait for more data. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
len, channel, 0, 1);
if (ret < 0) {
}
}
+ health_code_update();
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+ health_code_update();
break;
}
default:
end_nosignal:
rcu_read_unlock();
+ health_code_update();
+
/*
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
end_channel_error:
if (channel) {
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
error_fatal:
rcu_read_unlock();
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
node.node) {
+
+ health_code_update();
+
pthread_mutex_lock(&stream->chan->lock);
/*
* Whatever returned value, we must continue to try to close everything
request.key);
pthread_mutex_lock(&ctx->metadata_socket_lock);
+
+ health_code_update();
+
ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
sizeof(request));
if (ret < 0) {
goto end;
}
+ health_code_update();
+
/* Receive the metadata from sessiond */
ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
sizeof(msg));
goto end;
}
+ health_code_update();
+
if (msg.cmd_type == LTTNG_ERR_UND) {
/* No registry found */
(void) consumer_send_status_msg(ctx->consumer_metadata_socket,
DBG("No new metadata to receive for key %" PRIu64, key);
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
LTTNG_OK);
goto end;
}
+ health_code_update();
+
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
key, offset, len, channel, timer, wait);
if (ret_code >= 0) {
ret = 0;
end:
+ health_code_update();
+
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}