From: Kienan Stewart
Date: Fri, 12 Apr 2024 18:27:09 +0000 (-0400)
Subject: Fix: relayd: live: Catch short lived applications for attached viewers
X-Git-Url: http://git.lttng.org./?a=commitdiff_plain;h=98b82dfa2b79d7fa3d1c7716bf5c59b34a1e6a20;p=lttng-tools.git

Fix: relayd: live: Catch short lived applications for attached viewers

Observed issue
==============

When a live viewer is attached to a session and a new application
starts, emits events, and exits, the viewer may not see the produced
events.

With per-UID buffer allocation, the application needs to run as a new
user that hasn't had streams allocated before. With per-PID buffers,
spawning a new traced application is sufficient.

Cause
=====

When the new relay streams are created, associated viewer streams are
not immediately created. As a result, there is a window between the
creation of the relay streams and the time at which the live viewer
sends a GET_NEW_STREAMS command, during which the session may start
being destroyed and/or the relay streams unpublished.

When the relay streams are unpublished for any reason, the reference
to the relay stream in the ctf_trace is removed. The searches for new
and unsent streams iterate over the relay streams in each ctf_trace.
Therefore, relay streams that were created and unpublished while the
live viewer was already attached to the session can be completely
missed.

Solution
========

The solution has three main aspects:

1. When new relayd streams are published and a viewer is attached for
   the corresponding relay session, or when a live viewer session
   attaches to an existing relay session, the viewer streams are
   created immediately.

2. The unsent viewer streams are tracked in a per-viewer session list
   so that there continues to be a reference (via the
   viewer_stream->stream backreference) held for the relay stream, and
   so that unpublished relay streams can be found without iterating
   over the entire relay streams hashtable.

3. To cover cases where a relay stream has been closed but there are
   still known trace chunks available, an additional check has been
   added to the `get_next_index` viewer stream transition checks. When
   the seen rotation count and relay stream rotation count are the
   same and the relay stream no longer has an active trace chunk, the
   viewer stream is not forcibly rotated. This prevents the final
   trace chunk reference (via viewerstream->stream_file->trace_chunk)
   from being dropped. Later, when the relay stream is fully closed, a
   final rotation is performed.

Known drawbacks
===============

The current implementation adds a global hash table which holds
references to created viewer sessions. When searching to determine if
new viewer streams should be created, the search is O(N*M) where N is
the number of viewer sessions and M is the number of relay sessions.

A different approach to recording references from relay sessions to
viewer sessions (if any exist) could reduce the search space.

Change-Id: Ie8f00697a4dafd5c9b0bfe60a872d1c1882f6944
Signed-off-by: Kienan Stewart
Signed-off-by: Jérémie Galarneau
---

diff --git a/doc/relayd-architecture.txt b/doc/relayd-architecture.txt
index 56c81241f..1d85f0be6 100644
--- a/doc/relayd-architecture.txt
+++ b/doc/relayd-architecture.txt
@@ -22,6 +22,8 @@ relay connection (main.c, for sessiond/consumer)
 live connection (live.c, for client)
    |
    \-> 1 viewer session
+         |    |
+         |    \-> (transient ref) 0 or many unannounced relay streams
          |
          \-> 0 or many session (actually a reference to session as created
                |               by the relay connection)
@@ -30,8 +32,9 @@ live connection (live.c, for client)
 
 There are global tables declared in lttng-relayd.h for sessions
 (sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
-by stream handle), and viewer streams (viewer_streams_ht, indexed by
-stream handle). The purpose of those tables is to allow fast lookup of
+by stream handle), viewer sessions (viewer_sessions_ht, indexed by
+connection sock fd), and viewer streams (viewer_streams_ht, indexed
+by stream handle). The purpose of those tables is to allow fast lookup of
 those objects using the IDs received in the communication protocols.
 
 There is also one connection hash table per worker thread. There is one
@@ -56,7 +59,10 @@ There is also a "lock" mutex in each object. Those are used to
 synchronize between threads (currently the main.c relay thread and
 live.c client thread) when objects are shared. Locks can be nested from
 the outermost object to the innermost object. IOW, the ctf-trace lock can
-nest within the session lock.
+nest within the session lock. The unannounced stream list lock in viewer
+sessions is an exception to the default locking order: it may be nested
+inside the following locks (in order): relay session, ctf_trace, and relay
+stream.
 
 RCU linked lists are used to iterate using RCU, and are protected by
 their own mutex for modifications. Iterations should be confirmed using
diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp
index 73548cfb3..250e05c2e 100644
--- a/src/bin/lttng-relayd/live.cpp
+++ b/src/bin/lttng-relayd/live.cpp
@@ -267,14 +267,62 @@ end:
 	return ret;
 }
 
+/*
+ * Sends one viewer stream to the given socket.
+ *
+ * This function needs to be called with the stream locked.
+ *
+ * Return 0 on success, or else a negative value.
+ */
+static ssize_t send_one_viewer_stream(struct lttcomm_sock *sock,
+				      struct relay_viewer_stream *vstream)
+{
+	struct ctf_trace *ctf_trace;
+	struct lttng_viewer_stream send_stream = {};
+	ssize_t ret = -1;
+
+	ASSERT_LOCKED(vstream->stream->lock);
+
+	ctf_trace = vstream->stream->trace;
+	send_stream.id = htobe64(vstream->stream->stream_handle);
+	send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+	send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+	if (lttng_strncpy(
+		    send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name))) {
+		ret = -1; /* Error. */
+		goto end;
+	}
+	if (lttng_strncpy(send_stream.channel_name,
+			  vstream->channel_name,
+			  sizeof(send_stream.channel_name))) {
+		ret = -1; /* Error. */
+		goto end;
+	}
+
+	DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+	vstream->sent_flag = true;
+
+	ret = send_response(sock, &send_stream, sizeof(send_stream));
+	if (ret < 0) {
+		goto end;
+	}
+
+	ret = 0;
+
+end:
+	return ret;
+}
+
 /*
  * Send viewer streams to the given socket. The ignore_sent_flag indicates if
  * this function should ignore the sent flag or not.
  *
  * Return 0 on success or else a negative value.
  */
-static ssize_t
-send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
+static ssize_t send_viewer_streams(struct lttcomm_sock *sock,
+				   uint64_t session_id,
+				   unsigned int ignore_sent_flag,
+				   struct relay_viewer_session *viewer_session)
 {
 	ssize_t ret;
 
@@ -283,8 +331,6 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
 			     decltype(relay_viewer_stream::stream_n),
 			     &relay_viewer_stream::stream_n>(
 		     *viewer_streams_ht->ht)) {
-		struct ctf_trace *ctf_trace;
-		struct lttng_viewer_stream send_stream = {};
 
 		health_code_update();
 
@@ -301,36 +347,50 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
 			continue;
 		}
 
-		ctf_trace = vstream->stream->trace;
-		send_stream.id = htobe64(vstream->stream->stream_handle);
-		send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-		send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
-		if (lttng_strncpy(send_stream.path_name,
-				  vstream->path_name,
-				  sizeof(send_stream.path_name))) {
-			pthread_mutex_unlock(&vstream->stream->lock);
+		ret = send_one_viewer_stream(sock, vstream);
+		pthread_mutex_unlock(&vstream->stream->lock);
+		if (ret < 0) {
 			viewer_stream_put(vstream);
-			ret = -1; /* Error. */
 			goto end;
 		}
-		if (lttng_strncpy(send_stream.channel_name,
-				  vstream->channel_name,
-				  sizeof(send_stream.channel_name))) {
+
+		pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+		cds_list_del_rcu(&vstream->viewer_stream_node);
+		pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+		viewer_stream_put(vstream);
+	}
+
+	/*
+	 * Any remaining streams that have been seen, but are perhaps unpublished
+	 * due to a session being destroyed in between attach and get_new_streams.
+	 */
+	for (auto *vstream : lttng::urcu::rcu_list_iteration_adapter(viewer_session->unannounced_stream_list)) {
+		health_code_update();
+		if (!viewer_stream_get(vstream)) {
+			continue;
+		}
+
+		pthread_mutex_lock(&vstream->stream->lock);
+		if (vstream->stream->trace->session->id != session_id) {
 			pthread_mutex_unlock(&vstream->stream->lock);
 			viewer_stream_put(vstream);
-			ret = -1; /* Error. */
-			goto end;
+			continue;
 		}
 
-		DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
-		vstream->sent_flag = true;
+		ret = send_one_viewer_stream(sock, vstream);
 		pthread_mutex_unlock(&vstream->stream->lock);
-
-		ret = send_response(sock, &send_stream, sizeof(send_stream));
-		viewer_stream_put(vstream);
 		if (ret < 0) {
+			viewer_stream_put(vstream);
 			goto end;
 		}
+
+		pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+		cds_list_del_rcu(&vstream->viewer_stream_node);
+		viewer_stream_put(vstream);
+		pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+		viewer_stream_put(vstream);
+
 	}
 
 	ret = 0;
@@ -350,12 +410,12 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static int make_viewer_streams(struct relay_session *relay_session,
+int make_viewer_streams(struct relay_session *relay_session,
 			       struct relay_viewer_session *viewer_session,
 			       enum lttng_viewer_seek seek_t,
-			       uint32_t *nb_total,
-			       uint32_t *nb_unsent,
-			       uint32_t *nb_created,
+			       unsigned int *nb_total,
+			       unsigned int *nb_unsent,
+			       unsigned int *nb_created,
 			       bool *closed)
 {
 	int ret;
@@ -367,6 +427,47 @@ static int make_viewer_streams(struct relay_session *relay_session,
 		*closed = true;
 	}
 
+	/*
+	 * Check unannounced viewer streams for any that have been seen but are no longer published.
+	 */
+	for (auto *viewer_stream : lttng::urcu::rcu_list_iteration_adapter(viewer_session->unannounced_stream_list)) {
+		if (!viewer_stream_get(viewer_stream)) {
+			DBG("Couldn't get reference for viewer_stream");
+			continue;
+		}
+
+		if (viewer_stream->sent_flag) {
+			ERR("logic error -> viewer stream %ld is in unannounced_stream_list is marked as sent",
+			    viewer_stream->stream->stream_handle);
+			abort();
+		}
+
+		if (viewer_stream->stream->published) {
+			/*
+			 * This stream should be handled later when iterating via the
+			 * ctf_traces
+			 */
+			viewer_stream_put(viewer_stream);
+			continue;
+		}
+
+		if (viewer_stream->stream->trace->session->id != relay_session->id) {
+			viewer_stream_put(viewer_stream);
+			continue;
+		}
+
+		if (nb_unsent) {
+			(*nb_unsent)++;
+		}
+
+		if (nb_total) {
+			(*nb_total)++;
+		}
+
+		viewer_stream_put(viewer_stream);
+	}
+
 	/*
 	 * Create viewer streams for relay streams that are ready to be
 	 * used for a the given session id only.
@@ -384,6 +485,11 @@ static int make_viewer_streams(struct relay_session *relay_session,
 
 		auto ctf_trace = lttng::make_unique_wrapper(raw_ctf_trace);
 
+		/*
+		 * The trace metadata state may be updated while iterating over all the
+		 * relay streams associated with the trace, so the lock is required.
+		 */
+		const lttng::pthread::lock_guard ctf_trace_lock(ctf_trace->lock);
 
 		/*
 		 * Iterate over all the streams of the trace to see if we have a
@@ -513,6 +619,25 @@ static int make_viewer_streams(struct relay_session *relay_session,
 					goto end;
 				}
 
+				/*
+				 * Add the new stream to the list of streams to publish for
+				 * this session.
+				 */
+				pthread_mutex_lock(
+					&viewer_session->unannounced_stream_list_lock);
+				cds_list_add_rcu(&viewer_stream->viewer_stream_node,
+						 &viewer_session->unannounced_stream_list);
+				pthread_mutex_unlock(
+					&viewer_session->unannounced_stream_list_lock);
+				/*
+				 * Get for the unannounced stream list, this should be
+				 * put when the unannounced stream is sent.
+				 */
+				if (!viewer_stream_get(viewer_stream)) {
+					ERR("Unable to get self-reference on viewer stream");
+					abort();
+				}
+
 				if (nb_created) {
 					/* Update number of created stream counter. */
 					(*nb_created)++;
@@ -533,7 +658,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
 			}
 			/* Update number of total stream counter. */
 			if (nb_total) {
-				if (stream->is_metadata) {
+				if (stream->is_metadata) {
 					if (!stream->closed ||
 					    stream->metadata_received >
 						    viewer_stream->metadata_sent) {
@@ -1124,7 +1249,7 @@ end_free:
 static int viewer_get_new_streams(struct relay_connection *conn)
 {
 	int ret, send_streams = 0;
-	uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
+	unsigned int nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
 	struct lttng_viewer_new_streams_request request;
 	struct lttng_viewer_new_streams_response response;
 	struct relay_session *session = nullptr;
@@ -1199,7 +1324,7 @@ static int viewer_get_new_streams(struct relay_connection *conn)
 	response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
 	/* Only send back the newly created streams with the unsent ones. */
-	nb_streams = nb_created + nb_unsent;
+	nb_streams = nb_unsent + nb_created;
 	response.streams_count = htobe32(nb_streams);
 
 	/*
@@ -1237,7 +1362,7 @@ send_reply:
 	 * streams that were not sent from that point will be sent to
 	 * the viewer.
 	 */
-	ret = send_viewer_streams(conn->sock, session_id, 0);
+	ret = send_viewer_streams(conn->sock, session_id, 0, conn->viewer_session);
 	if (ret < 0) {
 		goto end_put_session;
 	}
@@ -1257,7 +1382,7 @@ static int viewer_attach_session(struct relay_connection *conn)
 {
 	int send_streams = 0;
 	ssize_t ret;
-	uint32_t nb_streams = 0;
+	unsigned int nb_streams = 0;
 	enum lttng_viewer_seek seek_type;
 	struct lttng_viewer_attach_session_request request;
 	struct lttng_viewer_attach_session_response response;
@@ -1390,7 +1515,7 @@ send_reply:
 	}
 
 	/* Send stream and ignore the sent flag. */
-	ret = send_viewer_streams(conn->sock, session_id, 1);
+	ret = send_viewer_streams(conn->sock, session_id, 1, conn->viewer_session);
 	if (ret < 0) {
 		goto end_put_session;
 	}
@@ -1786,6 +1911,20 @@ static int viewer_get_next_index(struct relay_connection *conn)
 			    conn->viewer_session->current_trace_chunk ?
 				    std::to_string(viewer_session_chunk_id).c_str() :
 				    "None");
+		} else if (vstream->stream_file.trace_chunk && rstream->completed_rotation_count == vstream->last_seen_rotation_count && !rstream->trace_chunk) {
+			/*
+			 * When a relay stream is closed, there is a window before the rotation of the
+			 * streams happens, during which the next index may be fetched. If the seen
+			 * rotations are the same and the relay stream trace chunk is null, don't rotate.
+			 * When the close finishes, the rotation count on the relay stream will go up.
+			 */
+			DBG("Transition to latest chunk check (%s -> %s): relay stream chunk is null, but viewer stream knows a chunk and isn't yet behind a rotation",
+			    vstream->stream_file.trace_chunk ?
+				    std::to_string(stream_file_chunk_id).c_str() :
+				    "None",
+			    conn->viewer_session->current_trace_chunk ?
+				    std::to_string(viewer_session_chunk_id).c_str() :
+				    "None");
 		} else {
 			DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
 			    vstream->stream_file.trace_chunk ?
@@ -1820,7 +1959,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
 		if (rstream->closed) {
 			viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
 			DBG("Cannot open index for stream id %" PRIu64
-			    "stream is closed, returning status=%s",
+			    " stream is closed, returning status=%s",
 			    (uint64_t) be64toh(request_index.stream_id),
 			    lttng_viewer_next_index_return_code_str(
 				    (enum lttng_viewer_next_index_return_code) viewer_index.status));
@@ -2181,7 +2320,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
 		bool acquired_reference;
 
 		DBG("Viewer session and viewer stream chunk differ: "
-		    "vsession chunk %p vstream chunk %p",
+		    "vsession chunk %p vstream chunk=%p",
 		    conn->viewer_session->current_trace_chunk,
 		    vstream->stream_file.trace_chunk);
 		lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
diff --git a/src/bin/lttng-relayd/live.hpp b/src/bin/lttng-relayd/live.hpp
index 53f67d30e..47926d2ab 100644
--- a/src/bin/lttng-relayd/live.hpp
+++ b/src/bin/lttng-relayd/live.hpp
@@ -18,4 +18,11 @@ int relayd_live_create(struct lttng_uri *live_uri);
 int relayd_live_stop();
 int relayd_live_join();
 
+int make_viewer_streams(struct relay_session *relay_session,
+			struct relay_viewer_session *viewer_session,
+			enum lttng_viewer_seek seek_t,
+			unsigned int *nb_total,
+			unsigned int *nb_unsent,
+			unsigned int *nb_created,
+			bool *closed);
 #endif /* LTTNG_RELAYD_LIVE_H */
diff --git a/src/bin/lttng-relayd/lttng-relayd.hpp b/src/bin/lttng-relayd/lttng-relayd.hpp
index bf86575f5..052209b47 100644
--- a/src/bin/lttng-relayd/lttng-relayd.hpp
+++ b/src/bin/lttng-relayd/lttng-relayd.hpp
@@ -42,6 +42,7 @@ enum relay_group_output_by {
  */
 extern struct lttng_ht *sessions_ht;
 extern struct lttng_ht *relay_streams_ht;
+extern struct lttng_ht *viewer_sessions_ht;
 extern struct lttng_ht *viewer_streams_ht;
 extern struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry;
 
diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp
index 42ec33e52..c26050b45 100644
--- a/src/bin/lttng-relayd/main.cpp
+++ b/src/bin/lttng-relayd/main.cpp
@@ -25,6 +25,7 @@
 #include "tracefile-array.hpp"
 #include "utils.hpp"
 #include "version.hpp"
+#include "viewer-session.hpp"
 #include "viewer-stream.hpp"
 
 #include
@@ -169,6 +170,9 @@ struct lttng_ht *viewer_streams_ht;
 /* Global relay sessions hash table. */
 struct lttng_ht *sessions_ht;
 
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
 /* Relayd health monitoring */
 struct health_app *health_relayd;
 
@@ -774,11 +778,13 @@ static void relayd_cleanup()
 
 	if (viewer_streams_ht)
 		lttng_ht_destroy(viewer_streams_ht);
+	if (viewer_sessions_ht) {
+		lttng_ht_destroy(viewer_sessions_ht);
+	}
 	if (relay_streams_ht)
 		lttng_ht_destroy(relay_streams_ht);
 	if (sessions_ht)
 		lttng_ht_destroy(sessions_ht);
-
 	free(opt_output_path);
 	free(opt_working_directory);
 
@@ -1513,6 +1519,10 @@ end:
 static void publish_connection_local_streams(struct relay_connection *conn)
 {
 	struct relay_session *session = conn->session;
+	unsigned int created = 0;
+	bool closed = false;
+
+	LTTNG_ASSERT(viewer_sessions_ht);
 
 	/*
 	 * We publish all streams belonging to a session atomically wrt
@@ -1529,9 +1539,57 @@ static void publish_connection_local_streams(struct relay_connection *conn)
 	/*
 	 * Inform the viewer that there are new streams in the session.
 	 */
-	if (session->viewer_attached) {
-		uatomic_set(&session->new_streams, 1);
+	if (!session->viewer_attached) {
+		goto unlock;
+	}
+
+	/*
+	 * Create viewer_streams for all the newly published streams for this relay session.
+	 * This searches through all known viewer sessions and finds those that are
+	 * attached to this connection's relay session. This is done so that the newer
+	 * viewer streams will hold a reference on any relay streams that already exist,
+	 * but may be unpublished between now and the next GET_NEW_STREAMS from the
+	 * attached live viewer.
+	 */
+	for (auto *viewer_session: lttng::urcu::lfht_iteration_adapter(*viewer_sessions_ht->ht))
+	{
+		for (auto *session_iter: lttng::urcu::rcu_list_iteration_adapter(viewer_session->session_list))
+		{
+			if (session != session_iter) {
+				continue;
+			}
+			const int ret = make_viewer_streams(session,
+							    viewer_session,
+							    LTTNG_VIEWER_SEEK_BEGINNING,
+							    nullptr,
+							    nullptr,
+							    &created,
+							    &closed);
+			if (ret == 0) {
+				DBG("Created %d new viewer streams during publication of relay streams for relay session %" PRIu64,
+				    created,
+				    session->id);
+			} else if (ret < 0) {
+				/*
+				 * Warning, since the creation of the
+				 * streams will be retried when the viewer
+				 * next sends the GET_NEW_STREAMS again.
+				 */
+				WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+				     ", ret=%d, created=%d, closed=%d",
+				     session->id,
+				     ret,
+				     created,
+				     closed);
+			}
+		}
 	}
+unlock:
+	uatomic_set(&session->new_streams, 1);
+	pthread_mutex_unlock(&session->lock);
 }
 
 static int conform_channel_path(char *channel_path)
@@ -4393,6 +4451,13 @@ int main(int argc, char **argv)
 		goto exit_options;
 	}
 
+	/* tables of viewer sessions indexed by session ID */
+	viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+	if (!viewer_sessions_ht) {
+		retval = -1;
+		goto exit_options;
+	}
+
 	/* tables of streams indexed by stream ID */
 	viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
 	if (!viewer_streams_ht) {
diff --git a/src/bin/lttng-relayd/viewer-session.cpp b/src/bin/lttng-relayd/viewer-session.cpp
index 9b6640f6b..1ed625f75 100644
--- a/src/bin/lttng-relayd/viewer-session.cpp
+++ b/src/bin/lttng-relayd/viewer-session.cpp
@@ -9,6 +9,7 @@
 
 #define _LGPL_SOURCE
 #include "ctf-trace.hpp"
+#include "live.hpp"
 #include "lttng-relayd.hpp"
 #include "session.hpp"
 #include "stream.hpp"
@@ -20,6 +21,10 @@
 
 #include
 
+/* Global session id used in the session creation. */
+static uint64_t last_viewer_session_id;
+static pthread_mutex_t last_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
+
 struct relay_viewer_session *viewer_session_create()
 {
 	struct relay_viewer_session *vsession;
@@ -28,7 +33,14 @@ struct relay_viewer_session *viewer_session_create()
 	if (!vsession) {
 		goto end;
 	}
+	pthread_mutex_lock(&last_viewer_session_id_lock);
+	vsession->id = ++last_viewer_session_id;
+	pthread_mutex_unlock(&last_viewer_session_id_lock);
 	CDS_INIT_LIST_HEAD(&vsession->session_list);
+	CDS_INIT_LIST_HEAD(&vsession->unannounced_stream_list);
+	pthread_mutex_init(&vsession->unannounced_stream_list_lock, nullptr);
+	lttng_ht_node_init_u64(&vsession->viewer_session_n, vsession->id);
+	lttng_ht_add_unique_u64(viewer_sessions_ht, &vsession->viewer_session_n);
 end:
 	return vsession;
 }
@@ -98,6 +110,40 @@ enum lttng_viewer_attach_return_code viewer_session_attach(struct relay_viewer_s
 		/* Ownership is transfered to the list. */
 		cds_list_add_rcu(&session->viewer_session_node, &vsession->session_list);
 		pthread_mutex_unlock(&vsession->session_list_lock);
+
+		/*
+		 * Immediately create new viewer streams for the attached session
+		 * so that the viewer streams hold a reference on the any relay
+		 * streams that could be unpublished between now and the next
+		 * GET_NEW_STREAMS command from the live viewer.
+		 */
+		uint32_t created = 0;
+		uint32_t total = 0;
+		uint32_t unsent = 0;
+		bool closed = false;
+		const int make_viewer_streams_ret = make_viewer_streams(session,
+									vsession,
+									LTTNG_VIEWER_SEEK_BEGINNING,
+									&total,
+									&unsent,
+									&created,
+									&closed);
+
+		if (make_viewer_streams_ret == 0) {
+			DBG("Created %d new viewer streams while attaching to relay session %" PRIu64, created, session->id);
+		} else {
+			/*
+			 * Warning, since the creation of the streams will be retried when
+			 * the viewer next sends the GET_NEW_STREAMS commands.
+			 */
+			WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64 ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
+			     session->id,
+			     make_viewer_streams_ret,
+			     total,
+			     unsent,
+			     created,
+			     closed);
+		}
 	} else {
 		/* Put our local ref. */
 		session_put(session);
@@ -126,6 +172,7 @@ static int viewer_session_detach(struct relay_viewer_session *vsession,
 		/* Release reference held by the list. */
 		session_put(session);
 	}
+
 	/* Safe since we know the session exists. */
 	pthread_mutex_unlock(&session->lock);
 	return ret;
@@ -133,6 +180,12 @@ static int viewer_session_detach(struct relay_viewer_session *vsession,
 
 void viewer_session_destroy(struct relay_viewer_session *vsession)
 {
+	struct lttng_ht_iter iter;
+
+	LTTNG_ASSERT(cds_list_empty(&vsession->unannounced_stream_list));
+
+	iter.iter.node = &vsession->viewer_session_n.node;
+	lttng_ht_del(viewer_sessions_ht, &iter);
 	lttng_trace_chunk_put(vsession->current_trace_chunk);
 	free(vsession);
 }
@@ -155,6 +208,7 @@ void viewer_session_close_one_session(struct relay_viewer_session *vsession,
 		if (!viewer_stream_get(vstream)) {
 			continue;
 		}
+
 		if (vstream->stream->trace->session != session) {
 			viewer_stream_put(vstream);
 			continue;
@@ -169,6 +223,25 @@ void viewer_session_close_one_session(struct relay_viewer_session *vsession,
 		viewer_stream_put(vstream);
 	}
 
+	for (auto *vstream: lttng::urcu::rcu_list_iteration_adapter(vsession->unannounced_stream_list))
+	{
+		if (!viewer_stream_get(vstream)) {
+			continue;
+		}
+		if (vstream->stream->trace->session != session) {
+			viewer_stream_put(vstream);
+			continue;
+		}
+		pthread_mutex_lock(&vsession->unannounced_stream_list_lock);
+		cds_list_del_rcu(&vstream->viewer_stream_node);
+		pthread_mutex_unlock(&vsession->unannounced_stream_list_lock);
+		/* Local reference */
+		viewer_stream_put(vstream);
+		/* Reference from unannounced_stream_list */
+		viewer_stream_put(vstream);
+	}
+
 	lttng_trace_chunk_put(vsession->current_trace_chunk);
 	vsession->current_trace_chunk = nullptr;
 	viewer_session_detach(vsession, session);
diff --git a/src/bin/lttng-relayd/viewer-session.hpp b/src/bin/lttng-relayd/viewer-session.hpp
index 3dfa3acea..728676b95 100644
--- a/src/bin/lttng-relayd/viewer-session.hpp
+++ b/src/bin/lttng-relayd/viewer-session.hpp
@@ -23,6 +23,10 @@
 #include
 
 struct relay_viewer_session {
+	/*
+	 * The id of the relay viewer session. Uses the associated connection's socket FD.
+	 */
+	uint64_t id;
 	/*
 	 * Session list. Updates are protected by the session_list_lock.
 	 * Traversals are protected by RCU.
@@ -31,6 +35,20 @@ struct relay_viewer_session {
 	 */
 	struct cds_list_head session_list; /* RCU list. */
 	pthread_mutex_t session_list_lock; /* Protects list updates. */
+	/*
+	 * Unannounced stream list. Updates are protected by the
+	 * unannounced_stream_list_lock. This lock nests inside
+	 * the following locks (in order): relay session, ctf_trace,
+	 * and relay stream.
+	 *
+	 * Traversals are protected by RCU.
+	 */
+	struct cds_list_head unannounced_stream_list;
+	pthread_mutex_t unannounced_stream_list_lock;
+	/*
+	 * Node in the global viewer sessions hashtable.
+	 */
+	struct lttng_ht_node_u64 viewer_session_n;
 	/*
 	 * The viewer session's current trace chunk is initially set, when
 	 * a viewer attaches to the viewer session, to a copy the corresponding
diff --git a/src/bin/lttng-relayd/viewer-stream.cpp b/src/bin/lttng-relayd/viewer-stream.cpp
index 066571743..ef0dfbdd3 100644
--- a/src/bin/lttng-relayd/viewer-stream.cpp
+++ b/src/bin/lttng-relayd/viewer-stream.cpp
@@ -69,7 +69,6 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 		PERROR("relay viewer stream zmalloc");
 		goto error;
 	}
-
 	if (trace_chunk) {
 		const bool acquired_reference = lttng_trace_chunk_get(trace_chunk);
 
@@ -211,7 +210,6 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
 	lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
 	urcu_ref_init(&vstream->ref);
 	lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
 	return vstream;
 
 error:
diff --git a/src/bin/lttng-relayd/viewer-stream.hpp b/src/bin/lttng-relayd/viewer-stream.hpp
index ea6292b2c..977a7cfa9 100644
--- a/src/bin/lttng-relayd/viewer-stream.hpp
+++ b/src/bin/lttng-relayd/viewer-stream.hpp
@@ -39,6 +39,13 @@ struct relay_viewer_stream {
 	/* Back ref to stream. */
 	struct relay_stream *stream;
 
+	/*
+	 * Member of unannounced_stream_list in struct viewer_sesion.
+	 * Updates are protected by the unannounced_stream_list_lock, and
+	 * traversals are protected by RCU.
+	 */
+	struct cds_list_head viewer_stream_node;
+
 	struct {
 		struct fs_handle *handle;
 		struct lttng_trace_chunk *trace_chunk;
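
Illustrative note (not part of the patch): the second aspect of the solution described in the commit message, a per-viewer-session list that keeps a reference on relay streams until they are announced, can be sketched in isolation roughly as follows. Every name here (`RelayStream`, `ViewerStream`, `Trace`, `ViewerSession`, `announce_short_lived_stream`) is a hypothetical stand-in, and `std::shared_ptr` with `std::mutex` replaces the urcu reference counting and RCU lists used by lttng-relayd. It is a minimal model under those assumptions, not the relay daemon's implementation.

```cpp
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a relay stream.
struct RelayStream {
	std::uint64_t handle = 0;
	bool published = true;
};

// Hypothetical stand-in for a viewer stream: its back-reference keeps the
// relay stream alive even after the trace has dropped it.
struct ViewerStream {
	explicit ViewerStream(std::shared_ptr<RelayStream> s) : stream(std::move(s))
	{
	}

	std::shared_ptr<RelayStream> stream;
};

// Hypothetical stand-in for a ctf_trace: it only references published streams.
struct Trace {
	std::vector<std::shared_ptr<RelayStream>> streams;

	void unpublish(std::uint64_t handle)
	{
		for (auto it = streams.begin(); it != streams.end();) {
			if ((*it)->handle == handle) {
				(*it)->published = false;
				it = streams.erase(it);
			} else {
				++it;
			}
		}
	}
};

class ViewerSession {
public:
	// Called as soon as a relay stream is published while a viewer is attached.
	void on_stream_published(const std::shared_ptr<RelayStream>& stream)
	{
		const std::lock_guard<std::mutex> guard(_lock);
		_unannounced.emplace_back(stream);
	}

	// Called when the viewer asks for new streams: drain the unannounced list.
	std::vector<std::uint64_t> get_new_streams()
	{
		const std::lock_guard<std::mutex> guard(_lock);
		std::vector<std::uint64_t> handles;

		for (const auto& viewer_stream : _unannounced) {
			handles.push_back(viewer_stream.stream->handle);
		}

		_unannounced.clear();
		return handles;
	}

private:
	std::mutex _lock;
	std::list<ViewerStream> _unannounced;
};

// A stream is published and unpublished before the viewer asks for new streams.
std::vector<std::uint64_t> announce_short_lived_stream()
{
	Trace trace;
	ViewerSession viewer_session;

	auto stream = std::make_shared<RelayStream>();
	stream->handle = 42;
	trace.streams.push_back(stream);
	viewer_session.on_stream_published(stream);
	stream.reset();

	/* The short-lived application exits; the trace drops its reference. */
	trace.unpublish(42);

	return viewer_session.get_new_streams();
}
```

In this model, the stream is still reported by `get_new_streams()` after the trace has dropped it, because the unannounced list holds its own reference. Iterating only over `trace.streams` at that point would find nothing, which is the failure mode the commit message describes.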