consumerd: consumer.cpp: iterate on lfht using lfht_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 26 Jul 2024 21:54:59 +0000 (21:54 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 18:26:44 +0000 (14:26 -0400)
Change-Id: Icecd70d1023847b1489b298e4c04625bac4ebcc9
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer.cpp

index e242f64b5ada62ca4302026592f9cbb2049637d0..c991d622bb4e2b8848e78e2ffb610e01dc38ffa9 100644 (file)
@@ -421,16 +421,12 @@ end:
  */
 static void cleanup_relayd_ht()
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd;
-
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
-                       consumer_destroy_relayd(relayd);
-               }
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
+               consumer_destroy_relayd(relayd);
        }
 
        lttng_ht_destroy(the_consumer_data.relayd_ht);
@@ -446,15 +442,13 @@ static void cleanup_relayd_ht()
 static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                                             enum consumer_endpoint_status status)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       const lttng::urcu::read_lock_guard read_lock;
-
        /* Let's begin with metadata */
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
                        stream->chan->metadata_pushed_wait_queue.wake_all();
@@ -464,7 +458,10 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        }
 
        /* Follow up by the data streams */
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
@@ -1151,8 +1148,6 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                             int *nb_inactive_fd)
 {
        int i = 0;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(ht);
@@ -1162,32 +1157,32 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Only active streams with an active end point can be added to the
-                        * poll set and local stream storage of the thread.
-                        *
-                        * There is a potential race here for endpoint_status to be updated
-                        * just after the check. However, this is OK since the stream(s) will
-                        * be deleted once the thread is notified that the end point state has
-                        * changed where this function will be called back again.
-                        *
-                        * We track the number of inactive FDs because they still need to be
-                        * closed by the polling thread after a wakeup on the data_pipe or
-                        * metadata_pipe.
-                        */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
-                               (*nb_inactive_fd)++;
-                               continue;
-                       }
-
-                       (*pollfd)[i].fd = stream->wait_fd;
-                       (*pollfd)[i].events = POLLIN | POLLPRI;
-                       local_stream[i] = stream;
-                       i++;
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Only active streams with an active end point can be added to the
+                * poll set and local stream storage of the thread.
+                *
+                * There is a potential race here for endpoint_status to be updated
+                * just after the check. However, this is OK since the stream(s) will
+                * be deleted once the thread is notified that the end point state has
+                * changed where this function will be called back again.
+                *
+                * We track the number of inactive FDs because they still need to be
+                * closed by the polling thread after a wakeup on the data_pipe or
+                * metadata_pipe.
+                */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                       (*nb_inactive_fd)++;
+                       continue;
                }
+
+               (*pollfd)[i].fd = stream->wait_fd;
+               (*pollfd)[i].events = POLLIN | POLLPRI;
+               local_stream[i] = stream;
+               i++;
        }
 
        /*
@@ -1271,17 +1266,14 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
  */
 void lttng_consumer_cleanup()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-                       consumer_del_channel(channel);
-               }
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
+               consumer_del_channel(channel);
        }
 
        lttng_ht_destroy(the_consumer_data.channel_ht);
@@ -1453,22 +1445,19 @@ error:
  */
 static void destroy_data_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Ignore return value since we are currently cleaning up so any error
-                        * can't be handled.
-                        */
-                       (void) consumer_del_stream(stream, ht);
-               }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_stream(stream, ht);
        }
 
        lttng_ht_destroy(ht);
@@ -1480,22 +1469,19 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
  */
 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Ignore return value since we are currently cleaning up so any error
-                        * can't be handled.
-                        */
-                       (void) consumer_del_metadata_stream(stream, ht);
-               }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_metadata_stream(stream, ht);
        }
 
        lttng_ht_destroy(ht);
@@ -2223,22 +2209,18 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
  */
 static void validate_endpoint_status_data_stream()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged data stream");
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
-                       /* Validate delete flag of the stream */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                               continue;
-                       }
-                       /* Delete it right now */
-                       consumer_del_stream(stream, data_ht);
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
+               /* Validate delete flag of the stream */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                       continue;
                }
+               /* Delete it right now */
+               consumer_del_stream(stream, data_ht);
        }
 }
 
@@ -2247,29 +2229,26 @@ static void validate_endpoint_status_data_stream()
  */
 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged metadata stream");
 
        LTTNG_ASSERT(pollset);
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-                       /* Validate delete flag of the stream */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                               continue;
-                       }
-                       /*
-                        * Remove from pollset so the metadata thread can continue without
-                        * blocking on a deleted stream.
-                        */
-                       lttng_poll_del(pollset, stream->wait_fd);
-
-                       /* Delete it right now */
-                       consumer_del_metadata_stream(stream, metadata_ht);
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
+               /* Validate delete flag of the stream */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                       continue;
                }
+               /*
+                * Remove from pollset so the metadata thread can continue without
+                * blocking on a deleted stream.
+                */
+               lttng_poll_del(pollset, stream->wait_fd);
+
+               /* Delete it right now */
+               consumer_del_metadata_stream(stream, metadata_ht);
        }
 }
 
@@ -2850,21 +2829,16 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
 static void destroy_channel_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
-       int ret;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-                       ret = lttng_ht_del(ht, &iter);
-                       LTTNG_ASSERT(ret != 0);
-               }
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::wait_fd_node),
+                                                &lttng_consumer_channel::wait_fd_node>(*ht->ht)) {
+               const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
+               LTTNG_ASSERT(ret != 0);
        }
 
        lttng_ht_destroy(ht);
@@ -3070,7 +3044,8 @@ void *consumer_thread_channel_poll(void *data)
                        node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
-                       chan = lttng::utils::container_of(node, &lttng_consumer_channel::wait_fd_node);
+                       chan = lttng::utils::container_of(node,
+                                                         &lttng_consumer_channel::wait_fd_node);
 
                        /* Check for error event */
                        if (revents & (LPOLLERR | LPOLLHUP)) {
@@ -3699,27 +3674,23 @@ error_nosignal:
  */
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd = nullptr;
-
-       ASSERT_RCU_READ_LOCKED();
-
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
                /*
                 * Check by sessiond id which is unique here where the relayd session
                 * id might not be when having multiple relayd.
                 */
                if (relayd->sessiond_session_id == id) {
                        /* Found the relayd. There can be only one per id. */
-                       goto found;
+                       return relayd;
                }
        }
 
        return nullptr;
-
-found:
-       return relayd;
 }
 
 /*
@@ -4898,8 +4869,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const char *close_command_name = "none";
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;
 
        if (relayd_id) {
@@ -4953,32 +4922,33 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (
-                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-                       int ret;
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
+               int ret;
 
+               /*
+                * Only change the channel's chunk to NULL if it still
+                * references the chunk being closed. The channel may
+                * reference a newer channel in the case of a session
+                * rotation. When a session rotation occurs, the "next"
+                * chunk is created before the "current" chunk is closed.
+                */
+               if (channel->trace_chunk != chunk) {
+                       continue;
+               }
+               ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+               if (ret) {
                        /*
-                        * Only change the channel's chunk to NULL if it still
-                        * references the chunk being closed. The channel may
-                        * reference a newer channel in the case of a session
-                        * rotation. When a session rotation occurs, the "next"
-                        * chunk is created before the "current" chunk is closed.
+                        * Attempt to close the chunk on as many channels as
+                        * possible.
                         */
-                       if (channel->trace_chunk != chunk) {
-                               continue;
-                       }
-                       ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
-                       if (ret) {
-                               /*
-                                * Attempt to close the chunk on as many channels as
-                                * possible.
-                                */
-                               ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
-                       }
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
+
        if (relayd_id) {
                int ret;
                struct consumer_relayd_sock_pair *relayd;
This page took 0.031488 seconds and 4 git commands to generate.