LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER = 150, /* trace chunk close failure on consumer */
LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER = 151, /* failed to query consumer for trace chunk existence */
LTTNG_ERR_INVALID_PROTOCOL = 152, /* a protocol error occurred */
+ LTTNG_ERR_FILE_CREATION_ERROR = 153, /* failed to create a file */
/* MUST be last element */
LTTNG_ERR_NR, /* Last element */
/* Trace sub-folder relative to the session output path. */
char *path;
- bool index_folder_created;
/*
* The ctf_trace lock nests inside the session lock.
#include "lttng-relayd.h"
#include "stream.h"
#include "index.h"
+#include "connection.h"
/*
* Allocate a new relay index object. Pass the stream in which it is
rcu_read_unlock();
return ret;
}
+
+/*
+ * Set index data from the control port to a given index object.
+ */
+int relay_index_set_control_data(struct relay_index *index,
+ const struct lttcomm_relayd_index *data,
+ unsigned int minor_version)
+{
+ /* The index on disk is encoded in big endian. */
+ const struct ctf_packet_index index_data = {
+ .packet_size = htobe64(data->packet_size),
+ .content_size = htobe64(data->content_size),
+ .timestamp_begin = htobe64(data->timestamp_begin),
+ .timestamp_end = htobe64(data->timestamp_end),
+ .events_discarded = htobe64(data->events_discarded),
+ .stream_id = htobe64(data->stream_id),
+ };
+
+ if (minor_version >= 8) {
+ index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
+ index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+ }
+
+ return relay_index_set_data(index, &index_data);
+}
#include "stream-fd.h"
struct relay_stream;
+struct relay_connection;
+struct lttcomm_relayd_index;
struct relay_index {
/*
void relay_index_close_partial_fd(struct relay_stream *stream);
uint64_t relay_index_find_last(struct relay_stream *stream);
int relay_index_switch_all_files(struct relay_stream *stream);
+int relay_index_set_control_data(struct relay_index *index,
+ const struct lttcomm_relayd_index *data,
+ unsigned int minor_version);
#endif /* _RELAY_INDEX_H */
/* Size of receive buffer. */
#define RECV_DATA_BUFFER_SIZE 65536
-#define FILE_COPY_BUFFER_SIZE 65536
static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
static pid_t child_ppid; /* Internal parent PID use with daemonize. */
return NULL;
}
-/*
- * Set index data from the control port to a given index object.
- */
-static int set_index_control_data(struct relay_index *index,
- struct lttcomm_relayd_index *data,
- struct relay_connection *conn)
-{
- struct ctf_packet_index index_data;
-
- /*
- * The index on disk is encoded in big endian.
- */
- index_data.packet_size = htobe64(data->packet_size);
- index_data.content_size = htobe64(data->content_size);
- index_data.timestamp_begin = htobe64(data->timestamp_begin);
- index_data.timestamp_end = htobe64(data->timestamp_end);
- index_data.events_discarded = htobe64(data->events_discarded);
- index_data.stream_id = htobe64(data->stream_id);
-
- if (conn->minor >= 8) {
- index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
- index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
- }
-
- return relay_index_set_data(index, &index_data);
-}
-
static bool session_streams_have_index(const struct relay_session *session)
{
return session->minor >= 4 && !session->snapshot;
goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_reset_file(stream);
if (ret < 0) {
- ERR("Failed to rotate metadata file %s of channel %s",
- stream->path_name, stream->channel_name);
+ ERR("Failed to reset metadata stream %" PRIu64
+ ": stream_path = %s, channel = %s",
+ stream->stream_handle, stream->path_name,
+ stream->channel_name);
goto end_unlock;
}
-
end_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
return ret;
}
-/*
- * Append padding to the file pointed by the file descriptor fd.
- */
-static int write_padding_to_file(int fd, uint32_t size)
-{
- ssize_t ret = 0;
- char *zeros;
-
- if (size == 0) {
- goto end;
- }
-
- zeros = zmalloc(size);
- if (zeros == NULL) {
- PERROR("zmalloc zeros for padding");
- ret = -1;
- goto end;
- }
-
- ret = lttng_write(fd, zeros, size);
- if (ret < size) {
- PERROR("write padding to file");
- }
-
- free(zeros);
-
-end:
- return ret;
-}
-
-/*
- * Close the current index file if it is open, and create a new one.
- *
- * Return 0 on success, -1 on error.
- */
-static
-int create_rotate_index_file(struct relay_stream *stream,
- const char *channel_path)
-{
- int ret;
- uint32_t major, minor;
-
- ASSERT_LOCKED(stream->lock);
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- if (!stream->trace->index_folder_created) {
- char *index_subpath = NULL;
-
- ret = asprintf(&index_subpath, "%s/%s", channel_path, DEFAULT_INDEX_DIR);
- if (ret < 0) {
- goto end;
- }
-
- ret = lttng_trace_chunk_create_subdirectory(stream->trace_chunk, index_subpath);
- free(index_subpath);
- if (ret) {
- goto end;
- }
- stream->trace->index_folder_created = true;
- }
- stream->index_file = lttng_index_file_create_from_trace_chunk(
- stream->trace_chunk, channel_path, stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor), true);
- if (!stream->index_file) {
- ret = -1;
- goto end;
- }
-
- ret = 0;
-
-end:
- return ret;
-}
-
-static
-int do_rotate_stream_data(struct relay_stream *stream)
-{
- int ret;
-
- DBG("Rotating stream %" PRIu64 " data file",
- stream->stream_handle);
- /* Perform the stream rotation. */
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
- if (ret < 0) {
- ERR("Rotating stream output file");
- goto end;
- }
- stream->tracefile_size_current = 0;
- stream->pos_after_last_complete_data_index = 0;
- stream->data_rotated = true;
-
- if (stream->data_rotated && stream->index_rotated) {
- /* Rotation completed; reset its state. */
- DBG("Rotation completed for stream %" PRIu64,
- stream->stream_handle);
- stream->rotate_at_seq_num = -1ULL;
- stream->data_rotated = false;
- stream->index_rotated = false;
- }
-end:
- return ret;
-}
-
-/*
- * If too much data has been written in a tracefile before we received the
- * rotation command, we have to move the excess data to the new tracefile and
- * perform the rotation. This can happen because the control and data
- * connections are separate, the indexes as well as the commands arrive from
- * the control connection and we have no control over the order so we could be
- * in a situation where too much data has been received on the data connection
- * before the rotation command on the control connection arrives.
- */
-static
-int rotate_truncate_stream(struct relay_stream *stream)
-{
- int ret, new_fd;
- off_t lseek_ret;
- uint64_t diff, pos = 0;
- char buf[FILE_COPY_BUFFER_SIZE];
-
- assert(!stream->is_metadata);
-
- assert(stream->tracefile_size_current >
- stream->pos_after_last_complete_data_index);
- diff = stream->tracefile_size_current -
- stream->pos_after_last_complete_data_index;
-
- /* Create the new tracefile. */
- new_fd = utils_create_stream_file(stream->path_name,
- stream->channel_name,
- stream->tracefile_size, stream->tracefile_count,
- /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
- if (new_fd < 0) {
- ERR("Failed to create new stream file at path %s for channel %s",
- stream->path_name, stream->channel_name);
- ret = -1;
- goto end;
- }
-
- /*
- * Rewind the current tracefile to the position at which the rotation
- * should have occurred.
- */
- lseek_ret = lseek(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index, SEEK_SET);
- if (lseek_ret < 0) {
- PERROR("seek truncate stream");
- ret = -1;
- goto end;
- }
-
- /* Move data from the old file to the new file. */
- while (pos < diff) {
- uint64_t count, bytes_left;
- ssize_t io_ret;
-
- bytes_left = diff - pos;
- count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
- assert(count <= SIZE_MAX);
-
- io_ret = lttng_read(stream->stream_fd->fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, stream->stream_fd->fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- io_ret = lttng_write(new_fd, buf, count);
- if (io_ret < (ssize_t) count) {
- char error_string[256];
-
- snprintf(error_string, sizeof(error_string),
- "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
- count, new_fd, io_ret);
- if (io_ret == -1) {
- PERROR("%s", error_string);
- } else {
- ERR("%s", error_string);
- }
- ret = -1;
- goto end;
- }
-
- pos += count;
- }
-
- /* Truncate the file to get rid of the excess data. */
- ret = ftruncate(stream->stream_fd->fd,
- stream->pos_after_last_complete_data_index);
- if (ret) {
- PERROR("ftruncate");
- goto end;
- }
-
- ret = close(stream->stream_fd->fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto end;
- }
-
- /*
- * Update the offset and FD of all the eventual indexes created by the
- * data connection before the rotation command arrived.
- */
- ret = relay_index_switch_all_files(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end;
- }
-
- stream->stream_fd->fd = new_fd;
- stream->tracefile_size_current = diff;
- stream->pos_after_last_complete_data_index = 0;
- stream->rotate_at_seq_num = -1ULL;
-
- ret = 0;
-
-end:
- return ret;
-}
-
-/*
- * Check if a stream's index file should be rotated (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_index(struct relay_stream *stream)
-{
- int ret = 0;
-
- if (stream->rotate_at_seq_num == -1ULL) {
- /* No rotation expected. */
- goto end;
- }
-
- if (stream->index_rotated) {
- /* Rotation of the index has already occurred. */
- goto end;
- }
-
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_index_seq);
- goto end;
- } else if (stream->prev_index_seq != stream->rotate_at_seq_num) {
- /*
- * Unexpected, protocol error/bug.
- * It could mean that we received a rotation position
- * that is in the past.
- */
- ERR("Stream %" PRIu64 " index is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq,
- stream->prev_index_seq);
- ret = -1;
- goto end;
- } else {
- DBG("Rotating stream %" PRIu64 " index file",
- stream->stream_handle);
- ret = create_rotate_index_file(stream, stream->path_name);
- stream->index_rotated = true;
-
- if (stream->data_rotated && stream->index_rotated) {
- /* Rotation completed; reset its state. */
- DBG("Rotation completed for stream %" PRIu64,
- stream->stream_handle);
- stream->rotate_at_seq_num = -1ULL;
- stream->data_rotated = false;
- stream->index_rotated = false;
- }
- }
-
-end:
- return ret;
-}
-
-/*
- * Check if a stream's data file (as opposed to index) should be rotated
- * (for session rotation).
- * Must be called with the stream lock held.
- *
- * Return 0 on success, a negative value on error.
- */
-static
-int try_rotate_stream_data(struct relay_stream *stream)
-{
- int ret = 0;
-
- if (stream->rotate_at_seq_num == -1ULL) {
- /* No rotation expected. */
- goto end;
- }
-
- if (stream->data_rotated) {
- /* Rotation of the data file has already occurred. */
- goto end;
- }
-
- if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq < stream->rotate_at_seq_num) {
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq);
- goto end;
- } else if (stream->prev_data_seq > stream->rotate_at_seq_num) {
- /*
- * prev_data_seq is checked here since indexes and rotation
- * commands are serialized with respect to each other.
- */
- DBG("Rotation after too much data has been written in tracefile "
- "for stream %" PRIu64 ", need to truncate before "
- "rotating", stream->stream_handle);
- ret = rotate_truncate_stream(stream);
- if (ret) {
- ERR("Failed to truncate stream");
- goto end;
- }
- } else if (stream->prev_data_seq != stream->rotate_at_seq_num) {
- /*
- * Unexpected, protocol error/bug.
- * It could mean that we received a rotation position
- * that is in the past.
- */
- ERR("Stream %" PRIu64 " data is in an inconsistent state (rotate_at_seq_num = %" PRIu64 ", prev_data_seq = %" PRIu64 ")",
- stream->stream_handle,
- stream->rotate_at_seq_num,
- stream->prev_data_seq);
- ret = -1;
- goto end;
- } else {
- ret = do_rotate_stream_data(stream);
- }
-
-end:
- return ret;
-}
-
/*
* relay_recv_metadata: receive the metadata for the session.
*/
const struct lttng_buffer_view *payload)
{
int ret = 0;
- ssize_t size_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ struct lttng_buffer_view packet_view;
if (!session) {
ERR("Metadata sent before version check");
goto end;
}
- pthread_mutex_lock(&metadata_stream->lock);
-
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
- payload->data + sizeof(metadata_payload_header),
- metadata_payload_size);
- if (size_ret < metadata_payload_size) {
- ERR("Relay error writing metadata on file");
+ packet_view = lttng_buffer_view_from_view(payload,
+ sizeof(metadata_payload_header), metadata_payload_size);
+ if (!packet_view.data) {
+ ERR("Invalid metadata packet length announced by header");
ret = -1;
goto end_put;
}
- size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ pthread_mutex_lock(&metadata_stream->lock);
+ ret = stream_write(metadata_stream, &packet_view,
metadata_payload_header.padding_size);
- if (size_ret < (int64_t) metadata_payload_header.padding_size) {
+ pthread_mutex_unlock(&metadata_stream->lock);
+ if (ret){
ret = -1;
goto end_put;
}
-
- metadata_stream->metadata_received +=
- metadata_payload_size + metadata_payload_header.padding_size;
- DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
- metadata_stream->metadata_received);
-
- ret = try_rotate_stream_data(metadata_stream);
- if (ret < 0) {
- goto end_put;
- }
-
end_put:
- pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
end:
return ret;
ssize_t send_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_index index_info;
- struct relay_index *index;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
size_t msg_len;
ret = -1;
goto end;
}
- pthread_mutex_lock(&stream->lock);
-
- /* Live beacon handling */
- if (index_info.packet_size == 0) {
- DBG("Received live beacon for stream %" PRIu64,
- stream->stream_handle);
-
- /*
- * Only flag a stream inactive when it has already
- * received data and no indexes are in flight.
- */
- if (stream->index_received_seqcount > 0
- && stream->indexes_in_flight == 0) {
- stream->beacon_ts_end = index_info.timestamp_end;
- }
- ret = 0;
- goto end_stream_put;
- } else {
- stream->beacon_ts_end = -1ULL;
- }
- if (stream->ctf_stream_id == -1ULL) {
- stream->ctf_stream_id = index_info.stream_id;
- }
- index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num);
- if (!index) {
- ret = -1;
- ERR("relay_index_get_by_id_or_create index NULL");
- goto end_stream_put;
- }
- if (set_index_control_data(index, &index_info, conn)) {
- ERR("set_index_control_data error");
- relay_index_put(index);
- ret = -1;
+ pthread_mutex_lock(&stream->lock);
+ ret = stream_add_index(stream, &index_info);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
goto end_stream_put;
}
- ret = relay_index_try_flush(index);
- if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
- stream->index_received_seqcount++;
- stream->pos_after_last_complete_data_index += index->total_size;
- stream->prev_index_seq = index_info.net_seq_num;
-
- ret = try_rotate_stream_index(stream);
- if (ret < 0) {
- goto end_stream_put;
- }
- } else if (ret > 0) {
- /* no flush. */
- ret = 0;
- } else {
- /*
- * ret < 0
- *
- * relay_index_try_flush is responsible for the self-reference
- * put of the index object on error.
- */
- ERR("relay_index_try_flush error %d", ret);
- ret = -1;
- }
end_stream_put:
- pthread_mutex_unlock(&stream->lock);
stream_put(stream);
-
end:
-
memset(&reply, 0, sizeof(reply));
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
}
/*
- * relay_rotate_session_stream: rotate a stream to a new tracefile for the session
- * rotation feature (not the tracefile rotation feature).
+ * relay_rotate_session_stream: rotate a stream to a new tracefile for the
+ * session rotation feature (not the tracefile rotation feature).
*/
-static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr,
+static int relay_rotate_session_streams(
+ const struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn,
const struct lttng_buffer_view *payload)
{
int ret;
+ uint32_t i;
ssize_t send_ret;
+ enum lttng_error_code reply_code = LTTNG_ERR_UNK;
struct relay_session *session = conn->session;
- struct lttcomm_relayd_rotate_stream stream_info;
- struct lttcomm_relayd_generic_reply reply;
- struct relay_stream *stream;
- size_t header_len;
- size_t path_len;
- struct lttng_buffer_view new_path_view;
+ struct lttcomm_relayd_rotate_streams rotate_streams;
+ struct lttcomm_relayd_generic_reply reply = {};
+ struct relay_stream *stream = NULL;
+ const size_t header_len = sizeof(struct lttcomm_relayd_rotate_streams);
+ struct lttng_trace_chunk *next_trace_chunk = NULL;
+ struct lttng_buffer_view stream_positions;
if (!session || !conn->version_check_done) {
ERR("Trying to rotate a stream before version check");
goto end_no_reply;
}
- header_len = sizeof(struct lttcomm_relayd_rotate_stream);
-
if (payload->size < header_len) {
ERR("Unexpected payload size in \"relay_rotate_session_stream\": expected >= %zu bytes, got %zu bytes",
header_len, payload->size);
goto end_no_reply;
}
- memcpy(&stream_info, payload->data, header_len);
+ memcpy(&rotate_streams, payload->data, header_len);
- /* Convert to host */
- stream_info.pathname_length = be32toh(stream_info.pathname_length);
- stream_info.stream_id = be64toh(stream_info.stream_id);
- stream_info.new_chunk_id = be64toh(stream_info.new_chunk_id);
- stream_info.rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+ /* Convert header to host endianness. */
+ rotate_streams = (typeof(rotate_streams)) {
+ .stream_count = be32toh(rotate_streams.stream_count),
+ .new_chunk_id = (typeof(rotate_streams.new_chunk_id)) {
+ .is_set = !!rotate_streams.new_chunk_id.is_set,
+ .value = be64toh(rotate_streams.new_chunk_id.value),
+ }
+ };
- path_len = stream_info.pathname_length;
- if (payload->size < header_len + path_len) {
- ERR("Unexpected payload size in \"relay_rotate_session_stream\" including path: expected >= %zu bytes, got %zu bytes",
- header_len + path_len, payload->size);
- ret = -1;
- goto end_no_reply;
- }
-
- /* Ensure it fits in local filename length. */
- if (path_len >= LTTNG_PATH_MAX) {
- ret = -ENAMETOOLONG;
- ERR("Length of relay_rotate_session_stream command's path name (%zu bytes) exceeds the maximal allowed length of %i bytes",
- path_len, LTTNG_PATH_MAX);
- goto end;
+ if (rotate_streams.new_chunk_id.is_set) {
+ /*
+ * Retrieve the trace chunk the stream must transition to. As
+ * per the protocol, this chunk should have been created
+ * before this command is received.
+ */
+ next_trace_chunk = sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ session->sessiond_uuid, session->id,
+ rotate_streams.new_chunk_id.value);
+ if (!next_trace_chunk) {
+ char uuid_str[UUID_STR_LEN];
+
+ lttng_uuid_to_str(session->sessiond_uuid, uuid_str);
+ ERR("Unknown next trace chunk in ROTATE_STREAMS command: sessiond_uuid = {%s}, session_id = %" PRIu64
+ ", trace_chunk_id = %" PRIu64,
+ uuid_str, session->id,
+ rotate_streams.new_chunk_id.value);
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
+ ret = -1;
+ goto end;
+ }
}
- new_path_view = lttng_buffer_view_from_view(payload, header_len,
- stream_info.pathname_length);
-
- stream = stream_get_by_id(stream_info.stream_id);
- if (!stream) {
+ stream_positions = lttng_buffer_view_from_view(payload,
+ sizeof(rotate_streams), -1);
+ if (!stream_positions.data ||
+ stream_positions.size <
+ (rotate_streams.stream_count *
+ sizeof(struct lttcomm_relayd_stream_rotation_position))) {
+ reply_code = LTTNG_ERR_INVALID_PROTOCOL;
ret = -1;
goto end;
}
- pthread_mutex_lock(&stream->lock);
-
- /*
- * Update the trace path (just the folder, the stream name does not
- * change).
- */
- free(stream->prev_path_name);
- stream->prev_path_name = stream->path_name;
- stream->path_name = create_output_path(new_path_view.data);
- if (!stream->path_name) {
- ERR("Failed to create a new output path");
- ret = -1;
- goto end_stream_unlock;
- }
- ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG,
- -1, -1);
- if (ret < 0) {
- ERR("relay creating output directory");
- ret = -1;
- goto end_stream_unlock;
- }
+ for (i = 0; i < rotate_streams.stream_count; i++) {
+ struct lttcomm_relayd_stream_rotation_position *position_comm =
+ &((typeof(position_comm)) stream_positions.data)[i];
+ const struct lttcomm_relayd_stream_rotation_position pos = {
+ .stream_id = be64toh(position_comm->stream_id),
+ .rotate_at_seq_num = be64toh(
+ position_comm->rotate_at_seq_num),
+ };
- if (stream->is_metadata) {
- /*
- * Metadata streams have no index; consider its rotation
- * complete.
- */
- stream->index_rotated = true;
- /*
- * The metadata stream is sent only over the control connection
- * so we know we have all the data to perform the stream
- * rotation.
- */
- ret = do_rotate_stream_data(stream);
- } else {
- stream->rotate_at_seq_num = stream_info.rotate_at_seq_num;
- ret = try_rotate_stream_data(stream);
- if (ret < 0) {
- goto end_stream_unlock;
+ stream = stream_get_by_id(pos.stream_id);
+ if (!stream) {
+ reply_code = LTTNG_ERR_INVALID;
+ ret = -1;
+ goto end;
}
- ret = try_rotate_stream_index(stream);
- if (ret < 0) {
- goto end_stream_unlock;
+ pthread_mutex_lock(&stream->lock);
+ ret = stream_set_pending_rotation(stream, next_trace_chunk,
+ pos.rotate_at_seq_num);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ reply_code = LTTNG_ERR_FILE_CREATION_ERROR;
+ goto end;
}
+
+ stream_put(stream);
+ stream = NULL;
}
-end_stream_unlock:
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
+ reply_code = LTTNG_OK;
end:
- memset(&reply, 0, sizeof(reply));
- if (ret < 0) {
- reply.ret_code = htobe32(LTTNG_ERR_UNK);
- } else {
- reply.ret_code = htobe32(LTTNG_OK);
+ if (stream) {
+ stream_put(stream);
}
+
+ reply.ret_code = htobe32((uint32_t) reply_code);
send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
if (send_ret < (ssize_t) sizeof(reply)) {
}
end_no_reply:
+ lttng_trace_chunk_put(next_trace_chunk);
return ret;
}
pthread_mutex_lock(&conn->session->lock);
lttng_trace_chunk_put(conn->session->current_trace_chunk);
conn->session->current_trace_chunk = published_chunk;
- pthread_mutex_unlock(&conn->session->lock);
published_chunk = NULL;
+ pthread_mutex_unlock(&conn->session->lock);
end:
reply.ret_code = htobe32((uint32_t) reply_code);
enum lttng_error_code reply_code = LTTNG_OK;
enum lttng_trace_chunk_status chunk_status;
uint64_t chunk_id;
- LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command;
+ LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
time_t close_timestamp;
if (!session || !conn->version_check_done) {
}
}
+ pthread_mutex_lock(&session->lock);
+ if (session->current_trace_chunk == chunk) {
+ /*
+ * After a trace chunk close command, no new streams
+ * referencing the chunk may be created. Hence, on the
+ * event that no new trace chunk have been created for
+ * the session, the reference to the current trace chunk
+ * is released in order to allow it to be reclaimed when
+ * the last stream releases its reference to it.
+ */
+ lttng_trace_chunk_put(session->current_trace_chunk);
+ session->current_trace_chunk = NULL;
+ }
+ pthread_mutex_unlock(&session->lock);
+
end:
reply.ret_code = htobe32((uint32_t) reply_code);
send_ret = conn->sock->ops->sendmsg(conn->sock,
return ret;
}
+/*
+ * relay_trace_chunk_exists: check if a trace chunk exists
+ */
+static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn,
+ const struct lttng_buffer_view *payload)
+{
+ int ret = 0;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_trace_chunk_exists *msg;
+ struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
+ struct lttng_buffer_view header_view;
+ struct lttng_trace_chunk *chunk = NULL;
+ uint64_t chunk_id;
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to close a trace chunk before version check");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ if (session->major == 2 && session->minor < 11) {
+ ERR("Chunk close command is unsupported before 2.11");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
+ if (!header_view.data) {
+ ERR("Failed to receive payload of chunk close command");
+ ret = -1;
+ goto end_no_reply;
+ }
+
+ /* Convert to host endianness. */
+ msg = (typeof(msg)) header_view.data;
+ chunk_id = be64toh(msg->chunk_id);
+
+ chunk = sessiond_trace_chunk_registry_get_chunk(
+ sessiond_trace_chunk_registry,
+ conn->session->sessiond_uuid,
+ conn->session->id,
+ chunk_id);
+
+ reply = (typeof(reply)) {
+ .generic.ret_code = htobe32((uint32_t) LTTNG_OK),
+ .trace_chunk_exists = !!chunk,
+ };
+ send_ret = conn->sock->ops->sendmsg(conn->sock,
+ &reply, sizeof(reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+end_no_reply:
+ lttng_trace_chunk_put(chunk);
+ return ret;
+}
+
#define DBG_CMD(cmd_name, conn) \
DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
DBG_CMD("RELAYD_RESET_METADATA", conn);
ret = relay_reset_metadata(header, conn, payload);
break;
- case RELAYD_ROTATE_STREAM:
- DBG_CMD("RELAYD_ROTATE_STREAM", conn);
- ret = relay_rotate_session_stream(header, conn, payload);
+ case RELAYD_ROTATE_STREAMS:
+ DBG_CMD("RELAYD_ROTATE_STREAMS", conn);
+ ret = relay_rotate_session_streams(header, conn, payload);
break;
case RELAYD_CREATE_TRACE_CHUNK:
DBG_CMD("RELAYD_CREATE_TRACE_CHUNK", conn);
DBG_CMD("RELAYD_CLOSE_TRACE_CHUNK", conn);
ret = relay_close_trace_chunk(header, conn, payload);
break;
+ case RELAYD_TRACE_CHUNK_EXISTS:
+ DBG_CMD("RELAYD_TRACE_CHUNK_EXISTS", conn);
+ ret = relay_trace_chunk_exists(header, conn, payload);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", header->cmd);
return status;
}
-/*
- * Handle index for a data stream.
- *
- * Called with the stream lock held.
- *
- * Return 0 on success else a negative value.
- */
-static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
- bool rotate_index, bool *flushed, uint64_t total_size)
-{
- int ret = 0;
- uint64_t data_offset;
- struct relay_index *index;
-
- /* Get data offset because we are about to update the index. */
- data_offset = htobe64(stream->tracefile_size_current);
-
- DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
- stream->stream_handle, net_seq_num, stream->tracefile_size_current);
-
- /*
- * Lookup for an existing index for that stream id/sequence
- * number. If it exists, the control thread has already received the
- * data for it, thus we need to write it to disk.
- */
- index = relay_index_get_by_id_or_create(stream, net_seq_num);
- if (!index) {
- ret = -1;
- goto end;
- }
-
- if (rotate_index || !stream->index_file) {
- const char *stream_path;
-
- /*
- * The data connection creates the stream's first index file.
- *
- * This can happen _after_ a ROTATE_STREAM command. In
- * other words, the data of the first packet of this stream
- * can be received after a ROTATE_STREAM command.
- *
- * The ROTATE_STREAM command changes the stream's path_name
- * to point to the "next" chunk. If a rotation is pending for
- * this stream, as indicated by "rotate_at_seq_num != -1ULL",
- * it means that we are still receiving data that belongs in the
- * stream's former path.
- *
- * In this very specific case, we must ensure that the index
- * file is created in the streams's former path,
- * "prev_path_name".
- *
- * All other rotations beyond the first one are not affected
- * by this problem since the actual rotation operation creates
- * the new chunk's index file.
- */
- stream_path = stream->rotate_at_seq_num == -1ULL ?
- stream->path_name:
- stream->prev_path_name;
-
- ret = create_rotate_index_file(stream, stream_path);
- if (ret < 0) {
- ERR("Failed to rotate index");
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
- goto end;
- }
- }
-
- if (relay_index_set_file(index, stream->index_file, data_offset)) {
- ret = -1;
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
- goto end;
- }
-
- ret = relay_index_try_flush(index);
- if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
- stream->index_received_seqcount++;
- *flushed = true;
- } else if (ret > 0) {
- index->total_size = total_size;
- /* No flush. */
- ret = 0;
- } else {
- /*
- * ret < 0
- *
- * relay_index_try_flush is responsible for the self-reference
- * put of the index object on error.
- */
- ERR("relay_index_try_flush error %d", ret);
- ret = -1;
- }
-end:
- return ret;
-}
-
static enum relay_connection_status relay_process_data_receive_header(
struct relay_connection *conn)
{
}
pthread_mutex_lock(&stream->lock);
-
- /* Check if a rotation is needed. */
- if (stream->tracefile_size > 0 &&
- (stream->tracefile_size_current + header.data_size) >
- stream->tracefile_size) {
- uint64_t old_id, new_id;
-
- old_id = tracefile_array_get_file_index_head(stream->tfa);
- tracefile_array_file_rotate(stream->tfa);
-
- /* new_id is updated by utils_rotate_stream_file. */
- new_id = old_id;
-
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
- if (ret < 0) {
- ERR("Failed to rotate stream output file");
- status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
- }
-
- /*
- * Reset current size because we just performed a stream
- * rotation.
- */
- stream->tracefile_size_current = 0;
- conn->protocol.data.state.receive_payload.rotate_index = true;
+ /* Prepare stream for the reception of a new packet. */
+ ret = stream_init_packet(stream, header.data_size,
+ &conn->protocol.data.state.receive_payload.rotate_index);
+ pthread_mutex_unlock(&stream->lock);
+ if (ret) {
+ ERR("Failed to rotate stream output file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
}
end_stream_unlock:
- pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
return status;
* - the on-stack data buffer
*/
while (left_to_receive > 0 && !partial_recv) {
- ssize_t write_ret;
size_t recv_size = min(left_to_receive, chunk_size);
+ struct lttng_buffer_view packet_chunk;
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
* consumed.
*/
partial_recv = true;
+ recv_size = ret;
}
- recv_size = ret;
+ packet_chunk = lttng_buffer_view_init(data_buffer,
+ 0, recv_size);
+ assert(packet_chunk.data);
- /* Write data to stream output fd. */
- write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
- recv_size);
- if (write_ret < (ssize_t) recv_size) {
+ ret = stream_write(stream, &packet_chunk, 0);
+ if (ret) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
left_to_receive -= recv_size;
state->received += recv_size;
state->left_to_receive = left_to_receive;
-
- DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
- write_ret, stream->stream_handle);
}
if (state->left_to_receive > 0) {
goto end_stream_unlock;
}
- ret = write_padding_to_file(stream->stream_fd->fd,
- state->header.padding_size);
- if ((int64_t) ret < (int64_t) state->header.padding_size) {
- ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
- stream->stream_handle,
- state->header.net_seq_num, ret);
+ ret = stream_write(stream, NULL, state->header.padding_size);
+ if (ret) {
status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
-
if (session_streams_have_index(session)) {
- ret = handle_index_data(stream, state->header.net_seq_num,
- state->rotate_index, &index_flushed, state->header.data_size + state->header.padding_size);
+ ret = stream_update_index(stream, state->header.net_seq_num,
+ state->rotate_index, &index_flushed,
+ state->header.data_size + state->header.padding_size);
if (ret < 0) {
- ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ ERR("Failed to update index: stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
}
}
- stream->tracefile_size_current += state->header.data_size +
- state->header.padding_size;
-
if (stream->prev_data_seq == -1ULL) {
new_stream = true;
}
- if (index_flushed) {
- stream->pos_after_last_complete_data_index =
- stream->tracefile_size_current;
- stream->prev_index_seq = state->header.net_seq_num;
- ret = try_rotate_stream_index(stream);
- if (ret < 0) {
- goto end_stream_unlock;
- }
- }
- stream->prev_data_seq = state->header.net_seq_num;
+ ret = stream_complete_packet(stream, state->header.data_size +
+ state->header.padding_size, state->header.net_seq_num,
+ index_flushed);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
/*
* Resetting the protocol state (to RECEIVE_HEADER) will trash the
connection_reset_protocol_state(conn);
state = NULL;
- ret = try_rotate_stream_data(stream);
- if (ret < 0) {
- status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_stream_unlock;
- }
-
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
ret = session_delete(session);
assert(!ret);
lttng_trace_chunk_put(session->current_trace_chunk);
+ session->current_trace_chunk = NULL;
ret = sessiond_trace_chunk_registry_session_destroyed(
sessiond_trace_chunk_registry, session->sessiond_uuid);
assert(!ret);
- lttng_trace_chunk_put(session->current_trace_chunk);
- session->current_trace_chunk = NULL;
call_rcu(&session->rcu_node, rcu_destroy_session);
}
struct trace_chunk_registry_ht_element *element =
container_of(node, typeof(*element), rcu_node);
- lttng_trace_chunk_registry_destroy(element->trace_chunk_registry);
free(element);
}
element->sessiond_trace_chunk_registry = NULL;
}
+ lttng_trace_chunk_registry_destroy(element->trace_chunk_registry);
/* Defered reclaim of the object */
call_rcu(&element->rcu_node, trace_chunk_registry_ht_element_free);
}
published_chunk = lttng_trace_chunk_registry_publish_chunk(
element->trace_chunk_registry, session_id, new_chunk);
+ /*
+ * At this point, two references to the published chunks exist. One
+ * is taken by the registry while the other is being returned to the
+ * caller. In the use case of the relay daemon, the reference held
+ * by the registry itself is undesirable.
+ *
+ * We want the trace chunk to be removed from the registry as soon
+ * as it is not being used by the relay daemon (through a session
+ * or a stream). This differs from the behaviour of the consumer
+ * daemon which relies on an explicit command from the session
+ * daemon to release the registry's reference.
+ */
+ lttng_trace_chunk_put(published_chunk);
end:
trace_chunk_registry_ht_element_put(element);
return published_chunk;
* Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
* David Goulet <dgoulet@efficios.com>
* 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
#include <common/common.h>
#include <common/utils.h>
#include <common/defaults.h>
+#include <common/sessiond-comm/relayd.h>
#include <urcu/rculist.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
+#define FILE_IO_STACK_BUFFER_SIZE 65536
+
/* Should be called with RCU read-side lock held. */
bool stream_get(struct relay_stream *stream)
{
return stream;
}
-static int stream_create_data_output_file(struct relay_stream *stream)
+static void stream_complete_rotation(struct relay_stream *stream)
+{
+ DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+ lttng_trace_chunk_put(stream->trace_chunk);
+ stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
+ stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
+}
+
+/*
+ * If too much data has been written in a tracefile before we received the
+ * rotation command, we have to move the excess data to the new tracefile and
+ * perform the rotation. This can happen because the control and data
+ * connections are separate, the indexes as well as the commands arrive from
+ * the control connection and we have no control over the order so we could be
+ * in a situation where too much data has been received on the data connection
+ * before the rotation command on the control connection arrives.
+ */
+static int rotate_truncate_stream(struct relay_stream *stream)
+{
+ int ret, new_fd;
+ off_t lseek_ret;
+ uint64_t diff, pos = 0;
+ char buf[FILE_IO_STACK_BUFFER_SIZE];
+
+ assert(!stream->is_metadata);
+
+ assert(stream->tracefile_size_current >
+ stream->pos_after_last_complete_data_index);
+ diff = stream->tracefile_size_current -
+ stream->pos_after_last_complete_data_index;
+
+ /* Create the new tracefile. */
+ new_fd = utils_create_stream_file(stream->path_name,
+ stream->channel_name,
+ stream->tracefile_size, stream->tracefile_count,
+ /* uid */ -1, /* gid */ -1, /* suffix */ NULL);
+ if (new_fd < 0) {
+ ERR("Failed to create new stream file at path %s for channel %s",
+ stream->path_name, stream->channel_name);
+ ret = -1;
+ goto end;
+ }
+
+ /*
+ * Rewind the current tracefile to the position at which the rotation
+ * should have occurred.
+ */
+ lseek_ret = lseek(stream->stream_fd->fd,
+ stream->pos_after_last_complete_data_index, SEEK_SET);
+ if (lseek_ret < 0) {
+ PERROR("seek truncate stream");
+ ret = -1;
+ goto end;
+ }
+
+ /* Move data from the old file to the new file. */
+ while (pos < diff) {
+ uint64_t count, bytes_left;
+ ssize_t io_ret;
+
+ bytes_left = diff - pos;
+ count = bytes_left > sizeof(buf) ? sizeof(buf) : bytes_left;
+ assert(count <= SIZE_MAX);
+
+ io_ret = lttng_read(stream->stream_fd->fd, buf, count);
+ if (io_ret < (ssize_t) count) {
+ char error_string[256];
+
+ snprintf(error_string, sizeof(error_string),
+ "Failed to read %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+ count, stream->stream_fd->fd, io_ret);
+ if (io_ret == -1) {
+ PERROR("%s", error_string);
+ } else {
+ ERR("%s", error_string);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ io_ret = lttng_write(new_fd, buf, count);
+ if (io_ret < (ssize_t) count) {
+ char error_string[256];
+
+ snprintf(error_string, sizeof(error_string),
+ "Failed to write %" PRIu64 " bytes from fd %i in rotate_truncate_stream(), returned %zi",
+ count, new_fd, io_ret);
+ if (io_ret == -1) {
+ PERROR("%s", error_string);
+ } else {
+ ERR("%s", error_string);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ pos += count;
+ }
+
+ /* Truncate the file to get rid of the excess data. */
+ ret = ftruncate(stream->stream_fd->fd,
+ stream->pos_after_last_complete_data_index);
+ if (ret) {
+ PERROR("ftruncate");
+ goto end;
+ }
+
+ ret = close(stream->stream_fd->fd);
+ if (ret < 0) {
+ PERROR("Closing tracefile");
+ goto end;
+ }
+
+ /*
+ * Update the offset and FD of all the eventual indexes created by the
+ * data connection before the rotation command arrived.
+ */
+ ret = relay_index_switch_all_files(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+
+ stream->stream_fd->fd = new_fd;
+ stream->tracefile_size_current = diff;
+ stream->pos_after_last_complete_data_index = 0;
+ stream_complete_rotation(stream);
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+static int stream_create_data_output_file_from_trace_chunk(
+ struct relay_stream *stream,
+ struct lttng_trace_chunk *trace_chunk,
+ bool force_unlink,
+ struct stream_fd **out_stream_fd)
{
int ret, fd;
+ char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
- char stream_path[LTTNG_PATH_MAX];
ASSERT_LOCKED(stream->lock);
assert(stream->trace_chunk);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
- }
-
ret = utils_stream_file_path(stream->path_name, stream->channel_name,
- stream->tracefile_size, stream->tracefile_count, NULL,
- stream_path, sizeof(stream_path));
+ stream->tracefile_size, stream->tracefile_current_index,
+ NULL, stream_path, sizeof(stream_path));
if (ret < 0) {
goto end;
}
- DBG("Opening stream output file \"%s\"", stream_path);
+ if (stream->tracefile_wrapped_around || force_unlink) {
+ /*
+ * The on-disk ring-buffer has wrapped around.
+ * Newly created stream files will replace existing files. Since
+ * live clients may be consuming existing files, the file about
+ * to be replaced is unlinked in order to not overwrite its
+ * content.
+ */
+ status = lttng_trace_chunk_unlink_file(trace_chunk,
+ stream_path);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
+ stream_path);
+ /*
+ * Don't abort if the file doesn't exist, it is
+ * unexpected, but should not be a fatal error.
+ */
+ if (errno != ENOENT) {
+ ret = -1;
+ goto end;
+ }
+ }
+ }
+
status = lttng_trace_chunk_open_file(
- stream->trace_chunk, stream_path, flags, mode, &fd);
+ trace_chunk, stream_path, flags, mode, &fd);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
- stream->stream_fd = stream_fd_create(fd);
- if (!stream->stream_fd) {
+ *out_stream_fd = stream_fd_create(fd);
+ if (!*out_stream_fd) {
if (close(ret)) {
PERROR("Error closing stream file descriptor %d", ret);
}
return ret;
}
+static int stream_rotate_data_file(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ DBG("Rotating stream %" PRIu64 " data file",
+ stream->stream_handle);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+
+ stream->tracefile_wrapped_around = false;
+ stream->tracefile_current_index = 0;
+
+ if (stream->ongoing_rotation.value.next_trace_chunk) {
+ struct stream_fd *new_stream_fd = NULL;
+ enum lttng_trace_chunk_status chunk_status;
+
+ chunk_status = lttng_trace_chunk_create_subdirectory(
+ stream->ongoing_rotation.value.next_trace_chunk,
+ stream->path_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end;
+ }
+
+ /* Rotate the data file. */
+ ret = stream_create_data_output_file_from_trace_chunk(stream,
+ stream->ongoing_rotation.value.next_trace_chunk,
+ false, &new_stream_fd);
+ stream->stream_fd = new_stream_fd;
+ if (ret < 0) {
+ ERR("Failed to rotate stream data file");
+ goto end;
+ }
+ }
+ stream->tracefile_size_current = 0;
+ stream->pos_after_last_complete_data_index = 0;
+ stream->ongoing_rotation.value.data_rotated = true;
+
+ if (stream->ongoing_rotation.value.index_rotated) {
+ /* Rotation completed; reset its state. */
+ stream_complete_rotation(stream);
+ }
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream's data file (as opposed to index) should be rotated
+ * (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_data(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ if (caa_likely(!stream->ongoing_rotation.is_set)) {
+ /* No rotation expected. */
+ goto end;
+ }
+
+ if (stream->ongoing_rotation.value.data_rotated) {
+ /* Rotation of the data file has already occurred. */
+ goto end;
+ }
+
+ if (stream->prev_data_seq == -1ULL ||
+ stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ /*
+ * The next packet that will be written is not part of the next
+ * chunk yet.
+ */
+ DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+ ", prev_data_seq = %" PRIu64 ")",
+ stream->stream_handle,
+ stream->ongoing_rotation.value.seq_num,
+ stream->prev_data_seq);
+ goto end;
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+ /*
+ * prev_data_seq is checked here since indexes and rotation
+ * commands are serialized with respect to each other.
+ */
+ DBG("Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating", stream->stream_handle);
+ ret = rotate_truncate_stream(stream);
+ if (ret) {
+ ERR("Failed to truncate stream");
+ goto end;
+ }
+ } else {
+ ret = stream_rotate_data_file(stream);
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Close the current index file if it is open, and create a new one.
+ *
+ * Return 0 on success, -1 on error.
+ */
+static int create_index_file(struct relay_stream *stream,
+ struct lttng_trace_chunk *chunk)
+{
+ int ret;
+ uint32_t major, minor;
+ char *index_subpath = NULL;
+
+ ASSERT_LOCKED(stream->lock);
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+
+ if (!chunk) {
+ ret = 0;
+ goto end;
+ }
+ ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
+ DEFAULT_INDEX_DIR);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = lttng_trace_chunk_create_subdirectory(chunk,
+ index_subpath);
+ free(index_subpath);
+ if (ret) {
+ goto end;
+ }
+ stream->index_file = lttng_index_file_create_from_trace_chunk(
+ chunk, stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_current_index,
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor), true);
+ if (!stream->index_file) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream's index file should be rotated (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static int try_rotate_stream_index(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ if (!stream->ongoing_rotation.is_set) {
+ /* No rotation expected. */
+ goto end;
+ }
+
+ if (stream->ongoing_rotation.value.index_rotated) {
+ /* Rotation of the index has already occurred. */
+ goto end;
+ }
+
+ if (stream->prev_index_seq == -1ULL ||
+ stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ stream->stream_handle,
+ stream->ongoing_rotation.value.seq_num,
+ stream->prev_index_seq);
+ goto end;
+ } else {
+ /* The next index belongs to the new trace chunk; rotate. */
+ assert(stream->prev_index_seq + 1 ==
+ stream->ongoing_rotation.value.seq_num);
+ DBG("Rotating stream %" PRIu64 " index file",
+ stream->stream_handle);
+ ret = create_index_file(stream,
+ stream->ongoing_rotation.value.next_trace_chunk);
+ stream->ongoing_rotation.value.index_rotated = true;
+
+ if (stream->ongoing_rotation.value.data_rotated &&
+ stream->ongoing_rotation.value.index_rotated) {
+ /* Rotation completed; reset its state. */
+ DBG("Rotation completed for stream %" PRIu64,
+ stream->stream_handle);
+ stream_complete_rotation(stream);
+ }
+ }
+
+end:
+ return ret;
+}
+
static int stream_set_trace_chunk(struct relay_stream *stream,
struct lttng_trace_chunk *chunk)
{
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
+ struct stream_fd *new_stream_fd = NULL;
- pthread_mutex_lock(&stream->lock);
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
acquired_reference = lttng_trace_chunk_get(chunk);
assert(acquired_reference);
stream->trace_chunk = chunk;
- ret = stream_create_data_output_file(stream);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
+ false, &new_stream_fd);
+ stream->stream_fd = new_stream_fd;
end:
- pthread_mutex_unlock(&stream->lock);
return ret;
}
stream->tracefile_size = tracefile_size;
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
- stream->prev_path_name = NULL;
stream->channel_name = channel_name;
- stream->rotate_at_seq_num = -1ULL;
stream->beacon_ts_end = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
goto end;
}
+ pthread_mutex_lock(&stream->lock);
ret = stream_set_trace_chunk(stream, current_trace_chunk);
+ pthread_mutex_unlock(&stream->lock);
if (ret) {
ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
trace->session->session_name,
tracefile_array_destroy(stream->tfa);
}
free(stream->path_name);
- free(stream->prev_path_name);
free(stream->channel_name);
free(stream);
}
ctf_trace_put(stream->trace);
stream->trace = NULL;
}
+ stream_complete_rotation(stream);
lttng_trace_chunk_put(stream->trace_chunk);
stream->trace_chunk = NULL;
void stream_put(struct relay_stream *stream)
{
- DBG("stream put for stream id %" PRIu64, stream->stream_handle);
rcu_read_lock();
assert(stream->ref.refcount != 0);
/*
* Wait until we have processed all the stream packets before
* actually putting our last stream reference.
*/
- DBG("stream put stream id %" PRIu64 " refcount %d",
- stream->stream_handle,
- (int) stream->ref.refcount);
urcu_ref_put(&stream->ref, stream_release);
rcu_read_unlock();
}
+int stream_set_pending_rotation(struct relay_stream *stream,
+ struct lttng_trace_chunk *next_trace_chunk,
+ uint64_t rotation_sequence_number)
+{
+ int ret = 0;
+ const struct relay_stream_rotation rotation = {
+ .seq_num = rotation_sequence_number,
+ .next_trace_chunk = next_trace_chunk,
+ };
+
+ if (stream->ongoing_rotation.is_set) {
+ ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
+ ret = -1;
+ goto end;
+ }
+
+ if (next_trace_chunk) {
+ const bool reference_acquired =
+ lttng_trace_chunk_get(next_trace_chunk);
+
+ assert(reference_acquired);
+ }
+ LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
+
+ DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+ stream->stream_handle, rotation_sequence_number);
+ if (stream->is_metadata) {
+ /*
+ * A metadata stream has no index; consider it already rotated.
+ */
+ stream->ongoing_rotation.value.index_rotated = true;
+ ret = stream_rotate_data_file(stream);
+ } else {
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+end:
+ return ret;
+}
+
void try_stream_close(struct relay_stream *stream)
{
bool session_aborted;
stream_put(stream);
}
+int stream_init_packet(struct relay_stream *stream, size_t packet_size,
+ bool *file_rotated)
+{
+ int ret = 0;
+
+ ASSERT_LOCKED(stream->lock);
+ if (caa_likely(stream->tracefile_size == 0)) {
+ /* No size limit set; nothing to check. */
+ goto end;
+ }
+
+ /*
+ * Check if writing the new packet would exceed the maximal file size.
+ */
+ if (caa_unlikely((stream->tracefile_size_current + packet_size) >
+ stream->tracefile_size)) {
+ const uint64_t new_file_index =
+ (stream->tracefile_current_index + 1) %
+ stream->tracefile_count;
+
+ if (new_file_index < stream->tracefile_current_index) {
+ stream->tracefile_wrapped_around = true;
+ }
+ DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
+ ", current_file_size = %" PRIu64
+ ", packet_size = %" PRIu64 ", current_file_index = %" PRIu64
+ " new_file_index = %" PRIu64,
+ stream->stream_handle,
+ stream->tracefile_size_current, packet_size,
+ stream->tracefile_current_index, new_file_index);
+ tracefile_array_file_rotate(stream->tfa);
+ stream->tracefile_current_index = new_file_index;
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+ ret = stream_create_data_output_file_from_trace_chunk(stream,
+ stream->trace_chunk, false, &stream->stream_fd);
+ if (ret) {
+ ERR("Failed to perform trace file rotation of stream %" PRIu64,
+ stream->stream_handle);
+ goto end;
+ }
+
+ /*
+ * Reset current size because we just performed a stream
+ * rotation.
+ */
+ stream->tracefile_size_current = 0;
+ *file_rotated = true;
+ } else {
+ *file_rotated = false;
+ }
+end:
+ return ret;
+}
+
+/* Note that the packet is not necessarily complete. */
+int stream_write(struct relay_stream *stream,
+ const struct lttng_buffer_view *packet, size_t padding_len)
+{
+ int ret = 0;
+ ssize_t write_ret;
+ size_t padding_to_write = padding_len;
+ char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
+
+ ASSERT_LOCKED(stream->lock);
+ memset(padding_buffer, 0,
+ min(sizeof(padding_buffer), padding_to_write));
+
+ if (packet) {
+ write_ret = lttng_write(stream->stream_fd->fd,
+ packet->data, packet->size);
+ if (write_ret != packet->size) {
+ PERROR("Failed to write to stream file of %sstream %" PRIu64,
+ stream->is_metadata ? "metadata " : "",
+ stream->stream_handle);
+ ret = -1;
+ goto end;
+ }
+ }
+
+ while (padding_to_write > 0) {
+ const size_t padding_to_write_this_pass =
+ min(padding_to_write, sizeof(padding_buffer));
+
+ write_ret = lttng_write(stream->stream_fd->fd,
+ padding_buffer, padding_to_write_this_pass);
+ if (write_ret != padding_to_write_this_pass) {
+ PERROR("Failed to write padding to file of %sstream %" PRIu64,
+ stream->is_metadata ? "metadata " : "",
+ stream->stream_handle);
+ ret = -1;
+ goto end;
+ }
+ padding_to_write -= padding_to_write_this_pass;
+ }
+
+ if (stream->is_metadata) {
+ stream->metadata_received += packet->size + padding_len;
+ }
+
+ DBG("Wrote to %sstream %" PRIu64 ": data_length = %" PRIu64 ", padding_length = %" PRIu64,
+ stream->is_metadata ? "metadata " : "",
+ stream->stream_handle,
+ packet ? packet->size : 0, padding_len);
+end:
+ return ret;
+}
+
+/*
+ * Update index after receiving a packet for a data stream.
+ *
+ * Called with the stream lock held.
+ *
+ * Return 0 on success else a negative value.
+ */
+int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
+ bool rotate_index, bool *flushed, uint64_t total_size)
+{
+ int ret = 0;
+ uint64_t data_offset;
+ struct relay_index *index;
+
+ ASSERT_LOCKED(stream->lock);
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, net_seq_num, stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence
+ * number. If it exists, the control thread has already received the
+ * data for it, thus we need to write it to disk.
+ */
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto end;
+ }
+
+ if (rotate_index || !stream->index_file) {
+ ret = create_index_file(stream, stream->trace_chunk);
+ if (ret) {
+ ERR("Failed to create index file for stream %" PRIu64,
+ stream->stream_handle);
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ index = NULL;
+ goto end;
+ }
+ }
+
+ if (relay_index_set_file(index, stream->index_file, data_offset)) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ index = NULL;
+ goto end;
+ }
+
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
+ *flushed = true;
+ } else if (ret > 0) {
+ index->total_size = total_size;
+ /* No flush. */
+ ret = 0;
+ } else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
+ ret = -1;
+ }
+end:
+ return ret;
+}
+
+int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
+ uint64_t sequence_number, bool index_flushed)
+{
+ int ret = 0;
+
+ ASSERT_LOCKED(stream->lock);
+
+ stream->tracefile_size_current += packet_total_size;
+ if (index_flushed) {
+ stream->pos_after_last_complete_data_index =
+ stream->tracefile_size_current;
+ stream->prev_index_seq = sequence_number;
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end;
+ }
+ }
+
+ stream->prev_data_seq = sequence_number;
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
+end:
+ return ret;
+}
+
+int stream_add_index(struct relay_stream *stream,
+ const struct lttcomm_relayd_index *index_info)
+{
+ int ret = 0;
+ struct relay_index *index;
+
+ ASSERT_LOCKED(stream->lock);
+
+ /* Live beacon handling */
+ if (index_info->packet_size == 0) {
+ DBG("Received live beacon for stream %" PRIu64,
+ stream->stream_handle);
+
+ /*
+ * Only flag a stream inactive when it has already
+ * received data and no indexes are in flight.
+ */
+ if (stream->index_received_seqcount > 0
+ && stream->indexes_in_flight == 0) {
+ stream->beacon_ts_end = index_info->timestamp_end;
+ }
+ ret = 0;
+ goto end;
+ } else {
+ stream->beacon_ts_end = -1ULL;
+ }
+
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = index_info->stream_id;
+ }
+
+ index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
+ if (!index) {
+ ret = -1;
+ ERR("Failed to get or create index %" PRIu64,
+ index_info->net_seq_num);
+ goto end;
+ }
+ if (relay_index_set_control_data(index, index_info,
+ stream->trace->session->minor)) {
+ ERR("set_index_control_data error");
+ relay_index_put(index);
+ ret = -1;
+ goto end;
+ }
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
+ stream->pos_after_last_complete_data_index += index->total_size;
+ stream->prev_index_seq = index_info->net_seq_num;
+
+ ret = try_rotate_stream_index(stream);
+ if (ret < 0) {
+ goto end;
+ }
+ } else if (ret > 0) {
+ /* no flush. */
+ ret = 0;
+ } else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
+ ret = -1;
+ }
+end:
+ return ret;
+}
+
static void print_stream_indexes(struct relay_stream *stream)
{
struct lttng_ht_iter iter;
rcu_read_unlock();
}
+int stream_reset_file(struct relay_stream *stream)
+{
+ ASSERT_LOCKED(stream->lock);
+
+ if (stream->stream_fd) {
+ stream_fd_put(stream->stream_fd);
+ stream->stream_fd = NULL;
+ }
+
+ stream->tracefile_size_current = 0;
+ stream->prev_data_seq = 0;
+ stream->prev_index_seq = 0;
+ /* Note that this does not reset the tracefile array. */
+ stream->tracefile_current_index = 0;
+ stream->pos_after_last_complete_data_index = 0;
+
+ return stream_create_data_output_file_from_trace_chunk(stream,
+ stream->trace_chunk, true, &stream->stream_fd);
+}
+
void print_relay_streams(void)
{
struct lttng_ht_iter iter;
#include <common/hashtable/hashtable.h>
#include <common/trace-chunk.h>
+#include <common/optional.h>
+#include <common/buffer-view.h>
#include "session.h"
#include "stream-fd.h"
#include "tracefile-array.h"
+struct lttcomm_relayd_index;
+
+struct relay_stream_rotation {
+ /*
+ * Indicates if the stream's data and index have been rotated. A
+ * rotation is considered completed when both rotations have occurred.
+ */
+ bool data_rotated;
+ bool index_rotated;
+ /*
+ * Sequence number of the first packet of the new trace chunk to which
+ * the stream is rotating.
+ */
+ uint64_t seq_num;
+ struct lttng_trace_chunk *next_trace_chunk;
+};
+
/*
* Represents a stream in the relay
*/
struct lttng_index_file *index_file;
char *path_name;
- /*
- * prev_path_name is only used for session rotation support.
- * It is essentially used to work around the fact that index
- * files are always created from the 'data' connection.
- *
- * Hence, it is possible to receive a ROTATE_STREAM command
- * which affects the stream's path_name before the creation of
- * an index file. In this situation, the index file of the
- * 'previous' chunk would be created in the new destination folder.
- *
- * It would then be unlinked when the actual index of the new chunk
- * is created.
- */
- char *prev_path_name;
char *channel_name;
/* On-disk circular buffer of tracefiles. */
uint64_t tracefile_size;
uint64_t tracefile_size_current;
+ /* Max number of trace files for this stream. */
uint64_t tracefile_count;
+ /*
+ * Index of the currently active file for this stream's on-disk
+ * ring buffer.
+ */
+ uint64_t tracefile_current_index;
+ /*
+ * Indicates that the on-disk buffer has wrapped around. Stream
+ * files shall be unlinked before being opened after this has occurred.
+ */
+ bool tracefile_wrapped_around;
/*
* Position in the tracefile where we have the full index also on disk.
*/
bool in_recv_list;
struct cds_list_head recv_node;
- bool published; /* Protected by session lock. */
+ /* Protected by session lock. */
+ bool published;
/*
* Node of stream within global stream hash table.
*/
bool in_stream_ht; /* is stream in stream hash table. */
struct rcu_head rcu_node; /* For call_rcu teardown. */
/*
- * When we have written the data and index corresponding to this
- * seq_num, rotate the tracefile (session rotation). The path_name is
- * already up-to-date.
- * This is set to -1ULL when no rotation is pending.
- *
- * Always access with stream lock held.
- */
- uint64_t rotate_at_seq_num;
- /*
- * When rotate_at_seq_num != -1ULL, meaning that a rotation is ongoing,
- * data_rotated and index_rotated respectively indicate if the stream's
- * data and index have been rotated. A rotation is considered completed
- * when both rotations have occurred.
- */
- bool data_rotated;
- bool index_rotated;
- /*
- * `trace_chunk` is the trace chunk to which the file currently
- * being produced (if any) belongs.
+ * The trace chunk to which the file currently being produced (if any)
+ * belongs.
*/
struct lttng_trace_chunk *trace_chunk;
+ LTTNG_OPTIONAL(struct relay_stream_rotation) ongoing_rotation;
};
struct relay_stream *stream_create(struct ctf_trace *trace,
struct relay_stream *stream_get_by_id(uint64_t stream_id);
bool stream_get(struct relay_stream *stream);
void stream_put(struct relay_stream *stream);
+int stream_rotate_output_files(struct relay_session *session,
+ struct relay_stream *stream);
+int stream_set_pending_rotation(struct relay_stream *stream,
+ struct lttng_trace_chunk *next_trace_chunk,
+ uint64_t rotation_sequence_number);
void try_stream_close(struct relay_stream *stream);
void stream_publish(struct relay_stream *stream);
+int stream_init_packet(struct relay_stream *stream, size_t packet_size,
+ bool *file_rotated);
+int stream_write(struct relay_stream *stream,
+ const struct lttng_buffer_view *packet, size_t padding_len);
+/* Called after the reception of a complete data packet. */
+int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
+ bool rotate_index, bool *flushed, uint64_t total_size);
+int stream_complete_packet(struct relay_stream *stream,
+ size_t packet_total_size, uint64_t sequence_number,
+ bool index_flushed);
+/* Index info is in host endianness. */
+int stream_add_index(struct relay_stream *stream,
+ const struct lttcomm_relayd_index *index_info);
+int stream_reset_file(struct relay_stream *stream);
+
void print_relay_streams(void);
#endif /* _STREAM_H */
ret = close(handle->dirfd);
if (ret == -1) {
PERROR("Failed to close directory file descriptor of directory handle");
+ abort();
}
end:
lttng_directory_handle_invalidate(handle);
uint64_t session_id);
/*
- * Create the output files of a local stream.
+ * Create the output files of a local stream.
*
* This must be called with the channel's and the stream's lock held.
*/
#include <common/trace-chunk.h>
#include <common/trace-chunk-registry.h>
#include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
+
/*
* Note that net_seq_num below is assigned with the *current* value of
* next_net_seq_num and only after that the next_net_seq_num will be
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+
rcu_read_lock();
pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
*/
pthread_mutex_lock(&stream->lock);
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
ERR("Failed to sample snapshot position during channel rotation");
goto end_unlock_stream;
}
+ if (!is_local_trace) {
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = (stream->rotate_position /
+ stream->max_sb_size) + 1,
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
pthread_mutex_unlock(&stream->lock);
}
+ stream = NULL;
pthread_mutex_unlock(&channel->lock);
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end;
+ }
+
ret = 0;
goto end;
end_unlock_stream:
pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
pthread_mutex_unlock(&channel->lock);
end:
rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
return ret;
}
return ret;
}
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
-{
- int ret;
- struct consumer_relayd_sock_pair *relayd;
- uint64_t chunk_id;
- enum lttng_trace_chunk_status chunk_status;
-
- DBG("Rotate relay stream");
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- ERR("Failed to find relayd");
- ret = -1;
- goto end;
- }
-
- chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk,
- &chunk_id);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
- ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"",
- stream->chan->name);
- ret = -1;
- goto end;
- }
-
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_stream(&relayd->control_sock,
- stream->relayd_stream_id,
- chunk_id,
- stream->last_sequence_number);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- }
- if (ret) {
- ERR("Rotate relay stream");
- }
-
-end:
- return ret;
-}
-
/*
* Performs the stream rotation for the rotate session feature if needed.
* It must be called with the channel and stream locks held.
stream->trace_chunk = stream->chan->trace_chunk;
}
- if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = rotate_relay_stream(ctx, stream);
- } else {
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
ret = rotate_local_stream(ctx, stream);
- }
- if (ret < 0) {
- ERR("Failed to rotate stream, ret = %i", ret);
- goto error;
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
+ }
}
if (stream->metadata_flag && stream->trace_chunk) {
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id)
{
+ int ret;
enum lttcomm_return_code ret_code;
struct lttng_trace_chunk *chunk;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
+ const bool is_local_trace = !relayd_id;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool chunk_exists_remote;
if (relayd_id) {
int ret;
}
DBG("Consumer trace chunk exists command: relayd_id = %s"
- ", session_id = %" PRIu64
", chunk_id = %" PRIu64, relayd_id_str,
- session_id, chunk_id);
+ chunk_id);
chunk = lttng_trace_chunk_registry_find_chunk(
consumer_data.chunk_registry, session_id,
chunk_id);
DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
- ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL :
+ if (chunk) {
+ ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+ lttng_trace_chunk_put(chunk);
+ goto end;
+ } else if (is_local_trace) {
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(*relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, *relayd_id);
+ ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end_rcu_unlock;
+ }
+ DBG("Looking up existence of trace chunk on relay daemon");
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+ &chunk_exists_remote);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Failed to look-up the existence of trace chunk on relay daemon");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto end_rcu_unlock;
+ }
+
+ ret_code = chunk_exists_remote ?
+ LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon",
+ chunk_exists_remote ? "exists" : "does not exist");
- lttng_trace_chunk_put(chunk);
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
return ret_code;
}
bool missed_metadata_flush;
enum lttng_event_output output;
- /* Maximum subbuffer size. */
+ /* Maximum subbuffer size (in bytes). */
unsigned long max_sb_size;
/*
[ ERROR_INDEX(LTTNG_ERR_CLOSE_TRACE_CHUNK_FAIL_CONSUMER) ] = "Trace chunk close failed on consumer",
[ ERROR_INDEX(LTTNG_ERR_TRACE_CHUNK_EXISTS_FAIL_CONSUMER) ] = "Failed to query consumer for trace chunk existence",
[ ERROR_INDEX(LTTNG_ERR_INVALID_PROTOCOL) ] = "Protocol error occurred",
+ [ ERROR_INDEX(LTTNG_ERR_FILE_CREATION_ERROR) ] = "Failed to create file",
/* Last element */
[ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
struct lttng_index_file *lttng_index_file_create_from_trace_chunk(
struct lttng_trace_chunk *chunk,
const char *channel_path, char *stream_name,
- uint64_t stream_file_size, uint64_t stream_count,
+ uint64_t stream_file_size, uint64_t stream_file_index,
uint32_t index_major, uint32_t index_minor,
bool unlink_existing_file)
{
index_minor);
const int flags = O_WRONLY | O_CREAT | O_TRUNC;
const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
+ bool acquired_reference = lttng_trace_chunk_get(chunk);
+
+ assert(acquired_reference);
index_file = zmalloc(sizeof(*index_file));
if (!index_file) {
goto error;
}
+ index_file->trace_chunk = chunk;
ret = snprintf(index_directory_path, sizeof(index_directory_path),
"%s/" DEFAULT_INDEX_DIR, channel_path);
if (ret < 0 || ret >= sizeof(index_directory_path)) {
}
ret = utils_stream_file_path(index_directory_path, stream_name,
- stream_file_size, stream_count,
+ stream_file_size, stream_file_index,
DEFAULT_INDEX_FILE_SUFFIX,
index_file_path, sizeof(index_file_path));
if (ret) {
if (close(index_file->fd)) {
PERROR("close index fd");
}
+ lttng_trace_chunk_put(index_file->trace_chunk);
free(index_file);
}
uint32_t major;
uint32_t minor;
uint32_t element_len;
+ struct lttng_trace_chunk *trace_chunk;
struct urcu_ref ref;
};
#include <common/sessiond-comm/relayd.h>
#include <common/index/ctf-index.h>
#include <common/trace-chunk.h>
+#include <common/string-utils/format.h>
#include "relayd.h"
return ret;
}
-int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
- uint64_t new_chunk_id, uint64_t seq_num)
+int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
+ unsigned int stream_count, uint64_t *new_chunk_id,
+ const struct relayd_stream_rotation_position *positions)
{
int ret;
- struct lttcomm_relayd_rotate_stream *msg = NULL;
- struct lttcomm_relayd_generic_reply reply;
- size_t len;
- int msg_len;
- /* FIXME */
- char *new_pathname = NULL;
+ unsigned int i;
+ struct lttng_dynamic_buffer payload;
+ struct lttcomm_relayd_generic_reply reply = {};
+ const struct lttcomm_relayd_rotate_streams msg = {
+ .stream_count = htobe32((uint32_t) stream_count),
+ .new_chunk_id = (typeof(msg.new_chunk_id)) {
+ .is_set = !!new_chunk_id,
+ .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
+ },
+ };
+ char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
+ const char *new_chunk_id_str;
- /* Code flow error. Safety net. */
- assert(rsock);
+ lttng_dynamic_buffer_init(&payload);
- DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
+ /* Code flow error. Safety net. */
+ assert(sock);
- /* Account for the trailing NULL. */
- len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
- if (len > LTTNG_PATH_MAX) {
- ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
- ret = -1;
- goto error;
+ if (new_chunk_id) {
+ ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
+ "%" PRIu64, *new_chunk_id);
+ if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
+ new_chunk_id_str = "formatting error";
+ } else {
+ new_chunk_id_str = new_chunk_id_buf;
+ }
+ } else {
+ new_chunk_id_str = "none";
}
- msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
- msg = zmalloc(msg_len);
- if (!msg) {
- PERROR("Failed to allocate relayd rotate stream command of %d bytes",
- msg_len);
- ret = -1;
- goto error;
- }
+ DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
+ new_chunk_id_str, stream_count);
- if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
- ret = -1;
- ERR("Failed to copy relayd rotate stream command's new path name");
+ ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
+ if (ret) {
+ ERR("Failed to allocate \"rotate streams\" command payload");
goto error;
}
- msg->pathname_length = htobe32(len);
- msg->stream_id = htobe64(stream_id);
- msg->new_chunk_id = htobe64(new_chunk_id);
- /*
- * The seq_num is invalid for metadata streams, but it is ignored on
- * the relay.
- */
- msg->rotate_at_seq_num = htobe64(seq_num);
+ for (i = 0; i < stream_count; i++) {
+ const struct relayd_stream_rotation_position *position =
+ &positions[i];
+ const struct lttcomm_relayd_stream_rotation_position comm_position = {
+ .stream_id = htobe64(position->stream_id),
+ .rotate_at_seq_num = htobe64(
+ position->rotate_at_seq_num),
+ };
+
+ DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
+ position->stream_id,
+ position->rotate_at_seq_num);
+ ret = lttng_dynamic_buffer_append(&payload, &comm_position,
+ sizeof(comm_position));
+ if (ret) {
+ ERR("Failed to allocate \"rotate streams\" command payload");
+ goto error;
+ }
+ }
/* Send command. */
- ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
+ ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
+ payload.size, 0);
if (ret < 0) {
- ERR("Send rotate command");
+ ERR("Failed to send \"rotate stream\" command");
goto error;
}
/* Receive response. */
- ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(sock, &reply, sizeof(reply));
if (ret < 0) {
- ERR("Receive rotate reply");
+ ERR("Failed to receive \"rotate streams\" command reply");
goto error;
}
reply.ret_code = be32toh(reply.ret_code);
-
- /* Return session id or negative ret code. */
if (reply.ret_code != LTTNG_OK) {
ret = -1;
- ERR("Relayd rotate stream replied error %d", reply.ret_code);
+ ERR("Relayd rotate streams replied error %d", reply.ret_code);
} else {
/* Success. */
ret = 0;
- DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+ DBG("Relayd rotated streams successfully");
}
error:
- free(msg);
+ lttng_dynamic_buffer_reset(&payload);
return ret;
}
end:
return ret;
}
+
+int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
+ uint64_t chunk_id, bool *chunk_exists)
+{
+ int ret = 0;
+ struct lttcomm_relayd_trace_chunk_exists msg = {};
+ struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
+
+ msg = (typeof(msg)){
+ .chunk_id = htobe64(chunk_id),
+ };
+
+ ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
+ 0);
+ if (ret < 0) {
+ ERR("Failed to send trace chunk exists command to relay daemon");
+ goto end;
+ }
+
+ ret = recv_reply(sock, &reply, sizeof(reply));
+ if (ret < 0) {
+ ERR("Failed to receive relay daemon trace chunk close command reply");
+ goto end;
+ }
+
+ reply.generic.ret_code = be32toh(reply.generic.ret_code);
+ if (reply.generic.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd trace chunk close replied error %d",
+ reply.generic.ret_code);
+ } else {
+ ret = 0;
+ DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
+ ", exists = %s", chunk_id,
+ reply.trace_chunk_exists ? "true" : "false");
+ *chunk_exists = !!reply.trace_chunk_exists;
+ }
+end:
+ return ret;
+}
#define _RELAYD_H
#include <unistd.h>
+#include <stdbool.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/trace-chunk.h>
+#include <common/dynamic-array.h>
+
+struct relayd_stream_rotation_position {
+ uint64_t stream_id;
+ /*
+ * Sequence number of the first packet belonging to the new
+ * "destination" trace chunk to which the stream is rotating.
+ *
+ * Ignored for metadata streams.
+ */
+ uint64_t rotate_at_seq_num;
+};
int relayd_connect(struct lttcomm_relayd_sock *sock);
int relayd_close(struct lttcomm_relayd_sock *sock);
uint64_t net_seq_num);
int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
uint64_t stream_id, uint64_t version);
-int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
- uint64_t new_chunk_id, uint64_t seq_num);
+/* `positions` is an array of `stream_count` relayd_stream_rotation_position. */
+int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
+ unsigned int stream_count, uint64_t *new_chunk_id,
+ const struct relayd_stream_rotation_position *positions);
int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
struct lttng_trace_chunk *chunk);
int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
struct lttng_trace_chunk *chunk);
+int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
+ uint64_t chunk_id, bool *chunk_exists);
#endif /* _RELAYD_H */
uint64_t version;
} LTTNG_PACKED;
-struct lttcomm_relayd_rotate_stream {
+struct lttcomm_relayd_stream_rotation_position {
uint64_t stream_id;
- /* Ignored for metadata streams. */
+ /*
+ * Sequence number of the first packet belonging to the new
+ * "destination" trace chunk to which the stream is rotating.
+ *
+ * Ignored for metadata streams.
+ */
uint64_t rotate_at_seq_num;
- uint64_t new_chunk_id;
- /* Includes trailing NULL. */
- uint32_t pathname_length;
- /* Must be the last member of this structure. */
- char new_pathname[];
+} LTTNG_PACKED;
+
+struct lttcomm_relayd_rotate_streams {
+ uint32_t stream_count;
+ /*
+ * Streams can be rotated outside of a chunk but not be parented to
+ * a new chunk.
+ */
+ LTTNG_OPTIONAL_COMM(uint64_t) new_chunk_id;
+ /* `stream_count` positions follow. */
+ struct lttcomm_relayd_stream_rotation_position rotation_positions[];
} LTTNG_PACKED;
struct lttcomm_relayd_create_trace_chunk {
LTTNG_OPTIONAL_COMM(uint32_t) LTTNG_PACKED close_command;
} LTTNG_PACKED;
+struct lttcomm_relayd_trace_chunk_exists {
+ uint64_t chunk_id;
+} LTTNG_PACKED;
+
+struct lttcomm_relayd_trace_chunk_exists_reply {
+ struct lttcomm_relayd_generic_reply generic;
+ uint8_t trace_chunk_exists;
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_STREAMS_SENT = 16,
/* Ask the relay to reset the metadata trace file (2.8+) */
RELAYD_RESET_METADATA = 17,
- /* Ask the relay to rotate a stream file (2.11+) */
- RELAYD_ROTATE_STREAM = 18,
+ /* Ask the relay to rotate a set of stream files (2.11+) */
+ RELAYD_ROTATE_STREAMS = 18,
/* Ask the relay to create a trace chunk (2.11+) */
RELAYD_CREATE_TRACE_CHUNK = 19,
/* Ask the relay to close a trace chunk (2.11+) */
RELAYD_CLOSE_TRACE_CHUNK = 20,
+ /* Ask the relay whether a trace chunk exists (2.11+) */
+ RELAYD_TRACE_CHUNK_EXISTS = 21,
};
/*
LTTNG_OPTIONAL_GET(trace_chunk->timestamp_creation);
const time_t close_timestamp =
LTTNG_OPTIONAL_GET(trace_chunk->timestamp_close);
- LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory;
+ LTTNG_OPTIONAL(struct lttng_directory_handle) archived_chunks_directory = {};
if (!trace_chunk->mode.is_set ||
trace_chunk->mode.value != TRACE_CHUNK_MODE_OWNER ||
return ret;
}
-/*
- * Change the output tracefile according to the given size and count The
- * new_count pointer is set during this operation.
- *
- * From the consumer, the stream lock MUST be held before calling this function
- * because we are modifying the stream status.
- *
- * Return 0 on success or else a negative value.
- */
-LTTNG_HIDDEN
-int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
- uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
- int *stream_fd)
-{
- int ret;
-
- assert(stream_fd);
-
- ret = close(out_fd);
- if (ret < 0) {
- PERROR("Closing tracefile");
- goto error;
- }
- *stream_fd = -1;
-
- if (count > 0) {
- /*
- * In tracefile rotation, for the relay daemon we need
- * to unlink the old file if present, because it may
- * still be open in reading by the live thread, and we
- * need to ensure that we do not overwrite the content
- * between get_index and get_packet. Since we have no
- * way to verify integrity of the data content compared
- * to the associated index, we need to ensure the reader
- * has exclusive access to the file content, and that
- * the open of the data file is performed in get_index.
- * Unlinking the old file rather than overwriting it
- * achieves this.
- */
- if (new_count) {
- *new_count = (*new_count + 1) % count;
- }
- ret = utils_unlink_stream_file(path_name, file_name, size,
- new_count ? *new_count : 0, uid, gid, 0);
- if (ret < 0 && errno != ENOENT) {
- goto error;
- }
- } else {
- if (new_count) {
- (*new_count)++;
- }
- }
-
- ret = utils_create_stream_file(path_name, file_name, size,
- new_count ? *new_count : 0, uid, gid, 0);
- if (ret < 0) {
- goto error;
- }
- *stream_fd = ret;
-
- /* Success. */
- ret = 0;
-
-error:
- return ret;
-}
-
-
/**
* Parse a string that represents a size in human readable format. It
* supports decimal integers suffixed by 'k', 'K', 'M' or 'G'.
uint64_t count, int uid, int gid, char *suffix);
int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size,
uint64_t count, int uid, int gid, char *suffix);
-int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size,
- uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count,
- int *stream_fd);
int utils_stream_file_path(const char *path_name, const char *file_name,
uint64_t size, uint64_t count, const char *suffix,
char *out_stream_path, size_t stream_path_len);