Fix: file-descriptor: missing include guards
[lttng-tools.git] / src / bin / lttng-sessiond / action-executor.cpp
1 /*
2 * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "action-executor.hpp"
9 #include "cmd.hpp"
10 #include "health-sessiond.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-internal.hpp"
13 #include "session.hpp"
14 #include "thread.hpp"
15
16 #include <common/dynamic-array.hpp>
17 #include <common/macros.hpp>
18 #include <common/optional.hpp>
19 #include <common/urcu.hpp>
20
21 #include <lttng/action/action-internal.hpp>
22 #include <lttng/action/list-internal.hpp>
23 #include <lttng/action/list.h>
24 #include <lttng/action/notify-internal.hpp>
25 #include <lttng/action/notify.h>
26 #include <lttng/action/rotate-session.h>
27 #include <lttng/action/snapshot-session.h>
28 #include <lttng/action/start-session.h>
29 #include <lttng/action/stop-session.h>
30 #include <lttng/condition/evaluation.h>
31 #include <lttng/condition/event-rule-matches-internal.hpp>
32 #include <lttng/lttng-error.h>
33 #include <lttng/trigger/trigger-internal.hpp>
34
35 #include <pthread.h>
36 #include <stdbool.h>
37 #include <stddef.h>
38 #include <urcu/list.h>
39
40 #define THREAD_NAME "Action Executor"
41 #define MAX_QUEUED_WORK_COUNT 8192
42
43 struct action_executor {
44 struct lttng_thread *thread;
45 struct notification_thread_handle *notification_thread_handle;
46 struct {
47 uint64_t pending_count;
48 struct cds_list_head list;
49 pthread_cond_t cond;
50 pthread_mutex_t lock;
51 } work;
52 bool should_quit;
53 uint64_t next_work_item_id;
54 };
55
56 namespace {
57 /*
58 * A work item is composed of a dynamic array of sub-items which
59 * represent a flattened, and augmented, version of a trigger's actions.
60 *
61 * We cannot rely solely on the trigger's actions since each action can have an
62 * execution context we need to comply with.
63 *
64 * The notion of execution context is required since for some actions the
65 * associated object are referenced by name and not by id. This can lead to
66 * a number of ambiguities when executing an action work item.
67 *
68 * For example, let's take a simple trigger such as:
69 * - condition: ust event a
70 * - action: start session S
71 *
72 * At time T, session S exists.
73 * At T + 1, the event A is hit.
74 * At T + 2, the tracer event notification is received and the work item is
75 * queued. Here session S have an id of 1.
76 * At T + 3, the session S is destroyed and a new session S is created, with a
77 * resulting id of 200.
78 * At T +4, the work item is popped from the queue and begin execution and will
79 * start session S with an id of 200 instead of the session S id 1 that was
80 * present at the queuing phase.
81 *
82 * The context to be respected is the one when the work item is queued. If the
83 * execution context is not the same at the moment of execution, we skip the
84 * execution of that sub-item.
85 *
86 * It is the same policy in regards to the validity of the associated
87 * trigger object at the moment of execution, if the trigger is found to be
88 * unregistered, the execution is skipped.
89 */
90 struct action_work_item {
91 uint64_t id;
92
93 /*
94 * The actions to be executed with their respective execution context.
95 * See struct `action_work_subitem`.
96 */
97 struct lttng_dynamic_array subitems;
98
99 /* Execution context data */
100 struct lttng_trigger *trigger;
101 struct lttng_evaluation *evaluation;
102 struct notification_client_list *client_list;
103 LTTNG_OPTIONAL(struct lttng_credentials) object_creds;
104 struct cds_list_head list_node;
105 };
106
107 struct action_work_subitem {
108 struct lttng_action *action;
109 struct {
110 /* Used by actions targeting a session. */
111 LTTNG_OPTIONAL(uint64_t) session_id;
112 } context;
113 };
114 } /* namespace */
115
116 /*
117 * Only return non-zero on a fatal error that should shut down the action
118 * executor.
119 */
120 using action_executor_handler = int (*)(struct action_executor *,
121 const struct action_work_item *,
122 struct action_work_subitem *);
123
124 static int action_executor_notify_handler(struct action_executor *executor,
125 const struct action_work_item *,
126 struct action_work_subitem *);
127 static int action_executor_start_session_handler(struct action_executor *executor,
128 const struct action_work_item *,
129 struct action_work_subitem *);
130 static int action_executor_stop_session_handler(struct action_executor *executor,
131 const struct action_work_item *,
132 struct action_work_subitem *);
133 static int action_executor_rotate_session_handler(struct action_executor *executor,
134 const struct action_work_item *,
135 struct action_work_subitem *);
136 static int action_executor_snapshot_session_handler(struct action_executor *executor,
137 const struct action_work_item *,
138 struct action_work_subitem *);
139 static int action_executor_list_handler(struct action_executor *executor,
140 const struct action_work_item *,
141 struct action_work_subitem *);
142 static int action_executor_generic_handler(struct action_executor *executor,
143 const struct action_work_item *,
144 struct action_work_subitem *);
145
146 static const action_executor_handler action_executors[] = {
147 action_executor_notify_handler, action_executor_start_session_handler,
148 action_executor_stop_session_handler, action_executor_rotate_session_handler,
149 action_executor_snapshot_session_handler, action_executor_list_handler,
150 };
151
152 /* Forward declaration */
153 static int add_action_to_subitem_array(struct lttng_action *action,
154 struct lttng_dynamic_array *subitems);
155
156 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
157 struct lttng_dynamic_array *subitems);
158
159 static void action_work_subitem_destructor(void *element)
160 {
161 struct action_work_subitem *subitem = (action_work_subitem *) element;
162
163 lttng_action_put(subitem->action);
164 }
165
166 static const char *get_action_name(const struct lttng_action *action)
167 {
168 const enum lttng_action_type action_type = lttng_action_get_type(action);
169
170 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
171
172 return lttng_action_type_string(action_type);
173 }
174
175 /* Check if this trigger allowed to interect with a given session. */
176 static bool is_trigger_allowed_for_session(const struct lttng_trigger *trigger,
177 struct ltt_session *session)
178 {
179 bool is_allowed = false;
180 const struct lttng_credentials session_creds = {
181 .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
182 .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
183 };
184 /* Can never be NULL. */
185 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
186
187 is_allowed = (lttng_credentials_is_equal_uid(trigger_creds, &session_creds)) ||
188 (lttng_credentials_get_uid(trigger_creds) == 0);
189 if (!is_allowed) {
190 WARN("Trigger is not allowed to interact with session `%s`: session uid = %ld, session gid = %ld, trigger uid = %ld",
191 session->name,
192 (long int) session->uid,
193 (long int) session->gid,
194 (long int) lttng_credentials_get_uid(trigger_creds));
195 }
196
197 return is_allowed;
198 }
199
200 static const char *get_trigger_name(const struct lttng_trigger *trigger)
201 {
202 const char *trigger_name;
203 enum lttng_trigger_status trigger_status;
204
205 trigger_status = lttng_trigger_get_name(trigger, &trigger_name);
206 switch (trigger_status) {
207 case LTTNG_TRIGGER_STATUS_OK:
208 break;
209 case LTTNG_TRIGGER_STATUS_UNSET:
210 trigger_name = "(anonymous)";
211 break;
212 default:
213 trigger_name = "(failed to get name)";
214 break;
215 }
216
217 return trigger_name;
218 }
219
220 static int client_handle_transmission_status(struct notification_client *client,
221 enum client_transmission_status status,
222 void *user_data)
223 {
224 int ret = 0;
225 struct action_executor *executor = (action_executor *) user_data;
226 bool update_communication = true;
227
228 switch (status) {
229 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
230 DBG("Successfully sent full notification to client, client_id = %" PRIu64,
231 client->id);
232 /*
233 * There is no need to wake the (e)poll thread. If it was waiting for
234 * "out" events on the client's socket, it will see that no payload
235 * in queued and will unsubscribe from that event.
236 *
237 * In the other cases, we have to wake the the (e)poll thread to either
238 * handle the error on the client or to get it to monitor the client "out"
239 * events.
240 */
241 update_communication = false;
242 break;
243 case CLIENT_TRANSMISSION_STATUS_QUEUED:
244 DBG("Queued notification in client outgoing buffer, client_id = %" PRIu64,
245 client->id);
246 break;
247 case CLIENT_TRANSMISSION_STATUS_FAIL:
248 DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
249 client->id);
250 break;
251 default:
252 ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
253 client->id);
254 ret = -1;
255 goto end;
256 }
257
258 if (!update_communication) {
259 goto end;
260 }
261
262 /* Safe to read client's id without locking as it is immutable. */
263 ret = notification_thread_client_communication_update(
264 executor->notification_thread_handle, client->id, status);
265 end:
266 return ret;
267 }
268
269 static int action_executor_notify_handler(struct action_executor *executor,
270 const struct action_work_item *work_item,
271 struct action_work_subitem *item __attribute__((unused)))
272 {
273 return notification_client_list_send_evaluation(
274 work_item->client_list,
275 work_item->trigger,
276 work_item->evaluation,
277 work_item->object_creds.is_set ? &(work_item->object_creds.value) : nullptr,
278 client_handle_transmission_status,
279 executor);
280 }
281
282 static int action_executor_start_session_handler(struct action_executor *executor
283 __attribute__((unused)),
284 const struct action_work_item *work_item,
285 struct action_work_subitem *item)
286 {
287 int ret = 0;
288 const char *session_name;
289 enum lttng_action_status action_status;
290 struct ltt_session *session;
291 enum lttng_error_code cmd_ret;
292 struct lttng_action *action = item->action;
293
294 lttng::urcu::read_lock_guard read_lock;
295
296 action_status = lttng_action_start_session_get_session_name(action, &session_name);
297 if (action_status != LTTNG_ACTION_STATUS_OK) {
298 ERR("Failed to get session name from `%s` action", get_action_name(action));
299 ret = -1;
300 goto end;
301 }
302
303 /*
304 * Validate if at the moment of the action was queued the session
305 * existed. If not skip the action altogether.
306 */
307 if (!item->context.session_id.is_set) {
308 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
309 session_name,
310 get_action_name(action),
311 get_trigger_name(work_item->trigger));
312 lttng_action_increase_execution_failure_count(action);
313 goto end;
314 }
315
316 session_lock_list();
317 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
318 if (!session) {
319 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
320 session_name,
321 get_action_name(action),
322 get_trigger_name(work_item->trigger));
323 lttng_action_increase_execution_failure_count(action);
324 goto error_unlock_list;
325 }
326
327 session_lock(session);
328 if (session->destroyed) {
329 DBG("Session `%s` with id = %" PRIu64
330 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
331 session->name,
332 session->id,
333 get_action_name(action),
334 get_trigger_name(work_item->trigger));
335 goto error_unlock_session;
336 }
337
338 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
339 goto error_unlock_session;
340 }
341
342 cmd_ret = (lttng_error_code) cmd_start_trace(session);
343 switch (cmd_ret) {
344 case LTTNG_OK:
345 DBG("Successfully started session `%s` on behalf of trigger `%s`",
346 session_name,
347 get_trigger_name(work_item->trigger));
348 break;
349 case LTTNG_ERR_TRACE_ALREADY_STARTED:
350 DBG("Attempted to start session `%s` on behalf of trigger `%s` but it was already started",
351 session_name,
352 get_trigger_name(work_item->trigger));
353 break;
354 default:
355 WARN("Failed to start session `%s` on behalf of trigger `%s`: %s",
356 session_name,
357 get_trigger_name(work_item->trigger),
358 lttng_strerror(-cmd_ret));
359 lttng_action_increase_execution_failure_count(action);
360 break;
361 }
362
363 error_unlock_session:
364 session_unlock(session);
365 session_put(session);
366 error_unlock_list:
367 session_unlock_list();
368 end:
369 return ret;
370 }
371
372 static int action_executor_stop_session_handler(struct action_executor *executor
373 __attribute__((unused)),
374 const struct action_work_item *work_item,
375 struct action_work_subitem *item)
376 {
377 int ret = 0;
378 const char *session_name;
379 enum lttng_action_status action_status;
380 struct ltt_session *session;
381 enum lttng_error_code cmd_ret;
382 struct lttng_action *action = item->action;
383
384 lttng::urcu::read_lock_guard read_lock;
385
386 action_status = lttng_action_stop_session_get_session_name(action, &session_name);
387 if (action_status != LTTNG_ACTION_STATUS_OK) {
388 ERR("Failed to get session name from `%s` action", get_action_name(action));
389 ret = -1;
390 goto end;
391 }
392
393 /*
394 * Validate if, at the moment the action was queued, the target session
395 * existed. If not, skip the action altogether.
396 */
397 if (!item->context.session_id.is_set) {
398 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
399 session_name,
400 get_action_name(action),
401 get_trigger_name(work_item->trigger));
402 lttng_action_increase_execution_failure_count(action);
403 goto end;
404 }
405
406 session_lock_list();
407 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
408 if (!session) {
409 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
410 session_name,
411 get_action_name(action),
412 get_trigger_name(work_item->trigger));
413 lttng_action_increase_execution_failure_count(action);
414 goto error_unlock_list;
415 }
416
417 session_lock(session);
418 if (session->destroyed) {
419 DBG("Session `%s` with id = %" PRIu64
420 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
421 session->name,
422 session->id,
423 get_action_name(action),
424 get_trigger_name(work_item->trigger));
425 goto error_unlock_session;
426 }
427
428 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
429 goto error_unlock_session;
430 }
431
432 cmd_ret = (lttng_error_code) cmd_stop_trace(session);
433 switch (cmd_ret) {
434 case LTTNG_OK:
435 DBG("Successfully stopped session `%s` on behalf of trigger `%s`",
436 session_name,
437 get_trigger_name(work_item->trigger));
438 break;
439 case LTTNG_ERR_TRACE_ALREADY_STOPPED:
440 DBG("Attempted to stop session `%s` on behalf of trigger `%s` but it was already stopped",
441 session_name,
442 get_trigger_name(work_item->trigger));
443 break;
444 default:
445 WARN("Failed to stop session `%s` on behalf of trigger `%s`: %s",
446 session_name,
447 get_trigger_name(work_item->trigger),
448 lttng_strerror(-cmd_ret));
449 lttng_action_increase_execution_failure_count(action);
450 break;
451 }
452
453 error_unlock_session:
454 session_unlock(session);
455 session_put(session);
456 error_unlock_list:
457 session_unlock_list();
458 end:
459 return ret;
460 }
461
462 static int action_executor_rotate_session_handler(struct action_executor *executor
463 __attribute__((unused)),
464 const struct action_work_item *work_item,
465 struct action_work_subitem *item)
466 {
467 int ret = 0;
468 const char *session_name;
469 enum lttng_action_status action_status;
470 struct ltt_session *session;
471 enum lttng_error_code cmd_ret;
472 struct lttng_action *action = item->action;
473
474 lttng::urcu::read_lock_guard read_lock;
475
476 action_status = lttng_action_rotate_session_get_session_name(action, &session_name);
477 if (action_status != LTTNG_ACTION_STATUS_OK) {
478 ERR("Failed to get session name from `%s` action", get_action_name(action));
479 ret = -1;
480 goto end;
481 }
482
483 /*
484 * Validate if, at the moment the action was queued, the target session
485 * existed. If not, skip the action altogether.
486 */
487 if (!item->context.session_id.is_set) {
488 DBG("Session `%s` was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
489 session_name,
490 get_action_name(action),
491 get_trigger_name(work_item->trigger));
492 lttng_action_increase_execution_failure_count(action);
493 goto end;
494 }
495
496 session_lock_list();
497 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
498 if (!session) {
499 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
500 session_name,
501 get_action_name(action),
502 get_trigger_name(work_item->trigger));
503 lttng_action_increase_execution_failure_count(action);
504 goto error_unlock_list;
505 }
506
507 session_lock(session);
508 if (session->destroyed) {
509 DBG("Session `%s` with id = %" PRIu64
510 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
511 session->name,
512 session->id,
513 get_action_name(action),
514 get_trigger_name(work_item->trigger));
515 goto error_unlock_session;
516 }
517
518 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
519 goto error_unlock_session;
520 }
521
522 cmd_ret = (lttng_error_code) cmd_rotate_session(
523 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
524 switch (cmd_ret) {
525 case LTTNG_OK:
526 DBG("Successfully started rotation of session `%s` on behalf of trigger `%s`",
527 session_name,
528 get_trigger_name(work_item->trigger));
529 break;
530 case LTTNG_ERR_ROTATION_PENDING:
531 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation is already ongoing",
532 session_name,
533 get_trigger_name(work_item->trigger));
534 lttng_action_increase_execution_failure_count(action);
535 break;
536 case LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
537 case LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
538 DBG("Attempted to start a rotation of session `%s` on behalf of trigger `%s` but a rotation has already been completed since the last stop or clear",
539 session_name,
540 get_trigger_name(work_item->trigger));
541 break;
542 default:
543 WARN("Failed to start a rotation of session `%s` on behalf of trigger `%s`: %s",
544 session_name,
545 get_trigger_name(work_item->trigger),
546 lttng_strerror(-cmd_ret));
547 lttng_action_increase_execution_failure_count(action);
548 break;
549 }
550
551 error_unlock_session:
552 session_unlock(session);
553 session_put(session);
554 error_unlock_list:
555 session_unlock_list();
556 end:
557 return ret;
558 }
559
560 static int action_executor_snapshot_session_handler(struct action_executor *executor
561 __attribute__((unused)),
562 const struct action_work_item *work_item,
563 struct action_work_subitem *item)
564 {
565 int ret = 0;
566 const char *session_name;
567 enum lttng_action_status action_status;
568 struct ltt_session *session;
569 lttng_snapshot_output default_snapshot_output;
570 const struct lttng_snapshot_output *snapshot_output = &default_snapshot_output;
571 enum lttng_error_code cmd_ret;
572 struct lttng_action *action = item->action;
573
574 default_snapshot_output.max_size = UINT64_MAX;
575
576 lttng::urcu::read_lock_guard read_lock;
577
578 /*
579 * Validate if, at the moment the action was queued, the target session
580 * existed. If not, skip the action altogether.
581 */
582 if (!item->context.session_id.is_set) {
583 DBG("Session was not present at the moment the work item was enqueued for `%s` action of trigger `%s`",
584 get_action_name(action),
585 get_trigger_name(work_item->trigger));
586 lttng_action_increase_execution_failure_count(action);
587 goto end;
588 }
589
590 action_status = lttng_action_snapshot_session_get_session_name(action, &session_name);
591 if (action_status != LTTNG_ACTION_STATUS_OK) {
592 ERR("Failed to get session name from `%s` action", get_action_name(action));
593 ret = -1;
594 goto end;
595 }
596
597 action_status = lttng_action_snapshot_session_get_output(action, &snapshot_output);
598 if (action_status != LTTNG_ACTION_STATUS_OK && action_status != LTTNG_ACTION_STATUS_UNSET) {
599 ERR("Failed to get output from `%s` action", get_action_name(action));
600 ret = -1;
601 goto end;
602 }
603
604 session_lock_list();
605 session = session_find_by_id(LTTNG_OPTIONAL_GET(item->context.session_id));
606 if (!session) {
607 DBG("Failed to find session `%s` by name while executing `%s` action of trigger `%s`",
608 session_name,
609 get_action_name(action),
610 get_trigger_name(work_item->trigger));
611 lttng_action_increase_execution_failure_count(action);
612 goto error_unlock_list;
613 }
614
615 session_lock(session);
616 if (session->destroyed) {
617 DBG("Session `%s` with id = %" PRIu64
618 " is flagged as destroyed. Skipping: action = `%s`, trigger = `%s`",
619 session->name,
620 session->id,
621 get_action_name(action),
622 get_trigger_name(work_item->trigger));
623 goto error_unlock_session;
624 }
625
626 if (!is_trigger_allowed_for_session(work_item->trigger, session)) {
627 goto error_unlock_session;
628 }
629
630 cmd_ret = (lttng_error_code) cmd_snapshot_record(session, snapshot_output, 0);
631 switch (cmd_ret) {
632 case LTTNG_OK:
633 DBG("Successfully recorded snapshot of session `%s` on behalf of trigger `%s`",
634 session_name,
635 get_trigger_name(work_item->trigger));
636 break;
637 default:
638 WARN("Failed to record snapshot of session `%s` on behalf of trigger `%s`: %s",
639 session_name,
640 get_trigger_name(work_item->trigger),
641 lttng_strerror(-cmd_ret));
642 lttng_action_increase_execution_failure_count(action);
643 break;
644 }
645
646 error_unlock_session:
647 session_unlock(session);
648 session_put(session);
649 error_unlock_list:
650 session_unlock_list();
651 end:
652 return ret;
653 }
654
655 static int action_executor_list_handler(struct action_executor *executor __attribute__((unused)),
656 const struct action_work_item *work_item
657 __attribute__((unused)),
658 struct action_work_subitem *item __attribute__((unused)))
659 {
660 ERR("Execution of a list action by the action executor should never occur");
661 abort();
662 }
663
664 static int action_executor_generic_handler(struct action_executor *executor,
665 const struct action_work_item *work_item,
666 struct action_work_subitem *item)
667 {
668 int ret;
669 struct lttng_action *action = item->action;
670 const enum lttng_action_type action_type = lttng_action_get_type(action);
671
672 LTTNG_ASSERT(action_type != LTTNG_ACTION_TYPE_UNKNOWN);
673
674 lttng_action_increase_execution_request_count(action);
675 if (!lttng_action_should_execute(action)) {
676 DBG("Policy prevented execution of action `%s` of trigger `%s` action work item %" PRIu64,
677 get_action_name(action),
678 get_trigger_name(work_item->trigger),
679 work_item->id);
680 ret = 0;
681 goto end;
682 }
683
684 lttng_action_increase_execution_count(action);
685 DBG("Executing action `%s` of trigger `%s` action work item %" PRIu64,
686 get_action_name(action),
687 get_trigger_name(work_item->trigger),
688 work_item->id);
689 ret = action_executors[action_type](executor, work_item, item);
690 end:
691 return ret;
692 }
693
694 static int action_work_item_execute(struct action_executor *executor,
695 struct action_work_item *work_item)
696 {
697 int ret;
698 size_t count, i;
699
700 DBG("Starting execution of action work item %" PRIu64 " of trigger `%s`",
701 work_item->id,
702 get_trigger_name(work_item->trigger));
703
704 count = lttng_dynamic_array_get_count(&work_item->subitems);
705 for (i = 0; i < count; i++) {
706 struct action_work_subitem *item;
707
708 item = (action_work_subitem *) lttng_dynamic_array_get_element(&work_item->subitems,
709 i);
710 ret = action_executor_generic_handler(executor, work_item, item);
711 if (ret) {
712 goto end;
713 }
714 }
715 end:
716 DBG("Completed execution of action work item %" PRIu64 " of trigger `%s`",
717 work_item->id,
718 get_trigger_name(work_item->trigger));
719 return ret;
720 }
721
722 static void action_work_item_destroy(struct action_work_item *work_item)
723 {
724 lttng_trigger_put(work_item->trigger);
725 lttng_evaluation_destroy(work_item->evaluation);
726 notification_client_list_put(work_item->client_list);
727 lttng_dynamic_array_reset(&work_item->subitems);
728 free(work_item);
729 }
730
731 static void *action_executor_thread(void *_data)
732 {
733 struct action_executor *executor = (action_executor *) _data;
734
735 LTTNG_ASSERT(executor);
736
737 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ACTION_EXECUTOR);
738
739 rcu_register_thread();
740 rcu_thread_online();
741
742 DBG("Entering work execution loop");
743 pthread_mutex_lock(&executor->work.lock);
744 while (!executor->should_quit) {
745 int ret = 0;
746 struct action_work_item *work_item;
747
748 health_code_update();
749 if (executor->work.pending_count == 0) {
750 health_poll_entry();
751 DBG("No work items enqueued, entering wait");
752 pthread_cond_wait(&executor->work.cond, &executor->work.lock);
753 DBG("Woke-up from wait");
754 health_poll_exit();
755 continue;
756 }
757
758 /* Pop item from front of the list with work lock held. */
759 work_item = cds_list_first_entry(
760 &executor->work.list, struct action_work_item, list_node);
761 cds_list_del(&work_item->list_node);
762 executor->work.pending_count--;
763
764 /*
765 * Work can be performed without holding the work lock,
766 * allowing new items to be queued.
767 */
768 pthread_mutex_unlock(&executor->work.lock);
769
770 /* Execute item only if a trigger is registered. */
771 lttng_trigger_lock(work_item->trigger);
772 if (!lttng_trigger_is_registered(work_item->trigger)) {
773 const char *trigger_name = nullptr;
774 uid_t trigger_owner_uid;
775 enum lttng_trigger_status trigger_status;
776
777 trigger_name = get_trigger_name(work_item->trigger);
778
779 trigger_status =
780 lttng_trigger_get_owner_uid(work_item->trigger, &trigger_owner_uid);
781 LTTNG_ASSERT(trigger_status == LTTNG_TRIGGER_STATUS_OK);
782
783 DBG("Work item skipped since the associated trigger is no longer registered: work item id = %" PRIu64
784 ", trigger name = `%s`, trigger owner uid = %d",
785 work_item->id,
786 trigger_name,
787 (int) trigger_owner_uid);
788 ret = 0;
789 goto skip_execute;
790 }
791
792 ret = action_work_item_execute(executor, work_item);
793
794 skip_execute:
795 lttng_trigger_unlock(work_item->trigger);
796 action_work_item_destroy(work_item);
797 if (ret) {
798 /* Fatal error. */
799 break;
800 }
801
802 health_code_update();
803 pthread_mutex_lock(&executor->work.lock);
804 }
805
806 if (executor->should_quit) {
807 pthread_mutex_unlock(&executor->work.lock);
808 }
809 DBG("Left work execution loop");
810
811 health_code_update();
812
813 rcu_thread_offline();
814 rcu_unregister_thread();
815 health_unregister(the_health_sessiond);
816
817 return nullptr;
818 }
819
820 static bool shutdown_action_executor_thread(void *_data)
821 {
822 struct action_executor *executor = (action_executor *) _data;
823
824 pthread_mutex_lock(&executor->work.lock);
825 executor->should_quit = true;
826 pthread_cond_signal(&executor->work.cond);
827 pthread_mutex_unlock(&executor->work.lock);
828 return true;
829 }
830
831 static void clean_up_action_executor_thread(void *_data)
832 {
833 struct action_executor *executor = (action_executor *) _data;
834
835 LTTNG_ASSERT(cds_list_empty(&executor->work.list));
836
837 pthread_mutex_destroy(&executor->work.lock);
838 pthread_cond_destroy(&executor->work.cond);
839 free(executor);
840 }
841
842 struct action_executor *action_executor_create(struct notification_thread_handle *handle)
843 {
844 struct action_executor *executor = zmalloc<action_executor>();
845
846 if (!executor) {
847 goto end;
848 }
849
850 CDS_INIT_LIST_HEAD(&executor->work.list);
851 pthread_cond_init(&executor->work.cond, nullptr);
852 pthread_mutex_init(&executor->work.lock, nullptr);
853 executor->notification_thread_handle = handle;
854
855 executor->thread = lttng_thread_create(THREAD_NAME,
856 action_executor_thread,
857 shutdown_action_executor_thread,
858 clean_up_action_executor_thread,
859 executor);
860 end:
861 return executor;
862 }
863
864 void action_executor_destroy(struct action_executor *executor)
865 {
866 struct action_work_item *work_item, *tmp;
867
868 /* TODO Wait for work list to drain? */
869 lttng_thread_shutdown(executor->thread);
870 pthread_mutex_lock(&executor->work.lock);
871 if (executor->work.pending_count != 0) {
872 WARN("%" PRIu64
873 " trigger action%s still queued for execution and will be discarded",
874 executor->work.pending_count,
875 executor->work.pending_count == 1 ? " is" : "s are");
876 }
877
878 cds_list_for_each_entry_safe (work_item, tmp, &executor->work.list, list_node) {
879 WARN("Discarding action work item %" PRIu64 " associated to trigger `%s`",
880 work_item->id,
881 get_trigger_name(work_item->trigger));
882 cds_list_del(&work_item->list_node);
883 action_work_item_destroy(work_item);
884 }
885 pthread_mutex_unlock(&executor->work.lock);
886 lttng_thread_put(executor->thread);
887 }
888
889 /* RCU read-lock must be held by the caller. */
890 enum action_executor_status
891 action_executor_enqueue_trigger(struct action_executor *executor,
892 struct lttng_trigger *trigger,
893 struct lttng_evaluation *evaluation,
894 const struct lttng_credentials *object_creds,
895 struct notification_client_list *client_list)
896 {
897 int ret;
898 enum action_executor_status executor_status = ACTION_EXECUTOR_STATUS_OK;
899 const uint64_t work_item_id = executor->next_work_item_id++;
900 struct action_work_item *work_item;
901 bool signal = false;
902
903 LTTNG_ASSERT(trigger);
904 ASSERT_RCU_READ_LOCKED();
905
906 pthread_mutex_lock(&executor->work.lock);
907 /* Check for queue overflow. */
908 if (executor->work.pending_count >= MAX_QUEUED_WORK_COUNT) {
909 /* Most likely spammy, remove if it is the case. */
910 DBG("Refusing to enqueue action for trigger (overflow): trigger name = `%s`, work item id = %" PRIu64,
911 get_trigger_name(trigger),
912 work_item_id);
913 executor_status = ACTION_EXECUTOR_STATUS_OVERFLOW;
914 goto error_unlock;
915 }
916
917 work_item = zmalloc<action_work_item>();
918 if (!work_item) {
919 PERROR("Failed to allocate action executor work item: trigger name = `%s`",
920 get_trigger_name(trigger));
921 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
922 goto error_unlock;
923 }
924
925 lttng_trigger_get(trigger);
926 if (client_list) {
927 const bool reference_acquired = notification_client_list_get(client_list);
928
929 LTTNG_ASSERT(reference_acquired);
930 }
931
932 work_item->id = work_item_id;
933 work_item->trigger = trigger;
934
935 /* Ownership transferred to the work item. */
936 work_item->evaluation = evaluation;
937 evaluation = nullptr;
938
939 work_item->client_list = client_list;
940 work_item->object_creds.is_set = !!object_creds;
941 if (object_creds) {
942 work_item->object_creds.value = *object_creds;
943 }
944
945 CDS_INIT_LIST_HEAD(&work_item->list_node);
946
947 /* Build the array of action work subitems for the passed trigger. */
948 lttng_dynamic_array_init(&work_item->subitems,
949 sizeof(struct action_work_subitem),
950 action_work_subitem_destructor);
951
952 ret = populate_subitem_array_from_trigger(trigger, &work_item->subitems);
953 if (ret) {
954 ERR("Failed to populate work item sub items on behalf of trigger: trigger name = `%s`",
955 get_trigger_name(trigger));
956 executor_status = ACTION_EXECUTOR_STATUS_ERROR;
957 goto error_unlock;
958 }
959
960 cds_list_add_tail(&work_item->list_node, &executor->work.list);
961 executor->work.pending_count++;
962 DBG("Enqueued action for trigger: trigger name = `%s`, work item id = %" PRIu64,
963 get_trigger_name(trigger),
964 work_item_id);
965 signal = true;
966
967 error_unlock:
968 if (signal) {
969 pthread_cond_signal(&executor->work.cond);
970 }
971
972 pthread_mutex_unlock(&executor->work.lock);
973 lttng_evaluation_destroy(evaluation);
974 return executor_status;
975 }
976
977 static int add_action_to_subitem_array(struct lttng_action *action,
978 struct lttng_dynamic_array *subitems)
979 {
980 int ret = 0;
981 enum lttng_action_type type = lttng_action_get_type(action);
982 const char *session_name = nullptr;
983 enum lttng_action_status status;
984 struct action_work_subitem subitem = {
985 .action = nullptr,
986 .context = {
987 .session_id = LTTNG_OPTIONAL_INIT_UNSET,
988 },
989 };
990
991 LTTNG_ASSERT(action);
992 LTTNG_ASSERT(subitems);
993
994 if (type == LTTNG_ACTION_TYPE_LIST) {
995 unsigned int count, i;
996
997 status = lttng_action_list_get_count(action, &count);
998 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
999
1000 for (i = 0; i < count; i++) {
1001 struct lttng_action *inner_action = nullptr;
1002
1003 inner_action = lttng_action_list_borrow_mutable_at_index(action, i);
1004 LTTNG_ASSERT(inner_action);
1005 ret = add_action_to_subitem_array(inner_action, subitems);
1006 if (ret) {
1007 goto end;
1008 }
1009 }
1010
1011 /*
1012 * Go directly to the end since there is no need to add the
1013 * list action by itself to the subitems array.
1014 */
1015 goto end;
1016 }
1017
1018 /* Gather execution context. */
1019 switch (type) {
1020 case LTTNG_ACTION_TYPE_NOTIFY:
1021 break;
1022 case LTTNG_ACTION_TYPE_START_SESSION:
1023 status = lttng_action_start_session_get_session_name(action, &session_name);
1024 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1025 break;
1026 case LTTNG_ACTION_TYPE_STOP_SESSION:
1027 status = lttng_action_stop_session_get_session_name(action, &session_name);
1028 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1029 break;
1030 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
1031 status = lttng_action_rotate_session_get_session_name(action, &session_name);
1032 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1033 break;
1034 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
1035 status = lttng_action_snapshot_session_get_session_name(action, &session_name);
1036 LTTNG_ASSERT(status == LTTNG_ACTION_STATUS_OK);
1037 break;
1038 case LTTNG_ACTION_TYPE_LIST:
1039 case LTTNG_ACTION_TYPE_UNKNOWN:
1040 /* Fallthrough */
1041 default:
1042 abort();
1043 break;
1044 }
1045
1046 /*
1047 * Fetch the session execution context info as needed.
1048 * Note that we could decide to not add an action for which we know the
1049 * execution will not happen (i.e no session exists for that name). For
1050 * now we leave the decision to skip to the action executor for sake of
1051 * simplicity and consistency.
1052 */
1053 if (session_name != nullptr) {
1054 uint64_t session_id;
1055
1056 /*
1057 * Instantaneous sampling of the session id if present.
1058 *
1059 * This method is preferred over `sessiond_find_by_name` then
1060 * fetching the session'd id since `sessiond_find_by_name`
1061 * requires the session list lock to be taken.
1062 *
1063 * Taking the session list lock can lead to a deadlock
1064 * between the action executor and the notification thread
1065 * (caller of add_action_to_subitem_array). It is okay if the
1066 * session state changes between the enqueuing time and the
1067 * execution time. The execution context is validated at
1068 * execution time.
1069 */
1070 if (sample_session_id_by_name(session_name, &session_id)) {
1071 LTTNG_OPTIONAL_SET(&subitem.context.session_id, session_id);
1072 }
1073 }
1074
1075 /* Get a reference to the action. */
1076 lttng_action_get(action);
1077 subitem.action = action;
1078
1079 ret = lttng_dynamic_array_add_element(subitems, &subitem);
1080 if (ret) {
1081 ERR("Failed to add work subitem to the subitem array");
1082 lttng_action_put(action);
1083 ret = -1;
1084 goto end;
1085 }
1086
1087 end:
1088 return ret;
1089 }
1090
1091 static int populate_subitem_array_from_trigger(struct lttng_trigger *trigger,
1092 struct lttng_dynamic_array *subitems)
1093 {
1094 struct lttng_action *action;
1095
1096 action = lttng_trigger_get_action(trigger);
1097 LTTNG_ASSERT(action);
1098
1099 return add_action_to_subitem_array(action, subitems);
1100 }
This page took 0.053688 seconds and 4 git commands to generate.