}
rcu_read_unlock();
}
+
+/*
+ * Drop the self-reference of every partially received index of a stream.
+ *
+ * Each entry of stream->indexes_ht that has its index_fd set is treated
+ * as partial (only its data-socket information has arrived) and gets one
+ * relay_index_put(). Entries without an index_fd are left untouched.
+ * The traversal runs inside an RCU read-side critical section.
+ */
+void relay_index_close_partial_fd(struct relay_stream *stream)
+{
+	struct lttng_ht_iter ht_iter;
+	struct relay_index *entry;
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(stream->indexes_ht->ht, &ht_iter.iter,
+			entry, index_n.node) {
+		if (entry->index_fd) {
+			/*
+			 * Only the data-socket half of this index was
+			 * received: release the index's self-reference.
+			 */
+			relay_index_put(entry);
+		}
+	}
+	rcu_read_unlock();
+}
+
+/*
+ * Find the highest key among the indexes currently present in the
+ * stream's index hash table.
+ *
+ * Returns that key, or -1ULL when the table holds no index. The
+ * traversal runs inside an RCU read-side critical section.
+ */
+uint64_t relay_index_find_last(struct relay_stream *stream)
+{
+	struct lttng_ht_iter ht_iter;
+	struct relay_index *entry;
+	uint64_t highest = -1ULL;
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(stream->indexes_ht->ht, &ht_iter.iter,
+			entry, index_n.node) {
+		/* -1ULL means no key has been recorded yet. */
+		if (highest != -1ULL && entry->index_n.key <= highest) {
+			continue;
+		}
+		highest = entry->index_n.key;
+	}
+	rcu_read_unlock();
+	return highest;
+}
}
stream->close_requested = true;
- /*
- * We shortcut the data pending check if no bound is known for this
- * stream. This prevents us from never closing the stream in the case
- * where a connection would be closed before a "close" command has
- * been received.
- *
- * TODO
- * This still leaves open the question of handling missing data after
- * a bound has been set by a stream close command. Since we have no
- * way of pairing data and control connection, and that a data
- * connection has no ownership of a stream, it is likely that a
- * timeout approach would be appropriate to handle dangling streams.
- */
+
+ if (stream->last_net_seq_num == -1ULL) {
+ /*
+ * Handle connection close without explicit stream close
+ * command.
+ *
+ * We can be clever about indexes partially received in
+ * cases where we received the data socket part, but not
+ * the control socket part: since we're currently closing
+ * the stream on behalf of the control socket, we *know*
+ * there won't be any more control information for this
+ * socket. Therefore, we can destroy all indexes for
+ * which we have received only the file descriptor (from
+ * data socket). This takes care of consumerd crashes
+ * between sending the data and control information for
+ * a packet. Since those are sent in that order, we take
+ * care of consumerd crashes.
+ */
+ relay_index_close_partial_fd(stream);
+ /*
+ * Use the highest net_seq_num we currently have pending
+ * as the end-of-stream indicator. Leave last_net_seq_num
+ * at -1ULL if we cannot find any index.
+ */
+ stream->last_net_seq_num = relay_index_find_last(stream);
+ /* Fall-through into the next check. */
+ }
+
if (stream->last_net_seq_num != -1ULL &&
((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
- /* Don't close since we still have data pending. */
+ /*
+ * Don't close since we still have data pending. This
+ * handles cases where an explicit close command has
+ * been received for this stream, and cases where the
+ * connection has been closed, and we are waiting for
+ * index information from the data socket. It is
+ * therefore expected that all the index fd information
+ * we need has already been received on the control
+ * socket. Matching index information from data socket
+ * should be Expected Soon(TM).
+ *
+ * TODO: We should implement a timer to garbage collect
+ * streams after a timeout to be resilient against a
+ * consumerd implementation that would not match this
+ * expected behavior.
+ */
pthread_mutex_unlock(&stream->lock);
DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
return;
}
+ /*
+ * We received all the indexes we can expect.
+ */
stream_unpublish(stream);
stream->closed = true;
/* Relay indexes are only used by the "consumer/sessiond" end. */