/* Update number of created stream counter. */
(*nb_created)++;
}
+ /*
+ * Ensure a self-reference is preserved even
+ * after we have put our local reference.
+ */
+ viewer_stream_get(vstream);
} else {
if (!vstream->sent_flag && nb_unsent) {
/* Update number of unsent stream counter. */
(*nb_unsent)++;
}
- viewer_stream_put(vstream);
}
/* Update number of total stream counter. */
if (nb_total) {
- (*nb_total)++;
+ if (stream->is_metadata) {
+ if (!stream->closed ||
+ stream->metadata_received > vstream->metadata_sent) {
+ (*nb_total)++;
+ }
+ } else {
+ if (!stream->closed ||
+ !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+
+ (*nb_total)++;
+ }
+ }
}
+ /* Put local reference. */
+ viewer_stream_put(vstream);
next:
stream_put(stream);
}
response.streams_count = htobe32(nb_streams);
/*
- * If the session is closed, HUP when there are no more streams.
+ * If the session is closed, HUP when there are no more streams
+ * with data.
*/
if (closed && nb_total == 0) {
send_streams = 0;
{
DBG("closing stream %" PRIu64, stream->stream_handle);
pthread_mutex_lock(&stream->lock);
+ stream->closed = true;
relay_index_close_all(stream);
pthread_mutex_unlock(&stream->lock);
stream_put(stream);