cds_list_for_each_entry_rcu(stream, &trace->stream_list,
stream_node) {
/*
- * Close the stream.
+ * Close stream since the connection owning the trace is being
+ * torn down.
*/
- stream_close(stream);
+ try_stream_close(stream);
}
rcu_read_unlock();
/*
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
pthread_mutex_unlock(&stream->lock);
- stream_close(stream);
-
+ /*
+ * This is one of the conditions which may trigger a stream close
+ * with the others being:
+ * 1) A close command is received for a stream
+ * 2) The control connection owning the stream is closed
+ * 3) We have received all of the stream's data _after_ a close
+ * request.
+ */
+ try_stream_close(stream);
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
uint64_t net_seq_num;
uint32_t data_size;
struct relay_session *session;
- bool new_stream = false;
+ bool new_stream = false, close_requested = false;
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
stream->prev_seq = net_seq_num;
end_stream_unlock:
+ close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
if (new_stream) {
pthread_mutex_lock(&session->lock);
uatomic_set(&session->new_streams, 1);
}
cds_list_for_each_entry_rcu(stream, &session->recv_list,
recv_node) {
- stream_close(stream);
+ /* Close streams which have not been published yet. */
+ try_stream_close(stream);
}
rcu_unlock:
rcu_read_unlock();
stream->stream_handle = stream_handle;
stream->prev_seq = -1ULL;
+ stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
stream->tracefile_count = tracefile_count;
rcu_read_unlock();
}
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
{
- DBG("closing stream %" PRIu64, stream->stream_handle);
+ DBG("Trying to close stream %" PRIu64, stream->stream_handle);
pthread_mutex_lock(&stream->lock);
+ /*
+ * Can be called concurently by connection close and reception of last
+ * pending data.
+ */
+ if (stream->closed) {
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+ return;
+ }
+
+ stream->close_requested = true;
+ /*
+ * We shortcut the data pending check if no bound is known for this
+ * stream. This prevents us from never closing the stream in the case
+ * where a connection would be closed before a "close" command has
+ * been received.
+ *
+ * TODO
+ * This still leaves open the question of handling missing data after
+ * a bound has been set by a stream close command. Since we have no
+ * way of pairing data and control connection, and that a data
+ * connection has no ownership of a stream, it is likely that a
+ * timeout approach would be appropriate to handle dangling streams.
+ */
+ if (stream->last_net_seq_num != -1ULL &&
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ /* Don't close since we still have data pending. */
+ pthread_mutex_unlock(&stream->lock);
+ DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+ return;
+ }
stream_unpublish(stream);
stream->closed = true;
+ /* Relay indexes are only used by the "consumer/sessiond" end. */
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
+ DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
stream_put(stream);
}
*/
struct tracefile_array *tfa;
- bool closed; /* Stream is closed. */
+ bool closed; /* Stream is closed. */
+ bool close_requested; /* Close command has been received. */
/*
* Counts number of indexes in indexes_ht. Redundant info.
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);
void stream_put(struct relay_stream *stream);
-void stream_close(struct relay_stream *stream);
+void try_stream_close(struct relay_stream *stream);
void stream_publish(struct relay_stream *stream);
void print_relay_streams(void);