- /*
- * Iterate over all the streams in the session and check if they are
- * still waiting for data to perform their rotation.
- */
- rcu_read_lock();
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- node.node) {
- if (!stream_get(stream)) {
- continue;
- }
- if (stream->trace->session != session) {
- stream_put(stream);
- continue;
- }
- pthread_mutex_lock(&stream->lock);
- if (stream->rotate_at_seq_num != -1ULL) {
- /* We have not yet performed the rotation. */
- rotate_pending = true;
- DBG("Stream %" PRIu64 " is still rotating",
- stream->stream_handle);
- } else if (stream->current_chunk_id.value <= chunk_id) {
- /*
- * Stream closed on the consumer but still active on the
- * relay.
- */
- rotate_pending = true;
- DBG("Stream %" PRIu64 " did not exist on the consumer "
- "when the last rotation started, but is"
- "still waiting for data before getting"
- "closed",
- stream->stream_handle);
- }
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
- if (rotate_pending) {
- goto send_reply;
+ if (close_command.is_set) {
+ chunk_status = lttng_trace_chunk_set_close_command(
+ chunk, close_command.value);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end;