+/*
+ * Process the commands received on the control socket
+ */
+static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret = 0;
+
+ switch (be32toh(recv_hdr->cmd)) {
+ case RELAYD_CREATE_SESSION:
+ ret = relay_create_session(recv_hdr, conn);
+ break;
+ case RELAYD_ADD_STREAM:
+ ret = relay_add_stream(recv_hdr, conn);
+ break;
+ case RELAYD_START_DATA:
+ ret = relay_start(recv_hdr, conn);
+ break;
+ case RELAYD_SEND_METADATA:
+ ret = relay_recv_metadata(recv_hdr, conn);
+ break;
+ case RELAYD_VERSION:
+ ret = relay_send_version(recv_hdr, conn);
+ break;
+ case RELAYD_CLOSE_STREAM:
+ ret = relay_close_stream(recv_hdr, conn);
+ break;
+ case RELAYD_DATA_PENDING:
+ ret = relay_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_QUIESCENT_CONTROL:
+ ret = relay_quiescent_control(recv_hdr, conn);
+ break;
+ case RELAYD_BEGIN_DATA_PENDING:
+ ret = relay_begin_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_END_DATA_PENDING:
+ ret = relay_end_data_pending(recv_hdr, conn);
+ break;
+ case RELAYD_SEND_INDEX:
+ ret = relay_recv_index(recv_hdr, conn);
+ break;
+ case RELAYD_STREAMS_SENT:
+ ret = relay_streams_sent(recv_hdr, conn);
+ break;
+ case RELAYD_RESET_METADATA:
+ ret = relay_reset_metadata(recv_hdr, conn);
+ break;
+ case RELAYD_UPDATE_SYNC_INFO:
+ default:
+ ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
+ relay_unknown_command(conn);
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Handle index for a data stream.
+ *
+ * Called with the stream lock held.
+ *
+ * Return 0 on success else a negative value.
+ */
+static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
+ int rotate_index)
+{
+ int ret = 0;
+ uint64_t data_offset;
+ struct relay_index *index;
+
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, net_seq_num, stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence
+ * number. If it exists, the control thread has already received the
+ * data for it, thus we need to write it to disk.
+ */
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto end;
+ }
+
+ if (rotate_index || !stream->index_file) {
+ uint32_t major, minor;
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+ stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->channel_name,
+ -1, -1, stream->tracefile_size,
+ tracefile_array_get_file_index_head(stream->tfa),
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor));
+ if (!stream->index_file) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ index = NULL;
+ goto end;
+ }
+ }
+
+ if (relay_index_set_file(index, stream->index_file, data_offset)) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ index = NULL;
+ goto end;
+ }
+
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
+ ret = -1;
+ }
+end:
+ return ret;
+}
+
+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static int relay_process_data(struct relay_connection *conn)
+{
+ int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
+ struct relay_stream *stream;
+ struct lttcomm_relayd_data_hdr data_hdr;
+ uint64_t stream_id;
+ uint64_t net_seq_num;
+ uint32_t data_size;
+ struct relay_session *session;
+ bool new_stream = false, close_requested = false;
+ size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+ size_t recv_off = 0;
+ char data_buffer[chunk_size];
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ stream_id = be64toh(data_hdr.stream_id);
+ stream = stream_get_by_id(stream_id);
+ if (!stream) {
+ ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
+ ret = -1;
+ goto end;
+ }
+ session = stream->trace->session;
+ data_size = be32toh(data_hdr.data_size);
+
+ net_seq_num = be64toh(data_hdr.net_seq_num);
+
+ DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ data_size, stream_id, net_seq_num);
+
+ pthread_mutex_lock(&stream->lock);
+
+ /* Check if a rotation is needed. */
+ if (stream->tracefile_size > 0 &&
+ (stream->tracefile_size_current + data_size) >
+ stream->tracefile_size) {
+ uint64_t old_id, new_id;
+
+ old_id = tracefile_array_get_file_index_head(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa);
+
+ /* new_id is updated by utils_rotate_stream_file. */
+ new_id = old_id;
+
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, -1,
+ -1, stream->stream_fd->fd,
+ &new_id, &stream->stream_fd->fd);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end_stream_unlock;
+ }
+ /*
+ * Reset current size because we just performed a stream
+ * rotation.
+ */
+ stream->tracefile_size_current = 0;
+ rotate_index = 1;
+ }
+
+ /*
+ * Index are handled in protocol version 2.4 and above. Also,
+ * snapshot and index are NOT supported.
+ */
+ if (session->minor >= 4 && !session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
+ if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
+ goto end_stream_unlock;
+ }
+ }
+
+ for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+ size_t recv_size = min(data_size - recv_off, chunk_size);
+
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Socket %d error %d", conn->sock->fd, ret);
+ }
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ /* Write data to stream output fd. */
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ recv_size);
+ if (size_ret < recv_size) {
+ ERR("Relay error writing data to file");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ size_ret, stream->stream_handle);
+ }
+
+ ret = write_padding_to_file(stream->stream_fd->fd,
+ be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
+ goto end_stream_unlock;
+ }
+ stream->tracefile_size_current +=
+ data_size + be32toh(data_hdr.padding_size);
+ if (stream->prev_seq == -1ULL) {
+ new_stream = true;
+ }
+
+ stream->prev_seq = net_seq_num;
+
+end_stream_unlock:
+ close_requested = stream->close_requested;
+ pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
+ if (new_stream) {
+ pthread_mutex_lock(&session->lock);
+ uatomic_set(&session->new_streams, 1);
+ pthread_mutex_unlock(&session->lock);
+ }
+ stream_put(stream);
+end:
+ return ret;
+}
+
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)