Commit | Line | Data |
---|---|---|
f5ba75b4 JG |
1 | /* |
2 | * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com> | |
3 | * | |
4 | * SPDX-License-Identifier: GPL-2.0-only | |
5 | * | |
6 | */ | |
7 | ||
c9e313bc | 8 | #include "metadata-bucket.hpp" |
f5ba75b4 | 9 | |
c9e313bc SM |
10 | #include <common/buffer-view.hpp> |
11 | #include <common/consumer/consumer.hpp> | |
12 | #include <common/dynamic-buffer.hpp> | |
13 | #include <common/macros.hpp> | |
14 | #include <common/error.hpp> | |
f5ba75b4 JG |
15 | |
16 | struct metadata_bucket { | |
17 | struct lttng_dynamic_buffer content; | |
18 | struct { | |
19 | metadata_bucket_flush_cb fn; | |
20 | void *data; | |
21 | } flush; | |
22 | unsigned int buffer_count; | |
23 | }; | |
24 | ||
25 | struct metadata_bucket *metadata_bucket_create( | |
26 | metadata_bucket_flush_cb flush, void *data) | |
27 | { | |
28 | struct metadata_bucket *bucket; | |
29 | ||
97535efa | 30 | bucket = (metadata_bucket *) zmalloc(sizeof(typeof(*bucket))); |
f5ba75b4 JG |
31 | if (!bucket) { |
32 | PERROR("Failed to allocate buffer bucket"); | |
33 | goto end; | |
34 | } | |
35 | ||
36 | bucket->flush.fn = flush; | |
37 | bucket->flush.data = data; | |
38 | lttng_dynamic_buffer_init(&bucket->content); | |
39 | end: | |
40 | return bucket; | |
41 | } | |
42 | ||
43 | void metadata_bucket_destroy(struct metadata_bucket *bucket) | |
44 | { | |
45 | if (!bucket) { | |
46 | return; | |
47 | } | |
48 | ||
49 | if (bucket->content.size > 0) { | |
50 | WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u", | |
51 | bucket->content.size, bucket->buffer_count); | |
52 | } | |
53 | ||
54 | lttng_dynamic_buffer_reset(&bucket->content); | |
55 | free(bucket); | |
56 | } | |
57 | ||
58 | void metadata_bucket_reset(struct metadata_bucket *bucket) | |
59 | { | |
60 | lttng_dynamic_buffer_reset(&bucket->content); | |
61 | lttng_dynamic_buffer_init(&bucket->content); | |
62 | bucket->buffer_count = 0; | |
63 | } | |
64 | ||
65 | enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket, | |
66 | const struct stream_subbuffer *buffer) | |
67 | { | |
68 | ssize_t ret; | |
69 | struct lttng_buffer_view flushed_view; | |
70 | struct stream_subbuffer flushed_subbuffer; | |
71 | enum metadata_bucket_status status; | |
72 | const bool should_flush = | |
73 | LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent); | |
74 | const size_t padding_this_buffer = | |
75 | buffer->info.metadata.padded_subbuf_size - | |
76 | buffer->info.metadata.subbuf_size; | |
77 | size_t flush_size; | |
78 | ||
79 | DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s", | |
80 | buffer->buffer.buffer.size, | |
81 | buffer->info.metadata.subbuf_size, | |
82 | buffer->info.metadata.padded_subbuf_size, | |
83 | buffer->info.metadata.coherent.value ? "true" : "false"); | |
84 | /* | |
85 | * If no metadata was accumulated and this buffer should be | |
86 | * flushed, don't copy it unecessarily; just flush it directly. | |
87 | */ | |
88 | if (!should_flush || bucket->buffer_count != 0) { | |
89 | /* | |
90 | * Append the _padded_ subbuffer since they are combined | |
91 | * into a single "virtual" subbuffer that will be | |
92 | * flushed at once. | |
93 | * | |
94 | * This means that some padding will be sent over the | |
95 | * network, but should not represent a large amount | |
96 | * of data as incoherent subbuffers are typically | |
97 | * pretty full. | |
98 | * | |
99 | * The padding of the last subbuffer (coherent) added to | |
100 | * the bucket is not sent, which is what really matters | |
101 | * from an efficiency point of view. | |
102 | */ | |
103 | ret = lttng_dynamic_buffer_append_view( | |
104 | &bucket->content, &buffer->buffer.buffer); | |
105 | if (ret) { | |
106 | status = METADATA_BUCKET_STATUS_ERROR; | |
107 | goto end; | |
108 | } | |
109 | } | |
110 | ||
111 | bucket->buffer_count++; | |
112 | if (!should_flush) { | |
113 | status = METADATA_BUCKET_STATUS_OK; | |
114 | goto end; | |
115 | } | |
116 | ||
117 | flushed_view = bucket->content.size != 0 ? | |
118 | lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) : | |
119 | lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1); | |
120 | ||
121 | /* | |
122 | * The flush is done with the size of all padded sub-buffers, except | |
123 | * for the last one which we can safely "trim". The padding of the last | |
124 | * packet will be reconstructed by the relay daemon. | |
125 | */ | |
126 | flush_size = flushed_view.size - padding_this_buffer; | |
127 | ||
128 | flushed_subbuffer = (typeof(flushed_subbuffer)) { | |
97535efa SM |
129 | .buffer = { |
130 | .buffer = flushed_view, | |
131 | }, | |
132 | .info = { | |
133 | .metadata = { | |
134 | .subbuf_size = flush_size, | |
135 | .padded_subbuf_size = flushed_view.size, | |
136 | .version = buffer->info.metadata.version, | |
137 | .coherent = buffer->info.metadata.coherent, | |
138 | }, | |
139 | }, | |
f5ba75b4 JG |
140 | }; |
141 | ||
142 | DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)", | |
143 | flushed_view.size, bucket->buffer_count, | |
144 | bucket->buffer_count > 1 ? "s" : ""); | |
145 | ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data); | |
146 | if (ret >= 0) { | |
147 | status = METADATA_BUCKET_STATUS_OK; | |
148 | } else { | |
149 | status = METADATA_BUCKET_STATUS_ERROR; | |
150 | } | |
151 | ||
152 | metadata_bucket_reset(bucket); | |
153 | ||
154 | end: | |
155 | return status; | |
156 | } |