#include "consumer-metadata-cache.h"
+enum metadata_cache_update_version_status {
+ METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED,
+ METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED,
+};
+
extern struct lttng_consumer_global_data consumer_data;
/*
* Check if the metadata cache version changed.
* If it did, reset the metadata cache.
* The metadata cache lock MUST be held.
- *
- * Returns 0 on success, a negative value on error.
*/
-static
-int metadata_cache_check_version(struct consumer_metadata_cache *cache,
- uint64_t version)
+static enum metadata_cache_update_version_status metadata_cache_update_version(
+ struct consumer_metadata_cache *cache, uint64_t version)
{
- int ret = 0;
+ enum metadata_cache_update_version_status status;
if (cache->version == version) {
+ status = METADATA_CACHE_UPDATE_STATUS_VERSION_NOT_UPDATED;
goto end;
}
DBG("Metadata cache version update to %" PRIu64, version);
- metadata_cache_reset(cache);
cache->version = version;
+ status = METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED;
end:
- return ret;
-}
-
-/*
- * Write a character on the metadata poll pipe to wake the metadata thread.
- * Returns 0 on success, -1 on error.
- */
-int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
-{
- int ret = 0;
- const char dummy = 'c';
-
- if (channel->monitor && channel->metadata_stream) {
- ssize_t write_ret;
-
- write_ret = lttng_write(channel->metadata_stream->ust_metadata_poll_pipe[1],
- &dummy, 1);
- if (write_ret < 1) {
- if (errno == EWOULDBLOCK) {
- /*
- * This is fine, the metadata poll thread
- * is having a hard time keeping-up, but
- * it will eventually wake-up and consume
- * the available data.
- */
- ret = 0;
- } else {
- PERROR("Wake-up UST metadata pipe");
- ret = -1;
- goto end;
- }
- }
- }
-
-end:
- return ret;
+ return status;
}
/*
* contiguous metadata in cache to the ring buffer. The metadata cache
* lock MUST be acquired to write in the cache.
*
- * Return 0 on success, a negative value on error.
+ * See `enum consumer_metadata_cache_write_status` for the meaning of the
+ * various returned status codes.
*/
-int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
+enum consumer_metadata_cache_write_status
+consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
unsigned int offset, unsigned int len, uint64_t version,
char *data)
{
int ret = 0;
struct consumer_metadata_cache *cache;
+ enum consumer_metadata_cache_write_status status;
+ bool cache_is_invalidated = false;
+ uint64_t original_max_offset;
assert(channel);
assert(channel->metadata_cache);
cache = channel->metadata_cache;
+ ASSERT_LOCKED(cache->lock);
+ original_max_offset = cache->max_offset;
- ret = metadata_cache_check_version(cache, version);
- if (ret < 0) {
- goto end;
+ if (metadata_cache_update_version(cache, version) ==
+ METADATA_CACHE_UPDATE_STATUS_VERSION_UPDATED) {
+ metadata_cache_reset(cache);
+ cache_is_invalidated = true;
}
DBG("Writing %u bytes from offset %u in metadata cache", len, offset);
len - cache->cache_alloc_size + offset);
if (ret < 0) {
ERR("Extending metadata cache");
+ status = CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR;
goto end;
}
}
memcpy(cache->data + offset, data, len);
- if (offset + len > cache->max_offset) {
- cache->max_offset = offset + len;
- ret = consumer_metadata_wakeup_pipe(channel);
+ cache->max_offset = max(cache->max_offset, offset + len);
+
+ if (cache_is_invalidated) {
+ status = CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED;
+ } else if (cache->max_offset > original_max_offset) {
+ status = CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT;
+ } else {
+ status = CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE;
+ assert(cache->max_offset == original_max_offset);
}
end:
- return ret;
+ return status;
}
/*
#include <common/consumer/consumer.h>
+enum consumer_metadata_cache_write_status {
+ CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR = -1,
+ /*
+ * New metadata content was appended to the cache successfully.
+ * Previously available content remains valid.
+ */
+ CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT = 0,
+ /*
+ * The new content pushed to the cache invalidated the content that
+ * was already present. The contents of the cache should be re-read.
+ */
+ CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED,
+ /*
+ * A metadata cache write can simply overwrite an already existing
+ * section of the cache (and it should be a write-through with identical
+ * data). From the caller's standpoint, there is no change to the state
+ * of the cache.
+ */
+ CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE,
+};
+
struct consumer_metadata_cache {
char *data;
uint64_t cache_alloc_size;
pthread_mutex_t lock;
};
-int consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
+enum consumer_metadata_cache_write_status
+consumer_metadata_cache_write(struct lttng_consumer_channel *channel,
unsigned int offset, unsigned int len, uint64_t version,
char *data);
int consumer_metadata_cache_allocate(struct lttng_consumer_channel *channel);
void consumer_metadata_cache_destroy(struct lttng_consumer_channel *channel);
int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel,
uint64_t offset, int timer);
-int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
#endif /* CONSUMER_METADATA_CACHE_H */
return outfd;
}
+/*
+ * Write a character on the metadata poll pipe to wake the metadata thread.
+ * Returns 0 on success, -1 on error.
+ */
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel)
+{
+ int ret = 0;
+
+ DBG("Waking up metadata poll thread (writing to pipe): channel name = '%s'",
+ channel->name);
+ if (channel->monitor && channel->metadata_stream) {
+ const char dummy = 'c';
+ const ssize_t write_ret = lttng_write(
+ channel->metadata_stream->ust_metadata_poll_pipe[1],
+ &dummy, 1);
+
+ if (write_ret < 1) {
+ if (errno == EWOULDBLOCK) {
+ /*
+ * This is fine, the metadata poll thread
+ * is having a hard time keeping-up, but
+ * it will eventually wake-up and consume
+ * the available data.
+ */
+ ret = 0;
+ } else {
+ PERROR("Failed to write to UST metadata pipe while attempting to wake-up the metadata poll thread");
+ ret = -1;
+ goto end;
+ }
+ }
+ }
+
+end:
+ return ret;
+}
+
/*
* Trigger a dump of the metadata content. Following/during the succesful
* completion of this call, the metadata poll thread will start receiving
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
enum lttcomm_return_code lttng_consumer_open_channel_packets(
struct lttng_consumer_channel *channel);
+int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
#endif /* LIB_CONSUMER_H */
return ret;
}
+static
+void metadata_stream_reset_cache_consumed_position(
+ struct lttng_consumer_stream *stream)
+{
+ ASSERT_LOCKED(stream->lock);
+
+ DBG("Reset metadata cache of session %" PRIu64,
+ stream->chan->session_id);
+ stream->ust_metadata_pushed = 0;
+}
+
/*
* Receive the metadata updates from the sessiond. Supports receiving
* overlapping metadata, but is needs to always belong to a contiguous
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
+ enum consumer_metadata_cache_write_status cache_write_status;
DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
health_code_update();
pthread_mutex_lock(&channel->metadata_cache->lock);
- ret = consumer_metadata_cache_write(channel, offset, len, version,
- metadata_str);
+ cache_write_status = consumer_metadata_cache_write(
+ channel, offset, len, version, metadata_str);
pthread_mutex_unlock(&channel->metadata_cache->lock);
- if (ret < 0) {
+ switch (cache_write_status) {
+ case CONSUMER_METADATA_CACHE_WRITE_STATUS_NO_CHANGE:
+ /*
+ * The write entirely overlapped with existing contents of the
+ * same metadata version (same content); there is nothing to do.
+ */
+ break;
+ case CONSUMER_METADATA_CACHE_WRITE_STATUS_INVALIDATED:
+ /*
+ * The metadata cache was invalidated (previously pushed
+ * content has been overwritten). Reset the stream's consumed
+ * metadata position to ensure the metadata poll thread consumes
+ * the whole cache.
+ */
+ pthread_mutex_lock(&channel->metadata_stream->lock);
+ metadata_stream_reset_cache_consumed_position(
+ channel->metadata_stream);
+ pthread_mutex_unlock(&channel->metadata_stream->lock);
+ /* Fall-through. */
+ case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
+ /*
+ * In both cases, the metadata poll thread has new data to
+ * consume.
+ */
+ ret = consumer_metadata_wakeup_pipe(channel);
+ if (ret) {
+ ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
+ goto end_free;
+ }
+ break;
+ case CONSUMER_METADATA_CACHE_WRITE_STATUS_ERROR:
/* Unable to handle metadata. Notify session daemon. */
ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
/*
* waiting for the metadata cache to be flushed.
*/
goto end_free;
+ default:
+ abort();
}
if (!wait) {
return ustctl_stream_close_wakeup_fd(stream->ustream);
}
-static
-void metadata_stream_reset_cache_consumed_position(
- struct lttng_consumer_stream *stream)
-{
- DBG("Reset metadata cache of session %" PRIu64,
- stream->chan->session_id);
- stream->ust_metadata_pushed = 0;
-}
-
/*
* Write up to one packet from the metadata cache to the channel.
*
assert(stream);
assert(stream->ustream);
+ ASSERT_LOCKED(stream->lock);
DBG("UST consumer checking data pending");