sessiond: defer tracer notification jobs to the action executor
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 24 Jan 2020 22:37:54 +0000 (17:37 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 11 Feb 2021 16:25:40 +0000 (11:25 -0500)
Create an event rule evaluation and enqueue a job to execute the
actions associated with the corresponding trigger on the action executor
work queue.

Clients are notified that a notification was dropped if the action
executor queue is full at the time of invocation.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I7b36af2553a4ca9404b58f185e9a12f3900a1d35
Depends-on: lttng-ust: I5a800fc92e588c2a6a0e26282b0ad5f31c044479

src/bin/lttng-sessiond/notification-thread-events.c
src/common/evaluation.c

index 833b550f87067672a682ccc99bcec710361c90be..93363c708e6712bd26105bf57b2a2323cfc5b1da 100644 (file)
@@ -4181,14 +4181,17 @@ int handle_notification_thread_event_notification(struct notification_thread_sta
        int ret;
        struct lttng_ust_event_notifier_notification ust_notification;
        struct lttng_kernel_event_notifier_notification kernel_notification;
+       struct lttng_evaluation *evaluation = NULL;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
        struct notification_trigger_tokens_ht_element *element;
-       enum lttng_action_type action_type;
-       const struct lttng_action *action;
+       enum lttng_trigger_status status;
        struct lttng_event_notifier_notification notification;
        void *reception_buffer;
        size_t reception_size;
+       enum action_executor_status executor_status;
+       struct notification_client_list *client_list = NULL;
+       const char *trigger_name;
 
        notification.type = domain;
 
@@ -4237,11 +4240,12 @@ int handle_notification_thread_event_notification(struct notification_thread_sta
                        hash_key_u64(&notification.token, lttng_ht_seed),
                        match_trigger_token, &notification.token, &iter);
        node = cds_lfht_iter_get_node(&iter);
-       if (caa_likely(!node)) {
+       if (caa_unlikely(!node)) {
                /*
-                * This is not an error, slow consumption of the pipe can lead
-                * to situations where a trigger is removed but we still get
-                * tracer notification matching to a previous trigger.
+                * This is not an error, slow consumption of the tracer
+                * notifications can lead to situations where a trigger is
+                * removed but we still get tracer notifications matching a
+                * trigger that no longer exists.
                 */
                ret = 0;
                goto end_unlock;
@@ -4251,17 +4255,91 @@ int handle_notification_thread_event_notification(struct notification_thread_sta
                        struct notification_trigger_tokens_ht_element,
                        node);
 
-       action = lttng_trigger_get_const_action(element->trigger);
-       action_type = lttng_action_get_type(action);
-       DBG("Received message from tracer event source: event source fd = %d, token = %" PRIu64 ", action type = '%s'",
-                       notification_pipe_read_fd, notification.token,
-                       lttng_action_type_string(action_type));
+       if (!lttng_trigger_should_fire(element->trigger)) {
+               ret = 0;
+               goto end_unlock;
+       }
 
-       /* TODO: Perform actions */
+       lttng_trigger_fire(element->trigger);
 
-       ret = 0;
+       status = lttng_trigger_get_name(element->trigger, &trigger_name);
+       assert(status == LTTNG_TRIGGER_STATUS_OK);
+       evaluation = lttng_evaluation_event_rule_create(trigger_name);
+       if (evaluation == NULL) {
+               ERR("Failed to create event rule evaluation while creating and enqueuing action executor job");
+               ret = -1;
+               goto end_unlock;
+       }
+
+       client_list = get_client_list_from_condition(state,
+                       lttng_trigger_get_const_condition(element->trigger));
+       executor_status = action_executor_enqueue(state->executor,
+                       element->trigger, evaluation, NULL, client_list);
+       switch (executor_status) {
+       case ACTION_EXECUTOR_STATUS_OK:
+               ret = 0;
+               break;
+       case ACTION_EXECUTOR_STATUS_OVERFLOW:
+       {
+               struct notification_client_list_element *client_list_element,
+                               *tmp;
+
+               /*
+                * Not a fatal error; this is expected and simply means the
+                * executor has too much work queued already.
+                */
+               ret = 0;
+
+               /* No clients subscribed to notifications for this trigger. */
+               if (!client_list) {
+                       break;
+               }
+
+               /* Warn clients that a notification (or more) was dropped. */
+               pthread_mutex_lock(&client_list->lock);
+               cds_list_for_each_entry_safe(client_list_element, tmp,
+                               &client_list->list, node) {
+                       enum client_transmission_status transmission_status;
+                       struct notification_client *client =
+                                       client_list_element->client;
+
+                       pthread_mutex_lock(&client->lock);
+                       ret = client_notification_overflow(client);
+                       if (ret) {
+                               /* Fatal error. */
+                               goto next_client;
+                       }
+
+                       transmission_status =
+                                       client_flush_outgoing_queue(client);
+                       ret = client_handle_transmission_status(
+                                       client, transmission_status, state);
+                       if (ret) {
+                               /* Fatal error. */
+                               goto next_client;
+                       }
+next_client:
+                       pthread_mutex_unlock(&client->lock);
+                       if (ret) {
+                               break;
+                       }
+               }
+
+               pthread_mutex_unlock(&client_list->lock);
+               break;
+       }
+       case ACTION_EXECUTOR_STATUS_ERROR:
+               /* Fatal error, shut down everything. */
+               ERR("Fatal error encoutered while enqueuing action to the action executor");
+               ret = -1;
+               goto end_unlock;
+       default:
+               /* Unhandled error. */
+               abort();
+       }
 
 end_unlock:
+       notification_client_list_put(client_list);
        rcu_read_unlock();
 end:
        return ret;
index b76d349fa40526225d9b7199a0d919955a1d8bfb..d8a68a7840429b44bf73679bd3b0abc1b30f6273 100644 (file)
@@ -9,6 +9,7 @@
 #include <lttng/condition/buffer-usage-internal.h>
 #include <lttng/condition/session-consumed-size-internal.h>
 #include <lttng/condition/session-rotation-internal.h>
+#include <lttng/condition/event-rule-internal.h>
 #include <common/macros.h>
 #include <common/error.h>
 #include <stdbool.h>
@@ -114,6 +115,13 @@ ssize_t lttng_evaluation_create_from_payload(
                }
                evaluation_size += ret;
                break;
+       case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
+               ret = lttng_evaluation_event_rule_create_from_payload(&evaluation_view, evaluation);
+               if (ret < 0) {
+                       goto end;
+               }
+               evaluation_size += ret;
+               break;
        default:
                ERR("Attempted to create evaluation of unknown type (%i)",
                                (int) evaluation_comm->type);
This page took 0.029074 seconds and 4 git commands to generate.