From: Jérémie Galarneau Date: Fri, 26 Jul 2024 21:54:59 +0000 (+0000) Subject: consumerd: consumer.cpp: iterate on lfht using lfht_iteration_adapter X-Git-Url: http://git.lttng.org./?a=commitdiff_plain;h=c3ade1337c8122ce18a1ffaa181843220f1cbe29;p=lttng-tools.git consumerd: consumer.cpp: iterate on lfht using lfht_iteration_adapter Change-Id: Icecd70d1023847b1489b298e4c04625bac4ebcc9 Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index e242f64b5..c991d622b 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -421,16 +421,12 @@ end: */ static void cleanup_relayd_ht() { - struct lttng_ht_iter iter; - struct consumer_relayd_sock_pair *relayd; - - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry ( - the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - consumer_destroy_relayd(relayd); - } + for (auto *relayd : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.relayd_ht->ht)) { + consumer_destroy_relayd(relayd); } lttng_ht_destroy(the_consumer_data.relayd_ht); @@ -446,15 +442,13 @@ static void cleanup_relayd_ht() static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, enum consumer_endpoint_status status) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - const lttng::urcu::read_lock_guard read_lock; - /* Let's begin with metadata */ - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*metadata_ht->ht)) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); stream->chan->metadata_pushed_wait_queue.wake_all(); @@ -464,7 +458,10 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, } /* Follow up by the data streams */ - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*data_ht->ht)) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to data stream %d", stream->wait_fd); @@ -1151,8 +1148,6 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, int *nb_inactive_fd) { int i = 0; - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; LTTNG_ASSERT(ctx); LTTNG_ASSERT(ht); @@ -1162,32 +1157,32 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, DBG("Updating poll fd array"); *nb_inactive_fd = 0; - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Only active streams with an active end point can be added to the - * poll set and local stream storage of the thread. - * - * There is a potential race here for endpoint_status to be updated - * just after the check. However, this is OK since the stream(s) will - * be deleted once the thread is notified that the end point state has - * changed where this function will be called back again. - * - * We track the number of inactive FDs because they still need to be - * closed by the polling thread after a wakeup on the data_pipe or - * metadata_pipe. - */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { - (*nb_inactive_fd)++; - continue; - } - - (*pollfd)[i].fd = stream->wait_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = stream; - i++; + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Only active streams with an active end point can be added to the + * poll set and local stream storage of the thread. + * + * There is a potential race here for endpoint_status to be updated + * just after the check. However, this is OK since the stream(s) will + * be deleted once the thread is notified that the end point state has + * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. + */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; + continue; } + + (*pollfd)[i].fd = stream->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = stream; + i++; } /* @@ -1271,17 +1266,14 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, */ void lttng_consumer_cleanup() { - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; unsigned int trace_chunks_left; - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry ( - the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - consumer_del_channel(channel); - } + for (auto *channel : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.channel_ht->ht)) { + consumer_del_channel(channel); } lttng_ht_destroy(the_consumer_data.channel_ht); @@ -1453,22 +1445,19 @@ error: */ static void destroy_data_stream_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - if (ht == nullptr) { return; } - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); } lttng_ht_destroy(ht); @@ -1480,22 +1469,19 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) */ static void destroy_metadata_stream_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - if (ht == nullptr) { return; } - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); } lttng_ht_destroy(ht); @@ -2223,22 +2209,18 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) */ static void validate_endpoint_status_data_stream() { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer delete flagged data stream"); - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* Delete it right now */ - consumer_del_stream(stream, data_ht); + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*data_ht->ht)) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; } + /* Delete it right now */ + consumer_del_stream(stream, data_ht); } } @@ -2247,29 +2229,26 @@ static void validate_endpoint_status_data_stream() */ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer delete flagged metadata stream"); LTTNG_ASSERT(pollset); - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* - * Remove from pollset so the metadata thread can continue without - * blocking on a deleted stream. - */ - lttng_poll_del(pollset, stream->wait_fd); - - /* Delete it right now */ - consumer_del_metadata_stream(stream, metadata_ht); + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*metadata_ht->ht)) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; } + /* + * Remove from pollset so the metadata thread can continue without + * blocking on a deleted stream. + */ + lttng_poll_del(pollset, stream->wait_fd); + + /* Delete it right now */ + consumer_del_metadata_stream(stream, metadata_ht); } } @@ -2850,21 +2829,16 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe static void destroy_channel_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; - int ret; - if (ht == nullptr) { return; } - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { - ret = lttng_ht_del(ht, &iter); - LTTNG_ASSERT(ret != 0); - } + for (auto *channel : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + const auto ret = cds_lfht_del(ht->ht, &channel->node.node); + LTTNG_ASSERT(ret != 0); } lttng_ht_destroy(ht); @@ -3070,7 +3044,8 @@ void *consumer_thread_channel_poll(void *data) node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(node); - chan = lttng::utils::container_of(node, <tng_consumer_channel::wait_fd_node); + chan = lttng::utils::container_of(node, + <tng_consumer_channel::wait_fd_node); /* Check for error event */ if (revents & (LPOLLERR | LPOLLHUP)) { @@ -3699,27 +3674,23 @@ error_nosignal: */ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) { - struct lttng_ht_iter iter; - struct consumer_relayd_sock_pair *relayd = nullptr; - - ASSERT_RCU_READ_LOCKED(); - /* Iterate over all relayd since they are indexed by net_seq_idx. */ - cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { + for (auto *relayd : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.relayd_ht->ht)) { /* * Check by sessiond id which is unique here where the relayd session * id might not be when having multiple relayd. */ if (relayd->sessiond_session_id == id) { /* Found the relayd. There can be only one per id. */ - goto found; + return relayd; } } return nullptr; - -found: - return relayd; } /* @@ -4898,8 +4869,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; const char *close_command_name = "none"; - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; enum lttng_trace_chunk_status chunk_status; if (relayd_id) { @@ -4953,32 +4922,33 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * it; it is only kept around to compare it (by address) to the * current chunk found in the session's channels. */ - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry ( - the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - int ret; + for (auto *channel : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.channel_ht->ht)) { + int ret; + /* + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. + */ + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); + if (ret) { /* - * Only change the channel's chunk to NULL if it still - * references the chunk being closed. The channel may - * reference a newer channel in the case of a session - * rotation. When a session rotation occurs, the "next" - * chunk is created before the "current" chunk is closed. + * Attempt to close the chunk on as many channels as + * possible. */ - if (channel->trace_chunk != chunk) { - continue; - } - ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); - if (ret) { - /* - * Attempt to close the chunk on as many channels as - * possible. - */ - ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; - } + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; } } + if (relayd_id) { int ret; struct consumer_relayd_sock_pair *relayd;