stream.c stream.h \
stream-fd.c stream-fd.h \
connection.c connection.h \
- viewer-session.c viewer-session.h
+ viewer-session.c viewer-session.h \
+ tracefile-array.c tracefile-array.h
# link on liblttngctl for check if relayd is already alive.
lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
/*
* First time, we open the index file and at least one index is ready.
*/
- if (rstream->total_index_received == 0) {
+ if (rstream->index_received_seqcount == 0) {
ret = -ENOENT;
goto end;
}
int ret;
if (trace->session->connection_closed
- && rstream->total_index_received
- == vstream->last_sent_index) {
+ && rstream->index_received_seqcount
+ == vstream->index_sent_seqcount) {
/* Last index sent and session connection is closed. */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
- rstream->total_index_received
- == vstream->last_sent_index) {
+ rstream->index_received_seqcount
+ == vstream->index_sent_seqcount) {
/*
* We've received a synchronization beacon and the last index
* available has been sent, the index for now is inactive.
index->timestamp_end = htobe64(rstream->beacon_ts_end);
index->stream_id = htobe64(rstream->ctf_stream_id);
goto index_ready;
- } else if (rstream->total_index_received <= vstream->last_sent_index) {
+ } else if (rstream->index_received_seqcount
+ == vstream->index_sent_seqcount) {
/*
- * This actually checks the case where recv == last_sent.
- * In this case, we have not received a beacon. Therefore, we
- * can only ask the client to retry later.
+ * This checks whether received == sent seqcount. In
+ * this case, we have not received a beacon. Therefore,
+ * we can only ask the client to retry later.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto index_ready;
- } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
- vstream->current_tracefile_seq)) {
+ } else if (!tracefile_array_seq_in_file(rstream->tfa,
+ vstream->current_tracefile_id,
+ vstream->index_sent_seqcount)) {
/*
- * The producer has overwritten our current file. We
- * need to rotate.
+ * The next index we want to send cannot be read either
+ * because we need to perform a rotation, or due to
+ * the producer having overwritten its trace file.
*/
- DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+ DBG("Viewer stream %" PRIu64 " rotation",
vstream->stream->stream_handle);
ret = viewer_stream_rotate(vstream);
if (ret < 0) {
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
}
- assert(viewer_stream_is_tracefile_seq_readable(vstream,
- vstream->current_tracefile_seq));
- /* ret == 0 means successful so we continue. */
- ret = 0;
- } else {
- ssize_t read_ret;
- char tmp[1];
-
/*
- * Use EOF on current index file to find out when we
- * need to rotate.
+ * If we have been pushed due to overwrite, it
+ * necessarily means there is data that can be read in
+ * the stream. If we rotated because we reached the end
+ * of a tracefile, it means the following tracefile
+ * needs to contain at least one index, else we would
+ * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+ * viewer. The updated index_sent_seqcount needs to
+ * point to a readable index entry now.
+ *
+ * In the case where we "rotate" on a single file, we
+ * can end up in a case where the requested index is
+ * still unavailable.
*/
- read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
- if (read_ret == 1) {
- off_t seek_ret;
-
- /* There is still data to read. Rewind position. */
- seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
- if (seek_ret < 0) {
- ret = -1;
- goto end;
- }
- ret = 0;
- } else if (read_ret == 0) {
- /* EOF. We need to rotate. */
- DBG("Viewer stream %" PRIu64 " rotation due to EOF",
- vstream->stream->stream_handle);
- ret = viewer_stream_rotate(vstream);
- if (ret < 0) {
- goto end;
- } else if (ret == 1) {
- /* EOF across entire stream. */
- index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
- goto hup;
- }
- assert(viewer_stream_is_tracefile_seq_readable(vstream,
- vstream->current_tracefile_seq));
- /* ret == 0 means successful so we continue. */
- ret = 0;
- } else {
- /* Error reading index. */
- ret = -1;
+ if (rstream->tracefile_count == 1 &&
+ !tracefile_array_seq_in_file(
+ rstream->tfa,
+ vstream->current_tracefile_id,
+ vstream->index_sent_seqcount)) {
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto index_ready;
}
+ assert(tracefile_array_seq_in_file(rstream->tfa,
+ vstream->current_tracefile_id,
+ vstream->index_sent_seqcount));
}
+ /* ret == 0 means successful so we continue. */
+ ret = 0;
end:
return ret;
vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
+ DBG("Client requested index of unknown stream id %" PRIu64,
+ be64toh(request_index.stream_id));
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
goto send_reply;
} else {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
- vstream->last_sent_index++;
+ vstream->index_sent_seqcount++;
}
/*
if (vstream) {
DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
- vstream->last_sent_index,
+ vstream->index_sent_seqcount,
vstream->stream->stream_handle);
}
end:
vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
if (!vstream) {
+ DBG("Client requested packet of unknown stream id %" PRIu64,
+ be64toh(get_packet_info.stream_id));
reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
goto send_reply_nolock;
}
* Reply back to the client with an error if we cannot
* find it.
*/
+ DBG("Client requested metadata of unknown stream id %" PRIu64,
+ be64toh(request.stream_id));
reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
goto send_reply;
}
#include "session.h"
#include "stream.h"
#include "connection.h"
+#include "tracefile-array.h"
/* command line options */
char *opt_output_path;
* Only flag a stream inactive when it has already
* received data and no indexes are in flight.
*/
- if (stream->total_index_received > 0
+ if (stream->index_received_seqcount > 0
&& stream->indexes_in_flight == 0) {
stream->beacon_ts_end =
be64toh(index_info.timestamp_end);
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- stream->total_index_received++;
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
} else if (ret > 0) {
/* no flush. */
ret = 0;
fd = index_create_file(stream->path_name, stream->channel_name,
-1, -1, stream->tracefile_size,
- stream->current_tracefile_id);
+ tracefile_array_get_file_index_head(stream->tfa));
if (fd < 0) {
ret = -1;
/* Put self-ref for this index due to error. */
ret = relay_index_try_flush(index);
if (ret == 0) {
- stream->total_index_received++;
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
} else if (ret > 0) {
/* No flush. */
ret = 0;
if (stream->tracefile_size > 0 &&
(stream->tracefile_size_current + data_size) >
stream->tracefile_size) {
- uint64_t new_id;
+ uint64_t old_id, new_id;
+
+ old_id = tracefile_array_get_file_index_head(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa);
+
+ /* new_id is updated by utils_rotate_stream_file. */
+ new_id = old_id;
- new_id = (stream->current_tracefile_id + 1) %
- stream->tracefile_count;
- /*
- * Move viewer oldest available data position forward if
- * we are overwriting a tracefile.
- */
- if (new_id == stream->oldest_tracefile_id) {
- stream->oldest_tracefile_id =
- (stream->oldest_tracefile_id + 1) %
- stream->tracefile_count;
- }
ret = utils_rotate_stream_file(stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->tracefile_count, -1,
-1, stream->stream_fd->fd,
- &stream->current_tracefile_id,
- &stream->stream_fd->fd);
+ &new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Rotating stream output file");
goto end_stream_unlock;
}
- stream->current_tracefile_seq++;
- if (stream->current_tracefile_seq
- - stream->oldest_tracefile_seq >=
- stream->tracefile_count) {
- stream->oldest_tracefile_seq++;
- }
/*
* Reset current size because we just performed a stream
* rotation.
ret = -1;
goto end;
}
+ stream->tfa = tracefile_array_create(stream->tracefile_count);
+ if (!stream->tfa) {
+ ret = -1;
+ goto end;
+ }
if (stream->tracefile_size) {
DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
} else {
if (stream->indexes_ht) {
lttng_ht_destroy(stream->indexes_ht);
}
+ if (stream->tfa) {
+ tracefile_array_destroy(stream->tfa);
+ }
free(stream->path_name);
free(stream->channel_name);
free(stream);
#include "session.h"
#include "stream-fd.h"
+#include "tracefile-array.h"
/*
* Represents a stream in the relay
uint64_t tracefile_size;
uint64_t tracefile_size_current;
uint64_t tracefile_count;
- uint64_t current_tracefile_id;
- uint64_t current_tracefile_seq; /* Free-running counter. */
- uint64_t oldest_tracefile_seq; /* Free-running counter. */
-
- /* To inform the viewer up to where it can go back in time. */
- uint64_t oldest_tracefile_id;
+ /*
+ * Counts the number of received indexes. The "tag" associated
+ * with an index is taken before incrementing this seqcount.
+ * Therefore, the sequence tag associated with the last index
+ * received is always index_received_seqcount - 1.
+ */
+ uint64_t index_received_seqcount;
- uint64_t total_index_received;
+ /*
+ * Tracefile array is an index of the stream trace files,
+ * indexed by position. It allows keeping track of the oldest
+ * available indexes when overwriting trace files in tracefile
+ * rotation.
+ */
+ struct tracefile_array *tfa;
bool closed; /* Stream is closed. */
--- /dev/null
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#define _LGPL_SOURCE
+#include <assert.h>
+#include <common/common.h>
+#include <common/utils.h>
+#include <common/defaults.h>
+
+#include "tracefile-array.h"
+
+struct tracefile_array *tracefile_array_create(size_t count)
+{
+ struct tracefile_array *tfa = NULL;
+ int i;
+
+ tfa = zmalloc(sizeof(*tfa));
+ if (!tfa) {
+ goto error;
+ }
+ tfa->tf = zmalloc(sizeof(*tfa->tf) * count);
+ if (!tfa->tf) {
+ goto error;
+ }
+ tfa->count = count;
+ for (i = 0; i < count; i++) {
+ tfa->tf[i].seq_head = -1ULL;
+ tfa->tf[i].seq_tail = -1ULL;
+ }
+ tfa->seq_head = -1ULL;
+ tfa->seq_tail = -1ULL;
+ return tfa;
+
+error:
+ if (tfa) {
+ free(tfa->tf);
+ }
+ free(tfa);
+ return NULL;
+}
+
+void tracefile_array_destroy(struct tracefile_array *tfa)
+{
+ if (!tfa) {
+ return;
+ }
+ free(tfa->tf);
+ free(tfa);
+}
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa)
+{
+ uint64_t *headp, *tailp;
+
+ if (!tfa->count) {
+ /* Not in tracefile rotation mode. */
+ return;
+ }
+ /* Rotate to next file. */
+ tfa->file_head = (tfa->file_head + 1) % tfa->count;
+ if (tfa->file_head == tfa->file_tail) {
+ /* Move tail. */
+ tfa->file_tail = (tfa->file_tail + 1) % tfa->count;
+ }
+ headp = &tfa->tf[tfa->file_head].seq_head;
+ tailp = &tfa->tf[tfa->file_head].seq_tail;
+ /*
+ * If we overwrite a file with content, we need to push the tail
+ * to the position following the content we are overwriting.
+ */
+ if (*headp != -1ULL) {
+ tfa->seq_tail = tfa->tf[tfa->file_tail].seq_tail;
+ }
+ /* Reset this file head/tail (overwrite). */
+ *headp = -1ULL;
+ *tailp = -1ULL;
+}
+
+void tracefile_array_commit_seq(struct tracefile_array *tfa)
+{
+ uint64_t *headp, *tailp;
+
+ /* Increment overall head. */
+ tfa->seq_head++;
+ /* If we are committing our first index overall, set tail to 0. */
+ if (tfa->seq_tail == -1ULL) {
+ tfa->seq_tail = 0;
+ }
+ if (!tfa->count) {
+ /* Not in tracefile rotation mode. */
+ return;
+ }
+ headp = &tfa->tf[tfa->file_head].seq_head;
+ tailp = &tfa->tf[tfa->file_head].seq_tail;
+ /* Update head tracefile seq_head. */
+ *headp = tfa->seq_head;
+ /*
+ * If we are committing our first index in this packet, set tail
+ * to this index seq count.
+ */
+ if (*tailp == -1ULL) {
+ *tailp = tfa->seq_head;
+ }
+}
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa)
+{
+ return tfa->file_head;
+}
+
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa)
+{
+ return tfa->seq_head;
+}
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa)
+{
+ return tfa->file_tail;
+}
+
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa)
+{
+ return tfa->seq_tail;
+}
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+ uint64_t file_index, uint64_t seq)
+{
+ if (!tfa->count) {
+ /*
+ * Not in tracefile rotation mode; we are guaranteed to have the
+ * index in this file.
+ */
+ return true;
+ }
+ assert(file_index < tfa->count);
+ if (seq == -1ULL) {
+ return false;
+ }
+ if (seq >= tfa->tf[file_index].seq_tail
+ && seq <= tfa->tf[file_index].seq_head) {
+ return true;
+ } else {
+ return false;
+ }
+}
--- /dev/null
+#ifndef _TRACEFILE_ARRAY_H
+#define _TRACEFILE_ARRAY_H
+
+/*
+ * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+
+struct tracefile {
+ /* Per-tracefile head/tail seq. */
+ uint64_t seq_head; /* Newest seqcount. Inclusive. */
+ uint64_t seq_tail; /* Oldest seqcount. Inclusive. */
+};
+
+/*
+ * Represents an array of trace files in a stream.
+ */
+struct tracefile_array {
+ struct tracefile *tf;
+ size_t count;
+
+ /* Current head/tail files. */
+ uint64_t file_head;
+ uint64_t file_tail;
+
+ /* Overall head/tail seq for the entire array. Inclusive. */
+ uint64_t seq_head;
+ uint64_t seq_tail;
+};
+
+struct tracefile_array *tracefile_array_create(size_t count);
+void tracefile_array_destroy(struct tracefile_array *tfa);
+
+void tracefile_array_file_rotate(struct tracefile_array *tfa);
+void tracefile_array_commit_seq(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
+/* May return -1ULL in the case where we have not received any indexes yet. */
+uint64_t tracefile_array_get_seq_head(struct tracefile_array *tfa);
+
+uint64_t tracefile_array_get_file_index_tail(struct tracefile_array *tfa);
+/* May return -1ULL in the case where we have not received any indexes yet. */
+uint64_t tracefile_array_get_seq_tail(struct tracefile_array *tfa);
+
+bool tracefile_array_seq_in_file(struct tracefile_array *tfa,
+ uint64_t file_index, uint64_t seq);
+
+#endif /* _STREAM_H */
goto error;
}
+ if (!stream_get(stream)) {
+ ERR("Cannot get stream");
+ goto error;
+ }
+ vstream->stream = stream;
+
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->is_metadata && stream->trace->viewer_metadata_stream) {
+ ERR("Cannot attach viewer metadata stream to trace (busy).");
+ goto error_unlock;
+ }
+
switch (seek_t) {
case LTTNG_VIEWER_SEEK_BEGINNING:
- vstream->current_tracefile_id = stream->oldest_tracefile_id;
+ {
+ uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
+
+ if (seq_tail == -1ULL) {
+ /*
+ * Tail may not be initialized yet. Nonetheless, we know
+ * we want to send the first index once it becomes
+ * available.
+ */
+ seq_tail = 0;
+ }
+ vstream->current_tracefile_id =
+ tracefile_array_get_file_index_tail(stream->tfa);
+ vstream->index_sent_seqcount = seq_tail;
break;
+ }
case LTTNG_VIEWER_SEEK_LAST:
- vstream->current_tracefile_id = stream->current_tracefile_id;
+ vstream->current_tracefile_id =
+ tracefile_array_get_file_index_head(stream->tfa);
+ /*
+ * We seek at the very end of each stream, awaiting for
+ * a future packet to eventually come in.
+ *
+ * We don't need to check the head position for -1ULL since the
+ * increment will set it to 0.
+ */
+ vstream->index_sent_seqcount =
+ tracefile_array_get_seq_head(stream->tfa) + 1;
break;
default:
- goto error;
- }
- if (!stream_get(stream)) {
- ERR("Cannot get stream");
- goto error;
+ goto error_unlock;
}
- vstream->stream = stream;
- pthread_mutex_lock(&stream->lock);
/*
- * If we never received an index for the current stream, delay the opening
- * of the index, otherwise open it right now.
+ * If we never received an index for the current stream, delay
+ * the opening of the index, otherwise open it right now.
*/
- if (vstream->current_tracefile_id == stream->current_tracefile_id
- && stream->total_index_received == 0) {
+ if (stream->index_received_seqcount == 0) {
vstream->index_fd = NULL;
} else {
int read_fd;
if (lseek_ret < 0) {
goto error_unlock;
}
- vstream->last_sent_index = stream->total_index_received;
}
- pthread_mutex_unlock(&stream->lock);
-
if (stream->is_metadata) {
rcu_assign_pointer(stream->trace->viewer_metadata_stream,
vstream);
}
+ pthread_mutex_unlock(&stream->lock);
/* Globally visible after the add unique. */
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
rcu_read_unlock();
}
-/*
- * Returns whether the current tracefile is readable. If not, it has
- * been overwritten.
- * Must be called with rstream lock held.
- */
-bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream,
- uint64_t seq)
-{
- struct relay_stream *stream = vstream->stream;
-
- if (seq >= stream->oldest_tracefile_seq
- && seq <= stream->current_tracefile_seq) {
- /* seq is a readable file. */
- return true;
- } else {
- /* seq is not readable. */
- return false;
- }
-}
-
/*
* Rotate a stream to the next tracefile.
*
{
int ret;
struct relay_stream *stream = vstream->stream;
+ uint64_t new_id;
/* Detect the last tracefile to open. */
- if (stream->total_index_received == vstream->last_sent_index
+ if (stream->index_received_seqcount
+ == vstream->index_sent_seqcount
&& stream->trace->session->connection_closed) {
ret = 1;
goto end;
goto end;
}
- if (!viewer_stream_is_tracefile_seq_readable(vstream,
- vstream->current_tracefile_seq + 1)) {
- vstream->current_tracefile_id =
- stream->oldest_tracefile_id;
- vstream->current_tracefile_seq =
- stream->oldest_tracefile_seq;
+ /*
+ * Try to move to the next file.
+ */
+ new_id = (vstream->current_tracefile_id + 1)
+ % stream->tracefile_count;
+ if (tracefile_array_seq_in_file(stream->tfa, new_id,
+ vstream->index_sent_seqcount)) {
+ vstream->current_tracefile_id = new_id;
} else {
+ uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
+
+ /*
+ * This can only be reached on overwrite, which implies there
+ * has been data written at some point, which will have set the
+ * tail.
+ */
+ assert(seq_tail != -1ULL);
+ /*
+ * We need to resync because we lag behind tail.
+ */
vstream->current_tracefile_id =
- (vstream->current_tracefile_id + 1)
- % stream->tracefile_count;
- vstream->current_tracefile_seq++;
+ tracefile_array_get_file_index_tail(stream->tfa);
+ vstream->index_sent_seqcount = seq_tail;
}
if (vstream->index_fd) {
char *channel_name;
uint64_t current_tracefile_id;
- /* Free-running counter. */
- uint64_t current_tracefile_seq;
- uint64_t last_sent_index;
+ /*
+ * Counts the number of sent indexes. The "tag" associated
+ * with an index to send is the current index_received_seqcount,
+ * because we increment index_received_seqcount after sending
+ * each index. This index_received_seqcount counter can also be
+ * updated when catching up with the producer.
+ */
+ uint64_t index_sent_seqcount;
/* Indicates if this stream has been sent to a viewer client. */
bool sent_flag;