#include "lttng-relayd.h"
#include "viewer-stream.h"
+static void viewer_stream_release_composite_objects(struct relay_viewer_stream *vstream)
+{
+ if (vstream->stream_file.handle) {
+ fs_handle_close(vstream->stream_file.handle);
+ vstream->stream_file.handle = NULL;
+ }
+ if (vstream->index_file) {
+ lttng_index_file_put(vstream->index_file);
+ vstream->index_file = NULL;
+ }
+ if (vstream->stream) {
+ stream_put(vstream->stream);
+ vstream->stream = NULL;
+ }
+ lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+ vstream->stream_file.trace_chunk = NULL;
+}
+
static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
{
free(vstream->path_name);
enum lttng_viewer_seek seek_t)
{
struct relay_viewer_stream *vstream = NULL;
- const bool acquired_reference = lttng_trace_chunk_get(trace_chunk);
ASSERT_LOCKED(stream->lock);
- if (!acquired_reference) {
- goto error;
- }
vstream = zmalloc(sizeof(*vstream));
if (!vstream) {
goto error;
}
+ if (trace_chunk) {
+ const bool acquired_reference = lttng_trace_chunk_get(
+ trace_chunk);
+
+ assert(acquired_reference);
+ }
+
vstream->stream_file.trace_chunk = trace_chunk;
- trace_chunk = NULL;
vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
if (vstream->path_name == NULL) {
PERROR("relay viewer path_name alloc");
*/
if (stream->index_file == NULL) {
vstream->index_file = NULL;
- } else {
+ } else if (vstream->stream_file.trace_chunk) {
const uint32_t connection_major = stream->trace->session->major;
const uint32_t connection_minor = stream->trace->session->minor;
enum lttng_trace_chunk_status chunk_status;
* If we never received a data file for the current stream, delay the
* opening, otherwise open it right now.
*/
- if (stream->file) {
+ if (stream->file && vstream->stream_file.trace_chunk) {
int ret;
char file_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
vstream);
}
+ vstream->last_seen_rotation_count = stream->completed_rotation_count;
+
/* Globally visible after the add unique. */
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
urcu_ref_init(&vstream->ref);
error:
if (vstream) {
+ /* Not using `put` since vstream is assumed to be published. */
+ viewer_stream_release_composite_objects(vstream);
viewer_stream_destroy(vstream);
}
- if (trace_chunk && acquired_reference) {
- lttng_trace_chunk_put(trace_chunk);
- }
return NULL;
}
if (vstream->stream->is_metadata) {
rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL);
}
-
viewer_stream_unpublish(vstream);
-
- if (vstream->stream_file.handle) {
- fs_handle_close(vstream->stream_file.handle);
- vstream->stream_file.handle = NULL;
- }
- if (vstream->index_file) {
- lttng_index_file_put(vstream->index_file);
- vstream->index_file = NULL;
- }
- if (vstream->stream) {
- stream_put(vstream->stream);
- vstream->stream = NULL;
- }
- lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
- vstream->stream_file.trace_chunk = NULL;
+ viewer_stream_release_composite_objects(vstream);
call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
}
if (seq_tail == -1ULL) {
seq_tail = 0;
}
- vstream->index_sent_seqcount = seq_tail;
+
+ /*
+ * Move the index sent seqcount forward if it was lagging behind
+ * the new tail of the tracefile array. If the current
+ * index_sent_seqcount is already further than the tracefile
+ * array tail position, keep its current position.
+ */
+ vstream->index_sent_seqcount = seq_tail > vstream->index_sent_seqcount ?
+ seq_tail : vstream->index_sent_seqcount;
}
/*