2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/relayd.h>
34 #include <common/sessiond-comm/sessiond-comm.h>
35 #include <common/kernel-consumer/kernel-consumer.h>
36 #include <common/relayd/relayd.h>
37 #include <common/ust-consumer/ust-consumer.h>
41 struct lttng_consumer_global_data consumer_data
= {
44 .type
= LTTNG_CONSUMER_UNKNOWN
,
47 /* timeout parameter, to control the polling thread grace period. */
48 int consumer_poll_timeout
= -1;
51 * Flag to inform the polling thread to quit when all fd hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds has hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
56 volatile int consumer_quit
= 0;
59 * Find a stream. The consumer_data.lock must be locked during this
62 static struct lttng_consumer_stream
*consumer_find_stream(int key
)
64 struct lttng_ht_iter iter
;
65 struct lttng_ht_node_ulong
*node
;
66 struct lttng_consumer_stream
*stream
= NULL
;
68 /* Negative keys are lookup failures */
74 lttng_ht_lookup(consumer_data
.stream_ht
, (void *)((unsigned long) key
),
76 node
= lttng_ht_iter_get_node_ulong(&iter
);
78 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
86 static void consumer_steal_stream_key(int key
)
88 struct lttng_consumer_stream
*stream
;
91 stream
= consumer_find_stream(key
);
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
99 stream
->node
.key
= -1;
104 static struct lttng_consumer_channel
*consumer_find_channel(int key
)
106 struct lttng_ht_iter iter
;
107 struct lttng_ht_node_ulong
*node
;
108 struct lttng_consumer_channel
*channel
= NULL
;
110 /* Negative keys are lookup failures */
116 lttng_ht_lookup(consumer_data
.channel_ht
, (void *)((unsigned long) key
),
118 node
= lttng_ht_iter_get_node_ulong(&iter
);
120 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
128 static void consumer_steal_channel_key(int key
)
130 struct lttng_consumer_channel
*channel
;
133 channel
= consumer_find_channel(key
);
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
141 channel
->node
.key
= -1;
147 void consumer_free_stream(struct rcu_head
*head
)
149 struct lttng_ht_node_ulong
*node
=
150 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
151 struct lttng_consumer_stream
*stream
=
152 caa_container_of(node
, struct lttng_consumer_stream
, node
);
158 * RCU protected relayd socket pair free.
160 static void consumer_rcu_free_relayd(struct rcu_head
*head
)
162 struct lttng_ht_node_ulong
*node
=
163 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
164 struct consumer_relayd_sock_pair
*relayd
=
165 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
171 * Destroy and free relayd socket pair object.
173 * This function MUST be called with the consumer_data lock acquired.
175 void consumer_destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
178 struct lttng_ht_iter iter
;
180 if (relayd
== NULL
) {
184 DBG("Consumer destroy and close relayd socket pair");
186 iter
.iter
.node
= &relayd
->node
.node
;
187 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
189 /* We assume the relayd was already destroyed */
193 /* Close all sockets */
194 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
195 (void) relayd_close(&relayd
->control_sock
);
196 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
197 (void) relayd_close(&relayd
->data_sock
);
199 /* RCU free() call */
200 call_rcu(&relayd
->node
.head
, consumer_rcu_free_relayd
);
204 * Remove a stream from the global list protected by a mutex. This
205 * function is also responsible for freeing its data structures.
207 void consumer_del_stream(struct lttng_consumer_stream
*stream
)
210 struct lttng_ht_iter iter
;
211 struct lttng_consumer_channel
*free_chan
= NULL
;
212 struct consumer_relayd_sock_pair
*relayd
;
216 pthread_mutex_lock(&consumer_data
.lock
);
218 switch (consumer_data
.type
) {
219 case LTTNG_CONSUMER_KERNEL
:
220 if (stream
->mmap_base
!= NULL
) {
221 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
227 case LTTNG_CONSUMER32_UST
:
228 case LTTNG_CONSUMER64_UST
:
229 lttng_ustconsumer_del_stream(stream
);
232 ERR("Unknown consumer_data type");
238 iter
.iter
.node
= &stream
->node
.node
;
239 ret
= lttng_ht_del(consumer_data
.stream_ht
, &iter
);
244 if (consumer_data
.stream_count
<= 0) {
247 consumer_data
.stream_count
--;
251 if (stream
->out_fd
>= 0) {
252 ret
= close(stream
->out_fd
);
257 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
258 ret
= close(stream
->wait_fd
);
263 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
264 ret
= close(stream
->shm_fd
);
270 /* Check and cleanup relayd */
272 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
273 if (relayd
!= NULL
) {
274 uatomic_dec(&relayd
->refcount
);
275 assert(uatomic_read(&relayd
->refcount
) >= 0);
277 ret
= relayd_send_close_stream(&relayd
->control_sock
,
278 stream
->relayd_stream_id
,
279 stream
->next_net_seq_num
- 1);
281 ERR("Unable to close stream on the relayd. Continuing");
282 /* Continue here. There is nothing we can do for the relayd.*/
285 /* Both conditions are met, we destroy the relayd. */
286 if (uatomic_read(&relayd
->refcount
) == 0 &&
287 uatomic_read(&relayd
->destroy_flag
)) {
288 consumer_destroy_relayd(relayd
);
293 if (!--stream
->chan
->refcount
) {
294 free_chan
= stream
->chan
;
298 call_rcu(&stream
->node
.head
, consumer_free_stream
);
300 consumer_data
.need_update
= 1;
301 pthread_mutex_unlock(&consumer_data
.lock
);
304 consumer_del_channel(free_chan
);
307 struct lttng_consumer_stream
*consumer_allocate_stream(
308 int channel_key
, int stream_key
,
309 int shm_fd
, int wait_fd
,
310 enum lttng_consumer_stream_state state
,
312 enum lttng_event_output output
,
313 const char *path_name
,
319 struct lttng_consumer_stream
*stream
;
322 stream
= zmalloc(sizeof(*stream
));
323 if (stream
== NULL
) {
324 perror("malloc struct lttng_consumer_stream");
327 stream
->chan
= consumer_find_channel(channel_key
);
329 perror("Unable to find channel key");
332 stream
->chan
->refcount
++;
333 stream
->key
= stream_key
;
334 stream
->shm_fd
= shm_fd
;
335 stream
->wait_fd
= wait_fd
;
337 stream
->out_fd_offset
= 0;
338 stream
->state
= state
;
339 stream
->mmap_len
= mmap_len
;
340 stream
->mmap_base
= NULL
;
341 stream
->output
= output
;
344 stream
->net_seq_idx
= net_index
;
345 stream
->metadata_flag
= metadata_flag
;
346 strncpy(stream
->path_name
, path_name
, sizeof(stream
->path_name
));
347 stream
->path_name
[sizeof(stream
->path_name
) - 1] = '\0';
348 lttng_ht_node_init_ulong(&stream
->node
, stream
->key
);
349 lttng_ht_node_init_ulong(&stream
->waitfd_node
, stream
->wait_fd
);
351 switch (consumer_data
.type
) {
352 case LTTNG_CONSUMER_KERNEL
:
354 case LTTNG_CONSUMER32_UST
:
355 case LTTNG_CONSUMER64_UST
:
356 stream
->cpu
= stream
->chan
->cpucount
++;
357 ret
= lttng_ustconsumer_allocate_stream(stream
);
364 ERR("Unknown consumer_data type");
368 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
369 stream
->path_name
, stream
->key
,
372 (unsigned long long) stream
->mmap_len
,
374 stream
->net_seq_idx
);
380 * Add a stream to the global list protected by a mutex.
382 int consumer_add_stream(struct lttng_consumer_stream
*stream
)
385 struct lttng_ht_node_ulong
*node
;
386 struct lttng_ht_iter iter
;
387 struct consumer_relayd_sock_pair
*relayd
;
389 pthread_mutex_lock(&consumer_data
.lock
);
390 /* Steal stream identifier, for UST */
391 consumer_steal_stream_key(stream
->key
);
394 lttng_ht_lookup(consumer_data
.stream_ht
,
395 (void *)((unsigned long) stream
->key
), &iter
);
396 node
= lttng_ht_iter_get_node_ulong(&iter
);
399 /* Stream already exist. Ignore the insertion */
403 lttng_ht_add_unique_ulong(consumer_data
.stream_ht
, &stream
->node
);
405 /* Check and cleanup relayd */
406 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
407 if (relayd
!= NULL
) {
408 uatomic_inc(&relayd
->refcount
);
412 /* Update consumer data */
413 consumer_data
.stream_count
++;
414 consumer_data
.need_update
= 1;
416 switch (consumer_data
.type
) {
417 case LTTNG_CONSUMER_KERNEL
:
419 case LTTNG_CONSUMER32_UST
:
420 case LTTNG_CONSUMER64_UST
:
421 /* Streams are in CPU number order (we rely on this) */
422 stream
->cpu
= stream
->chan
->nr_streams
++;
425 ERR("Unknown consumer_data type");
431 pthread_mutex_unlock(&consumer_data
.lock
);
437 * Add relayd socket to global consumer data hashtable.
439 int consumer_add_relayd(struct consumer_relayd_sock_pair
*relayd
)
442 struct lttng_ht_node_ulong
*node
;
443 struct lttng_ht_iter iter
;
445 if (relayd
== NULL
) {
452 lttng_ht_lookup(consumer_data
.relayd_ht
,
453 (void *)((unsigned long) relayd
->net_seq_idx
), &iter
);
454 node
= lttng_ht_iter_get_node_ulong(&iter
);
457 /* Relayd already exist. Ignore the insertion */
460 lttng_ht_add_unique_ulong(consumer_data
.relayd_ht
, &relayd
->node
);
469 * Allocate and return a consumer relayd socket.
471 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
474 struct consumer_relayd_sock_pair
*obj
= NULL
;
476 /* Negative net sequence index is a failure */
477 if (net_seq_idx
< 0) {
481 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
483 PERROR("zmalloc relayd sock");
487 obj
->net_seq_idx
= net_seq_idx
;
489 obj
->destroy_flag
= 0;
490 lttng_ht_node_init_ulong(&obj
->node
, obj
->net_seq_idx
);
491 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
498 * Find a relayd socket pair in the global consumer data.
500 * Return the object if found else NULL.
501 * RCU read-side lock must be held across this call and while using the
504 struct consumer_relayd_sock_pair
*consumer_find_relayd(int key
)
506 struct lttng_ht_iter iter
;
507 struct lttng_ht_node_ulong
*node
;
508 struct consumer_relayd_sock_pair
*relayd
= NULL
;
510 /* Negative keys are lookup failures */
515 lttng_ht_lookup(consumer_data
.relayd_ht
, (void *)((unsigned long) key
),
517 node
= lttng_ht_iter_get_node_ulong(&iter
);
519 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
527 * Handle stream for relayd transmission if the stream applies for network
528 * streaming where the net sequence index is set.
530 * Return destination file descriptor or negative value on error.
532 static int write_relayd_stream_header(struct lttng_consumer_stream
*stream
,
533 size_t data_size
, struct consumer_relayd_sock_pair
*relayd
)
536 struct lttcomm_relayd_data_hdr data_hdr
;
542 /* Reset data header */
543 memset(&data_hdr
, 0, sizeof(data_hdr
));
545 if (stream
->metadata_flag
) {
546 /* Caller MUST acquire the relayd control socket lock */
547 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
552 /* Metadata are always sent on the control socket. */
553 outfd
= relayd
->control_sock
.fd
;
555 /* Set header with stream information */
556 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
557 data_hdr
.data_size
= htobe32(data_size
);
558 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
++);
559 /* Other fields are zeroed previously */
561 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
567 /* Set to go on data socket */
568 outfd
= relayd
->data_sock
.fd
;
576 * Update a stream according to what we just received.
578 void consumer_change_stream_state(int stream_key
,
579 enum lttng_consumer_stream_state state
)
581 struct lttng_consumer_stream
*stream
;
583 pthread_mutex_lock(&consumer_data
.lock
);
584 stream
= consumer_find_stream(stream_key
);
586 stream
->state
= state
;
588 consumer_data
.need_update
= 1;
589 pthread_mutex_unlock(&consumer_data
.lock
);
593 void consumer_free_channel(struct rcu_head
*head
)
595 struct lttng_ht_node_ulong
*node
=
596 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
597 struct lttng_consumer_channel
*channel
=
598 caa_container_of(node
, struct lttng_consumer_channel
, node
);
604 * Remove a channel from the global list protected by a mutex. This
605 * function is also responsible for freeing its data structures.
607 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
610 struct lttng_ht_iter iter
;
612 pthread_mutex_lock(&consumer_data
.lock
);
614 switch (consumer_data
.type
) {
615 case LTTNG_CONSUMER_KERNEL
:
617 case LTTNG_CONSUMER32_UST
:
618 case LTTNG_CONSUMER64_UST
:
619 lttng_ustconsumer_del_channel(channel
);
622 ERR("Unknown consumer_data type");
628 iter
.iter
.node
= &channel
->node
.node
;
629 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
633 if (channel
->mmap_base
!= NULL
) {
634 ret
= munmap(channel
->mmap_base
, channel
->mmap_len
);
639 if (channel
->wait_fd
>= 0 && !channel
->wait_fd_is_copy
) {
640 ret
= close(channel
->wait_fd
);
645 if (channel
->shm_fd
>= 0 && channel
->wait_fd
!= channel
->shm_fd
) {
646 ret
= close(channel
->shm_fd
);
652 call_rcu(&channel
->node
.head
, consumer_free_channel
);
654 pthread_mutex_unlock(&consumer_data
.lock
);
657 struct lttng_consumer_channel
*consumer_allocate_channel(
659 int shm_fd
, int wait_fd
,
661 uint64_t max_sb_size
)
663 struct lttng_consumer_channel
*channel
;
666 channel
= zmalloc(sizeof(*channel
));
667 if (channel
== NULL
) {
668 perror("malloc struct lttng_consumer_channel");
671 channel
->key
= channel_key
;
672 channel
->shm_fd
= shm_fd
;
673 channel
->wait_fd
= wait_fd
;
674 channel
->mmap_len
= mmap_len
;
675 channel
->max_sb_size
= max_sb_size
;
676 channel
->refcount
= 0;
677 channel
->nr_streams
= 0;
678 lttng_ht_node_init_ulong(&channel
->node
, channel
->key
);
680 switch (consumer_data
.type
) {
681 case LTTNG_CONSUMER_KERNEL
:
682 channel
->mmap_base
= NULL
;
683 channel
->mmap_len
= 0;
685 case LTTNG_CONSUMER32_UST
:
686 case LTTNG_CONSUMER64_UST
:
687 ret
= lttng_ustconsumer_allocate_channel(channel
);
694 ERR("Unknown consumer_data type");
698 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
699 channel
->key
, channel
->shm_fd
, channel
->wait_fd
,
700 (unsigned long long) channel
->mmap_len
,
701 (unsigned long long) channel
->max_sb_size
);
707 * Add a channel to the global list protected by a mutex.
709 int consumer_add_channel(struct lttng_consumer_channel
*channel
)
711 struct lttng_ht_node_ulong
*node
;
712 struct lttng_ht_iter iter
;
714 pthread_mutex_lock(&consumer_data
.lock
);
715 /* Steal channel identifier, for UST */
716 consumer_steal_channel_key(channel
->key
);
719 lttng_ht_lookup(consumer_data
.channel_ht
,
720 (void *)((unsigned long) channel
->key
), &iter
);
721 node
= lttng_ht_iter_get_node_ulong(&iter
);
723 /* Channel already exist. Ignore the insertion */
727 lttng_ht_add_unique_ulong(consumer_data
.channel_ht
, &channel
->node
);
731 pthread_mutex_unlock(&consumer_data
.lock
);
737 * Allocate the pollfd structure and the local view of the out fds to avoid
738 * doing a lookup in the linked list and concurrency issues when writing is
739 * needed. Called with consumer_data.lock held.
741 * Returns the number of fds in the structures.
743 int consumer_update_poll_array(
744 struct lttng_consumer_local_data
*ctx
, struct pollfd
**pollfd
,
745 struct lttng_consumer_stream
**local_stream
,
746 struct lttng_ht
*metadata_ht
)
749 struct lttng_ht_iter iter
;
750 struct lttng_consumer_stream
*stream
;
752 DBG("Updating poll fd array");
754 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, stream
,
756 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
) {
759 DBG("Active FD %d", stream
->wait_fd
);
760 (*pollfd
)[i
].fd
= stream
->wait_fd
;
761 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
762 if (stream
->metadata_flag
&& metadata_ht
) {
763 lttng_ht_add_unique_ulong(metadata_ht
, &stream
->waitfd_node
);
764 DBG("Active FD added to metadata hash table");
766 local_stream
[i
] = stream
;
772 * Insert the consumer_poll_pipe at the end of the array and don't
773 * increment i so nb_fd is the number of real FD.
775 (*pollfd
)[i
].fd
= ctx
->consumer_poll_pipe
[0];
776 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
781 * Poll on the should_quit pipe and the command socket return -1 on error and
782 * should exit, 0 if data is available on the command socket
784 int lttng_consumer_poll_socket(struct pollfd
*consumer_sockpoll
)
789 num_rdy
= poll(consumer_sockpoll
, 2, -1);
792 * Restart interrupted system call.
794 if (errno
== EINTR
) {
797 perror("Poll error");
800 if (consumer_sockpoll
[0].revents
& (POLLIN
| POLLPRI
)) {
801 DBG("consumer_should_quit wake up");
811 * Set the error socket.
813 void lttng_consumer_set_error_sock(
814 struct lttng_consumer_local_data
*ctx
, int sock
)
816 ctx
->consumer_error_socket
= sock
;
820 * Set the command socket path.
822 void lttng_consumer_set_command_sock_path(
823 struct lttng_consumer_local_data
*ctx
, char *sock
)
825 ctx
->consumer_command_sock_path
= sock
;
829 * Send return code to the session daemon.
830 * If the socket is not defined, we return 0, it is not a fatal error
832 int lttng_consumer_send_error(
833 struct lttng_consumer_local_data
*ctx
, int cmd
)
835 if (ctx
->consumer_error_socket
> 0) {
836 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
837 sizeof(enum lttcomm_sessiond_command
));
844 * Close all the tracefiles and stream fds, should be called when all instances
847 void lttng_consumer_cleanup(void)
849 struct lttng_ht_iter iter
;
850 struct lttng_ht_node_ulong
*node
;
855 * close all outfd. Called when there are no more threads running (after
856 * joining on the threads), no need to protect list iteration with mutex.
858 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, node
,
860 struct lttng_consumer_stream
*stream
=
861 caa_container_of(node
, struct lttng_consumer_stream
, node
);
862 consumer_del_stream(stream
);
865 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, node
,
867 struct lttng_consumer_channel
*channel
=
868 caa_container_of(node
, struct lttng_consumer_channel
, node
);
869 consumer_del_channel(channel
);
874 lttng_ht_destroy(consumer_data
.stream_ht
);
875 lttng_ht_destroy(consumer_data
.channel_ht
);
879 * Called from signal handler.
881 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
886 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
887 } while (ret
< 0 && errno
== EINTR
);
889 perror("write consumer quit");
893 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
896 int outfd
= stream
->out_fd
;
899 * This does a blocking write-and-wait on any page that belongs to the
900 * subbuffer prior to the one we just wrote.
901 * Don't care about error values, as these are just hints and ways to
902 * limit the amount of page cache used.
904 if (orig_offset
< stream
->chan
->max_sb_size
) {
907 lttng_sync_file_range(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
908 stream
->chan
->max_sb_size
,
909 SYNC_FILE_RANGE_WAIT_BEFORE
910 | SYNC_FILE_RANGE_WRITE
911 | SYNC_FILE_RANGE_WAIT_AFTER
);
913 * Give hints to the kernel about how we access the file:
914 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
917 * We need to call fadvise again after the file grows because the
918 * kernel does not seem to apply fadvise to non-existing parts of the
921 * Call fadvise _after_ having waited for the page writeback to
922 * complete because the dirty page writeback semantic is not well
923 * defined. So it can be expected to lead to lower throughput in
926 posix_fadvise(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
927 stream
->chan
->max_sb_size
, POSIX_FADV_DONTNEED
);
931 * Initialise the necessary environnement :
932 * - create a new context
933 * - create the poll_pipe
934 * - create the should_quit pipe (for signal handler)
935 * - create the thread pipe (for splice)
937 * Takes a function pointer as argument, this function is called when data is
938 * available on a buffer. This function is responsible to do the
939 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
940 * buffer configuration and then kernctl_put_next_subbuf at the end.
942 * Returns a pointer to the new context or NULL on error.
944 struct lttng_consumer_local_data
*lttng_consumer_create(
945 enum lttng_consumer_type type
,
946 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
947 struct lttng_consumer_local_data
*ctx
),
948 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
949 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
950 int (*update_stream
)(int stream_key
, uint32_t state
))
953 struct lttng_consumer_local_data
*ctx
;
955 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
956 consumer_data
.type
== type
);
957 consumer_data
.type
= type
;
959 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
961 perror("allocating context");
965 ctx
->consumer_error_socket
= -1;
966 /* assign the callbacks */
967 ctx
->on_buffer_ready
= buffer_ready
;
968 ctx
->on_recv_channel
= recv_channel
;
969 ctx
->on_recv_stream
= recv_stream
;
970 ctx
->on_update_stream
= update_stream
;
972 ret
= pipe(ctx
->consumer_poll_pipe
);
974 perror("Error creating poll pipe");
975 goto error_poll_pipe
;
978 /* set read end of the pipe to non-blocking */
979 ret
= fcntl(ctx
->consumer_poll_pipe
[0], F_SETFL
, O_NONBLOCK
);
981 perror("fcntl O_NONBLOCK");
982 goto error_poll_fcntl
;
985 /* set write end of the pipe to non-blocking */
986 ret
= fcntl(ctx
->consumer_poll_pipe
[1], F_SETFL
, O_NONBLOCK
);
988 perror("fcntl O_NONBLOCK");
989 goto error_poll_fcntl
;
992 ret
= pipe(ctx
->consumer_should_quit
);
994 perror("Error creating recv pipe");
995 goto error_quit_pipe
;
998 ret
= pipe(ctx
->consumer_thread_pipe
);
1000 perror("Error creating thread pipe");
1001 goto error_thread_pipe
;
1008 for (i
= 0; i
< 2; i
++) {
1011 err
= close(ctx
->consumer_should_quit
[i
]);
1018 for (i
= 0; i
< 2; i
++) {
1021 err
= close(ctx
->consumer_poll_pipe
[i
]);
1033 * Close all fds associated with the instance and free the context.
1035 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1039 ret
= close(ctx
->consumer_error_socket
);
1043 ret
= close(ctx
->consumer_thread_pipe
[0]);
1047 ret
= close(ctx
->consumer_thread_pipe
[1]);
1051 ret
= close(ctx
->consumer_poll_pipe
[0]);
1055 ret
= close(ctx
->consumer_poll_pipe
[1]);
1059 ret
= close(ctx
->consumer_should_quit
[0]);
1063 ret
= close(ctx
->consumer_should_quit
[1]);
1067 unlink(ctx
->consumer_command_sock_path
);
1072 * Write the metadata stream id on the specified file descriptor.
1074 static int write_relayd_metadata_id(int fd
,
1075 struct lttng_consumer_stream
*stream
,
1076 struct consumer_relayd_sock_pair
*relayd
)
1079 uint64_t metadata_id
;
1081 metadata_id
= htobe64(stream
->relayd_stream_id
);
1083 ret
= write(fd
, (void *) &metadata_id
,
1084 sizeof(stream
->relayd_stream_id
));
1085 } while (ret
< 0 && errno
== EINTR
);
1087 PERROR("write metadata stream id");
1090 DBG("Metadata stream id %zu written before data",
1091 stream
->relayd_stream_id
);
1098 * Mmap the ring buffer, read it and write the data to the tracefile.
1100 * Returns the number of bytes written
1102 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1103 struct lttng_consumer_local_data
*ctx
,
1104 struct lttng_consumer_stream
*stream
, unsigned long len
)
1106 unsigned long mmap_offset
;
1107 ssize_t ret
= 0, written
= 0;
1108 off_t orig_offset
= stream
->out_fd_offset
;
1109 /* Default is on the disk */
1110 int outfd
= stream
->out_fd
;
1111 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1113 /* RCU lock for the relayd pointer */
1116 /* Flag that the current stream if set for network streaming. */
1117 if (stream
->net_seq_idx
!= -1) {
1118 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1119 if (relayd
== NULL
) {
1124 /* get the offset inside the fd to mmap */
1125 switch (consumer_data
.type
) {
1126 case LTTNG_CONSUMER_KERNEL
:
1127 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
1129 case LTTNG_CONSUMER32_UST
:
1130 case LTTNG_CONSUMER64_UST
:
1131 ret
= lttng_ustctl_get_mmap_read_offset(stream
->chan
->handle
,
1132 stream
->buf
, &mmap_offset
);
1135 ERR("Unknown consumer_data type");
1140 PERROR("tracer ctl get_mmap_read_offset");
1145 /* Handle stream on the relayd if the output is on the network */
1147 unsigned long netlen
= len
;
1150 * Lock the control socket for the complete duration of the function
1151 * since from this point on we will use the socket.
1153 if (stream
->metadata_flag
) {
1154 /* Metadata requires the control socket. */
1155 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1156 netlen
+= sizeof(stream
->relayd_stream_id
);
1159 ret
= write_relayd_stream_header(stream
, netlen
, relayd
);
1161 /* Use the returned socket. */
1164 /* Write metadata stream id before payload */
1165 if (stream
->metadata_flag
) {
1166 ret
= write_relayd_metadata_id(outfd
, stream
, relayd
);
1173 * We do this so the return value can match the len passed as
1174 * argument to this function.
1176 written
-= sizeof(stream
->relayd_stream_id
);
1179 /* Else, use the default set before which is the filesystem. */
1184 ret
= write(outfd
, stream
->mmap_base
+ mmap_offset
, len
);
1185 } while (ret
< 0 && errno
== EINTR
);
1187 PERROR("Error in file write");
1192 } else if (ret
> len
) {
1193 PERROR("Error in file write (ret %ld > len %lu)", ret
, len
);
1200 DBG("Consumer mmap write() ret %ld (len %lu)", ret
, len
);
1202 /* This call is useless on a socket so better save a syscall. */
1204 /* This won't block, but will start writeout asynchronously */
1205 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret
,
1206 SYNC_FILE_RANGE_WRITE
);
1207 stream
->out_fd_offset
+= ret
;
1211 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1214 /* Unlock only if ctrl socket used */
1215 if (relayd
&& stream
->metadata_flag
) {
1216 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1224 * Splice the data from the ring buffer to the tracefile.
1226 * Returns the number of bytes spliced.
1228 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1229 struct lttng_consumer_local_data
*ctx
,
1230 struct lttng_consumer_stream
*stream
, unsigned long len
)
1232 ssize_t ret
= 0, written
= 0, ret_splice
= 0;
1234 off_t orig_offset
= stream
->out_fd_offset
;
1235 int fd
= stream
->wait_fd
;
1236 /* Default is on the disk */
1237 int outfd
= stream
->out_fd
;
1238 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1240 switch (consumer_data
.type
) {
1241 case LTTNG_CONSUMER_KERNEL
:
1243 case LTTNG_CONSUMER32_UST
:
1244 case LTTNG_CONSUMER64_UST
:
1245 /* Not supported for user space tracing */
1248 ERR("Unknown consumer_data type");
1252 /* RCU lock for the relayd pointer */
1255 /* Flag that the current stream if set for network streaming. */
1256 if (stream
->net_seq_idx
!= -1) {
1257 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1258 if (relayd
== NULL
) {
1263 /* Write metadata stream id before payload */
1264 if (stream
->metadata_flag
&& relayd
) {
1266 * Lock the control socket for the complete duration of the function
1267 * since from this point on we will use the socket.
1269 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1271 ret
= write_relayd_metadata_id(ctx
->consumer_thread_pipe
[1],
1280 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
1281 (unsigned long)offset
, len
, fd
);
1282 ret_splice
= splice(fd
, &offset
, ctx
->consumer_thread_pipe
[1], NULL
, len
,
1283 SPLICE_F_MOVE
| SPLICE_F_MORE
);
1284 DBG("splice chan to pipe, ret %zd", ret_splice
);
1285 if (ret_splice
< 0) {
1286 PERROR("Error in relay splice");
1288 written
= ret_splice
;
1294 /* Handle stream on the relayd if the output is on the network */
1296 if (stream
->metadata_flag
) {
1297 /* Update counter to fit the spliced data */
1298 ret_splice
+= sizeof(stream
->relayd_stream_id
);
1299 len
+= sizeof(stream
->relayd_stream_id
);
1301 * We do this so the return value can match the len passed as
1302 * argument to this function.
1304 written
-= sizeof(stream
->relayd_stream_id
);
1307 ret
= write_relayd_stream_header(stream
, ret_splice
, relayd
);
1309 /* Use the returned socket. */
1312 ERR("Remote relayd disconnected. Stopping");
1317 /* Splice data out */
1318 ret_splice
= splice(ctx
->consumer_thread_pipe
[0], NULL
, outfd
, NULL
,
1319 ret_splice
, SPLICE_F_MOVE
| SPLICE_F_MORE
);
1320 DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice
);
1321 if (ret_splice
< 0) {
1322 PERROR("Error in file splice");
1324 written
= ret_splice
;
1328 } else if (ret_splice
> len
) {
1330 PERROR("Wrote more data than requested %zd (len: %lu)",
1332 written
+= ret_splice
;
1338 /* This call is useless on a socket so better save a syscall. */
1340 /* This won't block, but will start writeout asynchronously */
1341 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret_splice
,
1342 SYNC_FILE_RANGE_WRITE
);
1343 stream
->out_fd_offset
+= ret_splice
;
1345 written
+= ret_splice
;
1347 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1354 /* send the appropriate error description to sessiond */
1357 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_EBADF
);
1360 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_EINVAL
);
1363 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_ENOMEM
);
1366 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_ESPIPE
);
1371 if (relayd
&& stream
->metadata_flag
) {
1372 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1380 * Take a snapshot for a specific fd
1382 * Returns 0 on success, < 0 on error
1384 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data
*ctx
,
1385 struct lttng_consumer_stream
*stream
)
1387 switch (consumer_data
.type
) {
1388 case LTTNG_CONSUMER_KERNEL
:
1389 return lttng_kconsumer_take_snapshot(ctx
, stream
);
1390 case LTTNG_CONSUMER32_UST
:
1391 case LTTNG_CONSUMER64_UST
:
1392 return lttng_ustconsumer_take_snapshot(ctx
, stream
);
1394 ERR("Unknown consumer_data type");
1402 * Get the produced position
1404 * Returns 0 on success, < 0 on error
1406 int lttng_consumer_get_produced_snapshot(
1407 struct lttng_consumer_local_data
*ctx
,
1408 struct lttng_consumer_stream
*stream
,
1411 switch (consumer_data
.type
) {
1412 case LTTNG_CONSUMER_KERNEL
:
1413 return lttng_kconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1414 case LTTNG_CONSUMER32_UST
:
1415 case LTTNG_CONSUMER64_UST
:
1416 return lttng_ustconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1418 ERR("Unknown consumer_data type");
1424 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1425 int sock
, struct pollfd
*consumer_sockpoll
)
1427 switch (consumer_data
.type
) {
1428 case LTTNG_CONSUMER_KERNEL
:
1429 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1430 case LTTNG_CONSUMER32_UST
:
1431 case LTTNG_CONSUMER64_UST
:
1432 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1434 ERR("Unknown consumer_data type");
1441 * This thread polls the fds in the set to consume the data and write
1442 * it to tracefile if necessary.
1444 void *lttng_consumer_thread_poll_fds(void *data
)
1446 int num_rdy
, num_hup
, high_prio
, ret
, i
;
1447 struct pollfd
*pollfd
= NULL
;
1448 /* local view of the streams */
1449 struct lttng_consumer_stream
**local_stream
= NULL
;
1450 /* local view of consumer_data.fds_count */
1452 struct lttng_consumer_local_data
*ctx
= data
;
1453 struct lttng_ht
*metadata_ht
;
1454 struct lttng_ht_iter iter
;
1455 struct lttng_ht_node_ulong
*node
;
1456 struct lttng_consumer_stream
*metadata_stream
;
1459 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1461 rcu_register_thread();
1463 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
1470 * the fds set has been updated, we need to update our
1471 * local array as well
1473 pthread_mutex_lock(&consumer_data
.lock
);
1474 if (consumer_data
.need_update
) {
1475 if (pollfd
!= NULL
) {
1479 if (local_stream
!= NULL
) {
1481 local_stream
= NULL
;
1484 /* allocate for all fds + 1 for the consumer_poll_pipe */
1485 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
1486 if (pollfd
== NULL
) {
1487 perror("pollfd malloc");
1488 pthread_mutex_unlock(&consumer_data
.lock
);
1492 /* allocate for all fds + 1 for the consumer_poll_pipe */
1493 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
1494 sizeof(struct lttng_consumer_stream
));
1495 if (local_stream
== NULL
) {
1496 perror("local_stream malloc");
1497 pthread_mutex_unlock(&consumer_data
.lock
);
1500 ret
= consumer_update_poll_array(ctx
, &pollfd
, local_stream
,
1503 ERR("Error in allocating pollfd or local_outfds");
1504 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1505 pthread_mutex_unlock(&consumer_data
.lock
);
1509 consumer_data
.need_update
= 0;
1511 pthread_mutex_unlock(&consumer_data
.lock
);
1513 /* No FDs and consumer_quit, consumer_cleanup the thread */
1514 if (nb_fd
== 0 && consumer_quit
== 1) {
1517 /* poll on the array of fds */
1519 DBG("polling on %d fd", nb_fd
+ 1);
1520 num_rdy
= poll(pollfd
, nb_fd
+ 1, consumer_poll_timeout
);
1521 DBG("poll num_rdy : %d", num_rdy
);
1522 if (num_rdy
== -1) {
1524 * Restart interrupted system call.
1526 if (errno
== EINTR
) {
1529 perror("Poll error");
1530 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1532 } else if (num_rdy
== 0) {
1533 DBG("Polling thread timed out");
1538 * If the consumer_poll_pipe triggered poll go directly to the
1539 * beginning of the loop to update the array. We want to prioritize
1540 * array update over low-priority reads.
1542 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
1543 size_t pipe_readlen
;
1546 DBG("consumer_poll_pipe wake up");
1547 /* Consume 1 byte of pipe data */
1549 pipe_readlen
= read(ctx
->consumer_poll_pipe
[0], &tmp
, 1);
1550 } while (pipe_readlen
== -1 && errno
== EINTR
);
1554 /* Take care of high priority channels first. */
1555 for (i
= 0; i
< nb_fd
; i
++) {
1556 /* Lookup for metadata which is the highest priority */
1557 lttng_ht_lookup(metadata_ht
,
1558 (void *)((unsigned long) pollfd
[i
].fd
), &iter
);
1559 node
= lttng_ht_iter_get_node_ulong(&iter
);
1561 (pollfd
[i
].revents
& (POLLIN
| POLLPRI
))) {
1562 DBG("Urgent metadata read on fd %d", pollfd
[i
].fd
);
1563 metadata_stream
= caa_container_of(node
,
1564 struct lttng_consumer_stream
, waitfd_node
);
1566 len
= ctx
->on_buffer_ready(metadata_stream
, ctx
);
1567 /* it's ok to have an unavailable sub-buffer */
1568 if (len
< 0 && len
!= -EAGAIN
) {
1570 } else if (len
> 0) {
1571 metadata_stream
->data_read
= 1;
1573 } else if (pollfd
[i
].revents
& POLLPRI
) {
1574 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
1576 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1577 /* it's ok to have an unavailable sub-buffer */
1578 if (len
< 0 && len
!= -EAGAIN
) {
1580 } else if (len
> 0) {
1581 local_stream
[i
]->data_read
= 1;
1587 * If we read high prio channel in this loop, try again
1588 * for more high prio data.
1594 /* Take care of low priority channels. */
1595 for (i
= 0; i
< nb_fd
; i
++) {
1596 if ((pollfd
[i
].revents
& POLLIN
) ||
1597 local_stream
[i
]->hangup_flush_done
) {
1598 DBG("Normal read on fd %d", pollfd
[i
].fd
);
1599 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1600 /* it's ok to have an unavailable sub-buffer */
1601 if (len
< 0 && len
!= -EAGAIN
) {
1603 } else if (len
> 0) {
1604 local_stream
[i
]->data_read
= 1;
1609 /* Handle hangup and errors */
1610 for (i
= 0; i
< nb_fd
; i
++) {
1611 if (!local_stream
[i
]->hangup_flush_done
1612 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
1613 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
1614 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
1615 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1617 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
1618 /* Attempt read again, for the data we just flushed. */
1619 local_stream
[i
]->data_read
= 1;
1622 * If the poll flag is HUP/ERR/NVAL and we have
1623 * read no data in this pass, we can remove the
1624 * stream from its hash table.
1626 if ((pollfd
[i
].revents
& POLLHUP
)) {
1627 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
1628 if (!local_stream
[i
]->data_read
) {
1629 if (local_stream
[i
]->metadata_flag
) {
1630 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1631 ret
= lttng_ht_del(metadata_ht
, &iter
);
1634 consumer_del_stream(local_stream
[i
]);
1637 } else if (pollfd
[i
].revents
& POLLERR
) {
1638 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
1639 if (!local_stream
[i
]->data_read
) {
1640 if (local_stream
[i
]->metadata_flag
) {
1641 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1642 ret
= lttng_ht_del(metadata_ht
, &iter
);
1645 consumer_del_stream(local_stream
[i
]);
1648 } else if (pollfd
[i
].revents
& POLLNVAL
) {
1649 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
1650 if (!local_stream
[i
]->data_read
) {
1651 if (local_stream
[i
]->metadata_flag
) {
1652 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1653 ret
= lttng_ht_del(metadata_ht
, &iter
);
1656 consumer_del_stream(local_stream
[i
]);
1660 local_stream
[i
]->data_read
= 0;
1664 DBG("polling thread exiting");
1665 if (pollfd
!= NULL
) {
1669 if (local_stream
!= NULL
) {
1671 local_stream
= NULL
;
1673 rcu_unregister_thread();
1678 * This thread listens on the consumerd socket and receives the file
1679 * descriptors from the session daemon.
1681 void *lttng_consumer_thread_receive_fds(void *data
)
1683 int sock
, client_socket
, ret
;
1685 * structure to poll for incoming data on communication socket avoids
1686 * making blocking sockets.
1688 struct pollfd consumer_sockpoll
[2];
1689 struct lttng_consumer_local_data
*ctx
= data
;
1691 rcu_register_thread();
1693 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
1694 unlink(ctx
->consumer_command_sock_path
);
1695 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
1696 if (client_socket
< 0) {
1697 ERR("Cannot create command socket");
1701 ret
= lttcomm_listen_unix_sock(client_socket
);
1706 DBG("Sending ready command to lttng-sessiond");
1707 ret
= lttng_consumer_send_error(ctx
, CONSUMERD_COMMAND_SOCK_READY
);
1708 /* return < 0 on error, but == 0 is not fatal */
1710 ERR("Error sending ready command to lttng-sessiond");
1714 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
1716 perror("fcntl O_NONBLOCK");
1720 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1721 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
1722 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
1723 consumer_sockpoll
[1].fd
= client_socket
;
1724 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
1726 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1729 DBG("Connection on client_socket");
1731 /* Blocking call, waiting for transmission */
1732 sock
= lttcomm_accept_unix_sock(client_socket
);
1737 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
1739 perror("fcntl O_NONBLOCK");
1743 /* update the polling structure to poll on the established socket */
1744 consumer_sockpoll
[1].fd
= sock
;
1745 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
1748 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1751 DBG("Incoming command on sock");
1752 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1753 if (ret
== -ENOENT
) {
1754 DBG("Received STOP command");
1758 ERR("Communication interrupted on command socket");
1761 if (consumer_quit
) {
1762 DBG("consumer_thread_receive_fds received quit from signal");
1765 DBG("received fds on sock");
1768 DBG("consumer_thread_receive_fds exiting");
1771 * when all fds have hung up, the polling thread
1777 * 2s of grace period, if no polling events occur during
1778 * this period, the polling thread will exit even if there
1779 * are still open FDs (should not happen, but safety mechanism).
1781 consumer_poll_timeout
= LTTNG_CONSUMER_POLL_TIMEOUT
;
1784 * Wake-up the other end by writing a null byte in the pipe
1785 * (non-blocking). Important note: Because writing into the
1786 * pipe is non-blocking (and therefore we allow dropping wakeup
1787 * data, as long as there is wakeup data present in the pipe
1788 * buffer to wake up the other end), the other end should
1789 * perform the following sequence for waiting:
1790 * 1) empty the pipe (reads).
1791 * 2) perform update operation.
1792 * 3) wait on the pipe (poll).
1795 ret
= write(ctx
->consumer_poll_pipe
[1], "", 1);
1796 } while (ret
< 0 && errno
== EINTR
);
1797 rcu_unregister_thread();
1801 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
1802 struct lttng_consumer_local_data
*ctx
)
1804 switch (consumer_data
.type
) {
1805 case LTTNG_CONSUMER_KERNEL
:
1806 return lttng_kconsumer_read_subbuffer(stream
, ctx
);
1807 case LTTNG_CONSUMER32_UST
:
1808 case LTTNG_CONSUMER64_UST
:
1809 return lttng_ustconsumer_read_subbuffer(stream
, ctx
);
1811 ERR("Unknown consumer_data type");
1817 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
1819 switch (consumer_data
.type
) {
1820 case LTTNG_CONSUMER_KERNEL
:
1821 return lttng_kconsumer_on_recv_stream(stream
);
1822 case LTTNG_CONSUMER32_UST
:
1823 case LTTNG_CONSUMER64_UST
:
1824 return lttng_ustconsumer_on_recv_stream(stream
);
1826 ERR("Unknown consumer_data type");
1833 * Allocate and set consumer data hash tables.
1835 void lttng_consumer_init(void)
1837 consumer_data
.stream_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1838 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1839 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);