Commit | Line | Data |
---|---|---|
f5ba75b4 JG |
1 | /* |
2 | * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * SPDX-License-Identifier: GPL-2.0-only | |
5 | * | |
6 | */ | |
7 | ||
c9e313bc | 8 | #include "metadata-bucket.hpp" |
f5ba75b4 | 9 | |
c9e313bc SM |
10 | #include <common/buffer-view.hpp> |
11 | #include <common/consumer/consumer.hpp> | |
12 | #include <common/dynamic-buffer.hpp> | |
13 | #include <common/macros.hpp> | |
14 | #include <common/error.hpp> | |
f5ba75b4 JG |
15 | |
16 | struct metadata_bucket { | |
17 | struct lttng_dynamic_buffer content; | |
18 | struct { | |
19 | metadata_bucket_flush_cb fn; | |
20 | void *data; | |
21 | } flush; | |
22 | unsigned int buffer_count; | |
23 | }; | |
24 | ||
25 | struct metadata_bucket *metadata_bucket_create( | |
26 | metadata_bucket_flush_cb flush, void *data) | |
27 | { | |
64803277 | 28 | metadata_bucket *bucket = zmalloc<metadata_bucket>(); |
f5ba75b4 JG |
29 | if (!bucket) { |
30 | PERROR("Failed to allocate buffer bucket"); | |
31 | goto end; | |
32 | } | |
33 | ||
34 | bucket->flush.fn = flush; | |
35 | bucket->flush.data = data; | |
36 | lttng_dynamic_buffer_init(&bucket->content); | |
37 | end: | |
38 | return bucket; | |
39 | } | |
40 | ||
41 | void metadata_bucket_destroy(struct metadata_bucket *bucket) | |
42 | { | |
43 | if (!bucket) { | |
44 | return; | |
45 | } | |
46 | ||
47 | if (bucket->content.size > 0) { | |
48 | WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", | |
49 | bucket->content.size, bucket->buffer_count); | |
50 | } | |
51 | ||
52 | lttng_dynamic_buffer_reset(&bucket->content); | |
53 | free(bucket); | |
54 | } | |
55 | ||
56 | void metadata_bucket_reset(struct metadata_bucket *bucket) | |
57 | { | |
58 | lttng_dynamic_buffer_reset(&bucket->content); | |
59 | lttng_dynamic_buffer_init(&bucket->content); | |
60 | bucket->buffer_count = 0; | |
61 | } | |
62 | ||
63 | enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, | |
64 | const struct stream_subbuffer *buffer) | |
65 | { | |
66 | ssize_t ret; | |
67 | struct lttng_buffer_view flushed_view; | |
68 | struct stream_subbuffer flushed_subbuffer; | |
69 | enum metadata_bucket_status status; | |
70 | const bool should_flush = | |
71 | LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); | |
72 | const size_t padding_this_buffer = | |
73 | buffer->info.metadata.padded_subbuf_size - | |
74 | buffer->info.metadata.subbuf_size; | |
75 | size_t flush_size; | |
76 | ||
77 | DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", | |
78 | buffer->buffer.buffer.size, | |
79 | buffer->info.metadata.subbuf_size, | |
80 | buffer->info.metadata.padded_subbuf_size, | |
81 | buffer->info.metadata.coherent.value ? "true" : "false"); | |
82 | /* | |
83 | * If no metadata was accumulated and this buffer should be | |
84 | * flushed, don't copy it unecessarily; just flush it directly. | |
85 | */ | |
86 | if (!should_flush || bucket->buffer_count != 0) { | |
87 | /* | |
88 | * Append the _padded_ subbuffer since they are combined | |
89 | * into a single "virtual" subbuffer that will be | |
90 | * flushed at once. | |
91 | * | |
92 | * This means that some padding will be sent over the | |
93 | * network, but should not represent a large amount | |
94 | * of data as incoherent subbuffers are typically | |
95 | * pretty full. | |
96 | * | |
97 | * The padding of the last subbuffer (coherent) added to | |
98 | * the bucket is not sent, which is what really matters | |
99 | * from an efficiency point of view. | |
100 | */ | |
101 | ret = lttng_dynamic_buffer_append_view( | |
102 | &bucket->content, &buffer->buffer.buffer); | |
103 | if (ret) { | |
104 | status = METADATA_BUCKET_STATUS_ERROR; | |
105 | goto end; | |
106 | } | |
107 | } | |
108 | ||
109 | bucket->buffer_count++; | |
110 | if (!should_flush) { | |
111 | status = METADATA_BUCKET_STATUS_OK; | |
112 | goto end; | |
113 | } | |
114 | ||
115 | flushed_view = bucket->content.size != 0 ? | |
116 | lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : | |
117 | lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1); | |
118 | ||
119 | /* | |
120 | * The flush is done with the size of all padded sub-buffers, except | |
121 | * for the last one which we can safely "trim". The padding of the last | |
122 | * packet will be reconstructed by the relay daemon. | |
123 | */ | |
124 | flush_size = flushed_view.size - padding_this_buffer; | |
125 | ||
126 | flushed_subbuffer = (typeof(flushed_subbuffer)) { | |
97535efa SM |
127 | .buffer = { |
128 | .buffer = flushed_view, | |
129 | }, | |
130 | .info = { | |
131 | .metadata = { | |
132 | .subbuf_size = flush_size, | |
133 | .padded_subbuf_size = flushed_view.size, | |
134 | .version = buffer->info.metadata.version, | |
135 | .coherent = buffer->info.metadata.coherent, | |
136 | }, | |
137 | }, | |
f5ba75b4 JG |
138 | }; |
139 | ||
140 | DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", | |
141 | flushed_view.size, bucket->buffer_count, | |
142 | bucket->buffer_count > 1 ? "s" : ""); | |
143 | ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); | |
144 | if (ret >= 0) { | |
145 | status = METADATA_BUCKET_STATUS_OK; | |
146 | } else { | |
147 | status = METADATA_BUCKET_STATUS_ERROR; | |
148 | } | |
149 | ||
150 | metadata_bucket_reset(bucket); | |
151 | ||
152 | end: | |
153 | return status; | |
154 | } |