From: Mathieu Desnoyers Date: Wed, 9 Sep 2015 15:56:34 +0000 (-0400) Subject: Fix: relayd: handle consumerd crashes without leak X-Git-Tag: v2.8.0-rc1~360 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=3d07a857948f868354589ff742b0a2f6277f558f;p=lttng-tools.git Fix: relayd: handle consumerd crashes without leak We can be clever about indexes partially received in cases where we received the data socket part, but not the control socket part: since we're currently closing the stream on behalf of the control socket, we *know* there won't be any more control information for this socket. Therefore, we can destroy all indexes for which we have received only the file descriptor (from data socket). This takes care of consumerd crashes between sending the data and control information for a packet. Since those are sent in that order, we take care of consumerd crashes. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index 7182e36cc..cb7ae3db9 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -333,3 +333,42 @@ void relay_index_close_all(struct relay_stream *stream) } rcu_read_unlock(); } + +void relay_index_close_partial_fd(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (!index->index_fd) { + continue; + } + /* + * Partial index has its index_fd: we have only + * received its info from the data socket. + * Put self-ref from index. + */ + relay_index_put(index); + } + rcu_read_unlock(); +} + +uint64_t relay_index_find_last(struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_index *index; + uint64_t net_seq_num = -1ULL; + + rcu_read_lock(); + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + if (net_seq_num == -1ULL || + index->index_n.key > net_seq_num) { + net_seq_num = index->index_n.key; + } + } + rcu_read_unlock(); + return net_seq_num; +} diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h index e882ed97d..15c4ac8cd 100644 --- a/src/bin/lttng-relayd/index.h +++ b/src/bin/lttng-relayd/index.h @@ -71,5 +71,7 @@ int relay_index_set_data(struct relay_index *index, int relay_index_try_flush(struct relay_index *index); void relay_index_close_all(struct relay_stream *stream); +void relay_index_close_partial_fd(struct relay_stream *stream); +uint64_t relay_index_find_last(struct relay_stream *stream); #endif /* _RELAY_INDEX_H */ diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 2a59d1ed3..cac87635e 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -354,26 +354,59 @@ void try_stream_close(struct relay_stream *stream) } stream->close_requested = true; - /* - * We shortcut the data pending check if no bound is known for this - * stream. This prevents us from never closing the stream in the case - * where a connection would be closed before a "close" command has - * been received. - * - * TODO - * This still leaves open the question of handling missing data after - * a bound has been set by a stream close command. Since we have no - * way of pairing data and control connection, and that a data - * connection has no ownership of a stream, it is likely that a - * timeout approach would be appropriate to handle dangling streams. - */ + + if (stream->last_net_seq_num == -1ULL) { + /* + * Handle connection close without explicit stream close + * command. + * + * We can be clever about indexes partially received in + * cases where we received the data socket part, but not + * the control socket part: since we're currently closing + * the stream on behalf of the control socket, we *know* + * there won't be any more control information for this + * socket. Therefore, we can destroy all indexes for + * which we have received only the file descriptor (from + * data socket). This takes care of consumerd crashes + * between sending the data and control information for + * a packet. Since those are sent in that order, we take + * care of consumerd crashes. + */ + relay_index_close_partial_fd(stream); + /* + * Use the highest net_seq_num we currently have pending + * As end of stream indicator. Leave last_net_seq_num + * at -1ULL if we cannot find any index. + */ + stream->last_net_seq_num = relay_index_find_last(stream); + /* Fall-through into the next check. */ + } + if (stream->last_net_seq_num != -1ULL && ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) { - /* Don't close since we still have data pending. */ + /* + * Don't close since we still have data pending. This + * handles cases where an explicit close command has + * been received for this stream, and cases where the + * connection has been closed, and we are awaiting for + * index information from the data socket. It is + * therefore expected that all the index fd information + * we need has already been received on the control + * socket. Matching index information from data socket + * should be Expected Soon(TM). + * + * TODO: We should implement a timer to garbage collect + * streams after a timeout to be resilient against a + * consumerd implementation that would not match this + * expected behavior. + */ pthread_mutex_unlock(&stream->lock); DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle); return; } + /* + * We received all the indexes we can expect. + */ stream_unpublish(stream); stream->closed = true; /* Relay indexes are only used by the "consumer/sessiond" end. */