destroy_conn,
sock_n.node) {
health_code_update();
+
+ if (session_abort(destroy_conn->session)) {
+ assert(0);
+ }
+
/*
* No need to grab another ref, because we own
* destroy_conn.
return ret;
}
+int session_abort(struct relay_session *session)
+{
+ int ret = 0;
+
+ if (!session) {
+ return 0;
+ }
+
+ pthread_mutex_lock(&session->lock);
+ DBG("aborting session %" PRIu64, session->id);
+ if (session->aborted) {
+ ERR("session %" PRIu64 " is already aborted", session->id);
+ ret = -1;
+ goto unlock;
+ }
+ session->aborted = true;
+unlock:
+ pthread_mutex_unlock(&session->lock);
+ return ret;
+}
+
void print_sessions(void)
{
struct lttng_ht_iter iter;
/* Tell if the session connection has been closed on the streaming side. */
bool connection_closed;
+ /*
+ * Tell if the session is currently living in a exiting relayd and
+ * should be cleaned forcefully without waiting for pending data or
+ * pending ctrl data.
+ */
+ bool aborted;
+
/* Contains ctf_trace object of that session indexed by path name. */
struct lttng_ht *ctf_traces_ht;
void session_put(struct relay_session *session);
int session_close(struct relay_session *session);
+int session_abort(struct relay_session *session);
+
void print_sessions(void);
#endif /* _SESSION_H */
void try_stream_close(struct relay_stream *stream)
{
+ bool session_aborted;
+ struct relay_session *session = stream->trace->session;
+
DBG("Trying to close stream %" PRIu64, stream->stream_handle);
+
+ pthread_mutex_lock(&session->lock);
+ session_aborted = session->aborted;
+ pthread_mutex_unlock(&session->lock);
+
pthread_mutex_lock(&stream->lock);
/*
* Can be called concurently by connection close and reception of last
}
if (stream->last_net_seq_num != -1ULL &&
- ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+ ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0
+ && !session_aborted) {
/*
* Don't close since we still have data pending. This
* handles cases where an explicit close command has