From 8d18bcaede97b6d87e434310da9c13d269983c77 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Thu, 14 May 2020 14:24:17 -0400 Subject: [PATCH] Fix: consumerd: live client receives incomplete metadata MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Observed issue ============== Babeltrace 1.5.x and Babeltrace 2.x can both report errors (albeit differently) when using the "lttng-live" protocol that imply that the metadata they received is incomplete. For instance, babeltrace 1.5.3 reports the following error: ``` [error] Error creating AST [error] [Context] Cannot open_mmap_trace of format ctf. [error] Error adding trace [warning] [Context] Cannot open_trace of format lttng-live at path net://localhost:xxxx/host/session/live_session. [warning] [Context] cannot open trace "net://localhost:xxxx/host/session/live_session" for reading. [error] opening trace "net://localhost:xxxx/host/session/live_session" for reading. [error] none of the specified trace paths could be opened. ``` While debugging both viewers, I noticed that both were attempting to receive the available metadata before consuming the "data" streams' content. Typically, the following exchange between the relay daemon and the lttng-live client occurs when the problem is observed: bt lttng-live: emits LTTNG_VIEWER_GET_METADATA command relayd: returns LTTNG_VIEWER_METADATA_OK, len = 4096 (default packet size) bt lttng-live: consume 4096 bytes of metadata emits LTTNG_VIEWER_GET_METADATA command relayd: returns LTTNG_VIEWER_NO_NEW_METADATA When the lttng-live client receives the LTTNG_VIEWER_NO_NEW_METADATA status code, it attempts to parse all the metadata it has received since the last LTTNG_VIEWER_NO_NEW_METADATA reply. In effect, it is expected that this forms a logical unit of metadata that is parseable on its own. If this is the first time metadata is received for that trace, the metadata is expected to contain a trace declaration, packet header declaration, etc. If metadata was already received, it is expected that the newly parsed declarations can be "appended" to the existing trace schema. It appears that the relay daemon sends the LTTNG_VIEWER_NO_NEW_METADATA while the metadata it has sent up to that point is not parseable on its own. The live protocol description does not require or imply that a viewer should attempt to parse metadata packets until it hopefully succeeds at some point. Anyhow: 1) This would make it impossible for a live viewer to correctly handle a corrupted metadata stream beyond retrying forever, 2) This behaviour is not implemented by the two reference implementations of the protocol. Cause ===== The relay daemon provides a guarantee that it will send any available metadata before allowing a data stream packet to be served to the client. In other words, a client requesting a data packet will receive the LTTNG_VIEWER_FLAG_NEW_METADATA status code (and no data) if it attempts to get a data stream packet while the relay daemon has metadata already available. This guarantee is properly enforced as far as I can tell. However, looking at the consumer daemon implementation, it appears that metadata packets are sent as soon as they are available. A metadata packet is not guaranteed to be parseable on its own. For instance, it can end in the middle the an event declaration. Hence, this hints at a race involving the tracer, the consumer daemon, the relay daemon, and the lttng-live client. Consider the following scenario: - Metadata packets (sub-buffers) are configured to be 4kB in size, - a large number of kernel events are enabled (e.g. --kernel --all), - the network connection between the consumer and relay daemons is slow 1) The kernel tracer will produce enough TSDL metadata to fill the first sub-buffer of the "metadata" ring-buffer and signal the consumer daemon that a buffer is ready. The tracer then starts writing the remaining data in the following available sub-buffers. 2) The consumer daemon metadata thread is woken up and consumes the first metadata sub-buffer and sends it to the relay daemon. 3) A live client establishes an lttng-live connection to the relay daemon and attempts to consume the available metadata. It receives the first packet and, since the relay daemon doesn't know about any follow-up metadata, receives LTTNG_VIEWER_NO_NEW_METADATA on the next attempt. 4) Having received LTTNG_VIEWER_NO_NEW_METADATA, the lttng-live client attempts to parse the metadata it has received and fails. This scenario is easy to reproduce by inserting a "sleep(1)" at src/bin/lttng-relayd/main.c:1978 (as of this revision). This simulates a relay daemon that would be slow to receive/process metadata packets from the consumer daemon. This problem similarly applies to the user space tracer. Solution ======== Having no knowledge of TSDL, the relay daemon can't "bundle" packets of metadata until they form a "parseable unit" to send to the consumer daemon. To provide the parseability guarantee expected by the viewers, and by the relay daemon implicitly, we need to ensure that the consumer daemons only send "parseable units" of metadata to the relay daemon. Unfortunately, the consumer daemons do not know how to parse TSDL either. In fact, only the metadata producers are able to provide the boundaries of the "parseable units" of metadata. The general idea of the fix is to accumulate metadata up to a point where a "parseable unit" boundary has been identified and send that content in one request to the relay daemon. Note that the solution described here only concerns the live mode. In other cases, the mechanisms described are simply bypassed. A "metadata bucket" is added to lttng_consumer_stream when it is created from a live channel. This bucket is filled until the consumption position reaches the "parseable unit" end position. A refresher about the handling of metadata in live mode ------------------------------------------------------- Three "events" are of interest here and can cause metadata to be consumed more or less indirectly: 1) A metadata packet is closed, causing the metadata thread to wake up 2) The live timer expires 3) A data sub-buffer is closed, causing the data thread to wake-up 1) The first case is simple and happens regardless of whether or not the tracing session is in live mode or not. Metadata is always consumed by the metadata thread in the same way. However, this scenario can be "caused" by (2) and (3). See [1]. A sub-buffer is "acquired" from the metadata ring-buffer and sent to the relayd daemon as the payload of a "RELAYD_SEND_METADATA" command. 2) When the live timer expires [2], the 'check_stream' function is called on all data streams of the session. As its name clearly implies, this function is responsible for flushing all streams or sending a "live beacon" (called an "empty index" in the code) if there is no data to flush. Any flushed data will result in (3). 3) When a data sub-buffer is ready to be consumed, [1] is invoked by the data thread. This function acquires a sub-buffer and sends it to the relay daemon through the data connection. Then, an important synchronization step takes place. The index of the newly-sent packet will be sent through the control connection. The relay daemon waits for both the data packet and its matching index before making the new packet visible to live viewers. Since a data packet could contain data that requires "newer" metadata to be decoded, the data thread flushes the metadata stream and enters a "waiting" phase to pause until all metadata present in the metadata ring buffer has been consumed [3]. At the end of this waiting phase, the data thread sends the data packet's index to the relay daemon, allowing the relayd to make it visible to its live clients. How to identify a "parseable unit" boundary? -------------------------------------------- In the case of the kernel domain, the kernel tracer produces the actual TSDL descriptions directly. The TSDL metadata is serialized to a metadata cache and is flushed "just in time" to the metadata ring-buffer when a "get next" operation is performed. There is no way, from user space, to query whether or not the metadata cache of the kernel tracer is empty. Hence, a new RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK command was added to query whether or not the kernel tracer's metadata cache is empty when acquiring a sub-buffer. This allows the consumer daemon to identify a "coherent" position in the metadata stream that is safe to use as a "parseable unit" boundary. As for the user space domain, since the session daemon is responsible for generating the TSDL representation of the metadata, there is no need to change LTTng-ust APIs. The session daemon generates coherent units of metadata and adds them to its "registry" at once (protected by the registry's lock). It then flushes the contents to the consumer daemon and waits for that data to be consumed before proceeding further. On the consumer daemon side, the metadata cache is filled with the newly-produced contents. This is done atomically with respect to accesses to the metadata cache as all accesses happen through a dedicated metadata cache lock. When the consumer's metadata polling thread is woken-up, it will attempt to acquire (`get_next`) a sub-buffer from the metadata stream ring-buffer. If it fails, it will flush a sub-buffer's worth of metadata to the ring-buffer and attempt to acquire a sub-buffer again. At this point, it is possible to determine if that sub-buffer is the last one of a parseable metadata unit: the cache must be empty and the ring-buffer must be empty following the consumption of this sub-buffer. When those conditions are met, the resulting metadata `stream_subbuffer` is tagged as being `coherent`. Metadata bucket --------------- A helper interface, metadata_bucket, is introduced as part of this fix. A metadata_bucket is `fill`ed with `stream_subbuffer`s, and is eventually `flushed` when it is filled by a `coherent` sub-buffer. As older versions of LTTng-modules must remain supported, this new helper is not used when the RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK operation is not available. When the operation is available, the metadata stream's bucketization is enabled, causing a bucket to be created and the `consume` callback to be swapped. The `consume` callback of the metadata streams is replaced by a new implementation when the metadata bucketization is activated on the stream. This implementation returns the padded size of the consumed sub-buffer when they could be added to the bucket. When the bucket is flushed, the regular `mmap`-based consumption function is called with the bucket's contents. Known drawbacks =============== This implementation causes the consumer daemon to buffer the whole initial unit of metadata before sending it. In practice, this is not expected to be a problem since the largest metadata files we have seen in real use are a couple of megabytes wide. Beyond the (temporary) memory use, this causes the metadata thread to block while this potentially large chunk of metadata is sent (rather than blocking while sending 4kb at a time). The second point is just a consequence of existing shortcomings of the consumerd; slow IO should not affect other unrelated streams. The fundamental problem is that blocking IO is used and we should switch to non-blocking communication if this is a problem (as is done in the relay daemon). The first point is more problematic given the existing tracer APIs. If the tracer could provide the boundary of a "parseable unit" of metadata, we could send the header of the RELAYD_SEND_METADATA command with that size and send the various metadata packets as they are made available. This would make no difference to the relay daemon as it is not blocking on that socket and will not make the metadata size change visible to the "live server" until it has all been received. This size can't be determined right now since it could exceed the total size of the "metadata" ring buffer. In other words, we can't wait for the production of metadata to complete before starting to consume. Finally, while implementing this fix, I also realized that the computation of the rotation position of the metadata streams is erroneous. The rotation code makes use of the ring-buffer's positions to determine the rotation position. However, since both user space and kernel domains make use of a "cache" behind the ring-buffer, that cached content must be taken into account when computing the metadata stream's rotation position. References ========== [1] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer.c#L3433 [2] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-timer.c#L312 [3] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-stream.c#L492 Signed-off-by: Jérémie Galarneau Change-Id: I40ee07e5c344c72d9aae2b9b15dc36c00b21e5fa --- src/common/consumer/Makefile.am | 3 +- src/common/consumer/consumer-stream.c | 64 +++++++- src/common/consumer/consumer-stream.h | 9 ++ src/common/consumer/consumer.c | 1 - src/common/consumer/consumer.h | 10 +- src/common/consumer/metadata-bucket.c | 150 +++++++++++++++++++ src/common/consumer/metadata-bucket.h | 34 +++++ src/common/kernel-consumer/kernel-consumer.c | 100 +++++++++++-- src/common/ust-consumer/ust-consumer.c | 106 ++++++++++--- 9 files changed, 446 insertions(+), 31 deletions(-) create mode 100644 src/common/consumer/metadata-bucket.c create mode 100644 src/common/consumer/metadata-bucket.h diff --git a/src/common/consumer/Makefile.am b/src/common/consumer/Makefile.am index c62831289..33f043cbc 100644 --- a/src/common/consumer/Makefile.am +++ b/src/common/consumer/Makefile.am @@ -5,7 +5,8 @@ noinst_HEADERS = consumer-metadata-cache.h consumer-timer.h \ consumer-testpoint.h libconsumer_la_SOURCES = consumer.c consumer.h consumer-metadata-cache.c \ - consumer-timer.c consumer-stream.c consumer-stream.h + consumer-timer.c consumer-stream.c consumer-stream.h \ + metadata-bucket.c metadata-bucket.h libconsumer_la_LIBADD = \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 4243991a4..1b8d2f902 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -163,7 +164,7 @@ static ssize_t consumer_stream_consume_mmap( subbuffer->info.data.subbuf_size; return lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuffer->buffer.buffer, padding_size); + stream, &subbuffer->buffer.buffer, padding_size); } static ssize_t consumer_stream_consume_splice( @@ -405,6 +406,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream, stream->metadata_version = subbuffer->info.metadata.version; stream->reset_metadata_flag = 1; + if (stream->metadata_bucket) { + metadata_bucket_reset(stream->metadata_bucket); + } + if (stream->read_subbuffer_ops.reset_metadata) { stream->read_subbuffer_ops.reset_metadata(stream); } @@ -736,6 +741,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) { assert(stream); + metadata_bucket_destroy(stream->metadata_bucket); call_rcu(&stream->node.head, free_stream_rcu); } @@ -1001,3 +1007,59 @@ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream) assert(stream); return cds_lfht_is_node_deleted(&stream->node.node); } + +static ssize_t metadata_bucket_flush( + const struct stream_subbuffer *buffer, void *data) +{ + ssize_t ret; + struct lttng_consumer_stream *stream = data; + + ret = consumer_stream_consume_mmap(NULL, stream, buffer); + if (ret < 0) { + goto end; + } +end: + return ret; +} + +static ssize_t metadata_bucket_consume( + struct lttng_consumer_local_data *unused, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + ssize_t ret; + enum metadata_bucket_status status; + + status = metadata_bucket_fill(stream->metadata_bucket, subbuffer); + switch (status) { + case METADATA_BUCKET_STATUS_OK: + /* Return consumed size. */ + ret = subbuffer->buffer.buffer.size; + break; + default: + ret = -1; + } + + return ret; +} + +int consumer_stream_enable_metadata_bucketization( + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + assert(stream->metadata_flag); + assert(!stream->metadata_bucket); + assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); + + stream->metadata_bucket = metadata_bucket_create( + metadata_bucket_flush, stream); + if (!stream->metadata_bucket) { + ret = -1; + goto end; + } + + stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume; +end: + return ret; +} diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index b6be53399..7f6b84731 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -121,4 +121,13 @@ int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream); */ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream); +/* + * Enable metadata bucketization. This must only be enabled if the tracer + * provides a reliable metadata `coherent` flag. + * + * This must be called on initialization before any subbuffer is consumed. + */ +int consumer_stream_enable_metadata_bucketization( + struct lttng_consumer_stream *stream); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 86caf6841..d4b0cab81 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1584,7 +1584,6 @@ end: * Returns the number of bytes written */ ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, unsigned long padding) diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index ed595f988..582d18e52 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -274,6 +274,14 @@ struct stream_subbuffer { unsigned long subbuf_size; unsigned long padded_subbuf_size; uint64_t version; + /* + * Left unset when unsupported. + * + * Indicates that this is the last sub-buffer of + * a series of sub-buffer that makes-up a coherent + * (parseable) unit of metadata. + */ + LTTNG_OPTIONAL(bool) coherent; } metadata; struct { unsigned long subbuf_size; @@ -632,6 +640,7 @@ struct lttng_consumer_stream { on_sleep_cb on_sleep; unlock_cb unlock; } read_subbuffer_ops; + struct metadata_bucket *metadata_bucket; }; /* @@ -963,7 +972,6 @@ struct lttng_consumer_local_data *lttng_consumer_create( int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, unsigned long padding); diff --git a/src/common/consumer/metadata-bucket.c b/src/common/consumer/metadata-bucket.c new file mode 100644 index 000000000..1ee5022e5 --- /dev/null +++ b/src/common/consumer/metadata-bucket.c @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#include "metadata-bucket.h" + +#include +#include +#include +#include +#include + +struct metadata_bucket { + struct lttng_dynamic_buffer content; + struct { + metadata_bucket_flush_cb fn; + void *data; + } flush; + unsigned int buffer_count; +}; + +struct metadata_bucket *metadata_bucket_create( + metadata_bucket_flush_cb flush, void *data) +{ + struct metadata_bucket *bucket; + + bucket = zmalloc(sizeof(typeof(*bucket))); + if (!bucket) { + PERROR("Failed to allocate buffer bucket"); + goto end; + } + + bucket->flush.fn = flush; + bucket->flush.data = data; + lttng_dynamic_buffer_init(&bucket->content); +end: + return bucket; +} + +void metadata_bucket_destroy(struct metadata_bucket *bucket) +{ + if (!bucket) { + return; + } + + if (bucket->content.size > 0) { + WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", + bucket->content.size, bucket->buffer_count); + } + + lttng_dynamic_buffer_reset(&bucket->content); + free(bucket); +} + +void metadata_bucket_reset(struct metadata_bucket *bucket) +{ + lttng_dynamic_buffer_reset(&bucket->content); + lttng_dynamic_buffer_init(&bucket->content); + bucket->buffer_count = 0; +} + +enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, + const struct stream_subbuffer *buffer) +{ + ssize_t ret; + struct lttng_buffer_view flushed_view; + struct stream_subbuffer flushed_subbuffer; + enum metadata_bucket_status status; + const bool should_flush = + LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); + const size_t padding_this_buffer = + buffer->info.metadata.padded_subbuf_size - + buffer->info.metadata.subbuf_size; + size_t flush_size; + + DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", + buffer->buffer.buffer.size, + buffer->info.metadata.subbuf_size, + buffer->info.metadata.padded_subbuf_size, + buffer->info.metadata.coherent.value ? "true" : "false"); + /* + * If no metadata was accumulated and this buffer should be + * flushed, don't copy it unecessarily; just flush it directly. + */ + if (!should_flush || bucket->buffer_count != 0) { + /* + * Append the _padded_ subbuffer since they are combined + * into a single "virtual" subbuffer that will be + * flushed at once. + * + * This means that some padding will be sent over the + * network, but should not represent a large amount + * of data as incoherent subbuffers are typically + * pretty full. + * + * The padding of the last subbuffer (coherent) added to + * the bucket is not sent, which is what really matters + * from an efficiency point of view. + */ + ret = lttng_dynamic_buffer_append_view( + &bucket->content, &buffer->buffer.buffer); + if (ret) { + status = METADATA_BUCKET_STATUS_ERROR; + goto end; + } + } + + bucket->buffer_count++; + if (!should_flush) { + status = METADATA_BUCKET_STATUS_OK; + goto end; + } + + flushed_view = bucket->content.size != 0 ? + lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : + lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1); + + /* + * The flush is done with the size of all padded sub-buffers, except + * for the last one which we can safely "trim". The padding of the last + * packet will be reconstructed by the relay daemon. + */ + flush_size = flushed_view.size - padding_this_buffer; + + flushed_subbuffer = (typeof(flushed_subbuffer)) { + .buffer.buffer = flushed_view, + .info.metadata.subbuf_size = flush_size, + .info.metadata.padded_subbuf_size = flushed_view.size, + .info.metadata.version = buffer->info.metadata.version, + .info.metadata.coherent = buffer->info.metadata.coherent, + }; + + DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", + flushed_view.size, bucket->buffer_count, + bucket->buffer_count > 1 ? "s" : ""); + ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); + if (ret >= 0) { + status = METADATA_BUCKET_STATUS_OK; + } else { + status = METADATA_BUCKET_STATUS_ERROR; + } + + metadata_bucket_reset(bucket); + +end: + return status; +} diff --git a/src/common/consumer/metadata-bucket.h b/src/common/consumer/metadata-bucket.h new file mode 100644 index 000000000..0355eb3c0 --- /dev/null +++ b/src/common/consumer/metadata-bucket.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#ifndef METADATA_BUCKET_H +#define METADATA_BUCKET_H + +#include + +struct metadata_bucket; + +typedef ssize_t (*metadata_bucket_flush_cb)( + const struct stream_subbuffer *buffer, void *data); + +enum metadata_bucket_status { + METADATA_BUCKET_STATUS_OK, + METADATA_BUCKET_STATUS_ERROR, +}; + +struct metadata_bucket *metadata_bucket_create( + metadata_bucket_flush_cb flush, void *data); + +void metadata_bucket_destroy(struct metadata_bucket *bucket); + +enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, + const struct stream_subbuffer *buffer); + +void metadata_bucket_reset(struct metadata_bucket *bucket); + +#endif /* METADATA_BUCKET_H */ + diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f9da62b15..7e0b36540 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -46,7 +47,7 @@ #include #include #include -#include +#include #include "kernel-consumer.h" @@ -297,7 +298,7 @@ static int lttng_kconsumer_snapshot_channel( subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); /* @@ -1551,6 +1552,42 @@ end: return ret; } +static +int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + bool coherent; + + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, + &coherent); + if (ret) { + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", + subbuffer->info.metadata.padded_subbuf_size, + coherent ? "true" : "false"); +end: + return ret; +} + static int put_next_subbuffer(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) @@ -1569,15 +1606,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream, return ret; } -static void lttng_kconsumer_set_stream_ops( +static +bool is_get_next_check_metadata_available(int tracer_fd) +{ + return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) != + -ENOTTY; +} + +static +int lttng_kconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { - if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_mmap; - } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_splice; + int ret = 0; + + if (stream->metadata_flag && stream->chan->is_live) { + DBG("Attempting to enable metadata bucketization for live consumers"); + if (is_get_next_check_metadata_available(stream->wait_fd)) { + DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } else { + /* + * The kernel tracer version is too old to indicate + * when the metadata stream has reached a "coherent" + * (parseable) point. + * + * This means that a live viewer may see an incoherent + * sequence of metadata and fail to parse it. + */ + WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream"); + metadata_bucket_destroy(stream->metadata_bucket); + stream->metadata_bucket = NULL; + } + } + + if (!stream->read_subbuffer_ops.get_next_subbuffer) { + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; + } } if (stream->metadata_flag) { @@ -1593,6 +1668,8 @@ static void lttng_kconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1633,7 +1710,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } - lttng_kconsumer_set_stream_ops(stream); + ret = lttng_kconsumer_set_stream_ops(stream); + if (ret) { + goto error_close_fd; + } /* we return 0 to let the library handle the FD internally */ return 0; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 832c44371..03bc5fc5e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -47,6 +47,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -1240,7 +1241,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { @@ -2447,7 +2448,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) assert(write_len != 0); if (write_len < 0) { ERR("Writing one metadata packet"); - ret = -1; + ret = write_len; goto end; } stream->ust_metadata_pushed += write_len; @@ -2793,28 +2794,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) { int ret; + bool cache_empty; + bool got_subbuffer; + bool coherent; + bool buffer_empty; + unsigned long consumed_pos, produced_pos; - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - ret = commit_one_metadata_packet(stream); - if (ret < 0) { - goto end; - } else if (ret == 0) { - /* Not an error, the cache is empty. */ - ret = -ENODATA; - goto end; + do { + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret == 0) { + got_subbuffer = true; + } else { + got_subbuffer = false; + if (ret != -EAGAIN) { + /* Fatal error. */ + goto end; + } } - ret = ustctl_get_next_subbuf(stream->ustream); - if (ret) { - goto end; + /* + * Determine if the cache is empty and ensure that a sub-buffer + * is made available if the cache is not empty. + */ + if (!got_subbuffer) { + ret = commit_one_metadata_packet(stream); + if (ret < 0 && ret != -ENOBUFS) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + cache_empty = true; + ret = -ENODATA; + goto end; + } else { + cache_empty = false; + } + } else { + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + cache_empty = stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed; + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } - } + } while (!got_subbuffer); + /* Populate sub-buffer infos and view. */ ret = get_next_subbuffer_common(stream, subbuffer); if (ret) { goto end; } + + ret = lttng_ustconsumer_sample_snapshot_positions(stream); + if (ret < 0) { + /* + * -EAGAIN is not expected since we got a sub-buffer and haven't + * pushed the consumption position yet (on put_next). + */ + PERROR("Failed to take a snapshot of metadata buffer positions"); + goto end; + } + + ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret) { + PERROR("Failed to get metadata consumed position"); + goto end; + } + + ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret) { + PERROR("Failed to get metadata produced position"); + goto end; + } + + /* Last sub-buffer of the ring buffer ? */ + buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos; + + /* + * The sessiond registry lock ensures that coherent units of metadata + * are pushed to the consumer daemon at once. Hence, if a sub-buffer is + * acquired, the cache is empty, and it is the only available sub-buffer + * available, it is safe to assume that it is "coherent". + */ + coherent = got_subbuffer && cache_empty && buffer_empty; + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); end: return ret; } @@ -2834,9 +2895,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream, return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; } -static void lttng_ustconsumer_set_stream_ops( +static int lttng_ustconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { + int ret = 0; + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; if (stream->metadata_flag) { stream->read_subbuffer_ops.get_next_subbuffer = @@ -2845,7 +2908,14 @@ static void lttng_ustconsumer_set_stream_ops( extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = metadata_stream_reset_cache; - stream->read_subbuffer_ops.on_sleep = signal_metadata; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.on_sleep = signal_metadata; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } } else { stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer; @@ -2859,6 +2929,8 @@ static void lttng_ustconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } /* -- 2.34.1