Fix: consumerd: consumed size miscomputed during statistics sampling
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
CommitLineData
f2b3ef9f
JG
1/*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
c9e313bc
SM
8#include "action-executor.hpp"
9#include "cmd.hpp"
10#include "health-sessiond.hpp"
11#include "lttng-sessiond.hpp"
12#include "notification-thread-internal.hpp"
13#include "session.hpp"
14#include "thread.hpp"
28ab034a 15
c9e313bc
SM
16#include <common/dynamic-array.hpp>
17#include <common/macros.hpp>
18#include <common/optional.hpp>
56047f5a 19#include <common/urcu.hpp>
28ab034a 20
c9e313bc
SM
21#include <lttng/action/action-internal.hpp>
22#include <lttng/action/list-internal.hpp>
ad63a966 23#include <lttng/action/list.h>
c9e313bc 24#include <lttng/action/notify-internal.hpp>
f2b3ef9f
JG
25#include <lttng/action/notify.h>
26#include <lttng/action/rotate-session.h>
27#include <lttng/action/snapshot-session.h>
28#include <lttng/action/start-session.h>
29#include <lttng/action/stop-session.h>
30#include <lttng/condition/evaluation.h>
c9e313bc 31#include <lttng/condition/event-rule-matches-internal.hpp>
f2b3ef9f 32#include <lttng/lttng-error.h>
c9e313bc 33#include <lttng/trigger/trigger-internal.hpp>
28ab034a 34
f2b3ef9f
JG
35#include <pthread.h>
36#include <stdbool.h>
37#include <stddef.h>
38#include <urcu/list.h>
39
28ab034a 40#define THREAD_NAME "Action Executor"
f2b3ef9f
JG
41#define MAX_QUEUED_WORK_COUNT 8192
42
f1494934
JG
43struct action_executor {
44 struct lttng_thread *thread;
45 struct notification_thread_handle *notification_thread_handle;
46 struct {
47 uint64_t pending_count;
48 struct cds_list_head list;
49 pthread_cond_t cond;
50 pthread_mutex_t lock;
51 } work;
52 bool should_quit;
53 uint64_t next_work_item_id;
54};
55
56namespace {
72365501
JR
57/*
58 * A work item is composed of a dynamic array of sub-items which
59 * represent a flattened, and augmented, version of a trigger's actions.
60 *
61 * We cannot rely solely on the trigger's actions since each action can have an
62 * execution context we need to comply with.
63 *
64 * The notion of execution context is required since for some actions the
65 * associated object are referenced by name and not by id. This can lead to
66 * a number of ambiguities when executing an action work item.
67 *
68 * For example, let's take a simple trigger such as:
69 * - condition: ust event a
70 * - action: start session S
71 *
72 * At time T, session S exists.
73 * At T + 1, the event A is hit.
74 * At T + 2, the tracer event notification is received and the work item is
75 * queued. Here session S have an id of 1.
76 * At T + 3, the session S is destroyed and a new session S is created, with a
77 * resulting id of 200.
78 * At T +4, the work item is popped from the queue and begin execution and will
79 * start session S with an id of 200 instead of the session S id 1 that was
80 * present at the queuing phase.
81 *
82 * The context to be respected is the one when the work item is queued. If the
83 * execution context is not the same at the moment of execution, we skip the
84 * execution of that sub-item.
85 *
86 * It is the same policy in regards to the validity of the associated
87 * trigger object at the moment of execution, if the trigger is found to be
88 * unregistered, the execution is skipped.
89 */
f2b3ef9f
JG
90struct action_work_item {
91 uint64_t id;
72365501
JR
92
93 /*
94 * The actions to be executed with their respective execution context.
95 * See struct `action_work_subitem`.
96 */
be65f802 97 struct lttng_dynamic_array subitems;
72365501
JR
98
99 /* Execution context data */
f2b3ef9f
JG
100 struct lttng_trigger *trigger;
101 struct lttng_evaluation *evaluation;
102 struct notification_client_list *client_list;
103 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
104 struct cds_list_head list_node;
105};
106
72365501
JR
107struct action_work_subitem {
108 struct lttng_action *action;
109 struct {
110 /* Used by actions targeting a session. */
111 LTTNG_OPTIONAL(uint64_t) session_id;
112 } context;
113};
f1494934 114} /* namespace */
72365501 115
f2b3ef9f
JG
/*
 * Signature shared by every action handler.
 *
 * A handler must only return a non-zero value on a fatal error that should
 * shut down the action executor.
 */
using action_executor_handler = int (*)(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *subitem);
f2b3ef9f
JG
123
/* Per-action-type handlers; all of them follow `action_executor_handler`. */
static int action_executor_notify_handler(struct action_executor *executor,
					  const struct action_work_item *work_item,
					  struct action_work_subitem *item);
static int action_executor_start_session_handler(struct action_executor *executor,
						 const struct action_work_item *work_item,
						 struct action_work_subitem *item);
static int action_executor_stop_session_handler(struct action_executor *executor,
						const struct action_work_item *work_item,
						struct action_work_subitem *item);
static int action_executor_rotate_session_handler(struct action_executor *executor,
						  const struct action_work_item *work_item,
						  struct action_work_subitem *item);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
						    const struct action_work_item *work_item,
						    struct action_work_subitem *item);
static int action_executor_list_handler(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *item);
/* Entry point used to run a sub-item; dispatches to the handlers above. */
static int action_executor_generic_handler(struct action_executor *executor,
					   const struct action_work_item *work_item,
					   struct action_work_subitem *item);
f2b3ef9f
JG
145
146static const action_executor_handler action_executors[] = {
28ab034a
JG
147 action_executor_notify_handler, action_executor_start_session_handler,
148 action_executor_stop_session_handler, action_executor_rotate_session_handler,
149 action_executor_snapshot_session_handler, action_executor_list_handler,
f2b3ef9f
JG
150};
151
72365501
JR
/* Forward declarations of the sub-item array population helpers. */
static int add_action_to_subitem_array(struct lttng_action *action,
				       struct lttng_dynamic_array *subitems);
static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
					       struct lttng_dynamic_array *subitems);
72365501
JR
158
159static void action_work_subitem_destructor(void *element)
160{
7966af57 161 struct action_work_subitem *subitem = (action_work_subitem *) element;
72365501
JR
162
163 lttng_action_put(subitem->action);
164}
165
f2b3ef9f
JG
166static const char *get_action_name(const struct lttng_action *action)
167{
0e43bcbf
JG
168 const enum lttng_action_type action_type = lttng_action_get_type(action);
169
a0377dfe 170 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
0e43bcbf 171
c0e2990d 172 return lttng_action_type_string(action_type);
f2b3ef9f
JG
173}
174
175/* Check if this trigger allowed to interect with a given session. */
176static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
28ab034a 177 struct ltt_session *session)
f2b3ef9f
JG
178{
179 bool is_allowed = false;
180 const struct lttng_credentials session_creds = {
ff588497
JR
181 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
182 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
f2b3ef9f
JG
183 };
184 /* Can never be NULL. */
28ab034a 185 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
f2b3ef9f 186
ff588497 187 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
28ab034a 188 (lttng_credentials_get_uid(trigger_creds) == 0);
f2b3ef9f 189 if (!is_allowed) {
ff588497 190 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
28ab034a
JG
191 session->name,
192 (long int) session->uid,
193 (long int) session->gid,
194 (long int) lttng_credentials_get_uid(trigger_creds));
f2b3ef9f
JG
195 }
196
197 return is_allowed;
198}
199
34f87583
JR
200static const char *get_trigger_name(const struct lttng_trigger *trigger)
201{
202 const char *trigger_name;
203 enum lttng_trigger_status trigger_status;
204
205 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
0efb2ad7
JG
206 switch (trigger_status) {
207 case LTTNG_TRIGGER_STATUS_OK:
208 break;
209 case LTTNG_TRIGGER_STATUS_UNSET:
210 trigger_name = "(anonymous)";
211 break;
212 default:
213 trigger_name = "(failed to get name)";
214 break;
215 }
34f87583
JR
216
217 return trigger_name;
218}
219
28ab034a
JG
220static int client_handle_transmission_status(struct notification_client *client,
221 enum client_transmission_status status,
222 void *user_data)
f2b3ef9f
JG
223{
224 int ret = 0;
7966af57 225 struct action_executor *executor = (action_executor *) user_data;
f2b3ef9f
JG
226 bool update_communication = true;
227
f2b3ef9f
JG
228 switch (status) {
229 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
230 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
28ab034a 231 client->id);
9016dbfc
JG
232 /*
233 * There is no need to wake the (e)poll thread. If it was waiting for
234 * "out" events on the client's socket, it will see that no payload
235 * in queued and will unsubscribe from that event.
236 *
237 * In the other cases, we have to wake the the (e)poll thread to either
238 * handle the error on the client or to get it to monitor the client "out"
239 * events.
240 */
f2b3ef9f
JG
241 update_communication = false;
242 break;
243 case CLIENT_TRANSMISSION_STATUS_QUEUED:
244 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
28ab034a 245 client->id);
f2b3ef9f
JG
246 break;
247 case CLIENT_TRANSMISSION_STATUS_FAIL:
248 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
28ab034a 249 client->id);
f2b3ef9f
JG
250 break;
251 default:
252 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
28ab034a 253 client->id);
f2b3ef9f
JG
254 ret = -1;
255 goto end;
256 }
257
258 if (!update_communication) {
259 goto end;
260 }
261
6c24d3fd 262 /* Safe to read client's id without locking as it is immutable. */
f2b3ef9f 263 ret = notification_thread_client_communication_update(
28ab034a 264 executor->notification_thread_handle, client->id, status);
f2b3ef9f
JG
265end:
266 return ret;
267}
268
269static int action_executor_notify_handler(struct action_executor *executor,
28ab034a
JG
270 const struct action_work_item *work_item,
271 struct action_work_subitem *item __attribute__((unused)))
f2b3ef9f 272{
28ab034a
JG
273 return notification_client_list_send_evaluation(
274 work_item->client_list,
275 work_item->trigger,
276 work_item->evaluation,
cd9adb8b 277 work_item->object_creds.is_set ? &(work_item->object_creds.value) : nullptr,
28ab034a
JG
278 client_handle_transmission_status,
279 executor);
f2b3ef9f
JG
280}
281
28ab034a
JG
282static int action_executor_start_session_handler(struct action_executor *executor
283 __attribute__((unused)),
284 const struct action_work_item *work_item,
285 struct action_work_subitem *item)
f2b3ef9f
JG
286{
287 int ret = 0;
288 const char *session_name;
289 enum lttng_action_status action_status;
290 struct ltt_session *session;
291 enum lttng_error_code cmd_ret;
72365501 292 struct lttng_action *action = item->action;
f2b3ef9f 293
56047f5a
JG
294 lttng::urcu::read_lock_guard read_lock;
295
28ab034a 296 action_status = lttng_action_start_session_get_session_name(action, &session_name);
f2b3ef9f 297 if (action_status != LTTNG_ACTION_STATUS_OK) {
28ab034a 298 ERR("Failed to get session name from `%s` action", get_action_name(action));
f2b3ef9f
JG
299 ret = -1;
300 goto end;
301 }
302
72365501
JR
303 /*
304 * Validate if at the moment of the action was queued the session
305 * existed. If not skip the action altogether.
306 */
307 if (!item->context.session_id.is_set) {
4ec6f5b6 308 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
28ab034a
JG
309 session_name,
310 get_action_name(action),
311 get_trigger_name(work_item->trigger));
72365501 312 lttng_action_increase_execution_failure_count(action);
72365501
JR
313 goto end;
314 }
315
f2b3ef9f 316 session_lock_list();
4a467a84 317 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
f2b3ef9f 318 if (!session) {
34f87583 319 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
28ab034a
JG
320 session_name,
321 get_action_name(action),
322 get_trigger_name(work_item->trigger));
4a467a84 323 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
324 goto error_unlock_list;
325 }
326
4a467a84
JR
327 session_lock(session);
328 if (session->destroyed) {
28ab034a
JG
329 DBG("Session `%s` with id = %" PRIu64
330 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
331 session->name,
332 session->id,
333 get_action_name(action),
334 get_trigger_name(work_item->trigger));
4a467a84 335 goto error_unlock_session;
72365501
JR
336 }
337
f2b3ef9f 338 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
635e0160 339 goto error_unlock_session;
f2b3ef9f
JG
340 }
341
7966af57 342 cmd_ret = (lttng_error_code) cmd_start_trace(session);
f2b3ef9f
JG
343 switch (cmd_ret) {
344 case LTTNG_OK:
34f87583 345 DBG("Successfully started session `%s` on behalf of trigger `%s`",
28ab034a
JG
346 session_name,
347 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
348 break;
349 case LTTNG_ERR_TRACE_ALREADY_STARTED:
34f87583 350 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
28ab034a
JG
351 session_name,
352 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
353 break;
354 default:
34f87583 355 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
28ab034a
JG
356 session_name,
357 get_trigger_name(work_item->trigger),
358 lttng_strerror(-cmd_ret));
2d57482c 359 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
360 break;
361 }
362
635e0160 363error_unlock_session:
f2b3ef9f
JG
364 session_unlock(session);
365 session_put(session);
366error_unlock_list:
367 session_unlock_list();
368end:
369 return ret;
370}
371
28ab034a
JG
372static int action_executor_stop_session_handler(struct action_executor *executor
373 __attribute__((unused)),
374 const struct action_work_item *work_item,
375 struct action_work_subitem *item)
f2b3ef9f
JG
376{
377 int ret = 0;
378 const char *session_name;
379 enum lttng_action_status action_status;
380 struct ltt_session *session;
381 enum lttng_error_code cmd_ret;
72365501 382 struct lttng_action *action = item->action;
f2b3ef9f 383
56047f5a
JG
384 lttng::urcu::read_lock_guard read_lock;
385
28ab034a 386 action_status = lttng_action_stop_session_get_session_name(action, &session_name);
f2b3ef9f 387 if (action_status != LTTNG_ACTION_STATUS_OK) {
28ab034a 388 ERR("Failed to get session name from `%s` action", get_action_name(action));
f2b3ef9f
JG
389 ret = -1;
390 goto end;
391 }
392
72365501
JR
393 /*
394 * Validate if, at the moment the action was queued, the target session
395 * existed. If not, skip the action altogether.
396 */
397 if (!item->context.session_id.is_set) {
4ec6f5b6 398 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
28ab034a
JG
399 session_name,
400 get_action_name(action),
401 get_trigger_name(work_item->trigger));
72365501 402 lttng_action_increase_execution_failure_count(action);
72365501
JR
403 goto end;
404 }
405
f2b3ef9f 406 session_lock_list();
4a467a84 407 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
f2b3ef9f 408 if (!session) {
34f87583 409 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
28ab034a
JG
410 session_name,
411 get_action_name(action),
412 get_trigger_name(work_item->trigger));
2d57482c 413 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
414 goto error_unlock_list;
415 }
416
4a467a84
JR
417 session_lock(session);
418 if (session->destroyed) {
28ab034a
JG
419 DBG("Session `%s` with id = %" PRIu64
420 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
421 session->name,
422 session->id,
423 get_action_name(action),
424 get_trigger_name(work_item->trigger));
4a467a84 425 goto error_unlock_session;
72365501
JR
426 }
427
f2b3ef9f 428 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
635e0160 429 goto error_unlock_session;
f2b3ef9f
JG
430 }
431
7966af57 432 cmd_ret = (lttng_error_code) cmd_stop_trace(session);
f2b3ef9f
JG
433 switch (cmd_ret) {
434 case LTTNG_OK:
34f87583 435 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
28ab034a
JG
436 session_name,
437 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
438 break;
439 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
34f87583 440 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
28ab034a
JG
441 session_name,
442 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
443 break;
444 default:
34f87583 445 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
28ab034a
JG
446 session_name,
447 get_trigger_name(work_item->trigger),
448 lttng_strerror(-cmd_ret));
2d57482c 449 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
450 break;
451 }
452
635e0160 453error_unlock_session:
f2b3ef9f
JG
454 session_unlock(session);
455 session_put(session);
456error_unlock_list:
457 session_unlock_list();
458end:
459 return ret;
460}
461
28ab034a
JG
462static int action_executor_rotate_session_handler(struct action_executor *executor
463 __attribute__((unused)),
464 const struct action_work_item *work_item,
465 struct action_work_subitem *item)
f2b3ef9f
JG
466{
467 int ret = 0;
468 const char *session_name;
469 enum lttng_action_status action_status;
470 struct ltt_session *session;
471 enum lttng_error_code cmd_ret;
72365501 472 struct lttng_action *action = item->action;
f2b3ef9f 473
56047f5a
JG
474 lttng::urcu::read_lock_guard read_lock;
475
28ab034a 476 action_status = lttng_action_rotate_session_get_session_name(action, &session_name);
f2b3ef9f 477 if (action_status != LTTNG_ACTION_STATUS_OK) {
28ab034a 478 ERR("Failed to get session name from `%s` action", get_action_name(action));
f2b3ef9f
JG
479 ret = -1;
480 goto end;
481 }
482
72365501
JR
483 /*
484 * Validate if, at the moment the action was queued, the target session
485 * existed. If not, skip the action altogether.
486 */
487 if (!item->context.session_id.is_set) {
4ec6f5b6 488 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
28ab034a
JG
489 session_name,
490 get_action_name(action),
491 get_trigger_name(work_item->trigger));
72365501 492 lttng_action_increase_execution_failure_count(action);
72365501
JR
493 goto end;
494 }
495
f2b3ef9f 496 session_lock_list();
4a467a84 497 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
f2b3ef9f 498 if (!session) {
34f87583 499 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
28ab034a
JG
500 session_name,
501 get_action_name(action),
502 get_trigger_name(work_item->trigger));
2d57482c 503 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
504 goto error_unlock_list;
505 }
506
4a467a84
JR
507 session_lock(session);
508 if (session->destroyed) {
28ab034a
JG
509 DBG("Session `%s` with id = %" PRIu64
510 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
511 session->name,
512 session->id,
513 get_action_name(action),
514 get_trigger_name(work_item->trigger));
4a467a84 515 goto error_unlock_session;
72365501
JR
516 }
517
f2b3ef9f 518 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
635e0160 519 goto error_unlock_session;
f2b3ef9f
JG
520 }
521
28ab034a 522 cmd_ret = (lttng_error_code) cmd_rotate_session(
cd9adb8b 523 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
f2b3ef9f
JG
524 switch (cmd_ret) {
525 case LTTNG_OK:
34f87583 526 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
28ab034a
JG
527 session_name,
528 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
529 break;
530 case LTTNG_ERR_ROTATION_PENDING:
34f87583 531 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
28ab034a
JG
532 session_name,
533 get_trigger_name(work_item->trigger));
2d57482c 534 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
535 break;
536 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
537 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
34f87583 538 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
28ab034a
JG
539 session_name,
540 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
541 break;
542 default:
34f87583 543 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
28ab034a
JG
544 session_name,
545 get_trigger_name(work_item->trigger),
546 lttng_strerror(-cmd_ret));
2d57482c 547 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
548 break;
549 }
550
635e0160 551error_unlock_session:
f2b3ef9f
JG
552 session_unlock(session);
553 session_put(session);
554error_unlock_list:
555 session_unlock_list();
556end:
557 return ret;
558}
559
28ab034a
JG
560static int action_executor_snapshot_session_handler(struct action_executor *executor
561 __attribute__((unused)),
562 const struct action_work_item *work_item,
563 struct action_work_subitem *item)
f2b3ef9f
JG
564{
565 int ret = 0;
566 const char *session_name;
567 enum lttng_action_status action_status;
568 struct ltt_session *session;
7966af57 569 lttng_snapshot_output default_snapshot_output;
28ab034a 570 const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output;
f2b3ef9f 571 enum lttng_error_code cmd_ret;
72365501
JR
572 struct lttng_action *action = item->action;
573
7966af57
SM
574 default_snapshot_output.max_size = UINT64_MAX;
575
56047f5a
JG
576 lttng::urcu::read_lock_guard read_lock;
577
72365501
JR
578 /*
579 * Validate if, at the moment the action was queued, the target session
580 * existed. If not, skip the action altogether.
581 */
582 if (!item->context.session_id.is_set) {
4ec6f5b6 583 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
28ab034a
JG
584 get_action_name(action),
585 get_trigger_name(work_item->trigger));
72365501 586 lttng_action_increase_execution_failure_count(action);
72365501
JR
587 goto end;
588 }
f2b3ef9f 589
28ab034a 590 action_status = lttng_action_snapshot_session_get_session_name(action, &session_name);
f2b3ef9f 591 if (action_status != LTTNG_ACTION_STATUS_OK) {
28ab034a 592 ERR("Failed to get session name from `%s` action", get_action_name(action));
f2b3ef9f
JG
593 ret = -1;
594 goto end;
595 }
596
28ab034a
JG
597 action_status = lttng_action_snapshot_session_get_output(action, &snapshot_output);
598 if (action_status != LTTNG_ACTION_STATUS_OK && action_status != LTTNG_ACTION_STATUS_UNSET) {
599 ERR("Failed to get output from `%s` action", get_action_name(action));
f2b3ef9f
JG
600 ret = -1;
601 goto end;
602 }
603
604 session_lock_list();
4a467a84 605 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
f2b3ef9f 606 if (!session) {
ca46af4e 607 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
28ab034a
JG
608 session_name,
609 get_action_name(action),
610 get_trigger_name(work_item->trigger));
2d57482c 611 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
612 goto error_unlock_list;
613 }
614
4a467a84
JR
615 session_lock(session);
616 if (session->destroyed) {
28ab034a
JG
617 DBG("Session `%s` with id = %" PRIu64
618 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
619 session->name,
620 session->id,
621 get_action_name(action),
622 get_trigger_name(work_item->trigger));
4a467a84 623 goto error_unlock_session;
72365501 624 }
f2b3ef9f 625
f2b3ef9f 626 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
635e0160 627 goto error_unlock_session;
f2b3ef9f
JG
628 }
629
7966af57 630 cmd_ret = (lttng_error_code) cmd_snapshot_record(session, snapshot_output, 0);
f2b3ef9f
JG
631 switch (cmd_ret) {
632 case LTTNG_OK:
34f87583 633 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
28ab034a
JG
634 session_name,
635 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
636 break;
637 default:
34f87583 638 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
28ab034a
JG
639 session_name,
640 get_trigger_name(work_item->trigger),
641 lttng_strerror(-cmd_ret));
2d57482c 642 lttng_action_increase_execution_failure_count(action);
f2b3ef9f
JG
643 break;
644 }
645
635e0160 646error_unlock_session:
f2b3ef9f
JG
647 session_unlock(session);
648 session_put(session);
649error_unlock_list:
650 session_unlock_list();
651end:
652 return ret;
653}
654
28ab034a
JG
655static int action_executor_list_handler(struct action_executor *executor __attribute__((unused)),
656 const struct action_work_item *work_item
657 __attribute__((unused)),
658 struct action_work_subitem *item __attribute__((unused)))
f2b3ef9f 659{
7c2fae7c 660 ERR("Execution of a list action by the action executor should never occur");
72365501 661 abort();
f2b3ef9f
JG
662}
663
664static int action_executor_generic_handler(struct action_executor *executor,
28ab034a
JG
665 const struct action_work_item *work_item,
666 struct action_work_subitem *item)
f2b3ef9f 667{
2d57482c 668 int ret;
72365501 669 struct lttng_action *action = item->action;
0e43bcbf
JG
670 const enum lttng_action_type action_type = lttng_action_get_type(action);
671
a0377dfe 672 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
0e43bcbf 673
2d57482c
JR
674 lttng_action_increase_execution_request_count(action);
675 if (!lttng_action_should_execute(action)) {
676 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
28ab034a
JG
677 get_action_name(action),
678 get_trigger_name(work_item->trigger),
679 work_item->id);
2d57482c
JR
680 ret = 0;
681 goto end;
682 }
683
684 lttng_action_increase_execution_count(action);
2516f2d8 685 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
28ab034a
JG
686 get_action_name(action),
687 get_trigger_name(work_item->trigger),
688 work_item->id);
72365501 689 ret = action_executors[action_type](executor, work_item, item);
2d57482c
JR
690end:
691 return ret;
f2b3ef9f
JG
692}
693
694static int action_work_item_execute(struct action_executor *executor,
28ab034a 695 struct action_work_item *work_item)
f2b3ef9f
JG
696{
697 int ret;
72365501 698 size_t count, i;
f2b3ef9f 699
34f87583 700 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
28ab034a
JG
701 work_item->id,
702 get_trigger_name(work_item->trigger));
72365501 703
be65f802 704 count = lttng_dynamic_array_get_count(&work_item->subitems);
72365501
JR
705 for (i = 0; i < count; i++) {
706 struct action_work_subitem *item;
707
28ab034a
JG
708 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems,
709 i);
710 ret = action_executor_generic_handler(executor, work_item, item);
72365501
JR
711 if (ret) {
712 goto end;
713 }
714 }
715end:
34f87583 716 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
28ab034a
JG
717 work_item->id,
718 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
719 return ret;
720}
721
722static void action_work_item_destroy(struct action_work_item *work_item)
723{
724 lttng_trigger_put(work_item->trigger);
725 lttng_evaluation_destroy(work_item->evaluation);
726 notification_client_list_put(work_item->client_list);
be65f802 727 lttng_dynamic_array_reset(&work_item->subitems);
f2b3ef9f
JG
728 free(work_item);
729}
730
731static void *action_executor_thread(void *_data)
732{
7966af57 733 struct action_executor *executor = (action_executor *) _data;
f2b3ef9f 734
a0377dfe 735 LTTNG_ASSERT(executor);
f2b3ef9f 736
28ab034a 737 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
f2b3ef9f
JG
738
739 rcu_register_thread();
740 rcu_thread_online();
741
742 DBG("Entering work execution loop");
743 pthread_mutex_lock(&executor->work.lock);
744 while (!executor->should_quit) {
da247508 745 int ret = 0;
f2b3ef9f
JG
746 struct action_work_item *work_item;
747
748 health_code_update();
749 if (executor->work.pending_count == 0) {
750 health_poll_entry();
751 DBG("No work items enqueued, entering wait");
28ab034a 752 pthread_cond_wait(&executor->work.cond, &executor->work.lock);
f2b3ef9f
JG
753 DBG("Woke-up from wait");
754 health_poll_exit();
755 continue;
756 }
757
0db0f8e0 758 /* Pop item from front of the list with work lock held. */
28ab034a
JG
759 work_item = cds_list_first_entry(
760 &executor->work.list, struct action_work_item, list_node);
f2b3ef9f
JG
761 cds_list_del(&work_item->list_node);
762 executor->work.pending_count--;
763
764 /*
765 * Work can be performed without holding the work lock,
766 * allowing new items to be queued.
767 */
768 pthread_mutex_unlock(&executor->work.lock);
d2a28b27
JR
769
770 /* Execute item only if a trigger is registered. */
771 lttng_trigger_lock(work_item->trigger);
772 if (!lttng_trigger_is_registered(work_item->trigger)) {
cd9adb8b 773 const char *trigger_name = nullptr;
d2a28b27
JR
774 uid_t trigger_owner_uid;
775 enum lttng_trigger_status trigger_status;
776
0efb2ad7 777 trigger_name = get_trigger_name(work_item->trigger);
d2a28b27 778
28ab034a
JG
779 trigger_status =
780 lttng_trigger_get_owner_uid(work_item->trigger, &trigger_owner_uid);
a0377dfe 781 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
d2a28b27 782
28ab034a
JG
783 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64
784 ", trigger name = `%s`, trigger owner uid = %d",
785 work_item->id,
786 trigger_name,
787 (int) trigger_owner_uid);
d2a28b27
JR
788 ret = 0;
789 goto skip_execute;
790 }
791
f2b3ef9f 792 ret = action_work_item_execute(executor, work_item);
d2a28b27
JR
793
794 skip_execute:
795 lttng_trigger_unlock(work_item->trigger);
f2b3ef9f
JG
796 action_work_item_destroy(work_item);
797 if (ret) {
798 /* Fatal error. */
799 break;
800 }
801
802 health_code_update();
803 pthread_mutex_lock(&executor->work.lock);
804 }
805
f5f5c54d
JG
806 if (executor->should_quit) {
807 pthread_mutex_unlock(&executor->work.lock);
808 }
f2b3ef9f
JG
809 DBG("Left work execution loop");
810
811 health_code_update();
812
813 rcu_thread_offline();
814 rcu_unregister_thread();
412d7227 815 health_unregister(the_health_sessiond);
f2b3ef9f 816
cd9adb8b 817 return nullptr;
f2b3ef9f
JG
818}
819
820static bool shutdown_action_executor_thread(void *_data)
821{
7966af57 822 struct action_executor *executor = (action_executor *) _data;
f2b3ef9f 823
8db3acaf 824 pthread_mutex_lock(&executor->work.lock);
f2b3ef9f
JG
825 executor->should_quit = true;
826 pthread_cond_signal(&executor->work.cond);
8db3acaf 827 pthread_mutex_unlock(&executor->work.lock);
f2b3ef9f
JG
828 return true;
829}
830
831static void clean_up_action_executor_thread(void *_data)
832{
7966af57 833 struct action_executor *executor = (action_executor *) _data;
f2b3ef9f 834
a0377dfe 835 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
f2b3ef9f
JG
836
837 pthread_mutex_destroy(&executor->work.lock);
838 pthread_cond_destroy(&executor->work.cond);
839 free(executor);
840}
841
28ab034a 842struct action_executor *action_executor_create(struct notification_thread_handle *handle)
f2b3ef9f 843{
64803277 844 struct action_executor *executor = zmalloc<action_executor>();
f2b3ef9f
JG
845
846 if (!executor) {
847 goto end;
848 }
849
850 CDS_INIT_LIST_HEAD(&executor->work.list);
cd9adb8b
JG
851 pthread_cond_init(&executor->work.cond, nullptr);
852 pthread_mutex_init(&executor->work.lock, nullptr);
f2b3ef9f
JG
853 executor->notification_thread_handle = handle;
854
855 executor->thread = lttng_thread_create(THREAD_NAME,
28ab034a
JG
856 action_executor_thread,
857 shutdown_action_executor_thread,
858 clean_up_action_executor_thread,
859 executor);
f2b3ef9f
JG
860end:
861 return executor;
862}
863
864void action_executor_destroy(struct action_executor *executor)
865{
866 struct action_work_item *work_item, *tmp;
867
868 /* TODO Wait for work list to drain? */
869 lttng_thread_shutdown(executor->thread);
870 pthread_mutex_lock(&executor->work.lock);
871 if (executor->work.pending_count != 0) {
872 WARN("%" PRIu64
28ab034a
JG
873 " trigger action%s still queued for execution and will be discarded",
874 executor->work.pending_count,
875 executor->work.pending_count == 1 ? " is" : "s are");
f2b3ef9f
JG
876 }
877
28ab034a
JG
878 cds_list_for_each_entry_safe (work_item, tmp, &executor->work.list, list_node) {
879 WARN("Discarding action work item %" PRIu64 " associated to trigger `%s`",
880 work_item->id,
881 get_trigger_name(work_item->trigger));
f2b3ef9f
JG
882 cds_list_del(&work_item->list_node);
883 action_work_item_destroy(work_item);
884 }
885 pthread_mutex_unlock(&executor->work.lock);
886 lttng_thread_put(executor->thread);
887}
888
889/* RCU read-lock must be held by the caller. */
28ab034a
JG
890enum action_executor_status
891action_executor_enqueue_trigger(struct action_executor *executor,
892 struct lttng_trigger *trigger,
893 struct lttng_evaluation *evaluation,
894 const struct lttng_credentials *object_creds,
895 struct notification_client_list *client_list)
f2b3ef9f 896{
72365501 897 int ret;
f2b3ef9f
JG
898 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
899 const uint64_t work_item_id = executor->next_work_item_id++;
900 struct action_work_item *work_item;
901 bool signal = false;
72365501 902
a0377dfe 903 LTTNG_ASSERT(trigger);
48b7cdc2 904 ASSERT_RCU_READ_LOCKED();
72365501 905
f2b3ef9f
JG
906 pthread_mutex_lock(&executor->work.lock);
907 /* Check for queue overflow. */
908 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
909 /* Most likely spammy, remove if it is the case. */
72365501 910 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
28ab034a
JG
911 get_trigger_name(trigger),
912 work_item_id);
f2b3ef9f
JG
913 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
914 goto error_unlock;
915 }
916
64803277 917 work_item = zmalloc<action_work_item>();
f2b3ef9f 918 if (!work_item) {
4ec6f5b6 919 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
28ab034a 920 get_trigger_name(trigger));
f2b3ef9f
JG
921 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
922 goto error_unlock;
923 }
924
925 lttng_trigger_get(trigger);
926 if (client_list) {
28ab034a 927 const bool reference_acquired = notification_client_list_get(client_list);
f2b3ef9f 928
a0377dfe 929 LTTNG_ASSERT(reference_acquired);
f2b3ef9f
JG
930 }
931
7966af57
SM
932 work_item->id = work_item_id;
933 work_item->trigger = trigger;
f2b3ef9f 934
7966af57
SM
935 /* Ownership transferred to the work item. */
936 work_item->evaluation = evaluation;
cd9adb8b 937 evaluation = nullptr;
be65f802 938
7966af57
SM
939 work_item->client_list = client_list;
940 work_item->object_creds.is_set = !!object_creds;
941 if (object_creds) {
942 work_item->object_creds.value = *object_creds;
943 }
944
945 CDS_INIT_LIST_HEAD(&work_item->list_node);
946
be65f802
JG
947 /* Build the array of action work subitems for the passed trigger. */
948 lttng_dynamic_array_init(&work_item->subitems,
28ab034a
JG
949 sizeof(struct action_work_subitem),
950 action_work_subitem_destructor);
be65f802 951
28ab034a 952 ret = populate_subitem_array_from_trigger(trigger, &work_item->subitems);
be65f802
JG
953 if (ret) {
954 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
28ab034a 955 get_trigger_name(trigger));
be65f802
JG
956 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
957 goto error_unlock;
958 }
959
f2b3ef9f
JG
960 cds_list_add_tail(&work_item->list_node, &executor->work.list);
961 executor->work.pending_count++;
72365501 962 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
28ab034a
JG
963 get_trigger_name(trigger),
964 work_item_id);
f2b3ef9f
JG
965 signal = true;
966
967error_unlock:
f2b3ef9f
JG
968 if (signal) {
969 pthread_cond_signal(&executor->work.cond);
970 }
971
be65f802 972 pthread_mutex_unlock(&executor->work.lock);
f2b3ef9f
JG
973 lttng_evaluation_destroy(evaluation);
974 return executor_status;
975}
72365501
JR
976
977static int add_action_to_subitem_array(struct lttng_action *action,
28ab034a 978 struct lttng_dynamic_array *subitems)
72365501 979{
da247508 980 int ret = 0;
72365501 981 enum lttng_action_type type = lttng_action_get_type(action);
cd9adb8b 982 const char *session_name = nullptr;
72365501
JR
983 enum lttng_action_status status;
984 struct action_work_subitem subitem = {
cd9adb8b 985 .action = nullptr,
72365501
JR
986 .context = {
987 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
988 },
989 };
990
a0377dfe
FD
991 LTTNG_ASSERT(action);
992 LTTNG_ASSERT(subitems);
72365501 993
7c2fae7c 994 if (type == LTTNG_ACTION_TYPE_LIST) {
b17ed2ad 995 for (auto inner_action : lttng::ctl::action_list_view(action)) {
a0377dfe 996 LTTNG_ASSERT(inner_action);
2460203a 997
28ab034a 998 ret = add_action_to_subitem_array(inner_action, subitems);
72365501
JR
999 if (ret) {
1000 goto end;
1001 }
1002 }
1003
1004 /*
1005 * Go directly to the end since there is no need to add the
7c2fae7c 1006 * list action by itself to the subitems array.
72365501
JR
1007 */
1008 goto end;
1009 }
1010
1011 /* Gather execution context. */
1012 switch (type) {
1013 case LTTNG_ACTION_TYPE_NOTIFY:
1014 break;
1015 case LTTNG_ACTION_TYPE_START_SESSION:
28ab034a 1016 status = lttng_action_start_session_get_session_name(action, &session_name);
a0377dfe 1017 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
72365501
JR
1018 break;
1019 case LTTNG_ACTION_TYPE_STOP_SESSION:
28ab034a 1020 status = lttng_action_stop_session_get_session_name(action, &session_name);
a0377dfe 1021 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
72365501
JR
1022 break;
1023 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
28ab034a 1024 status = lttng_action_rotate_session_get_session_name(action, &session_name);
a0377dfe 1025 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
72365501
JR
1026 break;
1027 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
28ab034a 1028 status = lttng_action_snapshot_session_get_session_name(action, &session_name);
a0377dfe 1029 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
72365501 1030 break;
7c2fae7c 1031 case LTTNG_ACTION_TYPE_LIST:
72365501
JR
1032 case LTTNG_ACTION_TYPE_UNKNOWN:
1033 /* Fallthrough */
1034 default:
1035 abort();
1036 break;
1037 }
1038
1039 /*
1040 * Fetch the session execution context info as needed.
1041 * Note that we could decide to not add an action for which we know the
1042 * execution will not happen (i.e no session exists for that name). For
1043 * now we leave the decision to skip to the action executor for sake of
1044 * simplicity and consistency.
1045 */
cd9adb8b 1046 if (session_name != nullptr) {
e1bbf989 1047 uint64_t session_id;
72365501 1048
e1bbf989
JR
1049 /*
1050 * Instantaneous sampling of the session id if present.
1051 *
1052 * This method is preferred over `sessiond_find_by_name` then
1053 * fetching the session'd id since `sessiond_find_by_name`
1054 * requires the session list lock to be taken.
1055 *
1056 * Taking the session list lock can lead to a deadlock
1057 * between the action executor and the notification thread
1058 * (caller of add_action_to_subitem_array). It is okay if the
1059 * session state changes between the enqueuing time and the
1060 * execution time. The execution context is validated at
1061 * execution time.
1062 */
1063 if (sample_session_id_by_name(session_name, &session_id)) {
28ab034a 1064 LTTNG_OPTIONAL_SET(&subitem.context.session_id, session_id);
72365501 1065 }
72365501
JR
1066 }
1067
1068 /* Get a reference to the action. */
1069 lttng_action_get(action);
1070 subitem.action = action;
1071
1072 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1073 if (ret) {
1074 ERR("Failed to add work subitem to the subitem array");
1075 lttng_action_put(action);
1076 ret = -1;
1077 goto end;
1078 }
1079
1080end:
1081 return ret;
1082}
1083
1084static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
28ab034a 1085 struct lttng_dynamic_array *subitems)
72365501
JR
1086{
1087 struct lttng_action *action;
1088
1089 action = lttng_trigger_get_action(trigger);
a0377dfe 1090 LTTNG_ASSERT(action);
72365501
JR
1091
1092 return add_action_to_subitem_array(action, subitems);
1093}
This page took 0.127476 seconds and 4 git commands to generate.