consumerd: ust-consumer.cpp: iterate on list using list_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 03:09:39 +0000 (03:09 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 31 Jul 2024 03:36:52 +0000 (23:36 -0400)
Change-Id: I90d06ef8bbe674bc91fc19f489758a8c7c07cad1
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/ust-consumer/ust-consumer.cpp

index 82df2c94548b8b178826e672874a8060a45cabe1..de9838aab14284626a8a281a02afc59f60b95246 100644 (file)
@@ -20,6 +20,7 @@
 #include <common/optional.hpp>
 #include <common/pthread-lock.hpp>
 #include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
 #include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/shm.hpp>
 #include <common/urcu.hpp>
@@ -475,7 +476,6 @@ static int send_channel_to_sessiond_and_relayd(int sock,
                                               int *relayd_error)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-       struct lttng_consumer_stream *stream;
        uint64_t net_seq_idx = -1ULL;
 
        LTTNG_ASSERT(channel);
@@ -485,7 +485,10 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
-               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+               for (auto stream :
+                    lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                        &lttng_consumer_stream::send_node>(
+                            channel->streams.head)) {
                        health_code_update();
 
                        /* Try to send the stream to the relayd if one is available. */
@@ -531,7 +534,9 @@ static int send_channel_to_sessiond_and_relayd(int sock,
        }
 
        /* The channel was sent successfully to the sessiond at this point. */
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Send stream to session daemon. */
@@ -629,13 +634,14 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
                                  struct lttng_consumer_local_data *ctx)
 {
        int ret = 0;
-       struct lttng_consumer_stream *stream, *stmp;
 
        LTTNG_ASSERT(channel);
        LTTNG_ASSERT(ctx);
 
        /* Send streams to the corresponding thread. */
-       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Sending the stream to the thread. */
@@ -1051,7 +1057,6 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        int ret;
        unsigned use_relayd = 0;
        unsigned long consumed_pos, produced_pos;
-       struct lttng_consumer_stream *stream;
 
        LTTNG_ASSERT(path);
        LTTNG_ASSERT(ctx);
@@ -1066,11 +1071,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
        LTTNG_ASSERT(!channel->monitor);
        DBG("UST consumer snapshot channel %" PRIu64, key);
 
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
 
                /* Lock stream because we are about to change its state. */
-               pthread_mutex_lock(&stream->lock);
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
                LTTNG_ASSERT(channel->trace_chunk);
                if (!lttng_trace_chunk_get(channel->trace_chunk)) {
                        /*
@@ -1078,24 +1085,28 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                         * holds a reference to the trace chunk.
                         */
                        ERR("Failed to acquire reference to channel's trace chunk");
-                       ret = -1;
-                       goto error_unlock;
+                       return -1;
                }
                LTTNG_ASSERT(!stream->trace_chunk);
                stream->trace_chunk = channel->trace_chunk;
 
                stream->net_seq_idx = relayd_id;
 
+               /* Close stream output when were are done. */
+               const auto close_stream_output = lttng::make_scope_exit(
+                       [stream]() noexcept { consumer_stream_close_output(stream); });
+
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
-                               goto error_close_stream;
+                               return ret;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream, false);
                        if (ret < 0) {
-                               goto error_close_stream;
+                               return ret;
                        }
+
                        DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
                }
 
@@ -1110,26 +1121,26 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                    ", channel name = '%s'",
                                    channel->key,
                                    channel->name);
-                               goto error_unlock;
+                               return ret;
                        }
                }
 
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
-                       goto error_close_stream;
+                       return ret;
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
-                       goto error_close_stream;
+                       return ret;
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
-                       goto error_close_stream;
+                       return ret;
                }
 
                /*
@@ -1155,29 +1166,37 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                        if (ret < 0) {
                                if (ret != -EAGAIN) {
                                        PERROR("lttng_ust_ctl_get_subbuf snapshot");
-                                       goto error_close_stream;
+                                       return ret;
                                }
+
                                DBG("UST consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
                                stream->chan->lost_packets++;
                                continue;
                        }
 
+                       /* Put the subbuffer once we are done. */
+                       const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+                               if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
+                                       ERR("Snapshot lttng_ust_ctl_put_subbuf");
+                               }
+                       });
+
                        ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        ret = get_current_subbuf_addr(stream, &subbuf_addr);
                        if (ret) {
-                               goto error_put_subbuf;
+                               return ret;
                        }
 
                        subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
@@ -1185,40 +1204,22 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                                stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
-                                       ret = -EPERM;
-                                       goto error_put_subbuf;
+                                       return -EPERM;
                                }
                        } else {
                                if (read_len != padded_len) {
-                                       ret = -EPERM;
-                                       goto error_put_subbuf;
+                                       return -EPERM;
                                }
                        }
 
-                       ret = lttng_ust_ctl_put_subbuf(stream->ustream);
-                       if (ret < 0) {
-                               ERR("Snapshot lttng_ust_ctl_put_subbuf");
-                               goto error_close_stream;
-                       }
                        consumed_pos += stream->max_sb_size;
                }
 
                /* Simply close the stream so we can use it on the next snapshot. */
                consumer_stream_close_output(stream);
-               pthread_mutex_unlock(&stream->lock);
        }
 
        return 0;
-
-error_put_subbuf:
-       if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
-               ERR("Snapshot lttng_ust_ctl_put_subbuf");
-       }
-error_close_stream:
-       consumer_stream_close_output(stream);
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       return ret;
 }
 
 static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)
This page took 0.029312 seconds and 4 git commands to generate.