fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
fprintf(stderr, " -C, --control-port URL Control port listening.\n");
fprintf(stderr, " -D, --data-port URL Data port listening.\n");
+ fprintf(stderr, " -L, --live-port URL Live view port listening.\n");
fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n");
while (1) {
int option_index = 0;
- c = getopt_long(argc, argv, "dhv" "C:D:o:g:",
+ c = getopt_long(argc, argv, "dhv" "C:D:L:o:g:",
long_options, &option_index);
if (c == -1) {
break;
data_uri->port = DEFAULT_NETWORK_DATA_PORT;
}
break;
+ case 'L':
+ ret = uri_parse(optarg, &live_uri);
+ if (ret < 0) {
+ ERR("Invalid live URI specified");
+ goto exit;
+ }
+ if (live_uri->port == 0) {
+ live_uri->port = DEFAULT_NETWORK_VIEWER_PORT;
+ }
+ break;
case 'd':
opt_daemon = 1;
break;
uri_free(control_uri);
uri_free(data_uri);
+ /* Live URI is freed in the live thread. */
}
/*
struct relay_stream *stream =
caa_container_of(head, struct relay_stream, rcu_node);
- ctf_trace_try_destroy(stream->ctf_trace);
-
free(stream->path_name);
free(stream->channel_name);
free(stream);
* lookup failure on the live thread side of a stream indicates
* that the viewer stream index received value should be used.
*/
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
vstream->total_index_received = stream->total_index_received;
+ vstream->tracefile_count_last = stream->tracefile_count_current;
+ vstream->close_write_flag = 1;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
}
/* Cleanup index of that stream. */
iter.iter.node = &stream->ctf_trace_node.node;
delret = lttng_ht_del(stream->ctf_traces_ht, &iter);
assert(!delret);
+
+ if (stream->ctf_trace) {
+ ctf_trace_try_destroy(stream->ctf_trace);
+ }
+
call_rcu(&stream->rcu_node, deferred_free_stream);
DBG("Closed tracefile %d from close stream", stream->fd);
}
return ret;
}
+/*
+ * When we have received all the streams and the metadata for a channel,
+ * we make them visible to the viewer threads.
+ */
+static
+void set_viewer_ready_flag(struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node, *tmp_node;
+
+ cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ stream = relay_stream_find_by_id(node->id);
+ if (!stream) {
+ /*
+ * Stream is most probably being cleaned up by the data thread thus
+ * simply continue to the next one.
+ */
+ rcu_read_unlock();
+ continue;
+ }
+
+ stream->viewer_ready = 1;
+ rcu_read_unlock();
+
+ /* Clean stream handle node. */
+ cds_list_del(&node->node);
+ free(node);
+ }
+
+ return;
+}
+
+/*
+ * Add a recv handle node to the connection recv list with the given stream
+ * handle. A new node is allocated thus must be freed when the node is deleted
+ * from the list.
+ */
+static void queue_stream_handle(uint64_t handle, struct relay_command *cmd)
+{
+ struct relay_stream_recv_handle *node;
+
+ assert(cmd);
+
+ node = zmalloc(sizeof(*node));
+ if (!node) {
+ PERROR("zmalloc queue stream handle");
+ return;
+ }
+
+ node->id = handle;
+ cds_list_add(&node->node, &cmd->recv_head);
+}
+
/*
* relay_add_stream: allocate a new stream for a session
*/
ctf_trace_assign(cmd->ctf_traces_ht, stream);
stream->ctf_traces_ht = cmd->ctf_traces_ht;
+ /*
+ * Add the stream handle in the recv list of the connection. Once the end
+ * stream message is received, this list is emptied and streams are set
+ * with the viewer ready flag.
+ */
+ if (stream->metadata_flag) {
+ stream->viewer_ready = 1;
+ } else {
+ queue_stream_handle(stream->stream_handle, cmd);
+ }
+
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_add_unique_ulong(relay_streams_ht,
return ret;
}
+/*
+ * Receive the streams_sent message.
+ *
+ * Return 0 on success else a negative value.
+ */
+static
+int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd)
+{
+ int ret, send_ret;
+ struct lttcomm_relayd_generic_reply reply;
+
+ assert(cmd);
+
+ DBG("Relay receiving streams_sent");
+
+ if (!cmd->session || cmd->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ /*
+ * Flag every pending stream in the connection recv list that they are
+ * ready to be used by the viewer.
+ */
+ set_viewer_ready_flag(cmd);
+
+ reply.ret_code = htobe32(LTTNG_OK);
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending sent_stream reply");
+ ret = send_ret;
+ } else {
+ /* Success. */
+ ret = 0;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* Process the commands received on the control socket
*/
case RELAYD_SEND_INDEX:
ret = relay_recv_index(recv_hdr, cmd);
break;
+ case RELAYD_STREAMS_SENT:
+ ret = relay_streams_sent(recv_hdr, cmd);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
stream->tracefile_size) {
+ struct relay_viewer_stream *vstream;
+ uint64_t new_id;
+
+ new_id = (stream->tracefile_count_current + 1) %
+ stream->tracefile_count;
+ /*
+ * When we wrap-around back to 0, we start overwriting old
+ * trace data.
+ */
+ if (!stream->tracefile_overwrite && new_id == 0) {
+ stream->tracefile_overwrite = 1;
+ }
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
+ if (stream->tracefile_overwrite) {
+ stream->oldest_tracefile_id =
+ (stream->oldest_tracefile_id + 1) %
+ stream->tracefile_count;
+ }
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * The viewer is reading a file about to be
+ * overwritten. Close the FDs it is
+ * currently using and let it handle the fault.
+ */
+ if (vstream->tracefile_count_current == new_id) {
+ pthread_mutex_lock(&vstream->overwrite_lock);
+ vstream->abort_flag = 1;
+ pthread_mutex_unlock(&vstream->overwrite_lock);
+ DBG("Streaming side setting abort_flag on stream %s_%lu\n",
+ stream->channel_name, new_id);
+ } else if (vstream->tracefile_count_current ==
+ stream->tracefile_count_current) {
+ /*
+ * The reader and writer were in the
+ * same trace file, inform the viewer
+ * that no new index will ever be added
+ * to this file.
+ */
+ vstream->close_write_flag = 1;
+ }
+ }
ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
stream->tracefile_size, stream->tracefile_count,
relayd_uid, relayd_gid, stream->fd,
&(stream->tracefile_count_current), &stream->fd);
+ stream->total_index_received = 0;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
if (ret < 0) {
ERR("Rotating stream output file");
goto end_rcu_unlock;
PERROR("read relay cmd pipe");
goto error_read;
}
+ CDS_INIT_LIST_HEAD(&relay_connection->recv_head);
/*
* Only used by the control side and the reference is copied inside each
assert(!ret);
if (relay_connection->type == RELAY_CONTROL) {
+ struct relay_stream_recv_handle *node, *tmp_node;
+
relay_delete_session(relay_connection, sessions_ht);
lttng_ht_destroy(relay_connection->ctf_traces_ht);
+
+ /* Clean up recv list. */
+ cds_list_for_each_entry_safe(node, tmp_node,
+ &relay_connection->recv_head, node) {
+ cds_list_del(&node->node);
+ free(node);
+ }
}
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
/* Check if daemon is UID = 0 */
if (relayd_uid == 0) {
- if (control_uri->port < 1024 || data_uri->port < 1024) {
+ if (control_uri->port < 1024 || data_uri->port < 1024 ||
+ live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
ret = -1;
goto exit;