return ret;
}
+/*
+ * Send the viewer the list of current sessions.
+ */
+static
+int viewer_get_new_streams(struct relay_command *cmd,
+ struct lttng_ht *sessions_ht)
+{
+ int ret, send_streams = 0;
+ uint32_t nb_streams = 0;
+ struct lttng_viewer_new_streams_request request;
+ struct lttng_viewer_new_streams_response response;
+ struct lttng_viewer_stream send_stream;
+ struct relay_stream *stream;
+ struct relay_viewer_stream *viewer_stream;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+ struct relay_session *session;
+
+ assert(cmd);
+ assert(sessions_ht);
+
+ DBG("Get new streams received");
+
+ if (cmd->version_check_done == 0) {
+ ERR("Trying to get streams before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ health_code_update();
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0);
+ if (ret < 0 || ret != sizeof(request)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
+ } else {
+ ERR("Relay failed to receive the command parameters.");
+ }
+ ret = -1;
+ goto error;
+ }
+
+ health_code_update();
+
+ rcu_read_lock();
+ lttng_ht_lookup(sessions_ht,
+ (void *)((unsigned long) be64toh(request.session_id)), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay session %" PRIu64 " not found",
+ be64toh(request.session_id));
+ response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+ goto send_reply;
+ }
+
+ session = caa_container_of(node, struct relay_session, session_n);
+ if (cmd->session_id == session->id) {
+ /* We confirmed the viewer is asking for the same session. */
+ send_streams = 1;
+ response.status = htobe32(VIEWER_NEW_STREAMS_OK);
+ } else {
+ send_streams = 0;
+ response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+ goto send_reply;
+ }
+
+ /*
+ * Fill the viewer_streams_ht to count the number of streams ready to be
+ * sent and avoid concurrency issues on the relay_streams_ht and don't rely
+ * on a total session stream count.
+ */
+ pthread_mutex_lock(&session->viewer_ready_lock);
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ stream_n.node) {
+ struct relay_viewer_stream *vstream;
+
+ health_code_update();
+
+ /*
+ * Don't send stream if no ctf_trace, wrong session or if the stream is
+ * not ready for the viewer.
+ */
+ if (stream->session != cmd->session ||
+ !stream->ctf_trace || !stream->viewer_ready) {
+ continue;
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (!vstream) {
+ ret = init_viewer_stream(stream, 0);
+ if (ret < 0) {
+ pthread_mutex_unlock(&session->viewer_ready_lock);
+ goto end_unlock;
+ }
+ nb_streams++;
+ } else if (!vstream->sent_flag) {
+ nb_streams++;
+ }
+ }
+ pthread_mutex_unlock(&session->viewer_ready_lock);
+
+ response.streams_count = htobe32(nb_streams);
+
+send_reply:
+ health_code_update();
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0);
+ if (ret < 0) {
+ ERR("Relay sending viewer attach response");
+ goto end_unlock;
+ }
+ health_code_update();
+
+ /*
+ * Unknown or empty session, just return gracefully, the viewer knows what
+ * is happening.
+ */
+ if (!send_streams || !nb_streams) {
+ ret = 0;
+ goto end_unlock;
+ }
+
+ /* We should only be there if we have a session to attach to. */
+ cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, viewer_stream,
+ stream_n.node) {
+ health_code_update();
+
+ /* Don't send back if session does not match or already sent. */
+ if (viewer_stream->session_id != cmd->session->id ||
+ viewer_stream->sent_flag) {
+ continue;
+ }
+
+ send_stream.id = htobe64(viewer_stream->stream_handle);
+ send_stream.ctf_trace_id = htobe64(viewer_stream->ctf_trace->id);
+ send_stream.metadata_flag = htobe32(viewer_stream->metadata_flag);
+ strncpy(send_stream.path_name, viewer_stream->path_name,
+ sizeof(send_stream.path_name));
+ strncpy(send_stream.channel_name, viewer_stream->channel_name,
+ sizeof(send_stream.channel_name));
+
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &send_stream,
+ sizeof(send_stream), 0);
+ if (ret < 0) {
+ ERR("Relay sending stream %" PRIu64, viewer_stream->stream_handle);
+ goto end_unlock;
+ }
+ DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle);
+ viewer_stream->sent_flag = 1;
+ }
+
+ ret = 0;
+
+end_unlock:
+ rcu_read_unlock();
+end_no_session:
+error:
+ return ret;
+}
+
/*
* Send the viewer the list of current sessions.
*/
struct lttng_ht *sessions_ht)
{
int ret, send_streams = 0;
- uint32_t nb_streams = 0, nb_streams_ready = 0;
+ uint32_t nb_streams = 0;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct lttng_viewer_stream send_stream;
* ready to be sent and avoid concurrency issues on the
* relay_streams_ht and don't rely on a total session stream count.
*/
+ pthread_mutex_lock(&session->viewer_ready_lock);
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
struct relay_viewer_stream *vstream;
if (stream->session != cmd->session) {
continue;
}
- nb_streams++;
/*
* Don't send streams with no ctf_trace, they are not
if (!stream->ctf_trace || !stream->viewer_ready) {
continue;
}
- nb_streams_ready++;
vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
ret = init_viewer_stream(stream, seek_last);
if (ret < 0) {
+ pthread_mutex_unlock(&session->viewer_ready_lock);
goto end_unlock;
}
}
+ nb_streams++;
}
+ pthread_mutex_unlock(&session->viewer_ready_lock);
- /* We must have the same amount of existing stream and ready stream. */
- if (nb_streams != nb_streams_ready) {
- nb_streams = 0;
- }
response.streams_count = htobe32(nb_streams);
}
goto end_unlock;
}
DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle);
+ viewer_stream->sent_flag = 1;
}
ret = 0;
case VIEWER_GET_METADATA:
ret = viewer_get_metadata(cmd);
break;
+ case VIEWER_GET_NEW_STREAMS:
+ ret = viewer_get_new_streams(cmd, sessions_ht);
+ break;
default:
ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
live_relay_unknown_command(cmd);