manage-kernel.c manage-kernel.h \
manage-consumer.c manage-consumer.h \
clear.c clear.h \
- tracker.c tracker.h
+ tracker.c tracker.h \
+ action-executor.c action-executor.h
if HAVE_LIBLTTNG_UST_CTL
lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
--- /dev/null
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "action-executor.h"
+#include "cmd.h"
+#include "health-sessiond.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-internal.h"
+#include "session.h"
+#include "thread.h"
+#include <common/macros.h>
+#include <common/optional.h>
+#include <lttng/action/action-internal.h>
+#include <lttng/action/group.h>
+#include <lttng/action/notify.h>
+#include <lttng/action/rotate-session.h>
+#include <lttng/action/snapshot-session.h>
+#include <lttng/action/start-session.h>
+#include <lttng/action/stop-session.h>
+#include <lttng/condition/evaluation.h>
+#include <lttng/lttng-error.h>
+#include <lttng/trigger/trigger-internal.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <urcu/list.h>
+
+#define THREAD_NAME "Action Executor"
+#define MAX_QUEUED_WORK_COUNT 8192
+
+struct action_work_item {
+ uint64_t id;
+ struct lttng_trigger *trigger;
+ struct lttng_evaluation *evaluation;
+ struct notification_client_list *client_list;
+ LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
+ struct cds_list_head list_node;
+};
+
+struct action_executor {
+ struct lttng_thread *thread;
+ struct notification_thread_handle *notification_thread_handle;
+ struct {
+ uint64_t pending_count;
+ struct cds_list_head list;
+ pthread_cond_t cond;
+ pthread_mutex_t lock;
+ } work;
+ bool should_quit;
+ uint64_t next_work_item_id;
+};
+
+typedef int (*action_executor_handler)(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *action);
+
+static int action_executor_notify_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_start_session_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_stop_session_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_group_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+static int action_executor_generic_handler(struct action_executor *executor,
+ const struct action_work_item *,
+ const struct lttng_action *);
+
+static const action_executor_handler action_executors[] = {
+ [LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
+ [LTTNG_ACTION_TYPE_START_SESSION] = action_executor_start_session_handler,
+ [LTTNG_ACTION_TYPE_STOP_SESSION] = action_executor_stop_session_handler,
+ [LTTNG_ACTION_TYPE_ROTATE_SESSION] = action_executor_rotate_session_handler,
+ [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = action_executor_snapshot_session_handler,
+ [LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
+};
+
+static const char *action_type_names[] = {
+ [LTTNG_ACTION_TYPE_NOTIFY] = "Notify",
+ [LTTNG_ACTION_TYPE_START_SESSION] = "Start session",
+ [LTTNG_ACTION_TYPE_STOP_SESSION] = "Stop session",
+ [LTTNG_ACTION_TYPE_ROTATE_SESSION] = "Rotate session",
+ [LTTNG_ACTION_TYPE_SNAPSHOT_SESSION] = "Snapshot session",
+ [LTTNG_ACTION_TYPE_GROUP] = "Group",
+};
+
+static const char *get_action_name(const struct lttng_action *action)
+{
+ return action_type_names[lttng_action_get_type_const(action)];
+}
+
+static int client_handle_transmission_status(
+ struct notification_client *client,
+ enum client_transmission_status status,
+ void *user_data)
+{
+ int ret = 0;
+ struct action_executor *executor = user_data;
+ bool update_communication = true;
+
+ ASSERT_LOCKED(client->lock);
+
+ switch (status) {
+ case CLIENT_TRANSMISSION_STATUS_COMPLETE:
+ DBG("Successfully sent full notification to client, client_id = %" PRIu64,
+ client->id);
+ update_communication = false;
+ break;
+ case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
+ client->id);
+ break;
+ case CLIENT_TRANSMISSION_STATUS_FAIL:
+ DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
+ client->id);
+ client->communication.active = false;
+ break;
+ default:
+ ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
+ client->id);
+ client->communication.active = false;
+ ret = -1;
+ goto end;
+ }
+
+ if (!update_communication) {
+ goto end;
+ }
+
+ ret = notification_thread_client_communication_update(
+ executor->notification_thread_handle, client->id,
+ status);
+end:
+ return ret;
+}
+
+static int action_executor_notify_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ return notification_client_list_send_evaluation(work_item->client_list,
+ lttng_trigger_get_const_condition(work_item->trigger),
+ work_item->evaluation,
+ lttng_trigger_get_credentials(work_item->trigger),
+ LTTNG_OPTIONAL_GET_PTR(work_item->object_creds),
+ client_handle_transmission_status,
+ executor);
+}
+
+static int action_executor_start_session_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ int ret = 0;
+ const char *session_name;
+ enum lttng_action_status action_status;
+ struct ltt_session *session;
+
+ action_status = lttng_action_start_session_get_session_name(
+ action, &session_name);
+ if (action_status != LTTNG_ACTION_STATUS_OK) {
+ ERR("Failed to get session name from \"%s\" action",
+ get_action_name(action));
+ ret = -1;
+ goto end;
+ }
+
+ session_lock_list();
+ session = session_find_by_name(session_name);
+ if (session) {
+ enum lttng_error_code cmd_ret;
+
+ session_lock(session);
+ cmd_ret = cmd_start_trace(session);
+ session_unlock(session);
+
+ switch (cmd_ret) {
+ case LTTNG_OK:
+ DBG("Successfully started session \"%s\" on behalf of trigger \"%p\"",
+ session_name,
+ work_item->trigger);
+ break;
+ case LTTNG_ERR_TRACE_ALREADY_STARTED:
+ DBG("Attempted to start session \"%s\" on behalf of trigger \"%p\" but it was already started",
+ session_name,
+ work_item->trigger);
+ break;
+ default:
+ WARN("Failed to start session \"%s\" on behalf of trigger \"%p\": %s",
+ session_name,
+ work_item->trigger,
+ lttng_strerror(-cmd_ret));
+ break;
+ }
+ session_put(session);
+ } else {
+ DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+ session_name, get_action_name(action),
+ work_item->trigger);
+ }
+ session_unlock_list();
+end:
+ return ret;
+}
+
+static int action_executor_stop_session_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ int ret = 0;
+ const char *session_name;
+ enum lttng_action_status action_status;
+ struct ltt_session *session;
+
+ action_status = lttng_action_stop_session_get_session_name(
+ action, &session_name);
+ if (action_status != LTTNG_ACTION_STATUS_OK) {
+ ERR("Failed to get session name from \"%s\" action",
+ get_action_name(action));
+ ret = -1;
+ goto end;
+ }
+
+ session_lock_list();
+ session = session_find_by_name(session_name);
+ if (session) {
+ enum lttng_error_code cmd_ret;
+
+ session_lock(session);
+ cmd_ret = cmd_stop_trace(session);
+ session_unlock(session);
+
+ switch (cmd_ret) {
+ case LTTNG_OK:
+ DBG("Successfully stopped session \"%s\" on behalf of trigger \"%p\"",
+ session_name,
+ work_item->trigger);
+ break;
+ case LTTNG_ERR_TRACE_ALREADY_STOPPED:
+ DBG("Attempted to stop session \"%s\" on behalf of trigger \"%p\" but it was already stopped",
+ session_name,
+ work_item->trigger);
+ break;
+ default:
+ WARN("Failed to stop session \"%s\" on behalf of trigger \"%p\": %s",
+ session_name,
+ work_item->trigger,
+ lttng_strerror(-cmd_ret));
+ break;
+ }
+ session_put(session);
+ } else {
+ DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+ session_name, get_action_name(action),
+ work_item->trigger);
+ }
+ session_unlock_list();
+end:
+ return ret;
+}
+
+static int action_executor_rotate_session_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ int ret = 0;
+ const char *session_name;
+ enum lttng_action_status action_status;
+ struct ltt_session *session;
+
+ action_status = lttng_action_rotate_session_get_session_name(
+ action, &session_name);
+ if (action_status != LTTNG_ACTION_STATUS_OK) {
+ ERR("Failed to get session name from \"%s\" action",
+ get_action_name(action));
+ ret = -1;
+ goto end;
+ }
+
+ session_lock_list();
+ session = session_find_by_name(session_name);
+ if (session) {
+ enum lttng_error_code cmd_ret;
+
+ session_lock(session);
+ cmd_ret = cmd_rotate_session(session, NULL, false,
+ LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+ session_unlock(session);
+
+ switch (cmd_ret) {
+ case LTTNG_OK:
+ DBG("Successfully started rotation of session \"%s\" on behalf of trigger \"%p\"",
+ session_name,
+ work_item->trigger);
+ break;
+ case LTTNG_ERR_ROTATION_PENDING:
+ DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%p\" but a rotation is already ongoing",
+ session_name,
+ work_item->trigger);
+ break;
+ case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+ case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+ DBG("Attempted to start a rotation of session \"%s\" on behalf of trigger \"%p\" but a rotation has already been completed since the last stop or clear",
+ session_name,
+ work_item->trigger);
+ break;
+ default:
+ WARN("Failed to start a rotation of session \"%s\" on behalf of trigger \"%p\": %s",
+ session_name,
+ work_item->trigger,
+ lttng_strerror(-cmd_ret));
+ break;
+ }
+ session_put(session);
+ } else {
+ DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+ session_name, get_action_name(action),
+ work_item->trigger);
+ }
+ session_unlock_list();
+end:
+ return ret;
+}
+
+static int action_executor_snapshot_session_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ int ret = 0;
+ const char *session_name;
+ enum lttng_action_status action_status;
+ struct ltt_session *session;
+ const struct lttng_snapshot_output default_snapshot_output = {
+ .max_size = UINT64_MAX,
+ };
+ const struct lttng_snapshot_output *snapshot_output =
+ &default_snapshot_output;
+
+ action_status = lttng_action_snapshot_session_get_session_name(
+ action, &session_name);
+ if (action_status != LTTNG_ACTION_STATUS_OK) {
+ ERR("Failed to get session name from \"%s\" action",
+ get_action_name(action));
+ ret = -1;
+ goto end;
+ }
+
+ action_status = lttng_action_snapshot_session_get_output(
+ action, &snapshot_output);
+ if (action_status != LTTNG_ACTION_STATUS_OK &&
+ action_status != LTTNG_ACTION_STATUS_UNSET) {
+ ERR("Failed to get output from \"%s\" action",
+ get_action_name(action));
+ ret = -1;
+ goto end;
+ }
+
+ session_lock_list();
+ session = session_find_by_name(session_name);
+ if (session) {
+ enum lttng_error_code cmd_ret;
+
+ session_lock(session);
+ cmd_ret = cmd_snapshot_record(session, snapshot_output, 0);
+ session_unlock(session);
+
+ switch (cmd_ret) {
+ case LTTNG_OK:
+ DBG("Successfully recorded snapshot of session \"%s\" on behalf of trigger \"%p\"",
+ session_name,
+ work_item->trigger);
+ break;
+ default:
+ WARN("Failed to record snapshot of session \"%s\" on behalf of trigger \"%p\": %s",
+ session_name,
+ work_item->trigger,
+ lttng_strerror(-cmd_ret));
+ break;
+ }
+ session_put(session);
+ } else {
+ DBG("Failed to find session \"%s\" by name while executing \"%s\" action of trigger \"%p\"",
+ session_name, get_action_name(action),
+ work_item->trigger);
+ }
+ session_unlock_list();
+end:
+ return ret;
+}
+
+static int action_executor_group_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action_group)
+{
+ int ret = 0;
+ unsigned int i, count;
+ enum lttng_action_status action_status;
+
+ action_status = lttng_action_group_get_count(action_group, &count);
+ if (action_status != LTTNG_ACTION_STATUS_OK) {
+ /* Fatal error. */
+ ERR("Failed to get count of action in action group");
+ ret = -1;
+ goto end;
+ }
+
+ DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
+ for (i = 0; i < count; i++) {
+ const struct lttng_action *action =
+ lttng_action_group_get_at_index(
+ action_group, i);
+
+ ret = action_executor_generic_handler(
+ executor, work_item, action);
+ if (ret) {
+ ERR("Stopping the execution of the action group of trigger \"%p\" following a fatal error",
+ work_item->trigger);
+ goto end;
+ }
+ }
+end:
+ return ret;
+}
+
+static int action_executor_generic_handler(struct action_executor *executor,
+ const struct action_work_item *work_item,
+ const struct lttng_action *action)
+{
+ DBG("Executing action \"%s\" of trigger \"%p\" action work item %" PRIu64,
+ get_action_name(action),
+ work_item->trigger,
+ work_item->id);
+
+ return action_executors[lttng_action_get_type_const(action)](
+ executor, work_item, action);
+}
+
+static int action_work_item_execute(struct action_executor *executor,
+ struct action_work_item *work_item)
+{
+ int ret;
+ const struct lttng_action *action =
+ lttng_trigger_get_const_action(work_item->trigger);
+
+ DBG("Starting execution of action work item %" PRIu64 " of trigger \"%p\"",
+ work_item->id, work_item->trigger);
+ ret = action_executor_generic_handler(executor, work_item, action);
+ DBG("Completed execution of action work item %" PRIu64 " of trigger \"%p\"",
+ work_item->id, work_item->trigger);
+ return ret;
+}
+
+static void action_work_item_destroy(struct action_work_item *work_item)
+{
+ lttng_trigger_put(work_item->trigger);
+ lttng_evaluation_destroy(work_item->evaluation);
+ notification_client_list_put(work_item->client_list);
+ free(work_item);
+}
+
+static void *action_executor_thread(void *_data)
+{
+ struct action_executor *executor = _data;
+
+ assert(executor);
+
+ health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
+
+ rcu_register_thread();
+ rcu_thread_online();
+
+ DBG("Entering work execution loop");
+ pthread_mutex_lock(&executor->work.lock);
+ while (!executor->should_quit) {
+ int ret;
+ struct action_work_item *work_item;
+
+ health_code_update();
+ if (executor->work.pending_count == 0) {
+ health_poll_entry();
+ DBG("No work items enqueued, entering wait");
+ pthread_cond_wait(&executor->work.cond,
+ &executor->work.lock);
+ DBG("Woke-up from wait");
+ health_poll_exit();
+ continue;
+ }
+
+ /* Pop item from front of the listwith work lock held. */
+ work_item = cds_list_first_entry(&executor->work.list,
+ struct action_work_item, list_node);
+ cds_list_del(&work_item->list_node);
+ executor->work.pending_count--;
+
+ /*
+ * Work can be performed without holding the work lock,
+ * allowing new items to be queued.
+ */
+ pthread_mutex_unlock(&executor->work.lock);
+ ret = action_work_item_execute(executor, work_item);
+ action_work_item_destroy(work_item);
+ if (ret) {
+ /* Fatal error. */
+ break;
+ }
+
+ health_code_update();
+ pthread_mutex_lock(&executor->work.lock);
+ }
+
+ pthread_mutex_unlock(&executor->work.lock);
+ DBG("Left work execution loop");
+
+ health_code_update();
+
+ rcu_thread_offline();
+ rcu_unregister_thread();
+ health_unregister(health_sessiond);
+
+ return NULL;
+}
+
+static bool shutdown_action_executor_thread(void *_data)
+{
+ struct action_executor *executor = _data;
+
+ executor->should_quit = true;
+ pthread_cond_signal(&executor->work.cond);
+ return true;
+}
+
+static void clean_up_action_executor_thread(void *_data)
+{
+ struct action_executor *executor = _data;
+
+ assert(cds_list_empty(&executor->work.list));
+
+ pthread_mutex_destroy(&executor->work.lock);
+ pthread_cond_destroy(&executor->work.cond);
+ free(executor);
+}
+
+struct action_executor *action_executor_create(
+ struct notification_thread_handle *handle)
+{
+ struct action_executor *executor = zmalloc(sizeof(*executor));
+
+ if (!executor) {
+ goto end;
+ }
+
+ CDS_INIT_LIST_HEAD(&executor->work.list);
+ pthread_cond_init(&executor->work.cond, NULL);
+ pthread_mutex_init(&executor->work.lock, NULL);
+ executor->notification_thread_handle = handle;
+
+ executor->thread = lttng_thread_create(THREAD_NAME,
+ action_executor_thread, shutdown_action_executor_thread,
+ clean_up_action_executor_thread, executor);
+end:
+ return executor;
+}
+
+void action_executor_destroy(struct action_executor *executor)
+{
+ struct action_work_item *work_item, *tmp;
+
+ /* TODO Wait for work list to drain? */
+ lttng_thread_shutdown(executor->thread);
+ pthread_mutex_lock(&executor->work.lock);
+ if (executor->work.pending_count != 0) {
+ WARN("%" PRIu64
+ " trigger action%s still queued for execution and will be discarded",
+ executor->work.pending_count,
+ executor->work.pending_count == 1 ? " is" :
+ "s are");
+ }
+
+ cds_list_for_each_entry_safe (
+ work_item, tmp, &executor->work.list, list_node) {
+ WARN("Discarding action work item %" PRIu64
+ " associated to trigger \"%p\"",
+ work_item->id, work_item->trigger);
+ cds_list_del(&work_item->list_node);
+ action_work_item_destroy(work_item);
+ }
+ pthread_mutex_unlock(&executor->work.lock);
+ lttng_thread_put(executor->thread);
+}
+
+/* RCU read-lock must be held by the caller. */
+enum action_executor_status action_executor_enqueue(
+ struct action_executor *executor,
+ struct lttng_trigger *trigger,
+ struct lttng_evaluation *evaluation,
+ const struct lttng_credentials *object_creds,
+ struct notification_client_list *client_list)
+{
+ enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
+ const uint64_t work_item_id = executor->next_work_item_id++;
+ struct action_work_item *work_item;
+ bool signal = false;
+
+ pthread_mutex_lock(&executor->work.lock);
+ /* Check for queue overflow. */
+ if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
+ /* Most likely spammy, remove if it is the case. */
+ DBG("Refusing to enqueue action for trigger \"%p\" as work item %" PRIu64
+ " (overflow)",
+ trigger, work_item_id);
+ executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
+ goto error_unlock;
+ }
+
+ work_item = zmalloc(sizeof(*work_item));
+ if (!work_item) {
+ PERROR("Failed to allocate action executor work item on behalf of trigger \"%p\"",
+ trigger);
+ executor_status = ACTION_EXECUTOR_STATUS_ERROR;
+ goto error_unlock;
+ }
+
+ lttng_trigger_get(trigger);
+ if (client_list) {
+ const bool reference_acquired =
+ notification_client_list_get(client_list);
+
+ assert(reference_acquired);
+ }
+
+ *work_item = (typeof(*work_item)){
+ .id = work_item_id,
+ .trigger = trigger,
+ /* Ownership transferred to the work item. */
+ .evaluation = evaluation,
+ .object_creds = {
+ .is_set = !!object_creds,
+ .value = object_creds ? *object_creds :
+ (typeof(work_item->object_creds.value)) {},
+ },
+ .client_list = client_list,
+ .list_node = CDS_LIST_HEAD_INIT(work_item->list_node),
+ };
+
+ evaluation = NULL;
+ cds_list_add_tail(&work_item->list_node, &executor->work.list);
+ executor->work.pending_count++;
+ DBG("Enqueued action for trigger \"%p\" as work item %" PRIu64,
+ trigger, work_item_id);
+ signal = true;
+
+error_unlock:
+ pthread_mutex_unlock(&executor->work.lock);
+ if (signal) {
+ pthread_cond_signal(&executor->work.cond);
+ }
+
+ lttng_evaluation_destroy(evaluation);
+ return executor_status;
+}
--- /dev/null
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef ACTION_EXECUTOR_H
+#define ACTION_EXECUTOR_H
+
+struct action_executor;
+struct notification_thread_handle;
+struct lttng_evaluation;
+struct lttng_trigger;
+struct notification_client_list;
+struct lttng_credentials;
+
+enum action_executor_status {
+ ACTION_EXECUTOR_STATUS_OK,
+ ACTION_EXECUTOR_STATUS_OVERFLOW,
+ ACTION_EXECUTOR_STATUS_ERROR,
+ ACTION_EXECUTOR_STATUS_INVALID,
+};
+
+struct action_executor *action_executor_create(
+ struct notification_thread_handle *handle);
+
+void action_executor_destroy(struct action_executor *executor);
+
+enum action_executor_status action_executor_enqueue(
+ struct action_executor *executor,
+ struct lttng_trigger *trigger,
+ struct lttng_evaluation *evaluation,
+ const struct lttng_credentials *object_creds,
+ struct notification_client_list *list);
+
+#endif /* ACTION_EXECUTOR_H */
HEALTH_SESSIOND_TYPE_NOTIFICATION = 8,
HEALTH_SESSIOND_TYPE_ROTATION = 9,
HEALTH_SESSIOND_TYPE_TIMER = 10,
+ HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR = 11,
NR_HEALTH_SESSIOND_TYPES,
};
ret = run_command_wait(handle, &cmd);
assert(!ret && cmd.reply_code == LTTNG_OK);
}
+
+int notification_thread_client_communication_update(
+ struct notification_thread_handle *handle,
+ notification_client_id id,
+ enum client_transmission_status transmission_status)
+{
+ struct notification_thread_command cmd = {};
+
+ init_notification_thread_command(&cmd);
+
+ cmd.type = NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE;
+ cmd.parameters.client_communication_update.id = id;
+ cmd.parameters.client_communication_update.status = transmission_status;
+ return run_command_no_wait(handle, &cmd);
+}
NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING,
NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED,
NOTIFICATION_COMMAND_TYPE_QUIT,
+ NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE,
};
struct notification_thread_command {
uint64_t trace_archive_chunk_id;
struct lttng_trace_archive_location *location;
} session_rotation;
+ /* Client communication update. */
+ struct {
+ notification_client_id id;
+ enum client_transmission_status status;
+ } client_communication_update;
+
} parameters;
/* lttng_waiter on which to wait for command reply (optional). */
struct lttng_trigger_list_element {
/* No ownership of the trigger object is assumed. */
- const struct lttng_trigger *trigger;
+ struct lttng_trigger *trigger;
struct cds_list_head node;
};
struct cds_list_head node;
};
-struct notification_client_list_element {
- struct notification_client *client;
- struct cds_list_head node;
-};
-
-/*
- * Thread safety of notification_client and notification_client_list.
- *
- * The notification thread (main thread) and the action executor
- * interact through client lists. Hence, when the action executor
- * thread looks-up the list of clients subscribed to a given
- * condition, it will acquire a reference to the list and lock it
- * while attempting to communicate with the various clients.
- *
- * It is not necessary to reference-count clients as they are guaranteed
- * to be 'alive' if they are present in a list and that list is locked. Indeed,
- * removing references to the client from those subscription lists is part of
- * the work performed on destruction of a client.
- *
- * No provision for other access scenarios are taken into account;
- * this is the bare minimum to make these accesses safe and the
- * notification thread's state is _not_ "thread-safe" in any general
- * sense.
- */
-struct notification_client_list {
- pthread_mutex_t lock;
- struct urcu_ref ref;
- const struct lttng_trigger *trigger;
- struct cds_list_head list;
- /* Weak reference to container. */
- struct cds_lfht *notification_trigger_clients_ht;
- struct cds_lfht_node notification_trigger_clients_ht_node;
- /* call_rcu delayed reclaim. */
- struct rcu_head rcu_node;
-};
-
-struct notification_client {
- /* Nests within the notification_client_list lock. */
- pthread_mutex_t lock;
- notification_client_id id;
- int socket;
- /* Client protocol version. */
- uint8_t major, minor;
- uid_t uid;
- gid_t gid;
- /*
- * Indicates if the credentials and versions of the client have been
- * checked.
- */
- bool validated;
- /*
- * Conditions to which the client's notification channel is subscribed.
- * List of struct lttng_condition_list_node. The condition member is
- * owned by the client.
- */
- struct cds_list_head condition_list;
- struct cds_lfht_node client_socket_ht_node;
- struct cds_lfht_node client_id_ht_node;
- struct {
- /*
- * If a client's communication is inactive, it means that a
- * fatal error has occurred (could be either a protocol error or
- * the socket API returned a fatal error). No further
- * communication should be attempted; the client is queued for
- * clean-up.
- */
- bool active;
- struct {
- /*
- * During the reception of a message, the reception
- * buffers' "size" is set to contain the current
- * message's complete payload.
- */
- struct lttng_dynamic_buffer buffer;
- /* Bytes left to receive for the current message. */
- size_t bytes_to_receive;
- /* Type of the message being received. */
- enum lttng_notification_channel_message_type msg_type;
- /*
- * Indicates whether or not credentials are expected
- * from the client.
- */
- bool expect_creds;
- /*
- * Indicates whether or not credentials were received
- * from the client.
- */
- bool creds_received;
- /* Only used during credentials reception. */
- lttng_sock_cred creds;
- } inbound;
- struct {
- /*
- * Indicates whether or not a notification addressed to
- * this client was dropped because a command reply was
- * already buffered.
- *
- * A notification is dropped whenever the buffer is not
- * empty.
- */
- bool dropped_notification;
- /*
- * Indicates whether or not a command reply is already
- * buffered. In this case, it means that the client is
- * not consuming command replies before emitting a new
- * one. This could be caused by a protocol error or a
- * misbehaving/malicious client.
- */
- bool queued_command_reply;
- struct lttng_dynamic_buffer buffer;
- } outbound;
- } communication;
- /* call_rcu delayed reclaim. */
- struct rcu_head rcu_node;
-};
-
struct channel_state_sample {
struct channel_key key;
struct cds_lfht_node channel_state_ht_node;
struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
- const struct lttng_trigger *trigger);
+ struct lttng_trigger *trigger);
+static
+int client_handle_transmission_status(
+ struct notification_client *client,
+ enum client_transmission_status transmission_status,
+ struct notification_thread_state *state);
static
int match_client_socket(struct cds_lfht_node *node, const void *key)
return NULL;
}
-static
+LTTNG_HIDDEN
bool notification_client_list_get(struct notification_client_list *list)
{
return urcu_ref_get_unless_zero(&list->ref);
rcu_read_unlock();
}
-static
+LTTNG_HIDDEN
void notification_client_list_put(struct notification_client_list *list)
{
if (!list) {
return client;
}
+/*
+ * Call with rcu_read_lock held (and hold for the lifetime of the returned
+ * client pointer).
+ */
+static
+struct notification_client *get_client_from_id(notification_client_id id,
+ struct notification_thread_state *state)
+{
+ struct cds_lfht_iter iter;
+ struct cds_lfht_node *node;
+ struct notification_client *client = NULL;
+
+ cds_lfht_lookup(state->client_id_ht,
+ hash_client_id(id),
+ match_client_id,
+ &id,
+ &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ if (!node) {
+ goto end;
+ }
+
+ client = caa_container_of(node, struct notification_client,
+ client_id_ht_node);
+end:
+ return client;
+}
+
static
bool buffer_usage_condition_applies_to_channel(
const struct lttng_condition *condition,
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
- const struct lttng_trigger *trigger)
+ struct lttng_trigger *trigger)
{
int ret = 0;
struct lttng_trigger_list_element *new_element =
struct lttng_session_trigger_list *trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
struct session_info *session_info;
+ const struct lttng_credentials session_creds = {
+ .uid = session_uid,
+ .gid = session_gid,
+ };
rcu_read_lock();
node) {
const struct lttng_condition *condition;
const struct lttng_action *action;
- const struct lttng_trigger *trigger;
+ struct lttng_trigger *trigger;
struct notification_client_list *client_list;
struct lttng_evaluation *evaluation = NULL;
enum lttng_condition_type condition_type;
bool client_list_is_empty;
+ enum action_executor_status executor_status;
trigger = trigger_list_element->trigger;
condition = lttng_trigger_get_const_condition(trigger);
goto put_list;
}
- /* Dispatch evaluation result to all clients. */
- ret = send_evaluation_to_clients(trigger_list_element->trigger,
- evaluation, client_list, state,
- session_info->uid,
- session_info->gid);
- lttng_evaluation_destroy(evaluation);
+ /*
+ * Ownership of `evaluation` transferred to the action executor
+ * no matter the result.
+ */
+ executor_status = action_executor_enqueue(state->executor,
+ trigger, evaluation, &session_creds,
+ client_list);
+ evaluation = NULL;
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ break;
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ case ACTION_EXECUTOR_STATUS_INVALID:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ */
+ ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
+ ret = -1;
+ goto put_list;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ *
+ * Not a fatal error.
+ */
+ WARN("No space left when enqueuing action associated with session rotation trigger");
+ ret = 0;
+ goto put_list;
+ default:
+ abort();
+ }
+
put_list:
notification_client_list_put(client_list);
if (caa_unlikely(ret)) {
/* Must be called with RCU read lock held. */
static
-int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
+int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
struct notification_thread_state *state)
{
int ret = 0;
/* Must be called with RCU read lock held. */
static
-int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
+int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
struct notification_thread_state *state)
{
int ret = 0;
cmd->reply_code = LTTNG_OK;
ret = 1;
goto end;
+ case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
+ {
+ const enum client_transmission_status client_status =
+ cmd->parameters.client_communication_update
+ .status;
+ const notification_client_id client_id =
+ cmd->parameters.client_communication_update.id;
+ struct notification_client *client;
+
+ rcu_read_lock();
+ client = get_client_from_id(client_id, state);
+
+ if (!client) {
+ /*
+ * Client error was probably already picked-up by the
+ * notification thread or it has disconnected
+ * gracefully while this command was queued.
+ */
+ DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
+ client_id);
+ ret = 0;
+ } else {
+ pthread_mutex_lock(&client->lock);
+ ret = client_handle_transmission_status(
+ client, client_status, state);
+ pthread_mutex_unlock(&client->lock);
+ }
+ rcu_read_unlock();
+ break;
+ }
default:
ERR("[notification-thread] Unknown internal command received");
goto error_unlock;
/* Client lock must be acquired by caller. */
static
enum client_transmission_status client_flush_outgoing_queue(
- struct notification_client *client,
- struct notification_thread_state *state)
+ struct notification_client *client)
{
ssize_t ret;
size_t to_send_count;
ASSERT_LOCKED(client->lock);
+ if (!client->communication.active) {
+ status = CLIENT_TRANSMISSION_STATUS_FAIL;
+ goto end;
+ }
+
assert(client->communication.outbound.buffer.size != 0);
to_send_count = client->communication.outbound.buffer.size;
DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
&client->communication.outbound.buffer,
to_send_count);
if (ret) {
- status = CLIENT_TRANSMISSION_STATUS_ERROR;
goto error;
}
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
ret = lttng_dynamic_buffer_set_size(
&client->communication.outbound.buffer, 0);
if (ret) {
- status = CLIENT_TRANSMISSION_STATUS_ERROR;
goto error;
}
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
}
-
- ret = client_handle_transmission_status(client, status, state);
- if (ret) {
- goto error;
- }
-
- return 0;
+end:
+ return status;
error:
- return -1;
+ return CLIENT_TRANSMISSION_STATUS_ERROR;
}
/* Client lock must be acquired by caller. */
.size = sizeof(reply),
};
char buffer[sizeof(msg) + sizeof(reply)];
+ enum client_transmission_status transmission_status;
ASSERT_LOCKED(client->lock);
goto error;
}
- ret = client_flush_outgoing_queue(client, state);
+ transmission_status = client_flush_outgoing_queue(client);
+ ret = client_handle_transmission_status(
+ client, transmission_status, state);
if (ret) {
goto error;
}
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
+ enum client_transmission_status transmission_status;
pthread_mutex_lock(&client->lock);
client->validated = true;
client->communication.active = true;
- ret = client_flush_outgoing_queue(client, state);
+ transmission_status = client_flush_outgoing_queue(client);
+ ret = client_handle_transmission_status(
+ client, transmission_status, state);
if (ret) {
goto end;
}
{
int ret;
struct notification_client *client;
+ enum client_transmission_status transmission_status;
client = get_client_from_socket(socket, state);
if (!client) {
}
pthread_mutex_lock(&client->lock);
- ret = client_flush_outgoing_queue(client, state);
+ transmission_status = client_flush_outgoing_queue(client);
+ ret = client_handle_transmission_status(
+ client, transmission_status, state);
pthread_mutex_unlock(&client->lock);
if (ret) {
goto end;
}
static
-int client_enqueue_dropped_notification(struct notification_client *client)
+int client_notification_overflow(struct notification_client *client)
{
- int ret;
- struct lttng_notification_channel_message msg = {
+ int ret = 0;
+ const struct lttng_notification_channel_message msg = {
.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
- .size = 0,
};
ASSERT_LOCKED(client->lock);
+ DBG("Dropping notification addressed to client (socket fd = %i)",
+ client->socket);
+ if (client->communication.outbound.dropped_notification) {
+ /*
+ * The client already has a "notification dropped" message
+ * in its outgoing queue. Nothing to do since all
+ * of those messages are coalesced.
+ */
+ goto end;
+ }
+
+ client->communication.outbound.dropped_notification = true;
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer, &msg,
sizeof(msg));
+ if (ret) {
+ PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
+ client->socket);
+ }
+end:
return ret;
}
+static int client_handle_transmission_status_wrapper(
+ struct notification_client *client,
+ enum client_transmission_status status,
+ void *user_data)
+{
+ return client_handle_transmission_status(client, status,
+ (struct notification_thread_state *) user_data);
+}
+
+static
+int send_evaluation_to_clients(const struct lttng_trigger *trigger,
+ const struct lttng_evaluation *evaluation,
+ struct notification_client_list* client_list,
+ struct notification_thread_state *state,
+ uid_t object_uid, gid_t object_gid)
+{
+ return notification_client_list_send_evaluation(client_list,
+ lttng_trigger_get_const_condition(trigger), evaluation,
+ lttng_trigger_get_credentials(trigger),
+ &(struct lttng_credentials){
+ .uid = object_uid, .gid = object_gid},
+ client_handle_transmission_status_wrapper, state);
+}
+
/*
* Permission checks relative to notification channel clients are performed
* here. Notice how object, client, and trigger credentials are involved in
* interference from external users (those could, for instance, unregister
* their triggers).
*/
-static
-int send_evaluation_to_clients(const struct lttng_trigger *trigger,
+LTTNG_HIDDEN
+int notification_client_list_send_evaluation(
+ struct notification_client_list *client_list,
+ const struct lttng_condition *condition,
const struct lttng_evaluation *evaluation,
- struct notification_client_list* client_list,
- struct notification_thread_state *state,
- uid_t object_uid, gid_t object_gid)
+ const struct lttng_credentials *trigger_creds,
+ const struct lttng_credentials *source_object_creds,
+ report_client_transmission_result_cb client_report,
+ void *user_data)
{
int ret = 0;
struct lttng_payload msg_payload;
struct notification_client_list_element *client_list_element, *tmp;
const struct lttng_notification notification = {
- .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
+ .condition = (struct lttng_condition *) condition,
.evaluation = (struct lttng_evaluation *) evaluation,
};
struct lttng_notification_channel_message msg_header = {
.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
};
- const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
lttng_payload_init(&msg_payload);
pthread_mutex_lock(&client_list->lock);
cds_list_for_each_entry_safe(client_list_element, tmp,
&client_list->list, node) {
+ enum client_transmission_status transmission_status;
struct notification_client *client =
client_list_element->client;
ret = 0;
pthread_mutex_lock(&client->lock);
- if (client->uid != object_uid && client->gid != object_gid &&
- client->uid != 0) {
- /* Client is not allowed to monitor this channel. */
- DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
- goto unlock_client;
+ if (source_object_creds) {
+ if (client->uid != source_object_creds->uid &&
+ client->gid != source_object_creds->gid &&
+ client->uid != 0) {
+ /*
+ * Client is not allowed to monitor this
+ * object.
+ */
+ DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
+ goto unlock_client;
+ }
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
* notification since the socket spilled-over to the
* queue.
*/
- DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
- client->socket);
- if (!client->communication.outbound.dropped_notification) {
- client->communication.outbound.dropped_notification = true;
- ret = client_enqueue_dropped_notification(
- client);
- if (ret) {
- goto unlock_client;
- }
+ ret = client_notification_overflow(client);
+ if (ret) {
+ goto unlock_client;
}
- goto unlock_client;
}
ret = lttng_dynamic_buffer_append_buffer(
goto unlock_client;
}
- ret = client_flush_outgoing_queue(client, state);
+ transmission_status = client_flush_outgoing_queue(client);
+ ret = client_report(client, transmission_status, user_data);
if (ret) {
goto unlock_client;
}
bool previous_sample_available = false;
struct channel_state_sample previous_sample, latest_sample;
uint64_t previous_session_consumed_total, latest_session_consumed_total;
+ struct lttng_credentials channel_creds;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
goto end_unlock;
}
+ channel_creds = (typeof(channel_creds)) {
+ .uid = channel_info->session_info->uid,
+ .gid = channel_info->session_info->gid,
+ };
+
trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
channel_triggers_ht_node);
cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
node) {
const struct lttng_condition *condition;
const struct lttng_action *action;
- const struct lttng_trigger *trigger;
+ struct lttng_trigger *trigger;
struct notification_client_list *client_list = NULL;
struct lttng_evaluation *evaluation = NULL;
bool client_list_is_empty;
+ enum action_executor_status executor_status;
ret = 0;
trigger = trigger_list_element->trigger;
goto put_list;
}
- /* Dispatch evaluation result to all clients. */
- ret = send_evaluation_to_clients(trigger_list_element->trigger,
- evaluation, client_list, state,
- channel_info->session_info->uid,
- channel_info->session_info->gid);
- lttng_evaluation_destroy(evaluation);
+ /*
+ * Ownership of `evaluation` transferred to the action executor
+ * no matter the result.
+ */
+ executor_status = action_executor_enqueue(state->executor,
+ trigger, evaluation, &channel_creds,
+ client_list);
+ evaluation = NULL;
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ break;
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ case ACTION_EXECUTOR_STATUS_INVALID:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ */
+ ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+ ret = -1;
+ goto put_list;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ *
+ * Not a fatal error.
+ */
+ WARN("No space left when enqueuing action associated with buffer-condition trigger");
+ ret = 0;
+ goto put_list;
+ default:
+ abort();
+ }
+
put_list:
notification_client_list_put(client_list);
if (caa_unlikely(ret)) {
#ifndef NOTIFICATION_THREAD_INTERNAL_H
#define NOTIFICATION_THREAD_INTERNAL_H
+#include <common/compat/socket.h>
+#include <common/credentials.h>
+#include <lttng/notification/channel-internal.h>
#include <lttng/ref-internal.h>
-#include <urcu/rculfhash.h>
+#include <stdbool.h>
#include <unistd.h>
+#include <urcu/rculfhash.h>
+#include <urcu/ref.h>
+#include <urcu/call-rcu.h>
+#include "notification-thread.h"
+
+struct lttng_evaluation;
+struct notification_thread_handle;
struct channel_key {
uint64_t key;
struct rcu_head rcu_node;
};
+struct notification_client_list_element {
+ struct notification_client *client;
+ struct cds_list_head node;
+};
+
+/*
+ * Thread safety of notification_client and notification_client_list.
+ *
+ * The notification thread (main thread) and the action executor
+ * interact through client lists. Hence, when the action executor
+ * thread looks-up the list of clients subscribed to a given
+ * condition, it will acquire a reference to the list and lock it
+ * while attempting to communicate with the various clients.
+ *
+ * It is not necessary to reference-count clients as they are guaranteed
+ * to be 'alive' if they are present in a list and that list is locked. Indeed,
+ * removing references to the client from those subscription lists is part of
+ * the work performed on destruction of a client.
+ *
+ * No provision for other access scenarios are taken into account;
+ * this is the bare minimum to make these accesses safe and the
+ * notification thread's state is _not_ "thread-safe" in any general
+ * sense.
+ */
+struct notification_client_list {
+ pthread_mutex_t lock;
+ struct urcu_ref ref;
+ const struct lttng_trigger *trigger;
+ struct cds_list_head list;
+ /* Weak reference to container. */
+ struct cds_lfht *notification_trigger_clients_ht;
+ struct cds_lfht_node notification_trigger_clients_ht_node;
+ /* call_rcu delayed reclaim. */
+ struct rcu_head rcu_node;
+};
+
+struct notification_client {
+ /* Nests within the notification_client_list lock. */
+ pthread_mutex_t lock;
+ notification_client_id id;
+ int socket;
+ /* Client protocol version. */
+ uint8_t major, minor;
+ uid_t uid;
+ gid_t gid;
+ /*
+ * Indicates if the credentials and versions of the client have been
+ * checked.
+ */
+ bool validated;
+ /*
+ * Conditions to which the client's notification channel is subscribed.
+ * List of struct lttng_condition_list_node. The condition member is
+ * owned by the client.
+ */
+ struct cds_list_head condition_list;
+ struct cds_lfht_node client_socket_ht_node;
+ struct cds_lfht_node client_id_ht_node;
+ struct {
+ /*
+ * If a client's communication is inactive, it means that a
+ * fatal error has occurred (could be either a protocol error or
+ * the socket API returned a fatal error). No further
+ * communication should be attempted; the client is queued for
+ * clean-up.
+ */
+ bool active;
+ struct {
+ /*
+ * During the reception of a message, the reception
+ * buffers' "size" is set to contain the current
+ * message's complete payload.
+ */
+ struct lttng_dynamic_buffer buffer;
+ /* Bytes left to receive for the current message. */
+ size_t bytes_to_receive;
+ /* Type of the message being received. */
+ enum lttng_notification_channel_message_type msg_type;
+ /*
+ * Indicates whether or not credentials are expected
+ * from the client.
+ */
+ bool expect_creds;
+ /*
+ * Indicates whether or not credentials were received
+ * from the client.
+ */
+ bool creds_received;
+ /* Only used during credentials reception. */
+ lttng_sock_cred creds;
+ } inbound;
+ struct {
+ /*
+ * Indicates whether or not a notification addressed to
+ * this client was dropped because a command reply was
+ * already buffered.
+ *
+ * A notification is dropped whenever the buffer is not
+ * empty.
+ */
+ bool dropped_notification;
+ /*
+ * Indicates whether or not a command reply is already
+ * buffered. In this case, it means that the client is
+ * not consuming command replies before emitting a new
+ * one. This could be caused by a protocol error or a
+ * misbehaving/malicious client.
+ */
+ bool queued_command_reply;
+ struct lttng_dynamic_buffer buffer;
+ } outbound;
+ } communication;
+ /* call_rcu delayed reclaim. */
+ struct rcu_head rcu_node;
+};
+
enum client_transmission_status {
CLIENT_TRANSMISSION_STATUS_COMPLETE,
CLIENT_TRANSMISSION_STATUS_QUEUED,
/* Fatal error. */
CLIENT_TRANSMISSION_STATUS_ERROR,
};
+
+LTTNG_HIDDEN
+bool notification_client_list_get(struct notification_client_list *list);
+
+LTTNG_HIDDEN
+void notification_client_list_put(struct notification_client_list *list);
+
+typedef int (*report_client_transmission_result_cb)(
+ struct notification_client *client,
+ enum client_transmission_status status,
+ void *user_data);
+
+LTTNG_HIDDEN
+int notification_client_list_send_evaluation(
+ struct notification_client_list *list,
+ const struct lttng_condition *condition,
+ const struct lttng_evaluation *evaluation,
+ const struct lttng_credentials *trigger_creds,
+ const struct lttng_credentials *source_object_creds,
+ report_client_transmission_result_cb client_report,
+ void *user_data);
+
+LTTNG_HIDDEN
+int notification_thread_client_communication_update(
+ struct notification_thread_handle *handle,
+ notification_client_id id,
+ enum client_transmission_status transmission_status);
+
#endif /* NOTIFICATION_THREAD_INTERNAL_H */
notification_channel_socket_destroy(
state->notification_channel_socket);
}
+ if (state->executor) {
+ action_executor_destroy(state->executor);
+ }
lttng_poll_clean(&state->events);
}
if (!state->triggers_ht) {
goto error;
}
+
+ state->executor = action_executor_create(handle);
+ if (!state->executor) {
+ goto error;
+ }
mark_thread_as_ready(handle);
end:
return 0;
#ifndef NOTIFICATION_THREAD_H
#define NOTIFICATION_THREAD_H
-#include <urcu/list.h>
-#include <urcu.h>
-#include <urcu/rculfhash.h>
-#include <lttng/trigger/trigger.h>
-#include <common/pipe.h>
+#include "action-executor.h"
+#include "thread.h"
#include <common/compat/poll.h>
#include <common/hashtable/hashtable.h>
+#include <common/pipe.h>
+#include <lttng/trigger/trigger.h>
#include <pthread.h>
#include <semaphore.h>
-#include "thread.h"
+#include <urcu.h>
+#include <urcu/list.h>
+#include <urcu/rculfhash.h>
typedef uint64_t notification_client_id;
struct cds_lfht *sessions_ht;
struct cds_lfht *triggers_ht;
notification_client_id next_notification_client_id;
+ struct action_executor *executor;
};
/* notification_thread_data takes ownership of the channel monitor pipes. */
result = false;
goto end;
}
- /* Release the list's reference to the thread. */
- cds_list_del(&thread->node);
- lttng_thread_put(thread);
+ DBG("Joined thread \"%s\"", thread->name);
end:
return result;
}
bool lttng_thread_shutdown(struct lttng_thread *thread)
{
- bool result;
-
- pthread_mutex_lock(&thread_list.lock);
- result = _lttng_thread_shutdown(thread);
- pthread_mutex_unlock(&thread_list.lock);
+ const bool result = _lttng_thread_shutdown(thread);
+
+ if (result) {
+ /* Release the list's reference to the thread. */
+ pthread_mutex_lock(&thread_list.lock);
+ cds_list_del(&thread->node);
+ lttng_thread_put(thread);
+ pthread_mutex_unlock(&thread_list.lock);
+ }
return result;
}
[ HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH ] = "Session daemon application registration dispatcher",
[ HEALTH_SESSIOND_TYPE_ROTATION ] = "Session daemon rotation manager",
[ HEALTH_SESSIOND_TYPE_TIMER ] = "Session daemon timer manager",
+ [ HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR ] = "Session daemon trigger action executor",
};
static
$(top_builddir)/src/bin/lttng-sessiond/kernel.$(OBJEXT) \
$(top_builddir)/src/bin/lttng-sessiond/ht-cleanup.$(OBJEXT) \
$(top_builddir)/src/bin/lttng-sessiond/notification-thread.$(OBJEXT) \
+ $(top_builddir)/src/bin/lttng-sessiond/action-executor.$(OBJEXT) \
$(top_builddir)/src/bin/lttng-sessiond/lttng-syscall.$(OBJEXT) \
$(top_builddir)/src/bin/lttng-sessiond/channel.$(OBJEXT) \
$(top_builddir)/src/bin/lttng-sessiond/agent.$(OBJEXT) \