2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
44 struct lttng_consumer_global_data consumer_data
= {
47 .type
= LTTNG_CONSUMER_UNKNOWN
,
50 /* timeout parameter, to control the polling thread grace period. */
51 int consumer_poll_timeout
= -1;
54 * Flag to inform the polling thread to quit when all fd hung up. Updated by
55 * the consumer_thread_receive_fds when it notices that all fds have hung up.
56 * Also updated by the signal handler (consumer_should_exit()). Read by the
59 volatile int consumer_quit
;
62 * The following two hash tables are visible by all threads which are separated
63 * in different source files.
65 * Global hash table containing respectively metadata and data streams. The
66 * stream element in this ht should only be updated by the metadata poll thread
67 * for the metadata and the data poll thread for the data.
69 struct lttng_ht
*metadata_ht
;
70 struct lttng_ht
*data_ht
;
/*
 * Wake up a thread blocked in poll() on the read side of a pipe so that it
 * goes through its poll loop again. This is typically used after some global
 * state has changed.
 *
 * wpipe: write end of the pipe the target thread is polling on.
 *
 * The payload is a NULL stream pointer. The write is retried for as long as
 * it is interrupted by a signal (EINTR); any other failure is ignored.
 */
static void notify_thread_pipe(int wpipe)
{
	struct lttng_consumer_stream *wakeup_token = NULL;
	ssize_t nbytes;

	for (;;) {
		nbytes = write(wpipe, &wakeup_token, sizeof(wakeup_token));
		if (nbytes >= 0 || errno != EINTR) {
			break;
		}
	}
}
88 * Find a stream. The consumer_data.lock must be locked during this
91 static struct lttng_consumer_stream
*consumer_find_stream(int key
,
94 struct lttng_ht_iter iter
;
95 struct lttng_ht_node_ulong
*node
;
96 struct lttng_consumer_stream
*stream
= NULL
;
100 /* Negative keys are lookup failures */
107 lttng_ht_lookup(ht
, (void *)((unsigned long) key
), &iter
);
108 node
= lttng_ht_iter_get_node_ulong(&iter
);
110 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
118 void consumer_steal_stream_key(int key
, struct lttng_ht
*ht
)
120 struct lttng_consumer_stream
*stream
;
123 stream
= consumer_find_stream(key
, ht
);
127 * We don't want the lookup to match, but we still need
128 * to iterate on this stream when iterating over the hash table. Just
129 * change the node key.
131 stream
->node
.key
= -1;
136 static struct lttng_consumer_channel
*consumer_find_channel(int key
)
138 struct lttng_ht_iter iter
;
139 struct lttng_ht_node_ulong
*node
;
140 struct lttng_consumer_channel
*channel
= NULL
;
142 /* Negative keys are lookup failures */
149 lttng_ht_lookup(consumer_data
.channel_ht
, (void *)((unsigned long) key
),
151 node
= lttng_ht_iter_get_node_ulong(&iter
);
153 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
161 static void consumer_steal_channel_key(int key
)
163 struct lttng_consumer_channel
*channel
;
166 channel
= consumer_find_channel(key
);
170 * We don't want the lookup to match, but we still need
171 * to iterate on this channel when iterating over the hash table. Just
172 * change the node key.
174 channel
->node
.key
= -1;
180 void consumer_free_stream(struct rcu_head
*head
)
182 struct lttng_ht_node_ulong
*node
=
183 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
184 struct lttng_consumer_stream
*stream
=
185 caa_container_of(node
, struct lttng_consumer_stream
, node
);
191 * RCU protected relayd socket pair free.
193 static void consumer_rcu_free_relayd(struct rcu_head
*head
)
195 struct lttng_ht_node_ulong
*node
=
196 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
197 struct consumer_relayd_sock_pair
*relayd
=
198 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
201 * Close all sockets. This is done in the call RCU since we don't want the
202 * socket fds to be reassigned thus potentially creating bad state of the
205 * We do not have to lock the control socket mutex here since at this stage
206 * there is no one referencing to this relayd object.
208 (void) relayd_close(&relayd
->control_sock
);
209 (void) relayd_close(&relayd
->data_sock
);
215 * Destroy and free relayd socket pair object.
217 * This function MUST be called with the consumer_data lock acquired.
219 static void destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
222 struct lttng_ht_iter iter
;
224 if (relayd
== NULL
) {
228 DBG("Consumer destroy and close relayd socket pair");
230 iter
.iter
.node
= &relayd
->node
.node
;
231 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
233 /* We assume the relayd is being or is destroyed */
237 /* RCU free() call */
238 call_rcu(&relayd
->node
.head
, consumer_rcu_free_relayd
);
242 * Update the end point status of all streams having the given network sequence
243 * index (relayd index).
245 * It's atomically set without having the stream mutex locked which is fine
246 * because we handle the write/read race with a pipe wakeup for each thread.
248 static void update_endpoint_status_by_netidx(int net_seq_idx
,
249 enum consumer_endpoint_status status
)
251 struct lttng_ht_iter iter
;
252 struct lttng_consumer_stream
*stream
;
254 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx
);
258 /* Let's begin with metadata */
259 cds_lfht_for_each_entry(metadata_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
260 if (stream
->net_seq_idx
== net_seq_idx
) {
261 uatomic_set(&stream
->endpoint_status
, status
);
262 DBG("Delete flag set to metadata stream %d", stream
->wait_fd
);
266 /* Follow up by the data streams */
267 cds_lfht_for_each_entry(data_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
268 if (stream
->net_seq_idx
== net_seq_idx
) {
269 uatomic_set(&stream
->endpoint_status
, status
);
270 DBG("Delete flag set to data stream %d", stream
->wait_fd
);
277 * Cleanup a relayd object by flagging every associated streams for deletion,
278 * destroying the object meaning removing it from the relayd hash table,
279 * closing the sockets and freeing the memory in a RCU call.
281 * If a local data context is available, notify the threads that the streams'
282 * state have changed.
284 static void cleanup_relayd(struct consumer_relayd_sock_pair
*relayd
,
285 struct lttng_consumer_local_data
*ctx
)
291 /* Save the net sequence index before destroying the object */
292 netidx
= relayd
->net_seq_idx
;
295 * Delete the relayd from the relayd hash table, close the sockets and free
296 * the object in a RCU call.
298 destroy_relayd(relayd
);
300 /* Set inactive endpoint to all streams */
301 update_endpoint_status_by_netidx(netidx
, CONSUMER_ENDPOINT_INACTIVE
);
304 * With a local data context, notify the threads that the streams' state
305 * have changed. The write() action on the pipe acts as an "implicit"
306 * memory barrier ordering the updates of the end point status from the
307 * read of this status which happens AFTER receiving this notify.
310 notify_thread_pipe(ctx
->consumer_data_pipe
[1]);
311 notify_thread_pipe(ctx
->consumer_metadata_pipe
[1]);
316 * Flag a relayd socket pair for destruction. Destroy it if the refcount
319 * RCU read side lock MUST be aquired before calling this function.
321 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair
*relayd
)
325 /* Set destroy flag for this object */
326 uatomic_set(&relayd
->destroy_flag
, 1);
328 /* Destroy the relayd if refcount is 0 */
329 if (uatomic_read(&relayd
->refcount
) == 0) {
330 destroy_relayd(relayd
);
335 * Remove a stream from the global list protected by a mutex. This
336 * function is also responsible for freeing its data structures.
338 void consumer_del_stream(struct lttng_consumer_stream
*stream
,
342 struct lttng_ht_iter iter
;
343 struct lttng_consumer_channel
*free_chan
= NULL
;
344 struct consumer_relayd_sock_pair
*relayd
;
348 DBG("Consumer del stream %d", stream
->wait_fd
);
351 /* Means the stream was allocated but not successfully added */
355 pthread_mutex_lock(&stream
->lock
);
356 pthread_mutex_lock(&consumer_data
.lock
);
358 switch (consumer_data
.type
) {
359 case LTTNG_CONSUMER_KERNEL
:
360 if (stream
->mmap_base
!= NULL
) {
361 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
367 case LTTNG_CONSUMER32_UST
:
368 case LTTNG_CONSUMER64_UST
:
369 lttng_ustconsumer_del_stream(stream
);
372 ERR("Unknown consumer_data type");
378 iter
.iter
.node
= &stream
->node
.node
;
379 ret
= lttng_ht_del(ht
, &iter
);
382 /* Remove node session id from the consumer_data stream ht */
383 iter
.iter
.node
= &stream
->node_session_id
.node
;
384 ret
= lttng_ht_del(consumer_data
.stream_list_ht
, &iter
);
388 assert(consumer_data
.stream_count
> 0);
389 consumer_data
.stream_count
--;
391 if (stream
->out_fd
>= 0) {
392 ret
= close(stream
->out_fd
);
397 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
398 ret
= close(stream
->wait_fd
);
403 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
404 ret
= close(stream
->shm_fd
);
410 /* Check and cleanup relayd */
412 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
413 if (relayd
!= NULL
) {
414 uatomic_dec(&relayd
->refcount
);
415 assert(uatomic_read(&relayd
->refcount
) >= 0);
417 /* Closing streams requires to lock the control socket. */
418 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
419 ret
= relayd_send_close_stream(&relayd
->control_sock
,
420 stream
->relayd_stream_id
,
421 stream
->next_net_seq_num
- 1);
422 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
424 DBG("Unable to close stream on the relayd. Continuing");
426 * Continue here. There is nothing we can do for the relayd.
427 * Chances are that the relayd has closed the socket so we just
428 * continue cleaning up.
432 /* Both conditions are met, we destroy the relayd. */
433 if (uatomic_read(&relayd
->refcount
) == 0 &&
434 uatomic_read(&relayd
->destroy_flag
)) {
435 destroy_relayd(relayd
);
440 uatomic_dec(&stream
->chan
->refcount
);
441 if (!uatomic_read(&stream
->chan
->refcount
)
442 && !uatomic_read(&stream
->chan
->nb_init_streams
)) {
443 free_chan
= stream
->chan
;
447 consumer_data
.need_update
= 1;
448 pthread_mutex_unlock(&consumer_data
.lock
);
449 pthread_mutex_unlock(&stream
->lock
);
452 consumer_del_channel(free_chan
);
456 call_rcu(&stream
->node
.head
, consumer_free_stream
);
459 struct lttng_consumer_stream
*consumer_allocate_stream(
460 int channel_key
, int stream_key
,
461 int shm_fd
, int wait_fd
,
462 enum lttng_consumer_stream_state state
,
464 enum lttng_event_output output
,
465 const char *path_name
,
473 struct lttng_consumer_stream
*stream
;
475 stream
= zmalloc(sizeof(*stream
));
476 if (stream
== NULL
) {
477 PERROR("malloc struct lttng_consumer_stream");
478 *alloc_ret
= -ENOMEM
;
483 * Get stream's channel reference. Needed when adding the stream to the
486 stream
->chan
= consumer_find_channel(channel_key
);
488 *alloc_ret
= -ENOENT
;
489 ERR("Unable to find channel for stream %d", stream_key
);
493 stream
->key
= stream_key
;
494 stream
->shm_fd
= shm_fd
;
495 stream
->wait_fd
= wait_fd
;
497 stream
->out_fd_offset
= 0;
498 stream
->state
= state
;
499 stream
->mmap_len
= mmap_len
;
500 stream
->mmap_base
= NULL
;
501 stream
->output
= output
;
504 stream
->net_seq_idx
= net_index
;
505 stream
->metadata_flag
= metadata_flag
;
506 stream
->session_id
= session_id
;
507 strncpy(stream
->path_name
, path_name
, sizeof(stream
->path_name
));
508 stream
->path_name
[sizeof(stream
->path_name
) - 1] = '\0';
509 pthread_mutex_init(&stream
->lock
, NULL
);
512 * Index differently the metadata node because the thread is using an
513 * internal hash table to match streams in the metadata_ht to the epoll set
517 lttng_ht_node_init_ulong(&stream
->node
, stream
->wait_fd
);
519 lttng_ht_node_init_ulong(&stream
->node
, stream
->key
);
522 /* Init session id node with the stream session id */
523 lttng_ht_node_init_ulong(&stream
->node_session_id
, stream
->session_id
);
526 * The cpu number is needed before using any ustctl_* actions. Ignored for
527 * the kernel so the value does not matter.
529 pthread_mutex_lock(&consumer_data
.lock
);
530 stream
->cpu
= stream
->chan
->cpucount
++;
531 pthread_mutex_unlock(&consumer_data
.lock
);
533 DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
534 " out_fd %d, net_seq_idx %d, session_id %" PRIu64
,
535 stream
->path_name
, stream
->key
, stream
->shm_fd
, stream
->wait_fd
,
536 (unsigned long long) stream
->mmap_len
, stream
->out_fd
,
537 stream
->net_seq_idx
, stream
->session_id
);
547 * Add a stream to the global list protected by a mutex.
549 static int consumer_add_stream(struct lttng_consumer_stream
*stream
,
553 struct consumer_relayd_sock_pair
*relayd
;
558 DBG3("Adding consumer stream %d", stream
->key
);
560 pthread_mutex_lock(&consumer_data
.lock
);
563 /* Steal stream identifier to avoid having streams with the same key */
564 consumer_steal_stream_key(stream
->key
, ht
);
566 lttng_ht_add_unique_ulong(ht
, &stream
->node
);
569 * Add stream to the stream_list_ht of the consumer data. No need to steal
570 * the key since the HT does not use it and we allow to add redundant keys
573 lttng_ht_add_ulong(consumer_data
.stream_list_ht
, &stream
->node_session_id
);
575 /* Check and cleanup relayd */
576 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
577 if (relayd
!= NULL
) {
578 uatomic_inc(&relayd
->refcount
);
581 /* Update channel refcount once added without error(s). */
582 uatomic_inc(&stream
->chan
->refcount
);
585 * When nb_init_streams reaches 0, we don't need to trigger any action in
586 * terms of destroying the associated channel, because the action that
587 * causes the count to become 0 also causes a stream to be added. The
588 * channel deletion will thus be triggered by the following removal of this
591 if (uatomic_read(&stream
->chan
->nb_init_streams
) > 0) {
592 uatomic_dec(&stream
->chan
->nb_init_streams
);
595 /* Update consumer data once the node is inserted. */
596 consumer_data
.stream_count
++;
597 consumer_data
.need_update
= 1;
600 pthread_mutex_unlock(&consumer_data
.lock
);
606 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
607 * be acquired before calling this.
609 static int add_relayd(struct consumer_relayd_sock_pair
*relayd
)
612 struct lttng_ht_node_ulong
*node
;
613 struct lttng_ht_iter iter
;
615 if (relayd
== NULL
) {
620 lttng_ht_lookup(consumer_data
.relayd_ht
,
621 (void *)((unsigned long) relayd
->net_seq_idx
), &iter
);
622 node
= lttng_ht_iter_get_node_ulong(&iter
);
624 /* Relayd already exist. Ignore the insertion */
627 lttng_ht_add_unique_ulong(consumer_data
.relayd_ht
, &relayd
->node
);
634 * Allocate and return a consumer relayd socket.
636 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
639 struct consumer_relayd_sock_pair
*obj
= NULL
;
641 /* Negative net sequence index is a failure */
642 if (net_seq_idx
< 0) {
646 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
648 PERROR("zmalloc relayd sock");
652 obj
->net_seq_idx
= net_seq_idx
;
654 obj
->destroy_flag
= 0;
655 lttng_ht_node_init_ulong(&obj
->node
, obj
->net_seq_idx
);
656 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
663 * Find a relayd socket pair in the global consumer data.
665 * Return the object if found else NULL.
666 * RCU read-side lock must be held across this call and while using the
669 struct consumer_relayd_sock_pair
*consumer_find_relayd(int key
)
671 struct lttng_ht_iter iter
;
672 struct lttng_ht_node_ulong
*node
;
673 struct consumer_relayd_sock_pair
*relayd
= NULL
;
675 /* Negative keys are lookup failures */
680 lttng_ht_lookup(consumer_data
.relayd_ht
, (void *)((unsigned long) key
),
682 node
= lttng_ht_iter_get_node_ulong(&iter
);
684 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
692 * Handle stream for relayd transmission if the stream applies for network
693 * streaming where the net sequence index is set.
695 * Return destination file descriptor or negative value on error.
697 static int write_relayd_stream_header(struct lttng_consumer_stream
*stream
,
698 size_t data_size
, unsigned long padding
,
699 struct consumer_relayd_sock_pair
*relayd
)
702 struct lttcomm_relayd_data_hdr data_hdr
;
708 /* Reset data header */
709 memset(&data_hdr
, 0, sizeof(data_hdr
));
711 if (stream
->metadata_flag
) {
712 /* Caller MUST acquire the relayd control socket lock */
713 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
718 /* Metadata are always sent on the control socket. */
719 outfd
= relayd
->control_sock
.fd
;
721 /* Set header with stream information */
722 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
723 data_hdr
.data_size
= htobe32(data_size
);
724 data_hdr
.padding_size
= htobe32(padding
);
725 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
++);
726 /* Other fields are zeroed previously */
728 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
734 /* Set to go on data socket */
735 outfd
= relayd
->data_sock
.fd
;
743 void consumer_free_channel(struct rcu_head
*head
)
745 struct lttng_ht_node_ulong
*node
=
746 caa_container_of(head
, struct lttng_ht_node_ulong
, head
);
747 struct lttng_consumer_channel
*channel
=
748 caa_container_of(node
, struct lttng_consumer_channel
, node
);
754 * Remove a channel from the global list protected by a mutex. This
755 * function is also responsible for freeing its data structures.
757 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
760 struct lttng_ht_iter iter
;
762 pthread_mutex_lock(&consumer_data
.lock
);
764 switch (consumer_data
.type
) {
765 case LTTNG_CONSUMER_KERNEL
:
767 case LTTNG_CONSUMER32_UST
:
768 case LTTNG_CONSUMER64_UST
:
769 lttng_ustconsumer_del_channel(channel
);
772 ERR("Unknown consumer_data type");
778 iter
.iter
.node
= &channel
->node
.node
;
779 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
783 if (channel
->mmap_base
!= NULL
) {
784 ret
= munmap(channel
->mmap_base
, channel
->mmap_len
);
789 if (channel
->wait_fd
>= 0 && !channel
->wait_fd_is_copy
) {
790 ret
= close(channel
->wait_fd
);
795 if (channel
->shm_fd
>= 0 && channel
->wait_fd
!= channel
->shm_fd
) {
796 ret
= close(channel
->shm_fd
);
802 call_rcu(&channel
->node
.head
, consumer_free_channel
);
804 pthread_mutex_unlock(&consumer_data
.lock
);
807 struct lttng_consumer_channel
*consumer_allocate_channel(
809 int shm_fd
, int wait_fd
,
811 uint64_t max_sb_size
,
812 unsigned int nb_init_streams
)
814 struct lttng_consumer_channel
*channel
;
817 channel
= zmalloc(sizeof(*channel
));
818 if (channel
== NULL
) {
819 PERROR("malloc struct lttng_consumer_channel");
822 channel
->key
= channel_key
;
823 channel
->shm_fd
= shm_fd
;
824 channel
->wait_fd
= wait_fd
;
825 channel
->mmap_len
= mmap_len
;
826 channel
->max_sb_size
= max_sb_size
;
827 channel
->refcount
= 0;
828 channel
->nb_init_streams
= nb_init_streams
;
829 lttng_ht_node_init_ulong(&channel
->node
, channel
->key
);
831 switch (consumer_data
.type
) {
832 case LTTNG_CONSUMER_KERNEL
:
833 channel
->mmap_base
= NULL
;
834 channel
->mmap_len
= 0;
836 case LTTNG_CONSUMER32_UST
:
837 case LTTNG_CONSUMER64_UST
:
838 ret
= lttng_ustconsumer_allocate_channel(channel
);
845 ERR("Unknown consumer_data type");
849 DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
850 channel
->key
, channel
->shm_fd
, channel
->wait_fd
,
851 (unsigned long long) channel
->mmap_len
,
852 (unsigned long long) channel
->max_sb_size
);
858 * Add a channel to the global list protected by a mutex.
860 int consumer_add_channel(struct lttng_consumer_channel
*channel
)
862 struct lttng_ht_node_ulong
*node
;
863 struct lttng_ht_iter iter
;
865 pthread_mutex_lock(&consumer_data
.lock
);
866 /* Steal channel identifier, for UST */
867 consumer_steal_channel_key(channel
->key
);
870 lttng_ht_lookup(consumer_data
.channel_ht
,
871 (void *)((unsigned long) channel
->key
), &iter
);
872 node
= lttng_ht_iter_get_node_ulong(&iter
);
874 /* Channel already exist. Ignore the insertion */
878 lttng_ht_add_unique_ulong(consumer_data
.channel_ht
, &channel
->node
);
882 pthread_mutex_unlock(&consumer_data
.lock
);
888 * Allocate the pollfd structure and the local view of the out fds to avoid
889 * doing a lookup in the linked list and concurrency issues when writing is
890 * needed. Called with consumer_data.lock held.
892 * Returns the number of fds in the structures.
894 static int consumer_update_poll_array(
895 struct lttng_consumer_local_data
*ctx
, struct pollfd
**pollfd
,
896 struct lttng_consumer_stream
**local_stream
, struct lttng_ht
*ht
)
899 struct lttng_ht_iter iter
;
900 struct lttng_consumer_stream
*stream
;
902 DBG("Updating poll fd array");
904 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
906 * Only active streams with an active end point can be added to the
907 * poll set and local stream storage of the thread.
909 * There is a potential race here for endpoint_status to be updated
910 * just after the check. However, this is OK since the stream(s) will
911 * be deleted once the thread is notified that the end point state has
912 * changed where this function will be called back again.
914 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
||
915 stream
->endpoint_status
) {
918 DBG("Active FD %d", stream
->wait_fd
);
919 (*pollfd
)[i
].fd
= stream
->wait_fd
;
920 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
921 local_stream
[i
] = stream
;
927 * Insert the consumer_data_pipe at the end of the array and don't
928 * increment i so nb_fd is the number of real FD.
930 (*pollfd
)[i
].fd
= ctx
->consumer_data_pipe
[0];
931 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
/*
 * Poll on the should_quit pipe and the command socket.
 *
 * consumer_sockpoll: array of two pollfd entries; entry 0 is checked for the
 * quit notification.
 *
 * Return -1 on error or when the consumer should exit, 0 if data is
 * available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

	/* Restart interrupted system call. */
	do {
		num_rdy = poll(consumer_sockpoll, 2, -1);
	} while (num_rdy == -1 && errno == EINTR);

	if (num_rdy == -1) {
		PERROR("Poll error");
		return -1;
	}

	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		return -1;
	}

	return 0;
}
966 * Set the error socket.
968 void lttng_consumer_set_error_sock(
969 struct lttng_consumer_local_data
*ctx
, int sock
)
971 ctx
->consumer_error_socket
= sock
;
975 * Set the command socket path.
977 void lttng_consumer_set_command_sock_path(
978 struct lttng_consumer_local_data
*ctx
, char *sock
)
980 ctx
->consumer_command_sock_path
= sock
;
984 * Send return code to the session daemon.
985 * If the socket is not defined, we return 0, it is not a fatal error
987 int lttng_consumer_send_error(
988 struct lttng_consumer_local_data
*ctx
, int cmd
)
990 if (ctx
->consumer_error_socket
> 0) {
991 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
992 sizeof(enum lttcomm_sessiond_command
));
999 * Close all the tracefiles and stream fds, should be called when all instances
1002 void lttng_consumer_cleanup(void)
1004 struct lttng_ht_iter iter
;
1005 struct lttng_ht_node_ulong
*node
;
1009 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, node
,
1011 struct lttng_consumer_channel
*channel
=
1012 caa_container_of(node
, struct lttng_consumer_channel
, node
);
1013 consumer_del_channel(channel
);
1018 lttng_ht_destroy(consumer_data
.channel_ht
);
1022 * Called from signal handler.
1024 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
1029 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
1030 } while (ret
< 0 && errno
== EINTR
);
1032 PERROR("write consumer quit");
1035 DBG("Consumer flag that it should quit");
1038 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
1041 int outfd
= stream
->out_fd
;
1044 * This does a blocking write-and-wait on any page that belongs to the
1045 * subbuffer prior to the one we just wrote.
1046 * Don't care about error values, as these are just hints and ways to
1047 * limit the amount of page cache used.
1049 if (orig_offset
< stream
->chan
->max_sb_size
) {
1052 lttng_sync_file_range(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
1053 stream
->chan
->max_sb_size
,
1054 SYNC_FILE_RANGE_WAIT_BEFORE
1055 | SYNC_FILE_RANGE_WRITE
1056 | SYNC_FILE_RANGE_WAIT_AFTER
);
1058 * Give hints to the kernel about how we access the file:
1059 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1062 * We need to call fadvise again after the file grows because the
1063 * kernel does not seem to apply fadvise to non-existing parts of the
1066 * Call fadvise _after_ having waited for the page writeback to
1067 * complete because the dirty page writeback semantic is not well
1068 * defined. So it can be expected to lead to lower throughput in
1071 posix_fadvise(outfd
, orig_offset
- stream
->chan
->max_sb_size
,
1072 stream
->chan
->max_sb_size
, POSIX_FADV_DONTNEED
);
1076 * Initialise the necessary environment:
1077 * - create a new context
1078 * - create the poll_pipe
1079 * - create the should_quit pipe (for signal handler)
1080 * - create the thread pipe (for splice)
1082 * Takes a function pointer as argument, this function is called when data is
1083 * available on a buffer. This function is responsible to do the
1084 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1085 * buffer configuration and then kernctl_put_next_subbuf at the end.
1087 * Returns a pointer to the new context or NULL on error.
1089 struct lttng_consumer_local_data
*lttng_consumer_create(
1090 enum lttng_consumer_type type
,
1091 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
1092 struct lttng_consumer_local_data
*ctx
),
1093 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
1094 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
1095 int (*update_stream
)(int stream_key
, uint32_t state
))
1098 struct lttng_consumer_local_data
*ctx
;
1100 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
1101 consumer_data
.type
== type
);
1102 consumer_data
.type
= type
;
1104 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
1106 PERROR("allocating context");
1110 ctx
->consumer_error_socket
= -1;
1111 /* assign the callbacks */
1112 ctx
->on_buffer_ready
= buffer_ready
;
1113 ctx
->on_recv_channel
= recv_channel
;
1114 ctx
->on_recv_stream
= recv_stream
;
1115 ctx
->on_update_stream
= update_stream
;
1117 ret
= pipe(ctx
->consumer_data_pipe
);
1119 PERROR("Error creating poll pipe");
1120 goto error_poll_pipe
;
1123 /* set read end of the pipe to non-blocking */
1124 ret
= fcntl(ctx
->consumer_data_pipe
[0], F_SETFL
, O_NONBLOCK
);
1126 PERROR("fcntl O_NONBLOCK");
1127 goto error_poll_fcntl
;
1130 /* set write end of the pipe to non-blocking */
1131 ret
= fcntl(ctx
->consumer_data_pipe
[1], F_SETFL
, O_NONBLOCK
);
1133 PERROR("fcntl O_NONBLOCK");
1134 goto error_poll_fcntl
;
1137 ret
= pipe(ctx
->consumer_should_quit
);
1139 PERROR("Error creating recv pipe");
1140 goto error_quit_pipe
;
1143 ret
= pipe(ctx
->consumer_thread_pipe
);
1145 PERROR("Error creating thread pipe");
1146 goto error_thread_pipe
;
1149 ret
= utils_create_pipe(ctx
->consumer_metadata_pipe
);
1151 goto error_metadata_pipe
;
1154 ret
= utils_create_pipe(ctx
->consumer_splice_metadata_pipe
);
1156 goto error_splice_pipe
;
1162 utils_close_pipe(ctx
->consumer_metadata_pipe
);
1163 error_metadata_pipe
:
1164 utils_close_pipe(ctx
->consumer_thread_pipe
);
1166 for (i
= 0; i
< 2; i
++) {
1169 err
= close(ctx
->consumer_should_quit
[i
]);
1176 for (i
= 0; i
< 2; i
++) {
1179 err
= close(ctx
->consumer_data_pipe
[i
]);
1191 * Close all fds associated with the instance and free the context.
1193 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1197 DBG("Consumer destroying it. Closing everything.");
1199 ret
= close(ctx
->consumer_error_socket
);
1203 ret
= close(ctx
->consumer_thread_pipe
[0]);
1207 ret
= close(ctx
->consumer_thread_pipe
[1]);
1211 ret
= close(ctx
->consumer_data_pipe
[0]);
1215 ret
= close(ctx
->consumer_data_pipe
[1]);
1219 ret
= close(ctx
->consumer_should_quit
[0]);
1223 ret
= close(ctx
->consumer_should_quit
[1]);
1227 utils_close_pipe(ctx
->consumer_splice_metadata_pipe
);
1229 unlink(ctx
->consumer_command_sock_path
);
1234 * Write the metadata stream id on the specified file descriptor.
1236 static int write_relayd_metadata_id(int fd
,
1237 struct lttng_consumer_stream
*stream
,
1238 struct consumer_relayd_sock_pair
*relayd
,
1239 unsigned long padding
)
1242 struct lttcomm_relayd_metadata_payload hdr
;
1244 hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
1245 hdr
.padding_size
= htobe32(padding
);
1247 ret
= write(fd
, (void *) &hdr
, sizeof(hdr
));
1248 } while (ret
< 0 && errno
== EINTR
);
1250 PERROR("write metadata stream id");
1253 DBG("Metadata stream id %" PRIu64
" with padding %lu written before data",
1254 stream
->relayd_stream_id
, padding
);
1261 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1262 * core function for writing trace buffers to either the local filesystem or
1265 * Careful review MUST be put if any changes occur!
1267 * Returns the number of bytes written
/*
 * Write `len` bytes of the stream's mmap'ed buffer (stream->mmap_base +
 * mmap_offset) to the stream output with write().
 *
 * The output defaults to stream->out_fd. When stream->net_seq_idx != -1 the
 * matching relayd is looked up and a stream header (and, for metadata
 * streams, a metadata id) is sent before the payload. stream->lock is taken
 * before the relayd lookup and released at the end of the function;
 * relayd->ctrl_sock_mutex is additionally taken for metadata streams.
 *
 * NOTE(review): braces and several short statements (else/goto/break, some
 * assignments) are absent from this listing, so the exact control flow and
 * the returned value cannot be confirmed from here.
 */
1269 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1270 struct lttng_consumer_local_data
*ctx
,
1271 struct lttng_consumer_stream
*stream
, unsigned long len
,
1272 unsigned long padding
)
1274 unsigned long mmap_offset
;
1275 ssize_t ret
= 0, written
= 0;
/* File offset before this write; handed to lttng_consumer_sync_trace_file() below. */
1276 off_t orig_offset
= stream
->out_fd_offset
;
1277 /* Default is on the disk */
1278 int outfd
= stream
->out_fd
;
1279 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1280 unsigned int relayd_hang_up
= 0;
1282 /* RCU lock for the relayd pointer */
1285 pthread_mutex_lock(&stream
->lock
);
1287 /* Check whether the current stream is set for network streaming. */
1288 if (stream
->net_seq_idx
!= -1) {
1289 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1290 if (relayd
== NULL
) {
1295 /* get the offset inside the fd to mmap */
1296 switch (consumer_data
.type
) {
1297 case LTTNG_CONSUMER_KERNEL
:
1298 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
1300 case LTTNG_CONSUMER32_UST
:
1301 case LTTNG_CONSUMER64_UST
:
1302 ret
= lttng_ustctl_get_mmap_read_offset(stream
->chan
->handle
,
1303 stream
->buf
, &mmap_offset
);
1306 ERR("Unknown consumer_data type");
1311 PERROR("tracer ctl get_mmap_read_offset");
1316 /* Handle stream on the relayd if the output is on the network */
/* netlen is the length passed to the stream header; metadata adds its payload header size below. */
1318 unsigned long netlen
= len
;
1321 * Lock the control socket for the complete duration of the function
1322 * since from this point on we will use the socket.
1324 if (stream
->metadata_flag
) {
1325 /* Metadata requires the control socket. */
1326 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1327 netlen
+= sizeof(struct lttcomm_relayd_metadata_payload
);
1330 ret
= write_relayd_stream_header(stream
, netlen
, padding
, relayd
);
1332 /* Use the returned socket. */
1335 /* Write metadata stream id before payload */
1336 if (stream
->metadata_flag
) {
1337 ret
= write_relayd_metadata_id(outfd
, stream
, relayd
, padding
);
1340 /* Socket operation failed. We consider the relayd dead */
1341 if (ret
== -EPIPE
|| ret
== -EINVAL
) {
1349 /* Socket operation failed. We consider the relayd dead */
1350 if (ret
== -EPIPE
|| ret
== -EINVAL
) {
1354 /* Else, use the default set before which is the filesystem. */
1357 /* No streaming, we have to set the len with the full padding */
/* The write is retried for as long as it fails with EINTR. */
1363 ret
= write(outfd
, stream
->mmap_base
+ mmap_offset
, len
);
1364 } while (ret
< 0 && errno
== EINTR
);
1365 DBG("Consumer mmap write() ret %zd (len %lu)", ret
, len
);
1367 PERROR("Error in file write");
1371 /* Socket operation failed. We consider the relayd dead */
1372 if (errno
== EPIPE
|| errno
== EINVAL
) {
/*
 * NOTE(review): ret is ssize_t and len is unsigned long, so this comparison
 * is a mixed signed/unsigned one. PERROR is presumably errno-based, yet an
 * over-long write does not set errno -- confirm ERR() was not intended.
 */
1377 } else if (ret
> len
) {
1378 PERROR("Error in file write (ret %zd > len %lu)", ret
, len
);
1386 /* This call is useless on a socket so better save a syscall. */
1388 /* This won't block, but will start writeout asynchronously */
1389 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret
,
1390 SYNC_FILE_RANGE_WRITE
);
1391 stream
->out_fd_offset
+= ret
;
1395 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1399 * This is a special case that the relayd has closed its socket. Let's
1400 * cleanup the relayd object and all associated streams.
1402 if (relayd
&& relayd_hang_up
) {
1403 cleanup_relayd(relayd
, ctx
);
1407 /* Unlock only if ctrl socket used */
1408 if (relayd
&& stream
->metadata_flag
) {
1409 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1411 pthread_mutex_unlock(&stream
->lock
);
1418 * Splice the data from the ring buffer to the tracefile.
1420 * Returns the number of bytes spliced.
/*
 * Move `len` bytes from stream->wait_fd to the stream output through an
 * intermediate pipe, using two splice() calls: ring buffer fd -> pipe write
 * end, then pipe read end -> outfd.
 *
 * Metadata streams go through ctx->consumer_splice_metadata_pipe and data
 * streams through ctx->consumer_thread_pipe. For network streams
 * (stream->net_seq_idx != -1) a stream header is sent first, and a metadata
 * id is written into the pipe for metadata streams. stream->lock is taken
 * before the relayd lookup and released at the end.
 *
 * NOTE(review): braces, loop constructs and several short statements are
 * absent from this listing; the loop structure around the splice() calls
 * cannot be confirmed from here.
 */
1422 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1423 struct lttng_consumer_local_data
*ctx
,
1424 struct lttng_consumer_stream
*stream
, unsigned long len
,
1425 unsigned long padding
)
1427 ssize_t ret
= 0, written
= 0, ret_splice
= 0;
1429 off_t orig_offset
= stream
->out_fd_offset
;
/* Source of the first splice(): the stream's wait_fd. */
1430 int fd
= stream
->wait_fd
;
1431 /* Default is on the disk */
1432 int outfd
= stream
->out_fd
;
1433 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1435 unsigned int relayd_hang_up
= 0;
1437 switch (consumer_data
.type
) {
1438 case LTTNG_CONSUMER_KERNEL
:
1440 case LTTNG_CONSUMER32_UST
:
1441 case LTTNG_CONSUMER64_UST
:
1442 /* Not supported for user space tracing */
1445 ERR("Unknown consumer_data type");
1449 /* RCU lock for the relayd pointer */
1452 pthread_mutex_lock(&stream
->lock
);
1454 /* Check whether the current stream is set for network streaming. */
1455 if (stream
->net_seq_idx
!= -1) {
1456 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1457 if (relayd
== NULL
) {
1463 * Choose right pipe for splice. Metadata and trace data are handled by
1464 * different threads hence the use of two pipes in order not to race or
1465 * corrupt the written data.
1467 if (stream
->metadata_flag
) {
1468 splice_pipe
= ctx
->consumer_splice_metadata_pipe
;
1470 splice_pipe
= ctx
->consumer_thread_pipe
;
1473 /* Write metadata stream id before payload */
/* NOTE(review): len is unsigned long; storing it in an int narrows it -- confirm lengths always fit. */
1475 int total_len
= len
;
1477 if (stream
->metadata_flag
) {
1479 * Lock the control socket for the complete duration of the function
1480 * since from this point on we will use the socket.
1482 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1484 ret
= write_relayd_metadata_id(splice_pipe
[1], stream
, relayd
,
1488 /* Socket operation failed. We consider the relayd dead */
1489 if (ret
== -EBADF
) {
1490 WARN("Remote relayd disconnected. Stopping");
1497 total_len
+= sizeof(struct lttcomm_relayd_metadata_payload
);
1500 ret
= write_relayd_stream_header(stream
, total_len
, padding
, relayd
);
1502 /* Use the returned socket. */
1505 /* Socket operation failed. We consider the relayd dead */
1506 if (ret
== -EBADF
) {
1507 WARN("Remote relayd disconnected. Stopping");
1514 /* No streaming, we have to set the len with the full padding */
1519 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1520 (unsigned long)offset
, len
, fd
, splice_pipe
[1]);
/* First hop: ring buffer fd into the write end of the pipe. */
1521 ret_splice
= splice(fd
, &offset
, splice_pipe
[1], NULL
, len
,
1522 SPLICE_F_MOVE
| SPLICE_F_MORE
);
1523 DBG("splice chan to pipe, ret %zd", ret_splice
);
1524 if (ret_splice
< 0) {
1525 PERROR("Error in relay splice");
1527 written
= ret_splice
;
1533 /* Handle stream on the relayd if the output is on the network */
1535 if (stream
->metadata_flag
) {
1536 size_t metadata_payload_size
=
1537 sizeof(struct lttcomm_relayd_metadata_payload
);
1539 /* Update counter to fit the spliced data */
1540 ret_splice
+= metadata_payload_size
;
1541 len
+= metadata_payload_size
;
1543 * We do this so the return value can match the len passed as
1544 * argument to this function.
1546 written
-= metadata_payload_size
;
1550 /* Splice data out */
/* Second hop: read end of the pipe into the output fd, for the byte count obtained above. */
1551 ret_splice
= splice(splice_pipe
[0], NULL
, outfd
, NULL
,
1552 ret_splice
, SPLICE_F_MOVE
| SPLICE_F_MORE
);
1553 DBG("Consumer splice pipe to file, ret %zd", ret_splice
);
1554 if (ret_splice
< 0) {
1555 PERROR("Error in file splice");
1557 written
= ret_splice
;
1559 /* Socket operation failed. We consider the relayd dead */
1560 if (errno
== EBADF
) {
1561 WARN("Remote relayd disconnected. Stopping");
/*
 * NOTE(review): ret_splice is ssize_t and len is unsigned long, so this is a
 * mixed signed/unsigned comparison. PERROR is presumably errno-based, yet
 * this condition does not set errno -- confirm.
 */
1567 } else if (ret_splice
> len
) {
1569 PERROR("Wrote more data than requested %zd (len: %lu)",
1571 written
+= ret_splice
;
1577 /* This call is useless on a socket so better save a syscall. */
1579 /* This won't block, but will start writeout asynchronously */
1580 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret_splice
,
1581 SYNC_FILE_RANGE_WRITE
);
1582 stream
->out_fd_offset
+= ret_splice
;
1584 written
+= ret_splice
;
1586 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1594 * This is a special case that the relayd has closed its socket. Let's
1595 * cleanup the relayd object and all associated streams.
1597 if (relayd
&& relayd_hang_up
) {
1598 cleanup_relayd(relayd
, ctx
);
1599 /* Skip splice error so the consumer does not fail */
1604 /* send the appropriate error description to sessiond */
1607 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_EINVAL
);
1610 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ENOMEM
);
1613 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ESPIPE
);
/* The control socket mutex was only taken for metadata streams on a relayd. */
1618 if (relayd
&& stream
->metadata_flag
) {
1619 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1621 pthread_mutex_unlock(&stream
->lock
);
1628 * Take a snapshot for a specific fd
1630 * Returns 0 on success, < 0 on error
/*
 * Dispatch on consumer_data.type: the kernel consumer goes to
 * lttng_kconsumer_take_snapshot(); LTTNG_CONSUMER32_UST and
 * LTTNG_CONSUMER64_UST both go to lttng_ustconsumer_take_snapshot(). The
 * selected function's return value is returned unchanged. Any other type is
 * logged with ERR(); what is returned in that case is not visible here.
 */
1632 int lttng_consumer_take_snapshot(struct lttng_consumer_local_data
*ctx
,
1633 struct lttng_consumer_stream
*stream
)
1635 switch (consumer_data
.type
) {
1636 case LTTNG_CONSUMER_KERNEL
:
1637 return lttng_kconsumer_take_snapshot(ctx
, stream
);
1638 case LTTNG_CONSUMER32_UST
:
1639 case LTTNG_CONSUMER64_UST
:
1640 return lttng_ustconsumer_take_snapshot(ctx
, stream
);
1642 ERR("Unknown consumer_data type");
1650 * Get the produced position
1652 * Returns 0 on success, < 0 on error
/*
 * Dispatch on consumer_data.type to the kernel or UST implementation of
 * get_produced_snapshot, forwarding ctx, stream and pos, and return its
 * result unchanged. Any other type is logged with ERR().
 *
 * NOTE(review): the declaration of the `pos` parameter is absent from this
 * listing; its type must be taken from the header.
 */
1654 int lttng_consumer_get_produced_snapshot(
1655 struct lttng_consumer_local_data
*ctx
,
1656 struct lttng_consumer_stream
*stream
,
1659 switch (consumer_data
.type
) {
1660 case LTTNG_CONSUMER_KERNEL
:
1661 return lttng_kconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1662 case LTTNG_CONSUMER32_UST
:
1663 case LTTNG_CONSUMER64_UST
:
1664 return lttng_ustconsumer_get_produced_snapshot(ctx
, stream
, pos
);
1666 ERR("Unknown consumer_data type");
/*
 * Receive one command on `sock` by delegating to the kernel or UST
 * implementation selected by consumer_data.type; ctx, sock and
 * consumer_sockpoll are forwarded as-is and the callee's return value is
 * returned unchanged. Any other type is logged with ERR().
 */
1672 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1673 int sock
, struct pollfd
*consumer_sockpoll
)
1675 switch (consumer_data
.type
) {
1676 case LTTNG_CONSUMER_KERNEL
:
1677 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1678 case LTTNG_CONSUMER32_UST
:
1679 case LTTNG_CONSUMER64_UST
:
1680 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1682 ERR("Unknown consumer_data type");
1689 * Iterate over all streams of the hashtable and free them properly.
1691 * WARNING: *MUST* be used with data stream only.
/*
 * Empty `ht` and destroy it: every stream node is unlinked with
 * lttng_ht_del(), its release is deferred through
 * call_rcu(consumer_free_stream), and the table itself is then passed to
 * lttng_ht_destroy().
 *
 * NOTE(review): the visible statements are the same as in
 * destroy_stream_ht(); any NULL check on `ht` or RCU read-side locking is
 * not visible in this listing.
 */
1693 static void destroy_data_stream_ht(struct lttng_ht
*ht
)
1696 struct lttng_ht_iter iter
;
1697 struct lttng_consumer_stream
*stream
;
1704 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1705 ret
= lttng_ht_del(ht
, &iter
);
/* Free the stream only after an RCU grace period. */
1708 call_rcu(&stream
->node
.head
, consumer_free_stream
);
1712 lttng_ht_destroy(ht
);
1716 * Iterate over all streams of the hashtable and free them properly.
1718 * XXX: Should not be used only for metadata streams, or else use another name.
/*
 * Empty `ht` and destroy it: every stream node is unlinked with
 * lttng_ht_del(), its release is deferred through
 * call_rcu(consumer_free_stream), and the table itself is then passed to
 * lttng_ht_destroy(). Called on metadata_ht by the metadata poll thread when
 * it exits.
 *
 * NOTE(review): any NULL check on `ht` or RCU read-side locking is not
 * visible in this listing.
 */
1720 static void destroy_stream_ht(struct lttng_ht
*ht
)
1723 struct lttng_ht_iter iter
;
1724 struct lttng_consumer_stream
*stream
;
1731 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1732 ret
= lttng_ht_del(ht
, &iter
);
/* Free the stream only after an RCU grace period. */
1735 call_rcu(&stream
->node
.head
, consumer_free_stream
);
1739 lttng_ht_destroy(ht
);
1743 * Clean up a metadata stream and free its memory.
/*
 * Tear down a metadata stream, in this order: release the tracer-specific
 * resources (munmap for the kernel consumer, lttng_ustconsumer_del_stream()
 * for UST), unlink the stream from `ht` and from
 * consumer_data.stream_list_ht, close out_fd / wait_fd / shm_fd, drop the
 * stream's reference on its relayd and on its channel, and defer the free
 * through call_rcu().
 *
 * stream->lock is taken first, then consumer_data.lock; they are released in
 * the reverse order.
 *
 * `ht` is NULL when the caller failed to add the stream (the metadata poll
 * thread does this); NOTE(review): how that case skips the hash table
 * removal is not visible in this listing.
 */
1745 void consumer_del_metadata_stream(struct lttng_consumer_stream
*stream
,
1746 struct lttng_ht
*ht
)
1749 struct lttng_ht_iter iter
;
1750 struct lttng_consumer_channel
*free_chan
= NULL
;
1751 struct consumer_relayd_sock_pair
*relayd
;
1755 * This call should NEVER receive regular stream. It must always be
1756 * metadata stream and this is crucial for data structure synchronization.
1758 assert(stream
->metadata_flag
);
1760 DBG3("Consumer delete metadata stream %d", stream
->wait_fd
);
1763 /* Means the stream was allocated but not successfully added */
1767 pthread_mutex_lock(&stream
->lock
);
1769 pthread_mutex_lock(&consumer_data
.lock
);
1770 switch (consumer_data
.type
) {
1771 case LTTNG_CONSUMER_KERNEL
:
1772 if (stream
->mmap_base
!= NULL
) {
1773 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
1775 PERROR("munmap metadata stream");
1779 case LTTNG_CONSUMER32_UST
:
1780 case LTTNG_CONSUMER64_UST
:
1781 lttng_ustconsumer_del_stream(stream
);
1784 ERR("Unknown consumer_data type");
/* Point the iterator at the stream's own node so lttng_ht_del() removes exactly this entry. */
1790 iter
.iter
.node
= &stream
->node
.node
;
1791 ret
= lttng_ht_del(ht
, &iter
);
1794 /* Remove node session id from the consumer_data stream ht */
1795 iter
.iter
.node
= &stream
->node_session_id
.node
;
1796 ret
= lttng_ht_del(consumer_data
.stream_list_ht
, &iter
);
1800 if (stream
->out_fd
>= 0) {
1801 ret
= close(stream
->out_fd
);
/* wait_fd is closed only when it is not flagged as a copy. */
1807 if (stream
->wait_fd
>= 0 && !stream
->wait_fd_is_copy
) {
1808 ret
= close(stream
->wait_fd
);
/* shm_fd is closed only when it is a different descriptor from wait_fd. */
1814 if (stream
->shm_fd
>= 0 && stream
->wait_fd
!= stream
->shm_fd
) {
1815 ret
= close(stream
->shm_fd
);
1821 /* Check and cleanup relayd */
1823 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1824 if (relayd
!= NULL
) {
1825 uatomic_dec(&relayd
->refcount
);
1826 assert(uatomic_read(&relayd
->refcount
) >= 0);
1828 /* Closing streams requires to lock the control socket. */
1829 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1830 ret
= relayd_send_close_stream(&relayd
->control_sock
,
1831 stream
->relayd_stream_id
, stream
->next_net_seq_num
- 1);
1832 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1834 DBG("Unable to close stream on the relayd. Continuing");
1836 * Continue here. There is nothing we can do for the relayd.
1837 * Chances are that the relayd has closed the socket so we just
1838 * continue cleaning up.
1842 /* Both conditions are met, we destroy the relayd. */
1843 if (uatomic_read(&relayd
->refcount
) == 0 &&
1844 uatomic_read(&relayd
->destroy_flag
)) {
1845 destroy_relayd(relayd
);
1850 /* Atomically decrement channel refcount since other threads can use it. */
/*
 * NOTE(review): the decrement and the read below are two separate atomic
 * operations; presumably safe because consumer_data.lock is held here --
 * confirm.
 */
1851 uatomic_dec(&stream
->chan
->refcount
);
1852 if (!uatomic_read(&stream
->chan
->refcount
)
1853 && !uatomic_read(&stream
->chan
->nb_init_streams
)) {
1854 /* Go for channel deletion! */
1855 free_chan
= stream
->chan
;
1859 pthread_mutex_unlock(&consumer_data
.lock
);
1860 pthread_mutex_unlock(&stream
->lock
);
/* The channel is deleted after both locks have been released. */
1863 consumer_del_channel(free_chan
);
1867 call_rcu(&stream
->node
.head
, consumer_free_stream
);
1871 * Action done with the metadata stream when adding it to the consumer internal
1872 * data structures to handle it.
/*
 * Register a metadata stream with the consumer while holding
 * consumer_data.lock: take a reference on its relayd (if one is found for
 * stream->net_seq_idx) and on its channel, decrement the channel's
 * nb_init_streams when it is above zero, then insert the stream into `ht`
 * and into consumer_data.stream_list_ht.
 *
 * NOTE(review): the return statement is not visible in this listing; the
 * caller in the metadata poll thread logs "Unable to add metadata stream"
 * after the call, which implies an error return exists.
 */
1874 static int consumer_add_metadata_stream(struct lttng_consumer_stream
*stream
,
1875 struct lttng_ht
*ht
)
1878 struct consumer_relayd_sock_pair
*relayd
;
1883 DBG3("Adding metadata stream %d to hash table", stream
->wait_fd
);
1885 pthread_mutex_lock(&consumer_data
.lock
);
1888 * From here, refcounts are updated so be _careful_ when returning an error
1893 /* Find relayd and, if one is found, increment refcount. */
1894 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1895 if (relayd
!= NULL
) {
1896 uatomic_inc(&relayd
->refcount
);
1899 /* Update channel refcount once added without error(s). */
1900 uatomic_inc(&stream
->chan
->refcount
);
1903 * When nb_init_streams reaches 0, we don't need to trigger any action in
1904 * terms of destroying the associated channel, because the action that
1905 * causes the count to become 0 also causes a stream to be added. The
1906 * channel deletion will thus be triggered by the following removal of this
1909 if (uatomic_read(&stream
->chan
->nb_init_streams
) > 0) {
1910 uatomic_dec(&stream
->chan
->nb_init_streams
);
1913 /* Steal stream identifier to avoid having streams with the same key */
1914 consumer_steal_stream_key(stream
->key
, ht
);
1916 lttng_ht_add_unique_ulong(ht
, &stream
->node
);
1919 * Add stream to the stream_list_ht of the consumer data. No need to steal
1920 * the key since the HT does not use it and we allow to add redundant keys
1923 lttng_ht_add_ulong(consumer_data
.stream_list_ht
, &stream
->node_session_id
);
1927 pthread_mutex_unlock(&consumer_data
.lock
);
1932 * Delete data streams that are flagged for deletion (endpoint_status).
/*
 * Walk every stream in data_ht and delete, through consumer_del_stream(),
 * those selected by the endpoint_status test below. Called by the data poll
 * thread when it reads a NULL stream pointer from its pipe.
 */
1934 static void validate_endpoint_status_data_stream(void)
1936 struct lttng_ht_iter iter
;
1937 struct lttng_consumer_stream
*stream
;
1939 DBG("Consumer delete flagged data stream");
1942 cds_lfht_for_each_entry(data_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1943 /* Validate delete flag of the stream */
/* NOTE(review): presumably a `continue` follows for streams whose endpoint_status is zero; it is not visible here. */
1944 if (!stream
->endpoint_status
) {
1947 /* Delete it right now */
1948 consumer_del_stream(stream
, data_ht
);
1954 * Delete metadata streams that are flagged for deletion (endpoint_status).
/*
 * Walk every stream in metadata_ht and, for those selected by the
 * endpoint_status test below, remove the stream's wait_fd from `pollset`
 * and delete the stream with consumer_del_metadata_stream(). Called by the
 * metadata poll thread when it reads a NULL stream pointer from its pipe.
 */
1956 static void validate_endpoint_status_metadata_stream(
1957 struct lttng_poll_event
*pollset
)
1959 struct lttng_ht_iter iter
;
1960 struct lttng_consumer_stream
*stream
;
1962 DBG("Consumer delete flagged metadata stream");
1967 cds_lfht_for_each_entry(metadata_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1968 /* Validate delete flag of the stream */
/* NOTE(review): presumably a `continue` follows for streams whose endpoint_status is zero; it is not visible here. */
1969 if (!stream
->endpoint_status
) {
1973 * Remove from pollset so the metadata thread can continue without
1974 * blocking on a deleted stream.
1976 lttng_poll_del(pollset
, stream
->wait_fd
);
1978 /* Delete it right now */
1979 consumer_del_metadata_stream(stream
, metadata_ht
);
1985 * Thread polls on metadata file descriptor and write them on disk or on the
/*
 * Metadata poll thread entry point; `data` is the
 * struct lttng_consumer_local_data context.
 *
 * The thread polls ctx->consumer_metadata_pipe[0] plus the wait_fd of every
 * metadata stream it has been handed:
 *  - a stream pointer read from the pipe is added to metadata_ht and to the
 *    poll set; a NULL pointer triggers
 *    validate_endpoint_status_metadata_stream();
 *  - LPOLLIN/LPOLLPRI on a stream fd calls ctx->on_buffer_ready();
 *  - LPOLLERR/LPOLLHUP on a stream fd removes the stream from the poll set
 *    and deletes it (for UST, after a final flush and read attempt).
 * On exit the poll set is cleaned and metadata_ht is destroyed.
 *
 * NOTE(review): braces, loop headers, goto/continue statements and some
 * declarations (ret, i, pollfd, len) are absent from this listing.
 */
1988 void *consumer_thread_metadata_poll(void *data
)
1991 uint32_t revents
, nb_fd
;
1992 struct lttng_consumer_stream
*stream
= NULL
;
1993 struct lttng_ht_iter iter
;
1994 struct lttng_ht_node_ulong
*node
;
1995 struct lttng_poll_event events
;
1996 struct lttng_consumer_local_data
*ctx
= data
;
1999 rcu_register_thread();
2001 DBG("Thread metadata poll started");
/* NOTE(review): the comment below says 1, but the call passes 2 as its second argument -- confirm which is intended. */
2003 /* Size is set to 1 for the consumer_metadata pipe */
2004 ret
= lttng_poll_create(&events
, 2, LTTNG_CLOEXEC
);
2006 ERR("Poll set creation failed");
2010 ret
= lttng_poll_add(&events
, ctx
->consumer_metadata_pipe
[0], LPOLLIN
);
2016 DBG("Metadata main loop started");
2019 lttng_poll_reset(&events
);
2021 nb_fd
= LTTNG_POLL_GETNB(&events
);
2023 /* Only the metadata pipe is set */
2024 if (nb_fd
== 0 && consumer_quit
== 1) {
2029 DBG("Metadata poll wait with %d fd(s)", nb_fd
);
/* A timeout of -1 is passed to lttng_poll_wait(). */
2030 ret
= lttng_poll_wait(&events
, -1);
2031 DBG("Metadata event catched in thread");
2033 if (errno
== EINTR
) {
2034 ERR("Poll EINTR catched");
2040 /* From here, the event is a metadata wait fd */
2041 for (i
= 0; i
< nb_fd
; i
++) {
2042 revents
= LTTNG_POLL_GETEV(&events
, i
);
2043 pollfd
= LTTNG_POLL_GETFD(&events
, i
);
2045 /* Just don't waste time if no returned events for the fd */
2050 if (pollfd
== ctx
->consumer_metadata_pipe
[0]) {
2051 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2052 DBG("Metadata thread pipe hung up");
2054 * Remove the pipe from the poll set and continue the loop
2055 * since their might be data to consume.
2057 lttng_poll_del(&events
, ctx
->consumer_metadata_pipe
[0]);
/* NOTE(review): the return value of close() is ignored here, unlike in consumer_del_metadata_stream() where it is stored in ret. */
2058 close(ctx
->consumer_metadata_pipe
[0]);
2060 } else if (revents
& LPOLLIN
) {
2062 /* Get the stream pointer received */
2063 ret
= read(pollfd
, &stream
, sizeof(stream
));
2064 } while (ret
< 0 && errno
== EINTR
);
/*
 * NOTE(review): sizeof yields size_t, so if ret is a signed type a negative
 * value converts to a large unsigned one in this comparison; presumably the
 * part of the condition that is not visible tests ret < 0 first -- confirm.
 */
2066 ret
< sizeof(struct lttng_consumer_stream
*)) {
2067 PERROR("read metadata stream");
2069 * Let's continue here and hope we can still work
2070 * without stopping the consumer. XXX: Should we?
2075 /* A NULL stream means that the state has changed. */
2076 if (stream
== NULL
) {
2077 /* Check for deleted streams. */
2078 validate_endpoint_status_metadata_stream(&events
);
2082 DBG("Adding metadata stream %d to poll set",
2085 ret
= consumer_add_metadata_stream(stream
, metadata_ht
);
2087 ERR("Unable to add metadata stream");
2088 /* Stream was not setup properly. Continuing. */
/* NULL is passed because the stream was never inserted in metadata_ht. */
2089 consumer_del_metadata_stream(stream
, NULL
);
2093 /* Add metadata stream to the global poll events list */
2094 lttng_poll_add(&events
, stream
->wait_fd
,
2095 LPOLLIN
| LPOLLPRI
);
2098 /* Handle other stream */
/* The fd reported by poll is the lookup key into metadata_ht. */
2103 lttng_ht_lookup(metadata_ht
, (void *)((unsigned long) pollfd
),
2105 node
= lttng_ht_iter_get_node_ulong(&iter
);
2108 stream
= caa_container_of(node
, struct lttng_consumer_stream
,
2111 /* Check for error event */
2112 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2113 DBG("Metadata fd %d is hup|err.", pollfd
);
2114 if (!stream
->hangup_flush_done
2115 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
2116 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
2117 DBG("Attempting to flush and consume the UST buffers");
2118 lttng_ustconsumer_on_stream_hangup(stream
);
2120 /* We just flushed the stream now read it. */
2122 len
= ctx
->on_buffer_ready(stream
, ctx
);
2124 * We don't check the return value here since if we get
2125 * a negative len, it means an error occured thus we
2126 * simply remove it from the poll set and free the
2132 lttng_poll_del(&events
, stream
->wait_fd
);
2134 * This call update the channel states, closes file descriptors
2135 * and securely free the stream.
2137 consumer_del_metadata_stream(stream
, metadata_ht
);
2138 } else if (revents
& (LPOLLIN
| LPOLLPRI
)) {
2139 /* Get the data out of the metadata file descriptor */
2140 DBG("Metadata available on fd %d", pollfd
);
2141 assert(stream
->wait_fd
== pollfd
);
2143 len
= ctx
->on_buffer_ready(stream
, ctx
);
2144 /* It's ok to have an unavailable sub-buffer */
2145 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2146 /* Clean up stream from consumer and free it. */
2147 lttng_poll_del(&events
, stream
->wait_fd
);
2148 consumer_del_metadata_stream(stream
, metadata_ht
);
2149 } else if (len
> 0) {
2150 stream
->data_read
= 1;
2154 /* Release RCU lock for the stream looked up */
2161 DBG("Metadata poll thread exiting");
2162 lttng_poll_clean(&events
);
2165 destroy_stream_ht(metadata_ht
);
2168 rcu_unregister_thread();
2173 * This thread polls the fds in the set to consume the data and write
2174 * it to tracefile if necessary.
/*
 * Data poll thread entry point; `data` is the
 * struct lttng_consumer_local_data context.
 *
 * The thread keeps a local pollfd array and a parallel local_stream array,
 * rebuilt under consumer_data.lock whenever consumer_data.need_update is
 * set. The extra slot at index nb_fd is the one tested for
 * "consumer_data_pipe wake up". After each poll() it:
 *  1. handles the wake-up pipe (new stream pointer, or NULL meaning "check
 *     for deleted streams");
 *  2. reads POLLPRI streams;
 *  3. reads POLLIN streams and streams with hangup_flush_done set;
 *  4. handles POLLHUP/POLLERR/POLLNVAL, deleting streams that produced no
 *     data in this pass.
 * On exit it closes the write end of consumer_metadata_pipe so the metadata
 * thread wakes up, and destroys data_ht.
 *
 * NOTE(review): braces, loop headers, goto/continue statements and some
 * declarations (nb_fd, len) are absent from this listing.
 */
2176 void *consumer_thread_data_poll(void *data
)
2178 int num_rdy
, num_hup
, high_prio
, ret
, i
;
2179 struct pollfd
*pollfd
= NULL
;
2180 /* local view of the streams */
2181 struct lttng_consumer_stream
**local_stream
= NULL
, *new_stream
= NULL
;
2182 /* local view of consumer_data.fds_count */
2184 struct lttng_consumer_local_data
*ctx
= data
;
2187 rcu_register_thread();
/*
 * NOTE(review): data_ht is also assigned by lttng_consumer_init(); if both
 * run, the earlier table is overwritten without being destroyed -- confirm
 * which allocation is intended.
 */
2189 data_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2190 if (data_ht
== NULL
) {
/*
 * NOTE(review): local_stream is an array of stream pointers
 * (struct lttng_consumer_stream **), yet the allocation is sized with the
 * full structure; sizeof(struct lttng_consumer_stream *) looks intended.
 * Over-allocation only, no overflow.
 */
2194 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
2201 * the fds set has been updated, we need to update our
2202 * local array as well
2204 pthread_mutex_lock(&consumer_data
.lock
);
2205 if (consumer_data
.need_update
) {
2206 if (pollfd
!= NULL
) {
2210 if (local_stream
!= NULL
) {
2212 local_stream
= NULL
;
2215 /* allocate for all fds + 1 for the consumer_data_pipe */
2216 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
2217 if (pollfd
== NULL
) {
2218 PERROR("pollfd malloc");
2219 pthread_mutex_unlock(&consumer_data
.lock
);
2223 /* allocate for all fds + 1 for the consumer_data_pipe */
/* NOTE(review): same element-size remark as the initial allocation: the elements are pointers, not structures. */
2224 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
2225 sizeof(struct lttng_consumer_stream
));
2226 if (local_stream
== NULL
) {
2227 PERROR("local_stream malloc");
2228 pthread_mutex_unlock(&consumer_data
.lock
);
2231 ret
= consumer_update_poll_array(ctx
, &pollfd
, local_stream
,
2234 ERR("Error in allocating pollfd or local_outfds");
2235 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
2236 pthread_mutex_unlock(&consumer_data
.lock
);
2240 consumer_data
.need_update
= 0;
2242 pthread_mutex_unlock(&consumer_data
.lock
);
2244 /* No FDs and consumer_quit, consumer_cleanup the thread */
2245 if (nb_fd
== 0 && consumer_quit
== 1) {
2248 /* poll on the array of fds */
2250 DBG("polling on %d fd", nb_fd
+ 1);
/* nb_fd + 1 entries are polled: the streams plus the wake-up pipe slot at index nb_fd. */
2251 num_rdy
= poll(pollfd
, nb_fd
+ 1, consumer_poll_timeout
);
2252 DBG("poll num_rdy : %d", num_rdy
);
2253 if (num_rdy
== -1) {
2255 * Restart interrupted system call.
2257 if (errno
== EINTR
) {
2260 PERROR("Poll error");
2261 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
2263 } else if (num_rdy
== 0) {
2264 DBG("Polling thread timed out");
2269 * If the consumer_data_pipe triggered poll go directly to the
2270 * beginning of the loop to update the array. We want to prioritize
2271 * array update over low-priority reads.
2273 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
/*
 * NOTE(review): read() returns ssize_t; storing it in a size_t makes the
 * "== -1" test below rely on the implicit conversion of -1 to SIZE_MAX.
 * ssize_t would state the intent directly.
 */
2274 size_t pipe_readlen
;
2276 DBG("consumer_data_pipe wake up");
2277 /* Read the stream pointer sent through the pipe */
2279 pipe_readlen
= read(ctx
->consumer_data_pipe
[0], &new_stream
,
2280 sizeof(new_stream
));
2281 } while (pipe_readlen
== -1 && errno
== EINTR
);
2284 * If the stream is NULL, just ignore it. It's also possible that
2285 * the sessiond poll thread changed the consumer_quit state and is
2286 * waking us up to test it.
2288 if (new_stream
== NULL
) {
2289 validate_endpoint_status_data_stream();
2293 ret
= consumer_add_stream(new_stream
, data_ht
);
2295 ERR("Consumer add stream %d failed. Continuing",
2298 * At this point, if the add_stream fails, it is not in the
2299 * hash table thus passing the NULL value here.
2301 consumer_del_stream(new_stream
, NULL
);
2304 /* Continue to update the local streams and handle prio ones */
2308 /* Take care of high priority channels first. */
2309 for (i
= 0; i
< nb_fd
; i
++) {
2310 if (pollfd
[i
].revents
& POLLPRI
) {
2311 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
2313 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
2314 /* it's ok to have an unavailable sub-buffer */
2315 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2316 /* Clean the stream and free it. */
2317 consumer_del_stream(local_stream
[i
], data_ht
);
2318 } else if (len
> 0) {
2319 local_stream
[i
]->data_read
= 1;
2325 * If we read high prio channel in this loop, try again
2326 * for more high prio data.
2332 /* Take care of low priority channels. */
2333 for (i
= 0; i
< nb_fd
; i
++) {
2334 if ((pollfd
[i
].revents
& POLLIN
) ||
2335 local_stream
[i
]->hangup_flush_done
) {
2336 DBG("Normal read on fd %d", pollfd
[i
].fd
);
2337 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
2338 /* it's ok to have an unavailable sub-buffer */
2339 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2340 /* Clean the stream and free it. */
2341 consumer_del_stream(local_stream
[i
], data_ht
);
2342 } else if (len
> 0) {
2343 local_stream
[i
]->data_read
= 1;
2348 /* Handle hangup and errors */
2349 for (i
= 0; i
< nb_fd
; i
++) {
2350 if (!local_stream
[i
]->hangup_flush_done
2351 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
2352 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
2353 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
2354 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2356 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
2357 /* Attempt read again, for the data we just flushed. */
2358 local_stream
[i
]->data_read
= 1;
2361 * If the poll flag is HUP/ERR/NVAL and we have
2362 * read no data in this pass, we can remove the
2363 * stream from its hash table.
2365 if ((pollfd
[i
].revents
& POLLHUP
)) {
2366 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
2367 if (!local_stream
[i
]->data_read
) {
2368 consumer_del_stream(local_stream
[i
], data_ht
);
2371 } else if (pollfd
[i
].revents
& POLLERR
) {
2372 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
2373 if (!local_stream
[i
]->data_read
) {
2374 consumer_del_stream(local_stream
[i
], data_ht
);
2377 } else if (pollfd
[i
].revents
& POLLNVAL
) {
2378 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
2379 if (!local_stream
[i
]->data_read
) {
2380 consumer_del_stream(local_stream
[i
], data_ht
);
/* data_read is cleared here; the read loops above set it again when len > 0. */
2384 local_stream
[i
]->data_read
= 0;
2388 DBG("polling thread exiting");
2389 if (pollfd
!= NULL
) {
2393 if (local_stream
!= NULL
) {
2395 local_stream
= NULL
;
2399 * Close the write side of the pipe so epoll_wait() in
2400 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2401 * read side of the pipe. If we close them both, epoll_wait strangely does
2402 * not return and could create a endless wait period if the pipe is the
2403 * only tracked fd in the poll set. The thread will take care of closing
2406 close(ctx
->consumer_metadata_pipe
[1]);
2409 destroy_data_stream_ht(data_ht
);
2412 rcu_unregister_thread();
2417 * This thread listens on the consumerd socket and receives the file
2418 * descriptors from the session daemon.
/*
 * Session daemon command thread entry point; `data` is the
 * struct lttng_consumer_local_data context.
 *
 * Sequence visible below: create and listen on the unix socket at
 * ctx->consumer_command_sock_path, report
 * LTTCOMM_CONSUMERD_COMMAND_SOCK_READY, switch the listening socket to
 * O_NONBLOCK, wait for a connection (polling ctx->consumer_should_quit[0]
 * alongside it), accept it, then receive commands with
 * lttng_consumer_recv_cmd(). The "Received STOP command",
 * "Communication interrupted" and "received quit from signal" messages mark
 * the three visible stop conditions. On exit the poll timeout is set to
 * LTTNG_CONSUMER_POLL_TIMEOUT and the data poll thread is woken through
 * ctx->consumer_data_pipe[1].
 *
 * NOTE(review): braces, loop headers, goto statements and the result checks
 * are absent from this listing.
 */
2420 void *consumer_thread_sessiond_poll(void *data
)
2422 int sock
, client_socket
, ret
;
2424 * structure to poll for incoming data on communication socket avoids
2425 * making blocking sockets.
2427 struct pollfd consumer_sockpoll
[2];
2428 struct lttng_consumer_local_data
*ctx
= data
;
2430 rcu_register_thread();
2432 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
/* The socket path is unlinked before the socket is created. */
2433 unlink(ctx
->consumer_command_sock_path
);
2434 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
2435 if (client_socket
< 0) {
2436 ERR("Cannot create command socket");
2440 ret
= lttcomm_listen_unix_sock(client_socket
);
2445 DBG("Sending ready command to lttng-sessiond");
2446 ret
= lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY
);
2447 /* return < 0 on error, but == 0 is not fatal */
2449 ERR("Error sending ready command to lttng-sessiond");
2453 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
2455 PERROR("fcntl O_NONBLOCK");
2459 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2460 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
2461 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
2462 consumer_sockpoll
[1].fd
= client_socket
;
2463 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2465 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2468 DBG("Connection on client_socket");
2470 /* Blocking call, waiting for transmission */
2471 sock
= lttcomm_accept_unix_sock(client_socket
);
2476 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
2478 PERROR("fcntl O_NONBLOCK");
2482 /* update the polling structure to poll on the established socket */
2483 consumer_sockpoll
[1].fd
= sock
;
2484 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2487 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2490 DBG("Incoming command on sock");
2491 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
/* -ENOENT from the command handler is treated as the STOP command. */
2492 if (ret
== -ENOENT
) {
2493 DBG("Received STOP command");
2498 * This could simply be a session daemon quitting. Don't output
2501 DBG("Communication interrupted on command socket");
2504 if (consumer_quit
) {
2505 DBG("consumer_thread_receive_fds received quit from signal");
2508 DBG("received fds on sock");
2511 DBG("consumer_thread_receive_fds exiting");
2514 * when all fds have hung up, the polling thread
2520 * 2s of grace period, if no polling events occur during
2521 * this period, the polling thread will exit even if there
2522 * are still open FDs (should not happen, but safety mechanism).
2524 consumer_poll_timeout
= LTTNG_CONSUMER_POLL_TIMEOUT
;
2527 * Notify the data poll thread to poll back again and test the
2528 * consumer_quit state that we just set so to quit gracefully.
2530 notify_thread_pipe(ctx
->consumer_data_pipe
[1]);
2532 rcu_unregister_thread();
/*
 * Read one sub-buffer of `stream` by delegating to the kernel or UST
 * implementation selected by consumer_data.type; the callee's return value
 * is returned unchanged. Any other type is logged with ERR().
 */
2536 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
2537 struct lttng_consumer_local_data
*ctx
)
2539 switch (consumer_data
.type
) {
2540 case LTTNG_CONSUMER_KERNEL
:
2541 return lttng_kconsumer_read_subbuffer(stream
, ctx
);
2542 case LTTNG_CONSUMER32_UST
:
2543 case LTTNG_CONSUMER64_UST
:
2544 return lttng_ustconsumer_read_subbuffer(stream
, ctx
);
2546 ERR("Unknown consumer_data type");
/*
 * Run the tracer-specific "stream received" hook for `stream`: the kernel or
 * UST implementation is selected by consumer_data.type and its return value
 * is returned unchanged. Any other type is logged with ERR().
 */
2552 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
2554 switch (consumer_data
.type
) {
2555 case LTTNG_CONSUMER_KERNEL
:
2556 return lttng_kconsumer_on_recv_stream(stream
);
2557 case LTTNG_CONSUMER32_UST
:
2558 case LTTNG_CONSUMER64_UST
:
2559 return lttng_ustconsumer_on_recv_stream(stream
);
2561 ERR("Unknown consumer_data type");
2568 * Allocate and set consumer data hash tables.
/*
 * Create the consumer's hash tables with lttng_ht_new(0,
 * LTTNG_HT_TYPE_ULONG): the three tables stored in consumer_data
 * (channel_ht, relayd_ht, stream_list_ht) and the two file-level tables
 * metadata_ht and data_ht.
 *
 * NOTE(review): in the visible lines only metadata_ht is asserted non-NULL;
 * the consumer_data tables are not checked. data_ht is also assigned by
 * consumer_thread_data_poll() -- confirm which allocation is intended.
 */
2570 void lttng_consumer_init(void)
2572 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2573 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2574 consumer_data
.stream_list_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2576 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2577 assert(metadata_ht
);
2578 data_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_ULONG
);
2583 * Process the ADD_RELAYD command received by a consumer.
2585 * This will create a relayd socket pair and add it to the relayd hash table.
2586 * The caller MUST acquire a RCU read side lock before calling it.
/*
 * Handle an ADD_RELAYD command: find (or allocate) the relayd socket pair
 * for `net_seq_idx`, receive one file descriptor from the session daemon
 * over `sock`, and install it as either the control or the data socket of
 * that pair according to `sock_type`.
 *
 * For both socket types the lttcomm socket description is copied from
 * `relayd_sock`, lttcomm_create_sock() is called on the copy, the fd it
 * created is closed, and the received fd is stored instead.
 *
 * Errors visible below are reported to the session daemon with
 * lttng_consumer_send_error() (LTTCOMM_CONSUMERD_OUTFD_ERROR when the
 * allocation fails, LTTCOMM_CONSUMERD_ERROR_RECV_FD when the fd is not
 * received).
 *
 * NOTE(review): the return statements, the final insertion into the relayd
 * hash table and the error-path cleanup are absent from this listing.
 */
2588 int consumer_add_relayd_socket(int net_seq_idx
, int sock_type
,
2589 struct lttng_consumer_local_data
*ctx
, int sock
,
2590 struct pollfd
*consumer_sockpoll
, struct lttcomm_sock
*relayd_sock
)
2593 struct consumer_relayd_sock_pair
*relayd
;
2595 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx
);
2597 /* Get relayd reference if exists. */
2598 relayd
= consumer_find_relayd(net_seq_idx
);
2599 if (relayd
== NULL
) {
2600 /* Not found. Allocate one. */
2601 relayd
= consumer_allocate_relayd_sock_pair(net_seq_idx
);
2602 if (relayd
== NULL
) {
2603 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
2608 /* Poll on consumer socket. */
2609 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2614 /* Get relayd socket from session daemon */
/* Exactly one fd is requested; any return value other than sizeof(fd) is treated as a failure. */
2615 ret
= lttcomm_recv_fds_unix_sock(sock
, &fd
, 1);
2616 if (ret
!= sizeof(fd
)) {
2617 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_FD
);
2622 /* Copy socket information and received FD */
2623 switch (sock_type
) {
2624 case LTTNG_STREAM_CONTROL
:
2625 /* Copy received lttcomm socket */
2626 lttcomm_copy_sock(&relayd
->control_sock
, relayd_sock
);
2627 ret
= lttcomm_create_sock(&relayd
->control_sock
);
2632 /* Close the created socket fd which is useless */
/* NOTE(review): the return value of close() is ignored here. */
2633 close(relayd
->control_sock
.fd
);
2635 /* Assign new file descriptor */
2636 relayd
->control_sock
.fd
= fd
;
2638 case LTTNG_STREAM_DATA
:
2639 /* Copy received lttcomm socket */
2640 lttcomm_copy_sock(&relayd
->data_sock
, relayd_sock
);
2641 ret
= lttcomm_create_sock(&relayd
->data_sock
);
2646 /* Close the created socket fd which is useless */
/* NOTE(review): the return value of close() is ignored here. */
2647 close(relayd
->data_sock
.fd
);
2649 /* Assign new file descriptor */
2650 relayd
->data_sock
.fd
= fd
;
2653 ERR("Unknown relayd socket type (%d)", sock_type
);
2657 DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
2658 sock_type
== LTTNG_STREAM_CONTROL
? "control" : "data",
2659 relayd
->net_seq_idx
, fd
);
2662 * Add relayd socket pair to consumer data hashtable. If object already
2663 * exists or on error, the function gracefully returns.
2675 * Check if, for a given session id, there is still data that needs to be extracted
2678 * Return 1 if data is in fact available to be read or else 0.
/*
 * consumer_data_available - check every stream registered for session id.
 *
 * Takes consumer_data.lock, then walks the entries of
 * consumer_data.stream_list_ht that match id. Each stream is probed with the
 * callback selected from consumer_data.type (kernel or UST). A stream whose
 * net_seq_idx is not -1 is additionally checked against its relayd over the
 * relayd control socket. Both exit paths visible below release
 * consumer_data.lock.
 *
 * Per the existing header comment of this function, the result is 1 when data
 * is available to be read and 0 otherwise.
 *
 * NOTE(review): the declaration of ret, the conditions guarding each
 * "goto data_not_available", the data_not_available label itself and the
 * return statements are not visible in this view -- confirm them against the
 * full source.
 */
2680 int consumer_data_available(uint64_t id
)
2683 struct lttng_ht_iter iter
;
2684 struct lttng_ht
*ht
;
2685 struct lttng_consumer_stream
*stream
;
2686 struct consumer_relayd_sock_pair
*relayd
;
/* Per-consumer-type probe, chosen in the switch below. */
2687 int (*data_available
)(struct lttng_consumer_stream
*);
2689 DBG("Consumer data available command on session id %" PRIu64
, id
);
/* Held until one of the two unlock calls at the end of the function. */
2692 pthread_mutex_lock(&consumer_data
.lock
);
/* Pick the probe matching the consumer type; unknown types are logged. */
2694 switch (consumer_data
.type
) {
2695 case LTTNG_CONSUMER_KERNEL
:
2696 data_available
= lttng_kconsumer_data_available
;
2698 case LTTNG_CONSUMER32_UST
:
2699 case LTTNG_CONSUMER64_UST
:
2700 data_available
= lttng_ustconsumer_data_available
;
2703 ERR("Unknown consumer data type");
2707 /* Shorthand for the stream list hash table. */
2708 ht
= consumer_data
.stream_list_ht
;
/*
 * Visit every entry whose key matches id (duplicates included).
 * NOTE(review): id is a uint64_t but is narrowed to unsigned long before
 * being used as the key; where unsigned long is 32 bits wide the upper half
 * of the session id is dropped -- confirm this is acceptable.
 */
2710 cds_lfht_for_each_entry_duplicate(ht
->ht
,
2711 ht
->hash_fct((void *)((unsigned long) id
), 0x42UL
),
2712 ht
->match_fct
, (void *)((unsigned long) id
),
2713 &iter
.iter
, stream
, node_session_id
.node
) {
2714 /* Check the stream for data. */
2715 ret
= data_available(stream
);
2717 goto data_not_available
;
/* A net_seq_idx other than -1 means the stream has an associated relayd. */
2720 if (stream
->net_seq_idx
!= -1) {
/*
 * NOTE(review): no NULL check on relayd is visible in this view before it
 * is dereferenced below -- confirm one exists in the full source.
 */
2721 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
/* Lock order: stream lock first, then the relayd control socket mutex. */
2724 pthread_mutex_lock(&stream
->lock
);
2725 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
/* Metadata streams use the quiescent-control query on the control socket. */
2726 if (stream
->metadata_flag
) {
2727 ret
= relayd_quiescent_control(&relayd
->control_sock
);
/* This query passes the stream's relayd id and its next network sequence number. */
2729 ret
= relayd_data_available(&relayd
->control_sock
,
2730 stream
->relayd_stream_id
, stream
->next_net_seq_num
);
/* Released in the reverse order of acquisition. */
2732 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
2733 pthread_mutex_unlock(&stream
->lock
);
2735 goto data_not_available
;
2741 * Finding _no_ node in the hash table means that the stream(s) have been
2742 * removed thus data is guaranteed to be available for analysis from the
2743 * trace files. This is *only* true for local consumer and not network
2747 /* Data is available to be read by a viewer. */
2748 pthread_mutex_unlock(&consumer_data
.lock
);
2753 /* Data is still being extracted from buffers. */
2754 pthread_mutex_unlock(&consumer_data
.lock
);