From: David Goulet Date: Tue, 31 Jul 2012 16:35:39 +0000 (-0400) Subject: Divide read subbuffer consumer function X-Git-Tag: v2.1.0-rc1~41 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=6197aea7399cfe3bb67f8602ba4c3122867ecf52;p=lttng-tools.git Divide read subbuffer consumer function Signed-off-by: David Goulet --- diff --git a/src/common/consumer.c b/src/common/consumer.c index 1863cddc5..761ce9375 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -529,28 +529,19 @@ error: * * Return destination file descriptor or negative value on error. */ -int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, - size_t data_size) +static int write_relayd_stream_header(struct lttng_consumer_stream *stream, + size_t data_size, struct consumer_relayd_sock_pair *relayd) { int outfd = -1, ret; - struct consumer_relayd_sock_pair *relayd; struct lttcomm_relayd_data_hdr data_hdr; /* Safety net */ assert(stream); + assert(relayd); /* Reset data header */ memset(&data_hdr, 0, sizeof(data_hdr)); - rcu_read_lock(); - /* Get relayd reference of the stream. */ - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd == NULL) { - /* Stream is either local or corrupted */ - goto error; - } - - DBG("Consumer found relayd socks with index %d", stream->net_seq_idx); if (stream->metadata_flag) { /* Caller MUST acquire the relayd control socket lock */ ret = relayd_send_metadata(&relayd->control_sock, data_size); @@ -578,7 +569,6 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, } error: - rcu_read_unlock(); return outfd; } @@ -1078,6 +1068,32 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) free(ctx); } +/* + * Write the metadata stream id on the specified file descriptor. + */ +static int write_relayd_metadata_id(int fd, + struct lttng_consumer_stream *stream, + struct consumer_relayd_sock_pair *relayd) +{ + int ret; + uint64_t metadata_id; + + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(fd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + +end: + return ret; +} + /* * Mmap the ring buffer, read it and write the data to the tracefile. * @@ -1092,7 +1108,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( off_t orig_offset = stream->out_fd_offset; /* Default is on the disk */ int outfd = stream->out_fd; - uint64_t metadata_id; struct consumer_relayd_sock_pair *relayd = NULL; /* RCU lock for the relayd pointer */ @@ -1141,25 +1156,19 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( netlen += sizeof(stream->relayd_stream_id); } - ret = consumer_handle_stream_before_relayd(stream, netlen); + ret = write_relayd_stream_header(stream, netlen, relayd); if (ret >= 0) { /* Use the returned socket. */ outfd = ret; /* Write metadata stream id before payload */ if (stream->metadata_flag) { - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(outfd, (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - } while (ret < 0 && errno == EINTR); + ret = write_relayd_metadata_id(outfd, stream, relayd); if (ret < 0) { - PERROR("write metadata stream id"); written = ret; goto end; } - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); + /* * We do this so the return value can match the len passed as * argument to this function. @@ -1226,7 +1235,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( int fd = stream->wait_fd; /* Default is on the disk */ int outfd = stream->out_fd; - uint64_t metadata_id; struct consumer_relayd_sock_pair *relayd = NULL; switch (consumer_data.type) { @@ -1260,18 +1268,12 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - metadata_id = htobe64(stream->relayd_stream_id); - do { - ret = write(ctx->consumer_thread_pipe[1], (void *) &metadata_id, - sizeof(stream->relayd_stream_id)); - } while (ret < 0 && errno == EINTR); + ret = write_relayd_metadata_id(ctx->consumer_thread_pipe[1], + stream, relayd); if (ret < 0) { - PERROR("write metadata stream id"); written = ret; goto end; } - DBG("Metadata stream id %zu written before data", - stream->relayd_stream_id); } while (len > 0) { @@ -1302,15 +1304,13 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( written -= sizeof(stream->relayd_stream_id); } - ret = consumer_handle_stream_before_relayd(stream, ret_splice); + ret = write_relayd_stream_header(stream, ret_splice, relayd); if (ret >= 0) { /* Use the returned socket. */ outfd = ret; } else { - if (outfd == -1) { - ERR("Remote relayd disconnected. Stopping"); - goto end; - } + ERR("Remote relayd disconnected. Stopping"); + goto end; } }