2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * 2012 - David Goulet <dgoulet@efficios.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
27 #include <sys/socket.h>
28 #include <sys/types.h>
32 #include <common/common.h>
33 #include <common/utils.h>
34 #include <common/compat/poll.h>
35 #include <common/kernel-ctl/kernel-ctl.h>
36 #include <common/sessiond-comm/relayd.h>
37 #include <common/sessiond-comm/sessiond-comm.h>
38 #include <common/kernel-consumer/kernel-consumer.h>
39 #include <common/relayd/relayd.h>
40 #include <common/ust-consumer/ust-consumer.h>
44 struct lttng_consumer_global_data consumer_data
= {
47 .type
= LTTNG_CONSUMER_UNKNOWN
,
/*
 * Commands sent from the session-facing thread to the channel-management
 * thread over ctx->consumer_channel_pipe (see notify_channel_pipe()).
 * CONSUMER_CHANNEL_ADD is grounded by its use in consumer_add_channel().
 */
enum consumer_channel_action {
	CONSUMER_CHANNEL_ADD,
	CONSUMER_CHANNEL_QUIT,
};
55 struct consumer_channel_msg
{
56 enum consumer_channel_action action
;
57 struct lttng_consumer_channel
*chan
;
/*
 * Flag to inform the polling thread to quit when all fd hung up. Updated by
 * the consumer_thread_receive_fds when it notices that all fds has hung up.
 * Also updated by the signal handler (consumer_should_exit()). Read by the
 * polling threads.
 */
volatile int consumer_quit;
/*
 * Global hash table containing respectively metadata and data streams. The
 * stream element in this ht should only be updated by the metadata poll thread
 * for the metadata and the data poll thread for the data.
 */
static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
/*
 * Notify a thread pipe to poll back again. This usually means that some global
 * state has changed so we just send back the thread in a poll wait call.
 *
 * Writes a single NULL stream pointer on the pipe; readers treat it purely as
 * a wakeup token. The write is retried on EINTR.
 */
static void notify_thread_pipe(int wpipe)
{
	ssize_t ret;
	struct lttng_consumer_stream *null_stream = NULL;

	do {
		ret = write(wpipe, &null_stream, sizeof(null_stream));
	} while (ret < 0 && errno == EINTR);
}
91 static void notify_channel_pipe(struct lttng_consumer_local_data
*ctx
,
92 struct lttng_consumer_channel
*chan
,
93 enum consumer_channel_action action
)
95 struct consumer_channel_msg msg
;
101 ret
= write(ctx
->consumer_channel_pipe
[1], &msg
, sizeof(msg
));
102 } while (ret
< 0 && errno
== EINTR
);
105 static int read_channel_pipe(struct lttng_consumer_local_data
*ctx
,
106 struct lttng_consumer_channel
**chan
,
107 enum consumer_channel_action
*action
)
109 struct consumer_channel_msg msg
;
113 ret
= read(ctx
->consumer_channel_pipe
[0], &msg
, sizeof(msg
));
114 } while (ret
< 0 && errno
== EINTR
);
116 *action
= msg
.action
;
123 * Find a stream. The consumer_data.lock must be locked during this
126 static struct lttng_consumer_stream
*find_stream(uint64_t key
,
129 struct lttng_ht_iter iter
;
130 struct lttng_ht_node_u64
*node
;
131 struct lttng_consumer_stream
*stream
= NULL
;
135 /* -1ULL keys are lookup failures */
136 if (key
== (uint64_t) -1ULL) {
142 lttng_ht_lookup(ht
, &key
, &iter
);
143 node
= lttng_ht_iter_get_node_u64(&iter
);
145 stream
= caa_container_of(node
, struct lttng_consumer_stream
, node
);
153 static void steal_stream_key(int key
, struct lttng_ht
*ht
)
155 struct lttng_consumer_stream
*stream
;
158 stream
= find_stream(key
, ht
);
162 * We don't want the lookup to match, but we still need
163 * to iterate on this stream when iterating over the hash table. Just
164 * change the node key.
166 stream
->node
.key
= -1ULL;
172 * Return a channel object for the given key.
174 * RCU read side lock MUST be acquired before calling this function and
175 * protects the channel ptr.
177 struct lttng_consumer_channel
*consumer_find_channel(uint64_t key
)
179 struct lttng_ht_iter iter
;
180 struct lttng_ht_node_u64
*node
;
181 struct lttng_consumer_channel
*channel
= NULL
;
183 /* -1ULL keys are lookup failures */
184 if (key
== (uint64_t) -1ULL) {
188 lttng_ht_lookup(consumer_data
.channel_ht
, &key
, &iter
);
189 node
= lttng_ht_iter_get_node_u64(&iter
);
191 channel
= caa_container_of(node
, struct lttng_consumer_channel
, node
);
197 static void free_stream_rcu(struct rcu_head
*head
)
199 struct lttng_ht_node_u64
*node
=
200 caa_container_of(head
, struct lttng_ht_node_u64
, head
);
201 struct lttng_consumer_stream
*stream
=
202 caa_container_of(node
, struct lttng_consumer_stream
, node
);
207 static void free_channel_rcu(struct rcu_head
*head
)
209 struct lttng_ht_node_u64
*node
=
210 caa_container_of(head
, struct lttng_ht_node_u64
, head
);
211 struct lttng_consumer_channel
*channel
=
212 caa_container_of(node
, struct lttng_consumer_channel
, node
);
218 * RCU protected relayd socket pair free.
220 static void free_relayd_rcu(struct rcu_head
*head
)
222 struct lttng_ht_node_u64
*node
=
223 caa_container_of(head
, struct lttng_ht_node_u64
, head
);
224 struct consumer_relayd_sock_pair
*relayd
=
225 caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
228 * Close all sockets. This is done in the call RCU since we don't want the
229 * socket fds to be reassigned thus potentially creating bad state of the
232 * We do not have to lock the control socket mutex here since at this stage
233 * there is no one referencing to this relayd object.
235 (void) relayd_close(&relayd
->control_sock
);
236 (void) relayd_close(&relayd
->data_sock
);
242 * Destroy and free relayd socket pair object.
244 * This function MUST be called with the consumer_data lock acquired.
246 static void destroy_relayd(struct consumer_relayd_sock_pair
*relayd
)
249 struct lttng_ht_iter iter
;
251 if (relayd
== NULL
) {
255 DBG("Consumer destroy and close relayd socket pair");
257 iter
.iter
.node
= &relayd
->node
.node
;
258 ret
= lttng_ht_del(consumer_data
.relayd_ht
, &iter
);
260 /* We assume the relayd is being or is destroyed */
264 /* RCU free() call */
265 call_rcu(&relayd
->node
.head
, free_relayd_rcu
);
269 * Remove a channel from the global list protected by a mutex. This function is
270 * also responsible for freeing its data structures.
272 void consumer_del_channel(struct lttng_consumer_channel
*channel
)
275 struct lttng_ht_iter iter
;
277 DBG("Consumer delete channel key %" PRIu64
, channel
->key
);
279 pthread_mutex_lock(&consumer_data
.lock
);
281 switch (consumer_data
.type
) {
282 case LTTNG_CONSUMER_KERNEL
:
284 case LTTNG_CONSUMER32_UST
:
285 case LTTNG_CONSUMER64_UST
:
286 lttng_ustconsumer_del_channel(channel
);
289 ERR("Unknown consumer_data type");
295 iter
.iter
.node
= &channel
->node
.node
;
296 ret
= lttng_ht_del(consumer_data
.channel_ht
, &iter
);
300 call_rcu(&channel
->node
.head
, free_channel_rcu
);
302 pthread_mutex_unlock(&consumer_data
.lock
);
306 * Iterate over the relayd hash table and destroy each element. Finally,
307 * destroy the whole hash table.
309 static void cleanup_relayd_ht(void)
311 struct lttng_ht_iter iter
;
312 struct consumer_relayd_sock_pair
*relayd
;
316 cds_lfht_for_each_entry(consumer_data
.relayd_ht
->ht
, &iter
.iter
, relayd
,
318 destroy_relayd(relayd
);
321 lttng_ht_destroy(consumer_data
.relayd_ht
);
327 * Update the end point status of all streams having the given network sequence
328 * index (relayd index).
330 * It's atomically set without having the stream mutex locked which is fine
331 * because we handle the write/read race with a pipe wakeup for each thread.
333 static void update_endpoint_status_by_netidx(int net_seq_idx
,
334 enum consumer_endpoint_status status
)
336 struct lttng_ht_iter iter
;
337 struct lttng_consumer_stream
*stream
;
339 DBG("Consumer set delete flag on stream by idx %d", net_seq_idx
);
343 /* Let's begin with metadata */
344 cds_lfht_for_each_entry(metadata_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
345 if (stream
->net_seq_idx
== net_seq_idx
) {
346 uatomic_set(&stream
->endpoint_status
, status
);
347 DBG("Delete flag set to metadata stream %d", stream
->wait_fd
);
351 /* Follow up by the data streams */
352 cds_lfht_for_each_entry(data_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
353 if (stream
->net_seq_idx
== net_seq_idx
) {
354 uatomic_set(&stream
->endpoint_status
, status
);
355 DBG("Delete flag set to data stream %d", stream
->wait_fd
);
362 * Cleanup a relayd object by flagging every associated streams for deletion,
363 * destroying the object meaning removing it from the relayd hash table,
364 * closing the sockets and freeing the memory in a RCU call.
366 * If a local data context is available, notify the threads that the streams'
367 * state have changed.
369 static void cleanup_relayd(struct consumer_relayd_sock_pair
*relayd
,
370 struct lttng_consumer_local_data
*ctx
)
376 DBG("Cleaning up relayd sockets");
378 /* Save the net sequence index before destroying the object */
379 netidx
= relayd
->net_seq_idx
;
382 * Delete the relayd from the relayd hash table, close the sockets and free
383 * the object in a RCU call.
385 destroy_relayd(relayd
);
387 /* Set inactive endpoint to all streams */
388 update_endpoint_status_by_netidx(netidx
, CONSUMER_ENDPOINT_INACTIVE
);
391 * With a local data context, notify the threads that the streams' state
392 * have changed. The write() action on the pipe acts as an "implicit"
393 * memory barrier ordering the updates of the end point status from the
394 * read of this status which happens AFTER receiving this notify.
397 notify_thread_pipe(ctx
->consumer_data_pipe
[1]);
398 notify_thread_pipe(ctx
->consumer_metadata_pipe
[1]);
403 * Flag a relayd socket pair for destruction. Destroy it if the refcount
406 * RCU read side lock MUST be aquired before calling this function.
408 void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair
*relayd
)
412 /* Set destroy flag for this object */
413 uatomic_set(&relayd
->destroy_flag
, 1);
415 /* Destroy the relayd if refcount is 0 */
416 if (uatomic_read(&relayd
->refcount
) == 0) {
417 destroy_relayd(relayd
);
422 * Remove a stream from the global list protected by a mutex. This
423 * function is also responsible for freeing its data structures.
425 void consumer_del_stream(struct lttng_consumer_stream
*stream
,
429 struct lttng_ht_iter iter
;
430 struct lttng_consumer_channel
*free_chan
= NULL
;
431 struct consumer_relayd_sock_pair
*relayd
;
435 DBG("Consumer del stream %d", stream
->wait_fd
);
438 /* Means the stream was allocated but not successfully added */
439 goto free_stream_rcu
;
442 pthread_mutex_lock(&consumer_data
.lock
);
443 pthread_mutex_lock(&stream
->lock
);
445 switch (consumer_data
.type
) {
446 case LTTNG_CONSUMER_KERNEL
:
447 if (stream
->mmap_base
!= NULL
) {
448 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
454 case LTTNG_CONSUMER32_UST
:
455 case LTTNG_CONSUMER64_UST
:
456 lttng_ustconsumer_del_stream(stream
);
459 ERR("Unknown consumer_data type");
465 iter
.iter
.node
= &stream
->node
.node
;
466 ret
= lttng_ht_del(ht
, &iter
);
469 iter
.iter
.node
= &stream
->node_channel_id
.node
;
470 ret
= lttng_ht_del(consumer_data
.stream_per_chan_id_ht
, &iter
);
473 iter
.iter
.node
= &stream
->node_session_id
.node
;
474 ret
= lttng_ht_del(consumer_data
.stream_list_ht
, &iter
);
478 assert(consumer_data
.stream_count
> 0);
479 consumer_data
.stream_count
--;
481 if (stream
->out_fd
>= 0) {
482 ret
= close(stream
->out_fd
);
488 /* Check and cleanup relayd */
490 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
491 if (relayd
!= NULL
) {
492 uatomic_dec(&relayd
->refcount
);
493 assert(uatomic_read(&relayd
->refcount
) >= 0);
495 /* Closing streams requires to lock the control socket. */
496 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
497 ret
= relayd_send_close_stream(&relayd
->control_sock
,
498 stream
->relayd_stream_id
,
499 stream
->next_net_seq_num
- 1);
500 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
502 DBG("Unable to close stream on the relayd. Continuing");
504 * Continue here. There is nothing we can do for the relayd.
505 * Chances are that the relayd has closed the socket so we just
506 * continue cleaning up.
510 /* Both conditions are met, we destroy the relayd. */
511 if (uatomic_read(&relayd
->refcount
) == 0 &&
512 uatomic_read(&relayd
->destroy_flag
)) {
513 destroy_relayd(relayd
);
518 uatomic_dec(&stream
->chan
->refcount
);
519 if (!uatomic_read(&stream
->chan
->refcount
)
520 && !uatomic_read(&stream
->chan
->nb_init_stream_left
)) {
521 free_chan
= stream
->chan
;
525 consumer_data
.need_update
= 1;
526 pthread_mutex_unlock(&stream
->lock
);
527 pthread_mutex_unlock(&consumer_data
.lock
);
530 consumer_del_channel(free_chan
);
534 call_rcu(&stream
->node
.head
, free_stream_rcu
);
537 struct lttng_consumer_stream
*consumer_allocate_stream(uint64_t channel_key
,
539 enum lttng_consumer_stream_state state
,
540 const char *channel_name
,
547 enum consumer_channel_type type
)
550 struct lttng_consumer_stream
*stream
;
552 stream
= zmalloc(sizeof(*stream
));
553 if (stream
== NULL
) {
554 PERROR("malloc struct lttng_consumer_stream");
561 stream
->key
= stream_key
;
563 stream
->out_fd_offset
= 0;
564 stream
->state
= state
;
567 stream
->net_seq_idx
= relayd_id
;
568 stream
->session_id
= session_id
;
569 pthread_mutex_init(&stream
->lock
, NULL
);
571 /* If channel is the metadata, flag this stream as metadata. */
572 if (type
== CONSUMER_CHANNEL_TYPE_METADATA
) {
573 stream
->metadata_flag
= 1;
574 /* Metadata is flat out. */
575 strncpy(stream
->name
, DEFAULT_METADATA_NAME
, sizeof(stream
->name
));
577 /* Format stream name to <channel_name>_<cpu_number> */
578 ret
= snprintf(stream
->name
, sizeof(stream
->name
), "%s_%d",
581 PERROR("snprintf stream name");
586 /* Key is always the wait_fd for streams. */
587 lttng_ht_node_init_u64(&stream
->node
, stream
->key
);
589 /* Init node per channel id key */
590 lttng_ht_node_init_u64(&stream
->node_channel_id
, channel_key
);
592 /* Init session id node with the stream session id */
593 lttng_ht_node_init_u64(&stream
->node_session_id
, stream
->session_id
);
595 DBG3("Allocated stream %s (key %" PRIu64
", chan_key %" PRIu64
" relayd_id %" PRIu64
", session_id %" PRIu64
,
596 stream
->name
, stream
->key
, channel_key
, stream
->net_seq_idx
, stream
->session_id
);
612 * Add a stream to the global list protected by a mutex.
614 static int add_stream(struct lttng_consumer_stream
*stream
,
618 struct consumer_relayd_sock_pair
*relayd
;
623 DBG3("Adding consumer stream %" PRIu64
, stream
->key
);
625 pthread_mutex_lock(&consumer_data
.lock
);
626 pthread_mutex_lock(&stream
->lock
);
629 /* Steal stream identifier to avoid having streams with the same key */
630 steal_stream_key(stream
->key
, ht
);
632 lttng_ht_add_unique_u64(ht
, &stream
->node
);
634 lttng_ht_add_u64(consumer_data
.stream_per_chan_id_ht
,
635 &stream
->node_channel_id
);
638 * Add stream to the stream_list_ht of the consumer data. No need to steal
639 * the key since the HT does not use it and we allow to add redundant keys
642 lttng_ht_add_u64(consumer_data
.stream_list_ht
, &stream
->node_session_id
);
644 /* Check and cleanup relayd */
645 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
646 if (relayd
!= NULL
) {
647 uatomic_inc(&relayd
->refcount
);
650 /* Update channel refcount once added without error(s). */
651 uatomic_inc(&stream
->chan
->refcount
);
654 * When nb_init_stream_left reaches 0, we don't need to trigger any action
655 * in terms of destroying the associated channel, because the action that
656 * causes the count to become 0 also causes a stream to be added. The
657 * channel deletion will thus be triggered by the following removal of this
660 if (uatomic_read(&stream
->chan
->nb_init_stream_left
) > 0) {
661 uatomic_dec(&stream
->chan
->nb_init_stream_left
);
664 /* Update consumer data once the node is inserted. */
665 consumer_data
.stream_count
++;
666 consumer_data
.need_update
= 1;
669 pthread_mutex_unlock(&stream
->lock
);
670 pthread_mutex_unlock(&consumer_data
.lock
);
676 * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
677 * be acquired before calling this.
679 static int add_relayd(struct consumer_relayd_sock_pair
*relayd
)
682 struct lttng_ht_node_u64
*node
;
683 struct lttng_ht_iter iter
;
687 lttng_ht_lookup(consumer_data
.relayd_ht
,
688 &relayd
->net_seq_idx
, &iter
);
689 node
= lttng_ht_iter_get_node_u64(&iter
);
693 lttng_ht_add_unique_u64(consumer_data
.relayd_ht
, &relayd
->node
);
700 * Allocate and return a consumer relayd socket.
702 struct consumer_relayd_sock_pair
*consumer_allocate_relayd_sock_pair(
705 struct consumer_relayd_sock_pair
*obj
= NULL
;
707 /* Negative net sequence index is a failure */
708 if (net_seq_idx
< 0) {
712 obj
= zmalloc(sizeof(struct consumer_relayd_sock_pair
));
714 PERROR("zmalloc relayd sock");
718 obj
->net_seq_idx
= net_seq_idx
;
720 obj
->destroy_flag
= 0;
721 lttng_ht_node_init_u64(&obj
->node
, obj
->net_seq_idx
);
722 pthread_mutex_init(&obj
->ctrl_sock_mutex
, NULL
);
729 * Find a relayd socket pair in the global consumer data.
731 * Return the object if found else NULL.
732 * RCU read-side lock must be held across this call and while using the
735 struct consumer_relayd_sock_pair
*consumer_find_relayd(uint64_t key
)
737 struct lttng_ht_iter iter
;
738 struct lttng_ht_node_u64
*node
;
739 struct consumer_relayd_sock_pair
*relayd
= NULL
;
741 /* Negative keys are lookup failures */
742 if (key
== (uint64_t) -1ULL) {
746 lttng_ht_lookup(consumer_data
.relayd_ht
, &key
,
748 node
= lttng_ht_iter_get_node_u64(&iter
);
750 relayd
= caa_container_of(node
, struct consumer_relayd_sock_pair
, node
);
758 * Handle stream for relayd transmission if the stream applies for network
759 * streaming where the net sequence index is set.
761 * Return destination file descriptor or negative value on error.
763 static int write_relayd_stream_header(struct lttng_consumer_stream
*stream
,
764 size_t data_size
, unsigned long padding
,
765 struct consumer_relayd_sock_pair
*relayd
)
768 struct lttcomm_relayd_data_hdr data_hdr
;
774 /* Reset data header */
775 memset(&data_hdr
, 0, sizeof(data_hdr
));
777 if (stream
->metadata_flag
) {
778 /* Caller MUST acquire the relayd control socket lock */
779 ret
= relayd_send_metadata(&relayd
->control_sock
, data_size
);
784 /* Metadata are always sent on the control socket. */
785 outfd
= relayd
->control_sock
.fd
;
787 /* Set header with stream information */
788 data_hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
789 data_hdr
.data_size
= htobe32(data_size
);
790 data_hdr
.padding_size
= htobe32(padding
);
792 * Note that net_seq_num below is assigned with the *current* value of
793 * next_net_seq_num and only after that the next_net_seq_num will be
794 * increment. This is why when issuing a command on the relayd using
795 * this next value, 1 should always be substracted in order to compare
796 * the last seen sequence number on the relayd side to the last sent.
798 data_hdr
.net_seq_num
= htobe64(stream
->next_net_seq_num
);
799 /* Other fields are zeroed previously */
801 ret
= relayd_send_data_hdr(&relayd
->data_sock
, &data_hdr
,
807 ++stream
->next_net_seq_num
;
809 /* Set to go on data socket */
810 outfd
= relayd
->data_sock
.fd
;
818 * Allocate and return a new lttng_consumer_channel object using the given key
819 * to initialize the hash table node.
821 * On error, return NULL.
823 struct lttng_consumer_channel
*consumer_allocate_channel(uint64_t key
,
825 const char *pathname
,
830 enum lttng_event_output output
)
832 struct lttng_consumer_channel
*channel
;
834 channel
= zmalloc(sizeof(*channel
));
835 if (channel
== NULL
) {
836 PERROR("malloc struct lttng_consumer_channel");
841 channel
->refcount
= 0;
842 channel
->session_id
= session_id
;
845 channel
->relayd_id
= relayd_id
;
846 channel
->output
= output
;
848 strncpy(channel
->pathname
, pathname
, sizeof(channel
->pathname
));
849 channel
->pathname
[sizeof(channel
->pathname
) - 1] = '\0';
851 strncpy(channel
->name
, name
, sizeof(channel
->name
));
852 channel
->name
[sizeof(channel
->name
) - 1] = '\0';
854 lttng_ht_node_init_u64(&channel
->node
, channel
->key
);
856 channel
->wait_fd
= -1;
858 CDS_INIT_LIST_HEAD(&channel
->streams
.head
);
860 DBG("Allocated channel (key %" PRIu64
")", channel
->key
)
867 * Add a channel to the global list protected by a mutex.
869 int consumer_add_channel(struct lttng_consumer_channel
*channel
,
870 struct lttng_consumer_local_data
*ctx
)
873 struct lttng_ht_node_u64
*node
;
874 struct lttng_ht_iter iter
;
876 pthread_mutex_lock(&consumer_data
.lock
);
879 lttng_ht_lookup(consumer_data
.channel_ht
, &channel
->key
, &iter
);
880 node
= lttng_ht_iter_get_node_u64(&iter
);
882 /* Channel already exist. Ignore the insertion */
883 ERR("Consumer add channel key %" PRIu64
" already exists!",
889 lttng_ht_add_unique_u64(consumer_data
.channel_ht
, &channel
->node
);
893 pthread_mutex_unlock(&consumer_data
.lock
);
895 if (!ret
&& channel
->wait_fd
!= -1 &&
896 channel
->metadata_stream
== NULL
) {
897 notify_channel_pipe(ctx
, channel
, CONSUMER_CHANNEL_ADD
);
903 * Allocate the pollfd structure and the local view of the out fds to avoid
904 * doing a lookup in the linked list and concurrency issues when writing is
905 * needed. Called with consumer_data.lock held.
907 * Returns the number of fds in the structures.
909 static int update_poll_array(struct lttng_consumer_local_data
*ctx
,
910 struct pollfd
**pollfd
, struct lttng_consumer_stream
**local_stream
,
914 struct lttng_ht_iter iter
;
915 struct lttng_consumer_stream
*stream
;
920 assert(local_stream
);
922 DBG("Updating poll fd array");
924 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
926 * Only active streams with an active end point can be added to the
927 * poll set and local stream storage of the thread.
929 * There is a potential race here for endpoint_status to be updated
930 * just after the check. However, this is OK since the stream(s) will
931 * be deleted once the thread is notified that the end point state has
932 * changed where this function will be called back again.
934 if (stream
->state
!= LTTNG_CONSUMER_ACTIVE_STREAM
||
935 stream
->endpoint_status
== CONSUMER_ENDPOINT_INACTIVE
) {
939 * This clobbers way too much the debug output. Uncomment that if you
940 * need it for debugging purposes.
942 * DBG("Active FD %d", stream->wait_fd);
944 (*pollfd
)[i
].fd
= stream
->wait_fd
;
945 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
946 local_stream
[i
] = stream
;
952 * Insert the consumer_data_pipe at the end of the array and don't
953 * increment i so nb_fd is the number of real FD.
955 (*pollfd
)[i
].fd
= ctx
->consumer_data_pipe
[0];
956 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
/*
 * Poll on the should_quit pipe and the command socket return -1 on error and
 * should exit, 0 if data is available on the command socket.
 */
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
	int num_rdy;

restart:
	num_rdy = poll(consumer_sockpoll, 2, -1);
	if (num_rdy == -1) {
		/*
		 * Restart interrupted system call.
		 */
		if (errno == EINTR) {
			goto restart;
		}
		PERROR("Poll error");
		goto exit;
	}
	if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
		DBG("consumer_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}
991 * Set the error socket.
993 void lttng_consumer_set_error_sock(struct lttng_consumer_local_data
*ctx
,
996 ctx
->consumer_error_socket
= sock
;
1000 * Set the command socket path.
1002 void lttng_consumer_set_command_sock_path(
1003 struct lttng_consumer_local_data
*ctx
, char *sock
)
1005 ctx
->consumer_command_sock_path
= sock
;
1009 * Send return code to the session daemon.
1010 * If the socket is not defined, we return 0, it is not a fatal error
1012 int lttng_consumer_send_error(struct lttng_consumer_local_data
*ctx
, int cmd
)
1014 if (ctx
->consumer_error_socket
> 0) {
1015 return lttcomm_send_unix_sock(ctx
->consumer_error_socket
, &cmd
,
1016 sizeof(enum lttcomm_sessiond_command
));
1023 * Close all the tracefiles and stream fds and MUST be called when all
1024 * instances are destroyed i.e. when all threads were joined and are ended.
1026 void lttng_consumer_cleanup(void)
1028 struct lttng_ht_iter iter
;
1029 struct lttng_consumer_channel
*channel
;
1033 cds_lfht_for_each_entry(consumer_data
.channel_ht
->ht
, &iter
.iter
, channel
,
1035 consumer_del_channel(channel
);
1040 lttng_ht_destroy(consumer_data
.channel_ht
);
1042 cleanup_relayd_ht();
1044 lttng_ht_destroy(consumer_data
.stream_per_chan_id_ht
);
1047 * This HT contains streams that are freed by either the metadata thread or
1048 * the data thread so we do *nothing* on the hash table and simply destroy
1051 lttng_ht_destroy(consumer_data
.stream_list_ht
);
1055 * Called from signal handler.
1057 void lttng_consumer_should_exit(struct lttng_consumer_local_data
*ctx
)
1062 ret
= write(ctx
->consumer_should_quit
[1], "4", 1);
1063 } while (ret
< 0 && errno
== EINTR
);
1064 if (ret
< 0 || ret
!= 1) {
1065 PERROR("write consumer quit");
1068 DBG("Consumer flag that it should quit");
1071 void lttng_consumer_sync_trace_file(struct lttng_consumer_stream
*stream
,
1074 int outfd
= stream
->out_fd
;
1077 * This does a blocking write-and-wait on any page that belongs to the
1078 * subbuffer prior to the one we just wrote.
1079 * Don't care about error values, as these are just hints and ways to
1080 * limit the amount of page cache used.
1082 if (orig_offset
< stream
->max_sb_size
) {
1085 lttng_sync_file_range(outfd
, orig_offset
- stream
->max_sb_size
,
1086 stream
->max_sb_size
,
1087 SYNC_FILE_RANGE_WAIT_BEFORE
1088 | SYNC_FILE_RANGE_WRITE
1089 | SYNC_FILE_RANGE_WAIT_AFTER
);
1091 * Give hints to the kernel about how we access the file:
1092 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
1095 * We need to call fadvise again after the file grows because the
1096 * kernel does not seem to apply fadvise to non-existing parts of the
1099 * Call fadvise _after_ having waited for the page writeback to
1100 * complete because the dirty page writeback semantic is not well
1101 * defined. So it can be expected to lead to lower throughput in
1104 posix_fadvise(outfd
, orig_offset
- stream
->max_sb_size
,
1105 stream
->max_sb_size
, POSIX_FADV_DONTNEED
);
1109 * Initialise the necessary environnement :
1110 * - create a new context
1111 * - create the poll_pipe
1112 * - create the should_quit pipe (for signal handler)
1113 * - create the thread pipe (for splice)
1115 * Takes a function pointer as argument, this function is called when data is
1116 * available on a buffer. This function is responsible to do the
1117 * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
1118 * buffer configuration and then kernctl_put_next_subbuf at the end.
1120 * Returns a pointer to the new context or NULL on error.
1122 struct lttng_consumer_local_data
*lttng_consumer_create(
1123 enum lttng_consumer_type type
,
1124 ssize_t (*buffer_ready
)(struct lttng_consumer_stream
*stream
,
1125 struct lttng_consumer_local_data
*ctx
),
1126 int (*recv_channel
)(struct lttng_consumer_channel
*channel
),
1127 int (*recv_stream
)(struct lttng_consumer_stream
*stream
),
1128 int (*update_stream
)(int stream_key
, uint32_t state
))
1131 struct lttng_consumer_local_data
*ctx
;
1133 assert(consumer_data
.type
== LTTNG_CONSUMER_UNKNOWN
||
1134 consumer_data
.type
== type
);
1135 consumer_data
.type
= type
;
1137 ctx
= zmalloc(sizeof(struct lttng_consumer_local_data
));
1139 PERROR("allocating context");
1143 ctx
->consumer_error_socket
= -1;
1144 /* assign the callbacks */
1145 ctx
->on_buffer_ready
= buffer_ready
;
1146 ctx
->on_recv_channel
= recv_channel
;
1147 ctx
->on_recv_stream
= recv_stream
;
1148 ctx
->on_update_stream
= update_stream
;
1150 ret
= pipe(ctx
->consumer_data_pipe
);
1152 PERROR("Error creating poll pipe");
1153 goto error_poll_pipe
;
1156 /* set read end of the pipe to non-blocking */
1157 ret
= fcntl(ctx
->consumer_data_pipe
[0], F_SETFL
, O_NONBLOCK
);
1159 PERROR("fcntl O_NONBLOCK");
1160 goto error_poll_fcntl
;
1163 /* set write end of the pipe to non-blocking */
1164 ret
= fcntl(ctx
->consumer_data_pipe
[1], F_SETFL
, O_NONBLOCK
);
1166 PERROR("fcntl O_NONBLOCK");
1167 goto error_poll_fcntl
;
1170 ret
= pipe(ctx
->consumer_should_quit
);
1172 PERROR("Error creating recv pipe");
1173 goto error_quit_pipe
;
1176 ret
= pipe(ctx
->consumer_thread_pipe
);
1178 PERROR("Error creating thread pipe");
1179 goto error_thread_pipe
;
1182 ret
= pipe(ctx
->consumer_channel_pipe
);
1184 PERROR("Error creating channel pipe");
1185 goto error_channel_pipe
;
1188 ret
= utils_create_pipe(ctx
->consumer_metadata_pipe
);
1190 goto error_metadata_pipe
;
1193 ret
= utils_create_pipe(ctx
->consumer_splice_metadata_pipe
);
1195 goto error_splice_pipe
;
1201 utils_close_pipe(ctx
->consumer_metadata_pipe
);
1202 error_metadata_pipe
:
1203 utils_close_pipe(ctx
->consumer_channel_pipe
);
1205 utils_close_pipe(ctx
->consumer_thread_pipe
);
1207 utils_close_pipe(ctx
->consumer_should_quit
);
1210 utils_close_pipe(ctx
->consumer_data_pipe
);
1218 * Close all fds associated with the instance and free the context.
1220 void lttng_consumer_destroy(struct lttng_consumer_local_data
*ctx
)
1224 DBG("Consumer destroying it. Closing everything.");
1226 ret
= close(ctx
->consumer_error_socket
);
1230 utils_close_pipe(ctx
->consumer_thread_pipe
);
1231 utils_close_pipe(ctx
->consumer_channel_pipe
);
1232 utils_close_pipe(ctx
->consumer_data_pipe
);
1233 utils_close_pipe(ctx
->consumer_should_quit
);
1234 utils_close_pipe(ctx
->consumer_splice_metadata_pipe
);
1236 unlink(ctx
->consumer_command_sock_path
);
1241 * Write the metadata stream id on the specified file descriptor.
1243 static int write_relayd_metadata_id(int fd
,
1244 struct lttng_consumer_stream
*stream
,
1245 struct consumer_relayd_sock_pair
*relayd
, unsigned long padding
)
1248 struct lttcomm_relayd_metadata_payload hdr
;
1250 hdr
.stream_id
= htobe64(stream
->relayd_stream_id
);
1251 hdr
.padding_size
= htobe32(padding
);
1253 ret
= write(fd
, (void *) &hdr
, sizeof(hdr
));
1254 } while (ret
< 0 && errno
== EINTR
);
1255 if (ret
< 0 || ret
!= sizeof(hdr
)) {
1257 * This error means that the fd's end is closed so ignore the perror
1258 * not to clubber the error output since this can happen in a normal
1261 if (errno
!= EPIPE
) {
1262 PERROR("write metadata stream id");
1264 DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno
);
1266 * Set ret to a negative value because if ret != sizeof(hdr), we don't
1267 * handle writting the missing part so report that as an error and
1268 * don't lie to the caller.
1273 DBG("Metadata stream id %" PRIu64
" with padding %lu written before data",
1274 stream
->relayd_stream_id
, padding
);
1281 * Mmap the ring buffer, read it and write the data to the tracefile. This is a
1282 * core function for writing trace buffers to either the local filesystem or
1285 * It must be called with the stream lock held.
1287 * Careful review MUST be put if any changes occur!
1289 * Returns the number of bytes written
1291 ssize_t
lttng_consumer_on_read_subbuffer_mmap(
1292 struct lttng_consumer_local_data
*ctx
,
1293 struct lttng_consumer_stream
*stream
, unsigned long len
,
1294 unsigned long padding
)
1296 unsigned long mmap_offset
;
1298 ssize_t ret
= 0, written
= 0;
1299 off_t orig_offset
= stream
->out_fd_offset
;
1300 /* Default is on the disk */
1301 int outfd
= stream
->out_fd
;
1302 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1303 unsigned int relayd_hang_up
= 0;
1305 /* RCU lock for the relayd pointer */
1308 /* Flag that the current stream if set for network streaming. */
1309 if (stream
->net_seq_idx
!= -1) {
1310 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1311 if (relayd
== NULL
) {
1316 /* get the offset inside the fd to mmap */
1317 switch (consumer_data
.type
) {
1318 case LTTNG_CONSUMER_KERNEL
:
1319 mmap_base
= stream
->mmap_base
;
1320 ret
= kernctl_get_mmap_read_offset(stream
->wait_fd
, &mmap_offset
);
1322 case LTTNG_CONSUMER32_UST
:
1323 case LTTNG_CONSUMER64_UST
:
1324 mmap_base
= lttng_ustctl_get_mmap_base(stream
);
1326 ERR("read mmap get mmap base for stream %s", stream
->name
);
1330 ret
= lttng_ustctl_get_mmap_read_offset(stream
, &mmap_offset
);
1333 ERR("Unknown consumer_data type");
1338 PERROR("tracer ctl get_mmap_read_offset");
1343 /* Handle stream on the relayd if the output is on the network */
1345 unsigned long netlen
= len
;
1348 * Lock the control socket for the complete duration of the function
1349 * since from this point on we will use the socket.
1351 if (stream
->metadata_flag
) {
1352 /* Metadata requires the control socket. */
1353 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1354 netlen
+= sizeof(struct lttcomm_relayd_metadata_payload
);
1357 ret
= write_relayd_stream_header(stream
, netlen
, padding
, relayd
);
1359 /* Use the returned socket. */
1362 /* Write metadata stream id before payload */
1363 if (stream
->metadata_flag
) {
1364 ret
= write_relayd_metadata_id(outfd
, stream
, relayd
, padding
);
1367 /* Socket operation failed. We consider the relayd dead */
1368 if (ret
== -EPIPE
|| ret
== -EINVAL
) {
1376 /* Socket operation failed. We consider the relayd dead */
1377 if (ret
== -EPIPE
|| ret
== -EINVAL
) {
1381 /* Else, use the default set before which is the filesystem. */
1384 /* No streaming, we have to set the len with the full padding */
1390 ret
= write(outfd
, mmap_base
+ mmap_offset
, len
);
1391 } while (ret
< 0 && errno
== EINTR
);
1392 DBG("Consumer mmap write() ret %zd (len %lu)", ret
, len
);
1395 * This is possible if the fd is closed on the other side (outfd)
1396 * or any write problem. It can be verbose a bit for a normal
1397 * execution if for instance the relayd is stopped abruptly. This
1398 * can happen so set this to a DBG statement.
1400 DBG("Error in file write mmap");
1404 /* Socket operation failed. We consider the relayd dead */
1405 if (errno
== EPIPE
|| errno
== EINVAL
) {
1410 } else if (ret
> len
) {
1411 PERROR("Error in file write (ret %zd > len %lu)", ret
, len
);
1419 /* This call is useless on a socket so better save a syscall. */
1421 /* This won't block, but will start writeout asynchronously */
1422 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret
,
1423 SYNC_FILE_RANGE_WRITE
);
1424 stream
->out_fd_offset
+= ret
;
1428 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1432 * This is a special case that the relayd has closed its socket. Let's
1433 * cleanup the relayd object and all associated streams.
1435 if (relayd
&& relayd_hang_up
) {
1436 cleanup_relayd(relayd
, ctx
);
1440 /* Unlock only if ctrl socket used */
1441 if (relayd
&& stream
->metadata_flag
) {
1442 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1450 * Splice the data from the ring buffer to the tracefile.
1452 * It must be called with the stream lock held.
1454 * Returns the number of bytes spliced.
1456 ssize_t
lttng_consumer_on_read_subbuffer_splice(
1457 struct lttng_consumer_local_data
*ctx
,
1458 struct lttng_consumer_stream
*stream
, unsigned long len
,
1459 unsigned long padding
)
1461 ssize_t ret
= 0, written
= 0, ret_splice
= 0;
1463 off_t orig_offset
= stream
->out_fd_offset
;
1464 int fd
= stream
->wait_fd
;
1465 /* Default is on the disk */
1466 int outfd
= stream
->out_fd
;
1467 struct consumer_relayd_sock_pair
*relayd
= NULL
;
1469 unsigned int relayd_hang_up
= 0;
1471 switch (consumer_data
.type
) {
1472 case LTTNG_CONSUMER_KERNEL
:
1474 case LTTNG_CONSUMER32_UST
:
1475 case LTTNG_CONSUMER64_UST
:
1476 /* Not supported for user space tracing */
1479 ERR("Unknown consumer_data type");
1483 /* RCU lock for the relayd pointer */
1486 /* Flag that the current stream if set for network streaming. */
1487 if (stream
->net_seq_idx
!= -1) {
1488 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1489 if (relayd
== NULL
) {
1495 * Choose right pipe for splice. Metadata and trace data are handled by
1496 * different threads hence the use of two pipes in order not to race or
1497 * corrupt the written data.
1499 if (stream
->metadata_flag
) {
1500 splice_pipe
= ctx
->consumer_splice_metadata_pipe
;
1502 splice_pipe
= ctx
->consumer_thread_pipe
;
1505 /* Write metadata stream id before payload */
1507 int total_len
= len
;
1509 if (stream
->metadata_flag
) {
1511 * Lock the control socket for the complete duration of the function
1512 * since from this point on we will use the socket.
1514 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1516 ret
= write_relayd_metadata_id(splice_pipe
[1], stream
, relayd
,
1520 /* Socket operation failed. We consider the relayd dead */
1521 if (ret
== -EBADF
) {
1522 WARN("Remote relayd disconnected. Stopping");
1529 total_len
+= sizeof(struct lttcomm_relayd_metadata_payload
);
1532 ret
= write_relayd_stream_header(stream
, total_len
, padding
, relayd
);
1534 /* Use the returned socket. */
1537 /* Socket operation failed. We consider the relayd dead */
1538 if (ret
== -EBADF
) {
1539 WARN("Remote relayd disconnected. Stopping");
1546 /* No streaming, we have to set the len with the full padding */
1551 DBG("splice chan to pipe offset %lu of len %lu (fd : %d, pipe: %d)",
1552 (unsigned long)offset
, len
, fd
, splice_pipe
[1]);
1553 ret_splice
= splice(fd
, &offset
, splice_pipe
[1], NULL
, len
,
1554 SPLICE_F_MOVE
| SPLICE_F_MORE
);
1555 DBG("splice chan to pipe, ret %zd", ret_splice
);
1556 if (ret_splice
< 0) {
1557 PERROR("Error in relay splice");
1559 written
= ret_splice
;
1565 /* Handle stream on the relayd if the output is on the network */
1567 if (stream
->metadata_flag
) {
1568 size_t metadata_payload_size
=
1569 sizeof(struct lttcomm_relayd_metadata_payload
);
1571 /* Update counter to fit the spliced data */
1572 ret_splice
+= metadata_payload_size
;
1573 len
+= metadata_payload_size
;
1575 * We do this so the return value can match the len passed as
1576 * argument to this function.
1578 written
-= metadata_payload_size
;
1582 /* Splice data out */
1583 ret_splice
= splice(splice_pipe
[0], NULL
, outfd
, NULL
,
1584 ret_splice
, SPLICE_F_MOVE
| SPLICE_F_MORE
);
1585 DBG("Consumer splice pipe to file, ret %zd", ret_splice
);
1586 if (ret_splice
< 0) {
1587 PERROR("Error in file splice");
1589 written
= ret_splice
;
1591 /* Socket operation failed. We consider the relayd dead */
1592 if (errno
== EBADF
|| errno
== EPIPE
) {
1593 WARN("Remote relayd disconnected. Stopping");
1599 } else if (ret_splice
> len
) {
1601 PERROR("Wrote more data than requested %zd (len: %lu)",
1603 written
+= ret_splice
;
1609 /* This call is useless on a socket so better save a syscall. */
1611 /* This won't block, but will start writeout asynchronously */
1612 lttng_sync_file_range(outfd
, stream
->out_fd_offset
, ret_splice
,
1613 SYNC_FILE_RANGE_WRITE
);
1614 stream
->out_fd_offset
+= ret_splice
;
1616 written
+= ret_splice
;
1618 lttng_consumer_sync_trace_file(stream
, orig_offset
);
1626 * This is a special case that the relayd has closed its socket. Let's
1627 * cleanup the relayd object and all associated streams.
1629 if (relayd
&& relayd_hang_up
) {
1630 cleanup_relayd(relayd
, ctx
);
1631 /* Skip splice error so the consumer does not fail */
1636 /* send the appropriate error description to sessiond */
1639 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_EINVAL
);
1642 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ENOMEM
);
1645 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_SPLICE_ESPIPE
);
1650 if (relayd
&& stream
->metadata_flag
) {
1651 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1659 * Take a snapshot for a specific fd
1661 * Returns 0 on success, < 0 on error
1663 int lttng_consumer_take_snapshot(struct lttng_consumer_stream
*stream
)
1665 switch (consumer_data
.type
) {
1666 case LTTNG_CONSUMER_KERNEL
:
1667 return lttng_kconsumer_take_snapshot(stream
);
1668 case LTTNG_CONSUMER32_UST
:
1669 case LTTNG_CONSUMER64_UST
:
1670 return lttng_ustconsumer_take_snapshot(stream
);
1672 ERR("Unknown consumer_data type");
1679 * Get the produced position
1681 * Returns 0 on success, < 0 on error
1683 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream
*stream
,
1686 switch (consumer_data
.type
) {
1687 case LTTNG_CONSUMER_KERNEL
:
1688 return lttng_kconsumer_get_produced_snapshot(stream
, pos
);
1689 case LTTNG_CONSUMER32_UST
:
1690 case LTTNG_CONSUMER64_UST
:
1691 return lttng_ustconsumer_get_produced_snapshot(stream
, pos
);
1693 ERR("Unknown consumer_data type");
1699 int lttng_consumer_recv_cmd(struct lttng_consumer_local_data
*ctx
,
1700 int sock
, struct pollfd
*consumer_sockpoll
)
1702 switch (consumer_data
.type
) {
1703 case LTTNG_CONSUMER_KERNEL
:
1704 return lttng_kconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1705 case LTTNG_CONSUMER32_UST
:
1706 case LTTNG_CONSUMER64_UST
:
1707 return lttng_ustconsumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
1709 ERR("Unknown consumer_data type");
1716 * Iterate over all streams of the hashtable and free them properly.
1718 * WARNING: *MUST* be used with data stream only.
1720 static void destroy_data_stream_ht(struct lttng_ht
*ht
)
1722 struct lttng_ht_iter iter
;
1723 struct lttng_consumer_stream
*stream
;
1730 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1732 * Ignore return value since we are currently cleaning up so any error
1735 (void) consumer_del_stream(stream
, ht
);
1739 lttng_ht_destroy(ht
);
1743 * Iterate over all streams of the hashtable and free them properly.
1745 * XXX: Should not be only for metadata stream or else use an other name.
1747 static void destroy_stream_ht(struct lttng_ht
*ht
)
1749 struct lttng_ht_iter iter
;
1750 struct lttng_consumer_stream
*stream
;
1757 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1759 * Ignore return value since we are currently cleaning up so any error
1762 (void) consumer_del_metadata_stream(stream
, ht
);
1766 lttng_ht_destroy(ht
);
1769 void lttng_consumer_close_metadata(void)
1771 switch (consumer_data
.type
) {
1772 case LTTNG_CONSUMER_KERNEL
:
1774 * The Kernel consumer has a different metadata scheme so we don't
1775 * close anything because the stream will be closed by the session
1779 case LTTNG_CONSUMER32_UST
:
1780 case LTTNG_CONSUMER64_UST
:
1782 * Close all metadata streams. The metadata hash table is passed and
1783 * this call iterates over it by closing all wakeup fd. This is safe
1784 * because at this point we are sure that the metadata producer is
1785 * either dead or blocked.
1787 lttng_ustconsumer_close_metadata(metadata_ht
);
1790 ERR("Unknown consumer_data type");
1796 * Clean up a metadata stream and free its memory.
1798 void consumer_del_metadata_stream(struct lttng_consumer_stream
*stream
,
1799 struct lttng_ht
*ht
)
1802 struct lttng_ht_iter iter
;
1803 struct lttng_consumer_channel
*free_chan
= NULL
;
1804 struct consumer_relayd_sock_pair
*relayd
;
1808 * This call should NEVER receive regular stream. It must always be
1809 * metadata stream and this is crucial for data structure synchronization.
1811 assert(stream
->metadata_flag
);
1813 DBG3("Consumer delete metadata stream %d", stream
->wait_fd
);
1816 /* Means the stream was allocated but not successfully added */
1817 goto free_stream_rcu
;
1820 pthread_mutex_lock(&consumer_data
.lock
);
1821 pthread_mutex_lock(&stream
->lock
);
1823 switch (consumer_data
.type
) {
1824 case LTTNG_CONSUMER_KERNEL
:
1825 if (stream
->mmap_base
!= NULL
) {
1826 ret
= munmap(stream
->mmap_base
, stream
->mmap_len
);
1828 PERROR("munmap metadata stream");
1832 case LTTNG_CONSUMER32_UST
:
1833 case LTTNG_CONSUMER64_UST
:
1834 lttng_ustconsumer_del_stream(stream
);
1837 ERR("Unknown consumer_data type");
1843 iter
.iter
.node
= &stream
->node
.node
;
1844 ret
= lttng_ht_del(ht
, &iter
);
1847 iter
.iter
.node
= &stream
->node_channel_id
.node
;
1848 ret
= lttng_ht_del(consumer_data
.stream_per_chan_id_ht
, &iter
);
1851 iter
.iter
.node
= &stream
->node_session_id
.node
;
1852 ret
= lttng_ht_del(consumer_data
.stream_list_ht
, &iter
);
1856 if (stream
->out_fd
>= 0) {
1857 ret
= close(stream
->out_fd
);
1863 /* Check and cleanup relayd */
1865 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1866 if (relayd
!= NULL
) {
1867 uatomic_dec(&relayd
->refcount
);
1868 assert(uatomic_read(&relayd
->refcount
) >= 0);
1870 /* Closing streams requires to lock the control socket. */
1871 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
1872 ret
= relayd_send_close_stream(&relayd
->control_sock
,
1873 stream
->relayd_stream_id
, stream
->next_net_seq_num
- 1);
1874 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
1876 DBG("Unable to close stream on the relayd. Continuing");
1878 * Continue here. There is nothing we can do for the relayd.
1879 * Chances are that the relayd has closed the socket so we just
1880 * continue cleaning up.
1884 /* Both conditions are met, we destroy the relayd. */
1885 if (uatomic_read(&relayd
->refcount
) == 0 &&
1886 uatomic_read(&relayd
->destroy_flag
)) {
1887 destroy_relayd(relayd
);
1892 /* Atomically decrement channel refcount since other threads can use it. */
1893 uatomic_dec(&stream
->chan
->refcount
);
1894 if (!uatomic_read(&stream
->chan
->refcount
)
1895 && !uatomic_read(&stream
->chan
->nb_init_stream_left
)) {
1896 /* Go for channel deletion! */
1897 free_chan
= stream
->chan
;
1901 pthread_mutex_unlock(&stream
->lock
);
1902 pthread_mutex_unlock(&consumer_data
.lock
);
1905 consumer_del_channel(free_chan
);
1909 call_rcu(&stream
->node
.head
, free_stream_rcu
);
1913 * Action done with the metadata stream when adding it to the consumer internal
1914 * data structures to handle it.
1916 static int add_metadata_stream(struct lttng_consumer_stream
*stream
,
1917 struct lttng_ht
*ht
)
1920 struct consumer_relayd_sock_pair
*relayd
;
1921 struct lttng_ht_iter iter
;
1922 struct lttng_ht_node_u64
*node
;
1927 DBG3("Adding metadata stream %" PRIu64
" to hash table", stream
->key
);
1929 pthread_mutex_lock(&consumer_data
.lock
);
1930 pthread_mutex_lock(&stream
->lock
);
1933 * From here, refcounts are updated so be _careful_ when returning an error
1940 * Lookup the stream just to make sure it does not exist in our internal
1941 * state. This should NEVER happen.
1943 lttng_ht_lookup(ht
, &stream
->key
, &iter
);
1944 node
= lttng_ht_iter_get_node_u64(&iter
);
1947 /* Find relayd and, if one is found, increment refcount. */
1948 relayd
= consumer_find_relayd(stream
->net_seq_idx
);
1949 if (relayd
!= NULL
) {
1950 uatomic_inc(&relayd
->refcount
);
1953 /* Update channel refcount once added without error(s). */
1954 uatomic_inc(&stream
->chan
->refcount
);
1957 * When nb_init_stream_left reaches 0, we don't need to trigger any action
1958 * in terms of destroying the associated channel, because the action that
1959 * causes the count to become 0 also causes a stream to be added. The
1960 * channel deletion will thus be triggered by the following removal of this
1963 if (uatomic_read(&stream
->chan
->nb_init_stream_left
) > 0) {
1964 uatomic_dec(&stream
->chan
->nb_init_stream_left
);
1967 lttng_ht_add_unique_u64(ht
, &stream
->node
);
1969 lttng_ht_add_unique_u64(consumer_data
.stream_per_chan_id_ht
,
1970 &stream
->node_channel_id
);
1973 * Add stream to the stream_list_ht of the consumer data. No need to steal
1974 * the key since the HT does not use it and we allow to add redundant keys
1977 lttng_ht_add_u64(consumer_data
.stream_list_ht
, &stream
->node_session_id
);
1981 pthread_mutex_unlock(&stream
->lock
);
1982 pthread_mutex_unlock(&consumer_data
.lock
);
1987 * Delete data stream that are flagged for deletion (endpoint_status).
1989 static void validate_endpoint_status_data_stream(void)
1991 struct lttng_ht_iter iter
;
1992 struct lttng_consumer_stream
*stream
;
1994 DBG("Consumer delete flagged data stream");
1997 cds_lfht_for_each_entry(data_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
1998 /* Validate delete flag of the stream */
1999 if (stream
->endpoint_status
== CONSUMER_ENDPOINT_ACTIVE
) {
2002 /* Delete it right now */
2003 consumer_del_stream(stream
, data_ht
);
2009 * Delete metadata stream that are flagged for deletion (endpoint_status).
2011 static void validate_endpoint_status_metadata_stream(
2012 struct lttng_poll_event
*pollset
)
2014 struct lttng_ht_iter iter
;
2015 struct lttng_consumer_stream
*stream
;
2017 DBG("Consumer delete flagged metadata stream");
2022 cds_lfht_for_each_entry(metadata_ht
->ht
, &iter
.iter
, stream
, node
.node
) {
2023 /* Validate delete flag of the stream */
2024 if (stream
->endpoint_status
== CONSUMER_ENDPOINT_ACTIVE
) {
2028 * Remove from pollset so the metadata thread can continue without
2029 * blocking on a deleted stream.
2031 lttng_poll_del(pollset
, stream
->wait_fd
);
2033 /* Delete it right now */
2034 consumer_del_metadata_stream(stream
, metadata_ht
);
2040 * Thread polls on metadata file descriptor and write them on disk or on the
2043 void *consumer_thread_metadata_poll(void *data
)
2046 uint32_t revents
, nb_fd
;
2047 struct lttng_consumer_stream
*stream
= NULL
;
2048 struct lttng_ht_iter iter
;
2049 struct lttng_ht_node_u64
*node
;
2050 struct lttng_poll_event events
;
2051 struct lttng_consumer_local_data
*ctx
= data
;
2054 rcu_register_thread();
2056 metadata_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2058 /* ENOMEM at this point. Better to bail out. */
2062 DBG("Thread metadata poll started");
2064 /* Size is set to 1 for the consumer_metadata pipe */
2065 ret
= lttng_poll_create(&events
, 2, LTTNG_CLOEXEC
);
2067 ERR("Poll set creation failed");
2071 ret
= lttng_poll_add(&events
, ctx
->consumer_metadata_pipe
[0], LPOLLIN
);
2077 DBG("Metadata main loop started");
2080 /* Only the metadata pipe is set */
2081 if (LTTNG_POLL_GETNB(&events
) == 0 && consumer_quit
== 1) {
2086 DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events
));
2087 ret
= lttng_poll_wait(&events
, -1);
2088 DBG("Metadata event catched in thread");
2090 if (errno
== EINTR
) {
2091 ERR("Poll EINTR catched");
2099 /* From here, the event is a metadata wait fd */
2100 for (i
= 0; i
< nb_fd
; i
++) {
2101 revents
= LTTNG_POLL_GETEV(&events
, i
);
2102 pollfd
= LTTNG_POLL_GETFD(&events
, i
);
2104 /* Just don't waste time if no returned events for the fd */
2109 if (pollfd
== ctx
->consumer_metadata_pipe
[0]) {
2110 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2111 DBG("Metadata thread pipe hung up");
2113 * Remove the pipe from the poll set and continue the loop
2114 * since their might be data to consume.
2116 lttng_poll_del(&events
, ctx
->consumer_metadata_pipe
[0]);
2117 ret
= close(ctx
->consumer_metadata_pipe
[0]);
2119 PERROR("close metadata pipe");
2122 } else if (revents
& LPOLLIN
) {
2124 /* Get the stream pointer received */
2125 ret
= read(pollfd
, &stream
, sizeof(stream
));
2126 } while (ret
< 0 && errno
== EINTR
);
2128 ret
< sizeof(struct lttng_consumer_stream
*)) {
2129 PERROR("read metadata stream");
2131 * Let's continue here and hope we can still work
2132 * without stopping the consumer. XXX: Should we?
2137 /* A NULL stream means that the state has changed. */
2138 if (stream
== NULL
) {
2139 /* Check for deleted streams. */
2140 validate_endpoint_status_metadata_stream(&events
);
2144 DBG("Adding metadata stream %d to poll set",
2147 ret
= add_metadata_stream(stream
, metadata_ht
);
2149 ERR("Unable to add metadata stream");
2150 /* Stream was not setup properly. Continuing. */
2151 consumer_del_metadata_stream(stream
, NULL
);
2155 /* Add metadata stream to the global poll events list */
2156 lttng_poll_add(&events
, stream
->wait_fd
,
2157 LPOLLIN
| LPOLLPRI
);
2160 /* Handle other stream */
2166 uint64_t tmp_id
= (uint64_t) pollfd
;
2168 lttng_ht_lookup(metadata_ht
, &tmp_id
, &iter
);
2170 node
= lttng_ht_iter_get_node_u64(&iter
);
2173 stream
= caa_container_of(node
, struct lttng_consumer_stream
,
2176 /* Check for error event */
2177 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2178 DBG("Metadata fd %d is hup|err.", pollfd
);
2179 if (!stream
->hangup_flush_done
2180 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
2181 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
2182 DBG("Attempting to flush and consume the UST buffers");
2183 lttng_ustconsumer_on_stream_hangup(stream
);
2185 /* We just flushed the stream now read it. */
2187 len
= ctx
->on_buffer_ready(stream
, ctx
);
2189 * We don't check the return value here since if we get
2190 * a negative len, it means an error occured thus we
2191 * simply remove it from the poll set and free the
2197 lttng_poll_del(&events
, stream
->wait_fd
);
2199 * This call update the channel states, closes file descriptors
2200 * and securely free the stream.
2202 consumer_del_metadata_stream(stream
, metadata_ht
);
2203 } else if (revents
& (LPOLLIN
| LPOLLPRI
)) {
2204 /* Get the data out of the metadata file descriptor */
2205 DBG("Metadata available on fd %d", pollfd
);
2206 assert(stream
->wait_fd
== pollfd
);
2208 len
= ctx
->on_buffer_ready(stream
, ctx
);
2209 /* It's ok to have an unavailable sub-buffer */
2210 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2211 /* Clean up stream from consumer and free it. */
2212 lttng_poll_del(&events
, stream
->wait_fd
);
2213 consumer_del_metadata_stream(stream
, metadata_ht
);
2214 } else if (len
> 0) {
2215 stream
->data_read
= 1;
2219 /* Release RCU lock for the stream looked up */
2226 DBG("Metadata poll thread exiting");
2228 lttng_poll_clean(&events
);
2230 destroy_stream_ht(metadata_ht
);
2232 rcu_unregister_thread();
2237 * This thread polls the fds in the set to consume the data and write
2238 * it to tracefile if necessary.
2240 void *consumer_thread_data_poll(void *data
)
2242 int num_rdy
, num_hup
, high_prio
, ret
, i
;
2243 struct pollfd
*pollfd
= NULL
;
2244 /* local view of the streams */
2245 struct lttng_consumer_stream
**local_stream
= NULL
, *new_stream
= NULL
;
2246 /* local view of consumer_data.fds_count */
2248 struct lttng_consumer_local_data
*ctx
= data
;
2251 rcu_register_thread();
2253 data_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2254 if (data_ht
== NULL
) {
2255 /* ENOMEM at this point. Better to bail out. */
2259 local_stream
= zmalloc(sizeof(struct lttng_consumer_stream
));
2266 * the fds set has been updated, we need to update our
2267 * local array as well
2269 pthread_mutex_lock(&consumer_data
.lock
);
2270 if (consumer_data
.need_update
) {
2275 local_stream
= NULL
;
2277 /* allocate for all fds + 1 for the consumer_data_pipe */
2278 pollfd
= zmalloc((consumer_data
.stream_count
+ 1) * sizeof(struct pollfd
));
2279 if (pollfd
== NULL
) {
2280 PERROR("pollfd malloc");
2281 pthread_mutex_unlock(&consumer_data
.lock
);
2285 /* allocate for all fds + 1 for the consumer_data_pipe */
2286 local_stream
= zmalloc((consumer_data
.stream_count
+ 1) *
2287 sizeof(struct lttng_consumer_stream
));
2288 if (local_stream
== NULL
) {
2289 PERROR("local_stream malloc");
2290 pthread_mutex_unlock(&consumer_data
.lock
);
2293 ret
= update_poll_array(ctx
, &pollfd
, local_stream
,
2296 ERR("Error in allocating pollfd or local_outfds");
2297 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
2298 pthread_mutex_unlock(&consumer_data
.lock
);
2302 consumer_data
.need_update
= 0;
2304 pthread_mutex_unlock(&consumer_data
.lock
);
2306 /* No FDs and consumer_quit, consumer_cleanup the thread */
2307 if (nb_fd
== 0 && consumer_quit
== 1) {
2310 /* poll on the array of fds */
2312 DBG("polling on %d fd", nb_fd
+ 1);
2313 num_rdy
= poll(pollfd
, nb_fd
+ 1, -1);
2314 DBG("poll num_rdy : %d", num_rdy
);
2315 if (num_rdy
== -1) {
2317 * Restart interrupted system call.
2319 if (errno
== EINTR
) {
2322 PERROR("Poll error");
2323 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_POLL_ERROR
);
2325 } else if (num_rdy
== 0) {
2326 DBG("Polling thread timed out");
2331 * If the consumer_data_pipe triggered poll go directly to the
2332 * beginning of the loop to update the array. We want to prioritize
2333 * array update over low-priority reads.
2335 if (pollfd
[nb_fd
].revents
& (POLLIN
| POLLPRI
)) {
2336 ssize_t pipe_readlen
;
2338 DBG("consumer_data_pipe wake up");
2339 /* Consume 1 byte of pipe data */
2341 pipe_readlen
= read(ctx
->consumer_data_pipe
[0], &new_stream
,
2342 sizeof(new_stream
));
2343 } while (pipe_readlen
== -1 && errno
== EINTR
);
2344 if (pipe_readlen
< 0) {
2345 PERROR("read consumer data pipe");
2346 /* Continue so we can at least handle the current stream(s). */
2351 * If the stream is NULL, just ignore it. It's also possible that
2352 * the sessiond poll thread changed the consumer_quit state and is
2353 * waking us up to test it.
2355 if (new_stream
== NULL
) {
2356 validate_endpoint_status_data_stream();
2360 ret
= add_stream(new_stream
, data_ht
);
2362 ERR("Consumer add stream %" PRIu64
" failed. Continuing",
2365 * At this point, if the add_stream fails, it is not in the
2366 * hash table thus passing the NULL value here.
2368 consumer_del_stream(new_stream
, NULL
);
2371 /* Continue to update the local streams and handle prio ones */
2375 /* Take care of high priority channels first. */
2376 for (i
= 0; i
< nb_fd
; i
++) {
2377 if (local_stream
[i
] == NULL
) {
2380 if (pollfd
[i
].revents
& POLLPRI
) {
2381 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
2383 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
2384 /* it's ok to have an unavailable sub-buffer */
2385 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2386 /* Clean the stream and free it. */
2387 consumer_del_stream(local_stream
[i
], data_ht
);
2388 local_stream
[i
] = NULL
;
2389 } else if (len
> 0) {
2390 local_stream
[i
]->data_read
= 1;
2396 * If we read high prio channel in this loop, try again
2397 * for more high prio data.
2403 /* Take care of low priority channels. */
2404 for (i
= 0; i
< nb_fd
; i
++) {
2405 if (local_stream
[i
] == NULL
) {
2408 if ((pollfd
[i
].revents
& POLLIN
) ||
2409 local_stream
[i
]->hangup_flush_done
) {
2410 DBG("Normal read on fd %d", pollfd
[i
].fd
);
2411 len
= ctx
->on_buffer_ready(local_stream
[i
], ctx
);
2412 /* it's ok to have an unavailable sub-buffer */
2413 if (len
< 0 && len
!= -EAGAIN
&& len
!= -ENODATA
) {
2414 /* Clean the stream and free it. */
2415 consumer_del_stream(local_stream
[i
], data_ht
);
2416 local_stream
[i
] = NULL
;
2417 } else if (len
> 0) {
2418 local_stream
[i
]->data_read
= 1;
2423 /* Handle hangup and errors */
2424 for (i
= 0; i
< nb_fd
; i
++) {
2425 if (local_stream
[i
] == NULL
) {
2428 if (!local_stream
[i
]->hangup_flush_done
2429 && (pollfd
[i
].revents
& (POLLHUP
| POLLERR
| POLLNVAL
))
2430 && (consumer_data
.type
== LTTNG_CONSUMER32_UST
2431 || consumer_data
.type
== LTTNG_CONSUMER64_UST
)) {
2432 DBG("fd %d is hup|err|nval. Attempting flush and read.",
2434 lttng_ustconsumer_on_stream_hangup(local_stream
[i
]);
2435 /* Attempt read again, for the data we just flushed. */
2436 local_stream
[i
]->data_read
= 1;
2439 * If the poll flag is HUP/ERR/NVAL and we have
2440 * read no data in this pass, we can remove the
2441 * stream from its hash table.
2443 if ((pollfd
[i
].revents
& POLLHUP
)) {
2444 DBG("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
2445 if (!local_stream
[i
]->data_read
) {
2446 consumer_del_stream(local_stream
[i
], data_ht
);
2447 local_stream
[i
] = NULL
;
2450 } else if (pollfd
[i
].revents
& POLLERR
) {
2451 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
2452 if (!local_stream
[i
]->data_read
) {
2453 consumer_del_stream(local_stream
[i
], data_ht
);
2454 local_stream
[i
] = NULL
;
2457 } else if (pollfd
[i
].revents
& POLLNVAL
) {
2458 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
2459 if (!local_stream
[i
]->data_read
) {
2460 consumer_del_stream(local_stream
[i
], data_ht
);
2461 local_stream
[i
] = NULL
;
2465 if (local_stream
[i
] != NULL
) {
2466 local_stream
[i
]->data_read
= 0;
2471 DBG("polling thread exiting");
2476 * Close the write side of the pipe so epoll_wait() in
2477 * consumer_thread_metadata_poll can catch it. The thread is monitoring the
2478 * read side of the pipe. If we close them both, epoll_wait strangely does
2479 * not return and could create a endless wait period if the pipe is the
2480 * only tracked fd in the poll set. The thread will take care of closing
2483 ret
= close(ctx
->consumer_metadata_pipe
[1]);
2485 PERROR("close data pipe");
2488 destroy_data_stream_ht(data_ht
);
2490 rcu_unregister_thread();
2495 * Close wake-up end of each stream belonging to the channel. This will
2496 * allow the poll() on the stream read-side to detect when the
2497 * write-side (application) finally closes them.
2500 void consumer_close_channel_streams(struct lttng_consumer_channel
*channel
)
2502 struct lttng_ht
*ht
;
2503 struct lttng_consumer_stream
*stream
;
2504 struct lttng_ht_iter iter
;
2506 ht
= consumer_data
.stream_per_chan_id_ht
;
2509 cds_lfht_for_each_entry_duplicate(ht
->ht
,
2510 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
2511 ht
->match_fct
, &channel
->key
,
2512 &iter
.iter
, stream
, node_channel_id
.node
) {
2513 switch (consumer_data
.type
) {
2514 case LTTNG_CONSUMER_KERNEL
:
2516 case LTTNG_CONSUMER32_UST
:
2517 case LTTNG_CONSUMER64_UST
:
2519 * Note: a mutex is taken internally within
2520 * liblttng-ust-ctl to protect timer wakeup_fd
2521 * use from concurrent close.
2523 lttng_ustconsumer_close_stream_wakeup(stream
);
2526 ERR("Unknown consumer_data type");
2533 static void destroy_channel_ht(struct lttng_ht
*ht
)
2535 struct lttng_ht_iter iter
;
2536 struct lttng_consumer_channel
*channel
;
2544 cds_lfht_for_each_entry(ht
->ht
, &iter
.iter
, channel
, wait_fd_node
.node
) {
2545 ret
= lttng_ht_del(ht
, &iter
);
2550 lttng_ht_destroy(ht
);
2554 * This thread polls the channel fds to detect when they are being
2555 * closed. It closes all related streams if the channel is detected as
2556 * closed. It is currently only used as a shim layer for UST because the
2557 * consumerd needs to keep the per-stream wakeup end of pipes open for
2560 void *consumer_thread_channel_poll(void *data
)
2563 uint32_t revents
, nb_fd
;
2564 struct lttng_consumer_channel
*chan
= NULL
;
2565 struct lttng_ht_iter iter
;
2566 struct lttng_ht_node_u64
*node
;
2567 struct lttng_poll_event events
;
2568 struct lttng_consumer_local_data
*ctx
= data
;
2569 struct lttng_ht
*channel_ht
;
2571 rcu_register_thread();
2573 channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2575 /* ENOMEM at this point. Better to bail out. */
2579 DBG("Thread channel poll started");
2581 /* Size is set to 1 for the consumer_channel pipe */
2582 ret
= lttng_poll_create(&events
, 2, LTTNG_CLOEXEC
);
2584 ERR("Poll set creation failed");
2588 ret
= lttng_poll_add(&events
, ctx
->consumer_channel_pipe
[0], LPOLLIN
);
2594 DBG("Channel main loop started");
2597 /* Only the channel pipe is set */
2598 if (LTTNG_POLL_GETNB(&events
) == 0 && consumer_quit
== 1) {
2603 DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events
));
2604 ret
= lttng_poll_wait(&events
, -1);
2605 DBG("Channel event catched in thread");
2607 if (errno
== EINTR
) {
2608 ERR("Poll EINTR catched");
2616 /* From here, the event is a channel wait fd */
2617 for (i
= 0; i
< nb_fd
; i
++) {
2618 revents
= LTTNG_POLL_GETEV(&events
, i
);
2619 pollfd
= LTTNG_POLL_GETFD(&events
, i
);
2621 /* Just don't waste time if no returned events for the fd */
2625 if (pollfd
== ctx
->consumer_channel_pipe
[0]) {
2626 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2627 DBG("Channel thread pipe hung up");
2629 * Remove the pipe from the poll set and continue the loop
2630 * since their might be data to consume.
2632 lttng_poll_del(&events
, ctx
->consumer_channel_pipe
[0]);
2634 } else if (revents
& LPOLLIN
) {
2635 enum consumer_channel_action action
;
2637 ret
= read_channel_pipe(ctx
, &chan
, &action
);
2639 ERR("Error reading channel pipe");
2644 case CONSUMER_CHANNEL_ADD
:
2645 DBG("Adding channel %d to poll set",
2648 lttng_ht_node_init_u64(&chan
->wait_fd_node
,
2650 lttng_ht_add_unique_u64(channel_ht
,
2651 &chan
->wait_fd_node
);
2652 /* Add channel to the global poll events list */
2653 lttng_poll_add(&events
, chan
->wait_fd
,
2654 LPOLLIN
| LPOLLPRI
);
2656 case CONSUMER_CHANNEL_QUIT
:
2658 * Remove the pipe from the poll set and continue the loop
2659 * since their might be data to consume.
2661 lttng_poll_del(&events
, ctx
->consumer_channel_pipe
[0]);
2664 ERR("Unknown action");
2669 /* Handle other stream */
2675 uint64_t tmp_id
= (uint64_t) pollfd
;
2677 lttng_ht_lookup(channel_ht
, &tmp_id
, &iter
);
2679 node
= lttng_ht_iter_get_node_u64(&iter
);
2682 chan
= caa_container_of(node
, struct lttng_consumer_channel
,
2685 /* Check for error event */
2686 if (revents
& (LPOLLERR
| LPOLLHUP
)) {
2687 DBG("Channel fd %d is hup|err.", pollfd
);
2689 lttng_poll_del(&events
, chan
->wait_fd
);
2690 ret
= lttng_ht_del(channel_ht
, &iter
);
2692 consumer_close_channel_streams(chan
);
2695 /* Release RCU lock for the channel looked up */
2701 lttng_poll_clean(&events
);
2703 destroy_channel_ht(channel_ht
);
2705 DBG("Channel poll thread exiting");
2706 rcu_unregister_thread();
2711 * This thread listens on the consumerd socket and receives the file
2712 * descriptors from the session daemon.
2714 void *consumer_thread_sessiond_poll(void *data
)
2716 int sock
= -1, client_socket
, ret
;
2718 * structure to poll for incoming data on communication socket avoids
2719 * making blocking sockets.
2721 struct pollfd consumer_sockpoll
[2];
2722 struct lttng_consumer_local_data
*ctx
= data
;
2724 rcu_register_thread();
2726 DBG("Creating command socket %s", ctx
->consumer_command_sock_path
);
2727 unlink(ctx
->consumer_command_sock_path
);
2728 client_socket
= lttcomm_create_unix_sock(ctx
->consumer_command_sock_path
);
2729 if (client_socket
< 0) {
2730 ERR("Cannot create command socket");
2734 ret
= lttcomm_listen_unix_sock(client_socket
);
2739 DBG("Sending ready command to lttng-sessiond");
2740 ret
= lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_COMMAND_SOCK_READY
);
2741 /* return < 0 on error, but == 0 is not fatal */
2743 ERR("Error sending ready command to lttng-sessiond");
2747 ret
= fcntl(client_socket
, F_SETFL
, O_NONBLOCK
);
2749 PERROR("fcntl O_NONBLOCK");
2753 /* prepare the FDs to poll : to client socket and the should_quit pipe */
2754 consumer_sockpoll
[0].fd
= ctx
->consumer_should_quit
[0];
2755 consumer_sockpoll
[0].events
= POLLIN
| POLLPRI
;
2756 consumer_sockpoll
[1].fd
= client_socket
;
2757 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2759 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2762 DBG("Connection on client_socket");
2764 /* Blocking call, waiting for transmission */
2765 sock
= lttcomm_accept_unix_sock(client_socket
);
2770 ret
= fcntl(sock
, F_SETFL
, O_NONBLOCK
);
2772 PERROR("fcntl O_NONBLOCK");
2776 /* This socket is not useful anymore. */
2777 ret
= close(client_socket
);
2779 PERROR("close client_socket");
2783 /* update the polling structure to poll on the established socket */
2784 consumer_sockpoll
[1].fd
= sock
;
2785 consumer_sockpoll
[1].events
= POLLIN
| POLLPRI
;
2788 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2791 DBG("Incoming command on sock");
2792 ret
= lttng_consumer_recv_cmd(ctx
, sock
, consumer_sockpoll
);
2793 if (ret
== -ENOENT
) {
2794 DBG("Received STOP command");
2799 * This could simply be a session daemon quitting. Don't output
2802 DBG("Communication interrupted on command socket");
2805 if (consumer_quit
) {
2806 DBG("consumer_thread_receive_fds received quit from signal");
2809 DBG("received command on sock");
2812 DBG("Consumer thread sessiond poll exiting");
2815 * Close metadata streams since the producer is the session daemon which
2818 * NOTE: for now, this only applies to the UST tracer.
2820 lttng_consumer_close_metadata();
2823 * when all fds have hung up, the polling thread
2829 * Notify the data poll thread to poll back again and test the
2830 * consumer_quit state that we just set so to quit gracefully.
2832 notify_thread_pipe(ctx
->consumer_data_pipe
[1]);
2834 notify_channel_pipe(ctx
, NULL
, CONSUMER_CHANNEL_QUIT
);
2836 /* Cleaning up possibly open sockets. */
2840 PERROR("close sock sessiond poll");
2843 if (client_socket
>= 0) {
2846 PERROR("close client_socket sessiond poll");
2850 rcu_unregister_thread();
2854 ssize_t
lttng_consumer_read_subbuffer(struct lttng_consumer_stream
*stream
,
2855 struct lttng_consumer_local_data
*ctx
)
2859 pthread_mutex_lock(&stream
->lock
);
2861 switch (consumer_data
.type
) {
2862 case LTTNG_CONSUMER_KERNEL
:
2863 ret
= lttng_kconsumer_read_subbuffer(stream
, ctx
);
2865 case LTTNG_CONSUMER32_UST
:
2866 case LTTNG_CONSUMER64_UST
:
2867 ret
= lttng_ustconsumer_read_subbuffer(stream
, ctx
);
2870 ERR("Unknown consumer_data type");
2876 pthread_mutex_unlock(&stream
->lock
);
2880 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream
*stream
)
2882 switch (consumer_data
.type
) {
2883 case LTTNG_CONSUMER_KERNEL
:
2884 return lttng_kconsumer_on_recv_stream(stream
);
2885 case LTTNG_CONSUMER32_UST
:
2886 case LTTNG_CONSUMER64_UST
:
2887 return lttng_ustconsumer_on_recv_stream(stream
);
2889 ERR("Unknown consumer_data type");
2896 * Allocate and set consumer data hash tables.
2898 void lttng_consumer_init(void)
2900 consumer_data
.channel_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2901 consumer_data
.relayd_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2902 consumer_data
.stream_list_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2903 consumer_data
.stream_per_chan_id_ht
= lttng_ht_new(0, LTTNG_HT_TYPE_U64
);
2907 * Process the ADD_RELAYD command receive by a consumer.
2909 * This will create a relayd socket pair and add it to the relayd hash table.
2910 * The caller MUST acquire a RCU read side lock before calling it.
2912 int consumer_add_relayd_socket(int net_seq_idx
, int sock_type
,
2913 struct lttng_consumer_local_data
*ctx
, int sock
,
2914 struct pollfd
*consumer_sockpoll
, struct lttcomm_sock
*relayd_sock
,
2915 unsigned int sessiond_id
)
2917 int fd
= -1, ret
= -1, relayd_created
= 0;
2918 enum lttng_error_code ret_code
= LTTNG_OK
;
2919 struct consumer_relayd_sock_pair
*relayd
;
2921 DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx
);
2923 /* First send a status message before receiving the fds. */
2924 ret
= consumer_send_status_msg(sock
, ret_code
);
2926 /* Somehow, the session daemon is not responding anymore. */
2930 /* Get relayd reference if exists. */
2931 relayd
= consumer_find_relayd(net_seq_idx
);
2932 if (relayd
== NULL
) {
2933 /* Not found. Allocate one. */
2934 relayd
= consumer_allocate_relayd_sock_pair(net_seq_idx
);
2935 if (relayd
== NULL
) {
2936 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_OUTFD_ERROR
);
2940 relayd
->sessiond_session_id
= (uint64_t) sessiond_id
;
2944 /* Poll on consumer socket. */
2945 if (lttng_consumer_poll_socket(consumer_sockpoll
) < 0) {
2950 /* Get relayd socket from session daemon */
2951 ret
= lttcomm_recv_fds_unix_sock(sock
, &fd
, 1);
2952 if (ret
!= sizeof(fd
)) {
2953 lttng_consumer_send_error(ctx
, LTTCOMM_CONSUMERD_ERROR_RECV_FD
);
2955 fd
= -1; /* Just in case it gets set with an invalid value. */
2959 /* We have the fds without error. Send status back. */
2960 ret
= consumer_send_status_msg(sock
, ret_code
);
2962 /* Somehow, the session daemon is not responding anymore. */
2966 /* Copy socket information and received FD */
2967 switch (sock_type
) {
2968 case LTTNG_STREAM_CONTROL
:
2969 /* Copy received lttcomm socket */
2970 lttcomm_copy_sock(&relayd
->control_sock
, relayd_sock
);
2971 ret
= lttcomm_create_sock(&relayd
->control_sock
);
2972 /* Immediately try to close the created socket if valid. */
2973 if (relayd
->control_sock
.fd
>= 0) {
2974 if (close(relayd
->control_sock
.fd
)) {
2975 PERROR("close relayd control socket");
2978 /* Handle create_sock error. */
2983 /* Assign new file descriptor */
2984 relayd
->control_sock
.fd
= fd
;
2987 * Create a session on the relayd and store the returned id. Lock the
2988 * control socket mutex if the relayd was NOT created before.
2990 if (!relayd_created
) {
2991 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
2993 ret
= relayd_create_session(&relayd
->control_sock
,
2994 &relayd
->relayd_session_id
);
2995 if (!relayd_created
) {
2996 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
3000 * Close all sockets of a relayd object. It will be freed if it was
3001 * created at the error code path or else it will be garbage
3004 (void) relayd_close(&relayd
->control_sock
);
3005 (void) relayd_close(&relayd
->data_sock
);
3010 case LTTNG_STREAM_DATA
:
3011 /* Copy received lttcomm socket */
3012 lttcomm_copy_sock(&relayd
->data_sock
, relayd_sock
);
3013 ret
= lttcomm_create_sock(&relayd
->data_sock
);
3014 /* Immediately try to close the created socket if valid. */
3015 if (relayd
->data_sock
.fd
>= 0) {
3016 if (close(relayd
->data_sock
.fd
)) {
3017 PERROR("close relayd data socket");
3020 /* Handle create_sock error. */
3025 /* Assign new file descriptor */
3026 relayd
->data_sock
.fd
= fd
;
3029 ERR("Unknown relayd socket type (%d)", sock_type
);
3034 DBG("Consumer %s socket created successfully with net idx %" PRIu64
" (fd: %d)",
3035 sock_type
== LTTNG_STREAM_CONTROL
? "control" : "data",
3036 relayd
->net_seq_idx
, fd
);
3039 * Add relayd socket pair to consumer data hashtable. If object already
3040 * exists or on error, the function gracefully returns.
3048 /* Close received socket if valid. */
3051 PERROR("close received socket");
3056 if (relayd_created
) {
3064 * Try to lock the stream mutex.
3066 * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
3068 static int stream_try_lock(struct lttng_consumer_stream
*stream
)
3075 * Try to lock the stream mutex. On failure, we know that the stream is
3076 * being used else where hence there is data still being extracted.
3078 ret
= pthread_mutex_trylock(&stream
->lock
);
3080 /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
3092 * Search for a relayd associated to the session id and return the reference.
3094 * A rcu read side lock MUST be acquire before calling this function and locked
3095 * until the relayd object is no longer necessary.
3097 static struct consumer_relayd_sock_pair
*find_relayd_by_session_id(uint64_t id
)
3099 struct lttng_ht_iter iter
;
3100 struct consumer_relayd_sock_pair
*relayd
= NULL
;
3102 /* Iterate over all relayd since they are indexed by net_seq_idx. */
3103 cds_lfht_for_each_entry(consumer_data
.relayd_ht
->ht
, &iter
.iter
, relayd
,
3106 * Check by sessiond id which is unique here where the relayd session
3107 * id might not be when having multiple relayd.
3109 if (relayd
->sessiond_session_id
== id
) {
3110 /* Found the relayd. There can be only one per id. */
3122 * Check if for a given session id there is still data needed to be extract
3125 * Return 1 if data is pending or else 0 meaning ready to be read.
3127 int consumer_data_pending(uint64_t id
)
3130 struct lttng_ht_iter iter
;
3131 struct lttng_ht
*ht
;
3132 struct lttng_consumer_stream
*stream
;
3133 struct consumer_relayd_sock_pair
*relayd
= NULL
;
3134 int (*data_pending
)(struct lttng_consumer_stream
*);
3136 DBG("Consumer data pending command on session id %" PRIu64
, id
);
3139 pthread_mutex_lock(&consumer_data
.lock
);
3141 switch (consumer_data
.type
) {
3142 case LTTNG_CONSUMER_KERNEL
:
3143 data_pending
= lttng_kconsumer_data_pending
;
3145 case LTTNG_CONSUMER32_UST
:
3146 case LTTNG_CONSUMER64_UST
:
3147 data_pending
= lttng_ustconsumer_data_pending
;
3150 ERR("Unknown consumer data type");
3154 /* Ease our life a bit */
3155 ht
= consumer_data
.stream_list_ht
;
3157 relayd
= find_relayd_by_session_id(id
);
3159 /* Send init command for data pending. */
3160 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
3161 ret
= relayd_begin_data_pending(&relayd
->control_sock
,
3162 relayd
->relayd_session_id
);
3163 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
3165 /* Communication error thus the relayd so no data pending. */
3166 goto data_not_pending
;
3170 cds_lfht_for_each_entry_duplicate(ht
->ht
,
3171 ht
->hash_fct(&id
, lttng_ht_seed
),
3173 &iter
.iter
, stream
, node_session_id
.node
) {
3174 /* If this call fails, the stream is being used hence data pending. */
3175 ret
= stream_try_lock(stream
);
3181 * A removed node from the hash table indicates that the stream has
3182 * been deleted thus having a guarantee that the buffers are closed
3183 * on the consumer side. However, data can still be transmitted
3184 * over the network so don't skip the relayd check.
3186 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
3188 /* Check the stream if there is data in the buffers. */
3189 ret
= data_pending(stream
);
3191 pthread_mutex_unlock(&stream
->lock
);
3198 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
3199 if (stream
->metadata_flag
) {
3200 ret
= relayd_quiescent_control(&relayd
->control_sock
,
3201 stream
->relayd_stream_id
);
3203 ret
= relayd_data_pending(&relayd
->control_sock
,
3204 stream
->relayd_stream_id
,
3205 stream
->next_net_seq_num
- 1);
3207 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
3209 pthread_mutex_unlock(&stream
->lock
);
3213 pthread_mutex_unlock(&stream
->lock
);
3217 unsigned int is_data_inflight
= 0;
3219 /* Send init command for data pending. */
3220 pthread_mutex_lock(&relayd
->ctrl_sock_mutex
);
3221 ret
= relayd_end_data_pending(&relayd
->control_sock
,
3222 relayd
->relayd_session_id
, &is_data_inflight
);
3223 pthread_mutex_unlock(&relayd
->ctrl_sock_mutex
);
3225 goto data_not_pending
;
3227 if (is_data_inflight
) {
3233 * Finding _no_ node in the hash table and no inflight data means that the
3234 * stream(s) have been removed thus data is guaranteed to be available for
3235 * analysis from the trace files.
3239 /* Data is available to be read by a viewer. */
3240 pthread_mutex_unlock(&consumer_data
.lock
);
3245 /* Data is still being extracted from buffers. */
3246 pthread_mutex_unlock(&consumer_data
.lock
);
3252 * Send a ret code status message to the sessiond daemon.
3254 * Return the sendmsg() return value.
3256 int consumer_send_status_msg(int sock
, int ret_code
)
3258 struct lttcomm_consumer_status_msg msg
;
3260 msg
.ret_code
= ret_code
;
3262 return lttcomm_send_unix_sock(sock
, &msg
, sizeof(msg
));
3266 * Send a channel status message to the sessiond daemon.
3268 * Return the sendmsg() return value.
3270 int consumer_send_status_channel(int sock
,
3271 struct lttng_consumer_channel
*channel
)
3273 struct lttcomm_consumer_status_channel msg
;
3278 msg
.ret_code
= -LTTNG_ERR_UST_CHAN_FAIL
;
3280 msg
.ret_code
= LTTNG_OK
;
3281 msg
.key
= channel
->key
;
3282 msg
.stream_count
= channel
->streams
.count
;
3285 return lttcomm_send_unix_sock(sock
, &msg
, sizeof(msg
));