struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotation_ret;
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&stream->metadata_rdv_lock);
}
+ /*
+ * If the stream was flagged to be ready for rotation before we extract
+ * the next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before consuming data");
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Stream rotation error before consuming data");
+ goto end;
+ }
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
ret = lttng_kconsumer_read_subbuffer(stream, ctx);
break;
}
+ if (ret < 0) {
+ goto end;
+ }
+
+ /*
+ * After extracting the packet, we check if the stream is now ready to
+ * be rotated and perform the action immediately.
+ *
+ * Don't overwrite `ret` as callers expect the number of bytes
+ * consumed to be returned on success.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Stream rotation error after consuming data");
+ goto end;
+ }
+ } else if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Failed to check if stream was ready to rotate after consuming data");
+ goto end;
+ }
+
+end:
if (stream->metadata_flag) {
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1, rotation_ret;
+ int err, write_index = 1;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index = {};
+ bool in_error_state = false;
DBG("In read_subbuffer (infd : %d)", infd);
- /*
- * If the stream was flagged to be ready for rotation before we extract the
- * next packet, rotate it now.
- */
- if (stream->rotate_ready) {
- DBG("Rotate stream before extracting data");
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ERR("Stream rotation error");
- ret = -1;
- goto error;
- }
- }
/* Get the next subbuffer */
err = kernctl_get_next_subbuf(infd);
}
ret = err;
goto error;
+ } else if (in_error_state) {
+ goto error;
}
/* Write index if needed. */
if (!write_index) {
- goto rotate;
+ goto end;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
goto error;
}
-rotate:
- /*
- * After extracting the packet, we check if the stream is now ready to be
- * rotated and perform the action immediately.
- */
- rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
- if (rotation_ret == 1) {
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ERR("Stream rotation error");
- ret = -1;
- goto error;
- }
- } else if (rotation_ret < 0) {
- ERR("Checking if stream is ready to rotate");
- ret = -1;
- goto error;
- }
-
+end:
error:
return ret;
}
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1, rotation_ret;
+ int err, write_index = 1;
long ret = 0;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
}
}
- /*
- * If the stream was flagged to be ready for rotation before we extract the
- * next packet, rotate it now.
- */
- if (stream->rotate_ready) {
- DBG("Rotate stream before extracting data");
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ERR("Stream rotation error");
- ret = -1;
- goto error;
- }
- }
-
retry:
/* Get the next subbuffer */
err = ustctl_get_next_subbuf(ustream);
/* Write index if needed. */
if (!write_index) {
- goto rotate;
+ goto end;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
goto error;
}
-rotate:
- /*
- * After extracting the packet, we check if the stream is now ready to be
- * rotated and perform the action immediately.
- */
- rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
- if (rotation_ret == 1) {
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ERR("Stream rotation error");
- ret = -1;
- goto error;
- }
- } else if (rotation_ret < 0) {
- ERR("Checking if stream is ready to rotate");
- ret = -1;
- goto error;
- }
+end:
error:
return ret;
}