return ret;
}
+static void finalize_snapshot_stream(
+ struct lttng_consumer_stream *stream, uint64_t relayd_id)
+{
+ ASSERT_LOCKED(stream->lock);
+
+ if (relayd_id == (uint64_t) -1ULL) {
+ if (stream->out_fd >= 0) {
+ const int ret = close(stream->out_fd);
+
+ if (ret < 0) {
+ PERROR("Failed to close stream snapshot output file descriptor");
+ }
+
+ stream->out_fd = -1;
+ }
+ } else {
+ close_relayd_stream(stream);
+ stream->net_seq_idx = (uint64_t) -1ULL;
+ }
+
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = NULL;
+}
+
/*
* Take a snapshot of all the stream of a channel
* RCU read-side lock must be held across this function to ensure existence of
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto end_unlock;
+ goto error_finalize_stream;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto end_unlock;
+ goto error_finalize_stream;
}
DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto end_unlock;
+ goto error_finalize_stream;
}
goto end_unlock;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto end_unlock;
+ goto error_finalize_stream;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto end_unlock;
+ goto error_finalize_stream;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto end_unlock;
+ goto error_finalize_stream;
}
consumed_pos = consumer_get_consume_start_pos(consumed_pos,
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto end_unlock;
+ goto error_finalize_stream;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
ret = kernctl_put_subbuf(stream->wait_fd);
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf");
- goto end_unlock;
+ goto error_finalize_stream;
}
consumed_pos += stream->max_sb_size;
}
- if (relayd_id == (uint64_t) -1ULL) {
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret < 0) {
- PERROR("Kernel consumer snapshot close out_fd");
- goto end_unlock;
- }
- stream->out_fd = -1;
- }
- } else {
- close_relayd_stream(stream);
- stream->net_seq_idx = (uint64_t) -1ULL;
- }
- lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
+ finalize_snapshot_stream(stream, relayd_id);
pthread_mutex_unlock(&stream->lock);
}
if (ret < 0) {
ERR("Snapshot kernctl_put_subbuf error path");
}
+error_finalize_stream:
+ finalize_snapshot_stream(stream, relayd_id);
end_unlock:
pthread_mutex_unlock(&stream->lock);
end:
if (use_relayd) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
DBG("UST consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking UST snapshot");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
/*