sessiond: main.cpp: iterate on list using list_iteration_adapter
diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp
index 4823d9b9c59aa6f9933d9e7552fe27fb48865cb6..f1f29d4292c4411c28dc2b756c7e6a98bbb3b42c 100644
@@ -23,6 +23,7 @@
 #include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
@@ -45,6 +46,7 @@
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <type_traits>
 #include <unistd.h>
 
 lttng_consumer_global_data the_consumer_data;
@@ -171,19 +173,12 @@ error:
  */
 static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream, *stmp;
-
        LTTNG_ASSERT(channel);
 
        /* Delete streams that might have been left in the stream list. */
-       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
-               /*
-                * Once a stream is added to this list, the buffers were created so we
-                * have a guarantee that this call will succeed. Setting the monitor
-                * mode to 0 so we don't lock nor try to delete the stream from the
-                * global hash table.
-                */
-               stream->monitor = 0;
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                consumer_stream_destroy(stream, nullptr);
        }
 }
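The loop above relies on lttng::urcu::list_iteration_adapter, which walks an intrusive cds_list and hands back already-converted element pointers. Its exact interface is inferred from this usage; a rough sketch with a hypothetical element type (my_elem and do_work() are illustrative only):

	struct my_elem {
		int value;
		struct cds_list_head link; /* intrusive list node */
	};

	/* Walks `head`, yielding my_elem pointers. The fact that the old
	 * cds_list_for_each_entry_safe loop could be replaced directly
	 * suggests the adapter pre-fetches the next node, so destroying
	 * the current element (as clean_channel_stream_list() does) is safe. */
	for (auto *elem : lttng::urcu::list_iteration_adapter<my_elem, &my_elem::link>(head)) {
		do_work(elem);
	}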
@@ -205,10 +200,10 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                return nullptr;
        }
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
@@ -220,7 +215,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -253,7 +248,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
@@ -273,7 +268,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -303,7 +298,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -403,7 +399,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->is_published) {
                int ret;
 
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -426,16 +422,12 @@ end:
  */
 static void cleanup_relayd_ht()
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd;
-
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
-                       consumer_destroy_relayd(relayd);
-               }
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
+               consumer_destroy_relayd(relayd);
        }
 
        lttng_ht_destroy(the_consumer_data.relayd_ht);
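The lock-free hash-table counterpart used throughout the rest of this diff follows the same shape: lfht_iteration_adapter is parameterized on the element type, the node member's type, and a pointer to that member. The explicit read_lock_guard that used to wrap these loops disappears, which suggests the adapter's iterator takes the RCU read-side lock itself. A hedged sketch with illustrative names (my_entry, visit, some_ht):

	/* my_entry embeds a lttng_ht_node_u64 named `node` that links it
	 * into `some_ht`; the loop variable is already a my_entry *. */
	for (auto *entry :
	     lttng::urcu::lfht_iteration_adapter<my_entry,
						 decltype(my_entry::node),
						 &my_entry::node>(*some_ht->ht)) {
		visit(entry);
	}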
@@ -451,25 +443,26 @@ static void cleanup_relayd_ht()
 static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                                             enum consumer_endpoint_status status)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       lttng::urcu::read_lock_guard read_lock;
-
        /* Let's begin with metadata */
-       cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
-                       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
 
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
 
        /* Follow up by the data streams */
-       cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
                        DBG("Delete flag set to data stream %d", stream->wait_fd);
@@ -581,7 +574,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -626,7 +619,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
  */
 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
-       int ret = 0;
+       const int ret = 0;
        struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
@@ -634,7 +627,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        ASSERT_RCU_READ_LOCKED();
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                goto end;
        }
@@ -695,7 +688,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
@@ -719,7 +712,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -772,7 +765,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -806,7 +799,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
@@ -1019,9 +1012,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
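Switching the allocation from zmalloc() to new means every release path must use delete (free_channel_rcu above and the error path below are adjusted accordingly); it also drops zmalloc's zero-fill, so members not assigned here are assumed to have in-class initializers. A compact sketch of the allocate-or-null pattern, with make_channel() as a hypothetical wrapper:

	lttng_consumer_channel *make_channel() noexcept
	{
		try {
			return new lttng_consumer_channel;
		} catch (const std::bad_alloc& ex) {
			ERR("Failed to allocate lttng_consumer_channel: %s", ex.what());
			return nullptr;
		}
	}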
 
@@ -1035,9 +1030,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
        channel->is_live = is_in_live_session;
-       pthread_mutex_init(&channel->lock, NULL);
-       pthread_mutex_init(&channel->timer_lock, NULL);
-       lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
+       pthread_mutex_init(&channel->lock, nullptr);
+       pthread_mutex_init(&channel->timer_lock, nullptr);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -1048,7 +1042,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -1088,7 +1082,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
        if (trace_chunk) {
-               int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+               const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
                if (ret) {
                        goto error;
                }
@@ -1124,7 +1118,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
@@ -1155,8 +1149,6 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                             int *nb_inactive_fd)
 {
        int i = 0;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
 
        LTTNG_ASSERT(ctx);
        LTTNG_ASSERT(ht);
@@ -1166,32 +1158,32 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        DBG("Updating poll fd array");
        *nb_inactive_fd = 0;
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Only active streams with an active end point can be added to the
-                        * poll set and local stream storage of the thread.
-                        *
-                        * There is a potential race here for endpoint_status to be updated
-                        * just after the check. However, this is OK since the stream(s) will
-                        * be deleted once the thread is notified that the end point state has
-                        * changed where this function will be called back again.
-                        *
-                        * We track the number of inactive FDs because they still need to be
-                        * closed by the polling thread after a wakeup on the data_pipe or
-                        * metadata_pipe.
-                        */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
-                               (*nb_inactive_fd)++;
-                               continue;
-                       }
-
-                       (*pollfd)[i].fd = stream->wait_fd;
-                       (*pollfd)[i].events = POLLIN | POLLPRI;
-                       local_stream[i] = stream;
-                       i++;
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Only active streams with an active end point can be added to the
+                * poll set and local stream storage of the thread.
+                *
+                * There is a potential race here for endpoint_status to be updated
+                * just after the check. However, this is OK since the stream(s) will
+                * be deleted once the thread is notified that the end point state has
+                * changed where this function will be called back again.
+                *
+                * We track the number of inactive FDs because they still need to be
+                * closed by the polling thread after a wakeup on the data_pipe or
+                * metadata_pipe.
+                */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                       (*nb_inactive_fd)++;
+                       continue;
                }
+
+               (*pollfd)[i].fd = stream->wait_fd;
+               (*pollfd)[i].events = POLLIN | POLLPRI;
+               local_stream[i] = stream;
+               i++;
        }
 
        /*
@@ -1253,11 +1245,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx,
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+                             enum lttcomm_return_code error_code)
 {
        if (ctx->consumer_error_socket > 0) {
+               const std::int32_t comm_code = std::int32_t(error_code);
+
+               static_assert(
+                       sizeof(comm_code) >= sizeof(std::underlying_type_t<lttcomm_return_code>),
+                       "Fixed-size communication type too small to accommodate lttcomm_return_code");
                return lttcomm_send_unix_sock(
-                       ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+                       ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
        }
 
        return 0;
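The code is now sent as a fixed-width 32-bit value rather than sizeof(enum lttcomm_sessiond_command) bytes, so both ends agree on the wire size regardless of how the compiler sizes the enum. Note that the static_assert must inspect the enum's underlying integer type (std::underlying_type_t), not the trait class itself, whose sizeof is meaningless. A self-contained illustration of that distinction:

	#include <cstdint>
	#include <type_traits>

	enum class demo_code : int { ok = 0, fail = 1 }; /* illustrative enum */

	/* sizeof(std::underlying_type<demo_code>) is the size of an (empty)
	 * trait struct, typically 1, so it cannot catch a too-narrow wire
	 * field; the _t alias names the enum's actual integer type. */
	static_assert(sizeof(std::int32_t) >= sizeof(std::underlying_type_t<demo_code>),
		      "32-bit wire field can hold demo_code");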
@@ -1269,17 +1267,14 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
  */
 void lttng_consumer_cleanup()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        unsigned int trace_chunks_left;
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (
-                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-                       consumer_del_channel(channel);
-               }
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
+               consumer_del_channel(channel);
        }
 
        lttng_ht_destroy(the_consumer_data.channel_ht);
@@ -1341,7 +1336,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int outfd = stream->out_fd;
+       const int outfd = stream->out_fd;
 
        /*
         * This does a blocking write-and-wait on any page that belongs to the
@@ -1451,22 +1446,19 @@ error:
  */
 static void destroy_data_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Ignore return value since we are currently cleaning up so any error
-                        * can't be handled.
-                        */
-                       (void) consumer_del_stream(stream, ht);
-               }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_stream(stream, ht);
        }
 
        lttng_ht_destroy(ht);
@@ -1478,22 +1470,19 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
  */
 static void destroy_metadata_stream_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
-                       /*
-                        * Ignore return value since we are currently cleaning up so any error
-                        * can't be handled.
-                        */
-                       (void) consumer_del_metadata_stream(stream, ht);
-               }
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*ht->ht)) {
+               /*
+                * Ignore return value since we are currently cleaning up so any error
+                * can't be handled.
+                */
+               (void) consumer_del_metadata_stream(stream, ht);
        }
 
        lttng_ht_destroy(ht);
@@ -1596,7 +1585,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
@@ -1753,7 +1742,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
-       int fd = stream->wait_fd;
+       const int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = nullptr;
@@ -1773,7 +1762,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -1870,7 +1859,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
 
                /* Handle stream on the relayd if the output is on the network */
                if (relayd && stream->metadata_flag) {
-                       size_t metadata_payload_size =
+                       const size_t metadata_payload_size =
                                sizeof(struct lttcomm_relayd_metadata_payload);
 
                        /* Update counter to fit the spliced data */
@@ -2133,7 +2122,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
-       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
@@ -2176,14 +2165,14 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
        lttng_ht_lookup(ht, &stream->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        LTTNG_ASSERT(!node);
 
        /*
@@ -2221,22 +2210,18 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
  */
 static void validate_endpoint_status_data_stream()
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged data stream");
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
-                       /* Validate delete flag of the stream */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                               continue;
-                       }
-                       /* Delete it right now */
-                       consumer_del_stream(stream, data_ht);
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*data_ht->ht)) {
+               /* Validate delete flag of the stream */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                       continue;
                }
+               /* Delete it right now */
+               consumer_del_stream(stream, data_ht);
        }
 }
 
@@ -2245,29 +2230,26 @@ static void validate_endpoint_status_data_stream()
  */
 static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-
        DBG("Consumer delete flagged metadata stream");
 
        LTTNG_ASSERT(pollset);
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
-                       /* Validate delete flag of the stream */
-                       if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
-                               continue;
-                       }
-                       /*
-                        * Remove from pollset so the metadata thread can continue without
-                        * blocking on a deleted stream.
-                        */
-                       lttng_poll_del(pollset, stream->wait_fd);
-
-                       /* Delete it right now */
-                       consumer_del_metadata_stream(stream, metadata_ht);
+       for (auto *stream :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+                                                decltype(lttng_consumer_stream::node),
+                                                &lttng_consumer_stream::node>(*metadata_ht->ht)) {
+               /* Validate delete flag of the stream */
+               if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+                       continue;
                }
+               /*
+                * Remove from pollset so the metadata thread can continue without
+                * blocking on a deleted stream.
+                */
+               lttng_poll_del(pollset, stream->wait_fd);
+
+               /* Delete it right now */
+               consumer_del_metadata_stream(stream, metadata_ht);
        }
 }
 
@@ -2403,16 +2385,16 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       lttng::urcu::read_lock_guard read_lock;
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
-                       stream = caa_container_of(node, struct lttng_consumer_stream, node);
+                       stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
 
                        if (revents & (LPOLLIN | LPOLLPRI)) {
                                /* Get the data out of the metadata file descriptor */
@@ -2798,21 +2780,16 @@ error_testpoint:
  */
 static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       lttng::urcu::read_lock_guard read_lock;
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
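lfht_filtered_iteration_adapter replaces cds_lfht_for_each_entry_duplicate: the extra template parameter is the key type, and the constructor takes the table, a pointer to the key, its hash, and the table's match function, visiting only matching entries. Restating the call shape from this hunk with each argument annotated (a reading aid, not new behaviour):

	for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
		     lttng_consumer_stream,                            /* element type */
		     decltype(lttng_consumer_stream::node_channel_id), /* node type */
		     &lttng_consumer_stream::node_channel_id,          /* node member */
		     std::uint64_t>(                                   /* key type */
		     *ht->ht,
		     &channel->key,                              /* key to match */
		     ht->hash_fct(&channel->key, lttng_ht_seed), /* its hash */
		     ht->match_fct)) {                           /* match function */
		/* only streams belonging to `channel` are visited */
	}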
@@ -2848,21 +2825,16 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
 static void destroy_channel_ht(struct lttng_ht *ht)
 {
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
-       int ret;
-
        if (ht == nullptr) {
                return;
        }
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
-                       ret = lttng_ht_del(ht, &iter);
-                       LTTNG_ASSERT(ret != 0);
-               }
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::wait_fd_node),
+                                                &lttng_consumer_channel::wait_fd_node>(*ht->ht)) {
+               /* Remove the entry via the same node member that keys this ht. */
+               const auto ret = cds_lfht_del(ht->ht, &channel->wait_fd_node.node);
+               LTTNG_ASSERT(ret != 0);
        }
 
        lttng_ht_destroy(ht);
@@ -2970,7 +2942,7 @@ void *consumer_thread_channel_poll(void *data)
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               lttng::urcu::read_lock_guard read_lock;
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
                                                /* Add channel to the global poll events list */
@@ -2991,7 +2963,7 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               lttng::urcu::read_lock_guard read_lock;
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
                                                        ERR("UST consumer get channel key %" PRIu64
@@ -3059,16 +3031,17 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       lttng::urcu::read_lock_guard read_lock;
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(channel_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
-                       chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
+                       chan = lttng::utils::container_of(node,
+                                                         &lttng_consumer_channel::wait_fd_node);
 
                        /* Check for error event */
                        if (revents & (LPOLLERR | LPOLLHUP)) {
@@ -3697,27 +3670,23 @@ error_nosignal:
  */
 static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 {
-       struct lttng_ht_iter iter;
-       struct consumer_relayd_sock_pair *relayd = nullptr;
-
-       ASSERT_RCU_READ_LOCKED();
-
        /* Iterate over all relayd since they are indexed by net_seq_idx. */
-       cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+       for (auto *relayd :
+            lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+                                                decltype(consumer_relayd_sock_pair::node),
+                                                &consumer_relayd_sock_pair::node>(
+                    *the_consumer_data.relayd_ht->ht)) {
                /*
                 * Check by sessiond id which is unique here where the relayd session
                 * id might not be when having multiple relayd.
                 */
                if (relayd->sessiond_session_id == id) {
                        /* Found the relayd. There can be only one per id. */
-                       goto found;
+                       return relayd;
                }
        }
 
        return nullptr;
-
-found:
-       return relayd;
 }
 
 /*
@@ -3729,16 +3698,13 @@ found:
 int consumer_data_pending(uint64_t id)
 {
        int ret;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
+       const auto ht = the_consumer_data.stream_list_ht;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       lttng::urcu::read_lock_guard read_lock;
-       pthread_mutex_lock(&the_consumer_data.lock);
+       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3753,18 +3719,12 @@ int consumer_data_pending(uint64_t id)
                abort();
        }
 
-       /* Ease our life a bit */
-       ht = the_consumer_data.stream_list_ht;
-
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&id, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &id,
-                                         &iter.iter,
-                                         stream,
-                                         node_session_id.node)
-       {
-               pthread_mutex_lock(&stream->lock);
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_session_id),
+                    &lttng_consumer_stream::node_session_id,
+                    std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -3777,35 +3737,30 @@ int consumer_data_pending(uint64_t id)
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
                                goto data_pending;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
 
        relayd = find_relayd_by_session_id(id);
        if (relayd) {
                unsigned int is_data_inflight = 0;
 
+               const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
                /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
                if (ret < 0) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        /* Communication error thus the relayd so no data pending. */
                        goto data_not_pending;
                }
 
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
-               {
+               for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                            lttng_consumer_stream,
+                            decltype(lttng_consumer_stream::node_session_id),
+                            &lttng_consumer_stream::node_session_id,
+                            std::uint64_t>(
+                            *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                               stream->relayd_stream_id);
@@ -3816,13 +3771,11 @@ int consumer_data_pending(uint64_t id)
                        }
 
                        if (ret == 1) {
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_pending;
                        } else if (ret < 0) {
                                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
                                    relayd->net_seq_idx);
                                lttng_consumer_cleanup_relayd(relayd);
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_not_pending;
                        }
                }
@@ -3830,7 +3783,6 @@ int consumer_data_pending(uint64_t id)
                /* Send end command for data pending. */
                ret = relayd_end_data_pending(
                        &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
                            relayd->net_seq_idx);
@@ -3850,12 +3802,10 @@ int consumer_data_pending(uint64_t id)
 
 data_not_pending:
        /* Data is available to be read by a viewer. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
        return 1;
 }
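Most of the churn in this function comes from replacing manual pthread_mutex_lock()/unlock() pairs and their goto-based cleanup labels with lttng::pthread::lock_guard, an RAII wrapper that presumably locks in its constructor and unlocks in its destructor, so every return and goto path releases the mutex automatically. A minimal sketch of the idiom under that assumption (rotate_if_ready() is an illustrative helper, not part of the tree):

	int rotate_if_ready(lttng_consumer_stream *stream)
	{
		/* Locks stream->lock now; unlocked on every exit path below. */
		const lttng::pthread::lock_guard stream_lock(stream->lock);

		if (!stream->rotate_ready) {
			return 0;
		}

		return lttng_consumer_rotate_stream(stream);
	}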
 
@@ -3960,9 +3910,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                  uint64_t relayd_id)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
        struct lttng_dynamic_array stream_rotation_positions;
        uint64_t next_chunk_id, stream_count = 0;
        enum lttng_trace_chunk_status chunk_status;
@@ -3971,7 +3919,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        bool rotating_to_new_chunk = true;
        /* Array of `struct lttng_consumer_stream *` */
        struct lttng_dynamic_pointer_array streams_packet_to_open;
-       size_t stream_idx;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -3982,24 +3929,23 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
 
-       pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
        chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
-               goto end_unlock_channel;
+               goto end;
        }
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
@@ -4007,7 +3953,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                /*
                 * Lock stream because we are about to change its state.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (stream->trace_chunk == stream->chan->trace_chunk) {
                        rotating_to_new_chunk = false;
@@ -4061,7 +4007,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                        ret = sample_stream_positions(
                                                stream, &produced_pos, &consumed_pos);
                                        if (ret) {
-                                               goto end_unlock_stream;
+                                               goto end;
                                        }
 
                                        /*
@@ -4110,26 +4056,26 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                    stream->key);
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
 
                ret = lttng_consumer_take_snapshot(stream);
                if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
                        ERR("Failed to sample snapshot position during channel rotation");
-                       goto end_unlock_stream;
+                       goto end;
                }
                if (!ret) {
                        ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
                        if (ret < 0) {
                                ERR("Failed to sample produced position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
 
                        ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
                        if (ret < 0) {
                                ERR("Failed to sample consumed position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
                /*
@@ -4169,7 +4115,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                            stream->key);
                        ret = -1;
-                       goto end_unlock_stream;
+                       goto end;
                }
                stream->rotate_position = stream->last_sequence_number + 1 +
                        ((produced_pos - consumed_pos) / stream->max_sb_size);
@@ -4192,7 +4138,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                              &position);
                        if (ret) {
                                ERR("Failed to allocate stream rotation position");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                        stream_count++;
                }
@@ -4254,20 +4200,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret) {
                                PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
-       stream = nullptr;
 
        if (!is_local_trace) {
                relayd = consumer_find_relayd(relayd_id);
                if (!relayd) {
                        ERR("Failed to find relayd %" PRIu64, relayd_id);
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                }
 
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -4281,21 +4224,19 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
                            relayd->net_seq_idx);
                        lttng_consumer_cleanup_relayd(relayd);
-                       goto end_unlock_channel;
+                       goto end;
                }
        }
 
-       for (stream_idx = 0;
+       for (std::size_t stream_idx = 0;
             stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
             stream_idx++) {
                enum consumer_stream_open_packet_status status;
-
-               stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+               auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
                        &streams_packet_to_open, stream_idx);
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                status = consumer_stream_open_packet(stream);
-               pthread_mutex_unlock(&stream->lock);
                switch (status) {
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                        DBG("Opened a packet after a rotation: stream id = %" PRIu64
@@ -4319,20 +4260,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                        /* Logged by callee. */
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                default:
                        abort();
                }
        }
 
-       pthread_mutex_unlock(&channel->lock);
        ret = 0;
-       goto end;
-
-end_unlock_stream:
-       pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
-       pthread_mutex_unlock(&channel->lock);
 end:
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
@@ -4417,27 +4351,22 @@ error:
 
 static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
 {
-       int ret;
-       struct lttng_consumer_stream *stream;
+       const lttng::urcu::read_lock_guard read_lock;
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
 
-       lttng::urcu::read_lock_guard read_lock;
-       pthread_mutex_lock(&channel->lock);
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
-               pthread_mutex_lock(&stream->lock);
-               ret = consumer_clear_stream(stream);
+
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
+               const auto ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-               pthread_mutex_unlock(&stream->lock);
        }
-       pthread_mutex_unlock(&channel->lock);
-       return 0;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&channel->lock);
-       return ret;
+       return 0;
 }
 
 /*
@@ -4626,39 +4555,31 @@ error:
 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        ASSERT_RCU_READ_LOCKED();
 
-       lttng::urcu::read_lock_guard read_lock;
-
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
-               pthread_mutex_lock(&stream->chan->lock);
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (!stream->rotate_ready) {
-                       pthread_mutex_unlock(&stream->lock);
-                       pthread_mutex_unlock(&stream->chan->lock);
                        continue;
                }
-               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
 
+               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
                ret = lttng_consumer_rotate_stream(stream);
-               pthread_mutex_unlock(&stream->lock);
-               pthread_mutex_unlock(&stream->chan->lock);
                if (ret) {
                        goto end;
                }
@@ -4707,8 +4628,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
        char creation_timestamp_buffer[ISO8601_STR_LEN];
        const char *relayd_id_str = "(none)";
        const char *creation_timestamp_str;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -4795,50 +4714,45 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                goto error;
        }
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry_duplicate(
-                       the_consumer_data.channels_by_session_id_ht->ht,
-                       the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
-                                                                             lttng_ht_seed),
-                       the_consumer_data.channels_by_session_id_ht->match_fct,
-                       &session_id,
-                       &iter.iter,
-                       channel,
-                       channels_by_session_id_ht_node.node)
-               {
-                       ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
-                       if (ret) {
-                               /*
-                                * Roll-back the creation of this chunk.
-                                *
-                                * This is important since the session daemon will
-                                * assume that the creation of this chunk failed and
-                                * will never ask for it to be closed, resulting
-                                * in a leak and an inconsistent state for some
-                                * channels.
-                                */
-                               enum lttcomm_return_code close_ret;
-                               char path[LTTNG_PATH_MAX];
-
-                               DBG("Failed to set new trace chunk on existing channels, rolling back");
-                               close_ret =
-                                       lttng_consumer_close_trace_chunk(relayd_id,
-                                                                        session_id,
-                                                                        chunk_id,
-                                                                        chunk_creation_timestamp,
-                                                                        nullptr,
-                                                                        path);
-                               if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
-                                       ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
-                                           ", chunk_id = %" PRIu64,
-                                           session_id,
-                                           chunk_id);
-                               }
+       for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_channel,
+                    decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+                    &lttng_consumer_channel::channels_by_session_id_ht_node,
+                    std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+                                   &session_id,
+                                   the_consumer_data.channels_by_session_id_ht->hash_fct(
+                                           &session_id, lttng_ht_seed),
+                                   the_consumer_data.channels_by_session_id_ht->match_fct)) {
+               ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+               if (ret) {
+                       /*
+                        * Roll-back the creation of this chunk.
+                        *
+                        * This is important since the session daemon will
+                        * assume that the creation of this chunk failed and
+                        * will never ask for it to be closed, resulting
+                        * in a leak and an inconsistent state for some
+                        * channels.
+                        */
+                       enum lttcomm_return_code close_ret;
+                       char path[LTTNG_PATH_MAX];
 
-                               ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
-                               break;
+                       DBG("Failed to set new trace chunk on existing channels, rolling back");
+                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+                                                                    session_id,
+                                                                    chunk_id,
+                                                                    chunk_creation_timestamp,
+                                                                    nullptr,
+                                                                    path);
+                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+                                   ", chunk_id = %" PRIu64,
+                                   session_id,
+                                   chunk_id);
                        }
+
+                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                       break;
                }
        }
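
For readers less used to the new adapters: the key-filtered loop above corresponds, in plain liburcu terms, to the cds_lfht_for_each_entry_duplicate pattern that other hunks of this patch remove. A rough, non-normative sketch of the equivalent traversal follows (error handling elided; the adapter presumably also holds the RCU read-side lock for the duration of the walk, judging by the read_lock_guard blocks removed elsewhere in this change):

	struct lttng_consumer_channel *channel;
	struct lttng_ht_iter iter;
	const auto ht = the_consumer_data.channels_by_session_id_ht;

	/* The RCU read-side lock must be held across the whole traversal. */
	const lttng::urcu::read_lock_guard read_lock;

	cds_lfht_for_each_entry_duplicate(ht->ht,
					  ht->hash_fct(&session_id, lttng_ht_seed),
					  ht->match_fct,
					  &session_id,
					  &iter.iter,
					  channel,
					  channels_by_session_id_ht_node.node)
	{
		/* Visits every channel whose session id matches 'session_id'. */
		ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
		/* ... roll-back handling as in the hunk above ... */
	}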
 
@@ -4896,8 +4810,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
        char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
        const char *relayd_id_str = "(none)";
        const char *close_command_name = "none";
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
        enum lttng_trace_chunk_status chunk_status;
 
        if (relayd_id) {
@@ -4951,32 +4863,33 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * it; it is only kept around to compare it (by address) to the
         * current chunk found in the session's channels.
         */
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry (
-                       the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
-                       int ret;
+       for (auto *channel :
+            lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+                                                decltype(lttng_consumer_channel::node),
+                                                &lttng_consumer_channel::node>(
+                    *the_consumer_data.channel_ht->ht)) {
+               int ret;
 
+               /*
+                * Only change the channel's chunk to NULL if it still
+                * references the chunk being closed. The channel may
+                * reference a newer chunk in the case of a session
+                * rotation. When a session rotation occurs, the "next"
+                * chunk is created before the "current" chunk is closed.
+                */
+               if (channel->trace_chunk != chunk) {
+                       continue;
+               }
+               ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+               if (ret) {
                        /*
-                        * Only change the channel's chunk to NULL if it still
-                        * references the chunk being closed. The channel may
-                        * reference a newer channel in the case of a session
-                        * rotation. When a session rotation occurs, the "next"
-                        * chunk is created before the "current" chunk is closed.
+                        * Attempt to close the chunk on as many channels as
+                        * possible.
                         */
-                       if (channel->trace_chunk != chunk) {
-                               continue;
-                       }
-                       ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
-                       if (ret) {
-                               /*
-                                * Attempt to close the chunk on as many channels as
-                                * possible.
-                                */
-                               ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
-                       }
+                       ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
                }
        }
+
        if (relayd_id) {
                int ret;
                struct consumer_relayd_sock_pair *relayd;
@@ -5017,7 +4930,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
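
(Aside on the read_lock_guard instances that this patch marks const throughout: the guard is simply an RAII wrapper around the liburcu read-side critical section. A minimal self-contained sketch of the idea follows; the class name and layout are illustrative, not the actual lttng wrapper.)

	#include <urcu.h>

	/*
	 * RAII guard for the liburcu read-side critical section:
	 * rcu_read_lock() on construction, rcu_read_unlock() on destruction.
	 * The calling thread must already be registered with the RCU flavor
	 * in use. Marking an instance 'const' is documentation only; the
	 * guard has no mutable state and is never reassigned.
	 */
	class read_lock_guard {
	public:
		read_lock_guard()
		{
			rcu_read_lock();
		}

		~read_lock_guard()
		{
			rcu_read_unlock();
		}

		read_lock_guard(const read_lock_guard&) = delete;
		read_lock_guard& operator=(const read_lock_guard&) = delete;
	};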
@@ -5077,41 +4990,32 @@ end:
 
 static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
        int ret;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       lttng::urcu::read_lock_guard read_lock;
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
+                       continue;
                }
+
                ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-       next:
-               pthread_mutex_unlock(&stream->lock);
        }
-       return LTTCOMM_CONSUMERD_SUCCESS;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       return ret;
+       return LTTCOMM_CONSUMERD_SUCCESS;
 }
 
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
@@ -5142,71 +5046,60 @@ end:
 
 enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream;
        enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
 
        if (channel->metadata_stream) {
                ERR("Open channel packets command attempted on a metadata channel");
-               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
-               goto end;
+               return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
        }
 
-       {
-               lttng::urcu::read_lock_guard read_lock;
-               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
-                       enum consumer_stream_open_packet_status status;
-
-                       pthread_mutex_lock(&stream->lock);
-                       if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                               goto next;
-                       }
+       const lttng::urcu::read_lock_guard read_lock;
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
+               enum consumer_stream_open_packet_status status;
 
-                       status = consumer_stream_open_packet(stream);
-                       switch (status) {
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                               DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel name = %s, session id = %" PRIu64,
-                                   stream->key,
-                                   stream->chan->name,
-                                   stream->chan->session_id);
-                               stream->opened_packet_in_current_trace_chunk = true;
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                               DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel name = %s, session id = %" PRIu64,
-                                   stream->key,
-                                   stream->chan->name,
-                                   stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                               /*
-                                * Only unexpected internal errors can lead to this
-                                * failing. Report an unknown error.
-                                */
-                               ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel id = %" PRIu64 ", channel name = %s"
-                                   ", session id = %" PRIu64,
-                                   stream->key,
-                                   channel->key,
-                                   channel->name,
-                                   channel->session_id);
-                               ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                               goto error_unlock;
-                       default:
-                               abort();
-                       }
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       continue;
+               }
 
-               next:
-                       pthread_mutex_unlock(&stream->lock);
+               status = consumer_stream_open_packet(stream);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                           stream->key,
+                           stream->chan->name,
+                           stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                           stream->key,
+                           stream->chan->name,
+                           stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel id = %" PRIu64 ", channel name = %s"
+                           ", session id = %" PRIu64,
+                           stream->key,
+                           channel->key,
+                           channel->name,
+                           channel->session_id);
+                       return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+               default:
+                       abort();
                }
        }
-end_rcu_unlock:
-end:
-       return ret;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       goto end_rcu_unlock;
+       return ret;
 }
 
 void lttng_consumer_sigbus_handle(void *addr)