relayd: live: implement support for clear feature
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 12 Dec 2019 18:24:29 +0000 (13:24 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 19 Dec 2019 23:48:53 +0000 (18:48 -0500)
Implement support for clear feature on live side of relayd.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I2d8996098abbdcd635757425bfcee405daf1bb13
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/viewer-stream.c

index ae0c2cb738aa4e82022ed51bf82725b275f1b188..2a73556dffa0c22c33df136ee53eb39bcc5af362 100644 (file)
@@ -1287,8 +1287,10 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
+                       (rstream->index_received_seqcount == 0 ||
+                       (vstream->index_sent_seqcount != 0 &&
                        rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+                               <= vstream->index_sent_seqcount))) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1297,6 +1299,12 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * inform the client of a time interval during which we can
                 * guarantee that there are no events to read (and never will
                 * be).
+                *
+                * The sent seqcount can grow higher than receive seqcount on
+                * clear because the rotation performed by clear will push
+                * the index_sent_seqcount ahead (see
+                * viewer_stream_sync_tracefile_array_tail) and skip over
+                * packet sequence numbers.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
@@ -1304,12 +1312,20 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                DBG("Check index status: inactive with beacon, for stream %" PRIu64,
                                vstream->stream->stream_handle);
                goto index_ready;
-       } else if (rstream->index_received_seqcount
-                       == vstream->index_sent_seqcount) {
+       } else if (rstream->index_received_seqcount == 0 ||
+                       (vstream->index_sent_seqcount != 0 &&
+                       rstream->index_received_seqcount
+                               <= vstream->index_sent_seqcount)) {
                /*
-                * This checks whether received == sent seqcount. In
+                * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
                 * we can only ask the client to retry later.
+                *
+                * The sent seqcount can grow higher than receive seqcount on
+                * clear because the rotation performed by clear will push
+                * the index_sent_seqcount ahead (see
+                * viewer_stream_sync_tracefile_array_tail) and skip over
+                * packet sequence numbers.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                DBG("Check index status: retry for stream %" PRIu64,
@@ -1326,9 +1342,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
-               if (ret < 0) {
-                       goto end;
-               } else if (ret == 1) {
+               if (ret == 1) {
                        /* EOF across entire stream. */
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
@@ -1366,7 +1380,6 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        }
        /* ret == 0 means successful so we continue. */
        ret = 0;
-end:
        return ret;
 
 hup:
@@ -1431,21 +1444,70 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       /* Try to open an index if one is needed for that stream. */
-       ret = try_open_index(vstream, rstream);
-       if (ret < 0) {
-               if (ret == -ENOENT) {
-                       /*
-                        * The index is created only when the first data
-                        * packet arrives, it might not be ready at the
-                        * beginning of the session
-                        */
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-               } else {
-                       /* Unhandled error. */
+       if (rstream->ongoing_rotation.is_set) {
+               /* Rotation is ongoing, try again later. */
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               goto send_reply;
+       }
+
+       if (rstream->trace->session->ongoing_rotation) {
+               /* Rotation is ongoing, try again later. */
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               goto send_reply;
+       }
+
+       if (rstream->trace_chunk) {
+               uint64_t rchunk_id, vchunk_id;
+
+               /*
+                * If the relay stream is not yet closed, ensure the viewer
+                * chunk matches the relay chunk after clear.
+                */
+               if (lttng_trace_chunk_get_id(rstream->trace_chunk,
+                               &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+                       goto send_reply;
+               }
+               if (lttng_trace_chunk_get_id(
+                               conn->viewer_session->current_trace_chunk,
+                               &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+                       goto send_reply;
+               }
+
+               if (rchunk_id != vchunk_id) {
+                       DBG("Relay and viewer chunk ids differ: "
+                               "rchunk_id %" PRIu64 " vchunk_id %" PRIu64,
+                               rchunk_id, vchunk_id);
+
+                       lttng_trace_chunk_put(
+                               conn->viewer_session->current_trace_chunk);
+                       conn->viewer_session->current_trace_chunk = NULL;
+                       ret = viewer_session_set_trace_chunk_copy(
+                                       conn->viewer_session,
+                                       rstream->trace_chunk);
+                       if (ret) {
+                               viewer_index.status =
+                                       htobe32(LTTNG_VIEWER_INDEX_ERR);
+                               goto send_reply;
+                       }
                }
-               goto send_reply;
+       }
+       if (conn->viewer_session->current_trace_chunk !=
+                       vstream->stream_file.trace_chunk) {
+               bool acquired_reference;
+
+               DBG("Viewer session and viewer stream chunk differ: "
+                               "vsession chunk %p vstream chunk %p",
+                               conn->viewer_session->current_trace_chunk,
+                               vstream->stream_file.trace_chunk);
+               lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+               assert(acquired_reference);
+               vstream->stream_file.trace_chunk =
+                       conn->viewer_session->current_trace_chunk;
+               viewer_stream_sync_tracefile_array_tail(vstream);
+               viewer_stream_close_files(vstream);
        }
 
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
@@ -1461,6 +1523,22 @@ int viewer_get_next_index(struct relay_connection *conn)
        /* At this point, ret is 0 thus we will be able to read the index. */
        assert(!ret);
 
+       /* Try to open an index if one is needed for that stream. */
+       ret = try_open_index(vstream, rstream);
+       if (ret == -ENOENT) {
+              if (rstream->closed) {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                       goto send_reply;
+              } else {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto send_reply;
+              }
+       }
+       if (ret < 0) {
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               goto send_reply;
+       }
+
        /*
         * vstream->stream_fd may be NULL if it has been closed by
         * tracefile rotation, or if we are at the beginning of the
@@ -1490,6 +1568,11 @@ int viewer_get_next_index(struct relay_connection *conn)
                                vstream->stream_file.trace_chunk,
                                file_path, O_RDONLY, 0, &fd, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
+                                       rstream->closed) {
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                               goto send_reply;
+                       }
                        PERROR("Failed to open trace file for viewer stream");
                        goto error_put;
                }
@@ -1746,14 +1829,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       assert(vstream->metadata_sent <= vstream->stream->metadata_received);
-
-       len = vstream->stream->metadata_received - vstream->metadata_sent;
-       if (len == 0) {
+       if (vstream->metadata_sent >= vstream->stream->metadata_received) {
                /*
                 * The live viewers expect to receive a NO_NEW_METADATA
                 * status before a stream disappears, otherwise they abort the
                 * entire live connection when receiving an error status.
+                *
+                * Clear feature resets the metadata_sent to 0 until the
+                * same metadata is received again.
                 */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
                /*
@@ -1770,6 +1853,8 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto send_reply;
        }
 
+       len = vstream->stream->metadata_received - vstream->metadata_sent;
+
        /* first time, we open the metadata file */
        if (!vstream->stream_file.fd) {
                int fd;
@@ -1794,6 +1879,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                                vstream->stream_file.trace_chunk,
                                file_path, O_RDONLY, 0, &fd, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
+                               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+                               len = 0;
+                               if (vstream->stream->closed) {
+                                       viewer_stream_put(vstream);
+                               }
+                               goto send_reply;
+                       }
                        PERROR("Failed to open metadata file for viewer stream");
                        goto error;
                }
index 42431a8fa3588fc20a5745f80c0e54ecb5887545..5c19fb9747e4e124e393e872b15d042810a7aff5 100644 (file)
 #include <common/common.h>
 #include <common/index/index.h>
 #include <common/compat/string.h>
+#include <common/utils.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 
 #include "lttng-relayd.h"
 #include "viewer-stream.h"
@@ -124,7 +128,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
         * If we never received an index for the current stream, delay
         * the opening of the index, otherwise open it right now.
         */
-       if (stream->index_received_seqcount == 0) {
+       if (stream->index_file == NULL) {
                vstream->index_file = NULL;
        } else {
                const uint32_t connection_major = stream->trace->session->major;
@@ -150,6 +154,39 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
                }
        }
 
+       /*
+        * If we never received a data file for the current stream, delay the
+        * opening, otherwise open it right now.
+        */
+       if (stream->stream_fd) {
+               int fd, ret;
+               char file_path[LTTNG_PATH_MAX];
+               enum lttng_trace_chunk_status status;
+
+               ret = utils_stream_file_path(stream->path_name,
+                               stream->channel_name, stream->tracefile_size,
+                               vstream->current_tracefile_id, NULL, file_path,
+                               sizeof(file_path));
+               if (ret < 0) {
+                       goto error_unlock;
+               }
+
+               status = lttng_trace_chunk_open_file(
+                               vstream->stream_file.trace_chunk,
+                               file_path, O_RDONLY, 0, &fd, true);
+               if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       goto error_unlock;
+               }
+               vstream->stream_file.fd = stream_fd_create(fd);
+               if (!vstream->stream_file.fd) {
+                       if (close(fd)) {
+                               PERROR("Failed to close viewer %sfile",
+                                       stream->is_metadata ? "metadata " : "");
+                       }
+                       goto error_unlock;
+               }
+       }
+
        if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
                off_t lseek_ret;
 
@@ -290,16 +327,13 @@ void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream
  * Rotate a stream to the next tracefile.
  *
  * Must be called with the rstream lock held.
- * Returns 0 on success, 1 on EOF, a negative value on error.
+ * Returns 0 on success, 1 on EOF.
  */
 int viewer_stream_rotate(struct relay_viewer_stream *vstream)
 {
        int ret;
        uint64_t new_id;
        const struct relay_stream *stream = vstream->stream;
-       const uint32_t connection_major = stream->trace->session->major;
-       const uint32_t connection_minor = stream->trace->session->minor;
-       enum lttng_trace_chunk_status chunk_status;
 
        /* Detect the last tracefile to open. */
        if (stream->index_received_seqcount
@@ -339,32 +373,8 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream)
                        tracefile_array_get_file_index_tail(stream->tfa);
                vstream->index_sent_seqcount = seq_tail;
        }
-
-       if (vstream->index_file) {
-               lttng_index_file_put(vstream->index_file);
-               vstream->index_file = NULL;
-       }
-       if (vstream->stream_file.fd) {
-               stream_fd_put(vstream->stream_file.fd);
-               vstream->stream_file.fd = NULL;
-       }
-       chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
-                                       vstream->stream_file.trace_chunk,
-                                       stream->path_name,
-                                       stream->channel_name,
-                                       stream->tracefile_size,
-                                       vstream->current_tracefile_id,
-                                       lttng_to_index_major(connection_major,
-                                                       connection_minor),
-                                       lttng_to_index_minor(connection_major,
-                                                       connection_minor),
-                                       true, &vstream->index_file);
-       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
-               ret = -1;
-               goto end;
-       } else {
-               ret = 0;
-       }
+       viewer_stream_close_files(vstream);
+       ret = 0;
 end:
        return ret;
 }
This page took 0.030838 seconds and 4 git commands to generate.