consumerd: consumer.cpp: iterate on list using list_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 02:44:58 +0000 (02:44 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 31 Jul 2024 03:36:52 +0000 (23:36 -0400)
Change-Id: Iad57a1bfdce8b2b3f14702e93f2eceb4ff535f6a
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer.cpp

index 6ed6e216b9ea060227e93eea76bb04382b2a2485..f1f29d4292c4411c28dc2b756c7e6a98bbb3b42c 100644 (file)
@@ -173,12 +173,12 @@ error:
  */
 static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream, *stmp;
-
        LTTNG_ASSERT(channel);
 
        /* Delete streams that might have been left in the stream list. */
-       cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                consumer_stream_destroy(stream, nullptr);
        }
 }
@@ -4351,27 +4351,22 @@ error:
 
 static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
 {
-       int ret;
-       struct lttng_consumer_stream *stream;
-
        const lttng::urcu::read_lock_guard read_lock;
-       pthread_mutex_lock(&channel->lock);
-       cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+       const lttng::pthread::lock_guard channel_lock(channel->lock);
+
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
                health_code_update();
-               pthread_mutex_lock(&stream->lock);
-               ret = consumer_clear_stream(stream);
+
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
+               const auto ret = consumer_clear_stream(stream);
                if (ret) {
-                       goto error_unlock;
+                       return ret;
                }
-               pthread_mutex_unlock(&stream->lock);
        }
-       pthread_mutex_unlock(&channel->lock);
-       return 0;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&channel->lock);
-       return ret;
+       return 0;
 }
 
 /*
@@ -5051,71 +5046,60 @@ end:
 
 enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel)
 {
-       struct lttng_consumer_stream *stream;
        enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
 
        if (channel->metadata_stream) {
                ERR("Open channel packets command attempted on a metadata channel");
-               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
-               goto end;
+               return LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-               cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
-                       enum consumer_stream_open_packet_status status;
-
-                       pthread_mutex_lock(&stream->lock);
-                       if (cds_lfht_is_node_deleted(&stream->node.node)) {
-                               goto next;
-                       }
+       const lttng::urcu::read_lock_guard read_lock;
+       for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+                                                              &lttng_consumer_stream::send_node>(
+                    channel->streams.head)) {
+               enum consumer_stream_open_packet_status status;
 
-                       status = consumer_stream_open_packet(stream);
-                       switch (status) {
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                               DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel name = %s, session id = %" PRIu64,
-                                   stream->key,
-                                   stream->chan->name,
-                                   stream->chan->session_id);
-                               stream->opened_packet_in_current_trace_chunk = true;
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                               DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel name = %s, session id = %" PRIu64,
-                                   stream->key,
-                                   stream->chan->name,
-                                   stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                               /*
-                                * Only unexpected internal errors can lead to this
-                                * failing. Report an unknown error.
-                                */
-                               ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
-                                   ", channel id = %" PRIu64 ", channel name = %s"
-                                   ", session id = %" PRIu64,
-                                   stream->key,
-                                   channel->key,
-                                   channel->name,
-                                   channel->session_id);
-                               ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
-                               goto error_unlock;
-                       default:
-                               abort();
-                       }
+               const lttng::pthread::lock_guard stream_lock(stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       continue;
+               }
 
-               next:
-                       pthread_mutex_unlock(&stream->lock);
+               status = consumer_stream_open_packet(stream);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                           stream->key,
+                           stream->chan->name,
+                           stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                           stream->key,
+                           stream->chan->name,
+                           stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                           ", channel id = %" PRIu64 ", channel name = %s"
+                           ", session id = %" PRIu64,
+                           stream->key,
+                           channel->key,
+                           channel->name,
+                           channel->session_id);
+                       return LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+               default:
+                       abort();
                }
        }
-end_rcu_unlock:
-end:
-       return ret;
 
-error_unlock:
-       pthread_mutex_unlock(&stream->lock);
-       goto end_rcu_unlock;
+       return ret;
 }
 
 void lttng_consumer_sigbus_handle(void *addr)
This page took 0.028193 seconds and 4 git commands to generate.