Fix: relayd streams can be leaked on connection error
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Sat, 28 Apr 2018 00:06:08 +0000 (20:06 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 1 May 2018 18:01:42 +0000 (14:01 -0400)
There are cases where a connection error can cause streams to be
leaked.

For instance, the control connection could receive an index and
close. Since a packet is in-flight, the stream corresponding to
that index will not close. However, nothing guarantees that
the data connection will be able to receive the packet's data.

If the protocol is respected, this is not a problem. However,
a buggy consumerd or network errors can cause the streams to
remain in the "data in-flight" state and never close.

To mitigate a case observed in the field where a consumerd
would be forcibly closed (network interface brought down) and
cause leaks on the relay daemon, the session is aborted whenever
the control or data connection encounters an error. Aborting
a session causes the streams to be closed regardless of the
fact that data is in-flight.

Currently, only the control connection holds an ownership of
the session object. This can cause the following scenario to leak
streams:

1) Control connection receives an index
  - Stream is put in "in-flight data" mode
2) Control connection is closed/shutdown cleanly
  - try_stream_close refuses to close the stream as data is in-flight,
    but it puts the stream in "closed" mode. When the data is
    received, the stream will be closed as soon as possible.
3) Data connection closes cleanly or due to an error
  - The stream "closing" condition will never be re-evaluated.

Since the data connection has no ownership of the session, it can
never clean-up the streams that are waiting for "in-flight" data to
arrive before closing.

This patch lazily associates the data connection to its session
so that the session can be aborted whenever an error happens on
either the data or control connection.

Note that this leaves the relayd vulnerable to a case which will
still leak. If the control connection receives an index and closes
cleanly, the data connection could have never been established
with the consumer daemon and result in a leak.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/connection.c
src/bin/lttng-relayd/connection.h
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/session.c

index dab794147f5e37ac773f1a8c419e16374f02af88..eeaa0a559c74d3f7aa1ee6d5ffdfa146498ec195 100644 (file)
@@ -176,3 +176,27 @@ void connection_ht_add(struct lttng_ht *relay_connections_ht,
        conn->in_socket_ht = 1;
        conn->socket_ht = relay_connections_ht;
 }
+
+int connection_set_session(struct relay_connection *conn,
+               struct relay_session *session)
+{
+       int ret = 0;
+
+       assert(conn);
+       assert(session);
+       assert(!conn->session);
+
+       if (connection_get(conn)) {
+               if (session_get(session)) {
+                       conn->session = session;
+               } else {
+                       ERR("Failed to get session reference in connection_set_session()");
+                       ret = -1;
+               }
+               connection_put(conn);
+       } else {
+               ERR("Failed to get connection reference in connection_set_session()");
+               ret = -1;
+       }
+       return ret;
+}
index 444195dff48cb955cc906c5f262bb8343d48cd8f..1b6744b6c7598eda01b7e4964b21dd742a66fb3a 100644 (file)
@@ -148,5 +148,7 @@ bool connection_get(struct relay_connection *connection);
 void connection_put(struct relay_connection *connection);
 void connection_ht_add(struct lttng_ht *relay_connections_ht,
                struct relay_connection *conn);
+int connection_set_session(struct relay_connection *conn,
+               struct relay_session *session);
 
 #endif /* _CONNECTION_H */
index 569a3a4707862c8fa1cc51c2c4ee575615c6ad8f..cb4643b76f792b3555de1a65174cdf0d73410a06 100644 (file)
@@ -3344,10 +3344,14 @@ static enum relay_connection_status relay_process_data_receive_payload(
        uint64_t left_to_receive = state->left_to_receive;
        struct relay_session *session;
 
+       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+                       state->header.stream_id, state->header.net_seq_num,
+                       state->received, left_to_receive);
+
        stream = stream_get_by_id(state->header.stream_id);
        if (!stream) {
                /* Protocol error. */
-               DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
+               ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
                                state->header.stream_id);
                status = RELAY_CONNECTION_STATUS_ERROR;
                goto end;
@@ -3355,10 +3359,13 @@ static enum relay_connection_status relay_process_data_receive_payload(
 
        pthread_mutex_lock(&stream->lock);
        session = stream->trace->session;
-
-       DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
-                       state->header.stream_id, state->header.net_seq_num,
-                       state->received, left_to_receive);
+       if (!conn->session) {
+               ret = connection_set_session(conn, session);
+               if (ret) {
+                       status = RELAY_CONNECTION_STATUS_ERROR;
+                       goto end_stream_unlock;
+               }
+       }
 
        /*
         * The size of the "chunk" received on any iteration is bounded by:
@@ -3686,6 +3693,26 @@ restart:
 
                                        status = relay_process_control(ctrl_conn);
                                        if (status != RELAY_CONNECTION_STATUS_OK) {
+                                               /*
+                                                * On socket error flag the session as aborted to force
+                                                * the cleanup of its stream otherwise it can leak
+                                                * during the lifetime of the relayd.
+                                                *
+                                                * This prevents situations in which streams can be
+                                                * left opened because an index was received, the
+                                                * control connection is closed, and the data
+                                                * connection is closed (uncleanly) before the packet's
+                                                * data provided.
+                                                *
+                                                * Since the control connection encountered an error,
+                                                * it is okay to be conservative and close the
+                                                * session right now as we can't rely on the protocol
+                                                * being respected anymore.
+                                                */
+                                               if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                                       session_abort(ctrl_conn->session);
+                                               }
+
                                                /* Clear the connection on error or close. */
                                                relay_thread_close_connection(&events,
                                                                pollfd,
@@ -3765,6 +3792,25 @@ restart:
                                status = relay_process_data(data_conn);
                                /* Connection closed or error. */
                                if (status != RELAY_CONNECTION_STATUS_OK) {
+                                       /*
+                                        * On socket error flag the session as aborted to force
+                                        * the cleanup of its stream otherwise it can leak
+                                        * during the lifetime of the relayd.
+                                        *
+                                        * This prevents situations in which streams can be
+                                        * left opened because an index was received, the
+                                        * control connection is closed, and the data
+                                        * connection is closed (uncleanly) before the packet's
+                                        * data provided.
+                                        *
+                                        * Since the data connection encountered an error,
+                                        * it is okay to be conservative and close the
+                                        * session right now as we can't rely on the protocol
+                                        * being respected anymore.
+                                        */
+                                       if (status == RELAY_CONNECTION_STATUS_ERROR) {
+                                               session_abort(data_conn->session);
+                                       }
                                        relay_thread_close_connection(&events, pollfd,
                                                        data_conn);
                                        /*
@@ -3803,9 +3849,7 @@ error:
                        sock_n.node) {
                health_code_update();
 
-               if (session_abort(destroy_conn->session)) {
-                       assert(0);
-               }
+               session_abort(destroy_conn->session);
 
                /*
                 * No need to grab another ref, because we own
index 3ea8e50d6f9aaf60d64789069636c9ea13736e50..42c29aeb0b4816f5d83437f14aa6a4f3631762aa 100644 (file)
@@ -182,16 +182,8 @@ int session_close(struct relay_session *session)
        pthread_mutex_lock(&session->lock);
        DBG("closing session %" PRIu64 ": is conn already closed %d",
                        session->id, session->connection_closed);
-       if (session->connection_closed) {
-               ret = -1;
-               goto unlock;
-       }
        session->connection_closed = true;
-unlock:
        pthread_mutex_unlock(&session->lock);
-       if (ret) {
-               return ret;
-       }
 
        rcu_read_lock();
        cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
@@ -226,13 +218,7 @@ int session_abort(struct relay_session *session)
 
        pthread_mutex_lock(&session->lock);
        DBG("aborting session %" PRIu64, session->id);
-       if (session->aborted) {
-               ERR("session %" PRIu64 " is already aborted", session->id);
-               ret = -1;
-               goto unlock;
-       }
        session->aborted = true;
-unlock:
        pthread_mutex_unlock(&session->lock);
        return ret;
 }
This page took 0.034081 seconds and 4 git commands to generate.