consumerd: ust-consumer.cpp: iterate on lfht using lfht_filtered_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 01:38:23 +0000 (01:38 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 18:26:44 +0000 (14:26 -0400)
Change-Id: If4f54d8e79a8eb68e2214d3182150b974b806398
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/ust-consumer/ust-consumer.cpp

index 8d3bc882af13e587ada2438c9fe39baac1d37804..82df2c94548b8b178826e672874a8060a45cabe1 100644 (file)
@@ -18,6 +18,7 @@
 #include <common/consumer/consumer.hpp>
 #include <common/index/index.hpp>
 #include <common/optional.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/shm.hpp>
@@ -661,9 +662,7 @@ static int flush_channel(uint64_t chan_key)
 {
        int ret = 0;
        struct lttng_consumer_channel *channel;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
-       struct lttng_ht_iter iter;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        DBG("UST consumer flush channel key %" PRIu64, chan_key);
 
@@ -675,17 +674,15 @@ static int flush_channel(uint64_t chan_key)
                goto error;
        }
 
-       ht = the_consumer_data.stream_per_chan_id_ht;
-
        /* For each stream of the channel id, flush it. */
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
                pthread_mutex_lock(&stream->lock);
@@ -733,41 +730,33 @@ error:
  */
 static int clear_quiescent_channel(uint64_t chan_key)
 {
-       int ret = 0;
-       struct lttng_consumer_channel *channel;
-       struct lttng_consumer_stream *stream;
-       struct lttng_ht *ht;
-       struct lttng_ht_iter iter;
-
        DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
 
        const lttng::urcu::read_lock_guard read_lock;
-       channel = consumer_find_channel(chan_key);
+       auto channel = consumer_find_channel(chan_key);
        if (!channel) {
                ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
-               ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
-               goto error;
+               return LTTNG_ERR_UST_CHAN_NOT_FOUND;
        }
 
-       ht = the_consumer_data.stream_per_chan_id_ht;
+       const auto ht = the_consumer_data.stream_per_chan_id_ht;
 
        /* For each stream of the channel id, clear quiescent state. */
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                                         ht->hash_fct(&channel->key, lttng_ht_seed),
-                                         ht->match_fct,
-                                         &channel->key,
-                                         &iter.iter,
-                                         stream,
-                                         node_channel_id.node)
-       {
+       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                    lttng_consumer_stream,
+                    decltype(lttng_consumer_stream::node_channel_id),
+                    &lttng_consumer_stream::node_channel_id,
+                    std::uint64_t>(*ht->ht,
+                                   &channel->key,
+                                   ht->hash_fct(&channel->key, lttng_ht_seed),
+                                   ht->match_fct)) {
                health_code_update();
 
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                stream->quiescent = false;
-               pthread_mutex_unlock(&stream->lock);
        }
-error:
-       return ret;
+
+       return 0;
 }
 
 /*
@@ -1863,39 +1852,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int ret = 0;
                uint64_t discarded_events;
-               struct lttng_ht_iter iter;
-               struct lttng_ht *ht;
-               struct lttng_consumer_stream *stream;
-               uint64_t id = msg.u.discarded_events.session_id;
-               const uint64_t key = msg.u.discarded_events.channel_key;
+               const auto id = msg.u.discarded_events.session_id;
+               const auto key = msg.u.discarded_events.channel_key;
 
                DBG("UST consumer discarded events command for session id %" PRIu64, id);
-               pthread_mutex_lock(&the_consumer_data.lock);
-
-               ht = the_consumer_data.stream_list_ht;
-
-               /*
-                * We only need a reference to the channel, but they are not
-                * directly indexed, so we just use the first matching stream
-                * to extract the information we need, we default to 0 if not
-                * found (no events are dropped if the channel is not yet in
-                * use).
-                */
-               discarded_events = 0;
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
                {
-                       if (stream->chan->key == key) {
-                               discarded_events = stream->chan->discarded_events;
-                               break;
+                       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+                       const auto ht = the_consumer_data.stream_list_ht;
+
+                       /*
+                        * We only need a reference to the channel, but they are not
+                        * directly indexed, so we just use the first matching stream
+                        * to extract the information we need, we default to 0 if not
+                        * found (no events are dropped if the channel is not yet in
+                        * use).
+                        */
+                       discarded_events = 0;
+                       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                                    lttng_consumer_stream,
+                                    decltype(lttng_consumer_stream::node_channel_id),
+                                    &lttng_consumer_stream::node_session_id,
+                                    std::uint64_t>(*ht->ht,
+                                                   &id,
+                                                   ht->hash_fct(&id, lttng_ht_seed),
+                                                   ht->match_fct)) {
+                               if (stream->chan->key == key) {
+                                       discarded_events = stream->chan->discarded_events;
+                                       break;
+                               }
                        }
                }
-               pthread_mutex_unlock(&the_consumer_data.lock);
 
                DBG("UST consumer discarded events command for session id %" PRIu64
                    ", channel key %" PRIu64,
@@ -1917,38 +1903,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        {
                int ret;
                uint64_t lost_packets;
-               struct lttng_ht_iter iter;
-               struct lttng_ht *ht;
-               struct lttng_consumer_stream *stream;
-               uint64_t id = msg.u.lost_packets.session_id;
-               const uint64_t key = msg.u.lost_packets.channel_key;
+               const auto id = msg.u.lost_packets.session_id;
+               const auto key = msg.u.lost_packets.channel_key;
 
                DBG("UST consumer lost packets command for session id %" PRIu64, id);
-               pthread_mutex_lock(&the_consumer_data.lock);
-
-               ht = the_consumer_data.stream_list_ht;
-
-               /*
-                * We only need a reference to the channel, but they are not
-                * directly indexed, so we just use the first matching stream
-                * to extract the information we need, we default to 0 if not
-                * found (no packets lost if the channel is not yet in use).
-                */
-               lost_packets = 0;
-               cds_lfht_for_each_entry_duplicate(ht->ht,
-                                                 ht->hash_fct(&id, lttng_ht_seed),
-                                                 ht->match_fct,
-                                                 &id,
-                                                 &iter.iter,
-                                                 stream,
-                                                 node_session_id.node)
                {
-                       if (stream->chan->key == key) {
-                               lost_packets = stream->chan->lost_packets;
-                               break;
+                       const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+                       const auto ht = the_consumer_data.stream_list_ht;
+
+                       /*
+                        * We only need a reference to the channel, but they are not
+                        * directly indexed, so we just use the first matching stream
+                        * to extract the information we need, we default to 0 if not
+                        * found (no packets lost if the channel is not yet in use).
+                        */
+                       lost_packets = 0;
+                       for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+                                    lttng_consumer_stream,
+                                    decltype(lttng_consumer_stream::node_session_id),
+                                    &lttng_consumer_stream::node_session_id,
+                                    std::uint64_t>(*ht->ht,
+                                                   &id,
+                                                   ht->hash_fct(&id, lttng_ht_seed),
+                                                   ht->match_fct)) {
+                               if (stream->chan->key == key) {
+                                       lost_packets = stream->chan->lost_packets;
+                                       break;
+                               }
                        }
                }
-               pthread_mutex_unlock(&the_consumer_data.lock);
 
                DBG("UST consumer lost packets command for session id %" PRIu64
                    ", channel key %" PRIu64,
This page took 0.02911 seconds and 4 git commands to generate.