Fix: relayd: live: Catch short lived applications for attached viewers
authorKienan Stewart <kstewart@efficios.com>
Fri, 12 Apr 2024 18:27:09 +0000 (14:27 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 30 Aug 2024 21:09:19 +0000 (17:09 -0400)
Observed issue
==============

When a live viewer is attached to a session and a new application
starts, emits events, and exits the viewer may not see the produced
events.

With per-UID buffer allocation, the application needs to run as a new
user that hasn't had streams allocated before. With per-PID buffers,
spawning a new traced application is sufficient.

Cause
=====

When the new relay streams are created, associated viewer streams are
not immediately created. As a result, there is a gap between in which
the session may start being destroyed and/or the relay streams
unpublished and the time at which the live viewer sends a GET_NEW_STREAMS
command. When the relay streams are unpublished for any reason, the
reference to the relay stream in the ctf_trace is removed. The new
and unsent streams iterate over the relay streams in each ctf_trace.
Therefore, relay streams that were created and unpublished while
the live viewer was already attached to the session can be completely
missed.

Solution
========

The solution has three main aspects:

1. When new relayd streams are published and a viewer is attached for the
corresponding relay session or when a live viewer session attaches to
an existing relay session the viewer streams are created immediately.

2. The unsent viewer streams are tracked in a per-viewer session
list so that there continues to be a reference (via the
viewer_stream->stream backreference) held for the relay stream, and that
unpublished relay streams can be found without iterating over the
entire relay streams hashtable.

3. To cover cases where a relay stream has been closed but there are
still known trace chunks available, an additional check has been added
to the `get_next_index` viewer stream transition checks. When the
seen rotation count and relay stream rotation count are the same and
that the relay stream no longer has an active trace chunk, the
viewer stream is not forcibly rotated. This stops the final drop to
the trace chunk reference (via
viewerstream->stream_file->trace_chunk). Later, when the relay stream
is fully closed, there is a final rotation that is performed.

Known drawbacks
===============

The current implementation adds a global hash table which holds
references to created viewer sessions. When searching to determine if
new viewer streams should be created, the search is O(N*M) where N is the
number of viewer sessons and M is the number of relay sessions.

A different approach to recording references from relay sessions to
viewer sessions (if any exist) could reduce the search space.

Change-Id: Ie8f00697a4dafd5c9b0bfe60a872d1c1882f6944
Signed-off-by: Kienan Stewart <kstewart@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
doc/relayd-architecture.txt
src/bin/lttng-relayd/live.cpp
src/bin/lttng-relayd/live.hpp
src/bin/lttng-relayd/lttng-relayd.hpp
src/bin/lttng-relayd/main.cpp
src/bin/lttng-relayd/viewer-session.cpp
src/bin/lttng-relayd/viewer-session.hpp
src/bin/lttng-relayd/viewer-stream.cpp
src/bin/lttng-relayd/viewer-stream.hpp

index 56c81241fa6b363e18361780b367ba3056ea8320..1d85f0be6e443445d635ef39ffdf92dcd211410c 100644 (file)
@@ -22,6 +22,8 @@ relay connection (main.c, for sessiond/consumer)
 live connection (live.c, for client)
   |
   \-> 1 viewer session
+       |     |
+       |     \-> (transient ref) 0 or many unannounced relay streams
        |
        \-> 0 or many session (actually a reference to session as created
              |                by the relay connection)
@@ -30,8 +32,9 @@ live connection (live.c, for client)
 
 There are global tables declared in lttng-relayd.h for sessions
 (sessions_ht, indexed by session id), streams (relay_streams_ht, indexed
-by stream handle), and viewer streams (viewer_streams_ht, indexed by
-stream handle). The purpose of those tables is to allow fast lookup of
+by stream handle), viewer sessions (viewer_sessions_ht, indexed by
+connection sock fd), and viewer streams (viewer_streams_ht, indexed
+by stream handle). The purpose of those tables is to allow fast lookup of
 those objects using the IDs received in the communication protocols.
 
 There is also one connection hash table per worker thread. There is one
@@ -56,7 +59,10 @@ There is also a "lock" mutex in each object. Those are used to
 synchronize between threads (currently the main.c relay thread and
 live.c client thread) when objects are shared. Locks can be nested from
 the outermost object to the innermost object. IOW, the ctf-trace lock can
-nest within the session lock.
+nest within the session lock. The unannounced stream list lock in viewer
+sessions is an exception to the default locking order: it may be nested
+inside the following locks (in order): relay session, ctf_trace, and relay
+stream.
 
 RCU linked lists are used to iterate using RCU, and are protected by
 their own mutex for modifications. Iterations should be confirmed using
index 73548cfb33cf36e6d52218dd2bcf71f1069129f7..250e05c2effd9ad5d74315f3495b0a77a462b41f 100644 (file)
@@ -267,14 +267,62 @@ end:
        return ret;
 }
 
+/*
+ * Sends one viewer stream to the given socket.
+ *
+ * This function needs to be called with the stream locked.
+ *
+ * Return 0 on success, or else a negative value.
+ */
+static ssize_t send_one_viewer_stream(struct lttcomm_sock *sock,
+                                     struct relay_viewer_stream *vstream)
+{
+       struct ctf_trace *ctf_trace;
+       struct lttng_viewer_stream send_stream = {};
+       ssize_t ret = -1;
+
+       ASSERT_LOCKED(vstream->stream->lock);
+
+       ctf_trace = vstream->stream->trace;
+       send_stream.id = htobe64(vstream->stream->stream_handle);
+       send_stream.ctf_trace_id = htobe64(ctf_trace->id);
+       send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
+       if (lttng_strncpy(
+                   send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name))) {
+               ret = -1; /* Error. */
+               goto end;
+       }
+       if (lttng_strncpy(send_stream.channel_name,
+                         vstream->channel_name,
+                         sizeof(send_stream.channel_name))) {
+               ret = -1; /* Error. */
+               goto end;
+       }
+
+       DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
+       vstream->sent_flag = true;
+
+       ret = send_response(sock, &send_stream, sizeof(send_stream));
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Send viewer streams to the given socket. The ignore_sent_flag indicates if
  * this function should ignore the sent flag or not.
  *
  * Return 0 on success or else a negative value.
  */
-static ssize_t
-send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag)
+static ssize_t send_viewer_streams(struct lttcomm_sock *sock,
+                                  uint64_t session_id,
+                                  unsigned int ignore_sent_flag,
+                                  struct relay_viewer_session *viewer_session)
 {
        ssize_t ret;
 
@@ -283,8 +331,6 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
                                                 decltype(relay_viewer_stream::stream_n),
                                                 &relay_viewer_stream::stream_n>(
                     *viewer_streams_ht->ht)) {
-               struct ctf_trace *ctf_trace;
-               struct lttng_viewer_stream send_stream = {};
 
                health_code_update();
 
@@ -301,36 +347,50 @@ send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int
                        continue;
                }
 
-               ctf_trace = vstream->stream->trace;
-               send_stream.id = htobe64(vstream->stream->stream_handle);
-               send_stream.ctf_trace_id = htobe64(ctf_trace->id);
-               send_stream.metadata_flag = htobe32(vstream->stream->is_metadata);
-               if (lttng_strncpy(send_stream.path_name,
-                                 vstream->path_name,
-                                 sizeof(send_stream.path_name))) {
-                       pthread_mutex_unlock(&vstream->stream->lock);
+               ret = send_one_viewer_stream(sock, vstream);
+               pthread_mutex_unlock(&vstream->stream->lock);
+               if (ret < 0) {
                        viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
                        goto end;
                }
-               if (lttng_strncpy(send_stream.channel_name,
-                                 vstream->channel_name,
-                                 sizeof(send_stream.channel_name))) {
+
+               pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+               cds_list_del_rcu(&vstream->viewer_stream_node);
+               pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+               viewer_stream_put(vstream);
+       }
+
+       /*
+        * Any remaining streams that have been seen, but are perhaps unpublished
+        * due to a session being destroyed in between attach and get_new_streams.
+        */
+       for (auto *vstream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+                    &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+               health_code_update();
+               if (!viewer_stream_get(vstream)) {
+                       continue;
+               }
+
+               pthread_mutex_lock(&vstream->stream->lock);
+               if (vstream->stream->trace->session->id != session_id) {
                        pthread_mutex_unlock(&vstream->stream->lock);
                        viewer_stream_put(vstream);
-                       ret = -1; /* Error. */
-                       goto end;
+                       continue;
                }
 
-               DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle);
-               vstream->sent_flag = true;
+               ret = send_one_viewer_stream(sock, vstream);
                pthread_mutex_unlock(&vstream->stream->lock);
-
-               ret = send_response(sock, &send_stream, sizeof(send_stream));
-               viewer_stream_put(vstream);
                if (ret < 0) {
+                       viewer_stream_put(vstream);
                        goto end;
                }
+
+               pthread_mutex_lock(&viewer_session->unannounced_stream_list_lock);
+               cds_list_del_rcu(&vstream->viewer_stream_node);
+               viewer_stream_put(vstream);
+               pthread_mutex_unlock(&viewer_session->unannounced_stream_list_lock);
+               viewer_stream_put(vstream);
+
        }
 
        ret = 0;
@@ -350,12 +410,12 @@ end:
  *
  * Return 0 on success or else a negative value.
  */
-static int make_viewer_streams(struct relay_session *relay_session,
+int make_viewer_streams(struct relay_session *relay_session,
                               struct relay_viewer_session *viewer_session,
                               enum lttng_viewer_seek seek_t,
-                              uint32_t *nb_total,
-                              uint32_t *nb_unsent,
-                              uint32_t *nb_created,
+                              unsigned int *nb_total,
+                              unsigned int *nb_unsent,
+                              unsigned int *nb_created,
                               bool *closed)
 {
        int ret;
@@ -367,6 +427,47 @@ static int make_viewer_streams(struct relay_session *relay_session,
                *closed = true;
        }
 
+       /*
+        * Check unannounced viewer streams for any that have been seen but are no longer published.
+        */
+       for (auto *viewer_stream : lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+                    &relay_viewer_stream::viewer_stream_node>(viewer_session->unannounced_stream_list)) {
+               if (!viewer_stream_get(viewer_stream)) {
+                       DBG("Couldn't get reference for viewer_stream");
+                       continue;
+               }
+
+               if (viewer_stream->sent_flag) {
+                       ERR("logic error -> viewer stream %ld is in unannounced_stream_list is marked as sent",
+                           viewer_stream->stream->stream_handle);
+                       abort();
+               }
+
+               if (viewer_stream->stream->published) {
+                       /*
+                        * This stream should be handled later when iterating via the
+                        * ctf_traces
+                        */
+                       viewer_stream_put(viewer_stream);
+                       continue;
+               }
+
+               if (viewer_stream->stream->trace->session->id != relay_session->id) {
+                       viewer_stream_put(viewer_stream);
+                       continue;
+               }
+
+               if (nb_unsent) {
+                       (*nb_unsent)++;
+               }
+
+               if (nb_total) {
+                       (*nb_total)++;
+               }
+
+               viewer_stream_put(viewer_stream);
+       }
+
        /*
         * Create viewer streams for relay streams that are ready to be
         * used for a the given session id only.
@@ -384,6 +485,11 @@ static int make_viewer_streams(struct relay_session *relay_session,
 
                auto ctf_trace =
                        lttng::make_unique_wrapper<struct ctf_trace, ctf_trace_put>(raw_ctf_trace);
+               /*
+                * The trace metadata state may be updated while iterating over all the
+                * relay streams associated with the trace, so the lock is required.
+                */
+               const lttng::pthread::lock_guard ctf_trace_lock(ctf_trace->lock);
 
                /*
                 * Iterate over all the streams of the trace to see if we have a
@@ -513,6 +619,25 @@ static int make_viewer_streams(struct relay_session *relay_session,
                                        goto end;
                                }
 
+                               /*
+                                * Add the new stream to the list of streams to publish for
+                                * this session.
+                                */
+                               pthread_mutex_lock(
+                                       &viewer_session->unannounced_stream_list_lock);
+                               cds_list_add_rcu(&viewer_stream->viewer_stream_node,
+                                                &viewer_session->unannounced_stream_list);
+                               pthread_mutex_unlock(
+                                       &viewer_session->unannounced_stream_list_lock);
+                               /*
+                                * Get for the unannounced stream list, this should be
+                                * put when the unannounced stream is sent.
+                                */
+                               if (!viewer_stream_get(viewer_stream)) {
+                                       ERR("Unable to get self-reference on viewer stream");
+                                       abort();
+                               }
+
                                if (nb_created) {
                                        /* Update number of created stream counter. */
                                        (*nb_created)++;
@@ -533,7 +658,7 @@ static int make_viewer_streams(struct relay_session *relay_session,
                        }
                        /* Update number of total stream counter. */
                        if (nb_total) {
-                               if (stream->is_metadata) {
+                               if (stream->is_metadata) {
                                        if (!stream->closed ||
                                            stream->metadata_received >
                                                    viewer_stream->metadata_sent) {
@@ -1124,7 +1249,7 @@ end_free:
 static int viewer_get_new_streams(struct relay_connection *conn)
 {
        int ret, send_streams = 0;
-       uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
+       unsigned int nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
        struct lttng_viewer_new_streams_request request;
        struct lttng_viewer_new_streams_response response;
        struct relay_session *session = nullptr;
@@ -1199,7 +1324,7 @@ static int viewer_get_new_streams(struct relay_connection *conn)
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
        /* Only send back the newly created streams with the unsent ones. */
-       nb_streams = nb_created + nb_unsent;
+       nb_streams = nb_unsent + nb_created;
        response.streams_count = htobe32(nb_streams);
 
        /*
@@ -1237,7 +1362,7 @@ send_reply:
         * streams that were not sent from that point will be sent to
         * the viewer.
         */
-       ret = send_viewer_streams(conn->sock, session_id, 0);
+       ret = send_viewer_streams(conn->sock, session_id, 0, conn->viewer_session);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1257,7 +1382,7 @@ static int viewer_attach_session(struct relay_connection *conn)
 {
        int send_streams = 0;
        ssize_t ret;
-       uint32_t nb_streams = 0;
+       unsigned int nb_streams = 0;
        enum lttng_viewer_seek seek_type;
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
@@ -1390,7 +1515,7 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(conn->sock, session_id, 1);
+       ret = send_viewer_streams(conn->sock, session_id, 1, conn->viewer_session);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1786,6 +1911,20 @@ static int viewer_get_next_index(struct relay_connection *conn)
                    conn->viewer_session->current_trace_chunk ?
                            std::to_string(viewer_session_chunk_id).c_str() :
                            "None");
+       } else if (vstream->stream_file.trace_chunk && rstream->completed_rotation_count == vstream->last_seen_rotation_count && !rstream->trace_chunk) {
+               /*
+                * When a relay stream is closed, there is a window before the rotation of the
+                * streams happens, during which the next index may be fetched. If the seen
+                * rotations are the same and the relay stream trace chunk is null, don't rotate.
+                * When the close finishes, the rotation count on the relay stream will go up.
+                */
+               DBG("Transition to latest chunk check (%s -> %s): relay stream chunk is null, but viewer stream knows a chunk and isn't yet behind a rotation",
+                   vstream->stream_file.trace_chunk ?
+                           std::to_string(stream_file_chunk_id).c_str() :
+                           "None",
+                   conn->viewer_session->current_trace_chunk ?
+                           std::to_string(viewer_session_chunk_id).c_str() :
+                           "None");
        } else {
                DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream",
                    vstream->stream_file.trace_chunk ?
@@ -1820,7 +1959,7 @@ static int viewer_get_next_index(struct relay_connection *conn)
                if (rstream->closed) {
                        viewer_index.status = LTTNG_VIEWER_INDEX_HUP;
                        DBG("Cannot open index for stream id %" PRIu64
-                           "stream is closed, returning status=%s",
+                           " stream is closed, returning status=%s",
                            (uint64_t) be64toh(request_index.stream_id),
                            lttng_viewer_next_index_return_code_str(
                                    (enum lttng_viewer_next_index_return_code) viewer_index.status));
@@ -2181,7 +2320,7 @@ static int viewer_get_metadata(struct relay_connection *conn)
                bool acquired_reference;
 
                DBG("Viewer session and viewer stream chunk differ: "
-                   "vsession chunk %p vstream chunk %p",
+                   "vsession chunk %p vstream chunk=%p",
                    conn->viewer_session->current_trace_chunk,
                    vstream->stream_file.trace_chunk);
                lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
index 53f67d30ef019061b2a4e00cf537c3a2434489c6..47926d2ab287a12d0a475658afb145b644ce958d 100644 (file)
@@ -18,4 +18,11 @@ int relayd_live_create(struct lttng_uri *live_uri);
 int relayd_live_stop();
 int relayd_live_join();
 
+int make_viewer_streams(struct relay_session *relay_session,
+                       struct relay_viewer_session *viewer_session,
+                       enum lttng_viewer_seek seek_t,
+                       unsigned int *nb_total,
+                       unsigned int *nb_unsent,
+                       unsigned int *nb_created,
+                       bool *closed);
 #endif /* LTTNG_RELAYD_LIVE_H */
index bf86575f5589a8071e05f84d35437962382303d9..052209b471351fd48adcde745e7cebbfc1b628ce 100644 (file)
@@ -42,6 +42,7 @@ enum relay_group_output_by {
  */
 extern struct lttng_ht *sessions_ht;
 extern struct lttng_ht *relay_streams_ht;
+extern struct lttng_ht *viewer_sessions_ht;
 extern struct lttng_ht *viewer_streams_ht;
 extern struct sessiond_trace_chunk_registry *sessiond_trace_chunk_registry;
 
index 42ec33e52fe8826186702798b6cf2c51f51d9e47..c26050b456281289c1ff084c977336143c738708 100644 (file)
@@ -25,6 +25,7 @@
 #include "tracefile-array.hpp"
 #include "utils.hpp"
 #include "version.hpp"
+#include "viewer-session.hpp"
 #include "viewer-stream.hpp"
 
 #include <common/align.hpp>
@@ -169,6 +170,9 @@ struct lttng_ht *viewer_streams_ht;
 /* Global relay sessions hash table. */
 struct lttng_ht *sessions_ht;
 
+/* Global viewer sessions hash table. */
+struct lttng_ht *viewer_sessions_ht;
+
 /* Relayd health monitoring */
 struct health_app *health_relayd;
 
@@ -774,11 +778,13 @@ static void relayd_cleanup()
 
        if (viewer_streams_ht)
                lttng_ht_destroy(viewer_streams_ht);
+       if (viewer_sessions_ht) {
+               lttng_ht_destroy(viewer_sessions_ht);
+       }
        if (relay_streams_ht)
                lttng_ht_destroy(relay_streams_ht);
        if (sessions_ht)
                lttng_ht_destroy(sessions_ht);
-
        free(opt_output_path);
        free(opt_working_directory);
 
@@ -1513,6 +1519,10 @@ end:
 static void publish_connection_local_streams(struct relay_connection *conn)
 {
        struct relay_session *session = conn->session;
+       unsigned int created = 0;
+       bool closed = false;
+
+       LTTNG_ASSERT(viewer_sessions_ht);
 
        /*
         * We publish all streams belonging to a session atomically wrt
@@ -1529,9 +1539,57 @@ static void publish_connection_local_streams(struct relay_connection *conn)
        /*
         * Inform the viewer that there are new streams in the session.
         */
-       if (session->viewer_attached) {
-               uatomic_set(&session->new_streams, 1);
+       if (!session->viewer_attached) {
+               goto unlock;
+       }
+
+       /*
+        * Create viewer_streams for all the newly published streams for this relay session.
+        * This searches through all known viewer sessions and finds those that are
+        * attached to this connection's relay session. This is done so that the newer
+        * viewer streams will hold a reference on any relay streams that already exist,
+        * but may be unpublished between now and the next GET_NEW_STREAMS from the
+        * attached live viewer.
+        */
+       for (auto *viewer_session: lttng::urcu::lfht_iteration_adapter<relay_viewer_session,
+                    decltype(relay_viewer_session::viewer_session_n),
+                    &relay_viewer_session::viewer_session_n>(*viewer_sessions_ht->ht))
+       {
+               for (auto *session_iter: lttng::urcu::rcu_list_iteration_adapter<relay_session,
+                            &relay_session::viewer_session_node>(viewer_session->session_list))
+               {
+                       if (session != session_iter) {
+                               continue;
+                       }
+                       const int ret = make_viewer_streams(session,
+                                                           viewer_session,
+                                                           LTTNG_VIEWER_SEEK_BEGINNING,
+                                                           nullptr,
+                                                           nullptr,
+                                                           &created,
+                                                           &closed);
+                       if (ret == 0) {
+                               DBG("Created %d new viewer streams during publication of relay streams for relay session %" PRIu64,
+                                   created,
+                                   session->id);
+                       } else if (ret < 0) {
+                               /*
+                                * Warning, since the creation of the
+                                * streams will be retried when the viewer
+                                * next sends the GET_NEW_STREAMS again.
+                                */
+                               WARN("Failed to create new viewer streams during publication of relay streams for relay session %" PRIu64
+                                    ", ret=%d, created=%d, closed=%d",
+                                    session->id,
+                                    ret,
+                                    created,
+                                    closed);
+                       }
+               }
        }
+unlock:
+       uatomic_set(&session->new_streams, 1);
+       pthread_mutex_unlock(&session->lock);
 }
 
 static int conform_channel_path(char *channel_path)
@@ -4393,6 +4451,13 @@ int main(int argc, char **argv)
                goto exit_options;
        }
 
+       /* tables of viewer sessions indexed by session ID */
+       viewer_sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+       if (!viewer_sessions_ht) {
+               retval = -1;
+               goto exit_options;
+       }
+
        /* tables of streams indexed by stream ID */
        viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!viewer_streams_ht) {
index 9b6640f6b5547a283ba47a3ad467af8685eee39e..1ed625f75d016e582ac74237f1f808428151a17d 100644 (file)
@@ -9,6 +9,7 @@
 
 #define _LGPL_SOURCE
 #include "ctf-trace.hpp"
+#include "live.hpp"
 #include "lttng-relayd.hpp"
 #include "session.hpp"
 #include "stream.hpp"
 
 #include <urcu/rculist.h>
 
+/* Global session id used in the session creation. */
+static uint64_t last_viewer_session_id;
+static pthread_mutex_t last_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER;
+
 struct relay_viewer_session *viewer_session_create()
 {
        struct relay_viewer_session *vsession;
@@ -28,7 +33,14 @@ struct relay_viewer_session *viewer_session_create()
        if (!vsession) {
                goto end;
        }
+       pthread_mutex_lock(&last_viewer_session_id_lock);
+       vsession->id = ++last_viewer_session_id;
+       pthread_mutex_unlock(&last_viewer_session_id_lock);
        CDS_INIT_LIST_HEAD(&vsession->session_list);
+       CDS_INIT_LIST_HEAD(&vsession->unannounced_stream_list);
+       pthread_mutex_init(&vsession->unannounced_stream_list_lock, nullptr);
+       lttng_ht_node_init_u64(&vsession->viewer_session_n, vsession->id);
+       lttng_ht_add_unique_u64(viewer_sessions_ht, &vsession->viewer_session_n);
 end:
        return vsession;
 }
@@ -98,6 +110,40 @@ enum lttng_viewer_attach_return_code viewer_session_attach(struct relay_viewer_s
                /* Ownership is transfered to the list. */
                cds_list_add_rcu(&session->viewer_session_node, &vsession->session_list);
                pthread_mutex_unlock(&vsession->session_list_lock);
+
+               /*
+                * Immediately create new viewer streams for the attached session
+                * so that the viewer streams hold a reference on the any relay
+                * streams that could be unpublished between now and the next
+                * GET_NEW_STREAMS command from the live viewer.
+                */
+               uint32_t created = 0;
+               uint32_t total = 0;
+               uint32_t unsent = 0;
+               bool closed = false;
+               const int make_viewer_streams_ret = make_viewer_streams(session,
+                                                                 vsession,
+                                                                 LTTNG_VIEWER_SEEK_BEGINNING,
+                                                                 &total,
+                                                                 &unsent,
+                                                                 &created,
+                                                                 &closed);
+
+               if (make_viewer_streams_ret == 0) {
+                       DBG("Created %d new viewer streams while attaching to relay session %" PRIu64, created, session->id);
+               } else {
+                       /*
+                        * Warning, since the creation of the streams will be retried when
+                        * the viewer next sends the GET_NEW_STREAMS commands.
+                        */
+                       WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64 ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
+                            session->id,
+                            make_viewer_streams_ret,
+                            total,
+                            unsent,
+                            created,
+                            closed);
+               }
        } else {
                /* Put our local ref. */
                session_put(session);
@@ -126,6 +172,7 @@ static int viewer_session_detach(struct relay_viewer_session *vsession,
                /* Release reference held by the list. */
                session_put(session);
        }
+
        /* Safe since we know the session exists. */
        pthread_mutex_unlock(&session->lock);
        return ret;
@@ -133,6 +180,12 @@ static int viewer_session_detach(struct relay_viewer_session *vsession,
 
 void viewer_session_destroy(struct relay_viewer_session *vsession)
 {
+       struct lttng_ht_iter iter;
+
+       LTTNG_ASSERT(cds_list_empty(&vsession->unannounced_stream_list));
+
+       iter.iter.node = &vsession->viewer_session_n.node;
+       lttng_ht_del(viewer_sessions_ht, &iter);
        lttng_trace_chunk_put(vsession->current_trace_chunk);
        free(vsession);
 }
@@ -155,6 +208,7 @@ void viewer_session_close_one_session(struct relay_viewer_session *vsession,
                if (!viewer_stream_get(vstream)) {
                        continue;
                }
+
                if (vstream->stream->trace->session != session) {
                        viewer_stream_put(vstream);
                        continue;
@@ -169,6 +223,25 @@ void viewer_session_close_one_session(struct relay_viewer_session *vsession,
                viewer_stream_put(vstream);
        }
 
+       for (auto *vstream: lttng::urcu::rcu_list_iteration_adapter<relay_viewer_stream,
+                    &relay_viewer_stream::viewer_stream_node>(vsession->unannounced_stream_list))
+       {
+               if (!viewer_stream_get(vstream)) {
+                       continue;
+               }
+               if (vstream->stream->trace->session != session) {
+                       viewer_stream_put(vstream);
+                       continue;
+               }
+               pthread_mutex_lock(&vsession->unannounced_stream_list_lock);
+               cds_list_del_rcu(&vstream->viewer_stream_node);
+               pthread_mutex_unlock(&vsession->unannounced_stream_list_lock);
+               /* Local reference */
+               viewer_stream_put(vstream);
+               /* Reference from unannounced_stream_list */
+               viewer_stream_put(vstream);
+       }
+
        lttng_trace_chunk_put(vsession->current_trace_chunk);
        vsession->current_trace_chunk = nullptr;
        viewer_session_detach(vsession, session);
index 3dfa3acea809e1ccaf8dfa2c4c1006d0d082a6d9..728676b9509f417ed7f2b899cc2ed44d56cd70c8 100644 (file)
 #include <urcu/ref.h>
 
 struct relay_viewer_session {
+       /*
+        * The id of the relay viewer session. Uses the associated connection's socket FD.
+        */
+       uint64_t id;
        /*
         * Session list. Updates are protected by the session_list_lock.
         * Traversals are protected by RCU.
@@ -31,6 +35,20 @@ struct relay_viewer_session {
         */
        struct cds_list_head session_list; /* RCU list. */
        pthread_mutex_t session_list_lock; /* Protects list updates. */
+       /*
+        * Unannounced stream list. Updates are protected by the
+        * unannounced_stream_list_lock. This lock nests inside
+        * the following locks (in order): relay session, ctf_trace,
+        * and relay stream.
+        *
+        * Traversals are protected by RCU.
+        */
+       struct cds_list_head unannounced_stream_list;
+       pthread_mutex_t unannounced_stream_list_lock;
+       /*
+        * Node in the global viewer sessions hashtable.
+        */
+       struct lttng_ht_node_u64 viewer_session_n;
        /*
         * The viewer session's current trace chunk is initially set, when
         * a viewer attaches to the viewer session, to a copy the corresponding
index 06657174371cf9ac0d1c9dd1a5f333b979a7419d..ef0dfbdd36ea7c2a9f7d1d785b597a8802dace86 100644 (file)
@@ -69,7 +69,6 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                PERROR("relay viewer stream zmalloc");
                goto error;
        }
-
        if (trace_chunk) {
                const bool acquired_reference = lttng_trace_chunk_get(trace_chunk);
 
@@ -211,7 +210,6 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
        lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
        urcu_ref_init(&vstream->ref);
        lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
-
        return vstream;
 
 error:
index ea6292b2cee78074f641d1c044bca7cdac5aedd7..977a7cfa92ddb01f411fd982710a591e5708800f 100644 (file)
@@ -39,6 +39,13 @@ struct relay_viewer_stream {
        /* Back ref to stream. */
        struct relay_stream *stream;
 
+       /*
+        * Member of unannounced_stream_list in struct viewer_sesion.
+        * Updates are protected by the unannounced_stream_list_lock, and
+        * traversals are protected by RCU.
+        */
+       struct cds_list_head viewer_stream_node;
+
        struct {
                struct fs_handle *handle;
                struct lttng_trace_chunk *trace_chunk;
This page took 0.036358 seconds and 4 git commands to generate.