* The consumer data lock MUST be acquired.
* The stream lock MUST be acquired.
*/
-void consumer_stream_close(struct lttng_consumer_stream *stream)
+void consumer_stream_close_output(struct lttng_consumer_stream *stream)
{
- int ret;
struct consumer_relayd_sock_pair *relayd;
LTTNG_ASSERT(stream);
- switch (the_consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (stream->mmap_base != NULL) {
- ret = munmap(stream->mmap_base, stream->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
-
- if (stream->wait_fd >= 0) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- stream->wait_fd = -1;
- }
- if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
- utils_close_pipe(stream->splice_pipe);
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- {
- /*
- * Special case for the metadata since the wait fd is an internal pipe
- * polled in the metadata thread.
- */
- if (stream->metadata_flag && stream->chan->monitor) {
- int rpipe = stream->ust_metadata_poll_pipe[0];
-
- /*
- * This will stop the channel timer if one and close the write side
- * of the metadata poll pipe.
- */
- lttng_ustconsumer_close_metadata(stream->chan);
- if (rpipe >= 0) {
- ret = close(rpipe);
- if (ret < 0) {
- PERROR("closing metadata pipe read side");
- }
- stream->ust_metadata_poll_pipe[0] = -1;
- }
- }
- break;
- }
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
/* Close output fd. Could be a socket or local file at this point. */
if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
+ const auto ret = close(stream->out_fd);
if (ret) {
- PERROR("close");
+ PERROR("Failed to close stream output file descriptor");
}
+
stream->out_fd = -1;
}
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
consumer_stream_relayd_close(stream, relayd);
+ stream->net_seq_idx = -1ULL;
}
+
rcu_read_unlock();
}
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
+ if (stream->mmap_base != NULL) {
+ const auto ret = munmap(stream->mmap_base, stream->mmap_len);
+
+ if (ret != 0) {
+ PERROR("munmap");
+ }
+ }
+
+ if (stream->wait_fd >= 0) {
+ const auto ret = close(stream->wait_fd);
+
+ if (ret) {
+ PERROR("close");
+ }
+
+ stream->wait_fd = -1;
+ }
+
+ if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+ utils_close_pipe(stream->splice_pipe);
+ }
+
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /*
+ * Special case for the metadata since the wait fd is an internal pipe
+ * polled in the metadata thread.
+ */
+ if (stream->metadata_flag && stream->chan->monitor) {
+ const auto rpipe = stream->ust_metadata_poll_pipe[0];
+
+ /*
+ * This will stop the channel timer if one and close the write side
+ * of the metadata poll pipe.
+ */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ if (rpipe >= 0) {
+ const auto ret = close(rpipe);
+
+ if (ret < 0) {
+ PERROR("closing metadata pipe read side");
+ }
+
+ stream->ust_metadata_poll_pipe[0] = -1;
+ }
+ }
+
lttng_ustconsumer_del_stream(stream);
break;
default:
/* Destroy tracer buffers of the stream. */
consumer_stream_destroy_buffers(stream);
/* Close down everything including the relayd if one. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
}
/*
* The stream lock MUST be acquired.
* The consumer data lock MUST be acquired.
*/
-void consumer_stream_close(struct lttng_consumer_stream *stream);
+void consumer_stream_close_output(struct lttng_consumer_stream *stream);
/*
* Close stream on the relayd side. This call can destroy a relayd if the
consumer_stream_delete(stream, ht);
/* Close down everything including the relayd if one. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
/* Destroy tracer buffers of the stream. */
consumer_stream_destroy_buffers(stream);
return ret;
}
-static void finalize_snapshot_stream(
- struct lttng_consumer_stream *stream, uint64_t relayd_id)
-{
- ASSERT_LOCKED(stream->lock);
-
- if (relayd_id == (uint64_t) -1ULL) {
- if (stream->out_fd >= 0) {
- const int ret = close(stream->out_fd);
-
- if (ret < 0) {
- PERROR("Failed to close stream snapshot output file descriptor");
- }
-
- stream->out_fd = -1;
- }
- } else {
- close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
- }
-
- lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
-}
-
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
goto end_unlock;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
consumed_pos = consumer_get_consume_start_pos(consumed_pos,
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- goto error_finalize_stream;
+ goto error_close_stream_output;
}
consumed_pos += stream->max_sb_size;
}
- finalize_snapshot_stream(stream, relayd_id);
+ consumer_stream_close_output(stream);
pthread_mutex_unlock(&stream->lock);
}
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf error path");
}
-error_finalize_stream:
- finalize_snapshot_stream(stream, relayd_id);
+error_close_stream_output:
+ consumer_stream_close_output(stream);
end_unlock:
pthread_mutex_unlock(&stream->lock);
end:
}
/* Simply close the stream so we can use it on the next snapshot. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
pthread_mutex_unlock(&stream->lock);
}
ERR("Snapshot lttng_ust_ctl_put_subbuf");
}
error_close_stream:
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
error_unlock:
pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();