static
int make_viewer_streams(struct relay_session *session,
enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
- uint32_t *nb_created)
+ uint32_t *nb_created, bool *closed)
{
int ret;
struct lttng_ht_iter iter;
*/
pthread_mutex_lock(&session->lock);
+ if (session->connection_closed) {
+ *closed = true;
+ }
+
/*
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
struct lttng_viewer_new_streams_response response;
struct relay_session *session;
uint64_t session_id;
+ bool closed = false;
assert(conn);
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
- &nb_created);
+ &nb_created, &closed);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
/*
- * If the session is closed and we have no new streams to send,
- * it means that the viewer has already received the whole trace
- * for this session and should now close it.
+ * If the session is closed, HUP when there are no more streams.
*/
- if (nb_total == 0 && session->connection_closed) {
+ if (closed && nb_total == 0) {
send_streams = 0;
+ response.streams_count = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
goto send_reply;
}
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
+ bool closed = false;
assert(conn);
goto send_reply;
}
- ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
+ ret = make_viewer_streams(session, seek_type, &nb_streams, NULL,
+ NULL, &closed);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
+ /*
+ * If the session is closed when the viewer is attaching, it
+ * means some of the streams may have been concurrently removed,
+ * so we don't allow the viewer to attach, even if there are
+ * streams available.
+ */
+ if (closed) {
+ send_streams = 0;
+ response.streams_count = 0;
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+ goto send_reply;
+ }
+
send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));