ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream,
- const char *buffer,
- unsigned long len,
+ const struct lttng_buffer_view *buffer,
unsigned long padding,
struct ctf_packet_index *index)
{
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = NULL;
unsigned int relayd_hang_up = 0;
+ const size_t subbuf_content_size = buffer->size - padding;
+ size_t write_len;
/* RCU lock for the relayd pointer */
rcu_read_lock();
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
- unsigned long netlen = len;
+ unsigned long netlen = subbuf_content_size;
/*
* Lock the control socket for the complete duration of the function
goto write_error;
}
}
- } else {
- /* No streaming, we have to set the len with the full padding */
- len += padding;
+ write_len = subbuf_content_size;
+ } else {
+ /* No streaming; we have to write the full padding. */
if (stream->metadata_flag && stream->reset_metadata_flag) {
ret = utils_truncate_stream_file(stream->out_fd, 0);
if (ret < 0) {
* Check if we need to change the tracefile before writing the packet.
*/
if (stream->chan->tracefile_size > 0 &&
- (stream->tracefile_size_current + len) >
+ (stream->tracefile_size_current + buffer->size) >
stream->chan->tracefile_size) {
ret = consumer_stream_rotate_output_files(stream);
if (ret) {
outfd = stream->out_fd;
orig_offset = 0;
}
- stream->tracefile_size_current += len;
+ stream->tracefile_size_current += buffer->size;
if (index) {
index->offset = htobe64(stream->out_fd_offset);
}
+
+ write_len = buffer->size;
}
/*
* This call guarantee that len or less is returned. It's impossible to
* receive a ret value that is bigger than len.
*/
- ret = lttng_write(outfd, buffer, len);
- DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
- if (ret < 0 || ((size_t) ret != len)) {
+ ret = lttng_write(outfd, buffer->data, write_len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, write_len);
+ if (ret < 0 || ((size_t) ret != write_len)) {
/*
* Report error to caller if nothing was written else at least send the
* amount written.
DBG("Consumer mmap write detected relayd hang up");
} else {
/* Unhandled error, print it and stop function right now. */
- PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
+ PERROR("Error in write mmap (ret %zd != write_len %zu)", ret,
+ write_len);
}
goto write_error;
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+ lttng_sync_file_range(outfd, stream->out_fd_offset, write_len,
SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += len;
+ stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
#include <common/index/ctf-index.h>
#include <common/trace-chunk-registry.h>
#include <common/credentials.h>
+#include <common/buffer-view.h>
/* Commands for consumer */
enum lttng_consumer_command {
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream,
- const char *buffer,
- unsigned long len,
+ const struct lttng_buffer_view *buffer,
unsigned long padding,
struct ctf_packet_index *index);
ssize_t lttng_consumer_on_read_subbuffer_splice(
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#include "common/buffer-view.h"
#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
ssize_t read_len;
unsigned long len, padded_len;
const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
health_code_update();
-
DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
goto error_put_subbuf;
}
+ subbuf_view = lttng_buffer_view_init(
+ subbuf_addr, 0, padded_len);
read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
- stream, subbuf_addr, len,
+ stream, &subbuf_view,
padded_len - len, NULL);
/*
* We write the padded len in local tracefiles but the data len
case CONSUMER_CHANNEL_MMAP:
{
const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
/* Get subbuffer size without padding */
err = kernctl_get_subbuf_size(infd, &subbuf_size);
padding = len - subbuf_size;
+ subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
- subbuf_addr,
- subbuf_size,
- padding, &index);
+ ret = lttng_consumer_on_read_subbuffer_mmap(
+ ctx, stream, &subbuf_view, padding, &index);
/*
- * The mmap operation should write subbuf_size amount of data when
- * network streaming or the full padding (len) size when we are _not_
- * streaming.
+ * The mmap operation should write subbuf_size amount of data
+ * when network streaming or the full padding (len) size when we
+ * are _not_ streaming.
*/
if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
(ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
ssize_t read_len;
unsigned long len, padded_len;
const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
health_code_update();
goto error_put_subbuf;
}
+ subbuf_view = lttng_buffer_view_init(
+ subbuf_addr, 0, padded_len);
read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
- stream, subbuf_addr, len,
- padded_len - len, NULL);
+ stream, &subbuf_view, padded_len - len,
+ NULL);
if (use_relayd) {
if (read_len != len) {
ret = -EPERM;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
const char *subbuf_addr;
+ struct lttng_buffer_view subbuf_view;
assert(stream);
assert(stream->ustream);
goto error_put_subbuf;
}
+ subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(
- ctx, stream, subbuf_addr, subbuf_size, padding, &index);
+ ctx, stream, &subbuf_view, padding, &index);
/*
* The mmap operation should write subbuf_size amount of data when
* network streaming or the full padding (len) size when we are _not_