From: Jérémie Galarneau Date: Mon, 12 Nov 2018 21:52:26 +0000 (-0500) Subject: Fix: split index and data file rotation logic X-Git-Tag: v2.12.0-rc1~771 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=c6db3843828a8fbf08444a2bc4191291a4807936;p=lttng-tools.git Fix: split index and data file rotation logic Issue --- There is no guarantee that index and data positions (sequence numbers) match at a given time as both are received on different sockets. Currently, the relay determines whether or not it should rotate a stream's data and index files based on the lowest sequence number of both the data and index file. When the data connection "lags" behind the control connection, index entries received on the control connection that belong in the "next" chunk are written to the "previous" chunk's index file. While there is logic to ensure that trace data received after the current rotation position is copied to the "previous" chunk when the rotation is performed, the same is not true for the index file rotation. This is fine as it should not be needed. The consumerd <-> relayd protocol ensures that a rotation command issued on a stream will provide a rotation position that is either: - the stream's current position, - a position that will be reached in the future. Then, the control connection will receive index entries that are either <= to the rotation target sequence number. Therefore, it is correct to check if an index file should be rotated everytime an index entry is flushed and change the index output file when the rotation position is reached. Again, this is not true for the data connection as it would be possible to receive a rotation command with a position that is before the current data sequence number. Solution --- This change splits the logic to evaluate the moment at which a rotation should be performed (changing the actual file to which data is written) so that both index and data files are rotated independently. Hence, when an index is flushed, the relay will always evaluate whether or not it should switch the index file to its new destination (i.e. perform the rotation of the index). The "data" reception logic remains mostly unchanged, except that the trace sequence number is no longer used. Only the data stream's position is considered to evaluate whether the data stream file should be rotated. Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index cf6e01ee0..1e2e9050f 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1570,10 +1570,12 @@ end: } static -int do_rotate_stream(struct relay_stream *stream) +int do_rotate_stream_data(struct relay_stream *stream) { int ret; + DBG("Rotating stream %" PRIu64 " data file", + stream->stream_handle); /* Perform the stream rotation. */ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, stream->tracefile_size, @@ -1585,19 +1587,17 @@ int do_rotate_stream(struct relay_stream *stream) goto end; } stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = create_rotate_index_file(stream, stream->path_name); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - } - - stream->rotate_at_seq_num = -1ULL; stream->pos_after_last_complete_data_index = 0; + stream->data_rotated = true; + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } end: return ret; } @@ -1609,9 +1609,7 @@ end: * connections are separate, the indexes as well as the commands arrive from * the control connection and we have no control over the order so we could be * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. We don't need - * to update the index because its order is guaranteed with the rotation - * command message. + * before the rotation command on the control connection arrives. */ static int rotate_truncate_stream(struct relay_stream *stream) @@ -1710,12 +1708,6 @@ int rotate_truncate_stream(struct relay_stream *stream) goto end; } - ret = create_rotate_index_file(stream, stream->path_name); - if (ret < 0) { - ERR("Rotate stream index file"); - goto end; - } - /* * Update the offset and FD of all the eventual indexes created by the * data connection before the rotation command arrived. @@ -1738,30 +1730,94 @@ end: } /* - * Check if a stream should perform a rotation (for session rotation). + * Check if a stream's index file should be rotated (for session rotation). * Must be called with the stream lock held. * * Return 0 on success, a negative value on error. */ static -int try_rotate_stream(struct relay_stream *stream) +int try_rotate_stream_index(struct relay_stream *stream) { int ret = 0; - uint64_t trace_seq; - /* No rotation expected. */ if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ goto end; } - trace_seq = min(stream->prev_data_seq, stream->prev_index_seq); - if (stream->prev_data_seq == -1ULL || stream->prev_index_seq == -1ULL || - trace_seq < stream->rotate_at_seq_num) { - DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + if (stream->index_rotated) { + /* Rotation of the index has already occurred. */ + goto end; + } + + if (stream->prev_index_seq == -1ULL || + stream->prev_index_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_index_seq); + goto end; + } else if (stream->prev_index_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", stream->stream_handle, stream->rotate_at_seq_num, stream->prev_data_seq, stream->prev_index_seq); + ret = -1; + goto end; + } else { + DBG("Rotating stream %" PRIu64 " index file", + stream->stream_handle); + ret = create_rotate_index_file(stream, stream->path_name); + stream->index_rotated = true; + + if (stream->data_rotated && stream->index_rotated) { + /* Rotation completed; reset its state. */ + DBG("Rotation completed for stream %" PRIu64, + stream->stream_handle); + stream->rotate_at_seq_num = -1ULL; + stream->data_rotated = false; + stream->index_rotated = false; + } + } + +end: + return ret; +} + +/* + * Check if a stream's data file (as opposed to index) should be rotated + * (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int try_rotate_stream_data(struct relay_stream *stream) +{ + int ret = 0; + + if (stream->rotate_at_seq_num == -1ULL) { + /* No rotation expected. */ + goto end; + } + + if (stream->data_rotated) { + /* Rotation of the data file has already occurred. */ + goto end; + } + + if (stream->prev_data_seq == -1ULL || + stream->prev_data_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", + stream->stream_handle, + stream->rotate_at_seq_num, + stream->prev_data_seq); goto end; } else if (stream->prev_data_seq > stream->rotate_at_seq_num) { /* @@ -1776,24 +1832,20 @@ int try_rotate_stream(struct relay_stream *stream) ERR("Failed to truncate stream"); goto end; } - } else { - if (trace_seq != stream->rotate_at_seq_num) { - /* - * Unexpected, protocol error/bug. - * It could mean that we received a rotation position - * that is in the past. - */ - ERR("Stream %" PRIu64 " is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + } else if (stream->prev_data_seq != stream->rotate_at_seq_num) { + /* + * Unexpected, protocol error/bug. + * It could mean that we received a rotation position + * that is in the past. + */ + ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", stream->stream_handle, stream->rotate_at_seq_num, - stream->prev_data_seq, - stream->prev_index_seq); - ret = -1; - goto end; - } - DBG("Stream %" PRIu64 " ready for rotation", - stream->stream_handle); - ret = do_rotate_stream(stream); + stream->prev_data_seq); + ret = -1; + goto end; + } else { + ret = do_rotate_stream_data(stream); } end: @@ -1864,7 +1916,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); - ret = try_rotate_stream(metadata_stream); + ret = try_rotate_stream_data(metadata_stream); if (ret < 0) { goto end_put; } @@ -2366,6 +2418,11 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; stream->prev_index_seq = index_info.net_seq_num; + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_put; + } } else if (ret > 0) { /* no flush. */ ret = 0; @@ -2547,18 +2604,28 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { + /* + * Metadata streams have no index; consider its rotation + * complete. + */ + stream->index_rotated = true; /* * The metadata stream is sent only over the control connection * so we know we have all the data to perform the stream * rotation. */ - ret = do_rotate_stream(stream); + ret = do_rotate_stream_data(stream); } else { stream->rotate_at_seq_num = stream_info.rotate_at_seq_num; - ret = try_rotate_stream(stream); - } - if (ret < 0) { - goto end_stream_unlock; + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end_stream_unlock; + } + + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } end_stream_unlock: @@ -3568,6 +3635,10 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->pos_after_last_complete_data_index = stream->tracefile_size_current; stream->prev_index_seq = state->header.net_seq_num; + ret = try_rotate_stream_index(stream); + if (ret < 0) { + goto end_stream_unlock; + } } stream->prev_data_seq = state->header.net_seq_num; @@ -3580,7 +3651,7 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; - ret = try_rotate_stream(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { status = RELAY_CONNECTION_STATUS_ERROR; goto end_stream_unlock; diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index dcdacfbb6..5e23e7339 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -166,6 +166,14 @@ struct relay_stream { * Always access with stream lock held. */ uint64_t rotate_at_seq_num; + /* + * When rotate_at_seq_num != -1ULL, meaning that a rotation is ongoing, + * data_rotated and index_rotated respectively indicate if the stream's + * data and index have been rotated. A rotation is considered completed + * when both rotations have occurred. + */ + bool data_rotated; + bool index_rotated; /* * This is the id of the chunk where we are writing to if no rotation is * pending (rotate_at_seq_num == -1ULL). If a rotation is pending, this