sessiond: introduce ltt_session::locked_ref look-up functions
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
CommitLineData
f1e16794 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
f1e16794 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
f1e16794 5 *
f1e16794
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
c9e313bc
SM
9#include "consumer.hpp"
10#include "health-sessiond.hpp"
11#include "kernel-consumer.hpp"
28ab034a 12#include "lttng-sessiond.hpp"
c9e313bc
SM
13#include "notification-thread-commands.hpp"
14#include "session.hpp"
f1e16794 15
28ab034a
JG
16#include <common/common.hpp>
17#include <common/compat/string.hpp>
18#include <common/defaults.hpp>
56047f5a 19#include <common/urcu.hpp>
28ab034a
JG
20
21#include <inttypes.h>
22#include <stdio.h>
23#include <stdlib.h>
24#include <sys/stat.h>
25#include <unistd.h>
26
27static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset)
00e2e675
DG
28{
29 int ret;
ffe60014 30 char tmp_path[PATH_MAX];
cd9adb8b 31 char *pathname = nullptr;
00e2e675 32
a0377dfe 33 LTTNG_ASSERT(consumer);
00e2e675 34
ffe60014 35 /* Get the right path name destination */
a5ba6fdd 36 if (consumer->type == CONSUMER_DST_LOCAL ||
28ab034a
JG
37 (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
38 consumer->relay_minor_version >= 11)) {
d2956687 39 pathname = strdup(consumer->domain_subdir);
bb3c4e70 40 if (!pathname) {
d2956687 41 PERROR("Failed to copy domain subdirectory string %s",
28ab034a 42 consumer->domain_subdir);
bb3c4e70
MD
43 goto error;
44 }
5da88b0f 45 *consumer_path_offset = strlen(consumer->domain_subdir);
d2956687 46 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
28ab034a 47 pathname);
ffe60014 48 } else {
5da88b0f 49 /* Network output, relayd < 2.11. */
28ab034a
JG
50 ret = snprintf(tmp_path,
51 sizeof(tmp_path),
52 "%s%s",
53 consumer->dst.net.base_dir,
54 consumer->domain_subdir);
ffe60014 55 if (ret < 0) {
2bba9e53 56 PERROR("snprintf kernel metadata path");
ffe60014 57 goto error;
dba13f1d
JG
58 } else if (ret >= sizeof(tmp_path)) {
59 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
28ab034a
JG
60 sizeof(tmp_path),
61 ret,
62 consumer->dst.net.base_dir,
63 consumer->domain_subdir);
dba13f1d 64 goto error;
ffe60014 65 }
f5436bfc 66 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
bb3c4e70 67 if (!pathname) {
f5436bfc 68 PERROR("lttng_strndup");
bb3c4e70
MD
69 goto error;
70 }
5da88b0f 71 *consumer_path_offset = 0;
ffe60014
DG
72 DBG3("Kernel network consumer subdir path: %s", pathname);
73 }
74
2bba9e53
DG
75 return pathname;
76
77error:
78 free(pathname);
cd9adb8b 79 return nullptr;
2bba9e53
DG
80}
81
82/*
83 * Sending a single channel to the consumer with command ADD_CHANNEL.
84 */
28ab034a
JG
85static int kernel_consumer_add_channel(struct consumer_socket *sock,
86 struct ltt_kernel_channel *channel,
87 struct ltt_kernel_session *ksession,
88 unsigned int monitor)
2bba9e53
DG
89{
90 int ret;
cd9adb8b 91 char *pathname = nullptr;
2bba9e53
DG
92 struct lttcomm_consumer_msg lkm;
93 struct consumer_output *consumer;
e9404c27 94 enum lttng_error_code status;
e9404c27 95 struct lttng_channel_extended *channel_attr_extended;
d2956687 96 bool is_local_trace;
5da88b0f 97 size_t consumer_path_offset = 0;
56047f5a 98 lttng::urcu::read_lock_guard read_lock;
d9a970b7 99 ltt_session::ref session;
2bba9e53
DG
100
101 /* Safety net */
a0377dfe
FD
102 LTTNG_ASSERT(channel);
103 LTTNG_ASSERT(ksession);
104 LTTNG_ASSERT(ksession->consumer);
2bba9e53 105
e9404c27 106 consumer = ksession->consumer;
28ab034a
JG
107 channel_attr_extended =
108 (struct lttng_channel_extended *) channel->channel->attr.extended.ptr;
2bba9e53 109
28ab034a 110 DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name);
d2956687 111 is_local_trace = consumer->net_seq_index == -1ULL;
2bba9e53 112
5da88b0f 113 pathname = create_channel_path(consumer, &consumer_path_offset);
bb3c4e70
MD
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
2bba9e53 118
d2956687
JG
119 if (is_local_trace && ksession->current_trace_chunk) {
120 enum lttng_trace_chunk_status chunk_status;
121 char *pathname_index;
122
28ab034a 123 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
d2956687
JG
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
28ab034a
JG
134 chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk,
135 pathname_index);
d2956687
JG
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
00e2e675 143 /* Prep channel message structure */
638e7b4e 144 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
145 channel->key,
146 ksession->id,
147 &pathname[consumer_path_offset],
148 consumer->net_seq_index,
149 channel->channel->name,
150 channel->stream_count,
151 channel->channel->attr.output,
152 CONSUMER_CHANNEL_TYPE_DATA,
153 channel->channel->attr.tracefile_size,
154 channel->channel->attr.tracefile_count,
155 monitor,
156 channel->channel->attr.live_timer_interval,
157 ksession->is_live_session,
158 channel_attr_extended->monitor_timer_interval,
159 ksession->current_trace_chunk);
00e2e675 160
840cb59c 161 health_code_update();
ca03de58 162
00e2e675
DG
163 ret = consumer_send_channel(sock, &lkm);
164 if (ret < 0) {
165 goto error;
166 }
167
840cb59c 168 health_code_update();
d9a970b7
JG
169
170 try {
171 session = ltt_session::find_session(ksession->id);
172 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
173 ERR_FMT("Fatal error during the creation of a kernel channel: {}, location='{}'",
174 ex.what(),
175 ex.source_location);
176 abort();
177 }
178
179 ASSERT_LOCKED(session->_lock);
3130a40c 180 ASSERT_SESSION_LIST_LOCKED();
ca03de58 181
139a8d25 182 status = notification_thread_command_add_channel(the_notification_thread_handle,
28ab034a
JG
183 session->id,
184 channel->channel->name,
185 channel->key,
186 LTTNG_DOMAIN_KERNEL,
187 channel->channel->attr.subbuf_size *
188 channel->channel->attr.num_subbuf);
e9404c27
JG
189 if (status != LTTNG_OK) {
190 ret = -1;
191 goto error;
192 }
753873bf
JR
193
194 channel->published_to_notification_thread = true;
195
00e2e675 196error:
53efb85a 197 free(pathname);
00e2e675
DG
198 return ret;
199}
200
201/*
202 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
9a318688
JG
203 *
204 * The consumer socket lock must be held by the caller.
00e2e675 205 */
f50f23d9 206int kernel_consumer_add_metadata(struct consumer_socket *sock,
28ab034a
JG
207 struct ltt_kernel_session *ksession,
208 unsigned int monitor)
00e2e675
DG
209{
210 int ret;
00e2e675 211 struct lttcomm_consumer_msg lkm;
a7d9a3e7 212 struct consumer_output *consumer;
e098433c 213
56047f5a 214 lttng::urcu::read_lock_guard read_lock;
00e2e675
DG
215
216 /* Safety net */
a0377dfe
FD
217 LTTNG_ASSERT(ksession);
218 LTTNG_ASSERT(ksession->consumer);
219 LTTNG_ASSERT(sock);
00e2e675 220
28ab034a 221 DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd);
00e2e675
DG
222
223 /* Get consumer output pointer */
e098433c 224 consumer = ksession->consumer;
00e2e675 225
00e2e675 226 /* Prep channel message structure */
d42266a4 227 consumer_init_add_channel_comm_msg(&lkm,
28ab034a
JG
228 ksession->metadata->key,
229 ksession->id,
230 "",
231 consumer->net_seq_index,
232 ksession->metadata->conf->name,
233 1,
234 ksession->metadata->conf->attr.output,
235 CONSUMER_CHANNEL_TYPE_METADATA,
236 ksession->metadata->conf->attr.tracefile_size,
237 ksession->metadata->conf->attr.tracefile_count,
238 monitor,
239 ksession->metadata->conf->attr.live_timer_interval,
240 ksession->is_live_session,
241 0,
242 ksession->current_trace_chunk);
00e2e675 243
840cb59c 244 health_code_update();
ca03de58 245
00e2e675
DG
246 ret = consumer_send_channel(sock, &lkm);
247 if (ret < 0) {
248 goto error;
249 }
250
840cb59c 251 health_code_update();
ca03de58 252
00e2e675 253 /* Prep stream message structure */
e098433c 254 consumer_init_add_stream_comm_msg(&lkm,
28ab034a
JG
255 ksession->metadata->key,
256 ksession->metadata_stream_fd,
257 0 /* CPU: 0 for metadata. */);
00e2e675 258
840cb59c 259 health_code_update();
ca03de58 260
00e2e675 261 /* Send stream and file descriptor */
28ab034a 262 ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1);
00e2e675
DG
263 if (ret < 0) {
264 goto error;
265 }
266
840cb59c 267 health_code_update();
ca03de58 268
00e2e675
DG
269error:
270 return ret;
271}
272
273/*
274 * Sending a single stream to the consumer with command ADD_STREAM.
275 */
28ab034a
JG
276static int kernel_consumer_add_stream(struct consumer_socket *sock,
277 struct ltt_kernel_channel *channel,
278 struct ltt_kernel_stream *stream,
279 struct ltt_kernel_session *session)
00e2e675
DG
280{
281 int ret;
00e2e675 282 struct lttcomm_consumer_msg lkm;
a7d9a3e7 283 struct consumer_output *consumer;
00e2e675 284
a0377dfe
FD
285 LTTNG_ASSERT(channel);
286 LTTNG_ASSERT(stream);
287 LTTNG_ASSERT(session);
288 LTTNG_ASSERT(session->consumer);
289 LTTNG_ASSERT(sock);
00e2e675
DG
290
291 DBG("Sending stream %d of channel %s to kernel consumer",
28ab034a
JG
292 stream->fd,
293 channel->channel->name);
00e2e675
DG
294
295 /* Get consumer output pointer */
a7d9a3e7 296 consumer = session->consumer;
00e2e675 297
00e2e675 298 /* Prep stream consumer message */
28ab034a 299 consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu);
00e2e675 300
840cb59c 301 health_code_update();
ca03de58 302
00e2e675 303 /* Send stream and file descriptor */
a7d9a3e7 304 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
305 if (ret < 0) {
306 goto error;
307 }
308
840cb59c 309 health_code_update();
ca03de58 310
00e2e675
DG
311error:
312 return ret;
313}
314
a4baae1b
JD
315/*
316 * Sending the notification that all streams were sent with STREAMS_SENT.
317 */
318int kernel_consumer_streams_sent(struct consumer_socket *sock,
28ab034a
JG
319 struct ltt_kernel_session *session,
320 uint64_t channel_key)
a4baae1b
JD
321{
322 int ret;
323 struct lttcomm_consumer_msg lkm;
324 struct consumer_output *consumer;
325
a0377dfe
FD
326 LTTNG_ASSERT(sock);
327 LTTNG_ASSERT(session);
a4baae1b
JD
328
329 DBG("Sending streams_sent");
330 /* Get consumer output pointer */
331 consumer = session->consumer;
332
333 /* Prep stream consumer message */
28ab034a
JG
334 consumer_init_streams_sent_comm_msg(
335 &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index);
a4baae1b
JD
336
337 health_code_update();
338
339 /* Send stream and file descriptor */
340 ret = consumer_send_msg(sock, &lkm);
341 if (ret < 0) {
342 goto error;
343 }
344
345error:
346 return ret;
347}
348
f1e16794
DG
349/*
350 * Send all stream fds of kernel channel to the consumer.
9a318688
JG
351 *
352 * The consumer socket lock must be held by the caller.
f1e16794 353 */
1fc1b7c8 354int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
28ab034a
JG
355 struct ltt_kernel_channel *channel,
356 struct ltt_kernel_session *ksession,
357 unsigned int monitor)
f1e16794 358{
e99f9447 359 int ret = LTTNG_OK;
f1e16794 360 struct ltt_kernel_stream *stream;
00e2e675
DG
361
362 /* Safety net */
a0377dfe
FD
363 LTTNG_ASSERT(channel);
364 LTTNG_ASSERT(ksession);
365 LTTNG_ASSERT(ksession->consumer);
366 LTTNG_ASSERT(sock);
00e2e675 367
56047f5a 368 lttng::urcu::read_lock_guard read_lock;
e098433c 369
00e2e675 370 /* Bail out if consumer is disabled */
e098433c 371 if (!ksession->consumer->enabled) {
f73fabfd 372 ret = LTTNG_OK;
00e2e675
DG
373 goto error;
374 }
f1e16794 375
28ab034a 376 DBG("Sending streams of channel %s to kernel consumer", channel->channel->name);
f1e16794 377
e99f9447 378 if (!channel->sent_to_consumer) {
e098433c 379 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
e99f9447
MD
380 if (ret < 0) {
381 goto error;
382 }
383 channel->sent_to_consumer = true;
f1e16794
DG
384 }
385
386 /* Send streams */
28ab034a 387 cds_list_for_each_entry (stream, &channel->stream_list.head, list) {
6986ab9b 388 if (!stream->fd || stream->sent_to_consumer) {
f1e16794
DG
389 continue;
390 }
00e2e675
DG
391
392 /* Add stream on the kernel consumer side. */
28ab034a 393 ret = kernel_consumer_add_stream(sock, channel, stream, ksession);
f1e16794 394 if (ret < 0) {
f1e16794
DG
395 goto error;
396 }
6986ab9b 397 stream->sent_to_consumer = true;
f1e16794
DG
398 }
399
f1e16794
DG
400error:
401 return ret;
402}
403
404/*
405 * Send all stream fds of the kernel session to the consumer.
9a318688
JG
406 *
407 * The consumer socket lock must be held by the caller.
f1e16794 408 */
28ab034a 409int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session)
f1e16794 410{
2bba9e53 411 int ret, monitor = 0;
f1e16794 412 struct ltt_kernel_channel *chan;
f1e16794 413
00e2e675 414 /* Safety net */
a0377dfe
FD
415 LTTNG_ASSERT(session);
416 LTTNG_ASSERT(session->consumer);
417 LTTNG_ASSERT(sock);
f1e16794 418
00e2e675
DG
419 /* Bail out if consumer is disabled */
420 if (!session->consumer->enabled) {
f73fabfd 421 ret = LTTNG_OK;
00e2e675 422 goto error;
f1e16794
DG
423 }
424
2bba9e53
DG
425 /* Don't monitor the streams on the consumer if in flight recorder. */
426 if (session->output_traces) {
427 monitor = 1;
428 }
429
00e2e675
DG
430 DBG("Sending session stream to kernel consumer");
431
609af759 432 if (session->metadata_stream_fd >= 0 && session->metadata) {
2bba9e53 433 ret = kernel_consumer_add_metadata(sock, session, monitor);
f1e16794 434 if (ret < 0) {
f1e16794
DG
435 goto error;
436 }
f1e16794
DG
437 }
438
00e2e675 439 /* Send channel and streams of it */
28ab034a
JG
440 cds_list_for_each_entry (chan, &session->channel_list.head, list) {
441 ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor);
f1e16794
DG
442 if (ret < 0) {
443 goto error;
444 }
601262d6
JD
445 if (monitor) {
446 /*
447 * Inform the relay that all the streams for the
448 * channel were sent.
449 */
e1f3997a 450 ret = kernel_consumer_streams_sent(sock, session, chan->key);
601262d6
JD
451 if (ret < 0) {
452 goto error;
453 }
454 }
f1e16794
DG
455 }
456
00e2e675 457 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794 458
4ce9ff51 459 session->consumer_fds_sent = 1;
f1e16794
DG
460 return 0;
461
462error:
463 return ret;
464}
07b86b52
JD
465
466int kernel_consumer_destroy_channel(struct consumer_socket *socket,
28ab034a 467 struct ltt_kernel_channel *channel)
07b86b52
JD
468{
469 int ret;
470 struct lttcomm_consumer_msg msg;
471
a0377dfe
FD
472 LTTNG_ASSERT(channel);
473 LTTNG_ASSERT(socket);
07b86b52 474
e1f3997a 475 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
07b86b52 476
53efb85a 477 memset(&msg, 0, sizeof(msg));
07b86b52 478 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
e1f3997a 479 msg.u.destroy_channel.key = channel->key;
07b86b52
JD
480
481 pthread_mutex_lock(socket->lock);
482 health_code_update();
483
484 ret = consumer_send_msg(socket, &msg);
485 if (ret < 0) {
486 goto error;
487 }
488
489error:
490 health_code_update();
491 pthread_mutex_unlock(socket->lock);
492 return ret;
493}
494
495int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
28ab034a 496 struct ltt_kernel_metadata *metadata)
07b86b52
JD
497{
498 int ret;
499 struct lttcomm_consumer_msg msg;
500
a0377dfe
FD
501 LTTNG_ASSERT(metadata);
502 LTTNG_ASSERT(socket);
07b86b52 503
d40f0359 504 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
07b86b52 505
53efb85a 506 memset(&msg, 0, sizeof(msg));
07b86b52 507 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
d40f0359 508 msg.u.destroy_channel.key = metadata->key;
07b86b52
JD
509
510 pthread_mutex_lock(socket->lock);
511 health_code_update();
512
513 ret = consumer_send_msg(socket, &msg);
514 if (ret < 0) {
515 goto error;
516 }
517
518error:
519 health_code_update();
520 pthread_mutex_unlock(socket->lock);
521 return ret;
522}
This page took 0.10935 seconds and 4 git commands to generate.