sessiond: main.cpp: iterate on list using list_iteration_adapter
[lttng-tools.git] / src / bin / lttng-relayd / index.cpp
CommitLineData
1c20f0e2 1/*
ab5be9fa
MJ
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
1c20f0e2 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
1c20f0e2 7 *
1c20f0e2
JD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
1c20f0e2 11
28ab034a
JG
12#include "connection.hpp"
13#include "index.hpp"
c9e313bc
SM
14#include "lttng-relayd.hpp"
15#include "stream.hpp"
28ab034a
JG
16
17#include <common/common.hpp>
18#include <common/compat/endian.hpp>
56047f5a 19#include <common/urcu.hpp>
28ab034a 20#include <common/utils.hpp>
1c20f0e2
JD
21
22/*
7591bab1
MD
23 * Allocate a new relay index object. Pass the stream in which it is
24 * contained as parameter. The sequence number will be used as the hash
25 * table key.
26 *
27 * Called with stream mutex held.
28 * Return allocated object or else NULL on error.
1c20f0e2 29 */
28ab034a 30static struct relay_index *relay_index_create(struct relay_stream *stream, uint64_t net_seq_num)
1c20f0e2 31{
7591bab1 32 struct relay_index *index;
1c20f0e2 33
7591bab1 34 DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64,
28ab034a
JG
35 stream->stream_handle,
36 net_seq_num);
1c20f0e2 37
64803277 38 index = zmalloc<relay_index>();
7591bab1
MD
39 if (!index) {
40 PERROR("Relay index zmalloc");
41 goto end;
42 }
43 if (!stream_get(stream)) {
44 ERR("Cannot get stream");
45 free(index);
cd9adb8b 46 index = nullptr;
7591bab1 47 goto end;
1c20f0e2 48 }
7591bab1 49 index->stream = stream;
1c20f0e2 50
7591bab1 51 lttng_ht_node_init_u64(&index->index_n, net_seq_num);
cd9adb8b 52 pthread_mutex_init(&index->lock, nullptr);
7591bab1
MD
53 urcu_ref_init(&index->ref);
54
55end:
56 return index;
1c20f0e2
JD
57}
58
59/*
7591bab1
MD
60 * Add unique relay index to the given hash table. In case of a collision, the
61 * already existing object is put in the given _index variable.
1c20f0e2 62 *
7591bab1 63 * RCU read side lock MUST be acquired.
1c20f0e2 64 */
7591bab1 65static struct relay_index *relay_index_add_unique(struct relay_stream *stream,
28ab034a 66 struct relay_index *index)
1c20f0e2 67{
7591bab1
MD
68 struct cds_lfht_node *node_ptr;
69 struct relay_index *_index;
1c20f0e2 70
48b7cdc2
FD
71 ASSERT_RCU_READ_LOCKED();
72
7591bab1 73 DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64,
28ab034a
JG
74 stream->stream_handle,
75 index->index_n.key);
1c20f0e2 76
7591bab1 77 node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht,
2f566a88 78 stream->indexes_ht->hash_fct(&index->index_n.key, lttng_ht_seed),
28ab034a 79 stream->indexes_ht->match_fct,
2f566a88 80 &index->index_n.key,
28ab034a 81 &index->index_n.node);
7591bab1 82 if (node_ptr != &index->index_n.node) {
efff93c5 83 _index = lttng_ht_node_container_of(node_ptr, &relay_index::index_n);
7591bab1 84 } else {
cd9adb8b 85 _index = nullptr;
1c20f0e2 86 }
7591bab1
MD
87 return _index;
88}
89
90/*
91 * Should be called with RCU read-side lock held.
92 */
93static bool relay_index_get(struct relay_index *index)
94{
48b7cdc2
FD
95 ASSERT_RCU_READ_LOCKED();
96
7591bab1 97 DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
28ab034a
JG
98 index->stream->stream_handle,
99 index->index_n.key,
100 (int) index->ref.refcount);
1c20f0e2 101
ce4d4083 102 return urcu_ref_get_unless_zero(&index->ref);
1c20f0e2
JD
103}
104
105/*
7591bab1
MD
106 * Get a relayd index in within the given stream, or create it if not
107 * present.
1c20f0e2 108 *
7591bab1 109 * Called with stream mutex held.
1c20f0e2
JD
110 * Return index object or else NULL on error.
111 */
7591bab1 112struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream,
28ab034a 113 uint64_t net_seq_num)
1c20f0e2 114{
7591bab1 115 struct lttng_ht_node_u64 *node;
1c20f0e2 116 struct lttng_ht_iter iter;
cd9adb8b 117 struct relay_index *index = nullptr;
1c20f0e2 118
1c20f0e2 119 DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64,
28ab034a
JG
120 stream->stream_handle,
121 net_seq_num);
1c20f0e2 122
07c4863f 123 const lttng::urcu::read_lock_guard read_lock;
7591bab1 124 lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter);
00d7d903 125 node = lttng_ht_iter_get_node<lttng_ht_node_u64>(&iter);
7591bab1 126 if (node) {
0114db0e 127 index = lttng::utils::container_of(node, &relay_index::index_n);
7591bab1
MD
128 } else {
129 struct relay_index *oldindex;
130
131 index = relay_index_create(stream, net_seq_num);
132 if (!index) {
133 ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64,
28ab034a
JG
134 stream->stream_handle,
135 net_seq_num);
7591bab1
MD
136 goto end;
137 }
138 oldindex = relay_index_add_unique(stream, index);
139 if (oldindex) {
140 /* Added concurrently, keep old. */
141 relay_index_put(index);
142 index = oldindex;
143 if (!relay_index_get(index)) {
cd9adb8b 144 index = nullptr;
7591bab1
MD
145 }
146 } else {
147 stream->indexes_in_flight++;
148 index->in_hash_table = true;
149 }
1c20f0e2 150 }
1c20f0e2 151end:
7591bab1 152 DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64,
28ab034a
JG
153 (index == NULL) ? "NOT " : "",
154 stream->stream_handle,
155 net_seq_num);
1c20f0e2
JD
156 return index;
157}
158
f8f3885c 159int relay_index_set_file(struct relay_index *index,
28ab034a
JG
160 struct lttng_index_file *index_file,
161 uint64_t data_offset)
1c20f0e2 162{
7591bab1 163 int ret = 0;
1c20f0e2 164
7591bab1 165 pthread_mutex_lock(&index->lock);
f8f3885c 166 if (index->index_file) {
7591bab1
MD
167 ret = -1;
168 goto end;
169 }
f8f3885c
MD
170 lttng_index_file_get(index_file);
171 index->index_file = index_file;
7591bab1
MD
172 index->index_data.offset = data_offset;
173end:
174 pthread_mutex_unlock(&index->lock);
175 return ret;
176}
1c20f0e2 177
28ab034a 178int relay_index_set_data(struct relay_index *index, const struct ctf_packet_index *data)
7591bab1
MD
179{
180 int ret = 0;
1c20f0e2 181
7591bab1
MD
182 pthread_mutex_lock(&index->lock);
183 if (index->has_index_data) {
184 ret = -1;
185 goto end;
1c20f0e2 186 }
7591bab1
MD
187 /* Set everything except data_offset. */
188 index->index_data.packet_size = data->packet_size;
189 index->index_data.content_size = data->content_size;
190 index->index_data.timestamp_begin = data->timestamp_begin;
191 index->index_data.timestamp_end = data->timestamp_end;
192 index->index_data.events_discarded = data->events_discarded;
193 index->index_data.stream_id = data->stream_id;
194 index->has_index_data = true;
195end:
196 pthread_mutex_unlock(&index->lock);
197 return ret;
1c20f0e2
JD
198}
199
/* Release the memory backing a relay index. */
static void index_destroy(struct relay_index *index)
{
	free(index);
}
1c20f0e2 204
7591bab1
MD
205static void index_destroy_rcu(struct rcu_head *rcu_head)
206{
28ab034a 207 struct relay_index *index = lttng::utils::container_of(rcu_head, &relay_index::rcu_node);
1c20f0e2 208
7591bab1 209 index_destroy(index);
1c20f0e2
JD
210}
211
7591bab1
MD
212/* Stream lock must be held by the caller. */
213static void index_release(struct urcu_ref *ref)
1c20f0e2 214{
0114db0e 215 struct relay_index *index = lttng::utils::container_of(ref, &relay_index::ref);
7591bab1
MD
216 struct relay_stream *stream = index->stream;
217 int ret;
218 struct lttng_ht_iter iter;
219
f8f3885c
MD
220 if (index->index_file) {
221 lttng_index_file_put(index->index_file);
cd9adb8b 222 index->index_file = nullptr;
7591bab1
MD
223 }
224 if (index->in_hash_table) {
225 /* Delete index from hash table. */
226 iter.iter.node = &index->index_n.node;
227 ret = lttng_ht_del(stream->indexes_ht, &iter);
a0377dfe 228 LTTNG_ASSERT(!ret);
7591bab1
MD
229 stream->indexes_in_flight--;
230 }
231
232 stream_put(index->stream);
cd9adb8b 233 index->stream = nullptr;
7591bab1
MD
234
235 call_rcu(&index->rcu_node, index_destroy_rcu);
1c20f0e2
JD
236}
237
238/*
7591bab1
MD
239 * Called with stream mutex held.
240 *
241 * Stream lock must be held by the caller.
1c20f0e2 242 */
7591bab1 243void relay_index_put(struct relay_index *index)
1c20f0e2 244{
7591bab1 245 DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d",
28ab034a
JG
246 index->stream->stream_handle,
247 index->index_n.key,
248 (int) index->ref.refcount);
7591bab1 249 /*
0f1b1d25 250 * Ensure existence of index->lock for index unlock.
7591bab1 251 */
07c4863f 252 const lttng::urcu::read_lock_guard read_lock;
7591bab1
MD
253 /*
254 * Index lock ensures that concurrent test and update of stream
255 * ref is atomic.
256 */
a0377dfe 257 LTTNG_ASSERT(index->ref.refcount != 0);
7591bab1 258 urcu_ref_put(&index->ref, index_release);
1c20f0e2
JD
259}
260
261/*
7591bab1
MD
262 * Try to flush index to disk. Releases self-reference to index once
263 * flush succeeds.
1c20f0e2 264 *
7591bab1
MD
265 * Stream lock must be held by the caller.
266 * Return 0 on successful flush, a negative value on error, or positive
267 * value if no flush was performed.
1c20f0e2 268 */
7591bab1 269int relay_index_try_flush(struct relay_index *index)
1c20f0e2 270{
7591bab1
MD
271 int ret = 1;
272 bool flushed = false;
1c20f0e2 273
7591bab1
MD
274 pthread_mutex_lock(&index->lock);
275 if (index->flushed) {
276 goto skip;
277 }
278 /* Check if we are ready to flush. */
f8f3885c 279 if (!index->has_index_data || !index->index_file) {
7591bab1
MD
280 goto skip;
281 }
8bb66c3c
JG
282
283 DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
28ab034a
JG
284 index->stream->stream_handle,
285 index->index_n.key);
7591bab1
MD
286 flushed = true;
287 index->flushed = true;
f8f3885c 288 ret = lttng_index_file_write(index->index_file, &index->index_data);
7591bab1
MD
289skip:
290 pthread_mutex_unlock(&index->lock);
1c20f0e2 291
7591bab1
MD
292 if (flushed) {
293 /* Put self-ref from index now that it has been flushed. */
294 relay_index_put(index);
295 }
296 return ret;
1c20f0e2
JD
297}
298
299/*
7591bab1
MD
300 * Close every relay index within a given stream, without flushing
301 * them.
1c20f0e2 302 */
7591bab1 303void relay_index_close_all(struct relay_stream *stream)
1c20f0e2 304{
7d9e46e2
JG
305 for (auto *index :
306 lttng::urcu::lfht_iteration_adapter<relay_index,
307 decltype(relay_index::index_n),
308 &relay_index::index_n>(*stream->indexes_ht->ht)) {
7591bab1
MD
309 /* Put self-ref from index. */
310 relay_index_put(index);
1c20f0e2 311 }
1c20f0e2 312}
3d07a857
MD
313
314void relay_index_close_partial_fd(struct relay_stream *stream)
315{
7d9e46e2
JG
316 for (auto *index :
317 lttng::urcu::lfht_iteration_adapter<relay_index,
318 decltype(relay_index::index_n),
319 &relay_index::index_n>(*stream->indexes_ht->ht)) {
f8f3885c 320 if (!index->index_file) {
3d07a857
MD
321 continue;
322 }
323 /*
f8f3885c 324 * Partial index has its index_file: we have only
3d07a857
MD
325 * received its info from the data socket.
326 * Put self-ref from index.
327 */
328 relay_index_put(index);
329 }
3d07a857
MD
330}
331
332uint64_t relay_index_find_last(struct relay_stream *stream)
333{
3d07a857
MD
334 uint64_t net_seq_num = -1ULL;
335
7d9e46e2
JG
336 for (auto *index :
337 lttng::urcu::lfht_iteration_adapter<relay_index,
338 decltype(relay_index::index_n),
339 &relay_index::index_n>(*stream->indexes_ht->ht)) {
28ab034a 340 if (net_seq_num == -1ULL || index->index_n.key > net_seq_num) {
3d07a857
MD
341 net_seq_num = index->index_n.key;
342 }
343 }
56047f5a 344
3d07a857
MD
345 return net_seq_num;
346}
d3ecc550
JD
347
348/*
349 * Update the index file of an already existing relay_index.
350 * Offsets by 'removed_data_count' the offset field of an index.
351 */
28ab034a
JG
352static int relay_index_switch_file(struct relay_index *index,
353 struct lttng_index_file *new_index_file,
354 uint64_t removed_data_count)
d3ecc550
JD
355{
356 int ret = 0;
357 uint64_t offset;
358
359 pthread_mutex_lock(&index->lock);
360 if (!index->index_file) {
361 ERR("No index_file");
362 ret = 0;
363 goto end;
364 }
365
366 lttng_index_file_put(index->index_file);
367 lttng_index_file_get(new_index_file);
368 index->index_file = new_index_file;
369 offset = be64toh(index->index_data.offset);
370 index->index_data.offset = htobe64(offset - removed_data_count);
371
372end:
373 pthread_mutex_unlock(&index->lock);
374 return ret;
375}
376
377/*
378 * Switch the index file of all pending indexes for a stream and update the
379 * data offset by substracting the last safe position.
380 * Stream lock must be held.
381 */
382int relay_index_switch_all_files(struct relay_stream *stream)
383{
7d9e46e2
JG
384 for (auto *index :
385 lttng::urcu::lfht_iteration_adapter<relay_index,
386 decltype(relay_index::index_n),
387 &relay_index::index_n>(*stream->indexes_ht->ht)) {
388 const auto ret = relay_index_switch_file(
28ab034a 389 index, stream->index_file, stream->pos_after_last_complete_data_index);
d3ecc550 390 if (ret) {
56047f5a 391 return ret;
d3ecc550
JD
392 }
393 }
56047f5a 394
7d9e46e2 395 return 0;
d3ecc550 396}
c35f9726
JG
397
398/*
399 * Set index data from the control port to a given index object.
400 */
401int relay_index_set_control_data(struct relay_index *index,
28ab034a
JG
402 const struct lttcomm_relayd_index *data,
403 unsigned int minor_version)
c35f9726
JG
404{
405 /* The index on disk is encoded in big endian. */
28ab034a 406 ctf_packet_index index_data{};
ac497a37
SM
407
408 index_data.packet_size = htobe64(data->packet_size);
409 index_data.content_size = htobe64(data->content_size);
410 index_data.timestamp_begin = htobe64(data->timestamp_begin);
411 index_data.timestamp_end = htobe64(data->timestamp_end);
412 index_data.events_discarded = htobe64(data->events_discarded);
413 index_data.stream_id = htobe64(data->stream_id);
c35f9726
JG
414
415 if (minor_version >= 8) {
416 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
417 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
0f83d1cc 418 } else {
07c4863f 419 const uint64_t unset_value = -1ULL;
0f83d1cc
MD
420
421 index->index_data.stream_instance_id = htobe64(unset_value);
422 index->index_data.packet_seq_num = htobe64(unset_value);
c35f9726
JG
423 }
424
425 return relay_index_set_data(index, &index_data);
426}
This page took 0.092452 seconds and 5 git commands to generate.