summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
0b50e4b)
Initializing the relayd's streams with a stream_chunk_id allows the
relayd to differentiate between a stream created before the first
rotation (at chunk id == 0) vs. a stream that has been created
after the last (or pending) rotation.
Before this fix, the relayd can fail to identify that a rotation
has been completed.
This is caused by the fact that a stream's chunk id is initialized
to 0 and updated by the RELAYD_ROTATE_STREAM command to the
id of the chunk that is currently being rotated.
The 'stream->current_chunk_id.value < chunk_id' check performed
by the RELAYD_ROTATE_PENDING will cause rotations to never
complete for streams that are created between the launch of a
rotation and the check for its completion.
For example, when the relayd is checking whether the rotation id
'3' is completed, it may see streams with the default value of
their chunk id set to '0' and determine that a rotation is still
pending.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
uint64_t stream_handle = -1ULL;
char *path_name = NULL, *channel_name = NULL;
uint64_t tracefile_size = 0, tracefile_count = 0;
+ struct relay_stream_chunk_id stream_chunk_id = { 0 };
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
if (!session || !conn->version_check_done) {
ERR("Trying to add a stream before version check");
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
/* We pass ownership of path_name and channel_name. */
stream = stream_create(trace, stream_handle, path_name,
- channel_name, tracefile_size, tracefile_count);
+ channel_name, tracefile_size, tracefile_count,
+ &stream_chunk_id);
path_name = NULL;
channel_name = NULL;
path_name = NULL;
channel_name = NULL;
goto end_stream_unlock;
}
goto end_stream_unlock;
}
- stream->chunk_id = stream_info.new_chunk_id;
+ assert(stream->current_chunk_id.is_set);
+ stream->current_chunk_id.value = stream_info.new_chunk_id;
if (stream->is_metadata) {
/*
if (stream->is_metadata) {
/*
rotate_pending = true;
DBG("Stream %" PRIu64 " is still rotating",
stream->stream_handle);
rotate_pending = true;
DBG("Stream %" PRIu64 " is still rotating",
stream->stream_handle);
- } else if (stream->chunk_id < chunk_id) {
+ } else if (stream->current_chunk_id.value < chunk_id) {
/*
* Stream closed on the consumer but still active on the
* relay.
/*
* Stream closed on the consumer but still active on the
* relay.
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ const struct relay_stream_chunk_id *chunk_id)
{
int ret;
struct relay_stream *stream = NULL;
{
int ret;
struct relay_stream *stream = NULL;
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
urcu_ref_init(&stream->ref);
ctf_trace_get(trace);
stream->trace = trace;
+ stream->current_chunk_id = *chunk_id;
stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!stream->indexes_ht) {
stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!stream->indexes_ht) {
#include "stream-fd.h"
#include "tracefile-array.h"
#include "stream-fd.h"
#include "tracefile-array.h"
+struct relay_stream_chunk_id {
+ bool is_set;
+ uint64_t value;
+};
+
/*
* Represents a stream in the relay
*/
/*
* Represents a stream in the relay
*/
* atomically with rotate_at_seq_num.
*
* Always access with stream lock held.
* atomically with rotate_at_seq_num.
*
* Always access with stream lock held.
+ *
+ * This attribute is not set if the stream is created by a pre-2.11
+ * consumer.
+ struct relay_stream_chunk_id current_chunk_id;
};
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
};
struct relay_stream *stream_create(struct ctf_trace *trace,
uint64_t stream_handle, char *path_name,
char *channel_name, uint64_t tracefile_size,
- uint64_t tracefile_count);
+ uint64_t tracefile_count, const struct relay_stream_chunk_id *chunk_id);
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);