2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
#include "metadata-bucket.hpp"

#include <common/buffer-view.hpp>
#include <common/consumer/consumer.hpp>
#include <common/dynamic-buffer.hpp>
#include <common/macros.hpp>
#include <common/error.hpp>

#include <stdlib.h>
16 struct metadata_bucket
{
17 struct lttng_dynamic_buffer content
;
19 metadata_bucket_flush_cb fn
;
22 unsigned int buffer_count
;
25 struct metadata_bucket
*metadata_bucket_create(
26 metadata_bucket_flush_cb flush
, void *data
)
28 metadata_bucket
*bucket
= zmalloc
<metadata_bucket
>();
30 PERROR("Failed to allocate buffer bucket");
34 bucket
->flush
.fn
= flush
;
35 bucket
->flush
.data
= data
;
36 lttng_dynamic_buffer_init(&bucket
->content
);
41 void metadata_bucket_destroy(struct metadata_bucket
*bucket
)
47 if (bucket
->content
.size
> 0) {
48 WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
49 bucket
->content
.size
, bucket
->buffer_count
);
52 lttng_dynamic_buffer_reset(&bucket
->content
);
56 void metadata_bucket_reset(struct metadata_bucket
*bucket
)
58 lttng_dynamic_buffer_reset(&bucket
->content
);
59 lttng_dynamic_buffer_init(&bucket
->content
);
60 bucket
->buffer_count
= 0;
63 enum metadata_bucket_status
metadata_bucket_fill(struct metadata_bucket
*bucket
,
64 const struct stream_subbuffer
*buffer
)
67 struct lttng_buffer_view flushed_view
;
68 struct stream_subbuffer flushed_subbuffer
;
69 enum metadata_bucket_status status
;
70 const bool should_flush
=
71 LTTNG_OPTIONAL_GET(buffer
->info
.metadata
.coherent
);
72 const size_t padding_this_buffer
=
73 buffer
->info
.metadata
.padded_subbuf_size
-
74 buffer
->info
.metadata
.subbuf_size
;
77 DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
78 buffer
->buffer
.buffer
.size
,
79 buffer
->info
.metadata
.subbuf_size
,
80 buffer
->info
.metadata
.padded_subbuf_size
,
81 buffer
->info
.metadata
.coherent
.value
? "true" : "false");
83 * If no metadata was accumulated and this buffer should be
84 * flushed, don't copy it unecessarily; just flush it directly.
86 if (!should_flush
|| bucket
->buffer_count
!= 0) {
88 * Append the _padded_ subbuffer since they are combined
89 * into a single "virtual" subbuffer that will be
92 * This means that some padding will be sent over the
93 * network, but should not represent a large amount
94 * of data as incoherent subbuffers are typically
97 * The padding of the last subbuffer (coherent) added to
98 * the bucket is not sent, which is what really matters
99 * from an efficiency point of view.
101 ret
= lttng_dynamic_buffer_append_view(
102 &bucket
->content
, &buffer
->buffer
.buffer
);
104 status
= METADATA_BUCKET_STATUS_ERROR
;
109 bucket
->buffer_count
++;
111 status
= METADATA_BUCKET_STATUS_OK
;
115 flushed_view
= bucket
->content
.size
!= 0 ?
116 lttng_buffer_view_from_dynamic_buffer(&bucket
->content
, 0, -1) :
117 lttng_buffer_view_from_view(&buffer
->buffer
.buffer
, 0, -1);
120 * The flush is done with the size of all padded sub-buffers, except
121 * for the last one which we can safely "trim". The padding of the last
122 * packet will be reconstructed by the relay daemon.
124 flush_size
= flushed_view
.size
- padding_this_buffer
;
126 flushed_subbuffer
= (typeof(flushed_subbuffer
)) {
128 .buffer
= flushed_view
,
132 .subbuf_size
= flush_size
,
133 .padded_subbuf_size
= flushed_view
.size
,
134 .version
= buffer
->info
.metadata
.version
,
135 .coherent
= buffer
->info
.metadata
.coherent
,
140 DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
141 flushed_view
.size
, bucket
->buffer_count
,
142 bucket
->buffer_count
> 1 ? "s" : "");
143 ret
= bucket
->flush
.fn(&flushed_subbuffer
, bucket
->flush
.data
);
145 status
= METADATA_BUCKET_STATUS_OK
;
147 status
= METADATA_BUCKET_STATUS_ERROR
;
150 metadata_bucket_reset(bucket
);