2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include <common/common.h>
26 #include <common/defaults.h>
27 #include <common/compat/string.h>
30 #include "health-sessiond.h"
31 #include "kernel-consumer.h"
32 #include "notification-thread-commands.h"
34 #include "lttng-sessiond.h"
/*
 * Build the output path used for kernel channels of the given consumer
 * output and keep it in a heap copy made with lttng_strndup().
 *
 * - CONSUMER_DST_LOCAL: the path is formatted with "%s%s%s" starting with
 *   dst.session_root_path and ending with domain_subdir, then the directory
 *   is created with run_as_mkdir_recursive() using mode S_IRWXU | S_IRWXG
 *   and the uid/gid values.
 * - Other branch: the path is formatted with "%s%s" from dst.net.base_dir
 *   and domain_subdir; no directory is created in the visible code.
 *
 * Callers in this file assign the result of this function to their
 * `pathname` variable.
 *
 * NOTE(review): this listing is lossy. The embedded source line numbers
 * jump (36 -> 40, 49 -> 51, ...), so the rest of the parameter list, the
 * declaration of `ret`, the checks guarding the PERROR() calls, the second
 * of the three "%s" arguments, and the return/cleanup paths are not visible
 * here. Who frees the returned string is presumably the caller -- TODO
 * confirm.
 */
36 static char *create_channel_path(struct consumer_output
*consumer
,
/* Scratch buffer in which the path is formatted before being duplicated. */
40 char tmp_path
[PATH_MAX
];
41 char *pathname
= NULL
;
45 /* Get the right path name destination */
46 if (consumer
->type
== CONSUMER_DST_LOCAL
) {
47 /* Set application path to the destination path */
48 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s%s",
49 consumer
->dst
.session_root_path
,
51 consumer
->domain_subdir
);
53 PERROR("snprintf kernel channel path");
/* A return value >= the buffer size means the formatted path was truncated. */
55 } else if (ret
>= sizeof(tmp_path
)) {
/* NOTE(review): the message below contains a duplicated word ("of of"). */
56 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s%s\"",
57 sizeof(tmp_path
), ret
,
58 consumer
->dst
.session_root_path
,
60 consumer
->domain_subdir
);
/* Duplicate the formatted path; at most sizeof(tmp_path) bytes are read. */
63 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
65 PERROR("lttng_strndup");
69 /* Create directory */
70 ret
= run_as_mkdir_recursive(pathname
, S_IRWXU
| S_IRWXG
, uid
, gid
);
/* An already existing directory (EEXIST) is not reported as an error. */
72 if (errno
!= EEXIST
) {
73 ERR("Trace directory creation error");
77 DBG3("Kernel local consumer tracefile path: %s", pathname
);
/* Non-local destination: base_dir of the network output plus domain_subdir. */
80 ret
= snprintf(tmp_path
, sizeof(tmp_path
), "%s%s",
81 consumer
->dst
.net
.base_dir
,
82 consumer
->domain_subdir
);
/*
 * NOTE(review): this message says "metadata path" although this function
 * formats the channel path -- looks like a copy/paste leftover; confirm.
 */
84 PERROR("snprintf kernel metadata path");
86 } else if (ret
>= sizeof(tmp_path
)) {
/* NOTE(review): same duplicated "of of" as in the local branch. */
87 ERR("Kernel channel path exceeds the maximal allowed length of of %zu bytes (%i bytes required) with path \"%s%s\"",
88 sizeof(tmp_path
), ret
,
89 consumer
->dst
.net
.base_dir
,
90 consumer
->domain_subdir
);
93 pathname
= lttng_strndup(tmp_path
, sizeof(tmp_path
));
95 PERROR("lttng_strndup");
98 DBG3("Kernel network consumer subdir path: %s", pathname
);
109 * Sending a single channel to the consumer with command ADD_CHANNEL.
/*
 * kernel_consumer_add_channel: describe one kernel channel to the consumer
 * and register it with the notification thread.
 *
 * Visible steps, in order:
 *  1. take the consumer output from ksession->consumer (asserted non-NULL);
 *  2. obtain `pathname`, either from create_channel_path() or as an empty
 *     strdup("") string (the condition choosing between the two is not
 *     visible in this listing; presumably it depends on `monitor` -- TODO
 *     confirm);
 *  3. fill `lkm` with consumer_init_add_channel_comm_msg() and send it with
 *     consumer_send_channel();
 *  4. look the session up with session_find_by_id(ksession->id), call
 *     notification_thread_command_add_channel(), set
 *     channel->published_to_notification_thread and release the session
 *     reference with session_put().
 *
 * NOTE(review): the embedded source line numbers jump (115 -> 119,
 * 142 -> 149, 150 -> 156, ...), so the declarations of `ret`/`pathname`,
 * several message arguments, the error checks and the return path are not
 * visible here.
 */
112 int kernel_consumer_add_channel(struct consumer_socket
*sock
,
113 struct ltt_kernel_channel
*channel
,
114 struct ltt_kernel_session
*ksession
,
115 unsigned int monitor
)
119 struct lttcomm_consumer_msg lkm
;
120 struct consumer_output
*consumer
;
121 enum lttng_error_code status
;
122 struct ltt_session
*session
= NULL
;
123 struct lttng_channel_extended
*channel_attr_extended
;
128 assert(ksession
->consumer
);
130 consumer
= ksession
->consumer
;
/* The extended attributes are stored behind an untyped pointer; cast back. */
131 channel_attr_extended
= (struct lttng_channel_extended
*)
132 channel
->channel
->attr
.extended
.ptr
;
134 DBG("Kernel consumer adding channel %s to kernel consumer",
135 channel
->channel
->name
);
/* Two alternative sources for pathname: a real path or an empty string. */
138 pathname
= create_channel_path(consumer
, ksession
->uid
,
142 pathname
= strdup("");
149 /* Prep channel message structure */
150 consumer_init_add_channel_comm_msg(&lkm
,
156 consumer
->net_seq_index
,
157 channel
->channel
->name
,
158 channel
->stream_count
,
159 channel
->channel
->attr
.output
,
160 CONSUMER_CHANNEL_TYPE_DATA
,
161 channel
->channel
->attr
.tracefile_size
,
162 channel
->channel
->attr
.tracefile_count
,
164 channel
->channel
->attr
.live_timer_interval
,
165 channel_attr_extended
->monitor_timer_interval
);
167 health_code_update();
169 ret
= consumer_send_channel(sock
, &lkm
);
174 health_code_update();
176 session
= session_find_by_id(ksession
->id
);
/*
 * pthread_mutex_trylock() returns non-zero when the lock cannot be taken,
 * so this asserts that the session lock is NOT acquirable here (presumably
 * because the caller already holds it -- TODO confirm). session_trylock_list()
 * presumably follows the same convention.
 * NOTE(review): both calls live inside assert() and are compiled out when
 * NDEBUG is defined.
 */
178 assert(pthread_mutex_trylock(&session
->lock
));
179 assert(session_trylock_list());
/* The last argument is the product of sub-buffer size and sub-buffer count. */
181 status
= notification_thread_command_add_channel(
182 notification_thread_handle
, session
->name
,
183 ksession
->uid
, ksession
->gid
,
184 channel
->channel
->name
, channel
->key
,
186 channel
->channel
->attr
.subbuf_size
* channel
->channel
->attr
.num_subbuf
);
188 if (status
!= LTTNG_OK
) {
/* Remember that the notification thread now knows about this channel. */
193 channel
->published_to_notification_thread
= true;
/* Drop the reference obtained from session_find_by_id(). */
197 session_put(session
);
204 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
206 * The consumer socket lock must be held by the caller.
/*
 * kernel_consumer_add_metadata: send the session's metadata channel and
 * then its metadata stream (with its file descriptor) to the consumer.
 *
 * Visible steps, in order:
 *  1. take the consumer output from ksession->consumer (asserted non-NULL);
 *  2. obtain `pathname` from create_channel_path() or as strdup("") (the
 *     selecting condition is not visible; presumably `monitor` -- TODO
 *     confirm);
 *  3. look the session up with session_find_by_id(ksession->id);
 *  4. send the channel description built by
 *     consumer_init_add_channel_comm_msg() through consumer_send_channel();
 *  5. send the stream description built by
 *     consumer_init_add_stream_comm_msg() together with
 *     ksession->metadata_stream_fd through consumer_send_stream();
 *  6. release the session reference with session_put().
 *
 * NOTE(review): the embedded source line numbers jump (215 -> 221,
 * 249 -> 254, 258 -> 262, ...), so the declarations of `ret`/`pathname`,
 * several message arguments, the error checks and the return path are not
 * visible here.
 */
208 int kernel_consumer_add_metadata(struct consumer_socket
*sock
,
209 struct ltt_kernel_session
*ksession
, unsigned int monitor
)
213 struct lttcomm_consumer_msg lkm
;
214 struct consumer_output
*consumer
;
215 struct ltt_session
*session
= NULL
;
221 assert(ksession
->consumer
);
224 DBG("Sending metadata %d to kernel consumer",
225 ksession
->metadata_stream_fd
);
227 /* Get consumer output pointer */
228 consumer
= ksession
->consumer
;
/* Two alternative sources for pathname: a real path or an empty string. */
231 pathname
= create_channel_path(consumer
,
232 ksession
->uid
, ksession
->gid
);
235 pathname
= strdup("");
242 session
= session_find_by_id(ksession
->id
);
/*
 * pthread_mutex_trylock() returns non-zero when the lock cannot be taken,
 * so this asserts that the session lock is NOT acquirable here (presumably
 * because the caller already holds it -- TODO confirm).
 * NOTE(review): both calls live inside assert() and are compiled out when
 * NDEBUG is defined.
 */
244 assert(pthread_mutex_trylock(&session
->lock
));
245 assert(session_trylock_list());
247 /* Prep channel message structure */
248 consumer_init_add_channel_comm_msg(&lkm
,
249 ksession
->metadata
->key
,
254 consumer
->net_seq_index
,
255 DEFAULT_METADATA_NAME
,
257 DEFAULT_KERNEL_CHANNEL_OUTPUT
,
258 CONSUMER_CHANNEL_TYPE_METADATA
,
262 health_code_update();
264 ret
= consumer_send_channel(sock
, &lkm
);
269 health_code_update();
/* `lkm` is reused for the stream message that follows the channel message. */
271 /* Prep stream message structure */
272 consumer_init_add_stream_comm_msg(&lkm
,
273 ksession
->metadata
->key
,
274 ksession
->metadata_stream_fd
,
275 0 /* CPU: 0 for metadata. */,
276 session
->current_archive_id
);
278 health_code_update();
280 /* Send stream and file descriptor */
/* Exactly one descriptor is passed: the metadata stream fd. */
281 ret
= consumer_send_stream(sock
, consumer
, &lkm
,
282 &ksession
->metadata_stream_fd
, 1);
287 health_code_update();
/* Drop the reference obtained from session_find_by_id(). */
293 session_put(session
);
299 * Sending a single stream to the consumer with command ADD_STREAM.
/*
 * kernel_consumer_add_stream: send one kernel stream and its file
 * descriptor to the consumer.
 *
 * The consumer output is taken from session->consumer (asserted non-NULL),
 * `lkm` is prepared with consumer_init_add_stream_comm_msg() and sent,
 * together with stream->fd, through consumer_send_stream().
 *
 * NOTE(review): the embedded source line numbers jump (310 -> 315,
 * 325 -> 331, 334 -> 339), so the declaration of `ret`, the arguments of
 * the message initializer (including how `monitor` and `trace_archive_id`
 * are used), the error checks and the return path are not visible here.
 */
302 int kernel_consumer_add_stream(struct consumer_socket
*sock
,
303 struct ltt_kernel_channel
*channel
,
304 struct ltt_kernel_stream
*stream
,
305 struct ltt_kernel_session
*session
, unsigned int monitor
,
306 uint64_t trace_archive_id
)
309 struct lttcomm_consumer_msg lkm
;
310 struct consumer_output
*consumer
;
315 assert(session
->consumer
);
318 DBG("Sending stream %d of channel %s to kernel consumer",
319 stream
->fd
, channel
->channel
->name
);
321 /* Get consumer output pointer */
322 consumer
= session
->consumer
;
324 /* Prep stream consumer message */
325 consumer_init_add_stream_comm_msg(&lkm
,
331 health_code_update();
333 /* Send stream and file descriptor */
/* Exactly one descriptor is passed: the stream's own fd. */
334 ret
= consumer_send_stream(sock
, consumer
, &lkm
, &stream
->fd
, 1);
339 health_code_update();
346 * Sending the notification that all streams were sent with STREAMS_SENT.
/*
 * kernel_consumer_streams_sent: tell the consumer that all streams of the
 * channel identified by `channel_key` have been sent.
 *
 * A LTTNG_CONSUMER_STREAMS_SENT message carrying `channel_key` and the
 * consumer's net_seq_index is prepared with
 * consumer_init_streams_sent_comm_msg() and sent with consumer_send_msg().
 *
 * NOTE(review): the embedded source line numbers jump (353 -> 358,
 * 370 -> 380), so the declaration of `ret`, any argument checks, the error
 * handling and the return path are not visible here.
 */
348 int kernel_consumer_streams_sent(struct consumer_socket
*sock
,
349 struct ltt_kernel_session
*session
, uint64_t channel_key
)
352 struct lttcomm_consumer_msg lkm
;
353 struct consumer_output
*consumer
;
358 DBG("Sending streams_sent");
359 /* Get consumer output pointer */
360 consumer
= session
->consumer
;
362 /* Prep stream consumer message */
363 consumer_init_streams_sent_comm_msg(&lkm
,
364 LTTNG_CONSUMER_STREAMS_SENT
,
365 channel_key
, consumer
->net_seq_index
);
367 health_code_update();
369 /* Send stream and file descriptor */
/*
 * NOTE(review): the comment above looks copied from the stream-sending
 * code; the call below only sends `lkm` and is given no file descriptor.
 */
370 ret
= consumer_send_msg(sock
, &lkm
);
380 * Send all stream fds of kernel channel to the consumer.
382 * The consumer socket lock must be held by the caller.
/*
 * kernel_consumer_send_channel_streams: send a kernel channel (once) and
 * every not-yet-sent stream of that channel to the consumer.
 *
 * Visible steps, in order:
 *  1. look the session up with session_find_by_id(ksession->id);
 *  2. test ksession->consumer->enabled (per the existing comment, the
 *     function bails out when the consumer is disabled);
 *  3. if channel->sent_to_consumer is false, call
 *     kernel_consumer_add_channel() and set the flag;
 *  4. walk channel->stream_list and call kernel_consumer_add_stream() for
 *     streams, passing session->current_archive_id, then set
 *     stream->sent_to_consumer;
 *  5. release the session reference with session_put().
 *
 * NOTE(review): the embedded source line numbers jump (390 -> 395,
 * 406 -> 411, 415 -> 419, 424 -> 428, ...), so the declaration of `ret`,
 * the bodies of the guard conditions, the error checks and the return path
 * are not visible here.
 */
384 int kernel_consumer_send_channel_streams(struct consumer_socket
*sock
,
385 struct ltt_kernel_channel
*channel
, struct ltt_kernel_session
*ksession
,
386 unsigned int monitor
)
389 struct ltt_kernel_stream
*stream
;
390 struct ltt_session
*session
= NULL
;
395 assert(ksession
->consumer
);
400 session
= session_find_by_id(ksession
->id
);
/*
 * pthread_mutex_trylock() returns non-zero when the lock cannot be taken,
 * so this asserts that the session lock is NOT acquirable here (presumably
 * because the caller already holds it -- TODO confirm).
 * NOTE(review): both calls live inside assert() and are compiled out when
 * NDEBUG is defined.
 */
402 assert(pthread_mutex_trylock(&session
->lock
));
403 assert(session_trylock_list());
405 /* Bail out if consumer is disabled */
406 if (!ksession
->consumer
->enabled
) {
411 DBG("Sending streams of channel %s to kernel consumer",
412 channel
->channel
->name
);
/* The channel itself is only described to the consumer the first time. */
414 if (!channel
->sent_to_consumer
) {
415 ret
= kernel_consumer_add_channel(sock
, channel
, ksession
, monitor
);
419 channel
->sent_to_consumer
= true;
423 cds_list_for_each_entry(stream
, &channel
->stream_list
.head
, list
) {
/*
 * Streams with a zero fd or already sent are singled out here (the body of
 * this condition is not visible; presumably they are skipped).
 * NOTE(review): `!stream->fd` treats file descriptor 0 as "no stream".
 */
424 if (!stream
->fd
|| stream
->sent_to_consumer
) {
428 /* Add stream on the kernel consumer side. */
429 ret
= kernel_consumer_add_stream(sock
, channel
, stream
,
430 ksession
, monitor
, session
->current_archive_id
);
434 stream
->sent_to_consumer
= true;
/* Drop the reference obtained from session_find_by_id(). */
440 session_put(session
);
446 * Send all stream fds of the kernel session to the consumer.
448 * The consumer socket lock must be held by the caller.
/*
 * kernel_consumer_send_session: send the metadata and every channel (with
 * its streams) of a kernel session to the consumer.
 *
 * Visible steps, in order:
 *  1. test session->consumer->enabled (per the existing comment, the
 *     function bails out when the consumer is disabled);
 *  2. test session->output_traces (the body is not visible; `monitor`
 *     starts at 0 and is presumably set there -- TODO confirm);
 *  3. if there is a metadata stream fd (>= 0) and a metadata object, call
 *     kernel_consumer_add_metadata();
 *  4. for each channel in session->channel_list, call
 *     kernel_consumer_send_channel_streams() and then
 *     kernel_consumer_streams_sent() with the channel key;
 *  5. set session->consumer_fds_sent to 1.
 *
 * NOTE(review): the embedded source line numbers jump (454 -> 458,
 * 462 -> 467, 468 -> 472, 483 -> 490, ...), so the bodies of the guard
 * conditions, the error checks and the return path are not visible here.
 */
450 int kernel_consumer_send_session(struct consumer_socket
*sock
,
451 struct ltt_kernel_session
*session
)
453 int ret
, monitor
= 0;
454 struct ltt_kernel_channel
*chan
;
458 assert(session
->consumer
);
461 /* Bail out if consumer is disabled */
462 if (!session
->consumer
->enabled
) {
467 /* Don't monitor the streams on the consumer if in flight recorder. */
468 if (session
->output_traces
) {
472 DBG("Sending session stream to kernel consumer");
/* Metadata is only sent when both its stream fd and its object exist. */
474 if (session
->metadata_stream_fd
>= 0 && session
->metadata
) {
475 ret
= kernel_consumer_add_metadata(sock
, session
, monitor
);
481 /* Send channel and streams of it */
482 cds_list_for_each_entry(chan
, &session
->channel_list
.head
, list
) {
483 ret
= kernel_consumer_send_channel_streams(sock
, chan
, session
,
490 * Inform the relay that all the streams for the
493 ret
= kernel_consumer_streams_sent(sock
, session
, chan
->key
);
500 DBG("Kernel consumer FDs of metadata and channel streams sent");
/* Record on the session that its file descriptors have been handed over. */
502 session
->consumer_fds_sent
= 1;
/*
 * kernel_consumer_destroy_channel: ask the consumer to destroy the channel
 * identified by channel->key.
 *
 * A zeroed message with cmd_type LTTNG_CONSUMER_DESTROY_CHANNEL and the
 * channel key is sent with consumer_send_msg() while socket->lock is held.
 *
 * NOTE(review): the embedded source line numbers jump (513 -> 518,
 * 527 -> 533, 534 -> 538), so the declaration of `ret`, any argument
 * checks, the error handling and the return path are not visible here.
 */
509 int kernel_consumer_destroy_channel(struct consumer_socket
*socket
,
510 struct ltt_kernel_channel
*channel
)
513 struct lttcomm_consumer_msg msg
;
518 DBG("Sending kernel consumer destroy channel key %" PRIu64
, channel
->key
);
/* Zero the whole message so fields not set below are 0. */
520 memset(&msg
, 0, sizeof(msg
));
521 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
522 msg
.u
.destroy_channel
.key
= channel
->key
;
/* The socket lock is taken here and released after the send. */
524 pthread_mutex_lock(socket
->lock
);
525 health_code_update();
527 ret
= consumer_send_msg(socket
, &msg
);
533 health_code_update();
534 pthread_mutex_unlock(socket
->lock
);
/*
 * kernel_consumer_destroy_metadata: ask the consumer to destroy the
 * metadata channel identified by metadata->key.
 *
 * The same LTTNG_CONSUMER_DESTROY_CHANNEL command as for data channels is
 * used, with the metadata key; the message is sent with consumer_send_msg()
 * while socket->lock is held.
 *
 * NOTE(review): the embedded source line numbers jump (542 -> 547,
 * 556 -> 562) and the listing stops right after the unlock, so the
 * declaration of `ret`, any argument checks, the error handling and the
 * return path are not visible here.
 */
538 int kernel_consumer_destroy_metadata(struct consumer_socket
*socket
,
539 struct ltt_kernel_metadata
*metadata
)
542 struct lttcomm_consumer_msg msg
;
/*
 * NOTE(review): the message text says "destroy channel key" although the
 * key logged is the metadata key -- possibly a copy/paste leftover; confirm.
 */
547 DBG("Sending kernel consumer destroy channel key %" PRIu64
, metadata
->key
);
/* Zero the whole message so fields not set below are 0. */
549 memset(&msg
, 0, sizeof(msg
));
550 msg
.cmd_type
= LTTNG_CONSUMER_DESTROY_CHANNEL
;
551 msg
.u
.destroy_channel
.key
= metadata
->key
;
/* The socket lock is taken here and released after the send. */
553 pthread_mutex_lock(socket
->lock
);
554 health_code_update();
556 ret
= consumer_send_msg(socket
, &msg
);
562 health_code_update();
563 pthread_mutex_unlock(socket
->lock
);