#include <common/fs-handle.hpp>
#include <common/futex.hpp>
#include <common/index/index.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pthread-lock.hpp>
#include <common/sessiond-comm/inet.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
*/
static int check_new_streams(struct relay_connection *conn)
{
- struct relay_session *session;
int ret = 0;
if (!conn->viewer_session) {
goto end;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_list_for_each_entry_rcu(
- session, &conn->viewer_session->session_list, viewer_session_node)
- {
- if (!session_get(session)) {
- continue;
- }
+ for (auto *session :
+ lttng::urcu::rcu_list_iteration_adapter<relay_session,
+ &relay_session::viewer_session_node>(
+ conn->viewer_session->session_list)) {
+ if (!session_get(session)) {
+ continue;
+ }
- ret = uatomic_read(&session->new_streams);
- session_put(session);
- if (ret == 1) {
- goto end;
- }
+ ret = uatomic_read(&session->new_streams);
+ session_put(session);
+ if (ret == 1) {
+ goto end;
}
}
bool *closed)
{
int ret;
- struct relay_stream *relay_stream = nullptr;
LTTNG_ASSERT(relay_session);
ASSERT_LOCKED(relay_session->lock);
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
*/
- for (auto *ctf_trace : lttng::urcu::
+ for (auto *raw_ctf_trace : lttng::urcu::
lfht_iteration_adapter<ctf_trace, decltype(ctf_trace::node), &ctf_trace::node>(
*relay_session->ctf_traces_ht->ht)) {
bool trace_has_metadata_stream = false;
health_code_update();
- if (!ctf_trace_get(ctf_trace)) {
+ if (!ctf_trace_get(raw_ctf_trace)) {
continue;
}
+ auto ctf_trace =
+ lttng::make_unique_wrapper<struct ctf_trace, ctf_trace_put>(raw_ctf_trace);
+
/*
* Iterate over all the streams of the trace to see if we have a
* metadata stream.
*/
- cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
- {
+ for (auto *stream :
+ lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+ &relay_stream::stream_node>(
+ ctf_trace->stream_list)) {
bool is_metadata_stream;
- pthread_mutex_lock(&relay_stream->lock);
- is_metadata_stream = relay_stream->is_metadata;
- pthread_mutex_unlock(&relay_stream->lock);
+ pthread_mutex_lock(&stream->lock);
+ is_metadata_stream = stream->is_metadata;
+ pthread_mutex_unlock(&stream->lock);
if (is_metadata_stream) {
trace_has_metadata_stream = true;
}
}
- relay_stream = nullptr;
-
/*
* If there is no metadata stream in this trace at the moment
* and we never sent one to the viewer, skip the trace. We
* accept that the viewer will not see this trace at all.
*/
if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) {
- ctf_trace_put(ctf_trace);
continue;
}
- cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node)
- {
+ for (auto *raw_stream :
+ lttng::urcu::rcu_list_iteration_adapter<relay_stream,
+ &relay_stream::stream_node>(
+ ctf_trace->stream_list)) {
struct relay_viewer_stream *viewer_stream;
- if (!stream_get(relay_stream)) {
+ if (!stream_get(raw_stream)) {
continue;
}
- pthread_mutex_lock(&relay_stream->lock);
+ auto stream =
+ lttng::make_unique_wrapper<relay_stream, stream_put>(raw_stream);
+ raw_stream = nullptr;
+
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
/*
* stream published is protected by the session lock.
*/
- if (!relay_stream->published) {
- goto next;
+ if (!stream->published) {
+ continue;
}
- viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle);
+ viewer_stream = viewer_stream_get_by_id(stream->stream_handle);
if (!viewer_stream) {
struct lttng_trace_chunk *viewer_stream_trace_chunk = nullptr;
* viewer. So that we know what trace the viewer
* is aware of.
*/
- if (relay_stream->is_metadata) {
+ if (stream->is_metadata) {
ctf_trace->metadata_stream_sent_to_viewer = true;
}
* Otherwise, the viewer session's current trace
* chunk can be used safely.
*/
- if ((relay_stream->ongoing_rotation.is_set ||
+ if ((stream->ongoing_rotation.is_set ||
session_has_ongoing_rotation(relay_session)) &&
- relay_stream->trace_chunk) {
+ stream->trace_chunk) {
viewer_stream_trace_chunk =
- lttng_trace_chunk_copy(relay_stream->trace_chunk);
+ lttng_trace_chunk_copy(stream->trace_chunk);
if (!viewer_stream_trace_chunk) {
ret = -1;
- ctf_trace_put(ctf_trace);
- goto error_unlock;
+ goto end;
}
} else {
/*
*/
if (!lttng_trace_chunk_ids_equal(
viewer_session->current_trace_chunk,
- relay_stream->trace_chunk)) {
+ stream->trace_chunk)) {
ret = viewer_session_set_trace_chunk_copy(
- viewer_session, relay_stream->trace_chunk);
+ viewer_session, stream->trace_chunk);
if (ret) {
ret = -1;
- ctf_trace_put(ctf_trace);
- goto error_unlock;
+ goto end;
}
}
- if (relay_stream->trace_chunk) {
+ if (stream->trace_chunk) {
/*
* If the corresponding relay
* stream's trace chunk is set,
}
viewer_stream = viewer_stream_create(
- relay_stream, viewer_stream_trace_chunk, seek_t);
+ stream.get(), viewer_stream_trace_chunk, seek_t);
lttng_trace_chunk_put(viewer_stream_trace_chunk);
viewer_stream_trace_chunk = nullptr;
if (!viewer_stream) {
ret = -1;
- ctf_trace_put(ctf_trace);
- goto error_unlock;
+ goto end;
}
if (nb_created) {
}
/* Update number of total stream counter. */
if (nb_total) {
- if (relay_stream->is_metadata) {
- if (!relay_stream->closed ||
- relay_stream->metadata_received >
+ if (stream->is_metadata) {
+ if (!stream->closed ||
+ stream->metadata_received >
viewer_stream->metadata_sent) {
(*nb_total)++;
}
} else {
- if (!relay_stream->closed ||
- !(((int64_t) (relay_stream->prev_data_seq -
- relay_stream->last_net_seq_num)) >= 0)) {
+ if (!stream->closed ||
+ !(((int64_t) (stream->prev_data_seq -
+ stream->last_net_seq_num)) >= 0)) {
(*nb_total)++;
}
}
}
/* Put local reference. */
viewer_stream_put(viewer_stream);
- next:
- pthread_mutex_unlock(&relay_stream->lock);
- stream_put(relay_stream);
}
- relay_stream = nullptr;
- ctf_trace_put(ctf_trace);
}
ret = 0;
-error_unlock:
-
- if (relay_stream) {
- pthread_mutex_unlock(&relay_stream->lock);
- stream_put(relay_stream);
- }
-
+end:
return ret;
}