Fix: relayd: handle consumerd crashes without leak
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Sep 2015 15:56:34 +0000 (11:56 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 17 Sep 2015 00:52:28 +0000 (20:52 -0400)
We can be clever about indexes partially received in cases where we
received the data socket part, but not the control socket part: since
we're currently closing the stream on behalf of the control socket, we
*know* there won't be any more control information for this socket.
Therefore, we can destroy all indexes for which we have received only
the file descriptor (from data socket). This takes care of consumerd
crashes between sending the data and control information for a packet.
Since those are sent in that order, we take care of consumerd crashes.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/index.c
src/bin/lttng-relayd/index.h
src/bin/lttng-relayd/stream.c

index 7182e36cc8e461daf666d6dbc32079dd11b7e5f3..cb7ae3db966e34ae3205c00a4f04f15853f3e247 100644 (file)
@@ -333,3 +333,42 @@ void relay_index_close_all(struct relay_stream *stream)
        }
        rcu_read_unlock();
 }
+
+void relay_index_close_partial_fd(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               if (!index->index_fd) {
+                       continue;
+               }
+               /*
+                * Partial index has its index_fd: we have only
+                * received its info from the data socket.
+                * Put self-ref from index.
+                */
+               relay_index_put(index);
+       }
+       rcu_read_unlock();
+}
+
+uint64_t relay_index_find_last(struct relay_stream *stream)
+{
+       struct lttng_ht_iter iter;
+       struct relay_index *index;
+       uint64_t net_seq_num = -1ULL;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
+                       index, index_n.node) {
+               if (net_seq_num == -1ULL ||
+                               index->index_n.key > net_seq_num) {
+                       net_seq_num = index->index_n.key;
+               }
+       }
+       rcu_read_unlock();
+       return net_seq_num;
+}
index e882ed97d2552b71225fee0517698fcdf0143475..15c4ac8cd8201b906e458a4e819416004a9a1d1e 100644 (file)
@@ -71,5 +71,7 @@ int relay_index_set_data(struct relay_index *index,
 int relay_index_try_flush(struct relay_index *index);
 
 void relay_index_close_all(struct relay_stream *stream);
+void relay_index_close_partial_fd(struct relay_stream *stream);
+uint64_t relay_index_find_last(struct relay_stream *stream);
 
 #endif /* _RELAY_INDEX_H */
index 2a59d1ed3e166edd319209ea97065eebf81e3876..cac87635ed46c2044f72c44e0e99940c6af1af17 100644 (file)
@@ -354,26 +354,59 @@ void try_stream_close(struct relay_stream *stream)
        }
 
        stream->close_requested = true;
-       /*
-        * We shortcut the data pending check if no bound is known for this
-        * stream. This prevents us from never closing the stream in the case
-        * where a connection would be closed before a "close" command has
-        * been received.
-        *
-        * TODO
-        * This still leaves open the question of handling missing data after
-        * a bound has been set by a stream close command. Since we have no
-        * way of pairing data and control connection, and that a data
-        * connection has no ownership of a stream, it is likely that a
-        * timeout approach would be appropriate to handle dangling streams.
-        */
+
+       if (stream->last_net_seq_num == -1ULL) {
+               /*
+                * Handle connection close without explicit stream close
+                * command.
+                *
+                * We can be clever about indexes partially received in
+                * cases where we received the data socket part, but not
+                * the control socket part: since we're currently closing
+                * the stream on behalf of the control socket, we *know*
+                * there won't be any more control information for this
+                * socket. Therefore, we can destroy all indexes for
+                * which we have received only the file descriptor (from
+                * data socket). This takes care of consumerd crashes
+                * between sending the data and control information for
+                * a packet. Since those are sent in that order, we take
+                * care of consumerd crashes.
+                */
+               relay_index_close_partial_fd(stream);
+               /*
+                * Use the highest net_seq_num we currently have pending
+                * As end of stream indicator.  Leave last_net_seq_num
+                * at -1ULL if we cannot find any index.
+                */
+               stream->last_net_seq_num = relay_index_find_last(stream);
+               /* Fall-through into the next check. */
+       }
+
        if (stream->last_net_seq_num != -1ULL &&
                        ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
-               /* Don't close since we still have data pending. */
+               /*
+                * Don't close since we still have data pending. This
+                * handles cases where an explicit close command has
+                * been received for this stream, and cases where the
+                * connection has been closed, and we are awaiting for
+                * index information from the data socket. It is
+                * therefore expected that all the index fd information
+                * we need has already been received on the control
+                * socket. Matching index information from data socket
+                * should be Expected Soon(TM).
+                *
+                * TODO: We should implement a timer to garbage collect
+                * streams after a timeout to be resilient against a
+                * consumerd implementation that would not match this
+                * expected behavior.
+                */
                pthread_mutex_unlock(&stream->lock);
                DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
                return;
        }
+       /*
+        * We received all the indexes we can expect.
+        */
        stream_unpublish(stream);
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
This page took 0.061701 seconds and 4 git commands to generate.