From: Jérémie Galarneau Date: Sun, 10 May 2020 22:00:26 +0000 (-0400) Subject: consumerd: refactor: split read_subbuf into sub-operations X-Git-Tag: v2.13.0-rc1~632 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=6f9449c22;p=lttng-tools.git consumerd: refactor: split read_subbuf into sub-operations The read_subbuf code paths intertwine domain-specific operations and metadata/data-specific logic which makes modifications error prone and introduces a fair amount of code duplication. lttng_consumer_read_subbuffer is effectively turned into a template method invoking overridable callbacks making most of the consumption logic domain and data/metadata agnostic. The goal is not to extensively clean-up that code path. A follow-up fix introduces metadata buffering logic which would not reasonably fit in the current scheme. This clean-up makes it easier to safely introduce those changes. No changes in behaviour are intended by this change. Signed-off-by: Jérémie Galarneau Change-Id: I9366f2e2a38018ca9b617b93ad9259340180c55d --- diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index 8318d79d9..5dc380e5e 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include "consumer-stream.h" @@ -36,6 +38,509 @@ static void free_stream_rcu(struct rcu_head *head) free(stream); } +static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); +} + +static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); +} + +static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream) +{ + consumer_stream_data_lock_all(stream); + pthread_mutex_lock(&stream->metadata_rdv_lock); +} + +static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream) +{ + pthread_mutex_unlock(&stream->metadata_rdv_lock); + consumer_stream_data_unlock_all(stream); +} + +/* Only used for data streams. */ +static int consumer_stream_update_stats(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuf) +{ + int ret = 0; + uint64_t sequence_number; + const uint64_t discarded_events = + LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number); + + if (!subbuf->info.data.sequence_number.is_set) { + /* Command not supported by the tracer. */ + sequence_number = -1ULL; + stream->sequence_number_unavailable = true; + } else { + sequence_number = subbuf->info.data.sequence_number.value; + } + + /* + * Start the sequence when we extract the first packet in case we don't + * start at 0 (for example if a consumer is not connected to the + * session immediately after the beginning). + */ + if (stream->last_sequence_number == -1ULL) { + stream->last_sequence_number = sequence_number; + } else if (sequence_number > stream->last_sequence_number) { + stream->chan->lost_packets += sequence_number - + stream->last_sequence_number - 1; + } else { + /* seq <= last_sequence_number */ + ERR("Sequence number inconsistent : prev = %" PRIu64 + ", current = %" PRIu64, + stream->last_sequence_number, sequence_number); + ret = -1; + goto end; + } + stream->last_sequence_number = sequence_number; + + if (discarded_events < stream->last_discarded_events) { + /* + * Overflow has occurred. We assume only one wrap-around + * has occurred. + */ + stream->chan->discarded_events += + (1ULL << (CAA_BITS_PER_LONG - 1)) - + stream->last_discarded_events + + discarded_events; + } else { + stream->chan->discarded_events += discarded_events - + stream->last_discarded_events; + } + stream->last_discarded_events = discarded_events; + ret = 0; + +end: + return ret; +} + +static +void ctf_packet_index_populate(struct ctf_packet_index *index, + off_t offset, const struct stream_subbuffer *subbuffer) +{ + *index = (typeof(*index)){ + .offset = htobe64(offset), + .packet_size = htobe64(subbuffer->info.data.packet_size), + .content_size = htobe64(subbuffer->info.data.content_size), + .timestamp_begin = htobe64( + subbuffer->info.data.timestamp_begin), + .timestamp_end = htobe64( + subbuffer->info.data.timestamp_end), + .events_discarded = htobe64( + subbuffer->info.data.events_discarded), + .stream_id = htobe64(subbuffer->info.data.stream_id), + .stream_instance_id = htobe64( + subbuffer->info.data.stream_instance_id.is_set ? + subbuffer->info.data.stream_instance_id.value : -1ULL), + .packet_seq_num = htobe64( + subbuffer->info.data.sequence_number.is_set ? + subbuffer->info.data.sequence_number.value : -1ULL), + }; +} + +static ssize_t consumer_stream_consume_mmap( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + const unsigned long padding_size = + subbuffer->info.data.padded_subbuf_size - + subbuffer->info.data.subbuf_size; + + return lttng_consumer_on_read_subbuffer_mmap( + ctx, stream, &subbuffer->buffer.buffer, padding_size); +} + +static ssize_t consumer_stream_consume_splice( + struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + return lttng_consumer_on_read_subbuffer_splice(ctx, stream, + subbuffer->info.data.padded_subbuf_size, 0); +} + +static int consumer_stream_send_index( + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + off_t packet_offset = 0; + struct ctf_packet_index index = {}; + + /* + * This is called after consuming the sub-buffer; substract the + * effect this sub-buffer from the offset. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL) { + packet_offset = stream->out_fd_offset - + subbuffer->info.data.padded_subbuf_size; + } + + ctf_packet_index_populate(&index, packet_offset, subbuffer); + return consumer_stream_write_index(stream, &index); +} + +/* + * Actually do the metadata sync using the given metadata stream. + * + * Return 0 on success else a negative value. ENODATA can be returned also + * indicating that there is no metadata available for that stream. + */ +static int do_sync_metadata(struct lttng_consumer_stream *metadata, + struct lttng_consumer_local_data *ctx) +{ + int ret; + + assert(metadata); + assert(metadata->metadata_flag); + assert(ctx); + + /* + * In UST, since we have to write the metadata from the cache packet + * by packet, we might need to start this procedure multiple times + * until all the metadata from the cache has been extracted. + */ + do { + /* + * Steps : + * - Lock the metadata stream + * - Check if metadata stream node was deleted before locking. + * - if yes, release and return success + * - Check if new metadata is ready (flush + snapshot pos) + * - If nothing : release and return. + * - Lock the metadata_rdv_lock + * - Unlock the metadata stream + * - cond_wait on metadata_rdv to wait the wakeup from the + * metadata thread + * - Unlock the metadata_rdv_lock + */ + pthread_mutex_lock(&metadata->lock); + + /* + * There is a possibility that we were able to acquire a reference on the + * stream from the RCU hash table but between then and now, the node might + * have been deleted just before the lock is acquired. Thus, after locking, + * we make sure the metadata node has not been deleted which means that the + * buffers are closed. + * + * In that case, there is no need to sync the metadata hence returning a + * success return code. + */ + ret = cds_lfht_is_node_deleted(&metadata->node.node); + if (ret) { + ret = 0; + goto end_unlock_mutex; + } + + switch (ctx->type) { + case LTTNG_CONSUMER_KERNEL: + /* + * Empty the metadata cache and flush the current stream. + */ + ret = lttng_kconsumer_sync_metadata(metadata); + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Ask the sessiond if we have new metadata waiting and update the + * consumer metadata cache. + */ + ret = lttng_ustconsumer_sync_metadata(ctx, metadata); + break; + default: + assert(0); + ret = -1; + break; + } + /* + * Error or no new metadata, we exit here. + */ + if (ret <= 0 || ret == ENODATA) { + goto end_unlock_mutex; + } + + /* + * At this point, new metadata have been flushed, so we wait on the + * rendez-vous point for the metadata thread to wake us up when it + * finishes consuming the metadata and continue execution. + */ + + pthread_mutex_lock(&metadata->metadata_rdv_lock); + + /* + * Release metadata stream lock so the metadata thread can process it. + */ + pthread_mutex_unlock(&metadata->lock); + + /* + * Wait on the rendez-vous point. Once woken up, it means the metadata was + * consumed and thus synchronization is achieved. + */ + pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); + pthread_mutex_unlock(&metadata->metadata_rdv_lock); + } while (ret == EAGAIN); + + /* Success */ + return 0; + +end_unlock_mutex: + pthread_mutex_unlock(&metadata->lock); + return ret; +} + +/* + * Synchronize the metadata using a given session ID. A successful acquisition + * of a metadata stream will trigger a request to the session daemon and a + * snapshot so the metadata thread can consume it. + * + * This function call is a rendez-vous point between the metadata thread and + * the data thread. + * + * Return 0 on success or else a negative value. + */ +int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, + uint64_t session_id) +{ + int ret; + struct lttng_consumer_stream *stream = NULL; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + + assert(ctx); + + /* Ease our life a bit. */ + ht = consumer_data.stream_list_ht; + + rcu_read_lock(); + + /* Search the metadata associated with the session id of the given stream. */ + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, + &session_id, &iter.iter, stream, node_session_id.node) { + if (!stream->metadata_flag) { + continue; + } + + ret = do_sync_metadata(stream, ctx); + if (ret < 0) { + goto end; + } + } + + /* + * Force return code to 0 (success) since ret might be ENODATA for instance + * which is not an error but rather that we should come back. + */ + ret = 0; + +end: + rcu_read_unlock(); + return ret; +} + +static int consumer_stream_sync_metadata_index( + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer, + struct lttng_consumer_local_data *ctx) +{ + int ret; + + /* Block until all the metadata is sent. */ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + + ret = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) stream->read_subbuffer_ops.send_live_beacon(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } + if (ret < 0) { + goto end; + } + + ret = consumer_stream_send_index(stream, subbuffer, ctx); +end: + return ret; +} + +/* + * Check if the local version of the metadata stream matches with the version + * of the metadata stream in the kernel. If it was updated, set the reset flag + * on the stream. + */ +static +int metadata_stream_check_version(struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + if (stream->metadata_version == subbuffer->info.metadata.version) { + goto end; + } + + DBG("New metadata version detected"); + stream->metadata_version = subbuffer->info.metadata.version; + stream->reset_metadata_flag = 1; + + if (stream->read_subbuffer_ops.reset_metadata) { + stream->read_subbuffer_ops.reset_metadata(stream); + } + +end: + return 0; +} + +struct lttng_consumer_stream *consumer_stream_create( + struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor) +{ + int ret; + struct lttng_consumer_stream *stream; + + stream = zmalloc(sizeof(*stream)); + if (stream == NULL) { + PERROR("malloc struct lttng_consumer_stream"); + ret = -ENOMEM; + goto end; + } + + if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { + ERR("Failed to acquire trace chunk reference during the creation of a stream"); + ret = -1; + goto error; + } + + rcu_read_lock(); + stream->chan = channel; + stream->key = stream_key; + stream->trace_chunk = trace_chunk; + stream->out_fd = -1; + stream->out_fd_offset = 0; + stream->output_written = 0; + stream->net_seq_idx = relayd_id; + stream->session_id = session_id; + stream->monitor = monitor; + stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; + stream->index_file = NULL; + stream->last_sequence_number = -1ULL; + stream->rotate_position = -1ULL; + pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->metadata_timer_lock, NULL); + + /* If channel is the metadata, flag this stream as metadata. */ + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { + stream->metadata_flag = 1; + /* Metadata is flat out. */ + strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); + /* Live rendez-vous point. */ + pthread_cond_init(&stream->metadata_rdv, NULL); + pthread_mutex_init(&stream->metadata_rdv_lock, NULL); + } else { + /* Format stream name to _ */ + ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", + channel_name, cpu); + if (ret < 0) { + PERROR("snprintf stream name"); + goto error; + } + } + + switch (channel->output) { + case CONSUMER_CHANNEL_SPLICE: + stream->output = LTTNG_EVENT_SPLICE; + ret = utils_create_pipe(stream->splice_pipe); + if (ret < 0) { + goto error; + } + break; + case CONSUMER_CHANNEL_MMAP: + stream->output = LTTNG_EVENT_MMAP; + break; + default: + abort(); + } + + /* Key is always the wait_fd for streams. */ + lttng_ht_node_init_u64(&stream->node, stream->key); + + /* Init node per channel id key */ + lttng_ht_node_init_u64(&stream->node_channel_id, channel_key); + + /* Init session id node with the stream session id */ + lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); + + DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 + " relayd_id %" PRIu64 ", session_id %" PRIu64, + stream->name, stream->key, channel_key, + stream->net_seq_idx, stream->session_id); + + rcu_read_unlock(); + + if (type == CONSUMER_CHANNEL_TYPE_METADATA) { + stream->read_subbuffer_ops.lock = + consumer_stream_metadata_lock_all; + stream->read_subbuffer_ops.unlock = + consumer_stream_metadata_unlock_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = + metadata_stream_check_version; + } else { + stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all; + stream->read_subbuffer_ops.unlock = + consumer_stream_data_unlock_all; + stream->read_subbuffer_ops.pre_consume_subbuffer = + consumer_stream_update_stats; + if (channel->is_live) { + stream->read_subbuffer_ops.post_consume = + consumer_stream_sync_metadata_index; + } else { + stream->read_subbuffer_ops.post_consume = + consumer_stream_send_index; + } + } + + if (channel->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.consume_subbuffer = + consumer_stream_consume_mmap; + } else { + stream->read_subbuffer_ops.consume_subbuffer = + consumer_stream_consume_splice; + } + + return stream; + +error: + rcu_read_unlock(); + lttng_trace_chunk_put(stream->trace_chunk); + free(stream); +end: + if (alloc_ret) { + *alloc_ret = ret; + } + return NULL; +} + /* * Close stream on the relayd side. This call can destroy a relayd if the * conditions are met. @@ -393,165 +898,6 @@ error: return ret; } -/* - * Actually do the metadata sync using the given metadata stream. - * - * Return 0 on success else a negative value. ENODATA can be returned also - * indicating that there is no metadata available for that stream. - */ -static int do_sync_metadata(struct lttng_consumer_stream *metadata, - struct lttng_consumer_local_data *ctx) -{ - int ret; - - assert(metadata); - assert(metadata->metadata_flag); - assert(ctx); - - /* - * In UST, since we have to write the metadata from the cache packet - * by packet, we might need to start this procedure multiple times - * until all the metadata from the cache has been extracted. - */ - do { - /* - * Steps : - * - Lock the metadata stream - * - Check if metadata stream node was deleted before locking. - * - if yes, release and return success - * - Check if new metadata is ready (flush + snapshot pos) - * - If nothing : release and return. - * - Lock the metadata_rdv_lock - * - Unlock the metadata stream - * - cond_wait on metadata_rdv to wait the wakeup from the - * metadata thread - * - Unlock the metadata_rdv_lock - */ - pthread_mutex_lock(&metadata->lock); - - /* - * There is a possibility that we were able to acquire a reference on the - * stream from the RCU hash table but between then and now, the node might - * have been deleted just before the lock is acquired. Thus, after locking, - * we make sure the metadata node has not been deleted which means that the - * buffers are closed. - * - * In that case, there is no need to sync the metadata hence returning a - * success return code. - */ - ret = cds_lfht_is_node_deleted(&metadata->node.node); - if (ret) { - ret = 0; - goto end_unlock_mutex; - } - - switch (ctx->type) { - case LTTNG_CONSUMER_KERNEL: - /* - * Empty the metadata cache and flush the current stream. - */ - ret = lttng_kconsumer_sync_metadata(metadata); - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* - * Ask the sessiond if we have new metadata waiting and update the - * consumer metadata cache. - */ - ret = lttng_ustconsumer_sync_metadata(ctx, metadata); - break; - default: - assert(0); - ret = -1; - break; - } - /* - * Error or no new metadata, we exit here. - */ - if (ret <= 0 || ret == ENODATA) { - goto end_unlock_mutex; - } - - /* - * At this point, new metadata have been flushed, so we wait on the - * rendez-vous point for the metadata thread to wake us up when it - * finishes consuming the metadata and continue execution. - */ - - pthread_mutex_lock(&metadata->metadata_rdv_lock); - - /* - * Release metadata stream lock so the metadata thread can process it. - */ - pthread_mutex_unlock(&metadata->lock); - - /* - * Wait on the rendez-vous point. Once woken up, it means the metadata was - * consumed and thus synchronization is achieved. - */ - pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock); - pthread_mutex_unlock(&metadata->metadata_rdv_lock); - } while (ret == EAGAIN); - - /* Success */ - return 0; - -end_unlock_mutex: - pthread_mutex_unlock(&metadata->lock); - return ret; -} - -/* - * Synchronize the metadata using a given session ID. A successful acquisition - * of a metadata stream will trigger a request to the session daemon and a - * snapshot so the metadata thread can consume it. - * - * This function call is a rendez-vous point between the metadata thread and - * the data thread. - * - * Return 0 on success or else a negative value. - */ -int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx, - uint64_t session_id) -{ - int ret; - struct lttng_consumer_stream *stream = NULL; - struct lttng_ht_iter iter; - struct lttng_ht *ht; - - assert(ctx); - - /* Ease our life a bit. */ - ht = consumer_data.stream_list_ht; - - rcu_read_lock(); - - /* Search the metadata associated with the session id of the given stream. */ - - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct, - &session_id, &iter.iter, stream, node_session_id.node) { - if (!stream->metadata_flag) { - continue; - } - - ret = do_sync_metadata(stream, ctx); - if (ret < 0) { - goto end; - } - } - - /* - * Force return code to 0 (success) since ret might be ENODATA for instance - * which is not an error but rather that we should come back. - */ - ret = 0; - -end: - rcu_read_unlock(); - return ret; -} - int consumer_stream_create_output_files(struct lttng_consumer_stream *stream, bool create_index) { diff --git a/src/common/consumer/consumer-stream.h b/src/common/consumer/consumer-stream.h index 0e1827c4a..eb00dac78 100644 --- a/src/common/consumer/consumer-stream.h +++ b/src/common/consumer/consumer-stream.h @@ -10,6 +10,24 @@ #include "consumer.h" +/* + * Create a consumer stream. + * + * The channel lock MUST be acquired. + */ +struct lttng_consumer_stream *consumer_stream_create( + struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor); + /* * Close stream's file descriptors and, if needed, close stream also on the * relayd side. diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f13e90a68..5c211339d 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -7,6 +7,7 @@ * */ +#include "common/index/ctf-index.h" #define _LGPL_SOURCE #include #include @@ -568,98 +569,6 @@ void consumer_stream_update_channel_attributes( channel->tracefile_size; } -struct lttng_consumer_stream *consumer_allocate_stream( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - const char *channel_name, - uint64_t relayd_id, - uint64_t session_id, - struct lttng_trace_chunk *trace_chunk, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor) -{ - int ret; - struct lttng_consumer_stream *stream; - - stream = zmalloc(sizeof(*stream)); - if (stream == NULL) { - PERROR("malloc struct lttng_consumer_stream"); - ret = -ENOMEM; - goto end; - } - - if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) { - ERR("Failed to acquire trace chunk reference during the creation of a stream"); - ret = -1; - goto error; - } - - rcu_read_lock(); - stream->chan = channel; - stream->key = stream_key; - stream->trace_chunk = trace_chunk; - stream->out_fd = -1; - stream->out_fd_offset = 0; - stream->output_written = 0; - stream->net_seq_idx = relayd_id; - stream->session_id = session_id; - stream->monitor = monitor; - stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; - stream->index_file = NULL; - stream->last_sequence_number = -1ULL; - stream->rotate_position = -1ULL; - pthread_mutex_init(&stream->lock, NULL); - pthread_mutex_init(&stream->metadata_timer_lock, NULL); - - /* If channel is the metadata, flag this stream as metadata. */ - if (type == CONSUMER_CHANNEL_TYPE_METADATA) { - stream->metadata_flag = 1; - /* Metadata is flat out. */ - strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name)); - /* Live rendez-vous point. */ - pthread_cond_init(&stream->metadata_rdv, NULL); - pthread_mutex_init(&stream->metadata_rdv_lock, NULL); - } else { - /* Format stream name to _ */ - ret = snprintf(stream->name, sizeof(stream->name), "%s_%d", - channel_name, cpu); - if (ret < 0) { - PERROR("snprintf stream name"); - goto error; - } - } - - /* Key is always the wait_fd for streams. */ - lttng_ht_node_init_u64(&stream->node, stream->key); - - /* Init node per channel id key */ - lttng_ht_node_init_u64(&stream->node_channel_id, channel_key); - - /* Init session id node with the stream session id */ - lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id); - - DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 - " relayd_id %" PRIu64 ", session_id %" PRIu64, - stream->name, stream->key, channel_key, - stream->net_seq_idx, stream->session_id); - - rcu_read_unlock(); - return stream; - -error: - rcu_read_unlock(); - lttng_trace_chunk_put(stream->trace_chunk); - free(stream); -end: - if (alloc_ret) { - *alloc_ret = ret; - } - return NULL; -} - /* * Add a stream to the global list protected by a mutex. */ @@ -1467,7 +1376,7 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t stream_key, uint32_t state)) @@ -1677,8 +1586,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, - unsigned long padding, - struct ctf_packet_index *index) + unsigned long padding) { ssize_t ret = 0; off_t orig_offset = stream->out_fd_offset; @@ -1770,10 +1678,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( orig_offset = 0; } stream->tracefile_size_current += buffer->size; - if (index) { - index->offset = htobe64(stream->out_fd_offset); - } - write_len = buffer->size; } @@ -1850,8 +1754,7 @@ end: ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index) + unsigned long padding) { ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; @@ -1955,7 +1858,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( orig_offset = 0; } stream->tracefile_size_current += len; - index->offset = htobe64(stream->out_fd_offset); } while (len > 0) { @@ -2522,7 +2424,7 @@ restart: do { health_code_update(); - len = ctx->on_buffer_ready(stream, ctx); + len = ctx->on_buffer_ready(stream, ctx, false); /* * We don't check the return value here since if we get * a negative len, it means an error occurred thus we @@ -2549,7 +2451,7 @@ restart: do { health_code_update(); - len = ctx->on_buffer_ready(stream, ctx); + len = ctx->on_buffer_ready(stream, ctx, false); /* * We don't check the return value here since if we get * a negative len, it means an error occurred thus we @@ -2765,7 +2667,7 @@ void *consumer_thread_data_poll(void *data) if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; - len = ctx->on_buffer_ready(local_stream[i], ctx); + len = ctx->on_buffer_ready(local_stream[i], ctx, false); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ @@ -2796,7 +2698,7 @@ void *consumer_thread_data_poll(void *data) local_stream[i]->hangup_flush_done || local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); - len = ctx->on_buffer_ready(local_stream[i], ctx); + len = ctx->on_buffer_ready(local_stream[i], ctx, false); /* it's ok to have an unavailable sub-buffer */ if (len < 0 && len != -EAGAIN && len != -ENODATA) { /* Clean the stream and free it. */ @@ -3410,15 +3312,22 @@ error_testpoint: } ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, + bool locked_by_caller) { - ssize_t ret; + ssize_t ret, written_bytes; int rotation_ret; + struct stream_subbuffer subbuffer = {}; - pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->lock); - if (stream->metadata_flag) { - pthread_mutex_lock(&stream->metadata_rdv_lock); + if (!locked_by_caller) { + stream->read_subbuffer_ops.lock(stream); + } + + if (stream->read_subbuffer_ops.on_wake_up) { + ret = stream->read_subbuffer_ops.on_wake_up(stream); + if (ret) { + goto end; + } } /* @@ -3434,25 +3343,55 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, } } - switch (consumer_data.type) { - case LTTNG_CONSUMER_KERNEL: - ret = lttng_kconsumer_read_subbuffer(stream, ctx); - break; - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - ret = lttng_ustconsumer_read_subbuffer(stream, ctx); - break; - default: - ERR("Unknown consumer_data type"); - assert(0); - ret = -ENOSYS; - break; + ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer); + if (ret) { + if (ret == -ENODATA) { + /* Not an error. */ + ret = 0; + } + goto end; } - if (ret < 0) { + ret = stream->read_subbuffer_ops.pre_consume_subbuffer( + stream, &subbuffer); + if (ret) { + goto error_put_subbuf; + } + + written_bytes = stream->read_subbuffer_ops.consume_subbuffer( + ctx, stream, &subbuffer); + /* + * Should write subbuf_size amount of data when network streaming or + * the full padded size when we are not streaming. + */ + if ((written_bytes != subbuffer.info.data.subbuf_size && + stream->net_seq_idx != (uint64_t) -1ULL) || + (written_bytes != subbuffer.info.data.padded_subbuf_size && + stream->net_seq_idx == + (uint64_t) -1ULL)) { + /* + * Display the error but continue processing to try to + * release the subbuffer. This is a DBG statement + * since this can happen without being a critical + * error. + */ + DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)", + written_bytes, subbuffer.info.data.subbuf_size, + subbuffer.info.data.padded_subbuf_size); + } + + ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer); + if (ret) { goto end; } + if (stream->read_subbuffer_ops.post_consume) { + ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx); + if (ret) { + goto end; + } + } + /* * After extracting the packet, we check if the stream is now ready to * be rotated and perform the action immediately. @@ -3474,14 +3413,20 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } + if (stream->read_subbuffer_ops.on_sleep) { + stream->read_subbuffer_ops.on_sleep(stream, ctx); + } + + ret = written_bytes; end: - if (stream->metadata_flag) { - pthread_cond_broadcast(&stream->metadata_rdv); - pthread_mutex_unlock(&stream->metadata_rdv_lock); + if (!locked_by_caller) { + stream->read_subbuffer_ops.unlock(stream); } - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); + return ret; +error_put_subbuf: + (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer); + goto end; } int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 000040982..aa8a401a0 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -28,6 +28,8 @@ #include #include +struct lttng_consumer_local_data; + /* Commands for consumer */ enum lttng_consumer_command { LTTNG_CONSUMER_ADD_CHANNEL, @@ -244,6 +246,142 @@ struct lttng_consumer_channel { bool streams_sent_to_relayd; }; +struct stream_subbuffer { + union { + /* + * CONSUMER_CHANNEL_SPLICE + * No ownership assumed. + */ + int fd; + /* CONSUMER_CHANNEL_MMAP */ + struct lttng_buffer_view buffer; + } buffer; + union { + /* + * Common members are fine to access through either + * union entries (as per C11, Common Initial Sequence). + */ + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t version; + } metadata; + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) sequence_number; + uint64_t stream_id; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) stream_instance_id; + } data; + } info; +}; + +/* + * Perform any operation required to acknowledge + * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe). + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); + +/* + * Perform any operation required before a consumer stream is put + * to sleep before awaiting a data availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, + struct lttng_consumer_local_data *); + +/* + * Acquire the subbuffer at the current 'consumed' position. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Populate the stream_subbuffer's info member. The info to populate + * depends on the type (metadata/data) of the stream. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*extract_subbuffer_info_cb)( + struct lttng_consumer_stream *, struct stream_subbuffer *); + +/* + * Invoked after a subbuffer's info has been filled. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Consume subbuffer contents. + * + * Stream and channel locks are acquired during this call. + */ +typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Release the current subbuffer and advance the 'consumed' position by + * one subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Invoked after consuming a subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*post_consume_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); + +/* + * Send a live beacon if no data is available. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); + +/* + * Lock the stream and channel locks and any other stream-type specific + * lock that need to be acquired during the processing of an + * availability notification. + */ +typedef void (*lock_cb)(struct lttng_consumer_stream *); + +/* + * Unlock the stream and channel locks and any other stream-type specific + * lock before sleeping until the next availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*unlock_cb)(struct lttng_consumer_stream *); + +/* + * Invoked when a subbuffer's metadata version does not match the last + * known metadata version. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); + /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. @@ -467,6 +605,24 @@ struct lttng_consumer_stream { * file before writing in it (regeneration). */ unsigned int reset_metadata_flag:1; + struct { + /* + * Invoked in the order of declaration. + * See callback type definitions. + */ + lock_cb lock; + on_wake_up_cb on_wake_up; + get_next_subbuffer_cb get_next_subbuffer; + extract_subbuffer_info_cb extract_subbuffer_info; + pre_consume_subbuffer_cb pre_consume_subbuffer; + reset_metadata_cb reset_metadata; + consume_subbuffer_cb consume_subbuffer; + put_next_subbuffer_cb put_next_subbuffer; + post_consume_cb post_consume; + send_live_beacon_cb send_live_beacon; + on_sleep_cb on_sleep; + unlock_cb unlock; + } read_subbuffer_ops; }; /* @@ -523,7 +679,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -790,7 +947,8 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht); struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t sessiond_key, uint32_t state)); @@ -799,13 +957,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, const struct lttng_buffer_view *buffer, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, @@ -822,7 +978,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4a2e4e07d..e06f3b3e0 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -7,8 +7,6 @@ * */ -#include "common/buffer-view.h" -#include #define _LGPL_SOURCE #include #include @@ -36,6 +34,9 @@ #include #include #include +#include +#include +#include #include "kernel-consumer.h" @@ -288,7 +289,7 @@ static int lttng_kconsumer_snapshot_channel( subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, &subbuf_view, - padded_len - len, NULL); + padded_len - len); /* * We write the padded len in local tracefiles but the data len * when using a relay. Display the error but continue processing @@ -399,7 +400,7 @@ static int lttng_kconsumer_snapshot_metadata( do { health_code_update(); - ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret_read < 0) { if (ret_read != -EAGAIN) { ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", @@ -655,7 +656,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); pthread_mutex_lock(&channel->lock); - new_stream = consumer_allocate_stream( + new_stream = consumer_stream_create( channel, channel->key, fd, @@ -690,23 +691,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_stream_update_channel_attributes(new_stream, channel); - switch (channel->output) { - case CONSUMER_CHANNEL_SPLICE: - new_stream->output = LTTNG_EVENT_SPLICE; - ret = utils_create_pipe(new_stream->splice_pipe); - if (ret < 0) { - pthread_mutex_unlock(&channel->lock); - goto error_add_stream_nosignal; - } - break; - case CONSUMER_CHANNEL_MMAP: - new_stream->output = LTTNG_EVENT_MMAP; - break; - default: - ERR("Stream output unknown %d", channel->output); - pthread_mutex_unlock(&channel->lock); - goto error_add_stream_nosignal; - } /* * We've just assigned the channel to the stream so increment the @@ -1358,93 +1342,6 @@ end: return ret; } -/* - * Populate index values of a kernel stream. Values are set in big endian order. - * - * Return 0 on success or else a negative value. - */ -static int get_index_values(struct ctf_packet_index *index, int infd) -{ - int ret; - uint64_t packet_size, content_size, timestamp_begin, timestamp_end, - events_discarded, stream_id, stream_instance_id, - packet_seq_num; - - ret = kernctl_get_timestamp_begin(infd, ×tamp_begin); - if (ret < 0) { - PERROR("kernctl_get_timestamp_begin"); - goto error; - } - - ret = kernctl_get_timestamp_end(infd, ×tamp_end); - if (ret < 0) { - PERROR("kernctl_get_timestamp_end"); - goto error; - } - - ret = kernctl_get_events_discarded(infd, &events_discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); - goto error; - } - - ret = kernctl_get_content_size(infd, &content_size); - if (ret < 0) { - PERROR("kernctl_get_content_size"); - goto error; - } - - ret = kernctl_get_packet_size(infd, &packet_size); - if (ret < 0) { - PERROR("kernctl_get_packet_size"); - goto error; - } - - ret = kernctl_get_stream_id(infd, &stream_id); - if (ret < 0) { - PERROR("kernctl_get_stream_id"); - goto error; - } - - ret = kernctl_get_instance_id(infd, &stream_instance_id); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - stream_instance_id = -1ULL; - } else { - PERROR("kernctl_get_instance_id"); - goto error; - } - } - - ret = kernctl_get_sequence_number(infd, &packet_seq_num); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - packet_seq_num = -1ULL; - ret = 0; - } else { - PERROR("kernctl_get_sequence_number"); - goto error; - } - } - index->packet_seq_num = htobe64(index->packet_seq_num); - - *index = (typeof(*index)) { - .offset = index->offset, - .packet_size = htobe64(packet_size), - .content_size = htobe64(content_size), - .timestamp_begin = htobe64(timestamp_begin), - .timestamp_end = htobe64(timestamp_end), - .events_discarded = htobe64(events_discarded), - .stream_id = htobe64(stream_id), - .stream_instance_id = htobe64(stream_instance_id), - .packet_seq_num = htobe64(packet_seq_num), - }; - -error: - return ret; -} /* * Sync metadata meaning request them to the session daemon and snapshot to the * metadata thread can consumer them. @@ -1483,348 +1380,226 @@ end: } static -int update_stream_stats(struct lttng_consumer_stream *stream) +int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t seq, discarded; - - ret = kernctl_get_sequence_number(stream->wait_fd, &seq); - if (ret < 0) { - if (ret == -ENOTTY) { - /* Command not implemented by lttng-modules. */ - seq = -1ULL; - stream->sequence_number_unavailable = true; - } else { - PERROR("kernctl_get_sequence_number"); - goto end; - } - } - /* - * Start the sequence when we extract the first packet in case we don't - * start at 0 (for example if a consumer is not connected to the - * session immediately after the beginning). - */ - if (stream->last_sequence_number == -1ULL) { - stream->last_sequence_number = seq; - } else if (seq > stream->last_sequence_number) { - stream->chan->lost_packets += seq - - stream->last_sequence_number - 1; - } else { - /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, seq); - ret = -1; + ret = kernctl_get_subbuf_size( + stream->wait_fd, &subbuf->info.data.subbuf_size); + if (ret) { goto end; } - stream->last_sequence_number = seq; - ret = kernctl_get_events_discarded(stream->wait_fd, &discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); + ret = kernctl_get_padded_subbuf_size( + stream->wait_fd, &subbuf->info.data.padded_subbuf_size); + if (ret) { goto end; } - if (discarded < stream->last_discarded_events) { - /* - * Overflow has occurred. We assume only one wrap-around - * has occurred. - */ - stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + discarded; - } else { - stream->chan->discarded_events += discarded - - stream->last_discarded_events; - } - stream->last_discarded_events = discarded; - ret = 0; end: return ret; } -/* - * Check if the local version of the metadata stream matches with the version - * of the metadata stream in the kernel. If it was updated, set the reset flag - * on the stream. - */ static -int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream) +int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { int ret; - uint64_t cur_version; - ret = kernctl_get_metadata_version(infd, &cur_version); - if (ret < 0) { - if (ret == -ENOTTY) { - /* - * LTTng-modules does not implement this - * command. - */ - ret = 0; - goto end; - } - ERR("Failed to get the metadata version"); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { goto end; } - if (stream->metadata_version == cur_version) { - ret = 0; + ret = kernctl_get_metadata_version( + stream->wait_fd, &subbuf->info.metadata.version); + if (ret) { goto end; } - DBG("New metadata version detected"); - stream->metadata_version = cur_version; - stream->reset_metadata_flag = 1; - ret = 0; - end: return ret; } -/* - * Consume data on a file descriptor and write it on a trace file. - * The stream and channel locks must be held by the caller. - */ -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +static +int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { - unsigned long len, subbuf_size, padding; - int err, write_index = 1; - ssize_t ret = 0; - int infd = stream->wait_fd; - struct ctf_packet_index index = {}; - bool in_error_state = false; + int ret; - DBG("In read_subbuffer (infd : %d)", infd); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } + ret = kernctl_get_packet_size( + stream->wait_fd, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; + } - /* Get the next subbuffer */ - err = kernctl_get_next_subbuf(infd); - if (err != 0) { - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency)"); - ret = err; - goto error; + ret = kernctl_get_content_size( + stream->wait_fd, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); + goto end; } - /* Get the full subbuffer size including padding */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - ret = err; - goto error; + ret = kernctl_get_timestamp_begin( + stream->wait_fd, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; } - if (!stream->metadata_flag) { - ret = get_index_values(&index, infd); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; - } - ret = update_stream_stats(stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; + ret = kernctl_get_timestamp_end( + stream->wait_fd, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = kernctl_get_events_discarded( + stream->wait_fd, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = kernctl_get_sequence_number(stream->wait_fd, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); + goto end; } } else { - write_index = 0; - ret = metadata_stream_check_version(infd, stream); - if (ret < 0) { - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - goto error; - } + subbuf->info.data.sequence_number.is_set = true; } - switch (stream->chan->output) { - case CONSUMER_CHANNEL_SPLICE: - /* - * XXX: The lttng-modules splice "actor" does not handle copying - * partial pages hence only using the subbuffer size without the - * padding makes the splice fail. - */ - subbuf_size = len; - padding = 0; + ret = kernctl_get_stream_id( + stream->wait_fd, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, - padding, &index); - /* - * XXX: Splice does not support network streaming so the return value - * is simply checked against subbuf_size and not like the mmap() op. - */ - if (ret != subbuf_size) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, subbuf_size); - write_index = 0; - } - break; - case CONSUMER_CHANNEL_MMAP: - { - const char *subbuf_addr; - struct lttng_buffer_view subbuf_view; - - /* Get subbuffer size without padding */ - err = kernctl_get_subbuf_size(infd, &subbuf_size); - if (err != 0) { - PERROR("Getting sub-buffer len failed."); - err = kernctl_put_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } - ret = err; - goto error; + ret = kernctl_get_instance_id(stream->wait_fd, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); + goto end; } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - ret = get_current_subbuf_addr(stream, &subbuf_addr); - if (ret) { - goto error_put_subbuf; - } +static +int get_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* Make sure the tracer is not gone mad on us! */ - assert(len >= subbuf_size); + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret) { + goto end; + } - padding = len - subbuf_size; + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); +end: + return ret; +} - subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); +static +int get_next_subbuffer_splice(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuf_view, padding, &index); - /* - * The mmap operation should write subbuf_size amount of data - * when network streaming or the full padding (len) size when we - * are _not_ streaming. - */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to release the - * subbuffer. This is a DBG statement since this is possible to - * happen without being a critical error. - */ - DBG("Error writing to tracefile " - "(ret: %zd != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); - write_index = 0; - } - break; - } - default: - ERR("Unknown output method"); - ret = -EPERM; + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } -error_put_subbuf: - err = kernctl_put_next_subbuf(infd); - if (err != 0) { - if (err == -EFAULT) { - PERROR("Error in unreserving sub buffer\n"); - } else if (err == -EIO) { - /* Should never happen with newer LTTng versions */ - PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); - } - ret = err; - goto error; - } else if (in_error_state) { - goto error; + + subbuffer->buffer.fd = stream->wait_fd; +end: + return ret; +} + +static +int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + + ret = get_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } - /* Write index if needed. */ - if (!write_index) { + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { goto end; } - if (stream->chan->live_timer_interval && !stream->metadata_flag) { - /* - * In live, block until all the metadata is sent. - */ - pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); - stream->waiting_on_metadata = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - - err = consumer_stream_sync_metadata(ctx, stream->session_id); - - pthread_mutex_lock(&stream->metadata_timer_lock); - stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { - stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_kernel_index(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); - } - if (err < 0) { - goto error; + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); +end: + return ret; +} + +static +int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = kernctl_put_next_subbuf(stream->wait_fd); + + if (ret) { + if (ret == -EFAULT) { + PERROR("Error in unreserving sub buffer"); + } else if (ret == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted"); } } - err = consumer_stream_write_index(stream, &index); - if (err < 0) { - goto error; + return ret; +} + +static void lttng_kconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; } -end: -error: - return ret; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + } else { + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_kernel_index; + } + } + + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1865,6 +1640,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } + lttng_kconsumer_set_stream_ops(stream); + /* we return 0 to let the library handle the FD internally */ return 0; diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index b397b9cd3..48c787a46 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -22,8 +22,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); -ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream); int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream); int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 2f09eac80..1af9840cd 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -7,7 +7,6 @@ * */ -#include #define _LGPL_SOURCE #include #include @@ -24,6 +23,7 @@ #include #include #include +#include #include #include @@ -36,6 +36,7 @@ #include #include #include +#include #include "ust-consumer.h" @@ -127,7 +128,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, assert(channel); assert(ctx); - stream = consumer_allocate_stream( + stream = consumer_stream_create( channel, channel->key, key, @@ -1040,7 +1041,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, do { health_code_update(); - ret = lttng_consumer_read_subbuffer(metadata_stream, ctx); + ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret < 0) { goto error_stream; } @@ -1230,8 +1231,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, - stream, &subbuf_view, padded_len - len, - NULL); + stream, &subbuf_view, padded_len - len); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -2428,115 +2428,16 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) return ustctl_stream_close_wakeup_fd(stream->ustream); } -/* - * Populate index values of a UST stream. Values are set in big endian order. - * - * Return 0 on success or else a negative value. - */ -static int get_index_values(struct ctf_packet_index *index, - struct ustctl_consumer_stream *ustream) -{ - int ret; - uint64_t packet_size, content_size, timestamp_begin, timestamp_end, - events_discarded, stream_id, stream_instance_id, - packet_seq_num; - - ret = ustctl_get_timestamp_begin(ustream, ×tamp_begin); - if (ret < 0) { - PERROR("ustctl_get_timestamp_begin"); - goto error; - } - - ret = ustctl_get_timestamp_end(ustream, ×tamp_end); - if (ret < 0) { - PERROR("ustctl_get_timestamp_end"); - goto error; - } - - ret = ustctl_get_events_discarded(ustream, &events_discarded); - if (ret < 0) { - PERROR("ustctl_get_events_discarded"); - goto error; - } - - ret = ustctl_get_content_size(ustream, &content_size); - if (ret < 0) { - PERROR("ustctl_get_content_size"); - goto error; - } - - ret = ustctl_get_packet_size(ustream, &packet_size); - if (ret < 0) { - PERROR("ustctl_get_packet_size"); - goto error; - } - - ret = ustctl_get_stream_id(ustream, &stream_id); - if (ret < 0) { - PERROR("ustctl_get_stream_id"); - goto error; - } - - ret = ustctl_get_instance_id(ustream, &stream_instance_id); - if (ret < 0) { - PERROR("ustctl_get_instance_id"); - goto error; - } - - ret = ustctl_get_sequence_number(ustream, &packet_seq_num); - if (ret < 0) { - PERROR("ustctl_get_sequence_number"); - goto error; - } - - *index = (typeof(*index)) { - .offset = index->offset, - .packet_size = htobe64(packet_size), - .content_size = htobe64(content_size), - .timestamp_begin = htobe64(timestamp_begin), - .timestamp_end = htobe64(timestamp_end), - .events_discarded = htobe64(events_discarded), - .stream_id = htobe64(stream_id), - .stream_instance_id = htobe64(stream_instance_id), - .packet_seq_num = htobe64(packet_seq_num), - }; - -error: - return ret; -} - static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream, - struct consumer_metadata_cache *cache) +void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) { - DBG("Metadata stream update to version %" PRIu64, - cache->version); + DBG("Reset metadata cache of session %" PRIu64, + stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = cache->version; + stream->metadata_version = stream->chan->metadata_cache->version; stream->reset_metadata_flag = 1; } -/* - * Check if the version of the metadata stream and metadata cache match. - * If the cache got updated, reset the metadata stream. - * The stream lock and metadata cache lock MUST be held. - * Return 0 on success, a negative value on error. - */ -static -int metadata_stream_check_version(struct lttng_consumer_stream *stream) -{ - int ret = 0; - struct consumer_metadata_cache *cache = stream->chan->metadata_cache; - - if (cache->version == stream->metadata_version) { - goto end; - } - metadata_stream_reset_cache(stream, cache); - -end: - return ret; -} - /* * Write up to one packet from the metadata cache to the channel. * @@ -2550,10 +2451,6 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - ret = metadata_stream_check_version(stream); - if (ret < 0) { - goto end; - } if (stream->chan->metadata_cache->max_offset == stream->ust_metadata_pushed) { ret = 0; @@ -2723,261 +2620,264 @@ end: return ret; } -static -int update_stream_stats(struct lttng_consumer_stream *stream) +static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream) { - int ret; - uint64_t seq, discarded; + int ret = 0; - ret = ustctl_get_sequence_number(stream->ustream, &seq); - if (ret < 0) { - PERROR("ustctl_get_sequence_number"); - goto end; - } /* - * Start the sequence when we extract the first packet in case we don't - * start at 0 (for example if a consumer is not connected to the - * session immediately after the beginning). + * We can consume the 1 byte written into the wait_fd by + * UST. Don't trigger error if we cannot read this one byte + * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK. + * + * This is only done when the stream is monitored by a thread, + * before the flush is done after a hangup and if the stream + * is not flagged with data since there might be nothing to + * consume in the wait fd but still have data available + * flagged by the consumer wake up pipe. */ - if (stream->last_sequence_number == -1ULL) { - stream->last_sequence_number = seq; - } else if (seq > stream->last_sequence_number) { - stream->chan->lost_packets += seq - - stream->last_sequence_number - 1; - } else { - /* seq <= last_sequence_number */ - ERR("Sequence number inconsistent : prev = %" PRIu64 - ", current = %" PRIu64, - stream->last_sequence_number, seq); - ret = -1; - goto end; + if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { + char dummy; + ssize_t readlen; + + readlen = lttng_read(stream->wait_fd, &dummy, 1); + if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + ret = readlen; + } } - stream->last_sequence_number = seq; - ret = ustctl_get_events_discarded(stream->ustream, &discarded); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); + return ret; +} + +static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; + + ret = ustctl_get_subbuf_size( + stream->ustream, &subbuf->info.data.subbuf_size); + if (ret) { goto end; } - if (discarded < stream->last_discarded_events) { - /* - * Overflow has occurred. We assume only one wrap-around - * has occurred. - */ - stream->chan->discarded_events += - (1ULL << (CAA_BITS_PER_LONG - 1)) - - stream->last_discarded_events + discarded; - } else { - stream->chan->discarded_events += discarded - - stream->last_discarded_events; + + ret = ustctl_get_padded_subbuf_size( + stream->ustream, &subbuf->info.data.padded_subbuf_size); + if (ret) { + goto end; } - stream->last_discarded_events = discarded; - ret = 0; end: return ret; } -/* - * Read subbuffer from the given stream. - * - * Stream and channel locks MUST be acquired by the caller. - * - * Return 0 on success else a negative value. - */ -int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) +static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) { - unsigned long len, subbuf_size, padding; - int err, write_index = 1; - long ret = 0; - struct ustctl_consumer_stream *ustream; - struct ctf_packet_index index; - const char *subbuf_addr; - struct lttng_buffer_view subbuf_view; + int ret; - assert(stream); - assert(stream->ustream); - assert(ctx); + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; + } - DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd, - stream->name); + subbuf->info.metadata.version = stream->chan->metadata_cache->version; - /* Ease our life for what's next. */ - ustream = stream->ustream; +end: + return ret; +} - /* - * We can consume the 1 byte written into the wait_fd by UST. Don't trigger - * error if we cannot read this one byte (read returns 0), or if the error - * is EAGAIN or EWOULDBLOCK. - * - * This is only done when the stream is monitored by a thread, before the - * flush is done after a hangup and if the stream is not flagged with data - * since there might be nothing to consume in the wait fd but still have - * data available flagged by the consumer wake up pipe. - */ - if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) { - char dummy; - ssize_t readlen; +static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuf) +{ + int ret; - readlen = lttng_read(stream->wait_fd, &dummy, 1); - if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - ret = readlen; - goto error; - } + ret = extract_common_subbuffer_info(stream, subbuf); + if (ret) { + goto end; } -retry: - /* Get the next subbuffer */ - err = ustctl_get_next_subbuf(ustream); - if (err != 0) { - /* - * Populate metadata info if the existing info has - * already been read. - */ - if (stream->metadata_flag) { - ret = commit_one_metadata_packet(stream); - if (ret <= 0) { - goto error; - } - goto retry; - } + ret = ustctl_get_packet_size( + stream->ustream, &subbuf->info.data.packet_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer packet size"); + goto end; + } - ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ - /* - * This is a debug message even for single-threaded consumer, - * because poll() have more relaxed criterions than get subbuf, - * so get_subbuf may fail for short race windows where poll() - * would issue wakeups. - */ - DBG("Reserving sub buffer failed (everything is normal, " - "it is due to concurrency) [ret: %d]", err); - goto error; + ret = ustctl_get_content_size( + stream->ustream, &subbuf->info.data.content_size); + if (ret < 0) { + PERROR("Failed to get sub-buffer content size"); + goto end; } - assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); - if (!stream->metadata_flag) { - index.offset = htobe64(stream->out_fd_offset); - ret = get_index_values(&index, ustream); - if (ret < 0) { - err = ustctl_put_subbuf(ustream); - assert(err == 0); - goto error; - } + ret = ustctl_get_timestamp_begin( + stream->ustream, &subbuf->info.data.timestamp_begin); + if (ret < 0) { + PERROR("Failed to get sub-buffer begin timestamp"); + goto end; + } - /* Update the stream's sequence and discarded events count. */ - ret = update_stream_stats(stream); - if (ret < 0) { - PERROR("kernctl_get_events_discarded"); - err = ustctl_put_subbuf(ustream); - assert(err == 0); - goto error; + ret = ustctl_get_timestamp_end( + stream->ustream, &subbuf->info.data.timestamp_end); + if (ret < 0) { + PERROR("Failed to get sub-buffer end timestamp"); + goto end; + } + + ret = ustctl_get_events_discarded( + stream->ustream, &subbuf->info.data.events_discarded); + if (ret) { + PERROR("Failed to get sub-buffer events discarded count"); + goto end; + } + + ret = ustctl_get_sequence_number(stream->ustream, + &subbuf->info.data.sequence_number.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get sub-buffer sequence number"); + goto end; } } else { - write_index = 0; + subbuf->info.data.sequence_number.is_set = true; } - /* Get the full padded subbuffer size */ - err = ustctl_get_padded_subbuf_size(ustream, &len); - assert(err == 0); + ret = ustctl_get_stream_id( + stream->ustream, &subbuf->info.data.stream_id); + if (ret < 0) { + PERROR("Failed to get stream id"); + goto end; + } - /* Get subbuffer data size (without padding) */ - err = ustctl_get_subbuf_size(ustream, &subbuf_size); - assert(err == 0); + ret = ustctl_get_instance_id(stream->ustream, + &subbuf->info.data.stream_instance_id.value); + if (ret) { + /* May not be supported by older LTTng-modules. */ + if (ret != -ENOTTY) { + PERROR("Failed to get stream instance id"); + goto end; + } + } else { + subbuf->info.data.stream_instance_id.is_set = true; + } +end: + return ret; +} - /* Make sure we don't get a subbuffer size bigger than the padded */ - assert(len >= subbuf_size); +static int get_next_subbuffer_common(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; - padding = len - subbuf_size; + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } - ret = get_current_subbuf_addr(stream, &subbuf_addr); + ret = get_current_subbuf_addr(stream, &addr); if (ret) { - write_index = 0; - goto error_put_subbuf; + goto end; } - subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + assert(subbuffer->buffer.buffer.data != NULL); +end: + return ret; +} - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuf_view, padding, &index); - /* - * The mmap operation should write subbuf_size amount of data when - * network streaming or the full padding (len) size when we are _not_ - * streaming. - */ - if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || - (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { - /* - * Display the error but continue processing to try to release the - * subbuffer. This is a DBG statement since any unexpected kill or - * signal, the application gets unregistered, relayd gets closed or - * anything that affects the buffer lifetime will trigger this error. - * So, for the sake of the user, don't print this error since it can - * happen and it is OK with the code flow. - */ - DBG("Error writing to tracefile " - "(ret: %ld != len: %lu != subbuf_size: %lu)", - ret, len, subbuf_size); - write_index = 0; - } -error_put_subbuf: - err = ustctl_put_next_subbuf(ustream); - assert(err == 0); +static int get_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; - /* - * This will consumer the byte on the wait_fd if and only if there is not - * next subbuffer to be acquired. - */ - if (!stream->metadata_flag) { - ret = notify_if_more_data(stream, ctx); - if (ret < 0) { - goto error; - } + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { + goto end; } - /* Write index if needed. */ - if (!write_index) { + ret = get_next_subbuffer_common(stream, subbuffer); + if (ret) { goto end; } +end: + return ret; +} - if (stream->chan->live_timer_interval && !stream->metadata_flag) { - /* - * In live, block until all the metadata is sent. - */ - pthread_mutex_lock(&stream->metadata_timer_lock); - assert(!stream->missed_metadata_flush); - stream->waiting_on_metadata = true; - pthread_mutex_unlock(&stream->metadata_timer_lock); - - err = consumer_stream_sync_metadata(ctx, stream->session_id); - - pthread_mutex_lock(&stream->metadata_timer_lock); - stream->waiting_on_metadata = false; - if (stream->missed_metadata_flush) { - stream->missed_metadata_flush = false; - pthread_mutex_unlock(&stream->metadata_timer_lock); - (void) consumer_flush_ust_index(stream); - } else { - pthread_mutex_unlock(&stream->metadata_timer_lock); +static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { + ret = commit_one_metadata_packet(stream); + if (ret < 0) { + goto end; + } else if (ret == 0) { + /* Not an error, the cache is empty. */ + ret = -ENODATA; + goto end; } - if (err < 0) { - goto error; + ret = ustctl_get_next_subbuf(stream->ustream); + if (ret) { + goto end; } } - assert(!stream->metadata_flag); - err = consumer_stream_write_index(stream, &index); - if (err < 0) { - goto error; + ret = get_next_subbuffer_common(stream, subbuffer); + if (ret) { + goto end; } - end: -error: return ret; } +static int put_next_subbuffer(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + const int ret = ustctl_put_next_subbuf(stream->ustream); + + assert(ret == 0); + return ret; +} + +static int signal_metadata(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0; +} + +static void lttng_ustconsumer_set_stream_ops( + struct lttng_consumer_stream *stream) +{ + stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up; + if (stream->metadata_flag) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata; + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_metadata_subbuffer_info; + stream->read_subbuffer_ops.reset_metadata = + metadata_stream_reset_cache; + stream->read_subbuffer_ops.on_sleep = signal_metadata; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer; + stream->read_subbuffer_ops.extract_subbuffer_info = + extract_data_subbuffer_info; + stream->read_subbuffer_ops.on_sleep = notify_if_more_data; + if (stream->chan->is_live) { + stream->read_subbuffer_ops.send_live_beacon = + consumer_flush_ust_index; + } + } + + stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +} + /* * Called when a stream is created. * @@ -3000,6 +2900,8 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) goto error; } } + + lttng_ustconsumer_set_stream_ops(stream); ret = 0; error: