*/
static void cleanup_relayd_ht()
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd;
-
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
- consumer_destroy_relayd(relayd);
- }
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
+ consumer_destroy_relayd(relayd);
}
lttng_ht_destroy(the_consumer_data.relayd_ht);
static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- const lttng::urcu::read_lock_guard read_lock;
-
/* Let's begin with metadata */
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
stream->chan->metadata_pushed_wait_queue.wake_all();
}
/* Follow up by the data streams */
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
DBG("Delete flag set to data stream %d", stream->wait_fd);
int *nb_inactive_fd)
{
int i = 0;
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(ht);
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Only active streams with an active end point can be added to the
- * poll set and local stream storage of the thread.
- *
- * There is a potential race here for endpoint_status to be updated
- * just after the check. However, this is OK since the stream(s) will
- * be deleted once the thread is notified that the end point state has
- * changed where this function will be called back again.
- *
- * We track the number of inactive FDs because they still need to be
- * closed by the polling thread after a wakeup on the data_pipe or
- * metadata_pipe.
- */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
- (*nb_inactive_fd)++;
- continue;
- }
-
- (*pollfd)[i].fd = stream->wait_fd;
- (*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = stream;
- i++;
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
+ */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
+ continue;
}
+
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
}
/*
*/
void lttng_consumer_cleanup()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- consumer_del_channel(channel);
- }
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
+ consumer_del_channel(channel);
}
lttng_ht_destroy(the_consumer_data.channel_ht);
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
}
lttng_ht_destroy(ht);
*/
static void destroy_metadata_stream_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
if (ht == nullptr) {
return;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*ht->ht)) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
}
lttng_ht_destroy(ht);
*/
static void validate_endpoint_status_data_stream()
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged data stream");
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /* Delete it right now */
- consumer_del_stream(stream, data_ht);
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*data_ht->ht)) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
}
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
}
}
*/
static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
-
DBG("Consumer delete flagged metadata stream");
LTTNG_ASSERT(pollset);
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /*
- * Remove from pollset so the metadata thread can continue without
- * blocking on a deleted stream.
- */
- lttng_poll_del(pollset, stream->wait_fd);
-
- /* Delete it right now */
- consumer_del_metadata_stream(stream, metadata_ht);
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node),
+ <tng_consumer_stream::node>(*metadata_ht->ht)) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
}
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
+
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
}
}
static void destroy_channel_ht(struct lttng_ht *ht)
{
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
- int ret;
-
if (ht == nullptr) {
return;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
- LTTNG_ASSERT(ret != 0);
- }
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::wait_fd_node),
+ <tng_consumer_channel::wait_fd_node>(*ht->ht)) {
+ const auto ret = cds_lfht_del(ht->ht, &channel->node.node);
+ LTTNG_ASSERT(ret != 0);
}
lttng_ht_destroy(ht);
node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
- chan = lttng::utils::container_of(node, <tng_consumer_channel::wait_fd_node);
+ chan = lttng::utils::container_of(node,
+ <tng_consumer_channel::wait_fd_node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
*/
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
- struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd = nullptr;
-
- ASSERT_RCU_READ_LOCKED();
-
/* Iterate over all relayd since they are indexed by net_seq_idx. */
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ for (auto *relayd :
+ lttng::urcu::lfht_iteration_adapter<consumer_relayd_sock_pair,
+ decltype(consumer_relayd_sock_pair::node),
+ &consumer_relayd_sock_pair::node>(
+ *the_consumer_data.relayd_ht->ht)) {
/*
* Check by sessiond id which is unique here where the relayd session
* id might not be when having multiple relayd.
*/
if (relayd->sessiond_session_id == id) {
/* Found the relayd. There can be only one per id. */
- goto found;
+ return relayd;
}
}
return nullptr;
-
-found:
- return relayd;
}
/*
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
const char *close_command_name = "none";
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
enum lttng_trace_chunk_status chunk_status;
if (relayd_id) {
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (
- the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- int ret;
+ for (auto *channel :
+ lttng::urcu::lfht_iteration_adapter<lttng_consumer_channel,
+ decltype(lttng_consumer_channel::node),
+ <tng_consumer_channel::node>(
+ *the_consumer_data.channel_ht->ht)) {
+ int ret;
+ /*
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
+ */
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+ if (ret) {
/*
- * Only change the channel's chunk to NULL if it still
- * references the chunk being closed. The channel may
- * reference a newer channel in the case of a session
- * rotation. When a session rotation occurs, the "next"
- * chunk is created before the "current" chunk is closed.
+ * Attempt to close the chunk on as many channels as
+ * possible.
*/
- if (channel->trace_chunk != chunk) {
- continue;
- }
- ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
- if (ret) {
- /*
- * Attempt to close the chunk on as many channels as
- * possible.
- */
- ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
- }
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
}
}
+
if (relayd_id) {
int ret;
struct consumer_relayd_sock_pair *relayd;