From: Jérémie Galarneau Date: Tue, 30 Jul 2024 02:59:29 +0000 (+0000) Subject: consumerd: kernel-consumer.cpp: iterate on list using list_iteration_adapter X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=1e407985205df2fb91e369a0585f9715a5b1b125;p=lttng-tools.git consumerd: kernel-consumer.cpp: iterate on list using list_iteration_adapter Change-Id: I4464898e56cc88baa9a9b2b4c56461d95f342ac0 Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp index fe360ea69..f02380fbc 100644 --- a/src/common/kernel-consumer/kernel-consumer.cpp +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -21,7 +21,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -145,12 +147,11 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann uint64_t nb_packets_per_stream) { int ret; - struct lttng_consumer_stream *stream; DBG("Kernel consumer snapshot channel %" PRIu64, key); /* Prevent channel modifications while we perform the snapshot.*/ - pthread_mutex_lock(&channel->lock); + const lttng::pthread::lock_guard channe_lock(channel->lock); const lttng::urcu::read_lock_guard read_lock; @@ -162,7 +163,9 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann goto end; } - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { unsigned long consumed_pos, produced_pos; health_code_update(); @@ -170,7 +173,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann /* * Lock stream because we are about to change its state. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); LTTNG_ASSERT(channel->trace_chunk); if (!lttng_trace_chunk_get(channel->trace_chunk)) { @@ -180,7 +183,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann */ ERR("Failed to acquire reference to channel's trace chunk"); ret = -1; - goto end_unlock; + goto end; } LTTNG_ASSERT(!stream->trace_chunk); stream->trace_chunk = channel->trace_chunk; @@ -191,16 +194,21 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann */ stream->net_seq_idx = relayd_id; channel->relayd_id = relayd_id; + + /* Close stream output when were are done. */ + const auto close_stream_output = lttng::make_scope_exit( + [stream]() noexcept { consumer_stream_close_output(stream); }); + if (relayd_id != (uint64_t) -1ULL) { ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); - goto error_close_stream_output; + goto end; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto error_close_stream_output; + goto end; } DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key); } @@ -217,27 +225,27 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto error_close_stream_output; + goto end; } - goto end_unlock; + goto end; } ret = lttng_kconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking kernel snapshot"); - goto error_close_stream_output; + goto end; } ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced kernel snapshot position"); - goto error_close_stream_output; + goto end; } ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd kernel snapshot position"); - goto error_close_stream_output; + goto end; } consumed_pos = consumer_get_consume_start_pos( @@ -256,7 +264,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann if (ret < 0) { if (ret != -EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); - goto error_close_stream_output; + goto end; } DBG("Kernel consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; @@ -264,21 +272,29 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann continue; } + /* Put the subbuffer once we are done. */ + const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept { + const auto put_ret = kernctl_put_subbuf(stream->wait_fd); + if (put_ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + } + }); + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); - goto error_put_subbuf; + goto end; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); - goto error_put_subbuf; + goto end; } ret = get_current_subbuf_addr(stream, &subbuf_addr); if (ret) { - goto error_put_subbuf; + goto end; } subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len); @@ -303,33 +319,15 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann } } - ret = kernctl_put_subbuf(stream->wait_fd); - if (ret < 0) { - ERR("Snapshot kernctl_put_subbuf"); - goto error_close_stream_output; - } consumed_pos += stream->max_sb_size; } - - consumer_stream_close_output(stream); - pthread_mutex_unlock(&stream->lock); } /* All good! */ ret = 0; goto end; -error_put_subbuf: - ret = kernctl_put_subbuf(stream->wait_fd); - if (ret < 0) { - ERR("Snapshot kernctl_put_subbuf error path"); - } -error_close_stream_output: - consumer_stream_close_output(stream); -end_unlock: - pthread_mutex_unlock(&stream->lock); end: - pthread_mutex_unlock(&channel->lock); return ret; }