#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/pthread-lock.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
*/
static void consumer_close_channel_streams(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
int consumer_data_pending(uint64_t id)
{
int ret;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
+ const auto ht = the_consumer_data.stream_list_ht;
struct consumer_relayd_sock_pair *relayd = nullptr;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
- const lttng::urcu::read_lock_guard read_lock;
- pthread_mutex_lock(&the_consumer_data.lock);
+ const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock);
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
abort();
}
- /* Ease our life a bit */
- ht = the_consumer_data.stream_list_ht;
-
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
- pthread_mutex_lock(&stream->lock);
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
/*
* A removed node from the hash table indicates that the stream has
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {
- pthread_mutex_unlock(&stream->lock);
goto data_pending;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
relayd = find_relayd_by_session_id(id);
if (relayd) {
unsigned int is_data_inflight = 0;
+ const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex);
+
/* Send init command for data pending. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id);
if (ret < 0) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
/* Communication error thus the relayd so no data pending. */
goto data_not_pending;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&id, lttng_ht_seed),
- ht->match_fct,
- &id,
- &iter.iter,
- stream,
- node_session_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_session_id),
+ <tng_consumer_stream::node_session_id,
+ std::uint64_t>(
+ *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) {
if (stream->metadata_flag) {
ret = relayd_quiescent_control(&relayd->control_sock,
stream->relayd_stream_id);
}
if (ret == 1) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_pending;
} else if (ret < 0) {
ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
goto data_not_pending;
}
}
/* Send end command for data pending. */
ret = relayd_end_data_pending(
&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".",
relayd->net_seq_idx);
data_not_pending:
/* Data is available to be read by a viewer. */
- pthread_mutex_unlock(&the_consumer_data.lock);
return 0;
data_pending:
/* Data is still being extracted from buffers. */
- pthread_mutex_unlock(&the_consumer_data.lock);
return 1;
}
uint64_t relayd_id)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
struct lttng_dynamic_array stream_rotation_positions;
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
bool rotating_to_new_chunk = true;
/* Array of `struct lttng_consumer_stream *` */
struct lttng_dynamic_pointer_array streams_packet_to_open;
- size_t stream_idx;
ASSERT_RCU_READ_LOCKED();
nullptr);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- const lttng::urcu::read_lock_guard read_lock;
+ const lttng::pthread::lock_guard channel_lock(channel->lock);
- pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
unsigned long produced_pos = 0, consumed_pos = 0;
health_code_update();
/*
* Lock stream because we are about to change its state.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (stream->trace_chunk == stream->chan->trace_chunk) {
rotating_to_new_chunk = false;
ret = sample_stream_positions(
stream, &produced_pos, &consumed_pos);
if (ret) {
- goto end_unlock_stream;
+ goto end;
}
/*
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
stream->key);
- goto end_unlock_stream;
+ goto end;
}
}
ret = lttng_consumer_take_snapshot(stream);
if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
ERR("Failed to sample snapshot position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
if (!ret) {
ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Failed to sample produced position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Failed to sample consumed position during channel rotation");
- goto end_unlock_stream;
+ goto end;
}
}
/*
ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
stream->key);
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
stream->rotate_position = stream->last_sequence_number + 1 +
((produced_pos - consumed_pos) / stream->max_sb_size);
&position);
if (ret) {
ERR("Failed to allocate stream rotation position");
- goto end_unlock_stream;
+ goto end;
}
stream_count++;
}
if (ret) {
PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
ret = -1;
- goto end_unlock_stream;
+ goto end;
}
}
-
- pthread_mutex_unlock(&stream->lock);
}
- stream = nullptr;
if (!is_local_trace) {
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, relayd_id);
ret = -1;
- goto end_unlock_channel;
+ goto end;
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
relayd->net_seq_idx);
lttng_consumer_cleanup_relayd(relayd);
- goto end_unlock_channel;
+ goto end;
}
}
- for (stream_idx = 0;
+ for (std::size_t stream_idx = 0;
stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open);
stream_idx++) {
enum consumer_stream_open_packet_status status;
-
- stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
+ auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer(
&streams_packet_to_open, stream_idx);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
status = consumer_stream_open_packet(stream);
- pthread_mutex_unlock(&stream->lock);
switch (status) {
case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
DBG("Opened a packet after a rotation: stream id = %" PRIu64
case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
/* Logged by callee. */
ret = -1;
- goto end_unlock_channel;
+ goto end;
default:
abort();
}
}
- pthread_mutex_unlock(&channel->lock);
ret = 0;
- goto end;
-
-end_unlock_stream:
- pthread_mutex_unlock(&stream->lock);
-end_unlock_channel:
- pthread_mutex_unlock(&channel->lock);
end:
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key)
{
int ret;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
- struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
ASSERT_RCU_READ_LOCKED();
- const lttng::urcu::read_lock_guard read_lock;
-
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
health_code_update();
- pthread_mutex_lock(&stream->chan->lock);
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard channel_lock(stream->chan->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (!stream->rotate_ready) {
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
continue;
}
- DBG("Consumer rotate ready stream %" PRIu64, stream->key);
+ DBG("Consumer rotate ready stream %" PRIu64, stream->key);
ret = lttng_consumer_rotate_stream(stream);
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
goto end;
}
char creation_timestamp_buffer[ISO8601_STR_LEN];
const char *relayd_id_str = "(none)";
const char *creation_timestamp_str;
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *channel;
if (relayd_id) {
/* Only used for logging purposes. */
goto error;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
- lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id,
- &iter.iter,
- channel,
- channels_by_session_id_ht_node.node)
- {
- ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
- if (ret) {
- /*
- * Roll-back the creation of this chunk.
- *
- * This is important since the session daemon will
- * assume that the creation of this chunk failed and
- * will never ask for it to be closed, resulting
- * in a leak and an inconsistent state for some
- * channels.
- */
- enum lttcomm_return_code close_ret;
- char path[LTTNG_PATH_MAX];
-
- DBG("Failed to set new trace chunk on existing channels, rolling back");
- close_ret =
- lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- nullptr,
- path);
- if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
- }
+ for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_channel,
+ decltype(lttng_consumer_channel::channels_by_session_id_ht_node),
+ <tng_consumer_channel::channels_by_session_id_ht_node,
+ std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht,
+ &session_id,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct)) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
- ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
- break;
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
}
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
}
}
static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel)
{
- struct lttng_ht *ht;
- struct lttng_consumer_stream *stream;
- struct lttng_ht_iter iter;
int ret;
-
- ht = the_consumer_data.stream_per_chan_id_ht;
-
- const lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct,
- &channel->key,
- &iter.iter,
- stream,
- node_channel_id.node)
- {
+ const auto ht = the_consumer_data.stream_per_chan_id_ht;
+
+ for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter<
+ lttng_consumer_stream,
+ decltype(lttng_consumer_stream::node_channel_id),
+ <tng_consumer_stream::node_channel_id,
+ std::uint64_t>(*ht->ht,
+ &channel->key,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct)) {
/*
* Protect against teardown with mutex.
*/
- pthread_mutex_lock(&stream->lock);
+ const lttng::pthread::lock_guard stream_lock(stream->lock);
if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
+ continue;
}
+
ret = consumer_clear_stream(stream);
if (ret) {
- goto error_unlock;
+ return ret;
}
- next:
- pthread_mutex_unlock(&stream->lock);
}
- return LTTCOMM_CONSUMERD_SUCCESS;
-error_unlock:
- pthread_mutex_unlock(&stream->lock);
- return ret;
+ return LTTCOMM_CONSUMERD_SUCCESS;
}
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)