relayd: live.cpp: iterate on rcu list using rcu_list_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 19:58:10 +0000 (19:58 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 31 Jul 2024 03:36:52 +0000 (23:36 -0400)
Change-Id: I89824eb36bb317a424880f34dc962cf7b1eca1ed
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.cpp

index 02e7aacf686c6235c11769dc7ec4eed95c79b428..73548cfb33cf36e6d52218dd2bcf71f1069129f7 100644 (file)
@@ -30,6 +30,8 @@
 #include <common/fs-handle.hpp>
 #include <common/futex.hpp>
 #include <common/index/index.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/sessiond-comm/inet.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
@@ -237,27 +239,24 @@ static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size)
  */
 static int check_new_streams(struct relay_connection *conn)
 {
-       struct relay_session *session;
        int ret = 0;
 
        if (!conn->viewer_session) {
                goto end;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_list_for_each_entry_rcu(
-                       session, &conn->viewer_session->session_list, viewer_session_node)
-               {
-                       if (!session_get(session)) {
-                               continue;
-                       }
+       for (auto *session :
+            lttng::urcu::rcu_list_iteration_adapter<relay_session,
+                                                    &relay_session::viewer_session_node>(
+                    conn->viewer_session->session_list)) {
+               if (!session_get(session)) {
+                       continue;
+               }
 
-                       ret = uatomic_read(&session->new_streams);
-                       session_put(session);
-                       if (ret == 1) {
-                               goto end;
-                       }
+               ret = uatomic_read(&session->new_streams);
+               session_put(session);
+               if (ret == 1) {
+                       goto end;
                }
        }
 
@@ -360,7 +359,6 @@ static int make_viewer_streams(struct relay_session *relay_session,
                               bool *closed)
 {
        int ret;
-       struct relay_stream *relay_stream = nullptr;
 
        LTTNG_ASSERT(relay_session);
        ASSERT_LOCKED(relay_session->lock);
@@ -373,28 +371,33 @@ static int make_viewer_streams(struct relay_session *relay_session,
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
         */
-       for (auto *ctf_trace : lttng::urcu::
+       for (auto *raw_ctf_trace : lttng::urcu::
                     lfht_iteration_adapter<ctf_trace, decltype(ctf_trace::node), &ctf_trace::node>(
                             *relay_session->ctf_traces_ht->ht)) {
                bool trace_has_metadata_stream = false;
 
                health_code_update();
 
-               if (!ctf_trace_get(ctf_trace)) {
+               if (!ctf_trace_get(raw_ctf_trace)) {
                        continue;
                }
 
+               auto ctf_trace =
+                       lttng::make_unique_wrapper<struct ctf_trace, ctf_trace_put>(raw_ctf_trace);
+
                /*
                 * Iterate over all the streams of the trace to see if we have a
                 * metadata stream.
                 */
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
+               for (auto *stream :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+                                                            &relay_stream::stream_node>(
+                            ctf_trace->stream_list)) {
                        bool is_metadata_stream;
 
-                       pthread_mutex_lock(&relay_stream->lock);
-                       is_metadata_stream = relay_stream->is_metadata;
-                       pthread_mutex_unlock(&relay_stream->lock);
+                       pthread_mutex_lock(&stream->lock);
+                       is_metadata_stream = stream->is_metadata;
+                       pthread_mutex_unlock(&stream->lock);
 
                        if (is_metadata_stream) {
                                trace_has_metadata_stream = true;
@@ -402,34 +405,37 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        }
                }
 
-               relay_stream = nullptr;
-
                /*
                 * If there is no metadata stream in this trace at the moment
                 * and we never sent one to the viewer, skip the trace. We
                 * accept that the viewer will not see this trace at all.
                 */
                if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) {
-                       ctf_trace_put(ctf_trace);
                        continue;
                }
 
-               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
-               {
+               for (auto *raw_stream :
+                    lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+                                                            &relay_stream::stream_node>(
+                            ctf_trace->stream_list)) {
                        struct relay_viewer_stream *viewer_stream;
 
-                       if (!stream_get(relay_stream)) {
+                       if (!stream_get(raw_stream)) {
                                continue;
                        }
 
-                       pthread_mutex_lock(&relay_stream->lock);
+                       auto stream =
+                               lttng::make_unique_wrapper<relay_stream, stream_put>(raw_stream);
+                       raw_stream = nullptr;
+
+                       const lttng::pthread::lock_guard stream_lock(stream->lock);
                        /*
                         * stream published is protected by the session lock.
                         */
-                       if (!relay_stream->published) {
-                               goto next;
+                       if (!stream->published) {
+                               continue;
                        }
-                       viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle);
+                       viewer_stream = viewer_stream_get_by_id(stream->stream_handle);
                        if (!viewer_stream) {
                                struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr;
 
@@ -438,7 +444,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * viewer. So that we know what trace the viewer
                                 * is aware of.
                                 */
-                               if (relay_stream->is_metadata) {
+                               if (stream->is_metadata) {
                                        ctf_trace->metadata_stream_sent_to_viewer = true;
                                }
 
@@ -450,15 +456,14 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                 * Otherwise, the viewer session's current trace
                                 * chunk can be used safely.
                                 */
-                               if ((relay_stream->ongoing_rotation.is_set ||
+                               if ((stream->ongoing_rotation.is_set ||
                                     session_has_ongoing_rotation(relay_session)) &&
-                                   relay_stream->trace_chunk) {
+                                   stream->trace_chunk) {
                                        viewer_stream_trace_chunk =
-                                               lttng_trace_chunk_copy(relay_stream->trace_chunk);
+                                               lttng_trace_chunk_copy(stream->trace_chunk);
                                        if (!viewer_stream_trace_chunk) {
                                                ret = -1;
-                                               ctf_trace_put(ctf_trace);
-                                               goto error_unlock;
+                                               goto end;
                                        }
                                } else {
                                        /*
@@ -467,17 +472,16 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                         */
                                        if (!lttng_trace_chunk_ids_equal(
                                                    viewer_session->current_trace_chunk,
-                                                   relay_stream->trace_chunk)) {
+                                                   stream->trace_chunk)) {
                                                ret = viewer_session_set_trace_chunk_copy(
-                                                       viewer_session, relay_stream->trace_chunk);
+                                                       viewer_session, stream->trace_chunk);
                                                if (ret) {
                                                        ret = -1;
-                                                       ctf_trace_put(ctf_trace);
-                                                       goto error_unlock;
+                                                       goto end;
                                                }
                                        }
 
-                                       if (relay_stream->trace_chunk) {
+                                       if (stream->trace_chunk) {
                                                /*
                                                 * If the corresponding relay
                                                 * stream's trace chunk is set,
@@ -501,13 +505,12 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                }
 
                                viewer_stream = viewer_stream_create(
-                                       relay_stream, viewer_stream_trace_chunk, seek_t);
+                                       stream.get(), viewer_stream_trace_chunk, seek_t);
                                lttng_trace_chunk_put(viewer_stream_trace_chunk);
                                viewer_stream_trace_chunk = nullptr;
                                if (!viewer_stream) {
                                        ret = -1;
-                                       ctf_trace_put(ctf_trace);
-                                       goto error_unlock;
+                                       goto end;
                                }
 
                                if (nb_created) {
@@ -530,39 +533,28 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               if (relay_stream->is_metadata) {
-                                       if (!relay_stream->closed ||
-                                           relay_stream->metadata_received >
+                               if (stream->is_metadata) {
+                                       if (!stream->closed ||
+                                           stream->metadata_received >
                                                    viewer_stream->metadata_sent) {
                                                (*nb_total)++;
                                        }
                                } else {
-                                       if (!relay_stream->closed ||
-                                           !(((int64_t) (relay_stream->prev_data_seq -
-                                                         relay_stream->last_net_seq_num)) >= 0)) {
+                                       if (!stream->closed ||
+                                           !(((int64_t) (stream->prev_data_seq -
+                                                         stream->last_net_seq_num)) >= 0)) {
                                                (*nb_total)++;
                                        }
                                }
                        }
                        /* Put local reference. */
                        viewer_stream_put(viewer_stream);
-               next:
-                       pthread_mutex_unlock(&relay_stream->lock);
-                       stream_put(relay_stream);
                }
-               relay_stream = nullptr;
-               ctf_trace_put(ctf_trace);
        }
 
        ret = 0;
 
-error_unlock:
-
-       if (relay_stream) {
-               pthread_mutex_unlock(&relay_stream->lock);
-               stream_put(relay_stream);
-       }
-
+end:
        return ret;
 }
 
This page took 0.029636 seconds and 4 git commands to generate.