index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
+ (rstream->index_received_seqcount == 0 ||
+ (vstream->index_sent_seqcount != 0 &&
rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ <= vstream->index_sent_seqcount))) {
/*
* We've received a synchronization beacon and the last index
* available has been sent, the index for now is inactive.
* inform the client of a time interval during which we can
* guarantee that there are no events to read (and never will
* be).
+ *
+ * The sent seqcount can grow higher than receive seqcount on
+ * clear because the rotation performed by clear will push
+ * the index_sent_seqcount ahead (see
+ * viewer_stream_sync_tracefile_array_tail) and skip over
+ * packet sequence numbers.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
DBG("Check index status: inactive with beacon, for stream %" PRIu64,
vstream->stream->stream_handle);
goto index_ready;
- } else if (rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ } else if (rstream->index_received_seqcount == 0 ||
+ (vstream->index_sent_seqcount != 0 &&
+ rstream->index_received_seqcount
+ <= vstream->index_sent_seqcount)) {
/*
- * This checks whether received == sent seqcount. In
+ * This checks whether received <= sent seqcount. In
* this case, we have not received a beacon. Therefore,
* we can only ask the client to retry later.
+ *
+ * The sent seqcount can grow higher than receive seqcount on
+ * clear because the rotation performed by clear will push
+ * the index_sent_seqcount ahead (see
+ * viewer_stream_sync_tracefile_array_tail) and skip over
+ * packet sequence numbers.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
DBG("Check index status: retry for stream %" PRIu64,
DBG("Viewer stream %" PRIu64 " rotation",
vstream->stream->stream_handle);
ret = viewer_stream_rotate(vstream);
- if (ret < 0) {
- goto end;
- } else if (ret == 1) {
+ if (ret == 1) {
/* EOF across entire stream. */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
}
/* ret == 0 means successful so we continue. */
ret = 0;
-end:
return ret;
hup:
goto send_reply;
}
- /* Try to open an index if one is needed for that stream. */
- ret = try_open_index(vstream, rstream);
- if (ret < 0) {
- if (ret == -ENOENT) {
- /*
- * The index is created only when the first data
- * packet arrives, it might not be ready at the
- * beginning of the session
- */
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
- } else {
- /* Unhandled error. */
+ if (rstream->ongoing_rotation.is_set) {
+ /* Rotation is ongoing, try again later. */
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto send_reply;
+ }
+
+ if (rstream->trace->session->ongoing_rotation) {
+ /* Rotation is ongoing, try again later. */
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto send_reply;
+ }
+
+ if (rstream->trace_chunk) {
+ uint64_t rchunk_id, vchunk_id;
+
+ /*
+ * If the relay stream is not yet closed, ensure the viewer
+ * chunk matches the relay chunk after clear.
+ */
+ if (lttng_trace_chunk_get_id(rstream->trace_chunk,
+ &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
+ }
+ if (lttng_trace_chunk_get_id(
+ conn->viewer_session->current_trace_chunk,
+ &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
+ }
+
+ if (rchunk_id != vchunk_id) {
+ DBG("Relay and viewer chunk ids differ: "
+ "rchunk_id %" PRIu64 " vchunk_id %" PRIu64,
+ rchunk_id, vchunk_id);
+
+ lttng_trace_chunk_put(
+ conn->viewer_session->current_trace_chunk);
+ conn->viewer_session->current_trace_chunk = NULL;
+ ret = viewer_session_set_trace_chunk_copy(
+ conn->viewer_session,
+ rstream->trace_chunk);
+ if (ret) {
+ viewer_index.status =
+ htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
+ }
}
- goto send_reply;
+ }
+ if (conn->viewer_session->current_trace_chunk !=
+ vstream->stream_file.trace_chunk) {
+ bool acquired_reference;
+
+ DBG("Viewer session and viewer stream chunk differ: "
+ "vsession chunk %p vstream chunk %p",
+ conn->viewer_session->current_trace_chunk,
+ vstream->stream_file.trace_chunk);
+ lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+ acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+ assert(acquired_reference);
+ vstream->stream_file.trace_chunk =
+ conn->viewer_session->current_trace_chunk;
+ viewer_stream_sync_tracefile_array_tail(vstream);
+ viewer_stream_close_files(vstream);
}
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
/* At this point, ret is 0 thus we will be able to read the index. */
assert(!ret);
+ /* Try to open an index if one is needed for that stream. */
+ ret = try_open_index(vstream, rstream);
+ if (ret == -ENOENT) {
+ if (rstream->closed) {
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto send_reply;
+ } else {
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto send_reply;
+ }
+ }
+ if (ret < 0) {
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+ goto send_reply;
+ }
+
/*
* vstream->stream_fd may be NULL if it has been closed by
* tracefile rotation, or if we are at the beginning of the
vstream->stream_file.trace_chunk,
file_path, O_RDONLY, 0, &fd, true);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
+ rstream->closed) {
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+ goto send_reply;
+ }
PERROR("Failed to open trace file for viewer stream");
goto error_put;
}
goto error;
}
- assert(vstream->metadata_sent <= vstream->stream->metadata_received);
-
- len = vstream->stream->metadata_received - vstream->metadata_sent;
- if (len == 0) {
+ if (vstream->metadata_sent >= vstream->stream->metadata_received) {
/*
* The live viewers expect to receive a NO_NEW_METADATA
* status before a stream disappears, otherwise they abort the
* entire live connection when receiving an error status.
+ *
+ * Clear feature resets the metadata_sent to 0 until the
+ * same metadata is received again.
*/
reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
/*
goto send_reply;
}
+ len = vstream->stream->metadata_received - vstream->metadata_sent;
+
/* first time, we open the metadata file */
if (!vstream->stream_file.fd) {
int fd;
vstream->stream_file.trace_chunk,
file_path, O_RDONLY, 0, &fd, true);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
+ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+ len = 0;
+ if (vstream->stream->closed) {
+ viewer_stream_put(vstream);
+ }
+ goto send_reply;
+ }
PERROR("Failed to open metadata file for viewer stream");
goto error;
}
#include <common/common.h>
#include <common/index/index.h>
#include <common/compat/string.h>
+#include <common/utils.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
#include "lttng-relayd.h"
#include "viewer-stream.h"
* If we never received an index for the current stream, delay
* the opening of the index, otherwise open it right now.
*/
- if (stream->index_received_seqcount == 0) {
+ if (stream->index_file == NULL) {
vstream->index_file = NULL;
} else {
const uint32_t connection_major = stream->trace->session->major;
}
}
+ /*
+ * If we never received a data file for the current stream, delay the
+ * opening, otherwise open it right now.
+ */
+ if (stream->stream_fd) {
+ int fd, ret;
+ char file_path[LTTNG_PATH_MAX];
+ enum lttng_trace_chunk_status status;
+
+ ret = utils_stream_file_path(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ vstream->current_tracefile_id, NULL, file_path,
+ sizeof(file_path));
+ if (ret < 0) {
+ goto error_unlock;
+ }
+
+ status = lttng_trace_chunk_open_file(
+ vstream->stream_file.trace_chunk,
+ file_path, O_RDONLY, 0, &fd, true);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ goto error_unlock;
+ }
+ vstream->stream_file.fd = stream_fd_create(fd);
+ if (!vstream->stream_file.fd) {
+ if (close(fd)) {
+ PERROR("Failed to close viewer %sfile",
+ stream->is_metadata ? "metadata " : "");
+ }
+ goto error_unlock;
+ }
+ }
+
if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
off_t lseek_ret;
* Rotate a stream to the next tracefile.
*
* Must be called with the rstream lock held.
- * Returns 0 on success, 1 on EOF, a negative value on error.
+ * Returns 0 on success, 1 on EOF.
*/
int viewer_stream_rotate(struct relay_viewer_stream *vstream)
{
int ret;
uint64_t new_id;
const struct relay_stream *stream = vstream->stream;
- const uint32_t connection_major = stream->trace->session->major;
- const uint32_t connection_minor = stream->trace->session->minor;
- enum lttng_trace_chunk_status chunk_status;
/* Detect the last tracefile to open. */
if (stream->index_received_seqcount
tracefile_array_get_file_index_tail(stream->tfa);
vstream->index_sent_seqcount = seq_tail;
}
-
- if (vstream->index_file) {
- lttng_index_file_put(vstream->index_file);
- vstream->index_file = NULL;
- }
- if (vstream->stream_file.fd) {
- stream_fd_put(vstream->stream_file.fd);
- vstream->stream_file.fd = NULL;
- }
- chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
- vstream->stream_file.trace_chunk,
- stream->path_name,
- stream->channel_name,
- stream->tracefile_size,
- vstream->current_tracefile_id,
- lttng_to_index_major(connection_major,
- connection_minor),
- lttng_to_index_minor(connection_major,
- connection_minor),
- true, &vstream->index_file);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
- ret = -1;
- goto end;
- } else {
- ret = 0;
- }
+ viewer_stream_close_files(vstream);
+ ret = 0;
end:
return ret;
}