X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=1da243601cd7ff3c72fc8febb5d6c11f2f150c4b;hb=refs%2Fheads%2Fmaster;hp=4823d9b9c59aa6f9933d9e7552fe27fb48865cb6;hpb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 4823d9b9c..295ea2726 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -45,6 +45,7 @@ #include #include #include +#include #include lttng_consumer_global_data the_consumer_data; @@ -177,13 +178,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { - /* - * Once a stream is added to this list, the buffers were created so we - * have a guarantee that this call will succeed. Setting the monitor - * mode to 0 so we don't lock nor try to delete the stream from the - * global hash table. - */ - stream->monitor = 0; consumer_stream_destroy(stream, nullptr); } } @@ -205,10 +199,10 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * return nullptr; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } @@ -220,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; stream = find_stream(key, ht); if (stream) { stream->key = (uint64_t) -1ULL; @@ -253,7 +247,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) } lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { channel = lttng::utils::container_of(node, <tng_consumer_channel::node); } @@ -273,7 +267,7 @@ static void steal_channel_key(uint64_t key) { struct lttng_consumer_channel *channel; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(key); if (channel) { channel->key = (uint64_t) -1ULL; @@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -403,7 +398,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->is_published) { int ret; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; iter.iter.node = &channel->node.node; ret = lttng_ht_del(the_consumer_data.channel_ht, &iter); LTTNG_ASSERT(!ret); @@ -430,7 +425,7 @@ static void cleanup_relayd_ht() struct consumer_relayd_sock_pair *relayd; { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry ( the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { @@ -456,13 +451,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* Let's begin with metadata */ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); - lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + stream->chan->metadata_pushed_wait_queue.wake_all(); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } @@ -581,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); @@ -626,7 +621,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) */ static int add_relayd(struct consumer_relayd_sock_pair *relayd) { - int ret = 0; + const int ret = 0; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; @@ -634,7 +629,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) ASSERT_RCU_READ_LOCKED(); lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { goto end; } @@ -695,7 +690,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) } lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); } @@ -719,7 +714,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path LTTNG_ASSERT(path); /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -772,7 +767,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) LTTNG_ASSERT(net_seq_idx != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -806,7 +801,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { consumer_stream_relayd_close(stream, relayd); @@ -1019,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == nullptr) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1035,9 +1032,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; channel->is_live = is_in_live_session; - pthread_mutex_init(&channel->lock, NULL); - pthread_mutex_init(&channel->timer_lock, NULL); - lttng_wait_queue_init(&channel->metadata_pushed_wait_queue); + pthread_mutex_init(&channel->lock, nullptr); + pthread_mutex_init(&channel->timer_lock, nullptr); switch (output) { case LTTNG_EVENT_SPLICE: @@ -1048,7 +1044,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); + delete channel; channel = nullptr; goto end; } @@ -1088,7 +1084,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, CDS_INIT_LIST_HEAD(&channel->streams.head); if (trace_chunk) { - int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk); + const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk); if (ret) { goto error; } @@ -1124,7 +1120,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ steal_channel_key(channel->key); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node); lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); @@ -1167,7 +1163,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, *nb_inactive_fd = 0; { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { /* * Only active streams with an active end point can be added to the @@ -1253,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, * Send return code to the session daemon. * If the socket is not defined, we return 0, it is not a fatal error */ -int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, + enum lttcomm_return_code error_code) { if (ctx->consumer_error_socket > 0) { + const std::int32_t comm_code = std::int32_t(error_code); + + static_assert( + sizeof(comm_code) >= sizeof(std::underlying_type), + "Fixed-size communication type too small to accomodate lttcomm_return_code"); return lttcomm_send_unix_sock( - ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command)); + ctx->consumer_error_socket, &comm_code, sizeof(comm_code)); } return 0; @@ -1274,7 +1276,7 @@ void lttng_consumer_cleanup() unsigned int trace_chunks_left; { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry ( the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { @@ -1341,7 +1343,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int outfd = stream->out_fd; + const int outfd = stream->out_fd; /* * This does a blocking write-and-wait on any page that belongs to the @@ -1459,7 +1461,7 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) } { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { /* * Ignore return value since we are currently cleaning up so any error @@ -1486,7 +1488,7 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht) } { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { /* * Ignore return value since we are currently cleaning up so any error @@ -1596,7 +1598,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre size_t write_len; /* RCU lock for the relayd pointer */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk); /* Flag that the current stream if set for network streaming. */ @@ -1753,7 +1755,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; + const int fd = stream->wait_fd; /* Default is on the disk */ int outfd = stream->out_fd; struct consumer_relayd_sock_pair *relayd = nullptr; @@ -1773,7 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data } /* RCU lock for the relayd pointer */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { @@ -1870,7 +1872,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* Handle stream on the relayd if the output is on the network */ if (relayd && stream->metadata_flag) { - size_t metadata_payload_size = + const size_t metadata_payload_size = sizeof(struct lttcomm_relayd_metadata_payload); /* Update counter to fit the spliced data */ @@ -2133,7 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l * pointer value. */ channel->metadata_stream = nullptr; - lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); @@ -2176,14 +2178,14 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) * after this point. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* * Lookup the stream just to make sure it does not exist in our internal * state. This should NEVER happen. */ lttng_ht_lookup(ht, &stream->key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(!node); /* @@ -2227,7 +2229,7 @@ static void validate_endpoint_status_data_stream() DBG("Consumer delete flagged data stream"); { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ @@ -2253,7 +2255,7 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po LTTNG_ASSERT(pollset); { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { /* Validate delete flag of the stream */ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { @@ -2403,13 +2405,13 @@ void *consumer_thread_metadata_poll(void *data) continue; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; lttng_ht_lookup(metadata_ht, &tmp_id, &iter); } - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(node); stream = caa_container_of(node, struct lttng_consumer_stream, node); @@ -2804,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe ht = the_consumer_data.stream_per_chan_id_ht; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -2857,7 +2859,7 @@ static void destroy_channel_ht(struct lttng_ht *ht) } { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { ret = lttng_ht_del(ht, &iter); @@ -2970,7 +2972,7 @@ void *consumer_thread_channel_poll(void *data) lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); /* Add channel to the global poll events list */ @@ -2991,7 +2993,7 @@ void *consumer_thread_channel_poll(void *data) * GET_CHANNEL failed. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; chan = consumer_find_channel(key); if (!chan) { ERR("UST consumer get channel key %" PRIu64 @@ -3059,13 +3061,13 @@ void *consumer_thread_channel_poll(void *data) continue; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; lttng_ht_lookup(channel_ht, &tmp_id, &iter); } - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(node); chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node); @@ -3737,7 +3739,7 @@ int consumer_data_pending(uint64_t id) DBG("Consumer data pending command on session id %" PRIu64, id); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&the_consumer_data.lock); switch (the_consumer_data.type) { @@ -3982,7 +3984,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, nullptr); lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); @@ -4420,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha int ret; struct lttng_consumer_stream *stream; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; pthread_mutex_lock(&channel->lock); cds_list_for_each_entry (stream, &channel->streams.head, send_node) { health_code_update(); @@ -4632,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, ASSERT_RCU_READ_LOCKED(); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; DBG("Consumer rotate ready streams in channel %" PRIu64, key); @@ -4796,7 +4798,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, } { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate( the_consumer_data.channels_by_session_id_ht->ht, the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, @@ -4952,7 +4954,7 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * current chunk found in the session's channels. */ { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry ( the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { int ret; @@ -5017,7 +5019,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = nullptr; bool chunk_exists_local, chunk_exists_remote; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; if (relayd_id) { /* Only used for logging purposes. */ @@ -5084,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann ht = the_consumer_data.stream_per_chan_id_ht; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, @@ -5152,7 +5154,7 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum } { - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; cds_list_for_each_entry (stream, &channel->streams.head, send_node) { enum consumer_stream_open_packet_status status;