Fix: relayd: tracefile rotation: viewer opening missing index file
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 1 Nov 2019 20:23:03 +0000 (16:23 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 6 Nov 2019 19:35:10 +0000 (14:35 -0500)
Moving the head position of the tracefile array when the data is
received opens a window where a viewer attaching to the session could
try to open a missing index file (which has not been received yet).

However, we want to bump the tail position as soon as we receive
data, because the prior tail is not valid anymore.

Solve this by introducing two head positions: the "read" head
and the "write" head. The "write" head is the position of the
newest data file (equivalent to the prior "head" position). We
also introduce a "read" head position, which is only moved
forward when the index is received.

The viewer now uses the "read" head position as upper bound, which
ensures it never attempts to open a non-existing index file.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/tracefile-array.c
src/bin/lttng-relayd/tracefile-array.h
src/bin/lttng-relayd/viewer-stream.c

index 94698f8d868e4da4d3e04770429a0ccfe40f3b7a..4d3d37a2bc91d745b8e6ee83935aa623f27b3db9 100644 (file)
@@ -958,7 +958,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -1095,6 +1095,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1188,6 +1189,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
index 20b760c06893f2387968f3f773dd2cd846675fa1..3d62317ac76d7a6fbe2d731a9991aa788181db78 100644 (file)
@@ -62,7 +62,8 @@ void tracefile_array_destroy(struct tracefile_array *tfa)
        free(tfa);
 }
 
-void tracefile_array_file_rotate(struct tracefile_array *tfa)
+void tracefile_array_file_rotate(struct tracefile_array *tfa,
+               enum tracefile_rotate_type type)
 {
        uint64_t *headp, *tailp;
 
@@ -70,24 +71,37 @@ void tracefile_array_file_rotate(struct tracefile_array *tfa)
                /* Not in tracefile rotation mode. */
                return;
        }
-       /* Rotate to next file.  */
-       tfa->file_head = (tfa->file_head + 1) % tfa->count;
-       if (tfa->file_head == tfa->file_tail) {
-               /* Move tail. */
-               tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
-       }
-       headp = &tfa->tf[tfa->file_head].seq_head;
-       tailp = &tfa->tf[tfa->file_head].seq_tail;
-       /*
-        * If we overwrite a file with content, we need to push the tail
-        * to the position following the content we are overwriting.
-        */
-       if (*headp != -1ULL) {
-               tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+       switch (type) {
+       case TRACEFILE_ROTATE_READ:
+               /*
+                * Rotate read head to write head position, thus allowing
+                * reader to consume the newly rotated head file.
+                */
+               tfa->file_head_read = tfa->file_head_write;
+               break;
+       case TRACEFILE_ROTATE_WRITE:
+               /* Rotate write head to next file, pushing tail if needed.  */
+               tfa->file_head_write = (tfa->file_head_write + 1) % tfa->count;
+               if (tfa->file_head_write == tfa->file_tail) {
+                       /* Move tail. */
+                       tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
+               }
+               headp = &tfa->tf[tfa->file_head_write].seq_head;
+               tailp = &tfa->tf[tfa->file_head_write].seq_tail;
+               /*
+                * If we overwrite a file with content, we need to push the tail
+                * to the position following the content we are overwriting.
+                */
+               if (*headp != -1ULL) {
+                       tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+               }
+               /* Reset this file head/tail (overwrite). */
+               *headp = -1ULL;
+               *tailp = -1ULL;
+               break;
+       default:
+               abort();
        }
-       /* Reset this file head/tail (overwrite). */
-       *headp = -1ULL;
-       *tailp = -1ULL;
 }
 
 void tracefile_array_commit_seq(struct tracefile_array *tfa)
@@ -104,8 +118,8 @@ void tracefile_array_commit_seq(struct tracefile_array *tfa)
                /* Not in tracefile rotation mode. */
                return;
        }
-       headp = &tfa->tf[tfa->file_head].seq_head;
-       tailp = &tfa->tf[tfa->file_head].seq_tail;
+       headp = &tfa->tf[tfa->file_head_write].seq_head;
+       tailp = &tfa->tf[tfa->file_head_write].seq_tail;
        /* Update head tracefile seq_head. */
        *headp = tfa->seq_head;
        /*
@@ -117,9 +131,9 @@ void tracefile_array_commit_seq(struct tracefile_array *tfa)
        }
 }
 
-uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
+uint64_t tracefile_array_get_read_file_index_head(struct tracefile_array *tfa)
 {
-       return tfa->file_head;
+       return tfa->file_head_read;
 }
 
 uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
index 9158f4fe40c3c16bd5446ea811cc44cef379b966..04d9123d03fc3caa9a3257dd654a11e45d7a94ba 100644 (file)
@@ -29,15 +29,30 @@ struct tracefile {
        uint64_t seq_tail;      /* Oldest seqcount. Inclusive. */
 };
 
+enum tracefile_rotate_type {
+       TRACEFILE_ROTATE_READ,
+       TRACEFILE_ROTATE_WRITE,
+};
+
 /*
  * Represents an array of trace files in a stream.
+ * head is the most recent file/trace packet.
+ * tail is the oldest file/trace packet.
+ *
+ * There are two heads: a "read" head and a "write" head. The "write" head is
+ * the position of the newest data file. The "read" head position is only moved
+ * forward when the index is received.
+ *
+ * The viewer uses the "read" head position as upper bound, which
+ * ensures it never attempts to open a non-existing index file.
  */
 struct tracefile_array {
        struct tracefile *tf;
        size_t count;
 
        /* Current head/tail files. */
-       uint64_t file_head;
+       uint64_t file_head_read;
+       uint64_t file_head_write;
        uint64_t file_tail;
 
        /* Overall head/tail seq for the entire array. Inclusive. */
@@ -48,10 +63,10 @@ struct tracefile_array {
 struct tracefile_array *tracefile_array_create(size_t count);
 void tracefile_array_destroy(struct tracefile_array *tfa);
 
-void tracefile_array_file_rotate(struct tracefile_array *tfa);
+void tracefile_array_file_rotate(struct tracefile_array *tfa, enum tracefile_rotate_type type);
 void tracefile_array_commit_seq(struct tracefile_array *tfa);
 
-uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
+uint64_t tracefile_array_get_read_file_index_head(struct tracefile_array *tfa);
 /* May return -1ULL in the case where we have not received any indexes yet. */
 uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
 
index f41bbe1a8de8c45a13b96c05491f64e16694b244..f3baf105bb466dd36f2edd23f69a3a535f21f556 100644 (file)
@@ -106,7 +106,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
        }
        case LTTNG_VIEWER_SEEK_LAST:
                vstream->current_tracefile_id =
-                       tracefile_array_get_file_index_head(stream->tfa);
+                       tracefile_array_get_read_file_index_head(stream->tfa);
                /*
                 * We seek at the very end of each stream, awaiting for
                 * a future packet to eventually come in.
This page took 0.033278 seconds and 4 git commands to generate.