From: Jérémie Galarneau Date: Tue, 10 Sep 2019 14:27:22 +0000 (-0400) Subject: Fix: use the trace chunk to truncate streams on late rotation X-Git-Tag: v2.11.0-rc4~75 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=f40001f7dbb65d9add273317e1598df5da8906c5;p=lttng-tools.git Fix: use the trace chunk to truncate streams on late rotation A stream's rotation can occur after the reception of data that should be part of the "next" trace chunk. In those cases, the current stream file and the next one (belonging to the new trace chunk) need to be opened. The misplaced data is copied between both files and the now-old file is closed. This code was not transitioned to use the trace chunk interface and is the last user of raw stream file FDs. This patch transitions the function (rewrites it, really) to use the trace chunk interface. Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index efe132d1c..c02820d31 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -77,132 +77,6 @@ static void stream_complete_rotation(struct relay_stream *stream) stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {}; } -/* - * If too much data has been written in a tracefile before we received the - * rotation command, we have to move the excess data to the new tracefile and - * perform the rotation. This can happen because the control and data - * connections are separate, the indexes as well as the commands arrive from - * the control connection and we have no control over the order so we could be - * in a situation where too much data has been received on the data connection - * before the rotation command on the control connection arrives. - */ -static int rotate_truncate_stream(struct relay_stream *stream) -{ - int ret, new_fd; - off_t lseek_ret; - uint64_t diff, pos = 0; - char buf[FILE_IO_STACK_BUFFER_SIZE]; - - assert(!stream->is_metadata); - - assert(stream->tracefile_size_current > - stream->pos_after_last_complete_data_index); - diff = stream->tracefile_size_current - - stream->pos_after_last_complete_data_index; - - /* Create the new tracefile. */ - new_fd = utils_create_stream_file(stream->path_name, - stream->channel_name, - stream->tracefile_size, stream->tracefile_count, - /* uid */ -1, /* gid */ -1, /* suffix */ NULL); - if (new_fd < 0) { - ERR("Failed to create new stream file at path %s for channel %s", - stream->path_name, stream->channel_name); - ret = -1; - goto end; - } - - /* - * Rewind the current tracefile to the position at which the rotation - * should have occurred. - */ - lseek_ret = lseek(stream->stream_fd->fd, - stream->pos_after_last_complete_data_index, SEEK_SET); - if (lseek_ret < 0) { - PERROR("seek truncate stream"); - ret = -1; - goto end; - } - - /* Move data from the old file to the new file. */ - while (pos < diff) { - uint64_t count, bytes_left; - ssize_t io_ret; - - bytes_left = diff - pos; - count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left; - assert(count <= SIZE_MAX); - - io_ret = lttng_read(stream->stream_fd->fd, buf, count); - if (io_ret < (ssize_t) count) { - char error_string[256]; - - snprintf(error_string, sizeof(error_string), - "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi", - count, stream->stream_fd->fd, io_ret); - if (io_ret == -1) { - PERROR("%s", error_string); - } else { - ERR("%s", error_string); - } - ret = -1; - goto end; - } - - io_ret = lttng_write(new_fd, buf, count); - if (io_ret < (ssize_t) count) { - char error_string[256]; - - snprintf(error_string, sizeof(error_string), - "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi", - count, new_fd, io_ret); - if (io_ret == -1) { - PERROR("%s", error_string); - } else { - ERR("%s", error_string); - } - ret = -1; - goto end; - } - - pos += count; - } - - /* Truncate the file to get rid of the excess data. */ - ret = ftruncate(stream->stream_fd->fd, - stream->pos_after_last_complete_data_index); - if (ret) { - PERROR("ftruncate"); - goto end; - } - - ret = close(stream->stream_fd->fd); - if (ret < 0) { - PERROR("Closing tracefile"); - goto end; - } - - /* - * Update the offset and FD of all the eventual indexes created by the - * data connection before the rotation command arrived. - */ - ret = relay_index_switch_all_files(stream); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end; - } - - stream->stream_fd->fd = new_fd; - stream->tracefile_size_current = diff; - stream->pos_after_last_complete_data_index = 0; - stream_complete_rotation(stream); - - ret = 0; - -end: - return ret; -} - static int stream_create_data_output_file_from_trace_chunk( struct relay_stream *stream, struct lttng_trace_chunk *trace_chunk, @@ -318,6 +192,150 @@ end: return ret; } +/* + * If too much data has been written in a tracefile before we received the + * rotation command, we have to move the excess data to the new tracefile and + * perform the rotation. This can happen because the control and data + * connections are separate, the indexes as well as the commands arrive from + * the control connection and we have no control over the order so we could be + * in a situation where too much data has been received on the data connection + * before the rotation command on the control connection arrives. + */ +static int rotate_truncate_stream(struct relay_stream *stream) +{ + int ret; + off_t lseek_ret, previous_stream_copy_origin; + uint64_t copy_bytes_left, misplaced_data_size; + bool acquired_reference; + struct stream_fd *previous_stream_fd = NULL; + struct lttng_trace_chunk *previous_chunk = NULL; + + ASSERT_LOCKED(stream->lock); + /* + * Acquire a reference to the current trace chunk to ensure + * it is not reclaimed when `stream_rotate_data_file` is called. + * Failing to do so would violate the contract of the trace + * chunk API as an active file descriptor would outlive the + * trace chunk. + */ + acquired_reference = lttng_trace_chunk_get(stream->trace_chunk); + assert(acquired_reference); + previous_chunk = stream->trace_chunk; + + /* + * Steal the stream's reference to its stream_fd. A new + * stream_fd will be created when the rotation completes and + * the orinal stream_fd will be used to copy the "extra" data + * to the new file. + */ + assert(stream->stream_fd); + previous_stream_fd = stream->stream_fd; + stream->stream_fd = NULL; + + assert(!stream->is_metadata); + assert(stream->tracefile_size_current > + stream->pos_after_last_complete_data_index); + misplaced_data_size = stream->tracefile_size_current - + stream->pos_after_last_complete_data_index; + copy_bytes_left = misplaced_data_size; + previous_stream_copy_origin = stream->pos_after_last_complete_data_index; + + ret = stream_rotate_data_file(stream); + if (ret) { + goto end; + } + + /* + * Seek the current tracefile to the position at which the rotation + * should have occurred. + */ + lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin, + SEEK_SET); + if (lseek_ret < 0) { + PERROR("Failed to seek to offset %" PRIu64 + " while copying extra data received before a stream rotation", + (uint64_t) previous_stream_copy_origin); + ret = -1; + goto end; + } + + /* Move data from the old file to the new file. */ + while (copy_bytes_left) { + ssize_t io_ret; + char copy_buffer[FILE_IO_STACK_BUFFER_SIZE]; + const off_t copy_size_this_pass = min_t( + off_t, copy_bytes_left, sizeof(copy_buffer)); + + io_ret = lttng_read(previous_stream_fd->fd, copy_buffer, + copy_size_this_pass); + if (io_ret < (ssize_t) copy_size_this_pass) { + if (io_ret == -1) { + PERROR("Failed to read %" PRIu64 + " bytes from fd %i in %s(), returned %zi", + copy_size_this_pass, + previous_stream_fd->fd, + __FUNCTION__, io_ret); + } else { + ERR("Failed to read %" PRIu64 + " bytes from fd %i in %s(), returned %zi", + copy_size_this_pass, + previous_stream_fd->fd, + __FUNCTION__, io_ret); + } + ret = -1; + goto end; + } + + io_ret = lttng_write(stream->stream_fd->fd, copy_buffer, + copy_size_this_pass); + if (io_ret < (ssize_t) copy_size_this_pass) { + if (io_ret == -1) { + PERROR("Failed to write %" PRIu64 + " bytes from fd %i in %s(), returned %zi", + copy_size_this_pass, + stream->stream_fd->fd, + __FUNCTION__, io_ret); + } else { + ERR("Failed to write %" PRIu64 + " bytes from fd %i in %s(), returned %zi", + copy_size_this_pass, + stream->stream_fd->fd, + __FUNCTION__, io_ret); + } + ret = -1; + goto end; + } + copy_bytes_left -= copy_size_this_pass; + } + + /* Truncate the file to get rid of the excess data. */ + ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin); + if (ret) { + PERROR("Failed to truncate current stream file to offset %" PRIu64, + previous_stream_copy_origin); + goto end; + } + + /* + * Update the offset and FD of all the eventual indexes created by the + * data connection before the rotation command arrived. + */ + ret = relay_index_switch_all_files(stream); + if (ret < 0) { + ERR("Failed to rotate index file"); + goto end; + } + + stream->tracefile_size_current = misplaced_data_size; + /* Index and data contents are back in sync. */ + stream->pos_after_last_complete_data_index = 0; + ret = 0; +end: + lttng_trace_chunk_put(previous_chunk); + stream_fd_put(previous_stream_fd); + return ret; +} + /* * Check if a stream's data file (as opposed to index) should be rotated * (for session rotation).