stream->stream_handle,
stream->tracefile_size_current, packet_size,
stream->tracefile_current_index, new_file_index);
- tracefile_array_file_rotate(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
if (stream->stream_fd) {
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
*flushed = true;
}
ret = relay_index_try_flush(index);
if (ret == 0) {
+ tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
free(tfa);
}
-void tracefile_array_file_rotate(struct tracefile_array *tfa)
+void tracefile_array_file_rotate(struct tracefile_array *tfa,
+ enum tracefile_rotate_type type)
{
uint64_t *headp, *tailp;
/* Not in tracefile rotation mode. */
return;
}
- /* Rotate to next file. */
- tfa->file_head = (tfa->file_head + 1) % tfa->count;
- if (tfa->file_head == tfa->file_tail) {
- /* Move tail. */
- tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
- }
- headp = &tfa->tf[tfa->file_head].seq_head;
- tailp = &tfa->tf[tfa->file_head].seq_tail;
- /*
- * If we overwrite a file with content, we need to push the tail
- * to the position following the content we are overwriting.
- */
- if (*headp != -1ULL) {
- tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+ switch (type) {
+ case TRACEFILE_ROTATE_READ:
+ /*
+ * Rotate read head to write head position, thus allowing
+ * reader to consume the newly rotated head file.
+ */
+ tfa->file_head_read = tfa->file_head_write;
+ break;
+ case TRACEFILE_ROTATE_WRITE:
+ /* Rotate write head to next file, pushing tail if needed. */
+ tfa->file_head_write = (tfa->file_head_write + 1) % tfa->count;
+ if (tfa->file_head_write == tfa->file_tail) {
+ /* Move tail. */
+ tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
+ }
+ headp = &tfa->tf[tfa->file_head_write].seq_head;
+ tailp = &tfa->tf[tfa->file_head_write].seq_tail;
+ /*
+ * If we overwrite a file with content, we need to push the tail
+ * to the position following the content we are overwriting.
+ */
+ if (*headp != -1ULL) {
+ tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+ }
+ /* Reset this file head/tail (overwrite). */
+ *headp = -1ULL;
+ *tailp = -1ULL;
+ break;
+ default:
+ abort();
}
- /* Reset this file head/tail (overwrite). */
- *headp = -1ULL;
- *tailp = -1ULL;
}
void tracefile_array_commit_seq(struct tracefile_array *tfa)
/* Not in tracefile rotation mode. */
return;
}
- headp = &tfa->tf[tfa->file_head].seq_head;
- tailp = &tfa->tf[tfa->file_head].seq_tail;
+ headp = &tfa->tf[tfa->file_head_write].seq_head;
+ tailp = &tfa->tf[tfa->file_head_write].seq_tail;
/* Update head tracefile seq_head. */
*headp = tfa->seq_head;
/*
}
}
-uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
+uint64_t tracefile_array_get_read_file_index_head(struct tracefile_array *tfa)
{
- return tfa->file_head;
+ return tfa->file_head_read;
}
uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
uint64_t seq_tail; /* Oldest seqcount. Inclusive. */
};
+enum tracefile_rotate_type {
+ TRACEFILE_ROTATE_READ,
+ TRACEFILE_ROTATE_WRITE,
+};
+
/*
* Represents an array of trace files in a stream.
+ * head is the most recent file/trace packet.
+ * tail is the oldest file/trace packet.
+ *
+ * There are two heads: a "read" head and a "write" head. The "write" head is
+ * the position of the newest data file. The "read" head position is only moved
+ * forward when the index is received.
+ *
+ * The viewer uses the "read" head position as upper bound, which
+ * ensures it never attempts to open a non-existing index file.
*/
struct tracefile_array {
struct tracefile *tf;
size_t count;
/* Current head/tail files. */
- uint64_t file_head;
+ uint64_t file_head_read;
+ uint64_t file_head_write;
uint64_t file_tail;
/* Overall head/tail seq for the entire array. Inclusive. */
struct tracefile_array *tracefile_array_create(size_t count);
void tracefile_array_destroy(struct tracefile_array *tfa);
-void tracefile_array_file_rotate(struct tracefile_array *tfa);
+void tracefile_array_file_rotate(struct tracefile_array *tfa, enum tracefile_rotate_type type);
void tracefile_array_commit_seq(struct tracefile_array *tfa);
-uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
+uint64_t tracefile_array_get_read_file_index_head(struct tracefile_array *tfa);
/* May return -1ULL in the case where we have not received any indexes yet. */
uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
}
case LTTNG_VIEWER_SEEK_LAST:
vstream->current_tracefile_id =
- tracefile_array_get_file_index_head(stream->tfa);
+ tracefile_array_get_read_file_index_head(stream->tfa);
/*
* We seek at the very end of each stream, awaiting for
* a future packet to eventually come in.