struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotate_ret;
+ bool rotated = false;
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated);
break;
default:
ERR("Unknown consumer_data type");
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
+ if (rotated) {
+ rotate_ret = consumer_post_rotation(stream, ctx);
+ if (rotate_ret < 0) {
+ ERR("Failed after a rotation");
+ ret = -1;
+ }
+ }
+
return ret;
}
return start_pos;
}
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ unsigned long consumed_pos;
+
+ if (!stream->rotate_position && !stream->rotate_ready) {
+ ret = 0;
+ goto end;
+ }
+
+ if (stream->rotate_ready) {
+ ret = 1;
+ goto end;
+ }
+
+ /*
+ * If we don't have the rotate_ready flag, check the consumed position
+ * to determine if we need to rotate.
+ */
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+
+ /* Rotate position not reached yet (with check for overflow). */
+ if ((long) (consumed_pos - stream->rotate_position) < 0) {
+ ret = 0;
+ goto end;
+ }
+ ret = 1;
+
+end:
+ return ret;
+}
+
/*
* Reset the state for a stream after a rotation occurred.
*/
* Return 0 on success, a negative number of error.
*/
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+ struct lttng_consumer_stream *stream, bool *rotated)
{
int ret;
}
lttng_consumer_reset_stream_rotate_state(stream);
+ if (rotated) {
+ *rotated = true;
+ }
+
ret = 0;
error:
pthread_cond_t metadata_rdv;
pthread_mutex_t metadata_rdv_lock;
+ /*
+ * rotate_position represents the position in the ring-buffer that has to
+ * be flushed to disk to complete the ongoing rotation. When that position
+ * is reached, this tracefile can be closed and a new one is created in
+ * channel_read_only_attributes.path.
+ */
+ unsigned long rotate_position;
+
/*
* Read-only copies of channel values. We cannot safely access the
* channel from a stream, so we need to have a local copy of these
uint64_t tracefile_size;
} channel_read_only_attributes;
+ /*
+ * Flag to inform the data or metadata thread that a stream is
+ * ready to be rotated.
+ */
+ bool rotate_ready;
+
/* Indicate if the stream still has some data to be read. */
unsigned int has_data:1;
/*
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream);
+ struct lttng_consumer_stream *stream, bool *rotated);
int lttng_consumer_rotate_rename(const char *current_path, const char *new_path,
uid_t uid, gid_t gid, uint64_t relayd_id);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
do {
health_code_update();
- ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+ ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL);
if (ret_read < 0) {
if (ret_read != -EAGAIN) {
ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
goto end_nosignal;
}
- DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
- new_stream->name, fd, new_stream->relayd_stream_id);
+ DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
+ new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
break;
}
case LTTNG_CONSUMER_STREAMS_SENT:
* Consume data on a file descriptor and write it on a trace file.
*/
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx, bool *rotated)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1;
+ int err, write_index = 1, rotation_ret;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index;
DBG("In read_subbuffer (infd : %d)", infd);
+ /*
+ * If the stream was flagged to be ready for rotation before we extract the
+ * next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before extracting data");
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ }
+
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
if (err != 0) {
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
ret = err;
- goto end;
+ goto error;
}
/* Get the full subbuffer size including padding */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
if (!stream->metadata_flag) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
ret = update_stream_stats(stream);
if (ret < 0) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
} else {
write_index = 0;
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
}
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
/* Make sure the tracer is not gone mad on us! */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
pthread_mutex_unlock(&stream->metadata_timer_lock);
}
if (err < 0) {
- goto end;
+ goto error;
}
}
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
+rotate:
+ /*
+ * After extracting the packet, we check if the stream is now ready to be
+ * rotated and perform the action immediately.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ } else if (rotation_ret < 0) {
+ ERR("Checking if stream is ready to rotate");
+ ret = -1;
+ goto error;
+ }
+
+error:
return ret;
}
#ifndef _LTTNG_KCONSUMER_H
#define _LTTNG_KCONSUMER_H
+#include <stdbool.h>
#include <common/consumer/consumer.h>
int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream);
int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll);
ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx, bool *rotated);
int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata);
#include <unistd.h>
#include <urcu/list.h>
#include <signal.h>
+#include <stdbool.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
* Return 0 on success else a negative value.
*/
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx, bool *rotated)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1;
+ int err, write_index = 1, rotation_ret;
long ret = 0;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
readlen = lttng_read(stream->wait_fd, &dummy, 1);
if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
ret = readlen;
- goto end;
+ goto error;
+ }
+ }
+
+ /*
+ * If the stream was flagged to be ready for rotation before we extract the
+ * next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before extracting data");
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
}
}
if (stream->metadata_flag) {
ret = commit_one_metadata_packet(stream);
if (ret <= 0) {
- goto end;
+ goto error;
}
ustctl_flush_buffer(stream->ustream, 1);
goto retry;
*/
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency) [ret: %d]", err);
- goto end;
+ goto error;
}
assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
if (ret < 0) {
err = ustctl_put_subbuf(ustream);
assert(err == 0);
- goto end;
+ goto error;
}
/* Update the stream's sequence and discarded events count. */
PERROR("kernctl_get_events_discarded");
err = ustctl_put_subbuf(ustream);
assert(err == 0);
- goto end;
+ goto error;
}
} else {
write_index = 0;
assert(len >= subbuf_size);
padding = len - subbuf_size;
+
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
/*
if (!stream->metadata_flag) {
ret = notify_if_more_data(stream, ctx);
if (ret < 0) {
- goto end;
+ goto error;
}
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
}
if (err < 0) {
- goto end;
+ goto error;
}
}
assert(!stream->metadata_flag);
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
+rotate:
+ /*
+ * After extracting the packet, we check if the stream is now ready to be
+ * rotated and perform the action immediately.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
+ } else if (rotation_ret < 0) {
+ ERR("Checking if stream is ready to rotate");
+ ret = -1;
+ goto error;
+ }
+error:
return ret;
}
#include <errno.h>
#include <common/consumer/consumer.h>
+#include <stdbool.h>
#ifdef HAVE_LIBLTTNG_UST_CTL
extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx, bool *rotated);
int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream);
static inline
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx, bool *rotated)
{
return -ENOSYS;
}