consumerd: kernel-consumer.cpp: iterate on list using list_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 02:59:29 +0000 (02:59 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 31 Jul 2024 03:36:52 +0000 (23:36 -0400)
Change-Id: I4464898e56cc88baa9a9b2b4c56461d95f342ac0
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/kernel-consumer/kernel-consumer.cpp

index fe360ea694e1d026fba26ee02052d7a46157d14d..f02380fbcddc39368a88373adba462506907753a 100644 (file)
@@ -21,7 +21,9 @@
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/optional.hpp>
 #include <common/pipe.hpp>
+#include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
 #include <common/sessiond-comm/relayd.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/urcu.hpp>
@@ -145,12 +147,11 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                            uint64_t nb_packets_per_stream)
 {
        int ret;
-       struct lttng_consumer_stream *stream;
 
        DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        /* Prevent channel modifications while we perform the snapshot.*/
-       pthread_mutex_lock(&channel->lock);
+       const lttng::pthread::lock_guard channe_lock(channel->lock);
 
        const lttng::urcu::read_lock_guard read_lock;
 
@@ -162,7 +163,9 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                goto end;
        }
 
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                unsigned long consumed_pos, produced_pos;
 
                health_code_update();
@@ -170,7 +173,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                /*
                 * Lock stream because we are about to change its state.
                 */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
 
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
@@ -180,7 +183,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
                        ret = -1;
-                       goto end_unlock;
+                       goto end;
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
@@ -191,16 +194,21 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                 */
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
+
+               /* Close stream output when were are done. */
+               const auto close_stream_output = lttng::make_scope_exit(
+                       [stream]() noexcept { consumer_stream_close_output(stream); });
+
                if (relayd_id != (uint64_t) -1ULL) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
-                               goto error_close_stream_output;
+                               goto end;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
-                               goto error_close_stream_output;
+                               goto end;
                        }
                        DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
@@ -217,27 +225,27 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                        ret = kernctl_buffer_flush(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Failed to flush kernel stream");
-                               goto error_close_stream_output;
+                               goto end;
                        }
-                       goto end_unlock;
+                       goto end;
                }
 
                ret = lttng_kconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd kernel snapshot position");
-                       goto error_close_stream_output;
+                       goto end;
                }
 
                consumed_pos = consumer_get_consume_start_pos(
@@ -256,7 +264,7 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
-                                       goto error_close_stream_output;
+                                       goto end;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
@@ -264,21 +272,29 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                continue;
                        }
 
+                       /* Put the subbuffer once we are done. */
+                       const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+                               const auto put_ret = kernctl_put_subbuf(stream->wait_fd);
+                               if (put_ret < 0) {
+                                       ERR("Snapshot kernctl_put_subbuf");
+                               }
+                       });
+
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
-                               goto error_put_subbuf;
+                               goto end;
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
@@ -303,33 +319,15 @@ static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *chann
                                }
                        }
 
-                       ret = kernctl_put_subbuf(stream->wait_fd);
-                       if (ret < 0) {
-                               ERR("Snapshot kernctl_put_subbuf");
-                               goto error_close_stream_output;
-                       }
                        consumed_pos += stream->max_sb_size;
                }
-
-               consumer_stream_close_output(stream);
-               pthread_mutex_unlock(&stream->lock);
        }
 
        /* All good! */
        ret = 0;
        goto end;
 
-error_put_subbuf:
-       ret = kernctl_put_subbuf(stream->wait_fd);
-       if (ret < 0) {
-               ERR("Snapshot kernctl_put_subbuf error path");
-       }
-error_close_stream_output:
-       consumer_stream_close_output(stream);
-end_unlock:
-       pthread_mutex_unlock(&stream->lock);
 end:
-       pthread_mutex_unlock(&channel->lock);
        return ret;
 }
 
This page took 0.029397 seconds and 4 git commands to generate.