relayd: live.cpp: iterate on lfht using lfht_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 24 Jul 2024 20:07:09 +0000 (20:07 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 01:26:51 +0000 (01:26 +0000)
Change-Id: Id738d57f3bdfc26e08dc865dfdfed06834a09bc7
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.cpp

index a100433b47c9ddae6ff50f9d03a653b2adfd5fa4..02e7aacf686c6235c11769dc7ec4eed95c79b428 100644 (file)
@@ -278,62 +278,59 @@ static ssize_t
 send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
 {
        ssize_t ret;
-       struct lttng_ht_iter iter;
-       struct relay_viewer_stream *vstream;
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) {
-                       struct ctf_trace *ctf_trace;
-                       struct lttng_viewer_stream send_stream = {};
-
-                       health_code_update();
+       for (auto *vstream :
+            lttng::urcu::lfht_iteration_adapter<relay_viewer_stream,
+                                                decltype(relay_viewer_stream::stream_n),
+                                                &relay_viewer_stream::stream_n>(
+                    *viewer_streams_ht->ht)) {
+               struct ctf_trace *ctf_trace;
+               struct lttng_viewer_stream send_stream = {};
 
-                       if (!viewer_stream_get(vstream)) {
-                               continue;
-                       }
-
-                       pthread_mutex_lock(&vstream->stream->lock);
-                       /* Ignore if not the same session. */
-                       if (vstream->stream->trace->session->id != session_id ||
-                           (!ignore_sent_flag && vstream->sent_flag)) {
-                               pthread_mutex_unlock(&vstream->stream->lock);
-                               viewer_stream_put(vstream);
-                               continue;
-                       }
+               health_code_update();
 
-                       ctf_trace = vstream->stream->trace;
-                       send_stream.id = htobe64(vstream->stream->stream_handle);
-                       send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-                       send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
-                       if (lttng_strncpy(send_stream.path_name,
-                                         vstream->path_name,
-                                         sizeof(send_stream.path_name))) {
-                               pthread_mutex_unlock(&vstream->stream->lock);
-                               viewer_stream_put(vstream);
-                               ret = -1; /* Error. */
-                               goto end;
-                       }
-                       if (lttng_strncpy(send_stream.channel_name,
-                                         vstream->channel_name,
-                                         sizeof(send_stream.channel_name))) {
-                               pthread_mutex_unlock(&vstream->stream->lock);
-                               viewer_stream_put(vstream);
-                               ret = -1; /* Error. */
-                               goto end;
-                       }
+               if (!viewer_stream_get(vstream)) {
+                       continue;
+               }
 
-                       DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
-                       vstream->sent_flag = true;
+               pthread_mutex_lock(&vstream->stream->lock);
+               /* Ignore if not the same session. */
+               if (vstream->stream->trace->session->id != session_id ||
+                   (!ignore_sent_flag && vstream->sent_flag)) {
                        pthread_mutex_unlock(&vstream->stream->lock);
+                       viewer_stream_put(vstream);
+                       continue;
+               }
 
-                       ret = send_response(sock, &send_stream, sizeof(send_stream));
+               ctf_trace = vstream->stream->trace;
+               send_stream.id = htobe64(vstream->stream->stream_handle);
+               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+               send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+               if (lttng_strncpy(send_stream.path_name,
+                                 vstream->path_name,
+                                 sizeof(send_stream.path_name))) {
+                       pthread_mutex_unlock(&vstream->stream->lock);
                        viewer_stream_put(vstream);
-                       if (ret < 0) {
-                               goto end;
-                       }
+                       ret = -1; /* Error. */
+                       goto end;
+               }
+               if (lttng_strncpy(send_stream.channel_name,
+                                 vstream->channel_name,
+                                 sizeof(send_stream.channel_name))) {
+                       pthread_mutex_unlock(&vstream->stream->lock);
+                       viewer_stream_put(vstream);
+                       ret = -1; /* Error. */
+                       goto end;
+               }
+
+               DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+               vstream->sent_flag = true;
+               pthread_mutex_unlock(&vstream->stream->lock);
+
+               ret = send_response(sock, &send_stream, sizeof(send_stream));
+               viewer_stream_put(vstream);
+               if (ret < 0) {
+                       goto end;
                }
        }
 
@@ -363,8 +360,6 @@ static int make_viewer_streams(struct relay_session *relay_session,
                               bool *closed)
 {
        int ret;
-       struct lttng_ht_iter iter;
-       struct ctf_trace *ctf_trace;
        struct relay_stream *relay_stream = nullptr;
 
        LTTNG_ASSERT(relay_session);
@@ -378,196 +373,185 @@ static int make_viewer_streams(struct relay_session *relay_session,
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
         */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
+       for (auto *ctf_trace : lttng::urcu::
+                    lfht_iteration_adapter<ctf_trace, decltype(ctf_trace::node), &ctf_trace::node>(
+                            *relay_session->ctf_traces_ht->ht)) {
+               bool trace_has_metadata_stream = false;
 
-               cds_lfht_for_each_entry (
-                       relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) {
-                       bool trace_has_metadata_stream = false;
+               health_code_update();
 
-                       health_code_update();
+               if (!ctf_trace_get(ctf_trace)) {
+                       continue;
+               }
 
-                       if (!ctf_trace_get(ctf_trace)) {
-                               continue;
-                       }
+               /*
+                * Iterate over all the streams of the trace to see if we have a
+                * metadata stream.
+                */
+               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
+               {
+                       bool is_metadata_stream;
 
-                       /*
-                        * Iterate over all the streams of the trace to see if we have a
-                        * metadata stream.
-                        */
-                       cds_list_for_each_entry_rcu(
-                               relay_stream, &ctf_trace->stream_list, stream_node)
-                       {
-                               bool is_metadata_stream;
-
-                               pthread_mutex_lock(&relay_stream->lock);
-                               is_metadata_stream = relay_stream->is_metadata;
-                               pthread_mutex_unlock(&relay_stream->lock);
-
-                               if (is_metadata_stream) {
-                                       trace_has_metadata_stream = true;
-                                       break;
-                               }
+                       pthread_mutex_lock(&relay_stream->lock);
+                       is_metadata_stream = relay_stream->is_metadata;
+                       pthread_mutex_unlock(&relay_stream->lock);
+
+                       if (is_metadata_stream) {
+                               trace_has_metadata_stream = true;
+                               break;
                        }
+               }
 
-                       relay_stream = nullptr;
+               relay_stream = nullptr;
 
-                       /*
-                        * If there is no metadata stream in this trace at the moment
-                        * and we never sent one to the viewer, skip the trace. We
-                        * accept that the viewer will not see this trace at all.
-                        */
-                       if (!trace_has_metadata_stream &&
-                           !ctf_trace->metadata_stream_sent_to_viewer) {
-                               ctf_trace_put(ctf_trace);
+               /*
+                * If there is no metadata stream in this trace at the moment
+                * and we never sent one to the viewer, skip the trace. We
+                * accept that the viewer will not see this trace at all.
+                */
+               if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) {
+                       ctf_trace_put(ctf_trace);
+                       continue;
+               }
+
+               cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
+               {
+                       struct relay_viewer_stream *viewer_stream;
+
+                       if (!stream_get(relay_stream)) {
                                continue;
                        }
 
-                       cds_list_for_each_entry_rcu(
-                               relay_stream, &ctf_trace->stream_list, stream_node)
-                       {
-                               struct relay_viewer_stream *viewer_stream;
-
-                               if (!stream_get(relay_stream)) {
-                                       continue;
-                               }
+                       pthread_mutex_lock(&relay_stream->lock);
+                       /*
+                        * stream published is protected by the session lock.
+                        */
+                       if (!relay_stream->published) {
+                               goto next;
+                       }
+                       viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle);
+                       if (!viewer_stream) {
+                               struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr;
 
-                               pthread_mutex_lock(&relay_stream->lock);
                                /*
-                                * stream published is protected by the session lock.
+                                * Save that we sent the metadata stream to the
+                                * viewer. So that we know what trace the viewer
+                                * is aware of.
                                 */
-                               if (!relay_stream->published) {
-                                       goto next;
+                               if (relay_stream->is_metadata) {
+                                       ctf_trace->metadata_stream_sent_to_viewer = true;
                                }
-                               viewer_stream =
-                                       viewer_stream_get_by_id(relay_stream->stream_handle);
-                               if (!viewer_stream) {
-                                       struct lttng_trace_chunk *viewer_stream_trace_chunk =
-                                               nullptr;
 
-                                       /*
-                                        * Save that we sent the metadata stream to the
-                                        * viewer. So that we know what trace the viewer
-                                        * is aware of.
-                                        */
-                                       if (relay_stream->is_metadata) {
-                                               ctf_trace->metadata_stream_sent_to_viewer = true;
+                               /*
+                                * If a rotation is ongoing, use a copy of the
+                                * relay stream's chunk to ensure the stream
+                                * files exist.
+                                *
+                                * Otherwise, the viewer session's current trace
+                                * chunk can be used safely.
+                                */
+                               if ((relay_stream->ongoing_rotation.is_set ||
+                                    session_has_ongoing_rotation(relay_session)) &&
+                                   relay_stream->trace_chunk) {
+                                       viewer_stream_trace_chunk =
+                                               lttng_trace_chunk_copy(relay_stream->trace_chunk);
+                                       if (!viewer_stream_trace_chunk) {
+                                               ret = -1;
+                                               ctf_trace_put(ctf_trace);
+                                               goto error_unlock;
                                        }
-
+                               } else {
                                        /*
-                                        * If a rotation is ongoing, use a copy of the
-                                        * relay stream's chunk to ensure the stream
-                                        * files exist.
-                                        *
-                                        * Otherwise, the viewer session's current trace
-                                        * chunk can be used safely.
+                                        * Transition the viewer session into the newest
+                                        * trace chunk available.
                                         */
-                                       if ((relay_stream->ongoing_rotation.is_set ||
-                                            session_has_ongoing_rotation(relay_session)) &&
-                                           relay_stream->trace_chunk) {
-                                               viewer_stream_trace_chunk = lttng_trace_chunk_copy(
-                                                       relay_stream->trace_chunk);
-                                               if (!viewer_stream_trace_chunk) {
+                                       if (!lttng_trace_chunk_ids_equal(
+                                                   viewer_session->current_trace_chunk,
+                                                   relay_stream->trace_chunk)) {
+                                               ret = viewer_session_set_trace_chunk_copy(
+                                                       viewer_session, relay_stream->trace_chunk);
+                                               if (ret) {
                                                        ret = -1;
                                                        ctf_trace_put(ctf_trace);
                                                        goto error_unlock;
                                                }
-                                       } else {
+                                       }
+
+                                       if (relay_stream->trace_chunk) {
                                                /*
-                                                * Transition the viewer session into the newest
-                                                * trace chunk available.
+                                                * If the corresponding relay
+                                                * stream's trace chunk is set,
+                                                * the viewer stream will be
+                                                * created under it.
+                                                *
+                                                * Note that a relay stream can
+                                                * have a NULL output trace
+                                                * chunk (for instance, after a
+                                                * clear against a stopped
+                                                * session).
                                                 */
-                                               if (!lttng_trace_chunk_ids_equal(
-                                                           viewer_session->current_trace_chunk,
-                                                           relay_stream->trace_chunk)) {
-                                                       ret = viewer_session_set_trace_chunk_copy(
-                                                               viewer_session,
-                                                               relay_stream->trace_chunk);
-                                                       if (ret) {
-                                                               ret = -1;
-                                                               ctf_trace_put(ctf_trace);
-                                                               goto error_unlock;
-                                                       }
-                                               }
+                                               const bool reference_acquired =
+                                                       lttng_trace_chunk_get(
+                                                               viewer_session->current_trace_chunk);
 
-                                               if (relay_stream->trace_chunk) {
-                                                       /*
-                                                        * If the corresponding relay
-                                                        * stream's trace chunk is set,
-                                                        * the viewer stream will be
-                                                        * created under it.
-                                                        *
-                                                        * Note that a relay stream can
-                                                        * have a NULL output trace
-                                                        * chunk (for instance, after a
-                                                        * clear against a stopped
-                                                        * session).
-                                                        */
-                                                       const bool reference_acquired =
-                                                               lttng_trace_chunk_get(
-                                                                       viewer_session
-                                                                               ->current_trace_chunk);
-
-                                                       LTTNG_ASSERT(reference_acquired);
-                                                       viewer_stream_trace_chunk =
-                                                               viewer_session->current_trace_chunk;
-                                               }
+                                               LTTNG_ASSERT(reference_acquired);
+                                               viewer_stream_trace_chunk =
+                                                       viewer_session->current_trace_chunk;
                                        }
+                               }
 
-                                       viewer_stream = viewer_stream_create(
-                                               relay_stream, viewer_stream_trace_chunk, seek_t);
-                                       lttng_trace_chunk_put(viewer_stream_trace_chunk);
-                                       viewer_stream_trace_chunk = nullptr;
-                                       if (!viewer_stream) {
-                                               ret = -1;
-                                               ctf_trace_put(ctf_trace);
-                                               goto error_unlock;
-                                       }
+                               viewer_stream = viewer_stream_create(
+                                       relay_stream, viewer_stream_trace_chunk, seek_t);
+                               lttng_trace_chunk_put(viewer_stream_trace_chunk);
+                               viewer_stream_trace_chunk = nullptr;
+                               if (!viewer_stream) {
+                                       ret = -1;
+                                       ctf_trace_put(ctf_trace);
+                                       goto error_unlock;
+                               }
 
-                                       if (nb_created) {
-                                               /* Update number of created stream counter. */
-                                               (*nb_created)++;
-                                       }
-                                       /*
-                                        * Ensure a self-reference is preserved even
-                                        * after we have put our local reference.
-                                        */
-                                       if (!viewer_stream_get(viewer_stream)) {
-                                               ERR("Unable to get self-reference on viewer stream, logic error.");
-                                               abort();
+                               if (nb_created) {
+                                       /* Update number of created stream counter. */
+                                       (*nb_created)++;
+                               }
+                               /*
+                                * Ensure a self-reference is preserved even
+                                * after we have put our local reference.
+                                */
+                               if (!viewer_stream_get(viewer_stream)) {
+                                       ERR("Unable to get self-reference on viewer stream, logic error.");
+                                       abort();
+                               }
+                       } else {
+                               if (!viewer_stream->sent_flag && nb_unsent) {
+                                       /* Update number of unsent stream counter. */
+                                       (*nb_unsent)++;
+                               }
+                       }
+                       /* Update number of total stream counter. */
+                       if (nb_total) {
+                               if (relay_stream->is_metadata) {
+                                       if (!relay_stream->closed ||
+                                           relay_stream->metadata_received >
+                                                   viewer_stream->metadata_sent) {
+                                               (*nb_total)++;
                                        }
                                } else {
-                                       if (!viewer_stream->sent_flag && nb_unsent) {
-                                               /* Update number of unsent stream counter. */
-                                               (*nb_unsent)++;
-                                       }
-                               }
-                               /* Update number of total stream counter. */
-                               if (nb_total) {
-                                       if (relay_stream->is_metadata) {
-                                               if (!relay_stream->closed ||
-                                                   relay_stream->metadata_received >
-                                                           viewer_stream->metadata_sent) {
-                                                       (*nb_total)++;
-                                               }
-                                       } else {
-                                               if (!relay_stream->closed ||
-                                                   !(((int64_t) (relay_stream->prev_data_seq -
-                                                                 relay_stream->last_net_seq_num)) >=
-                                                     0)) {
-                                                       (*nb_total)++;
-                                               }
+                                       if (!relay_stream->closed ||
+                                           !(((int64_t) (relay_stream->prev_data_seq -
+                                                         relay_stream->last_net_seq_num)) >= 0)) {
+                                               (*nb_total)++;
                                        }
                                }
-                               /* Put local reference. */
-                               viewer_stream_put(viewer_stream);
-                       next:
-                               pthread_mutex_unlock(&relay_stream->lock);
-                               stream_put(relay_stream);
                        }
-                       relay_stream = nullptr;
-                       ctf_trace_put(ctf_trace);
+                       /* Put local reference. */
+                       viewer_stream_put(viewer_stream);
+               next:
+                       pthread_mutex_unlock(&relay_stream->lock);
+                       stream_put(relay_stream);
                }
+               relay_stream = nullptr;
+               ctf_trace_put(ctf_trace);
        }
 
        ret = 0;
@@ -1049,8 +1033,6 @@ static int viewer_list_sessions(struct relay_connection *conn)
 {
        int ret = 0;
        struct lttng_viewer_list_sessions session_list;
-       struct lttng_ht_iter iter;
-       struct relay_session *session;
        struct lttng_viewer_session *send_session_buf = nullptr;
        uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT;
        uint32_t count = 0;
@@ -1060,63 +1042,61 @@ static int viewer_list_sessions(struct relay_connection *conn)
                return -1;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
+       for (auto *session :
+            lttng::urcu::lfht_iteration_adapter<relay_session,
+                                                decltype(relay_session::session_n),
+                                                &relay_session::session_n>(*sessions_ht->ht)) {
+               struct lttng_viewer_session *send_session;
 
-               cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) {
-                       struct lttng_viewer_session *send_session;
-
-                       health_code_update();
+               health_code_update();
 
-                       pthread_mutex_lock(&session->lock);
-                       if (session->connection_closed) {
-                               /* Skip closed session */
-                               goto next_session;
-                       }
+               pthread_mutex_lock(&session->lock);
+               if (session->connection_closed) {
+                       /* Skip closed session */
+                       goto next_session;
+               }
 
-                       if (count >= buf_count) {
-                               struct lttng_viewer_session *newbuf;
-                               const uint32_t new_buf_count = buf_count << 1;
+               if (count >= buf_count) {
+                       struct lttng_viewer_session *newbuf;
+                       const uint32_t new_buf_count = buf_count << 1;
 
-                               newbuf = (lttng_viewer_session *) realloc(
-                                       send_session_buf,
-                                       new_buf_count * sizeof(*send_session_buf));
-                               if (!newbuf) {
-                                       ret = -1;
-                                       goto break_loop;
-                               }
-                               send_session_buf = newbuf;
-                               buf_count = new_buf_count;
-                       }
-                       send_session = &send_session_buf[count];
-                       if (lttng_strncpy(send_session->session_name,
-                                         session->session_name,
-                                         sizeof(send_session->session_name))) {
-                               ret = -1;
-                               goto break_loop;
-                       }
-                       if (lttng_strncpy(send_session->hostname,
-                                         session->hostname,
-                                         sizeof(send_session->hostname))) {
+                       newbuf = (lttng_viewer_session *) realloc(
+                               send_session_buf, new_buf_count * sizeof(*send_session_buf));
+                       if (!newbuf) {
                                ret = -1;
                                goto break_loop;
                        }
-                       send_session->id = htobe64(session->id);
-                       send_session->live_timer = htobe32(session->live_timer);
-                       if (session->viewer_attached) {
-                               send_session->clients = htobe32(1);
-                       } else {
-                               send_session->clients = htobe32(0);
-                       }
-                       send_session->streams = htobe32(session->stream_count);
-                       count++;
-               next_session:
-                       pthread_mutex_unlock(&session->lock);
-                       continue;
-               break_loop:
-                       pthread_mutex_unlock(&session->lock);
-                       break;
+                       send_session_buf = newbuf;
+                       buf_count = new_buf_count;
+               }
+               send_session = &send_session_buf[count];
+               if (lttng_strncpy(send_session->session_name,
+                                 session->session_name,
+                                 sizeof(send_session->session_name))) {
+                       ret = -1;
+                       goto break_loop;
+               }
+               if (lttng_strncpy(send_session->hostname,
+                                 session->hostname,
+                                 sizeof(send_session->hostname))) {
+                       ret = -1;
+                       goto break_loop;
                }
+               send_session->id = htobe64(session->id);
+               send_session->live_timer = htobe32(session->live_timer);
+               if (session->viewer_attached) {
+                       send_session->clients = htobe32(1);
+               } else {
+                       send_session->clients = htobe32(0);
+               }
+               send_session->streams = htobe32(session->stream_count);
+               count++;
+       next_session:
+               pthread_mutex_unlock(&session->lock);
+               continue;
+       break_loop:
+               pthread_mutex_unlock(&session->lock);
+               break;
        }
 
        if (ret < 0) {
@@ -2610,9 +2590,7 @@ static void *thread_worker(void *data __attribute__((unused)))
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *viewer_connections_ht;
-       struct lttng_ht_iter iter;
        struct lttng_viewer_cmd recv_hdr;
-       struct relay_connection *destroy_conn;
 
        DBG("[thread] Live viewer relay worker started");
 
@@ -2762,15 +2740,15 @@ error:
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 
        /* Cleanup remaining connection object. */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-                       health_code_update();
-                       connection_put(destroy_conn);
-               }
+       for (auto *destroy_conn :
+            lttng::urcu::lfht_iteration_adapter<relay_connection,
+                                                decltype(relay_connection::sock_n),
+                                                &relay_connection::sock_n>(
+                    *viewer_connections_ht->ht)) {
+               health_code_update();
+               connection_put(destroy_conn);
        }
+
 error_poll_create:
        lttng_ht_destroy(viewer_connections_ht);
 viewer_connections_ht_error:
This page took 0.034855 seconds and 4 git commands to generate.