live connection (live.c, for client)
|
\-> 1 viewer session
+ | |
+ | \-> (transient ref) 0 or many unannounced relay streams
|
\-> 0 or many session (actually a reference to session as created
| by the relay connection)
There are global tables declared in lttng-relayd.h for sessions
(sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
-by stream handle), and viewer streams (viewer_streams_ht, indexed by
-stream handle). The purpose of those tables is to allow fast lookup of
+by stream handle), viewer sessions (viewer_sessions_ht, indexed by
+connection sock fd), and viewer streams (viewer_streams_ht, indexed
+by stream handle). The purpose of those tables is to allow fast lookup of
those objects using the IDs received in the communication protocols.
There is also one connection hash table per worker thread. There is one
synchronize between threads (currently the main.c relay thread and
live.c client thread) when objects are shared. Locks can be nested from
the outermost object to the innermost object. IOW, the ctf-trace lock can
-nest within the session lock.
+nest within the session lock. The unannounced stream list lock in viewer
+sessions is an exception to the default locking order: it may be nested
+inside the following locks (in order): relay session, ctf_trace, and relay
+stream.
RCU linked lists are used to iterate using RCU, and are protected by
their own mutex for modifications. Iterations should be confirmed using
return ret;
}
+/*
+ * Sends one viewer stream to the given socket.
+ *
+ * This function needs to be called with the stream locked.
+ *
+ * Return 0 on success, or else a negative value.
+ */
+static ssize_t send_one_viewer_stream(struct lttcomm_sock *sock,
+ struct relay_viewer_stream *vstream)
+{
+ struct ctf_trace *ctf_trace;
+ struct lttng_viewer_stream send_stream = {};
+ ssize_t ret = -1;
+
+ ASSERT_LOCKED(vstream->stream->lock);
+
+ ctf_trace = vstream->stream->trace;
+ send_stream.id = htobe64(vstream->stream->stream_handle);
+ send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+ send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+ if (lttng_strncpy(
+ send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name))) {
+ ret = -1; /* Error. */
+ goto end;
+ }
+ if (lttng_strncpy(send_stream.channel_name,
+ vstream->channel_name,
+ sizeof(send_stream.channel_name))) {
+ ret = -1; /* Error. */
+ goto end;
+ }
+
+ DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+ vstream->sent_flag = true;
+
+ ret = send_response(sock, &send_stream, sizeof(send_stream));
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Send viewer streams to the given socket. The ignore_sent_flag indicates if
* this function should ignore the sent flag or not.
*
* Return 0 on success or else a negative value.
*/
-static ssize_t
-send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
+static ssize_t send_viewer_streams(struct lttcomm_sock *sock,
+ uint64_t session_id,
+ unsigned int ignore_sent_flag,
+ struct relay_viewer_session *viewer_session)
{
ssize_t ret;
decltype(relay_viewer_stream::stream_n),
&relay_viewer_stream::stream_n>(
*viewer_streams_ht->ht)) {
- struct ctf_trace *ctf_trace;
- struct lttng_viewer_stream send_stream = {};
health_code_update();
continue;
}
- ctf_trace = vstream->stream->trace;
- send_stream.id = htobe64(vstream->stream->stream_handle);
- send_stream.ctf_trace_id = htobe64(ctf_trace->id);
- send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
- if (lttng_strncpy(send_stream.path_name,
- vstream->path_name,
- sizeof(send_stream.path_name))) {
- pthread_mutex_unlock(&vstream->stream->lock);
+ ret = send_one_viewer_stream(sock, vstream);
+ pthread_mutex_unlock(&vstream->stream->lock);
+ if (ret < 0) {
viewer_stream_put(vstream);
- ret = -1; /* Error. */
goto end;
}
- if (lttng_strncpy(send_stream.channel_name,
- vstream->channel_name,
- sizeof(send_stream.channel_name))) {
+
+ pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+ cds_list_del_rcu(&vstream->viewer_stream_node);
+ pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+ viewer_stream_put(vstream);
+ }
+
+ /*
+ * Any remaining streams that have been seen, but are perhaps unpublished
+ * due to a session being destroyed in between attach and get_new_streams.
+ */
+ for (auto *vstream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+ health_code_update();
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+
+ pthread_mutex_lock(&vstream->stream->lock);
+ if (vstream->stream->trace->session->id != session_id) {
pthread_mutex_unlock(&vstream->stream->lock);
viewer_stream_put(vstream);
- ret = -1; /* Error. */
- goto end;
+ continue;
}
- DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
- vstream->sent_flag = true;
+ ret = send_one_viewer_stream(sock, vstream);
pthread_mutex_unlock(&vstream->stream->lock);
-
- ret = send_response(sock, &send_stream, sizeof(send_stream));
- viewer_stream_put(vstream);
if (ret < 0) {
+ viewer_stream_put(vstream);
goto end;
}
+
+ pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+ cds_list_del_rcu(&vstream->viewer_stream_node);
+ viewer_stream_put(vstream);
+ pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+ viewer_stream_put(vstream);
+
}
ret = 0;
*
* Return 0 on success or else a negative value.
*/
-static int make_viewer_streams(struct relay_session *relay_session,
+int make_viewer_streams(struct relay_session *relay_session,
struct relay_viewer_session *viewer_session,
enum lttng_viewer_seek seek_t,
- uint32_t *nb_total,
- uint32_t *nb_unsent,
- uint32_t *nb_created,
+ unsigned int *nb_total,
+ unsigned int *nb_unsent,
+ unsigned int *nb_created,
bool *closed)
{
int ret;
*closed = true;
}
+ /*
+ * Check unannounced viewer streams for any that have been seen but are no longer published.
+ */
+ for (auto *viewer_stream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+ if (!viewer_stream_get(viewer_stream)) {
+ DBG("Couldn't get reference for viewer_stream");
+ continue;
+ }
+
+ if (viewer_stream->sent_flag) {
+ ERR("logic error -> viewer stream %ld is in unannounced_stream_list is marked as sent",
+ viewer_stream->stream->stream_handle);
+ abort();
+ }
+
+ if (viewer_stream->stream->published) {
+ /*
+ * This stream should be handled later when iterating via the
+ * ctf_traces
+ */
+ viewer_stream_put(viewer_stream);
+ continue;
+ }
+
+ if (viewer_stream->stream->trace->session->id != relay_session->id) {
+ viewer_stream_put(viewer_stream);
+ continue;
+ }
+
+ if (nb_unsent) {
+ (*nb_unsent)++;
+ }
+
+ if (nb_total) {
+ (*nb_total)++;
+ }
+
+ viewer_stream_put(viewer_stream);
+ }
+
/*
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
auto ctf_trace =
lttng::make_unique_wrapper<struct ctf_trace, ctf_trace_put>(raw_ctf_trace);
+ /*
+ * The trace metadata state may be updated while iterating over all the
+ * relay streams associated with the trace, so the lock is required.
+ */
+ const lttng::pthread::lock_guard ctf_trace_lock(ctf_trace->lock);
/*
* Iterate over all the streams of the trace to see if we have a
goto end;
}
+ /*
+ * Add the new stream to the list of streams to publish for
+ * this session.
+ */
+ pthread_mutex_lock(
+ &viewer_session->unannounced_stream_list_lock);
+ cds_list_add_rcu(&viewer_stream->viewer_stream_node,
+ &viewer_session->unannounced_stream_list);
+ pthread_mutex_unlock(
+ &viewer_session->unannounced_stream_list_lock);
+ /*
+ * Get for the unannounced stream list, this should be
+ * put when the unannounced stream is sent.
+ */
+ if (!viewer_stream_get(viewer_stream)) {
+ ERR("Unable to get self-reference on viewer stream");
+ abort();
+ }
+
if (nb_created) {
/* Update number of created stream counter. */
(*nb_created)++;
}
/* Update number of total stream counter. */
if (nb_total) {
- if (stream->is_metadata) {
+ if (stream->is_metadata) {
if (!stream->closed ||
stream->metadata_received >
viewer_stream->metadata_sent) {
static int viewer_get_new_streams(struct relay_connection *conn)
{
int ret, send_streams = 0;
- uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
+ unsigned int nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
struct lttng_viewer_new_streams_request request;
struct lttng_viewer_new_streams_response response;
struct relay_session *session = nullptr;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
/* Only send back the newly created streams with the unsent ones. */
- nb_streams = nb_created + nb_unsent;
+ nb_streams = nb_unsent + nb_created;
response.streams_count = htobe32(nb_streams);
/*
* streams that were not sent from that point will be sent to
* the viewer.
*/
- ret = send_viewer_streams(conn->sock, session_id, 0);
+ ret = send_viewer_streams(conn->sock, session_id, 0, conn->viewer_session);
if (ret < 0) {
goto end_put_session;
}
{
int send_streams = 0;
ssize_t ret;
- uint32_t nb_streams = 0;
+ unsigned int nb_streams = 0;
enum lttng_viewer_seek seek_type;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
}
/* Send stream and ignore the sent flag. */
- ret = send_viewer_streams(conn->sock, session_id, 1);
+ ret = send_viewer_streams(conn->sock, session_id, 1, conn->viewer_session);
if (ret < 0) {
goto end_put_session;
}
conn->viewer_session->current_trace_chunk ?
std::to_string(viewer_session_chunk_id).c_str() :
"None");
+ } else if (vstream->stream_file.trace_chunk && rstream->completed_rotation_count == vstream->last_seen_rotation_count && !rstream->trace_chunk) {
+ /*
+ * When a relay stream is closed, there is a window before the rotation of the
+ * streams happens, during which the next index may be fetched. If the seen
+ * rotations are the same and the relay stream trace chunk is null, don't rotate.
+ * When the close finishes, the rotation count on the relay stream will go up.
+ */
+ DBG("Transition to latest chunk check (%s -> %s): relay stream chunk is null, but viewer stream knows a chunk and isn't yet behind a rotation",
+ vstream->stream_file.trace_chunk ?
+ std::to_string(stream_file_chunk_id).c_str() :
+ "None",
+ conn->viewer_session->current_trace_chunk ?
+ std::to_string(viewer_session_chunk_id).c_str() :
+ "None");
} else {
DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
vstream->stream_file.trace_chunk ?
if (rstream->closed) {
viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
DBG("Cannot open index for stream id %" PRIu64
- "stream is closed, returning status=%s",
+ " stream is closed, returning status=%s",
(uint64_t) be64toh(request_index.stream_id),
lttng_viewer_next_index_return_code_str(
(enum lttng_viewer_next_index_return_code) viewer_index.status));
bool acquired_reference;
DBG("Viewer session and viewer stream chunk differ: "
- "vsession chunk %p vstream chunk %p",
+ "vsession chunk %p vstream chunk=%p",
conn->viewer_session->current_trace_chunk,
vstream->stream_file.trace_chunk);
lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
int relayd_live_stop();
int relayd_live_join();
+int make_viewer_streams(struct relay_session *relay_session,
+ struct relay_viewer_session *viewer_session,
+ enum lttng_viewer_seek seek_t,
+ unsigned int *nb_total,
+ unsigned int *nb_unsent,
+ unsigned int *nb_created,
+ bool *closed);
#endif /* LTTNG_RELAYD_LIVE_H */
*/
extern struct lttng_ht *sessions_ht;
extern struct lttng_ht *relay_streams_ht;
+extern struct lttng_ht *viewer_sessions_ht;
extern struct lttng_ht *viewer_streams_ht;
extern struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry;
#include "tracefile-array.hpp"
#include "utils.hpp"
#include "version.hpp"
+#include "viewer-session.hpp"
#include "viewer-stream.hpp"
#include <common/align.hpp>
/* Global relay sessions hash table. */
struct lttng_ht *sessions_ht;
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
/* Relayd health monitoring */
struct health_app *health_relayd;
if (viewer_streams_ht)
lttng_ht_destroy(viewer_streams_ht);
+ if (viewer_sessions_ht) {
+ lttng_ht_destroy(viewer_sessions_ht);
+ }
if (relay_streams_ht)
lttng_ht_destroy(relay_streams_ht);
if (sessions_ht)
lttng_ht_destroy(sessions_ht);
-
free(opt_output_path);
free(opt_working_directory);
static void publish_connection_local_streams(struct relay_connection *conn)
{
struct relay_session *session = conn->session;
+ unsigned int created = 0;
+ bool closed = false;
+
+ LTTNG_ASSERT(viewer_sessions_ht);
/*
* We publish all streams belonging to a session atomically wrt
/*
* Inform the viewer that there are new streams in the session.
*/
- if (session->viewer_attached) {
- uatomic_set(&session->new_streams, 1);
+ if (!session->viewer_attached) {
+ goto unlock;
+ }
+
+ /*
+ * Create viewer_streams for all the newly published streams for this relay session.
+ * This searches through all known viewer sessions and finds those that are
+ * attached to this connection's relay session. This is done so that the newer
+ * viewer streams will hold a reference on any relay streams that already exist,
+ * but may be unpublished between now and the next GET_NEW_STREAMS from the
+ * attached live viewer.
+ */
+ for (auto *viewer_session: lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+ decltype(relay_viewer_session::viewer_session_n),
+ &relay_viewer_session::viewer_session_n>(*viewer_sessions_ht->ht))
+ {
+ for (auto *session_iter: lttng::urcu::rcu_list_iteration_adapter<relay_session,
+ &relay_session::viewer_session_node>(viewer_session->session_list))
+ {
+ if (session != session_iter) {
+ continue;
+ }
+ const int ret = make_viewer_streams(session,
+ viewer_session,
+ LTTNG_VIEWER_SEEK_BEGINNING,
+ nullptr,
+ nullptr,
+ &created,
+ &closed);
+ if (ret == 0) {
+ DBG("Created %d new viewer streams during publication of relay streams for relay session %" PRIu64,
+ created,
+ session->id);
+ } else if (ret < 0) {
+ /*
+ * Warning, since the creation of the
+ * streams will be retried when the viewer
+ * next sends the GET_NEW_STREAMS again.
+ */
+ WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+ ", ret=%d, created=%d, closed=%d",
+ session->id,
+ ret,
+ created,
+ closed);
+ }
+ }
}
+unlock:
+ uatomic_set(&session->new_streams, 1);
+ pthread_mutex_unlock(&session->lock);
}
static int conform_channel_path(char *channel_path)
goto exit_options;
}
+ /* tables of viewer sessions indexed by session ID */
+ viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_sessions_ht) {
+ retval = -1;
+ goto exit_options;
+ }
+
/* tables of streams indexed by stream ID */
viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!viewer_streams_ht) {
#define _LGPL_SOURCE
#include "ctf-trace.hpp"
+#include "live.hpp"
#include "lttng-relayd.hpp"
#include "session.hpp"
#include "stream.hpp"
#include <urcu/rculist.h>
+/* Global session id used in the session creation. */
+static uint64_t last_viewer_session_id;
+static pthread_mutex_t last_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
+
struct relay_viewer_session *viewer_session_create()
{
struct relay_viewer_session *vsession;
if (!vsession) {
goto end;
}
+ pthread_mutex_lock(&last_viewer_session_id_lock);
+ vsession->id = ++last_viewer_session_id;
+ pthread_mutex_unlock(&last_viewer_session_id_lock);
CDS_INIT_LIST_HEAD(&vsession->session_list);
+ CDS_INIT_LIST_HEAD(&vsession->unannounced_stream_list);
+ pthread_mutex_init(&vsession->unannounced_stream_list_lock, nullptr);
+ lttng_ht_node_init_u64(&vsession->viewer_session_n, vsession->id);
+ lttng_ht_add_unique_u64(viewer_sessions_ht, &vsession->viewer_session_n);
end:
return vsession;
}
/* Ownership is transfered to the list. */
cds_list_add_rcu(&session->viewer_session_node, &vsession->session_list);
pthread_mutex_unlock(&vsession->session_list_lock);
+
+ /*
+ * Immediately create new viewer streams for the attached session
+ * so that the viewer streams hold a reference on the any relay
+ * streams that could be unpublished between now and the next
+ * GET_NEW_STREAMS command from the live viewer.
+ */
+ uint32_t created = 0;
+ uint32_t total = 0;
+ uint32_t unsent = 0;
+ bool closed = false;
+ const int make_viewer_streams_ret = make_viewer_streams(session,
+ vsession,
+ LTTNG_VIEWER_SEEK_BEGINNING,
+ &total,
+ &unsent,
+ &created,
+ &closed);
+
+ if (make_viewer_streams_ret == 0) {
+ DBG("Created %d new viewer streams while attaching to relay session %" PRIu64, created, session->id);
+ } else {
+ /*
+ * Warning, since the creation of the streams will be retried when
+ * the viewer next sends the GET_NEW_STREAMS commands.
+ */
+ WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64 ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
+ session->id,
+ make_viewer_streams_ret,
+ total,
+ unsent,
+ created,
+ closed);
+ }
} else {
/* Put our local ref. */
session_put(session);
/* Release reference held by the list. */
session_put(session);
}
+
/* Safe since we know the session exists. */
pthread_mutex_unlock(&session->lock);
return ret;
void viewer_session_destroy(struct relay_viewer_session *vsession)
{
+ struct lttng_ht_iter iter;
+
+ LTTNG_ASSERT(cds_list_empty(&vsession->unannounced_stream_list));
+
+ iter.iter.node = &vsession->viewer_session_n.node;
+ lttng_ht_del(viewer_sessions_ht, &iter);
lttng_trace_chunk_put(vsession->current_trace_chunk);
free(vsession);
}
if (!viewer_stream_get(vstream)) {
continue;
}
+
if (vstream->stream->trace->session != session) {
viewer_stream_put(vstream);
continue;
viewer_stream_put(vstream);
}
+ for (auto *vstream: lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+ &relay_viewer_stream::viewer_stream_node>(vsession->unannounced_stream_list))
+ {
+ if (!viewer_stream_get(vstream)) {
+ continue;
+ }
+ if (vstream->stream->trace->session != session) {
+ viewer_stream_put(vstream);
+ continue;
+ }
+ pthread_mutex_lock(&vsession->unannounced_stream_list_lock);
+ cds_list_del_rcu(&vstream->viewer_stream_node);
+ pthread_mutex_unlock(&vsession->unannounced_stream_list_lock);
+ /* Local reference */
+ viewer_stream_put(vstream);
+ /* Reference from unannounced_stream_list */
+ viewer_stream_put(vstream);
+ }
+
lttng_trace_chunk_put(vsession->current_trace_chunk);
vsession->current_trace_chunk = nullptr;
viewer_session_detach(vsession, session);
#include <urcu/ref.h>
struct relay_viewer_session {
+ /*
+ * The id of the relay viewer session. Uses the associated connection's socket FD.
+ */
+ uint64_t id;
/*
* Session list. Updates are protected by the session_list_lock.
* Traversals are protected by RCU.
*/
struct cds_list_head session_list; /* RCU list. */
pthread_mutex_t session_list_lock; /* Protects list updates. */
+ /*
+ * Unannounced stream list. Updates are protected by the
+ * unannounced_stream_list_lock. This lock nests inside
+ * the following locks (in order): relay session, ctf_trace,
+ * and relay stream.
+ *
+ * Traversals are protected by RCU.
+ */
+ struct cds_list_head unannounced_stream_list;
+ pthread_mutex_t unannounced_stream_list_lock;
+ /*
+ * Node in the global viewer sessions hashtable.
+ */
+ struct lttng_ht_node_u64 viewer_session_n;
/*
* The viewer session's current trace chunk is initially set, when
* a viewer attaches to the viewer session, to a copy the corresponding
PERROR("relay viewer stream zmalloc");
goto error;
}
-
if (trace_chunk) {
const bool acquired_reference = lttng_trace_chunk_get(trace_chunk);
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
urcu_ref_init(&vstream->ref);
lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
return vstream;
error:
/* Back ref to stream. */
struct relay_stream *stream;
+ /*
+ * Member of unannounced_stream_list in struct viewer_sesion.
+ * Updates are protected by the unannounced_stream_list_lock, and
+ * traversals are protected by RCU.
+ */
+ struct cds_list_head viewer_stream_node;
+
struct {
struct fs_handle *handle;
struct lttng_trace_chunk *trace_chunk;