From: Jérémie Galarneau Date: Thu, 25 Jun 2020 23:22:24 +0000 (-0400) Subject: Fix: post-clear trace chunk has a late beginning packet X-Git-Tag: v2.12.2~22 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=fc0c52e9fb0608ede702556178a68d15a1d25af8;p=lttng-tools.git Fix: post-clear trace chunk has a late beginning packet Observed issue ============== In the following scenario: - create a regular session - enable an event that occurs on only one core - start tracing - trace for a while - stop tracing - clear the session - start - trace for a while (referred to the "active period" later on) - stop The resulting trace will contain a very short stream intersection as the active stream will contain packets spanning the entire active period. However, all other streams will contain a single packet at the end of the active period with a duration of 0 ns. This presents two problems: 1) This makes the stream intersection mode of viewers unusable, 2) This misleads the user into thinking that tracing was not active for some buffers. Cause ===== The packet beginning timestamps of the buffers are initialized on creation (on the first "start" of a tracing session). When a "clear" is performed on a session, all open packets are closed and the existing contents are purged. If a stream is inactive, it is possible for no packet to be "opened" until the "stop" of the tracing session. On stop, a "flush_empty" is performed. Such a flush opens a packet (if it was not already the case), closes it, and marks the packet as being ready for consumption. Solution ======== Attempt to flush an empty packet as close to the rotation point as possible. In the event where a stream remains inactive after the rotation point, this ensures that the new trace chunk has a beginning timestamp set at the begining of the trace chunk instead of only creating an empty packet when the trace chunk is stopped. This indicates to the viewers that the stream was being recorded, but more importantly it allows viewers to determine a useable trace intersection. This presents a problem in the case where the ring-buffer is completely full. Consider the following scenario: - The consumption of data is slow (slow network, for instance), - The ring buffer is full, - A rotation is initiated, - The flush below does nothing (no space left to open a new packet), - The other streams rotate very soon, and new data is produced in the new chunk, - This stream completes its rotation long after the rotation was initiated - The session is stopped before any event can be produced in this stream's buffers. The resulting trace chunk will have a single packet temporaly at the end of the trace chunk for this stream making the stream intersection more narrow than it should be. To work-around this, an empty flush is performed after the first consumption of a packet during a rotation if the initial flush failed. The idea is that consuming a packet frees enough space to switch packets in this scenario and allows the tracer to "stamp" the beginning of the new trace chunk at the earliest possible point. Note that metadata streams are always skipped when opening a packet. This is done for two reasons: 1) Timestamps are not relevant to the metadata stream 2) Inserting an empty packet in the metadata stream after a rotation breaks the use of "clear" in live mode. The contents of the metadata streams of successive chunks must be strict superset of one another as live clients only receive the information appended to a metadata stream (i.e. the parts it already has received can't change). If a flush_empty was performed after a clear/rotation, it would result in an empty packet being inserted at the beginning of the metadata stream that wasn't present in the first chunk. This would cause the live client and relay daemon to have mismatching copies of the metadata stream. Known drawbacks =============== In the case of an inactive stream, this results in the completed trace chunk archive containing an extra empty packet at the beginning of the stream (typically 4kB). In the case of an active stream, this change will cause the first packet to be empty or contain few events. Those are all efficiency losses that are inevitable (AFAIK) given the current buffer control APIs. It will be possible to recoup those losses if an API allowing the consumer daemon to open a new packet is introduced. As noted in the comments, this patch is not final. The flush after the rotation should not be open-coded in lttng_consumer_read_subbuffer. It should be a data-stream specific "post-consume" step. Signed-off-by: Jérémie Galarneau Change-Id: I7d8ab876e55e9d0718a55ec1bb77ec6466accc02 --- diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index fb878d72c..398d71ae0 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -452,6 +452,8 @@ struct lttng_consumer_stream *consumer_stream_create( stream->index_file = NULL; stream->last_sequence_number = -1ULL; stream->rotate_position = -1ULL; + /* Buffer is created with an open packet. */ + stream->opened_packet_in_current_trace_chunk = true; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 311c64020..6505490cd 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -63,6 +63,12 @@ struct consumer_channel_msg { uint64_t key; /* del */ }; +enum open_packet_status { + OPEN_PACKET_STATUS_OPENED, + OPEN_PACKET_STATUS_NO_SPACE, + OPEN_PACKET_STATUS_ERROR, +}; + /* Flag used to temporarily pause data consumption from testpoints. */ int data_consumption_paused; @@ -3310,6 +3316,137 @@ error_testpoint: return NULL; } +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, + int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + if (producer_active) { + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } else { + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect, + * but required as a fall-back when + * "flush_empty" is not implemented by + * lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +static enum open_packet_status open_packet(struct lttng_consumer_stream *stream) +{ + int ret; + enum open_packet_status status; + unsigned long produced_pos_before, produced_pos_after; + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot( + stream, &produced_pos_before); + if (ret < 0) { + ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = consumer_flush_buffer(stream, 0); + if (ret) { + ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); + if (ret < 0) { + ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = OPEN_PACKET_STATUS_ERROR; + goto end; + } + + /* + * Determine if the flush had an effect by comparing the produced + * positons before and after the flush. + */ + status = produced_pos_before != produced_pos_after ? + OPEN_PACKET_STATUS_OPENED : + OPEN_PACKET_STATUS_NO_SPACE; +end: + return status; +} + +static bool stream_is_rotating_to_null_chunk( + const struct lttng_consumer_stream *stream) +{ + bool rotating_to_null_chunk = false; + + if (stream->rotate_position == -1ULL) { + /* No rotation ongoing. */ + goto end; + } + + if (stream->trace_chunk == stream->chan->trace_chunk || + !stream->chan->trace_chunk) { + rotating_to_null_chunk = true; + } +end: + return rotating_to_null_chunk; +} + ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx, bool locked_by_caller) @@ -3413,6 +3550,48 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + /* + * TODO roll into a post_consume op as this doesn't apply to metadata + * streams. + */ + if (!stream->opened_packet_in_current_trace_chunk && + stream->trace_chunk && !stream->metadata_flag && + !stream_is_rotating_to_null_chunk(stream)) { + const enum open_packet_status status = open_packet(stream); + + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = + true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left. + * This means that new events were produced, resulting + * in a packet being opened, which is what we want + * anyhow. + */ + DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end; + default: + abort(); + } + + stream->opened_packet_in_current_trace_chunk = true; + } + sleep_stream: if (stream->read_subbuffer_ops.on_sleep) { stream->read_subbuffer_ops.on_sleep(stream, ctx); @@ -3901,50 +4080,6 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } -static -int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) -{ - int ret = 0; - - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - if (producer_active) { - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } else { - ret = kernctl_buffer_flush_empty(stream->wait_fd); - if (ret < 0) { - /* - * Doing a buffer flush which does not take into - * account empty packets. This is not perfect, - * but required as a fall-back when - * "flush_empty" is not implemented by - * lttng-modules. - */ - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } - } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, producer_active); - break; - default: - ERR("Unknown consumer_data type"); - abort(); - } - -end: - return ret; -} - /* * Sample the rotate position for all the streams of a channel. If a stream * is already at the rotate position (produced == consumed), we flag it as @@ -4102,6 +4237,86 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, } stream_count++; } + + stream->opened_packet_in_current_trace_chunk = false; + + if (rotating_to_new_chunk && !stream->metadata_flag) { + /* + * Attempt to flush an empty packet as close to the + * rotation point as possible. In the event where a + * stream remains inactive after the rotation point, + * this ensures that the new trace chunk has a + * beginning timestamp set at the begining of the + * trace chunk instead of only creating an empty + * packet when the trace chunk is stopped. + * + * This indicates to the viewers that the stream + * was being recorded, but more importantly it + * allows viewers to determine a useable trace + * intersection. + * + * This presents a problem in the case where the + * ring-buffer is completely full. + * + * Consider the following scenario: + * - The consumption of data is slow (slow network, + * for instance), + * - The ring buffer is full, + * - A rotation is initiated, + * - The flush below does nothing (no space left to + * open a new packet), + * - The other streams rotate very soon, and new + * data is produced in the new chunk, + * - This stream completes its rotation long after the + * rotation was initiated + * - The session is stopped before any event can be + * produced in this stream's buffers. + * + * The resulting trace chunk will have a single packet + * temporaly at the end of the trace chunk for this + * stream making the stream intersection more narrow + * than it should be. + * + * To work-around this, an empty flush is performed + * after the first consumption of a packet during a + * rotation if open_packet fails. The idea is that + * consuming a packet frees enough space to switch + * packets in this scenario and allows the tracer to + * "stamp" the beginning of the new trace chunk at the + * earliest possible point. + */ + const enum open_packet_status status = + open_packet(stream); + + switch (status) { + case OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = + true; + break; + case OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left + * in the buffer. A new packet will be opened + * once one has been consumed. + */ + DBG("No space left to open a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end_unlock_stream; + default: + abort(); + } + } + pthread_mutex_unlock(&stream->lock); } stream = NULL; diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index c5b702357..6dedb7525 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -596,6 +596,9 @@ struct lttng_consumer_stream { */ uint64_t rotate_position; + /* Whether or not a packet was opened during the current trace chunk. */ + bool opened_packet_in_current_trace_chunk; + /* * Read-only copies of channel values. We cannot safely access the * channel from a stream, so we need to have a local copy of these