clang-tidy: apply suggested fixes
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.hpp"
9 #include "cmd.hpp"
10 #include "health-sessiond.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-internal.hpp"
13 #include "session.hpp"
14 #include "thread.hpp"
15
16 #include <common/dynamic-array.hpp>
17 #include <common/macros.hpp>
18 #include <common/optional.hpp>
19 #include <common/urcu.hpp>
20
21 #include <lttng/action/action-internal.hpp>
22 #include <lttng/action/list-internal.hpp>
23 #include <lttng/action/list.h>
24 #include <lttng/action/notify-internal.hpp>
25 #include <lttng/action/notify.h>
26 #include <lttng/action/rotate-session.h>
27 #include <lttng/action/snapshot-session.h>
28 #include <lttng/action/start-session.h>
29 #include <lttng/action/stop-session.h>
30 #include <lttng/condition/evaluation.h>
31 #include <lttng/condition/event-rule-matches-internal.hpp>
32 #include <lttng/lttng-error.h>
33 #include <lttng/trigger/trigger-internal.hpp>
34
35 #include <pthread.h>
36 #include <stdbool.h>
37 #include <stddef.h>
38 #include <urcu/list.h>
39
/* Name given to the thread created by action_executor_create(). */
#define THREAD_NAME "Action Executor"

/*
 * Maximal number of work items that may be pending at any given time;
 * enqueuing is refused (overflow) once this count is reached.
 */
#define MAX_QUEUED_WORK_COUNT 8192
42
43 struct action_executor {
44 struct lttng_thread *thread;
45 struct notification_thread_handle *notification_thread_handle;
46 struct {
47 uint64_t pending_count;
48 struct cds_list_head list;
49 pthread_cond_t cond;
50 pthread_mutex_t lock;
51 } work;
52 bool should_quit;
53 uint64_t next_work_item_id;
54 };
55
56 namespace {
57 /*
58 * A work item is composed of a dynamic array of sub-items which
59 * represent a flattened, and augmented, version of a trigger's actions.
60 *
61 * We cannot rely solely on the trigger's actions since each action can have an
62 * execution context we need to comply with.
63 *
64 * The notion of execution context is required since for some actions the
65 * associated object are referenced by name and not by id. This can lead to
66 * a number of ambiguities when executing an action work item.
67 *
68 * For example, let's take a simple trigger such as:
69 * - condition: ust event a
70 * - action: start session S
71 *
72 * At time T, session S exists.
73 * At T + 1, the event A is hit.
74 * At T + 2, the tracer event notification is received and the work item is
75 * queued. Here session S have an id of 1.
76 * At T + 3, the session S is destroyed and a new session S is created, with a
77 * resulting id of 200.
78 * At T +4, the work item is popped from the queue and begin execution and will
79 * start session S with an id of 200 instead of the session S id 1 that was
80 * present at the queuing phase.
81 *
82 * The context to be respected is the one when the work item is queued. If the
83 * execution context is not the same at the moment of execution, we skip the
84 * execution of that sub-item.
85 *
86 * It is the same policy in regards to the validity of the associated
87 * trigger object at the moment of execution, if the trigger is found to be
88 * unregistered, the execution is skipped.
89 */
90 struct action_work_item {
91 uint64_t id;
92
93 /*
94 * The actions to be executed with their respective execution context.
95 * See struct `action_work_subitem`.
96 */
97 struct lttng_dynamic_array subitems;
98
99 /* Execution context data */
100 struct lttng_trigger *trigger;
101 struct lttng_evaluation *evaluation;
102 struct notification_client_list *client_list;
103 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
104 struct cds_list_head list_node;
105 };
106
107 struct action_work_subitem {
108 struct lttng_action *action;
109 struct {
110 /* Used by actions targeting a session. */
111 LTTNG_OPTIONAL(uint64_t) session_id;
112 } context;
113 };
114 } /* namespace */
115
/*
 * Signature shared by all action handlers.
 *
 * A handler must only return non-zero on a fatal error that should shut down
 * the action executor.
 */
using action_executor_handler = int (*)(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *subitem);
123
/* Action handlers; all of them follow the `action_executor_handler` signature. */
static int action_executor_notify_handler(struct action_executor *executor,
					  const struct action_work_item *work_item,
					  struct action_work_subitem *item);
static int action_executor_start_session_handler(struct action_executor *executor,
						 const struct action_work_item *work_item,
						 struct action_work_subitem *item);
static int action_executor_stop_session_handler(struct action_executor *executor,
						const struct action_work_item *work_item,
						struct action_work_subitem *item);
static int action_executor_rotate_session_handler(struct action_executor *executor,
						  const struct action_work_item *work_item,
						  struct action_work_subitem *item);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
						    const struct action_work_item *work_item,
						    struct action_work_subitem *item);
static int action_executor_list_handler(struct action_executor *executor,
					const struct action_work_item *work_item,
					struct action_work_subitem *item);
static int action_executor_generic_handler(struct action_executor *executor,
					   const struct action_work_item *work_item,
					   struct action_work_subitem *item);
145
146 static const action_executor_handler action_executors[] = {
147 action_executor_notify_handler, action_executor_start_session_handler,
148 action_executor_stop_session_handler, action_executor_rotate_session_handler,
149 action_executor_snapshot_session_handler, action_executor_list_handler,
150 };
151
/* Forward declarations of the helpers that build a work item's sub-item array. */
static int add_action_to_subitem_array(struct lttng_action *action,
				       struct lttng_dynamic_array *subitems);
static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
					       struct lttng_dynamic_array *subitems);
158
159 static void action_work_subitem_destructor(void *element)
160 {
161 struct action_work_subitem *subitem = (action_work_subitem *) element;
162
163 lttng_action_put(subitem->action);
164 }
165
166 static const char *get_action_name(const struct lttng_action *action)
167 {
168 const enum lttng_action_type action_type = lttng_action_get_type(action);
169
170 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
171
172 return lttng_action_type_string(action_type);
173 }
174
175 /* Check if this trigger allowed to interect with a given session. */
176 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
177 const ltt_session::locked_ref& session)
178 {
179 bool is_allowed = false;
180 const struct lttng_credentials session_creds = {
181 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
182 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
183 };
184 /* Can never be NULL. */
185 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
186
187 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
188 (lttng_credentials_get_uid(trigger_creds) == 0);
189 if (!is_allowed) {
190 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
191 session->name,
192 (long int) session->uid,
193 (long int) session->gid,
194 (long int) lttng_credentials_get_uid(trigger_creds));
195 }
196
197 return is_allowed;
198 }
199
200 static const char *get_trigger_name(const struct lttng_trigger *trigger)
201 {
202 const char *trigger_name;
203 enum lttng_trigger_status trigger_status;
204
205 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
206 switch (trigger_status) {
207 case LTTNG_TRIGGER_STATUS_OK:
208 break;
209 case LTTNG_TRIGGER_STATUS_UNSET:
210 trigger_name = "(anonymous)";
211 break;
212 default:
213 trigger_name = "(failed to get name)";
214 break;
215 }
216
217 return trigger_name;
218 }
219
220 static int client_handle_transmission_status(struct notification_client *client,
221 enum client_transmission_status status,
222 void *user_data)
223 {
224 int ret = 0;
225 struct action_executor *executor = (action_executor *) user_data;
226 bool update_communication = true;
227
228 switch (status) {
229 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
230 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
231 client->id);
232 /*
233 * There is no need to wake the (e)poll thread. If it was waiting for
234 * "out" events on the client's socket, it will see that no payload
235 * in queued and will unsubscribe from that event.
236 *
237 * In the other cases, we have to wake the the (e)poll thread to either
238 * handle the error on the client or to get it to monitor the client "out"
239 * events.
240 */
241 update_communication = false;
242 break;
243 case CLIENT_TRANSMISSION_STATUS_QUEUED:
244 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
245 client->id);
246 break;
247 case CLIENT_TRANSMISSION_STATUS_FAIL:
248 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
249 client->id);
250 break;
251 default:
252 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
253 client->id);
254 ret = -1;
255 goto end;
256 }
257
258 if (!update_communication) {
259 goto end;
260 }
261
262 /* Safe to read client's id without locking as it is immutable. */
263 ret = notification_thread_client_communication_update(
264 executor->notification_thread_handle, client->id, status);
265 end:
266 return ret;
267 }
268
269 static int action_executor_notify_handler(struct action_executor *executor,
270 const struct action_work_item *work_item,
271 struct action_work_subitem *item __attribute__((unused)))
272 {
273 return notification_client_list_send_evaluation(
274 work_item->client_list,
275 work_item->trigger,
276 work_item->evaluation,
277 work_item->object_creds.is_set ? &(work_item->object_creds.value) : nullptr,
278 client_handle_transmission_status,
279 executor);
280 }
281
282 static int action_executor_start_session_handler(struct action_executor *executor
283 __attribute__((unused)),
284 const struct action_work_item *work_item,
285 struct action_work_subitem *item)
286 {
287 const char *session_name;
288 enum lttng_action_status action_status;
289 enum lttng_error_code cmd_ret;
290 struct lttng_action *action = item->action;
291
292 const lttng::urcu::read_lock_guard read_lock;
293
294 action_status = lttng_action_start_session_get_session_name(action, &session_name);
295 if (action_status != LTTNG_ACTION_STATUS_OK) {
296 ERR("Failed to get session name from `%s` action", get_action_name(action));
297 return -1;
298 }
299
300 /*
301 * Validate if at the moment of the action was queued the session
302 * existed. If not skip the action altogether.
303 */
304 if (!item->context.session_id.is_set) {
305 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
306 session_name,
307 get_action_name(action),
308 get_trigger_name(work_item->trigger));
309 lttng_action_increase_execution_failure_count(action);
310 return 0;
311 }
312
313 /*
314 * Mind the order of the declaration of list_lock vs session:
315 * the session list lock must always be released _after_ the release of
316 * a session's reference (the destruction of a ref/locked_ref) to ensure
317 * since the reference's release may unpublish the session from the list of
318 * sessions.
319 */
320 const auto list_lock = lttng::sessiond::lock_session_list();
321 try {
322 const auto session = ltt_session::find_locked_session(
323 LTTNG_OPTIONAL_GET(item->context.session_id));
324
325 if (session->destroyed) {
326 DBG("Session `%s` with id = %" PRIu64
327 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
328 session->name,
329 session->id,
330 get_action_name(action),
331 get_trigger_name(work_item->trigger));
332 return 0;
333 }
334
335 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
336 return 0;
337 }
338
339 cmd_ret = (lttng_error_code) cmd_start_trace(session);
340 switch (cmd_ret) {
341 case LTTNG_OK:
342 DBG("Successfully started session `%s` on behalf of trigger `%s`",
343 session_name,
344 get_trigger_name(work_item->trigger));
345 break;
346 case LTTNG_ERR_TRACE_ALREADY_STARTED:
347 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
348 session_name,
349 get_trigger_name(work_item->trigger));
350 break;
351 default:
352 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
353 session_name,
354 get_trigger_name(work_item->trigger),
355 lttng_strerror(-cmd_ret));
356 lttng_action_increase_execution_failure_count(action);
357 break;
358 }
359 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
360 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
361 ex.what(),
362 session_name,
363 get_action_name(action),
364 get_trigger_name(work_item->trigger),
365 ex.source_location);
366 lttng_action_increase_execution_failure_count(action);
367 }
368
369 return 0;
370 }
371
372 static int action_executor_stop_session_handler(struct action_executor *executor
373 __attribute__((unused)),
374 const struct action_work_item *work_item,
375 struct action_work_subitem *item)
376 {
377 const char *session_name;
378 enum lttng_action_status action_status;
379 enum lttng_error_code cmd_ret;
380 struct lttng_action *action = item->action;
381
382 const lttng::urcu::read_lock_guard read_lock;
383
384 action_status = lttng_action_stop_session_get_session_name(action, &session_name);
385 if (action_status != LTTNG_ACTION_STATUS_OK) {
386 ERR("Failed to get session name from `%s` action", get_action_name(action));
387 return -1;
388 }
389
390 /*
391 * Validate if, at the moment the action was queued, the target session
392 * existed. If not, skip the action altogether.
393 */
394 if (!item->context.session_id.is_set) {
395 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
396 session_name,
397 get_action_name(action),
398 get_trigger_name(work_item->trigger));
399 lttng_action_increase_execution_failure_count(action);
400 return 0;
401 }
402
403 /*
404 * Mind the order of the declaration of list_lock vs session:
405 * the session list lock must always be released _after_ the release of
406 * a session's reference (the destruction of a ref/locked_ref) to ensure
407 * since the reference's release may unpublish the session from the list of
408 * sessions.
409 */
410 const auto list_lock = lttng::sessiond::lock_session_list();
411
412 try {
413 const auto session = ltt_session::find_locked_session(
414 LTTNG_OPTIONAL_GET(item->context.session_id));
415
416 if (session->destroyed) {
417 DBG("Session `%s` with id = %" PRIu64
418 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
419 session->name,
420 session->id,
421 get_action_name(action),
422 get_trigger_name(work_item->trigger));
423 return 0;
424 }
425
426 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
427 return 0;
428 }
429
430 cmd_ret = (lttng_error_code) cmd_stop_trace(session);
431 switch (cmd_ret) {
432 case LTTNG_OK:
433 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
434 session_name,
435 get_trigger_name(work_item->trigger));
436 break;
437 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
438 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
439 session_name,
440 get_trigger_name(work_item->trigger));
441 break;
442 default:
443 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
444 session_name,
445 get_trigger_name(work_item->trigger),
446 lttng_strerror(-cmd_ret));
447 lttng_action_increase_execution_failure_count(action);
448 break;
449 }
450 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
451 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
452 ex.what(),
453 session_name,
454 get_action_name(action),
455 get_trigger_name(work_item->trigger),
456 ex.source_location);
457 lttng_action_increase_execution_failure_count(action);
458 }
459
460 return 0;
461 }
462
463 static int action_executor_rotate_session_handler(struct action_executor *executor
464 __attribute__((unused)),
465 const struct action_work_item *work_item,
466 struct action_work_subitem *item)
467 {
468 const char *session_name;
469 enum lttng_action_status action_status;
470 enum lttng_error_code cmd_ret;
471 struct lttng_action *action = item->action;
472
473 const lttng::urcu::read_lock_guard read_lock;
474
475 action_status = lttng_action_rotate_session_get_session_name(action, &session_name);
476 if (action_status != LTTNG_ACTION_STATUS_OK) {
477 ERR("Failed to get session name from `%s` action", get_action_name(action));
478 return -1;
479 }
480
481 /*
482 * Validate if, at the moment the action was queued, the target session
483 * existed. If not, skip the action altogether.
484 */
485 if (!item->context.session_id.is_set) {
486 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
487 session_name,
488 get_action_name(action),
489 get_trigger_name(work_item->trigger));
490 lttng_action_increase_execution_failure_count(action);
491 return 0;
492 }
493
494 /*
495 * Mind the order of the declaration of list_lock vs session:
496 * the session list lock must always be released _after_ the release of
497 * a session's reference (the destruction of a ref/locked_ref) to ensure
498 * since the reference's release may unpublish the session from the list of
499 * sessions.
500 */
501 const auto list_lock = lttng::sessiond::lock_session_list();
502 try {
503 const auto session = ltt_session::find_locked_session(
504 LTTNG_OPTIONAL_GET(item->context.session_id));
505
506 if (session->destroyed) {
507 DBG("Session `%s` with id = %" PRIu64
508 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
509 session->name,
510 session->id,
511 get_action_name(action),
512 get_trigger_name(work_item->trigger));
513 return 0;
514 }
515
516 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
517 return 0;
518 }
519
520 cmd_ret = (lttng_error_code) cmd_rotate_session(
521 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
522 switch (cmd_ret) {
523 case LTTNG_OK:
524 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
525 session_name,
526 get_trigger_name(work_item->trigger));
527 break;
528 case LTTNG_ERR_ROTATION_PENDING:
529 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
530 session_name,
531 get_trigger_name(work_item->trigger));
532 lttng_action_increase_execution_failure_count(action);
533 break;
534 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
535 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
536 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
537 session_name,
538 get_trigger_name(work_item->trigger));
539 break;
540 default:
541 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
542 session_name,
543 get_trigger_name(work_item->trigger),
544 lttng_strerror(-cmd_ret));
545 lttng_action_increase_execution_failure_count(action);
546 break;
547 }
548 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
549 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
550 ex.what(),
551 session_name,
552 get_action_name(action),
553 get_trigger_name(work_item->trigger),
554 ex.source_location);
555 lttng_action_increase_execution_failure_count(action);
556 }
557
558 return 0;
559 }
560
561 static int action_executor_snapshot_session_handler(struct action_executor *executor
562 __attribute__((unused)),
563 const struct action_work_item *work_item,
564 struct action_work_subitem *item)
565 {
566 const char *session_name;
567 enum lttng_action_status action_status;
568 lttng_snapshot_output default_snapshot_output;
569 const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output;
570 enum lttng_error_code cmd_ret;
571 struct lttng_action *action = item->action;
572
573 default_snapshot_output.max_size = UINT64_MAX;
574
575 const lttng::urcu::read_lock_guard read_lock;
576
577 /*
578 * Validate if, at the moment the action was queued, the target session
579 * existed. If not, skip the action altogether.
580 */
581 if (!item->context.session_id.is_set) {
582 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
583 get_action_name(action),
584 get_trigger_name(work_item->trigger));
585 lttng_action_increase_execution_failure_count(action);
586 return 0;
587 }
588
589 action_status = lttng_action_snapshot_session_get_session_name(action, &session_name);
590 if (action_status != LTTNG_ACTION_STATUS_OK) {
591 ERR("Failed to get session name from `%s` action", get_action_name(action));
592 return -1;
593 }
594
595 action_status = lttng_action_snapshot_session_get_output(action, &snapshot_output);
596 if (action_status != LTTNG_ACTION_STATUS_OK && action_status != LTTNG_ACTION_STATUS_UNSET) {
597 ERR("Failed to get output from `%s` action", get_action_name(action));
598 return -1;
599 }
600
601 /*
602 * Mind the order of the declaration of list_lock vs session:
603 * the session list lock must always be released _after_ the release of
604 * a session's reference (the destruction of a ref/locked_ref) to ensure
605 * since the reference's release may unpublish the session from the list of
606 * sessions.
607 */
608 const auto list_lock = lttng::sessiond::lock_session_list();
609 try {
610 const auto session = ltt_session::find_locked_session(
611 LTTNG_OPTIONAL_GET(item->context.session_id));
612
613 if (session->destroyed) {
614 DBG("Session `%s` with id = %" PRIu64
615 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
616 session->name,
617 session->id,
618 get_action_name(action),
619 get_trigger_name(work_item->trigger));
620 return 0;
621 }
622
623 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
624 return 0;
625 }
626
627 cmd_ret = (lttng_error_code) cmd_snapshot_record(session, snapshot_output, 0);
628 switch (cmd_ret) {
629 case LTTNG_OK:
630 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
631 session_name,
632 get_trigger_name(work_item->trigger));
633 break;
634 default:
635 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
636 session_name,
637 get_trigger_name(work_item->trigger),
638 lttng_strerror(-cmd_ret));
639 lttng_action_increase_execution_failure_count(action);
640 break;
641 }
642
643 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
644 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
645 ex.what(),
646 session_name,
647 get_action_name(action),
648 get_trigger_name(work_item->trigger),
649 ex.source_location);
650 lttng_action_increase_execution_failure_count(action);
651 }
652
653 return 0;
654 }
655
656 static int action_executor_list_handler(struct action_executor *executor __attribute__((unused)),
657 const struct action_work_item *work_item
658 __attribute__((unused)),
659 struct action_work_subitem *item __attribute__((unused)))
660 {
661 ERR("Execution of a list action by the action executor should never occur");
662 abort();
663 }
664
665 static int action_executor_generic_handler(struct action_executor *executor,
666 const struct action_work_item *work_item,
667 struct action_work_subitem *item)
668 {
669 int ret;
670 struct lttng_action *action = item->action;
671 const enum lttng_action_type action_type = lttng_action_get_type(action);
672
673 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
674
675 lttng_action_increase_execution_request_count(action);
676 if (!lttng_action_should_execute(action)) {
677 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
678 get_action_name(action),
679 get_trigger_name(work_item->trigger),
680 work_item->id);
681 ret = 0;
682 goto end;
683 }
684
685 lttng_action_increase_execution_count(action);
686 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
687 get_action_name(action),
688 get_trigger_name(work_item->trigger),
689 work_item->id);
690 ret = action_executors[action_type](executor, work_item, item);
691 end:
692 return ret;
693 }
694
695 static int action_work_item_execute(struct action_executor *executor,
696 struct action_work_item *work_item)
697 {
698 int ret;
699 size_t count, i;
700
701 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
702 work_item->id,
703 get_trigger_name(work_item->trigger));
704
705 count = lttng_dynamic_array_get_count(&work_item->subitems);
706 for (i = 0; i < count; i++) {
707 struct action_work_subitem *item;
708
709 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems,
710 i);
711 ret = action_executor_generic_handler(executor, work_item, item);
712 if (ret) {
713 goto end;
714 }
715 }
716 end:
717 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
718 work_item->id,
719 get_trigger_name(work_item->trigger));
720 return ret;
721 }
722
723 static void action_work_item_destroy(struct action_work_item *work_item)
724 {
725 lttng_trigger_put(work_item->trigger);
726 lttng_evaluation_destroy(work_item->evaluation);
727 notification_client_list_put(work_item->client_list);
728 lttng_dynamic_array_reset(&work_item->subitems);
729 free(work_item);
730 }
731
732 static void *action_executor_thread(void *_data)
733 {
734 struct action_executor *executor = (action_executor *) _data;
735
736 LTTNG_ASSERT(executor);
737
738 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
739
740 rcu_register_thread();
741 rcu_thread_online();
742
743 DBG("Entering work execution loop");
744 pthread_mutex_lock(&executor->work.lock);
745 while (!executor->should_quit) {
746 int ret = 0;
747 struct action_work_item *work_item;
748
749 health_code_update();
750 if (executor->work.pending_count == 0) {
751 health_poll_entry();
752 DBG("No work items enqueued, entering wait");
753 pthread_cond_wait(&executor->work.cond, &executor->work.lock);
754 DBG("Woke-up from wait");
755 health_poll_exit();
756 continue;
757 }
758
759 /* Pop item from front of the list with work lock held. */
760 work_item = cds_list_first_entry(
761 &executor->work.list, struct action_work_item, list_node);
762 cds_list_del(&work_item->list_node);
763 executor->work.pending_count--;
764
765 /*
766 * Work can be performed without holding the work lock,
767 * allowing new items to be queued.
768 */
769 pthread_mutex_unlock(&executor->work.lock);
770
771 /* Execute item only if a trigger is registered. */
772 lttng_trigger_lock(work_item->trigger);
773 if (!lttng_trigger_is_registered(work_item->trigger)) {
774 const char *trigger_name = nullptr;
775 uid_t trigger_owner_uid;
776 enum lttng_trigger_status trigger_status;
777
778 trigger_name = get_trigger_name(work_item->trigger);
779
780 trigger_status =
781 lttng_trigger_get_owner_uid(work_item->trigger, &trigger_owner_uid);
782 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
783
784 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64
785 ", trigger name = `%s`, trigger owner uid = %d",
786 work_item->id,
787 trigger_name,
788 (int) trigger_owner_uid);
789 ret = 0;
790 goto skip_execute;
791 }
792
793 ret = action_work_item_execute(executor, work_item);
794
795 skip_execute:
796 lttng_trigger_unlock(work_item->trigger);
797 action_work_item_destroy(work_item);
798 if (ret) {
799 /* Fatal error. */
800 break;
801 }
802
803 health_code_update();
804 pthread_mutex_lock(&executor->work.lock);
805 }
806
807 if (executor->should_quit) {
808 pthread_mutex_unlock(&executor->work.lock);
809 }
810 DBG("Left work execution loop");
811
812 health_code_update();
813
814 rcu_thread_offline();
815 rcu_unregister_thread();
816 health_unregister(the_health_sessiond);
817
818 return nullptr;
819 }
820
821 static bool shutdown_action_executor_thread(void *_data)
822 {
823 struct action_executor *executor = (action_executor *) _data;
824
825 pthread_mutex_lock(&executor->work.lock);
826 executor->should_quit = true;
827 pthread_cond_signal(&executor->work.cond);
828 pthread_mutex_unlock(&executor->work.lock);
829 return true;
830 }
831
832 static void clean_up_action_executor_thread(void *_data)
833 {
834 struct action_executor *executor = (action_executor *) _data;
835
836 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
837
838 pthread_mutex_destroy(&executor->work.lock);
839 pthread_cond_destroy(&executor->work.cond);
840 free(executor);
841 }
842
843 struct action_executor *action_executor_create(struct notification_thread_handle *handle)
844 {
845 struct action_executor *executor = zmalloc<action_executor>();
846
847 if (!executor) {
848 goto end;
849 }
850
851 CDS_INIT_LIST_HEAD(&executor->work.list);
852 pthread_cond_init(&executor->work.cond, nullptr);
853 pthread_mutex_init(&executor->work.lock, nullptr);
854 executor->notification_thread_handle = handle;
855
856 executor->thread = lttng_thread_create(THREAD_NAME,
857 action_executor_thread,
858 shutdown_action_executor_thread,
859 clean_up_action_executor_thread,
860 executor);
861 end:
862 return executor;
863 }
864
865 void action_executor_destroy(struct action_executor *executor)
866 {
867 struct action_work_item *work_item, *tmp;
868
869 /* TODO Wait for work list to drain? */
870 lttng_thread_shutdown(executor->thread);
871 pthread_mutex_lock(&executor->work.lock);
872 if (executor->work.pending_count != 0) {
873 WARN("%" PRIu64
874 " trigger action%s still queued for execution and will be discarded",
875 executor->work.pending_count,
876 executor->work.pending_count == 1 ? " is" : "s are");
877 }
878
879 cds_list_for_each_entry_safe (work_item, tmp, &executor->work.list, list_node) {
880 WARN("Discarding action work item %" PRIu64 " associated to trigger `%s`",
881 work_item->id,
882 get_trigger_name(work_item->trigger));
883 cds_list_del(&work_item->list_node);
884 action_work_item_destroy(work_item);
885 }
886 pthread_mutex_unlock(&executor->work.lock);
887 lttng_thread_put(executor->thread);
888 }
889
890 /* RCU read-lock must be held by the caller. */
891 enum action_executor_status
892 action_executor_enqueue_trigger(struct action_executor *executor,
893 struct lttng_trigger *trigger,
894 struct lttng_evaluation *evaluation,
895 const struct lttng_credentials *object_creds,
896 struct notification_client_list *client_list)
897 {
898 int ret;
899 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
900 const uint64_t work_item_id = executor->next_work_item_id++;
901 struct action_work_item *work_item;
902 bool signal = false;
903
904 LTTNG_ASSERT(trigger);
905 ASSERT_RCU_READ_LOCKED();
906
907 pthread_mutex_lock(&executor->work.lock);
908 /* Check for queue overflow. */
909 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
910 /* Most likely spammy, remove if it is the case. */
911 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
912 get_trigger_name(trigger),
913 work_item_id);
914 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
915 goto error_unlock;
916 }
917
918 work_item = zmalloc<action_work_item>();
919 if (!work_item) {
920 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
921 get_trigger_name(trigger));
922 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
923 goto error_unlock;
924 }
925
926 lttng_trigger_get(trigger);
927 if (client_list) {
928 const bool reference_acquired = notification_client_list_get(client_list);
929
930 LTTNG_ASSERT(reference_acquired);
931 }
932
933 work_item->id = work_item_id;
934 work_item->trigger = trigger;
935
936 /* Ownership transferred to the work item. */
937 work_item->evaluation = evaluation;
938 evaluation = nullptr;
939
940 work_item->client_list = client_list;
941 work_item->object_creds.is_set = !!object_creds;
942 if (object_creds) {
943 work_item->object_creds.value = *object_creds;
944 }
945
946 CDS_INIT_LIST_HEAD(&work_item->list_node);
947
948 /* Build the array of action work subitems for the passed trigger. */
949 lttng_dynamic_array_init(&work_item->subitems,
950 sizeof(struct action_work_subitem),
951 action_work_subitem_destructor);
952
953 ret = populate_subitem_array_from_trigger(trigger, &work_item->subitems);
954 if (ret) {
955 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
956 get_trigger_name(trigger));
957 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
958 goto error_unlock;
959 }
960
961 cds_list_add_tail(&work_item->list_node, &executor->work.list);
962 executor->work.pending_count++;
963 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
964 get_trigger_name(trigger),
965 work_item_id);
966 signal = true;
967
968 error_unlock:
969 if (signal) {
970 pthread_cond_signal(&executor->work.cond);
971 }
972
973 pthread_mutex_unlock(&executor->work.lock);
974 lttng_evaluation_destroy(evaluation);
975 return executor_status;
976 }
977
978 static int add_action_to_subitem_array(struct lttng_action *action,
979 struct lttng_dynamic_array *subitems)
980 {
981 int ret = 0;
982 const lttng_action_type type = lttng_action_get_type(action);
983 const char *session_name = nullptr;
984 enum lttng_action_status status;
985 struct action_work_subitem subitem = {
986 .action = nullptr,
987 .context = {
988 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
989 },
990 };
991
992 LTTNG_ASSERT(action);
993 LTTNG_ASSERT(subitems);
994
995 if (type == LTTNG_ACTION_TYPE_LIST) {
996 for (auto inner_action : lttng::ctl::action_list_view(action)) {
997 LTTNG_ASSERT(inner_action);
998
999 ret = add_action_to_subitem_array(inner_action, subitems);
1000 if (ret) {
1001 goto end;
1002 }
1003 }
1004
1005 /*
1006 * Go directly to the end since there is no need to add the
1007 * list action by itself to the subitems array.
1008 */
1009 goto end;
1010 }
1011
1012 /* Gather execution context. */
1013 switch (type) {
1014 case LTTNG_ACTION_TYPE_NOTIFY:
1015 break;
1016 case LTTNG_ACTION_TYPE_START_SESSION:
1017 status = lttng_action_start_session_get_session_name(action, &session_name);
1018 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1019 break;
1020 case LTTNG_ACTION_TYPE_STOP_SESSION:
1021 status = lttng_action_stop_session_get_session_name(action, &session_name);
1022 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1023 break;
1024 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
1025 status = lttng_action_rotate_session_get_session_name(action, &session_name);
1026 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1027 break;
1028 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
1029 status = lttng_action_snapshot_session_get_session_name(action, &session_name);
1030 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1031 break;
1032 case LTTNG_ACTION_TYPE_LIST:
1033 case LTTNG_ACTION_TYPE_UNKNOWN:
1034 /* Fallthrough */
1035 default:
1036 abort();
1037 break;
1038 }
1039
1040 /*
1041 * Fetch the session execution context info as needed.
1042 * Note that we could decide to not add an action for which we know the
1043 * execution will not happen (i.e no session exists for that name). For
1044 * now we leave the decision to skip to the action executor for sake of
1045 * simplicity and consistency.
1046 */
1047 if (session_name != nullptr) {
1048 uint64_t session_id;
1049
1050 /*
1051 * Instantaneous sampling of the session id if present.
1052 *
1053 * This method is preferred over `sessiond_find_by_name` then
1054 * fetching the session'd id since `sessiond_find_by_name`
1055 * requires the session list lock to be taken.
1056 *
1057 * Taking the session list lock can lead to a deadlock
1058 * between the action executor and the notification thread
1059 * (caller of add_action_to_subitem_array). It is okay if the
1060 * session state changes between the enqueuing time and the
1061 * execution time. The execution context is validated at
1062 * execution time.
1063 */
1064 if (sample_session_id_by_name(session_name, &session_id)) {
1065 LTTNG_OPTIONAL_SET(&subitem.context.session_id, session_id);
1066 }
1067 }
1068
1069 /* Get a reference to the action. */
1070 lttng_action_get(action);
1071 subitem.action = action;
1072
1073 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1074 if (ret) {
1075 ERR("Failed to add work subitem to the subitem array");
1076 lttng_action_put(action);
1077 ret = -1;
1078 goto end;
1079 }
1080
1081 end:
1082 return ret;
1083 }
1084
1085 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
1086 struct lttng_dynamic_array *subitems)
1087 {
1088 struct lttng_action *action;
1089
1090 action = lttng_trigger_get_action(trigger);
1091 LTTNG_ASSERT(action);
1092
1093 return add_action_to_subitem_array(action, subitems);
1094 }
This page took 0.052963 seconds and 4 git commands to generate.