consumerd: refactor: split read_subbuf into sub-operations
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Sun, 10 May 2020 22:00:26 +0000 (18:00 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 26 May 2020 20:25:31 +0000 (16:25 -0400)
The read_subbuf code paths intertwine domain-specific operations and
metadata/data-specific logic which makes modifications error prone and
introduces a fair amount of code duplication.

lttng_consumer_read_subbuffer is effectively turned into a template
method invoking overridable callbacks making most of the consumption
logic domain and data/metadata agnostic.

The goal is not to extensively clean-up that code path. A follow-up
fix introduces metadata buffering logic which would not reasonably fit
in the current scheme. This clean-up makes it easier to safely
introduce those changes.

No changes in behaviour are intended by this change.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I9366f2e2a38018ca9b617b93ad9259340180c55d

src/common/consumer/consumer-stream.c
src/common/consumer/consumer-stream.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-consumer/kernel-consumer.h
src/common/ust-consumer/ust-consumer.c

index 8318d79d9a09e1451bf90c715d714bb9962f52e0..5dc380e5e32aa9a23c6d75c878bb6d2a3e10086d 100644 (file)
@@ -19,6 +19,8 @@
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
 #include <common/utils.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/consumer-timer.h>
 
 #include "consumer-stream.h"
 
@@ -36,6 +38,509 @@ static void free_stream_rcu(struct rcu_head *head)
        free(stream);
 }
 
+static void consumer_stream_data_lock_all(struct lttng_consumer_stream *stream)
+{
+       pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->lock);
+}
+
+static void consumer_stream_data_unlock_all(struct lttng_consumer_stream *stream)
+{
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
+}
+
+static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_data_lock_all(stream);
+       pthread_mutex_lock(&stream->metadata_rdv_lock);
+}
+
+static void consumer_stream_metadata_unlock_all(struct lttng_consumer_stream *stream)
+{
+       pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       consumer_stream_data_unlock_all(stream);
+}
+
+/* Only used for data streams. */
+static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuf)
+{
+       int ret = 0;
+       uint64_t sequence_number;
+       const uint64_t discarded_events =
+                       LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+
+       if (!subbuf->info.data.sequence_number.is_set) {
+               /* Command not supported by the tracer. */
+               sequence_number = -1ULL;
+               stream->sequence_number_unavailable = true;
+       } else {
+               sequence_number = subbuf->info.data.sequence_number.value;
+       }
+
+       /*
+        * Start the sequence when we extract the first packet in case we don't
+        * start at 0 (for example if a consumer is not connected to the
+        * session immediately after the beginning).
+        */
+       if (stream->last_sequence_number == -1ULL) {
+               stream->last_sequence_number = sequence_number;
+       } else if (sequence_number > stream->last_sequence_number) {
+               stream->chan->lost_packets += sequence_number -
+                               stream->last_sequence_number - 1;
+       } else {
+               /* seq <= last_sequence_number */
+               ERR("Sequence number inconsistent : prev = %" PRIu64
+                   ", current = %" PRIu64,
+                               stream->last_sequence_number, sequence_number);
+               ret = -1;
+               goto end;
+       }
+       stream->last_sequence_number = sequence_number;
+
+       if (discarded_events < stream->last_discarded_events) {
+               /*
+                * Overflow has occurred. We assume only one wrap-around
+                * has occurred.
+                */
+               stream->chan->discarded_events +=
+                               (1ULL << (CAA_BITS_PER_LONG - 1)) -
+                               stream->last_discarded_events +
+                               discarded_events;
+       } else {
+               stream->chan->discarded_events += discarded_events -
+                                                 stream->last_discarded_events;
+       }
+       stream->last_discarded_events = discarded_events;
+       ret = 0;
+
+end:
+       return ret;
+}
+
+static
+void ctf_packet_index_populate(struct ctf_packet_index *index,
+               off_t offset, const struct stream_subbuffer *subbuffer)
+{
+       *index = (typeof(*index)){
+               .offset = htobe64(offset),
+               .packet_size = htobe64(subbuffer->info.data.packet_size),
+               .content_size = htobe64(subbuffer->info.data.content_size),
+               .timestamp_begin = htobe64(
+                               subbuffer->info.data.timestamp_begin),
+               .timestamp_end = htobe64(
+                               subbuffer->info.data.timestamp_end),
+               .events_discarded = htobe64(
+                               subbuffer->info.data.events_discarded),
+               .stream_id = htobe64(subbuffer->info.data.stream_id),
+               .stream_instance_id = htobe64(
+                               subbuffer->info.data.stream_instance_id.is_set ?
+                               subbuffer->info.data.stream_instance_id.value : -1ULL),
+               .packet_seq_num = htobe64(
+                               subbuffer->info.data.sequence_number.is_set ?
+                               subbuffer->info.data.sequence_number.value : -1ULL),
+       };
+}
+
+static ssize_t consumer_stream_consume_mmap(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       const unsigned long padding_size =
+                       subbuffer->info.data.padded_subbuf_size -
+                       subbuffer->info.data.subbuf_size;
+
+       return lttng_consumer_on_read_subbuffer_mmap(
+                       ctx, stream, &subbuffer->buffer.buffer, padding_size);
+}
+
+static ssize_t consumer_stream_consume_splice(
+               struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
+                       subbuffer->info.data.padded_subbuf_size, 0);
+}
+
+static int consumer_stream_send_index(
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
+{
+       off_t packet_offset = 0;
+       struct ctf_packet_index index = {};
+
+       /*
+        * This is called after consuming the sub-buffer; substract the
+        * effect this sub-buffer from the offset.
+        */
+       if (stream->net_seq_idx == (uint64_t) -1ULL) {
+               packet_offset = stream->out_fd_offset -
+                               subbuffer->info.data.padded_subbuf_size;
+       }
+
+       ctf_packet_index_populate(&index, packet_offset, subbuffer);
+       return consumer_stream_write_index(stream, &index);
+}
+
+/*
+ * Actually do the metadata sync using the given metadata stream.
+ *
+ * Return 0 on success else a negative value. ENODATA can be returned also
+ * indicating that there is no metadata available for that stream.
+ */
+static int do_sync_metadata(struct lttng_consumer_stream *metadata,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+
+       assert(metadata);
+       assert(metadata->metadata_flag);
+       assert(ctx);
+
+       /*
+        * In UST, since we have to write the metadata from the cache packet
+        * by packet, we might need to start this procedure multiple times
+        * until all the metadata from the cache has been extracted.
+        */
+       do {
+               /*
+                * Steps :
+                * - Lock the metadata stream
+                * - Check if metadata stream node was deleted before locking.
+                *   - if yes, release and return success
+                * - Check if new metadata is ready (flush + snapshot pos)
+                * - If nothing : release and return.
+                * - Lock the metadata_rdv_lock
+                * - Unlock the metadata stream
+                * - cond_wait on metadata_rdv to wait the wakeup from the
+                *   metadata thread
+                * - Unlock the metadata_rdv_lock
+                */
+               pthread_mutex_lock(&metadata->lock);
+
+               /*
+                * There is a possibility that we were able to acquire a reference on the
+                * stream from the RCU hash table but between then and now, the node might
+                * have been deleted just before the lock is acquired. Thus, after locking,
+                * we make sure the metadata node has not been deleted which means that the
+                * buffers are closed.
+                *
+                * In that case, there is no need to sync the metadata hence returning a
+                * success return code.
+                */
+               ret = cds_lfht_is_node_deleted(&metadata->node.node);
+               if (ret) {
+                       ret = 0;
+                       goto end_unlock_mutex;
+               }
+
+               switch (ctx->type) {
+               case LTTNG_CONSUMER_KERNEL:
+                       /*
+                        * Empty the metadata cache and flush the current stream.
+                        */
+                       ret = lttng_kconsumer_sync_metadata(metadata);
+                       break;
+               case LTTNG_CONSUMER32_UST:
+               case LTTNG_CONSUMER64_UST:
+                       /*
+                        * Ask the sessiond if we have new metadata waiting and update the
+                        * consumer metadata cache.
+                        */
+                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+                       break;
+               default:
+                       assert(0);
+                       ret = -1;
+                       break;
+               }
+               /*
+                * Error or no new metadata, we exit here.
+                */
+               if (ret <= 0 || ret == ENODATA) {
+                       goto end_unlock_mutex;
+               }
+
+               /*
+                * At this point, new metadata have been flushed, so we wait on the
+                * rendez-vous point for the metadata thread to wake us up when it
+                * finishes consuming the metadata and continue execution.
+                */
+
+               pthread_mutex_lock(&metadata->metadata_rdv_lock);
+
+               /*
+                * Release metadata stream lock so the metadata thread can process it.
+                */
+               pthread_mutex_unlock(&metadata->lock);
+
+               /*
+                * Wait on the rendez-vous point. Once woken up, it means the metadata was
+                * consumed and thus synchronization is achieved.
+                */
+               pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
+               pthread_mutex_unlock(&metadata->metadata_rdv_lock);
+       } while (ret == EAGAIN);
+
+       /* Success */
+       return 0;
+
+end_unlock_mutex:
+       pthread_mutex_unlock(&metadata->lock);
+       return ret;
+}
+
+/*
+ * Synchronize the metadata using a given session ID. A successful acquisition
+ * of a metadata stream will trigger a request to the session daemon and a
+ * snapshot so the metadata thread can consume it.
+ *
+ * This function call is a rendez-vous point between the metadata thread and
+ * the data thread.
+ *
+ * Return 0 on success or else a negative value.
+ */
+int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
+               uint64_t session_id)
+{
+       int ret;
+       struct lttng_consumer_stream *stream = NULL;
+       struct lttng_ht_iter iter;
+       struct lttng_ht *ht;
+
+       assert(ctx);
+
+       /* Ease our life a bit. */
+       ht = consumer_data.stream_list_ht;
+
+       rcu_read_lock();
+
+       /* Search the metadata associated with the session id of the given stream. */
+
+       cds_lfht_for_each_entry_duplicate(ht->ht,
+                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
+                       &session_id, &iter.iter, stream, node_session_id.node) {
+               if (!stream->metadata_flag) {
+                       continue;
+               }
+
+               ret = do_sync_metadata(stream, ctx);
+               if (ret < 0) {
+                       goto end;
+               }
+       }
+
+       /*
+        * Force return code to 0 (success) since ret might be ENODATA for instance
+        * which is not an error but rather that we should come back.
+        */
+       ret = 0;
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+static int consumer_stream_sync_metadata_index(
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer,
+               struct lttng_consumer_local_data *ctx)
+{
+       int ret;
+
+       /* Block until all the metadata is sent. */
+       pthread_mutex_lock(&stream->metadata_timer_lock);
+       assert(!stream->missed_metadata_flush);
+       stream->waiting_on_metadata = true;
+       pthread_mutex_unlock(&stream->metadata_timer_lock);
+
+       ret = consumer_stream_sync_metadata(ctx, stream->session_id);
+
+       pthread_mutex_lock(&stream->metadata_timer_lock);
+       stream->waiting_on_metadata = false;
+       if (stream->missed_metadata_flush) {
+               stream->missed_metadata_flush = false;
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+               (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+       } else {
+               pthread_mutex_unlock(&stream->metadata_timer_lock);
+       }
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = consumer_stream_send_index(stream, subbuffer, ctx);
+end:
+       return ret;
+}
+
+/*
+ * Check if the local version of the metadata stream matches with the version
+ * of the metadata stream in the kernel. If it was updated, set the reset flag
+ * on the stream.
+ */
+static
+int metadata_stream_check_version(struct lttng_consumer_stream *stream,
+       const struct stream_subbuffer *subbuffer)
+{
+       if (stream->metadata_version == subbuffer->info.metadata.version) {
+               goto end;
+       }
+
+       DBG("New metadata version detected");
+       stream->metadata_version = subbuffer->info.metadata.version;
+       stream->reset_metadata_flag = 1;
+
+       if (stream->read_subbuffer_ops.reset_metadata) {
+               stream->read_subbuffer_ops.reset_metadata(stream);
+       }
+
+end:
+       return 0;
+}
+
+struct lttng_consumer_stream *consumer_stream_create(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
+               uint64_t stream_key,
+               const char *channel_name,
+               uint64_t relayd_id,
+               uint64_t session_id,
+               struct lttng_trace_chunk *trace_chunk,
+               int cpu,
+               int *alloc_ret,
+               enum consumer_channel_type type,
+               unsigned int monitor)
+{
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       stream = zmalloc(sizeof(*stream));
+       if (stream == NULL) {
+               PERROR("malloc struct lttng_consumer_stream");
+               ret = -ENOMEM;
+               goto end;
+       }
+
+       if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
+               ERR("Failed to acquire trace chunk reference during the creation of a stream");
+               ret = -1;
+               goto error;
+       }
+
+       rcu_read_lock();
+       stream->chan = channel;
+       stream->key = stream_key;
+       stream->trace_chunk = trace_chunk;
+       stream->out_fd = -1;
+       stream->out_fd_offset = 0;
+       stream->output_written = 0;
+       stream->net_seq_idx = relayd_id;
+       stream->session_id = session_id;
+       stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_file = NULL;
+       stream->last_sequence_number = -1ULL;
+       stream->rotate_position = -1ULL;
+       pthread_mutex_init(&stream->lock, NULL);
+       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
+
+       /* If channel is the metadata, flag this stream as metadata. */
+       if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               stream->metadata_flag = 1;
+               /* Metadata is flat out. */
+               strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
+       } else {
+               /* Format stream name to <channel_name>_<cpu_number> */
+               ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+                               channel_name, cpu);
+               if (ret < 0) {
+                       PERROR("snprintf stream name");
+                       goto error;
+               }
+       }
+
+       switch (channel->output) {
+       case CONSUMER_CHANNEL_SPLICE:
+               stream->output = LTTNG_EVENT_SPLICE;
+               ret = utils_create_pipe(stream->splice_pipe);
+               if (ret < 0) {
+                       goto error;
+               }
+               break;
+       case CONSUMER_CHANNEL_MMAP:
+               stream->output = LTTNG_EVENT_MMAP;
+               break;
+       default:
+               abort();
+       }
+
+       /* Key is always the wait_fd for streams. */
+       lttng_ht_node_init_u64(&stream->node, stream->key);
+
+       /* Init node per channel id key */
+       lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
+
+       /* Init session id node with the stream session id */
+       lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
+
+       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+                       " relayd_id %" PRIu64 ", session_id %" PRIu64,
+                       stream->name, stream->key, channel_key,
+                       stream->net_seq_idx, stream->session_id);
+
+       rcu_read_unlock();
+
+       if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+               stream->read_subbuffer_ops.lock =
+                               consumer_stream_metadata_lock_all;
+               stream->read_subbuffer_ops.unlock =
+                               consumer_stream_metadata_unlock_all;
+               stream->read_subbuffer_ops.pre_consume_subbuffer =
+                               metadata_stream_check_version;
+       } else {
+               stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
+               stream->read_subbuffer_ops.unlock =
+                               consumer_stream_data_unlock_all;
+               stream->read_subbuffer_ops.pre_consume_subbuffer =
+                               consumer_stream_update_stats;
+               if (channel->is_live) {
+                       stream->read_subbuffer_ops.post_consume =
+                                       consumer_stream_sync_metadata_index;
+               } else {
+                       stream->read_subbuffer_ops.post_consume =
+                                       consumer_stream_send_index;
+               }
+       }
+
+       if (channel->output == CONSUMER_CHANNEL_MMAP) {
+               stream->read_subbuffer_ops.consume_subbuffer =
+                               consumer_stream_consume_mmap;
+       } else {
+               stream->read_subbuffer_ops.consume_subbuffer =
+                               consumer_stream_consume_splice;
+       }
+
+       return stream;
+
+error:
+       rcu_read_unlock();
+       lttng_trace_chunk_put(stream->trace_chunk);
+       free(stream);
+end:
+       if (alloc_ret) {
+               *alloc_ret = ret;
+       }
+       return NULL;
+}
+
 /*
  * Close stream on the relayd side. This call can destroy a relayd if the
  * conditions are met.
@@ -393,165 +898,6 @@ error:
        return ret;
 }
 
-/*
- * Actually do the metadata sync using the given metadata stream.
- *
- * Return 0 on success else a negative value. ENODATA can be returned also
- * indicating that there is no metadata available for that stream.
- */
-static int do_sync_metadata(struct lttng_consumer_stream *metadata,
-               struct lttng_consumer_local_data *ctx)
-{
-       int ret;
-
-       assert(metadata);
-       assert(metadata->metadata_flag);
-       assert(ctx);
-
-       /*
-        * In UST, since we have to write the metadata from the cache packet
-        * by packet, we might need to start this procedure multiple times
-        * until all the metadata from the cache has been extracted.
-        */
-       do {
-               /*
-                * Steps :
-                * - Lock the metadata stream
-                * - Check if metadata stream node was deleted before locking.
-                *   - if yes, release and return success
-                * - Check if new metadata is ready (flush + snapshot pos)
-                * - If nothing : release and return.
-                * - Lock the metadata_rdv_lock
-                * - Unlock the metadata stream
-                * - cond_wait on metadata_rdv to wait the wakeup from the
-                *   metadata thread
-                * - Unlock the metadata_rdv_lock
-                */
-               pthread_mutex_lock(&metadata->lock);
-
-               /*
-                * There is a possibility that we were able to acquire a reference on the
-                * stream from the RCU hash table but between then and now, the node might
-                * have been deleted just before the lock is acquired. Thus, after locking,
-                * we make sure the metadata node has not been deleted which means that the
-                * buffers are closed.
-                *
-                * In that case, there is no need to sync the metadata hence returning a
-                * success return code.
-                */
-               ret = cds_lfht_is_node_deleted(&metadata->node.node);
-               if (ret) {
-                       ret = 0;
-                       goto end_unlock_mutex;
-               }
-
-               switch (ctx->type) {
-               case LTTNG_CONSUMER_KERNEL:
-                       /*
-                        * Empty the metadata cache and flush the current stream.
-                        */
-                       ret = lttng_kconsumer_sync_metadata(metadata);
-                       break;
-               case LTTNG_CONSUMER32_UST:
-               case LTTNG_CONSUMER64_UST:
-                       /*
-                        * Ask the sessiond if we have new metadata waiting and update the
-                        * consumer metadata cache.
-                        */
-                       ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
-                       break;
-               default:
-                       assert(0);
-                       ret = -1;
-                       break;
-               }
-               /*
-                * Error or no new metadata, we exit here.
-                */
-               if (ret <= 0 || ret == ENODATA) {
-                       goto end_unlock_mutex;
-               }
-
-               /*
-                * At this point, new metadata have been flushed, so we wait on the
-                * rendez-vous point for the metadata thread to wake us up when it
-                * finishes consuming the metadata and continue execution.
-                */
-
-               pthread_mutex_lock(&metadata->metadata_rdv_lock);
-
-               /*
-                * Release metadata stream lock so the metadata thread can process it.
-                */
-               pthread_mutex_unlock(&metadata->lock);
-
-               /*
-                * Wait on the rendez-vous point. Once woken up, it means the metadata was
-                * consumed and thus synchronization is achieved.
-                */
-               pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
-               pthread_mutex_unlock(&metadata->metadata_rdv_lock);
-       } while (ret == EAGAIN);
-
-       /* Success */
-       return 0;
-
-end_unlock_mutex:
-       pthread_mutex_unlock(&metadata->lock);
-       return ret;
-}
-
-/*
- * Synchronize the metadata using a given session ID. A successful acquisition
- * of a metadata stream will trigger a request to the session daemon and a
- * snapshot so the metadata thread can consume it.
- *
- * This function call is a rendez-vous point between the metadata thread and
- * the data thread.
- *
- * Return 0 on success or else a negative value.
- */
-int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
-               uint64_t session_id)
-{
-       int ret;
-       struct lttng_consumer_stream *stream = NULL;
-       struct lttng_ht_iter iter;
-       struct lttng_ht *ht;
-
-       assert(ctx);
-
-       /* Ease our life a bit. */
-       ht = consumer_data.stream_list_ht;
-
-       rcu_read_lock();
-
-       /* Search the metadata associated with the session id of the given stream. */
-
-       cds_lfht_for_each_entry_duplicate(ht->ht,
-                       ht->hash_fct(&session_id, lttng_ht_seed), ht->match_fct,
-                       &session_id, &iter.iter, stream, node_session_id.node) {
-               if (!stream->metadata_flag) {
-                       continue;
-               }
-
-               ret = do_sync_metadata(stream, ctx);
-               if (ret < 0) {
-                       goto end;
-               }
-       }
-
-       /*
-        * Force return code to 0 (success) since ret might be ENODATA for instance
-        * which is not an error but rather that we should come back.
-        */
-       ret = 0;
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
 int consumer_stream_create_output_files(struct lttng_consumer_stream *stream,
                bool create_index)
 {
index 0e1827c4a55382a4eabffdb78c832249317461de..eb00dac78849d62df66911b8423f4687ea4a4f5d 100644 (file)
 
 #include "consumer.h"
 
+/*
+ * Create a consumer stream.
+ *
+ * The channel lock MUST be acquired.
+ */
+struct lttng_consumer_stream *consumer_stream_create(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
+               uint64_t stream_key,
+               const char *channel_name,
+               uint64_t relayd_id,
+               uint64_t session_id,
+               struct lttng_trace_chunk *trace_chunk,
+               int cpu,
+               int *alloc_ret,
+               enum consumer_channel_type type,
+               unsigned int monitor);
+
 /*
  * Close stream's file descriptors and, if needed, close stream also on the
  * relayd side.
index f13e90a6881ac4ab87312c61a7e632004d97e81d..5c211339d42bd2017a8888b590ff5b3941cbbadf 100644 (file)
@@ -7,6 +7,7 @@
  *
  */
 
+#include "common/index/ctf-index.h"
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
@@ -568,98 +569,6 @@ void consumer_stream_update_channel_attributes(
                        channel->tracefile_size;
 }
 
-struct lttng_consumer_stream *consumer_allocate_stream(
-               struct lttng_consumer_channel *channel,
-               uint64_t channel_key,
-               uint64_t stream_key,
-               const char *channel_name,
-               uint64_t relayd_id,
-               uint64_t session_id,
-               struct lttng_trace_chunk *trace_chunk,
-               int cpu,
-               int *alloc_ret,
-               enum consumer_channel_type type,
-               unsigned int monitor)
-{
-       int ret;
-       struct lttng_consumer_stream *stream;
-
-       stream = zmalloc(sizeof(*stream));
-       if (stream == NULL) {
-               PERROR("malloc struct lttng_consumer_stream");
-               ret = -ENOMEM;
-               goto end;
-       }
-
-       if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
-               ERR("Failed to acquire trace chunk reference during the creation of a stream");
-               ret = -1;
-               goto error;
-       }
-
-       rcu_read_lock();
-       stream->chan = channel;
-       stream->key = stream_key;
-       stream->trace_chunk = trace_chunk;
-       stream->out_fd = -1;
-       stream->out_fd_offset = 0;
-       stream->output_written = 0;
-       stream->net_seq_idx = relayd_id;
-       stream->session_id = session_id;
-       stream->monitor = monitor;
-       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_file = NULL;
-       stream->last_sequence_number = -1ULL;
-       stream->rotate_position = -1ULL;
-       pthread_mutex_init(&stream->lock, NULL);
-       pthread_mutex_init(&stream->metadata_timer_lock, NULL);
-
-       /* If channel is the metadata, flag this stream as metadata. */
-       if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
-               stream->metadata_flag = 1;
-               /* Metadata is flat out. */
-               strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
-               /* Live rendez-vous point. */
-               pthread_cond_init(&stream->metadata_rdv, NULL);
-               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
-       } else {
-               /* Format stream name to <channel_name>_<cpu_number> */
-               ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
-                               channel_name, cpu);
-               if (ret < 0) {
-                       PERROR("snprintf stream name");
-                       goto error;
-               }
-       }
-
-       /* Key is always the wait_fd for streams. */
-       lttng_ht_node_init_u64(&stream->node, stream->key);
-
-       /* Init node per channel id key */
-       lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
-
-       /* Init session id node with the stream session id */
-       lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
-
-       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
-                       " relayd_id %" PRIu64 ", session_id %" PRIu64,
-                       stream->name, stream->key, channel_key,
-                       stream->net_seq_idx, stream->session_id);
-
-       rcu_read_unlock();
-       return stream;
-
-error:
-       rcu_read_unlock();
-       lttng_trace_chunk_put(stream->trace_chunk);
-       free(stream);
-end:
-       if (alloc_ret) {
-               *alloc_ret = ret;
-       }
-       return NULL;
-}
-
 /*
  * Add a stream to the global list protected by a mutex.
  */
@@ -1467,7 +1376,7 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
                ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx),
+                       struct lttng_consumer_local_data *ctx, bool locked_by_caller),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(uint64_t stream_key, uint32_t state))
@@ -1677,8 +1586,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
-               unsigned long padding,
-               struct ctf_packet_index *index)
+               unsigned long padding)
 {
        ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
@@ -1770,10 +1678,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        orig_offset = 0;
                }
                stream->tracefile_size_current += buffer->size;
-               if (index) {
-                       index->offset = htobe64(stream->out_fd_offset);
-               }
-
                write_len = buffer->size;
        }
 
@@ -1850,8 +1754,7 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index)
+               unsigned long padding)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1955,7 +1858,6 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        orig_offset = 0;
                }
                stream->tracefile_size_current += len;
-               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -2522,7 +2424,7 @@ restart:
                                do {
                                        health_code_update();
 
-                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       len = ctx->on_buffer_ready(stream, ctx, false);
                                        /*
                                         * We don't check the return value here since if we get
                                         * a negative len, it means an error occurred thus we
@@ -2549,7 +2451,7 @@ restart:
                                        do {
                                                health_code_update();
 
-                                               len = ctx->on_buffer_ready(stream, ctx);
+                                               len = ctx->on_buffer_ready(stream, ctx, false);
                                                /*
                                                 * We don't check the return value here since if we get
                                                 * a negative len, it means an error occurred thus we
@@ -2765,7 +2667,7 @@ void *consumer_thread_data_poll(void *data)
                        if (pollfd[i].revents & POLLPRI) {
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx, false);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
@@ -2796,7 +2698,7 @@ void *consumer_thread_data_poll(void *data)
                                        local_stream[i]->hangup_flush_done ||
                                        local_stream[i]->has_data) {
                                DBG("Normal read on fd %d", pollfd[i].fd);
-                               len = ctx->on_buffer_ready(local_stream[i], ctx);
+                               len = ctx->on_buffer_ready(local_stream[i], ctx, false);
                                /* it's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean the stream and free it. */
@@ -3410,15 +3312,22 @@ error_testpoint:
 }
 
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx,
+               bool locked_by_caller)
 {
-       ssize_t ret;
+       ssize_t ret, written_bytes;
        int rotation_ret;
+       struct stream_subbuffer subbuffer = {};
 
-       pthread_mutex_lock(&stream->chan->lock);
-       pthread_mutex_lock(&stream->lock);
-       if (stream->metadata_flag) {
-               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       if (!locked_by_caller) {
+               stream->read_subbuffer_ops.lock(stream);
+       }
+
+       if (stream->read_subbuffer_ops.on_wake_up) {
+               ret = stream->read_subbuffer_ops.on_wake_up(stream);
+               if (ret) {
+                       goto end;
+               }
        }
 
        /*
@@ -3434,25 +3343,55 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
        }
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               ret = lttng_kconsumer_read_subbuffer(stream, ctx);
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               ret = -ENOSYS;
-               break;
+       ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
+       if (ret) {
+               if (ret == -ENODATA) {
+                       /* Not an error. */
+                       ret = 0;
+               }
+               goto end;
        }
 
-       if (ret < 0) {
+       ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
+                       stream, &subbuffer);
+       if (ret) {
+               goto error_put_subbuf;
+       }
+
+       written_bytes = stream->read_subbuffer_ops.consume_subbuffer(
+                       ctx, stream, &subbuffer);
+       /*
+        * Should write subbuf_size amount of data when network streaming or
+        * the full padded size when we are not streaming.
+        */
+       if ((written_bytes != subbuffer.info.data.subbuf_size &&
+                           stream->net_seq_idx != (uint64_t) -1ULL) ||
+                       (written_bytes != subbuffer.info.data.padded_subbuf_size &&
+                                       stream->net_seq_idx ==
+                                                       (uint64_t) -1ULL)) {
+               /*
+                * Display the error but continue processing to try to
+                * release the subbuffer. This is a DBG statement
+                * since this can happen without being a critical
+                * error.
+                */
+               DBG("Failed to write to tracefile (written_bytes: %zd != padded subbuffer size: %lu, subbuffer size: %lu)",
+                               written_bytes, subbuffer.info.data.subbuf_size,
+                               subbuffer.info.data.padded_subbuf_size);
+       }
+
+       ret = stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
+       if (ret) {
                goto end;
        }
 
+       if (stream->read_subbuffer_ops.post_consume) {
+               ret = stream->read_subbuffer_ops.post_consume(stream, &subbuffer, ctx);
+               if (ret) {
+                       goto end;
+               }
+       }
+
        /*
         * After extracting the packet, we check if the stream is now ready to
         * be rotated and perform the action immediately.
@@ -3474,14 +3413,20 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
+       if (stream->read_subbuffer_ops.on_sleep) {
+               stream->read_subbuffer_ops.on_sleep(stream, ctx);
+       }
+
+       ret = written_bytes;
 end:
-       if (stream->metadata_flag) {
-               pthread_cond_broadcast(&stream->metadata_rdv);
-               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       if (!locked_by_caller) {
+               stream->read_subbuffer_ops.unlock(stream);
        }
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&stream->chan->lock);
+
        return ret;
+error_put_subbuf:
+       (void) stream->read_subbuffer_ops.put_next_subbuffer(stream, &subbuffer);
+       goto end;
 }
 
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
index 000040982d5dfb4bba6abaa2bb2a6a9e1f3d5b75..aa8a401a0416efe089057f2e07b6187c38bf78d5 100644 (file)
@@ -28,6 +28,8 @@
 #include <common/credentials.h>
 #include <common/buffer-view.h>
 
+struct lttng_consumer_local_data;
+
 /* Commands for consumer */
 enum lttng_consumer_command {
        LTTNG_CONSUMER_ADD_CHANNEL,
@@ -244,6 +246,142 @@ struct lttng_consumer_channel {
        bool streams_sent_to_relayd;
 };
 
+struct stream_subbuffer {
+       union {
+               /*
+                * CONSUMER_CHANNEL_SPLICE
+                * No ownership assumed.
+                */
+               int fd;
+               /* CONSUMER_CHANNEL_MMAP */
+               struct lttng_buffer_view buffer;
+       } buffer;
+       union {
+               /*
+                * Common members are fine to access through either
+                * union entries (as per C11, Common Initial Sequence).
+                */
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t version;
+               } metadata;
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t packet_size;
+                       uint64_t content_size;
+                       uint64_t timestamp_begin;
+                       uint64_t timestamp_end;
+                       uint64_t events_discarded;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) sequence_number;
+                       uint64_t stream_id;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) stream_instance_id;
+               } data;
+       } info;
+};
+
+/*
+ * Perform any operation required to acknowledge
+ * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe).
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Perform any operation required before a consumer stream is put
+ * to sleep before awaiting a data availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_sleep_cb)(struct lttng_consumer_stream *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Acquire the subbuffer at the current 'consumed' position.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Populate the stream_subbuffer's info member. The info to populate
+ * depends on the type (metadata/data) of the stream.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*extract_subbuffer_info_cb)(
+               struct lttng_consumer_stream *, struct stream_subbuffer *);
+
+/*
+ * Invoked after a subbuffer's info has been filled.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Consume subbuffer contents.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *,
+               struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Release the current subbuffer and advance the 'consumed' position by
+ * one subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Invoked after consuming a subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*post_consume_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Send a live beacon if no data is available.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Lock the stream and channel locks and any other stream-type specific
+ * lock that need to be acquired during the processing of an
+ * availability notification.
+ */
+typedef void (*lock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Unlock the stream and channel locks and any other stream-type specific
+ * lock before sleeping until the next availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Invoked when a subbuffer's metadata version does not match the last
+ * known metadata version.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *);
+
 /*
  * Internal representation of the streams, sessiond_key is used to identify
  * uniquely a stream.
@@ -467,6 +605,24 @@ struct lttng_consumer_stream {
         * file before writing in it (regeneration).
         */
        unsigned int reset_metadata_flag:1;
+       struct {
+               /*
+                * Invoked in the order of declaration.
+                * See callback type definitions.
+                */
+               lock_cb lock;
+               on_wake_up_cb on_wake_up;
+               get_next_subbuffer_cb get_next_subbuffer;
+               extract_subbuffer_info_cb extract_subbuffer_info;
+               pre_consume_subbuffer_cb pre_consume_subbuffer;
+               reset_metadata_cb reset_metadata;
+               consume_subbuffer_cb consume_subbuffer;
+               put_next_subbuffer_cb put_next_subbuffer;
+               post_consume_cb post_consume;
+               send_live_beacon_cb send_live_beacon;
+               on_sleep_cb on_sleep;
+               unlock_cb unlock;
+       } read_subbuffer_ops;
 };
 
 /*
@@ -523,7 +679,8 @@ struct lttng_consumer_local_data {
         * Returns the number of bytes read, or negative error value.
         */
        ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx);
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller);
        /*
         * function to call when we receive a new channel, it receives a
         * newly allocated channel, depending on the return code of this
@@ -790,7 +947,8 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht);
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
                ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx),
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(uint64_t sessiond_key, uint32_t state));
@@ -799,13 +957,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               unsigned long padding);
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               unsigned long padding);
 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
@@ -822,7 +978,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx);
+               struct lttng_consumer_local_data *ctx,
+               bool locked_by_caller);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
index 4a2e4e07d7c0b8e5eac8e34927afad5f8762da4a..e06f3b3e079aae44f494ba6d4409f02120018af4 100644 (file)
@@ -7,8 +7,6 @@
  *
  */
 
-#include "common/buffer-view.h"
-#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
@@ -36,6 +34,9 @@
 #include <common/index/index.h>
 #include <common/consumer/consumer-timer.h>
 #include <common/optional.h>
+#include <common/buffer-view.h>
+#include <common/consumer/consumer.h>
+#include <stdint.h>
 
 #include "kernel-consumer.h"
 
@@ -288,7 +289,7 @@ static int lttng_kconsumer_snapshot_channel(
                                        subbuf_addr, 0, padded_len);
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
                                        stream, &subbuf_view,
-                                       padded_len - len, NULL);
+                                       padded_len - len);
                        /*
                         * We write the padded len in local tracefiles but the data len
                         * when using a relay. Display the error but continue processing
@@ -399,7 +400,7 @@ static int lttng_kconsumer_snapshot_metadata(
        do {
                health_code_update();
 
-               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
                                ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
@@ -655,7 +656,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                pthread_mutex_lock(&channel->lock);
-               new_stream = consumer_allocate_stream(
+               new_stream = consumer_stream_create(
                                channel,
                                channel->key,
                                fd,
@@ -690,23 +691,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                consumer_stream_update_channel_attributes(new_stream,
                                channel);
-               switch (channel->output) {
-               case CONSUMER_CHANNEL_SPLICE:
-                       new_stream->output = LTTNG_EVENT_SPLICE;
-                       ret = utils_create_pipe(new_stream->splice_pipe);
-                       if (ret < 0) {
-                               pthread_mutex_unlock(&channel->lock);
-                               goto error_add_stream_nosignal;
-                       }
-                       break;
-               case CONSUMER_CHANNEL_MMAP:
-                       new_stream->output = LTTNG_EVENT_MMAP;
-                       break;
-               default:
-                       ERR("Stream output unknown %d", channel->output);
-                       pthread_mutex_unlock(&channel->lock);
-                       goto error_add_stream_nosignal;
-               }
 
                /*
                 * We've just assigned the channel to the stream so increment the
@@ -1358,93 +1342,6 @@ end:
        return ret;
 }
 
-/*
- * Populate index values of a kernel stream. Values are set in big endian order.
- *
- * Return 0 on success or else a negative value.
- */
-static int get_index_values(struct ctf_packet_index *index, int infd)
-{
-       int ret;
-       uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
-                       events_discarded, stream_id, stream_instance_id,
-                       packet_seq_num;
-
-       ret = kernctl_get_timestamp_begin(infd, &timestamp_begin);
-       if (ret < 0) {
-               PERROR("kernctl_get_timestamp_begin");
-               goto error;
-       }
-
-       ret = kernctl_get_timestamp_end(infd, &timestamp_end);
-       if (ret < 0) {
-               PERROR("kernctl_get_timestamp_end");
-               goto error;
-       }
-
-       ret = kernctl_get_events_discarded(infd, &events_discarded);
-       if (ret < 0) {
-               PERROR("kernctl_get_events_discarded");
-               goto error;
-       }
-
-       ret = kernctl_get_content_size(infd, &content_size);
-       if (ret < 0) {
-               PERROR("kernctl_get_content_size");
-               goto error;
-       }
-
-       ret = kernctl_get_packet_size(infd, &packet_size);
-       if (ret < 0) {
-               PERROR("kernctl_get_packet_size");
-               goto error;
-       }
-
-       ret = kernctl_get_stream_id(infd, &stream_id);
-       if (ret < 0) {
-               PERROR("kernctl_get_stream_id");
-               goto error;
-       }
-
-       ret = kernctl_get_instance_id(infd, &stream_instance_id);
-       if (ret < 0) {
-               if (ret == -ENOTTY) {
-                       /* Command not implemented by lttng-modules. */
-                       stream_instance_id = -1ULL;
-               } else {
-                       PERROR("kernctl_get_instance_id");
-                       goto error;
-               }
-       }
-
-       ret = kernctl_get_sequence_number(infd, &packet_seq_num);
-       if (ret < 0) {
-               if (ret == -ENOTTY) {
-                       /* Command not implemented by lttng-modules. */
-                       packet_seq_num = -1ULL;
-                       ret = 0;
-               } else {
-                       PERROR("kernctl_get_sequence_number");
-                       goto error;
-               }
-       }
-       index->packet_seq_num = htobe64(index->packet_seq_num);
-
-       *index = (typeof(*index)) {
-               .offset = index->offset,
-               .packet_size = htobe64(packet_size),
-               .content_size = htobe64(content_size),
-               .timestamp_begin = htobe64(timestamp_begin),
-               .timestamp_end = htobe64(timestamp_end),
-               .events_discarded = htobe64(events_discarded),
-               .stream_id = htobe64(stream_id),
-               .stream_instance_id = htobe64(stream_instance_id),
-               .packet_seq_num = htobe64(packet_seq_num),
-       };
-
-error:
-       return ret;
-}
 /*
  * Sync metadata meaning request them to the session daemon and snapshot to the
  * metadata thread can consumer them.
@@ -1483,348 +1380,226 @@ end:
 }
 
 static
-int update_stream_stats(struct lttng_consumer_stream *stream)
+int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
 {
        int ret;
-       uint64_t seq, discarded;
-
-       ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
-       if (ret < 0) {
-               if (ret == -ENOTTY) {
-                       /* Command not implemented by lttng-modules. */
-                       seq = -1ULL;
-                       stream->sequence_number_unavailable = true;
-               } else {
-                       PERROR("kernctl_get_sequence_number");
-                       goto end;
-               }
-       }
 
-       /*
-        * Start the sequence when we extract the first packet in case we don't
-        * start at 0 (for example if a consumer is not connected to the
-        * session immediately after the beginning).
-        */
-       if (stream->last_sequence_number == -1ULL) {
-               stream->last_sequence_number = seq;
-       } else if (seq > stream->last_sequence_number) {
-               stream->chan->lost_packets += seq -
-                               stream->last_sequence_number - 1;
-       } else {
-               /* seq <= last_sequence_number */
-               ERR("Sequence number inconsistent : prev = %" PRIu64
-                               ", current = %" PRIu64,
-                               stream->last_sequence_number, seq);
-               ret = -1;
+       ret = kernctl_get_subbuf_size(
+                       stream->wait_fd, &subbuf->info.data.subbuf_size);
+       if (ret) {
                goto end;
        }
-       stream->last_sequence_number = seq;
 
-       ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
-       if (ret < 0) {
-               PERROR("kernctl_get_events_discarded");
+       ret = kernctl_get_padded_subbuf_size(
+                       stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
+       if (ret) {
                goto end;
        }
-       if (discarded < stream->last_discarded_events) {
-               /*
-                * Overflow has occurred. We assume only one wrap-around
-                * has occurred.
-                */
-               stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
-                       stream->last_discarded_events + discarded;
-       } else {
-               stream->chan->discarded_events += discarded -
-                       stream->last_discarded_events;
-       }
-       stream->last_discarded_events = discarded;
-       ret = 0;
 
 end:
        return ret;
 }
 
-/*
- * Check if the local version of the metadata stream matches with the version
- * of the metadata stream in the kernel. If it was updated, set the reset flag
- * on the stream.
- */
 static
-int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream)
+int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
 {
        int ret;
-       uint64_t cur_version;
 
-       ret = kernctl_get_metadata_version(infd, &cur_version);
-       if (ret < 0) {
-               if (ret == -ENOTTY) {
-                       /*
-                        * LTTng-modules does not implement this
-                        * command.
-                        */
-                       ret = 0;
-                       goto end;
-               }
-               ERR("Failed to get the metadata version");
+       ret = extract_common_subbuffer_info(stream, subbuf);
+       if (ret) {
                goto end;
        }
 
-       if (stream->metadata_version == cur_version) {
-               ret = 0;
+       ret = kernctl_get_metadata_version(
+                       stream->wait_fd, &subbuf->info.metadata.version);
+       if (ret) {
                goto end;
        }
 
-       DBG("New metadata version detected");
-       stream->metadata_version = cur_version;
-       stream->reset_metadata_flag = 1;
-       ret = 0;
-
 end:
        return ret;
 }
 
-/*
- * Consume data on a file descriptor and write it on a trace file.
- * The stream and channel locks must be held by the caller.
- */
-ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+static
+int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
 {
-       unsigned long len, subbuf_size, padding;
-       int err, write_index = 1;
-       ssize_t ret = 0;
-       int infd = stream->wait_fd;
-       struct ctf_packet_index index = {};
-       bool in_error_state = false;
+       int ret;
 
-       DBG("In read_subbuffer (infd : %d)", infd);
+       ret = extract_common_subbuffer_info(stream, subbuf);
+       if (ret) {
+               goto end;
+       }
 
+       ret = kernctl_get_packet_size(
+                       stream->wait_fd, &subbuf->info.data.packet_size);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer packet size");
+               goto end;
+       }
 
-       /* Get the next subbuffer */
-       err = kernctl_get_next_subbuf(infd);
-       if (err != 0) {
-               /*
-                * This is a debug message even for single-threaded consumer,
-                * because poll() have more relaxed criterions than get subbuf,
-                * so get_subbuf may fail for short race windows where poll()
-                * would issue wakeups.
-                */
-               DBG("Reserving sub buffer failed (everything is normal, "
-                               "it is due to concurrency)");
-               ret = err;
-               goto error;
+       ret = kernctl_get_content_size(
+                       stream->wait_fd, &subbuf->info.data.content_size);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer content size");
+               goto end;
        }
 
-       /* Get the full subbuffer size including padding */
-       err = kernctl_get_padded_subbuf_size(infd, &len);
-       if (err != 0) {
-               PERROR("Getting sub-buffer len failed.");
-               err = kernctl_put_subbuf(infd);
-               if (err != 0) {
-                       if (err == -EFAULT) {
-                               PERROR("Error in unreserving sub buffer\n");
-                       } else if (err == -EIO) {
-                               /* Should never happen with newer LTTng versions */
-                               PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                       }
-                       ret = err;
-                       goto error;
-               }
-               ret = err;
-               goto error;
+       ret = kernctl_get_timestamp_begin(
+                       stream->wait_fd, &subbuf->info.data.timestamp_begin);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer begin timestamp");
+               goto end;
        }
 
-       if (!stream->metadata_flag) {
-               ret = get_index_values(&index, infd);
-               if (ret < 0) {
-                       err = kernctl_put_subbuf(infd);
-                       if (err != 0) {
-                               if (err == -EFAULT) {
-                                       PERROR("Error in unreserving sub buffer\n");
-                               } else if (err == -EIO) {
-                                       /* Should never happen with newer LTTng versions */
-                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                               }
-                               ret = err;
-                               goto error;
-                       }
-                       goto error;
-               }
-               ret = update_stream_stats(stream);
-               if (ret < 0) {
-                       err = kernctl_put_subbuf(infd);
-                       if (err != 0) {
-                               if (err == -EFAULT) {
-                                       PERROR("Error in unreserving sub buffer\n");
-                               } else if (err == -EIO) {
-                                       /* Should never happen with newer LTTng versions */
-                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                               }
-                               ret = err;
-                               goto error;
-                       }
-                       goto error;
+       ret = kernctl_get_timestamp_end(
+                       stream->wait_fd, &subbuf->info.data.timestamp_end);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer end timestamp");
+               goto end;
+       }
+
+       ret = kernctl_get_events_discarded(
+                       stream->wait_fd, &subbuf->info.data.events_discarded);
+       if (ret) {
+               PERROR("Failed to get sub-buffer events discarded count");
+               goto end;
+       }
+
+       ret = kernctl_get_sequence_number(stream->wait_fd,
+                       &subbuf->info.data.sequence_number.value);
+       if (ret) {
+               /* May not be supported by older LTTng-modules. */
+               if (ret != -ENOTTY) {
+                       PERROR("Failed to get sub-buffer sequence number");
+                       goto end;
                }
        } else {
-               write_index = 0;
-               ret = metadata_stream_check_version(infd, stream);
-               if (ret < 0) {
-                       err = kernctl_put_subbuf(infd);
-                       if (err != 0) {
-                               if (err == -EFAULT) {
-                                       PERROR("Error in unreserving sub buffer\n");
-                               } else if (err == -EIO) {
-                                       /* Should never happen with newer LTTng versions */
-                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                               }
-                               ret = err;
-                               goto error;
-                       }
-                       goto error;
-               }
+               subbuf->info.data.sequence_number.is_set = true;
        }
 
-       switch (stream->chan->output) {
-       case CONSUMER_CHANNEL_SPLICE:
-               /*
-                * XXX: The lttng-modules splice "actor" does not handle copying
-                * partial pages hence only using the subbuffer size without the
-                * padding makes the splice fail.
-                */
-               subbuf_size = len;
-               padding = 0;
+       ret = kernctl_get_stream_id(
+                       stream->wait_fd, &subbuf->info.data.stream_id);
+       if (ret < 0) {
+               PERROR("Failed to get stream id");
+               goto end;
+       }
 
-               /* splice the subbuffer to the tracefile */
-               ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
-                               padding, &index);
-               /*
-                * XXX: Splice does not support network streaming so the return value
-                * is simply checked against subbuf_size and not like the mmap() op.
-                */
-               if (ret != subbuf_size) {
-                       /*
-                        * display the error but continue processing to try
-                        * to release the subbuffer
-                        */
-                       ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
-                                       ret, subbuf_size);
-                       write_index = 0;
-               }
-               break;
-       case CONSUMER_CHANNEL_MMAP:
-       {
-               const char *subbuf_addr;
-               struct lttng_buffer_view subbuf_view;
-
-               /* Get subbuffer size without padding */
-               err = kernctl_get_subbuf_size(infd, &subbuf_size);
-               if (err != 0) {
-                       PERROR("Getting sub-buffer len failed.");
-                       err = kernctl_put_subbuf(infd);
-                       if (err != 0) {
-                               if (err == -EFAULT) {
-                                       PERROR("Error in unreserving sub buffer\n");
-                               } else if (err == -EIO) {
-                                       /* Should never happen with newer LTTng versions */
-                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-                               }
-                               ret = err;
-                               goto error;
-                       }
-                       ret = err;
-                       goto error;
+       ret = kernctl_get_instance_id(stream->wait_fd,
+                       &subbuf->info.data.stream_instance_id.value);
+       if (ret) {
+               /* May not be supported by older LTTng-modules. */
+               if (ret != -ENOTTY) {
+                       PERROR("Failed to get stream instance id");
+                       goto end;
                }
+       } else {
+               subbuf->info.data.stream_instance_id.is_set = true;
+       }
+end:
+       return ret;
+}
 
-               ret = get_current_subbuf_addr(stream, &subbuf_addr);
-               if (ret) {
-                       goto error_put_subbuf;
-               }
+static
+int get_subbuffer_common(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
 
-               /* Make sure the tracer is not gone mad on us! */
-               assert(len >= subbuf_size);
+       ret = kernctl_get_next_subbuf(stream->wait_fd);
+       if (ret) {
+               goto end;
+       }
 
-               padding = len - subbuf_size;
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+                       stream, subbuffer);
+end:
+       return ret;
+}
 
-               subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+static
+int get_next_subbuffer_splice(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
 
-               /* write the subbuffer to the tracefile */
-               ret = lttng_consumer_on_read_subbuffer_mmap(
-                               ctx, stream, &subbuf_view, padding, &index);
-               /*
-                * The mmap operation should write subbuf_size amount of data
-                * when network streaming or the full padding (len) size when we
-                * are _not_ streaming.
-                */
-               if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                               (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
-                       /*
-                        * Display the error but continue processing to try to release the
-                        * subbuffer. This is a DBG statement since this is possible to
-                        * happen without being a critical error.
-                        */
-                       DBG("Error writing to tracefile "
-                                       "(ret: %zd != len: %lu != subbuf_size: %lu)",
-                                       ret, len, subbuf_size);
-                       write_index = 0;
-               }
-               break;
-       }
-       default:
-               ERR("Unknown output method");
-               ret = -EPERM;
+       ret = get_subbuffer_common(stream, subbuffer);
+       if (ret) {
+               goto end;
        }
-error_put_subbuf:
-       err = kernctl_put_next_subbuf(infd);
-       if (err != 0) {
-               if (err == -EFAULT) {
-                       PERROR("Error in unreserving sub buffer\n");
-               } else if (err == -EIO) {
-                       /* Should never happen with newer LTTng versions */
-                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
-               }
-               ret = err;
-               goto error;
-       } else if (in_error_state) {
-               goto error;
+
+       subbuffer->buffer.fd = stream->wait_fd;
+end:
+       return ret;
+}
+
+static
+int get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+       const char *addr;
+
+       ret = get_subbuffer_common(stream, subbuffer);
+       if (ret) {
+               goto end;
        }
 
-       /* Write index if needed. */
-       if (!write_index) {
+       ret = get_current_subbuf_addr(stream, &addr);
+       if (ret) {
                goto end;
        }
 
-       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
-               /*
-                * In live, block until all the metadata is sent.
-                */
-               pthread_mutex_lock(&stream->metadata_timer_lock);
-               assert(!stream->missed_metadata_flush);
-               stream->waiting_on_metadata = true;
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
-
-               err = consumer_stream_sync_metadata(ctx, stream->session_id);
-
-               pthread_mutex_lock(&stream->metadata_timer_lock);
-               stream->waiting_on_metadata = false;
-               if (stream->missed_metadata_flush) {
-                       stream->missed_metadata_flush = false;
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-                       (void) consumer_flush_kernel_index(stream);
-               } else {
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-               }
-               if (err < 0) {
-                       goto error;
+       subbuffer->buffer.buffer = lttng_buffer_view_init(
+                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+end:
+       return ret;
+}
+
+static
+int put_next_subbuffer(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       const int ret = kernctl_put_next_subbuf(stream->wait_fd);
+
+       if (ret) {
+               if (ret == -EFAULT) {
+                       PERROR("Error in unreserving sub buffer");
+               } else if (ret == -EIO) {
+                       /* Should never happen with newer LTTng versions */
+                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
                }
        }
 
-       err = consumer_stream_write_index(stream, &index);
-       if (err < 0) {
-               goto error;
+       return ret;
+}
+
+static void lttng_kconsumer_set_stream_ops(
+               struct lttng_consumer_stream *stream)
+{
+       if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+               stream->read_subbuffer_ops.get_next_subbuffer =
+                               get_next_subbuffer_mmap;
+       } else {
+               stream->read_subbuffer_ops.get_next_subbuffer =
+                               get_next_subbuffer_splice;
        }
 
-end:
-error:
-       return ret;
+       if (stream->metadata_flag) {
+               stream->read_subbuffer_ops.extract_subbuffer_info =
+                               extract_metadata_subbuffer_info;
+       } else {
+               stream->read_subbuffer_ops.extract_subbuffer_info =
+                               extract_data_subbuffer_info;
+               if (stream->chan->is_live) {
+                       stream->read_subbuffer_ops.send_live_beacon =
+                                       consumer_flush_kernel_index;
+               }
+       }
+
+       stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
 }
 
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -1865,6 +1640,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                }
        }
 
+       lttng_kconsumer_set_stream_ops(stream);
+
        /* we return 0 to let the library handle the FD internally */
        return 0;
 
index b397b9cd33e6951cae3280c6478f034df90de82a..48c787a462424bd5740e8cfd66eaf5308b00eea3 100644 (file)
@@ -22,8 +22,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
                unsigned long *pos);
 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
-ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx);
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream);
 int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream);
 int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata);
index 2f09eac80763e2c20ae60b83e38e4593d5f84346..1af9840cd5184b0cfebad0bf680257fbbadac9a5 100644 (file)
@@ -7,7 +7,6 @@
  *
  */
 
-#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <lttng/ust-ctl.h>
@@ -24,6 +23,7 @@
 #include <urcu/list.h>
 #include <signal.h>
 #include <stdbool.h>
+#include <stdint.h>
 
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
@@ -36,6 +36,7 @@
 #include <common/consumer/consumer-timer.h>
 #include <common/utils.h>
 #include <common/index/index.h>
+#include <common/consumer/consumer.h>
 
 #include "ust-consumer.h"
 
@@ -127,7 +128,7 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
        assert(channel);
        assert(ctx);
 
-       stream = consumer_allocate_stream(
+       stream = consumer_stream_create(
                        channel,
                        channel->key,
                        key,
@@ -1040,7 +1041,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
        do {
                health_code_update();
 
-               ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
+               ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
                if (ret < 0) {
                        goto error_stream;
                }
@@ -1230,8 +1231,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
-                                       stream, &subbuf_view, padded_len - len,
-                                       NULL);
+                                       stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
                                        ret = -EPERM;
@@ -2428,115 +2428,16 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
        return ustctl_stream_close_wakeup_fd(stream->ustream);
 }
 
-/*
- * Populate index values of a UST stream. Values are set in big endian order.
- *
- * Return 0 on success or else a negative value.
- */
-static int get_index_values(struct ctf_packet_index *index,
-               struct ustctl_consumer_stream *ustream)
-{
-       int ret;
-       uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
-                       events_discarded, stream_id, stream_instance_id,
-                       packet_seq_num;
-
-       ret = ustctl_get_timestamp_begin(ustream, &timestamp_begin);
-       if (ret < 0) {
-               PERROR("ustctl_get_timestamp_begin");
-               goto error;
-       }
-
-       ret = ustctl_get_timestamp_end(ustream, &timestamp_end);
-       if (ret < 0) {
-               PERROR("ustctl_get_timestamp_end");
-               goto error;
-       }
-
-       ret = ustctl_get_events_discarded(ustream, &events_discarded);
-       if (ret < 0) {
-               PERROR("ustctl_get_events_discarded");
-               goto error;
-       }
-
-       ret = ustctl_get_content_size(ustream, &content_size);
-       if (ret < 0) {
-               PERROR("ustctl_get_content_size");
-               goto error;
-       }
-
-       ret = ustctl_get_packet_size(ustream, &packet_size);
-       if (ret < 0) {
-               PERROR("ustctl_get_packet_size");
-               goto error;
-       }
-
-       ret = ustctl_get_stream_id(ustream, &stream_id);
-       if (ret < 0) {
-               PERROR("ustctl_get_stream_id");
-               goto error;
-       }
-
-       ret = ustctl_get_instance_id(ustream, &stream_instance_id);
-       if (ret < 0) {
-               PERROR("ustctl_get_instance_id");
-               goto error;
-       }
-
-       ret = ustctl_get_sequence_number(ustream, &packet_seq_num);
-       if (ret < 0) {
-               PERROR("ustctl_get_sequence_number");
-               goto error;
-       }
-
-       *index = (typeof(*index)) {
-               .offset = index->offset,
-               .packet_size = htobe64(packet_size),
-               .content_size = htobe64(content_size),
-               .timestamp_begin = htobe64(timestamp_begin),
-               .timestamp_end = htobe64(timestamp_end),
-               .events_discarded = htobe64(events_discarded),
-               .stream_id = htobe64(stream_id),
-               .stream_instance_id = htobe64(stream_instance_id),
-               .packet_seq_num = htobe64(packet_seq_num),
-       };
-
-error:
-       return ret;
-}
-
 static
-void metadata_stream_reset_cache(struct lttng_consumer_stream *stream,
-               struct consumer_metadata_cache *cache)
+void metadata_stream_reset_cache(struct lttng_consumer_stream *stream)
 {
-       DBG("Metadata stream update to version %" PRIu64,
-                       cache->version);
+       DBG("Reset metadata cache of session %" PRIu64,
+                       stream->chan->session_id);
        stream->ust_metadata_pushed = 0;
-       stream->metadata_version = cache->version;
+       stream->metadata_version = stream->chan->metadata_cache->version;
        stream->reset_metadata_flag = 1;
 }
 
-/*
- * Check if the version of the metadata stream and metadata cache match.
- * If the cache got updated, reset the metadata stream.
- * The stream lock and metadata cache lock MUST be held.
- * Return 0 on success, a negative value on error.
- */
-static
-int metadata_stream_check_version(struct lttng_consumer_stream *stream)
-{
-       int ret = 0;
-       struct consumer_metadata_cache *cache = stream->chan->metadata_cache;
-
-       if (cache->version == stream->metadata_version) {
-               goto end;
-       }
-       metadata_stream_reset_cache(stream, cache);
-
-end:
-       return ret;
-}
-
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -2550,10 +2451,6 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       ret = metadata_stream_check_version(stream);
-       if (ret < 0) {
-               goto end;
-       }
        if (stream->chan->metadata_cache->max_offset
                        == stream->ust_metadata_pushed) {
                ret = 0;
@@ -2723,261 +2620,264 @@ end:
        return ret;
 }
 
-static
-int update_stream_stats(struct lttng_consumer_stream *stream)
+static int consumer_stream_ust_on_wake_up(struct lttng_consumer_stream *stream)
 {
-       int ret;
-       uint64_t seq, discarded;
+       int ret = 0;
 
-       ret = ustctl_get_sequence_number(stream->ustream, &seq);
-       if (ret < 0) {
-               PERROR("ustctl_get_sequence_number");
-               goto end;
-       }
        /*
-        * Start the sequence when we extract the first packet in case we don't
-        * start at 0 (for example if a consumer is not connected to the
-        * session immediately after the beginning).
+        * We can consume the 1 byte written into the wait_fd by
+        * UST. Don't trigger error if we cannot read this one byte
+        * (read returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+        *
+        * This is only done when the stream is monitored by a thread,
+        * before the flush is done after a hangup and if the stream
+        * is not flagged with data since there might be nothing to
+        * consume in the wait fd but still have data available
+        * flagged by the consumer wake up pipe.
         */
-       if (stream->last_sequence_number == -1ULL) {
-               stream->last_sequence_number = seq;
-       } else if (seq > stream->last_sequence_number) {
-               stream->chan->lost_packets += seq -
-                               stream->last_sequence_number - 1;
-       } else {
-               /* seq <= last_sequence_number */
-               ERR("Sequence number inconsistent : prev = %" PRIu64
-                               ", current = %" PRIu64,
-                               stream->last_sequence_number, seq);
-               ret = -1;
-               goto end;
+       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
+               char dummy;
+               ssize_t readlen;
+
+               readlen = lttng_read(stream->wait_fd, &dummy, 1);
+               if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       ret = readlen;
+               }
        }
-       stream->last_sequence_number = seq;
 
-       ret = ustctl_get_events_discarded(stream->ustream, &discarded);
-       if (ret < 0) {
-               PERROR("kernctl_get_events_discarded");
+       return ret;
+}
+
+static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
+{
+       int ret;
+
+       ret = ustctl_get_subbuf_size(
+                       stream->ustream, &subbuf->info.data.subbuf_size);
+       if (ret) {
                goto end;
        }
-       if (discarded < stream->last_discarded_events) {
-               /*
-                * Overflow has occurred. We assume only one wrap-around
-                * has occurred.
-                */
-               stream->chan->discarded_events +=
-                               (1ULL << (CAA_BITS_PER_LONG - 1)) -
-                               stream->last_discarded_events + discarded;
-       } else {
-               stream->chan->discarded_events += discarded -
-                               stream->last_discarded_events;
+
+       ret = ustctl_get_padded_subbuf_size(
+                       stream->ustream, &subbuf->info.data.padded_subbuf_size);
+       if (ret) {
+               goto end;
        }
-       stream->last_discarded_events = discarded;
-       ret = 0;
 
 end:
        return ret;
 }
 
-/*
- * Read subbuffer from the given stream.
- *
- * Stream and channel locks MUST be acquired by the caller.
- *
- * Return 0 on success else a negative value.
- */
-int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
 {
-       unsigned long len, subbuf_size, padding;
-       int err, write_index = 1;
-       long ret = 0;
-       struct ustctl_consumer_stream *ustream;
-       struct ctf_packet_index index;
-       const char *subbuf_addr;
-       struct lttng_buffer_view subbuf_view;
+       int ret;
 
-       assert(stream);
-       assert(stream->ustream);
-       assert(ctx);
+       ret = extract_common_subbuffer_info(stream, subbuf);
+       if (ret) {
+               goto end;
+       }
 
-       DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
-                       stream->name);
+       subbuf->info.metadata.version = stream->chan->metadata_cache->version;
 
-       /* Ease our life for what's next. */
-       ustream = stream->ustream;
+end:
+       return ret;
+}
 
-       /*
-        * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
-        * error if we cannot read this one byte (read returns 0), or if the error
-        * is EAGAIN or EWOULDBLOCK.
-        *
-        * This is only done when the stream is monitored by a thread, before the
-        * flush is done after a hangup and if the stream is not flagged with data
-        * since there might be nothing to consume in the wait fd but still have
-        * data available flagged by the consumer wake up pipe.
-        */
-       if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
-               char dummy;
-               ssize_t readlen;
+static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuf)
+{
+       int ret;
 
-               readlen = lttng_read(stream->wait_fd, &dummy, 1);
-               if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
-                       ret = readlen;
-                       goto error;
-               }
+       ret = extract_common_subbuffer_info(stream, subbuf);
+       if (ret) {
+               goto end;
        }
 
-retry:
-       /* Get the next subbuffer */
-       err = ustctl_get_next_subbuf(ustream);
-       if (err != 0) {
-               /*
-                * Populate metadata info if the existing info has
-                * already been read.
-                */
-               if (stream->metadata_flag) {
-                       ret = commit_one_metadata_packet(stream);
-                       if (ret <= 0) {
-                               goto error;
-                       }
-                       goto retry;
-               }
+       ret = ustctl_get_packet_size(
+                       stream->ustream, &subbuf->info.data.packet_size);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer packet size");
+               goto end;
+       }
 
-               ret = err;      /* ustctl_get_next_subbuf returns negative, caller expect positive. */
-               /*
-                * This is a debug message even for single-threaded consumer,
-                * because poll() have more relaxed criterions than get subbuf,
-                * so get_subbuf may fail for short race windows where poll()
-                * would issue wakeups.
-                */
-               DBG("Reserving sub buffer failed (everything is normal, "
-                               "it is due to concurrency) [ret: %d]", err);
-               goto error;
+       ret = ustctl_get_content_size(
+                       stream->ustream, &subbuf->info.data.content_size);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer content size");
+               goto end;
        }
-       assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
 
-       if (!stream->metadata_flag) {
-               index.offset = htobe64(stream->out_fd_offset);
-               ret = get_index_values(&index, ustream);
-               if (ret < 0) {
-                       err = ustctl_put_subbuf(ustream);
-                       assert(err == 0);
-                       goto error;
-               }
+       ret = ustctl_get_timestamp_begin(
+                       stream->ustream, &subbuf->info.data.timestamp_begin);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer begin timestamp");
+               goto end;
+       }
 
-               /* Update the stream's sequence and discarded events count. */
-               ret = update_stream_stats(stream);
-               if (ret < 0) {
-                       PERROR("kernctl_get_events_discarded");
-                       err = ustctl_put_subbuf(ustream);
-                       assert(err == 0);
-                       goto error;
+       ret = ustctl_get_timestamp_end(
+                       stream->ustream, &subbuf->info.data.timestamp_end);
+       if (ret < 0) {
+               PERROR("Failed to get sub-buffer end timestamp");
+               goto end;
+       }
+
+       ret = ustctl_get_events_discarded(
+                       stream->ustream, &subbuf->info.data.events_discarded);
+       if (ret) {
+               PERROR("Failed to get sub-buffer events discarded count");
+               goto end;
+       }
+
+       ret = ustctl_get_sequence_number(stream->ustream,
+                       &subbuf->info.data.sequence_number.value);
+       if (ret) {
+               /* May not be supported by older LTTng-modules. */
+               if (ret != -ENOTTY) {
+                       PERROR("Failed to get sub-buffer sequence number");
+                       goto end;
                }
        } else {
-               write_index = 0;
+               subbuf->info.data.sequence_number.is_set = true;
        }
 
-       /* Get the full padded subbuffer size */
-       err = ustctl_get_padded_subbuf_size(ustream, &len);
-       assert(err == 0);
+       ret = ustctl_get_stream_id(
+                       stream->ustream, &subbuf->info.data.stream_id);
+       if (ret < 0) {
+               PERROR("Failed to get stream id");
+               goto end;
+       }
 
-       /* Get subbuffer data size (without padding) */
-       err = ustctl_get_subbuf_size(ustream, &subbuf_size);
-       assert(err == 0);
+       ret = ustctl_get_instance_id(stream->ustream,
+                       &subbuf->info.data.stream_instance_id.value);
+       if (ret) {
+               /* May not be supported by older LTTng-modules. */
+               if (ret != -ENOTTY) {
+                       PERROR("Failed to get stream instance id");
+                       goto end;
+               }
+       } else {
+               subbuf->info.data.stream_instance_id.is_set = true;
+       }
+end:
+       return ret;
+}
 
-       /* Make sure we don't get a subbuffer size bigger than the padded */
-       assert(len >= subbuf_size);
+static int get_next_subbuffer_common(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+       const char *addr;
 
-       padding = len - subbuf_size;
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+                       stream, subbuffer);
+       if (ret) {
+               goto end;
+       }
 
-       ret = get_current_subbuf_addr(stream, &subbuf_addr);
+       ret = get_current_subbuf_addr(stream, &addr);
        if (ret) {
-               write_index = 0;
-               goto error_put_subbuf;
+               goto end;
        }
 
-       subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
+       subbuffer->buffer.buffer = lttng_buffer_view_init(
+                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+       assert(subbuffer->buffer.buffer.data != NULL);
+end:
+       return ret;
+}
 
-       /* write the subbuffer to the tracefile */
-       ret = lttng_consumer_on_read_subbuffer_mmap(
-                       ctx, stream, &subbuf_view, padding, &index);
-       /*
-        * The mmap operation should write subbuf_size amount of data when
-        * network streaming or the full padding (len) size when we are _not_
-        * streaming.
-        */
-       if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
-                       (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
-               /*
-                * Display the error but continue processing to try to release the
-                * subbuffer. This is a DBG statement since any unexpected kill or
-                * signal, the application gets unregistered, relayd gets closed or
-                * anything that affects the buffer lifetime will trigger this error.
-                * So, for the sake of the user, don't print this error since it can
-                * happen and it is OK with the code flow.
-                */
-               DBG("Error writing to tracefile "
-                               "(ret: %ld != len: %lu != subbuf_size: %lu)",
-                               ret, len, subbuf_size);
-               write_index = 0;
-       }
-error_put_subbuf:
-       err = ustctl_put_next_subbuf(ustream);
-       assert(err == 0);
+static int get_next_subbuffer(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
 
-       /*
-        * This will consumer the byte on the wait_fd if and only if there is not
-        * next subbuffer to be acquired.
-        */
-       if (!stream->metadata_flag) {
-               ret = notify_if_more_data(stream, ctx);
-               if (ret < 0) {
-                       goto error;
-               }
+       ret = ustctl_get_next_subbuf(stream->ustream);
+       if (ret) {
+               goto end;
        }
 
-       /* Write index if needed. */
-       if (!write_index) {
+       ret = get_next_subbuffer_common(stream, subbuffer);
+       if (ret) {
                goto end;
        }
+end:
+       return ret;
+}
 
-       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
-               /*
-                * In live, block until all the metadata is sent.
-                */
-               pthread_mutex_lock(&stream->metadata_timer_lock);
-               assert(!stream->missed_metadata_flush);
-               stream->waiting_on_metadata = true;
-               pthread_mutex_unlock(&stream->metadata_timer_lock);
-
-               err = consumer_stream_sync_metadata(ctx, stream->session_id);
-
-               pthread_mutex_lock(&stream->metadata_timer_lock);
-               stream->waiting_on_metadata = false;
-               if (stream->missed_metadata_flush) {
-                       stream->missed_metadata_flush = false;
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
-                       (void) consumer_flush_ust_index(stream);
-               } else {
-                       pthread_mutex_unlock(&stream->metadata_timer_lock);
+static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+
+       ret = ustctl_get_next_subbuf(stream->ustream);
+       if (ret) {
+               ret = commit_one_metadata_packet(stream);
+               if (ret < 0) {
+                       goto end;
+               } else if (ret == 0) {
+                       /* Not an error, the cache is empty. */
+                       ret = -ENODATA;
+                       goto end;
                }
 
-               if (err < 0) {
-                       goto error;
+               ret = ustctl_get_next_subbuf(stream->ustream);
+               if (ret) {
+                       goto end;
                }
        }
 
-       assert(!stream->metadata_flag);
-       err = consumer_stream_write_index(stream, &index);
-       if (err < 0) {
-               goto error;
+       ret = get_next_subbuffer_common(stream, subbuffer);
+       if (ret) {
+               goto end;
        }
-
 end:
-error:
        return ret;
 }
 
+static int put_next_subbuffer(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       const int ret = ustctl_put_next_subbuf(stream->ustream);
+
+       assert(ret == 0);
+       return ret;
+}
+
+static int signal_metadata(struct lttng_consumer_stream *stream,
+               struct lttng_consumer_local_data *ctx)
+{
+       return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
+}
+
+static void lttng_ustconsumer_set_stream_ops(
+               struct lttng_consumer_stream *stream)
+{
+       stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
+       if (stream->metadata_flag) {
+               stream->read_subbuffer_ops.get_next_subbuffer =
+                               get_next_subbuffer_metadata;
+               stream->read_subbuffer_ops.extract_subbuffer_info =
+                               extract_metadata_subbuffer_info;
+               stream->read_subbuffer_ops.reset_metadata =
+                               metadata_stream_reset_cache;
+               stream->read_subbuffer_ops.on_sleep = signal_metadata;
+       } else {
+               stream->read_subbuffer_ops.get_next_subbuffer =
+                               get_next_subbuffer;
+               stream->read_subbuffer_ops.extract_subbuffer_info =
+                               extract_data_subbuffer_info;
+               stream->read_subbuffer_ops.on_sleep = notify_if_more_data;
+               if (stream->chan->is_live) {
+                       stream->read_subbuffer_ops.send_live_beacon =
+                                       consumer_flush_ust_index;
+               }
+       }
+
+       stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+}
+
 /*
  * Called when a stream is created.
  *
@@ -3000,6 +2900,8 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                        goto error;
                }
        }
+
+       lttng_ustconsumer_set_stream_ops(stream);
        ret = 0;
 
 error:
This page took 0.059488 seconds and 4 git commands to generate.