+
+static ssize_t metadata_bucket_flush(
+ const struct stream_subbuffer *buffer, void *data)
+{
+ ssize_t ret;
+ struct lttng_consumer_stream *stream = data;
+
+ ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+ if (ret < 0) {
+ goto end;
+ }
+end:
+ return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+ struct lttng_consumer_local_data *unused,
+ struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer)
+{
+ ssize_t ret;
+ enum metadata_bucket_status status;
+
+ status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+ switch (status) {
+ case METADATA_BUCKET_STATUS_OK:
+ /* Return consumed size. */
+ ret = subbuffer->buffer.buffer.size;
+ break;
+ default:
+ ret = -1;
+ }
+
+ return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ assert(stream->metadata_flag);
+ assert(!stream->metadata_bucket);
+ assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+ stream->metadata_bucket = metadata_bucket_create(
+ metadata_bucket_flush, stream);
+ if (!stream->metadata_bucket) {
+ ret = -1;
+ goto end;
+ }
+
+ stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+ return ret;
+}
+
+void consumer_stream_metadata_set_version(
+ struct lttng_consumer_stream *stream, uint64_t new_version)
+{
+ assert(new_version > stream->metadata_version);
+ stream->metadata_version = new_version;
+ stream->reset_metadata_flag = 1;
+
+ if (stream->metadata_bucket) {
+ metadata_bucket_reset(stream->metadata_bucket);
+ }
+}