sessiond: open_packets: use user_space_consumer_channel_keys util
[lttng-tools.git] / src / common / consumer / consumer.cpp
index 01845871b91a2a2423e4375e983de1cac6243b7f..295ea2726e3c54e2991705cf3f90770652ebd082 100644 (file)
@@ -45,6 +45,7 @@
 #include <sys/mman.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <type_traits>
 #include <unistd.h>
 
 lttng_consumer_global_data the_consumer_data;
@@ -177,13 +178,6 @@ static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
 
        /* Delete streams that might have been left in the stream list. */
        cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) {
-               /*
-                * Once a stream is added to this list, the buffers were created so we
-                * have a guarantee that this call will succeed. Setting the monitor
-                * mode to 0 so we don't lock nor try to delete the stream from the
-                * global hash table.
-                */
-               stream->monitor = 0;
                consumer_stream_destroy(stream, nullptr);
        }
 }
@@ -205,10 +199,10 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht *
                return nullptr;
        }
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        lttng_ht_lookup(ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                stream = lttng::utils::container_of(node, &lttng_consumer_stream::node);
        }
@@ -220,7 +214,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        stream = find_stream(key, ht);
        if (stream) {
                stream->key = (uint64_t) -1ULL;
@@ -253,7 +247,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                channel = lttng::utils::container_of(node, &lttng_consumer_channel::node);
        }
@@ -273,7 +267,7 @@ static void steal_channel_key(uint64_t key)
 {
        struct lttng_consumer_channel *channel;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        channel = consumer_find_channel(key);
        if (channel) {
                channel->key = (uint64_t) -1ULL;
@@ -303,7 +297,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -403,7 +398,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        if (channel->is_published) {
                int ret;
 
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                iter.iter.node = &channel->node.node;
                ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
                LTTNG_ASSERT(!ret);
@@ -430,7 +425,7 @@ static void cleanup_relayd_ht()
        struct consumer_relayd_sock_pair *relayd;
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
 
                cds_lfht_for_each_entry (
                        the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
@@ -456,12 +451,14 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 
        DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Let's begin with metadata */
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -579,7 +576,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Steal stream identifier to avoid having streams with the same key */
        steal_stream_key(stream->key, ht);
@@ -624,7 +621,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream)
  */
 static int add_relayd(struct consumer_relayd_sock_pair *relayd)
 {
-       int ret = 0;
+       const int ret = 0;
        struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
@@ -632,7 +629,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd)
        ASSERT_RCU_READ_LOCKED();
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                goto end;
        }
@@ -693,7 +690,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key)
        }
 
        lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        if (node != nullptr) {
                relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
        }
@@ -717,7 +714,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path
        LTTNG_ASSERT(path);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -770,7 +767,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
        LTTNG_ASSERT(net_seq_idx != -1ULL);
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd != nullptr) {
                /* Add stream on the relayd */
@@ -804,7 +801,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream)
        struct consumer_relayd_sock_pair *relayd;
 
        /* The stream is not metadata. Get relayd reference if exists. */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd) {
                consumer_stream_relayd_close(stream, relayd);
@@ -1017,9 +1014,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1045,7 +1044,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -1085,7 +1084,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        CDS_INIT_LIST_HEAD(&channel->streams.head);
 
        if (trace_chunk) {
-               int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
+               const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk);
                if (ret) {
                        goto error;
                }
@@ -1121,7 +1120,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
         */
        steal_channel_key(channel->key);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
        lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
                         &channel->channels_by_session_id_ht_node);
@@ -1164,7 +1163,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        *nb_inactive_fd = 0;
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
                        /*
                         * Only active streams with an active end point can be added to the
@@ -1250,11 +1249,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx,
  * Send return code to the session daemon.
  * If the socket is not defined, we return 0, it is not a fatal error
  */
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+                             enum lttcomm_return_code error_code)
 {
        if (ctx->consumer_error_socket > 0) {
+               const std::int32_t comm_code = std::int32_t(error_code);
+
+               static_assert(
+                       sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+                       "Fixed-size communication type too small to accomodate lttcomm_return_code");
                return lttcomm_send_unix_sock(
-                       ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+                       ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
        }
 
        return 0;
@@ -1271,7 +1276,7 @@ void lttng_consumer_cleanup()
        unsigned int trace_chunks_left;
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
 
                cds_lfht_for_each_entry (
                        the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
@@ -1338,7 +1343,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int outfd = stream->out_fd;
+       const int outfd = stream->out_fd;
 
        /*
         * This does a blocking write-and-wait on any page that belongs to the
@@ -1456,7 +1461,7 @@ static void destroy_data_stream_ht(struct lttng_ht *ht)
        }
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
                        /*
                         * Ignore return value since we are currently cleaning up so any error
@@ -1483,7 +1488,7 @@ static void destroy_metadata_stream_ht(struct lttng_ht *ht)
        }
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
                        /*
                         * Ignore return value since we are currently cleaning up so any error
@@ -1593,7 +1598,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        size_t write_len;
 
        /* RCU lock for the relayd pointer */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
 
        /* Flag that the current stream if set for network streaming. */
@@ -1750,7 +1755,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
        off_t orig_offset = stream->out_fd_offset;
-       int fd = stream->wait_fd;
+       const int fd = stream->wait_fd;
        /* Default is on the disk */
        int outfd = stream->out_fd;
        struct consumer_relayd_sock_pair *relayd = nullptr;
@@ -1770,7 +1775,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
        }
 
        /* RCU lock for the relayd pointer */
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /* Flag that the current stream if set for network streaming. */
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
@@ -1867,7 +1872,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
 
                /* Handle stream on the relayd if the output is on the network */
                if (relayd && stream->metadata_flag) {
-                       size_t metadata_payload_size =
+                       const size_t metadata_payload_size =
                                sizeof(struct lttcomm_relayd_metadata_payload);
 
                        /* Update counter to fit the spliced data */
@@ -2130,6 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
@@ -2172,14 +2178,14 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
         * after this point.
         */
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        /*
         * Lookup the stream just to make sure it does not exist in our internal
         * state. This should NEVER happen.
         */
        lttng_ht_lookup(ht, &stream->key, &iter);
-       node = lttng_ht_iter_get_node_u64(&iter);
+       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
        LTTNG_ASSERT(!node);
 
        /*
@@ -2223,7 +2229,7 @@ static void validate_endpoint_status_data_stream()
        DBG("Consumer delete flagged data stream");
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
 
                cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
                        /* Validate delete flag of the stream */
@@ -2249,7 +2255,7 @@ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *po
        LTTNG_ASSERT(pollset);
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                        /* Validate delete flag of the stream */
                        if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
@@ -2399,13 +2405,13 @@ void *consumer_thread_metadata_poll(void *data)
                                continue;
                        }
 
-                       lttng::urcu::read_lock_guard read_lock;
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
                        stream = caa_container_of(node, struct lttng_consumer_stream, node);
@@ -2800,7 +2806,7 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -2853,7 +2859,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
        }
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
 
                cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
                        ret = lttng_ht_del(ht, &iter);
@@ -2966,7 +2972,7 @@ void *consumer_thread_channel_poll(void *data)
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                                       chan->wait_fd);
-                                               lttng::urcu::read_lock_guard read_lock;
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                        &chan->wait_fd_node);
                                                /* Add channel to the global poll events list */
@@ -2987,7 +2993,7 @@ void *consumer_thread_channel_poll(void *data)
                                                 * GET_CHANNEL failed.
                                                 */
 
-                                               lttng::urcu::read_lock_guard read_lock;
+                                               const lttng::urcu::read_lock_guard read_lock;
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
                                                        ERR("UST consumer get channel key %" PRIu64
@@ -3055,13 +3061,13 @@ void *consumer_thread_channel_poll(void *data)
                                continue;
                        }
 
-                       lttng::urcu::read_lock_guard read_lock;
+                       const lttng::urcu::read_lock_guard read_lock;
                        {
                                uint64_t tmp_id = (uint64_t) pollfd;
 
                                lttng_ht_lookup(channel_ht, &tmp_id, &iter);
                        }
-                       node = lttng_ht_iter_get_node_u64(&iter);
+                       node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
                        LTTNG_ASSERT(node);
 
                        chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node);
@@ -3733,7 +3739,7 @@ int consumer_data_pending(uint64_t id)
 
        DBG("Consumer data pending command on session id %" PRIu64, id);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&the_consumer_data.lock);
 
        switch (the_consumer_data.type) {
@@ -3978,7 +3984,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                 nullptr);
        lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        pthread_mutex_lock(&channel->lock);
        LTTNG_ASSERT(channel->trace_chunk);
@@ -4416,7 +4422,7 @@ static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *cha
        int ret;
        struct lttng_consumer_stream *stream;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        pthread_mutex_lock(&channel->lock);
        cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                health_code_update();
@@ -4628,7 +4634,7 @@ int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
 
        ASSERT_RCU_READ_LOCKED();
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        DBG("Consumer rotate ready streams in channel %" PRIu64, key);
 
@@ -4792,7 +4798,7 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
        }
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry_duplicate(
                        the_consumer_data.channels_by_session_id_ht->ht,
                        the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
@@ -4948,7 +4954,7 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
         * current chunk found in the session's channels.
         */
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_lfht_for_each_entry (
                        the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
                        int ret;
@@ -5013,7 +5019,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id
        const bool is_local_trace = !relayd_id;
        struct consumer_relayd_sock_pair *relayd = nullptr;
        bool chunk_exists_local, chunk_exists_remote;
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
 
        if (relayd_id) {
                /* Only used for logging purposes. */
@@ -5080,7 +5086,7 @@ static int consumer_clear_monitored_channel(struct lttng_consumer_channel *chann
 
        ht = the_consumer_data.stream_per_chan_id_ht;
 
-       lttng::urcu::read_lock_guard read_lock;
+       const lttng::urcu::read_lock_guard read_lock;
        cds_lfht_for_each_entry_duplicate(ht->ht,
                                          ht->hash_fct(&channel->key, lttng_ht_seed),
                                          ht->match_fct,
@@ -5148,7 +5154,7 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consum
        }
 
        {
-               lttng::urcu::read_lock_guard read_lock;
+               const lttng::urcu::read_lock_guard read_lock;
                cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
                        enum consumer_stream_open_packet_status status;
 
This page took 0.030812 seconds and 4 git commands to generate.