From: Jérémie Galarneau Date: Fri, 26 Jul 2024 19:06:54 +0000 (+0000) Subject: relayd: main.cpp: iterate on lfht using lfht_iteration_adapter X-Git-Url: http://git.lttng.org./?a=commitdiff_plain;h=85c34f4cd6eab63cb337c2daea9d57da7100d0df;p=lttng-tools.git relayd: main.cpp: iterate on lfht using lfht_iteration_adapter Change-Id: Ibe839a6733432e2ed464e1785569ed92087cf329 Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 87b623895..b18df2e9d 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -2194,10 +2194,8 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, { int ret; ssize_t send_ret; - struct lttng_ht_iter iter; struct lttcomm_relayd_begin_data_pending msg; struct lttcomm_relayd_generic_reply reply; - struct relay_stream *stream; LTTNG_ASSERT(recv_hdr); LTTNG_ASSERT(conn); @@ -2226,24 +2224,23 @@ static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr, * to iterate over all streams to find the one associated with * the right session_id. */ - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - - if (stream->trace->session->id == msg.session_id) { - pthread_mutex_lock(&stream->lock); - stream->data_pending_check_done = false; - pthread_mutex_unlock(&stream->lock); - DBG("Set begin data pending flag to stream %" PRIu64, - stream->stream_handle); - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*relay_streams_ht->ht)) { + if (!stream_get(stream)) { + continue; + } - stream_put(stream); + if (stream->trace->session->id == msg.session_id) { + pthread_mutex_lock(&stream->lock); + stream->data_pending_check_done = false; + pthread_mutex_unlock(&stream->lock); + DBG("Set begin data pending flag to stream %" PRIu64, + stream->stream_handle); } + + stream_put(stream); } memset(&reply, 0, sizeof(reply)); @@ -2277,10 +2274,8 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at { int ret; ssize_t send_ret; - struct lttng_ht_iter iter; struct lttcomm_relayd_end_data_pending msg; struct lttcomm_relayd_generic_reply reply; - struct relay_stream *stream; uint32_t is_data_inflight = 0; DBG("End data pending command"); @@ -2305,48 +2300,47 @@ static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr __at * Iterate over all streams to see if the begin data pending * flag is set. */ - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (!stream_get(stream)) { - continue; - } - - if (stream->trace->session->id != msg.session_id) { - stream_put(stream); - continue; - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*relay_streams_ht->ht)) { + if (!stream_get(stream)) { + continue; + } - pthread_mutex_lock(&stream->lock); - if (!stream->data_pending_check_done) { - uint64_t stream_seq; + if (stream->trace->session->id != msg.session_id) { + stream_put(stream); + continue; + } - if (session_streams_have_index(conn->session)) { - /* - * Ensure that both the index and stream data have been - * flushed up to the requested point. - */ - stream_seq = std::min(stream->prev_data_seq, - stream->prev_index_seq); - } else { - stream_seq = stream->prev_data_seq; - } + pthread_mutex_lock(&stream->lock); + if (!stream->data_pending_check_done) { + uint64_t stream_seq; - if (!stream->closed || - !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { - is_data_inflight = 1; - DBG("Data is still in flight for stream %" PRIu64, - stream->stream_handle); - pthread_mutex_unlock(&stream->lock); - stream_put(stream); - break; - } + if (session_streams_have_index(conn->session)) { + /* + * Ensure that both the index and stream data have been + * flushed up to the requested point. + */ + stream_seq = + std::min(stream->prev_data_seq, stream->prev_index_seq); + } else { + stream_seq = stream->prev_data_seq; } - pthread_mutex_unlock(&stream->lock); - stream_put(stream); + if (!stream->closed || + !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) { + is_data_inflight = 1; + DBG("Data is still in flight for stream %" PRIu64, + stream->stream_handle); + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + break; + } } + + pthread_mutex_unlock(&stream->lock); + stream_put(stream); } memset(&reply, 0, sizeof(reply)); @@ -3900,8 +3894,6 @@ static void *relay_thread_worker(void *data __attribute__((unused))) uint32_t nb_fd; struct lttng_poll_event events; struct lttng_ht *relay_connections_ht; - struct lttng_ht_iter iter; - struct relay_connection *destroy_conn = nullptr; DBG("[thread] Relay worker started"); @@ -4173,22 +4165,20 @@ restart: exit: error: /* Cleanup remaining connection object. */ - { - const lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry ( - relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { - health_code_update(); + for (auto *destroy_conn : + lttng::urcu::lfht_iteration_adapter( + *relay_connections_ht->ht)) { + health_code_update(); - session_abort(destroy_conn->session); + session_abort(destroy_conn->session); - /* - * No need to grab another ref, because we own - * destroy_conn. - */ - relay_thread_close_connection( - &events, destroy_conn->sock->fd, destroy_conn); - } + /* + * No need to grab another ref, because we own + * destroy_conn. + */ + relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn); } (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);