From: Jérémie Galarneau Date: Tue, 30 Jul 2024 19:58:10 +0000 (+0000) Subject: relayd: live.cpp: iterate on rcu list using rcu_list_iteration_adapter X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=a402293a78cfff43a7f6b35f21a38a873e3b5c54;p=lttng-tools.git relayd: live.cpp: iterate on rcu list using rcu_list_iteration_adapter Change-Id: I89824eb36bb317a424880f34dc962cf7b1eca1ed Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index 02e7aacf6..73548cfb3 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -30,6 +30,8 @@ #include #include #include +#include +#include #include #include #include @@ -237,27 +239,24 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) */ static int check_new_streams(struct relay_connection *conn) { - struct relay_session *session; int ret = 0; if (!conn->viewer_session) { goto end; } - { - const lttng::urcu::read_lock_guard read_lock; - cds_list_for_each_entry_rcu( - session, &conn->viewer_session->session_list, viewer_session_node) - { - if (!session_get(session)) { - continue; - } + for (auto *session : + lttng::urcu::rcu_list_iteration_adapter( + conn->viewer_session->session_list)) { + if (!session_get(session)) { + continue; + } - ret = uatomic_read(&session->new_streams); - session_put(session); - if (ret == 1) { - goto end; - } + ret = uatomic_read(&session->new_streams); + session_put(session); + if (ret == 1) { + goto end; } } @@ -360,7 +359,6 @@ static int make_viewer_streams(struct relay_session *relay_session, bool *closed) { int ret; - struct relay_stream *relay_stream = nullptr; LTTNG_ASSERT(relay_session); ASSERT_LOCKED(relay_session->lock); @@ -373,28 +371,33 @@ static int make_viewer_streams(struct relay_session *relay_session, * Create viewer streams for relay streams that are ready to be * used for a the given session id only. */ - for (auto *ctf_trace : lttng::urcu:: + for (auto *raw_ctf_trace : lttng::urcu:: lfht_iteration_adapter( *relay_session->ctf_traces_ht->ht)) { bool trace_has_metadata_stream = false; health_code_update(); - if (!ctf_trace_get(ctf_trace)) { + if (!ctf_trace_get(raw_ctf_trace)) { continue; } + auto ctf_trace = + lttng::make_unique_wrapper(raw_ctf_trace); + /* * Iterate over all the streams of the trace to see if we have a * metadata stream. */ - cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) - { + for (auto *stream : + lttng::urcu::rcu_list_iteration_adapter( + ctf_trace->stream_list)) { bool is_metadata_stream; - pthread_mutex_lock(&relay_stream->lock); - is_metadata_stream = relay_stream->is_metadata; - pthread_mutex_unlock(&relay_stream->lock); + pthread_mutex_lock(&stream->lock); + is_metadata_stream = stream->is_metadata; + pthread_mutex_unlock(&stream->lock); if (is_metadata_stream) { trace_has_metadata_stream = true; @@ -402,34 +405,37 @@ static int make_viewer_streams(struct relay_session *relay_session, } } - relay_stream = nullptr; - /* * If there is no metadata stream in this trace at the moment * and we never sent one to the viewer, skip the trace. We * accept that the viewer will not see this trace at all. */ if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) { - ctf_trace_put(ctf_trace); continue; } - cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) - { + for (auto *raw_stream : + lttng::urcu::rcu_list_iteration_adapter( + ctf_trace->stream_list)) { struct relay_viewer_stream *viewer_stream; - if (!stream_get(relay_stream)) { + if (!stream_get(raw_stream)) { continue; } - pthread_mutex_lock(&relay_stream->lock); + auto stream = + lttng::make_unique_wrapper(raw_stream); + raw_stream = nullptr; + + const lttng::pthread::lock_guard stream_lock(stream->lock); /* * stream published is protected by the session lock. */ - if (!relay_stream->published) { - goto next; + if (!stream->published) { + continue; } - viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle); + viewer_stream = viewer_stream_get_by_id(stream->stream_handle); if (!viewer_stream) { struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr; @@ -438,7 +444,7 @@ static int make_viewer_streams(struct relay_session *relay_session, * viewer. So that we know what trace the viewer * is aware of. */ - if (relay_stream->is_metadata) { + if (stream->is_metadata) { ctf_trace->metadata_stream_sent_to_viewer = true; } @@ -450,15 +456,14 @@ static int make_viewer_streams(struct relay_session *relay_session, * Otherwise, the viewer session's current trace * chunk can be used safely. */ - if ((relay_stream->ongoing_rotation.is_set || + if ((stream->ongoing_rotation.is_set || session_has_ongoing_rotation(relay_session)) && - relay_stream->trace_chunk) { + stream->trace_chunk) { viewer_stream_trace_chunk = - lttng_trace_chunk_copy(relay_stream->trace_chunk); + lttng_trace_chunk_copy(stream->trace_chunk); if (!viewer_stream_trace_chunk) { ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; + goto end; } } else { /* @@ -467,17 +472,16 @@ static int make_viewer_streams(struct relay_session *relay_session, */ if (!lttng_trace_chunk_ids_equal( viewer_session->current_trace_chunk, - relay_stream->trace_chunk)) { + stream->trace_chunk)) { ret = viewer_session_set_trace_chunk_copy( - viewer_session, relay_stream->trace_chunk); + viewer_session, stream->trace_chunk); if (ret) { ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; + goto end; } } - if (relay_stream->trace_chunk) { + if (stream->trace_chunk) { /* * If the corresponding relay * stream's trace chunk is set, @@ -501,13 +505,12 @@ static int make_viewer_streams(struct relay_session *relay_session, } viewer_stream = viewer_stream_create( - relay_stream, viewer_stream_trace_chunk, seek_t); + stream.get(), viewer_stream_trace_chunk, seek_t); lttng_trace_chunk_put(viewer_stream_trace_chunk); viewer_stream_trace_chunk = nullptr; if (!viewer_stream) { ret = -1; - ctf_trace_put(ctf_trace); - goto error_unlock; + goto end; } if (nb_created) { @@ -530,39 +533,28 @@ static int make_viewer_streams(struct relay_session *relay_session, } /* Update number of total stream counter. */ if (nb_total) { - if (relay_stream->is_metadata) { - if (!relay_stream->closed || - relay_stream->metadata_received > + if (stream->is_metadata) { + if (!stream->closed || + stream->metadata_received > viewer_stream->metadata_sent) { (*nb_total)++; } } else { - if (!relay_stream->closed || - !(((int64_t) (relay_stream->prev_data_seq - - relay_stream->last_net_seq_num)) >= 0)) { + if (!stream->closed || + !(((int64_t) (stream->prev_data_seq - + stream->last_net_seq_num)) >= 0)) { (*nb_total)++; } } } /* Put local reference. */ viewer_stream_put(viewer_stream); - next: - pthread_mutex_unlock(&relay_stream->lock); - stream_put(relay_stream); } - relay_stream = nullptr; - ctf_trace_put(ctf_trace); } ret = 0; -error_unlock: - - if (relay_stream) { - pthread_mutex_unlock(&relay_stream->lock); - stream_put(relay_stream); - } - +end: return ret; }