2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
9 #include "consumer.hpp"
10 #include "health-sessiond.hpp"
11 #include "kernel-consumer.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "session.hpp"
16 #include <common/common.hpp>
17 #include <common/compat/string.hpp>
18 #include <common/defaults.hpp>
19 #include <common/urcu.hpp>
27 static char *create_channel_path(struct consumer_output
*consumer
, size_t *consumer_path_offset
)
30 char tmp_path
[PATH_MAX
];
31 char *pathname
= nullptr;
33 LTTNG_ASSERT(consumer
);
35 /* Get the right path name destination */
36 if (consumer
->type
== CONSUMER_DST_LOCAL
||
37 (consumer
->type
== CONSUMER_DST_NET
&& consumer
->relay_major_version
== 2 &&
38 consumer
->relay_minor_version
>= 11)) {
39 pathname
= strdup(consumer
->domain_subdir
);
41 PERROR("Failed to copy domain subdirectory string %s",
42 consumer
->domain_subdir
);
45 *consumer_path_offset
= strlen(consumer
->domain_subdir
);
46 DBG3("Kernel local consumer trace path relative to current trace chunk: \"%s\"",
49 /* Network output, relayd < 2.11. */
50 ret
= snprintf(tmp_path
,
53 consumer
->dst
.net
.base_dir
,
54 consumer
->domain_subdir
);
56 PERROR("snprintf kernel metadata path");
58 } else if (ret
>= sizeof(tmp_path
)) {
59 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
62 consumer
->dst
.net
.base_dir
,
63 consumer
->domain_subdir
);
66 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
68 PERROR("lttng_strndup");
71 *consumer_path_offset
= 0;
72 DBG3("Kernel network consumer subdir path: %s", pathname
);
83 * Sending a single channel to the consumer with command ADD_CHANNEL.
85 static int kernel_consumer_add_channel(struct consumer_socket
*sock
,
86 struct ltt_kernel_channel
*channel
,
87 struct ltt_kernel_session
*ksession
,
91 char *pathname
= nullptr;
92 struct lttcomm_consumer_msg lkm
;
93 struct consumer_output
*consumer
;
94 enum lttng_error_code status
;
95 struct lttng_channel_extended
*channel_attr_extended
;
97 size_t consumer_path_offset
= 0;
98 const lttng::urcu::read_lock_guard read_lock
;
101 LTTNG_ASSERT(channel
);
102 LTTNG_ASSERT(ksession
);
103 LTTNG_ASSERT(ksession
->consumer
);
105 consumer
= ksession
->consumer
;
106 channel_attr_extended
=
107 (struct lttng_channel_extended
*) channel
->channel
->attr
.extended
.ptr
;
109 DBG("Kernel consumer adding channel %s to kernel consumer", channel
->channel
->name
);
110 is_local_trace
= consumer
->net_seq_index
== -1ULL;
112 pathname
= create_channel_path(consumer
, &consumer_path_offset
);
118 if (is_local_trace
&& ksession
->current_trace_chunk
) {
119 enum lttng_trace_chunk_status chunk_status
;
120 char *pathname_index
;
122 ret
= asprintf(&pathname_index
, "%s/" DEFAULT_INDEX_DIR
, pathname
);
124 ERR("Failed to format channel index directory");
130 * Create the index subdirectory which will take care
131 * of implicitly creating the channel's path.
133 chunk_status
= lttng_trace_chunk_create_subdirectory(ksession
->current_trace_chunk
,
135 free(pathname_index
);
136 if (chunk_status
!= LTTNG_TRACE_CHUNK_STATUS_OK
) {
142 /* Prep channel message structure */
143 consumer_init_add_channel_comm_msg(&lkm
,
146 &pathname
[consumer_path_offset
],
147 consumer
->net_seq_index
,
148 channel
->channel
->name
,
149 channel
->stream_count
,
150 channel
->channel
->attr
.output
,
151 CONSUMER_CHANNEL_TYPE_DATA
,
152 channel
->channel
->attr
.tracefile_size
,
153 channel
->channel
->attr
.tracefile_count
,
155 channel
->channel
->attr
.live_timer_interval
,
156 ksession
->is_live_session
,
157 channel_attr_extended
->monitor_timer_interval
,
158 ksession
->current_trace_chunk
);
160 health_code_update();
162 ret
= consumer_send_channel(sock
, &lkm
);
167 health_code_update();
170 const auto session
= ltt_session::find_session(ksession
->id
);
172 ASSERT_SESSION_LIST_LOCKED();
174 status
= notification_thread_command_add_channel(
175 the_notification_thread_handle
,
177 channel
->channel
->name
,
180 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
181 } catch (const lttng::sessiond::exceptions::session_not_found_error
& ex
) {
182 ERR_FMT("Fatal error during the creation of a kernel channel: {}, location='{}'",
188 if (status
!= LTTNG_OK
) {
193 channel
->published_to_notification_thread
= true;
201 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
203 * The consumer socket lock must be held by the caller.
205 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
206 struct ltt_kernel_session
*ksession
,
207 unsigned int monitor
)
210 struct lttcomm_consumer_msg lkm
;
211 struct consumer_output
*consumer
;
213 const lttng::urcu::read_lock_guard read_lock
;
216 LTTNG_ASSERT(ksession
);
217 LTTNG_ASSERT(ksession
->consumer
);
220 DBG("Sending metadata %d to kernel consumer", ksession
->metadata_stream_fd
);
222 /* Get consumer output pointer */
223 consumer
= ksession
->consumer
;
225 /* Prep channel message structure */
226 consumer_init_add_channel_comm_msg(&lkm
,
227 ksession
->metadata
->key
,
230 consumer
->net_seq_index
,
231 ksession
->metadata
->conf
->name
,
233 ksession
->metadata
->conf
->attr
.output
,
234 CONSUMER_CHANNEL_TYPE_METADATA
,
235 ksession
->metadata
->conf
->attr
.tracefile_size
,
236 ksession
->metadata
->conf
->attr
.tracefile_count
,
238 ksession
->metadata
->conf
->attr
.live_timer_interval
,
239 ksession
->is_live_session
,
241 ksession
->current_trace_chunk
);
243 health_code_update();
245 ret
= consumer_send_channel(sock
, &lkm
);
250 health_code_update();
252 /* Prep stream message structure */
253 consumer_init_add_stream_comm_msg(&lkm
,
254 ksession
->metadata
->key
,
255 ksession
->metadata_stream_fd
,
256 0 /* CPU: 0 for metadata. */);
258 health_code_update();
260 /* Send stream and file descriptor */
261 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &ksession
->metadata_stream_fd
, 1);
266 health_code_update();
273 * Sending a single stream to the consumer with command ADD_STREAM.
275 static int kernel_consumer_add_stream(struct consumer_socket
*sock
,
276 struct ltt_kernel_channel
*channel
,
277 struct ltt_kernel_stream
*stream
,
278 struct ltt_kernel_session
*session
)
281 struct lttcomm_consumer_msg lkm
;
282 struct consumer_output
*consumer
;
284 LTTNG_ASSERT(channel
);
285 LTTNG_ASSERT(stream
);
286 LTTNG_ASSERT(session
);
287 LTTNG_ASSERT(session
->consumer
);
290 DBG("Sending stream %d of channel %s to kernel consumer",
292 channel
->channel
->name
);
294 /* Get consumer output pointer */
295 consumer
= session
->consumer
;
297 /* Prep stream consumer message */
298 consumer_init_add_stream_comm_msg(&lkm
, channel
->key
, stream
->fd
, stream
->cpu
);
300 health_code_update();
302 /* Send stream and file descriptor */
303 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
308 health_code_update();
315 * Sending the notification that all streams were sent with STREAMS_SENT.
317 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
318 struct ltt_kernel_session
*session
,
319 uint64_t channel_key
)
322 struct lttcomm_consumer_msg lkm
;
323 struct consumer_output
*consumer
;
326 LTTNG_ASSERT(session
);
328 DBG("Sending streams_sent");
329 /* Get consumer output pointer */
330 consumer
= session
->consumer
;
332 /* Prep stream consumer message */
333 consumer_init_streams_sent_comm_msg(
334 &lkm
, LTTNG_CONSUMER_STREAMS_SENT
, channel_key
, consumer
->net_seq_index
);
336 health_code_update();
338 /* Send stream and file descriptor */
339 ret
= consumer_send_msg(sock
, &lkm
);
349 * Send all stream fds of kernel channel to the consumer.
351 * The consumer socket lock must be held by the caller.
353 int kernel_consumer_send_channel_streams(struct consumer_socket
*sock
,
354 struct ltt_kernel_channel
*channel
,
355 struct ltt_kernel_session
*ksession
,
356 unsigned int monitor
)
359 struct ltt_kernel_stream
*stream
;
362 LTTNG_ASSERT(channel
);
363 LTTNG_ASSERT(ksession
);
364 LTTNG_ASSERT(ksession
->consumer
);
367 const lttng::urcu::read_lock_guard read_lock
;
369 /* Bail out if consumer is disabled */
370 if (!ksession
->consumer
->enabled
) {
375 DBG("Sending streams of channel %s to kernel consumer", channel
->channel
->name
);
377 if (!channel
->sent_to_consumer
) {
378 ret
= kernel_consumer_add_channel(sock
, channel
, ksession
, monitor
);
382 channel
->sent_to_consumer
= true;
386 cds_list_for_each_entry (stream
, &channel
->stream_list
.head
, list
) {
387 if (!stream
->fd
|| stream
->sent_to_consumer
) {
391 /* Add stream on the kernel consumer side. */
392 ret
= kernel_consumer_add_stream(sock
, channel
, stream
, ksession
);
396 stream
->sent_to_consumer
= true;
404 * Send all stream fds of the kernel session to the consumer.
406 * The consumer socket lock must be held by the caller.
408 int kernel_consumer_send_session(struct consumer_socket
*sock
, struct ltt_kernel_session
*session
)
410 int ret
, monitor
= 0;
411 struct ltt_kernel_channel
*chan
;
414 LTTNG_ASSERT(session
);
415 LTTNG_ASSERT(session
->consumer
);
418 /* Bail out if consumer is disabled */
419 if (!session
->consumer
->enabled
) {
424 /* Don't monitor the streams on the consumer if in flight recorder. */
425 if (session
->output_traces
) {
429 DBG("Sending session stream to kernel consumer");
431 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
432 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
438 /* Send channel and streams of it */
439 cds_list_for_each_entry (chan
, &session
->channel_list
.head
, list
) {
440 ret
= kernel_consumer_send_channel_streams(sock
, chan
, session
, monitor
);
446 * Inform the relay that all the streams for the
449 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->key
);
456 DBG("Kernel consumer FDs of metadata and channel streams sent");
458 session
->consumer_fds_sent
= 1;
465 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
466 struct ltt_kernel_channel
*channel
)
469 struct lttcomm_consumer_msg msg
;
471 LTTNG_ASSERT(channel
);
472 LTTNG_ASSERT(socket
);
474 DBG("Sending kernel consumer destroy channel key %" PRIu64
, channel
->key
);
476 memset(&msg
, 0, sizeof(msg
));
477 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
478 msg
.u
.destroy_channel
.key
= channel
->key
;
480 pthread_mutex_lock(socket
->lock
);
481 health_code_update();
483 ret
= consumer_send_msg(socket
, &msg
);
489 health_code_update();
490 pthread_mutex_unlock(socket
->lock
);
494 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
495 struct ltt_kernel_metadata
*metadata
)
498 struct lttcomm_consumer_msg msg
;
500 LTTNG_ASSERT(metadata
);
501 LTTNG_ASSERT(socket
);
503 DBG("Sending kernel consumer destroy channel key %" PRIu64
, metadata
->key
);
505 memset(&msg
, 0, sizeof(msg
));
506 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
507 msg
.u
.destroy_channel
.key
= metadata
->key
;
509 pthread_mutex_lock(socket
->lock
);
510 health_code_update();
512 ret
= consumer_send_msg(socket
, &msg
);
518 health_code_update();
519 pthread_mutex_unlock(socket
->lock
);