relayd: move relay_session locking outside of make_viewer_streams
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 19 Sep 2019 17:22:17 +0000 (13:22 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 19 Sep 2019 18:33:21 +0000 (14:33 -0400)
The locking period of the relay_session is slightly extended outside
of make_viewer_streams(). This makes a follow-up change easier to
review, but is also a clean-up cleaner as some of the session's
attribute are accessed without holding the lock (`live_timer` and
`id`) assuming (correctly, for the moment) that they don't change over
the lifetime of the relayd_session. It also allows us to move the
relayd_session locking logic from viewer_session_attach().

The signature of send_viewer_streams() is changed to accept the
session's id rather than a pointer to the relay_session, therefore
making the locking scope simpler to follow. Changing this makes
follow-up changes easier to review.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/viewer-session.c

index 84fa1435196057c6a506b019eedf4bcf477da49b..dc8238c9499b876de333a7c313e1e95135e847aa 100644 (file)
@@ -197,7 +197,7 @@ end:
  */
 static
 ssize_t send_viewer_streams(struct lttcomm_sock *sock,
-               struct relay_session *session, unsigned int ignore_sent_flag)
+               uint64_t session_id, unsigned int ignore_sent_flag)
 {
        ssize_t ret;
        struct lttng_viewer_stream send_stream;
@@ -218,7 +218,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock,
 
                pthread_mutex_lock(&vstream->stream->lock);
                /* Ignore if not the same session. */
-               if (vstream->stream->trace->session->id != session->id ||
+               if (vstream->stream->trace->session->id != session_id ||
                                (!ignore_sent_flag && vstream->sent_flag)) {
                        pthread_mutex_unlock(&vstream->stream->lock);
                        viewer_stream_put(vstream);
@@ -271,6 +271,9 @@ end_unlock:
  * viewer stream of the session, the number of unsent stream and the number of
  * stream created. Those counters can be NULL and thus will be ignored.
  *
+ * session must be locked to ensure that we see either none or all initial
+ * streams for a session, but no intermediate state..
+ *
  * Return 0 on success or else a negative value.
  */
 static
@@ -283,12 +286,7 @@ int make_viewer_streams(struct relay_session *session,
        struct ctf_trace *ctf_trace;
 
        assert(session);
-
-       /*
-        * Hold the session lock to ensure that we see either none or
-        * all initial streams for a session, but no intermediate state.
-        */
-       pthread_mutex_lock(&session->lock);
+       ASSERT_LOCKED(session->lock);
 
        if (session->connection_closed) {
                *closed = true;
@@ -376,7 +374,6 @@ int make_viewer_streams(struct relay_session *session,
 
 error_unlock:
        rcu_read_unlock();
-       pthread_mutex_unlock(&session->lock);
        return ret;
 }
 
@@ -946,8 +943,10 @@ int viewer_get_new_streams(struct relay_connection *conn)
        send_streams = 1;
        response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
 
+       pthread_mutex_lock(&session->lock);
        ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
                        &nb_created, &closed);
+       pthread_mutex_unlock(&session->lock);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -988,7 +987,7 @@ send_reply:
         * streams that were not sent from that point will be sent to
         * the viewer.
         */
-       ret = send_viewer_streams(conn->sock, session, 0);
+       ret = send_viewer_streams(conn->sock, session_id, 0);
        if (ret < 0) {
                goto end_put_session;
        }
@@ -1015,6 +1014,7 @@ int viewer_attach_session(struct relay_connection *conn)
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session = NULL;
        bool closed = false;
+       uint64_t session_id;
 
        assert(conn);
 
@@ -1026,6 +1026,7 @@ int viewer_attach_session(struct relay_connection *conn)
                goto error;
        }
 
+       session_id = be64toh(request.session_id);
        health_code_update();
 
        memset(&response, 0, sizeof(response));
@@ -1036,16 +1037,15 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
-       session = session_get_by_id(be64toh(request.session_id));
+       session = session_get_by_id(session_id);
        if (!session) {
-               DBG("Relay session %" PRIu64 " not found",
-                               (uint64_t) be64toh(request.session_id));
+               DBG("Relay session %" PRIu64 " not found", session_id);
                response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
-       DBG("Attach session ID %" PRIu64 " received",
-               (uint64_t) be64toh(request.session_id));
+       DBG("Attach session ID %" PRIu64 " received", session_id);
 
+       pthread_mutex_lock(&session->lock);
        if (session->live_timer == 0) {
                DBG("Not live session");
                response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
@@ -1078,8 +1078,12 @@ int viewer_attach_session(struct relay_connection *conn)
        if (ret < 0) {
                goto end_put_session;
        }
-       response.streams_count = htobe32(nb_streams);
 
+       pthread_mutex_unlock(&session->lock);
+       session_put(session);
+       session = NULL;
+
+       response.streams_count = htobe32(nb_streams);
        /*
         * If the session is closed when the viewer is attaching, it
         * means some of the streams may have been concurrently removed,
@@ -1111,13 +1115,14 @@ send_reply:
        }
 
        /* Send stream and ignore the sent flag. */
-       ret = send_viewer_streams(conn->sock, session, 1);
+       ret = send_viewer_streams(conn->sock, session_id, 1);
        if (ret < 0) {
                goto end_put_session;
        }
 
 end_put_session:
        if (session) {
+               pthread_mutex_unlock(&session->lock);
                session_put(session);
        }
 error:
index a4b859726a6ea9b9bc7ef86ac5a019f43e29bdc2..5f9c7c5769f799d5ff101446b730b06c340b5f4f 100644 (file)
@@ -47,12 +47,13 @@ int viewer_session_attach(struct relay_viewer_session *vsession,
 {
        int ret = 0;
 
+       ASSERT_LOCKED(session->lock);
+
        /* Will not fail, as per the ownership guarantee. */
        if (!session_get(session)) {
                ret = -1;
                goto end;
        }
-       pthread_mutex_lock(&session->lock);
        if (session->viewer_attached) {
                ret = -1;
        } else {
@@ -69,8 +70,6 @@ int viewer_session_attach(struct relay_viewer_session *vsession,
                /* Put our local ref. */
                session_put(session);
        }
-       /* Safe since we know the session exists. */
-       pthread_mutex_unlock(&session->lock);
 end:
        return ret;
 }
This page took 0.028732 seconds and 4 git commands to generate.