From: Jérémie Galarneau Date: Wed, 8 Jul 2020 18:57:40 +0000 (-0400) Subject: Clean-up: consumer: move open packet to post_consume X-Git-Tag: v2.13.0-rc1~596 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=503fefca8a1b82cfafccaa096e900c41bf4570f6;p=lttng-tools.git Clean-up: consumer: move open packet to post_consume Move the "open packet" step of read_subbuffer to a post-consume callback as this only needs to be done for data streams; it does not belong in the core of the read_subbuffer template method. Change-Id: Ia4d3f8f833e213a8d0e39bcf5ec766c2c05bcf80 Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 398d71ae0..8c21d6ae1 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -409,6 +410,155 @@ end: return 0; } +static +bool stream_is_rotating_to_null_chunk( + const struct lttng_consumer_stream *stream) +{ + bool rotating_to_null_chunk = false; + + if (stream->rotate_position == -1ULL) { + /* No rotation ongoing. */ + goto end; + } + + if (stream->trace_chunk == stream->chan->trace_chunk || + !stream->chan->trace_chunk) { + rotating_to_null_chunk = true; + } +end: + return rotating_to_null_chunk; +} + +enum consumer_stream_open_packet_status consumer_stream_open_packet( + struct lttng_consumer_stream *stream) +{ + int ret; + enum consumer_stream_open_packet_status status; + unsigned long produced_pos_before, produced_pos_after; + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot( + stream, &produced_pos_before); + if (ret < 0) { + ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = consumer_stream_flush_buffer(stream, 0); + if (ret) { + ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_sample_snapshot_positions(stream); + if (ret < 0) { + ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); + if (ret < 0) { + ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + status = CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR; + goto end; + } + + /* + * Determine if the flush had an effect by comparing the produced + * positons before and after the flush. + */ + status = produced_pos_before != produced_pos_after ? + CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED : + CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE; + if (status == CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED) { + stream->opened_packet_in_current_trace_chunk = true; + } + +end: + return status; +} + +/* + * An attempt to open a new packet is performed after a rotation completes to + * get a begin timestamp as close as possible to the rotation point. + * + * However, that initial attempt at opening a packet can fail due to a full + * ring-buffer. In that case, a second attempt is performed after consuming + * a packet since that will have freed enough space in the ring-buffer. + */ +static +int post_consume_open_new_packet(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + if (!stream->opened_packet_in_current_trace_chunk && + stream->trace_chunk && + !stream_is_rotating_to_null_chunk(stream)) { + const enum consumer_stream_open_packet_status status = + consumer_stream_open_packet(stream); + + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left. + * This means that new events were produced, resulting + * in a packet being opened, which is what we want + * anyhow. + */ + DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + stream->opened_packet_in_current_trace_chunk = true; + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. */ + ret = -1; + goto end; + default: + abort(); + } + + stream->opened_packet_in_current_trace_chunk = true; + } + +end: + return ret; +} + struct lttng_consumer_stream *consumer_stream_create( struct lttng_consumer_channel *channel, uint64_t channel_key, @@ -506,6 +656,9 @@ struct lttng_consumer_stream *consumer_stream_create( rcu_read_unlock(); + lttng_dynamic_array_init(&stream->read_subbuffer_ops.post_consume_cbs, + sizeof(post_consume_cb), NULL); + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { stream->read_subbuffer_ops.lock = consumer_stream_metadata_lock_all; @@ -514,18 +667,31 @@ struct lttng_consumer_stream *consumer_stream_create( stream->read_subbuffer_ops.pre_consume_subbuffer = metadata_stream_check_version; } else { + const post_consume_cb post_consume_index_op = channel->is_live ? + consumer_stream_sync_metadata_index : + consumer_stream_send_index; + + ret = lttng_dynamic_array_add_element( + &stream->read_subbuffer_ops.post_consume_cbs, + &post_consume_index_op); + if (ret) { + PERROR("Failed to add `send index` callback to stream's post consumption callbacks"); + goto error; + } + + ret = lttng_dynamic_array_add_element( + &stream->read_subbuffer_ops.post_consume_cbs, + &(post_consume_cb) { post_consume_open_new_packet }); + if (ret) { + PERROR("Failed to add `open new packet` callback to stream's post consumption callbacks"); + goto error; + } + stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; stream->read_subbuffer_ops.unlock = consumer_stream_data_unlock_all; stream->read_subbuffer_ops.pre_consume_subbuffer = consumer_stream_update_stats; - if (channel->is_live) { - stream->read_subbuffer_ops.post_consume = - consumer_stream_sync_metadata_index; - } else { - stream->read_subbuffer_ops.post_consume = - consumer_stream_send_index; - } } if (channel->output == CONSUMER_CHANNEL_MMAP) { @@ -541,6 +707,7 @@ struct lttng_consumer_stream *consumer_stream_create( error: rcu_read_unlock(); lttng_trace_chunk_put(stream->trace_chunk); + lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); free(stream); end: if (alloc_ret) { @@ -852,6 +1019,7 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, /* Free stream within a RCU call. */ lttng_trace_chunk_put(stream->trace_chunk); stream->trace_chunk = NULL; + lttng_dynamic_array_reset(&stream->read_subbuffer_ops.post_consume_cbs); consumer_stream_free(stream); } @@ -1068,3 +1236,47 @@ void consumer_stream_metadata_set_version( metadata_bucket_reset(stream->metadata_bucket); } } + +int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, + bool producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + if (producer_active) { + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } else { + ret = kernctl_buffer_flush_empty(stream->wait_fd); + if (ret < 0) { + /* + * Doing a buffer flush which does not take into + * account empty packets. This is not perfect, + * but required as a fall-back when + * "flush_empty" is not implemented by + * lttng-modules. + */ + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + } + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_flush_buffer(stream, (int) producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index 50e402fe2..c9af63cd8 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -10,6 +10,12 @@ #include "consumer.h" +enum consumer_stream_open_packet_status { + CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED, + CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE, + CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR, +}; + /* * Create a consumer stream. * @@ -130,4 +136,34 @@ int consumer_stream_enable_metadata_bucketization( void consumer_stream_metadata_set_version( struct lttng_consumer_stream *stream, uint64_t new_version); +/* + * Set the version of a metadata stream (i.e. following a metadata + * regeneration). + * + * Changing the version of a metadata stream will cause any bucketized metadata + * to be discarded and will mark the metadata stream for future `reset`. + */ +void consumer_stream_metadata_set_version( + struct lttng_consumer_stream *stream, uint64_t new_version); + +/* + * Attempt to open a packet in a stream. + * + * This function must be called with the stream and channel locks held. + */ +enum consumer_stream_open_packet_status consumer_stream_open_packet( + struct lttng_consumer_stream *stream); + +/* + * Flush a stream's buffer. + * + * producer_active: if true, causes a flush to occur only if there is + * content present in the current sub-buffer. If false, forces a flush to take + * place (otherwise known as "flush_empty"). + * + * This function must be called with the stream and channel locks held. + */ +int consumer_stream_flush_buffer(struct lttng_consumer_stream *stream, + bool producer_active); + #endif /* LTTNG_CONSUMER_STREAM_H */ diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index be440e694..b2c5eb3dd 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -63,12 +63,6 @@ struct consumer_channel_msg { uint64_t key; /* del */ }; -enum open_packet_status { - OPEN_PACKET_STATUS_OPENED, - OPEN_PACKET_STATUS_NO_SPACE, - OPEN_PACKET_STATUS_ERROR, -}; - /* Flag used to temporarily pause data consumption from testpoints. */ int data_consumption_paused; @@ -3316,140 +3310,29 @@ error_testpoint: return NULL; } -static -int consumer_flush_buffer(struct lttng_consumer_stream *stream, - int producer_active) +static int post_consume(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) { + size_t i; int ret = 0; + const size_t count = lttng_dynamic_array_get_count( + &stream->read_subbuffer_ops.post_consume_cbs); - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - if (producer_active) { - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } else { - ret = kernctl_buffer_flush_empty(stream->wait_fd); - if (ret < 0) { - /* - * Doing a buffer flush which does not take into - * account empty packets. This is not perfect, - * but required as a fall-back when - * "flush_empty" is not implemented by - * lttng-modules. - */ - ret = kernctl_buffer_flush(stream->wait_fd); - if (ret < 0) { - ERR("Failed to flush kernel stream"); - goto end; - } - } + for (i = 0; i < count; i++) { + const post_consume_cb op = *(post_consume_cb *) lttng_dynamic_array_get_element( + &stream->read_subbuffer_ops.post_consume_cbs, + i); + + ret = op(stream, subbuffer, ctx); + if (ret) { + goto end; } - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - lttng_ustconsumer_flush_buffer(stream, producer_active); - break; - default: - ERR("Unknown consumer_data type"); - abort(); } - end: return ret; } -static enum open_packet_status open_packet(struct lttng_consumer_stream *stream) -{ - int ret; - enum open_packet_status status; - unsigned long produced_pos_before, produced_pos_after; - - ret = lttng_consumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Failed to snapshot positions before post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - status = OPEN_PACKET_STATUS_ERROR; - goto end; - } - - ret = lttng_consumer_get_produced_snapshot( - stream, &produced_pos_before); - if (ret < 0) { - ERR("Failed to read produced position before post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - status = OPEN_PACKET_STATUS_ERROR; - goto end; - } - - ret = consumer_flush_buffer(stream, 0); - if (ret) { - ERR("Failed to flush an empty packet at rotation point: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - status = OPEN_PACKET_STATUS_ERROR; - goto end; - } - - ret = lttng_consumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Failed to snapshot positions after post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - status = OPEN_PACKET_STATUS_ERROR; - goto end; - } - - ret = lttng_consumer_get_produced_snapshot(stream, &produced_pos_after); - if (ret < 0) { - ERR("Failed to read produced position after post-rotation empty packet flush: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - status = OPEN_PACKET_STATUS_ERROR; - goto end; - } - - /* - * Determine if the flush had an effect by comparing the produced - * positons before and after the flush. - */ - status = produced_pos_before != produced_pos_after ? - OPEN_PACKET_STATUS_OPENED : - OPEN_PACKET_STATUS_NO_SPACE; - if (status == OPEN_PACKET_STATUS_OPENED) { - stream->opened_packet_in_current_trace_chunk = true; - } -end: - return status; -} - -static bool stream_is_rotating_to_null_chunk( - const struct lttng_consumer_stream *stream) -{ - bool rotating_to_null_chunk = false; - - if (stream->rotate_position == -1ULL) { - /* No rotation ongoing. */ - goto end; - } - - if (stream->trace_chunk == stream->chan->trace_chunk || - !stream->chan->trace_chunk) { - rotating_to_null_chunk = true; - } -end: - return rotating_to_null_chunk; -} - ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx, bool locked_by_caller) @@ -3525,11 +3408,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - if (stream->read_subbuffer_ops.post_consume) { - ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx); - if (ret) { - goto end; - } + ret = post_consume(stream, &subbuffer, ctx); + if (ret) { + goto end; } /* @@ -3547,50 +3428,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, ERR("Stream rotation error after consuming data"); goto end; } + } else if (rotation_ret < 0) { ret = rotation_ret; ERR("Failed to check if stream was ready to rotate after consuming data"); goto end; } - /* - * TODO roll into a post_consume op as this doesn't apply to metadata - * streams. - */ - if (!stream->opened_packet_in_current_trace_chunk && - stream->trace_chunk && !stream->metadata_flag && - !stream_is_rotating_to_null_chunk(stream)) { - const enum open_packet_status status = open_packet(stream); - - switch (status) { - case OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet after consuming a packet rotation: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - break; - case OPEN_PACKET_STATUS_NO_SPACE: - /* - * Can't open a packet as there is no space left. - * This means that new events were produced, resulting - * in a packet being opened, which is what we wanted - * anyhow. - */ - DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - stream->opened_packet_in_current_trace_chunk = true; - break; - case OPEN_PACKET_STATUS_ERROR: - /* Logged by callee. */ - ret = -1; - goto end; - default: - abort(); - } - } - sleep_stream: if (stream->read_subbuffer_ops.on_sleep) { stream->read_subbuffer_ops.on_sleep(stream, ctx); @@ -4148,7 +3992,8 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, * ensures we have at least one packet in each stream per trace * chunk, even if no data was produced. */ - ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0); + ret = consumer_stream_flush_buffer( + stream, stream->metadata_flag ? 1 : 0); if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel rotation", stream->key); @@ -4284,17 +4129,17 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, * "stamp" the beginning of the new trace chunk at the * earliest possible point. */ - const enum open_packet_status status = - open_packet(stream); + const enum consumer_stream_open_packet_status status = + consumer_stream_open_packet(stream); switch (status) { - case OPEN_PACKET_STATUS_OPENED: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: DBG("Opened a packet after a rotation: stream id = %" PRIu64 ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); break; - case OPEN_PACKET_STATUS_NO_SPACE: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: /* * Can't open a packet as there is no space left * in the buffer. A new packet will be opened @@ -4305,7 +4150,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, stream->key, stream->chan->name, stream->chan->session_id); break; - case OPEN_PACKET_STATUS_ERROR: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: /* Logged by callee. */ ret = -1; goto end_unlock_stream; @@ -4412,7 +4257,7 @@ int consumer_clear_stream(struct lttng_consumer_stream *stream) { int ret; - ret = consumer_flush_buffer(stream, 1); + ret = consumer_stream_flush_buffer(stream, 1); if (ret < 0) { ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key); @@ -5184,29 +5029,29 @@ enum lttcomm_return_code lttng_consumer_open_channel_packets( rcu_read_lock(); cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - enum open_packet_status status; + enum consumer_stream_open_packet_status status; pthread_mutex_lock(&stream->lock); if (cds_lfht_is_node_deleted(&stream->node.node)) { goto next; } - status = open_packet(stream); + status = consumer_stream_open_packet(stream); switch (status) { - case OPEN_PACKET_STATUS_OPENED: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64 ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); stream->opened_packet_in_current_trace_chunk = true; break; - case OPEN_PACKET_STATUS_NO_SPACE: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64 ", channel name = %s, session id = %" PRIu64, stream->key, stream->chan->name, stream->chan->session_id); break; - case OPEN_PACKET_STATUS_ERROR: + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: /* * Only unexpected internal errors can lead to this * failing. Report an unknown error. diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 73189660c..b45f88b75 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -27,6 +27,7 @@ #include #include #include +#include struct lttng_consumer_local_data; @@ -636,7 +637,7 @@ struct lttng_consumer_stream { reset_metadata_cb reset_metadata; consume_subbuffer_cb consume_subbuffer; put_next_subbuffer_cb put_next_subbuffer; - post_consume_cb post_consume; + struct lttng_dynamic_array post_consume_cbs; send_live_beacon_cb send_live_beacon; on_sleep_cb on_sleep; unlock_cb unlock;