Fix: relayd: file rotation and live read
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 3 Sep 2015 21:17:30 +0000 (17:17 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 24 Sep 2015 02:26:40 +0000 (22:26 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/Makefile.am
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/bin/lttng-relayd/tracefile-array.c [new file with mode: 0644]
src/bin/lttng-relayd/tracefile-array.h [new file with mode: 0644]
src/bin/lttng-relayd/viewer-stream.c
src/bin/lttng-relayd/viewer-stream.h

index 428f35202b52c31ec24a40b3e2464de9461c6c49..07eb73223eb71c6c3ce062b518037ea710fd4fe7 100644 (file)
@@ -19,7 +19,8 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
                        stream.c stream.h \
                        stream-fd.c stream-fd.h \
                        connection.c connection.h \
-                       viewer-session.c viewer-session.h
+                       viewer-session.c viewer-session.h \
+                       tracefile-array.c tracefile-array.h
 
 # link on liblttngctl for check if relayd is already alive.
 lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
index 359b0691f4b3bd66aa99bbd3107528904554b8ad..f224faf6850dbe68186b719a85d1ec7ee9644240 100644 (file)
@@ -1129,7 +1129,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->total_index_received == 0) {
+       if (rstream->index_received_seqcount == 0) {
                ret = -ENOENT;
                goto end;
        }
@@ -1171,14 +1171,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        int ret;
 
        if (trace->session->connection_closed
-                       && rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       && rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /* Last index sent and session connection is closed. */
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
-                       rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1192,21 +1192,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
                goto index_ready;
-       } else if (rstream->total_index_received <= vstream->last_sent_index) {
+       } else if (rstream->index_received_seqcount
+                       == vstream->index_sent_seqcount) {
                /*
-                * This actually checks the case where recv == last_sent.
-                * In this case, we have not received a beacon. Therefore, we
-                * can only ask the client to retry later.
+                * This checks whether received == sent seqcount. In
+                * this case, we have not received a beacon. Therefore,
+                * we can only ask the client to retry later.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto index_ready;
-       } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq)) {
+       } else if (!tracefile_array_seq_in_file(rstream->tfa,
+                       vstream->current_tracefile_id,
+                       vstream->index_sent_seqcount)) {
                /*
-                * The producer has overwritten our current file. We
-                * need to rotate.
+                * The next index we want to send cannot be read either
+                * because we need to perform a rotation, or due to
+                * the producer having overwritten its trace file.
                 */
-               DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+               DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
                if (ret < 0) {
@@ -1216,50 +1219,34 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
                }
-               assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq));
-               /* ret == 0 means successful so we continue. */
-               ret = 0;
-       } else {
-               ssize_t read_ret;
-               char tmp[1];
-
                /*
-                * Use EOF on current index file to find out when we
-                * need to rotate.
+                * If we have been pushed due to overwrite, it
+                * necessarily means there is data that can be read in
+                * the stream. If we rotated because we reached the end
+                * of a tracefile, it means the following tracefile
+                * needs to contain at least one index, else we would
+                * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+                * viewer. The updated index_sent_seqcount needs to
+                * point to a readable index entry now.
+                *
+                * In the case where we "rotate" on a single file, we
+                * can end up in a case where the requested index is
+                * still unavailable.
                 */
-               read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
-               if (read_ret == 1) {
-                       off_t seek_ret;
-
-                       /* There is still data to read. Rewind position. */
-                       seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
-                       if (seek_ret < 0) {
-                               ret = -1;
-                               goto end;
-                       }
-                       ret = 0;
-               } else if (read_ret == 0) {
-                       /* EOF. We need to rotate. */
-                       DBG("Viewer stream %" PRIu64 " rotation due to EOF",
-                                       vstream->stream->stream_handle);
-                       ret = viewer_stream_rotate(vstream);
-                       if (ret < 0) {
-                               goto end;
-                       } else if (ret == 1) {
-                               /* EOF across entire stream. */
-                               index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-                               goto hup;
-                       }
-                       assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                               vstream->current_tracefile_seq));
-                       /* ret == 0 means successful so we continue. */
-                       ret = 0;
-               } else {
-                       /* Error reading index. */
-                       ret = -1;
+               if (rstream->tracefile_count == 1 &&
+                               !tracefile_array_seq_in_file(
+                                       rstream->tfa,
+                                       vstream->current_tracefile_id,
+                                       vstream->index_sent_seqcount)) {
+                       index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto index_ready;
                }
+               assert(tracefile_array_seq_in_file(rstream->tfa,
+                               vstream->current_tracefile_id,
+                               vstream->index_sent_seqcount));
        }
+       /* ret == 0 means successful so we continue. */
+       ret = 0;
 end:
        return ret;
 
@@ -1302,6 +1289,8 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
+               DBG("Client requested index of unknown stream id %" PRIu64,
+                               be64toh(request_index.stream_id));
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        }
@@ -1408,7 +1397,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        } else {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
-               vstream->last_sent_index++;
+               vstream->index_sent_seqcount++;
        }
 
        /*
@@ -1455,7 +1444,7 @@ send_reply:
 
        if (vstream) {
                DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-                               vstream->last_sent_index,
+                               vstream->index_sent_seqcount,
                                vstream->stream->stream_handle);
        }
 end:
@@ -1508,6 +1497,8 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
+               DBG("Client requested packet of unknown stream id %" PRIu64,
+                               be64toh(get_packet_info.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                goto send_reply_nolock;
        }
@@ -1619,6 +1610,8 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * Reply back to the client with an error if we cannot
                 * find it.
                 */
+               DBG("Client requested metadata of unknown stream id %" PRIu64,
+                               be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
index 905e245faad068228f2fd74c007818e019ebda23..e5990e1b743cd2b05376bd2c3b5f8d08ad8a4316 100644 (file)
@@ -69,6 +69,7 @@
 #include "session.h"
 #include "stream.h"
 #include "connection.h"
+#include "tracefile-array.h"
 
 /* command line options */
 char *opt_output_path;
@@ -1858,7 +1859,7 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                 * Only flag a stream inactive when it has already
                 * received data and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0
+               if (stream->index_received_seqcount > 0
                                && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end =
                                be64toh(index_info.timestamp_end);
@@ -1886,7 +1887,8 @@ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -2059,7 +2061,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
                fd = index_create_file(stream->path_name, stream->channel_name,
                                -1, -1, stream->tracefile_size,
-                               stream->current_tracefile_id);
+                               tracefile_array_get_file_index_head(stream->tfa));
                if (fd < 0) {
                        ret = -1;
                        /* Put self-ref for this index due to error. */
@@ -2088,7 +2090,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               stream->total_index_received++;
+               tracefile_array_commit_seq(stream->tfa);
+               stream->index_received_seqcount++;
        } else if (ret > 0) {
                /* No flush. */
                ret = 0;
@@ -2172,35 +2175,23 @@ static int relay_process_data(struct relay_connection *conn)
        if (stream->tracefile_size > 0 &&
                        (stream->tracefile_size_current + data_size) >
                        stream->tracefile_size) {
-               uint64_t new_id;
+               uint64_t old_id, new_id;
+
+               old_id = tracefile_array_get_file_index_head(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa);
+
+               /* new_id is updated by utils_rotate_stream_file. */
+               new_id = old_id;
 
-               new_id = (stream->current_tracefile_id + 1) %
-                       stream->tracefile_count;
-               /*
-                * Move viewer oldest available data position forward if
-                * we are overwriting a tracefile.
-                */
-               if (new_id == stream->oldest_tracefile_id) {
-                       stream->oldest_tracefile_id =
-                               (stream->oldest_tracefile_id + 1) %
-                               stream->tracefile_count;
-               }
                ret = utils_rotate_stream_file(stream->path_name,
                                stream->channel_name, stream->tracefile_size,
                                stream->tracefile_count, -1,
                                -1, stream->stream_fd->fd,
-                               &stream->current_tracefile_id,
-                               &stream->stream_fd->fd);
+                               &new_id, &stream->stream_fd->fd);
                if (ret < 0) {
                        ERR("Rotating stream output file");
                        goto end_stream_unlock;
                }
-               stream->current_tracefile_seq++;
-               if (stream->current_tracefile_seq
-                       - stream->oldest_tracefile_seq >=
-                               stream->tracefile_count) {
-                       stream->oldest_tracefile_seq++;
-               }
                /*
                 * Reset current size because we just performed a stream
                 * rotation.
index d11e85b2ae7cfd79cab6712abaf50095c93cc3dd..f80296a487b8f28eb133a1d2e7614ca4fee7ff1c 100644 (file)
@@ -136,6 +136,11 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
                ret = -1;
                goto end;
        }
+       stream->tfa = tracefile_array_create(stream->tracefile_count);
+       if (!stream->tfa) {
+               ret = -1;
+               goto end;
+       }
        if (stream->tracefile_size) {
                DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
        } else {
@@ -240,6 +245,9 @@ static void stream_destroy(struct relay_stream *stream)
        if (stream->indexes_ht) {
                lttng_ht_destroy(stream->indexes_ht);
        }
+       if (stream->tfa) {
+               tracefile_array_destroy(stream->tfa);
+       }
        free(stream->path_name);
        free(stream->channel_name);
        free(stream);
index 7e2b1334ec9c54e3e3ca3c5229b4e664c31c06b8..ca6be8133dc9cd963f5fcdbdae379b813119ef84 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "session.h"
 #include "stream-fd.h"
+#include "tracefile-array.h"
 
 /*
  * Represents a stream in the relay
@@ -67,15 +68,22 @@ struct relay_stream {
        uint64_t tracefile_size;
        uint64_t tracefile_size_current;
        uint64_t tracefile_count;
-       uint64_t current_tracefile_id;
 
-       uint64_t current_tracefile_seq; /* Free-running counter. */
-       uint64_t oldest_tracefile_seq;  /* Free-running counter. */
-
-       /* To inform the viewer up to where it can go back in time. */
-       uint64_t oldest_tracefile_id;
+       /*
+        * Counts the number of received indexes. The "tag" associated
+        * with an index is taken before incrementing this seqcount.
+        * Therefore, the sequence tag associated with the last index
+        * received is always index_received_seqcount - 1.
+        */
+       uint64_t index_received_seqcount;
 
-       uint64_t total_index_received;
+       /*
+        * Tracefile array is an index of the stream trace files,
+        * indexed by position. It allows keeping track of the oldest
+        * available indexes when overwriting trace files in tracefile
+        * rotation.
+        */
+       struct tracefile_array *tfa;
 
        bool closed;    /* Stream is closed. */
 
diff --git a/src/bin/lttng-relayd/tracefile-array.c b/src/bin/lttng-relayd/tracefile-array.c
new file mode 100644 (file)
index 0000000..bcbee5c
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <assert.h>
+#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+
+#include "tracefile-array.h"
+
+struct tracefile_array *tracefile_array_create(size_t count)
+{
+       struct tracefile_array *tfa = NULL;
+       int i;
+
+       tfa = zmalloc(sizeof(*tfa));
+       if (!tfa) {
+               goto error;
+       }
+       tfa->tf = zmalloc(sizeof(*tfa->tf) * count);
+       if (!tfa->tf) {
+               goto error;
+       }
+       tfa->count = count;
+       for (i = 0; i < count; i++) {
+               tfa->tf[i].seq_head = -1ULL;
+               tfa->tf[i].seq_tail = -1ULL;
+       }
+       tfa->seq_head = -1ULL;
+       tfa->seq_tail = -1ULL;
+       return tfa;
+
+error:
+       if (tfa) {
+               free(tfa->tf);
+       }
+       free(tfa);
+       return NULL;
+}
+
+void tracefile_array_destroy(struct tracefile_array *tfa)
+{
+       if (!tfa) {
+               return;
+       }
+       free(tfa->tf);
+       free(tfa);
+}
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa)
+{
+       uint64_t *headp, *tailp;
+
+       if (!tfa->count) {
+               /* Not in tracefile rotation mode. */
+               return;
+       }
+       /* Rotate to next file.  */
+       tfa->file_head = (tfa->file_head + 1) % tfa->count;
+       if (tfa->file_head == tfa->file_tail) {
+               /* Move tail. */
+               tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
+       }
+       headp = &tfa->tf[tfa->file_head].seq_head;
+       tailp = &tfa->tf[tfa->file_head].seq_tail;
+       /*
+        * If we overwrite a file with content, we need to push the tail
+        * to the position following the content we are overwriting.
+        */
+       if (*headp != -1ULL) {
+               tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+       }
+       /* Reset this file head/tail (overwrite). */
+       *headp = -1ULL;
+       *tailp = -1ULL;
+}
+
+void tracefile_array_commit_seq(struct tracefile_array *tfa)
+{
+       uint64_t *headp, *tailp;
+
+       /* Increment overall head. */
+       tfa->seq_head++;
+       /* If we are committing our first index overall, set tail to 0. */
+       if (tfa->seq_tail == -1ULL) {
+               tfa->seq_tail = 0;
+       }
+       if (!tfa->count) {
+               /* Not in tracefile rotation mode. */
+               return;
+       }
+       headp = &tfa->tf[tfa->file_head].seq_head;
+       tailp = &tfa->tf[tfa->file_head].seq_tail;
+       /* Update head tracefile seq_head. */
+       *headp = tfa->seq_head;
+       /*
+        * If we are committing our first index in this packet, set tail
+        * to this index seq count.
+        */
+       if (*tailp == -1ULL) {
+               *tailp = tfa->seq_head;
+       }
+}
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
+{
+       return tfa->file_head;
+}
+
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
+{
+       return tfa->seq_head;
+}
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa)
+{
+       return tfa->file_tail;
+}
+
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa)
+{
+       return tfa->seq_tail;
+}
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+               uint64_t file_index, uint64_t seq)
+{
+       if (!tfa->count) {
+               /*
+                * Not in tracefile rotation mode; we are guaranteed to have the
+                * index in this file.
+                */
+               return true;
+       }
+       assert(file_index < tfa->count);
+       if (seq == -1ULL) {
+               return false;
+       }
+       if (seq >= tfa->tf[file_index].seq_tail
+                       && seq <= tfa->tf[file_index].seq_head) {
+               return true;
+       } else {
+               return false;
+       }
+}
diff --git a/src/bin/lttng-relayd/tracefile-array.h b/src/bin/lttng-relayd/tracefile-array.h
new file mode 100644 (file)
index 0000000..9158f4f
--- /dev/null
@@ -0,0 +1,65 @@
+#ifndef _TRACEFILE_ARRAY_H
+#define _TRACEFILE_ARRAY_H
+
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+struct tracefile {
+       /* Per-tracefile head/tail seq. */
+       uint64_t seq_head;      /* Newest seqcount. Inclusive. */
+       uint64_t seq_tail;      /* Oldest seqcount. Inclusive. */
+};
+
+/*
+ * Represents an array of trace files in a stream.
+ */
+struct tracefile_array {
+       struct tracefile *tf;
+       size_t count;
+
+       /* Current head/tail files. */
+       uint64_t file_head;
+       uint64_t file_tail;
+
+       /* Overall head/tail seq for the entire array. Inclusive. */
+       uint64_t seq_head;
+       uint64_t seq_tail;
+};
+
+struct tracefile_array *tracefile_array_create(size_t count);
+void tracefile_array_destroy(struct tracefile_array *tfa);
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa);
+void tracefile_array_commit_seq(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
+/* May return -1ULL in the case where we have not received any indexes yet. */
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa);
+/* May return -1ULL in the case where we have not received any indexes yet. */
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa);
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+               uint64_t file_index, uint64_t seq);
+
+#endif /* _STREAM_H */
index 18da9a76e5cbb71bd4e07079e8136e349a398fa4..cd3cd0bab2b94e0c769be3ea523261d0b3260538 100644 (file)
@@ -62,29 +62,59 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                goto error;
        }
 
+       if (!stream_get(stream)) {
+               ERR("Cannot get stream");
+               goto error;
+       }
+       vstream->stream = stream;
+
+       pthread_mutex_lock(&stream->lock);
+
+       if (stream->is_metadata && stream->trace->viewer_metadata_stream) {
+               ERR("Cannot attach viewer metadata stream to trace (busy).");
+               goto error_unlock;
+       }
+
        switch (seek_t) {
        case LTTNG_VIEWER_SEEK_BEGINNING:
-               vstream->current_tracefile_id = stream->oldest_tracefile_id;
+       {
+               uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
+
+               if (seq_tail == -1ULL) {
+                       /*
+                        * Tail may not be initialized yet. Nonetheless, we know
+                        * we want to send the first index once it becomes
+                        * available.
+                        */
+                       seq_tail = 0;
+               }
+               vstream->current_tracefile_id =
+                       tracefile_array_get_file_index_tail(stream->tfa);
+               vstream->index_sent_seqcount = seq_tail;
                break;
+       }
        case LTTNG_VIEWER_SEEK_LAST:
-               vstream->current_tracefile_id = stream->current_tracefile_id;
+               vstream->current_tracefile_id =
+                       tracefile_array_get_file_index_head(stream->tfa);
+               /*
+                * We seek at the very end of each stream, awaiting for
+                * a future packet to eventually come in.
+                *
+                * We don't need to check the head position for -1ULL since the
+                * increment will set it to 0.
+                */
+               vstream->index_sent_seqcount =
+                               tracefile_array_get_seq_head(stream->tfa) + 1;
                break;
        default:
-               goto error;
-       }
-       if (!stream_get(stream)) {
-               ERR("Cannot get stream");
-               goto error;
+               goto error_unlock;
        }
-       vstream->stream = stream;
 
-       pthread_mutex_lock(&stream->lock);
        /*
-        * If we never received an index for the current stream, delay the opening
-        * of the index, otherwise open it right now.
+        * If we never received an index for the current stream, delay
+        * the opening of the index, otherwise open it right now.
         */
-       if (vstream->current_tracefile_id == stream->current_tracefile_id
-                       && stream->total_index_received == 0) {
+       if (stream->index_received_seqcount == 0) {
                vstream->index_fd = NULL;
        } else {
                int read_fd;
@@ -111,14 +141,12 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                if (lseek_ret < 0) {
                        goto error_unlock;
                }
-               vstream->last_sent_index = stream->total_index_received;
        }
-       pthread_mutex_unlock(&stream->lock);
-
        if (stream->is_metadata) {
                rcu_assign_pointer(stream->trace->viewer_metadata_stream,
                                vstream);
        }
+       pthread_mutex_unlock(&stream->lock);
 
        /* Globally visible after the add unique. */
        lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
@@ -225,26 +253,6 @@ void viewer_stream_put(struct relay_viewer_stream *vstream)
        rcu_read_unlock();
 }
 
-/*
- * Returns whether the current tracefile is readable. If not, it has
- * been overwritten.
- * Must be called with rstream lock held.
- */
-bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream,
-                uint64_t seq)
-{
-       struct relay_stream *stream = vstream->stream;
-
-       if (seq >= stream->oldest_tracefile_seq
-                       && seq <= stream->current_tracefile_seq) {
-               /* seq is a readable file. */
-               return true;
-       } else {
-               /* seq is not readable. */
-               return false;
-       }
-}
-
 /*
  * Rotate a stream to the next tracefile.
  *
@@ -255,9 +263,11 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 {
        int ret;
        struct relay_stream *stream = vstream->stream;
+       uint64_t new_id;
 
        /* Detect the last tracefile to open. */
-       if (stream->total_index_received == vstream->last_sent_index
+       if (stream->index_received_seqcount
+                       == vstream->index_sent_seqcount
                        && stream->trace->session->connection_closed) {
                ret = 1;
                goto end;
@@ -269,17 +279,29 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
                goto end;
        }
 
-       if (!viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq + 1)) {
-               vstream->current_tracefile_id =
-                               stream->oldest_tracefile_id;
-               vstream->current_tracefile_seq =
-                               stream->oldest_tracefile_seq;
+       /*
+        * Try to move to the next file.
+        */
+       new_id = (vstream->current_tracefile_id + 1)
+                       % stream->tracefile_count;
+       if (tracefile_array_seq_in_file(stream->tfa, new_id,
+                       vstream->index_sent_seqcount)) {
+               vstream->current_tracefile_id = new_id;
        } else {
+               uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
+
+               /*
+                * This can only be reached on overwrite, which implies there
+                * has been data written at some point, which will have set the
+                * tail.
+                */
+               assert(seq_tail != -1ULL);
+               /*
+                * We need to resync because we lag behind tail.
+                */
                vstream->current_tracefile_id =
-                               (vstream->current_tracefile_id + 1)
-                                       % stream->tracefile_count;
-               vstream->current_tracefile_seq++;
+                       tracefile_array_get_file_index_tail(stream->tfa);
+               vstream->index_sent_seqcount = seq_tail;
        }
 
        if (vstream->index_fd) {
index cc46db4e2a3545cb283ddb5c3260001bcf82d320..5dc135dc6c4972eb104aec630ca088cbd8ed62fc 100644 (file)
@@ -59,10 +59,15 @@ struct relay_viewer_stream {
        char *channel_name;
 
        uint64_t current_tracefile_id;
-       /* Free-running counter. */
-       uint64_t current_tracefile_seq;
 
-       uint64_t last_sent_index;
+       /*
+        * Counts the number of sent indexes. The "tag" associated
+        * with an index to send is the current index_received_seqcount,
+        * because we increment index_received_seqcount after sending
+        * each index. This index_received_seqcount counter can also be
+        * updated when catching up with the producer.
+        */
+       uint64_t index_sent_seqcount;
 
        /* Indicates if this stream has been sent to a viewer client. */
        bool sent_flag;
This page took 0.037886 seconds and 4 git commands to generate.