return ret;
}
+/*
+ * Append padding to the file pointed by the file descriptor fd.
+ */
+static int write_padding_to_file(int fd, uint32_t size)
+{
+ int ret = 0;
+ char *zeros;
+
+ if (size == 0) {
+ goto end;
+ }
+
+ zeros = zmalloc(size);
+ if (zeros == NULL) {
+ PERROR("zmalloc zeros for padding");
+ ret = -1;
+ goto end;
+ }
+
+ do {
+ ret = write(fd, zeros, size);
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write padding to file");
+ }
+
+end:
+ return ret;
+}
+
/*
* relay_recv_metadata: receive the metada for the session.
*/
ret = -1;
goto end_unlock;
}
+
+ ret = write_padding_to_file(metadata_stream->fd,
+ be32toh(metadata_struct->padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
DBG2("Relay metadata written");
end_unlock:
ret = -1;
goto end_unlock;
}
+
+ ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ goto end_unlock;
+ }
+
DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
ret, stream->stream_handle);
* Return destination file descriptor or negative value on error.
*/
static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
- size_t data_size, struct consumer_relayd_sock_pair *relayd)
+ size_t data_size, unsigned long padding,
+ struct consumer_relayd_sock_pair *relayd)
{
int outfd = -1, ret;
struct lttcomm_relayd_data_hdr data_hdr;
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
+ data_hdr.padding_size = htobe32(padding);
data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
/* Other fields are zeroed previously */
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd)
+ struct consumer_relayd_sock_pair *relayd,
+ unsigned long padding)
{
int ret;
- uint64_t metadata_id;
+ struct lttcomm_relayd_metadata_payload hdr;
- metadata_id = htobe64(stream->relayd_stream_id);
+ hdr.stream_id = htobe64(stream->relayd_stream_id);
+ hdr.padding_size = htobe32(padding);
do {
- ret = write(fd, (void *) &metadata_id,
- sizeof(stream->relayd_stream_id));
+ ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write metadata stream id");
goto end;
}
- DBG("Metadata stream id %" PRIu64 " written before data",
- stream->relayd_stream_id);
+ DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
+ stream->relayd_stream_id, padding);
end:
return ret;
*/
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding)
{
unsigned long mmap_offset;
ssize_t ret = 0, written = 0;
if (stream->metadata_flag) {
/* Metadata requires the control socket. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- netlen += sizeof(stream->relayd_stream_id);
+ netlen += sizeof(struct lttcomm_relayd_metadata_payload);
}
- ret = write_relayd_stream_header(stream, netlen, relayd);
+ ret = write_relayd_stream_header(stream, netlen, padding, relayd);
if (ret >= 0) {
/* Use the returned socket. */
outfd = ret;
/* Write metadata stream id before payload */
if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd);
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
if (ret < 0) {
written = ret;
goto end;
}
}
/* Else, use the default set before which is the filesystem. */
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
}
while (len > 0) {
do {
ret = write(outfd, stream->mmap_base + mmap_offset, len);
} while (ret < 0 && errno == EINTR);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
PERROR("Error in file write");
if (written == 0) {
len -= ret;
mmap_offset += ret;
}
- DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
}
/* Write metadata stream id before payload */
- if (stream->metadata_flag && relayd) {
- /*
- * Lock the control socket for the complete duration of the function
- * since from this point on we will use the socket.
- */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (relayd) {
+ int total_len = len;
- ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd);
- if (ret < 0) {
- written = ret;
+ if (stream->metadata_flag) {
+ /*
+ * Lock the control socket for the complete duration of the function
+ * since from this point on we will use the socket.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+
+ ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd,
+ padding);
+ if (ret < 0) {
+ written = ret;
+ goto end;
+ }
+
+ total_len += sizeof(struct lttcomm_relayd_metadata_payload);
+ }
+
+ ret = write_relayd_stream_header(stream, total_len, padding, relayd);
+ if (ret >= 0) {
+ /* Use the returned socket. */
+ outfd = ret;
+ } else {
+ ERR("Remote relayd disconnected. Stopping");
goto end;
}
+ } else {
+ /* No streaming, we have to set the len with the full padding */
+ len += padding;
}
while (len > 0) {
- DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
- (unsigned long)offset, len, fd);
+ DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
+ (unsigned long)offset, len, fd, splice_pipe[1]);
ret_splice = splice(fd, &offset, splice_pipe[1], NULL, len,
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
if (stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
+
/* Update counter to fit the spliced data */
- ret_splice += sizeof(stream->relayd_stream_id);
- len += sizeof(stream->relayd_stream_id);
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
/*
* We do this so the return value can match the len passed as
* argument to this function.
*/
- written -= sizeof(stream->relayd_stream_id);
- }
-
- ret = write_relayd_stream_header(stream, ret_splice, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
- } else {
- ERR("Remote relayd disconnected. Stopping");
- goto end;
+ written -= metadata_payload_size;
}
}
/* Splice data out */
ret_splice = splice(splice_pipe[0], NULL, outfd, NULL,
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
- DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice);
+ DBG("Consumer splice pipe to file, ret %zd", ret_splice);
if (ret_splice < 0) {
PERROR("Error in file splice");
if (written == 0) {
extern void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
extern ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len);
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding);
extern ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len);
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding);
extern int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream);
extern int lttng_consumer_get_produced_snapshot(
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
- unsigned long len;
+ unsigned long len, subbuf_size, padding;
int err;
ssize_t ret = 0;
int infd = stream->wait_fd;
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
- ret = -err;
+ ret = err;
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
goto end;
}
+ /* Get the full subbuffer size including padding */
+ err = kernctl_get_padded_subbuf_size(infd, &len);
+ if (err != 0) {
+ errno = -err;
+ perror("Getting sub-buffer len failed.");
+ ret = err;
+ goto end;
+ }
+
switch (stream->output) {
- case LTTNG_EVENT_SPLICE:
- /* read the whole subbuffer */
- err = kernctl_get_padded_subbuf_size(infd, &len);
- if (err != 0) {
- errno = -err;
- perror("Getting sub-buffer len failed.");
- ret = -err;
- goto end;
- }
+ case LTTNG_EVENT_SPLICE:
- /* splice the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len);
- if (ret != len) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
- ret, len);
- }
+ /*
+ * XXX: The lttng-modules splice "actor" does not handle copying
+ * partial pages hence only using the subbuffer size without the
+ * padding makes the splice fail.
+ */
+ subbuf_size = len;
+ padding = 0;
+
+ /* splice the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream,
+ subbuf_size, padding);
+ if (ret != subbuf_size) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
+ ret, subbuf_size);
+ }
+ break;
+ case LTTNG_EVENT_MMAP:
+ /* Get subbuffer size without padding */
+ err = kernctl_get_subbuf_size(infd, &subbuf_size);
+ if (err != 0) {
+ errno = -err;
+ perror("Getting sub-buffer len failed.");
+ ret = err;
+ goto end;
+ }
- break;
- case LTTNG_EVENT_MMAP:
- /* read the used subbuffer size */
- err = kernctl_get_padded_subbuf_size(infd, &len);
- if (err != 0) {
- errno = -err;
- perror("Getting sub-buffer len failed.");
- ret = -err;
- goto end;
- }
- /* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
- if (ret != len) {
- /*
- * display the error but continue processing to try
- * to release the subbuffer
- */
- ERR("Error writing to tracefile");
- }
- break;
- default:
- ERR("Unknown output method");
- ret = -1;
+ /* Make sure the tracer is not gone mad on us! */
+ assert(len >= subbuf_size);
+
+ padding = len - subbuf_size;
+
+ /* write the subbuffer to the tracefile */
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
+ subbuf_size, padding);
+ if (ret != subbuf_size) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile (ret: %zd != len: %lu",
+ ret, subbuf_size);
+ }
+ break;
+ default:
+ ERR("Unknown output method");
+ ret = -1;
}
err = kernctl_put_next_subbuf(infd);
uint64_t stream_id; /* Stream ID known by the relayd */
uint64_t net_seq_num; /* Network sequence number, per stream. */
uint32_t data_size; /* data size following this header */
+ uint32_t padding_size; /* Size of 0 padding the data */
} __attribute__ ((__packed__));
/*
*/
struct lttcomm_relayd_metadata_payload {
uint64_t stream_id;
+ uint32_t padding_size;
char payload[];
} __attribute__ ((__packed__));
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
- unsigned long len;
+ unsigned long len, subbuf_size, padding;
int err;
long ret = 0;
struct lttng_ust_shm_handle *handle;
/* Get the next subbuffer */
err = ustctl_get_next_subbuf(handle, buf);
if (err != 0) {
- ret = -err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
+ ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
/*
* This is a debug message even for single-threaded consumer,
* because poll() have more relaxed criterions than get subbuf,
goto end;
}
assert(stream->output == LTTNG_EVENT_MMAP);
- /* read the used subbuffer size */
+ /* Get the full padded subbuffer size */
err = ustctl_get_padded_subbuf_size(handle, buf, &len);
assert(err == 0);
+
+ /* Get subbuffer data size (without padding) */
+ err = ustctl_get_subbuf_size(handle, buf, &subbuf_size);
+ assert(err == 0);
+
+ /* Make sure we don't get a subbuffer size bigger than the padded */
+ assert(len >= subbuf_size);
+
+ padding = len - subbuf_size;
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len);
- if (ret != len) {
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
+ if (ret != subbuf_size) {
/*
* display the error but continue processing to try
* to release the subbuffer
static inline
ssize_t lttng_ustconsumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len)
+ struct lttng_consumer_stream *stream, unsigned long len,
+ unsigned long padding)
{
return -ENOSYS;
}
static inline
ssize_t lttng_ustconsumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *uststream, unsigned long len)
+ struct lttng_consumer_stream *uststream, unsigned long len,
+ unsigned long padding)
{
return -ENOSYS;
}