From b044005eef27d40e199455cbfed7223ddef2b33e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 30 Jul 2024 01:38:23 +0000 Subject: [PATCH] consumerd: ust-consumer.cpp: iterate on lfht using lfht_filtered_iteration_adapter MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: If4f54d8e79a8eb68e2214d3182150b974b806398 Signed-off-by: Jérémie Galarneau --- src/common/ust-consumer/ust-consumer.cpp | 163 ++++++++++------------- 1 file changed, 73 insertions(+), 90 deletions(-) diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 8d3bc882a..82df2c945 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -661,9 +662,7 @@ static int flush_channel(uint64_t chan_key) { int ret = 0; struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht *ht; - struct lttng_ht_iter iter; + const auto ht = the_consumer_data.stream_per_chan_id_ht; DBG("UST consumer flush channel key %" PRIu64, chan_key); @@ -675,17 +674,15 @@ static int flush_channel(uint64_t chan_key) goto error; } - ht = the_consumer_data.stream_per_chan_id_ht; - /* For each stream of the channel id, flush it. */ - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); pthread_mutex_lock(&stream->lock); @@ -733,41 +730,33 @@ error: */ static int clear_quiescent_channel(uint64_t chan_key) { - int ret = 0; - struct lttng_consumer_channel *channel; - struct lttng_consumer_stream *stream; - struct lttng_ht *ht; - struct lttng_ht_iter iter; - DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key); const lttng::urcu::read_lock_guard read_lock; - channel = consumer_find_channel(chan_key); + auto channel = consumer_find_channel(chan_key); if (!channel) { ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key); - ret = LTTNG_ERR_UST_CHAN_NOT_FOUND; - goto error; + return LTTNG_ERR_UST_CHAN_NOT_FOUND; } - ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; /* For each stream of the channel id, clear quiescent state. */ - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); stream->quiescent = false; - pthread_mutex_unlock(&stream->lock); } -error: - return ret; + + return 0; } /* @@ -1863,39 +1852,36 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret = 0; uint64_t discarded_events; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - uint64_t id = msg.u.discarded_events.session_id; - const uint64_t key = msg.u.discarded_events.channel_key; + const auto id = msg.u.discarded_events.session_id; + const auto key = msg.u.discarded_events.channel_key; DBG("UST consumer discarded events command for session id %" PRIu64, id); - pthread_mutex_lock(&the_consumer_data.lock); - - ht = the_consumer_data.stream_list_ht; - - /* - * We only need a reference to the channel, but they are not - * directly indexed, so we just use the first matching stream - * to extract the information we need, we default to 0 if not - * found (no events are dropped if the channel is not yet in - * use). - */ - discarded_events = 0; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) { - if (stream->chan->key == key) { - discarded_events = stream->chan->discarded_events; - break; + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); + const auto ht = the_consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no events are dropped if the channel is not yet in + * use). + */ + discarded_events = 0; + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, + &id, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct)) { + if (stream->chan->key == key) { + discarded_events = stream->chan->discarded_events; + break; + } } } - pthread_mutex_unlock(&the_consumer_data.lock); DBG("UST consumer discarded events command for session id %" PRIu64 ", channel key %" PRIu64, @@ -1917,38 +1903,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, { int ret; uint64_t lost_packets; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - uint64_t id = msg.u.lost_packets.session_id; - const uint64_t key = msg.u.lost_packets.channel_key; + const auto id = msg.u.lost_packets.session_id; + const auto key = msg.u.lost_packets.channel_key; DBG("UST consumer lost packets command for session id %" PRIu64, id); - pthread_mutex_lock(&the_consumer_data.lock); - - ht = the_consumer_data.stream_list_ht; - - /* - * We only need a reference to the channel, but they are not - * directly indexed, so we just use the first matching stream - * to extract the information we need, we default to 0 if not - * found (no packets lost if the channel is not yet in use). - */ - lost_packets = 0; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) { - if (stream->chan->key == key) { - lost_packets = stream->chan->lost_packets; - break; + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); + const auto ht = the_consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no packets lost if the channel is not yet in use). + */ + lost_packets = 0; + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, + &id, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct)) { + if (stream->chan->key == key) { + lost_packets = stream->chan->lost_packets; + break; + } } } - pthread_mutex_unlock(&the_consumer_data.lock); DBG("UST consumer lost packets command for session id %" PRIu64 ", channel key %" PRIu64, -- 2.39.5