{
int ret;
ssize_t send_ret;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_begin_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
LTTNG_ASSERT(recv_hdr);
LTTNG_ASSERT(conn);
* to iterate over all streams to find the one associated with
* the right session_id.
*/
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
-
- if (stream->trace->session->id == msg.session_id) {
- pthread_mutex_lock(&stream->lock);
- stream->data_pending_check_done = false;
- pthread_mutex_unlock(&stream->lock);
- DBG("Set begin data pending flag to stream %" PRIu64,
- stream->stream_handle);
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<relay_stream,
+ decltype(relay_stream::node),
+ &relay_stream::node>(*relay_streams_ht->ht)) {
+ if (!stream_get(stream)) {
+ continue;
+ }
- stream_put(stream);
+ if (stream->trace->session->id == msg.session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
}
+
+ stream_put(stream);
}
memset(&reply, 0, sizeof(reply));
{
int ret;
ssize_t send_ret;
- struct lttng_ht_iter iter;
struct lttcomm_relayd_end_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
uint32_t is_data_inflight = 0;
DBG("End data pending command");
* Iterate over all streams to see if the begin data pending
* flag is set.
*/
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
-
- if (stream->trace->session->id != msg.session_id) {
- stream_put(stream);
- continue;
- }
+ for (auto *stream :
+ lttng::urcu::lfht_iteration_adapter<relay_stream,
+ decltype(relay_stream::node),
+ &relay_stream::node>(*relay_streams_ht->ht)) {
+ if (!stream_get(stream)) {
+ continue;
+ }
- pthread_mutex_lock(&stream->lock);
- if (!stream->data_pending_check_done) {
- uint64_t stream_seq;
+ if (stream->trace->session->id != msg.session_id) {
+ stream_put(stream);
+ continue;
+ }
- if (session_streams_have_index(conn->session)) {
- /*
- * Ensure that both the index and stream data have been
- * flushed up to the requested point.
- */
- stream_seq = std::min(stream->prev_data_seq,
- stream->prev_index_seq);
- } else {
- stream_seq = stream->prev_data_seq;
- }
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ uint64_t stream_seq;
- if (!stream->closed ||
- !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
- is_data_inflight = 1;
- DBG("Data is still in flight for stream %" PRIu64,
- stream->stream_handle);
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
- break;
- }
+ if (session_streams_have_index(conn->session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq =
+ std::min(stream->prev_data_seq, stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_data_seq;
}
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
+ if (!stream->closed ||
+ !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
}
+
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
memset(&reply, 0, sizeof(reply));
uint32_t nb_fd;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
- struct lttng_ht_iter iter;
- struct relay_connection *destroy_conn = nullptr;
DBG("[thread] Relay worker started");
exit:
error:
/* Cleanup remaining connection object. */
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (
- relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
- health_code_update();
+ for (auto *destroy_conn :
+ lttng::urcu::lfht_iteration_adapter<relay_connection,
+ decltype(relay_connection::sock_n),
+ &relay_connection::sock_n>(
+ *relay_connections_ht->ht)) {
+ health_code_update();
- session_abort(destroy_conn->session);
+ session_abort(destroy_conn->session);
- /*
- * No need to grab another ref, because we own
- * destroy_conn.
- */
- relay_thread_close_connection(
- &events, destroy_conn->sock->fd, destroy_conn);
- }
+ /*
+ * No need to grab another ref, because we own
+ * destroy_conn.
+ */
+ relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
}
(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);