uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
+ struct relay_stream_chunk_id stream_chunk_id = { 0 };
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count);
+ channel_name, tracefile_size, tracefile_count,
+ &stream_chunk_id);
path_name = NULL;
channel_name = NULL;
goto end_stream_unlock;
}
- stream->chunk_id = stream_info.new_chunk_id;
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
if (stream->is_metadata) {
/*
rotate_pending = true;
DBG("Stream %" PRIu64 " is still rotating",
stream->stream_handle);
- } else if (stream->chunk_id < chunk_id) {
+ } else if (stream->current_chunk_id.value < chunk_id) {
/*
* Stream closed on the consumer but still active on the
* relay.
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ const struct relay_stream_chunk_id *chunk_id)
{
int ret;
struct relay_stream *stream = NULL;
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
+ stream->current_chunk_id = *chunk_id;
stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!stream->indexes_ht) {
#include "stream-fd.h"
#include "tracefile-array.h"
+struct relay_stream_chunk_id {
+ bool is_set;
+ uint64_t value;
+};
+
/*
* Represents a stream in the relay
*/
* atomically with rotate_at_seq_num.
*
* Always access with stream lock held.
+ *
+ * This attribute is not set if the stream is created by a pre-2.11
+ * consumer.
*/
- uint64_t chunk_id;
+ struct relay_stream_chunk_id current_chunk_id;
};
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count);
+ uint64_t tracefile_count, const struct relay_stream_chunk_id *chunk_id);
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);