#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/optional.hpp>
#include <common/pipe.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/urcu.hpp>
uint64_t nb_packets_per_stream)
{
int ret;
- struct lttng_consumer_stream *stream;
DBG("Kernel consumer snapshot channel %" PRIu64, key);
/* Prevent channel modifications while we perform the snapshot.*/
- pthread_mutex_lock(&channel->lock);
+ const lttng::pthread::lock_guard channe_lock(channel->lock);
const lttng::urcu::read_lock_guard read_lock;
goto end;
}
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
unsigned long consumed_pos, produced_pos;
health_code_update();
/*
* Lock stream because we are about to change its state.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
LTTNG_ASSERT(channel->trace_chunk);
if (!lttng_trace_chunk_get(channel->trace_chunk)) {
*/
ERR("Failed to acquire reference to channel's trace chunk");
ret = -1;
- goto end_unlock;
+ goto end;
}
LTTNG_ASSERT(!stream->trace_chunk);
stream->trace_chunk = channel->trace_chunk;
*/
stream->net_seq_idx = relayd_id;
channel->relayd_id = relayd_id;
+
+ /* Close stream output when were are done. */
+ const auto close_stream_output = lttng::make_scope_exit(
+ [stream]() noexcept { consumer_stream_close_output(stream); });
+
if (relayd_id != (uint64_t) -1ULL) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
- goto error_close_stream_output;
+ goto end;
}
} else {
ret = consumer_stream_create_output_files(stream, false);
if (ret < 0) {
- goto error_close_stream_output;
+ goto end;
}
DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
}
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
ERR("Failed to flush kernel stream");
- goto error_close_stream_output;
+ goto end;
}
- goto end_unlock;
+ goto end;
}
ret = lttng_kconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking kernel snapshot");
- goto error_close_stream_output;
+ goto end;
}
ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto error_close_stream_output;
+ goto end;
}
ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd kernel snapshot position");
- goto error_close_stream_output;
+ goto end;
}
consumed_pos = consumer_get_consume_start_pos(
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("kernctl_get_subbuf snapshot");
- goto error_close_stream_output;
+ goto end;
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
continue;
}
+ /* Put the subbuffer once we are done. */
+ const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+ const auto put_ret = kernctl_put_subbuf(stream->wait_fd);
+ if (put_ret < 0) {
+ ERR("Snapshot kernctl_put_subbuf");
+ }
+ });
+
ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
if (ret < 0) {
ERR("Snapshot kernctl_get_subbuf_size");
- goto error_put_subbuf;
+ goto end;
}
ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
if (ret < 0) {
ERR("Snapshot kernctl_get_padded_subbuf_size");
- goto error_put_subbuf;
+ goto end;
}
ret = get_current_subbuf_addr(stream, &subbuf_addr);
if (ret) {
- goto error_put_subbuf;
+ goto end;
}
subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
}
}
- ret = kernctl_put_subbuf(stream->wait_fd);
- if (ret < 0) {
- ERR("Snapshot kernctl_put_subbuf");
- goto error_close_stream_output;
- }
consumed_pos += stream->max_sb_size;
}
-
- consumer_stream_close_output(stream);
- pthread_mutex_unlock(&stream->lock);
}
/* All good! */
ret = 0;
goto end;
-error_put_subbuf:
- ret = kernctl_put_subbuf(stream->wait_fd);
- if (ret < 0) {
- ERR("Snapshot kernctl_put_subbuf error path");
- }
-error_close_stream_output:
- consumer_stream_close_output(stream);
-end_unlock:
- pthread_mutex_unlock(&stream->lock);
end:
- pthread_mutex_unlock(&channel->lock);
return ret;
}