From: Jérémie Galarneau
Date: Thu, 29 Jun 2023 18:04:37 +0000 (-0400)
Subject: Fix: consumerd: slow metadata push slows down application registration
X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=f40b76aed659ff694cf948bf8ebd1d4b5741c986;p=lttng-tools.git

Fix: consumerd: slow metadata push slows down application registration

Issue observed
--------------

When rotating the channels of a session configured with a "per-pid"
buffer sharing policy, applications with a long registration timeout
(e.g. LTTNG_UST_REGISTER_TIMEOUT=-1, see LTTNG-UST(3)) sometimes
experience long start-up times.

Cause
-----

The session list lock is held during the registration of an application
and during the setup of a rotation.

While setting up a rotation in the userspace domain, the session daemon
flushes its metadata cache to the userspace consumer daemon and waits
for confirmation that all metadata emitted before that point in time
has been serialized (whether on disk or sent through a network output).

As the consumer daemon waits for the metadata to be consumed, it
periodically checks the metadata stream's output position with a 200ms
delay (see DEFAULT_METADATA_AVAILABILITY_WAIT_TIME).

In practice, in per-uid mode, this delay is seldom encountered since
the metadata has already been pushed by the consumption thread.
Moreover, if it has not been, a single polling iteration typically
suffices.

However, in per-pid buffering mode and with a sustained "heavy" data
production rate, this delay becomes problematic since:
  - metadata is pushed for every application,
  - the delay is hit almost systematically as the consumption thread is
    busy and has to catch up to consume the most recent metadata.

Hence, some rotation setups can easily take multiple seconds (at least
200ms per application). This makes the locking scheme employed on that
path unsuitable, as it blocks some operations (like application
registrations) for an extended period of time.

Solution
--------

The polling "back-off" delay is eliminated by using a waiter that
allows the consumer daemon thread that runs the metadata push command
to wake up whenever the criteria used to evaluate the "pushed" metadata
position are changed.

Those criteria are:
  - the metadata stream's pushed position,
  - the lifetime of the metadata channel's stream,
  - the status of the session's endpoint.

Whenever those states are affected, the waiters are woken up to force a
re-evaluation of the metadata cache flush position and, eventually,
cause the metadata push command to complete.

Note
----

The waiter queue is adapted from urcu-wait.h of liburcu (also LGPL
licensed).

Change-Id: Ib86c2e878abe205c73f930e6de958c0b10486a37
Signed-off-by: Jérémie Galarneau
---
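The sketch below illustrates the wake-on-update scheme described in the
"Solution" section. It is an illustrative sketch only, not part of the
patch: it substitutes a portable std::mutex/std::condition_variable
pair for the futex-based lttng_waiter/lttng_wait_queue, and all of its
names (wait_queue, advance_pushed_position, wait_flushed_up_to) are
hypothetical. What it demonstrates is the ordering that makes the
design safe: a waiter re-checks the flush condition after subscribing
and before blocking, so a wake-up racing with subscription cannot be
lost, while wakers signal on every pushed-position update instead of
relying on a 200ms polling back-off.

/*
 * Illustrative sketch only (not part of the patch): models the
 * wake-on-update scheme with portable C++ primitives instead of the
 * futex-based lttng_waiter/lttng_wait_queue. All names are hypothetical.
 */
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

namespace {
class wait_queue {
public:
	/* Waker side: publish a new pushed position, then wake all waiters. */
	void advance_pushed_position(std::uint64_t position)
	{
		{
			std::lock_guard<std::mutex> lock(_lock);
			_pushed_position = position;
		}

		/* Plays the role of lttng_wait_queue_wake_all(). */
		_cv.notify_all();
	}

	/* Waiter side: plays the role of consumer_wait_metadata_cache_flushed(). */
	void wait_flushed_up_to(std::uint64_t offset)
	{
		std::unique_lock<std::mutex> lock(_lock);

		/*
		 * The predicate is re-checked after subscribing (acquiring
		 * the lock) and after every wake-up, mirroring the patch's
		 * add -> re-check -> wait ordering.
		 */
		_cv.wait(lock, [this, offset] { return _pushed_position >= offset; });
	}

private:
	std::mutex _lock;
	std::condition_variable _cv;
	std::uint64_t _pushed_position = 0;
};
} /* namespace */

int main()
{
	wait_queue queue;

	/* Thread waiting for metadata up to offset 4096 to be pushed. */
	std::thread waiter([&queue] { queue.wait_flushed_up_to(4096); });

	/*
	 * Consumption thread pushing metadata in 1024-byte increments;
	 * every push wakes the waiters immediately instead of letting
	 * them poll with a fixed 200ms delay.
	 */
	std::thread pusher([&queue] {
		for (std::uint64_t pushed = 1024; pushed <= 4096; pushed += 1024) {
			queue.advance_pushed_position(pushed);
		}
	});

	pusher.join();
	waiter.join();
	return 0;
}

The real implementation must additionally wake waiters when the
metadata stream is destroyed or its endpoint becomes inactive, which is
why the patch adds lttng_wait_queue_wake_all() calls at those sites as
well.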
diff --git a/.clang-format b/.clang-format
index f854e32c9..28ea69873 100644
--- a/.clang-format
+++ b/.clang-format
@@ -54,6 +54,7 @@ ForEachMacros:
   - 'cds_list_for_each_entry_safe'
   - 'for_each_action_mutable'
   - 'for_each_action_const'
+  - 'cds_wfs_for_each_blocking_safe'
 IncludeBlocks: Regroup
 IncludeCategories:
diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp
index aaf24b3f0..f18bc68d4 100644
--- a/src/bin/lttng-sessiond/notification-thread-events.cpp
+++ b/src/bin/lttng-sessiond/notification-thread-events.cpp
@@ -3316,12 +3316,12 @@ end:
 		free(cmd);
 		cmd = nullptr;
 	} else {
-		lttng_waiter_wake_up(&cmd->reply_waiter);
+		lttng_waiter_wake(&cmd->reply_waiter);
 	}
 	return ret;
 error_unlock:
 	/* Wake-up and return a fatal error to the calling thread. */
-	lttng_waiter_wake_up(&cmd->reply_waiter);
+	lttng_waiter_wake(&cmd->reply_waiter);
 	cmd->reply_code = LTTNG_ERR_FATAL;
 error:
 	/* Indicate a fatal error to the caller. */
diff --git a/src/common/consumer/consumer-metadata-cache.cpp b/src/common/consumer/consumer-metadata-cache.cpp
index 9af9ab0fc..462e079d8 100644
--- a/src/common/consumer/consumer-metadata-cache.cpp
+++ b/src/common/consumer/consumer-metadata-cache.cpp
@@ -184,18 +184,16 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel)
 /*
  * Check if the cache is flushed up to the offset passed in parameter.
  *
- * Return 0 if everything has been flushed, 1 if there is data not flushed.
+ * Return true if everything has been flushed, false if there is data not flushed.
  */
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-				    uint64_t offset,
-				    int timer)
+namespace {
+bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel,
+					uint64_t offset,
+					int timer)
 {
-	int ret = 0;
+	bool done_flushing = false;
 	struct lttng_consumer_stream *metadata_stream;
 
-	LTTNG_ASSERT(channel);
-	LTTNG_ASSERT(channel->metadata_cache);
-
 	/*
 	 * If not called from a timer handler, we have to take the
 	 * channel lock to be mutually exclusive with channel teardown.
@@ -213,7 +211,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
 	 * Having no metadata stream means the channel is being destroyed so there
 	 * is no cache to flush anymore.
 	 */
-	ret = 0;
+	done_flushing = true;
 	goto end_unlock_channel;
 }
@@ -221,22 +219,57 @@
 	pthread_mutex_lock(&channel->metadata_cache->lock);
 
 	if (metadata_stream->ust_metadata_pushed >= offset) {
-		ret = 0;
+		done_flushing = true;
 	} else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
 		/* An inactive endpoint means we don't have to flush anymore. */
-		ret = 0;
+		done_flushing = true;
 	} else {
 		/* Still not completely flushed. */
-		ret = 1;
+		done_flushing = false;
 	}
 
 	pthread_mutex_unlock(&channel->metadata_cache->lock);
 	pthread_mutex_unlock(&metadata_stream->lock);
+
 end_unlock_channel:
 	pthread_mutex_unlock(&channel->timer_lock);
 	if (!timer) {
 		pthread_mutex_unlock(&channel->lock);
 	}
 
-	return ret;
+	return done_flushing;
+}
+} /* namespace */
+
+/*
+ * Wait until the cache is flushed up to the offset passed in parameter or the
+ * metadata stream has been destroyed.
+ */
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+					  uint64_t offset,
+					  bool invoked_by_timer)
+{
+	LTTNG_ASSERT(channel);
+	LTTNG_ASSERT(channel->metadata_cache);
+
+	if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+		return;
+	}
+
+	/* Metadata cache is not currently flushed, wait on wait queue. */
+	for (;;) {
+		struct lttng_waiter waiter;
+
+		lttng_waiter_init(&waiter);
+		lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+		if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
+			/* Wake up all waiters, ourself included. */
+			lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+			/* Ensure proper teardown of waiter. */
+			lttng_waiter_wait(&waiter);
+			break;
+		}
+
+		lttng_waiter_wait(&waiter);
+	}
+}
diff --git a/src/common/consumer/consumer-metadata-cache.hpp b/src/common/consumer/consumer-metadata-cache.hpp
index 9f958ac58..529b9f075 100644
--- a/src/common/consumer/consumer-metadata-cache.hpp
+++ b/src/common/consumer/consumer-metadata-cache.hpp
@@ -58,8 +58,8 @@ consumer_metadata_cache_write(struct consumer_metadata_cache *cache,
 			      const char *data);
 int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
 void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
-int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
-				    uint64_t offset,
-				    int timer);
+void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel,
+					  uint64_t offset,
+					  bool invoked_by_timer);
 
 #endif /* CONSUMER_METADATA_CACHE_H */
diff --git a/src/common/consumer/consumer-timer.cpp b/src/common/consumer/consumer-timer.cpp
index 133ec6c0e..8c9371bae 100644
--- a/src/common/consumer/consumer-timer.cpp
+++ b/src/common/consumer/consumer-timer.cpp
@@ -96,7 +96,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo
 	 *  - metadata_socket_lock
 	 *    - Calling lttng_ustconsumer_recv_metadata():
 	 *      - channel->metadata_cache->lock
-	 *    - Calling consumer_metadata_cache_flushed():
+	 *    - Calling consumer_wait_metadata_cache_flushed():
 	 *      - channel->timer_lock
 	 *      - channel->metadata_cache->lock
 	 *
@@ -105,7 +105,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo
 	 * they are held while consumer_timer_switch_stop() is
 	 * called.
 	 */
-	ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
+	ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1);
 	if (ret < 0) {
 		channel->switch_timer_error = 1;
 	}
diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp
index 01845871b..4823d9b9c 100644
--- a/src/common/consumer/consumer.cpp
+++ b/src/common/consumer/consumer.cpp
@@ -462,6 +462,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
 	cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
 		if (stream->net_seq_idx == net_seq_idx) {
 			uatomic_set(&stream->endpoint_status, status);
+			lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
 			DBG("Delete flag set to metadata stream %d", stream->wait_fd);
 		}
 	}
@@ -1033,8 +1035,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
 	channel->monitor = monitor;
 	channel->live_timer_interval = live_timer_interval;
 	channel->is_live = is_in_live_session;
 	pthread_mutex_init(&channel->lock, nullptr);
 	pthread_mutex_init(&channel->timer_lock, nullptr);
+	lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
 
 	switch (output) {
 	case LTTNG_EVENT_SPLICE:
@@ -2130,6 +2133,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
 	 * pointer value.
 	 */
 	channel->metadata_stream = nullptr;
+	lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
 
 	if (channel->metadata_cache) {
 		pthread_mutex_unlock(&channel->metadata_cache->lock);
diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp
index 053744904..9de8d05a3 100644
--- a/src/common/consumer/consumer.hpp
+++ b/src/common/consumer/consumer.hpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include <common/waiter.hpp>
 
 #include
@@ -184,6 +185,11 @@ struct lttng_consumer_channel {
 	/* Metadata cache is metadata channel */
 	struct consumer_metadata_cache *metadata_cache;
 
+	/*
+	 * Wait queue of threads awaiting updates to the metadata stream's
+	 * pushed position.
+	 */
+	struct lttng_wait_queue metadata_pushed_wait_queue;
+
 	/* For UST metadata periodical flush */
 	int switch_timer_enabled;
 	timer_t switch_timer;
diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp
index 6274cdcc5..00671a876 100644
--- a/src/common/ust-consumer/ust-consumer.cpp
+++ b/src/common/ust-consumer/ust-consumer.cpp
@@ -930,6 +930,8 @@ error:
 	 */
 	consumer_stream_destroy(metadata->metadata_stream, nullptr);
 	metadata->metadata_stream = nullptr;
+	lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
 send_streams_error:
 error_no_stream:
 end:
@@ -967,7 +969,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
 	 * Ask the sessiond if we have new metadata waiting and update the
 	 * consumer metadata cache.
 	 */
-	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
 	if (ret < 0) {
 		goto error;
 	}
@@ -1013,6 +1015,7 @@ error_stream:
 	 */
 	consumer_stream_destroy(metadata_stream, nullptr);
 	metadata_channel->metadata_stream = nullptr;
+	lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
 
 error:
 	return ret;
@@ -1251,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock,
 				    uint64_t len,
 				    uint64_t version,
 				    struct lttng_consumer_channel *channel,
-				    int timer,
+				    bool invoked_by_timer,
 				    int wait)
 {
 	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@ -1339,13 +1342,8 @@ int lttng_ustconsumer_recv_metadata(int sock,
 	if (!wait) {
 		goto end_free;
 	}
 
-	while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-		DBG("Waiting for metadata to be flushed");
-
-		health_code_update();
-		usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-	}
+	consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
 	free(metadata_str);
@@ -1790,7 +1788,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 		health_code_update();
 
 		ret = lttng_ustconsumer_recv_metadata(
-			sock, key, offset, len, version, found_channel, 0, 1);
+			sock, key, offset, len, version, found_channel, false, 1);
 		if (ret < 0) {
 			/* error receiving from sessiond */
 			goto error_push_metadata_fatal;
@@ -2551,6 +2549,7 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
 		goto end;
 	}
 	stream->ust_metadata_pushed += write_len;
+	lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
 
 	LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
 	ret = write_len;
@@ -2599,7 +2598,7 @@ lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
 	 * Request metadata from the sessiond, but don't wait for the flush
 	 * because we locked the metadata thread.
 	 */
-	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+	ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
 	pthread_mutex_lock(&metadata_stream->lock);
 	if (ret < 0) {
 		status = SYNC_METADATA_STATUS_ERROR;
@@ -3238,7 +3237,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 				       struct lttng_consumer_channel *channel,
-				       int timer,
+				       bool invoked_by_timer,
 				       int wait)
 {
 	struct lttcomm_metadata_request_msg request;
@@ -3343,8 +3342,14 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 	health_code_update();
 
-	ret = lttng_ustconsumer_recv_metadata(
-		ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+	ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+					      key,
+					      offset,
+					      len,
+					      version,
+					      channel,
+					      invoked_by_timer,
+					      wait);
 	if (ret >= 0) {
 		/*
 		 * Only send the status msg if the sessiond is alive meaning a positive
diff --git a/src/common/ust-consumer/ust-consumer.hpp b/src/common/ust-consumer/ust-consumer.hpp
index 7d0e00614..13de36f1f 100644
--- a/src/common/ust-consumer/ust-consumer.hpp
+++ b/src/common/ust-consumer/ust-consumer.hpp
@@ -51,11 +51,11 @@ int lttng_ustconsumer_recv_metadata(int sock,
 				    uint64_t len,
 				    uint64_t version,
 				    struct lttng_consumer_channel *channel,
-				    int timer,
+				    bool invoked_by_timer,
 				    int wait);
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
 				       struct lttng_consumer_channel *channel,
-				       int timer,
+				       bool invoked_by_timer,
 				       int wait);
 enum sync_metadata_status lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
 							  struct lttng_consumer_stream *metadata);
@@ -195,7 +195,7 @@ static inline int lttng_ustconsumer_recv_metadata(int sock __attribute__((unused
 						  uint64_t version __attribute__((unused)),
 						  struct lttng_consumer_channel *channel
 							  __attribute__((unused)),
-						  int timer __attribute__((unused)))
+						  bool invoked_by_timer __attribute__((unused)))
 {
 	return -ENOSYS;
 }
@@ -204,7 +204,7 @@ static inline int lttng_ustconsumer_request_metadata(struct lttng_consumer_local
 							     __attribute__((unused)),
 						     struct lttng_consumer_channel *channel
 							     __attribute__((unused)),
-						     int timer __attribute__((unused)),
+						     bool invoked_by_timer __attribute__((unused)),
 						     int wait __attribute__((unused)))
 {
 	return -ENOSYS;
diff --git a/src/common/waiter.cpp b/src/common/waiter.cpp
index c0ea3392f..d185e2997 100644
--- a/src/common/waiter.cpp
+++ b/src/common/waiter.cpp
@@ -7,6 +7,7 @@
  */
 
 #include "error.hpp"
+#include "macros.hpp"
 #include "waiter.hpp"
 
 #include
@@ -41,15 +42,18 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
 {
 	unsigned int i;
 
-	DBG("Beginning of waiter wait period");
-	/* Load and test condition before read state */
+	DBG("Beginning of waiter \"wait\" period");
+
+	/* Load and test condition before read state. */
 	cmm_smp_rmb();
 	for (i = 0; i < WAIT_ATTEMPTS; i++) {
 		if (uatomic_read(&waiter->state) != WAITER_WAITING) {
 			goto skip_futex_wait;
 		}
+
 		caa_cpu_relax();
 	}
+
 	while (uatomic_read(&waiter->state) == WAITER_WAITING) {
 		if (!futex_noasync(
 			    &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) {
@@ -63,6 +67,7 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
 			 */
 			continue;
 		}
+
 		switch (errno) {
 		case EAGAIN:
 			/* Value already changed. */
*/ @@ -89,13 +94,16 @@ skip_futex_wait: if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { break; } + caa_cpu_relax(); } + while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { poll(nullptr, 0, 10); } + LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN); - DBG("End of waiter wait period"); + DBG("End of waiter \"wait\" period"); } /* @@ -103,7 +111,7 @@ skip_futex_wait: * execution. In this scheme, the waiter owns the node memory, and we only allow * it to free this memory when it sees the WAITER_TEARDOWN flag. */ -void lttng_waiter_wake_up(struct lttng_waiter *waiter) +void lttng_waiter_wake(struct lttng_waiter *waiter) { cmm_smp_mb(); LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING); @@ -114,6 +122,38 @@ void lttng_waiter_wake_up(struct lttng_waiter *waiter) abort(); } } + /* Allow teardown of struct urcu_wait memory. */ uatomic_or(&waiter->state, WAITER_TEARDOWN); } + +void lttng_wait_queue_init(struct lttng_wait_queue *queue) +{ + cds_wfs_init(&queue->stack); +} + +void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter) +{ + (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node); +} + +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue) +{ + cds_wfs_head *waiters; + cds_wfs_node *iter, *iter_n; + + /* Move all waiters from the queue to our local stack. */ + waiters = __cds_wfs_pop_all(&queue->stack); + + /* Wake all waiters in our stack head. */ + cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) { + auto *waiter = lttng::utils::container_of(iter, <tng_waiter::wait_queue_node); + + /* Don't wake already running threads. */ + if (waiter->state & WAITER_RUNNING) { + continue; + } + + lttng_waiter_wake(waiter); + } +} diff --git a/src/common/waiter.hpp b/src/common/waiter.hpp index d88fe0f66..8ea0c2d83 100644 --- a/src/common/waiter.hpp +++ b/src/common/waiter.hpp @@ -23,15 +23,33 @@ struct lttng_waiter { int32_t state; }; +struct lttng_wait_queue { + struct cds_wfs_stack stack; +}; + void lttng_waiter_init(struct lttng_waiter *waiter); void lttng_waiter_wait(struct lttng_waiter *waiter); /* - * lttng_waiter_wake_up must only be called by a single waker. + * lttng_waiter_wake must only be called by a single waker. * It is invalid for multiple "wake" operations to be invoked * on a single waiter without re-initializing it before. */ -void lttng_waiter_wake_up(struct lttng_waiter *waiter); +void lttng_waiter_wake(struct lttng_waiter *waiter); + +void lttng_wait_queue_init(struct lttng_wait_queue *queue); + +/* + * Atomically add a waiter to a wait queue. + * A full memory barrier is issued before being added to the wait queue. + */ +void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter); + +/* + * Wake every waiter present in the wait queue and remove them from + * the queue. + */ +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue); #endif /* LTTNG_WAITER_H */