From: Mathieu Desnoyers Date: Tue, 5 Nov 2019 15:40:58 +0000 (-0500) Subject: Fix: relayd: use packet sequence number for rotation position X-Git-Tag: v2.11.1~19 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=e4f82f7ae9cc080766bd3684d6297907035192ab;p=lttng-tools.git Fix: relayd: use packet sequence number for rotation position The "network" sequence number (net_seq_num) is a 64-bit sequence number tagging each packet sent over the network. The net_seq_num increments monotonically (+1) for each packet sent from consumer daemon to relay daemon, on a per-stream basis. It is tagged by the consumer daemon when sending a trace packet to the relay daemon. The LTTng kernel and user-space ring buffer "consumed position" (consumed_pos) and "produced position" (produced_pos) are free-running counters counting the number of bytes consumed and produced so far by each stream. Because those counters are updated atomically, they are limited to a size of 32-bit on 32-bit architectures. The "packet" sequence number (packet_seq_num) is a sequence number found in the packet header starting from LTTng 2.8. It is a 64-bit sequence number assigned by the lttng-modules and lttng-ust ring buffers. It increments monotonically (+1) for each packet produced within a given ring buffer (stream). Using produced_pos as rotation position and comparing it to the net_seq_num has a few issues: 1) It breaks on 32-bit producers after generating more than 4GB of data per stream, due to overflow. The net_seq_num is a 64-bit counter, which does not overflow, but the produced_pos overflows after 4GB on 32-bit architectures. This can lead to never-completing rotations. 2) It breaks scenarios where ring buffers are configured in overwrite mode, and streaming to a relay daemon. Indeed, when the ring buffer moves the consumed_pos ahead, actually overwriting data within the ring buffer, it introduces an offset between the produced_pos and the net_seq_num. Therefore, if producers are generating a low- (or no-) throughput in some streams, the rotation may never complete, even on 64-bit architectures. The solution proposed for this issue is to use the packet_seq_num as rotation position rather than the net_seq_num. It takes care of the two problematic scenarios, since the counter is always 64-bit (even on 32-bit architectures), and because the counter is managed by the producer, which therefore tracks progress of the ring buffer overwrites. This commit introduces changes required at the relayd side. A separate commit introduces the changes required in the consumerd. In relayd, one major restriction is the fact that the packet_seq_num is not sent over the data socket, only through the control socket receiving the indexes. Therefore, in order to figure out the pivot position for the data socket for a given stream, the associated index first needs to be received. At that point, the corresponding net_seq_num is known, which provides the pivot position for the data stream. Given that the data and index sockets provide no ordering guarantees with respect to their arrival, we handle the fact that data might have been saved to disk in the wrong (previous) trace chunk by moving it to the next trace chunk when the pivot position is known. In order to allow "jumps" in the sequence numbers produced by overwrite mode buffers, try_rotate_stream_index(), which previously asserted that each sequence number was received in sequence, now uses the packet_seq_num pivot position as a lower (inclusive) bound. Signed-off-by: Mathieu Desnoyers Change-Id: I755329e313f0980655a164b7bdb57e4f3d8e944a Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index 3cae94e8e..bdbd11330 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -433,6 +433,11 @@ int relay_index_set_control_data(struct relay_index *index, if (minor_version >= 8) { index->index_data.stream_instance_id = htobe64(data->stream_instance_id); index->index_data.packet_seq_num = htobe64(data->packet_seq_num); + } else { + uint64_t unset_value = -1ULL; + + index->index_data.stream_instance_id = htobe64(unset_value); + index->index_data.packet_seq_num = htobe64(unset_value); } return relay_index_set_data(index, &index_data); diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 6720eca96..0100bd6b1 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2071,6 +2071,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.stream_instance_id = be64toh(index_info.stream_instance_id); index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } else { + index_info.stream_instance_id = -1ULL; + index_info.packet_seq_num = -1ULL; } stream = stream_get_by_id(index_info.relay_stream_id); diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 755fb6734..c0aeb1719 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -368,18 +368,23 @@ static int try_rotate_stream_data(struct relay_stream *stream) } if (stream->prev_data_seq == -1ULL || - stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) { + stream->ongoing_rotation.value.prev_data_net_seq == -1ULL || + stream->prev_data_seq < + stream->ongoing_rotation.value.prev_data_net_seq) { /* * The next packet that will be written is not part of the next * chunk yet. */ - DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 + DBG("Stream %" PRIu64 " data not yet ready for rotation " + "(rotate_at_index_packet_seq_num = %" PRIu64 + ", rotate_at_prev_data_net_seq = %" PRIu64 ", prev_data_seq = %" PRIu64 ")", stream->stream_handle, - stream->ongoing_rotation.value.seq_num, + stream->ongoing_rotation.value.packet_seq_num, + stream->ongoing_rotation.value.prev_data_net_seq, stream->prev_data_seq); goto end; - } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) { + } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) { /* * prev_data_seq is checked here since indexes and rotation * commands are serialized with respect to each other. @@ -477,23 +482,38 @@ static int try_rotate_stream_index(struct relay_stream *stream) goto end; } - if (stream->prev_index_seq == -1ULL || - stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) { - DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")", + if (!stream->received_packet_seq_num.is_set || + LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 < + stream->ongoing_rotation.value.packet_seq_num) { + DBG("Stream %" PRIu64 " index not yet ready for rotation " + "(rotate_at_packet_seq_num = %" PRIu64 + ", received_packet_seq_num = " + "(value = %" PRIu64 ", is_set = %" PRIu8 "))", stream->stream_handle, - stream->ongoing_rotation.value.seq_num, - stream->prev_index_seq); + stream->ongoing_rotation.value.packet_seq_num, + stream->received_packet_seq_num.value, + stream->received_packet_seq_num.is_set); goto end; } else { - /* The next index belongs to the new trace chunk; rotate. */ - assert(stream->prev_index_seq + 1 == - stream->ongoing_rotation.value.seq_num); + /* + * The next index belongs to the new trace chunk; rotate. + * In overwrite mode, the packet seq num may jump over the + * rotation position. + */ + assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >= + stream->ongoing_rotation.value.packet_seq_num); DBG("Rotating stream %" PRIu64 " index file", stream->stream_handle); ret = create_index_file(stream, stream->ongoing_rotation.value.next_trace_chunk); stream->ongoing_rotation.value.index_rotated = true; + /* + * Set the rotation pivot position for the data, now that we have the + * net_seq_num matching the packet_seq_num index pivot position. + */ + stream->ongoing_rotation.value.prev_data_net_seq = + stream->prev_index_seq; if (stream->ongoing_rotation.value.data_rotated && stream->ongoing_rotation.value.index_rotated) { /* Rotation completed; reset its state. */ @@ -798,7 +818,10 @@ int stream_set_pending_rotation(struct relay_stream *stream, { int ret = 0; const struct relay_stream_rotation rotation = { - .seq_num = rotation_sequence_number, + .data_rotated = false, + .index_rotated = false, + .packet_seq_num = rotation_sequence_number, + .prev_data_net_seq = -1ULL, .next_trace_chunk = next_trace_chunk, }; @@ -816,7 +839,8 @@ int stream_set_pending_rotation(struct relay_stream *stream, } LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation); - DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64, + DBG("Setting pending rotation: stream_id = %" PRIu64 + ", rotate_at_packet_seq_num = %" PRIu64, stream->stream_handle, rotation_sequence_number); if (stream->is_metadata) { /* @@ -825,12 +849,12 @@ int stream_set_pending_rotation(struct relay_stream *stream, stream->ongoing_rotation.value.index_rotated = true; ret = stream_rotate_data_file(stream); } else { - ret = try_rotate_stream_data(stream); + ret = try_rotate_stream_index(stream); if (ret < 0) { goto end; } - ret = try_rotate_stream_index(stream); + ret = try_rotate_stream_data(stream); if (ret < 0) { goto end; } @@ -1132,6 +1156,8 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num, tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ); tracefile_array_commit_seq(stream->tfa); stream->index_received_seqcount++; + LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num, + be64toh(index->index_data.packet_seq_num)); *flushed = true; } else if (ret > 0) { index->total_size = total_size; @@ -1228,11 +1254,17 @@ int stream_add_index(struct relay_stream *stream, stream->index_received_seqcount++; stream->pos_after_last_complete_data_index += index->total_size; stream->prev_index_seq = index_info->net_seq_num; + LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num, + index_info->packet_seq_num); ret = try_rotate_stream_index(stream); if (ret < 0) { goto end; } + ret = try_rotate_stream_data(stream); + if (ret < 0) { + goto end; + } } else if (ret > 0) { /* no flush. */ ret = 0; diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index b8d14ecfb..02948fa8e 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -44,10 +44,15 @@ struct relay_stream_rotation { bool data_rotated; bool index_rotated; /* - * Sequence number of the first packet of the new trace chunk to which - * the stream is rotating. + * Packet sequence number of the first packet of the new trace chunk to + * which the stream is rotating. */ - uint64_t seq_num; + uint64_t packet_seq_num; + /* + * Monotonically increasing previous network sequence number of first + * data packet of the new trace chunk to which the stream is rotating. + */ + uint64_t prev_data_net_seq; struct lttng_trace_chunk *next_trace_chunk; }; @@ -111,6 +116,12 @@ struct relay_stream { */ uint64_t index_received_seqcount; + /* + * Packet sequence number of the last received packet index. + * Only populated when interacting with CTF_INDEX 1.1+. + */ + LTTNG_OPTIONAL(uint64_t) received_packet_seq_num; + /* * Tracefile array is an index of the stream trace files, * indexed by position. It allows keeping track of the oldest diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 5c5368391..61829f7bc 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -29,7 +29,7 @@ struct relayd_stream_rotation_position { uint64_t stream_id; /* - * Sequence number of the first packet belonging to the new + * Packet sequence number of the first packet belonging to the new * "destination" trace chunk to which the stream is rotating. * * Ignored for metadata streams.