Commit | Line | Data |
---|---|---|
f5ba75b4 JG |
1 | /* |
2 | * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * SPDX-License-Identifier: GPL-2.0-only | |
5 | * | |
6 | */ | |
7 | ||
c9e313bc | 8 | #include "metadata-bucket.hpp" |
f5ba75b4 | 9 | |
c9e313bc SM |
10 | #include <common/buffer-view.hpp> |
11 | #include <common/consumer/consumer.hpp> | |
12 | #include <common/dynamic-buffer.hpp> | |
c9e313bc | 13 | #include <common/error.hpp> |
28ab034a | 14 | #include <common/macros.hpp> |
f5ba75b4 JG |
15 | |
16 | struct metadata_bucket { | |
17 | struct lttng_dynamic_buffer content; | |
18 | struct { | |
19 | metadata_bucket_flush_cb fn; | |
20 | void *data; | |
21 | } flush; | |
22 | unsigned int buffer_count; | |
23 | }; | |
24 | ||
28ab034a | 25 | struct metadata_bucket *metadata_bucket_create(metadata_bucket_flush_cb flush, void *data) |
f5ba75b4 | 26 | { |
64803277 | 27 | metadata_bucket *bucket = zmalloc<metadata_bucket>(); |
f5ba75b4 JG |
28 | if (!bucket) { |
29 | PERROR("Failed to allocate buffer bucket"); | |
30 | goto end; | |
31 | } | |
32 | ||
33 | bucket->flush.fn = flush; | |
34 | bucket->flush.data = data; | |
35 | lttng_dynamic_buffer_init(&bucket->content); | |
36 | end: | |
37 | return bucket; | |
38 | } | |
39 | ||
40 | void metadata_bucket_destroy(struct metadata_bucket *bucket) | |
41 | { | |
42 | if (!bucket) { | |
43 | return; | |
44 | } | |
45 | ||
46 | if (bucket->content.size > 0) { | |
47 | WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", | |
28ab034a JG |
48 | bucket->content.size, |
49 | bucket->buffer_count); | |
f5ba75b4 JG |
50 | } |
51 | ||
52 | lttng_dynamic_buffer_reset(&bucket->content); | |
53 | free(bucket); | |
54 | } | |
55 | ||
56 | void metadata_bucket_reset(struct metadata_bucket *bucket) | |
57 | { | |
58 | lttng_dynamic_buffer_reset(&bucket->content); | |
59 | lttng_dynamic_buffer_init(&bucket->content); | |
60 | bucket->buffer_count = 0; | |
61 | } | |
62 | ||
63 | enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, | |
28ab034a | 64 | const struct stream_subbuffer *buffer) |
f5ba75b4 JG |
65 | { |
66 | ssize_t ret; | |
67 | struct lttng_buffer_view flushed_view; | |
68 | struct stream_subbuffer flushed_subbuffer; | |
69 | enum metadata_bucket_status status; | |
28ab034a | 70 | const bool should_flush = LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); |
f5ba75b4 | 71 | const size_t padding_this_buffer = |
28ab034a | 72 | buffer->info.metadata.padded_subbuf_size - buffer->info.metadata.subbuf_size; |
f5ba75b4 JG |
73 | size_t flush_size; |
74 | ||
75 | DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", | |
28ab034a JG |
76 | buffer->buffer.buffer.size, |
77 | buffer->info.metadata.subbuf_size, | |
78 | buffer->info.metadata.padded_subbuf_size, | |
79 | buffer->info.metadata.coherent.value ? "true" : "false"); | |
f5ba75b4 JG |
80 | /* |
81 | * If no metadata was accumulated and this buffer should be | |
82 | * flushed, don't copy it unecessarily; just flush it directly. | |
83 | */ | |
84 | if (!should_flush || bucket->buffer_count != 0) { | |
85 | /* | |
86 | * Append the _padded_ subbuffer since they are combined | |
87 | * into a single "virtual" subbuffer that will be | |
88 | * flushed at once. | |
89 | * | |
90 | * This means that some padding will be sent over the | |
91 | * network, but should not represent a large amount | |
92 | * of data as incoherent subbuffers are typically | |
93 | * pretty full. | |
94 | * | |
95 | * The padding of the last subbuffer (coherent) added to | |
96 | * the bucket is not sent, which is what really matters | |
97 | * from an efficiency point of view. | |
98 | */ | |
28ab034a | 99 | ret = lttng_dynamic_buffer_append_view(&bucket->content, &buffer->buffer.buffer); |
f5ba75b4 JG |
100 | if (ret) { |
101 | status = METADATA_BUCKET_STATUS_ERROR; | |
102 | goto end; | |
103 | } | |
104 | } | |
105 | ||
106 | bucket->buffer_count++; | |
107 | if (!should_flush) { | |
108 | status = METADATA_BUCKET_STATUS_OK; | |
109 | goto end; | |
110 | } | |
111 | ||
112 | flushed_view = bucket->content.size != 0 ? | |
113 | lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : | |
114 | lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1); | |
115 | ||
116 | /* | |
117 | * The flush is done with the size of all padded sub-buffers, except | |
118 | * for the last one which we can safely "trim". The padding of the last | |
119 | * packet will be reconstructed by the relay daemon. | |
120 | */ | |
121 | flush_size = flushed_view.size - padding_this_buffer; | |
122 | ||
123 | flushed_subbuffer = (typeof(flushed_subbuffer)) { | |
97535efa SM |
124 | .buffer = { |
125 | .buffer = flushed_view, | |
126 | }, | |
127 | .info = { | |
128 | .metadata = { | |
129 | .subbuf_size = flush_size, | |
130 | .padded_subbuf_size = flushed_view.size, | |
131 | .version = buffer->info.metadata.version, | |
132 | .coherent = buffer->info.metadata.coherent, | |
133 | }, | |
134 | }, | |
f5ba75b4 JG |
135 | }; |
136 | ||
137 | DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", | |
28ab034a JG |
138 | flushed_view.size, |
139 | bucket->buffer_count, | |
140 | bucket->buffer_count > 1 ? "s" : ""); | |
f5ba75b4 JG |
141 | ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); |
142 | if (ret >= 0) { | |
143 | status = METADATA_BUCKET_STATUS_OK; | |
144 | } else { | |
145 | status = METADATA_BUCKET_STATUS_ERROR; | |
146 | } | |
147 | ||
148 | metadata_bucket_reset(bucket); | |
149 | ||
150 | end: | |
151 | return status; | |
152 | } |