clang-tidy: apply suggested fixes
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
CommitLineData
f1e16794 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
f1e16794 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
f1e16794 5 *
f1e16794
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
c9e313bc
SM
9#include "consumer.hpp"
10#include "health-sessiond.hpp"
11#include "kernel-consumer.hpp"
28ab034a 12#include "lttng-sessiond.hpp"
c9e313bc
SM
13#include "notification-thread-commands.hpp"
14#include "session.hpp"
f1e16794 15
28ab034a
JG
16#include <common/common.hpp>
17#include <common/compat/string.hpp>
18#include <common/defaults.hpp>
56047f5a 19#include <common/urcu.hpp>
28ab034a
JG
20
21#include <inttypes.h>
22#include <stdio.h>
23#include <stdlib.h>
24#include <sys/stat.h>
25#include <unistd.h>
26
27static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset)
00e2e675
DG
28{
29 int ret;
ffe60014 30 char tmp_path[PATH_MAX];
cd9adb8b 31 char *pathname = nullptr;
00e2e675 32
a0377dfe 33 LTTNG_ASSERT(consumer);
00e2e675 34
ffe60014 35 /* Get the right path name destination */
a5ba6fdd 36 if (consumer->type == CONSUMER_DST_LOCAL ||
28ab034a
JG
37 (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
38 consumer->relay_minor_version >= 11)) {
d2956687 39 pathname = strdup(consumer->domain_subdir);
bb3c4e70 40 if (!pathname) {
d2956687 41 PERROR("Failed to copy domain subdirectory string %s",
28ab034a 42 consumer->domain_subdir);
bb3c4e70
MD
43 goto error;
44 }
5da88b0f 45 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687 46 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
28ab034a 47 pathname);
ffe60014 48 } else {
5da88b0f 49 /* Network output, relayd < 2.11. */
28ab034a
JG
50 ret = snprintf(tmp_path,
51 sizeof(tmp_path),
52 "%s%s",
53 consumer->dst.net.base_dir,
54 consumer->domain_subdir);
ffe60014 55 if (ret < 0) {
2bba9e53 56 PERROR("snprintf kernel metadata path");
ffe60014 57 goto error;
dba13f1d
JG
58 } else if (ret >= sizeof(tmp_path)) {
59 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
28ab034a
JG
60 sizeof(tmp_path),
61 ret,
62 consumer->dst.net.base_dir,
63 consumer->domain_subdir);
dba13f1d 64 goto error;
ffe60014 65 }
f5436bfc 66 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 67 if (!pathname) {
f5436bfc 68 PERROR("lttng_strndup");
bb3c4e70
MD
69 goto error;
70 }
5da88b0f 71 *consumer_path_offset = 0;
ffe60014
DG
72 DBG3("Kernel network consumer subdir path: %s", pathname);
73 }
74
2bba9e53
DG
75 return pathname;
76
77error:
78 free(pathname);
cd9adb8b 79 return nullptr;
2bba9e53
DG
80}
81
82/*
83 * Sending a single channel to the consumer with command ADD_CHANNEL.
84 */
28ab034a
JG
85static int kernel_consumer_add_channel(struct consumer_socket *sock,
86 struct ltt_kernel_channel *channel,
87 struct ltt_kernel_session *ksession,
88 unsigned int monitor)
2bba9e53
DG
89{
90 int ret;
cd9adb8b 91 char *pathname = nullptr;
2bba9e53
DG
92 struct lttcomm_consumer_msg lkm;
93 struct consumer_output *consumer;
e9404c27 94 enum lttng_error_code status;
e9404c27 95 struct lttng_channel_extended *channel_attr_extended;
d2956687 96 bool is_local_trace;
5da88b0f 97 size_t consumer_path_offset = 0;
07c4863f 98 const lttng::urcu::read_lock_guard read_lock;
2bba9e53
DG
99
100 /* Safety net */
a0377dfe
FD
101 LTTNG_ASSERT(channel);
102 LTTNG_ASSERT(ksession);
103 LTTNG_ASSERT(ksession->consumer);
2bba9e53 104
e9404c27 105 consumer = ksession->consumer;
28ab034a
JG
106 channel_attr_extended =
107 (struct lttng_channel_extended *) channel->channel->attr.extended.ptr;
2bba9e53 108
28ab034a 109 DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name);
d2956687 110 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 111
5da88b0f 112 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
113 if (!pathname) {
114 ret = -1;
115 goto error;
116 }
2bba9e53 117
d2956687
JG
118 if (is_local_trace && ksession->current_trace_chunk) {
119 enum lttng_trace_chunk_status chunk_status;
120 char *pathname_index;
121
28ab034a 122 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
d2956687
JG
123 if (ret < 0) {
124 ERR("Failed to format channel index directory");
125 ret = -1;
126 goto error;
127 }
128
129 /*
130 * Create the index subdirectory which will take care
131 * of implicitly creating the channel's path.
132 */
28ab034a
JG
133 chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk,
134 pathname_index);
d2956687
JG
135 free(pathname_index);
136 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
137 ret = -1;
138 goto error;
139 }
140 }
141
00e2e675 142 /* Prep channel message structure */
638e7b4e 143 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
144 channel->key,
145 ksession->id,
146 &pathname[consumer_path_offset],
147 consumer->net_seq_index,
148 channel->channel->name,
149 channel->stream_count,
150 channel->channel->attr.output,
151 CONSUMER_CHANNEL_TYPE_DATA,
152 channel->channel->attr.tracefile_size,
153 channel->channel->attr.tracefile_count,
154 monitor,
155 channel->channel->attr.live_timer_interval,
156 ksession->is_live_session,
157 channel_attr_extended->monitor_timer_interval,
158 ksession->current_trace_chunk);
00e2e675 159
840cb59c 160 health_code_update();
ca03de58 161
00e2e675
DG
162 ret = consumer_send_channel(sock, &lkm);
163 if (ret < 0) {
164 goto error;
165 }
166
840cb59c 167 health_code_update();
d9a970b7
JG
168
169 try {
a0a4f314
JG
170 const auto session = ltt_session::find_session(ksession->id);
171
172 ASSERT_SESSION_LIST_LOCKED();
173
174 status = notification_thread_command_add_channel(
175 the_notification_thread_handle,
176 session->id,
177 channel->channel->name,
178 channel->key,
179 LTTNG_DOMAIN_KERNEL,
180 channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
d9a970b7
JG
181 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
182 ERR_FMT("Fatal error during the creation of a kernel channel: {}, location='{}'",
183 ex.what(),
184 ex.source_location);
185 abort();
186 }
187
e9404c27
JG
188 if (status != LTTNG_OK) {
189 ret = -1;
190 goto error;
191 }
753873bf
JR
192
193 channel->published_to_notification_thread = true;
194
00e2e675 195error:
53efb85a 196 free(pathname);
00e2e675
DG
197 return ret;
198}
199
200/*
201 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
202 *
203 * The consumer socket lock must be held by the caller.
00e2e675 204 */
f50f23d9 205int kernel_consumer_add_metadata(struct consumer_socket *sock,
28ab034a
JG
206 struct ltt_kernel_session *ksession,
207 unsigned int monitor)
00e2e675
DG
208{
209 int ret;
00e2e675 210 struct lttcomm_consumer_msg lkm;
a7d9a3e7 211 struct consumer_output *consumer;
e098433c 212
07c4863f 213 const lttng::urcu::read_lock_guard read_lock;
00e2e675
DG
214
215 /* Safety net */
a0377dfe
FD
216 LTTNG_ASSERT(ksession);
217 LTTNG_ASSERT(ksession->consumer);
218 LTTNG_ASSERT(sock);
00e2e675 219
28ab034a 220 DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd);
00e2e675
DG
221
222 /* Get consumer output pointer */
e098433c 223 consumer = ksession->consumer;
00e2e675 224
00e2e675 225 /* Prep channel message structure */
d42266a4 226 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
227 ksession->metadata->key,
228 ksession->id,
229 "",
230 consumer->net_seq_index,
231 ksession->metadata->conf->name,
232 1,
233 ksession->metadata->conf->attr.output,
234 CONSUMER_CHANNEL_TYPE_METADATA,
235 ksession->metadata->conf->attr.tracefile_size,
236 ksession->metadata->conf->attr.tracefile_count,
237 monitor,
238 ksession->metadata->conf->attr.live_timer_interval,
239 ksession->is_live_session,
240 0,
241 ksession->current_trace_chunk);
00e2e675 242
840cb59c 243 health_code_update();
ca03de58 244
00e2e675
DG
245 ret = consumer_send_channel(sock, &lkm);
246 if (ret < 0) {
247 goto error;
248 }
249
840cb59c 250 health_code_update();
ca03de58 251
00e2e675 252 /* Prep stream message structure */
e098433c 253 consumer_init_add_stream_comm_msg(&lkm,
28ab034a
JG
254 ksession->metadata->key,
255 ksession->metadata_stream_fd,
256 0 /* CPU: 0 for metadata. */);
00e2e675 257
840cb59c 258 health_code_update();
ca03de58 259
00e2e675 260 /* Send stream and file descriptor */
28ab034a 261 ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1);
00e2e675
DG
262 if (ret < 0) {
263 goto error;
264 }
265
840cb59c 266 health_code_update();
ca03de58 267
00e2e675
DG
268error:
269 return ret;
270}
271
272/*
273 * Sending a single stream to the consumer with command ADD_STREAM.
274 */
28ab034a
JG
275static int kernel_consumer_add_stream(struct consumer_socket *sock,
276 struct ltt_kernel_channel *channel,
277 struct ltt_kernel_stream *stream,
278 struct ltt_kernel_session *session)
00e2e675
DG
279{
280 int ret;
00e2e675 281 struct lttcomm_consumer_msg lkm;
a7d9a3e7 282 struct consumer_output *consumer;
00e2e675 283
a0377dfe
FD
284 LTTNG_ASSERT(channel);
285 LTTNG_ASSERT(stream);
286 LTTNG_ASSERT(session);
287 LTTNG_ASSERT(session->consumer);
288 LTTNG_ASSERT(sock);
00e2e675
DG
289
290 DBG("Sending stream %d of channel %s to kernel consumer",
28ab034a
JG
291 stream->fd,
292 channel->channel->name);
00e2e675
DG
293
294 /* Get consumer output pointer */
a7d9a3e7 295 consumer = session->consumer;
00e2e675 296
00e2e675 297 /* Prep stream consumer message */
28ab034a 298 consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu);
00e2e675 299
840cb59c 300 health_code_update();
ca03de58 301
00e2e675 302 /* Send stream and file descriptor */
a7d9a3e7 303 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
304 if (ret < 0) {
305 goto error;
306 }
307
840cb59c 308 health_code_update();
ca03de58 309
00e2e675
DG
310error:
311 return ret;
312}
313
a4baae1b
JD
314/*
315 * Sending the notification that all streams were sent with STREAMS_SENT.
316 */
317int kernel_consumer_streams_sent(struct consumer_socket *sock,
28ab034a
JG
318 struct ltt_kernel_session *session,
319 uint64_t channel_key)
a4baae1b
JD
320{
321 int ret;
322 struct lttcomm_consumer_msg lkm;
323 struct consumer_output *consumer;
324
a0377dfe
FD
325 LTTNG_ASSERT(sock);
326 LTTNG_ASSERT(session);
a4baae1b
JD
327
328 DBG("Sending streams_sent");
329 /* Get consumer output pointer */
330 consumer = session->consumer;
331
332 /* Prep stream consumer message */
28ab034a
JG
333 consumer_init_streams_sent_comm_msg(
334 &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index);
a4baae1b
JD
335
336 health_code_update();
337
338 /* Send stream and file descriptor */
339 ret = consumer_send_msg(sock, &lkm);
340 if (ret < 0) {
341 goto error;
342 }
343
344error:
345 return ret;
346}
347
f1e16794
DG
348/*
349 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
350 *
351 * The consumer socket lock must be held by the caller.
f1e16794 352 */
1fc1b7c8 353int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
28ab034a
JG
354 struct ltt_kernel_channel *channel,
355 struct ltt_kernel_session *ksession,
356 unsigned int monitor)
f1e16794 357{
e99f9447 358 int ret = LTTNG_OK;
f1e16794 359 struct ltt_kernel_stream *stream;
00e2e675
DG
360
361 /* Safety net */
a0377dfe
FD
362 LTTNG_ASSERT(channel);
363 LTTNG_ASSERT(ksession);
364 LTTNG_ASSERT(ksession->consumer);
365 LTTNG_ASSERT(sock);
00e2e675 366
07c4863f 367 const lttng::urcu::read_lock_guard read_lock;
e098433c 368
00e2e675 369 /* Bail out if consumer is disabled */
e098433c 370 if (!ksession->consumer->enabled) {
f73fabfd 371 ret = LTTNG_OK;
00e2e675
DG
372 goto error;
373 }
f1e16794 374
28ab034a 375 DBG("Sending streams of channel %s to kernel consumer", channel->channel->name);
f1e16794 376
e99f9447 377 if (!channel->sent_to_consumer) {
e098433c 378 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
379 if (ret < 0) {
380 goto error;
381 }
382 channel->sent_to_consumer = true;
f1e16794
DG
383 }
384
385 /* Send streams */
28ab034a 386 cds_list_for_each_entry (stream, &channel->stream_list.head, list) {
6986ab9b 387 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
388 continue;
389 }
00e2e675
DG
390
391 /* Add stream on the kernel consumer side. */
28ab034a 392 ret = kernel_consumer_add_stream(sock, channel, stream, ksession);
f1e16794 393 if (ret < 0) {
f1e16794
DG
394 goto error;
395 }
6986ab9b 396 stream->sent_to_consumer = true;
f1e16794
DG
397 }
398
f1e16794
DG
399error:
400 return ret;
401}
402
403/*
404 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
405 *
406 * The consumer socket lock must be held by the caller.
f1e16794 407 */
28ab034a 408int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session)
f1e16794 409{
2bba9e53 410 int ret, monitor = 0;
f1e16794 411 struct ltt_kernel_channel *chan;
f1e16794 412
00e2e675 413 /* Safety net */
a0377dfe
FD
414 LTTNG_ASSERT(session);
415 LTTNG_ASSERT(session->consumer);
416 LTTNG_ASSERT(sock);
f1e16794 417
00e2e675
DG
418 /* Bail out if consumer is disabled */
419 if (!session->consumer->enabled) {
f73fabfd 420 ret = LTTNG_OK;
00e2e675 421 goto error;
f1e16794
DG
422 }
423
2bba9e53
DG
424 /* Don't monitor the streams on the consumer if in flight recorder. */
425 if (session->output_traces) {
426 monitor = 1;
427 }
428
00e2e675
DG
429 DBG("Sending session stream to kernel consumer");
430
609af759 431 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 432 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 433 if (ret < 0) {
f1e16794
DG
434 goto error;
435 }
f1e16794
DG
436 }
437
00e2e675 438 /* Send channel and streams of it */
28ab034a
JG
439 cds_list_for_each_entry (chan, &session->channel_list.head, list) {
440 ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor);
f1e16794
DG
441 if (ret < 0) {
442 goto error;
443 }
601262d6
JD
444 if (monitor) {
445 /*
446 * Inform the relay that all the streams for the
447 * channel were sent.
448 */
e1f3997a 449 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
450 if (ret < 0) {
451 goto error;
452 }
453 }
f1e16794
DG
454 }
455
00e2e675 456 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 457
4ce9ff51 458 session->consumer_fds_sent = 1;
f1e16794
DG
459 return 0;
460
461error:
462 return ret;
463}
07b86b52
JD
464
465int kernel_consumer_destroy_channel(struct consumer_socket *socket,
28ab034a 466 struct ltt_kernel_channel *channel)
07b86b52
JD
467{
468 int ret;
469 struct lttcomm_consumer_msg msg;
470
a0377dfe
FD
471 LTTNG_ASSERT(channel);
472 LTTNG_ASSERT(socket);
07b86b52 473
e1f3997a 474 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 475
53efb85a 476 memset(&msg, 0, sizeof(msg));
07b86b52 477 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 478 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
479
480 pthread_mutex_lock(socket->lock);
481 health_code_update();
482
483 ret = consumer_send_msg(socket, &msg);
484 if (ret < 0) {
485 goto error;
486 }
487
488error:
489 health_code_update();
490 pthread_mutex_unlock(socket->lock);
491 return ret;
492}
493
494int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
28ab034a 495 struct ltt_kernel_metadata *metadata)
07b86b52
JD
496{
497 int ret;
498 struct lttcomm_consumer_msg msg;
499
a0377dfe
FD
500 LTTNG_ASSERT(metadata);
501 LTTNG_ASSERT(socket);
07b86b52 502
d40f0359 503 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 504
53efb85a 505 memset(&msg, 0, sizeof(msg));
07b86b52 506 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 507 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
508
509 pthread_mutex_lock(socket->lock);
510 health_code_update();
511
512 ret = consumer_send_msg(socket, &msg);
513 if (ret < 0) {
514 goto error;
515 }
516
517error:
518 health_code_update();
519 pthread_mutex_unlock(socket->lock);
520 return ret;
521}
This page took 0.11834 seconds and 4 git commands to generate.