From: Jérémie Galarneau Date: Tue, 30 Jul 2024 02:44:58 +0000 (+0000) Subject: consumerd: consumer.cpp: iterate on list using list_iteration_adapter X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=a1a1df6511d6e53b3a2ffe0cef982214ac86faf7;p=lttng-tools.git consumerd: consumer.cpp: iterate on list using list_iteration_adapter Change-Id: Iad57a1bfdce8b2b3f14702e93f2eceb4ff535f6a Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 6ed6e216b..f1f29d429 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -173,12 +173,12 @@ error: */ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) { - struct lttng_consumer_stream *stream, *stmp; - LTTNG_ASSERT(channel); /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { consumer_stream_destroy(stream, nullptr); } } @@ -4351,27 +4351,22 @@ error: static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel) { - int ret; - struct lttng_consumer_stream *stream; - const lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&channel->lock); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + const lttng::pthread::lock_guard channel_lock(channel->lock); + + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); - pthread_mutex_lock(&stream->lock); - ret = consumer_clear_stream(stream); + + const lttng::pthread::lock_guard stream_lock(stream->lock); + const auto ret = consumer_clear_stream(stream); if (ret) { - goto error_unlock; + return ret; } - pthread_mutex_unlock(&stream->lock); } - pthread_mutex_unlock(&channel->lock); - return 0; -error_unlock: - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&channel->lock); - return ret; + return 0; } /* @@ -5051,71 +5046,60 @@ end: enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel) { - struct lttng_consumer_stream *stream; enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS; if (channel->metadata_stream) { ERR("Open channel packets command attempted on a metadata channel"); - ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS; - goto end; + return LTTCOMM_CONSUMERD_INVALID_PARAMETERS; } - { - const lttng::urcu::read_lock_guard read_lock; - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { - enum consumer_stream_open_packet_status status; - - pthread_mutex_lock(&stream->lock); - if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; - } + const lttng::urcu::read_lock_guard read_lock; + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { + enum consumer_stream_open_packet_status status; - status = consumer_stream_open_packet(stream); - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* - * Only unexpected internal errors can lead to this - * failing. Report an unknown error. - */ - ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 - ", channel id = %" PRIu64 ", channel name = %s" - ", session id = %" PRIu64, - stream->key, - channel->key, - channel->name, - channel->session_id); - ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; - goto error_unlock; - default: - abort(); - } + const lttng::pthread::lock_guard stream_lock(stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + continue; + } - next: - pthread_mutex_unlock(&stream->lock); + status = consumer_stream_open_packet(stream); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 ", channel name = %s" + ", session id = %" PRIu64, + stream->key, + channel->key, + channel->name, + channel->session_id); + return LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + default: + abort(); } } -end_rcu_unlock: -end: - return ret; -error_unlock: - pthread_mutex_unlock(&stream->lock); - goto end_rcu_unlock; + return ret; } void lttng_consumer_sigbus_handle(void *addr)