git.lttng.org Git - lttng-tools.git/commitdiff
relayd: main.cpp: iterate on lfht using lfht_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 26 Jul 2024 19:06:54 +0000 (19:06 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 01:26:51 +0000 (01:26 +0000)
Change-Id: Ibe839a6733432e2ed464e1785569ed92087cf329
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.cpp

index 87b6238955a0afc1ed695bffd831753abcf3dc36..b18df2e9d6e43462d96b4d0403486f6fcb3a06f2 100644 (file)
@@ -2194,10 +2194,8 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_begin_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
 
        LTTNG_ASSERT(recv_hdr);
        LTTNG_ASSERT(conn);
@@ -2226,24 +2224,23 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
         * to iterate over all streams to find the one associated with
         * the right session_id.
         */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-                       if (!stream_get(stream)) {
-                               continue;
-                       }
-
-                       if (stream->trace->session->id == msg.session_id) {
-                               pthread_mutex_lock(&stream->lock);
-                               stream->data_pending_check_done = false;
-                               pthread_mutex_unlock(&stream->lock);
-                               DBG("Set begin data pending flag to stream %" PRIu64,
-                                   stream->stream_handle);
-                       }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
 
-                       stream_put(stream);
+               if (stream->trace->session->id == msg.session_id) {
+                       pthread_mutex_lock(&stream->lock);
+                       stream->data_pending_check_done = false;
+                       pthread_mutex_unlock(&stream->lock);
+                       DBG("Set begin data pending flag to stream %" PRIu64,
+                           stream->stream_handle);
                }
+
+               stream_put(stream);
        }
 
        memset(&reply, 0, sizeof(reply));
@@ -2277,10 +2274,8 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
 {
        int ret;
        ssize_t send_ret;
-       struct lttng_ht_iter iter;
        struct lttcomm_relayd_end_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
-       struct relay_stream *stream;
        uint32_t is_data_inflight = 0;
 
        DBG("End data pending command");
@@ -2305,48 +2300,47 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at
         * Iterate over all streams to see if the begin data pending
         * flag is set.
         */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
-                       if (!stream_get(stream)) {
-                               continue;
-                       }
-
-                       if (stream->trace->session->id != msg.session_id) {
-                               stream_put(stream);
-                               continue;
-                       }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<relay_stream,
+                                                decltype(relay_stream::node),
+                                                &relay_stream::node>(*relay_streams_ht->ht)) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
 
-                       pthread_mutex_lock(&stream->lock);
-                       if (!stream->data_pending_check_done) {
-                               uint64_t stream_seq;
+               if (stream->trace->session->id != msg.session_id) {
+                       stream_put(stream);
+                       continue;
+               }
 
-                               if (session_streams_have_index(conn->session)) {
-                                       /*
-                                        * Ensure that both the index and stream data have been
-                                        * flushed up to the requested point.
-                                        */
-                                       stream_seq = std::min(stream->prev_data_seq,
-                                                             stream->prev_index_seq);
-                               } else {
-                                       stream_seq = stream->prev_data_seq;
-                               }
+               pthread_mutex_lock(&stream->lock);
+               if (!stream->data_pending_check_done) {
+                       uint64_t stream_seq;
 
-                               if (!stream->closed ||
-                                   !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
-                                       is_data_inflight = 1;
-                                       DBG("Data is still in flight for stream %" PRIu64,
-                                           stream->stream_handle);
-                                       pthread_mutex_unlock(&stream->lock);
-                                       stream_put(stream);
-                                       break;
-                               }
+                       if (session_streams_have_index(conn->session)) {
+                               /*
+                                * Ensure that both the index and stream data have been
+                                * flushed up to the requested point.
+                                */
+                               stream_seq =
+                                       std::min(stream->prev_data_seq, stream->prev_index_seq);
+                       } else {
+                               stream_seq = stream->prev_data_seq;
                        }
 
-                       pthread_mutex_unlock(&stream->lock);
-                       stream_put(stream);
+                       if (!stream->closed ||
+                           !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+                               is_data_inflight = 1;
+                               DBG("Data is still in flight for stream %" PRIu64,
+                                   stream->stream_handle);
+                               pthread_mutex_unlock(&stream->lock);
+                               stream_put(stream);
+                               break;
+                       }
                }
+
+               pthread_mutex_unlock(&stream->lock);
+               stream_put(stream);
        }
 
        memset(&reply, 0, sizeof(reply));
@@ -3900,8 +3894,6 @@ static void *relay_thread_worker(void *data __attribute__((unused)))
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
-       struct lttng_ht_iter iter;
-       struct relay_connection *destroy_conn = nullptr;
 
        DBG("[thread] Relay worker started");
 
@@ -4173,22 +4165,20 @@ restart:
 exit:
 error:
        /* Cleanup remaining connection object. */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
-                       health_code_update();
+       for (auto *destroy_conn :
+            lttng::urcu::lfht_iteration_adapter<relay_connection,
+                                                decltype(relay_connection::sock_n),
+                                                &relay_connection::sock_n>(
+                    *relay_connections_ht->ht)) {
+               health_code_update();
 
-                       session_abort(destroy_conn->session);
+               session_abort(destroy_conn->session);
 
-                       /*
-                        * No need to grab another ref, because we own
-                        * destroy_conn.
-                        */
-                       relay_thread_close_connection(
-                               &events, destroy_conn->sock->fd, destroy_conn);
-               }
+               /*
+                * No need to grab another ref, because we own
+                * destroy_conn.
+                */
+               relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
        }
 
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
This page took 0.03294 seconds and 4 git commands to generate.