clang-tidy: apply suggested fixes
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
1 /*
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9 #define _LGPL_SOURCE
10 #include "cmd.hpp"
11 #include "health-sessiond.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "rotation-thread.hpp"
15 #include "session.hpp"
16 #include "thread.hpp"
17 #include "timer.hpp"
18 #include "utils.hpp"
19
20 #include <common/align.hpp>
21 #include <common/config/session-config.hpp>
22 #include <common/defaults.hpp>
23 #include <common/error.hpp>
24 #include <common/eventfd.hpp>
25 #include <common/exception.hpp>
26 #include <common/file-descriptor.hpp>
27 #include <common/format.hpp>
28 #include <common/futex.hpp>
29 #include <common/hashtable/utils.hpp>
30 #include <common/kernel-ctl/kernel-ctl.hpp>
31 #include <common/locked-reference.hpp>
32 #include <common/make-unique-wrapper.hpp>
33 #include <common/pthread-lock.hpp>
34 #include <common/scope-exit.hpp>
35 #include <common/time.hpp>
36 #include <common/urcu.hpp>
37 #include <common/utils.hpp>
38
39 #include <lttng/action/action-internal.hpp>
40 #include <lttng/condition/condition-internal.hpp>
41 #include <lttng/location-internal.hpp>
42 #include <lttng/notification/channel-internal.hpp>
43 #include <lttng/notification/notification-internal.hpp>
44 #include <lttng/rotate-internal.hpp>
45 #include <lttng/trigger/trigger.h>
46
47 #include <fcntl.h>
48 #include <inttypes.h>
49 #include <memory>
50 #include <signal.h>
51 #include <sys/eventfd.h>
52 #include <sys/stat.h>
53 #include <time.h>
54 #include <urcu.h>
55 #include <urcu/list.h>
56
57 namespace ls = lttng::sessiond;
58
59 /*
60 * The timer thread enqueues jobs and wakes up the rotation thread.
61 * When the rotation thread wakes up, it empties the queue.
62 */
63 struct ls::rotation_thread_timer_queue {
64 struct lttng_pipe *event_pipe;
65 struct cds_list_head list;
66 pthread_mutex_t lock;
67 };
68
69 namespace {
70 struct rotation_thread_job {
71 using uptr =
72 std::unique_ptr<rotation_thread_job,
73 lttng::memory::create_deleter_class<rotation_thread_job,
74 lttng::memory::free>::deleter>;
75
76 enum ls::rotation_thread_job_type type;
77 struct ltt_session *session;
78 /* List member in struct rotation_thread_timer_queue. */
79 struct cds_list_head head;
80 };
81
82 const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
83 {
84 switch (job_type) {
85 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
86 return "CHECK_PENDING_ROTATION";
87 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
88 return "SCHEDULED_ROTATION";
89 default:
90 abort();
91 }
92 }
93
94 /*
95 * Called with the rotation_thread_timer_queue lock held.
96 * Return true if the same timer job already exists in the queue, false if not.
97 */
98 bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
99 ls::rotation_thread_job_type job_type,
100 ltt_session *session)
101 {
102 bool exists = false;
103 struct rotation_thread_job *job;
104
105 cds_list_for_each_entry (job, &queue->list, head) {
106 if (job->session == session && job->type == job_type) {
107 exists = true;
108 goto end;
109 }
110 }
111 end:
112 return exists;
113 }
114
115 void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref& session,
116 bool& _rotation_completed)
117 {
118 int ret = 0;
119 struct consumer_socket *socket;
120 struct cds_lfht_iter iter;
121 enum consumer_trace_chunk_exists_status exists_status;
122 uint64_t relayd_id;
123 bool chunk_exists_on_peer = false;
124 enum lttng_trace_chunk_status chunk_status;
125 const lttng::urcu::read_lock_guard read_lock;
126
127 LTTNG_ASSERT(session->chunk_being_archived);
128
129 /*
130 * Check for a local pending rotation on all consumers (32-bit
131 * user space, 64-bit user space, and kernel).
132 */
133 if (!session->ust_session) {
134 goto skip_ust;
135 }
136
137 cds_lfht_for_each_entry (
138 session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
139 relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
140 -1ULL :
141 session->ust_session->consumer->net_seq_index;
142
143 const lttng::pthread::lock_guard socket_lock(*socket->lock);
144 ret = consumer_trace_chunk_exists(socket,
145 relayd_id,
146 session->id,
147 session->chunk_being_archived,
148 &exists_status);
149 if (ret) {
150 ERR("Error occurred while checking rotation status on consumer daemon");
151 goto end;
152 }
153
154 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
155 chunk_exists_on_peer = true;
156 goto end;
157 }
158 }
159
160 skip_ust:
161 if (!session->kernel_session) {
162 goto skip_kernel;
163 }
164
165 cds_lfht_for_each_entry (
166 session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
167 const lttng::pthread::lock_guard socket_lock(*socket->lock);
168
169 relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
170 -1ULL :
171 session->kernel_session->consumer->net_seq_index;
172
173 ret = consumer_trace_chunk_exists(socket,
174 relayd_id,
175 session->id,
176 session->chunk_being_archived,
177 &exists_status);
178 if (ret) {
179 ERR("Error occurred while checking rotation status on consumer daemon");
180 goto end;
181 }
182
183 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
184 chunk_exists_on_peer = true;
185 goto end;
186 }
187 }
188 skip_kernel:
189 end:
190
191 if (!chunk_exists_on_peer) {
192 uint64_t chunk_being_archived_id;
193
194 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
195 &chunk_being_archived_id);
196 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
197 DBG("Rotation of trace archive %" PRIu64
198 " of session \"%s\" is complete on all consumers",
199 chunk_being_archived_id,
200 session->name);
201 }
202
203 _rotation_completed = !chunk_exists_on_peer;
204 if (ret) {
205 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
206 if (ret) {
207 ERR("Failed to reset rotation state of session \"%s\"", session->name);
208 }
209 }
210 }
211
212 /*
213 * Check if the last rotation was completed, called with session lock held.
214 * Should only return non-zero in the event of a fatal error. Doing so will
215 * shutdown the thread.
216 */
217 int check_session_rotation_pending(const ltt_session::locked_ref& session,
218 notification_thread_handle& notification_thread_handle)
219 {
220 int ret;
221 struct lttng_trace_archive_location *location;
222 enum lttng_trace_chunk_status chunk_status;
223 bool rotation_completed = false;
224 const char *archived_chunk_name;
225 uint64_t chunk_being_archived_id;
226
227 if (!session->chunk_being_archived) {
228 ret = 0;
229 goto end;
230 }
231
232 chunk_status =
233 lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
234 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
235
236 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
237 session->name,
238 chunk_being_archived_id);
239
240 /*
241 * The rotation-pending check timer of a session is launched in
242 * one-shot mode. If the rotation is incomplete, the rotation
243 * thread will re-enable the pending-check timer.
244 *
245 * The timer thread can't stop the timer itself since it is involved
246 * in the check for the timer's quiescence.
247 */
248 ret = timer_session_rotation_pending_check_stop(session);
249 if (ret) {
250 goto check_ongoing_rotation;
251 }
252
253 check_session_rotation_pending_on_consumers(session, rotation_completed);
254 if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
255 goto check_ongoing_rotation;
256 }
257
258 /*
259 * Now we can clear the "ONGOING" state in the session. New
260 * rotations can start now.
261 */
262 chunk_status = lttng_trace_chunk_get_name(
263 session->chunk_being_archived, &archived_chunk_name, nullptr);
264 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
265 free(session->last_archived_chunk_name);
266 session->last_archived_chunk_name = strdup(archived_chunk_name);
267 if (!session->last_archived_chunk_name) {
268 PERROR("Failed to duplicate archived chunk name");
269 }
270
271 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
272
273 if (!session->quiet_rotation) {
274 location = session_get_trace_archive_location(session);
275 ret = notification_thread_command_session_rotation_completed(
276 &notification_thread_handle,
277 session->id,
278 session->last_archived_chunk_id.value,
279 location);
280 lttng_trace_archive_location_put(location);
281 if (ret != LTTNG_OK) {
282 ERR("Failed to notify notification thread of completed rotation for session %s",
283 session->name);
284 }
285 }
286
287 ret = 0;
288 check_ongoing_rotation:
289 if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
290 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
291 &chunk_being_archived_id);
292 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
293
294 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
295 chunk_being_archived_id,
296 session->name);
297 ret = timer_session_rotation_pending_check_start(session,
298 DEFAULT_ROTATE_PENDING_TIMER);
299 if (ret) {
300 ERR("Failed to re-enable rotation pending timer");
301 ret = -1;
302 goto end;
303 }
304 }
305
306 end:
307 return ret;
308 }
309
310 /* Call with the session and session_list locks held. */
311 void launch_session_rotation(const ltt_session::locked_ref& session)
312 {
313 int ret;
314
315 DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session->name);
316
317 ASSERT_SESSION_LIST_LOCKED();
318
319 ret = cmd_rotate_session(
320 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
321 if (ret != LTTNG_OK) {
322 LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
323 session->name),
324 static_cast<lttng_error_code>(ret));
325 } else {
326 /* Don't consider errors as fatal. */
327 DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
328 session->name,
329 lttng_strerror(ret));
330 }
331 }
332
333 int run_job(const rotation_thread_job& job,
334 const ltt_session::locked_ref& session,
335 notification_thread_handle& notification_thread_handle)
336 {
337 int ret = 0;
338
339 switch (job.type) {
340 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
341 try {
342 launch_session_rotation(session);
343 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
344 session->name);
345 } catch (const lttng::ctl::error& ctl_ex) {
346 /* Don't consider errors as fatal. */
347 DBG("Scheduled time-based rotation aborted for session %s: %s",
348 session->name,
349 lttng_strerror(ctl_ex.code()));
350 }
351 break;
352 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
353 ret = check_session_rotation_pending(session, notification_thread_handle);
354 break;
355 default:
356 abort();
357 }
358
359 return ret;
360 }
361
362 bool shutdown_rotation_thread(void *thread_data)
363 {
364 auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
365
366 return handle->shutdown();
367 }
368 } /* namespace */
369
370 ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
371 {
372 auto queue = zmalloc<ls::rotation_thread_timer_queue>();
373 if (!queue) {
374 PERROR("Failed to allocate timer rotate queue");
375 goto end;
376 }
377
378 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
379 CDS_INIT_LIST_HEAD(&queue->list);
380 pthread_mutex_init(&queue->lock, nullptr);
381 end:
382 return queue;
383 }
384
385 void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
386 {
387 if (!queue) {
388 return;
389 }
390
391 lttng_pipe_destroy(queue->event_pipe);
392
393 {
394 const lttng::pthread::lock_guard queue_lock(queue->lock);
395
396 LTTNG_ASSERT(cds_list_empty(&queue->list));
397 }
398
399 pthread_mutex_destroy(&queue->lock);
400 free(queue);
401 }
402
403 ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
404 notification_thread_handle& notification_thread_handle) :
405 _rotation_timer_queue(rotation_timer_queue),
406 _notification_thread_handle(notification_thread_handle)
407 {
408 _quit_pipe.reset([]() {
409 auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
410 if (!raw_pipe) {
411 LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
412 }
413
414 return raw_pipe;
415 }());
416
417 _notification_channel.reset([]() {
418 auto channel = lttng_notification_channel_create(
419 lttng_session_daemon_notification_endpoint);
420 if (!channel) {
421 LTTNG_THROW_ERROR(
422 "Failed to create notification channel of rotation thread");
423 }
424
425 return channel;
426 }());
427
428 lttng_poll_init(&_events);
429
430 /*
431 * Create pollset with size 4:
432 * - rotation thread quit pipe,
433 * - rotation thread timer queue pipe,
434 * - notification channel sock,
435 * - subscribtion change event fd
436 */
437 if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
438 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
439 }
440
441 if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
442 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
443 }
444
445 if (lttng_poll_add(&_events,
446 lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
447 LPOLLIN) < 0) {
448 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
449 }
450
451 if (lttng_poll_add(&_events,
452 _notification_channel_subscribtion_change_eventfd.fd(),
453 LPOLLIN) < 0) {
454 LTTNG_THROW_ERROR(
455 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
456 }
457
458 if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
459 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
460 }
461 }
462
463 ls::rotation_thread::~rotation_thread()
464 {
465 lttng_poll_clean(&_events);
466 }
467
468 void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
469 ls::rotation_thread_job_type job_type,
470 ltt_session *session)
471 {
472 const char dummy = '!';
473 struct rotation_thread_job *job = nullptr;
474 const char *job_type_str = get_job_type_str(job_type);
475 const lttng::pthread::lock_guard queue_lock(queue->lock);
476
477 if (timer_job_exists(queue, job_type, session)) {
478 /*
479 * This timer job is already pending, we don't need to add
480 * it.
481 */
482 return;
483 }
484
485 job = zmalloc<rotation_thread_job>();
486 if (!job) {
487 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
488 job_type_str,
489 session->name);
490 return;
491 }
492
493 /* No reason for this to fail as the caller must hold a reference. */
494 (void) session_get(session);
495
496 job->session = session;
497 job->type = job_type;
498 cds_list_add_tail(&job->head, &queue->list);
499
500 const int write_ret =
501 lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
502 if (write_ret < 0) {
503 /*
504 * We do not want to block in the timer handler, the job has
505 * been enqueued in the list, the wakeup pipe is probably full,
506 * the job will be processed when the rotation_thread catches
507 * up.
508 */
509 DIAGNOSTIC_PUSH
510 DIAGNOSTIC_IGNORE_LOGICAL_OP
511 if (errno == EAGAIN || errno == EWOULDBLOCK) {
512 DIAGNOSTIC_POP
513 /*
514 * Not an error, but would be surprising and indicate
515 * that the rotation thread can't keep up with the
516 * current load.
517 */
518 DBG("Wake-up pipe of rotation thread job queue is full");
519 return;
520 }
521
522 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
523 job_type_str,
524 session->name);
525 return;
526 }
527 }
528
529 void ls::rotation_thread::_handle_job_queue()
530 {
531 for (;;) {
532 rotation_thread_job::uptr job;
533
534 {
535 /* Take the queue lock only to pop an element from the list. */
536 const lttng::pthread::lock_guard rotation_timer_queue_lock(
537 _rotation_timer_queue.lock);
538 if (cds_list_empty(&_rotation_timer_queue.list)) {
539 break;
540 }
541
542 job.reset(cds_list_first_entry(
543 &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
544 cds_list_del(&job->head);
545 }
546
547 const auto list_lock = lttng::sessiond::lock_session_list();
548
549 /* locked_ref will unlock the session and release the ref held by the job. */
550 session_lock(job->session);
551 auto session = ltt_session::make_locked_ref(*job->session);
552
553 if (run_job(*job, session, _notification_thread_handle)) {
554 return;
555 }
556 }
557 }
558
559 void ls::rotation_thread::_handle_notification(const lttng_notification& notification)
560 {
561 int ret = 0;
562 const char *condition_session_name = nullptr;
563 enum lttng_condition_status condition_status;
564 enum lttng_evaluation_status evaluation_status;
565 uint64_t consumed;
566 auto *condition = lttng_notification_get_const_condition(&notification);
567 auto *evaluation = lttng_notification_get_const_evaluation(&notification);
568 const auto condition_type = lttng_condition_get_type(condition);
569
570 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
571 LTTNG_THROW_ERROR("Unexpected condition type");
572 }
573
574 /* Fetch info to test. */
575 condition_status = lttng_condition_session_consumed_size_get_session_name(
576 condition, &condition_session_name);
577 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
578 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
579 }
580
581 evaluation_status =
582 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
583 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
584 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
585 }
586
587 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
588 condition_session_name,
589 consumed);
590
591 /*
592 * Mind the order of the declaration of list_lock vs session:
593 * the session list lock must always be released _after_ the release of
594 * a session's reference (the destruction of a ref/locked_ref) to ensure
595 * since the reference's release may unpublish the session from the list of
596 * sessions.
597 */
598 const auto list_lock = lttng::sessiond::lock_session_list();
599 try {
600 const auto session = ltt_session::find_locked_session(condition_session_name);
601
602 if (!lttng_trigger_is_equal(session->rotate_trigger,
603 lttng_notification_get_const_trigger(&notification))) {
604 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
605 return;
606 }
607
608 unsubscribe_session_consumed_size_rotation(*session);
609
610 ret = cmd_rotate_session(
611 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
612 if (ret != LTTNG_OK) {
613 switch (ret) {
614 case LTTNG_OK:
615 break;
616 case -LTTNG_ERR_ROTATION_PENDING:
617 DBG("Rotate already pending, subscribe to the next threshold value");
618 break;
619 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
620 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
621 break;
622 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
623 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
624 break;
625 default:
626 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
627 static_cast<lttng_error_code>(-ret));
628 }
629 }
630
631 subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
632 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
633 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
634 lttng_condition_type_str(condition_type),
635 condition_session_name);
636 /*
637 * Not a fatal error: a session can be destroyed before we get
638 * the chance to handle the notification.
639 */
640 return;
641 }
642 }
643
644 void ls::rotation_thread::_handle_notification_channel_activity()
645 {
646 bool notification_pending = true;
647
648 /*
649 * A notification channel may have multiple notifications queued-up internally in
650 * its buffers. This is because a notification channel multiplexes command replies
651 * and notifications. The current protocol specifies that multiple notifications can be
652 * received before the reply to a command.
653 *
654 * In such cases, the notification channel client implementation internally queues them and
655 * provides them on the next calls to lttng_notification_channel_get_next_notification().
656 * This is correct with respect to the public API, which is intended to be used in "blocking
657 * mode".
658 *
659 * However, this internal user relies on poll/epoll to wake-up when data is available
660 * on the notification channel's socket. As such, it can't assume that a wake-up means only
661 * one notification is available for consumption since many of them may have been queued in
662 * the channel's internal buffers.
663 */
664 while (notification_pending) {
665 const auto pending_status = lttng_notification_channel_has_pending_notification(
666 _notification_channel.get(), &notification_pending);
667 if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
668 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
669 }
670
671 if (!notification_pending) {
672 return;
673 }
674
675 /* Receive the next notification. */
676 lttng_notification::uptr notification;
677 enum lttng_notification_channel_status next_notification_status;
678
679 {
680 struct lttng_notification *raw_notification_ptr;
681
682 next_notification_status = lttng_notification_channel_get_next_notification(
683 _notification_channel.get(), &raw_notification_ptr);
684 notification.reset(raw_notification_ptr);
685 }
686
687 switch (next_notification_status) {
688 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
689 break;
690 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
691 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
692 return;
693 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
694 LTTNG_THROW_ERROR("Notification channel was closed");
695 default:
696 /* Unhandled conditions / errors. */
697 LTTNG_THROW_ERROR("Unknown notification channel status");
698 }
699
700 _handle_notification(*notification);
701 }
702 }
703
704 void ls::rotation_thread::_thread_function() noexcept
705 {
706 DBG("Started rotation thread");
707
708 try {
709 _run();
710 } catch (const std::exception& e) {
711 ERR_FMT("Fatal rotation thread error: {}", e.what());
712 }
713
714 DBG("Thread exit");
715 }
716
717 void ls::rotation_thread::_run()
718 {
719 rcu_register_thread();
720 const auto unregister_rcu_thread =
721 lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
722
723 rcu_thread_online();
724 const auto offline_rcu_thread =
725 lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
726
727 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
728 health_code_update();
729 const auto unregister_health =
730 lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
731
732 const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
733
734 while (true) {
735 health_poll_entry();
736 DBG("Entering poll wait");
737 auto poll_wait_ret = lttng_poll_wait(&_events, -1);
738 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
739 health_poll_exit();
740 if (poll_wait_ret < 0) {
741 /*
742 * Restart interrupted system call.
743 */
744 if (errno == EINTR) {
745 continue;
746 }
747
748 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
749 }
750
751 const auto fd_count = poll_wait_ret;
752 for (int i = 0; i < fd_count; i++) {
753 const auto fd = LTTNG_POLL_GETFD(&_events, i);
754 const auto revents = LTTNG_POLL_GETEV(&_events, i);
755
756 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
757
758 if (revents & LPOLLERR) {
759 LTTNG_THROW_ERROR(lttng::format(
760 "Polling returned an error on fd: fd={}", fd));
761 }
762
763 if (fd == _notification_channel->socket ||
764 fd == _notification_channel_subscribtion_change_eventfd.fd()) {
765 try {
766 _handle_notification_channel_activity();
767 } catch (const lttng::ctl::error& e) {
768 /*
769 * The only non-fatal error (rotation failed), others
770 * are caught at the top-level.
771 */
772 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
773 e.what());
774 continue;
775 }
776
777 if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
778 _notification_channel_subscribtion_change_eventfd
779 .decrement();
780 }
781 } else {
782 /* Job queue or quit pipe activity. */
783
784 /*
785 * The job queue is serviced if there is
786 * activity on the quit pipe to ensure it is
787 * flushed and all references held in the queue
788 * are released.
789 */
790 _handle_job_queue();
791 if (fd == queue_pipe_fd) {
792 char buf;
793
794 if (lttng_read(fd, &buf, 1) != 1) {
795 LTTNG_THROW_POSIX(
796 lttng::format(
797 "Failed to read from wakeup pipe: fd={}",
798 fd),
799 errno);
800 }
801 } else {
802 DBG("Quit pipe activity");
803 return;
804 }
805 }
806 }
807 }
808 }
809
810 bool ls::rotation_thread::shutdown() const noexcept
811 {
812 const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
813
814 return notify_thread_pipe(write_fd) == 1;
815 }
816
817 void ls::rotation_thread::launch_thread()
818 {
819 auto thread = lttng_thread_create(
820 "Rotation",
821 [](void *ptr) {
822 auto handle = reinterpret_cast<rotation_thread *>(ptr);
823
824 handle->_thread_function();
825 return static_cast<void *>(nullptr);
826 },
827 shutdown_rotation_thread,
828 nullptr,
829 this);
830 if (!thread) {
831 LTTNG_THROW_ERROR("Failed to launch rotation thread");
832 }
833
834 lttng_thread_put(thread);
835 }
836
837 void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
838 std::uint64_t size)
839 {
840 const struct lttng_credentials session_creds = {
841 .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
842 .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
843 };
844
845 ASSERT_LOCKED(session._lock);
846
847 auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
848 lttng_condition_session_consumed_size_create());
849 if (!rotate_condition) {
850 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
851 }
852
853 auto condition_status =
854 lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
855 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
856 LTTNG_THROW_ERROR(lttng::format(
857 "Could not set session consumed size condition threshold: size={}", size));
858 }
859
860 condition_status = lttng_condition_session_consumed_size_set_session_name(
861 rotate_condition.get(), session.name);
862 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
863 LTTNG_THROW_ERROR(lttng::format(
864 "Could not set session consumed size condition session name: name=`{}`",
865 session.name));
866 }
867
868 auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
869 lttng_action_notify_create());
870 if (!notify_action) {
871 LTTNG_THROW_POSIX("Could not create notify action", errno);
872 }
873
874 LTTNG_ASSERT(!session.rotate_trigger);
875 /* trigger acquires its own reference to condition and action on success. */
876 auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
877 lttng_trigger_create(rotate_condition.get(), notify_action.get()));
878 if (!trigger) {
879 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
880 }
881
882 /* Ensure this trigger is not visible to external users. */
883 lttng_trigger_set_hidden(trigger.get());
884 lttng_trigger_set_credentials(trigger.get(), &session_creds);
885
886 auto nc_status = lttng_notification_channel_subscribe(_notification_channel.get(),
887 rotate_condition.get());
888 if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
889 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
890 }
891
892 /*
893 * Ensure any notification queued during the subscription are consumed by queueing an
894 * event.
895 */
896 _notification_channel_subscribtion_change_eventfd.increment();
897
898 const auto register_ret = notification_thread_command_register_trigger(
899 &_notification_thread_handle, trigger.get(), true);
900 if (register_ret != LTTNG_OK) {
901 LTTNG_THROW_CTL(
902 lttng::format(
903 "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
904 session.name,
905 size),
906 register_ret);
907 }
908
909 /* Ownership transferred to the session. */
910 session.rotate_trigger = trigger.release();
911 }
912
913 void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
914 {
915 LTTNG_ASSERT(session.rotate_trigger);
916
917 const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
918 lttng_trigger_put(session.rotate_trigger);
919 session.rotate_trigger = nullptr;
920 });
921
922 const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
923 _notification_channel.get(),
924 lttng_trigger_get_const_condition(session.rotate_trigger));
925 if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
926 LTTNG_THROW_ERROR(lttng::format(
927 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
928 session.name,
929 static_cast<int>(unsubscribe_status)));
930 }
931
932 /*
933 * Ensure any notification queued during the un-subscription are consumed by queueing an
934 * event.
935 */
936 _notification_channel_subscribtion_change_eventfd.increment();
937
938 const auto unregister_status = notification_thread_command_unregister_trigger(
939 &_notification_thread_handle, session.rotate_trigger);
940 if (unregister_status != LTTNG_OK) {
941 LTTNG_THROW_CTL(
942 lttng::format(
943 "Failed to unregister trigger for automatic size-based rotation: session_name{}",
944 session.name),
945 unregister_status);
946 }
947 }
This page took 0.051567 seconds and 4 git commands to generate.