sessiond: consumer.cpp: iterate on lfht using lfht_iteration_adapter
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 26 Jul 2024 19:51:14 +0000 (19:51 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 30 Jul 2024 01:26:51 +0000 (01:26 +0000)
Change-Id: Ie970c9da116aa7daa87287366dc4935353152bbb
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-sessiond/consumer.cpp

index 3a25cf5eb2c90515442c552532d0fe40122c9a16..8ab6eb0f3ba5fab8b751d7c6bcebbc9cd7f2ed21 100644 (file)
@@ -286,23 +286,23 @@ error:
  */
 void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
 {
-       struct lttng_ht_iter iter;
-       struct consumer_socket *socket;
-
        LTTNG_ASSERT(consumer);
 
        /* Destroy any relayd connection */
-       if (consumer->type == CONSUMER_DST_NET) {
-               const lttng::urcu::read_lock_guard read_lock;
+       if (consumer->type != CONSUMER_DST_NET) {
+               return;
+       }
 
-               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       /* Send destroy relayd command. */
-                       const int ret = consumer_send_destroy_relayd(socket, consumer);
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*consumer->socks->ht)) {
+               /* Send destroy relayd command. */
+               const int ret = consumer_send_destroy_relayd(socket, consumer);
 
-                       if (ret < 0) {
-                               DBG("Unable to send destroy relayd command to consumer");
-                               /* Continue since we MUST delete everything at this point. */
-                       }
+               if (ret < 0) {
+                       DBG("Unable to send destroy relayd command to consumer");
+                       /* Continue since we MUST delete everything at this point. */
                }
        }
 }
@@ -536,20 +536,16 @@ error:
  */
 void consumer_destroy_output_sockets(struct consumer_output *obj)
 {
-       struct lttng_ht_iter iter;
-       struct consumer_socket *socket;
-
        if (!obj->socks) {
                return;
        }
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
-                       consumer_del_socket(socket, obj);
-                       consumer_destroy_socket(socket);
-               }
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*obj->socks->ht)) {
+               consumer_del_socket(socket, obj);
+               consumer_destroy_socket(socket);
        }
 }
 
@@ -631,38 +627,35 @@ error_put:
 int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src)
 {
        int ret = 0;
-       struct lttng_ht_iter iter;
-       struct consumer_socket *socket, *copy_sock;
 
        LTTNG_ASSERT(dst);
        LTTNG_ASSERT(src);
 
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
-                       /* Ignore socket that are already there. */
-                       copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
-                       if (copy_sock) {
-                               continue;
-                       }
-
-                       /* Create new socket object. */
-                       copy_sock = consumer_allocate_socket(socket->fd_ptr);
-                       if (copy_sock == nullptr) {
-                               ret = -ENOMEM;
-                               goto error;
-                       }
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*src->socks->ht)) {
+               /* Ignore socket that are already there. */
+               auto *copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+               if (copy_sock) {
+                       continue;
+               }
 
-                       copy_sock->registered = socket->registered;
-                       /*
-                        * This is valid because this lock is shared accross all consumer
-                        * object being the global lock of the consumer data structure of the
-                        * session daemon.
-                        */
-                       copy_sock->lock = socket->lock;
-                       consumer_add_socket(copy_sock, dst);
+               /* Create new socket object. */
+               copy_sock = consumer_allocate_socket(socket->fd_ptr);
+               if (copy_sock == nullptr) {
+                       ret = -ENOMEM;
+                       goto error;
                }
+
+               copy_sock->registered = socket->registered;
+               /*
+                * This is valid because this lock is shared accross all consumer
+                * object being the global lock of the consumer data structure of the
+                * session daemon.
+                */
+               copy_sock->lock = socket->lock;
+               consumer_add_socket(copy_sock, dst);
        }
 
 error:
@@ -1258,8 +1251,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
 {
        int ret;
        int32_t ret_code = 0; /* Default is that the data is NOT pending */
-       struct consumer_socket *socket;
-       struct lttng_ht_iter iter;
        struct lttcomm_consumer_msg msg;
 
        LTTNG_ASSERT(consumer);
@@ -1270,33 +1261,31 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum
        msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
        msg.u.data_pending.session_id = session_id;
 
-       {
-               /* Send command for each consumer. */
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       pthread_mutex_lock(socket->lock);
-                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
-                       if (ret < 0) {
-                               pthread_mutex_unlock(socket->lock);
-                               goto error_unlock;
-                       }
-
-                       /*
-                        * No need for a recv reply status because the answer to the command is
-                        * the reply status message.
-                        */
-                       ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
-                       if (ret < 0) {
-                               pthread_mutex_unlock(socket->lock);
-                               goto error_unlock;
-                       }
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*consumer->socks->ht)) {
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
+               if (ret < 0) {
+                       pthread_mutex_unlock(socket->lock);
+                       goto error_unlock;
+               }
 
+               /*
+                * No need for a recv reply status because the answer to the command is
+                * the reply status message.
+                */
+               ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+               if (ret < 0) {
                        pthread_mutex_unlock(socket->lock);
+                       goto error_unlock;
+               }
 
-                       if (ret_code == 1) {
-                               break;
-                       }
+               pthread_mutex_unlock(socket->lock);
+
+               if (ret_code == 1) {
+                       break;
                }
        }
 
@@ -1568,8 +1557,6 @@ int consumer_get_discarded_events(uint64_t session_id,
                                  uint64_t *discarded)
 {
        int ret;
-       struct consumer_socket *socket;
-       struct lttng_ht_iter iter;
        struct lttcomm_consumer_msg msg;
 
        LTTNG_ASSERT(consumer);
@@ -1584,34 +1571,32 @@ int consumer_get_discarded_events(uint64_t session_id,
        *discarded = 0;
 
        /* Send command for each consumer. */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       uint64_t consumer_discarded = 0;
-
-                       pthread_mutex_lock(socket->lock);
-                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
-                       if (ret < 0) {
-                               pthread_mutex_unlock(socket->lock);
-                               goto end;
-                       }
-
-                       /*
-                        * No need for a recv reply status because the answer to the
-                        * command is the reply status message.
-                        */
-                       ret = consumer_socket_recv(
-                               socket, &consumer_discarded, sizeof(consumer_discarded));
-                       if (ret < 0) {
-                               ERR("get discarded events");
-                               pthread_mutex_unlock(socket->lock);
-                               goto end;
-                       }
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*consumer->socks->ht)) {
+               uint64_t consumer_discarded = 0;
+
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
+               if (ret < 0) {
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
 
+               /*
+                * No need for a recv reply status because the answer to the
+                * command is the reply status message.
+                */
+               ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
+               if (ret < 0) {
+                       ERR("get discarded events");
                        pthread_mutex_unlock(socket->lock);
-                       *discarded += consumer_discarded;
+                       goto end;
                }
+
+               pthread_mutex_unlock(socket->lock);
+               *discarded += consumer_discarded;
        }
 
        ret = 0;
@@ -1630,8 +1615,6 @@ int consumer_get_lost_packets(uint64_t session_id,
                              uint64_t *lost)
 {
        int ret;
-       struct consumer_socket *socket;
-       struct lttng_ht_iter iter;
        struct lttcomm_consumer_msg msg;
 
        LTTNG_ASSERT(consumer);
@@ -1646,31 +1629,30 @@ int consumer_get_lost_packets(uint64_t session_id,
        *lost = 0;
 
        /* Send command for each consumer. */
-       {
-               const lttng::urcu::read_lock_guard read_lock;
-
-               cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
-                       uint64_t consumer_lost = 0;
-                       pthread_mutex_lock(socket->lock);
-                       ret = consumer_socket_send(socket, &msg, sizeof(msg));
-                       if (ret < 0) {
-                               pthread_mutex_unlock(socket->lock);
-                               goto end;
-                       }
+       for (auto *socket :
+            lttng::urcu::lfht_iteration_adapter<consumer_socket,
+                                                decltype(consumer_socket::node),
+                                                &consumer_socket::node>(*consumer->socks->ht)) {
+               uint64_t consumer_lost = 0;
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
+               if (ret < 0) {
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
 
-                       /*
-                        * No need for a recv reply status because the answer to the
-                        * command is the reply status message.
-                        */
-                       ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
-                       if (ret < 0) {
-                               ERR("get lost packets");
-                               pthread_mutex_unlock(socket->lock);
-                               goto end;
-                       }
+               /*
+                * No need for a recv reply status because the answer to the
+                * command is the reply status message.
+                */
+               ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+               if (ret < 0) {
+                       ERR("get lost packets");
                        pthread_mutex_unlock(socket->lock);
-                       *lost += consumer_lost;
+                       goto end;
                }
+               pthread_mutex_unlock(socket->lock);
+               *lost += consumer_lost;
        }
 
        ret = 0;
This page took 0.029422 seconds and 4 git commands to generate.