decltype(relay_viewer_stream::stream_n),
&relay_viewer_stream::stream_n>(
*viewer_streams_ht->ht)) {
-
health_code_update();
if (!viewer_stream_get(vstream)) {
* Any remaining streams that have been seen, but are perhaps unpublished
* due to a session being destroyed in between attach and get_new_streams.
*/
- for (auto *vstream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
- &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+ for (auto *vstream :
+ lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(
+ viewer_session->unannounced_stream_list)) {
health_code_update();
if (!viewer_stream_get(vstream)) {
continue;
viewer_stream_put(vstream);
pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
viewer_stream_put(vstream);
-
}
ret = 0;
* Return 0 on success or else a negative value.
*/
int make_viewer_streams(struct relay_session *relay_session,
- struct relay_viewer_session *viewer_session,
- enum lttng_viewer_seek seek_t,
- unsigned int *nb_total,
- unsigned int *nb_unsent,
- unsigned int *nb_created,
- bool *closed)
+ struct relay_viewer_session *viewer_session,
+ enum lttng_viewer_seek seek_t,
+ unsigned int *nb_total,
+ unsigned int *nb_unsent,
+ unsigned int *nb_created,
+ bool *closed)
{
int ret;
/*
* Check unannounced viewer streams for any that have been seen but are no longer published.
*/
- for (auto *viewer_stream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
- &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+ for (auto *viewer_stream :
+ lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(
+ viewer_session->unannounced_stream_list)) {
if (!viewer_stream_get(viewer_stream)) {
DBG("Couldn't get reference for viewer_stream");
continue;
* Add the new stream to the list of streams to publish for
* this session.
*/
- pthread_mutex_lock(
- &viewer_session->unannounced_stream_list_lock);
+ pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
cds_list_add_rcu(&viewer_stream->viewer_stream_node,
&viewer_session->unannounced_stream_list);
- pthread_mutex_unlock(
- &viewer_session->unannounced_stream_list_lock);
+ pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
/*
* Get for the unannounced stream list, this should be
* put when the unannounced stream is sent.
}
/* Update number of total stream counter. */
if (nb_total) {
- if (stream->is_metadata) {
+ if (stream->is_metadata) {
if (!stream->closed ||
stream->metadata_received >
viewer_stream->metadata_sent) {
conn->viewer_session->current_trace_chunk ?
std::to_string(viewer_session_chunk_id).c_str() :
"None");
- } else if (vstream->stream_file.trace_chunk && rstream->completed_rotation_count == vstream->last_seen_rotation_count && !rstream->trace_chunk) {
+ } else if (vstream->stream_file.trace_chunk &&
+ rstream->completed_rotation_count == vstream->last_seen_rotation_count &&
+ !rstream->trace_chunk) {
/*
* When a relay stream is closed, there is a window before the rotation of the
* streams happens, during which the next index may be fetched. If the seen
* but may be unpublished between now and the next GET_NEW_STREAMS from the
* attached live viewer.
*/
- for (auto *viewer_session: lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
- decltype(relay_viewer_session::viewer_session_n),
- &relay_viewer_session::viewer_session_n>(*viewer_sessions_ht->ht))
- {
- for (auto *session_iter: lttng::urcu::rcu_list_iteration_adapter<relay_session,
- &relay_session::viewer_session_node>(viewer_session->session_list))
- {
+ for (auto *viewer_session :
+ lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+ decltype(relay_viewer_session::viewer_session_n),
+ &relay_viewer_session::viewer_session_n>(
+ *viewer_sessions_ht->ht)) {
+ for (auto *session_iter :
+ lttng::urcu::rcu_list_iteration_adapter<relay_session,
+ &relay_session::viewer_session_node>(
+ viewer_session->session_list)) {
if (session != session_iter) {
continue;
}
uint32_t unsent = 0;
bool closed = false;
const int make_viewer_streams_ret = make_viewer_streams(session,
- vsession,
- LTTNG_VIEWER_SEEK_BEGINNING,
- &total,
- &unsent,
- &created,
- &closed);
+ vsession,
+ LTTNG_VIEWER_SEEK_BEGINNING,
+ &total,
+ &unsent,
+ &created,
+ &closed);
if (make_viewer_streams_ret == 0) {
- DBG("Created %d new viewer streams while attaching to relay session %" PRIu64, created, session->id);
+ DBG("Created %d new viewer streams while attaching to relay session %" PRIu64,
+ created,
+ session->id);
} else {
/*
* Warning, since the creation of the streams will be retried when
* the viewer next sends the GET_NEW_STREAMS commands.
*/
- WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64 ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
+ WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64
+ ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
session->id,
make_viewer_streams_ret,
total,
viewer_stream_put(vstream);
}
- for (auto *vstream: lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
- &relay_viewer_stream::viewer_stream_node>(vsession->unannounced_stream_list))
- {
+ for (auto *vstream :
+ lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(
+ vsession->unannounced_stream_list)) {
if (!viewer_stream_get(vstream)) {
continue;
}