From: Jérémie Galarneau Date: Tue, 11 Feb 2020 04:29:18 +0000 (-0500) Subject: sessiond: trigger: run trigger actions through an action executor X-Git-Tag: v2.13.0-rc1~494 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=f2b3ef9f7d7adf6cc0678cd5933b638f5a9e75a6;p=lttng-tools.git sessiond: trigger: run trigger actions through an action executor The `action executor` interface allows the notification subsystem to enqueue work items to execute on behalf of a given trigger. This allows the notification thread to remain responsive even if the actions to execute are blocking (as through the use of network communication). Before this commit, the notification subsystem only handled `notify` actions; handling code for new action types are added as part of the action executor. The existing `notify` action is now performed through the action executor so that all actions can be managed in the same way. This is less efficient than sending the notifications directly, but could be optimized trivially (if it ever becomes a problem) when: - the action is a group containing only a `notify` action, - the action is a `notify` action. Overview of changes to existing code === Managing the new action types requires fairly localized changes to the existing notification subsystem code. The main code paths that are modified are the sites where `evaluation` objects are created: 1) on an object state change (session or channel state changes, see handle_notification_thread_channel_sample and handle_notification_thread_command_session_rotation), 2) on registration of a trigger (see handle_notification_thread_command_register_trigger), 3) on subscription to a condition (see client_handle_message_subscription). To understand the lifetime of most objects involved in a work deferral to the action executor, see the paragraph in notification-thread-internal.h (line 82) to understand the relation between clients and client lists. 1) Object state changes As hinted in the notification_client_list documentation, deferring work on a state change is straight-forward: a reference is taken on a client list and the list is provided to the action executor as part of a work item. Hence, very little changes are made to the the two state-change handling sites beyond enqueuing a work item rather than directly sending a notification. 2) Subscription to a condition A notification client can subscribe to a condition before or after a matching trigger (same condition and containing a notify action) has been registered. If no matching trigger were registered, no client list exists and there is nothing to do. If a matching trigger existed, a client list (which could be empty) will already exist and the client is simply added to the client list. However, it is important to evaluate the condition for the client (as the condition could already be true) and send the notification to that client only and not to all clients in the list. Before this change, since everything was done in the same thread, a temporary list containing only the newly-subscribed client was created on the stack and the notification was sent/queued immediately. After sending the condition, the client was removed from the temporary list and added to the "real" client list. This strategy cannot be used with the action executor as the "temporary" client list must exist beyond the scope of the function. Moreover, the notification subsystem assumes that clients are in per-condition client lists and that they can safely be destroyed when they are not present in any list. Fortunately, here we know that the action to perform is to `notify` and nothing else. The enqueuing of the notification is performed "in place" by the notification thread without deferring to the action executor. 3) Registration of a trigger When a client subscribes to a condition, the current state of that condition is immediately evaluated. If the condition is true (for instance, a channel's buffer are filled beyond X% of their capacity), the action associated with the trigger is executed right away. This path requires little changes as a client list is created when a trigger is registered. Hence, it is possible to use the client list to defer work as is done in `1`. 4) Trigger registration Since the `notify` action was the only supported action type, the notification subsystem always created a client list associated with the new trigger's condition. This is changed to only perform the creation (and publication) of the client list if the trigger's action is (or contains, in the case of a group) a `notify` action. Signed-off-by: Jérémie Galarneau Change-Id: I43b54b93c1244591aeff6e0d0fa8076c7b5e0c50 --- diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index ee53655be..dd807125b 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -54,7 +54,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \ manage-kernel.c manage-kernel.h \ manage-consumer.c manage-consumer.h \ clear.c clear.h \ - tracker.c tracker.h + tracker.c tracker.h \ + action-executor.c action-executor.h if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/action-executor.c b/src/bin/lttng-sessiond/action-executor.c new file mode 100644 index 000000000..c51bb7f63 --- /dev/null +++ b/src/bin/lttng-sessiond/action-executor.c @@ -0,0 +1,718 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#include "action-executor.h" +#include "cmd.h" +#include "health-sessiond.h" +#include "lttng-sessiond.h" +#include "notification-thread-internal.h" +#include "session.h" +#include "thread.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define THREAD_NAME "Action Executor" +#define MAX_QUEUED_WORK_COUNT 8192 + +struct action_work_item { + uint64_t id; + struct lttng_trigger *trigger; + struct lttng_evaluation *evaluation; + struct notification_client_list *client_list; + LTTNG_OPTIONAL(struct lttng_credentials) object_creds; + struct cds_list_head list_node; +}; + +struct action_executor { + struct lttng_thread *thread; + struct notification_thread_handle *notification_thread_handle; + struct { + uint64_t pending_count; + struct cds_list_head list; + pthread_cond_t cond; + pthread_mutex_t lock; + } work; + bool should_quit; + uint64_t next_work_item_id; +}; + +/* + * Only return non-zero on a fatal error that should shut down the action + * executor. + */ +typedef int (*action_executor_handler)(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *action); + +static int action_executor_notify_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_start_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_stop_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_rotate_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_snapshot_session_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_group_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); +static int action_executor_generic_handler(struct action_executor *executor, + const struct action_work_item *, + const struct lttng_action *); + +static const action_executor_handler action_executors[] = { + [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler, + [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler, + [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler, + [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler, + [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler, + [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler, +}; + +static const char *action_type_names[] = { + [LTTNG_ACTION_TYPE_NOTIFY] = "Notify", + [LTTNG_ACTION_TYPE_START_SESSION] = "Start session", + [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session", + [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session", + [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session", + [LTTNG_ACTION_TYPE_GROUP] = "Group", +}; + +static const char *get_action_name(const struct lttng_action *action) +{ + return action_type_names[lttng_action_get_type_const(action)]; +} + +/* Check if this trigger allowed to interect with a given session. */ +static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger, + struct ltt_session *session) +{ + bool is_allowed = false; + const struct lttng_credentials session_creds = { + .uid = session->uid, + .gid = session->gid, + }; + /* Can never be NULL. */ + const struct lttng_credentials *trigger_creds = + lttng_trigger_get_credentials(trigger); + + is_allowed = (trigger_creds->uid == session_creds.uid) || + (trigger_creds->uid == 0); + if (!is_allowed) { + WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld, trigger gid = %ld", + session->name, + (long int) session->uid, + (long int) session->gid, + (long int) trigger_creds->uid, + (long int) trigger_creds->gid); + } + + return is_allowed; +} + +static int client_handle_transmission_status( + struct notification_client *client, + enum client_transmission_status status, + void *user_data) +{ + int ret = 0; + struct action_executor *executor = user_data; + bool update_communication = true; + + ASSERT_LOCKED(client->lock); + + switch (status) { + case CLIENT_TRANSMISSION_STATUS_COMPLETE: + DBG("Successfully sent full notification to client, client_id = %" PRIu64, + client->id); + update_communication = false; + break; + case CLIENT_TRANSMISSION_STATUS_QUEUED: + DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64, + client->id); + break; + case CLIENT_TRANSMISSION_STATUS_FAIL: + DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64, + client->id); + client->communication.active = false; + break; + default: + ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64, + client->id); + client->communication.active = false; + ret = -1; + goto end; + } + + if (!update_communication) { + goto end; + } + + ret = notification_thread_client_communication_update( + executor->notification_thread_handle, client->id, + status); +end: + return ret; +} + +static int action_executor_notify_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + return notification_client_list_send_evaluation(work_item->client_list, + lttng_trigger_get_const_condition(work_item->trigger), + work_item->evaluation, + lttng_trigger_get_credentials(work_item->trigger), + LTTNG_OPTIONAL_GET_PTR(work_item->object_creds), + client_handle_transmission_status, + executor); +} + +static int action_executor_start_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + enum lttng_error_code cmd_ret; + + action_status = lttng_action_start_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from `%s` action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (!session) { + DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`", + session_name, get_action_name(action), + work_item->trigger); + goto error_unlock_list; + } + + session_lock(session); + if (!is_trigger_allowed_for_session(work_item->trigger, session)) { + goto error_dispose_session; + } + + cmd_ret = cmd_start_trace(session); + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully started session `%s` on behalf of trigger `%p`", + session_name, work_item->trigger); + break; + case LTTNG_ERR_TRACE_ALREADY_STARTED: + DBG("Attempted to start session `%s` on behalf of trigger `%p` but it was already started", + session_name, work_item->trigger); + break; + default: + WARN("Failed to start session `%s` on behalf of trigger `%p`: %s", + session_name, work_item->trigger, + lttng_strerror(-cmd_ret)); + break; + } + +error_dispose_session: + session_unlock(session); + session_put(session); +error_unlock_list: + session_unlock_list(); +end: + return ret; +} + +static int action_executor_stop_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + enum lttng_error_code cmd_ret; + + action_status = lttng_action_stop_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from `%s` action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (!session) { + DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`", + session_name, get_action_name(action), + work_item->trigger); + goto error_unlock_list; + } + + session_lock(session); + if (!is_trigger_allowed_for_session(work_item->trigger, session)) { + goto error_dispose_session; + } + + cmd_ret = cmd_stop_trace(session); + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully stopped session `%s` on behalf of trigger `%p`", + session_name, work_item->trigger); + break; + case LTTNG_ERR_TRACE_ALREADY_STOPPED: + DBG("Attempted to stop session `%s` on behalf of trigger `%p` but it was already stopped", + session_name, work_item->trigger); + break; + default: + WARN("Failed to stop session `%s` on behalf of trigger `%p`: %s", + session_name, work_item->trigger, + lttng_strerror(-cmd_ret)); + break; + } + +error_dispose_session: + session_unlock(session); + session_put(session); +error_unlock_list: + session_unlock_list(); +end: + return ret; +} + +static int action_executor_rotate_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + enum lttng_error_code cmd_ret; + + action_status = lttng_action_rotate_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from `%s` action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (!session) { + DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`", + session_name, get_action_name(action), + work_item->trigger); + goto error_unlock_list; + } + + session_lock(session); + if (!is_trigger_allowed_for_session(work_item->trigger, session)) { + goto error_dispose_session; + } + + cmd_ret = cmd_rotate_session(session, NULL, false, + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully started rotation of session `%s` on behalf of trigger `%p`", + session_name, work_item->trigger); + break; + case LTTNG_ERR_ROTATION_PENDING: + DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation is already ongoing", + session_name, work_item->trigger); + break; + case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP: + case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR: + DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%p` but a rotation has already been completed since the last stop or clear", + session_name, work_item->trigger); + break; + default: + WARN("Failed to start a rotation of session `%s` on behalf of trigger `%p`: %s", + session_name, work_item->trigger, + lttng_strerror(-cmd_ret)); + break; + } + +error_dispose_session: + session_unlock(session); + session_put(session); +error_unlock_list: + session_unlock_list(); +end: + return ret; +} + +static int action_executor_snapshot_session_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + int ret = 0; + const char *session_name; + enum lttng_action_status action_status; + struct ltt_session *session; + const struct lttng_snapshot_output default_snapshot_output = { + .max_size = UINT64_MAX, + }; + const struct lttng_snapshot_output *snapshot_output = + &default_snapshot_output; + enum lttng_error_code cmd_ret; + + action_status = lttng_action_snapshot_session_get_session_name( + action, &session_name); + if (action_status != LTTNG_ACTION_STATUS_OK) { + ERR("Failed to get session name from `%s` action", + get_action_name(action)); + ret = -1; + goto end; + } + + action_status = lttng_action_snapshot_session_get_output( + action, &snapshot_output); + if (action_status != LTTNG_ACTION_STATUS_OK && + action_status != LTTNG_ACTION_STATUS_UNSET) { + ERR("Failed to get output from `%s` action", + get_action_name(action)); + ret = -1; + goto end; + } + + session_lock_list(); + session = session_find_by_name(session_name); + if (!session) { + DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%p`", + session_name, get_action_name(action), + work_item->trigger); + goto error_unlock_list; + } + + + session_lock(session); + if (!is_trigger_allowed_for_session(work_item->trigger, session)) { + goto error_dispose_session; + } + + cmd_ret = cmd_snapshot_record(session, snapshot_output, 0); + switch (cmd_ret) { + case LTTNG_OK: + DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%p`", + session_name, work_item->trigger); + break; + default: + WARN("Failed to record snapshot of session `%s` on behalf of trigger `%p`: %s", + session_name, work_item->trigger, + lttng_strerror(-cmd_ret)); + break; + } + +error_dispose_session: + session_unlock(session); + session_put(session); +error_unlock_list: + session_unlock_list(); +end: + return ret; +} + +static int action_executor_group_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action_group) +{ + int ret = 0; + unsigned int i, count; + enum lttng_action_status action_status; + + action_status = lttng_action_group_get_count(action_group, &count); + if (action_status != LTTNG_ACTION_STATUS_OK) { + /* Fatal error. */ + ERR("Failed to get count of action in action group"); + ret = -1; + goto end; + } + + DBG("Action group has %u action%s", count, count != 1 ? "s" : ""); + for (i = 0; i < count; i++) { + const struct lttng_action *action = + lttng_action_group_get_at_index( + action_group, i); + + ret = action_executor_generic_handler( + executor, work_item, action); + if (ret) { + ERR("Stopping the execution of the action group of trigger `%p` following a fatal error", + work_item->trigger); + goto end; + } + } +end: + return ret; +} + +static int action_executor_generic_handler(struct action_executor *executor, + const struct action_work_item *work_item, + const struct lttng_action *action) +{ + DBG("Executing action `%s` of trigger `%p` action work item %" PRIu64, + get_action_name(action), + work_item->trigger, + work_item->id); + + return action_executors[lttng_action_get_type_const(action)]( + executor, work_item, action); +} + +static int action_work_item_execute(struct action_executor *executor, + struct action_work_item *work_item) +{ + int ret; + const struct lttng_action *action = + lttng_trigger_get_const_action(work_item->trigger); + + DBG("Starting execution of action work item %" PRIu64 " of trigger `%p`", + work_item->id, work_item->trigger); + ret = action_executor_generic_handler(executor, work_item, action); + DBG("Completed execution of action work item %" PRIu64 " of trigger `%p`", + work_item->id, work_item->trigger); + return ret; +} + +static void action_work_item_destroy(struct action_work_item *work_item) +{ + lttng_trigger_put(work_item->trigger); + lttng_evaluation_destroy(work_item->evaluation); + notification_client_list_put(work_item->client_list); + free(work_item); +} + +static void *action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + assert(executor); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR); + + rcu_register_thread(); + rcu_thread_online(); + + DBG("Entering work execution loop"); + pthread_mutex_lock(&executor->work.lock); + while (!executor->should_quit) { + int ret; + struct action_work_item *work_item; + + health_code_update(); + if (executor->work.pending_count == 0) { + health_poll_entry(); + DBG("No work items enqueued, entering wait"); + pthread_cond_wait(&executor->work.cond, + &executor->work.lock); + DBG("Woke-up from wait"); + health_poll_exit(); + continue; + } + + /* Pop item from front of the listwith work lock held. */ + work_item = cds_list_first_entry(&executor->work.list, + struct action_work_item, list_node); + cds_list_del(&work_item->list_node); + executor->work.pending_count--; + + /* + * Work can be performed without holding the work lock, + * allowing new items to be queued. + */ + pthread_mutex_unlock(&executor->work.lock); + ret = action_work_item_execute(executor, work_item); + action_work_item_destroy(work_item); + if (ret) { + /* Fatal error. */ + break; + } + + health_code_update(); + pthread_mutex_lock(&executor->work.lock); + } + + pthread_mutex_unlock(&executor->work.lock); + DBG("Left work execution loop"); + + health_code_update(); + + rcu_thread_offline(); + rcu_unregister_thread(); + health_unregister(health_sessiond); + + return NULL; +} + +static bool shutdown_action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + executor->should_quit = true; + pthread_cond_signal(&executor->work.cond); + return true; +} + +static void clean_up_action_executor_thread(void *_data) +{ + struct action_executor *executor = _data; + + assert(cds_list_empty(&executor->work.list)); + + pthread_mutex_destroy(&executor->work.lock); + pthread_cond_destroy(&executor->work.cond); + free(executor); +} + +struct action_executor *action_executor_create( + struct notification_thread_handle *handle) +{ + struct action_executor *executor = zmalloc(sizeof(*executor)); + + if (!executor) { + goto end; + } + + CDS_INIT_LIST_HEAD(&executor->work.list); + pthread_cond_init(&executor->work.cond, NULL); + pthread_mutex_init(&executor->work.lock, NULL); + executor->notification_thread_handle = handle; + + executor->thread = lttng_thread_create(THREAD_NAME, + action_executor_thread, shutdown_action_executor_thread, + clean_up_action_executor_thread, executor); +end: + return executor; +} + +void action_executor_destroy(struct action_executor *executor) +{ + struct action_work_item *work_item, *tmp; + + /* TODO Wait for work list to drain? */ + lttng_thread_shutdown(executor->thread); + pthread_mutex_lock(&executor->work.lock); + if (executor->work.pending_count != 0) { + WARN("%" PRIu64 + " trigger action%s still queued for execution and will be discarded", + executor->work.pending_count, + executor->work.pending_count == 1 ? " is" : + "s are"); + } + + cds_list_for_each_entry_safe ( + work_item, tmp, &executor->work.list, list_node) { + WARN("Discarding action work item %" PRIu64 + " associated to trigger `%p`", + work_item->id, work_item->trigger); + cds_list_del(&work_item->list_node); + action_work_item_destroy(work_item); + } + pthread_mutex_unlock(&executor->work.lock); + lttng_thread_put(executor->thread); +} + +/* RCU read-lock must be held by the caller. */ +enum action_executor_status action_executor_enqueue( + struct action_executor *executor, + struct lttng_trigger *trigger, + struct lttng_evaluation *evaluation, + const struct lttng_credentials *object_creds, + struct notification_client_list *client_list) +{ + enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK; + const uint64_t work_item_id = executor->next_work_item_id++; + struct action_work_item *work_item; + bool signal = false; + + pthread_mutex_lock(&executor->work.lock); + /* Check for queue overflow. */ + if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) { + /* Most likely spammy, remove if it is the case. */ + DBG("Refusing to enqueue action for trigger `%p` as work item %" PRIu64 + " (overflow)", + trigger, work_item_id); + executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW; + goto error_unlock; + } + + work_item = zmalloc(sizeof(*work_item)); + if (!work_item) { + PERROR("Failed to allocate action executor work item on behalf of trigger `%p`", + trigger); + executor_status = ACTION_EXECUTOR_STATUS_ERROR; + goto error_unlock; + } + + lttng_trigger_get(trigger); + if (client_list) { + const bool reference_acquired = + notification_client_list_get(client_list); + + assert(reference_acquired); + } + + *work_item = (typeof(*work_item)){ + .id = work_item_id, + .trigger = trigger, + /* Ownership transferred to the work item. */ + .evaluation = evaluation, + .object_creds = { + .is_set = !!object_creds, + .value = object_creds ? *object_creds : + (typeof(work_item->object_creds.value)) {}, + }, + .client_list = client_list, + .list_node = CDS_LIST_HEAD_INIT(work_item->list_node), + }; + + evaluation = NULL; + cds_list_add_tail(&work_item->list_node, &executor->work.list); + executor->work.pending_count++; + DBG("Enqueued action for trigger `%p` as work item %" PRIu64, + trigger, work_item_id); + signal = true; + +error_unlock: + pthread_mutex_unlock(&executor->work.lock); + if (signal) { + pthread_cond_signal(&executor->work.cond); + } + + lttng_evaluation_destroy(evaluation); + return executor_status; +} diff --git a/src/bin/lttng-sessiond/action-executor.h b/src/bin/lttng-sessiond/action-executor.h new file mode 100644 index 000000000..e01a37773 --- /dev/null +++ b/src/bin/lttng-sessiond/action-executor.h @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2020 Jérémie Galarneau + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#ifndef ACTION_EXECUTOR_H +#define ACTION_EXECUTOR_H + +struct action_executor; +struct notification_thread_handle; +struct lttng_evaluation; +struct lttng_trigger; +struct notification_client_list; +struct lttng_credentials; + +enum action_executor_status { + ACTION_EXECUTOR_STATUS_OK, + ACTION_EXECUTOR_STATUS_OVERFLOW, + ACTION_EXECUTOR_STATUS_ERROR, + ACTION_EXECUTOR_STATUS_INVALID, +}; + +struct action_executor *action_executor_create( + struct notification_thread_handle *handle); + +void action_executor_destroy(struct action_executor *executor); + +enum action_executor_status action_executor_enqueue( + struct action_executor *executor, + struct lttng_trigger *trigger, + struct lttng_evaluation *evaluation, + const struct lttng_credentials *object_creds, + struct notification_client_list *list); + +#endif /* ACTION_EXECUTOR_H */ diff --git a/src/bin/lttng-sessiond/health-sessiond.h b/src/bin/lttng-sessiond/health-sessiond.h index 7c9dbd0b3..b541822f8 100644 --- a/src/bin/lttng-sessiond/health-sessiond.h +++ b/src/bin/lttng-sessiond/health-sessiond.h @@ -23,6 +23,7 @@ enum health_type_sessiond { HEALTH_SESSIOND_TYPE_NOTIFICATION = 8, HEALTH_SESSIOND_TYPE_ROTATION = 9, HEALTH_SESSIOND_TYPE_TIMER = 10, + HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR = 11, NR_HEALTH_SESSIOND_TYPES, }; diff --git a/src/bin/lttng-sessiond/notification-thread-commands.c b/src/bin/lttng-sessiond/notification-thread-commands.c index a3b41f989..dc03e6a2d 100644 --- a/src/bin/lttng-sessiond/notification-thread-commands.c +++ b/src/bin/lttng-sessiond/notification-thread-commands.c @@ -280,3 +280,18 @@ void notification_thread_command_quit( ret = run_command_wait(handle, &cmd); assert(!ret && cmd.reply_code == LTTNG_OK); } + +int notification_thread_client_communication_update( + struct notification_thread_handle *handle, + notification_client_id id, + enum client_transmission_status transmission_status) +{ + struct notification_thread_command cmd = {}; + + init_notification_thread_command(&cmd); + + cmd.type = NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE; + cmd.parameters.client_communication_update.id = id; + cmd.parameters.client_communication_update.status = transmission_status; + return run_command_no_wait(handle, &cmd); +} diff --git a/src/bin/lttng-sessiond/notification-thread-commands.h b/src/bin/lttng-sessiond/notification-thread-commands.h index 11889f934..c09ebea46 100644 --- a/src/bin/lttng-sessiond/notification-thread-commands.h +++ b/src/bin/lttng-sessiond/notification-thread-commands.h @@ -28,6 +28,7 @@ enum notification_thread_command_type { NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING, NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED, NOTIFICATION_COMMAND_TYPE_QUIT, + NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE, }; struct notification_thread_command { @@ -63,6 +64,12 @@ struct notification_thread_command { uint64_t trace_archive_chunk_id; struct lttng_trace_archive_location *location; } session_rotation; + /* Client communication update. */ + struct { + notification_client_id id; + enum client_transmission_status status; + } client_communication_update; + } parameters; /* lttng_waiter on which to wait for command reply (optional). */ diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index 6ba6c415b..de654a4ee 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -5,6 +5,8 @@ * */ +#include "lttng/action/action.h" +#include "lttng/trigger/trigger-internal.h" #define _LGPL_SOURCE #include #include @@ -50,7 +52,7 @@ enum lttng_object_type { struct lttng_trigger_list_element { /* No ownership of the trigger object is assumed. */ - const struct lttng_trigger *trigger; + struct lttng_trigger *trigger; struct cds_list_head node; }; @@ -117,122 +119,6 @@ struct lttng_condition_list_element { struct cds_list_head node; }; -struct notification_client_list_element { - struct notification_client *client; - struct cds_list_head node; -}; - -/* - * Thread safety of notification_client and notification_client_list. - * - * The notification thread (main thread) and the action executor - * interact through client lists. Hence, when the action executor - * thread looks-up the list of clients subscribed to a given - * condition, it will acquire a reference to the list and lock it - * while attempting to communicate with the various clients. - * - * It is not necessary to reference-count clients as they are guaranteed - * to be 'alive' if they are present in a list and that list is locked. Indeed, - * removing references to the client from those subscription lists is part of - * the work performed on destruction of a client. - * - * No provision for other access scenarios are taken into account; - * this is the bare minimum to make these accesses safe and the - * notification thread's state is _not_ "thread-safe" in any general - * sense. - */ -struct notification_client_list { - pthread_mutex_t lock; - struct urcu_ref ref; - const struct lttng_trigger *trigger; - struct cds_list_head list; - /* Weak reference to container. */ - struct cds_lfht *notification_trigger_clients_ht; - struct cds_lfht_node notification_trigger_clients_ht_node; - /* call_rcu delayed reclaim. */ - struct rcu_head rcu_node; -}; - -struct notification_client { - /* Nests within the notification_client_list lock. */ - pthread_mutex_t lock; - notification_client_id id; - int socket; - /* Client protocol version. */ - uint8_t major, minor; - uid_t uid; - gid_t gid; - /* - * Indicates if the credentials and versions of the client have been - * checked. - */ - bool validated; - /* - * Conditions to which the client's notification channel is subscribed. - * List of struct lttng_condition_list_node. The condition member is - * owned by the client. - */ - struct cds_list_head condition_list; - struct cds_lfht_node client_socket_ht_node; - struct cds_lfht_node client_id_ht_node; - struct { - /* - * If a client's communication is inactive, it means that a - * fatal error has occurred (could be either a protocol error or - * the socket API returned a fatal error). No further - * communication should be attempted; the client is queued for - * clean-up. - */ - bool active; - struct { - /* - * During the reception of a message, the reception - * buffers' "size" is set to contain the current - * message's complete payload. - */ - struct lttng_dynamic_buffer buffer; - /* Bytes left to receive for the current message. */ - size_t bytes_to_receive; - /* Type of the message being received. */ - enum lttng_notification_channel_message_type msg_type; - /* - * Indicates whether or not credentials are expected - * from the client. - */ - bool expect_creds; - /* - * Indicates whether or not credentials were received - * from the client. - */ - bool creds_received; - /* Only used during credentials reception. */ - lttng_sock_cred creds; - } inbound; - struct { - /* - * Indicates whether or not a notification addressed to - * this client was dropped because a command reply was - * already buffered. - * - * A notification is dropped whenever the buffer is not - * empty. - */ - bool dropped_notification; - /* - * Indicates whether or not a command reply is already - * buffered. In this case, it means that the client is - * not consuming command replies before emitting a new - * one. This could be caused by a protocol error or a - * misbehaving/malicious client. - */ - bool queued_command_reply; - struct lttng_dynamic_buffer buffer; - } outbound; - } communication; - /* call_rcu delayed reclaim. */ - struct rcu_head rcu_node; -}; - struct channel_state_sample { struct channel_key key; struct cds_lfht_node channel_state_ht_node; @@ -293,8 +179,13 @@ void lttng_session_trigger_list_destroy( struct lttng_session_trigger_list *list); static int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list, - const struct lttng_trigger *trigger); + struct lttng_trigger *trigger); +static +int client_handle_transmission_status( + struct notification_client *client, + enum client_transmission_status transmission_status, + struct notification_thread_state *state); static int match_client_socket(struct cds_lfht_node *node, const void *key) @@ -722,7 +613,7 @@ error: return NULL; } -static +LTTNG_HIDDEN bool notification_client_list_get(struct notification_client_list *list) { return urcu_ref_get_unless_zero(&list->ref); @@ -785,6 +676,7 @@ void publish_notification_client_list( lttng_trigger_get_const_condition(list->trigger); assert(!list->notification_trigger_clients_ht); + notification_client_list_get(list); list->notification_trigger_clients_ht = state->notification_trigger_clients_ht; @@ -796,7 +688,7 @@ void publish_notification_client_list( rcu_read_unlock(); } -static +LTTNG_HIDDEN void notification_client_list_put(struct notification_client_list *list) { if (!list) { @@ -1326,6 +1218,34 @@ end: return client; } +/* + * Call with rcu_read_lock held (and hold for the lifetime of the returned + * client pointer). + */ +static +struct notification_client *get_client_from_id(notification_client_id id, + struct notification_thread_state *state) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + struct notification_client *client = NULL; + + cds_lfht_lookup(state->client_id_ht, + hash_client_id(id), + match_client_id, + &id, + &iter); + node = cds_lfht_iter_get_node(&iter); + if (!node) { + goto end; + } + + client = caa_container_of(node, struct notification_client, + client_id_ht_node); +end: + return client; +} + static bool buffer_usage_condition_applies_to_channel( const struct lttng_condition *condition, @@ -1529,7 +1449,7 @@ void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list) static int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list, - const struct lttng_trigger *trigger) + struct lttng_trigger *trigger) { int ret = 0; struct lttng_trigger_list_element *new_element = @@ -1886,6 +1806,10 @@ int handle_notification_thread_command_session_rotation( struct lttng_session_trigger_list *trigger_list; struct lttng_trigger_list_element *trigger_list_element; struct session_info *session_info; + const struct lttng_credentials session_creds = { + .uid = session_uid, + .gid = session_gid, + }; rcu_read_lock(); @@ -1911,12 +1835,11 @@ int handle_notification_thread_command_session_rotation( cds_list_for_each_entry(trigger_list_element, &trigger_list->list, node) { const struct lttng_condition *condition; - const struct lttng_action *action; - const struct lttng_trigger *trigger; + struct lttng_trigger *trigger; struct notification_client_list *client_list; struct lttng_evaluation *evaluation = NULL; enum lttng_condition_type condition_type; - bool client_list_is_empty; + enum action_executor_status executor_status; trigger = trigger_list_element->trigger; condition = lttng_trigger_get_const_condition(trigger); @@ -1931,26 +1854,7 @@ int handle_notification_thread_command_session_rotation( continue; } - action = lttng_trigger_get_const_action(trigger); - - /* Notify actions are the only type currently supported. */ - assert(lttng_action_get_type_const(action) == - LTTNG_ACTION_TYPE_NOTIFY); - client_list = get_client_list_from_condition(state, condition); - assert(client_list); - - pthread_mutex_lock(&client_list->lock); - client_list_is_empty = cds_list_empty(&client_list->list); - pthread_mutex_unlock(&client_list->lock); - if (client_list_is_empty) { - /* - * No clients interested in the evaluation's result, - * skip it. - */ - continue; - } - if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) { evaluation = lttng_evaluation_session_rotation_ongoing_create( trace_archive_chunk_id); @@ -1966,12 +1870,40 @@ int handle_notification_thread_command_session_rotation( goto put_list; } - /* Dispatch evaluation result to all clients. */ - ret = send_evaluation_to_clients(trigger_list_element->trigger, - evaluation, client_list, state, - session_info->uid, - session_info->gid); - lttng_evaluation_destroy(evaluation); + /* + * Ownership of `evaluation` transferred to the action executor + * no matter the result. + */ + executor_status = action_executor_enqueue(state->executor, + trigger, evaluation, &session_creds, + client_list); + evaluation = NULL; + switch (executor_status) { + case ACTION_EXECUTOR_STATUS_OK: + break; + case ACTION_EXECUTOR_STATUS_ERROR: + case ACTION_EXECUTOR_STATUS_INVALID: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + */ + ERR("Fatal error occurred while enqueuing action associated with session rotation trigger"); + ret = -1; + goto put_list; + case ACTION_EXECUTOR_STATUS_OVERFLOW: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + * + * Not a fatal error. + */ + WARN("No space left when enqueuing action associated with session rotation trigger"); + ret = 0; + goto put_list; + default: + abort(); + } + put_list: notification_client_list_put(client_list); if (caa_unlikely(ret)) { @@ -2025,7 +1957,7 @@ end: /* Must be called with RCU read lock held. */ static -int bind_trigger_to_matching_session(const struct lttng_trigger *trigger, +int bind_trigger_to_matching_session(struct lttng_trigger *trigger, struct notification_thread_state *state) { int ret = 0; @@ -2071,7 +2003,7 @@ end: /* Must be called with RCU read lock held. */ static -int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger, +int bind_trigger_to_matching_channels(struct lttng_trigger *trigger, struct notification_thread_state *state) { int ret = 0; @@ -2115,9 +2047,46 @@ end: return ret; } +static +bool is_trigger_action_notify(const struct lttng_trigger *trigger) +{ + bool is_notify = false; + unsigned int i, count; + enum lttng_action_status action_status; + const struct lttng_action *action = + lttng_trigger_get_const_action(trigger); + enum lttng_action_type action_type; + + assert(action); + action_type = lttng_action_get_type_const(action); + if (action_type == LTTNG_ACTION_TYPE_NOTIFY) { + is_notify = true; + goto end; + } else if (action_type != LTTNG_ACTION_TYPE_GROUP) { + goto end; + } + + action_status = lttng_action_group_get_count(action, &count); + assert(action_status == LTTNG_ACTION_STATUS_OK); + + for (i = 0; i < count; i++) { + const struct lttng_action *inner_action = + lttng_action_group_get_at_index( + action, i); + + action_type = lttng_action_get_type_const(inner_action); + if (action_type == LTTNG_ACTION_TYPE_NOTIFY) { + is_notify = true; + goto end; + } + } + +end: + return is_notify; +} + /* - * FIXME A client's credentials are not checked when registering a trigger, nor - * are they stored alongside with the trigger. + * FIXME A client's credentials are not checked when registering a trigger. * * The effects of this are benign since: * - The client will succeed in registering the trigger, as it is valid, @@ -2142,10 +2111,13 @@ int handle_notification_thread_command_register_trigger( struct notification_client *client; struct notification_client_list *client_list = NULL; struct lttng_trigger_ht_element *trigger_ht_element = NULL; - struct notification_client_list_element *client_list_element, *tmp; + struct notification_client_list_element *client_list_element; struct cds_lfht_node *node; struct cds_lfht_iter iter; bool free_trigger = true; + struct lttng_evaluation *evaluation = NULL; + struct lttng_credentials object_creds; + enum action_executor_status executor_status; rcu_read_lock(); @@ -2196,27 +2168,38 @@ int handle_notification_thread_command_register_trigger( * It is not skipped as this is the only action type currently * supported. */ - client_list = notification_client_list_create(trigger); - if (!client_list) { - ret = -1; - goto error_free_ht_element; - } - - /* Build a list of clients to which this new trigger applies. */ - cds_lfht_for_each_entry(state->client_socket_ht, &iter, client, - client_socket_ht_node) { - if (!trigger_applies_to_client(trigger, client)) { - continue; + if (is_trigger_action_notify(trigger)) { + client_list = notification_client_list_create(trigger); + if (!client_list) { + ret = -1; + goto error_free_ht_element; } - client_list_element = zmalloc(sizeof(*client_list_element)); - if (!client_list_element) { - ret = -1; - goto error_put_client_list; + /* Build a list of clients to which this new trigger applies. */ + cds_lfht_for_each_entry (state->client_socket_ht, &iter, client, + client_socket_ht_node) { + if (!trigger_applies_to_client(trigger, client)) { + continue; + } + + client_list_element = + zmalloc(sizeof(*client_list_element)); + if (!client_list_element) { + ret = -1; + goto error_put_client_list; + } + + CDS_INIT_LIST_HEAD(&client_list_element->node); + client_list_element->client = client; + cds_list_add(&client_list_element->node, + &client_list->list); } - CDS_INIT_LIST_HEAD(&client_list_element->node); - client_list_element->client = client; - cds_list_add(&client_list_element->node, &client_list->list); + + /* + * Client list ownership transferred to the + * notification_trigger_clients_ht. + */ + publish_notification_client_list(state, client_list); } switch (get_condition_binding_object(condition)) { @@ -2240,15 +2223,18 @@ int handle_notification_thread_command_register_trigger( case LTTNG_OBJECT_TYPE_NONE: break; default: - ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered"); + ERR("Unknown object type on which to bind a newly registered trigger was encountered"); ret = -1; goto error_put_client_list; } /* - * Since there is nothing preventing clients from subscribing to a - * condition before the corresponding trigger is registered, we have - * to evaluate this new condition right away. + * The new trigger's condition must be evaluated against the current + * state. + * + * In the case of `notify` action, nothing preventing clients from + * subscribing to a condition before the corresponding trigger is + * registered, we have to evaluate this new condition right away. * * At some point, we were waiting for the next "evaluation" (e.g. on * reception of a channel sample) to evaluate this new condition, but @@ -2270,24 +2256,72 @@ int handle_notification_thread_command_register_trigger( * current state. Otherwise, the next evaluation cycle may only see * that the evaluations remain the same (true for samples n-1 and n) and * the client will never know that the condition has been met. - * - * No need to lock the list here as it has not been published yet. */ - cds_list_for_each_entry_safe(client_list_element, tmp, - &client_list->list, node) { - ret = evaluate_condition_for_client(trigger, condition, - client_list_element->client, state); - if (ret) { - goto error_put_client_list; - } + switch (get_condition_binding_object(condition)) { + case LTTNG_OBJECT_TYPE_SESSION: + ret = evaluate_session_condition_for_client(condition, state, + &evaluation, &object_creds.uid, + &object_creds.gid); + break; + case LTTNG_OBJECT_TYPE_CHANNEL: + ret = evaluate_channel_condition_for_client(condition, state, + &evaluation, &object_creds.uid, + &object_creds.gid); + break; + case LTTNG_OBJECT_TYPE_NONE: + ret = 0; + goto error_put_client_list; + case LTTNG_OBJECT_TYPE_UNKNOWN: + default: + ret = -1; + goto error_put_client_list; + } + + if (ret) { + /* Fatal error. */ + goto error_put_client_list; + } + + DBG("Newly registered trigger's condition evaluated to %s", + evaluation ? "true" : "false"); + if (!evaluation) { + /* Evaluation yielded nothing. Normal exit. */ + ret = 0; + goto error_put_client_list; } /* - * Client list ownership transferred to the - * notification_trigger_clients_ht. + * Ownership of `evaluation` transferred to the action executor + * no matter the result. */ - publish_notification_client_list(state, client_list); - client_list = NULL; + executor_status = action_executor_enqueue(state->executor, trigger, + evaluation, &object_creds, client_list); + evaluation = NULL; + switch (executor_status) { + case ACTION_EXECUTOR_STATUS_OK: + break; + case ACTION_EXECUTOR_STATUS_ERROR: + case ACTION_EXECUTOR_STATUS_INVALID: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + */ + ERR("Fatal error occurred while enqueuing action associated to newly registered trigger"); + ret = -1; + goto error_put_client_list; + case ACTION_EXECUTOR_STATUS_OVERFLOW: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + * + * Not a fatal error. + */ + WARN("No space left when enqueuing action associated to newly registered trigger"); + ret = 0; + goto error_put_client_list; + default: + abort(); + } *cmd_result = LTTNG_OK; @@ -2467,6 +2501,36 @@ int handle_notification_thread_command( cmd->reply_code = LTTNG_OK; ret = 1; goto end; + case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE: + { + const enum client_transmission_status client_status = + cmd->parameters.client_communication_update + .status; + const notification_client_id client_id = + cmd->parameters.client_communication_update.id; + struct notification_client *client; + + rcu_read_lock(); + client = get_client_from_id(client_id, state); + + if (!client) { + /* + * Client error was probably already picked-up by the + * notification thread or it has disconnected + * gracefully while this command was queued. + */ + DBG("Failed to find notification client to update communication status, client id = %" PRIu64, + client_id); + ret = 0; + } else { + pthread_mutex_lock(&client->lock); + ret = client_handle_transmission_status( + client, client_status, state); + pthread_mutex_unlock(&client->lock); + } + rcu_read_unlock(); + break; + } default: ERR("[notification-thread] Unknown internal command received"); goto error_unlock; @@ -2773,8 +2837,7 @@ end: /* Client lock must be acquired by caller. */ static enum client_transmission_status client_flush_outgoing_queue( - struct notification_client *client, - struct notification_thread_state *state) + struct notification_client *client) { ssize_t ret; size_t to_send_count; @@ -2782,6 +2845,11 @@ enum client_transmission_status client_flush_outgoing_queue( ASSERT_LOCKED(client->lock); + if (!client->communication.active) { + status = CLIENT_TRANSMISSION_STATUS_FAIL; + goto end; + } + assert(client->communication.outbound.buffer.size != 0); to_send_count = client->communication.outbound.buffer.size; DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue", @@ -2803,7 +2871,6 @@ enum client_transmission_status client_flush_outgoing_queue( &client->communication.outbound.buffer, to_send_count); if (ret) { - status = CLIENT_TRANSMISSION_STATUS_ERROR; goto error; } status = CLIENT_TRANSMISSION_STATUS_QUEUED; @@ -2817,20 +2884,14 @@ enum client_transmission_status client_flush_outgoing_queue( ret = lttng_dynamic_buffer_set_size( &client->communication.outbound.buffer, 0); if (ret) { - status = CLIENT_TRANSMISSION_STATUS_ERROR; goto error; } status = CLIENT_TRANSMISSION_STATUS_COMPLETE; } - - ret = client_handle_transmission_status(client, status, state); - if (ret) { - goto error; - } - - return 0; +end: + return status; error: - return -1; + return CLIENT_TRANSMISSION_STATUS_ERROR; } /* Client lock must be acquired by caller. */ @@ -2848,6 +2909,7 @@ int client_send_command_reply(struct notification_client *client, .size = sizeof(reply), }; char buffer[sizeof(msg) + sizeof(reply)]; + enum client_transmission_status transmission_status; ASSERT_LOCKED(client->lock); @@ -2868,7 +2930,9 @@ int client_send_command_reply(struct notification_client *client, goto error; } - ret = client_flush_outgoing_queue(client, state); + transmission_status = client_flush_outgoing_queue(client); + ret = client_handle_transmission_status( + client, transmission_status, state); if (ret) { goto error; } @@ -2948,6 +3012,7 @@ int client_handle_message_handshake(struct notification_client *client, enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)]; + enum client_transmission_status transmission_status; pthread_mutex_lock(&client->lock); @@ -2991,7 +3056,9 @@ int client_handle_message_handshake(struct notification_client *client, client->validated = true; client->communication.active = true; - ret = client_flush_outgoing_queue(client, state); + transmission_status = client_flush_outgoing_queue(client); + ret = client_handle_transmission_status( + client, transmission_status, state); if (ret) { goto end; } @@ -3181,6 +3248,7 @@ int handle_notification_thread_client_out( { int ret; struct notification_client *client; + enum client_transmission_status transmission_status; client = get_client_from_socket(socket, state); if (!client) { @@ -3190,7 +3258,9 @@ int handle_notification_thread_client_out( } pthread_mutex_lock(&client->lock); - ret = client_flush_outgoing_queue(client, state); + transmission_status = client_flush_outgoing_queue(client); + ret = client_handle_transmission_status( + client, transmission_status, state); pthread_mutex_unlock(&client->lock); if (ret) { goto end; @@ -3362,22 +3432,62 @@ end: } static -int client_enqueue_dropped_notification(struct notification_client *client) +int client_notification_overflow(struct notification_client *client) { - int ret; - struct lttng_notification_channel_message msg = { + int ret = 0; + const struct lttng_notification_channel_message msg = { .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED, - .size = 0, }; ASSERT_LOCKED(client->lock); + DBG("Dropping notification addressed to client (socket fd = %i)", + client->socket); + if (client->communication.outbound.dropped_notification) { + /* + * The client already has a "notification dropped" message + * in its outgoing queue. Nothing to do since all + * of those messages are coalesced. + */ + goto end; + } + + client->communication.outbound.dropped_notification = true; ret = lttng_dynamic_buffer_append( &client->communication.outbound.buffer, &msg, sizeof(msg)); + if (ret) { + PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue", + client->socket); + } +end: return ret; } +static int client_handle_transmission_status_wrapper( + struct notification_client *client, + enum client_transmission_status status, + void *user_data) +{ + return client_handle_transmission_status(client, status, + (struct notification_thread_state *) user_data); +} + +static +int send_evaluation_to_clients(const struct lttng_trigger *trigger, + const struct lttng_evaluation *evaluation, + struct notification_client_list* client_list, + struct notification_thread_state *state, + uid_t object_uid, gid_t object_gid) +{ + return notification_client_list_send_evaluation(client_list, + lttng_trigger_get_const_condition(trigger), evaluation, + lttng_trigger_get_credentials(trigger), + &(struct lttng_credentials){ + .uid = object_uid, .gid = object_gid}, + client_handle_transmission_status_wrapper, state); +} + /* * Permission checks relative to notification channel clients are performed * here. Notice how object, client, and trigger credentials are involved in @@ -3411,24 +3521,26 @@ int client_enqueue_dropped_notification(struct notification_client *client) * interference from external users (those could, for instance, unregister * their triggers). */ -static -int send_evaluation_to_clients(const struct lttng_trigger *trigger, +LTTNG_HIDDEN +int notification_client_list_send_evaluation( + struct notification_client_list *client_list, + const struct lttng_condition *condition, const struct lttng_evaluation *evaluation, - struct notification_client_list* client_list, - struct notification_thread_state *state, - uid_t object_uid, gid_t object_gid) + const struct lttng_credentials *trigger_creds, + const struct lttng_credentials *source_object_creds, + report_client_transmission_result_cb client_report, + void *user_data) { int ret = 0; struct lttng_payload msg_payload; struct notification_client_list_element *client_list_element, *tmp; const struct lttng_notification notification = { - .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger), + .condition = (struct lttng_condition *) condition, .evaluation = (struct lttng_evaluation *) evaluation, }; struct lttng_notification_channel_message msg_header = { .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION, }; - const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger); lttng_payload_init(&msg_payload); @@ -3453,16 +3565,23 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger, pthread_mutex_lock(&client_list->lock); cds_list_for_each_entry_safe(client_list_element, tmp, &client_list->list, node) { + enum client_transmission_status transmission_status; struct notification_client *client = client_list_element->client; ret = 0; pthread_mutex_lock(&client->lock); - if (client->uid != object_uid && client->gid != object_gid && - client->uid != 0) { - /* Client is not allowed to monitor this channel. */ - DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger"); - goto unlock_client; + if (source_object_creds) { + if (client->uid != source_object_creds->uid && + client->gid != source_object_creds->gid && + client->uid != 0) { + /* + * Client is not allowed to monitor this + * object. + */ + DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger"); + goto unlock_client; + } } if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) { @@ -3480,17 +3599,10 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger, * notification since the socket spilled-over to the * queue. */ - DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)", - client->socket); - if (!client->communication.outbound.dropped_notification) { - client->communication.outbound.dropped_notification = true; - ret = client_enqueue_dropped_notification( - client); - if (ret) { - goto unlock_client; - } + ret = client_notification_overflow(client); + if (ret) { + goto unlock_client; } - goto unlock_client; } ret = lttng_dynamic_buffer_append_buffer( @@ -3500,7 +3612,8 @@ int send_evaluation_to_clients(const struct lttng_trigger *trigger, goto unlock_client; } - ret = client_flush_outgoing_queue(client, state); + transmission_status = client_flush_outgoing_queue(client); + ret = client_report(client, transmission_status, user_data); if (ret) { goto unlock_client; } @@ -3533,6 +3646,7 @@ int handle_notification_thread_channel_sample( bool previous_sample_available = false; struct channel_state_sample previous_sample, latest_sample; uint64_t previous_session_consumed_total, latest_session_consumed_total; + struct lttng_credentials channel_creds; /* * The monitoring pipe only holds messages smaller than PIPE_BUF, @@ -3651,41 +3765,31 @@ int handle_notification_thread_channel_sample( goto end_unlock; } + channel_creds = (typeof(channel_creds)) { + .uid = channel_info->session_info->uid, + .gid = channel_info->session_info->gid, + }; + trigger_list = caa_container_of(node, struct lttng_channel_trigger_list, channel_triggers_ht_node); cds_list_for_each_entry(trigger_list_element, &trigger_list->list, node) { const struct lttng_condition *condition; - const struct lttng_action *action; - const struct lttng_trigger *trigger; + struct lttng_trigger *trigger; struct notification_client_list *client_list = NULL; struct lttng_evaluation *evaluation = NULL; - bool client_list_is_empty; + enum action_executor_status executor_status; ret = 0; trigger = trigger_list_element->trigger; condition = lttng_trigger_get_const_condition(trigger); assert(condition); - action = lttng_trigger_get_const_action(trigger); - - /* Notify actions are the only type currently supported. */ - assert(lttng_action_get_type_const(action) == - LTTNG_ACTION_TYPE_NOTIFY); /* * Check if any client is subscribed to the result of this * evaluation. */ client_list = get_client_list_from_condition(state, condition); - assert(client_list); - client_list_is_empty = cds_list_empty(&client_list->list); - if (client_list_is_empty) { - /* - * No clients interested in the evaluation's result, - * skip it. - */ - goto put_list; - } ret = evaluate_buffer_condition(condition, &evaluation, state, previous_sample_available ? &previous_sample : NULL, @@ -3701,12 +3805,40 @@ int handle_notification_thread_channel_sample( goto put_list; } - /* Dispatch evaluation result to all clients. */ - ret = send_evaluation_to_clients(trigger_list_element->trigger, - evaluation, client_list, state, - channel_info->session_info->uid, - channel_info->session_info->gid); - lttng_evaluation_destroy(evaluation); + /* + * Ownership of `evaluation` transferred to the action executor + * no matter the result. + */ + executor_status = action_executor_enqueue(state->executor, + trigger, evaluation, &channel_creds, + client_list); + evaluation = NULL; + switch (executor_status) { + case ACTION_EXECUTOR_STATUS_OK: + break; + case ACTION_EXECUTOR_STATUS_ERROR: + case ACTION_EXECUTOR_STATUS_INVALID: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + */ + ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger"); + ret = -1; + goto put_list; + case ACTION_EXECUTOR_STATUS_OVERFLOW: + /* + * TODO Add trigger identification (name/id) when + * it is added to the API. + * + * Not a fatal error. + */ + WARN("No space left when enqueuing action associated with buffer-condition trigger"); + ret = 0; + goto put_list; + default: + abort(); + } + put_list: notification_client_list_put(client_list); if (caa_unlikely(ret)) { diff --git a/src/bin/lttng-sessiond/notification-thread-internal.h b/src/bin/lttng-sessiond/notification-thread-internal.h index b278f83d3..5aaac9d23 100644 --- a/src/bin/lttng-sessiond/notification-thread-internal.h +++ b/src/bin/lttng-sessiond/notification-thread-internal.h @@ -8,9 +8,19 @@ #ifndef NOTIFICATION_THREAD_INTERNAL_H #define NOTIFICATION_THREAD_INTERNAL_H +#include +#include +#include #include -#include +#include #include +#include +#include +#include +#include "notification-thread.h" + +struct lttng_evaluation; +struct notification_thread_handle; struct channel_key { uint64_t key; @@ -64,6 +74,122 @@ struct channel_info { struct rcu_head rcu_node; }; +struct notification_client_list_element { + struct notification_client *client; + struct cds_list_head node; +}; + +/* + * Thread safety of notification_client and notification_client_list. + * + * The notification thread (main thread) and the action executor + * interact through client lists. Hence, when the action executor + * thread looks-up the list of clients subscribed to a given + * condition, it will acquire a reference to the list and lock it + * while attempting to communicate with the various clients. + * + * It is not necessary to reference-count clients as they are guaranteed + * to be 'alive' if they are present in a list and that list is locked. Indeed, + * removing references to the client from those subscription lists is part of + * the work performed on destruction of a client. + * + * No provision for other access scenarios are taken into account; + * this is the bare minimum to make these accesses safe and the + * notification thread's state is _not_ "thread-safe" in any general + * sense. + */ +struct notification_client_list { + pthread_mutex_t lock; + struct urcu_ref ref; + const struct lttng_trigger *trigger; + struct cds_list_head list; + /* Weak reference to container. */ + struct cds_lfht *notification_trigger_clients_ht; + struct cds_lfht_node notification_trigger_clients_ht_node; + /* call_rcu delayed reclaim. */ + struct rcu_head rcu_node; +}; + +struct notification_client { + /* Nests within the notification_client_list lock. */ + pthread_mutex_t lock; + notification_client_id id; + int socket; + /* Client protocol version. */ + uint8_t major, minor; + uid_t uid; + gid_t gid; + /* + * Indicates if the credentials and versions of the client have been + * checked. + */ + bool validated; + /* + * Conditions to which the client's notification channel is subscribed. + * List of struct lttng_condition_list_node. The condition member is + * owned by the client. + */ + struct cds_list_head condition_list; + struct cds_lfht_node client_socket_ht_node; + struct cds_lfht_node client_id_ht_node; + struct { + /* + * If a client's communication is inactive, it means that a + * fatal error has occurred (could be either a protocol error or + * the socket API returned a fatal error). No further + * communication should be attempted; the client is queued for + * clean-up. + */ + bool active; + struct { + /* + * During the reception of a message, the reception + * buffers' "size" is set to contain the current + * message's complete payload. + */ + struct lttng_dynamic_buffer buffer; + /* Bytes left to receive for the current message. */ + size_t bytes_to_receive; + /* Type of the message being received. */ + enum lttng_notification_channel_message_type msg_type; + /* + * Indicates whether or not credentials are expected + * from the client. + */ + bool expect_creds; + /* + * Indicates whether or not credentials were received + * from the client. + */ + bool creds_received; + /* Only used during credentials reception. */ + lttng_sock_cred creds; + } inbound; + struct { + /* + * Indicates whether or not a notification addressed to + * this client was dropped because a command reply was + * already buffered. + * + * A notification is dropped whenever the buffer is not + * empty. + */ + bool dropped_notification; + /* + * Indicates whether or not a command reply is already + * buffered. In this case, it means that the client is + * not consuming command replies before emitting a new + * one. This could be caused by a protocol error or a + * misbehaving/malicious client. + */ + bool queued_command_reply; + struct lttng_dynamic_buffer buffer; + } outbound; + } communication; + /* call_rcu delayed reclaim. */ + struct rcu_head rcu_node; +}; + enum client_transmission_status { CLIENT_TRANSMISSION_STATUS_COMPLETE, CLIENT_TRANSMISSION_STATUS_QUEUED, @@ -72,4 +198,32 @@ enum client_transmission_status { /* Fatal error. */ CLIENT_TRANSMISSION_STATUS_ERROR, }; + +LTTNG_HIDDEN +bool notification_client_list_get(struct notification_client_list *list); + +LTTNG_HIDDEN +void notification_client_list_put(struct notification_client_list *list); + +typedef int (*report_client_transmission_result_cb)( + struct notification_client *client, + enum client_transmission_status status, + void *user_data); + +LTTNG_HIDDEN +int notification_client_list_send_evaluation( + struct notification_client_list *list, + const struct lttng_condition *condition, + const struct lttng_evaluation *evaluation, + const struct lttng_credentials *trigger_creds, + const struct lttng_credentials *source_object_creds, + report_client_transmission_result_cb client_report, + void *user_data); + +LTTNG_HIDDEN +int notification_thread_client_communication_update( + struct notification_thread_handle *handle, + notification_client_id id, + enum client_transmission_status transmission_status); + #endif /* NOTIFICATION_THREAD_INTERNAL_H */ diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c index c964e7c91..3ae8741d6 100644 --- a/src/bin/lttng-sessiond/notification-thread.c +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -375,6 +375,9 @@ void fini_thread_state(struct notification_thread_state *state) notification_channel_socket_destroy( state->notification_channel_socket); } + if (state->executor) { + action_executor_destroy(state->executor); + } lttng_poll_clean(&state->events); } @@ -473,6 +476,11 @@ int init_thread_state(struct notification_thread_handle *handle, if (!state->triggers_ht) { goto error; } + + state->executor = action_executor_create(handle); + if (!state->executor) { + goto error; + } mark_thread_as_ready(handle); end: return 0; diff --git a/src/bin/lttng-sessiond/notification-thread.h b/src/bin/lttng-sessiond/notification-thread.h index 21cc086c6..134804f9d 100644 --- a/src/bin/lttng-sessiond/notification-thread.h +++ b/src/bin/lttng-sessiond/notification-thread.h @@ -8,16 +8,17 @@ #ifndef NOTIFICATION_THREAD_H #define NOTIFICATION_THREAD_H -#include -#include -#include -#include -#include +#include "action-executor.h" +#include "thread.h" #include #include +#include +#include #include #include -#include "thread.h" +#include +#include +#include typedef uint64_t notification_client_id; @@ -210,6 +211,7 @@ struct notification_thread_state { struct cds_lfht *sessions_ht; struct cds_lfht *triggers_ht; notification_client_id next_notification_client_id; + struct action_executor *executor; }; /* notification_thread_data takes ownership of the channel monitor pipes. */ diff --git a/src/bin/lttng-sessiond/thread.c b/src/bin/lttng-sessiond/thread.c index 26661a36f..ae7f45fd9 100644 --- a/src/bin/lttng-sessiond/thread.c +++ b/src/bin/lttng-sessiond/thread.c @@ -165,20 +165,22 @@ bool _lttng_thread_shutdown(struct lttng_thread *thread) result = false; goto end; } - /* Release the list's reference to the thread. */ - cds_list_del(&thread->node); - lttng_thread_put(thread); + DBG("Joined thread \"%s\"", thread->name); end: return result; } bool lttng_thread_shutdown(struct lttng_thread *thread) { - bool result; - - pthread_mutex_lock(&thread_list.lock); - result = _lttng_thread_shutdown(thread); - pthread_mutex_unlock(&thread_list.lock); + const bool result = _lttng_thread_shutdown(thread); + + if (result) { + /* Release the list's reference to the thread. */ + pthread_mutex_lock(&thread_list.lock); + cds_list_del(&thread->node); + lttng_thread_put(thread); + pthread_mutex_unlock(&thread_list.lock); + } return result; } diff --git a/src/lib/lttng-ctl/lttng-ctl-health.c b/src/lib/lttng-ctl/lttng-ctl-health.c index d6a3e4f13..91108c166 100644 --- a/src/lib/lttng-ctl/lttng-ctl-health.c +++ b/src/lib/lttng-ctl/lttng-ctl-health.c @@ -62,6 +62,7 @@ const char *sessiond_thread_name[NR_HEALTH_SESSIOND_TYPES] = { [ HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH ] = "Session daemon application registration dispatcher", [ HEALTH_SESSIOND_TYPE_ROTATION ] = "Session daemon rotation manager", [ HEALTH_SESSIOND_TYPE_TIMER ] = "Session daemon timer manager", + [ HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR ] = "Session daemon trigger action executor", }; static diff --git a/tests/unit/Makefile.am b/tests/unit/Makefile.am index 4ecc7a8c8..338772068 100644 --- a/tests/unit/Makefile.am +++ b/tests/unit/Makefile.am @@ -65,6 +65,7 @@ SESSIOND_OBJS = $(top_builddir)/src/bin/lttng-sessiond/buffer-registry.$(OBJEXT) $(top_builddir)/src/bin/lttng-sessiond/kernel.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/ht-cleanup.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/notification-thread.$(OBJEXT) \ + $(top_builddir)/src/bin/lttng-sessiond/action-executor.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/lttng-syscall.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/channel.$(OBJEXT) \ $(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \