+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+ if (stream->trace) {
+ ctf_trace_put(stream->trace);
+ stream->trace = NULL;
+ }
+
+ call_rcu(&stream->rcu_node, stream_destroy_rcu);
+}
+
+void stream_put(struct relay_stream *stream)
+{
+ DBG("stream put for stream id %" PRIu64, stream->stream_handle);
+ /*
+ * Ensure existence of stream->reflock for stream unlock.
+ */
+ rcu_read_lock();
+ /*
+ * Stream reflock ensures that concurrent test and update of
+ * stream ref is atomic.
+ */
+ pthread_mutex_lock(&stream->reflock);
+ assert(stream->ref.refcount != 0);
+ /*
+ * Wait until we have processed all the stream packets before
+ * actually putting our last stream reference.
+ */
+ DBG("stream put stream id %" PRIu64 " refcount %d",
+ stream->stream_handle,
+ (int) stream->ref.refcount);
+ urcu_ref_put(&stream->ref, stream_release);
+ pthread_mutex_unlock(&stream->reflock);
+ rcu_read_unlock();
+}
+
+void try_stream_close(struct relay_stream *stream)
+{
+ DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+ pthread_mutex_lock(&stream->lock);
+ /*
+ * Can be called concurently by connection close and reception of last
+ * pending data.
+ */
+ if (stream->closed) {
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+ return;
+ }
+
+ stream->close_requested = true;
+ /*
+ * We shortcut the data pending check if no bound is known for this
+ * stream. This prevents us from never closing the stream in the case
+ * where a connection would be closed before a "close" command has
+ * been received.
+ *
+ * TODO
+ * This still leaves open the question of handling missing data after
+ * a bound has been set by a stream close command. Since we have no
+ * way of pairing data and control connection, and that a data
+ * connection has no ownership of a stream, it is likely that a
+ * timeout approach would be appropriate to handle dangling streams.
+ */
+ if (stream->last_net_seq_num != -1ULL &&
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ /* Don't close since we still have data pending. */
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+ return;
+ }
+ stream_unpublish(stream);
+ stream->closed = true;
+ /* Relay indexes are only used by the "consumer/sessiond" end. */
+ relay_index_close_all(stream);
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
+ stream_put(stream);
+}
+
+void print_relay_streams(void)
+{
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
+ " session %" PRIu64,
+ stream,
+ stream->ref.refcount,
+ stream->stream_handle,
+ stream->trace->id,
+ stream->trace->session->id);
+ stream_put(stream);
+ }
+ rcu_read_unlock();