From: Jérémie Galarneau Date: Mon, 8 Feb 2021 19:40:33 +0000 (-0500) Subject: Fix: ust-consumer: metadata thread not woken-up after version change X-Git-Tag: v2.12.3~1 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=920f2ddba471d1fe16fe7e6f2d8cb9e0af34f047;p=lttng-tools.git Fix: ust-consumer: metadata thread not woken-up after version change Issue observed ============== The metadata regeneration test fails, very rarely, in the "streaming" case on the CI. The interesting part of the test boils down to: 1) start session 2) launch an app tracing one event 3) stop session 4) delete metadata file 5) start session 6) regenerate metadata 7) stop session 8) destroy session 9) read trace: babeltrace fails on an invalid metadata file. The problem is hard to capture, but modifying the test allows us to see that there appears to be a short window between steps 7 and 8 where the metadata file is empty or doesn't exist. Cause ===== When metadata is regenerated, its version is bumped and the metadata cache is "reset". In some cases, such as in this test, the new metadata will have exactly the same size as it had prior as nothing happened to change that (e.g. no new apps/probes were registered). When this occurs, the metadata thread is not woken-up by consumer_metadata_cache_write() as it sees that max_offset of the metadata cache didn't change; the data was replaced but it has the same size. The metadata consumption thread also checks for version bumps and resets the amount of consumed metadata. Hence, if the "cache write" operation woke up the metadata consumption thread, the stream's "ust metadata pushed" state would be reset and the new contents would be consumed. Solution ======== The metadata stream's "ust metadata pushed" position is directly reset to zero when a metadata version change is detected by the metadata cache. The metadata poll thread is also woken up to resume the consumption of the newly-available data. It is unclear why the change to the consumption position was only done on the metadata consumption thread's code path and not directly by the session daemon command handling. Note that a session rotation will also result in a reset of the pushed position and a wake-up of the metadata poll thread from the command handling thread. I am speculating that this couldn't be done due to the design of the locking at the time of the original implementation (I haven't checked). In implementing this change, the metadata reception code path is untangled a bit to separate the logic that affects the metadata stream from the logic that manages the metadata cache. I suspect the original error stems from a mix-up/confusion between both concerns. When a metadata version change happens, the metadata cache resets its 'max_offset' (in other words, it's current size) and notifies the caller. The caller then resets the "ust pushed metadata" position to zero and wakes-up the metadata thread to consume the new contents of the metadata cache. Known drawbacks =============== None. Signed-off-by: Jérémie Galarneau Change-Id: I142ef957140d497ac7fc4294ca65a55c12518598 --- diff --git a/src/common/consumer/consumer-metadata-cache.c b/src/common/consumer/consumer-metadata-cache.c index 822217174..acb6cfe39 100644 --- a/src/common/consumer/consumer-metadata-cache.c +++ b/src/common/consumer/consumer-metadata-cache.c @@ -23,6 +23,11 @@ #include "consumer-metadata-cache.h" +enum metadata_cache_update_version_status { + METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED, + METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED, +}; + extern struct lttng_consumer_global_data consumer_data; /* @@ -74,60 +79,23 @@ void metadata_cache_reset(struct consumer_metadata_cache *cache) * Check if the metadata cache version changed. * If it did, reset the metadata cache. * The metadata cache lock MUST be held. - * - * Returns 0 on success, a negative value on error. */ -static -int metadata_cache_check_version(struct consumer_metadata_cache *cache, - uint64_t version) +static enum metadata_cache_update_version_status metadata_cache_update_version( + struct consumer_metadata_cache *cache, uint64_t version) { - int ret = 0; + enum metadata_cache_update_version_status status; if (cache->version == version) { + status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED; goto end; } DBG("Metadata cache version update to %" PRIu64, version); - metadata_cache_reset(cache); cache->version = version; + status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED; end: - return ret; -} - -/* - * Write a character on the metadata poll pipe to wake the metadata thread. - * Returns 0 on success, -1 on error. - */ -int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) -{ - int ret = 0; - const char dummy = 'c'; - - if (channel->monitor && channel->metadata_stream) { - ssize_t write_ret; - - write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1], - &dummy, 1); - if (write_ret < 1) { - if (errno == EWOULDBLOCK) { - /* - * This is fine, the metadata poll thread - * is having a hard time keeping-up, but - * it will eventually wake-up and consume - * the available data. - */ - ret = 0; - } else { - PERROR("Wake-up UST metadata pipe"); - ret = -1; - goto end; - } - } - } - -end: - return ret; + return status; } /* @@ -136,23 +104,31 @@ end: * contiguous metadata in cache to the ring buffer. The metadata cache * lock MUST be acquired to write in the cache. * - * Return 0 on success, a negative value on error. + * See `enum consumer_metadata_cache_write_status` for the meaning of the + * various returned status codes. */ -int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, +enum consumer_metadata_cache_write_status +consumer_metadata_cache_write(struct lttng_consumer_channel *channel, unsigned int offset, unsigned int len, uint64_t version, char *data) { int ret = 0; struct consumer_metadata_cache *cache; + enum consumer_metadata_cache_write_status status; + bool cache_is_invalidated = false; + uint64_t original_max_offset; assert(channel); assert(channel->metadata_cache); cache = channel->metadata_cache; + ASSERT_LOCKED(cache->lock); + original_max_offset = cache->max_offset; - ret = metadata_cache_check_version(cache, version); - if (ret < 0) { - goto end; + if (metadata_cache_update_version(cache, version) == + METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) { + metadata_cache_reset(cache); + cache_is_invalidated = true; } DBG("Writing %u bytes from offset %u in metadata cache", len, offset); @@ -162,18 +138,25 @@ int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, len - cache->cache_alloc_size + offset); if (ret < 0) { ERR("Extending metadata cache"); + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR; goto end; } } memcpy(cache->data + offset, data, len); - if (offset + len > cache->max_offset) { - cache->max_offset = offset + len; - ret = consumer_metadata_wakeup_pipe(channel); + cache->max_offset = max(cache->max_offset, offset + len); + + if (cache_is_invalidated) { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED; + } else if (cache->max_offset > original_max_offset) { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT; + } else { + status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE; + assert(cache->max_offset == original_max_offset); } end: - return ret; + return status; } /* diff --git a/src/common/consumer/consumer-metadata-cache.h b/src/common/consumer/consumer-metadata-cache.h index a442dcaf2..106363edf 100644 --- a/src/common/consumer/consumer-metadata-cache.h +++ b/src/common/consumer/consumer-metadata-cache.h @@ -11,6 +11,27 @@ #include +enum consumer_metadata_cache_write_status { + CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR = -1, + /* + * New metadata content was appended to the cache successfully. + * Previously available content remains valid. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT = 0, + /* + * The new content pushed to the cache invalidated the content that + * was already present. The contents of the cache should be re-read. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED, + /* + * A metadata cache write can simply overwrite an already existing + * section of the cache (and it should be a write-through with identical + * data). From the caller's standpoint, there is no change to the state + * of the cache. + */ + CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE, +}; + struct consumer_metadata_cache { char *data; uint64_t cache_alloc_size; @@ -35,13 +56,13 @@ struct consumer_metadata_cache { pthread_mutex_t lock; }; -int consumer_metadata_cache_write(struct lttng_consumer_channel *channel, +enum consumer_metadata_cache_write_status +consumer_metadata_cache_write(struct lttng_consumer_channel *channel, unsigned int offset, unsigned int len, uint64_t version, char *data); int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel); void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel); int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, uint64_t offset, int timer); -int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); #endif /* CONSUMER_METADATA_CACHE_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 0a7a7ab67..0027cc41a 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -883,6 +883,43 @@ error: return outfd; } +/* + * Write a character on the metadata poll pipe to wake the metadata thread. + * Returns 0 on success, -1 on error. + */ +int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel) +{ + int ret = 0; + + DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'", + channel->name); + if (channel->monitor && channel->metadata_stream) { + const char dummy = 'c'; + const ssize_t write_ret = lttng_write( + channel->metadata_stream->ust_metadata_poll_pipe[1], + &dummy, 1); + + if (write_ret < 1) { + if (errno == EWOULDBLOCK) { + /* + * This is fine, the metadata poll thread + * is having a hard time keeping-up, but + * it will eventually wake-up and consume + * the available data. + */ + ret = 0; + } else { + PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread"); + ret = -1; + goto end; + } + } + } + +end: + return ret; +} + /* * Trigger a dump of the metadata content. Following/during the succesful * completion of this call, the metadata poll thread will start receiving diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 73189660c..f75246d36 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -1051,5 +1051,6 @@ enum lttcomm_return_code lttng_consumer_init_command( int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); enum lttcomm_return_code lttng_consumer_open_channel_packets( struct lttng_consumer_channel *channel); +int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 88520dbe2..69a07ec84 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1278,6 +1278,17 @@ error_unlock: return ret; } +static +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) +{ + ASSERT_LOCKED(stream->lock); + + DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); + stream->ust_metadata_pushed = 0; +} + /* * Receive the metadata updates from the sessiond. Supports receiving * overlapping metadata, but is needs to always belong to a contiguous @@ -1292,6 +1303,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; char *metadata_str; + enum consumer_metadata_cache_write_status cache_write_status; DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); @@ -1315,10 +1327,40 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, health_code_update(); pthread_mutex_lock(&channel->metadata_cache->lock); - ret = consumer_metadata_cache_write(channel, offset, len, version, - metadata_str); + cache_write_status = consumer_metadata_cache_write( + channel, offset, len, version, metadata_str); pthread_mutex_unlock(&channel->metadata_cache->lock); - if (ret < 0) { + switch (cache_write_status) { + case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE: + /* + * The write entirely overlapped with existing contents of the + * same metadata version (same content); there is nothing to do. + */ + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED: + /* + * The metadata cache was invalidated (previously pushed + * content has been overwritten). Reset the stream's consumed + * metadata position to ensure the metadata poll thread consumes + * the whole cache. + */ + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + /* Fall-through. */ + case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: + /* + * In both cases, the metadata poll thread has new data to + * consume. + */ + ret = consumer_metadata_wakeup_pipe(channel); + if (ret) { + ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; + goto end_free; + } + break; + case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR: /* Unable to handle metadata. Notify session daemon. */ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; /* @@ -1327,6 +1369,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ goto end_free; + default: + abort(); } if (!wait) { @@ -2455,15 +2499,6 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -static -void metadata_stream_reset_cache_consumed_position( - struct lttng_consumer_stream *stream) -{ - DBG("Reset metadata cache of session %" PRIu64, - stream->chan->session_id); - stream->ust_metadata_pushed = 0; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -3041,6 +3076,7 @@ int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); assert(stream->ustream); + ASSERT_LOCKED(stream->lock); DBG("UST consumer checking data pending");