uint64_t key; /* del */
};
+enum open_packet_status {
+ OPEN_PACKET_STATUS_OPENED,
+ OPEN_PACKET_STATUS_NO_SPACE,
+ OPEN_PACKET_STATUS_ERROR,
+};
+
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;
return NULL;
}
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer_active)
+{
+ int ret = 0;
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ if (producer_active) {
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ } else {
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
+ if (ret < 0) {
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect,
+ * but required as a fall-back when
+ * "flush_empty" is not implemented by
+ * lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ }
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_flush_buffer(stream, producer_active);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+end:
+ return ret;
+}
+
+static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ enum open_packet_status status;
+ unsigned long produced_pos_before, produced_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(
+ stream, &produced_pos_before);
+ if (ret < 0) {
+ ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = consumer_flush_buffer(stream, 0);
+ if (ret) {
+ ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after);
+ if (ret < 0) {
+ ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ status = OPEN_PACKET_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * Determine if the flush had an effect by comparing the produced
+ * positons before and after the flush.
+ */
+ status = produced_pos_before != produced_pos_after ?
+ OPEN_PACKET_STATUS_OPENED :
+ OPEN_PACKET_STATUS_NO_SPACE;
+end:
+ return status;
+}
+
+static bool stream_is_rotating_to_null_chunk(
+ const struct lttng_consumer_stream *stream)
+{
+ bool rotating_to_null_chunk = false;
+
+ if (stream->rotate_position == -1ULL) {
+ /* No rotation ongoing. */
+ goto end;
+ }
+
+ if (stream->trace_chunk == stream->chan->trace_chunk ||
+ !stream->chan->trace_chunk) {
+ rotating_to_null_chunk = true;
+ }
+end:
+ return rotating_to_null_chunk;
+}
+
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx,
bool locked_by_caller)
goto end;
}
+ /*
+ * TODO roll into a post_consume op as this doesn't apply to metadata
+ * streams.
+ */
+ if (!stream->opened_packet_in_current_trace_chunk &&
+ stream->trace_chunk && !stream->metadata_flag &&
+ !stream_is_rotating_to_null_chunk(stream)) {
+ const enum open_packet_status status = open_packet(stream);
+
+ switch (status) {
+ case OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk =
+ true;
+ break;
+ case OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left.
+ * This means that new events were produced, resulting
+ * in a packet being opened, which is what we want
+ * anyhow.
+ */
+ DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end;
+ default:
+ abort();
+ }
+
+ stream->opened_packet_in_current_trace_chunk = true;
+ }
+
sleep_stream:
if (stream->read_subbuffer_ops.on_sleep) {
stream->read_subbuffer_ops.on_sleep(stream, ctx);
return start_pos;
}
-static
-int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
-{
- int ret = 0;
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (producer_active) {
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- } else {
- ret = kernctl_buffer_flush_empty(stream->wait_fd);
- if (ret < 0) {
- /*
- * Doing a buffer flush which does not take into
- * account empty packets. This is not perfect,
- * but required as a fall-back when
- * "flush_empty" is not implemented by
- * lttng-modules.
- */
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
- goto end;
- }
- }
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_flush_buffer(stream, producer_active);
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
-end:
- return ret;
-}
-
/*
* Sample the rotate position for all the streams of a channel. If a stream
* is already at the rotate position (produced == consumed), we flag it as
}
stream_count++;
}
+
+ stream->opened_packet_in_current_trace_chunk = false;
+
+ if (rotating_to_new_chunk && !stream->metadata_flag) {
+ /*
+ * Attempt to flush an empty packet as close to the
+ * rotation point as possible. In the event where a
+ * stream remains inactive after the rotation point,
+ * this ensures that the new trace chunk has a
+ * beginning timestamp set at the begining of the
+ * trace chunk instead of only creating an empty
+ * packet when the trace chunk is stopped.
+ *
+ * This indicates to the viewers that the stream
+ * was being recorded, but more importantly it
+ * allows viewers to determine a useable trace
+ * intersection.
+ *
+ * This presents a problem in the case where the
+ * ring-buffer is completely full.
+ *
+ * Consider the following scenario:
+ * - The consumption of data is slow (slow network,
+ * for instance),
+ * - The ring buffer is full,
+ * - A rotation is initiated,
+ * - The flush below does nothing (no space left to
+ * open a new packet),
+ * - The other streams rotate very soon, and new
+ * data is produced in the new chunk,
+ * - This stream completes its rotation long after the
+ * rotation was initiated
+ * - The session is stopped before any event can be
+ * produced in this stream's buffers.
+ *
+ * The resulting trace chunk will have a single packet
+ * temporaly at the end of the trace chunk for this
+ * stream making the stream intersection more narrow
+ * than it should be.
+ *
+ * To work-around this, an empty flush is performed
+ * after the first consumption of a packet during a
+ * rotation if open_packet fails. The idea is that
+ * consuming a packet frees enough space to switch
+ * packets in this scenario and allows the tracer to
+ * "stamp" the beginning of the new trace chunk at the
+ * earliest possible point.
+ */
+ const enum open_packet_status status =
+ open_packet(stream);
+
+ switch (status) {
+ case OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk =
+ true;
+ break;
+ case OPEN_PACKET_STATUS_NO_SPACE:
+ /*
+ * Can't open a packet as there is no space left
+ * in the buffer. A new packet will be opened
+ * once one has been consumed.
+ */
+ DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key, stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case OPEN_PACKET_STATUS_ERROR:
+ /* Logged by callee. */
+ ret = -1;
+ goto end_unlock_stream;
+ default:
+ abort();
+ }
+ }
+
pthread_mutex_unlock(&stream->lock);
}
stream = NULL;