From: Jérémie Galarneau Date: Thu, 29 Jun 2023 18:04:37 +0000 (-0400) Subject: Fix: consumerd: slow metadata push slows down application registration X-Git-Tag: v2.13.11~8 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=95671f5349e87cdd2ea6cb47243608e9368ab8d5;p=lttng-tools.git Fix: consumerd: slow metadata push slows down application registration Issue observed -------------- When rotating the channels of a session configured with a "per-pid" buffer sharing policy, applications with a long registration timeout (e.g. LTTNG_UST_REGISTER_TIMEOUT=-1, see LTTNG-UST(3)) sometimes experience long start-up times. Cause ----- The session list lock is held during the registration of an application and during the setup of a rotation. While setting up a rotation in the userspace domain, the session daemon flushes its metadata cache to the userspace consumer daemon and waits for a confirmation that all metadata emitted before that point in time has been serialized (whether on disk or sent through a network output). As the consumer daemon waits for the metadata to be consumed, it periodically checks the metadata stream's output position with a 200ms delay (see DEFAULT_METADATA_AVAILABILITY_WAIT_TIME). In practice, in per-uid mode, this delay is seldomly encountered since the metadata has already been pushed by the consumption thread. Moreover, if it was not, a single polling iteration will typically suffice. However, in per-pid buffering mode and with a sustained "heavy" data production rate, this delay becomes problematic since: - metadata is pushed for every application, - the delay is hit almost systematically as the consumption thread is busy and has to catch up to consume the most recent metadata. Hence, some rotation setups can easily take multiple seconds (at least 200ms per application). This makes the locking scheme employed on that path unsuitable as it blocks some operations (like application registrations) for an extended period of time. Solution -------- The polling "back-off" delay is eliminated by using a waiter that allows the consumer daemon thread that runs the metadata push command to wake-up whenever the criteria used to evaluate the "pushed" metadata position are changed. Those criteria are: - the metadata stream's pushed position - the lifetime of the metadata channel's stream - the status of the session's endpoint Whenever those states are affected, the waiters are woken-up to force a re-evaluation of the metadata cache flush position and, eventually, cause the metadata push command to complete. Note ---- The waiter queue is adapted from urcu-wait.h of liburcu (also LGPL licensed). Change-Id: Ib86c2e878abe205c73f930e6de958c0b10486a37 Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index 5733ecda8..eba9947fb 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -3286,12 +3286,12 @@ end: free(cmd); cmd = NULL; } else { - lttng_waiter_wake_up(&cmd->reply_waiter); + lttng_waiter_wake(&cmd->reply_waiter); } return ret; error_unlock: /* Wake-up and return a fatal error to the calling thread. */ - lttng_waiter_wake_up(&cmd->reply_waiter); + lttng_waiter_wake(&cmd->reply_waiter); cmd->reply_code = LTTNG_ERR_FATAL; error: /* Indicate a fatal error to the caller. */ diff --git a/src/common/consumer/consumer-metadata-cache.c b/src/common/consumer/consumer-metadata-cache.c index 1038b84e6..a5943fd33 100644 --- a/src/common/consumer/consumer-metadata-cache.c +++ b/src/common/consumer/consumer-metadata-cache.c @@ -188,17 +188,15 @@ void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel) /* * Check if the cache is flushed up to the offset passed in parameter. * - * Return 0 if everything has been flushed, 1 if there is data not flushed. + * Return true if everything has been flushed, false if there is data not flushed. */ -int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, +static +bool consumer_metadata_cache_is_flushed(struct lttng_consumer_channel *channel, uint64_t offset, int timer) { - int ret = 0; + bool done_flushing = false; struct lttng_consumer_stream *metadata_stream; - assert(channel); - assert(channel->metadata_cache); - /* * If not called from a timer handler, we have to take the * channel lock to be mutually exclusive with channel teardown. @@ -216,7 +214,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, * Having no metadata stream means the channel is being destroyed so there * is no cache to flush anymore. */ - ret = 0; + done_flushing = true; goto end_unlock_channel; } @@ -224,23 +222,56 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, pthread_mutex_lock(&channel->metadata_cache->lock); if (metadata_stream->ust_metadata_pushed >= offset) { - ret = 0; + done_flushing = true; } else if (channel->metadata_stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { /* An inactive endpoint means we don't have to flush anymore. */ - ret = 0; + done_flushing = true; } else { /* Still not completely flushed. */ - ret = 1; + done_flushing = false; } pthread_mutex_unlock(&channel->metadata_cache->lock); pthread_mutex_unlock(&metadata_stream->lock); + end_unlock_channel: pthread_mutex_unlock(&channel->timer_lock); if (!timer) { pthread_mutex_unlock(&channel->lock); } - return ret; + return done_flushing; +} + +/* + * Wait until the cache is flushed up to the offset passed in parameter or the + * metadata stream has been destroyed. + */ +void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel, + uint64_t offset, bool invoked_by_timer) +{ + assert(channel); + assert(channel->metadata_cache); + + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + return; + } + + /* Metadata cache is not currently flushed, wait on wait queue. */ + for (;;) { + struct lttng_waiter waiter; + + lttng_waiter_init(&waiter); + lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter); + if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { + /* Wake up all waiters, ourself included. */ + lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + /* Ensure proper teardown of waiter. */ + lttng_waiter_wait(&waiter); + break; + } + + lttng_waiter_wait(&waiter); + } } diff --git a/src/common/consumer/consumer-metadata-cache.h b/src/common/consumer/consumer-metadata-cache.h index b8f4efadc..ae588e8d5 100644 --- a/src/common/consumer/consumer-metadata-cache.h +++ b/src/common/consumer/consumer-metadata-cache.h @@ -56,7 +56,7 @@ consumer_metadata_cache_write(struct consumer_metadata_cache *cache, const char *data); int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel); void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel); -int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, - uint64_t offset, int timer); +void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel, + uint64_t offset, bool invoked_by_timer); #endif /* CONSUMER_METADATA_CACHE_H */ diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index d0cf170da..a16ae2eb9 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -99,7 +99,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * - metadata_socket_lock * - Calling lttng_ustconsumer_recv_metadata(): * - channel->metadata_cache->lock - * - Calling consumer_metadata_cache_flushed(): + * - Calling consumer_wait_metadata_cache_flushed(): * - channel->timer_lock * - channel->metadata_cache->lock * @@ -108,7 +108,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * they are held while consumer_timer_switch_stop() is * called. */ - ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1); + ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1); if (ret < 0) { channel->switch_timer_error = 1; } diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index cbd3f67a9..a2df6c958 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -465,6 +465,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); + lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } @@ -1047,6 +1049,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->is_live = is_in_live_session; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); + lttng_wait_queue_init(&channel->metadata_pushed_wait_queue); switch (output) { case LTTNG_EVENT_SPLICE: @@ -2177,6 +2180,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, * pointer value. */ channel->metadata_stream = NULL; + lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 5128bcd25..53c39e6eb 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -29,6 +29,7 @@ #include #include #include +#include struct lttng_consumer_local_data; @@ -185,6 +186,11 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; + /* + * Wait queue awaiting updates to metadata stream's flushed position. + */ + struct lttng_wait_queue metadata_pushed_wait_queue; + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index e27e15ca5..b43ae58ff 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -950,6 +950,8 @@ error: */ consumer_stream_destroy(metadata->metadata_stream, NULL); metadata->metadata_stream = NULL; + lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue); + send_streams_error: error_no_stream: end: @@ -985,7 +987,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, * Ask the sessiond if we have new metadata waiting and update the * consumer metadata cache. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1); if (ret < 0) { goto error; } @@ -1032,6 +1034,7 @@ error_stream: */ consumer_stream_destroy(metadata_stream, NULL); metadata_channel->metadata_stream = NULL; + lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue); error: rcu_read_unlock(); @@ -1275,7 +1278,7 @@ void metadata_stream_reset_cache_consumed_position( */ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait) { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; @@ -1364,13 +1367,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, if (!wait) { goto end_free; } - while (consumer_metadata_cache_flushed(channel, offset + len, timer)) { - DBG("Waiting for metadata to be flushed"); - - health_code_update(); - usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME); - } + consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer); end_free: free(metadata_str); @@ -1821,7 +1819,7 @@ end_get_channel_nosignal: health_code_update(); ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len, - version, found_channel, 0, 1); + version, found_channel, false, 1); if (ret < 0) { /* error receiving from sessiond */ goto error_push_metadata_fatal; @@ -2613,6 +2611,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) goto end; } stream->ust_metadata_pushed += write_len; + lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); assert(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); @@ -2662,7 +2661,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { status = SYNC_METADATA_STATUS_ERROR; @@ -3312,7 +3311,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream) * pushed out due to concurrent interaction with the session daemon. */ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait) + struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait) { struct lttcomm_metadata_request_msg request; struct lttcomm_consumer_msg msg; @@ -3420,7 +3419,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, health_code_update(); ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket, - key, offset, len, version, channel, timer, wait); + key, offset, len, version, channel, invoked_by_timer, wait); if (ret >= 0) { /* * Only send the status msg if the sessiond is alive meaning a positive diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index e481f0850..d8d7b6b1a 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -49,9 +49,11 @@ void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata); void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream); int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, uint64_t len, uint64_t version, - struct lttng_consumer_channel *channel, int timer, int wait); + struct lttng_consumer_channel *channel, bool invoked_by_timer, + int wait); int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_channel *channel, int timer, int wait); + struct lttng_consumer_channel *channel, bool invoked_by_timer, + int wait); enum sync_metadata_status lttng_ustconsumer_sync_metadata( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *metadata); diff --git a/src/common/waiter.c b/src/common/waiter.c index db2de6557..e72f53bd4 100644 --- a/src/common/waiter.c +++ b/src/common/waiter.c @@ -43,15 +43,18 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) { unsigned int i; - DBG("Beginning of waiter wait period"); - /* Load and test condition before read state */ + DBG("Beginning of waiter \"wait\" period"); + + /* Load and test condition before read state. */ cmm_smp_rmb(); for (i = 0; i < WAIT_ATTEMPTS; i++) { if (uatomic_read(&waiter->state) != WAITER_WAITING) { goto skip_futex_wait; } + caa_cpu_relax(); } + while (uatomic_read(&waiter->state) == WAITER_WAITING) { if (!futex_noasync(&waiter->state, FUTEX_WAIT, WAITER_WAITING, NULL, NULL, 0)) { /* @@ -64,6 +67,7 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) */ continue; } + switch (errno) { case EAGAIN: /* Value already changed. */ @@ -90,13 +94,16 @@ skip_futex_wait: if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { break; } + caa_cpu_relax(); } + while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { poll(NULL, 0, 10); } + assert(uatomic_read(&waiter->state) & WAITER_TEARDOWN); - DBG("End of waiter wait period"); + DBG("End of waiter \"wait\" period"); } /* @@ -105,7 +112,7 @@ skip_futex_wait: * it to free this memory when it sees the WAITER_TEARDOWN flag. */ LTTNG_HIDDEN -void lttng_waiter_wake_up(struct lttng_waiter *waiter) +void lttng_waiter_wake(struct lttng_waiter *waiter) { cmm_smp_mb(); assert(uatomic_read(&waiter->state) == WAITER_WAITING); @@ -117,6 +124,44 @@ void lttng_waiter_wake_up(struct lttng_waiter *waiter) abort(); } } + /* Allow teardown of struct urcu_wait memory. */ uatomic_or(&waiter->state, WAITER_TEARDOWN); } + + +LTTNG_HIDDEN +void lttng_wait_queue_init(struct lttng_wait_queue *queue) +{ + cds_wfs_init(&queue->stack); +} + +LTTNG_HIDDEN +void lttng_wait_queue_add(struct lttng_wait_queue *queue, + struct lttng_waiter *waiter) +{ + (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node); +} + +LTTNG_HIDDEN +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue) +{ + struct cds_wfs_head *waiters; + struct cds_wfs_node *iter, *iter_n; + + /* Move all waiters from the queue to our local stack. */ + waiters = __cds_wfs_pop_all(&queue->stack); + + /* Wake all waiters in our stack head. */ + cds_wfs_for_each_blocking_safe(waiters, iter, iter_n) { + struct lttng_waiter *waiter = + container_of(iter, struct lttng_waiter, wait_queue_node); + + /* Don't wake already running threads. */ + if (waiter->state & WAITER_RUNNING) { + continue; + } + + lttng_waiter_wake(waiter); + } +} diff --git a/src/common/waiter.h b/src/common/waiter.h index f4b73c744..ed84ac3c2 100644 --- a/src/common/waiter.h +++ b/src/common/waiter.h @@ -22,6 +22,10 @@ struct lttng_waiter { int32_t state; }; +struct lttng_wait_queue { + struct cds_wfs_stack stack; +}; + LTTNG_HIDDEN void lttng_waiter_init(struct lttng_waiter *waiter); @@ -29,11 +33,29 @@ LTTNG_HIDDEN void lttng_waiter_wait(struct lttng_waiter *waiter); /* - * lttng_waiter_wake_up must only be called by a single waker. + * lttng_waiter_wake must only be called by a single waker. * It is invalid for multiple "wake" operations to be invoked * on a single waiter without re-initializing it before. */ LTTNG_HIDDEN -void lttng_waiter_wake_up(struct lttng_waiter *waiter); +void lttng_waiter_wake(struct lttng_waiter *waiter); + +LTTNG_HIDDEN +void lttng_wait_queue_init(struct lttng_wait_queue *queue); + +/* + * Atomically add a waiter to a wait queue. + * A full memory barrier is issued before being added to the wait queue. + */ +LTTNG_HIDDEN +void lttng_wait_queue_add(struct lttng_wait_queue *queue, + struct lttng_waiter *waiter); + +/* + * Wake every waiter present in the wait queue and remove them from + * the queue. + */ +LTTNG_HIDDEN +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue); #endif /* LTTNG_WAITER_H */