#include <common/optional.hpp>
#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
+#include <common/scope-exit.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
#include <common/urcu.hpp>
int *relayd_error)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct lttng_consumer_stream *stream;
uint64_t net_seq_idx = -1ULL;
LTTNG_ASSERT(channel);
DBG("UST consumer sending channel %s to sessiond", channel->name);
if (channel->relayd_id != (uint64_t) -1ULL) {
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream :
+ lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Try to send the stream to the relayd if one is available. */
}
/* The channel was sent successfully to the sessiond at this point. */
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Send stream to session daemon. */
struct lttng_consumer_local_data *ctx)
{
int ret = 0;
- struct lttng_consumer_stream *stream, *stmp;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
/* Send streams to the corresponding thread. */
- cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Sending the stream to the thread. */
int ret;
unsigned use_relayd = 0;
unsigned long consumed_pos, produced_pos;
- struct lttng_consumer_stream *stream;
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(!channel->monitor);
DBG("UST consumer snapshot channel %" PRIu64, key);
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ for (auto stream : lttng::urcu::list_iteration_adapter<lttng_consumer_stream,
+ <tng_consumer_stream::send_node>(
+ channel->streams.head)) {
health_code_update();
/* Lock stream because we are about to change its state. */
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
LTTNG_ASSERT(channel->trace_chunk);
if (!lttng_trace_chunk_get(channel->trace_chunk)) {
/*
* holds a reference to the trace chunk.
*/
ERR("Failed to acquire reference to channel's trace chunk");
- ret = -1;
- goto error_unlock;
+ return -1;
}
LTTNG_ASSERT(!stream->trace_chunk);
stream->trace_chunk = channel->trace_chunk;
stream->net_seq_idx = relayd_id;
+ /* Close stream output when were are done. */
+ const auto close_stream_output = lttng::make_scope_exit(
+ [stream]() noexcept { consumer_stream_close_output(stream); });
+
if (use_relayd) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
- goto error_close_stream;
+ return ret;
}
} else {
ret = consumer_stream_create_output_files(stream, false);
if (ret < 0) {
- goto error_close_stream;
+ return ret;
}
+
DBG("UST consumer snapshot stream (%" PRIu64 ")", stream->key);
}
", channel name = '%s'",
channel->key,
channel->name);
- goto error_unlock;
+ return ret;
}
}
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking UST snapshot");
- goto error_close_stream;
+ return ret;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced UST snapshot position");
- goto error_close_stream;
+ return ret;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd UST snapshot position");
- goto error_close_stream;
+ return ret;
}
/*
if (ret < 0) {
if (ret != -EAGAIN) {
PERROR("lttng_ust_ctl_get_subbuf snapshot");
- goto error_close_stream;
+ return ret;
}
+
DBG("UST consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
stream->chan->lost_packets++;
continue;
}
+ /* Put the subbuffer once we are done. */
+ const auto put_subbuf = lttng::make_scope_exit([stream]() noexcept {
+ if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
+ ERR("Snapshot lttng_ust_ctl_put_subbuf");
+ }
+ });
+
ret = lttng_ust_ctl_get_subbuf_size(stream->ustream, &len);
if (ret < 0) {
ERR("Snapshot lttng_ust_ctl_get_subbuf_size");
- goto error_put_subbuf;
+ return ret;
}
ret = lttng_ust_ctl_get_padded_subbuf_size(stream->ustream, &padded_len);
if (ret < 0) {
ERR("Snapshot lttng_ust_ctl_get_padded_subbuf_size");
- goto error_put_subbuf;
+ return ret;
}
ret = get_current_subbuf_addr(stream, &subbuf_addr);
if (ret) {
- goto error_put_subbuf;
+ return ret;
}
subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
stream, &subbuf_view, padded_len - len);
if (use_relayd) {
if (read_len != len) {
- ret = -EPERM;
- goto error_put_subbuf;
+ return -EPERM;
}
} else {
if (read_len != padded_len) {
- ret = -EPERM;
- goto error_put_subbuf;
+ return -EPERM;
}
}
- ret = lttng_ust_ctl_put_subbuf(stream->ustream);
- if (ret < 0) {
- ERR("Snapshot lttng_ust_ctl_put_subbuf");
- goto error_close_stream;
- }
consumed_pos += stream->max_sb_size;
}
/* Simply close the stream so we can use it on the next snapshot. */
consumer_stream_close_output(stream);
- pthread_mutex_unlock(&stream->lock);
}
return 0;
-
-error_put_subbuf:
- if (lttng_ust_ctl_put_subbuf(stream->ustream) < 0) {
- ERR("Snapshot lttng_ust_ctl_put_subbuf");
- }
-error_close_stream:
- consumer_stream_close_output(stream);
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- return ret;
}
static void metadata_stream_reset_cache_consumed_position(struct lttng_consumer_stream *stream)