From: Jérémie Galarneau Date: Tue, 30 Jul 2024 01:26:10 +0000 (+0000) Subject: consumerd: consumer.cpp: iterate on lfht using lfht_filtered_iteration_adapter X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=3d46ea1aaacdbd331adacaf3acbc0d5e2d08de64;p=lttng-tools.git consumerd: consumer.cpp: iterate on lfht using lfht_filtered_iteration_adapter Change-Id: I6f7e66d8446e1ebc05679a61fd190053f252902f Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index c991d622b..6ed6e216b 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -2779,21 +2780,16 @@ error_testpoint: */ static void consumer_close_channel_streams(struct lttng_consumer_channel *channel) { - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - - ht = the_consumer_data.stream_per_chan_id_ht; - - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + const auto ht = the_consumer_data.stream_per_chan_id_ht; + + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { /* * Protect against teardown with mutex. */ @@ -3702,16 +3698,13 @@ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) int consumer_data_pending(uint64_t id) { int ret; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; + const auto ht = the_consumer_data.stream_list_ht; struct consumer_relayd_sock_pair *relayd = nullptr; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); - const lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&the_consumer_data.lock); + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -3726,18 +3719,12 @@ int consumer_data_pending(uint64_t id) abort(); } - /* Ease our life a bit */ - ht = the_consumer_data.stream_list_ht; - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) - { - pthread_mutex_lock(&stream->lock); + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) { + const lttng::pthread::lock_guard stream_lock(stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3750,35 +3737,30 @@ int consumer_data_pending(uint64_t id) /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) { - pthread_mutex_unlock(&stream->lock); goto data_pending; } } - - pthread_mutex_unlock(&stream->lock); } relayd = find_relayd_by_session_id(id); if (relayd) { unsigned int is_data_inflight = 0; + const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex); + /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id); if (ret < 0) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); /* Communication error thus the relayd so no data pending. */ goto data_not_pending; } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>( + *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3789,13 +3771,11 @@ int consumer_data_pending(uint64_t id) } if (ret == 1) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; } else if (ret < 0) { ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_not_pending; } } @@ -3803,7 +3783,6 @@ int consumer_data_pending(uint64_t id) /* Send end command for data pending. */ ret = relayd_end_data_pending( &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); @@ -3823,12 +3802,10 @@ int consumer_data_pending(uint64_t id) data_not_pending: /* Data is available to be read by a viewer. */ - pthread_mutex_unlock(&the_consumer_data.lock); return 0; data_pending: /* Data is still being extracted from buffers. */ - pthread_mutex_unlock(&the_consumer_data.lock); return 1; } @@ -3933,9 +3910,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, uint64_t relayd_id) { int ret; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; struct lttng_dynamic_array stream_rotation_positions; uint64_t next_chunk_id, stream_count = 0; enum lttng_trace_chunk_status chunk_status; @@ -3944,7 +3919,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, bool rotating_to_new_chunk = true; /* Array of `struct lttng_consumer_stream *` */ struct lttng_dynamic_pointer_array streams_packet_to_open; - size_t stream_idx; ASSERT_RCU_READ_LOCKED(); @@ -3955,24 +3929,23 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, nullptr); lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - const lttng::urcu::read_lock_guard read_lock; + const lttng::pthread::lock_guard channel_lock(channel->lock); - pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; - goto end_unlock_channel; + goto end; } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { unsigned long produced_pos = 0, consumed_pos = 0; health_code_update(); @@ -3980,7 +3953,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, /* * Lock stream because we are about to change its state. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (stream->trace_chunk == stream->chan->trace_chunk) { rotating_to_new_chunk = false; @@ -4034,7 +4007,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ret = sample_stream_positions( stream, &produced_pos, &consumed_pos); if (ret) { - goto end_unlock_stream; + goto end; } /* @@ -4083,26 +4056,26 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel rotation", stream->key); - goto end_unlock_stream; + goto end; } } ret = lttng_consumer_take_snapshot(stream); if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) { ERR("Failed to sample snapshot position during channel rotation"); - goto end_unlock_stream; + goto end; } if (!ret) { ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Failed to sample produced position during channel rotation"); - goto end_unlock_stream; + goto end; } ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Failed to sample consumed position during channel rotation"); - goto end_unlock_stream; + goto end; } } /* @@ -4142,7 +4115,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable", stream->key); ret = -1; - goto end_unlock_stream; + goto end; } stream->rotate_position = stream->last_sequence_number + 1 + ((produced_pos - consumed_pos) / stream->max_sb_size); @@ -4165,7 +4138,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, &position); if (ret) { ERR("Failed to allocate stream rotation position"); - goto end_unlock_stream; + goto end; } stream_count++; } @@ -4227,20 +4200,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, if (ret) { PERROR("Failed to add a stream pointer to array of streams in which to open a packet"); ret = -1; - goto end_unlock_stream; + goto end; } } - - pthread_mutex_unlock(&stream->lock); } - stream = nullptr; if (!is_local_trace) { relayd = consumer_find_relayd(relayd_id); if (!relayd) { ERR("Failed to find relayd %" PRIu64, relayd_id); ret = -1; - goto end_unlock_channel; + goto end; } pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -4254,21 +4224,19 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64, relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - goto end_unlock_channel; + goto end; } } - for (stream_idx = 0; + for (std::size_t stream_idx = 0; stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open); stream_idx++) { enum consumer_stream_open_packet_status status; - - stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer( + auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer( &streams_packet_to_open, stream_idx); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); status = consumer_stream_open_packet(stream); - pthread_mutex_unlock(&stream->lock); switch (status) { case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: DBG("Opened a packet after a rotation: stream id = %" PRIu64 @@ -4292,20 +4260,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: /* Logged by callee. */ ret = -1; - goto end_unlock_channel; + goto end; default: abort(); } } - pthread_mutex_unlock(&channel->lock); ret = 0; - goto end; - -end_unlock_stream: - pthread_mutex_unlock(&stream->lock); -end_unlock_channel: - pthread_mutex_unlock(&channel->lock); end: lttng_dynamic_array_reset(&stream_rotation_positions); lttng_dynamic_pointer_array_reset(&streams_packet_to_open); @@ -4599,39 +4560,31 @@ error: int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key) { int ret; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; ASSERT_RCU_READ_LOCKED(); - const lttng::urcu::read_lock_guard read_lock; - DBG("Consumer rotate ready streams in channel %" PRIu64, key); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); - pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard channel_lock(stream->chan->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (!stream->rotate_ready) { - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); continue; } - DBG("Consumer rotate ready stream %" PRIu64, stream->key); + DBG("Consumer rotate ready stream %" PRIu64, stream->key); ret = lttng_consumer_rotate_stream(stream); - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); if (ret) { goto end; } @@ -4680,8 +4633,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, char creation_timestamp_buffer[ISO8601_STR_LEN]; const char *relayd_id_str = "(none)"; const char *creation_timestamp_str; - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; if (relayd_id) { /* Only used for logging purposes. */ @@ -4768,50 +4719,45 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, goto error; } - { - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate( - the_consumer_data.channels_by_session_id_ht->ht, - the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, - lttng_ht_seed), - the_consumer_data.channels_by_session_id_ht->match_fct, - &session_id, - &iter.iter, - channel, - channels_by_session_id_ht_node.node) - { - ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); - if (ret) { - /* - * Roll-back the creation of this chunk. - * - * This is important since the session daemon will - * assume that the creation of this chunk failed and - * will never ask for it to be closed, resulting - * in a leak and an inconsistent state for some - * channels. - */ - enum lttcomm_return_code close_ret; - char path[LTTNG_PATH_MAX]; - - DBG("Failed to set new trace chunk on existing channels, rolling back"); - close_ret = - lttng_consumer_close_trace_chunk(relayd_id, - session_id, - chunk_id, - chunk_creation_timestamp, - nullptr, - path); - if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { - ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - session_id, - chunk_id); - } + for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_channel, + decltype(lttng_consumer_channel::channels_by_session_id_ht_node), + <tng_consumer_channel::channels_by_session_id_ht_node, + std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht, + &session_id, + the_consumer_data.channels_by_session_id_ht->hash_fct( + &session_id, lttng_ht_seed), + the_consumer_data.channels_by_session_id_ht->match_fct)) { + ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. + * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; - ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - break; + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + nullptr, + path); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, + chunk_id); } + + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; } } @@ -5049,41 +4995,32 @@ end: static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel) { - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; int ret; - - ht = the_consumer_data.stream_per_chan_id_ht; - - const lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + const auto ht = the_consumer_data.stream_per_chan_id_ht; + + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { /* * Protect against teardown with mutex. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; + continue; } + ret = consumer_clear_stream(stream); if (ret) { - goto error_unlock; + return ret; } - next: - pthread_mutex_unlock(&stream->lock); } - return LTTCOMM_CONSUMERD_SUCCESS; -error_unlock: - pthread_mutex_unlock(&stream->lock); - return ret; + return LTTCOMM_CONSUMERD_SUCCESS; } int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)