*
* Return destination file descriptor or negative value on error.
*/
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
- size_t data_size)
+static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
+ size_t data_size, struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
- struct consumer_relayd_sock_pair *relayd;
struct lttcomm_relayd_data_hdr data_hdr;
/* Safety net */
assert(stream);
+ assert(relayd);
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
- rcu_read_lock();
- /* Get relayd reference of the stream. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
- /* Stream is either local or corrupted */
- goto error;
- }
-
- DBG("Consumer found relayd socks with index %d", stream->net_seq_idx);
if (stream->metadata_flag) {
/* Caller MUST acquire the relayd control socket lock */
ret = relayd_send_metadata(&relayd->control_sock, data_size);
}
error:
- rcu_read_unlock();
return outfd;
}
free(ctx);
}
+/*
+ * Write the metadata stream id on the specified file descriptor.
+ */
+static int write_relayd_metadata_id(int fd,
+ struct lttng_consumer_stream *stream,
+ struct consumer_relayd_sock_pair *relayd)
+{
+ int ret;
+ uint64_t metadata_id;
+
+ metadata_id = htobe64(stream->relayd_stream_id);
+ do {
+ ret = write(fd, (void *) &metadata_id,
+ sizeof(stream->relayd_stream_id));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write metadata stream id");
+ goto end;
+ }
+ DBG("Metadata stream id %zu written before data",
+ stream->relayd_stream_id);
+
+end:
+ return ret;
+}
+
/*
* Mmap the ring buffer, read it and write the data to the tracefile.
*
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
- uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
/* RCU lock for the relayd pointer */
netlen += sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, netlen);
+ ret = write_relayd_stream_header(stream, netlen, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(outfd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
+ ret = write_relayd_metadata_id(outfd, stream, relayd);
if (ret < 0) {
- PERROR("write metadata stream id");
written = ret;
goto end;
}
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
+
/*
* We do this so the return value can match the len passed as
* argument to this function.
int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
- uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
switch (consumer_data.type) {
*/
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- metadata_id = htobe64(stream->relayd_stream_id);
- do {
- ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
- } while (ret < 0 && errno == EINTR);
+ ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1],
+ stream, relayd);
if (ret < 0) {
- PERROR("write metadata stream id");
written = ret;
goto end;
}
- DBG("Metadata stream id %zu written before data",
- stream->relayd_stream_id);
}
while (len > 0) {
written -= sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, ret_splice);
+ ret = write_relayd_stream_header(stream, ret_splice, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
} else {
- if (outfd == -1) {
- ERR("Remote relayd disconnected. Stopping");
- goto end;
- }
+ ERR("Remote relayd disconnected. Stopping");
+ goto end;
}
}