From: Julien Desfossez Date: Thu, 14 Dec 2017 19:15:57 +0000 (-0500) Subject: Consumer perform the rotation when extracting a packet X-Git-Tag: v2.11.0-rc1~349 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=02d02e31d47c091a38154c9c188c08387902d97b;p=lttng-tools.git Consumer perform the rotation when extracting a packet When the consumer reads a subbuffer, it checks if the stream needs to be rotated before or after writing the data. The post-rotation action must take place after we have released the stream lock, so we need to add a flag to the read_subbuffer functions to know if a rotation occurred while the stream lock was held. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 8e6b63b01..1ea14f8ca 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3392,6 +3392,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; + bool rotated = false; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3400,11 +3402,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); + ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated); break; case LTTNG_CONSUMER32_UST: case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); + ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated); break; default: ERR("Unknown consumer_data type"); @@ -3418,6 +3420,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + if (rotated) { + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + } + return ret; } @@ -3903,6 +3913,54 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, return start_pos; } +/* + * Check if a stream is ready to be rotated after extracting it. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. Stream lock must be held. + */ +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) +{ + int ret; + unsigned long consumed_pos; + + if (!stream->rotate_position && !stream->rotate_ready) { + ret = 0; + goto end; + } + + if (stream->rotate_ready) { + ret = 1; + goto end; + } + + /* + * If we don't have the rotate_ready flag, check the consumed position + * to determine if we need to rotate. + */ + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Taking snapshot positions"); + goto end; + } + + ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumed snapshot position"); + goto end; + } + + /* Rotate position not reached yet (with check for overflow). */ + if ((long) (consumed_pos - stream->rotate_position) < 0) { + ret = 0; + goto end; + } + ret = 1; + +end: + return ret; +} + /* * Reset the state for a stream after a rotation occurred. */ @@ -4014,7 +4072,7 @@ end: * Return 0 on success, a negative number of error. */ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) + struct lttng_consumer_stream *stream, bool *rotated) { int ret; @@ -4058,6 +4116,10 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } lttng_consumer_reset_stream_rotate_state(stream); + if (rotated) { + *rotated = true; + } + ret = 0; error: diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 9e77ff999..181c8d8ec 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -430,6 +430,14 @@ struct lttng_consumer_stream { pthread_cond_t metadata_rdv; pthread_mutex_t metadata_rdv_lock; + /* + * rotate_position represents the position in the ring-buffer that has to + * be flushed to disk to complete the ongoing rotation. When that position + * is reached, this tracefile can be closed and a new one is created in + * channel_read_only_attributes.path. + */ + unsigned long rotate_position; + /* * Read-only copies of channel values. We cannot safely access the * channel from a stream, so we need to have a local copy of these @@ -441,6 +449,12 @@ struct lttng_consumer_stream { uint64_t tracefile_size; } channel_read_only_attributes; + /* + * Flag to inform the data or metadata thread that a stream is + * ready to be rotated. + */ + bool rotate_ready; + /* Indicate if the stream still has some data to be read. */ unsigned int has_data:1; /* @@ -803,8 +817,9 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream); + struct lttng_consumer_stream *stream, bool *rotated); int lttng_consumer_rotate_rename(const char *current_path, const char *new_path, uid_t uid, gid_t gid, uint64_t relayd_id); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f7704b465..7c01bc772 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -385,7 +385,7 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx, NULL); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -771,8 +771,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, - new_stream->name, fd, new_stream->relayd_stream_id); + DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, + new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_STREAMS_SENT: @@ -1406,16 +1406,30 @@ end: * Consume data on a file descriptor and write it on a trace file. */ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index; DBG("In read_subbuffer (infd : %d)", infd); + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } + /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { @@ -1428,7 +1442,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); ret = err; - goto end; + goto error; } /* Get the full subbuffer size including padding */ @@ -1444,10 +1458,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } if (!stream->metadata_flag) { @@ -1462,9 +1476,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } ret = update_stream_stats(stream); if (ret < 0) { @@ -1477,9 +1491,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } else { write_index = 0; @@ -1494,9 +1508,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } @@ -1541,10 +1555,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } /* Make sure the tracer is not gone mad on us! */ @@ -1587,12 +1601,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1616,16 +1630,35 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_timer_lock); } if (err < 0) { - goto end; + goto error; } } err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } + +error: return ret; } diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index fe6923678..2aee20327 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -20,6 +20,7 @@ #ifndef _LTTNG_KCONSUMER_H #define _LTTNG_KCONSUMER_H +#include #include int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream); @@ -32,7 +33,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, bool *rotated); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream); int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5e1ff896c..6030ca182 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -2594,10 +2595,10 @@ end: * Return 0 on success else a negative value. */ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { unsigned long len, subbuf_size, padding; - int err, write_index = 1; + int err, write_index = 1, rotation_ret; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2629,7 +2630,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, readlen = lttng_read(stream->wait_fd, &dummy, 1); if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; - goto end; + goto error; + } + } + + /* + * If the stream was flagged to be ready for rotation before we extract the + * next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before extracting data"); + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; } } @@ -2644,7 +2659,7 @@ retry: if (stream->metadata_flag) { ret = commit_one_metadata_packet(stream); if (ret <= 0) { - goto end; + goto error; } ustctl_flush_buffer(stream->ustream, 1); goto retry; @@ -2659,7 +2674,7 @@ retry: */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency) [ret: %d]", err); - goto end; + goto error; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); @@ -2669,7 +2684,7 @@ retry: if (ret < 0) { err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } /* Update the stream's sequence and discarded events count. */ @@ -2678,7 +2693,7 @@ retry: PERROR("kernctl_get_events_discarded"); err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } } else { write_index = 0; @@ -2696,6 +2711,7 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* @@ -2727,13 +2743,13 @@ retry: if (!stream->metadata_flag) { ret = notify_if_more_data(stream, ctx); if (ret < 0) { - goto end; + goto error; } } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2758,17 +2774,35 @@ retry: } if (err < 0) { - goto end; + goto error; } } assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: +rotate: + /* + * After extracting the packet, we check if the stream is now ready to be + * rotated and perform the action immediately. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } + } else if (rotation_ret < 0) { + ERR("Checking if stream is ready to rotate"); + ret = -1; + goto error; + } +error: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index b29fe318b..b0e1c7d0f 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -22,6 +22,7 @@ #include #include +#include #ifdef HAVE_LIBLTTNG_UST_CTL @@ -44,7 +45,7 @@ extern int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream); extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream); int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, bool *rotated); int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream); void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream); @@ -157,7 +158,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) static inline int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, bool *rotated) { return -ENOSYS; }