2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
41 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
44 enum lttng_object_type
{
45 LTTNG_OBJECT_TYPE_UNKNOWN
,
46 LTTNG_OBJECT_TYPE_NONE
,
47 LTTNG_OBJECT_TYPE_CHANNEL
,
48 LTTNG_OBJECT_TYPE_SESSION
,
51 struct lttng_trigger_list_element
{
52 /* No ownership of the trigger object is assumed. */
53 struct lttng_trigger
*trigger
;
54 struct cds_list_head node
;
57 struct lttng_channel_trigger_list
{
58 struct channel_key channel_key
;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list
;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node
;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node
;
68 * List of triggers applying to a given session.
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
76 struct lttng_session_trigger_list
{
78 * Not owned by this; points to the session_info structure's
81 const char *session_name
;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list
;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node
;
87 * Weak reference to the notification system's session triggers
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
103 struct cds_lfht
*session_triggers_ht
;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node
;
108 struct lttng_trigger_ht_element
{
109 struct lttng_trigger
*trigger
;
110 struct cds_lfht_node node
;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node
;
115 struct lttng_condition_list_element
{
116 struct lttng_condition
*condition
;
117 struct cds_list_head node
;
120 struct channel_state_sample
{
121 struct channel_key key
;
122 struct cds_lfht_node channel_state_ht_node
;
123 uint64_t highest_usage
;
124 uint64_t lowest_usage
;
125 uint64_t channel_total_consumed
;
126 /* call_rcu delayed reclaim. */
127 struct rcu_head rcu_node
;
130 static unsigned long hash_channel_key(struct channel_key
*key
);
131 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
132 struct lttng_evaluation
**evaluation
,
133 const struct notification_thread_state
*state
,
134 const struct channel_state_sample
*previous_sample
,
135 const struct channel_state_sample
*latest_sample
,
136 uint64_t previous_session_consumed_total
,
137 uint64_t latest_session_consumed_total
,
138 struct channel_info
*channel_info
);
140 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
141 const struct lttng_evaluation
*evaluation
,
142 struct notification_client_list
*client_list
,
143 struct notification_thread_state
*state
,
144 uid_t channel_uid
, gid_t channel_gid
);
147 /* session_info API */
149 void session_info_destroy(void *_data
);
151 void session_info_get(struct session_info
*session_info
);
153 void session_info_put(struct session_info
*session_info
);
155 struct session_info
*session_info_create(const char *name
,
156 uid_t uid
, gid_t gid
,
157 struct lttng_session_trigger_list
*trigger_list
,
158 struct cds_lfht
*sessions_ht
);
160 void session_info_add_channel(struct session_info
*session_info
,
161 struct channel_info
*channel_info
);
163 void session_info_remove_channel(struct session_info
*session_info
,
164 struct channel_info
*channel_info
);
166 /* lttng_session_trigger_list API */
168 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
169 const char *session_name
,
170 struct cds_lfht
*session_triggers_ht
);
172 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
173 const struct notification_thread_state
*state
,
174 const char *session_name
);
176 void lttng_session_trigger_list_destroy(
177 struct lttng_session_trigger_list
*list
);
179 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
180 struct lttng_trigger
*trigger
);
183 int client_handle_transmission_status(
184 struct notification_client
*client
,
185 enum client_transmission_status transmission_status
,
186 struct notification_thread_state
*state
);
189 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
191 /* This double-cast is intended to supress pointer-to-cast warning. */
192 const int socket
= (int) (intptr_t) key
;
193 const struct notification_client
*client
= caa_container_of(node
,
194 struct notification_client
, client_socket_ht_node
);
196 return client
->socket
== socket
;
200 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
202 /* This double-cast is intended to supress pointer-to-cast warning. */
203 const notification_client_id id
= *((notification_client_id
*) key
);
204 const struct notification_client
*client
= caa_container_of(
205 node
, struct notification_client
, client_id_ht_node
);
207 return client
->id
== id
;
211 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
213 struct channel_key
*channel_key
= (struct channel_key
*) key
;
214 struct lttng_channel_trigger_list
*trigger_list
;
216 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
217 channel_triggers_ht_node
);
219 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
220 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
224 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
226 const char *session_name
= (const char *) key
;
227 struct lttng_session_trigger_list
*trigger_list
;
229 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
230 session_triggers_ht_node
);
232 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
236 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
238 struct channel_key
*channel_key
= (struct channel_key
*) key
;
239 struct channel_state_sample
*sample
;
241 sample
= caa_container_of(node
, struct channel_state_sample
,
242 channel_state_ht_node
);
244 return !!((channel_key
->key
== sample
->key
.key
) &&
245 (channel_key
->domain
== sample
->key
.domain
));
249 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
251 struct channel_key
*channel_key
= (struct channel_key
*) key
;
252 struct channel_info
*channel_info
;
254 channel_info
= caa_container_of(node
, struct channel_info
,
257 return !!((channel_key
->key
== channel_info
->key
.key
) &&
258 (channel_key
->domain
== channel_info
->key
.domain
));
262 int match_condition(struct cds_lfht_node
*node
, const void *key
)
264 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
265 struct lttng_trigger_ht_element
*trigger
;
266 struct lttng_condition
*condition
;
268 trigger
= caa_container_of(node
, struct lttng_trigger_ht_element
,
270 condition
= lttng_trigger_get_condition(trigger
->trigger
);
273 return !!lttng_condition_is_equal(condition_key
, condition
);
277 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
279 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
280 struct notification_client_list
*client_list
;
281 const struct lttng_condition
*condition
;
283 assert(condition_key
);
285 client_list
= caa_container_of(node
, struct notification_client_list
,
286 notification_trigger_clients_ht_node
);
287 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
289 return !!lttng_condition_is_equal(condition_key
, condition
);
293 int match_session(struct cds_lfht_node
*node
, const void *key
)
295 const char *name
= key
;
296 struct session_info
*session_info
= caa_container_of(
297 node
, struct session_info
, sessions_ht_node
);
299 return !strcmp(session_info
->name
, name
);
303 unsigned long lttng_condition_buffer_usage_hash(
304 const struct lttng_condition
*_condition
)
307 unsigned long condition_type
;
308 struct lttng_condition_buffer_usage
*condition
;
310 condition
= container_of(_condition
,
311 struct lttng_condition_buffer_usage
, parent
);
313 condition_type
= (unsigned long) condition
->parent
.type
;
314 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
315 if (condition
->session_name
) {
316 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
318 if (condition
->channel_name
) {
319 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
321 if (condition
->domain
.set
) {
322 hash
^= hash_key_ulong(
323 (void *) condition
->domain
.type
,
326 if (condition
->threshold_ratio
.set
) {
329 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
330 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
331 } else if (condition
->threshold_bytes
.set
) {
334 val
= condition
->threshold_bytes
.value
;
335 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
341 unsigned long lttng_condition_session_consumed_size_hash(
342 const struct lttng_condition
*_condition
)
345 unsigned long condition_type
=
346 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
347 struct lttng_condition_session_consumed_size
*condition
;
350 condition
= container_of(_condition
,
351 struct lttng_condition_session_consumed_size
, parent
);
353 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
354 if (condition
->session_name
) {
355 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
357 val
= condition
->consumed_threshold_bytes
.value
;
358 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
363 unsigned long lttng_condition_session_rotation_hash(
364 const struct lttng_condition
*_condition
)
366 unsigned long hash
, condition_type
;
367 struct lttng_condition_session_rotation
*condition
;
369 condition
= container_of(_condition
,
370 struct lttng_condition_session_rotation
, parent
);
371 condition_type
= (unsigned long) condition
->parent
.type
;
372 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
373 assert(condition
->session_name
);
374 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
379 * The lttng_condition hashing code is kept in this file (rather than
380 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
381 * don't want to link in liblttng-ctl.
384 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
386 switch (condition
->type
) {
387 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
388 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
389 return lttng_condition_buffer_usage_hash(condition
);
390 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
391 return lttng_condition_session_consumed_size_hash(condition
);
392 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
393 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
394 return lttng_condition_session_rotation_hash(condition
);
396 ERR("[notification-thread] Unexpected condition type caught");
402 unsigned long hash_channel_key(struct channel_key
*key
)
404 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
405 unsigned long domain_hash
= hash_key_ulong(
406 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
408 return key_hash
^ domain_hash
;
412 unsigned long hash_client_socket(int socket
)
414 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
418 unsigned long hash_client_id(notification_client_id id
)
420 return hash_key_u64(&id
, lttng_ht_seed
);
424 * Get the type of object to which a given condition applies. Bindings let
425 * the notification system evaluate a trigger's condition when a given
426 * object's state is updated.
428 * For instance, a condition bound to a channel will be evaluated everytime
429 * the channel's state is changed by a channel monitoring sample.
432 enum lttng_object_type
get_condition_binding_object(
433 const struct lttng_condition
*condition
)
435 switch (lttng_condition_get_type(condition
)) {
436 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
437 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
438 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
439 return LTTNG_OBJECT_TYPE_CHANNEL
;
440 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
441 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
442 return LTTNG_OBJECT_TYPE_SESSION
;
444 return LTTNG_OBJECT_TYPE_UNKNOWN
;
449 void free_channel_info_rcu(struct rcu_head
*node
)
451 free(caa_container_of(node
, struct channel_info
, rcu_node
));
455 void channel_info_destroy(struct channel_info
*channel_info
)
461 if (channel_info
->session_info
) {
462 session_info_remove_channel(channel_info
->session_info
,
464 session_info_put(channel_info
->session_info
);
466 if (channel_info
->name
) {
467 free(channel_info
->name
);
469 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
473 void free_session_info_rcu(struct rcu_head
*node
)
475 free(caa_container_of(node
, struct session_info
, rcu_node
));
478 /* Don't call directly, use the ref-counting mechanism. */
480 void session_info_destroy(void *_data
)
482 struct session_info
*session_info
= _data
;
485 assert(session_info
);
486 if (session_info
->channel_infos_ht
) {
487 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
489 ERR("[notification-thread] Failed to destroy channel information hash table");
492 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
495 cds_lfht_del(session_info
->sessions_ht
,
496 &session_info
->sessions_ht_node
);
498 free(session_info
->name
);
499 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
503 void session_info_get(struct session_info
*session_info
)
508 lttng_ref_get(&session_info
->ref
);
512 void session_info_put(struct session_info
*session_info
)
517 lttng_ref_put(&session_info
->ref
);
521 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
522 struct lttng_session_trigger_list
*trigger_list
,
523 struct cds_lfht
*sessions_ht
)
525 struct session_info
*session_info
;
529 session_info
= zmalloc(sizeof(*session_info
));
533 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
535 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
536 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
537 if (!session_info
->channel_infos_ht
) {
541 cds_lfht_node_init(&session_info
->sessions_ht_node
);
542 session_info
->name
= strdup(name
);
543 if (!session_info
->name
) {
546 session_info
->uid
= uid
;
547 session_info
->gid
= gid
;
548 session_info
->trigger_list
= trigger_list
;
549 session_info
->sessions_ht
= sessions_ht
;
553 session_info_put(session_info
);
558 void session_info_add_channel(struct session_info
*session_info
,
559 struct channel_info
*channel_info
)
562 cds_lfht_add(session_info
->channel_infos_ht
,
563 hash_channel_key(&channel_info
->key
),
564 &channel_info
->session_info_channels_ht_node
);
569 void session_info_remove_channel(struct session_info
*session_info
,
570 struct channel_info
*channel_info
)
573 cds_lfht_del(session_info
->channel_infos_ht
,
574 &channel_info
->session_info_channels_ht_node
);
579 struct channel_info
*channel_info_create(const char *channel_name
,
580 struct channel_key
*channel_key
, uint64_t channel_capacity
,
581 struct session_info
*session_info
)
583 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
589 cds_lfht_node_init(&channel_info
->channels_ht_node
);
590 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
591 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
592 channel_info
->capacity
= channel_capacity
;
594 channel_info
->name
= strdup(channel_name
);
595 if (!channel_info
->name
) {
600 * Set the references between session and channel infos:
601 * - channel_info holds a strong reference to session_info
602 * - session_info holds a weak reference to channel_info
604 session_info_get(session_info
);
605 session_info_add_channel(session_info
, channel_info
);
606 channel_info
->session_info
= session_info
;
610 channel_info_destroy(channel_info
);
615 bool notification_client_list_get(struct notification_client_list
*list
)
617 return urcu_ref_get_unless_zero(&list
->ref
);
621 void free_notification_client_list_rcu(struct rcu_head
*node
)
623 free(caa_container_of(node
, struct notification_client_list
,
628 void notification_client_list_release(struct urcu_ref
*list_ref
)
630 struct notification_client_list
*list
=
631 container_of(list_ref
, typeof(*list
), ref
);
632 struct notification_client_list_element
*client_list_element
, *tmp
;
634 if (list
->notification_trigger_clients_ht
) {
636 cds_lfht_del(list
->notification_trigger_clients_ht
,
637 &list
->notification_trigger_clients_ht_node
);
639 list
->notification_trigger_clients_ht
= NULL
;
641 cds_list_for_each_entry_safe(client_list_element
, tmp
,
643 free(client_list_element
);
645 pthread_mutex_destroy(&list
->lock
);
646 call_rcu(&list
->rcu_node
, free_notification_client_list_rcu
);
650 struct notification_client_list
*notification_client_list_create(
651 const struct lttng_trigger
*trigger
)
653 struct notification_client_list
*client_list
=
654 zmalloc(sizeof(*client_list
));
659 pthread_mutex_init(&client_list
->lock
, NULL
);
660 urcu_ref_init(&client_list
->ref
);
661 cds_lfht_node_init(&client_list
->notification_trigger_clients_ht_node
);
662 CDS_INIT_LIST_HEAD(&client_list
->list
);
663 client_list
->trigger
= trigger
;
669 void publish_notification_client_list(
670 struct notification_thread_state
*state
,
671 struct notification_client_list
*list
)
673 const struct lttng_condition
*condition
=
674 lttng_trigger_get_const_condition(list
->trigger
);
676 assert(!list
->notification_trigger_clients_ht
);
678 list
->notification_trigger_clients_ht
=
679 state
->notification_trigger_clients_ht
;
682 cds_lfht_add(state
->notification_trigger_clients_ht
,
683 lttng_condition_hash(condition
),
684 &list
->notification_trigger_clients_ht_node
);
689 void notification_client_list_put(struct notification_client_list
*list
)
694 return urcu_ref_put(&list
->ref
, notification_client_list_release
);
697 /* Provides a reference to the returned list. */
699 struct notification_client_list
*get_client_list_from_condition(
700 struct notification_thread_state
*state
,
701 const struct lttng_condition
*condition
)
703 struct cds_lfht_node
*node
;
704 struct cds_lfht_iter iter
;
705 struct notification_client_list
*list
= NULL
;
708 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
709 lttng_condition_hash(condition
),
710 match_client_list_condition
,
713 node
= cds_lfht_iter_get_node(&iter
);
715 list
= container_of(node
, struct notification_client_list
,
716 notification_trigger_clients_ht_node
);
717 list
= notification_client_list_get(list
) ? list
: NULL
;
725 int evaluate_channel_condition_for_client(
726 const struct lttng_condition
*condition
,
727 struct notification_thread_state
*state
,
728 struct lttng_evaluation
**evaluation
,
729 uid_t
*session_uid
, gid_t
*session_gid
)
732 struct cds_lfht_iter iter
;
733 struct cds_lfht_node
*node
;
734 struct channel_info
*channel_info
= NULL
;
735 struct channel_key
*channel_key
= NULL
;
736 struct channel_state_sample
*last_sample
= NULL
;
737 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
741 /* Find the channel associated with the condition. */
742 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
743 channel_trigger_list
, channel_triggers_ht_node
) {
744 struct lttng_trigger_list_element
*element
;
746 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
747 const struct lttng_condition
*current_condition
=
748 lttng_trigger_get_const_condition(
751 assert(current_condition
);
752 if (!lttng_condition_is_equal(condition
,
753 current_condition
)) {
757 /* Found the trigger, save the channel key. */
758 channel_key
= &channel_trigger_list
->channel_key
;
762 /* The channel key was found stop iteration. */
768 /* No channel found; normal exit. */
769 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
774 /* Fetch channel info for the matching channel. */
775 cds_lfht_lookup(state
->channels_ht
,
776 hash_channel_key(channel_key
),
780 node
= cds_lfht_iter_get_node(&iter
);
782 channel_info
= caa_container_of(node
, struct channel_info
,
785 /* Retrieve the channel's last sample, if it exists. */
786 cds_lfht_lookup(state
->channel_state_ht
,
787 hash_channel_key(channel_key
),
788 match_channel_state_sample
,
791 node
= cds_lfht_iter_get_node(&iter
);
793 last_sample
= caa_container_of(node
,
794 struct channel_state_sample
,
795 channel_state_ht_node
);
797 /* Nothing to evaluate, no sample was ever taken. Normal exit */
798 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
803 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
805 0, channel_info
->session_info
->consumed_data_size
,
808 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
812 *session_uid
= channel_info
->session_info
->uid
;
813 *session_gid
= channel_info
->session_info
->gid
;
820 const char *get_condition_session_name(const struct lttng_condition
*condition
)
822 const char *session_name
= NULL
;
823 enum lttng_condition_status status
;
825 switch (lttng_condition_get_type(condition
)) {
826 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
827 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
828 status
= lttng_condition_buffer_usage_get_session_name(
829 condition
, &session_name
);
831 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
832 status
= lttng_condition_session_consumed_size_get_session_name(
833 condition
, &session_name
);
835 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
836 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
837 status
= lttng_condition_session_rotation_get_session_name(
838 condition
, &session_name
);
843 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
844 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
852 int evaluate_session_condition_for_client(
853 const struct lttng_condition
*condition
,
854 struct notification_thread_state
*state
,
855 struct lttng_evaluation
**evaluation
,
856 uid_t
*session_uid
, gid_t
*session_gid
)
859 struct cds_lfht_iter iter
;
860 struct cds_lfht_node
*node
;
861 const char *session_name
;
862 struct session_info
*session_info
= NULL
;
865 session_name
= get_condition_session_name(condition
);
867 /* Find the session associated with the trigger. */
868 cds_lfht_lookup(state
->sessions_ht
,
869 hash_key_str(session_name
, lttng_ht_seed
),
873 node
= cds_lfht_iter_get_node(&iter
);
875 DBG("[notification-thread] No known session matching name \"%s\"",
881 session_info
= caa_container_of(node
, struct session_info
,
883 session_info_get(session_info
);
886 * Evaluation is performed in-line here since only one type of
887 * session-bound condition is handled for the moment.
889 switch (lttng_condition_get_type(condition
)) {
890 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
891 if (!session_info
->rotation
.ongoing
) {
893 goto end_session_put
;
896 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
897 session_info
->rotation
.id
);
900 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
903 goto end_session_put
;
909 goto end_session_put
;
912 *session_uid
= session_info
->uid
;
913 *session_gid
= session_info
->gid
;
916 session_info_put(session_info
);
923 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
924 const struct lttng_condition
*condition
,
925 struct notification_client
*client
,
926 struct notification_thread_state
*state
)
929 struct lttng_evaluation
*evaluation
= NULL
;
930 struct notification_client_list client_list
= {
931 .lock
= PTHREAD_MUTEX_INITIALIZER
,
933 struct notification_client_list_element client_list_element
= { 0 };
934 uid_t object_uid
= 0;
935 gid_t object_gid
= 0;
942 switch (get_condition_binding_object(condition
)) {
943 case LTTNG_OBJECT_TYPE_SESSION
:
944 ret
= evaluate_session_condition_for_client(condition
, state
,
945 &evaluation
, &object_uid
, &object_gid
);
947 case LTTNG_OBJECT_TYPE_CHANNEL
:
948 ret
= evaluate_channel_condition_for_client(condition
, state
,
949 &evaluation
, &object_uid
, &object_gid
);
951 case LTTNG_OBJECT_TYPE_NONE
:
954 case LTTNG_OBJECT_TYPE_UNKNOWN
:
964 /* Evaluation yielded nothing. Normal exit. */
965 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
971 * Create a temporary client list with the client currently
974 cds_lfht_node_init(&client_list
.notification_trigger_clients_ht_node
);
975 CDS_INIT_LIST_HEAD(&client_list
.list
);
976 client_list
.trigger
= trigger
;
978 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
979 client_list_element
.client
= client
;
980 cds_list_add(&client_list_element
.node
, &client_list
.list
);
982 /* Send evaluation result to the newly-subscribed client. */
983 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
984 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
985 state
, object_uid
, object_gid
);
992 int notification_thread_client_subscribe(struct notification_client
*client
,
993 struct lttng_condition
*condition
,
994 struct notification_thread_state
*state
,
995 enum lttng_notification_channel_status
*_status
)
998 struct notification_client_list
*client_list
= NULL
;
999 struct lttng_condition_list_element
*condition_list_element
= NULL
;
1000 struct notification_client_list_element
*client_list_element
= NULL
;
1001 enum lttng_notification_channel_status status
=
1002 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1005 * Ensure that the client has not already subscribed to this condition
1008 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1009 if (lttng_condition_is_equal(condition_list_element
->condition
,
1011 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1016 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1017 if (!condition_list_element
) {
1021 client_list_element
= zmalloc(sizeof(*client_list_element
));
1022 if (!client_list_element
) {
1028 * Add the newly-subscribed condition to the client's subscription list.
1030 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1031 condition_list_element
->condition
= condition
;
1032 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1034 client_list
= get_client_list_from_condition(state
, condition
);
1037 * No notification-emiting trigger registered with this
1038 * condition. We don't evaluate the condition right away
1039 * since this trigger is not registered yet.
1041 free(client_list_element
);
1046 * The condition to which the client just subscribed is evaluated
1047 * at this point so that conditions that are already TRUE result
1048 * in a notification being sent out.
1050 * The client_list's trigger is used without locking the list itself.
1051 * This is correct since the list doesn't own the trigger and the
1052 * object is immutable.
1054 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1056 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1058 free(client_list_element
);
1063 * Add the client to the list of clients interested in a given trigger
1064 * if a "notification" trigger with a corresponding condition was
1067 client_list_element
->client
= client
;
1068 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1070 pthread_mutex_lock(&client_list
->lock
);
1071 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1072 pthread_mutex_unlock(&client_list
->lock
);
1078 notification_client_list_put(client_list
);
1082 free(condition_list_element
);
1083 free(client_list_element
);
1088 int notification_thread_client_unsubscribe(
1089 struct notification_client
*client
,
1090 struct lttng_condition
*condition
,
1091 struct notification_thread_state
*state
,
1092 enum lttng_notification_channel_status
*_status
)
1094 struct notification_client_list
*client_list
;
1095 struct lttng_condition_list_element
*condition_list_element
,
1097 struct notification_client_list_element
*client_list_element
,
1099 bool condition_found
= false;
1100 enum lttng_notification_channel_status status
=
1101 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1103 /* Remove the condition from the client's condition list. */
1104 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1105 &client
->condition_list
, node
) {
1106 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1111 cds_list_del(&condition_list_element
->node
);
1113 * The caller may be iterating on the client's conditions to
1114 * tear down a client's connection. In this case, the condition
1115 * will be destroyed at the end.
1117 if (condition
!= condition_list_element
->condition
) {
1118 lttng_condition_destroy(
1119 condition_list_element
->condition
);
1121 free(condition_list_element
);
1122 condition_found
= true;
1126 if (!condition_found
) {
1127 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1132 * Remove the client from the list of clients interested the trigger
1133 * matching the condition.
1135 client_list
= get_client_list_from_condition(state
, condition
);
1140 pthread_mutex_lock(&client_list
->lock
);
1141 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1142 &client_list
->list
, node
) {
1143 if (client_list_element
->client
->id
!= client
->id
) {
1146 cds_list_del(&client_list_element
->node
);
1147 free(client_list_element
);
1150 pthread_mutex_unlock(&client_list
->lock
);
1151 notification_client_list_put(client_list
);
1154 lttng_condition_destroy(condition
);
1162 void free_notification_client_rcu(struct rcu_head
*node
)
1164 free(caa_container_of(node
, struct notification_client
, rcu_node
));
1168 void notification_client_destroy(struct notification_client
*client
,
1169 struct notification_thread_state
*state
)
1176 * The client object is not reachable by other threads, no need to lock
1179 if (client
->socket
>= 0) {
1180 (void) lttcomm_close_unix_sock(client
->socket
);
1181 client
->socket
= -1;
1183 client
->communication
.active
= false;
1184 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1185 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1186 pthread_mutex_destroy(&client
->lock
);
1187 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1191 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1195 struct notification_client
*get_client_from_socket(int socket
,
1196 struct notification_thread_state
*state
)
1198 struct cds_lfht_iter iter
;
1199 struct cds_lfht_node
*node
;
1200 struct notification_client
*client
= NULL
;
1202 cds_lfht_lookup(state
->client_socket_ht
,
1203 hash_client_socket(socket
),
1204 match_client_socket
,
1205 (void *) (unsigned long) socket
,
1207 node
= cds_lfht_iter_get_node(&iter
);
1212 client
= caa_container_of(node
, struct notification_client
,
1213 client_socket_ht_node
);
1219 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1223 struct notification_client
*get_client_from_id(notification_client_id id
,
1224 struct notification_thread_state
*state
)
1226 struct cds_lfht_iter iter
;
1227 struct cds_lfht_node
*node
;
1228 struct notification_client
*client
= NULL
;
1230 cds_lfht_lookup(state
->client_id_ht
,
1235 node
= cds_lfht_iter_get_node(&iter
);
1240 client
= caa_container_of(node
, struct notification_client
,
1247 bool buffer_usage_condition_applies_to_channel(
1248 const struct lttng_condition
*condition
,
1249 const struct channel_info
*channel_info
)
1251 enum lttng_condition_status status
;
1252 enum lttng_domain_type condition_domain
;
1253 const char *condition_session_name
= NULL
;
1254 const char *condition_channel_name
= NULL
;
1256 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1258 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1259 if (channel_info
->key
.domain
!= condition_domain
) {
1263 status
= lttng_condition_buffer_usage_get_session_name(
1264 condition
, &condition_session_name
);
1265 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1267 status
= lttng_condition_buffer_usage_get_channel_name(
1268 condition
, &condition_channel_name
);
1269 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1271 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1274 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1284 bool session_consumed_size_condition_applies_to_channel(
1285 const struct lttng_condition
*condition
,
1286 const struct channel_info
*channel_info
)
1288 enum lttng_condition_status status
;
1289 const char *condition_session_name
= NULL
;
1291 status
= lttng_condition_session_consumed_size_get_session_name(
1292 condition
, &condition_session_name
);
1293 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1295 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1305 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1306 const struct channel_info
*channel_info
)
1308 const struct lttng_condition
*condition
;
1309 bool trigger_applies
;
1311 condition
= lttng_trigger_get_const_condition(trigger
);
1316 switch (lttng_condition_get_type(condition
)) {
1317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1318 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1319 trigger_applies
= buffer_usage_condition_applies_to_channel(
1320 condition
, channel_info
);
1322 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1323 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1324 condition
, channel_info
);
1330 return trigger_applies
;
1336 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1337 struct notification_client
*client
)
1339 bool applies
= false;
1340 struct lttng_condition_list_element
*condition_list_element
;
1342 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1344 applies
= lttng_condition_is_equal(
1345 condition_list_element
->condition
,
1346 lttng_trigger_get_condition(trigger
));
1354 /* Must be called with RCU read lock held. */
1356 struct lttng_session_trigger_list
*get_session_trigger_list(
1357 struct notification_thread_state
*state
,
1358 const char *session_name
)
1360 struct lttng_session_trigger_list
*list
= NULL
;
1361 struct cds_lfht_node
*node
;
1362 struct cds_lfht_iter iter
;
1364 cds_lfht_lookup(state
->session_triggers_ht
,
1365 hash_key_str(session_name
, lttng_ht_seed
),
1366 match_session_trigger_list
,
1369 node
= cds_lfht_iter_get_node(&iter
);
1372 * Not an error, the list of triggers applying to that session
1373 * will be initialized when the session is created.
1375 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1380 list
= caa_container_of(node
,
1381 struct lttng_session_trigger_list
,
1382 session_triggers_ht_node
);
1388 * Allocate an empty lttng_session_trigger_list for the session named
1391 * No ownership of 'session_name' is assumed by the session trigger list.
1392 * It is the caller's responsability to ensure the session name is alive
1393 * for as long as this list is.
1396 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1397 const char *session_name
,
1398 struct cds_lfht
*session_triggers_ht
)
1400 struct lttng_session_trigger_list
*list
;
1402 list
= zmalloc(sizeof(*list
));
1406 list
->session_name
= session_name
;
1407 CDS_INIT_LIST_HEAD(&list
->list
);
1408 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1409 list
->session_triggers_ht
= session_triggers_ht
;
1412 /* Publish the list through the session_triggers_ht. */
1413 cds_lfht_add(session_triggers_ht
,
1414 hash_key_str(session_name
, lttng_ht_seed
),
1415 &list
->session_triggers_ht_node
);
1422 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1424 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1429 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1431 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1433 /* Empty the list element by element, and then free the list itself. */
1434 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1435 &list
->list
, node
) {
1436 cds_list_del(&trigger_list_element
->node
);
1437 free(trigger_list_element
);
1440 /* Unpublish the list from the session_triggers_ht. */
1441 cds_lfht_del(list
->session_triggers_ht
,
1442 &list
->session_triggers_ht_node
);
1444 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1448 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1449 struct lttng_trigger
*trigger
)
1452 struct lttng_trigger_list_element
*new_element
=
1453 zmalloc(sizeof(*new_element
));
1459 CDS_INIT_LIST_HEAD(&new_element
->node
);
1460 new_element
->trigger
= trigger
;
1461 cds_list_add(&new_element
->node
, &list
->list
);
1467 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1468 const char *session_name
)
1470 bool applies
= false;
1471 const struct lttng_condition
*condition
;
1473 condition
= lttng_trigger_get_const_condition(trigger
);
1474 switch (lttng_condition_get_type(condition
)) {
1475 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1476 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1478 enum lttng_condition_status condition_status
;
1479 const char *condition_session_name
;
1481 condition_status
= lttng_condition_session_rotation_get_session_name(
1482 condition
, &condition_session_name
);
1483 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1484 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1488 assert(condition_session_name
);
1489 applies
= !strcmp(condition_session_name
, session_name
);
1500 * Allocate and initialize an lttng_session_trigger_list which contains
1501 * all triggers that apply to the session named 'session_name'.
1503 * No ownership of 'session_name' is assumed by the session trigger list.
1504 * It is the caller's responsability to ensure the session name is alive
1505 * for as long as this list is.
1508 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1509 const struct notification_thread_state
*state
,
1510 const char *session_name
)
1512 int trigger_count
= 0;
1513 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1514 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1515 struct cds_lfht_iter iter
;
1517 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1518 state
->session_triggers_ht
);
1520 /* Add all triggers applying to the session named 'session_name'. */
1521 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1525 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1530 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1531 trigger_ht_element
->trigger
);
1539 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1541 return session_trigger_list
;
1543 lttng_session_trigger_list_destroy(session_trigger_list
);
1548 struct session_info
*find_or_create_session_info(
1549 struct notification_thread_state
*state
,
1550 const char *name
, uid_t uid
, gid_t gid
)
1552 struct session_info
*session
= NULL
;
1553 struct cds_lfht_node
*node
;
1554 struct cds_lfht_iter iter
;
1555 struct lttng_session_trigger_list
*trigger_list
;
1558 cds_lfht_lookup(state
->sessions_ht
,
1559 hash_key_str(name
, lttng_ht_seed
),
1563 node
= cds_lfht_iter_get_node(&iter
);
1565 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1567 session
= caa_container_of(node
, struct session_info
,
1569 assert(session
->uid
== uid
);
1570 assert(session
->gid
== gid
);
1571 session_info_get(session
);
1575 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1576 if (!trigger_list
) {
1580 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1581 state
->sessions_ht
);
1583 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1585 lttng_session_trigger_list_destroy(trigger_list
);
1588 trigger_list
= NULL
;
1590 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1591 &session
->sessions_ht_node
);
1597 session_info_put(session
);
1602 int handle_notification_thread_command_add_channel(
1603 struct notification_thread_state
*state
,
1604 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1605 const char *channel_name
, enum lttng_domain_type channel_domain
,
1606 uint64_t channel_key_int
, uint64_t channel_capacity
,
1607 enum lttng_error_code
*cmd_result
)
1609 struct cds_list_head trigger_list
;
1610 struct channel_info
*new_channel_info
= NULL
;
1611 struct channel_key channel_key
= {
1612 .key
= channel_key_int
,
1613 .domain
= channel_domain
,
1615 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1616 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1617 int trigger_count
= 0;
1618 struct cds_lfht_iter iter
;
1619 struct session_info
*session_info
= NULL
;
1621 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1622 channel_name
, session_name
, channel_key_int
,
1623 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1625 CDS_INIT_LIST_HEAD(&trigger_list
);
1627 session_info
= find_or_create_session_info(state
, session_name
,
1628 session_uid
, session_gid
);
1629 if (!session_info
) {
1630 /* Allocation error or an internal error occurred. */
1634 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1635 channel_capacity
, session_info
);
1636 if (!new_channel_info
) {
1641 /* Build a list of all triggers applying to the new channel. */
1642 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1644 struct lttng_trigger_list_element
*new_element
;
1646 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1647 new_channel_info
)) {
1651 new_element
= zmalloc(sizeof(*new_element
));
1656 CDS_INIT_LIST_HEAD(&new_element
->node
);
1657 new_element
->trigger
= trigger_ht_element
->trigger
;
1658 cds_list_add(&new_element
->node
, &trigger_list
);
1663 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1665 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1666 if (!channel_trigger_list
) {
1669 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1670 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1671 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1672 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1675 /* Add channel to the channel_ht which owns the channel_infos. */
1676 cds_lfht_add(state
->channels_ht
,
1677 hash_channel_key(&new_channel_info
->key
),
1678 &new_channel_info
->channels_ht_node
);
1680 * Add the list of triggers associated with this channel to the
1681 * channel_triggers_ht.
1683 cds_lfht_add(state
->channel_triggers_ht
,
1684 hash_channel_key(&new_channel_info
->key
),
1685 &channel_trigger_list
->channel_triggers_ht_node
);
1687 session_info_put(session_info
);
1688 *cmd_result
= LTTNG_OK
;
1691 channel_info_destroy(new_channel_info
);
1692 session_info_put(session_info
);
1697 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1699 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
1704 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1706 free(caa_container_of(node
, struct channel_state_sample
,
1711 int handle_notification_thread_command_remove_channel(
1712 struct notification_thread_state
*state
,
1713 uint64_t channel_key
, enum lttng_domain_type domain
,
1714 enum lttng_error_code
*cmd_result
)
1716 struct cds_lfht_node
*node
;
1717 struct cds_lfht_iter iter
;
1718 struct lttng_channel_trigger_list
*trigger_list
;
1719 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1720 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1721 struct channel_info
*channel_info
;
1723 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1724 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1728 cds_lfht_lookup(state
->channel_triggers_ht
,
1729 hash_channel_key(&key
),
1730 match_channel_trigger_list
,
1733 node
= cds_lfht_iter_get_node(&iter
);
1735 * There is a severe internal error if we are being asked to remove a
1736 * channel that doesn't exist.
1739 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1743 /* Free the list of triggers associated with this channel. */
1744 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1745 channel_triggers_ht_node
);
1746 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1747 &trigger_list
->list
, node
) {
1748 cds_list_del(&trigger_list_element
->node
);
1749 free(trigger_list_element
);
1751 cds_lfht_del(state
->channel_triggers_ht
, node
);
1752 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1754 /* Free sampled channel state. */
1755 cds_lfht_lookup(state
->channel_state_ht
,
1756 hash_channel_key(&key
),
1757 match_channel_state_sample
,
1760 node
= cds_lfht_iter_get_node(&iter
);
1762 * This is expected to be NULL if the channel is destroyed before we
1763 * received a sample.
1766 struct channel_state_sample
*sample
= caa_container_of(node
,
1767 struct channel_state_sample
,
1768 channel_state_ht_node
);
1770 cds_lfht_del(state
->channel_state_ht
, node
);
1771 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1774 /* Remove the channel from the channels_ht and free it. */
1775 cds_lfht_lookup(state
->channels_ht
,
1776 hash_channel_key(&key
),
1780 node
= cds_lfht_iter_get_node(&iter
);
1782 channel_info
= caa_container_of(node
, struct channel_info
,
1784 cds_lfht_del(state
->channels_ht
, node
);
1785 channel_info_destroy(channel_info
);
1788 *cmd_result
= LTTNG_OK
;
1793 int handle_notification_thread_command_session_rotation(
1794 struct notification_thread_state
*state
,
1795 enum notification_thread_command_type cmd_type
,
1796 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1797 uint64_t trace_archive_chunk_id
,
1798 struct lttng_trace_archive_location
*location
,
1799 enum lttng_error_code
*_cmd_result
)
1802 enum lttng_error_code cmd_result
= LTTNG_OK
;
1803 struct lttng_session_trigger_list
*trigger_list
;
1804 struct lttng_trigger_list_element
*trigger_list_element
;
1805 struct session_info
*session_info
;
1806 const struct lttng_credentials session_creds
= {
1813 session_info
= find_or_create_session_info(state
, session_name
,
1814 session_uid
, session_gid
);
1815 if (!session_info
) {
1816 /* Allocation error or an internal error occurred. */
1818 cmd_result
= LTTNG_ERR_NOMEM
;
1822 session_info
->rotation
.ongoing
=
1823 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1824 session_info
->rotation
.id
= trace_archive_chunk_id
;
1825 trigger_list
= get_session_trigger_list(state
, session_name
);
1826 if (!trigger_list
) {
1827 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1832 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1834 const struct lttng_condition
*condition
;
1835 const struct lttng_action
*action
;
1836 struct lttng_trigger
*trigger
;
1837 struct notification_client_list
*client_list
;
1838 struct lttng_evaluation
*evaluation
= NULL
;
1839 enum lttng_condition_type condition_type
;
1840 bool client_list_is_empty
;
1841 enum action_executor_status executor_status
;
1843 trigger
= trigger_list_element
->trigger
;
1844 condition
= lttng_trigger_get_const_condition(trigger
);
1846 condition_type
= lttng_condition_get_type(condition
);
1848 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1849 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1851 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1852 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1856 action
= lttng_trigger_get_const_action(trigger
);
1858 /* Notify actions are the only type currently supported. */
1859 assert(lttng_action_get_type_const(action
) ==
1860 LTTNG_ACTION_TYPE_NOTIFY
);
1862 client_list
= get_client_list_from_condition(state
, condition
);
1863 assert(client_list
);
1865 pthread_mutex_lock(&client_list
->lock
);
1866 client_list_is_empty
= cds_list_empty(&client_list
->list
);
1867 pthread_mutex_unlock(&client_list
->lock
);
1868 if (client_list_is_empty
) {
1870 * No clients interested in the evaluation's result,
1876 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1877 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1878 trace_archive_chunk_id
);
1880 evaluation
= lttng_evaluation_session_rotation_completed_create(
1881 trace_archive_chunk_id
, location
);
1885 /* Internal error */
1887 cmd_result
= LTTNG_ERR_UNK
;
1892 * Ownership of `evaluation` transferred to the action executor
1893 * no matter the result.
1895 executor_status
= action_executor_enqueue(state
->executor
,
1896 trigger
, evaluation
, &session_creds
,
1899 switch (executor_status
) {
1900 case ACTION_EXECUTOR_STATUS_OK
:
1902 case ACTION_EXECUTOR_STATUS_ERROR
:
1903 case ACTION_EXECUTOR_STATUS_INVALID
:
1905 * TODO Add trigger identification (name/id) when
1906 * it is added to the API.
1908 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1911 case ACTION_EXECUTOR_STATUS_OVERFLOW
:
1913 * TODO Add trigger identification (name/id) when
1914 * it is added to the API.
1916 * Not a fatal error.
1918 WARN("No space left when enqueuing action associated with session rotation trigger");
1926 notification_client_list_put(client_list
);
1927 if (caa_unlikely(ret
)) {
1932 session_info_put(session_info
);
1933 *_cmd_result
= cmd_result
;
1939 int condition_is_supported(struct lttng_condition
*condition
)
1943 switch (lttng_condition_get_type(condition
)) {
1944 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1945 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1947 enum lttng_domain_type domain
;
1949 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
1956 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
1962 * Older kernel tracers don't expose the API to monitor their
1963 * buffers. Therefore, we reject triggers that require that
1964 * mechanism to be available to be evaluated.
1966 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
1976 /* Must be called with RCU read lock held. */
1978 int bind_trigger_to_matching_session(struct lttng_trigger
*trigger
,
1979 struct notification_thread_state
*state
)
1982 const struct lttng_condition
*condition
;
1983 const char *session_name
;
1984 struct lttng_session_trigger_list
*trigger_list
;
1986 condition
= lttng_trigger_get_const_condition(trigger
);
1987 switch (lttng_condition_get_type(condition
)) {
1988 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1989 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1991 enum lttng_condition_status status
;
1993 status
= lttng_condition_session_rotation_get_session_name(
1994 condition
, &session_name
);
1995 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
1996 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2007 trigger_list
= get_session_trigger_list(state
, session_name
);
2008 if (!trigger_list
) {
2009 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2015 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2017 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
2022 /* Must be called with RCU read lock held. */
2024 int bind_trigger_to_matching_channels(struct lttng_trigger
*trigger
,
2025 struct notification_thread_state
*state
)
2028 struct cds_lfht_node
*node
;
2029 struct cds_lfht_iter iter
;
2030 struct channel_info
*channel
;
2032 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
2034 struct lttng_trigger_list_element
*trigger_list_element
;
2035 struct lttng_channel_trigger_list
*trigger_list
;
2036 struct cds_lfht_iter lookup_iter
;
2038 if (!trigger_applies_to_channel(trigger
, channel
)) {
2042 cds_lfht_lookup(state
->channel_triggers_ht
,
2043 hash_channel_key(&channel
->key
),
2044 match_channel_trigger_list
,
2047 node
= cds_lfht_iter_get_node(&lookup_iter
);
2049 trigger_list
= caa_container_of(node
,
2050 struct lttng_channel_trigger_list
,
2051 channel_triggers_ht_node
);
2053 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2054 if (!trigger_list_element
) {
2058 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2059 trigger_list_element
->trigger
= trigger
;
2060 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2061 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2069 * FIXME A client's credentials are not checked when registering a trigger, nor
2070 * are they stored alongside with the trigger.
2072 * The effects of this are benign since:
2073 * - The client will succeed in registering the trigger, as it is valid,
2074 * - The trigger will, internally, be bound to the channel/session,
2075 * - The notifications will not be sent since the client's credentials
2076 * are checked against the channel at that moment.
2078 * If this function returns a non-zero value, it means something is
2079 * fundamentally broken and the whole subsystem/thread will be torn down.
2081 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2085 int handle_notification_thread_command_register_trigger(
2086 struct notification_thread_state
*state
,
2087 struct lttng_trigger
*trigger
,
2088 enum lttng_error_code
*cmd_result
)
2091 struct lttng_condition
*condition
;
2092 struct notification_client
*client
;
2093 struct notification_client_list
*client_list
= NULL
;
2094 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2095 struct notification_client_list_element
*client_list_element
, *tmp
;
2096 struct cds_lfht_node
*node
;
2097 struct cds_lfht_iter iter
;
2098 bool free_trigger
= true;
2102 condition
= lttng_trigger_get_condition(trigger
);
2105 ret
= condition_is_supported(condition
);
2108 } else if (ret
== 0) {
2109 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2112 /* Feature is supported, continue. */
2116 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2117 if (!trigger_ht_element
) {
2122 /* Add trigger to the trigger_ht. */
2123 cds_lfht_node_init(&trigger_ht_element
->node
);
2124 trigger_ht_element
->trigger
= trigger
;
2126 node
= cds_lfht_add_unique(state
->triggers_ht
,
2127 lttng_condition_hash(condition
),
2130 &trigger_ht_element
->node
);
2131 if (node
!= &trigger_ht_element
->node
) {
2132 /* Not a fatal error, simply report it to the client. */
2133 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2134 goto error_free_ht_element
;
2138 * Ownership of the trigger and of its wrapper was transfered to
2141 trigger_ht_element
= NULL
;
2142 free_trigger
= false;
2145 * The rest only applies to triggers that have a "notify" action.
2146 * It is not skipped as this is the only action type currently
2149 client_list
= notification_client_list_create(trigger
);
2152 goto error_free_ht_element
;
2155 /* Build a list of clients to which this new trigger applies. */
2156 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2157 client_socket_ht_node
) {
2158 if (!trigger_applies_to_client(trigger
, client
)) {
2162 client_list_element
= zmalloc(sizeof(*client_list_element
));
2163 if (!client_list_element
) {
2165 goto error_put_client_list
;
2167 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2168 client_list_element
->client
= client
;
2169 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2172 switch (get_condition_binding_object(condition
)) {
2173 case LTTNG_OBJECT_TYPE_SESSION
:
2174 /* Add the trigger to the list if it matches a known session. */
2175 ret
= bind_trigger_to_matching_session(trigger
, state
);
2177 goto error_put_client_list
;
2180 case LTTNG_OBJECT_TYPE_CHANNEL
:
2182 * Add the trigger to list of triggers bound to the channels
2185 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2187 goto error_put_client_list
;
2190 case LTTNG_OBJECT_TYPE_NONE
:
2193 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2195 goto error_put_client_list
;
2199 * Since there is nothing preventing clients from subscribing to a
2200 * condition before the corresponding trigger is registered, we have
2201 * to evaluate this new condition right away.
2203 * At some point, we were waiting for the next "evaluation" (e.g. on
2204 * reception of a channel sample) to evaluate this new condition, but
2207 * The reason it was broken is that waiting for the next sample
2208 * does not allow us to properly handle transitions for edge-triggered
2211 * Consider this example: when we handle a new channel sample, we
2212 * evaluate each conditions twice: once with the previous state, and
2213 * again with the newest state. We then use those two results to
2214 * determine whether a state change happened: a condition was false and
2215 * became true. If a state change happened, we have to notify clients.
2217 * Now, if a client subscribes to a given notification and registers
2218 * a trigger *after* that subscription, we have to make sure the
2219 * condition is evaluated at this point while considering only the
2220 * current state. Otherwise, the next evaluation cycle may only see
2221 * that the evaluations remain the same (true for samples n-1 and n) and
2222 * the client will never know that the condition has been met.
2224 * No need to lock the list here as it has not been published yet.
2226 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2227 &client_list
->list
, node
) {
2228 ret
= evaluate_condition_for_client(trigger
, condition
,
2229 client_list_element
->client
, state
);
2231 goto error_put_client_list
;
2236 * Client list ownership transferred to the
2237 * notification_trigger_clients_ht.
2239 publish_notification_client_list(state
, client_list
);
2242 *cmd_result
= LTTNG_OK
;
2244 error_put_client_list
:
2245 notification_client_list_put(client_list
);
2247 error_free_ht_element
:
2248 free(trigger_ht_element
);
2251 lttng_trigger_destroy(trigger
);
2258 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2260 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2265 int handle_notification_thread_command_unregister_trigger(
2266 struct notification_thread_state
*state
,
2267 struct lttng_trigger
*trigger
,
2268 enum lttng_error_code
*_cmd_reply
)
2270 struct cds_lfht_iter iter
;
2271 struct cds_lfht_node
*triggers_ht_node
;
2272 struct lttng_channel_trigger_list
*trigger_list
;
2273 struct notification_client_list
*client_list
;
2274 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2275 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2277 enum lttng_error_code cmd_reply
;
2281 cds_lfht_lookup(state
->triggers_ht
,
2282 lttng_condition_hash(condition
),
2286 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2287 if (!triggers_ht_node
) {
2288 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2291 cmd_reply
= LTTNG_OK
;
2294 /* Remove trigger from channel_triggers_ht. */
2295 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2296 channel_triggers_ht_node
) {
2297 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2299 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2300 &trigger_list
->list
, node
) {
2301 const struct lttng_condition
*current_condition
=
2302 lttng_trigger_get_const_condition(
2303 trigger_element
->trigger
);
2305 assert(current_condition
);
2306 if (!lttng_condition_is_equal(condition
,
2307 current_condition
)) {
2311 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2312 cds_list_del(&trigger_element
->node
);
2313 /* A trigger can only appear once per channel */
2319 * Remove and release the client list from
2320 * notification_trigger_clients_ht.
2322 client_list
= get_client_list_from_condition(state
, condition
);
2323 assert(client_list
);
2325 /* Put new reference and the hashtable's reference. */
2326 notification_client_list_put(client_list
);
2327 notification_client_list_put(client_list
);
2330 /* Remove trigger from triggers_ht. */
2331 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2332 struct lttng_trigger_ht_element
, node
);
2333 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2335 /* Release the ownership of the trigger. */
2336 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2337 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2341 *_cmd_reply
= cmd_reply
;
2346 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2347 int handle_notification_thread_command(
2348 struct notification_thread_handle
*handle
,
2349 struct notification_thread_state
*state
)
2353 struct notification_thread_command
*cmd
;
2355 /* Read the event pipe to put it back into a quiescent state. */
2356 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2358 if (ret
!= sizeof(counter
)) {
2362 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2363 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2364 struct notification_thread_command
, cmd_list_node
);
2365 cds_list_del(&cmd
->cmd_list_node
);
2366 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2367 switch (cmd
->type
) {
2368 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2369 DBG("[notification-thread] Received register trigger command");
2370 ret
= handle_notification_thread_command_register_trigger(
2371 state
, cmd
->parameters
.trigger
,
2374 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2375 DBG("[notification-thread] Received unregister trigger command");
2376 ret
= handle_notification_thread_command_unregister_trigger(
2377 state
, cmd
->parameters
.trigger
,
2380 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2381 DBG("[notification-thread] Received add channel command");
2382 ret
= handle_notification_thread_command_add_channel(
2384 cmd
->parameters
.add_channel
.session
.name
,
2385 cmd
->parameters
.add_channel
.session
.uid
,
2386 cmd
->parameters
.add_channel
.session
.gid
,
2387 cmd
->parameters
.add_channel
.channel
.name
,
2388 cmd
->parameters
.add_channel
.channel
.domain
,
2389 cmd
->parameters
.add_channel
.channel
.key
,
2390 cmd
->parameters
.add_channel
.channel
.capacity
,
2393 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2394 DBG("[notification-thread] Received remove channel command");
2395 ret
= handle_notification_thread_command_remove_channel(
2396 state
, cmd
->parameters
.remove_channel
.key
,
2397 cmd
->parameters
.remove_channel
.domain
,
2400 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2401 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2402 DBG("[notification-thread] Received session rotation %s command",
2403 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2404 "ongoing" : "completed");
2405 ret
= handle_notification_thread_command_session_rotation(
2408 cmd
->parameters
.session_rotation
.session_name
,
2409 cmd
->parameters
.session_rotation
.uid
,
2410 cmd
->parameters
.session_rotation
.gid
,
2411 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2412 cmd
->parameters
.session_rotation
.location
,
2415 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2416 DBG("[notification-thread] Received quit command");
2417 cmd
->reply_code
= LTTNG_OK
;
2420 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE
:
2422 const enum client_transmission_status client_status
=
2423 cmd
->parameters
.client_communication_update
2425 const notification_client_id client_id
=
2426 cmd
->parameters
.client_communication_update
.id
;
2427 struct notification_client
*client
;
2430 client
= get_client_from_id(client_id
, state
);
2434 * Client error was probably already picked-up by the
2435 * notification thread or it has disconnected
2436 * gracefully while this command was queued.
2438 DBG("Failed to find notification client to update communication status, client id = %" PRIu64
,
2442 pthread_mutex_lock(&client
->lock
);
2443 ret
= client_handle_transmission_status(
2444 client
, client_status
, state
);
2445 pthread_mutex_unlock(&client
->lock
);
2451 ERR("[notification-thread] Unknown internal command received");
2459 if (cmd
->is_async
) {
2463 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2467 /* Wake-up and return a fatal error to the calling thread. */
2468 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2469 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2471 /* Indicate a fatal error to the caller. */
2476 int socket_set_non_blocking(int socket
)
2480 /* Set the pipe as non-blocking. */
2481 ret
= fcntl(socket
, F_GETFL
, 0);
2483 PERROR("fcntl get socket flags");
2488 ret
= fcntl(socket
, F_SETFL
, flags
| O_NONBLOCK
);
2490 PERROR("fcntl set O_NONBLOCK socket flag");
2493 DBG("Client socket (fd = %i) set as non-blocking", socket
);
2498 /* Client lock must be acquired by caller. */
2500 int client_reset_inbound_state(struct notification_client
*client
)
2504 ASSERT_LOCKED(client
->lock
);
2506 ret
= lttng_dynamic_buffer_set_size(
2507 &client
->communication
.inbound
.buffer
, 0);
2510 client
->communication
.inbound
.bytes_to_receive
=
2511 sizeof(struct lttng_notification_channel_message
);
2512 client
->communication
.inbound
.msg_type
=
2513 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2514 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2515 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2516 ret
= lttng_dynamic_buffer_set_size(
2517 &client
->communication
.inbound
.buffer
,
2518 client
->communication
.inbound
.bytes_to_receive
);
2522 int handle_notification_thread_client_connect(
2523 struct notification_thread_state
*state
)
2526 struct notification_client
*client
;
2528 DBG("[notification-thread] Handling new notification channel client connection");
2530 client
= zmalloc(sizeof(*client
));
2536 pthread_mutex_init(&client
->lock
, NULL
);
2537 client
->id
= state
->next_notification_client_id
++;
2538 CDS_INIT_LIST_HEAD(&client
->condition_list
);
2539 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
2540 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
2541 client
->communication
.inbound
.expect_creds
= true;
2543 pthread_mutex_lock(&client
->lock
);
2544 ret
= client_reset_inbound_state(client
);
2545 pthread_mutex_unlock(&client
->lock
);
2547 ERR("[notification-thread] Failed to reset client communication's inbound state");
2552 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
2554 ERR("[notification-thread] Failed to accept new notification channel client connection");
2559 client
->socket
= ret
;
2561 ret
= socket_set_non_blocking(client
->socket
);
2563 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2567 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
2569 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2574 ret
= lttng_poll_add(&state
->events
, client
->socket
,
2575 LPOLLIN
| LPOLLERR
|
2576 LPOLLHUP
| LPOLLRDHUP
);
2578 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2582 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2586 cds_lfht_add(state
->client_socket_ht
,
2587 hash_client_socket(client
->socket
),
2588 &client
->client_socket_ht_node
);
2589 cds_lfht_add(state
->client_id_ht
,
2590 hash_client_id(client
->id
),
2591 &client
->client_id_ht_node
);
2596 notification_client_destroy(client
, state
);
2600 /* RCU read-lock must be held by the caller. */
2601 /* Client lock must be held by the caller */
2603 int notification_thread_client_disconnect(
2604 struct notification_client
*client
,
2605 struct notification_thread_state
*state
)
2608 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
2610 /* Acquire the client lock to disable its communication atomically. */
2611 client
->communication
.active
= false;
2612 ret
= lttng_poll_del(&state
->events
, client
->socket
);
2614 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2618 cds_lfht_del(state
->client_socket_ht
, &client
->client_socket_ht_node
);
2619 cds_lfht_del(state
->client_id_ht
, &client
->client_id_ht_node
);
2621 /* Release all conditions to which the client was subscribed. */
2622 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
2623 &client
->condition_list
, node
) {
2624 (void) notification_thread_client_unsubscribe(client
,
2625 condition_list_element
->condition
, state
, NULL
);
2629 * Client no longer accessible to other threads (through the
2632 notification_client_destroy(client
, state
);
2636 int handle_notification_thread_client_disconnect(
2637 int client_socket
, struct notification_thread_state
*state
)
2640 struct notification_client
*client
;
2643 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2645 client
= get_client_from_socket(client_socket
, state
);
2647 /* Internal state corruption, fatal error. */
2648 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2654 pthread_mutex_lock(&client
->lock
);
2655 ret
= notification_thread_client_disconnect(client
, state
);
2656 pthread_mutex_unlock(&client
->lock
);
2662 int handle_notification_thread_client_disconnect_all(
2663 struct notification_thread_state
*state
)
2665 struct cds_lfht_iter iter
;
2666 struct notification_client
*client
;
2667 bool error_encoutered
= false;
2670 DBG("[notification-thread] Closing all client connections");
2671 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2672 client_socket_ht_node
) {
2675 pthread_mutex_lock(&client
->lock
);
2676 ret
= notification_thread_client_disconnect(
2678 pthread_mutex_unlock(&client
->lock
);
2680 error_encoutered
= true;
2684 return error_encoutered
? 1 : 0;
2687 int handle_notification_thread_trigger_unregister_all(
2688 struct notification_thread_state
*state
)
2690 bool error_occurred
= false;
2691 struct cds_lfht_iter iter
;
2692 struct lttng_trigger_ht_element
*trigger_ht_element
;
2695 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
2697 int ret
= handle_notification_thread_command_unregister_trigger(
2698 state
, trigger_ht_element
->trigger
, NULL
);
2700 error_occurred
= true;
2704 return error_occurred
? -1 : 0;
2708 int client_handle_transmission_status(
2709 struct notification_client
*client
,
2710 enum client_transmission_status transmission_status
,
2711 struct notification_thread_state
*state
)
2715 switch (transmission_status
) {
2716 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
2717 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2718 CLIENT_POLL_MASK_IN
);
2723 client
->communication
.outbound
.queued_command_reply
= false;
2724 client
->communication
.outbound
.dropped_notification
= false;
2726 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
2728 * We want to be notified whenever there is buffer space
2729 * available to send the rest of the payload.
2731 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2732 CLIENT_POLL_MASK_IN_OUT
);
2737 case CLIENT_TRANSMISSION_STATUS_FAIL
:
2738 ret
= notification_thread_client_disconnect(client
, state
);
2743 case CLIENT_TRANSMISSION_STATUS_ERROR
:
2753 /* Client lock must be acquired by caller. */
2755 enum client_transmission_status
client_flush_outgoing_queue(
2756 struct notification_client
*client
)
2759 size_t to_send_count
;
2760 enum client_transmission_status status
;
2762 ASSERT_LOCKED(client
->lock
);
2764 if (!client
->communication
.active
) {
2765 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
2769 assert(client
->communication
.outbound
.buffer
.size
!= 0);
2770 to_send_count
= client
->communication
.outbound
.buffer
.size
;
2771 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2774 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
2775 client
->communication
.outbound
.buffer
.data
,
2777 if ((ret
>= 0 && ret
< to_send_count
)) {
2778 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2780 to_send_count
-= max(ret
, 0);
2782 memcpy(client
->communication
.outbound
.buffer
.data
,
2783 client
->communication
.outbound
.buffer
.data
+
2784 client
->communication
.outbound
.buffer
.size
- to_send_count
,
2786 ret
= lttng_dynamic_buffer_set_size(
2787 &client
->communication
.outbound
.buffer
,
2792 status
= CLIENT_TRANSMISSION_STATUS_QUEUED
;
2793 } else if (ret
< 0) {
2794 /* Generic error, disconnect the client. */
2795 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2797 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
2799 /* No error and flushed the queue completely. */
2800 ret
= lttng_dynamic_buffer_set_size(
2801 &client
->communication
.outbound
.buffer
, 0);
2805 status
= CLIENT_TRANSMISSION_STATUS_COMPLETE
;
2810 return CLIENT_TRANSMISSION_STATUS_ERROR
;
2813 /* Client lock must be acquired by caller. */
2815 int client_send_command_reply(struct notification_client
*client
,
2816 struct notification_thread_state
*state
,
2817 enum lttng_notification_channel_status status
)
2820 struct lttng_notification_channel_command_reply reply
= {
2821 .status
= (int8_t) status
,
2823 struct lttng_notification_channel_message msg
= {
2824 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
2825 .size
= sizeof(reply
),
2827 char buffer
[sizeof(msg
) + sizeof(reply
)];
2828 enum client_transmission_status transmission_status
;
2830 ASSERT_LOCKED(client
->lock
);
2832 if (client
->communication
.outbound
.queued_command_reply
) {
2833 /* Protocol error. */
2837 memcpy(buffer
, &msg
, sizeof(msg
));
2838 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
2839 DBG("[notification-thread] Send command reply (%i)", (int) status
);
2841 /* Enqueue buffer to outgoing queue and flush it. */
2842 ret
= lttng_dynamic_buffer_append(
2843 &client
->communication
.outbound
.buffer
,
2844 buffer
, sizeof(buffer
));
2849 transmission_status
= client_flush_outgoing_queue(client
);
2850 ret
= client_handle_transmission_status(
2851 client
, transmission_status
, state
);
2856 if (client
->communication
.outbound
.buffer
.size
!= 0) {
2857 /* Queue could not be emptied. */
2858 client
->communication
.outbound
.queued_command_reply
= true;
2867 int client_handle_message_unknown(struct notification_client
*client
,
2868 struct notification_thread_state
*state
)
2872 pthread_mutex_lock(&client
->lock
);
2875 * Receiving message header. The function will be called again
2876 * once the rest of the message as been received and can be
2879 const struct lttng_notification_channel_message
*msg
;
2881 assert(sizeof(*msg
) == client
->communication
.inbound
.buffer
.size
);
2882 msg
= (const struct lttng_notification_channel_message
*)
2883 client
->communication
.inbound
.buffer
.data
;
2885 if (msg
->size
== 0 ||
2886 msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
2887 ERR("[notification-thread] Invalid notification channel message: length = %u",
2893 switch (msg
->type
) {
2894 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2895 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2896 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2900 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2904 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
2905 client
->communication
.inbound
.msg_type
=
2906 (enum lttng_notification_channel_message_type
) msg
->type
;
2907 ret
= lttng_dynamic_buffer_set_size(
2908 &client
->communication
.inbound
.buffer
, msg
->size
);
2910 pthread_mutex_unlock(&client
->lock
);
2915 int client_handle_message_handshake(struct notification_client
*client
,
2916 struct notification_thread_state
*state
)
2919 struct lttng_notification_channel_command_handshake
*handshake_client
;
2920 const struct lttng_notification_channel_command_handshake handshake_reply
= {
2921 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
2922 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
2924 const struct lttng_notification_channel_message msg_header
= {
2925 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
2926 .size
= sizeof(handshake_reply
),
2928 enum lttng_notification_channel_status status
=
2929 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2930 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
2931 enum client_transmission_status transmission_status
;
2933 pthread_mutex_lock(&client
->lock
);
2935 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
2936 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
2937 sizeof(handshake_reply
));
2940 (struct lttng_notification_channel_command_handshake
*)
2941 client
->communication
.inbound
.buffer
2943 client
->major
= handshake_client
->major
;
2944 client
->minor
= handshake_client
->minor
;
2945 if (!client
->communication
.inbound
.creds_received
) {
2946 ERR("[notification-thread] No credentials received from client");
2951 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
2952 &client
->communication
.inbound
.creds
);
2953 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
2954 &client
->communication
.inbound
.creds
);
2955 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2956 client
->uid
, client
->gid
, (int) client
->major
,
2957 (int) client
->minor
);
2959 if (handshake_client
->major
!=
2960 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
2961 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
2964 ret
= lttng_dynamic_buffer_append(
2965 &client
->communication
.outbound
.buffer
, send_buffer
,
2966 sizeof(send_buffer
));
2968 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2972 client
->validated
= true;
2973 client
->communication
.active
= true;
2975 transmission_status
= client_flush_outgoing_queue(client
);
2976 ret
= client_handle_transmission_status(
2977 client
, transmission_status
, state
);
2982 ret
= client_send_command_reply(client
, state
, status
);
2984 ERR("[notification-thread] Failed to send reply to notification channel client");
2988 /* Set reception state to receive the next message header. */
2989 ret
= client_reset_inbound_state(client
);
2991 ERR("[notification-thread] Failed to reset client communication's inbound state");
2996 pthread_mutex_unlock(&client
->lock
);
3001 int client_handle_message_subscription(
3002 struct notification_client
*client
,
3003 enum lttng_notification_channel_message_type msg_type
,
3004 struct notification_thread_state
*state
)
3007 struct lttng_condition
*condition
;
3008 enum lttng_notification_channel_status status
=
3009 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3010 struct lttng_payload_view condition_view
=
3011 lttng_payload_view_from_dynamic_buffer(
3012 &client
->communication
.inbound
.buffer
,
3014 size_t expected_condition_size
;
3016 pthread_mutex_lock(&client
->lock
);
3017 expected_condition_size
= client
->communication
.inbound
.buffer
.size
;
3018 pthread_mutex_unlock(&client
->lock
);
3020 ret
= lttng_condition_create_from_payload(&condition_view
, &condition
);
3021 if (ret
!= expected_condition_size
) {
3022 ERR("[notification-thread] Malformed condition received from client");
3026 if (msg_type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
3027 ret
= notification_thread_client_subscribe(
3028 client
, condition
, state
, &status
);
3030 ret
= notification_thread_client_unsubscribe(
3031 client
, condition
, state
, &status
);
3037 pthread_mutex_lock(&client
->lock
);
3038 ret
= client_send_command_reply(client
, state
, status
);
3040 ERR("[notification-thread] Failed to send reply to notification channel client");
3044 /* Set reception state to receive the next message header. */
3045 ret
= client_reset_inbound_state(client
);
3047 ERR("[notification-thread] Failed to reset client communication's inbound state");
3052 pthread_mutex_unlock(&client
->lock
);
3058 int client_dispatch_message(struct notification_client
*client
,
3059 struct notification_thread_state
*state
)
3063 if (client
->communication
.inbound
.msg_type
!=
3064 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
3065 client
->communication
.inbound
.msg_type
!=
3066 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
3067 !client
->validated
) {
3068 WARN("[notification-thread] client attempted a command before handshake");
3073 switch (client
->communication
.inbound
.msg_type
) {
3074 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
3076 ret
= client_handle_message_unknown(client
, state
);
3079 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3081 ret
= client_handle_message_handshake(client
, state
);
3084 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3085 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3087 ret
= client_handle_message_subscription(client
,
3088 client
->communication
.inbound
.msg_type
, state
);
3098 /* Incoming data from client. */
3099 int handle_notification_thread_client_in(
3100 struct notification_thread_state
*state
, int socket
)
3103 struct notification_client
*client
;
3106 bool message_is_complete
= false;
3108 client
= get_client_from_socket(socket
, state
);
3110 /* Internal error, abort. */
3115 pthread_mutex_lock(&client
->lock
);
3116 offset
= client
->communication
.inbound
.buffer
.size
-
3117 client
->communication
.inbound
.bytes_to_receive
;
3118 if (client
->communication
.inbound
.expect_creds
) {
3119 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
3120 client
->communication
.inbound
.buffer
.data
+ offset
,
3121 client
->communication
.inbound
.bytes_to_receive
,
3122 &client
->communication
.inbound
.creds
);
3124 client
->communication
.inbound
.expect_creds
= false;
3125 client
->communication
.inbound
.creds_received
= true;
3128 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
3129 client
->communication
.inbound
.buffer
.data
+ offset
,
3130 client
->communication
.inbound
.bytes_to_receive
);
3132 if (recv_ret
>= 0) {
3133 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
3134 message_is_complete
= client
->communication
.inbound
3135 .bytes_to_receive
== 0;
3137 pthread_mutex_unlock(&client
->lock
);
3139 goto error_disconnect_client
;
3142 if (message_is_complete
) {
3143 ret
= client_dispatch_message(client
, state
);
3146 * Only returns an error if this client must be
3149 goto error_disconnect_client
;
3154 error_disconnect_client
:
3155 pthread_mutex_lock(&client
->lock
);
3156 ret
= notification_thread_client_disconnect(client
, state
);
3157 pthread_mutex_unlock(&client
->lock
);
3161 /* Client ready to receive outgoing data. */
3162 int handle_notification_thread_client_out(
3163 struct notification_thread_state
*state
, int socket
)
3166 struct notification_client
*client
;
3167 enum client_transmission_status transmission_status
;
3169 client
= get_client_from_socket(socket
, state
);
3171 /* Internal error, abort. */
3176 pthread_mutex_lock(&client
->lock
);
3177 transmission_status
= client_flush_outgoing_queue(client
);
3178 ret
= client_handle_transmission_status(
3179 client
, transmission_status
, state
);
3180 pthread_mutex_unlock(&client
->lock
);
3189 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
3190 const struct channel_state_sample
*sample
,
3191 uint64_t buffer_capacity
)
3193 bool result
= false;
3195 enum lttng_condition_type condition_type
;
3196 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
3197 condition
, struct lttng_condition_buffer_usage
,
3200 if (use_condition
->threshold_bytes
.set
) {
3201 threshold
= use_condition
->threshold_bytes
.value
;
3204 * Threshold was expressed as a ratio.
3206 * TODO the threshold (in bytes) of conditions expressed
3207 * as a ratio of total buffer size could be cached to
3208 * forego this double-multiplication or it could be performed
3209 * as fixed-point math.
3211 * Note that caching should accommodates the case where the
3212 * condition applies to multiple channels (i.e. don't assume
3213 * that all channels matching my_chann* have the same size...)
3215 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3216 (double) buffer_capacity
);
3219 condition_type
= lttng_condition_get_type(condition
);
3220 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3221 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3222 threshold
, sample
->highest_usage
);
3225 * The low condition should only be triggered once _all_ of the
3226 * streams in a channel have gone below the "low" threshold.
3228 if (sample
->highest_usage
<= threshold
) {
3232 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3233 threshold
, sample
->highest_usage
);
3236 * For high buffer usage scenarios, we want to trigger whenever
3237 * _any_ of the streams has reached the "high" threshold.
3239 if (sample
->highest_usage
>= threshold
) {
3248 bool evaluate_session_consumed_size_condition(
3249 const struct lttng_condition
*condition
,
3250 uint64_t session_consumed_size
)
3253 const struct lttng_condition_session_consumed_size
*size_condition
=
3254 container_of(condition
,
3255 struct lttng_condition_session_consumed_size
,
3258 threshold
= size_condition
->consumed_threshold_bytes
.value
;
3259 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
3260 threshold
, session_consumed_size
);
3261 return session_consumed_size
>= threshold
;
3265 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
3266 struct lttng_evaluation
**evaluation
,
3267 const struct notification_thread_state
*state
,
3268 const struct channel_state_sample
*previous_sample
,
3269 const struct channel_state_sample
*latest_sample
,
3270 uint64_t previous_session_consumed_total
,
3271 uint64_t latest_session_consumed_total
,
3272 struct channel_info
*channel_info
)
3275 enum lttng_condition_type condition_type
;
3276 const bool previous_sample_available
= !!previous_sample
;
3277 bool previous_sample_result
= false;
3278 bool latest_sample_result
;
3280 condition_type
= lttng_condition_get_type(condition
);
3282 switch (condition_type
) {
3283 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3284 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3285 if (caa_likely(previous_sample_available
)) {
3286 previous_sample_result
=
3287 evaluate_buffer_usage_condition(condition
,
3288 previous_sample
, channel_info
->capacity
);
3290 latest_sample_result
= evaluate_buffer_usage_condition(
3291 condition
, latest_sample
,
3292 channel_info
->capacity
);
3294 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3295 if (caa_likely(previous_sample_available
)) {
3296 previous_sample_result
=
3297 evaluate_session_consumed_size_condition(
3299 previous_session_consumed_total
);
3301 latest_sample_result
=
3302 evaluate_session_consumed_size_condition(
3304 latest_session_consumed_total
);
3307 /* Unknown condition type; internal error. */
3311 if (!latest_sample_result
||
3312 (previous_sample_result
== latest_sample_result
)) {
3314 * Only trigger on a condition evaluation transition.
3316 * NOTE: This edge-triggered logic may not be appropriate for
3317 * future condition types.
3322 if (!evaluation
|| !latest_sample_result
) {
3326 switch (condition_type
) {
3327 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3328 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3329 *evaluation
= lttng_evaluation_buffer_usage_create(
3331 latest_sample
->highest_usage
,
3332 channel_info
->capacity
);
3334 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3335 *evaluation
= lttng_evaluation_session_consumed_size_create(
3336 latest_session_consumed_total
);
3351 int client_notification_overflow(struct notification_client
*client
)
3354 const struct lttng_notification_channel_message msg
= {
3355 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
3358 ASSERT_LOCKED(client
->lock
);
3360 DBG("Dropping notification addressed to client (socket fd = %i)",
3362 if (client
->communication
.outbound
.dropped_notification
) {
3364 * The client already has a "notification dropped" message
3365 * in its outgoing queue. Nothing to do since all
3366 * of those messages are coalesced.
3371 client
->communication
.outbound
.dropped_notification
= true;
3372 ret
= lttng_dynamic_buffer_append(
3373 &client
->communication
.outbound
.buffer
, &msg
,
3376 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3383 static int client_handle_transmission_status_wrapper(
3384 struct notification_client
*client
,
3385 enum client_transmission_status status
,
3388 return client_handle_transmission_status(client
, status
,
3389 (struct notification_thread_state
*) user_data
);
3393 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
3394 const struct lttng_evaluation
*evaluation
,
3395 struct notification_client_list
* client_list
,
3396 struct notification_thread_state
*state
,
3397 uid_t object_uid
, gid_t object_gid
)
3399 return notification_client_list_send_evaluation(client_list
,
3400 lttng_trigger_get_const_condition(trigger
), evaluation
,
3401 lttng_trigger_get_credentials(trigger
),
3402 &(struct lttng_credentials
){
3403 .uid
= object_uid
, .gid
= object_gid
},
3404 client_handle_transmission_status_wrapper
, state
);
3408 * Permission checks relative to notification channel clients are performed
3409 * here. Notice how object, client, and trigger credentials are involved in
3412 * The `object` credentials are the credentials associated with the "subject"
3413 * of a condition. For instance, a `rotation completed` condition applies
3414 * to a session. When that condition is met, it will produce an evaluation
3415 * against a session. Hence, in this case, the `object` credentials are the
3416 * credentials of the "subject" session.
3418 * The `trigger` credentials are the credentials of the user that registered the
3421 * The `client` credentials are the credentials of the user that created a given
3422 * notification channel.
3424 * In terms of visibility, it is expected that non-privilieged users can only
3425 * register triggers against "their" objects (their own sessions and
3426 * applications they are allowed to interact with). They can then open a
3427 * notification channel and subscribe to notifications associated with those
3430 * As for privilieged users, they can register triggers against the objects of
3431 * other users. They can then subscribe to the notifications associated to their
3432 * triggers. Privilieged users _can't_ subscribe to the notifications of
3433 * triggers owned by other users; they must create their own triggers.
3435 * This is more a concern of usability than security. It would be difficult for
3436 * a root user reliably subscribe to a specific set of conditions without
3437 * interference from external users (those could, for instance, unregister
3441 int notification_client_list_send_evaluation(
3442 struct notification_client_list
*client_list
,
3443 const struct lttng_condition
*condition
,
3444 const struct lttng_evaluation
*evaluation
,
3445 const struct lttng_credentials
*trigger_creds
,
3446 const struct lttng_credentials
*source_object_creds
,
3447 report_client_transmission_result_cb client_report
,
3451 struct lttng_payload msg_payload
;
3452 struct notification_client_list_element
*client_list_element
, *tmp
;
3453 const struct lttng_notification notification
= {
3454 .condition
= (struct lttng_condition
*) condition
,
3455 .evaluation
= (struct lttng_evaluation
*) evaluation
,
3457 struct lttng_notification_channel_message msg_header
= {
3458 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
3461 lttng_payload_init(&msg_payload
);
3463 ret
= lttng_dynamic_buffer_append(&msg_payload
.buffer
, &msg_header
,
3464 sizeof(msg_header
));
3469 ret
= lttng_notification_serialize(¬ification
, &msg_payload
);
3471 ERR("[notification-thread] Failed to serialize notification");
3476 /* Update payload size. */
3477 ((struct lttng_notification_channel_message
*) msg_payload
.buffer
.data
)
3478 ->size
= (uint32_t)(
3479 msg_payload
.buffer
.size
- sizeof(msg_header
));
3481 pthread_mutex_lock(&client_list
->lock
);
3482 cds_list_for_each_entry_safe(client_list_element
, tmp
,
3483 &client_list
->list
, node
) {
3484 enum client_transmission_status transmission_status
;
3485 struct notification_client
*client
=
3486 client_list_element
->client
;
3489 pthread_mutex_lock(&client
->lock
);
3490 if (source_object_creds
) {
3491 if (client
->uid
!= source_object_creds
->uid
&&
3492 client
->gid
!= source_object_creds
->gid
&&
3495 * Client is not allowed to monitor this
3498 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3503 if (client
->uid
!= trigger_creds
->uid
&& client
->gid
!= trigger_creds
->gid
) {
3504 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3508 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3509 client
->socket
, msg_payload
.buffer
.size
);
3510 if (client
->communication
.outbound
.buffer
.size
) {
3512 * Outgoing data is already buffered for this client;
3513 * drop the notification and enqueue a "dropped
3514 * notification" message if this is the first dropped
3515 * notification since the socket spilled-over to the
3518 ret
= client_notification_overflow(client
);
3524 ret
= lttng_dynamic_buffer_append_buffer(
3525 &client
->communication
.outbound
.buffer
,
3526 &msg_payload
.buffer
);
3531 transmission_status
= client_flush_outgoing_queue(client
);
3532 ret
= client_report(client
, transmission_status
, user_data
);
3537 pthread_mutex_unlock(&client
->lock
);
3539 goto end_unlock_list
;
3545 pthread_mutex_unlock(&client_list
->lock
);
3547 lttng_payload_reset(&msg_payload
);
3551 int handle_notification_thread_channel_sample(
3552 struct notification_thread_state
*state
, int pipe
,
3553 enum lttng_domain_type domain
)
3556 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
3557 struct channel_info
*channel_info
;
3558 struct cds_lfht_node
*node
;
3559 struct cds_lfht_iter iter
;
3560 struct lttng_channel_trigger_list
*trigger_list
;
3561 struct lttng_trigger_list_element
*trigger_list_element
;
3562 bool previous_sample_available
= false;
3563 struct channel_state_sample previous_sample
, latest_sample
;
3564 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
3565 struct lttng_credentials channel_creds
;
3568 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3569 * ensuring that read/write of sampling messages are atomic.
3571 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
3572 if (ret
!= sizeof(sample_msg
)) {
3573 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3580 latest_sample
.key
.key
= sample_msg
.key
;
3581 latest_sample
.key
.domain
= domain
;
3582 latest_sample
.highest_usage
= sample_msg
.highest
;
3583 latest_sample
.lowest_usage
= sample_msg
.lowest
;
3584 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
3588 /* Retrieve the channel's informations */
3589 cds_lfht_lookup(state
->channels_ht
,
3590 hash_channel_key(&latest_sample
.key
),
3594 node
= cds_lfht_iter_get_node(&iter
);
3595 if (caa_unlikely(!node
)) {
3597 * Not an error since the consumer can push a sample to the pipe
3598 * and the rest of the session daemon could notify us of the
3599 * channel's destruction before we get a chance to process that
3602 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
3603 latest_sample
.key
.key
,
3604 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
3608 channel_info
= caa_container_of(node
, struct channel_info
,
3610 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
3612 latest_sample
.key
.key
,
3613 channel_info
->session_info
->name
,
3614 latest_sample
.highest_usage
,
3615 latest_sample
.lowest_usage
,
3616 latest_sample
.channel_total_consumed
);
3618 previous_session_consumed_total
=
3619 channel_info
->session_info
->consumed_data_size
;
3621 /* Retrieve the channel's last sample, if it exists, and update it. */
3622 cds_lfht_lookup(state
->channel_state_ht
,
3623 hash_channel_key(&latest_sample
.key
),
3624 match_channel_state_sample
,
3627 node
= cds_lfht_iter_get_node(&iter
);
3628 if (caa_likely(node
)) {
3629 struct channel_state_sample
*stored_sample
;
3631 /* Update the sample stored. */
3632 stored_sample
= caa_container_of(node
,
3633 struct channel_state_sample
,
3634 channel_state_ht_node
);
3636 memcpy(&previous_sample
, stored_sample
,
3637 sizeof(previous_sample
));
3638 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
3639 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
3640 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
3641 previous_sample_available
= true;
3643 latest_session_consumed_total
=
3644 previous_session_consumed_total
+
3645 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
3648 * This is the channel's first sample, allocate space for and
3649 * store the new sample.
3651 struct channel_state_sample
*stored_sample
;
3653 stored_sample
= zmalloc(sizeof(*stored_sample
));
3654 if (!stored_sample
) {
3659 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
3660 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
3661 cds_lfht_add(state
->channel_state_ht
,
3662 hash_channel_key(&stored_sample
->key
),
3663 &stored_sample
->channel_state_ht_node
);
3665 latest_session_consumed_total
=
3666 previous_session_consumed_total
+
3667 latest_sample
.channel_total_consumed
;
3670 channel_info
->session_info
->consumed_data_size
=
3671 latest_session_consumed_total
;
3673 /* Find triggers associated with this channel. */
3674 cds_lfht_lookup(state
->channel_triggers_ht
,
3675 hash_channel_key(&latest_sample
.key
),
3676 match_channel_trigger_list
,
3679 node
= cds_lfht_iter_get_node(&iter
);
3680 if (caa_likely(!node
)) {
3684 channel_creds
= (typeof(channel_creds
)) {
3685 .uid
= channel_info
->session_info
->uid
,
3686 .gid
= channel_info
->session_info
->gid
,
3689 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
3690 channel_triggers_ht_node
);
3691 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
3693 const struct lttng_condition
*condition
;
3694 const struct lttng_action
*action
;
3695 struct lttng_trigger
*trigger
;
3696 struct notification_client_list
*client_list
= NULL
;
3697 struct lttng_evaluation
*evaluation
= NULL
;
3698 bool client_list_is_empty
;
3699 enum action_executor_status executor_status
;
3702 trigger
= trigger_list_element
->trigger
;
3703 condition
= lttng_trigger_get_const_condition(trigger
);
3705 action
= lttng_trigger_get_const_action(trigger
);
3707 /* Notify actions are the only type currently supported. */
3708 assert(lttng_action_get_type_const(action
) ==
3709 LTTNG_ACTION_TYPE_NOTIFY
);
3712 * Check if any client is subscribed to the result of this
3715 client_list
= get_client_list_from_condition(state
, condition
);
3716 assert(client_list
);
3717 client_list_is_empty
= cds_list_empty(&client_list
->list
);
3718 if (client_list_is_empty
) {
3720 * No clients interested in the evaluation's result,
3726 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
3727 previous_sample_available
? &previous_sample
: NULL
,
3729 previous_session_consumed_total
,
3730 latest_session_consumed_total
,
3732 if (caa_unlikely(ret
)) {
3736 if (caa_likely(!evaluation
)) {
3741 * Ownership of `evaluation` transferred to the action executor
3742 * no matter the result.
3744 executor_status
= action_executor_enqueue(state
->executor
,
3745 trigger
, evaluation
, &channel_creds
,
3748 switch (executor_status
) {
3749 case ACTION_EXECUTOR_STATUS_OK
:
3751 case ACTION_EXECUTOR_STATUS_ERROR
:
3752 case ACTION_EXECUTOR_STATUS_INVALID
:
3754 * TODO Add trigger identification (name/id) when
3755 * it is added to the API.
3757 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
3760 case ACTION_EXECUTOR_STATUS_OVERFLOW
:
3762 * TODO Add trigger identification (name/id) when
3763 * it is added to the API.
3765 * Not a fatal error.
3767 WARN("No space left when enqueuing action associated with buffer-condition trigger");
3775 notification_client_list_put(client_list
);
3776 if (caa_unlikely(ret
)) {