static pthread_mutex_t last_relay_viewer_session_id_lock =
PTHREAD_MUTEX_INITIALIZER;
+static const char *
+lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code)
+{
+ switch (code) {
+ case LTTNG_VIEWER_INDEX_OK:
+ return "INDEX_OK";
+ case LTTNG_VIEWER_INDEX_RETRY:
+ return "INDEX_RETRY";
+ case LTTNG_VIEWER_INDEX_HUP:
+ return "INDEX_HUP";
+ case LTTNG_VIEWER_INDEX_ERR:
+ return "INDEX_ERR";
+ case LTTNG_VIEWER_INDEX_INACTIVE:
+ return "INDEX_INACTIVE";
+ case LTTNG_VIEWER_INDEX_EOF:
+ return "INDEX_EOF";
+ default:
+ abort();
+ }
+}
+
/*
* Cleanup the daemon
*/
int check_new_streams(struct relay_connection *conn)
{
struct relay_session *session;
- unsigned long current_val;
int ret = 0;
+ rcu_read_lock();
if (!conn->viewer_session) {
goto end;
}
- rcu_read_lock();
- cds_list_for_each_entry_rcu(session,
- &conn->viewer_session->session_list,
- viewer_session_node) {
+
+ cds_list_for_each_entry_rcu(
+ session, &conn->viewer_session->session_list, viewer_session_node)
+ {
if (!session_get(session)) {
continue;
}
- current_val = uatomic_cmpxchg(&session->new_streams, 1, 0);
- ret = current_val;
+
+ ret = uatomic_read(&session->new_streams);
session_put(session);
if (ret == 1) {
goto end;
}
}
+
end:
rcu_read_unlock();
+ DBG("Viewer connection has%s new streams: socket_fd = %d", ret == 0 ? " no" : "", conn->sock->fd);
return ret;
}
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply_unlock;
}
+
+ uatomic_set(&session->new_streams, 0);
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
struct relay_stream *rstream = NULL;
struct ctf_trace *ctf_trace = NULL;
struct relay_viewer_stream *metadata_viewer_stream = NULL;
+ bool attached_sessions_have_new_streams = false;
assert(conn);
goto send_reply;
}
+ ret = check_new_streams(conn);
+ if (ret < 0) {
+ viewer_index.status = LTTNG_VIEWER_INDEX_ERR;
+ ERR("Error checking for new streams in the attached sessions, returning status=%s",
+ lttng_viewer_next_index_return_code_str(
+ (enum lttng_viewer_next_index_return_code) viewer_index.status));
+ goto send_reply;
+ } else if (ret == 1) {
+ attached_sessions_have_new_streams = true;
+ }
+
if (rstream->ongoing_rotation.is_set) {
/* Rotation is ongoing, try again later. */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
*/
goto send_reply;
}
+
/* At this point, ret is 0 thus we will be able to read the index. */
assert(!ret);
vstream->stream_file.handle = fs_handle;
}
- ret = check_new_streams(conn);
- if (ret < 0) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
- goto send_reply;
- } else if (ret == 1) {
- viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
- }
-
ret = lttng_index_file_read(vstream->index_file, &packet_index);
if (ret) {
ERR("Relay error reading index file");
pthread_mutex_unlock(&metadata_viewer_stream->stream->lock);
}
+ if (attached_sessions_have_new_streams) {
+ viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
+ }
+
viewer_index.flags = htobe32(viewer_index.flags);
health_code_update();