Fix: file-descriptor: missing include guards
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include "consumer.hpp"
10 #include "health-sessiond.hpp"
11 #include "kernel-consumer.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "session.hpp"
15
16 #include <common/common.hpp>
17 #include <common/compat/string.hpp>
18 #include <common/defaults.hpp>
19 #include <common/urcu.hpp>
20
21 #include <inttypes.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26
27 static char *create_channel_path(struct consumer_output *consumer, size_t *consumer_path_offset)
28 {
29 int ret;
30 char tmp_path[PATH_MAX];
31 char *pathname = nullptr;
32
33 LTTNG_ASSERT(consumer);
34
35 /* Get the right path name destination */
36 if (consumer->type == CONSUMER_DST_LOCAL ||
37 (consumer->type == CONSUMER_DST_NET && consumer->relay_major_version == 2 &&
38 consumer->relay_minor_version >= 11)) {
39 pathname = strdup(consumer->domain_subdir);
40 if (!pathname) {
41 PERROR("Failed to copy domain subdirectory string %s",
42 consumer->domain_subdir);
43 goto error;
44 }
45 *consumer_path_offset = strlen(consumer->domain_subdir);
46 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
47 pathname);
48 } else {
49 /* Network output, relayd < 2.11. */
50 ret = snprintf(tmp_path,
51 sizeof(tmp_path),
52 "%s%s",
53 consumer->dst.net.base_dir,
54 consumer->domain_subdir);
55 if (ret < 0) {
56 PERROR("snprintf kernel metadata path");
57 goto error;
58 } else if (ret >= sizeof(tmp_path)) {
59 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
60 sizeof(tmp_path),
61 ret,
62 consumer->dst.net.base_dir,
63 consumer->domain_subdir);
64 goto error;
65 }
66 pathname = lttng_strndup(tmp_path, sizeof(tmp_path));
67 if (!pathname) {
68 PERROR("lttng_strndup");
69 goto error;
70 }
71 *consumer_path_offset = 0;
72 DBG3("Kernel network consumer subdir path: %s", pathname);
73 }
74
75 return pathname;
76
77 error:
78 free(pathname);
79 return nullptr;
80 }
81
82 /*
83 * Sending a single channel to the consumer with command ADD_CHANNEL.
84 */
85 static int kernel_consumer_add_channel(struct consumer_socket *sock,
86 struct ltt_kernel_channel *channel,
87 struct ltt_kernel_session *ksession,
88 unsigned int monitor)
89 {
90 int ret;
91 char *pathname = nullptr;
92 struct lttcomm_consumer_msg lkm;
93 struct consumer_output *consumer;
94 enum lttng_error_code status;
95 struct ltt_session *session = nullptr;
96 struct lttng_channel_extended *channel_attr_extended;
97 bool is_local_trace;
98 size_t consumer_path_offset = 0;
99 lttng::urcu::read_lock_guard read_lock;
100
101 /* Safety net */
102 LTTNG_ASSERT(channel);
103 LTTNG_ASSERT(ksession);
104 LTTNG_ASSERT(ksession->consumer);
105
106 consumer = ksession->consumer;
107 channel_attr_extended =
108 (struct lttng_channel_extended *) channel->channel->attr.extended.ptr;
109
110 DBG("Kernel consumer adding channel %s to kernel consumer", channel->channel->name);
111 is_local_trace = consumer->net_seq_index == -1ULL;
112
113 pathname = create_channel_path(consumer, &consumer_path_offset);
114 if (!pathname) {
115 ret = -1;
116 goto error;
117 }
118
119 if (is_local_trace && ksession->current_trace_chunk) {
120 enum lttng_trace_chunk_status chunk_status;
121 char *pathname_index;
122
123 ret = asprintf(&pathname_index, "%s/" DEFAULT_INDEX_DIR, pathname);
124 if (ret < 0) {
125 ERR("Failed to format channel index directory");
126 ret = -1;
127 goto error;
128 }
129
130 /*
131 * Create the index subdirectory which will take care
132 * of implicitly creating the channel's path.
133 */
134 chunk_status = lttng_trace_chunk_create_subdirectory(ksession->current_trace_chunk,
135 pathname_index);
136 free(pathname_index);
137 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
138 ret = -1;
139 goto error;
140 }
141 }
142
143 /* Prep channel message structure */
144 consumer_init_add_channel_comm_msg(&lkm,
145 channel->key,
146 ksession->id,
147 &pathname[consumer_path_offset],
148 consumer->net_seq_index,
149 channel->channel->name,
150 channel->stream_count,
151 channel->channel->attr.output,
152 CONSUMER_CHANNEL_TYPE_DATA,
153 channel->channel->attr.tracefile_size,
154 channel->channel->attr.tracefile_count,
155 monitor,
156 channel->channel->attr.live_timer_interval,
157 ksession->is_live_session,
158 channel_attr_extended->monitor_timer_interval,
159 ksession->current_trace_chunk);
160
161 health_code_update();
162
163 ret = consumer_send_channel(sock, &lkm);
164 if (ret < 0) {
165 goto error;
166 }
167
168 health_code_update();
169 session = session_find_by_id(ksession->id);
170 LTTNG_ASSERT(session);
171 ASSERT_LOCKED(session->lock);
172 ASSERT_SESSION_LIST_LOCKED();
173
174 status = notification_thread_command_add_channel(the_notification_thread_handle,
175 session->id,
176 channel->channel->name,
177 channel->key,
178 LTTNG_DOMAIN_KERNEL,
179 channel->channel->attr.subbuf_size *
180 channel->channel->attr.num_subbuf);
181 if (status != LTTNG_OK) {
182 ret = -1;
183 goto error;
184 }
185
186 channel->published_to_notification_thread = true;
187
188 error:
189 if (session) {
190 session_put(session);
191 }
192 free(pathname);
193 return ret;
194 }
195
196 /*
197 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
198 *
199 * The consumer socket lock must be held by the caller.
200 */
201 int kernel_consumer_add_metadata(struct consumer_socket *sock,
202 struct ltt_kernel_session *ksession,
203 unsigned int monitor)
204 {
205 int ret;
206 struct lttcomm_consumer_msg lkm;
207 struct consumer_output *consumer;
208
209 lttng::urcu::read_lock_guard read_lock;
210
211 /* Safety net */
212 LTTNG_ASSERT(ksession);
213 LTTNG_ASSERT(ksession->consumer);
214 LTTNG_ASSERT(sock);
215
216 DBG("Sending metadata %d to kernel consumer", ksession->metadata_stream_fd);
217
218 /* Get consumer output pointer */
219 consumer = ksession->consumer;
220
221 /* Prep channel message structure */
222 consumer_init_add_channel_comm_msg(&lkm,
223 ksession->metadata->key,
224 ksession->id,
225 "",
226 consumer->net_seq_index,
227 ksession->metadata->conf->name,
228 1,
229 ksession->metadata->conf->attr.output,
230 CONSUMER_CHANNEL_TYPE_METADATA,
231 ksession->metadata->conf->attr.tracefile_size,
232 ksession->metadata->conf->attr.tracefile_count,
233 monitor,
234 ksession->metadata->conf->attr.live_timer_interval,
235 ksession->is_live_session,
236 0,
237 ksession->current_trace_chunk);
238
239 health_code_update();
240
241 ret = consumer_send_channel(sock, &lkm);
242 if (ret < 0) {
243 goto error;
244 }
245
246 health_code_update();
247
248 /* Prep stream message structure */
249 consumer_init_add_stream_comm_msg(&lkm,
250 ksession->metadata->key,
251 ksession->metadata_stream_fd,
252 0 /* CPU: 0 for metadata. */);
253
254 health_code_update();
255
256 /* Send stream and file descriptor */
257 ret = consumer_send_stream(sock, consumer, &lkm, &ksession->metadata_stream_fd, 1);
258 if (ret < 0) {
259 goto error;
260 }
261
262 health_code_update();
263
264 error:
265 return ret;
266 }
267
268 /*
269 * Sending a single stream to the consumer with command ADD_STREAM.
270 */
271 static int kernel_consumer_add_stream(struct consumer_socket *sock,
272 struct ltt_kernel_channel *channel,
273 struct ltt_kernel_stream *stream,
274 struct ltt_kernel_session *session)
275 {
276 int ret;
277 struct lttcomm_consumer_msg lkm;
278 struct consumer_output *consumer;
279
280 LTTNG_ASSERT(channel);
281 LTTNG_ASSERT(stream);
282 LTTNG_ASSERT(session);
283 LTTNG_ASSERT(session->consumer);
284 LTTNG_ASSERT(sock);
285
286 DBG("Sending stream %d of channel %s to kernel consumer",
287 stream->fd,
288 channel->channel->name);
289
290 /* Get consumer output pointer */
291 consumer = session->consumer;
292
293 /* Prep stream consumer message */
294 consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, stream->cpu);
295
296 health_code_update();
297
298 /* Send stream and file descriptor */
299 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
300 if (ret < 0) {
301 goto error;
302 }
303
304 health_code_update();
305
306 error:
307 return ret;
308 }
309
310 /*
311 * Sending the notification that all streams were sent with STREAMS_SENT.
312 */
313 int kernel_consumer_streams_sent(struct consumer_socket *sock,
314 struct ltt_kernel_session *session,
315 uint64_t channel_key)
316 {
317 int ret;
318 struct lttcomm_consumer_msg lkm;
319 struct consumer_output *consumer;
320
321 LTTNG_ASSERT(sock);
322 LTTNG_ASSERT(session);
323
324 DBG("Sending streams_sent");
325 /* Get consumer output pointer */
326 consumer = session->consumer;
327
328 /* Prep stream consumer message */
329 consumer_init_streams_sent_comm_msg(
330 &lkm, LTTNG_CONSUMER_STREAMS_SENT, channel_key, consumer->net_seq_index);
331
332 health_code_update();
333
334 /* Send stream and file descriptor */
335 ret = consumer_send_msg(sock, &lkm);
336 if (ret < 0) {
337 goto error;
338 }
339
340 error:
341 return ret;
342 }
343
344 /*
345 * Send all stream fds of kernel channel to the consumer.
346 *
347 * The consumer socket lock must be held by the caller.
348 */
349 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
350 struct ltt_kernel_channel *channel,
351 struct ltt_kernel_session *ksession,
352 unsigned int monitor)
353 {
354 int ret = LTTNG_OK;
355 struct ltt_kernel_stream *stream;
356
357 /* Safety net */
358 LTTNG_ASSERT(channel);
359 LTTNG_ASSERT(ksession);
360 LTTNG_ASSERT(ksession->consumer);
361 LTTNG_ASSERT(sock);
362
363 lttng::urcu::read_lock_guard read_lock;
364
365 /* Bail out if consumer is disabled */
366 if (!ksession->consumer->enabled) {
367 ret = LTTNG_OK;
368 goto error;
369 }
370
371 DBG("Sending streams of channel %s to kernel consumer", channel->channel->name);
372
373 if (!channel->sent_to_consumer) {
374 ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
375 if (ret < 0) {
376 goto error;
377 }
378 channel->sent_to_consumer = true;
379 }
380
381 /* Send streams */
382 cds_list_for_each_entry (stream, &channel->stream_list.head, list) {
383 if (!stream->fd || stream->sent_to_consumer) {
384 continue;
385 }
386
387 /* Add stream on the kernel consumer side. */
388 ret = kernel_consumer_add_stream(sock, channel, stream, ksession);
389 if (ret < 0) {
390 goto error;
391 }
392 stream->sent_to_consumer = true;
393 }
394
395 error:
396 return ret;
397 }
398
399 /*
400 * Send all stream fds of the kernel session to the consumer.
401 *
402 * The consumer socket lock must be held by the caller.
403 */
404 int kernel_consumer_send_session(struct consumer_socket *sock, struct ltt_kernel_session *session)
405 {
406 int ret, monitor = 0;
407 struct ltt_kernel_channel *chan;
408
409 /* Safety net */
410 LTTNG_ASSERT(session);
411 LTTNG_ASSERT(session->consumer);
412 LTTNG_ASSERT(sock);
413
414 /* Bail out if consumer is disabled */
415 if (!session->consumer->enabled) {
416 ret = LTTNG_OK;
417 goto error;
418 }
419
420 /* Don't monitor the streams on the consumer if in flight recorder. */
421 if (session->output_traces) {
422 monitor = 1;
423 }
424
425 DBG("Sending session stream to kernel consumer");
426
427 if (session->metadata_stream_fd >= 0 && session->metadata) {
428 ret = kernel_consumer_add_metadata(sock, session, monitor);
429 if (ret < 0) {
430 goto error;
431 }
432 }
433
434 /* Send channel and streams of it */
435 cds_list_for_each_entry (chan, &session->channel_list.head, list) {
436 ret = kernel_consumer_send_channel_streams(sock, chan, session, monitor);
437 if (ret < 0) {
438 goto error;
439 }
440 if (monitor) {
441 /*
442 * Inform the relay that all the streams for the
443 * channel were sent.
444 */
445 ret = kernel_consumer_streams_sent(sock, session, chan->key);
446 if (ret < 0) {
447 goto error;
448 }
449 }
450 }
451
452 DBG("Kernel consumer FDs of metadata and channel streams sent");
453
454 session->consumer_fds_sent = 1;
455 return 0;
456
457 error:
458 return ret;
459 }
460
461 int kernel_consumer_destroy_channel(struct consumer_socket *socket,
462 struct ltt_kernel_channel *channel)
463 {
464 int ret;
465 struct lttcomm_consumer_msg msg;
466
467 LTTNG_ASSERT(channel);
468 LTTNG_ASSERT(socket);
469
470 DBG("Sending kernel consumer destroy channel key %" PRIu64, channel->key);
471
472 memset(&msg, 0, sizeof(msg));
473 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
474 msg.u.destroy_channel.key = channel->key;
475
476 pthread_mutex_lock(socket->lock);
477 health_code_update();
478
479 ret = consumer_send_msg(socket, &msg);
480 if (ret < 0) {
481 goto error;
482 }
483
484 error:
485 health_code_update();
486 pthread_mutex_unlock(socket->lock);
487 return ret;
488 }
489
490 int kernel_consumer_destroy_metadata(struct consumer_socket *socket,
491 struct ltt_kernel_metadata *metadata)
492 {
493 int ret;
494 struct lttcomm_consumer_msg msg;
495
496 LTTNG_ASSERT(metadata);
497 LTTNG_ASSERT(socket);
498
499 DBG("Sending kernel consumer destroy channel key %" PRIu64, metadata->key);
500
501 memset(&msg, 0, sizeof(msg));
502 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
503 msg.u.destroy_channel.key = metadata->key;
504
505 pthread_mutex_lock(socket->lock);
506 health_code_update();
507
508 ret = consumer_send_msg(socket, &msg);
509 if (ret < 0) {
510 goto error;
511 }
512
513 error:
514 health_code_update();
515 pthread_mutex_unlock(socket->lock);
516 return ret;
517 }
This page took 0.04781 seconds and 4 git commands to generate.