X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=1da243601cd7ff3c72fc8febb5d6c11f2f150c4b;hb=refs%2Fheads%2Fmaster;hp=4823d9b9c59aa6f9933d9e7552fe27fb48865cb6;hpb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 4823d9b9c..f1f29d429 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +46,7 @@ #include #include #include +#include #include lttng_consumer_global_data the_consumer_data; @@ -171,19 +173,12 @@ error: */ static void clean_channel_stream_list(struct lttng_consumer_channel *channel) { - struct lttng_consumer_stream *stream, *stmp; - LTTNG_ASSERT(channel); /* Delete streams that might have been left in the stream list. */ - cds_list_for_each_entry_safe (stream, stmp, &channel->streams.head, send_node) { - /* - * Once a stream is added to this list, the buffers were created so we - * have a guarantee that this call will succeed. Setting the monitor - * mode to 0 so we don't lock nor try to delete the stream from the - * global hash table. - */ - stream->monitor = 0; + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { consumer_stream_destroy(stream, nullptr); } } @@ -205,10 +200,10 @@ static struct lttng_consumer_stream *find_stream(uint64_t key, struct lttng_ht * return nullptr; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_lookup(ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { stream = lttng::utils::container_of(node, <tng_consumer_stream::node); } @@ -220,7 +215,7 @@ static void steal_stream_key(uint64_t key, struct lttng_ht *ht) { struct lttng_consumer_stream *stream; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; stream = find_stream(key, ht); if (stream) { stream->key = (uint64_t) -1ULL; @@ -253,7 +248,7 @@ struct lttng_consumer_channel *consumer_find_channel(uint64_t key) } lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { channel = lttng::utils::container_of(node, <tng_consumer_channel::node); } @@ -273,7 +268,7 @@ static void steal_channel_key(uint64_t key) { struct lttng_consumer_channel *channel; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; channel = consumer_find_channel(key); if (channel) { channel->key = (uint64_t) -1ULL; @@ -303,7 +298,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -403,7 +399,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) if (channel->is_published) { int ret; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; iter.iter.node = &channel->node.node; ret = lttng_ht_del(the_consumer_data.channel_ht, &iter); LTTNG_ASSERT(!ret); @@ -426,16 +422,12 @@ end: */ static void cleanup_relayd_ht() { - struct lttng_ht_iter iter; - struct consumer_relayd_sock_pair *relayd; - - { - lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry ( - the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { - consumer_destroy_relayd(relayd); - } + for (auto *relayd : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.relayd_ht->ht)) { + consumer_destroy_relayd(relayd); } lttng_ht_destroy(the_consumer_data.relayd_ht); @@ -451,25 +443,26 @@ static void cleanup_relayd_ht() static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, enum consumer_endpoint_status status) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx); - lttng::urcu::read_lock_guard read_lock; - /* Let's begin with metadata */ - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*metadata_ht->ht)) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); - lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + stream->chan->metadata_pushed_wait_queue.wake_all(); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } /* Follow up by the data streams */ - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*data_ht->ht)) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); DBG("Delete flag set to data stream %d", stream->wait_fd); @@ -581,7 +574,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->chan->timer_lock); pthread_mutex_lock(&stream->lock); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* Steal stream identifier to avoid having streams with the same key */ steal_stream_key(stream->key, ht); @@ -626,7 +619,7 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) */ static int add_relayd(struct consumer_relayd_sock_pair *relayd) { - int ret = 0; + const int ret = 0; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; @@ -634,7 +627,7 @@ static int add_relayd(struct consumer_relayd_sock_pair *relayd) ASSERT_RCU_READ_LOCKED(); lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { goto end; } @@ -695,7 +688,7 @@ struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key) } lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); if (node != nullptr) { relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node); } @@ -719,7 +712,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path LTTNG_ASSERT(path); /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -772,7 +765,7 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) LTTNG_ASSERT(net_seq_idx != -1ULL); /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(net_seq_idx); if (relayd != nullptr) { /* Add stream on the relayd */ @@ -806,7 +799,7 @@ void close_relayd_stream(struct lttng_consumer_stream *stream) struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd) { consumer_stream_relayd_close(stream, relayd); @@ -1019,9 +1012,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == nullptr) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1035,9 +1030,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; channel->is_live = is_in_live_session; - pthread_mutex_init(&channel->lock, NULL); - pthread_mutex_init(&channel->timer_lock, NULL); - lttng_wait_queue_init(&channel->metadata_pushed_wait_queue); + pthread_mutex_init(&channel->lock, nullptr); + pthread_mutex_init(&channel->timer_lock, nullptr); switch (output) { case LTTNG_EVENT_SPLICE: @@ -1048,7 +1042,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); + delete channel; channel = nullptr; goto end; } @@ -1088,7 +1082,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, CDS_INIT_LIST_HEAD(&channel->streams.head); if (trace_chunk) { - int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk); + const int ret = lttng_consumer_channel_set_trace_chunk(channel, trace_chunk); if (ret) { goto error; } @@ -1124,7 +1118,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ steal_channel_key(channel->key); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node); lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht, &channel->channels_by_session_id_ht_node); @@ -1155,8 +1149,6 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, int *nb_inactive_fd) { int i = 0; - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; LTTNG_ASSERT(ctx); LTTNG_ASSERT(ht); @@ -1166,32 +1158,32 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, DBG("Updating poll fd array"); *nb_inactive_fd = 0; - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Only active streams with an active end point can be added to the - * poll set and local stream storage of the thread. - * - * There is a potential race here for endpoint_status to be updated - * just after the check. However, this is OK since the stream(s) will - * be deleted once the thread is notified that the end point state has - * changed where this function will be called back again. - * - * We track the number of inactive FDs because they still need to be - * closed by the polling thread after a wakeup on the data_pipe or - * metadata_pipe. - */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { - (*nb_inactive_fd)++; - continue; - } - - (*pollfd)[i].fd = stream->wait_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = stream; - i++; + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Only active streams with an active end point can be added to the + * poll set and local stream storage of the thread. + * + * There is a potential race here for endpoint_status to be updated + * just after the check. However, this is OK since the stream(s) will + * be deleted once the thread is notified that the end point state has + * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. + */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; + continue; } + + (*pollfd)[i].fd = stream->wait_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_stream[i] = stream; + i++; } /* @@ -1253,11 +1245,17 @@ void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, * Send return code to the session daemon. * If the socket is not defined, we return 0, it is not a fatal error */ -int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) +int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, + enum lttcomm_return_code error_code) { if (ctx->consumer_error_socket > 0) { + const std::int32_t comm_code = std::int32_t(error_code); + + static_assert( + sizeof(comm_code) >= sizeof(std::underlying_type), + "Fixed-size communication type too small to accomodate lttcomm_return_code"); return lttcomm_send_unix_sock( - ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command)); + ctx->consumer_error_socket, &comm_code, sizeof(comm_code)); } return 0; @@ -1269,17 +1267,14 @@ int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd) */ void lttng_consumer_cleanup() { - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; unsigned int trace_chunks_left; - { - lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry ( - the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - consumer_del_channel(channel); - } + for (auto *channel : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.channel_ht->ht)) { + consumer_del_channel(channel); } lttng_ht_destroy(the_consumer_data.channel_ht); @@ -1341,7 +1336,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int outfd = stream->out_fd; + const int outfd = stream->out_fd; /* * This does a blocking write-and-wait on any page that belongs to the @@ -1451,22 +1446,19 @@ error: */ static void destroy_data_stream_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - if (ht == nullptr) { return; } - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_stream(stream, ht); - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_stream(stream, ht); } lttng_ht_destroy(ht); @@ -1478,22 +1470,19 @@ static void destroy_data_stream_ht(struct lttng_ht *ht) */ static void destroy_metadata_stream_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - if (ht == nullptr) { return; } - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) { - /* - * Ignore return value since we are currently cleaning up so any error - * can't be handled. - */ - (void) consumer_del_metadata_stream(stream, ht); - } + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + /* + * Ignore return value since we are currently cleaning up so any error + * can't be handled. + */ + (void) consumer_del_metadata_stream(stream, ht); } lttng_ht_destroy(ht); @@ -1596,7 +1585,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre size_t write_len; /* RCU lock for the relayd pointer */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk); /* Flag that the current stream if set for network streaming. */ @@ -1753,7 +1742,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; + const int fd = stream->wait_fd; /* Default is on the disk */ int outfd = stream->out_fd; struct consumer_relayd_sock_pair *relayd = nullptr; @@ -1773,7 +1762,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data } /* RCU lock for the relayd pointer */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* Flag that the current stream if set for network streaming. */ if (stream->net_seq_idx != (uint64_t) -1ULL) { @@ -1870,7 +1859,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* Handle stream on the relayd if the output is on the network */ if (relayd && stream->metadata_flag) { - size_t metadata_payload_size = + const size_t metadata_payload_size = sizeof(struct lttcomm_relayd_metadata_payload); /* Update counter to fit the spliced data */ @@ -2133,7 +2122,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l * pointer value. */ channel->metadata_stream = nullptr; - lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); @@ -2176,14 +2165,14 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) * after this point. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; /* * Lookup the stream just to make sure it does not exist in our internal * state. This should NEVER happen. */ lttng_ht_lookup(ht, &stream->key, &iter); - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(!node); /* @@ -2221,22 +2210,18 @@ void consumer_add_metadata_stream(struct lttng_consumer_stream *stream) */ static void validate_endpoint_status_data_stream() { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer delete flagged data stream"); - { - lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* Delete it right now */ - consumer_del_stream(stream, data_ht); + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*data_ht->ht)) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; } + /* Delete it right now */ + consumer_del_stream(stream, data_ht); } } @@ -2245,29 +2230,26 @@ static void validate_endpoint_status_data_stream() */ static void validate_endpoint_status_metadata_stream(struct lttng_poll_event *pollset) { - struct lttng_ht_iter iter; - struct lttng_consumer_stream *stream; - DBG("Consumer delete flagged metadata stream"); LTTNG_ASSERT(pollset); - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { - /* Validate delete flag of the stream */ - if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { - continue; - } - /* - * Remove from pollset so the metadata thread can continue without - * blocking on a deleted stream. - */ - lttng_poll_del(pollset, stream->wait_fd); - - /* Delete it right now */ - consumer_del_metadata_stream(stream, metadata_ht); + for (auto *stream : + lttng::urcu::lfht_iteration_adapter(*metadata_ht->ht)) { + /* Validate delete flag of the stream */ + if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) { + continue; } + /* + * Remove from pollset so the metadata thread can continue without + * blocking on a deleted stream. + */ + lttng_poll_del(pollset, stream->wait_fd); + + /* Delete it right now */ + consumer_del_metadata_stream(stream, metadata_ht); } } @@ -2403,16 +2385,16 @@ void *consumer_thread_metadata_poll(void *data) continue; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; lttng_ht_lookup(metadata_ht, &tmp_id, &iter); } - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(node); - stream = caa_container_of(node, struct lttng_consumer_stream, node); + stream = lttng::utils::container_of(node, <tng_consumer_stream::node); if (revents & (LPOLLIN | LPOLLPRI)) { /* Get the data out of the metadata file descriptor */ @@ -2798,21 +2780,16 @@ error_testpoint: */ static void consumer_close_channel_streams(struct lttng_consumer_channel *channel) { - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - - ht = the_consumer_data.stream_per_chan_id_ht; - - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + const auto ht = the_consumer_data.stream_per_chan_id_ht; + + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { /* * Protect against teardown with mutex. */ @@ -2848,21 +2825,16 @@ static void consumer_close_channel_streams(struct lttng_consumer_channel *channe static void destroy_channel_ht(struct lttng_ht *ht) { - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; - int ret; - if (ht == nullptr) { return; } - { - lttng::urcu::read_lock_guard read_lock; - - cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) { - ret = lttng_ht_del(ht, &iter); - LTTNG_ASSERT(ret != 0); - } + for (auto *channel : + lttng::urcu::lfht_iteration_adapter(*ht->ht)) { + const auto ret = cds_lfht_del(ht->ht, &channel->node.node); + LTTNG_ASSERT(ret != 0); } lttng_ht_destroy(ht); @@ -2970,7 +2942,7 @@ void *consumer_thread_channel_poll(void *data) lttng_ht_node_init_u64(&chan->wait_fd_node, chan->wait_fd); - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; lttng_ht_add_unique_u64(channel_ht, &chan->wait_fd_node); /* Add channel to the global poll events list */ @@ -2991,7 +2963,7 @@ void *consumer_thread_channel_poll(void *data) * GET_CHANNEL failed. */ - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; chan = consumer_find_channel(key); if (!chan) { ERR("UST consumer get channel key %" PRIu64 @@ -3059,16 +3031,17 @@ void *consumer_thread_channel_poll(void *data) continue; } - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; { uint64_t tmp_id = (uint64_t) pollfd; lttng_ht_lookup(channel_ht, &tmp_id, &iter); } - node = lttng_ht_iter_get_node_u64(&iter); + node = lttng_ht_iter_get_node(&iter); LTTNG_ASSERT(node); - chan = caa_container_of(node, struct lttng_consumer_channel, wait_fd_node); + chan = lttng::utils::container_of(node, + <tng_consumer_channel::wait_fd_node); /* Check for error event */ if (revents & (LPOLLERR | LPOLLHUP)) { @@ -3697,27 +3670,23 @@ error_nosignal: */ static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id) { - struct lttng_ht_iter iter; - struct consumer_relayd_sock_pair *relayd = nullptr; - - ASSERT_RCU_READ_LOCKED(); - /* Iterate over all relayd since they are indexed by net_seq_idx. */ - cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) { + for (auto *relayd : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.relayd_ht->ht)) { /* * Check by sessiond id which is unique here where the relayd session * id might not be when having multiple relayd. */ if (relayd->sessiond_session_id == id) { /* Found the relayd. There can be only one per id. */ - goto found; + return relayd; } } return nullptr; - -found: - return relayd; } /* @@ -3729,16 +3698,13 @@ found: int consumer_data_pending(uint64_t id) { int ret; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; + const auto ht = the_consumer_data.stream_list_ht; struct consumer_relayd_sock_pair *relayd = nullptr; int (*data_pending)(struct lttng_consumer_stream *); DBG("Consumer data pending command on session id %" PRIu64, id); - lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&the_consumer_data.lock); + const lttng::pthread::lock_guard consumer_data_lock(the_consumer_data.lock); switch (the_consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -3753,18 +3719,12 @@ int consumer_data_pending(uint64_t id) abort(); } - /* Ease our life a bit */ - ht = the_consumer_data.stream_list_ht; - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) - { - pthread_mutex_lock(&stream->lock); + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>(*ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) { + const lttng::pthread::lock_guard stream_lock(stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3777,35 +3737,30 @@ int consumer_data_pending(uint64_t id) /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) { - pthread_mutex_unlock(&stream->lock); goto data_pending; } } - - pthread_mutex_unlock(&stream->lock); } relayd = find_relayd_by_session_id(id); if (relayd) { unsigned int is_data_inflight = 0; + const lttng::pthread::lock_guard ctrl_sock_lock(relayd->ctrl_sock_mutex); + /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_begin_data_pending(&relayd->control_sock, relayd->relayd_session_id); if (ret < 0) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); /* Communication error thus the relayd so no data pending. */ goto data_not_pending; } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&id, lttng_ht_seed), - ht->match_fct, - &id, - &iter.iter, - stream, - node_session_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_session_id), + <tng_consumer_stream::node_session_id, + std::uint64_t>( + *ht->ht, &id, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct)) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3816,13 +3771,11 @@ int consumer_data_pending(uint64_t id) } if (ret == 1) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; } else if (ret < 0) { ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_not_pending; } } @@ -3830,7 +3783,6 @@ int consumer_data_pending(uint64_t id) /* Send end command for data pending. */ ret = relayd_end_data_pending( &relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); @@ -3850,12 +3802,10 @@ int consumer_data_pending(uint64_t id) data_not_pending: /* Data is available to be read by a viewer. */ - pthread_mutex_unlock(&the_consumer_data.lock); return 0; data_pending: /* Data is still being extracted from buffers. */ - pthread_mutex_unlock(&the_consumer_data.lock); return 1; } @@ -3960,9 +3910,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, uint64_t relayd_id) { int ret; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; struct lttng_dynamic_array stream_rotation_positions; uint64_t next_chunk_id, stream_count = 0; enum lttng_trace_chunk_status chunk_status; @@ -3971,7 +3919,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, bool rotating_to_new_chunk = true; /* Array of `struct lttng_consumer_stream *` */ struct lttng_dynamic_pointer_array streams_packet_to_open; - size_t stream_idx; ASSERT_RCU_READ_LOCKED(); @@ -3982,24 +3929,23 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, nullptr); lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr); - lttng::urcu::read_lock_guard read_lock; + const lttng::pthread::lock_guard channel_lock(channel->lock); - pthread_mutex_lock(&channel->lock); LTTNG_ASSERT(channel->trace_chunk); chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk, &next_chunk_id); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret = -1; - goto end_unlock_channel; + goto end; } - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { unsigned long produced_pos = 0, consumed_pos = 0; health_code_update(); @@ -4007,7 +3953,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, /* * Lock stream because we are about to change its state. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (stream->trace_chunk == stream->chan->trace_chunk) { rotating_to_new_chunk = false; @@ -4061,7 +4007,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ret = sample_stream_positions( stream, &produced_pos, &consumed_pos); if (ret) { - goto end_unlock_stream; + goto end; } /* @@ -4110,26 +4056,26 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel rotation", stream->key); - goto end_unlock_stream; + goto end; } } ret = lttng_consumer_take_snapshot(stream); if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) { ERR("Failed to sample snapshot position during channel rotation"); - goto end_unlock_stream; + goto end; } if (!ret) { ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Failed to sample produced position during channel rotation"); - goto end_unlock_stream; + goto end; } ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Failed to sample consumed position during channel rotation"); - goto end_unlock_stream; + goto end; } } /* @@ -4169,7 +4115,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable", stream->key); ret = -1; - goto end_unlock_stream; + goto end; } stream->rotate_position = stream->last_sequence_number + 1 + ((produced_pos - consumed_pos) / stream->max_sb_size); @@ -4192,7 +4138,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, &position); if (ret) { ERR("Failed to allocate stream rotation position"); - goto end_unlock_stream; + goto end; } stream_count++; } @@ -4254,20 +4200,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, if (ret) { PERROR("Failed to add a stream pointer to array of streams in which to open a packet"); ret = -1; - goto end_unlock_stream; + goto end; } } - - pthread_mutex_unlock(&stream->lock); } - stream = nullptr; if (!is_local_trace) { relayd = consumer_find_relayd(relayd_id); if (!relayd) { ERR("Failed to find relayd %" PRIu64, relayd_id); ret = -1; - goto end_unlock_channel; + goto end; } pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -4281,21 +4224,19 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64, relayd->net_seq_idx); lttng_consumer_cleanup_relayd(relayd); - goto end_unlock_channel; + goto end; } } - for (stream_idx = 0; + for (std::size_t stream_idx = 0; stream_idx < lttng_dynamic_pointer_array_get_count(&streams_packet_to_open); stream_idx++) { enum consumer_stream_open_packet_status status; - - stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer( + auto *stream = (lttng_consumer_stream *) lttng_dynamic_pointer_array_get_pointer( &streams_packet_to_open, stream_idx); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); status = consumer_stream_open_packet(stream); - pthread_mutex_unlock(&stream->lock); switch (status) { case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: DBG("Opened a packet after a rotation: stream id = %" PRIu64 @@ -4319,20 +4260,13 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: /* Logged by callee. */ ret = -1; - goto end_unlock_channel; + goto end; default: abort(); } } - pthread_mutex_unlock(&channel->lock); ret = 0; - goto end; - -end_unlock_stream: - pthread_mutex_unlock(&stream->lock); -end_unlock_channel: - pthread_mutex_unlock(&channel->lock); end: lttng_dynamic_array_reset(&stream_rotation_positions); lttng_dynamic_pointer_array_reset(&streams_packet_to_open); @@ -4417,27 +4351,22 @@ error: static int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel) { - int ret; - struct lttng_consumer_stream *stream; + const lttng::urcu::read_lock_guard read_lock; + const lttng::pthread::lock_guard channel_lock(channel->lock); - lttng::urcu::read_lock_guard read_lock; - pthread_mutex_lock(&channel->lock); - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { health_code_update(); - pthread_mutex_lock(&stream->lock); - ret = consumer_clear_stream(stream); + + const lttng::pthread::lock_guard stream_lock(stream->lock); + const auto ret = consumer_clear_stream(stream); if (ret) { - goto error_unlock; + return ret; } - pthread_mutex_unlock(&stream->lock); } - pthread_mutex_unlock(&channel->lock); - return 0; -error_unlock: - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&channel->lock); - return ret; + return 0; } /* @@ -4626,39 +4555,31 @@ error: int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key) { int ret; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; - struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; + const auto ht = the_consumer_data.stream_per_chan_id_ht; ASSERT_RCU_READ_LOCKED(); - lttng::urcu::read_lock_guard read_lock; - DBG("Consumer rotate ready streams in channel %" PRIu64, key); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { health_code_update(); - pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard channel_lock(stream->chan->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (!stream->rotate_ready) { - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); continue; } - DBG("Consumer rotate ready stream %" PRIu64, stream->key); + DBG("Consumer rotate ready stream %" PRIu64, stream->key); ret = lttng_consumer_rotate_stream(stream); - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); if (ret) { goto end; } @@ -4707,8 +4628,6 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, char creation_timestamp_buffer[ISO8601_STR_LEN]; const char *relayd_id_str = "(none)"; const char *creation_timestamp_str; - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; if (relayd_id) { /* Only used for logging purposes. */ @@ -4795,50 +4714,45 @@ lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, goto error; } - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate( - the_consumer_data.channels_by_session_id_ht->ht, - the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, - lttng_ht_seed), - the_consumer_data.channels_by_session_id_ht->match_fct, - &session_id, - &iter.iter, - channel, - channels_by_session_id_ht_node.node) - { - ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); - if (ret) { - /* - * Roll-back the creation of this chunk. - * - * This is important since the session daemon will - * assume that the creation of this chunk failed and - * will never ask for it to be closed, resulting - * in a leak and an inconsistent state for some - * channels. - */ - enum lttcomm_return_code close_ret; - char path[LTTNG_PATH_MAX]; - - DBG("Failed to set new trace chunk on existing channels, rolling back"); - close_ret = - lttng_consumer_close_trace_chunk(relayd_id, - session_id, - chunk_id, - chunk_creation_timestamp, - nullptr, - path); - if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { - ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 - ", chunk_id = %" PRIu64, - session_id, - chunk_id); - } + for (auto *channel : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_channel, + decltype(lttng_consumer_channel::channels_by_session_id_ht_node), + <tng_consumer_channel::channels_by_session_id_ht_node, + std::uint64_t>(*the_consumer_data.channels_by_session_id_ht->ht, + &session_id, + the_consumer_data.channels_by_session_id_ht->hash_fct( + &session_id, lttng_ht_seed), + the_consumer_data.channels_by_session_id_ht->match_fct)) { + ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk); + if (ret) { + /* + * Roll-back the creation of this chunk. + * + * This is important since the session daemon will + * assume that the creation of this chunk failed and + * will never ask for it to be closed, resulting + * in a leak and an inconsistent state for some + * channels. + */ + enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; - ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - break; + DBG("Failed to set new trace chunk on existing channels, rolling back"); + close_ret = lttng_consumer_close_trace_chunk(relayd_id, + session_id, + chunk_id, + chunk_creation_timestamp, + nullptr, + path); + if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 + ", chunk_id = %" PRIu64, + session_id, + chunk_id); } + + ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; + break; } } @@ -4896,8 +4810,6 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; const char *close_command_name = "none"; - struct lttng_ht_iter iter; - struct lttng_consumer_channel *channel; enum lttng_trace_chunk_status chunk_status; if (relayd_id) { @@ -4951,32 +4863,33 @@ lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, * it; it is only kept around to compare it (by address) to the * current chunk found in the session's channels. */ - { - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry ( - the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) { - int ret; + for (auto *channel : + lttng::urcu::lfht_iteration_adapter( + *the_consumer_data.channel_ht->ht)) { + int ret; + /* + * Only change the channel's chunk to NULL if it still + * references the chunk being closed. The channel may + * reference a newer channel in the case of a session + * rotation. When a session rotation occurs, the "next" + * chunk is created before the "current" chunk is closed. + */ + if (channel->trace_chunk != chunk) { + continue; + } + ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); + if (ret) { /* - * Only change the channel's chunk to NULL if it still - * references the chunk being closed. The channel may - * reference a newer channel in the case of a session - * rotation. When a session rotation occurs, the "next" - * chunk is created before the "current" chunk is closed. + * Attempt to close the chunk on as many channels as + * possible. */ - if (channel->trace_chunk != chunk) { - continue; - } - ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr); - if (ret) { - /* - * Attempt to close the chunk on as many channels as - * possible. - */ - ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; - } + ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED; } } + if (relayd_id) { int ret; struct consumer_relayd_sock_pair *relayd; @@ -5017,7 +4930,7 @@ lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, uint64_t session_id const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = nullptr; bool chunk_exists_local, chunk_exists_remote; - lttng::urcu::read_lock_guard read_lock; + const lttng::urcu::read_lock_guard read_lock; if (relayd_id) { /* Only used for logging purposes. */ @@ -5077,41 +4990,32 @@ end: static int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel) { - struct lttng_ht *ht; - struct lttng_consumer_stream *stream; - struct lttng_ht_iter iter; int ret; - - ht = the_consumer_data.stream_per_chan_id_ht; - - lttng::urcu::read_lock_guard read_lock; - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, - &channel->key, - &iter.iter, - stream, - node_channel_id.node) - { + const auto ht = the_consumer_data.stream_per_chan_id_ht; + + for (auto *stream : lttng::urcu::lfht_filtered_iteration_adapter< + lttng_consumer_stream, + decltype(lttng_consumer_stream::node_channel_id), + <tng_consumer_stream::node_channel_id, + std::uint64_t>(*ht->ht, + &channel->key, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct)) { /* * Protect against teardown with mutex. */ - pthread_mutex_lock(&stream->lock); + const lttng::pthread::lock_guard stream_lock(stream->lock); if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; + continue; } + ret = consumer_clear_stream(stream); if (ret) { - goto error_unlock; + return ret; } - next: - pthread_mutex_unlock(&stream->lock); } - return LTTCOMM_CONSUMERD_SUCCESS; -error_unlock: - pthread_mutex_unlock(&stream->lock); - return ret; + return LTTCOMM_CONSUMERD_SUCCESS; } int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) @@ -5142,71 +5046,60 @@ end: enum lttcomm_return_code lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel) { - struct lttng_consumer_stream *stream; enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS; if (channel->metadata_stream) { ERR("Open channel packets command attempted on a metadata channel"); - ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS; - goto end; + return LTTCOMM_CONSUMERD_INVALID_PARAMETERS; } - { - lttng::urcu::read_lock_guard read_lock; - cds_list_for_each_entry (stream, &channel->streams.head, send_node) { - enum consumer_stream_open_packet_status status; - - pthread_mutex_lock(&stream->lock); - if (cds_lfht_is_node_deleted(&stream->node.node)) { - goto next; - } + const lttng::urcu::read_lock_guard read_lock; + for (auto stream : lttng::urcu::list_iteration_adapter( + channel->streams.head)) { + enum consumer_stream_open_packet_status status; - status = consumer_stream_open_packet(stream); - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, - stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* - * Only unexpected internal errors can lead to this - * failing. Report an unknown error. - */ - ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 - ", channel id = %" PRIu64 ", channel name = %s" - ", session id = %" PRIu64, - stream->key, - channel->key, - channel->name, - channel->session_id); - ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR; - goto error_unlock; - default: - abort(); - } + const lttng::pthread::lock_guard stream_lock(stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + continue; + } - next: - pthread_mutex_unlock(&stream->lock); + status = consumer_stream_open_packet(stream); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, + stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* + * Only unexpected internal errors can lead to this + * failing. Report an unknown error. + */ + ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64 + ", channel id = %" PRIu64 ", channel name = %s" + ", session id = %" PRIu64, + stream->key, + channel->key, + channel->name, + channel->session_id); + return LTTCOMM_CONSUMERD_UNKNOWN_ERROR; + default: + abort(); } } -end_rcu_unlock: -end: - return ret; -error_unlock: - pthread_mutex_unlock(&stream->lock); - goto end_rcu_unlock; + return ret; } void lttng_consumer_sigbus_handle(void *addr)