#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
+#include <type_traits>
#include <unistd.h>
lttng_consumer_global_data the_consumer_data;
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
- /*
- * Once a stream is added to this list, the buffers were created so we
- * have a guarantee that this call will succeed. Setting the monitor
- * mode to 0 so we don't lock nor try to delete the stream from the
- * global hash table.
- */
- stream->monitor = 0;
consumer_stream_destroy(stream, nullptr);
}
}
return nullptr;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
}
{
struct lttng_consumer_stream *stream;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
}
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
channel = lttng::utils::container_of(node, <tng_consumer_channel::node);
}
{
struct lttng_consumer_channel *channel;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
ERR("Unknown consumer_data type");
abort();
}
- free(channel);
+
+ delete channel;
}
/*
if (channel->is_published) {
int ret;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
struct consumer_relayd_sock_pair *relayd;
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (
the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/* Let's begin with metadata */
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
- lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+ stream->chan->metadata_pushed_wait_queue.wake_all();
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
*/
static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
- int ret = 0;
+ const int ret = 0;
struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
goto end;
}
}
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
if (node != nullptr) {
relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
if (relayd != nullptr) {
/* Add stream on the relayd */
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
}
- channel = zmalloc<lttng_consumer_channel>();
- if (channel == nullptr) {
- PERROR("malloc struct lttng_consumer_channel");
+ try {
+ channel = new lttng_consumer_channel;
+ } catch (const std::bad_alloc& e) {
+ ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+ channel = nullptr;
goto end;
}
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
channel->is_live = is_in_live_session;
- pthread_mutex_init(&channel->lock, NULL);
- pthread_mutex_init(&channel->timer_lock, NULL);
- lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
+ pthread_mutex_init(&channel->lock, nullptr);
+ pthread_mutex_init(&channel->timer_lock, nullptr);
switch (output) {
case LTTNG_EVENT_SPLICE:
break;
default:
abort();
- free(channel);
+ delete channel;
channel = nullptr;
goto end;
}
CDS_INIT_LIST_HEAD(&channel->streams.head);
if (trace_chunk) {
- int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+ const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
if (ret) {
goto error;
}
*/
steal_channel_key(channel->key);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
*nb_inactive_fd = 0;
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Only active streams with an active end point can be added to the
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+ enum lttcomm_return_code error_code)
{
if (ctx->consumer_error_socket > 0) {
+ const std::int32_t comm_code = std::int32_t(error_code);
+
+ static_assert(
+ sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+ "Fixed-size communication type too small to accomodate lttcomm_return_code");
return lttcomm_send_unix_sock(
- ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+ ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
}
return 0;
unsigned int trace_chunks_left;
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (
the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int outfd = stream->out_fd;
+ const int outfd = stream->out_fd;
/*
* This does a blocking write-and-wait on any page that belongs to the
}
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Ignore return value since we are currently cleaning up so any error
}
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
/*
* Ignore return value since we are currently cleaning up so any error
size_t write_len;
/* RCU lock for the relayd pointer */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
off_t orig_offset = stream->out_fd_offset;
- int fd = stream->wait_fd;
+ const int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = nullptr;
}
/* RCU lock for the relayd pointer */
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
/* Handle stream on the relayd if the output is on the network */
if (relayd && stream->metadata_flag) {
- size_t metadata_payload_size =
+ const size_t metadata_payload_size =
sizeof(struct lttcomm_relayd_metadata_payload);
/* Update counter to fit the spliced data */
* pointer value.
*/
channel->metadata_stream = nullptr;
- lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+ channel->metadata_pushed_wait_queue.wake_all();
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
* after this point.
*/
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
lttng_ht_lookup(ht, &stream->key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(!node);
/*
DBG("Consumer delete flagged data stream");
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
LTTNG_ASSERT(pollset);
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
stream = caa_container_of(node, struct lttng_consumer_stream, node);
ht = the_consumer_data.stream_per_chan_id_ht;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
}
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
ret = lttng_ht_del(ht, &iter);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
/* Add channel to the global poll events list */
* GET_CHANNEL failed.
*/
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
ERR("UST consumer get channel key %" PRIu64
continue;
}
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
lttng_ht_lookup(channel_ht, &tmp_id, &iter);
}
- node = lttng_ht_iter_get_node_u64(&iter);
+ node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
LTTNG_ASSERT(node);
chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
DBG("Consumer data pending command on session id %" PRIu64, id);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&the_consumer_data.lock);
switch (the_consumer_data.type) {
nullptr);
lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
int ret;
struct lttng_consumer_stream *stream;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
ASSERT_RCU_READ_LOCKED();
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
}
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(
the_consumer_data.channels_by_session_id_ht->ht,
the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
* current chunk found in the session's channels.
*/
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry (
the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
int ret;
const bool is_local_trace = !relayd_id;
struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
ht = the_consumer_data.stream_per_chan_id_ht;
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
}
{
- lttng::urcu::read_lock_guard read_lock;
+ const lttng::urcu::read_lock_guard read_lock;
cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
enum consumer_stream_open_packet_status status;