consumerd: consumer.cpp: iterate on lfht using lfht_filtered_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 01:26:10 +0000 (01:26 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 18:26:44 +0000 (14:26 -0400)
Change-Id: I6f7e66d8446e1ebc05679a61fd190053f252902f
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer.cpp

index c991d622bb4e2b8848e78e2ffb610e01dc38ffa9..6ed6e216b9ea060227e93eea76bb04382b2a2485 100644 (file)
@@ -23,6 +23,7 @@
 #include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
@@ -2779,21 +2780,16 @@ error_testpoint:
  */
 static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       const lttng::urcu::read_lock_guard read_lock;
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
@@ -3702,16 +3698,13 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
 int consumer_data_pending(uint64_t id)
 {
        int ret;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
+       const auto ht = the_consumer_data.stream_list_ht;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        int (*data_pending)(struct lttng_consumer_stream *);
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       const lttng::urcu::read_lock_guard read_lock;
-       pthread_mutex_lock(&the_consumer_data.lock);
+       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3726,18 +3719,12 @@ int consumer_data_pending(uint64_t id)
                abort();
        }
 
-       /* Ease our life a bit */
-       ht = the_consumer_data.stream_list_ht;
-
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&id, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &id,
-                                         &iter.iter,
-                                         stream,
-                                         node_session_id.node)
-       {
-               pthread_mutex_lock(&stream->lock);
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_session_id),
+                    &lttng_consumer_stream::node_session_id,
+                    std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                /*
                 * A removed node from the hash table indicates that the stream has
@@ -3750,35 +3737,30 @@ int consumer_data_pending(uint64_t id)
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
-                               pthread_mutex_unlock(&stream->lock);
                                goto data_pending;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
 
        relayd = find_relayd_by_session_id(id);
        if (relayd) {
                unsigned int is_data_inflight = 0;
 
+               const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
                /* Send init command for data pending. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
                if (ret < 0) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        /* Communication error thus the relayd so no data pending. */
                        goto data_not_pending;
                }
 
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
-               {
+               for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                            lttng_consumer_stream,
+                            decltype(lttng_consumer_stream::node_session_id),
+                            &lttng_consumer_stream::node_session_id,
+                            std::uint64_t>(
+                            *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
                        if (stream->metadata_flag) {
                                ret = relayd_quiescent_control(&relayd->control_sock,
                                                               stream->relayd_stream_id);
@@ -3789,13 +3771,11 @@ int consumer_data_pending(uint64_t id)
                        }
 
                        if (ret == 1) {
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_pending;
                        } else if (ret < 0) {
                                ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
                                    relayd->net_seq_idx);
                                lttng_consumer_cleanup_relayd(relayd);
-                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                                goto data_not_pending;
                        }
                }
@@ -3803,7 +3783,6 @@ int consumer_data_pending(uint64_t id)
                /* Send end command for data pending. */
                ret = relayd_end_data_pending(
                        &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
                            relayd->net_seq_idx);
@@ -3823,12 +3802,10 @@ int consumer_data_pending(uint64_t id)
 
 data_not_pending:
        /* Data is available to be read by a viewer. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
        return 0;
 
 data_pending:
        /* Data is still being extracted from buffers. */
-       pthread_mutex_unlock(&the_consumer_data.lock);
        return 1;
 }
 
@@ -3933,9 +3910,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                  uint64_t relayd_id)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
        struct lttng_dynamic_array stream_rotation_positions;
        uint64_t next_chunk_id, stream_count = 0;
        enum lttng_trace_chunk_status chunk_status;
@@ -3944,7 +3919,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        bool rotating_to_new_chunk = true;
        /* Array of `struct lttng_consumer_stream *` */
        struct lttng_dynamic_pointer_array streams_packet_to_open;
-       size_t stream_idx;
 
        ASSERT_RCU_READ_LOCKED();
 
@@ -3955,24 +3929,23 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       const lttng::urcu::read_lock_guard read_lock;
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
 
-       pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
        chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
        if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
-               goto end_unlock_channel;
+               goto end;
        }
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
@@ -3980,7 +3953,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                /*
                 * Lock stream because we are about to change its state.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (stream->trace_chunk == stream->chan->trace_chunk) {
                        rotating_to_new_chunk = false;
@@ -4034,7 +4007,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                        ret = sample_stream_positions(
                                                stream, &produced_pos, &consumed_pos);
                                        if (ret) {
-                                               goto end_unlock_stream;
+                                               goto end;
                                        }
 
                                        /*
@@ -4083,26 +4056,26 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret < 0) {
                                ERR("Failed to flush stream %" PRIu64 " during channel rotation",
                                    stream->key);
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
 
                ret = lttng_consumer_take_snapshot(stream);
                if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
                        ERR("Failed to sample snapshot position during channel rotation");
-                       goto end_unlock_stream;
+                       goto end;
                }
                if (!ret) {
                        ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
                        if (ret < 0) {
                                ERR("Failed to sample produced position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
 
                        ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
                        if (ret < 0) {
                                ERR("Failed to sample consumed position during channel rotation");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
                /*
@@ -4142,7 +4115,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                            stream->key);
                        ret = -1;
-                       goto end_unlock_stream;
+                       goto end;
                }
                stream->rotate_position = stream->last_sequence_number + 1 +
                        ((produced_pos - consumed_pos) / stream->max_sb_size);
@@ -4165,7 +4138,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                              &position);
                        if (ret) {
                                ERR("Failed to allocate stream rotation position");
-                               goto end_unlock_stream;
+                               goto end;
                        }
                        stream_count++;
                }
@@ -4227,20 +4200,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        if (ret) {
                                PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
-                               goto end_unlock_stream;
+                               goto end;
                        }
                }
-
-               pthread_mutex_unlock(&stream->lock);
        }
-       stream = nullptr;
 
        if (!is_local_trace) {
                relayd = consumer_find_relayd(relayd_id);
                if (!relayd) {
                        ERR("Failed to find relayd %" PRIu64, relayd_id);
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                }
 
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -4254,21 +4224,19 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
                            relayd->net_seq_idx);
                        lttng_consumer_cleanup_relayd(relayd);
-                       goto end_unlock_channel;
+                       goto end;
                }
        }
 
-       for (stream_idx = 0;
+       for (std::size_t stream_idx = 0;
             stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
             stream_idx++) {
                enum consumer_stream_open_packet_status status;
-
-               stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+               auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
                        &streams_packet_to_open, stream_idx);
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                status = consumer_stream_open_packet(stream);
-               pthread_mutex_unlock(&stream->lock);
                switch (status) {
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
                        DBG("Opened a packet after a rotation: stream id = %" PRIu64
@@ -4292,20 +4260,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
                        /* Logged by callee. */
                        ret = -1;
-                       goto end_unlock_channel;
+                       goto end;
                default:
                        abort();
                }
        }
 
-       pthread_mutex_unlock(&channel->lock);
        ret = 0;
-       goto end;
-
-end_unlock_stream:
-       pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
-       pthread_mutex_unlock(&channel->lock);
 end:
        lttng_dynamic_array_reset(&stream_rotation_positions);
        lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
@@ -4599,39 +4560,31 @@ error:
 int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        ASSERT_RCU_READ_LOCKED();
 
-       const lttng::urcu::read_lock_guard read_lock;
-
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
-               pthread_mutex_lock(&stream->chan->lock);
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                if (!stream->rotate_ready) {
-                       pthread_mutex_unlock(&stream->lock);
-                       pthread_mutex_unlock(&stream->chan->lock);
                        continue;
                }
-               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
 
+               DBG("Consumer rotate ready stream %" PRIu64, stream->key);
                ret = lttng_consumer_rotate_stream(stream);
-               pthread_mutex_unlock(&stream->lock);
-               pthread_mutex_unlock(&stream->chan->lock);
                if (ret) {
                        goto end;
                }
@@ -4680,8 +4633,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
        char creation_timestamp_buffer[ISO8601_STR_LEN];
        const char *relayd_id_str = "(none)";
        const char *creation_timestamp_str;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *channel;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -4768,50 +4719,45 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
                goto error;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_lfht_for_each_entry_duplicate(
-                       the_consumer_data.channels_by_session_id_ht->ht,
-                       the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
-                                                                             lttng_ht_seed),
-                       the_consumer_data.channels_by_session_id_ht->match_fct,
-                       &session_id,
-                       &iter.iter,
-                       channel,
-                       channels_by_session_id_ht_node.node)
-               {
-                       ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
-                       if (ret) {
-                               /*
-                                * Roll-back the creation of this chunk.
-                                *
-                                * This is important since the session daemon will
-                                * assume that the creation of this chunk failed and
-                                * will never ask for it to be closed, resulting
-                                * in a leak and an inconsistent state for some
-                                * channels.
-                                */
-                               enum lttcomm_return_code close_ret;
-                               char path[LTTNG_PATH_MAX];
-
-                               DBG("Failed to set new trace chunk on existing channels, rolling back");
-                               close_ret =
-                                       lttng_consumer_close_trace_chunk(relayd_id,
-                                                                        session_id,
-                                                                        chunk_id,
-                                                                        chunk_creation_timestamp,
-                                                                        nullptr,
-                                                                        path);
-                               if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
-                                       ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
-                                           ", chunk_id = %" PRIu64,
-                                           session_id,
-                                           chunk_id);
-                               }
+       for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_channel,
+                    decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+                    &lttng_consumer_channel::channels_by_session_id_ht_node,
+                    std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+                                   &session_id,
+                                   the_consumer_data.channels_by_session_id_ht->hash_fct(
+                                           &session_id, lttng_ht_seed),
+                                   the_consumer_data.channels_by_session_id_ht->match_fct)) {
+               ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+               if (ret) {
+                       /*
+                        * Roll-back the creation of this chunk.
+                        *
+                        * This is important since the session daemon will
+                        * assume that the creation of this chunk failed and
+                        * will never ask for it to be closed, resulting
+                        * in a leak and an inconsistent state for some
+                        * channels.
+                        */
+                       enum lttcomm_return_code close_ret;
+                       char path[LTTNG_PATH_MAX];
 
-                               ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
-                               break;
+                       DBG("Failed to set new trace chunk on existing channels, rolling back");
+                       close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+                                                                    session_id,
+                                                                    chunk_id,
+                                                                    chunk_creation_timestamp,
+                                                                    nullptr,
+                                                                    path);
+                       if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+                               ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+                                   ", chunk_id = %" PRIu64,
+                                   session_id,
+                                   chunk_id);
                        }
+
+                       ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+                       break;
                }
        }
 
@@ -5049,41 +4995,32 @@ end:
 
 static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
 {
-       struct lttng_ht *ht;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht_iter iter;
        int ret;
-
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
-       const lttng::urcu::read_lock_guard read_lock;
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                /*
                 * Protect against teardown with mutex.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                       goto next;
+                       continue;
                }
+
                ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-       next:
-               pthread_mutex_unlock(&stream->lock);
        }
-       return LTTCOMM_CONSUMERD_SUCCESS;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       return ret;
+       return LTTCOMM_CONSUMERD_SUCCESS;
 }
 
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
This page took 0.034587 seconds and 4 git commands to generate.