#include <common/consumer/consumer.hpp>
#include <common/index/index.hpp>
#include <common/optional.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
{
int ret = 0;
struct lttng_consumer_channel *channel;
- struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
- struct lttng_ht_iter iter;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
DBG("UST consumer flush channel key %" PRIu64, chan_key);
goto error;
}
- ht = the_consumer_data.stream_per_chan_id_ht;
-
/* For each stream of the channel id, flush it. */
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
pthread_mutex_lock(&stream->lock);
*/
static int clear_quiescent_channel(uint64_t chan_key)
{
- int ret = 0;
- struct lttng_consumer_channel *channel;
- struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
- struct lttng_ht_iter iter;
-
DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
const lttng::urcu::read_lock_guard read_lock;
- channel = consumer_find_channel(chan_key);
+ auto channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
- ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
- goto error;
+ return LTTNG_ERR_UST_CHAN_NOT_FOUND;
}
- ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
/* For each stream of the channel id, clear quiescent state. */
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
stream->quiescent = false;
- pthread_mutex_unlock(&stream->lock);
}
-error:
- return ret;
+
+ return 0;
}
/*
{
int ret = 0;
uint64_t discarded_events;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- uint64_t id = msg.u.discarded_events.session_id;
- const uint64_t key = msg.u.discarded_events.channel_key;
+ const auto id = msg.u.discarded_events.session_id;
+ const auto key = msg.u.discarded_events.channel_key;
DBG("UST consumer discarded events command for session id %" PRIu64, id);
- pthread_mutex_lock(&the_consumer_data.lock);
-
- ht = the_consumer_data.stream_list_ht;
-
- /*
- * We only need a reference to the channel, but they are not
- * directly indexed, so we just use the first matching stream
- * to extract the information we need, we default to 0 if not
- * found (no events are dropped if the channel is not yet in
- * use).
- */
- discarded_events = 0;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
{
- if (stream->chan->key == key) {
- discarded_events = stream->chan->discarded_events;
- break;
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+ const auto ht = the_consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no events are dropped if the channel is not yet in
+ * use).
+ */
+ discarded_events = 0;
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht,
+ &id,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct)) {
+ if (stream->chan->key == key) {
+ discarded_events = stream->chan->discarded_events;
+ break;
+ }
}
}
- pthread_mutex_unlock(&the_consumer_data.lock);
DBG("UST consumer discarded events command for session id %" PRIu64
", channel key %" PRIu64,
{
int ret;
uint64_t lost_packets;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- uint64_t id = msg.u.lost_packets.session_id;
- const uint64_t key = msg.u.lost_packets.channel_key;
+ const auto id = msg.u.lost_packets.session_id;
+ const auto key = msg.u.lost_packets.channel_key;
DBG("UST consumer lost packets command for session id %" PRIu64, id);
- pthread_mutex_lock(&the_consumer_data.lock);
-
- ht = the_consumer_data.stream_list_ht;
-
- /*
- * We only need a reference to the channel, but they are not
- * directly indexed, so we just use the first matching stream
- * to extract the information we need, we default to 0 if not
- * found (no packets lost if the channel is not yet in use).
- */
- lost_packets = 0;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
{
- if (stream->chan->key == key) {
- lost_packets = stream->chan->lost_packets;
- break;
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
+ const auto ht = the_consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no packets lost if the channel is not yet in use).
+ */
+ lost_packets = 0;
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht,
+ &id,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct)) {
+ if (stream->chan->key == key) {
+ lost_packets = stream->chan->lost_packets;
+ break;
+ }
}
}
- pthread_mutex_unlock(&the_consumer_data.lock);
DBG("UST consumer lost packets command for session id %" PRIu64
", channel key %" PRIu64,