#include "notification-thread-internal.h"
#include "session.h"
#include "thread.h"
+#include <common/dynamic-array.h>
#include <common/macros.h>
#include <common/optional.h>
#include <lttng/action/action-internal.h>
#define THREAD_NAME "Action Executor"
#define MAX_QUEUED_WORK_COUNT 8192
+/*
+ * A work item is composed of a dynamic array of sub-items which
+ * represent a flattened, and augmented, version of a trigger's actions.
+ *
+ * We cannot rely solely on the trigger's actions since each action can have an
+ * execution context we need to comply with.
+ *
+ * The notion of execution context is required since for some actions the
+ * associated object are referenced by name and not by id. This can lead to
+ * a number of ambiguities when executing an action work item.
+ *
+ * For example, let's take a simple trigger such as:
+ * - condition: ust event a
+ * - action: start session S
+ *
+ * At time T, session S exists.
+ * At T + 1, the event A is hit.
+ * At T + 2, the tracer event notification is received and the work item is
+ * queued. Here session S have an id of 1.
+ * At T + 3, the session S is destroyed and a new session S is created, with a
+ * resulting id of 200.
+ * At T +4, the work item is popped from the queue and begin execution and will
+ * start session S with an id of 200 instead of the session S id 1 that was
+ * present at the queuing phase.
+ *
+ * The context to be respected is the one when the work item is queued. If the
+ * execution context is not the same at the moment of execution, we skip the
+ * execution of that sub-item.
+ *
+ * It is the same policy in regards to the validity of the associated
+ * trigger object at the moment of execution, if the trigger is found to be
+ * unregistered, the execution is skipped.
+ */
+
struct action_work_item {
uint64_t id;
+
+ /*
+ * The actions to be executed with their respective execution context.
+ * See struct `action_work_subitem`.
+ */
+ struct lttng_dynamic_array *subitems;
+
+ /* Execution context data */
struct lttng_trigger *trigger;
struct lttng_evaluation *evaluation;
struct notification_client_list *client_list;
struct cds_list_head list_node;
};
+struct action_work_subitem {
+ struct lttng_action *action;
+ struct {
+ /* Used by actions targeting a session. */
+ LTTNG_OPTIONAL(uint64_t) session_id;
+ } context;
+};
+
struct action_executor {
struct lttng_thread *thread;
struct notification_thread_handle *notification_thread_handle;
*/
typedef int (*action_executor_handler)(struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *action);
+ struct action_work_subitem *item);
static int action_executor_notify_handler(struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_start_session_handler(
struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_stop_session_handler(
struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_rotate_session_handler(
struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_snapshot_session_handler(
struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_group_handler(struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static int action_executor_generic_handler(struct action_executor *executor,
const struct action_work_item *,
- struct lttng_action *);
+ struct action_work_subitem *);
static const action_executor_handler action_executors[] = {
[LTTNG_ACTION_TYPE_NOTIFY] = action_executor_notify_handler,
[LTTNG_ACTION_TYPE_GROUP] = action_executor_group_handler,
};
+/* Forward declaration */
+static int add_action_to_subitem_array(struct lttng_action *action,
+ struct lttng_dynamic_array *subitems);
+
+static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
+ struct lttng_dynamic_array *subitems);
+
+static void action_work_subitem_destructor(void *element)
+{
+ struct action_work_subitem *subitem = element;
+
+ lttng_action_put(subitem->action);
+}
+
static const char *get_action_name(const struct lttng_action *action)
{
const enum lttng_action_type action_type = lttng_action_get_type(action);
static int action_executor_notify_handler(struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
return notification_client_list_send_evaluation(work_item->client_list,
work_item->trigger,
static int action_executor_start_session_handler(
struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
int ret = 0;
const char *session_name;
enum lttng_action_status action_status;
struct ltt_session *session;
enum lttng_error_code cmd_ret;
+ struct lttng_action *action = item->action;
action_status = lttng_action_start_session_get_session_name(
action, &session_name);
goto end;
}
+ /*
+ * Validate if at the moment of the action was queued the session
+ * existed. If not skip the action altogether.
+ */
+ if (!item->context.session_id.is_set) {
+ DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ lttng_action_increase_execution_failure_count(action);
+ ret = 0;
+ goto end;
+ }
+
session_lock_list();
session = session_find_by_name(session_name);
if (!session) {
goto error_unlock_list;
}
+ /*
+ * Check if the session id is the same as when the work item was
+ * enqueued.
+ */
+ if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) {
+ DBG("Session id for session `%s` (id: %" PRIu64
+ " is not the same that was sampled (id: %" PRIu64
+ " at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, session->id,
+ LTTNG_OPTIONAL_GET(item->context.session_id),
+ get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ ret = 0;
+ goto error_unlock_list;
+ }
+
session_lock(session);
if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
goto error_dispose_session;
static int action_executor_stop_session_handler(
struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
int ret = 0;
const char *session_name;
enum lttng_action_status action_status;
struct ltt_session *session;
enum lttng_error_code cmd_ret;
+ struct lttng_action *action = item->action;
action_status = lttng_action_stop_session_get_session_name(
action, &session_name);
goto end;
}
+ /*
+ * Validate if, at the moment the action was queued, the target session
+ * existed. If not, skip the action altogether.
+ */
+ if (!item->context.session_id.is_set) {
+ DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ lttng_action_increase_execution_failure_count(action);
+ ret = 0;
+ goto end;
+ }
+
session_lock_list();
session = session_find_by_name(session_name);
if (!session) {
goto error_unlock_list;
}
+ /*
+ * Check if the session id is the same as when the work item was
+ * enqueued
+ */
+ if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) {
+ DBG("Session id for session `%s` (id: %" PRIu64
+ " is not the same that was sampled (id: %" PRIu64
+ " at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, session->id,
+ LTTNG_OPTIONAL_GET(item->context.session_id),
+ get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ ret = 0;
+ goto error_unlock_list;
+ }
+
session_lock(session);
if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
goto error_dispose_session;
static int action_executor_rotate_session_handler(
struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
int ret = 0;
const char *session_name;
enum lttng_action_status action_status;
struct ltt_session *session;
enum lttng_error_code cmd_ret;
+ struct lttng_action *action = item->action;
action_status = lttng_action_rotate_session_get_session_name(
action, &session_name);
goto end;
}
+ /*
+ * Validate if, at the moment the action was queued, the target session
+ * existed. If not, skip the action altogether.
+ */
+ if (!item->context.session_id.is_set) {
+ DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ lttng_action_increase_execution_failure_count(action);
+ ret = 0;
+ goto end;
+ }
+
session_lock_list();
session = session_find_by_name(session_name);
if (!session) {
goto error_unlock_list;
}
+ /*
+ * Check if the session id is the same as when the work item was
+ * enqueued.
+ */
+ if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) {
+ DBG("Session id for session `%s` (id: %" PRIu64
+ " is not the same that was sampled (id: %" PRIu64
+ " at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, session->id,
+ LTTNG_OPTIONAL_GET(item->context.session_id),
+ get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ ret = 0;
+ goto error_unlock_list;
+ }
+
session_lock(session);
if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
goto error_dispose_session;
static int action_executor_snapshot_session_handler(
struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
int ret = 0;
const char *session_name;
const struct lttng_snapshot_output *snapshot_output =
&default_snapshot_output;
enum lttng_error_code cmd_ret;
+ struct lttng_action *action = item->action;
+
+ /*
+ * Validate if, at the moment the action was queued, the target session
+ * existed. If not, skip the action altogether.
+ */
+ if (!item->context.session_id.is_set) {
+ DBG("Session `%s` was not present at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ lttng_action_increase_execution_failure_count(action);
+ ret = 0;
+ goto end;
+ }
action_status = lttng_action_snapshot_session_get_session_name(
action, &session_name);
goto error_unlock_list;
}
+ /*
+ * Check if the session id is the same as when the work item was
+ * enqueued.
+ */
+ if (session->id != LTTNG_OPTIONAL_GET(item->context.session_id)) {
+ DBG("Session id for session `%s` (id: %" PRIu64
+ " is not the same that was sampled (id: %" PRIu64
+ " at the moment the work item was enqueued for %s` action of trigger `%s`",
+ session_name, session->id,
+ LTTNG_OPTIONAL_GET(item->context.session_id),
+ get_action_name(action),
+ get_trigger_name(work_item->trigger));
+ ret = 0;
+ goto error_unlock_list;
+ }
session_lock(session);
if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
static int action_executor_group_handler(struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action_group)
+ struct action_work_subitem *item)
{
- int ret = 0;
- unsigned int i, count;
- enum lttng_action_status action_status;
-
- action_status = lttng_action_group_get_count(action_group, &count);
- if (action_status != LTTNG_ACTION_STATUS_OK) {
- /* Fatal error. */
- ERR("Failed to get count of action in action group");
- ret = -1;
- goto end;
- }
-
- DBG("Action group has %u action%s", count, count != 1 ? "s" : "");
- for (i = 0; i < count; i++) {
- struct lttng_action *action =
- lttng_action_group_borrow_mutable_at_index(
- action_group, i);
-
- ret = action_executor_generic_handler(
- executor, work_item, action);
- if (ret) {
- ERR("Stopping the execution of the action group of trigger `%s` following a fatal error",
- get_trigger_name(work_item->trigger));
- goto end;
- }
- }
-end:
- return ret;
+ ERR("Execution of a group action by the action executor should never occur");
+ abort();
}
static int action_executor_generic_handler(struct action_executor *executor,
const struct action_work_item *work_item,
- struct lttng_action *action)
+ struct action_work_subitem *item)
{
int ret;
+ struct lttng_action *action = item->action;
const enum lttng_action_type action_type = lttng_action_get_type(action);
assert(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
get_action_name(action),
get_trigger_name(work_item->trigger),
work_item->id);
- ret = action_executors[action_type](executor, work_item, action);
+ ret = action_executors[action_type](executor, work_item, item);
end:
return ret;
}
struct action_work_item *work_item)
{
int ret;
- struct lttng_action *action =
- lttng_trigger_get_action(work_item->trigger);
+ size_t count, i;
DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
work_item->id, get_trigger_name(work_item->trigger));
- ret = action_executor_generic_handler(executor, work_item, action);
+
+ count = lttng_dynamic_array_get_count(work_item->subitems);
+ for (i = 0; i < count; i++) {
+ struct action_work_subitem *item;
+
+ item = lttng_dynamic_array_get_element(work_item->subitems, i);
+ ret = action_executor_generic_handler(
+ executor, work_item, item);
+ if (ret) {
+ goto end;
+ }
+ }
+end:
DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
work_item->id, get_trigger_name(work_item->trigger));
return ret;
lttng_trigger_put(work_item->trigger);
lttng_evaluation_destroy(work_item->evaluation);
notification_client_list_put(work_item->client_list);
+ lttng_dynamic_array_reset(work_item->subitems);
free(work_item);
}
}
/* RCU read-lock must be held by the caller. */
-enum action_executor_status action_executor_enqueue(
+enum action_executor_status action_executor_enqueue_trigger(
struct action_executor *executor,
struct lttng_trigger *trigger,
struct lttng_evaluation *evaluation,
const struct lttng_credentials *object_creds,
struct notification_client_list *client_list)
{
+ int ret;
enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
const uint64_t work_item_id = executor->next_work_item_id++;
struct action_work_item *work_item;
bool signal = false;
+ struct lttng_dynamic_array *subitems = NULL;
+
+ assert(trigger);
+
+ /* Build the array of action work subitems for the passed trigger. */
+ subitems = zmalloc(sizeof(*subitems));
+ if (!subitems) {
+ PERROR("Failed to allocate action executor subitems array: trigger name = `%s`",
+ get_trigger_name(trigger));
+ executor_status = ACTION_EXECUTOR_STATUS_ERROR;
+ goto error_unlock;
+ }
+
+ lttng_dynamic_array_init(subitems, sizeof(struct action_work_subitem),
+ action_work_subitem_destructor);
+
+ ret = populate_subitem_array_from_trigger(trigger, subitems);
+ if (ret) {
+ ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
+ get_trigger_name(trigger));
+ executor_status = ACTION_EXECUTOR_STATUS_ERROR;
+ goto error_unlock;
+ }
pthread_mutex_lock(&executor->work.lock);
/* Check for queue overflow. */
if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
/* Most likely spammy, remove if it is the case. */
- DBG("Refusing to enqueue action for trigger `%s` as work item %" PRIu64
- " (overflow)", get_trigger_name(trigger), work_item_id);
+ DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
+ get_trigger_name(trigger), work_item_id);
executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
goto error_unlock;
}
work_item = zmalloc(sizeof(*work_item));
if (!work_item) {
- PERROR("Failed to allocate action executor work item on behalf of trigger `%s`",
+ PERROR("Failed to allocate action executor work item: trigger name = '%s'",
get_trigger_name(trigger));
executor_status = ACTION_EXECUTOR_STATUS_ERROR;
goto error_unlock;
*work_item = (typeof(*work_item)){
.id = work_item_id,
+ /* Ownership transferred to the work item. */
+ .subitems = subitems,
.trigger = trigger,
/* Ownership transferred to the work item. */
.evaluation = evaluation,
};
evaluation = NULL;
+ subitems = NULL;
cds_list_add_tail(&work_item->list_node, &executor->work.list);
executor->work.pending_count++;
- DBG("Enqueued action for trigger `%s` as work item #%" PRIu64,
+ DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
get_trigger_name(trigger), work_item_id);
signal = true;
pthread_mutex_unlock(&executor->work.lock);
lttng_evaluation_destroy(evaluation);
+ if (subitems) {
+ lttng_dynamic_array_reset(subitems);
+ free(subitems);
+ }
return executor_status;
}
+
+static int add_action_to_subitem_array(struct lttng_action *action,
+ struct lttng_dynamic_array *subitems)
+{
+ int ret;
+ enum lttng_action_type type = lttng_action_get_type(action);
+ const char *session_name = NULL;
+ enum lttng_action_status status;
+ struct action_work_subitem subitem = {
+ .action = NULL,
+ .context = {
+ .session_id = LTTNG_OPTIONAL_INIT_UNSET,
+ },
+ };
+
+ assert(action);
+ assert(subitems);
+
+ if (type == LTTNG_ACTION_TYPE_GROUP) {
+ unsigned int count, i;
+
+ status = lttng_action_group_get_count(action, &count);
+ assert(status == LTTNG_ACTION_STATUS_OK);
+
+ for (i = 0; i < count; i++) {
+ struct lttng_action *inner_action = NULL;
+
+ inner_action = lttng_action_group_borrow_mutable_at_index(
+ action, i);
+ assert(inner_action);
+ ret = add_action_to_subitem_array(
+ inner_action, subitems);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ /*
+ * Go directly to the end since there is no need to add the
+ * group action by itself to the subitems array.
+ */
+ goto end;
+ }
+
+ /* Gather execution context. */
+ switch (type) {
+ case LTTNG_ACTION_TYPE_NOTIFY:
+ break;
+ case LTTNG_ACTION_TYPE_START_SESSION:
+ status = lttng_action_start_session_get_session_name(
+ action, &session_name);
+ assert(status == LTTNG_ACTION_STATUS_OK);
+ break;
+ case LTTNG_ACTION_TYPE_STOP_SESSION:
+ status = lttng_action_stop_session_get_session_name(
+ action, &session_name);
+ assert(status == LTTNG_ACTION_STATUS_OK);
+ break;
+ case LTTNG_ACTION_TYPE_ROTATE_SESSION:
+ status = lttng_action_rotate_session_get_session_name(
+ action, &session_name);
+ assert(status == LTTNG_ACTION_STATUS_OK);
+ break;
+ case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
+ status = lttng_action_snapshot_session_get_session_name(
+ action, &session_name);
+ assert(status == LTTNG_ACTION_STATUS_OK);
+ break;
+ case LTTNG_ACTION_TYPE_GROUP:
+ case LTTNG_ACTION_TYPE_UNKNOWN:
+ /* Fallthrough */
+ default:
+ abort();
+ break;
+ }
+
+ /*
+ * Fetch the session execution context info as needed.
+ * Note that we could decide to not add an action for which we know the
+ * execution will not happen (i.e no session exists for that name). For
+ * now we leave the decision to skip to the action executor for sake of
+ * simplicity and consistency.
+ */
+ if (session_name != NULL) {
+ struct ltt_session *session = NULL;
+
+ session_lock_list();
+ session = session_find_by_name(session_name);
+ if (session) {
+ LTTNG_OPTIONAL_SET(&subitem.context.session_id,
+ session->id);
+ session_put(session);
+ }
+
+ session_unlock_list();
+ }
+
+ /* Get a reference to the action. */
+ lttng_action_get(action);
+ subitem.action = action;
+
+ ret = lttng_dynamic_array_add_element(subitems, &subitem);
+ if (ret) {
+ ERR("Failed to add work subitem to the subitem array");
+ lttng_action_put(action);
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
+ struct lttng_dynamic_array *subitems)
+{
+ struct lttng_action *action;
+
+ action = lttng_trigger_get_action(trigger);
+ assert(action);
+
+ return add_action_to_subitem_array(action, subitems);
+}