2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
31 #include <common/common.h>
32 #include <common/kernel-ctl/kernel-ctl.h>
33 #include <common/sessiond-comm/relayd.h>
34 #include <common/sessiond-comm/sessiond-comm.h>
35 #include <common/kernel-consumer/kernel-consumer.h>
36 #include <common/relayd/relayd.h>
37 #include <common/ust-consumer/ust-consumer.h>
41 struct lttng_consumer_global_data consumer_data
= {
44 .type
= LTTNG_CONSUMER_UNKNOWN
,
47 /* timeout parameter, to control the polling thread grace period. */
48 int consumer_poll_timeout
= -1;
51 * Flag to inform the polling thread to quit when all fds have hung up. Updated by
52 * the consumer_thread_receive_fds when it notices that all fds have hung up.
53 * Also updated by the signal handler (consumer_should_exit()). Read by the
56 volatile int consumer_quit
= 0;
59 * Find a stream. The consumer_data.lock must be locked during this
62 static struct lttng_consumer_stream
*consumer_find_stream(int key
)
64 struct lttng_ht_iter iter
;
65 struct lttng_ht_node_ulong
*node
;
66 struct lttng_consumer_stream
*stream
= NULL
;
68 /* Negative keys are lookup failures */
74 lttng_ht_lookup(consumer_data
.stream_ht
, (void *)((unsigned long) key
),
76 node
= lttng_ht_iter_get_node_ulong(&iter
);
78 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
86 static void consumer_steal_stream_key(int key
)
88 struct lttng_consumer_stream
*stream
;
91 stream
= consumer_find_stream(key
);
95 * We don't want the lookup to match, but we still need
96 * to iterate on this stream when iterating over the hash table. Just
97 * change the node key.
99 stream
->node
.key
= -1;
104 static struct lttng_consumer_channel
*consumer_find_channel(int key
)
106 struct lttng_ht_iter iter
;
107 struct lttng_ht_node_ulong
*node
;
108 struct lttng_consumer_channel
*channel
= NULL
;
110 /* Negative keys are lookup failures */
116 lttng_ht_lookup(consumer_data
.channel_ht
, (void *)((unsigned long) key
),
118 node
= lttng_ht_iter_get_node_ulong(&iter
);
120 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
128 static void consumer_steal_channel_key(int key
)
130 struct lttng_consumer_channel
*channel
;
133 channel
= consumer_find_channel(key
);
137 * We don't want the lookup to match, but we still need
138 * to iterate on this channel when iterating over the hash table. Just
139 * change the node key.
141 channel
->node
.key
= -1;
147 void consumer_free_stream(struct rcu_head
*head
)
149 struct lttng_ht_node_ulong
*node
=
150 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
151 struct lttng_consumer_stream
*stream
=
152 caa_container_of(node
, struct lttng_consumer_stream
, node
);
158 * RCU protected relayd socket pair free.
160 static void consumer_rcu_free_relayd(struct rcu_head
*head
)
162 struct lttng_ht_node_ulong
*node
=
163 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
164 struct consumer_relayd_sock_pair
*relayd
=
165 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
171 * Destroy and free relayd socket pair object.
173 * This function MUST be called with the consumer_data lock acquired.
175 void consumer_destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
178 struct lttng_ht_iter iter
;
180 if (relayd
== NULL
) {
184 DBG("Consumer destroy and close relayd socket pair");
186 iter
.iter
.node
= &relayd
->node
.node
;
187 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
189 /* We assume the relayd was already destroyed */
193 /* Close all sockets */
194 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
195 (void) relayd_close(&relayd
->control_sock
);
196 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
197 (void) relayd_close(&relayd
->data_sock
);
199 /* RCU free() call */
200 call_rcu(&relayd
->node
.head
, consumer_rcu_free_relayd
);
204 * Remove a stream from the global list protected by a mutex. This
205 * function is also responsible for freeing its data structures.
207 void consumer_del_stream(struct lttng_consumer_stream
*stream
)
210 struct lttng_ht_iter iter
;
211 struct lttng_consumer_channel
*free_chan
= NULL
;
212 struct consumer_relayd_sock_pair
*relayd
;
216 pthread_mutex_lock(&consumer_data
.lock
);
218 switch (consumer_data
.type
) {
219 case LTTNG_CONSUMER_KERNEL
:
220 if (stream
->mmap_base
!= NULL
) {
221 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
227 case LTTNG_CONSUMER32_UST
:
228 case LTTNG_CONSUMER64_UST
:
229 lttng_ustconsumer_del_stream(stream
);
232 ERR("Unknown consumer_data type");
238 iter
.iter
.node
= &stream
->node
.node
;
239 ret
= lttng_ht_del(consumer_data
.stream_ht
, &iter
);
244 if (consumer_data
.stream_count
<= 0) {
247 consumer_data
.stream_count
--;
251 if (stream
->out_fd
>= 0) {
252 ret
= close(stream
->out_fd
);
257 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
258 ret
= close(stream
->wait_fd
);
263 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
264 ret
= close(stream
->shm_fd
);
270 /* Check and cleanup relayd */
272 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
273 if (relayd
!= NULL
) {
274 uatomic_dec(&relayd
->refcount
);
275 assert(uatomic_read(&relayd
->refcount
) >= 0);
277 ret
= relayd_send_close_stream(&relayd
->control_sock
,
278 stream
->relayd_stream_id
,
279 stream
->next_net_seq_num
- 1);
281 ERR("Unable to close stream on the relayd. Continuing");
282 /* Continue here. There is nothing we can do for the relayd.*/
285 /* Both conditions are met, we destroy the relayd. */
286 if (uatomic_read(&relayd
->refcount
) == 0 &&
287 uatomic_read(&relayd
->destroy_flag
)) {
288 consumer_destroy_relayd(relayd
);
293 if (!--stream
->chan
->refcount
) {
294 free_chan
= stream
->chan
;
298 call_rcu(&stream
->node
.head
, consumer_free_stream
);
300 consumer_data
.need_update
= 1;
301 pthread_mutex_unlock(&consumer_data
.lock
);
304 consumer_del_channel(free_chan
);
307 struct lttng_consumer_stream
*consumer_allocate_stream(
308 int channel_key
, int stream_key
,
309 int shm_fd
, int wait_fd
,
310 enum lttng_consumer_stream_state state
,
312 enum lttng_event_output output
,
313 const char *path_name
,
319 struct lttng_consumer_stream
*stream
;
322 stream
= zmalloc(sizeof(*stream
));
323 if (stream
== NULL
) {
324 perror("malloc struct lttng_consumer_stream");
327 stream
->chan
= consumer_find_channel(channel_key
);
329 perror("Unable to find channel key");
332 stream
->chan
->refcount
++;
333 stream
->key
= stream_key
;
334 stream
->shm_fd
= shm_fd
;
335 stream
->wait_fd
= wait_fd
;
337 stream
->out_fd_offset
= 0;
338 stream
->state
= state
;
339 stream
->mmap_len
= mmap_len
;
340 stream
->mmap_base
= NULL
;
341 stream
->output
= output
;
344 stream
->net_seq_idx
= net_index
;
345 stream
->metadata_flag
= metadata_flag
;
346 strncpy(stream
->path_name
, path_name
, sizeof(stream
->path_name
));
347 stream
->path_name
[sizeof(stream
->path_name
) - 1] = '\0';
348 lttng_ht_node_init_ulong(&stream
->node
, stream
->key
);
349 lttng_ht_node_init_ulong(&stream
->waitfd_node
, stream
->wait_fd
);
351 switch (consumer_data
.type
) {
352 case LTTNG_CONSUMER_KERNEL
:
354 case LTTNG_CONSUMER32_UST
:
355 case LTTNG_CONSUMER64_UST
:
356 stream
->cpu
= stream
->chan
->cpucount
++;
357 ret
= lttng_ustconsumer_allocate_stream(stream
);
364 ERR("Unknown consumer_data type");
368 DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
369 stream
->path_name
, stream
->key
,
372 (unsigned long long) stream
->mmap_len
,
374 stream
->net_seq_idx
);
380 * Add a stream to the global list protected by a mutex.
382 int consumer_add_stream(struct lttng_consumer_stream
*stream
)
385 struct lttng_ht_node_ulong
*node
;
386 struct lttng_ht_iter iter
;
387 struct consumer_relayd_sock_pair
*relayd
;
389 pthread_mutex_lock(&consumer_data
.lock
);
390 /* Steal stream identifier, for UST */
391 consumer_steal_stream_key(stream
->key
);
394 lttng_ht_lookup(consumer_data
.stream_ht
,
395 (void *)((unsigned long) stream
->key
), &iter
);
396 node
= lttng_ht_iter_get_node_ulong(&iter
);
399 /* Stream already exists. Ignore the insertion */
403 lttng_ht_add_unique_ulong(consumer_data
.stream_ht
, &stream
->node
);
405 /* Check and cleanup relayd */
406 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
407 if (relayd
!= NULL
) {
408 uatomic_inc(&relayd
->refcount
);
412 /* Update consumer data */
413 consumer_data
.stream_count
++;
414 consumer_data
.need_update
= 1;
416 switch (consumer_data
.type
) {
417 case LTTNG_CONSUMER_KERNEL
:
419 case LTTNG_CONSUMER32_UST
:
420 case LTTNG_CONSUMER64_UST
:
421 /* Streams are in CPU number order (we rely on this) */
422 stream
->cpu
= stream
->chan
->nr_streams
++;
425 ERR("Unknown consumer_data type");
431 pthread_mutex_unlock(&consumer_data
.lock
);
437 * Add relayd socket to global consumer data hashtable.
439 int consumer_add_relayd(struct consumer_relayd_sock_pair
*relayd
)
442 struct lttng_ht_node_ulong
*node
;
443 struct lttng_ht_iter iter
;
445 if (relayd
== NULL
) {
452 lttng_ht_lookup(consumer_data
.relayd_ht
,
453 (void *)((unsigned long) relayd
->net_seq_idx
), &iter
);
454 node
= lttng_ht_iter_get_node_ulong(&iter
);
457 /* Relayd already exists. Ignore the insertion */
460 lttng_ht_add_unique_ulong(consumer_data
.relayd_ht
, &relayd
->node
);
469 * Allocate and return a consumer relayd socket.
471 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
474 struct consumer_relayd_sock_pair
*obj
= NULL
;
476 /* Negative net sequence index is a failure */
477 if (net_seq_idx
< 0) {
481 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
483 PERROR("zmalloc relayd sock");
487 obj
->net_seq_idx
= net_seq_idx
;
489 obj
->destroy_flag
= 0;
490 lttng_ht_node_init_ulong(&obj
->node
, obj
->net_seq_idx
);
491 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
498 * Find a relayd socket pair in the global consumer data.
500 * Return the object if found else NULL.
501 * RCU read-side lock must be held across this call and while using the
504 struct consumer_relayd_sock_pair
*consumer_find_relayd(int key
)
506 struct lttng_ht_iter iter
;
507 struct lttng_ht_node_ulong
*node
;
508 struct consumer_relayd_sock_pair
*relayd
= NULL
;
510 /* Negative keys are lookup failures */
515 lttng_ht_lookup(consumer_data
.relayd_ht
, (void *)((unsigned long) key
),
517 node
= lttng_ht_iter_get_node_ulong(&iter
);
519 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
527 * Handle stream for relayd transmission if the stream applies for network
528 * streaming where the net sequence index is set.
530 * Return destination file descriptor or negative value on error.
532 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream
*stream
,
536 struct consumer_relayd_sock_pair
*relayd
;
537 struct lttcomm_relayd_data_hdr data_hdr
;
542 /* Reset data header */
543 memset(&data_hdr
, 0, sizeof(data_hdr
));
546 /* Get relayd reference of the stream. */
547 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
548 if (relayd
== NULL
) {
549 /* Stream is either local or corrupted */
553 DBG("Consumer found relayd socks with index %d", stream
->net_seq_idx
);
554 if (stream
->metadata_flag
) {
555 /* Caller MUST acquire the relayd control socket lock */
556 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
561 /* Metadata are always sent on the control socket. */
562 outfd
= relayd
->control_sock
.fd
;
564 /* Set header with stream information */
565 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
566 data_hdr
.data_size
= htobe32(data_size
);
567 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
++);
568 /* Other fields are zeroed previously */
570 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
576 /* Set to go on data socket */
577 outfd
= relayd
->data_sock
.fd
;
586 * Update a stream according to what we just received.
588 void consumer_change_stream_state(int stream_key
,
589 enum lttng_consumer_stream_state state
)
591 struct lttng_consumer_stream
*stream
;
593 pthread_mutex_lock(&consumer_data
.lock
);
594 stream
= consumer_find_stream(stream_key
);
596 stream
->state
= state
;
598 consumer_data
.need_update
= 1;
599 pthread_mutex_unlock(&consumer_data
.lock
);
603 void consumer_free_channel(struct rcu_head
*head
)
605 struct lttng_ht_node_ulong
*node
=
606 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
607 struct lttng_consumer_channel
*channel
=
608 caa_container_of(node
, struct lttng_consumer_channel
, node
);
614 * Remove a channel from the global list protected by a mutex. This
615 * function is also responsible for freeing its data structures.
617 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
620 struct lttng_ht_iter iter
;
622 pthread_mutex_lock(&consumer_data
.lock
);
624 switch (consumer_data
.type
) {
625 case LTTNG_CONSUMER_KERNEL
:
627 case LTTNG_CONSUMER32_UST
:
628 case LTTNG_CONSUMER64_UST
:
629 lttng_ustconsumer_del_channel(channel
);
632 ERR("Unknown consumer_data type");
638 iter
.iter
.node
= &channel
->node
.node
;
639 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
643 if (channel
->mmap_base
!= NULL
) {
644 ret
= munmap(channel
->mmap_base
, channel
->mmap_len
);
649 if (channel
->wait_fd
>= 0 && !channel
->wait_fd_is_copy
) {
650 ret
= close(channel
->wait_fd
);
655 if (channel
->shm_fd
>= 0 && channel
->wait_fd
!= channel
->shm_fd
) {
656 ret
= close(channel
->shm_fd
);
662 call_rcu(&channel
->node
.head
, consumer_free_channel
);
664 pthread_mutex_unlock(&consumer_data
.lock
);
667 struct lttng_consumer_channel
*consumer_allocate_channel(
669 int shm_fd
, int wait_fd
,
671 uint64_t max_sb_size
)
673 struct lttng_consumer_channel
*channel
;
676 channel
= zmalloc(sizeof(*channel
));
677 if (channel
== NULL
) {
678 perror("malloc struct lttng_consumer_channel");
681 channel
->key
= channel_key
;
682 channel
->shm_fd
= shm_fd
;
683 channel
->wait_fd
= wait_fd
;
684 channel
->mmap_len
= mmap_len
;
685 channel
->max_sb_size
= max_sb_size
;
686 channel
->refcount
= 0;
687 channel
->nr_streams
= 0;
688 lttng_ht_node_init_ulong(&channel
->node
, channel
->key
);
690 switch (consumer_data
.type
) {
691 case LTTNG_CONSUMER_KERNEL
:
692 channel
->mmap_base
= NULL
;
693 channel
->mmap_len
= 0;
695 case LTTNG_CONSUMER32_UST
:
696 case LTTNG_CONSUMER64_UST
:
697 ret
= lttng_ustconsumer_allocate_channel(channel
);
704 ERR("Unknown consumer_data type");
708 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
709 channel
->key
, channel
->shm_fd
, channel
->wait_fd
,
710 (unsigned long long) channel
->mmap_len
,
711 (unsigned long long) channel
->max_sb_size
);
717 * Add a channel to the global list protected by a mutex.
719 int consumer_add_channel(struct lttng_consumer_channel
*channel
)
721 struct lttng_ht_node_ulong
*node
;
722 struct lttng_ht_iter iter
;
724 pthread_mutex_lock(&consumer_data
.lock
);
725 /* Steal channel identifier, for UST */
726 consumer_steal_channel_key(channel
->key
);
729 lttng_ht_lookup(consumer_data
.channel_ht
,
730 (void *)((unsigned long) channel
->key
), &iter
);
731 node
= lttng_ht_iter_get_node_ulong(&iter
);
733 /* Channel already exists. Ignore the insertion */
737 lttng_ht_add_unique_ulong(consumer_data
.channel_ht
, &channel
->node
);
741 pthread_mutex_unlock(&consumer_data
.lock
);
747 * Allocate the pollfd structure and the local view of the out fds to avoid
748 * doing a lookup in the linked list and concurrency issues when writing is
749 * needed. Called with consumer_data.lock held.
751 * Returns the number of fds in the structures.
753 int consumer_update_poll_array(
754 struct lttng_consumer_local_data
*ctx
, struct pollfd
**pollfd
,
755 struct lttng_consumer_stream
**local_stream
,
756 struct lttng_ht
*metadata_ht
)
759 struct lttng_ht_iter iter
;
760 struct lttng_consumer_stream
*stream
;
762 DBG("Updating poll fd array");
764 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, stream
,
766 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
) {
769 DBG("Active FD %d", stream
->wait_fd
);
770 (*pollfd
)[i
].fd
= stream
->wait_fd
;
771 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
772 if (stream
->metadata_flag
&& metadata_ht
) {
773 lttng_ht_add_unique_ulong(metadata_ht
, &stream
->waitfd_node
);
774 DBG("Active FD added to metadata hash table");
776 local_stream
[i
] = stream
;
782 * Insert the consumer_poll_pipe at the end of the array and don't
783 * increment i so nb_fd is the number of real FD.
785 (*pollfd
)[i
].fd
= ctx
->consumer_poll_pipe
[0];
786 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
791 * Poll on the should_quit pipe and the command socket return -1 on error and
792 * should exit, 0 if data is available on the command socket
794 int lttng_consumer_poll_socket(struct pollfd
*consumer_sockpoll
)
799 num_rdy
= poll(consumer_sockpoll
, 2, -1);
802 * Restart interrupted system call.
804 if (errno
== EINTR
) {
807 perror("Poll error");
810 if (consumer_sockpoll
[0].revents
& (POLLIN
| POLLPRI
)) {
811 DBG("consumer_should_quit wake up");
821 * Set the error socket.
823 void lttng_consumer_set_error_sock(
824 struct lttng_consumer_local_data
*ctx
, int sock
)
826 ctx
->consumer_error_socket
= sock
;
830 * Set the command socket path.
832 void lttng_consumer_set_command_sock_path(
833 struct lttng_consumer_local_data
*ctx
, char *sock
)
835 ctx
->consumer_command_sock_path
= sock
;
839 * Send return code to the session daemon.
840 * If the socket is not defined, we return 0; it is not a fatal error
842 int lttng_consumer_send_error(
843 struct lttng_consumer_local_data
*ctx
, int cmd
)
845 if (ctx
->consumer_error_socket
> 0) {
846 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
847 sizeof(enum lttcomm_sessiond_command
));
854 * Close all the tracefiles and stream fds, should be called when all instances
857 void lttng_consumer_cleanup(void)
859 struct lttng_ht_iter iter
;
860 struct lttng_ht_node_ulong
*node
;
865 * close all outfd. Called when there are no more threads running (after
866 * joining on the threads), no need to protect list iteration with mutex.
868 cds_lfht_for_each_entry(consumer_data
.stream_ht
->ht
, &iter
.iter
, node
,
870 struct lttng_consumer_stream
*stream
=
871 caa_container_of(node
, struct lttng_consumer_stream
, node
);
872 consumer_del_stream(stream
);
875 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, node
,
877 struct lttng_consumer_channel
*channel
=
878 caa_container_of(node
, struct lttng_consumer_channel
, node
);
879 consumer_del_channel(channel
);
884 lttng_ht_destroy(consumer_data
.stream_ht
);
885 lttng_ht_destroy(consumer_data
.channel_ht
);
889 * Called from signal handler.
891 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
896 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
897 } while (ret
< 0 && errno
== EINTR
);
899 perror("write consumer quit");
903 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
906 int outfd
= stream
->out_fd
;
909 * This does a blocking write-and-wait on any page that belongs to the
910 * subbuffer prior to the one we just wrote.
911 * Don't care about error values, as these are just hints and ways to
912 * limit the amount of page cache used.
914 if (orig_offset
< stream
->chan
->max_sb_size
) {
917 lttng_sync_file_range(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
918 stream
->chan
->max_sb_size
,
919 SYNC_FILE_RANGE_WAIT_BEFORE
920 | SYNC_FILE_RANGE_WRITE
921 | SYNC_FILE_RANGE_WAIT_AFTER
);
923 * Give hints to the kernel about how we access the file:
924 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
927 * We need to call fadvise again after the file grows because the
928 * kernel does not seem to apply fadvise to non-existing parts of the
931 * Call fadvise _after_ having waited for the page writeback to
932 * complete because the dirty page writeback semantic is not well
933 * defined. So it can be expected to lead to lower throughput in
936 posix_fadvise(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
937 stream
->chan
->max_sb_size
, POSIX_FADV_DONTNEED
);
941 * Initialise the necessary environment:
942 * - create a new context
943 * - create the poll_pipe
944 * - create the should_quit pipe (for signal handler)
945 * - create the thread pipe (for splice)
947 * Takes a function pointer as argument, this function is called when data is
948 * available on a buffer. This function is responsible to do the
949 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
950 * buffer configuration and then kernctl_put_next_subbuf at the end.
952 * Returns a pointer to the new context or NULL on error.
954 struct lttng_consumer_local_data
*lttng_consumer_create(
955 enum lttng_consumer_type type
,
956 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
957 struct lttng_consumer_local_data
*ctx
),
958 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
959 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
960 int (*update_stream
)(int stream_key
, uint32_t state
))
963 struct lttng_consumer_local_data
*ctx
;
965 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
966 consumer_data
.type
== type
);
967 consumer_data
.type
= type
;
969 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
971 perror("allocating context");
975 ctx
->consumer_error_socket
= -1;
976 /* assign the callbacks */
977 ctx
->on_buffer_ready
= buffer_ready
;
978 ctx
->on_recv_channel
= recv_channel
;
979 ctx
->on_recv_stream
= recv_stream
;
980 ctx
->on_update_stream
= update_stream
;
982 ret
= pipe(ctx
->consumer_poll_pipe
);
984 perror("Error creating poll pipe");
985 goto error_poll_pipe
;
988 /* set read end of the pipe to non-blocking */
989 ret
= fcntl(ctx
->consumer_poll_pipe
[0], F_SETFL
, O_NONBLOCK
);
991 perror("fcntl O_NONBLOCK");
992 goto error_poll_fcntl
;
995 /* set write end of the pipe to non-blocking */
996 ret
= fcntl(ctx
->consumer_poll_pipe
[1], F_SETFL
, O_NONBLOCK
);
998 perror("fcntl O_NONBLOCK");
999 goto error_poll_fcntl
;
1002 ret
= pipe(ctx
->consumer_should_quit
);
1004 perror("Error creating recv pipe");
1005 goto error_quit_pipe
;
1008 ret
= pipe(ctx
->consumer_thread_pipe
);
1010 perror("Error creating thread pipe");
1011 goto error_thread_pipe
;
1018 for (i
= 0; i
< 2; i
++) {
1021 err
= close(ctx
->consumer_should_quit
[i
]);
1028 for (i
= 0; i
< 2; i
++) {
1031 err
= close(ctx
->consumer_poll_pipe
[i
]);
1043 * Close all fds associated with the instance and free the context.
1045 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1049 ret
= close(ctx
->consumer_error_socket
);
1053 ret
= close(ctx
->consumer_thread_pipe
[0]);
1057 ret
= close(ctx
->consumer_thread_pipe
[1]);
1061 ret
= close(ctx
->consumer_poll_pipe
[0]);
1065 ret
= close(ctx
->consumer_poll_pipe
[1]);
1069 ret
= close(ctx
->consumer_should_quit
[0]);
1073 ret
= close(ctx
->consumer_should_quit
[1]);
1077 unlink(ctx
->consumer_command_sock_path
);
1082 * Mmap the ring buffer, read it and write the data to the tracefile.
1084 * Returns the number of bytes written
1086 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1087 struct lttng_consumer_local_data
*ctx
,
1088 struct lttng_consumer_stream
*stream
, unsigned long len
)
1090 unsigned long mmap_offset
;
1091 ssize_t ret
= 0, written
= 0;
1092 off_t orig_offset
= stream
->out_fd_offset
;
1093 /* Default is on the disk */
1094 int outfd
= stream
->out_fd
;
1095 uint64_t metadata_id
;
1096 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1098 /* RCU lock for the relayd pointer */
1101 /* Flag that the current stream is set for network streaming. */
1102 if (stream
->net_seq_idx
!= -1) {
1103 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1104 if (relayd
== NULL
) {
1109 /* get the offset inside the fd to mmap */
1110 switch (consumer_data
.type
) {
1111 case LTTNG_CONSUMER_KERNEL
:
1112 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
1114 case LTTNG_CONSUMER32_UST
:
1115 case LTTNG_CONSUMER64_UST
:
1116 ret
= lttng_ustctl_get_mmap_read_offset(stream
->chan
->handle
,
1117 stream
->buf
, &mmap_offset
);
1120 ERR("Unknown consumer_data type");
1125 PERROR("tracer ctl get_mmap_read_offset");
1130 /* Handle stream on the relayd if the output is on the network */
1132 unsigned long netlen
= len
;
1135 * Lock the control socket for the complete duration of the function
1136 * since from this point on we will use the socket.
1138 if (stream
->metadata_flag
) {
1139 /* Metadata requires the control socket. */
1140 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1141 netlen
+= sizeof(stream
->relayd_stream_id
);
1144 ret
= consumer_handle_stream_before_relayd(stream
, netlen
);
1146 /* Use the returned socket. */
1149 /* Write metadata stream id before payload */
1150 if (stream
->metadata_flag
) {
1151 metadata_id
= htobe64(stream
->relayd_stream_id
);
1153 ret
= write(outfd
, (void *) &metadata_id
,
1154 sizeof(stream
->relayd_stream_id
));
1155 } while (ret
< 0 && errno
== EINTR
);
1157 PERROR("write metadata stream id");
1161 DBG("Metadata stream id %zu written before data",
1162 stream
->relayd_stream_id
);
1164 * We do this so the return value can match the len passed as
1165 * argument to this function.
1167 written
-= sizeof(stream
->relayd_stream_id
);
1170 /* Else, use the default set before which is the filesystem. */
1175 ret
= write(outfd
, stream
->mmap_base
+ mmap_offset
, len
);
1176 } while (ret
< 0 && errno
== EINTR
);
1178 PERROR("Error in file write");
1183 } else if (ret
> len
) {
1184 PERROR("Error in file write (ret %ld > len %lu)", ret
, len
);
1191 DBG("Consumer mmap write() ret %ld (len %lu)", ret
, len
);
1193 /* This call is useless on a socket so better save a syscall. */
1195 /* This won't block, but will start writeout asynchronously */
1196 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret
,
1197 SYNC_FILE_RANGE_WRITE
);
1198 stream
->out_fd_offset
+= ret
;
1202 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1205 /* Unlock only if ctrl socket used */
1206 if (relayd
&& stream
->metadata_flag
) {
1207 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1215 * Splice the data from the ring buffer to the tracefile.
1217 * Returns the number of bytes spliced.
1219 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1220 struct lttng_consumer_local_data
*ctx
,
1221 struct lttng_consumer_stream
*stream
, unsigned long len
)
1223 ssize_t ret
= 0, written
= 0, ret_splice
= 0;
1225 off_t orig_offset
= stream
->out_fd_offset
;
1226 int fd
= stream
->wait_fd
;
1227 /* Default is on the disk */
1228 int outfd
= stream
->out_fd
;
1229 uint64_t metadata_id
;
1230 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1232 switch (consumer_data
.type
) {
1233 case LTTNG_CONSUMER_KERNEL
:
1235 case LTTNG_CONSUMER32_UST
:
1236 case LTTNG_CONSUMER64_UST
:
1237 /* Not supported for user space tracing */
1240 ERR("Unknown consumer_data type");
1244 /* RCU lock for the relayd pointer */
1247 /* Flag that the current stream is set for network streaming. */
1248 if (stream
->net_seq_idx
!= -1) {
1249 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1250 if (relayd
== NULL
) {
1255 /* Write metadata stream id before payload */
1256 if (stream
->metadata_flag
&& relayd
) {
1258 * Lock the control socket for the complete duration of the function
1259 * since from this point on we will use the socket.
1261 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1263 metadata_id
= htobe64(stream
->relayd_stream_id
);
1265 ret
= write(ctx
->consumer_thread_pipe
[1], (void *) &metadata_id
,
1266 sizeof(stream
->relayd_stream_id
));
1267 } while (ret
< 0 && errno
== EINTR
);
1269 PERROR("write metadata stream id");
1273 DBG("Metadata stream id %zu written before data",
1274 stream
->relayd_stream_id
);
1278 DBG("splice chan to pipe offset %lu of len %lu (fd : %d)",
1279 (unsigned long)offset
, len
, fd
);
1280 ret_splice
= splice(fd
, &offset
, ctx
->consumer_thread_pipe
[1], NULL
, len
,
1281 SPLICE_F_MOVE
| SPLICE_F_MORE
);
1282 DBG("splice chan to pipe, ret %zd", ret_splice
);
1283 if (ret_splice
< 0) {
1284 PERROR("Error in relay splice");
1286 written
= ret_splice
;
1292 /* Handle stream on the relayd if the output is on the network */
1294 if (stream
->metadata_flag
) {
1295 /* Update counter to fit the spliced data */
1296 ret_splice
+= sizeof(stream
->relayd_stream_id
);
1297 len
+= sizeof(stream
->relayd_stream_id
);
1299 * We do this so the return value can match the len passed as
1300 * argument to this function.
1302 written
-= sizeof(stream
->relayd_stream_id
);
1305 ret
= consumer_handle_stream_before_relayd(stream
, ret_splice
);
1307 /* Use the returned socket. */
1311 ERR("Remote relayd disconnected. Stopping");
1317 /* Splice data out */
1318 ret_splice
= splice(ctx
->consumer_thread_pipe
[0], NULL
, outfd
, NULL
,
1319 ret_splice
, SPLICE_F_MOVE
| SPLICE_F_MORE
);
1320 DBG("Kernel consumer splice pipe to file, ret %zd", ret_splice
);
1321 if (ret_splice
< 0) {
1322 PERROR("Error in file splice");
1324 written
= ret_splice
;
1328 } else if (ret_splice
> len
) {
1330 PERROR("Wrote more data than requested %zd (len: %lu)",
1332 written
+= ret_splice
;
1338 /* This call is useless on a socket so better save a syscall. */
1340 /* This won't block, but will start writeout asynchronously */
1341 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret_splice
,
1342 SYNC_FILE_RANGE_WRITE
);
1343 stream
->out_fd_offset
+= ret_splice
;
1345 written
+= ret_splice
;
1347 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1354 /* send the appropriate error description to sessiond */
1357 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_EBADF
);
1360 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_EINVAL
);
1363 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_ENOMEM
);
1366 lttng_consumer_send_error(ctx
, CONSUMERD_SPLICE_ESPIPE
);
1371 if (relayd
&& stream
->metadata_flag
) {
1372 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1380 * Take a snapshot for a specific fd
1382 * Returns 0 on success, < 0 on error
1384 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data
*ctx
,
1385 struct lttng_consumer_stream
*stream
)
1387 switch (consumer_data
.type
) {
1388 case LTTNG_CONSUMER_KERNEL
:
1389 return lttng_kconsumer_take_snapshot(ctx
, stream
);
1390 case LTTNG_CONSUMER32_UST
:
1391 case LTTNG_CONSUMER64_UST
:
1392 return lttng_ustconsumer_take_snapshot(ctx
, stream
);
1394 ERR("Unknown consumer_data type");
1402 * Get the produced position
1404 * Returns 0 on success, < 0 on error
1406 int lttng_consumer_get_produced_snapshot(
1407 struct lttng_consumer_local_data
*ctx
,
1408 struct lttng_consumer_stream
*stream
,
1411 switch (consumer_data
.type
) {
1412 case LTTNG_CONSUMER_KERNEL
:
1413 return lttng_kconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1414 case LTTNG_CONSUMER32_UST
:
1415 case LTTNG_CONSUMER64_UST
:
1416 return lttng_ustconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1418 ERR("Unknown consumer_data type");
1424 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1425 int sock
, struct pollfd
*consumer_sockpoll
)
1427 switch (consumer_data
.type
) {
1428 case LTTNG_CONSUMER_KERNEL
:
1429 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1430 case LTTNG_CONSUMER32_UST
:
1431 case LTTNG_CONSUMER64_UST
:
1432 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1434 ERR("Unknown consumer_data type");
1441 * This thread polls the fds in the set to consume the data and write
1442 * it to tracefile if necessary.
1444 void *lttng_consumer_thread_poll_fds(void *data
)
1446 int num_rdy
, num_hup
, high_prio
, ret
, i
;
1447 struct pollfd
*pollfd
= NULL
;
1448 /* local view of the streams */
1449 struct lttng_consumer_stream
**local_stream
= NULL
;
1450 /* local view of consumer_data.fds_count */
1452 struct lttng_consumer_local_data
*ctx
= data
;
1453 struct lttng_ht
*metadata_ht
;
1454 struct lttng_ht_iter iter
;
1455 struct lttng_ht_node_ulong
*node
;
1456 struct lttng_consumer_stream
*metadata_stream
;
1459 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1461 rcu_register_thread();
1463 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
1470 * the fds set has been updated, we need to update our
1471 * local array as well
1473 pthread_mutex_lock(&consumer_data
.lock
);
1474 if (consumer_data
.need_update
) {
1475 if (pollfd
!= NULL
) {
1479 if (local_stream
!= NULL
) {
1481 local_stream
= NULL
;
1484 /* allocate for all fds + 1 for the consumer_poll_pipe */
1485 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
1486 if (pollfd
== NULL
) {
1487 perror("pollfd malloc");
1488 pthread_mutex_unlock(&consumer_data
.lock
);
1492 /* allocate for all fds + 1 for the consumer_poll_pipe */
1493 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
1494 sizeof(struct lttng_consumer_stream
));
1495 if (local_stream
== NULL
) {
1496 perror("local_stream malloc");
1497 pthread_mutex_unlock(&consumer_data
.lock
);
1500 ret
= consumer_update_poll_array(ctx
, &pollfd
, local_stream
,
1503 ERR("Error in allocating pollfd or local_outfds");
1504 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1505 pthread_mutex_unlock(&consumer_data
.lock
);
1509 consumer_data
.need_update
= 0;
1511 pthread_mutex_unlock(&consumer_data
.lock
);
1513 /* No FDs and consumer_quit, consumer_cleanup the thread */
1514 if (nb_fd
== 0 && consumer_quit
== 1) {
1517 /* poll on the array of fds */
1519 DBG("polling on %d fd", nb_fd
+ 1);
1520 num_rdy
= poll(pollfd
, nb_fd
+ 1, consumer_poll_timeout
);
1521 DBG("poll num_rdy : %d", num_rdy
);
1522 if (num_rdy
== -1) {
1524 * Restart interrupted system call.
1526 if (errno
== EINTR
) {
1529 perror("Poll error");
1530 lttng_consumer_send_error(ctx
, CONSUMERD_POLL_ERROR
);
1532 } else if (num_rdy
== 0) {
1533 DBG("Polling thread timed out");
1538 * If the consumer_poll_pipe triggered poll go directly to the
1539 * beginning of the loop to update the array. We want to prioritize
1540 * array update over low-priority reads.
1542 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
1543 size_t pipe_readlen
;
1546 DBG("consumer_poll_pipe wake up");
1547 /* Consume 1 byte of pipe data */
1549 pipe_readlen
= read(ctx
->consumer_poll_pipe
[0], &tmp
, 1);
1550 } while (pipe_readlen
== -1 && errno
== EINTR
);
1554 /* Take care of high priority channels first. */
1555 for (i
= 0; i
< nb_fd
; i
++) {
1556 /* Lookup for metadata which is the highest priority */
1557 lttng_ht_lookup(metadata_ht
,
1558 (void *)((unsigned long) pollfd
[i
].fd
), &iter
);
1559 node
= lttng_ht_iter_get_node_ulong(&iter
);
1561 (pollfd
[i
].revents
& (POLLIN
| POLLPRI
))) {
1562 DBG("Urgent metadata read on fd %d", pollfd
[i
].fd
);
1563 metadata_stream
= caa_container_of(node
,
1564 struct lttng_consumer_stream
, waitfd_node
);
1566 len
= ctx
->on_buffer_ready(metadata_stream
, ctx
);
1567 /* it's ok to have an unavailable sub-buffer */
1568 if (len
< 0 && len
!= -EAGAIN
) {
1570 } else if (len
> 0) {
1571 metadata_stream
->data_read
= 1;
1573 } else if (pollfd
[i
].revents
& POLLPRI
) {
1574 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
1576 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1577 /* it's ok to have an unavailable sub-buffer */
1578 if (len
< 0 && len
!= -EAGAIN
) {
1580 } else if (len
> 0) {
1581 local_stream
[i
]->data_read
= 1;
1587 * If we read high prio channel in this loop, try again
1588 * for more high prio data.
1594 /* Take care of low priority channels. */
1595 for (i
= 0; i
< nb_fd
; i
++) {
1596 if ((pollfd
[i
].revents
& POLLIN
) ||
1597 local_stream
[i
]->hangup_flush_done
) {
1598 DBG("Normal read on fd %d", pollfd
[i
].fd
);
1599 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
1600 /* it's ok to have an unavailable sub-buffer */
1601 if (len
< 0 && len
!= -EAGAIN
) {
1603 } else if (len
> 0) {
1604 local_stream
[i
]->data_read
= 1;
1609 /* Handle hangup and errors */
1610 for (i
= 0; i
< nb_fd
; i
++) {
1611 if (!local_stream
[i
]->hangup_flush_done
1612 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
1613 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
1614 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
1615 DBG("fd %d is hup|err|nval. Attempting flush and read.",
1617 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
1618 /* Attempt read again, for the data we just flushed. */
1619 local_stream
[i
]->data_read
= 1;
1622 * If the poll flag is HUP/ERR/NVAL and we have
1623 * read no data in this pass, we can remove the
1624 * stream from its hash table.
1626 if ((pollfd
[i
].revents
& POLLHUP
)) {
1627 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
1628 if (!local_stream
[i
]->data_read
) {
1629 if (local_stream
[i
]->metadata_flag
) {
1630 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1631 ret
= lttng_ht_del(metadata_ht
, &iter
);
1634 consumer_del_stream(local_stream
[i
]);
1637 } else if (pollfd
[i
].revents
& POLLERR
) {
1638 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
1639 if (!local_stream
[i
]->data_read
) {
1640 if (local_stream
[i
]->metadata_flag
) {
1641 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1642 ret
= lttng_ht_del(metadata_ht
, &iter
);
1645 consumer_del_stream(local_stream
[i
]);
1648 } else if (pollfd
[i
].revents
& POLLNVAL
) {
1649 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
1650 if (!local_stream
[i
]->data_read
) {
1651 if (local_stream
[i
]->metadata_flag
) {
1652 iter
.iter
.node
= &local_stream
[i
]->waitfd_node
.node
;
1653 ret
= lttng_ht_del(metadata_ht
, &iter
);
1656 consumer_del_stream(local_stream
[i
]);
1660 local_stream
[i
]->data_read
= 0;
1664 DBG("polling thread exiting");
1665 if (pollfd
!= NULL
) {
1669 if (local_stream
!= NULL
) {
1671 local_stream
= NULL
;
1673 rcu_unregister_thread();
1678 * This thread listens on the consumerd socket and receives the file
1679 * descriptors from the session daemon.
1681 void *lttng_consumer_thread_receive_fds(void *data
)
1683 int sock
, client_socket
, ret
;
1685 * structure to poll for incoming data on communication socket avoids
1686 * making blocking sockets.
1688 struct pollfd consumer_sockpoll
[2];
1689 struct lttng_consumer_local_data
*ctx
= data
;
1691 rcu_register_thread();
1693 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
1694 unlink(ctx
->consumer_command_sock_path
);
1695 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
1696 if (client_socket
< 0) {
1697 ERR("Cannot create command socket");
1701 ret
= lttcomm_listen_unix_sock(client_socket
);
1706 DBG("Sending ready command to lttng-sessiond");
1707 ret
= lttng_consumer_send_error(ctx
, CONSUMERD_COMMAND_SOCK_READY
);
1708 /* return < 0 on error, but == 0 is not fatal */
1710 ERR("Error sending ready command to lttng-sessiond");
1714 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
1716 perror("fcntl O_NONBLOCK");
1720 /* prepare the FDs to poll : to client socket and the should_quit pipe */
1721 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
1722 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
1723 consumer_sockpoll
[1].fd
= client_socket
;
1724 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
1726 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1729 DBG("Connection on client_socket");
1731 /* Blocking call, waiting for transmission */
1732 sock
= lttcomm_accept_unix_sock(client_socket
);
1737 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
1739 perror("fcntl O_NONBLOCK");
1743 /* update the polling structure to poll on the established socket */
1744 consumer_sockpoll
[1].fd
= sock
;
1745 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
1748 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
1751 DBG("Incoming command on sock");
1752 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1753 if (ret
== -ENOENT
) {
1754 DBG("Received STOP command");
1758 ERR("Communication interrupted on command socket");
1761 if (consumer_quit
) {
1762 DBG("consumer_thread_receive_fds received quit from signal");
1765 DBG("received fds on sock");
1768 DBG("consumer_thread_receive_fds exiting");
1771 * when all fds have hung up, the polling thread
1777 * 2s of grace period, if no polling events occur during
1778 * this period, the polling thread will exit even if there
1779 * are still open FDs (should not happen, but safety mechanism).
1781 consumer_poll_timeout
= LTTNG_CONSUMER_POLL_TIMEOUT
;
1784 * Wake-up the other end by writing a null byte in the pipe
1785 * (non-blocking). Important note: Because writing into the
1786 * pipe is non-blocking (and therefore we allow dropping wakeup
1787 * data, as long as there is wakeup data present in the pipe
1788 * buffer to wake up the other end), the other end should
1789 * perform the following sequence for waiting:
1790 * 1) empty the pipe (reads).
1791 * 2) perform update operation.
1792 * 3) wait on the pipe (poll).
1795 ret
= write(ctx
->consumer_poll_pipe
[1], "", 1);
1796 } while (ret
< 0 && errno
== EINTR
);
1797 rcu_unregister_thread();
1801 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
1802 struct lttng_consumer_local_data
*ctx
)
1804 switch (consumer_data
.type
) {
1805 case LTTNG_CONSUMER_KERNEL
:
1806 return lttng_kconsumer_read_subbuffer(stream
, ctx
);
1807 case LTTNG_CONSUMER32_UST
:
1808 case LTTNG_CONSUMER64_UST
:
1809 return lttng_ustconsumer_read_subbuffer(stream
, ctx
);
1811 ERR("Unknown consumer_data type");
1817 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
1819 switch (consumer_data
.type
) {
1820 case LTTNG_CONSUMER_KERNEL
:
1821 return lttng_kconsumer_on_recv_stream(stream
);
1822 case LTTNG_CONSUMER32_UST
:
1823 case LTTNG_CONSUMER64_UST
:
1824 return lttng_ustconsumer_on_recv_stream(stream
);
1826 ERR("Unknown consumer_data type");
1833 * Allocate and set consumer data hash tables.
1835 void lttng_consumer_init(void)
1837 consumer_data
.stream_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1838 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
1839 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);