From: Jérémie Galarneau Date: Fri, 8 May 2020 20:00:11 +0000 (-0400) Subject: consumerd: move rotation logic to domain-agnostic read path X-Git-Tag: v2.11.5~28 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=ab1f27f2803120be93a7c148022d5c9c412e2b7b;p=lttng-tools.git consumerd: move rotation logic to domain-agnostic read path The "rotation ready" logic is duplicated in both user space and kernel specializations of the read subbuffer functions. It is moved to the domain-agnostic caller where it is needed only once. This makes it easier to implement a follow-up fix and reduces code duplication. Signed-off-by: Jérémie Galarneau Change-Id: Iae952a2cd52fa458cec956ae219492557e4adf79 --- diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f3b042da4..b92909761 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3413,6 +3413,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotation_ret; pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); @@ -3420,6 +3421,19 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_lock(&stream->metadata_rdv_lock); } + /* + * If the stream was flagged to be ready for rotation before we extract + * the next packet, rotate it now. + */ + if (stream->rotate_ready) { + DBG("Rotate stream before consuming data"); + ret = lttng_consumer_rotate_stream(ctx, stream); + if (ret < 0) { + ERR("Stream rotation error before consuming data"); + goto end; + } + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: ret = lttng_kconsumer_read_subbuffer(stream, ctx); @@ -3435,13 +3449,38 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, break; } + if (ret < 0) { + goto end; + } + + /* + * After extracting the packet, we check if the stream is now ready to + * be rotated and perform the action immediately. + * + * Don't overwrite `ret` as callers expect the number of bytes + * consumed to be returned on success. + */ + rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); + if (rotation_ret == 1) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Stream rotation error after consuming data"); + goto end; + } + } else if (rotation_ret < 0) { + ret = rotation_ret; + ERR("Failed to check if stream was ready to rotate after consuming data"); + goto end; + } + +end: if (stream->metadata_flag) { pthread_cond_broadcast(&stream->metadata_rdv); pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); - return ret; } diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4fe37d3c9..6c87b8a62 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1583,26 +1583,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index = {}; + bool in_error_state = false; DBG("In read_subbuffer (infd : %d)", infd); - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); @@ -1788,11 +1776,13 @@ error_put_subbuf: } ret = err; goto error; + } else if (in_error_state) { + goto error; } /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1825,25 +1815,7 @@ error_put_subbuf: goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } - +end: error: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index db95179dc..553d44269 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2772,7 +2772,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2810,20 +2810,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } } - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } - retry: /* Get the next subbuffer */ err = ustctl_get_next_subbuf(ustream); @@ -2935,7 +2921,7 @@ error_put_subbuf: /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2970,24 +2956,7 @@ error_put_subbuf: goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } +end: error: return ret; }