int viewer_attach_session(struct relay_command *cmd,
struct lttng_ht *sessions_ht)
{
- int ret, send_streams = 0, nb_streams = 0;
+ int ret, send_streams = 0;
+ uint32_t nb_streams = 0, nb_streams_ready = 0;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct lttng_viewer_stream send_stream;
if (stream->session != cmd->session) {
continue;
}
+ nb_streams++;
/*
- * Don't send streams with no ctf_trace, they are not ready to be
- * read.
+ * Don't send streams with no ctf_trace, they are not
+ * ready to be read.
*/
- if (!stream->ctf_trace) {
+ if (!stream->ctf_trace || !stream->viewer_ready) {
continue;
}
+ nb_streams_ready++;
vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
goto end_unlock;
}
}
- nb_streams++;
+ }
+
+ /* We must have the same amount of existing stream and ready stream. */
+ if (nb_streams != nb_streams_ready) {
+ nb_streams = 0;
}
response.streams_count = htobe32(nb_streams);
}
#include <limits.h>
#include <urcu.h>
#include <urcu/wfqueue.h>
+#include <urcu/list.h>
#include <common/hashtable/hashtable.h>
#include <common/index/ctf-index.h>
RELAY_VIEWER_NOTIFICATION = 4,
};
+/*
+ * When we receive a stream, it gets stored in a list (on a per connection
+ * basis) until we have all the streams of the same channel and the metadata
+ * associated with it, then it gets flagged with viewer_ready.
+ */
+struct relay_stream_recv_handle {
+ uint64_t id; /* stream handle */
+ struct cds_list_head node;
+};
+
/*
* Represents a session for the relay point of view
*/
* update the oldest_tracefile_id.
*/
unsigned int tracefile_overwrite:1;
+ /*
+ * Can this stream be used by a viewer or are we waiting for additional
+ * information.
+ */
+ unsigned int viewer_ready:1;
};
/*
struct lttng_ht_node_ulong sock_n;
struct rcu_head rcu_node;
enum connection_type type;
- unsigned int version_check_done:1;
/* protocol version to use for this session */
uint32_t major;
uint32_t minor;
struct lttng_ht *ctf_traces_ht; /* indexed by path name */
uint64_t session_id;
+ struct cds_list_head recv_head;
+ unsigned int version_check_done:1;
};
struct relay_local_data {
return ret;
}
+/*
+ * When we have received all the streams and the metadata for a channel,
+ * we make them visible to the viewer threads.
+ */
+static
+void set_viewer_ready_flag(struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node, *tmp_node;
+
+ cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ stream = relay_stream_find_by_id(node->id);
+ if (!stream) {
+ /*
+ * Stream is most probably being cleaned up by the data thread thus
+ * simply continue to the next one.
+ */
+ continue;
+ }
+
+ /*
+ * If any of the streams in the list doesn't have a ctf_trace assigned,
+ * it means that we never received the metadata stream, so we have to
+ * wait until it arrives to make the streams available to the viewer.
+ */
+ if (!stream->ctf_trace) {
+ goto end;
+ }
+
+ stream->viewer_ready = 1;
+ rcu_read_unlock();
+
+ /* Clean stream handle node. */
+ cds_list_del(&node->node);
+ free(node);
+ }
+
+end:
+ return;
+}
+
+/*
+ * Add a recv handle node to the connection recv list with the given stream
+ * handle. A new node is allocated thus must be freed when the node is deleted
+ * from the list.
+ */
+static void queue_stream_handle(uint64_t handle, struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node;
+
+ assert(cmd);
+
+ node = zmalloc(sizeof(*node));
+ if (!node) {
+ PERROR("zmalloc queue stream handle");
+ return;
+ }
+
+ node->id = handle;
+ cds_list_add(&node->node, &cmd->recv_head);
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
ctf_trace_assign(cmd->ctf_traces_ht, stream);
stream->ctf_traces_ht = cmd->ctf_traces_ht;
+ /*
+ * Add the stream handle in the recv list of the connection. Once the end
+ * stream message is received, this list is emptied and streams are set
+ * with the viewer ready flag.
+ */
+ queue_stream_handle(stream->stream_handle, cmd);
+
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_add_unique_ulong(relay_streams_ht,
return ret;
}
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(cmd);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(cmd);
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* Process the commands received on the control socket
*/
case RELAYD_SEND_INDEX:
ret = relay_recv_index(recv_hdr, cmd);
break;
+ case RELAYD_STREAMS_SENT:
+ ret = relay_streams_sent(recv_hdr, cmd);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
PERROR("read relay cmd pipe");
goto error_read;
}
+ CDS_INIT_LIST_HEAD(&relay_connection->recv_head);
/*
* Only used by the control side and the reference is copied inside each
assert(!ret);
if (relay_connection->type == RELAY_CONTROL) {
+ struct relay_stream_recv_handle *node, *tmp_node;
+
relay_delete_session(relay_connection, sessions_ht);
lttng_ht_destroy(relay_connection->ctf_traces_ht);
+
+ /* Clean up recv list. */
+ cds_list_for_each_entry_safe(node, tmp_node,
+ &relay_connection->recv_head, node) {
+ cds_list_del(&node->node);
+ free(node);
+ }
}
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
msg->u.stream.cpu = cpu;
}
+void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
+ enum lttng_consumer_command cmd,
+ uint64_t channel_key, uint64_t net_seq_idx)
+{
+ assert(msg);
+
+ memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
+
+ msg->cmd_type = cmd;
+ msg->u.sent_streams.channel_key = channel_key;
+ msg->u.sent_streams.net_seq_idx = net_seq_idx;
+}
+
/*
* Send stream communication structure to the consumer.
*/
uint64_t channel_key,
uint64_t stream_key,
int cpu);
+void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
+ enum lttng_consumer_command cmd,
+ uint64_t channel_key, uint64_t net_seq_idx);
void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg,
enum lttng_consumer_command cmd,
uint64_t channel_key,
return ret;
}
+/*
+ * Sending the notification that all streams were sent with STREAMS_SENT.
+ */
+int kernel_consumer_streams_sent(struct consumer_socket *sock,
+ struct ltt_kernel_session *session, uint64_t channel_key)
+{
+ int ret;
+ struct lttcomm_consumer_msg lkm;
+ struct consumer_output *consumer;
+
+ assert(sock);
+ assert(session);
+
+ DBG("Sending streams_sent");
+ /* Get consumer output pointer */
+ consumer = session->consumer;
+
+ /* Prep stream consumer message */
+ consumer_init_streams_sent_comm_msg(&lkm,
+ LTTNG_CONSUMER_STREAMS_SENT,
+ channel_key, consumer->net_seq_index);
+
+ health_code_update();
+
+ /* Send stream and file descriptor */
+ ret = consumer_send_msg(sock, &lkm);
+ if (ret < 0) {
+ goto error;
+ }
+
+error:
+ return ret;
+}
+
/*
* Send all stream fds of kernel channel to the consumer.
*/
{
int ret;
struct ltt_kernel_stream *stream;
+ uint64_t channel_key = -1ULL;
/* Safety net */
assert(channel);
if (ret < 0) {
goto error;
}
+ if (channel_key == -1ULL) {
+ channel_key = channel->fd;
+ }
+ }
+
+ if (!monitor || channel_key == -1ULL) {
+ goto end;
+ }
+
+ /* Add stream on the kernel consumer side. */
+ ret = kernel_consumer_streams_sent(sock, session, channel_key);
+ if (ret < 0) {
+ goto error;
}
+end:
error:
return ret;
}
int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
struct ltt_kernel_metadata *metadata);
+
+int kernel_consumer_streams_sent(struct consumer_socket *sock,
+ struct ltt_kernel_session *session, uint64_t channel_key);
return ret;
}
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(net_seq_idx != -1ULL);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_streams_sent(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+ net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and close the stream
*/
LTTNG_CONSUMER_FLUSH_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_METADATA,
+ LTTNG_CONSUMER_STREAMS_SENT,
};
/* State of each fd in consumer */
uint64_t net_seq_idx);
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path);
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
void close_relayd_stream(struct lttng_consumer_stream *stream);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
path, stream->name, stream->key);
}
+ ret = consumer_send_relayd_streams_sent(relayd_id);
+ if (ret < 0) {
+ ERR("sending streams sent to relayd");
+ goto end_unlock;
+ }
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
new_stream->name, fd, new_stream->relayd_stream_id);
break;
}
+ case LTTNG_CONSUMER_STREAMS_SENT:
+ {
+ struct lttng_consumer_channel *channel;
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
+ channel = consumer_find_channel(msg.u.sent_streams.channel_key);
+ if (!channel) {
+ /*
+ * We could not find the channel. Can happen if cpu hotplug
+ * happens while tearing down.
+ */
+ ERR("Unable to find channel key %" PRIu64,
+ msg.u.sent_streams.channel_key);
+ ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ /*
+ * Send status code to session daemon.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ health_code_update();
+
+ /*
+ * We should not send this message if we don't monitor the
+ * streams in this channel.
+ */
+ if (!channel->monitor) {
+ break;
+ }
+
+ health_code_update();
+ /* Send stream to relayd if the stream has an ID. */
+ if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
+ ret = consumer_send_relayd_streams_sent(
+ msg.u.sent_streams.net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
+ break;
+ }
case LTTNG_CONSUMER_UPDATE_STREAM:
{
rcu_read_unlock();
return ret;
}
+/*
+ * Inform the relay that all the streams for the current channel has been sent.
+ *
+ * On success return 0 else return ret_code negative value.
+ */
+int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
+{
+ int ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+
+ DBG("Relayd sending streams sent.");
+
+ /* This feature was introduced in 2.4, ignore it for earlier versions. */
+ if (rsock->minor < 4) {
+ ret = 0;
+ goto end;
+ }
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Waiting for reply */
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Back to host bytes order. */
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd streams sent replied error %d", reply.ret_code);
+ goto error;
+ } else {
+ /* Success */
+ ret = 0;
+ }
+
+ DBG("Relayd streams sent success");
+
+error:
+end:
+ return ret;
+}
+
/*
* Check version numbers on the relayd.
* If major versions are compatible, we assign minor_to_use to the
int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
const char *pathname, uint64_t *stream_id,
uint64_t tracefile_size, uint64_t tracefile_count);
+int relayd_streams_sent(struct lttcomm_relayd_sock *rsock);
int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num);
int relayd_version_check(struct lttcomm_relayd_sock *sock);
RELAYD_ADD_INDEX = 12,
RELAYD_SEND_INDEX = 13,
RELAYD_CLOSE_INDEX = 14,
- /* Live-reading commands. */
+ /* Live-reading commands (2.4+). */
RELAYD_LIST_SESSIONS = 15,
+ /* All streams of the channel have been sent to the relayd (2.4+). */
+ RELAYD_STREAMS_SENT = 16,
};
/*
uint64_t key;
uint64_t max_stream_size;
} LTTNG_PACKED snapshot_channel;
+ struct {
+ uint64_t channel_key;
+ uint64_t net_seq_idx;
+ } LTTNG_PACKED sent_streams;
} u;
} LTTNG_PACKED;
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct lttng_consumer_stream *stream;
+ uint64_t net_seq_idx = -1ULL;
assert(channel);
assert(ctx);
}
ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
}
+ if (net_seq_idx == -1ULL) {
+ net_seq_idx = stream->net_seq_idx;
+ }
+ }
+ ret = consumer_send_relayd_streams_sent(net_seq_idx);
+ if (ret < 0) {
+ /*
+ * Flag that the relayd was the problem here probably due to a
+ * communicaton error on the socket.
+ */
+ if (relayd_error) {
+ *relayd_error = 1;
+ }
+ ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
}
}
DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path,
stream->name, stream->key);
}
+ if (relayd_id != -1ULL) {
+ ret = consumer_send_relayd_streams_sent(relayd_id);
+ if (ret < 0) {
+ goto error_unlock;
+ }
+ }
ustctl_flush_buffer(stream->ustream, 1);