e3eded0faf3df57460131ae2604efe25c04ad863
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.hpp"
9 #include "cmd.hpp"
10 #include "health-sessiond.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-internal.hpp"
13 #include "session.hpp"
14 #include "thread.hpp"
15
16 #include <common/dynamic-array.hpp>
17 #include <common/macros.hpp>
18 #include <common/optional.hpp>
19 #include <common/urcu.hpp>
20
21 #include <lttng/action/action-internal.hpp>
22 #include <lttng/action/list-internal.hpp>
23 #include <lttng/action/list.h>
24 #include <lttng/action/notify-internal.hpp>
25 #include <lttng/action/notify.h>
26 #include <lttng/action/rotate-session.h>
27 #include <lttng/action/snapshot-session.h>
28 #include <lttng/action/start-session.h>
29 #include <lttng/action/stop-session.h>
30 #include <lttng/condition/evaluation.h>
31 #include <lttng/condition/event-rule-matches-internal.hpp>
32 #include <lttng/lttng-error.h>
33 #include <lttng/trigger/trigger-internal.hpp>
34
35 #include <pthread.h>
36 #include <stdbool.h>
37 #include <stddef.h>
38 #include <urcu/list.h>
39
/* Name passed to lttng_thread_create() for the executor's worker thread. */
#define THREAD_NAME	      "Action Executor"
/*
 * Maximum number of work items that may be pending at once; enqueuing is
 * refused (overflow) once the pending count reaches this value.
 */
#define MAX_QUEUED_WORK_COUNT 8192
42
/*
 * State of an action executor: a worker thread which consumes a queue of
 * work items produced by action_executor_enqueue_trigger().
 */
struct action_executor {
	/* Worker thread, as returned by lttng_thread_create(). */
	struct lttng_thread *thread;
	/* Used to report client communication updates when sending notifications. */
	struct notification_thread_handle *notification_thread_handle;
	struct {
		/* Number of work items currently linked in `list`. */
		uint64_t pending_count;
		/* Queue of `action_work_item`: appended at the tail, popped from the head. */
		struct cds_list_head list;
		/* Signalled when a work item is enqueued and when a shutdown is requested. */
		pthread_cond_t cond;
		/* Held while manipulating `pending_count` and `list`, and while setting `should_quit`. */
		pthread_mutex_t lock;
	} work;
	/* Set by the shutdown callback to make the worker thread leave its loop. */
	bool should_quit;
	/* Identifier that will be assigned to the next enqueued work item. */
	uint64_t next_work_item_id;
};
55
namespace {
/*
 * A work item is composed of a dynamic array of sub-items which
 * represent a flattened, and augmented, version of a trigger's actions.
 *
 * We cannot rely solely on the trigger's actions since each action can have an
 * execution context we need to comply with.
 *
 * The notion of execution context is required since, for some actions, the
 * associated objects are referenced by name and not by id. This can lead to
 * a number of ambiguities when executing an action work item.
 *
 * For example, let's take a simple trigger such as:
 *   - condition: ust event A
 *   - action: start session S
 *
 * At time T, session S exists.
 * At T + 1, the event A is hit.
 * At T + 2, the tracer event notification is received and the work item is
 * queued. Here, session S has an id of 1.
 * At T + 3, the session S is destroyed and a new session S is created, with a
 * resulting id of 200.
 * At T + 4, the work item is popped from the queue and begins execution. Going
 * by name alone, it would start the session S with an id of 200 instead of
 * the session S with an id of 1 that was present at the queuing phase.
 *
 * The context to be respected is the one in effect when the work item is
 * queued. If the execution context is not the same at the moment of
 * execution, we skip the execution of that sub-item.
 *
 * The same policy applies to the validity of the associated trigger object
 * at the moment of execution: if the trigger is found to be unregistered,
 * the execution is skipped.
 */
struct action_work_item {
	/* Identifier assigned at enqueue time; used in log messages. */
	uint64_t id;

	/*
	 * The actions to be executed with their respective execution context.
	 * See struct `action_work_subitem`.
	 */
	struct lttng_dynamic_array subitems;

	/* Execution context data */
	/* Trigger on behalf of which the actions run; a reference is held. */
	struct lttng_trigger *trigger;
	/* Evaluation owned by the work item; destroyed along with it. */
	struct lttng_evaluation *evaluation;
	/* Clients to notify; a reference is held when the list is provided. */
	struct notification_client_list *client_list;
	/* Credentials passed along when sending the evaluation; may be unset. */
	LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
	/* Node linking the work item in the executor's work queue. */
	struct cds_list_head list_node;
};

/* A single action of a work item along with its execution context. */
struct action_work_subitem {
	/* Action to execute; released by the sub-item array's element destructor. */
	struct lttng_action *action;
	struct {
		/* Used by actions targeting a session. */
		LTTNG_OPTIONAL(uint64_t) session_id;
	} context;
};
} /* namespace */
115
/*
 * Signature shared by all action handlers: they receive the executor, the
 * work item being executed, and the sub-item (action and context) to run.
 *
 * Only return non-zero on a fatal error that should shut down the action
 * executor.
 */
using action_executor_handler = int (*)(struct action_executor *,
					const struct action_work_item *,
					struct action_work_subitem *);
123
/*
 * Forward declarations of the per-action-type handlers (referenced by the
 * `action_executors` table) and of the generic handler which dispatches to
 * them.
 */
static int action_executor_notify_handler(struct action_executor *executor,
					  const struct action_work_item *,
					  struct action_work_subitem *);
static int action_executor_start_session_handler(struct action_executor *executor,
						 const struct action_work_item *,
						 struct action_work_subitem *);
static int action_executor_stop_session_handler(struct action_executor *executor,
						const struct action_work_item *,
						struct action_work_subitem *);
static int action_executor_rotate_session_handler(struct action_executor *executor,
						  const struct action_work_item *,
						  struct action_work_subitem *);
static int action_executor_snapshot_session_handler(struct action_executor *executor,
						    const struct action_work_item *,
						    struct action_work_subitem *);
static int action_executor_list_handler(struct action_executor *executor,
					const struct action_work_item *,
					struct action_work_subitem *);
static int action_executor_generic_handler(struct action_executor *executor,
					   const struct action_work_item *,
					   struct action_work_subitem *);
145
/*
 * Handler dispatch table, indexed by the action's `enum lttng_action_type`
 * value (see action_executor_generic_handler()).
 *
 * NOTE(review): the order of the entries presumably mirrors the numeric
 * values of `enum lttng_action_type` -- confirm against the enum's
 * declaration when adding an action type.
 */
static const action_executor_handler action_executors[] = {
	action_executor_notify_handler,		  action_executor_start_session_handler,
	action_executor_stop_session_handler,	  action_executor_rotate_session_handler,
	action_executor_snapshot_session_handler, action_executor_list_handler,
};
151
/*
 * Forward declarations of the helpers which build a work item's array of
 * sub-items; populate_subitem_array_from_trigger() is used when a trigger
 * is enqueued.
 */
static int add_action_to_subitem_array(struct lttng_action *action,
				       struct lttng_dynamic_array *subitems);

static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
					       struct lttng_dynamic_array *subitems);
158
159 static void action_work_subitem_destructor(void *element)
160 {
161 struct action_work_subitem *subitem = (action_work_subitem *) element;
162
163 lttng_action_put(subitem->action);
164 }
165
166 static const char *get_action_name(const struct lttng_action *action)
167 {
168 const enum lttng_action_type action_type = lttng_action_get_type(action);
169
170 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
171
172 return lttng_action_type_string(action_type);
173 }
174
175 /* Check if this trigger allowed to interect with a given session. */
176 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
177 struct ltt_session *session)
178 {
179 bool is_allowed = false;
180 const struct lttng_credentials session_creds = {
181 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
182 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
183 };
184 /* Can never be NULL. */
185 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
186
187 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
188 (lttng_credentials_get_uid(trigger_creds) == 0);
189 if (!is_allowed) {
190 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
191 session->name,
192 (long int) session->uid,
193 (long int) session->gid,
194 (long int) lttng_credentials_get_uid(trigger_creds));
195 }
196
197 return is_allowed;
198 }
199
200 static const char *get_trigger_name(const struct lttng_trigger *trigger)
201 {
202 const char *trigger_name;
203 enum lttng_trigger_status trigger_status;
204
205 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
206 switch (trigger_status) {
207 case LTTNG_TRIGGER_STATUS_OK:
208 break;
209 case LTTNG_TRIGGER_STATUS_UNSET:
210 trigger_name = "(anonymous)";
211 break;
212 default:
213 trigger_name = "(failed to get name)";
214 break;
215 }
216
217 return trigger_name;
218 }
219
220 static int client_handle_transmission_status(struct notification_client *client,
221 enum client_transmission_status status,
222 void *user_data)
223 {
224 int ret = 0;
225 struct action_executor *executor = (action_executor *) user_data;
226 bool update_communication = true;
227
228 switch (status) {
229 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
230 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
231 client->id);
232 /*
233 * There is no need to wake the (e)poll thread. If it was waiting for
234 * "out" events on the client's socket, it will see that no payload
235 * in queued and will unsubscribe from that event.
236 *
237 * In the other cases, we have to wake the the (e)poll thread to either
238 * handle the error on the client or to get it to monitor the client "out"
239 * events.
240 */
241 update_communication = false;
242 break;
243 case CLIENT_TRANSMISSION_STATUS_QUEUED:
244 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
245 client->id);
246 break;
247 case CLIENT_TRANSMISSION_STATUS_FAIL:
248 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
249 client->id);
250 break;
251 default:
252 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
253 client->id);
254 ret = -1;
255 goto end;
256 }
257
258 if (!update_communication) {
259 goto end;
260 }
261
262 /* Safe to read client's id without locking as it is immutable. */
263 ret = notification_thread_client_communication_update(
264 executor->notification_thread_handle, client->id, status);
265 end:
266 return ret;
267 }
268
269 static int action_executor_notify_handler(struct action_executor *executor,
270 const struct action_work_item *work_item,
271 struct action_work_subitem *item __attribute__((unused)))
272 {
273 return notification_client_list_send_evaluation(
274 work_item->client_list,
275 work_item->trigger,
276 work_item->evaluation,
277 work_item->object_creds.is_set ? &(work_item->object_creds.value) : nullptr,
278 client_handle_transmission_status,
279 executor);
280 }
281
282 static int action_executor_start_session_handler(struct action_executor *executor
283 __attribute__((unused)),
284 const struct action_work_item *work_item,
285 struct action_work_subitem *item)
286 {
287 const char *session_name;
288 enum lttng_action_status action_status;
289 enum lttng_error_code cmd_ret;
290 struct lttng_action *action = item->action;
291
292 lttng::urcu::read_lock_guard read_lock;
293
294 action_status = lttng_action_start_session_get_session_name(action, &session_name);
295 if (action_status != LTTNG_ACTION_STATUS_OK) {
296 ERR("Failed to get session name from `%s` action", get_action_name(action));
297 return -1;
298 }
299
300 /*
301 * Validate if at the moment of the action was queued the session
302 * existed. If not skip the action altogether.
303 */
304 if (!item->context.session_id.is_set) {
305 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
306 session_name,
307 get_action_name(action),
308 get_trigger_name(work_item->trigger));
309 lttng_action_increase_execution_failure_count(action);
310 return 0;
311 }
312
313 /*
314 * Mind the order of the declaration of list_lock vs target_session:
315 * the session list lock must always be released _after_ the release of
316 * a session's reference (the destruction of a ref/locked_ref) to ensure
317 * since the reference's release may unpublish the session from the list of
318 * sessions.
319 */
320 const auto list_lock = lttng::sessiond::lock_session_list();
321 ltt_session::locked_ref session;
322
323 try {
324 session = ltt_session::find_locked_session(
325 LTTNG_OPTIONAL_GET(item->context.session_id));
326 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
327 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
328 ex.what(),
329 session_name,
330 get_action_name(action),
331 get_trigger_name(work_item->trigger),
332 ex.source_location);
333 lttng_action_increase_execution_failure_count(action);
334 return 0;
335 }
336
337 if (session->destroyed) {
338 DBG("Session `%s` with id = %" PRIu64
339 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
340 session->name,
341 session->id,
342 get_action_name(action),
343 get_trigger_name(work_item->trigger));
344 return 0;
345 }
346
347 if (!is_trigger_allowed_for_session(work_item->trigger, session.get())) {
348 return 0;
349 }
350
351 cmd_ret = (lttng_error_code) cmd_start_trace(session.get());
352 switch (cmd_ret) {
353 case LTTNG_OK:
354 DBG("Successfully started session `%s` on behalf of trigger `%s`",
355 session_name,
356 get_trigger_name(work_item->trigger));
357 break;
358 case LTTNG_ERR_TRACE_ALREADY_STARTED:
359 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
360 session_name,
361 get_trigger_name(work_item->trigger));
362 break;
363 default:
364 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
365 session_name,
366 get_trigger_name(work_item->trigger),
367 lttng_strerror(-cmd_ret));
368 lttng_action_increase_execution_failure_count(action);
369 break;
370 }
371
372 return 0;
373 }
374
375 static int action_executor_stop_session_handler(struct action_executor *executor
376 __attribute__((unused)),
377 const struct action_work_item *work_item,
378 struct action_work_subitem *item)
379 {
380 const char *session_name;
381 enum lttng_action_status action_status;
382 enum lttng_error_code cmd_ret;
383 struct lttng_action *action = item->action;
384
385 lttng::urcu::read_lock_guard read_lock;
386
387 action_status = lttng_action_stop_session_get_session_name(action, &session_name);
388 if (action_status != LTTNG_ACTION_STATUS_OK) {
389 ERR("Failed to get session name from `%s` action", get_action_name(action));
390 return -1;
391 }
392
393 /*
394 * Validate if, at the moment the action was queued, the target session
395 * existed. If not, skip the action altogether.
396 */
397 if (!item->context.session_id.is_set) {
398 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
399 session_name,
400 get_action_name(action),
401 get_trigger_name(work_item->trigger));
402 lttng_action_increase_execution_failure_count(action);
403 return 0;
404 }
405
406 /*
407 * Mind the order of the declaration of list_lock vs target_session:
408 * the session list lock must always be released _after_ the release of
409 * a session's reference (the destruction of a ref/locked_ref) to ensure
410 * since the reference's release may unpublish the session from the list of
411 * sessions.
412 */
413 const auto list_lock = lttng::sessiond::lock_session_list();
414 ltt_session::locked_ref session;
415
416 try {
417 session = ltt_session::find_locked_session(
418 LTTNG_OPTIONAL_GET(item->context.session_id));
419 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
420 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
421 ex.what(),
422 session_name,
423 get_action_name(action),
424 get_trigger_name(work_item->trigger),
425 ex.source_location);
426 lttng_action_increase_execution_failure_count(action);
427 return 0;
428 }
429
430 if (session->destroyed) {
431 DBG("Session `%s` with id = %" PRIu64
432 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
433 session->name,
434 session->id,
435 get_action_name(action),
436 get_trigger_name(work_item->trigger));
437 return 0;
438 }
439
440 if (!is_trigger_allowed_for_session(work_item->trigger, session.get())) {
441 return 0;
442 }
443
444 cmd_ret = (lttng_error_code) cmd_stop_trace(session.get());
445 switch (cmd_ret) {
446 case LTTNG_OK:
447 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
448 session_name,
449 get_trigger_name(work_item->trigger));
450 break;
451 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
452 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
453 session_name,
454 get_trigger_name(work_item->trigger));
455 break;
456 default:
457 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
458 session_name,
459 get_trigger_name(work_item->trigger),
460 lttng_strerror(-cmd_ret));
461 lttng_action_increase_execution_failure_count(action);
462 break;
463 }
464
465 return 0;
466 }
467
468 static int action_executor_rotate_session_handler(struct action_executor *executor
469 __attribute__((unused)),
470 const struct action_work_item *work_item,
471 struct action_work_subitem *item)
472 {
473 const char *session_name;
474 enum lttng_action_status action_status;
475 enum lttng_error_code cmd_ret;
476 struct lttng_action *action = item->action;
477
478 lttng::urcu::read_lock_guard read_lock;
479
480 action_status = lttng_action_rotate_session_get_session_name(action, &session_name);
481 if (action_status != LTTNG_ACTION_STATUS_OK) {
482 ERR("Failed to get session name from `%s` action", get_action_name(action));
483 return -1;
484 }
485
486 /*
487 * Validate if, at the moment the action was queued, the target session
488 * existed. If not, skip the action altogether.
489 */
490 if (!item->context.session_id.is_set) {
491 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
492 session_name,
493 get_action_name(action),
494 get_trigger_name(work_item->trigger));
495 lttng_action_increase_execution_failure_count(action);
496 return 0;
497 }
498
499 /*
500 * Mind the order of the declaration of list_lock vs target_session:
501 * the session list lock must always be released _after_ the release of
502 * a session's reference (the destruction of a ref/locked_ref) to ensure
503 * since the reference's release may unpublish the session from the list of
504 * sessions.
505 */
506 const auto list_lock = lttng::sessiond::lock_session_list();
507 ltt_session::locked_ref session;
508
509 try {
510 session = ltt_session::find_locked_session(
511 LTTNG_OPTIONAL_GET(item->context.session_id));
512 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
513 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
514 ex.what(),
515 session_name,
516 get_action_name(action),
517 get_trigger_name(work_item->trigger),
518 ex.source_location);
519 lttng_action_increase_execution_failure_count(action);
520 return 0;
521 }
522
523 if (session->destroyed) {
524 DBG("Session `%s` with id = %" PRIu64
525 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
526 session->name,
527 session->id,
528 get_action_name(action),
529 get_trigger_name(work_item->trigger));
530 return 0;
531 }
532
533 if (!is_trigger_allowed_for_session(work_item->trigger, session.get())) {
534 return 0;
535 }
536
537 cmd_ret = (lttng_error_code) cmd_rotate_session(
538 session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
539 switch (cmd_ret) {
540 case LTTNG_OK:
541 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
542 session_name,
543 get_trigger_name(work_item->trigger));
544 break;
545 case LTTNG_ERR_ROTATION_PENDING:
546 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
547 session_name,
548 get_trigger_name(work_item->trigger));
549 lttng_action_increase_execution_failure_count(action);
550 break;
551 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
552 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
553 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
554 session_name,
555 get_trigger_name(work_item->trigger));
556 break;
557 default:
558 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
559 session_name,
560 get_trigger_name(work_item->trigger),
561 lttng_strerror(-cmd_ret));
562 lttng_action_increase_execution_failure_count(action);
563 break;
564 }
565
566 return 0;
567 }
568
569 static int action_executor_snapshot_session_handler(struct action_executor *executor
570 __attribute__((unused)),
571 const struct action_work_item *work_item,
572 struct action_work_subitem *item)
573 {
574 const char *session_name;
575 enum lttng_action_status action_status;
576 lttng_snapshot_output default_snapshot_output;
577 const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output;
578 enum lttng_error_code cmd_ret;
579 struct lttng_action *action = item->action;
580
581 default_snapshot_output.max_size = UINT64_MAX;
582
583 lttng::urcu::read_lock_guard read_lock;
584
585 /*
586 * Validate if, at the moment the action was queued, the target session
587 * existed. If not, skip the action altogether.
588 */
589 if (!item->context.session_id.is_set) {
590 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
591 get_action_name(action),
592 get_trigger_name(work_item->trigger));
593 lttng_action_increase_execution_failure_count(action);
594 return 0;
595 }
596
597 action_status = lttng_action_snapshot_session_get_session_name(action, &session_name);
598 if (action_status != LTTNG_ACTION_STATUS_OK) {
599 ERR("Failed to get session name from `%s` action", get_action_name(action));
600 return -1;
601 }
602
603 action_status = lttng_action_snapshot_session_get_output(action, &snapshot_output);
604 if (action_status != LTTNG_ACTION_STATUS_OK && action_status != LTTNG_ACTION_STATUS_UNSET) {
605 ERR("Failed to get output from `%s` action", get_action_name(action));
606 return -1;
607 }
608
609 /*
610 * Mind the order of the declaration of list_lock vs session:
611 * the session list lock must always be released _after_ the release of
612 * a session's reference (the destruction of a ref/locked_ref) to ensure
613 * since the reference's release may unpublish the session from the list of
614 * sessions.
615 */
616 const auto list_lock = lttng::sessiond::lock_session_list();
617 ltt_session::locked_ref session;
618
619 try {
620 session = ltt_session::find_locked_session(
621 LTTNG_OPTIONAL_GET(item->context.session_id));
622 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
623 DBG_FMT("Failed to execution trigger action: {}, action=`{}`, trigger_name=`{}`, location='{}'",
624 ex.what(),
625 session_name,
626 get_action_name(action),
627 get_trigger_name(work_item->trigger),
628 ex.source_location);
629 lttng_action_increase_execution_failure_count(action);
630 return 0;
631 }
632
633 if (session->destroyed) {
634 DBG("Session `%s` with id = %" PRIu64
635 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
636 session->name,
637 session->id,
638 get_action_name(action),
639 get_trigger_name(work_item->trigger));
640 return 0;
641 }
642
643 if (!is_trigger_allowed_for_session(work_item->trigger, session.get())) {
644 return 0;
645 }
646
647 cmd_ret = (lttng_error_code) cmd_snapshot_record(session.get(), snapshot_output, 0);
648 switch (cmd_ret) {
649 case LTTNG_OK:
650 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
651 session_name,
652 get_trigger_name(work_item->trigger));
653 break;
654 default:
655 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
656 session_name,
657 get_trigger_name(work_item->trigger),
658 lttng_strerror(-cmd_ret));
659 lttng_action_increase_execution_failure_count(action);
660 break;
661 }
662
663 return 0;
664 }
665
666 static int action_executor_list_handler(struct action_executor *executor __attribute__((unused)),
667 const struct action_work_item *work_item
668 __attribute__((unused)),
669 struct action_work_subitem *item __attribute__((unused)))
670 {
671 ERR("Execution of a list action by the action executor should never occur");
672 abort();
673 }
674
675 static int action_executor_generic_handler(struct action_executor *executor,
676 const struct action_work_item *work_item,
677 struct action_work_subitem *item)
678 {
679 int ret;
680 struct lttng_action *action = item->action;
681 const enum lttng_action_type action_type = lttng_action_get_type(action);
682
683 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
684
685 lttng_action_increase_execution_request_count(action);
686 if (!lttng_action_should_execute(action)) {
687 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
688 get_action_name(action),
689 get_trigger_name(work_item->trigger),
690 work_item->id);
691 ret = 0;
692 goto end;
693 }
694
695 lttng_action_increase_execution_count(action);
696 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
697 get_action_name(action),
698 get_trigger_name(work_item->trigger),
699 work_item->id);
700 ret = action_executors[action_type](executor, work_item, item);
701 end:
702 return ret;
703 }
704
705 static int action_work_item_execute(struct action_executor *executor,
706 struct action_work_item *work_item)
707 {
708 int ret;
709 size_t count, i;
710
711 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
712 work_item->id,
713 get_trigger_name(work_item->trigger));
714
715 count = lttng_dynamic_array_get_count(&work_item->subitems);
716 for (i = 0; i < count; i++) {
717 struct action_work_subitem *item;
718
719 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems,
720 i);
721 ret = action_executor_generic_handler(executor, work_item, item);
722 if (ret) {
723 goto end;
724 }
725 }
726 end:
727 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
728 work_item->id,
729 get_trigger_name(work_item->trigger));
730 return ret;
731 }
732
733 static void action_work_item_destroy(struct action_work_item *work_item)
734 {
735 lttng_trigger_put(work_item->trigger);
736 lttng_evaluation_destroy(work_item->evaluation);
737 notification_client_list_put(work_item->client_list);
738 lttng_dynamic_array_reset(&work_item->subitems);
739 free(work_item);
740 }
741
742 static void *action_executor_thread(void *_data)
743 {
744 struct action_executor *executor = (action_executor *) _data;
745
746 LTTNG_ASSERT(executor);
747
748 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
749
750 rcu_register_thread();
751 rcu_thread_online();
752
753 DBG("Entering work execution loop");
754 pthread_mutex_lock(&executor->work.lock);
755 while (!executor->should_quit) {
756 int ret = 0;
757 struct action_work_item *work_item;
758
759 health_code_update();
760 if (executor->work.pending_count == 0) {
761 health_poll_entry();
762 DBG("No work items enqueued, entering wait");
763 pthread_cond_wait(&executor->work.cond, &executor->work.lock);
764 DBG("Woke-up from wait");
765 health_poll_exit();
766 continue;
767 }
768
769 /* Pop item from front of the list with work lock held. */
770 work_item = cds_list_first_entry(
771 &executor->work.list, struct action_work_item, list_node);
772 cds_list_del(&work_item->list_node);
773 executor->work.pending_count--;
774
775 /*
776 * Work can be performed without holding the work lock,
777 * allowing new items to be queued.
778 */
779 pthread_mutex_unlock(&executor->work.lock);
780
781 /* Execute item only if a trigger is registered. */
782 lttng_trigger_lock(work_item->trigger);
783 if (!lttng_trigger_is_registered(work_item->trigger)) {
784 const char *trigger_name = nullptr;
785 uid_t trigger_owner_uid;
786 enum lttng_trigger_status trigger_status;
787
788 trigger_name = get_trigger_name(work_item->trigger);
789
790 trigger_status =
791 lttng_trigger_get_owner_uid(work_item->trigger, &trigger_owner_uid);
792 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
793
794 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64
795 ", trigger name = `%s`, trigger owner uid = %d",
796 work_item->id,
797 trigger_name,
798 (int) trigger_owner_uid);
799 ret = 0;
800 goto skip_execute;
801 }
802
803 ret = action_work_item_execute(executor, work_item);
804
805 skip_execute:
806 lttng_trigger_unlock(work_item->trigger);
807 action_work_item_destroy(work_item);
808 if (ret) {
809 /* Fatal error. */
810 break;
811 }
812
813 health_code_update();
814 pthread_mutex_lock(&executor->work.lock);
815 }
816
817 if (executor->should_quit) {
818 pthread_mutex_unlock(&executor->work.lock);
819 }
820 DBG("Left work execution loop");
821
822 health_code_update();
823
824 rcu_thread_offline();
825 rcu_unregister_thread();
826 health_unregister(the_health_sessiond);
827
828 return nullptr;
829 }
830
831 static bool shutdown_action_executor_thread(void *_data)
832 {
833 struct action_executor *executor = (action_executor *) _data;
834
835 pthread_mutex_lock(&executor->work.lock);
836 executor->should_quit = true;
837 pthread_cond_signal(&executor->work.cond);
838 pthread_mutex_unlock(&executor->work.lock);
839 return true;
840 }
841
842 static void clean_up_action_executor_thread(void *_data)
843 {
844 struct action_executor *executor = (action_executor *) _data;
845
846 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
847
848 pthread_mutex_destroy(&executor->work.lock);
849 pthread_cond_destroy(&executor->work.cond);
850 free(executor);
851 }
852
853 struct action_executor *action_executor_create(struct notification_thread_handle *handle)
854 {
855 struct action_executor *executor = zmalloc<action_executor>();
856
857 if (!executor) {
858 goto end;
859 }
860
861 CDS_INIT_LIST_HEAD(&executor->work.list);
862 pthread_cond_init(&executor->work.cond, nullptr);
863 pthread_mutex_init(&executor->work.lock, nullptr);
864 executor->notification_thread_handle = handle;
865
866 executor->thread = lttng_thread_create(THREAD_NAME,
867 action_executor_thread,
868 shutdown_action_executor_thread,
869 clean_up_action_executor_thread,
870 executor);
871 end:
872 return executor;
873 }
874
875 void action_executor_destroy(struct action_executor *executor)
876 {
877 struct action_work_item *work_item, *tmp;
878
879 /* TODO Wait for work list to drain? */
880 lttng_thread_shutdown(executor->thread);
881 pthread_mutex_lock(&executor->work.lock);
882 if (executor->work.pending_count != 0) {
883 WARN("%" PRIu64
884 " trigger action%s still queued for execution and will be discarded",
885 executor->work.pending_count,
886 executor->work.pending_count == 1 ? " is" : "s are");
887 }
888
889 cds_list_for_each_entry_safe (work_item, tmp, &executor->work.list, list_node) {
890 WARN("Discarding action work item %" PRIu64 " associated to trigger `%s`",
891 work_item->id,
892 get_trigger_name(work_item->trigger));
893 cds_list_del(&work_item->list_node);
894 action_work_item_destroy(work_item);
895 }
896 pthread_mutex_unlock(&executor->work.lock);
897 lttng_thread_put(executor->thread);
898 }
899
900 /* RCU read-lock must be held by the caller. */
901 enum action_executor_status
902 action_executor_enqueue_trigger(struct action_executor *executor,
903 struct lttng_trigger *trigger,
904 struct lttng_evaluation *evaluation,
905 const struct lttng_credentials *object_creds,
906 struct notification_client_list *client_list)
907 {
908 int ret;
909 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
910 const uint64_t work_item_id = executor->next_work_item_id++;
911 struct action_work_item *work_item;
912 bool signal = false;
913
914 LTTNG_ASSERT(trigger);
915 ASSERT_RCU_READ_LOCKED();
916
917 pthread_mutex_lock(&executor->work.lock);
918 /* Check for queue overflow. */
919 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
920 /* Most likely spammy, remove if it is the case. */
921 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
922 get_trigger_name(trigger),
923 work_item_id);
924 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
925 goto error_unlock;
926 }
927
928 work_item = zmalloc<action_work_item>();
929 if (!work_item) {
930 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
931 get_trigger_name(trigger));
932 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
933 goto error_unlock;
934 }
935
936 lttng_trigger_get(trigger);
937 if (client_list) {
938 const bool reference_acquired = notification_client_list_get(client_list);
939
940 LTTNG_ASSERT(reference_acquired);
941 }
942
943 work_item->id = work_item_id;
944 work_item->trigger = trigger;
945
946 /* Ownership transferred to the work item. */
947 work_item->evaluation = evaluation;
948 evaluation = nullptr;
949
950 work_item->client_list = client_list;
951 work_item->object_creds.is_set = !!object_creds;
952 if (object_creds) {
953 work_item->object_creds.value = *object_creds;
954 }
955
956 CDS_INIT_LIST_HEAD(&work_item->list_node);
957
958 /* Build the array of action work subitems for the passed trigger. */
959 lttng_dynamic_array_init(&work_item->subitems,
960 sizeof(struct action_work_subitem),
961 action_work_subitem_destructor);
962
963 ret = populate_subitem_array_from_trigger(trigger, &work_item->subitems);
964 if (ret) {
965 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
966 get_trigger_name(trigger));
967 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
968 goto error_unlock;
969 }
970
971 cds_list_add_tail(&work_item->list_node, &executor->work.list);
972 executor->work.pending_count++;
973 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
974 get_trigger_name(trigger),
975 work_item_id);
976 signal = true;
977
978 error_unlock:
979 if (signal) {
980 pthread_cond_signal(&executor->work.cond);
981 }
982
983 pthread_mutex_unlock(&executor->work.lock);
984 lttng_evaluation_destroy(evaluation);
985 return executor_status;
986 }
987
988 static int add_action_to_subitem_array(struct lttng_action *action,
989 struct lttng_dynamic_array *subitems)
990 {
991 int ret = 0;
992 enum lttng_action_type type = lttng_action_get_type(action);
993 const char *session_name = nullptr;
994 enum lttng_action_status status;
995 struct action_work_subitem subitem = {
996 .action = nullptr,
997 .context = {
998 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
999 },
1000 };
1001
1002 LTTNG_ASSERT(action);
1003 LTTNG_ASSERT(subitems);
1004
1005 if (type == LTTNG_ACTION_TYPE_LIST) {
1006 for (auto inner_action : lttng::ctl::action_list_view(action)) {
1007 LTTNG_ASSERT(inner_action);
1008
1009 ret = add_action_to_subitem_array(inner_action, subitems);
1010 if (ret) {
1011 goto end;
1012 }
1013 }
1014
1015 /*
1016 * Go directly to the end since there is no need to add the
1017 * list action by itself to the subitems array.
1018 */
1019 goto end;
1020 }
1021
1022 /* Gather execution context. */
1023 switch (type) {
1024 case LTTNG_ACTION_TYPE_NOTIFY:
1025 break;
1026 case LTTNG_ACTION_TYPE_START_SESSION:
1027 status = lttng_action_start_session_get_session_name(action, &session_name);
1028 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1029 break;
1030 case LTTNG_ACTION_TYPE_STOP_SESSION:
1031 status = lttng_action_stop_session_get_session_name(action, &session_name);
1032 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1033 break;
1034 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
1035 status = lttng_action_rotate_session_get_session_name(action, &session_name);
1036 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1037 break;
1038 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
1039 status = lttng_action_snapshot_session_get_session_name(action, &session_name);
1040 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1041 break;
1042 case LTTNG_ACTION_TYPE_LIST:
1043 case LTTNG_ACTION_TYPE_UNKNOWN:
1044 /* Fallthrough */
1045 default:
1046 abort();
1047 break;
1048 }
1049
1050 /*
1051 * Fetch the session execution context info as needed.
1052 * Note that we could decide to not add an action for which we know the
1053 * execution will not happen (i.e no session exists for that name). For
1054 * now we leave the decision to skip to the action executor for sake of
1055 * simplicity and consistency.
1056 */
1057 if (session_name != nullptr) {
1058 uint64_t session_id;
1059
1060 /*
1061 * Instantaneous sampling of the session id if present.
1062 *
1063 * This method is preferred over `sessiond_find_by_name` then
1064 * fetching the session'd id since `sessiond_find_by_name`
1065 * requires the session list lock to be taken.
1066 *
1067 * Taking the session list lock can lead to a deadlock
1068 * between the action executor and the notification thread
1069 * (caller of add_action_to_subitem_array). It is okay if the
1070 * session state changes between the enqueuing time and the
1071 * execution time. The execution context is validated at
1072 * execution time.
1073 */
1074 if (sample_session_id_by_name(session_name, &session_id)) {
1075 LTTNG_OPTIONAL_SET(&subitem.context.session_id, session_id);
1076 }
1077 }
1078
1079 /* Get a reference to the action. */
1080 lttng_action_get(action);
1081 subitem.action = action;
1082
1083 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1084 if (ret) {
1085 ERR("Failed to add work subitem to the subitem array");
1086 lttng_action_put(action);
1087 ret = -1;
1088 goto end;
1089 }
1090
1091 end:
1092 return ret;
1093 }
1094
1095 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
1096 struct lttng_dynamic_array *subitems)
1097 {
1098 struct lttng_action *action;
1099
1100 action = lttng_trigger_get_action(trigger);
1101 LTTNG_ASSERT(action);
1102
1103 return add_action_to_subitem_array(action, subitems);
1104 }
This page took 0.050343 seconds and 3 git commands to generate.