From 63b052698bbc604240f0c4fc2acd43e6d7b9f2b9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Fri, 26 Jul 2024 19:51:14 +0000 Subject: [PATCH] sessiond: consumer.cpp: iterate on lfht using lfht_iteration_adapter MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Change-Id: Ie970c9da116aa7daa87287366dc4935353152bbb Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/consumer.cpp | 232 +++++++++++++--------------- 1 file changed, 107 insertions(+), 125 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.cpp b/src/bin/lttng-sessiond/consumer.cpp index 3a25cf5eb..8ab6eb0f3 100644 --- a/src/bin/lttng-sessiond/consumer.cpp +++ b/src/bin/lttng-sessiond/consumer.cpp @@ -286,23 +286,23 @@ error: */ void consumer_output_send_destroy_relayd(struct consumer_output *consumer) { - struct lttng_ht_iter iter; - struct consumer_socket *socket; - LTTNG_ASSERT(consumer); /* Destroy any relayd connection */ - if (consumer->type == CONSUMER_DST_NET) { - const lttng::urcu::read_lock_guard read_lock; + if (consumer->type != CONSUMER_DST_NET) { + return; + } - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - /* Send destroy relayd command. */ - const int ret = consumer_send_destroy_relayd(socket, consumer); + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*consumer->socks->ht)) { + /* Send destroy relayd command. */ + const int ret = consumer_send_destroy_relayd(socket, consumer); - if (ret < 0) { - DBG("Unable to send destroy relayd command to consumer"); - /* Continue since we MUST delete everything at this point. */ - } + if (ret < 0) { + DBG("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ } } } @@ -536,20 +536,16 @@ error: */ void consumer_destroy_output_sockets(struct consumer_output *obj) { - struct lttng_ht_iter iter; - struct consumer_socket *socket; - if (!obj->socks) { return; } - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) { - consumer_del_socket(socket, obj); - consumer_destroy_socket(socket); - } + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*obj->socks->ht)) { + consumer_del_socket(socket, obj); + consumer_destroy_socket(socket); } } @@ -631,38 +627,35 @@ error_put: int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src) { int ret = 0; - struct lttng_ht_iter iter; - struct consumer_socket *socket, *copy_sock; LTTNG_ASSERT(dst); LTTNG_ASSERT(src); - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) { - /* Ignore socket that are already there. */ - copy_sock = consumer_find_socket(*socket->fd_ptr, dst); - if (copy_sock) { - continue; - } - - /* Create new socket object. */ - copy_sock = consumer_allocate_socket(socket->fd_ptr); - if (copy_sock == nullptr) { - ret = -ENOMEM; - goto error; - } + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*src->socks->ht)) { + /* Ignore socket that are already there. */ + auto *copy_sock = consumer_find_socket(*socket->fd_ptr, dst); + if (copy_sock) { + continue; + } - copy_sock->registered = socket->registered; - /* - * This is valid because this lock is shared accross all consumer - * object being the global lock of the consumer data structure of the - * session daemon. - */ - copy_sock->lock = socket->lock; - consumer_add_socket(copy_sock, dst); + /* Create new socket object. */ + copy_sock = consumer_allocate_socket(socket->fd_ptr); + if (copy_sock == nullptr) { + ret = -ENOMEM; + goto error; } + + copy_sock->registered = socket->registered; + /* + * This is valid because this lock is shared accross all consumer + * object being the global lock of the consumer data structure of the + * session daemon. + */ + copy_sock->lock = socket->lock; + consumer_add_socket(copy_sock, dst); } error: @@ -1258,8 +1251,6 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum { int ret; int32_t ret_code = 0; /* Default is that the data is NOT pending */ - struct consumer_socket *socket; - struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; LTTNG_ASSERT(consumer); @@ -1270,33 +1261,31 @@ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consum msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING; msg.u.data_pending.session_id = session_id; - { - /* Send command for each consumer. */ - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto error_unlock; - } - - /* - * No need for a recv reply status because the answer to the command is - * the reply status message. - */ - ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto error_unlock; - } + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*consumer->socks->ht)) { + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto error_unlock; + } + /* + * No need for a recv reply status because the answer to the command is + * the reply status message. + */ + ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + if (ret < 0) { pthread_mutex_unlock(socket->lock); + goto error_unlock; + } - if (ret_code == 1) { - break; - } + pthread_mutex_unlock(socket->lock); + + if (ret_code == 1) { + break; } } @@ -1568,8 +1557,6 @@ int consumer_get_discarded_events(uint64_t session_id, uint64_t *discarded) { int ret; - struct consumer_socket *socket; - struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; LTTNG_ASSERT(consumer); @@ -1584,34 +1571,32 @@ int consumer_get_discarded_events(uint64_t session_id, *discarded = 0; /* Send command for each consumer. */ - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_discarded = 0; - - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } - - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv( - socket, &consumer_discarded, sizeof(consumer_discarded)); - if (ret < 0) { - ERR("get discarded events"); - pthread_mutex_unlock(socket->lock); - goto end; - } + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*consumer->socks->ht)) { + uint64_t consumer_discarded = 0; + + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); pthread_mutex_unlock(socket->lock); - *discarded += consumer_discarded; + goto end; } + + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; } ret = 0; @@ -1630,8 +1615,6 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t *lost) { int ret; - struct consumer_socket *socket; - struct lttng_ht_iter iter; struct lttcomm_consumer_msg msg; LTTNG_ASSERT(consumer); @@ -1646,31 +1629,30 @@ int consumer_get_lost_packets(uint64_t session_id, *lost = 0; /* Send command for each consumer. */ - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) { - uint64_t consumer_lost = 0; - pthread_mutex_lock(socket->lock); - ret = consumer_socket_send(socket, &msg, sizeof(msg)); - if (ret < 0) { - pthread_mutex_unlock(socket->lock); - goto end; - } + for (auto *socket : + lttng::urcu::lfht_iteration_adapter(*consumer->socks->ht)) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } - /* - * No need for a recv reply status because the answer to the - * command is the reply status message. - */ - ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); - if (ret < 0) { - ERR("get lost packets"); - pthread_mutex_unlock(socket->lock); - goto end; - } + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); pthread_mutex_unlock(socket->lock); - *lost += consumer_lost; + goto end; } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; } ret = 0; -- 2.34.1