From: Jérémie Galarneau Date: Tue, 30 Jul 2024 03:09:39 +0000 (+0000) Subject: consumerd: ust-consumer.cpp: iterate on list using list_iteration_adapter X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=7900c20a8a4035b73c42b669e869107426de5ed3;p=lttng-tools.git consumerd: ust-consumer.cpp: iterate on list using list_iteration_adapter Change-Id: I90d06ef8bbe674bc91fc19f489758a8c7c07cad1 Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 82df2c945..de9838aab 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -475,7 +476,6 @@ static int send_channel_to_sessiond_and_relayd(int sock, int *relayd_error) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct lttng_consumer_stream *stream; uint64_t net_seq_idx = -1ULL; LTTNG_ASSERT(channel); @@ -485,7 +485,10 @@ static int send_channel_to_sessiond_and_relayd(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : + lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Try to send the stream to the relayd if one is available. */ @@ -531,7 +534,9 @@ static int send_channel_to_sessiond_and_relayd(int sock, } /* The channel was sent successfully to the sessiond at this point. */ - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Send stream to session daemon. */ @@ -629,13 +634,14 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, struct lttng_consumer_local_data *ctx) { int ret = 0; - struct lttng_consumer_stream *stream, *stmp; LTTNG_ASSERT(channel); LTTNG_ASSERT(ctx); /* Send streams to the corresponding thread. */ - cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Sending the stream to the thread. */ @@ -1051,7 +1057,6 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, int ret; unsigned use_relayd = 0; unsigned long consumed_pos, produced_pos; - struct lttng_consumer_stream *stream; LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); @@ -1066,11 +1071,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(!channel->monitor); DBG("UST consumer snapshot channel %" PRIu64, key); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); /* Lock stream because we are about to change its state. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); LTTNG_ASSERT(channel->trace_chunk); if (!lttng_trace_chunk_get(channel->trace_chunk)) { /* @@ -1078,24 +1085,28 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, * holds a reference to the trace chunk. */ ERR("Failed to acquire reference to channel's trace chunk"); - ret = -1; - goto error_unlock; + return -1; } LTTNG_ASSERT(!stream->trace_chunk); stream->trace_chunk = channel->trace_chunk; stream->net_seq_idx = relayd_id; + /* Close stream output when were are done. */ + const auto close_stream_output = lttng::make_scope_exit( + [stream]() noexcept { consumer_stream_close_output(stream); }); + if (use_relayd) { ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { - goto error_close_stream; + return ret; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_close_stream; + return ret; } + DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key); } @@ -1110,26 +1121,26 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, ", channel name = '%s'", channel->key, channel->name); - goto error_unlock; + return ret; } } ret = lttng_ustconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking UST snapshot"); - goto error_close_stream; + return ret; } ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced UST snapshot position"); - goto error_close_stream; + return ret; } ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd UST snapshot position"); - goto error_close_stream; + return ret; } /* @@ -1155,29 +1166,37 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, if (ret < 0) { if (ret != -EAGAIN) { PERROR("lttng_ust_ctl_get_subbuf snapshot"); - goto error_close_stream; + return ret; } + DBG("UST consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; stream->chan->lost_packets++; continue; } + /* Put the subbuffer once we are done. */ + const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept { + if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) { + ERR("Snapshot lttng_ust_ctl_put_subbuf"); + } + }); + ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len); if (ret < 0) { ERR("Snapshot lttng_ust_ctl_get_subbuf_size"); - goto error_put_subbuf; + return ret; } ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len); if (ret < 0) { ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size"); - goto error_put_subbuf; + return ret; } ret = get_current_subbuf_addr(stream, &subbuf_addr); if (ret) { - goto error_put_subbuf; + return ret; } subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); @@ -1185,40 +1204,22 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { - ret = -EPERM; - goto error_put_subbuf; + return -EPERM; } } else { if (read_len != padded_len) { - ret = -EPERM; - goto error_put_subbuf; + return -EPERM; } } - ret = lttng_ust_ctl_put_subbuf(stream->ustream); - if (ret < 0) { - ERR("Snapshot lttng_ust_ctl_put_subbuf"); - goto error_close_stream; - } consumed_pos += stream->max_sb_size; } /* Simply close the stream so we can use it on the next snapshot. */ consumer_stream_close_output(stream); - pthread_mutex_unlock(&stream->lock); } return 0; - -error_put_subbuf: - if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) { - ERR("Snapshot lttng_ust_ctl_put_subbuf"); - } -error_close_stream: - consumer_stream_close_output(stream); -error_unlock: - pthread_mutex_unlock(&stream->lock); - return ret; } static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)