2 * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * SPDX-License-Identifier: GPL-2.0-only
17 #include <sys/socket.h>
18 #include <sys/types.h>
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/kernel-ctl/kernel-ctl.h>
27 #include <common/sessiond-comm/sessiond-comm.h>
28 #include <common/sessiond-comm/relayd.h>
29 #include <common/compat/fcntl.h>
30 #include <common/compat/endian.h>
31 #include <common/pipe.h>
32 #include <common/relayd/relayd.h>
33 #include <common/utils.h>
34 #include <common/consumer/consumer-stream.h>
35 #include <common/index/index.h>
36 #include <common/consumer/consumer-timer.h>
37 #include <common/optional.h>
38 #include <common/buffer-view.h>
39 #include <common/consumer/consumer.h>
40 #include <common/consumer/metadata-bucket.h>
42 #include "kernel-consumer.h"
44 extern struct lttng_consumer_global_data the_consumer_data
;
45 extern int consumer_poll_timeout
;
48 * Take a snapshot for a specific fd
50 * Returns 0 on success, < 0 on error
52 int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream
*stream
)
55 int infd
= stream
->wait_fd
;
57 ret
= kernctl_snapshot(infd
);
59 * -EAGAIN is not an error, it just means that there is no data to
62 if (ret
!= 0 && ret
!= -EAGAIN
) {
63 PERROR("Getting sub-buffer snapshot.");
70 * Sample consumed and produced positions for a specific fd.
72 * Returns 0 on success, < 0 on error.
74 int lttng_kconsumer_sample_snapshot_positions(
75 struct lttng_consumer_stream
*stream
)
79 return kernctl_snapshot_sample_positions(stream
->wait_fd
);
83 * Get the produced position
85 * Returns 0 on success, < 0 on error
87 int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream
*stream
,
91 int infd
= stream
->wait_fd
;
93 ret
= kernctl_snapshot_get_produced(infd
, pos
);
95 PERROR("kernctl_snapshot_get_produced");
102 * Get the consumerd position
104 * Returns 0 on success, < 0 on error
106 int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream
*stream
,
110 int infd
= stream
->wait_fd
;
112 ret
= kernctl_snapshot_get_consumed(infd
, pos
);
114 PERROR("kernctl_snapshot_get_consumed");
121 int get_current_subbuf_addr(struct lttng_consumer_stream
*stream
,
125 unsigned long mmap_offset
;
126 const char *mmap_base
= stream
->mmap_base
;
128 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
130 PERROR("Failed to get mmap read offset");
134 *addr
= mmap_base
+ mmap_offset
;
140 * Take a snapshot of all the stream of a channel
141 * RCU read-side lock must be held across this function to ensure existence of
142 * channel. The channel lock must be held by the caller.
144 * Returns 0 on success, < 0 on error
146 static int lttng_kconsumer_snapshot_channel(
147 struct lttng_consumer_channel
*channel
,
148 uint64_t key
, char *path
, uint64_t relayd_id
,
149 uint64_t nb_packets_per_stream
,
150 struct lttng_consumer_local_data
*ctx
)
153 struct lttng_consumer_stream
*stream
;
155 DBG("Kernel consumer snapshot channel %" PRIu64
, key
);
159 /* Splice is not supported yet for channel snapshot. */
160 if (channel
->output
!= CONSUMER_CHANNEL_MMAP
) {
161 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
167 cds_list_for_each_entry(stream
, &channel
->streams
.head
, send_node
) {
168 unsigned long consumed_pos
, produced_pos
;
170 health_code_update();
173 * Lock stream because we are about to change its state.
175 pthread_mutex_lock(&stream
->lock
);
177 assert(channel
->trace_chunk
);
178 if (!lttng_trace_chunk_get(channel
->trace_chunk
)) {
180 * Can't happen barring an internal error as the channel
181 * holds a reference to the trace chunk.
183 ERR("Failed to acquire reference to channel's trace chunk");
187 assert(!stream
->trace_chunk
);
188 stream
->trace_chunk
= channel
->trace_chunk
;
191 * Assign the received relayd ID so we can use it for streaming. The streams
192 * are not visible to anyone so this is OK to change it.
194 stream
->net_seq_idx
= relayd_id
;
195 channel
->relayd_id
= relayd_id
;
196 if (relayd_id
!= (uint64_t) -1ULL) {
197 ret
= consumer_send_relayd_stream(stream
, path
);
199 ERR("sending stream to relayd");
203 ret
= consumer_stream_create_output_files(stream
,
208 DBG("Kernel consumer snapshot stream (%" PRIu64
")",
212 ret
= kernctl_buffer_flush_empty(stream
->wait_fd
);
215 * Doing a buffer flush which does not take into
216 * account empty packets. This is not perfect
217 * for stream intersection, but required as a
218 * fall-back when "flush_empty" is not
219 * implemented by lttng-modules.
221 ret
= kernctl_buffer_flush(stream
->wait_fd
);
223 ERR("Failed to flush kernel stream");
229 ret
= lttng_kconsumer_take_snapshot(stream
);
231 ERR("Taking kernel snapshot");
235 ret
= lttng_kconsumer_get_produced_snapshot(stream
, &produced_pos
);
237 ERR("Produced kernel snapshot position");
241 ret
= lttng_kconsumer_get_consumed_snapshot(stream
, &consumed_pos
);
243 ERR("Consumerd kernel snapshot position");
247 consumed_pos
= consumer_get_consume_start_pos(consumed_pos
,
248 produced_pos
, nb_packets_per_stream
,
249 stream
->max_sb_size
);
251 while ((long) (consumed_pos
- produced_pos
) < 0) {
253 unsigned long len
, padded_len
;
254 const char *subbuf_addr
;
255 struct lttng_buffer_view subbuf_view
;
257 health_code_update();
258 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos
);
260 ret
= kernctl_get_subbuf(stream
->wait_fd
, &consumed_pos
);
262 if (ret
!= -EAGAIN
) {
263 PERROR("kernctl_get_subbuf snapshot");
266 DBG("Kernel consumer get subbuf failed. Skipping it.");
267 consumed_pos
+= stream
->max_sb_size
;
268 stream
->chan
->lost_packets
++;
272 ret
= kernctl_get_subbuf_size(stream
->wait_fd
, &len
);
274 ERR("Snapshot kernctl_get_subbuf_size");
275 goto error_put_subbuf
;
278 ret
= kernctl_get_padded_subbuf_size(stream
->wait_fd
, &padded_len
);
280 ERR("Snapshot kernctl_get_padded_subbuf_size");
281 goto error_put_subbuf
;
284 ret
= get_current_subbuf_addr(stream
, &subbuf_addr
);
286 goto error_put_subbuf
;
289 subbuf_view
= lttng_buffer_view_init(
290 subbuf_addr
, 0, padded_len
);
291 read_len
= lttng_consumer_on_read_subbuffer_mmap(
292 stream
, &subbuf_view
,
295 * We write the padded len in local tracefiles but the data len
296 * when using a relay. Display the error but continue processing
297 * to try to release the subbuffer.
299 if (relayd_id
!= (uint64_t) -1ULL) {
300 if (read_len
!= len
) {
301 ERR("Error sending to the relay (ret: %zd != len: %lu)",
305 if (read_len
!= padded_len
) {
306 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
307 read_len
, padded_len
);
311 ret
= kernctl_put_subbuf(stream
->wait_fd
);
313 ERR("Snapshot kernctl_put_subbuf");
316 consumed_pos
+= stream
->max_sb_size
;
319 if (relayd_id
== (uint64_t) -1ULL) {
320 if (stream
->out_fd
>= 0) {
321 ret
= close(stream
->out_fd
);
323 PERROR("Kernel consumer snapshot close out_fd");
329 close_relayd_stream(stream
);
330 stream
->net_seq_idx
= (uint64_t) -1ULL;
332 lttng_trace_chunk_put(stream
->trace_chunk
);
333 stream
->trace_chunk
= NULL
;
334 pthread_mutex_unlock(&stream
->lock
);
342 ret
= kernctl_put_subbuf(stream
->wait_fd
);
344 ERR("Snapshot kernctl_put_subbuf error path");
347 pthread_mutex_unlock(&stream
->lock
);
354 * Read the whole metadata available for a snapshot.
355 * RCU read-side lock must be held across this function to ensure existence of
356 * metadata_channel. The channel lock must be held by the caller.
358 * Returns 0 on success, < 0 on error
360 static int lttng_kconsumer_snapshot_metadata(
361 struct lttng_consumer_channel
*metadata_channel
,
362 uint64_t key
, char *path
, uint64_t relayd_id
,
363 struct lttng_consumer_local_data
*ctx
)
365 int ret
, use_relayd
= 0;
367 struct lttng_consumer_stream
*metadata_stream
;
371 DBG("Kernel consumer snapshot metadata with key %" PRIu64
" at path %s",
376 metadata_stream
= metadata_channel
->metadata_stream
;
377 assert(metadata_stream
);
379 pthread_mutex_lock(&metadata_stream
->lock
);
380 assert(metadata_channel
->trace_chunk
);
381 assert(metadata_stream
->trace_chunk
);
383 /* Flag once that we have a valid relayd for the stream. */
384 if (relayd_id
!= (uint64_t) -1ULL) {
389 ret
= consumer_send_relayd_stream(metadata_stream
, path
);
394 ret
= consumer_stream_create_output_files(metadata_stream
,
402 health_code_update();
404 ret_read
= lttng_consumer_read_subbuffer(metadata_stream
, ctx
, true);
406 if (ret_read
!= -EAGAIN
) {
407 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
412 /* ret_read is negative at this point so we will exit the loop. */
415 } while (ret_read
>= 0);
418 close_relayd_stream(metadata_stream
);
419 metadata_stream
->net_seq_idx
= (uint64_t) -1ULL;
421 if (metadata_stream
->out_fd
>= 0) {
422 ret
= close(metadata_stream
->out_fd
);
424 PERROR("Kernel consumer snapshot metadata close out_fd");
426 * Don't go on error here since the snapshot was successful at this
427 * point but somehow the close failed.
430 metadata_stream
->out_fd
= -1;
431 lttng_trace_chunk_put(metadata_stream
->trace_chunk
);
432 metadata_stream
->trace_chunk
= NULL
;
438 pthread_mutex_unlock(&metadata_stream
->lock
);
439 cds_list_del(&metadata_stream
->send_node
);
440 consumer_stream_destroy(metadata_stream
, NULL
);
441 metadata_channel
->metadata_stream
= NULL
;
447 * Receive command from session daemon and process it.
449 * Return 1 on success else a negative value or 0.
451 int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
452 int sock
, struct pollfd
*consumer_sockpoll
)
455 enum lttcomm_return_code ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
456 struct lttcomm_consumer_msg msg
;
458 health_code_update();
463 ret_recv
= lttcomm_recv_unix_sock(sock
, &msg
, sizeof(msg
));
464 if (ret_recv
!= sizeof(msg
)) {
466 lttng_consumer_send_error(ctx
,
467 LTTCOMM_CONSUMERD_ERROR_RECV_CMD
);
474 health_code_update();
476 /* Deprecated command */
477 assert(msg
.cmd_type
!= LTTNG_CONSUMER_STOP
);
479 health_code_update();
481 /* relayd needs RCU read-side protection */
484 switch (msg
.cmd_type
) {
485 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET
:
487 /* Session daemon status message are handled in the following call. */
488 consumer_add_relayd_socket(msg
.u
.relayd_sock
.net_index
,
489 msg
.u
.relayd_sock
.type
, ctx
, sock
, consumer_sockpoll
,
490 &msg
.u
.relayd_sock
.sock
, msg
.u
.relayd_sock
.session_id
,
491 msg
.u
.relayd_sock
.relayd_session_id
);
494 case LTTNG_CONSUMER_ADD_CHANNEL
:
496 struct lttng_consumer_channel
*new_channel
;
497 int ret_send_status
, ret_add_channel
= 0;
498 const uint64_t chunk_id
= msg
.u
.channel
.chunk_id
.value
;
500 health_code_update();
502 /* First send a status message before receiving the fds. */
503 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
504 if (ret_send_status
< 0) {
505 /* Somehow, the session daemon is not responding anymore. */
509 health_code_update();
511 DBG("consumer_add_channel %" PRIu64
, msg
.u
.channel
.channel_key
);
512 new_channel
= consumer_allocate_channel(msg
.u
.channel
.channel_key
,
513 msg
.u
.channel
.session_id
,
514 msg
.u
.channel
.chunk_id
.is_set
?
516 msg
.u
.channel
.pathname
,
518 msg
.u
.channel
.relayd_id
, msg
.u
.channel
.output
,
519 msg
.u
.channel
.tracefile_size
,
520 msg
.u
.channel
.tracefile_count
, 0,
521 msg
.u
.channel
.monitor
,
522 msg
.u
.channel
.live_timer_interval
,
523 msg
.u
.channel
.is_live
,
525 if (new_channel
== NULL
) {
526 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
529 new_channel
->nb_init_stream_left
= msg
.u
.channel
.nb_init_streams
;
530 switch (msg
.u
.channel
.output
) {
531 case LTTNG_EVENT_SPLICE
:
532 new_channel
->output
= CONSUMER_CHANNEL_SPLICE
;
534 case LTTNG_EVENT_MMAP
:
535 new_channel
->output
= CONSUMER_CHANNEL_MMAP
;
538 ERR("Channel output unknown %d", msg
.u
.channel
.output
);
542 /* Translate and save channel type. */
543 switch (msg
.u
.channel
.type
) {
544 case CONSUMER_CHANNEL_TYPE_DATA
:
545 case CONSUMER_CHANNEL_TYPE_METADATA
:
546 new_channel
->type
= msg
.u
.channel
.type
;
553 health_code_update();
555 if (ctx
->on_recv_channel
!= NULL
) {
556 int ret_recv_channel
=
557 ctx
->on_recv_channel(new_channel
);
558 if (ret_recv_channel
== 0) {
559 ret_add_channel
= consumer_add_channel(
561 } else if (ret_recv_channel
< 0) {
566 consumer_add_channel(new_channel
, ctx
);
568 if (msg
.u
.channel
.type
== CONSUMER_CHANNEL_TYPE_DATA
&&
570 int monitor_start_ret
;
572 DBG("Consumer starting monitor timer");
573 consumer_timer_live_start(new_channel
,
574 msg
.u
.channel
.live_timer_interval
);
575 monitor_start_ret
= consumer_timer_monitor_start(
577 msg
.u
.channel
.monitor_timer_interval
);
578 if (monitor_start_ret
< 0) {
579 ERR("Starting channel monitoring timer failed");
584 health_code_update();
586 /* If we received an error in add_channel, we need to report it. */
587 if (ret_add_channel
< 0) {
588 ret_send_status
= consumer_send_status_msg(
589 sock
, ret_add_channel
);
590 if (ret_send_status
< 0) {
598 case LTTNG_CONSUMER_ADD_STREAM
:
601 struct lttng_pipe
*stream_pipe
;
602 struct lttng_consumer_stream
*new_stream
;
603 struct lttng_consumer_channel
*channel
;
605 int ret_send_status
, ret_poll
, ret_get_max_subbuf_size
;
606 ssize_t ret_pipe_write
, ret_recv
;
609 * Get stream's channel reference. Needed when adding the stream to the
612 channel
= consumer_find_channel(msg
.u
.stream
.channel_key
);
615 * We could not find the channel. Can happen if cpu hotplug
616 * happens while tearing down.
618 ERR("Unable to find channel key %" PRIu64
, msg
.u
.stream
.channel_key
);
619 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
622 health_code_update();
624 /* First send a status message before receiving the fds. */
625 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
626 if (ret_send_status
< 0) {
627 /* Somehow, the session daemon is not responding anymore. */
628 goto error_add_stream_fatal
;
631 health_code_update();
633 if (ret_code
!= LTTCOMM_CONSUMERD_SUCCESS
) {
634 /* Channel was not found. */
635 goto error_add_stream_nosignal
;
640 ret_poll
= lttng_consumer_poll_socket(consumer_sockpoll
);
643 goto error_add_stream_fatal
;
646 health_code_update();
648 /* Get stream file descriptor from socket */
649 ret_recv
= lttcomm_recv_fds_unix_sock(sock
, &fd
, 1);
650 if (ret_recv
!= sizeof(fd
)) {
651 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_FD
);
656 health_code_update();
659 * Send status code to session daemon only if the recv works. If the
660 * above recv() failed, the session daemon is notified through the
661 * error socket and the teardown is eventually done.
663 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
664 if (ret_send_status
< 0) {
665 /* Somehow, the session daemon is not responding anymore. */
666 goto error_add_stream_nosignal
;
669 health_code_update();
671 pthread_mutex_lock(&channel
->lock
);
672 new_stream
= consumer_stream_create(
679 channel
->trace_chunk
,
684 if (new_stream
== NULL
) {
689 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
692 pthread_mutex_unlock(&channel
->lock
);
693 goto error_add_stream_nosignal
;
696 new_stream
->wait_fd
= fd
;
697 ret_get_max_subbuf_size
= kernctl_get_max_subbuf_size(
698 new_stream
->wait_fd
, &new_stream
->max_sb_size
);
699 if (ret_get_max_subbuf_size
< 0) {
700 pthread_mutex_unlock(&channel
->lock
);
701 ERR("Failed to get kernel maximal subbuffer size");
702 goto error_add_stream_nosignal
;
705 consumer_stream_update_channel_attributes(new_stream
,
709 * We've just assigned the channel to the stream so increment the
710 * refcount right now. We don't need to increment the refcount for
711 * streams in no monitor because we handle manually the cleanup of
712 * those. It is very important to make sure there is NO prior
713 * consumer_del_stream() calls or else the refcount will be unbalanced.
715 if (channel
->monitor
) {
716 uatomic_inc(&new_stream
->chan
->refcount
);
720 * The buffer flush is done on the session daemon side for the kernel
721 * so no need for the stream "hangup_flush_done" variable to be
722 * tracked. This is important for a kernel stream since we don't rely
723 * on the flush state of the stream to read data. It's not the case for
724 * user space tracing.
726 new_stream
->hangup_flush_done
= 0;
728 health_code_update();
730 pthread_mutex_lock(&new_stream
->lock
);
731 if (ctx
->on_recv_stream
) {
732 int ret_recv_stream
= ctx
->on_recv_stream(new_stream
);
733 if (ret_recv_stream
< 0) {
734 pthread_mutex_unlock(&new_stream
->lock
);
735 pthread_mutex_unlock(&channel
->lock
);
736 consumer_stream_free(new_stream
);
737 goto error_add_stream_nosignal
;
740 health_code_update();
742 if (new_stream
->metadata_flag
) {
743 channel
->metadata_stream
= new_stream
;
746 /* Do not monitor this stream. */
747 if (!channel
->monitor
) {
748 DBG("Kernel consumer add stream %s in no monitor mode with "
749 "relayd id %" PRIu64
, new_stream
->name
,
750 new_stream
->net_seq_idx
);
751 cds_list_add(&new_stream
->send_node
, &channel
->streams
.head
);
752 pthread_mutex_unlock(&new_stream
->lock
);
753 pthread_mutex_unlock(&channel
->lock
);
757 /* Send stream to relayd if the stream has an ID. */
758 if (new_stream
->net_seq_idx
!= (uint64_t) -1ULL) {
759 int ret_send_relayd_stream
;
761 ret_send_relayd_stream
= consumer_send_relayd_stream(
762 new_stream
, new_stream
->chan
->pathname
);
763 if (ret_send_relayd_stream
< 0) {
764 pthread_mutex_unlock(&new_stream
->lock
);
765 pthread_mutex_unlock(&channel
->lock
);
766 consumer_stream_free(new_stream
);
767 goto error_add_stream_nosignal
;
771 * If adding an extra stream to an already
772 * existing channel (e.g. cpu hotplug), we need
773 * to send the "streams_sent" command to relayd.
775 if (channel
->streams_sent_to_relayd
) {
776 int ret_send_relayd_streams_sent
;
778 ret_send_relayd_streams_sent
=
779 consumer_send_relayd_streams_sent(
780 new_stream
->net_seq_idx
);
781 if (ret_send_relayd_streams_sent
< 0) {
782 pthread_mutex_unlock(&new_stream
->lock
);
783 pthread_mutex_unlock(&channel
->lock
);
784 goto error_add_stream_nosignal
;
788 pthread_mutex_unlock(&new_stream
->lock
);
789 pthread_mutex_unlock(&channel
->lock
);
791 /* Get the right pipe where the stream will be sent. */
792 if (new_stream
->metadata_flag
) {
793 consumer_add_metadata_stream(new_stream
);
794 stream_pipe
= ctx
->consumer_metadata_pipe
;
796 consumer_add_data_stream(new_stream
);
797 stream_pipe
= ctx
->consumer_data_pipe
;
800 /* Visible to other threads */
801 new_stream
->globally_visible
= 1;
803 health_code_update();
805 ret_pipe_write
= lttng_pipe_write(
806 stream_pipe
, &new_stream
, sizeof(new_stream
));
807 if (ret_pipe_write
< 0) {
808 ERR("Consumer write %s stream to pipe %d",
809 new_stream
->metadata_flag
? "metadata" : "data",
810 lttng_pipe_get_writefd(stream_pipe
));
811 if (new_stream
->metadata_flag
) {
812 consumer_del_stream_for_metadata(new_stream
);
814 consumer_del_stream_for_data(new_stream
);
816 goto error_add_stream_nosignal
;
819 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64
,
820 new_stream
->name
, fd
, new_stream
->chan
->pathname
, new_stream
->relayd_stream_id
);
823 error_add_stream_nosignal
:
825 error_add_stream_fatal
:
828 case LTTNG_CONSUMER_STREAMS_SENT
:
830 struct lttng_consumer_channel
*channel
;
834 * Get stream's channel reference. Needed when adding the stream to the
837 channel
= consumer_find_channel(msg
.u
.sent_streams
.channel_key
);
840 * We could not find the channel. Can happen if cpu hotplug
841 * happens while tearing down.
843 ERR("Unable to find channel key %" PRIu64
,
844 msg
.u
.sent_streams
.channel_key
);
845 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
848 health_code_update();
851 * Send status code to session daemon.
853 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
854 if (ret_send_status
< 0 ||
855 ret_code
!= LTTCOMM_CONSUMERD_SUCCESS
) {
856 /* Somehow, the session daemon is not responding anymore. */
857 goto error_streams_sent_nosignal
;
860 health_code_update();
863 * We should not send this message if we don't monitor the
864 * streams in this channel.
866 if (!channel
->monitor
) {
867 goto end_error_streams_sent
;
870 health_code_update();
871 /* Send stream to relayd if the stream has an ID. */
872 if (msg
.u
.sent_streams
.net_seq_idx
!= (uint64_t) -1ULL) {
873 int ret_send_relay_streams
;
875 ret_send_relay_streams
= consumer_send_relayd_streams_sent(
876 msg
.u
.sent_streams
.net_seq_idx
);
877 if (ret_send_relay_streams
< 0) {
878 goto error_streams_sent_nosignal
;
880 channel
->streams_sent_to_relayd
= true;
882 end_error_streams_sent
:
884 error_streams_sent_nosignal
:
887 case LTTNG_CONSUMER_UPDATE_STREAM
:
892 case LTTNG_CONSUMER_DESTROY_RELAYD
:
894 uint64_t index
= msg
.u
.destroy_relayd
.net_seq_idx
;
895 struct consumer_relayd_sock_pair
*relayd
;
898 DBG("Kernel consumer destroying relayd %" PRIu64
, index
);
900 /* Get relayd reference if exists. */
901 relayd
= consumer_find_relayd(index
);
902 if (relayd
== NULL
) {
903 DBG("Unable to find relayd %" PRIu64
, index
);
904 ret_code
= LTTCOMM_CONSUMERD_RELAYD_FAIL
;
908 * Each relayd socket pair has a refcount of stream attached to it
909 * which tells if the relayd is still active or not depending on the
912 * This will set the destroy flag of the relayd object and destroy it
913 * if the refcount reaches zero when called.
915 * The destroy can happen either here or when a stream fd hangs up.
918 consumer_flag_relayd_for_destroy(relayd
);
921 health_code_update();
923 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
924 if (ret_send_status
< 0) {
925 /* Somehow, the session daemon is not responding anymore. */
931 case LTTNG_CONSUMER_DATA_PENDING
:
933 int32_t ret_data_pending
;
934 uint64_t id
= msg
.u
.data_pending
.session_id
;
937 DBG("Kernel consumer data pending command for id %" PRIu64
, id
);
939 ret_data_pending
= consumer_data_pending(id
);
941 health_code_update();
943 /* Send back returned value to session daemon */
944 ret_send
= lttcomm_send_unix_sock(sock
, &ret_data_pending
,
945 sizeof(ret_data_pending
));
947 PERROR("send data pending ret code");
952 * No need to send back a status message since the data pending
953 * returned value is the response.
957 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL
:
959 struct lttng_consumer_channel
*channel
;
960 uint64_t key
= msg
.u
.snapshot_channel
.key
;
963 channel
= consumer_find_channel(key
);
965 ERR("Channel %" PRIu64
" not found", key
);
966 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
968 pthread_mutex_lock(&channel
->lock
);
969 if (msg
.u
.snapshot_channel
.metadata
== 1) {
972 ret_snapshot
= lttng_kconsumer_snapshot_metadata(
974 msg
.u
.snapshot_channel
.pathname
,
975 msg
.u
.snapshot_channel
.relayd_id
,
977 if (ret_snapshot
< 0) {
978 ERR("Snapshot metadata failed");
979 ret_code
= LTTCOMM_CONSUMERD_SNAPSHOT_FAILED
;
984 ret_snapshot
= lttng_kconsumer_snapshot_channel(
986 msg
.u
.snapshot_channel
.pathname
,
987 msg
.u
.snapshot_channel
.relayd_id
,
988 msg
.u
.snapshot_channel
989 .nb_packets_per_stream
,
991 if (ret_snapshot
< 0) {
992 ERR("Snapshot channel failed");
993 ret_code
= LTTCOMM_CONSUMERD_SNAPSHOT_FAILED
;
996 pthread_mutex_unlock(&channel
->lock
);
998 health_code_update();
1000 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1001 if (ret_send_status
< 0) {
1002 /* Somehow, the session daemon is not responding anymore. */
1007 case LTTNG_CONSUMER_DESTROY_CHANNEL
:
1009 uint64_t key
= msg
.u
.destroy_channel
.key
;
1010 struct lttng_consumer_channel
*channel
;
1011 int ret_send_status
;
1013 channel
= consumer_find_channel(key
);
1015 ERR("Kernel consumer destroy channel %" PRIu64
" not found", key
);
1016 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1019 health_code_update();
1021 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1022 if (ret_send_status
< 0) {
1023 /* Somehow, the session daemon is not responding anymore. */
1024 goto end_destroy_channel
;
1027 health_code_update();
1029 /* Stop right now if no channel was found. */
1031 goto end_destroy_channel
;
1035 * This command should ONLY be issued for channel with streams set in
1038 assert(!channel
->monitor
);
1041 * The refcount should ALWAYS be 0 in the case of a channel in no
1044 assert(!uatomic_sub_return(&channel
->refcount
, 1));
1046 consumer_del_channel(channel
);
1047 end_destroy_channel
:
1050 case LTTNG_CONSUMER_DISCARDED_EVENTS
:
1054 struct lttng_consumer_channel
*channel
;
1055 uint64_t id
= msg
.u
.discarded_events
.session_id
;
1056 uint64_t key
= msg
.u
.discarded_events
.channel_key
;
1058 DBG("Kernel consumer discarded events command for session id %"
1059 PRIu64
", channel key %" PRIu64
, id
, key
);
1061 channel
= consumer_find_channel(key
);
1063 ERR("Kernel consumer discarded events channel %"
1064 PRIu64
" not found", key
);
1067 count
= channel
->discarded_events
;
1070 health_code_update();
1072 /* Send back returned value to session daemon */
1073 ret
= lttcomm_send_unix_sock(sock
, &count
, sizeof(count
));
1075 PERROR("send discarded events");
1081 case LTTNG_CONSUMER_LOST_PACKETS
:
1085 struct lttng_consumer_channel
*channel
;
1086 uint64_t id
= msg
.u
.lost_packets
.session_id
;
1087 uint64_t key
= msg
.u
.lost_packets
.channel_key
;
1089 DBG("Kernel consumer lost packets command for session id %"
1090 PRIu64
", channel key %" PRIu64
, id
, key
);
1092 channel
= consumer_find_channel(key
);
1094 ERR("Kernel consumer lost packets channel %"
1095 PRIu64
" not found", key
);
1098 count
= channel
->lost_packets
;
1101 health_code_update();
1103 /* Send back returned value to session daemon */
1104 ret
= lttcomm_send_unix_sock(sock
, &count
, sizeof(count
));
1106 PERROR("send lost packets");
1112 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE
:
1114 int channel_monitor_pipe
;
1115 int ret_send_status
, ret_set_channel_monitor_pipe
;
1118 ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
1119 /* Successfully received the command's type. */
1120 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1121 if (ret_send_status
< 0) {
1125 ret_recv
= lttcomm_recv_fds_unix_sock(
1126 sock
, &channel_monitor_pipe
, 1);
1127 if (ret_recv
!= sizeof(channel_monitor_pipe
)) {
1128 ERR("Failed to receive channel monitor pipe");
1132 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe
);
1133 ret_set_channel_monitor_pipe
=
1134 consumer_timer_thread_set_channel_monitor_pipe(
1135 channel_monitor_pipe
);
1136 if (!ret_set_channel_monitor_pipe
) {
1140 ret_code
= LTTCOMM_CONSUMERD_SUCCESS
;
1141 /* Set the pipe as non-blocking. */
1142 ret_fcntl
= fcntl(channel_monitor_pipe
, F_GETFL
, 0);
1143 if (ret_fcntl
== -1) {
1144 PERROR("fcntl get flags of the channel monitoring pipe");
1149 ret_fcntl
= fcntl(channel_monitor_pipe
, F_SETFL
,
1150 flags
| O_NONBLOCK
);
1151 if (ret_fcntl
== -1) {
1152 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1155 DBG("Channel monitor pipe set as non-blocking");
1157 ret_code
= LTTCOMM_CONSUMERD_ALREADY_SET
;
1159 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1160 if (ret_send_status
< 0) {
1165 case LTTNG_CONSUMER_ROTATE_CHANNEL
:
1167 struct lttng_consumer_channel
*channel
;
1168 uint64_t key
= msg
.u
.rotate_channel
.key
;
1169 int ret_send_status
;
1171 DBG("Consumer rotate channel %" PRIu64
, key
);
1173 channel
= consumer_find_channel(key
);
1175 ERR("Channel %" PRIu64
" not found", key
);
1176 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1179 * Sample the rotate position of all the streams in this channel.
1181 int ret_rotate_channel
;
1183 ret_rotate_channel
= lttng_consumer_rotate_channel(
1185 msg
.u
.rotate_channel
.relayd_id
,
1186 msg
.u
.rotate_channel
.metadata
, ctx
);
1187 if (ret_rotate_channel
< 0) {
1188 ERR("Rotate channel failed");
1189 ret_code
= LTTCOMM_CONSUMERD_ROTATION_FAIL
;
1192 health_code_update();
1195 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1196 if (ret_send_status
< 0) {
1197 /* Somehow, the session daemon is not responding anymore. */
1198 goto error_rotate_channel
;
1201 /* Rotate the streams that are ready right now. */
1204 ret_rotate
= lttng_consumer_rotate_ready_streams(
1206 if (ret_rotate
< 0) {
1207 ERR("Rotate ready streams failed");
1211 error_rotate_channel
:
1214 case LTTNG_CONSUMER_CLEAR_CHANNEL
:
1216 struct lttng_consumer_channel
*channel
;
1217 uint64_t key
= msg
.u
.clear_channel
.key
;
1218 int ret_send_status
;
1220 channel
= consumer_find_channel(key
);
1222 DBG("Channel %" PRIu64
" not found", key
);
1223 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1225 int ret_clear_channel
;
1228 lttng_consumer_clear_channel(channel
);
1229 if (ret_clear_channel
) {
1230 ERR("Clear channel failed");
1231 ret_code
= ret_clear_channel
;
1234 health_code_update();
1237 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1238 if (ret_send_status
< 0) {
1239 /* Somehow, the session daemon is not responding anymore. */
1245 case LTTNG_CONSUMER_INIT
:
1247 int ret_send_status
;
1249 ret_code
= lttng_consumer_init_command(ctx
,
1250 msg
.u
.init
.sessiond_uuid
);
1251 health_code_update();
1252 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1253 if (ret_send_status
< 0) {
1254 /* Somehow, the session daemon is not responding anymore. */
1259 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK
:
1261 const struct lttng_credentials credentials
= {
1262 .uid
= LTTNG_OPTIONAL_INIT_VALUE(msg
.u
.create_trace_chunk
.credentials
.value
.uid
),
1263 .gid
= LTTNG_OPTIONAL_INIT_VALUE(msg
.u
.create_trace_chunk
.credentials
.value
.gid
),
1265 const bool is_local_trace
=
1266 !msg
.u
.create_trace_chunk
.relayd_id
.is_set
;
1267 const uint64_t relayd_id
=
1268 msg
.u
.create_trace_chunk
.relayd_id
.value
;
1269 const char *chunk_override_name
=
1270 *msg
.u
.create_trace_chunk
.override_name
?
1271 msg
.u
.create_trace_chunk
.override_name
:
1273 struct lttng_directory_handle
*chunk_directory_handle
= NULL
;
1276 * The session daemon will only provide a chunk directory file
1277 * descriptor for local traces.
1279 if (is_local_trace
) {
1281 int ret_send_status
;
1284 /* Acnowledge the reception of the command. */
1285 ret_send_status
= consumer_send_status_msg(
1286 sock
, LTTCOMM_CONSUMERD_SUCCESS
);
1287 if (ret_send_status
< 0) {
1288 /* Somehow, the session daemon is not responding anymore. */
1292 ret_recv
= lttcomm_recv_fds_unix_sock(
1293 sock
, &chunk_dirfd
, 1);
1294 if (ret_recv
!= sizeof(chunk_dirfd
)) {
1295 ERR("Failed to receive trace chunk directory file descriptor");
1299 DBG("Received trace chunk directory fd (%d)",
1301 chunk_directory_handle
= lttng_directory_handle_create_from_dirfd(
1303 if (!chunk_directory_handle
) {
1304 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1305 if (close(chunk_dirfd
)) {
1306 PERROR("Failed to close chunk directory file descriptor");
1312 ret_code
= lttng_consumer_create_trace_chunk(
1313 !is_local_trace
? &relayd_id
: NULL
,
1314 msg
.u
.create_trace_chunk
.session_id
,
1315 msg
.u
.create_trace_chunk
.chunk_id
,
1316 (time_t) msg
.u
.create_trace_chunk
1317 .creation_timestamp
,
1318 chunk_override_name
,
1319 msg
.u
.create_trace_chunk
.credentials
.is_set
?
1322 chunk_directory_handle
);
1323 lttng_directory_handle_put(chunk_directory_handle
);
1324 goto end_msg_sessiond
;
1326 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK
:
1328 enum lttng_trace_chunk_command_type close_command
=
1329 msg
.u
.close_trace_chunk
.close_command
.value
;
1330 const uint64_t relayd_id
=
1331 msg
.u
.close_trace_chunk
.relayd_id
.value
;
1332 struct lttcomm_consumer_close_trace_chunk_reply reply
;
1333 char path
[LTTNG_PATH_MAX
];
1336 ret_code
= lttng_consumer_close_trace_chunk(
1337 msg
.u
.close_trace_chunk
.relayd_id
.is_set
?
1340 msg
.u
.close_trace_chunk
.session_id
,
1341 msg
.u
.close_trace_chunk
.chunk_id
,
1342 (time_t) msg
.u
.close_trace_chunk
.close_timestamp
,
1343 msg
.u
.close_trace_chunk
.close_command
.is_set
?
1346 reply
.ret_code
= ret_code
;
1347 reply
.path_length
= strlen(path
) + 1;
1348 ret_send
= lttcomm_send_unix_sock(sock
, &reply
, sizeof(reply
));
1349 if (ret_send
!= sizeof(reply
)) {
1352 ret_send
= lttcomm_send_unix_sock(
1353 sock
, path
, reply
.path_length
);
1354 if (ret_send
!= reply
.path_length
) {
1359 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS
:
1361 const uint64_t relayd_id
=
1362 msg
.u
.trace_chunk_exists
.relayd_id
.value
;
1364 ret_code
= lttng_consumer_trace_chunk_exists(
1365 msg
.u
.trace_chunk_exists
.relayd_id
.is_set
?
1367 msg
.u
.trace_chunk_exists
.session_id
,
1368 msg
.u
.trace_chunk_exists
.chunk_id
);
1369 goto end_msg_sessiond
;
1371 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS
:
1373 const uint64_t key
= msg
.u
.open_channel_packets
.key
;
1374 struct lttng_consumer_channel
*channel
=
1375 consumer_find_channel(key
);
1378 pthread_mutex_lock(&channel
->lock
);
1379 ret_code
= lttng_consumer_open_channel_packets(channel
);
1380 pthread_mutex_unlock(&channel
->lock
);
1382 WARN("Channel %" PRIu64
" not found", key
);
1383 ret_code
= LTTCOMM_CONSUMERD_CHAN_NOT_FOUND
;
1386 health_code_update();
1387 goto end_msg_sessiond
;
1395 * Return 1 to indicate success since the 0 value can be a socket
1396 * shutdown during the recv() or send() call.
1401 /* This will issue a consumer stop. */
1406 * The returned value here is not useful since either way we'll return 1 to
1407 * the caller because the session daemon socket management is done
1408 * elsewhere. Returning a negative code or 0 will shut down the consumer.
1411 int ret_send_status
;
1413 ret_send_status
= consumer_send_status_msg(sock
, ret_code
);
1414 if (ret_send_status
< 0) {
1422 health_code_update();
1428 * Sync metadata: request it from the session daemon and take a snapshot so that
1429 * the metadata thread can consume it.
1431 * Metadata stream lock MUST be acquired.
/*
 * Flush the kernel metadata stream, then take a snapshot of it.
 *
 * The outcome is reported as a sync_metadata_status value:
 * SYNC_METADATA_STATUS_NEW_DATA, SYNC_METADATA_STATUS_NO_DATA (EAGAIN on the
 * snapshot) or SYNC_METADATA_STATUS_ERROR (flush or snapshot failure).
 */
1433 enum sync_metadata_status
lttng_kconsumer_sync_metadata(
1434 struct lttng_consumer_stream
*metadata
)
1437 enum sync_metadata_status status
;
/* Flush the metadata stream's buffer before the snapshot is taken. */
1441 ret
= kernctl_buffer_flush(metadata
->wait_fd
);
1443 ERR("Failed to flush kernel stream");
1444 status
= SYNC_METADATA_STATUS_ERROR
;
1448 ret
= kernctl_snapshot(metadata
->wait_fd
);
/*
 * NOTE(review): lttng_kconsumer_take_snapshot() compares the value returned
 * by kernctl_snapshot() against -EAGAIN, whereas errno is inspected here;
 * confirm that both checks are equivalent.
 */
1450 if (errno
== EAGAIN
) {
1451 /* No new metadata, exit. */
1452 DBG("Sync metadata, no new kernel metadata");
1453 status
= SYNC_METADATA_STATUS_NO_DATA
;
1455 ERR("Sync metadata, taking kernel snapshot failed.");
1456 status
= SYNC_METADATA_STATUS_ERROR
;
1459 status
= SYNC_METADATA_STATUS_NEW_DATA
;
/*
 * Fill the size fields shared by data and metadata sub-buffers.
 *
 * Calls kernctl_get_subbuf_size() and kernctl_get_padded_subbuf_size() on the
 * stream's wait_fd and stores the results in subbuf->info.data.subbuf_size and
 * subbuf->info.data.padded_subbuf_size.
 */
1467 int extract_common_subbuffer_info(struct lttng_consumer_stream
*stream
,
1468 struct stream_subbuffer
*subbuf
)
1472 ret
= kernctl_get_subbuf_size(
1473 stream
->wait_fd
, &subbuf
->info
.data
.subbuf_size
);
1478 ret
= kernctl_get_padded_subbuf_size(
1479 stream
->wait_fd
, &subbuf
->info
.data
.padded_subbuf_size
);
/*
 * Fill the description of a metadata sub-buffer.
 *
 * Runs extract_common_subbuffer_info() and then stores the value obtained from
 * kernctl_get_metadata_version() in subbuf->info.metadata.version.
 */
1489 int extract_metadata_subbuffer_info(struct lttng_consumer_stream
*stream
,
1490 struct stream_subbuffer
*subbuf
)
/* Sizes common to every kind of sub-buffer. */
1494 ret
= extract_common_subbuffer_info(stream
, subbuf
);
1499 ret
= kernctl_get_metadata_version(
1500 stream
->wait_fd
, &subbuf
->info
.metadata
.version
);
/*
 * Fill the description of a data sub-buffer.
 *
 * After extract_common_subbuffer_info(), the following fields of
 * subbuf->info.data are populated from the stream's wait_fd through the
 * matching kernctl_get_*() calls: packet_size, content_size, timestamp_begin,
 * timestamp_end, events_discarded, sequence_number, stream_id and
 * stream_instance_id.
 *
 * sequence_number and stream_instance_id are optional values (value/is_set
 * pairs): -ENOTTY is special-cased for them because, as the inline comments
 * state, older LTTng-modules may not support these queries.
 */
1510 int extract_data_subbuffer_info(struct lttng_consumer_stream
*stream
,
1511 struct stream_subbuffer
*subbuf
)
/* Sizes common to every kind of sub-buffer. */
1515 ret
= extract_common_subbuffer_info(stream
, subbuf
);
1520 ret
= kernctl_get_packet_size(
1521 stream
->wait_fd
, &subbuf
->info
.data
.packet_size
);
1523 PERROR("Failed to get sub-buffer packet size");
1527 ret
= kernctl_get_content_size(
1528 stream
->wait_fd
, &subbuf
->info
.data
.content_size
);
1530 PERROR("Failed to get sub-buffer content size");
1534 ret
= kernctl_get_timestamp_begin(
1535 stream
->wait_fd
, &subbuf
->info
.data
.timestamp_begin
);
1537 PERROR("Failed to get sub-buffer begin timestamp");
1541 ret
= kernctl_get_timestamp_end(
1542 stream
->wait_fd
, &subbuf
->info
.data
.timestamp_end
);
1544 PERROR("Failed to get sub-buffer end timestamp");
1548 ret
= kernctl_get_events_discarded(
1549 stream
->wait_fd
, &subbuf
->info
.data
.events_discarded
);
1551 PERROR("Failed to get sub-buffer events discarded count");
/* Optional field: written to .value, flagged through .is_set. */
1555 ret
= kernctl_get_sequence_number(stream
->wait_fd
,
1556 &subbuf
->info
.data
.sequence_number
.value
);
1558 /* May not be supported by older LTTng-modules. */
1559 if (ret
!= -ENOTTY
) {
1560 PERROR("Failed to get sub-buffer sequence number");
1564 subbuf
->info
.data
.sequence_number
.is_set
= true;
1567 ret
= kernctl_get_stream_id(
1568 stream
->wait_fd
, &subbuf
->info
.data
.stream_id
);
1570 PERROR("Failed to get stream id");
/* Optional field: written to .value, flagged through .is_set. */
1574 ret
= kernctl_get_instance_id(stream
->wait_fd
,
1575 &subbuf
->info
.data
.stream_instance_id
.value
);
1577 /* May not be supported by older LTTng-modules. */
1578 if (ret
!= -ENOTTY
) {
1579 PERROR("Failed to get stream instance id");
1583 subbuf
->info
.data
.stream_instance_id
.is_set
= true;
/*
 * Steps shared by the get_next_subbuffer_*() variants.
 *
 * Calls kernctl_get_next_subbuf() on the stream's wait_fd, then invokes the
 * extract_subbuffer_info operation installed in stream->read_subbuffer_ops.
 */
1590 int get_subbuffer_common(struct lttng_consumer_stream
*stream
,
1591 struct stream_subbuffer
*subbuffer
)
1595 ret
= kernctl_get_next_subbuf(stream
->wait_fd
);
/* Dispatch to the data or metadata extractor selected for this stream. */
1600 ret
= stream
->read_subbuffer_ops
.extract_subbuffer_info(
/*
 * get_next_subbuffer operation for splice output.
 *
 * Runs get_subbuffer_common() and exposes the sub-buffer through a file
 * descriptor: subbuffer->buffer.fd is set to the stream's wait_fd.
 */
1607 int get_next_subbuffer_splice(struct lttng_consumer_stream
*stream
,
1608 struct stream_subbuffer
*subbuffer
)
1612 ret
= get_subbuffer_common(stream
, subbuffer
);
1617 subbuffer
->buffer
.fd
= stream
->wait_fd
;
/*
 * get_next_subbuffer operation for mmap output.
 *
 * Runs get_subbuffer_common(), obtains the address of the current sub-buffer
 * with get_current_subbuf_addr() and exposes it as a buffer view starting at
 * offset 0 and spanning info.data.padded_subbuf_size bytes.
 */
1623 int get_next_subbuffer_mmap(struct lttng_consumer_stream
*stream
,
1624 struct stream_subbuffer
*subbuffer
)
1629 ret
= get_subbuffer_common(stream
, subbuffer
);
1634 ret
= get_current_subbuf_addr(stream
, &addr
);
/* The view covers the padded size of the sub-buffer. */
1639 subbuffer
->buffer
.buffer
= lttng_buffer_view_init(
1640 addr
, 0, subbuffer
->info
.data
.padded_subbuf_size
);
/*
 * get_next_subbuffer operation for metadata streams that also records
 * whether the metadata is in a coherent state.
 *
 * Calls kernctl_get_next_subbuf_metadata_check() on the stream's wait_fd,
 * invokes the stream's extract_subbuffer_info operation, stores the coherent
 * flag in subbuffer->info.metadata.coherent and exposes the current
 * sub-buffer as a buffer view.
 */
1646 int get_next_subbuffer_metadata_check(struct lttng_consumer_stream
*stream
,
1647 struct stream_subbuffer
*subbuffer
)
1653 ret
= kernctl_get_next_subbuf_metadata_check(stream
->wait_fd
,
1659 ret
= stream
->read_subbuffer_ops
.extract_subbuffer_info(
1665 LTTNG_OPTIONAL_SET(&subbuffer
->info
.metadata
.coherent
, coherent
);
1667 ret
= get_current_subbuf_addr(stream
, &addr
);
/*
 * NOTE(review): the view is sized from info.data.padded_subbuf_size while the
 * debug message below prints info.metadata.padded_subbuf_size; confirm that
 * both members designate the same storage.
 */
1672 subbuffer
->buffer
.buffer
= lttng_buffer_view_init(
1673 addr
, 0, subbuffer
->info
.data
.padded_subbuf_size
);
1674 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1675 subbuffer
->info
.metadata
.padded_subbuf_size
,
1676 coherent
? "true" : "false");
/*
 * put_next_subbuffer operation: calls kernctl_put_next_subbuf() on the
 * stream's wait_fd and logs the -EFAULT and -EIO failures.
 */
1682 int put_next_subbuffer(struct lttng_consumer_stream
*stream
,
1683 struct stream_subbuffer
*subbuffer
)
1685 const int ret
= kernctl_put_next_subbuf(stream
->wait_fd
);
/* Only these two error codes get a dedicated message. */
1688 if (ret
== -EFAULT
) {
1689 PERROR("Error in unreserving sub buffer");
1690 } else if (ret
== -EIO
) {
1691 /* Should never happen with newer LTTng versions */
1692 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
/*
 * Probe whether kernctl_get_next_subbuf_metadata_check() is usable on
 * tracer_fd.
 *
 * The call is issued with a NULL second argument; it is considered available
 * when it returns anything other than -ENOTTY. If the probe actually got a
 * sub-buffer, it is released with kernctl_put_subbuf().
 */
1700 bool is_get_next_check_metadata_available(int tracer_fd
)
1702 const int ret
= kernctl_get_next_subbuf_metadata_check(tracer_fd
, NULL
);
/* -ENOTTY is the only result treated as "not available". */
1703 const bool available
= ret
!= -ENOTTY
;
1706 /* get succeeded, make sure to put the subbuffer. */
1707 kernctl_put_subbuf(tracer_fd
);
/*
 * Install the read_subbuffer_ops callbacks of a kernel stream.
 *
 * - Metadata stream of a live channel: if the tracer supports
 *   get_next_subbuffer_metadata_check, use it as get_next_subbuffer and enable
 *   metadata bucketization; otherwise warn and destroy the metadata bucket.
 * - If no get_next_subbuffer was selected above, pick get_next_subbuffer_mmap
 *   for CONSUMER_CHANNEL_MMAP channels and get_next_subbuffer_splice
 *   otherwise.
 * - extract_subbuffer_info is the metadata or the data extractor depending on
 *   metadata_flag; data streams of live channels also get
 *   consumer_flush_kernel_index as send_live_beacon.
 * - put_next_subbuffer is set to put_next_subbuffer().
 */
1714 int lttng_kconsumer_set_stream_ops(
1715 struct lttng_consumer_stream
*stream
)
1719 if (stream
->metadata_flag
&& stream
->chan
->is_live
) {
1720 DBG("Attempting to enable metadata bucketization for live consumers");
1721 if (is_get_next_check_metadata_available(stream
->wait_fd
)) {
1722 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1723 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1724 get_next_subbuffer_metadata_check
;
1725 ret
= consumer_stream_enable_metadata_bucketization(
1732 * The kernel tracer version is too old to indicate
1733 * when the metadata stream has reached a "coherent"
1734 * (parseable) point.
1736 * This means that a live viewer may see an incoherent
1737 * sequence of metadata and fail to parse it.
1739 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1740 metadata_bucket_destroy(stream
->metadata_bucket
);
1741 stream
->metadata_bucket
= NULL
;
/* Fall back to the output-type specific getter when none was chosen yet. */
1745 if (!stream
->read_subbuffer_ops
.get_next_subbuffer
) {
1746 if (stream
->chan
->output
== CONSUMER_CHANNEL_MMAP
) {
1747 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1748 get_next_subbuffer_mmap
;
1750 stream
->read_subbuffer_ops
.get_next_subbuffer
=
1751 get_next_subbuffer_splice
;
1755 if (stream
->metadata_flag
) {
1756 stream
->read_subbuffer_ops
.extract_subbuffer_info
=
1757 extract_metadata_subbuffer_info
;
1759 stream
->read_subbuffer_ops
.extract_subbuffer_info
=
1760 extract_data_subbuffer_info
;
1761 if (stream
->chan
->is_live
) {
1762 stream
->read_subbuffer_ops
.send_live_beacon
=
1763 consumer_flush_kernel_index
;
1767 stream
->read_subbuffer_ops
.put_next_subbuffer
= put_next_subbuffer
;
/*
 * Set up a stream that was just received by the kernel consumer.
 *
 * - Output files are created only when the stream is not set for streaming
 *   (net_seq_idx == -1), its channel is monitored and the channel has a
 *   current trace chunk.
 * - For LTTNG_EVENT_MMAP output, the length reported by
 *   kernctl_get_mmap_len() is stored in stream->mmap_len and the buffer is
 *   mapped read-only (PROT_READ, MAP_PRIVATE) from wait_fd into
 *   stream->mmap_base.
 * - The stream operations are installed with lttng_kconsumer_set_stream_ops().
 *
 * Failures jump to error_close_fd, where out_fd is closed if it is valid and
 * then reset to -1.
 */
1772 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
1779 * Don't create anything if this is set for streaming or if there is
1780 * no current trace chunk on the parent channel.
1782 if (stream
->net_seq_idx
== (uint64_t) -1ULL && stream
->chan
->monitor
&&
1783 stream
->chan
->trace_chunk
) {
1784 ret
= consumer_stream_create_output_files(stream
, true);
1790 if (stream
->output
== LTTNG_EVENT_MMAP
) {
1791 /* get the len of the mmap region */
1792 unsigned long mmap_len
;
1794 ret
= kernctl_get_mmap_len(stream
->wait_fd
, &mmap_len
);
1796 PERROR("kernctl_get_mmap_len");
1797 goto error_close_fd
;
1799 stream
->mmap_len
= (size_t) mmap_len
;
/* Read-only, private mapping of the tracer's buffer. */
1801 stream
->mmap_base
= mmap(NULL
, stream
->mmap_len
, PROT_READ
,
1802 MAP_PRIVATE
, stream
->wait_fd
, 0);
1803 if (stream
->mmap_base
== MAP_FAILED
) {
1804 PERROR("Error mmaping");
1806 goto error_close_fd
;
1810 ret
= lttng_kconsumer_set_stream_ops(stream
);
1812 goto error_close_fd
;
1815 /* we return 0 to let the library handle the FD internally */
/* Error path: release the output file descriptor, if one is open. */
1819 if (stream
->out_fd
>= 0) {
1822 err
= close(stream
->out_fd
);
1824 stream
->out_fd
= -1;
1831 * Check if data is still being extracted from the buffers for a specific
1832 * stream. Consumer data lock MUST be acquired before calling this function
1833 * and the stream lock.
1835 * Return 1 if the traced data is still being read, else 0, meaning that the
1836 * data is available for the trace viewer to read.
1838 int lttng_kconsumer_data_pending(struct lttng_consumer_stream
*stream
)
1844 if (stream
->endpoint_status
!= CONSUMER_ENDPOINT_ACTIVE
) {
1849 ret
= kernctl_get_next_subbuf(stream
->wait_fd
);
1851 /* There is still data so let's put back this subbuffer. */
1852 ret
= kernctl_put_subbuf(stream
->wait_fd
);
1854 ret
= 1; /* Data is pending */
1858 /* Data is NOT pending and ready to be read. */