2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 * SPDX-License-Identifier: GPL-2.0-only
11 #include "health-sessiond.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "rotation-thread.hpp"
15 #include "session.hpp"
20 #include <common/align.hpp>
21 #include <common/config/session-config.hpp>
22 #include <common/defaults.hpp>
23 #include <common/error.hpp>
24 #include <common/eventfd.hpp>
25 #include <common/exception.hpp>
26 #include <common/file-descriptor.hpp>
27 #include <common/format.hpp>
28 #include <common/futex.hpp>
29 #include <common/hashtable/utils.hpp>
30 #include <common/kernel-ctl/kernel-ctl.hpp>
31 #include <common/locked-reference.hpp>
32 #include <common/make-unique-wrapper.hpp>
33 #include <common/pthread-lock.hpp>
34 #include <common/scope-exit.hpp>
35 #include <common/time.hpp>
36 #include <common/urcu.hpp>
37 #include <common/utils.hpp>
39 #include <lttng/action/action-internal.hpp>
40 #include <lttng/condition/condition-internal.hpp>
41 #include <lttng/location-internal.hpp>
42 #include <lttng/notification/channel-internal.hpp>
43 #include <lttng/notification/notification-internal.hpp>
44 #include <lttng/rotate-internal.hpp>
45 #include <lttng/trigger/trigger.h>
51 #include <sys/eventfd.h>
55 #include <urcu/list.h>
/* Short alias for the lttng::sessiond namespace; used as `ls::` throughout this file. */
57 namespace ls
= lttng::sessiond
;
/*
 * Job queue shared between the code that enqueues rotation jobs and the
 * rotation thread.
 *
 * NOTE(review): other functions in this file also use a `lock` member of this
 * structure (pthread_mutex_init/pthread_mutex_destroy and lock_guard); its
 * declaration is not visible in this excerpt.
 */
60 * The timer thread enqueues jobs and wakes up the rotation thread.
61 * When the rotation thread wakes up, it empties the queue.
63 struct ls::rotation_thread_timer_queue
{
/*
 * Wake-up pipe: rotation_thread_enqueue_job() writes one byte to its write
 * end and the rotation thread adds its read end to its poll set.
 */
64 struct lttng_pipe
*event_pipe
;
/* Head of the list of pending rotation_thread_job entries (linked through rotation_thread_job::head). */
65 struct cds_list_head list
;
/*
 * A unit of work queued for the rotation thread: a job type and the session
 * it applies to.
 *
 * The unique_ptr specialization below uses an lttng::memory::free-based
 * deleter; jobs are allocated with zmalloc() in rotation_thread_enqueue_job().
 */
70 struct rotation_thread_job
{
72 std::unique_ptr
<rotation_thread_job
,
73 lttng::memory::create_deleter_class
<rotation_thread_job
,
74 lttng::memory::free
>::deleter
>;
/* Kind of work to perform (see get_job_type_str() for the known values). */
76 enum ls::rotation_thread_job_type type
;
/* Target session; rotation_thread_enqueue_job() calls session_get() on it before queueing. */
77 struct ltt_session
*session
;
78 /* List member in struct rotation_thread_timer_queue. */
79 struct cds_list_head head
;
/*
 * Map a rotation thread job type to a constant, human-readable string used in
 * log messages.
 *
 * NOTE(review): the handling of values other than the two listed below is not
 * visible in this excerpt.
 */
82 const char *get_job_type_str(enum ls::rotation_thread_job_type job_type
)
85 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION
:
86 return "CHECK_PENDING_ROTATION";
87 case ls::rotation_thread_job_type::SCHEDULED_ROTATION
:
88 return "SCHEDULED_ROTATION";
/*
 * Look for a queued job that targets the same session with the same job type.
 *
 * The queue's list is walked entry by entry; a job matches when both its
 * session pointer and its type are equal to the arguments.
 */
95 * Called with the rotation_thread_timer_queue lock held.
96 * Return true if the same timer job already exists in the queue, false if not.
98 bool timer_job_exists(const ls::rotation_thread_timer_queue
*queue
,
99 ls::rotation_thread_job_type job_type
,
100 ltt_session
*session
)
103 struct rotation_thread_job
*job
;
105 cds_list_for_each_entry (job
, &queue
->list
, head
) {
106 if (job
->session
== session
&& job
->type
== job_type
) {
/*
 * Determine whether the trace chunk being archived still exists on any
 * consumer daemon of the session.
 *
 * The session must have a chunk_being_archived (asserted). The consumer
 * sockets of the user space session, then of the kernel session, are visited
 * (each domain only if the session has it); each socket is locked while
 * consumer_trace_chunk_exists() is queried. Any status other than
 * CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK marks the chunk as still
 * present on a peer.
 *
 * _rotation_completed is set to true when no peer still knows the chunk.
 *
 * NOTE(review): the trailing session_reset_rotation_state(...,
 * LTTNG_ROTATION_STATE_ERROR) call looks like the error path taken when a
 * consumer query fails; the branching around it is not visible in this
 * excerpt -- confirm.
 */
115 void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref
& session
,
116 bool& _rotation_completed
)
119 enum consumer_trace_chunk_exists_status exists_status
;
121 bool chunk_exists_on_peer
= false;
122 enum lttng_trace_chunk_status chunk_status
;
123 const lttng::urcu::read_lock_guard read_lock
;
125 LTTNG_ASSERT(session
->chunk_being_archived
);
128 * Check for a local pending rotation on all consumers (32-bit
129 * user space, 64-bit user space, and kernel).
131 if (!session
->ust_session
) {
135 for (auto *socket
: lttng::urcu::lfht_iteration_adapter
<consumer_socket
,
136 decltype(consumer_socket::node
),
137 &consumer_socket::node
>(
138 *session
->ust_session
->consumer
->socks
->ht
)) {
139 relayd_id
= session
->ust_session
->consumer
->type
== CONSUMER_DST_LOCAL
?
141 session
->ust_session
->consumer
->net_seq_index
;
143 const lttng::pthread::lock_guard
socket_lock(*socket
->lock
);
144 ret
= consumer_trace_chunk_exists(socket
,
147 session
->chunk_being_archived
,
150 ERR("Error occurred while checking rotation status on consumer daemon");
154 if (exists_status
!= CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK
) {
155 chunk_exists_on_peer
= true;
161 if (!session
->kernel_session
) {
165 for (auto *socket
: lttng::urcu::lfht_iteration_adapter
<consumer_socket
,
166 decltype(consumer_socket::node
),
167 &consumer_socket::node
>(
168 *session
->kernel_session
->consumer
->socks
->ht
)) {
169 const lttng::pthread::lock_guard
socket_lock(*socket
->lock
);
171 relayd_id
= session
->kernel_session
->consumer
->type
== CONSUMER_DST_LOCAL
?
173 session
->kernel_session
->consumer
->net_seq_index
;
175 ret
= consumer_trace_chunk_exists(socket
,
178 session
->chunk_being_archived
,
181 ERR("Error occurred while checking rotation status on consumer daemon");
185 if (exists_status
!= CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK
) {
186 chunk_exists_on_peer
= true;
193 if (!chunk_exists_on_peer
) {
194 uint64_t chunk_being_archived_id
;
196 chunk_status
= lttng_trace_chunk_get_id(session
->chunk_being_archived
,
197 &chunk_being_archived_id
);
198 LTTNG_ASSERT(chunk_status
== LTTNG_TRACE_CHUNK_STATUS_OK
);
199 DBG("Rotation of trace archive %" PRIu64
200 " of session \"%s\" is complete on all consumers",
201 chunk_being_archived_id
,
205 _rotation_completed
= !chunk_exists_on_peer
;
207 ret
= session_reset_rotation_state(session
, LTTNG_ROTATION_STATE_ERROR
);
209 ERR("Failed to reset rotation state of session \"%s\"", session
->name
);
/*
 * Check whether the rotation in progress for `session` has completed and, if
 * so, finalize it.
 *
 * Visible steps:
 *   - stop the one-shot rotation-pending check timer;
 *   - query the consumers through
 *     check_session_rotation_pending_on_consumers();
 *   - on completion, record the archived chunk's name in
 *     session->last_archived_chunk_name, reset the rotation state to
 *     LTTNG_ROTATION_STATE_COMPLETED and, unless the rotation is "quiet",
 *     notify the notification thread (the archive location reference obtained
 *     for that purpose is released with lttng_trace_archive_location_put());
 *   - if the rotation state is still LTTNG_ROTATION_STATE_ONGOING, re-arm the
 *     pending-check timer with DEFAULT_ROTATE_PENDING_TIMER.
 *
 * The notification thread handle is passed by address to
 * notification_thread_command_session_rotation_completed().
 */
215 * Check if the last rotation was completed, called with session lock held.
216 * Should only return non-zero in the event of a fatal error. Doing so will
217 * shutdown the thread.
219 int check_session_rotation_pending(const ltt_session::locked_ref
& session
,
220 notification_thread_handle
& notification_thread_handle
)
223 struct lttng_trace_archive_location
*location
;
224 enum lttng_trace_chunk_status chunk_status
;
225 bool rotation_completed
= false;
226 const char *archived_chunk_name
;
227 uint64_t chunk_being_archived_id
;
229 if (!session
->chunk_being_archived
) {
235 lttng_trace_chunk_get_id(session
->chunk_being_archived
, &chunk_being_archived_id
);
236 LTTNG_ASSERT(chunk_status
== LTTNG_TRACE_CHUNK_STATUS_OK
);
238 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64
,
240 chunk_being_archived_id
);
243 * The rotation-pending check timer of a session is launched in
244 * one-shot mode. If the rotation is incomplete, the rotation
245 * thread will re-enable the pending-check timer.
247 * The timer thread can't stop the timer itself since it is involved
248 * in the check for the timer's quiescence.
250 ret
= timer_session_rotation_pending_check_stop(session
);
252 goto check_ongoing_rotation
;
255 check_session_rotation_pending_on_consumers(session
, rotation_completed
);
256 if (!rotation_completed
|| session
->rotation_state
== LTTNG_ROTATION_STATE_ERROR
) {
257 goto check_ongoing_rotation
;
261 * Now we can clear the "ONGOING" state in the session. New
262 * rotations can start now.
264 chunk_status
= lttng_trace_chunk_get_name(
265 session
->chunk_being_archived
, &archived_chunk_name
, nullptr);
266 LTTNG_ASSERT(chunk_status
== LTTNG_TRACE_CHUNK_STATUS_OK
);
267 free(session
->last_archived_chunk_name
);
268 session
->last_archived_chunk_name
= strdup(archived_chunk_name
);
269 if (!session
->last_archived_chunk_name
) {
270 PERROR("Failed to duplicate archived chunk name");
273 session_reset_rotation_state(session
, LTTNG_ROTATION_STATE_COMPLETED
);
275 if (!session
->quiet_rotation
) {
276 location
= session_get_trace_archive_location(session
);
277 ret
= notification_thread_command_session_rotation_completed(
278 &notification_thread_handle
,
280 session
->last_archived_chunk_id
.value
,
282 lttng_trace_archive_location_put(location
);
283 if (ret
!= LTTNG_OK
) {
284 ERR("Failed to notify notification thread of completed rotation for session %s",
290 check_ongoing_rotation
:
291 if (session
->rotation_state
== LTTNG_ROTATION_STATE_ONGOING
) {
292 chunk_status
= lttng_trace_chunk_get_id(session
->chunk_being_archived
,
293 &chunk_being_archived_id
);
294 LTTNG_ASSERT(chunk_status
== LTTNG_TRACE_CHUNK_STATUS_OK
);
296 DBG("Rotation of trace archive %" PRIu64
" is still pending for session %s",
297 chunk_being_archived_id
,
299 ret
= timer_session_rotation_pending_check_start(session
,
300 DEFAULT_ROTATE_PENDING_TIMER
);
302 ERR("Failed to re-enable rotation pending timer");
/*
 * Launch a scheduled (time-based) rotation of `session` through
 * cmd_rotate_session(), using the
 * LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED chunk command.
 *
 * Asserts that the session list lock is held. A result other than LTTNG_OK is
 * reported by throwing through LTTNG_THROW_CTL.
 *
 * NOTE(review): the error code is cast from `ret` here, while
 * _handle_notification() in this file casts `-ret` and matches negated
 * LTTNG_ERR_* case labels after the same cmd_rotate_session() call -- confirm
 * which sign convention cmd_rotate_session() uses.
 * NOTE(review): a DBG_FMT("... aborted ...") statement follows the throw; the
 * surrounding braces are not visible in this excerpt, confirm it is reachable.
 */
312 /* Call with the session and session_list locks held. */
313 void launch_session_rotation(const ltt_session::locked_ref
& session
)
317 DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session
->name
);
319 ASSERT_SESSION_LIST_LOCKED();
321 ret
= cmd_rotate_session(
322 session
, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED
);
323 if (ret
!= LTTNG_OK
) {
324 LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
326 static_cast<lttng_error_code
>(ret
));
328 /* Don't consider errors as fatal. */
329 DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
331 lttng_strerror(ret
));
/*
 * Execute one queued rotation thread job against its (locked) session.
 *
 *   - SCHEDULED_ROTATION: calls launch_session_rotation(); an
 *     lttng::ctl::error thrown by it is caught and only logged at debug
 *     level.
 *   - CHECK_PENDING_ROTATION: forwards to check_session_rotation_pending()
 *     and keeps its return value in `ret`.
 *
 * NOTE(review): the switch header, default case and return statement are not
 * visible in this excerpt; _handle_job_queue() tests the returned int for
 * non-zero.
 */
335 int run_job(const rotation_thread_job
& job
,
336 const ltt_session::locked_ref
& session
,
337 notification_thread_handle
& notification_thread_handle
)
342 case ls::rotation_thread_job_type::SCHEDULED_ROTATION
:
344 launch_session_rotation(session
);
345 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
347 } catch (const lttng::ctl::error
& ctl_ex
) {
348 /* Don't consider errors as fatal. */
349 DBG("Scheduled time-based rotation aborted for session %s: %s",
351 lttng_strerror(ctl_ex
.code()));
354 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION
:
355 ret
= check_session_rotation_pending(session
, notification_thread_handle
);
/*
 * Shutdown callback handed to lttng_thread_create() by
 * rotation_thread::launch_thread().
 *
 * `thread_data` is interpreted as a pointer to the ls::rotation_thread
 * instance; the call is forwarded to its shutdown() method and that method's
 * result is returned.
 */
364 bool shutdown_rotation_thread(void *thread_data
)
366 auto *handle
= reinterpret_cast<const ls::rotation_thread
*>(thread_data
);
368 return handle
->shutdown();
/*
 * Allocate and initialize a rotation thread timer queue.
 *
 * The queue is zero-allocated with zmalloc(); its wake-up pipe is opened with
 * FD_CLOEXEC | O_NONBLOCK, its job list is initialized empty and its mutex is
 * initialized with default attributes.
 *
 * NOTE(review): no check of the lttng_pipe_open() result (nor of
 * pthread_mutex_init()) is visible in this excerpt; a null event_pipe would
 * later be handed to lttng_pipe_get_readfd()/lttng_pipe_get_writefd() --
 * confirm against the full function.
 */
372 ls::rotation_thread_timer_queue
*ls::rotation_thread_timer_queue_create()
374 auto queue
= zmalloc
<ls::rotation_thread_timer_queue
>();
376 PERROR("Failed to allocate timer rotate queue");
380 queue
->event_pipe
= lttng_pipe_open(FD_CLOEXEC
| O_NONBLOCK
);
381 CDS_INIT_LIST_HEAD(&queue
->list
);
382 pthread_mutex_init(&queue
->lock
, nullptr);
/*
 * Release the resources of a rotation thread timer queue.
 *
 * Destroys the wake-up pipe, asserts (with the queue lock held) that no job
 * is left in the list, then destroys the queue's mutex.
 *
 * NOTE(review): the release of the queue's own memory is not visible in this
 * excerpt.
 */
387 void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue
*queue
)
393 lttng_pipe_destroy(queue
->event_pipe
);
396 const lttng::pthread::lock_guard
queue_lock(queue
->lock
);
398 LTTNG_ASSERT(cds_list_empty(&queue
->list
));
401 pthread_mutex_destroy(&queue
->lock
);
/*
 * Construct the rotation thread's state.
 *
 * Keeps references to the timer queue and to the notification thread handle,
 * opens the quit pipe (FD_CLOEXEC), creates a notification channel connected
 * to lttng_session_daemon_notification_endpoint, then builds the poll set
 * with the four descriptors listed in the comment below.
 *
 * Failures are reported by throwing (LTTNG_THROW_POSIX / LTTNG_THROW_ERROR).
 */
405 ls::rotation_thread::rotation_thread(rotation_thread_timer_queue
& rotation_timer_queue
,
406 notification_thread_handle
& notification_thread_handle
) :
407 _rotation_timer_queue(rotation_timer_queue
),
408 _notification_thread_handle(notification_thread_handle
)
410 _quit_pipe
.reset([]() {
411 auto raw_pipe
= lttng_pipe_open(FD_CLOEXEC
);
413 LTTNG_THROW_POSIX("Failed to open rotation thread's quit pipe", errno
);
419 _notification_channel
.reset([]() {
420 auto channel
= lttng_notification_channel_create(
421 lttng_session_daemon_notification_endpoint
);
424 "Failed to create notification channel of rotation thread");
430 lttng_poll_init(&_events
);
433 * Create pollset with size 4:
434 * - rotation thread quit pipe,
435 * - rotation thread timer queue pipe,
436 * - notification channel sock,
437 * - subscription change event fd
439 if (lttng_poll_create(&_events
, 4, LTTNG_CLOEXEC
) < 0) {
440 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
443 if (lttng_poll_add(&_events
, lttng_pipe_get_readfd(_quit_pipe
.get()), LPOLLIN
) < 0) {
444 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
447 if (lttng_poll_add(&_events
,
448 lttng_pipe_get_readfd(_rotation_timer_queue
.event_pipe
),
450 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
453 if (lttng_poll_add(&_events
,
454 _notification_channel_subscribtion_change_eventfd
.fd(),
457 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
460 if (lttng_poll_add(&_events
, _notification_channel
->socket
, LPOLLIN
) < 0) {
461 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
/* Destructor: cleans up the poll set created by the constructor with lttng_poll_clean(). */
465 ls::rotation_thread::~rotation_thread()
467 lttng_poll_clean(&_events
);
/*
 * Queue a job of type `job_type` for `session` and wake up the rotation
 * thread.
 *
 * The queue lock is taken through a lock_guard. If an identical job (same
 * session, same type) is already queued, nothing is added. Otherwise a job is
 * zero-allocated, a reference is taken on the session with session_get(), the
 * job is appended to the tail of the queue and a single byte is written to
 * the queue's wake-up pipe.
 *
 * A failed write with errno set to EAGAIN/EWOULDBLOCK is only logged at debug
 * level: the job is already in the list at that point. Any other write
 * failure is reported with PERROR.
 */
470 void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue
*queue
,
471 ls::rotation_thread_job_type job_type
,
472 ltt_session
*session
)
474 const char dummy
= '!';
475 struct rotation_thread_job
*job
= nullptr;
476 const char *job_type_str
= get_job_type_str(job_type
);
477 const lttng::pthread::lock_guard
queue_lock(queue
->lock
);
479 if (timer_job_exists(queue
, job_type
, session
)) {
481 * This timer job is already pending, we don't need to add
487 job
= zmalloc
<rotation_thread_job
>();
489 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
495 /* No reason for this to fail as the caller must hold a reference. */
496 (void) session_get(session
);
498 job
->session
= session
;
499 job
->type
= job_type
;
500 cds_list_add_tail(&job
->head
, &queue
->list
);
502 const int write_ret
=
503 lttng_write(lttng_pipe_get_writefd(queue
->event_pipe
), &dummy
, sizeof(dummy
));
506 * We do not want to block in the timer handler, the job has
507 * been enqueued in the list, the wakeup pipe is probably full,
508 * the job will be processed when the rotation_thread catches
512 DIAGNOSTIC_IGNORE_LOGICAL_OP
513 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
516 * Not an error, but would be surprising and indicate
517 * that the rotation thread can't keep up with the
520 DBG("Wake-up pipe of rotation thread job queue is full");
524 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
/*
 * Pop jobs from the rotation timer queue and run them.
 *
 * The queue lock is held only while the first job is detached from the list;
 * ownership of the job goes to a rotation_thread_job::uptr. The session list
 * lock is then taken, the job's session is locked and wrapped in a locked
 * reference, and the job is executed with run_job().
 *
 * NOTE(review): the enclosing loop and the handling of a non-zero run_job()
 * result are not visible in this excerpt.
 * NOTE(review): `typeof` is a compiler extension rather than standard C++.
 */
531 void ls::rotation_thread::_handle_job_queue()
534 rotation_thread_job::uptr job
;
537 /* Take the queue lock only to pop an element from the list. */
538 const lttng::pthread::lock_guard
rotation_timer_queue_lock(
539 _rotation_timer_queue
.lock
);
540 if (cds_list_empty(&_rotation_timer_queue
.list
)) {
544 job
.reset(cds_list_first_entry(
545 &_rotation_timer_queue
.list
, typeof(rotation_thread_job
), head
));
546 cds_list_del(&job
->head
);
549 const auto list_lock
= lttng::sessiond::lock_session_list();
551 /* locked_ref will unlock the session and release the ref held by the job. */
552 session_lock(job
->session
);
553 auto session
= ltt_session::make_locked_ref(*job
->session
);
555 if (run_job(*job
, session
, _notification_thread_handle
)) {
/*
 * React to a "session consumed size" notification by launching a size-based
 * rotation of the named session.
 *
 * Steps visible below:
 *   - reject any condition type other than
 *     LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
 *   - extract the session name from the condition and the consumed size from
 *     the evaluation (throwing on failure);
 *   - with the session list locked, find and lock the session;
 *   - ignore notifications whose trigger differs from the session's internal
 *     rotate_trigger;
 *   - unsubscribe from the current threshold, call cmd_rotate_session(), and
 *     subscribe to the next threshold (consumed + session->rotate_size).
 *
 * The three "already pending / already happened" results of
 * cmd_rotate_session() are only logged; any other failure throws through
 * LTTNG_THROW_CTL. A session that no longer exists is not an error.
 */
561 void ls::rotation_thread::_handle_notification(const lttng_notification
& notification
)
564 const char *condition_session_name
= nullptr;
565 enum lttng_condition_status condition_status
;
566 enum lttng_evaluation_status evaluation_status
;
568 auto *condition
= lttng_notification_get_const_condition(&notification
);
569 auto *evaluation
= lttng_notification_get_const_evaluation(&notification
);
570 const auto condition_type
= lttng_condition_get_type(condition
);
572 if (condition_type
!= LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
) {
573 LTTNG_THROW_ERROR("Unexpected condition type");
576 /* Fetch info to test. */
577 condition_status
= lttng_condition_session_consumed_size_get_session_name(
578 condition
, &condition_session_name
);
579 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
580 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
584 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation
, &consumed
);
585 if (evaluation_status
!= LTTNG_EVALUATION_STATUS_OK
) {
586 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
589 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
590 condition_session_name
,
594 * Mind the order of the declaration of list_lock vs session:
595 * the session list lock must always be released _after_ the release of
596 * a session's reference (the destruction of a ref/locked_ref) to ensure
597 * since the reference's release may unpublish the session from the list of
600 const auto list_lock
= lttng::sessiond::lock_session_list();
602 const auto session
= ltt_session::find_locked_session(condition_session_name
);
604 if (!lttng_trigger_is_equal(session
->rotate_trigger
,
605 lttng_notification_get_const_trigger(&notification
))) {
606 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
610 unsubscribe_session_consumed_size_rotation(*session
);
612 ret
= cmd_rotate_session(
613 session
, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED
);
614 if (ret
!= LTTNG_OK
) {
618 case -LTTNG_ERR_ROTATION_PENDING
:
619 DBG("Rotate already pending, subscribe to the next threshold value");
621 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP
:
622 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
624 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR
:
625 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
628 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
629 static_cast<lttng_error_code
>(-ret
));
633 subscribe_session_consumed_size_rotation(*session
, consumed
+ session
->rotate_size
);
634 } catch (const lttng::sessiond::exceptions::session_not_found_error
& ex
) {
635 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
636 lttng_condition_type_str(condition_type
),
637 condition_session_name
);
639 * Not a fatal error: a session can be destroyed before we get
640 * the chance to handle the notification.
/*
 * Drain every notification currently available on the rotation thread's
 * notification channel.
 *
 * Loops while lttng_notification_channel_has_pending_notification() reports a
 * pending notification (its result is written to `notification_pending`,
 * passed by address). Each notification is fetched with
 * lttng_notification_channel_get_next_notification(), owned by a
 * lttng_notification::uptr, and handed to _handle_notification().
 *
 * A "notifications dropped" status is logged as a warning; a closed channel
 * or any unknown status throws.
 */
646 void ls::rotation_thread::_handle_notification_channel_activity()
648 bool notification_pending
= true;
651 * A notification channel may have multiple notifications queued-up internally in
652 * its buffers. This is because a notification channel multiplexes command replies
653 * and notifications. The current protocol specifies that multiple notifications can be
654 * received before the reply to a command.
656 * In such cases, the notification channel client implementation internally queues them and
657 * provides them on the next calls to lttng_notification_channel_get_next_notification().
658 * This is correct with respect to the public API, which is intended to be used in "blocking
661 * However, this internal user relies on poll/epoll to wake-up when data is available
662 * on the notification channel's socket. As such, it can't assume that a wake-up means only
663 * one notification is available for consumption since many of them may have been queued in
664 * the channel's internal buffers.
666 while (notification_pending
) {
667 const auto pending_status
= lttng_notification_channel_has_pending_notification(
668 _notification_channel
.get(), &notification_pending
);
669 if (pending_status
!= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
) {
670 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
673 if (!notification_pending
) {
677 /* Receive the next notification. */
678 lttng_notification::uptr notification
;
679 enum lttng_notification_channel_status next_notification_status
;
682 struct lttng_notification
*raw_notification_ptr
;
684 next_notification_status
= lttng_notification_channel_get_next_notification(
685 _notification_channel
.get(), &raw_notification_ptr
);
686 notification
.reset(raw_notification_ptr
);
689 switch (next_notification_status
) {
690 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
:
692 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
:
693 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
695 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
:
696 LTTNG_THROW_ERROR("Notification channel was closed");
698 /* Unhandled conditions / errors. */
699 LTTNG_THROW_ERROR("Unknown notification channel status");
702 _handle_notification(*notification
);
/*
 * Entry point executed by the rotation thread (invoked from the lambda passed
 * to lttng_thread_create() in launch_thread()).
 *
 * Declared noexcept: a std::exception reaching this level is caught and
 * logged as a fatal rotation thread error instead of propagating.
 *
 * NOTE(review): the guarded call (presumably _run()) is not visible in this
 * excerpt.
 */
706 void ls::rotation_thread::_thread_function() noexcept
708 DBG("Started rotation thread");
712 } catch (const std::exception
& e
) {
713 ERR_FMT("Fatal rotation thread error: {}", e
.what());
/*
 * Main loop of the rotation thread.
 *
 * Setup: registers the thread with RCU and with the sessiond health
 * subsystem; scope-exit guards undo each registration (rcu_unregister_thread,
 * rcu_thread_offline, health_unregister) when the function is left.
 *
 * Loop body: waits on the poll set without timeout. A wait interrupted by
 * EINTR is handled separately from other failures, which throw. For every
 * returned descriptor:
 *   - LPOLLERR throws;
 *   - the notification channel socket or the subscription-change eventfd
 *     triggers _handle_notification_channel_activity(); an lttng::ctl::error
 *     raised there is logged and treated as non-fatal;
 *   - otherwise the descriptor is the job queue pipe or the quit pipe. One
 *     byte is read from the job queue pipe when it is the source of the
 *     wake-up; quit pipe activity is logged.
 *
 * NOTE(review): the loop construct, the calls made on the eventfd and the job
 * queue, and the exit path are not visible in this excerpt.
 */
719 void ls::rotation_thread::_run()
721 rcu_register_thread();
722 const auto unregister_rcu_thread
=
723 lttng::make_scope_exit([]() noexcept
{ rcu_unregister_thread(); });
726 const auto offline_rcu_thread
=
727 lttng::make_scope_exit([]() noexcept
{ rcu_thread_offline(); });
729 health_register(the_health_sessiond
, HEALTH_SESSIOND_TYPE_ROTATION
);
730 health_code_update();
731 const auto unregister_health
=
732 lttng::make_scope_exit([]() noexcept
{ health_unregister(the_health_sessiond
); });
734 const auto queue_pipe_fd
= lttng_pipe_get_readfd(_rotation_timer_queue
.event_pipe
);
738 DBG("Entering poll wait");
739 auto poll_wait_ret
= lttng_poll_wait(&_events
, -1);
740 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret
);
742 if (poll_wait_ret
< 0) {
744 * Restart interrupted system call.
746 if (errno
== EINTR
) {
750 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno
);
753 const auto fd_count
= poll_wait_ret
;
754 for (int i
= 0; i
< fd_count
; i
++) {
755 const auto fd
= LTTNG_POLL_GETFD(&_events
, i
);
756 const auto revents
= LTTNG_POLL_GETEV(&_events
, i
);
758 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd
, revents
);
760 if (revents
& LPOLLERR
) {
761 LTTNG_THROW_ERROR(lttng::format(
762 "Polling returned an error on fd: fd={}", fd
));
765 if (fd
== _notification_channel
->socket
||
766 fd
== _notification_channel_subscribtion_change_eventfd
.fd()) {
768 _handle_notification_channel_activity();
769 } catch (const lttng::ctl::error
& e
) {
771 * The only non-fatal error (rotation failed), others
772 * are caught at the top-level.
774 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
779 if (fd
== _notification_channel_subscribtion_change_eventfd
.fd()) {
780 _notification_channel_subscribtion_change_eventfd
784 /* Job queue or quit pipe activity. */
787 * The job queue is serviced if there is
788 * activity on the quit pipe to ensure it is
789 * flushed and all references held in the queue
793 if (fd
== queue_pipe_fd
) {
796 if (lttng_read(fd
, &buf
, 1) != 1) {
799 "Failed to read from wakeup pipe: fd={}",
804 DBG("Quit pipe activity");
/*
 * Ask the rotation thread to stop by notifying the write end of its quit
 * pipe.
 *
 * Returns true when notify_thread_pipe() returns 1, false otherwise.
 */
812 bool ls::rotation_thread::shutdown() const noexcept
814 const int write_fd
= lttng_pipe_get_writefd(_quit_pipe
.get());
816 return notify_thread_pipe(write_fd
) == 1;
/*
 * Start the rotation thread through lttng_thread_create().
 *
 * The thread entry point casts its opaque argument back to a rotation_thread
 * and runs _thread_function(); shutdown_rotation_thread is registered as the
 * shutdown callback. Throws if the thread could not be launched. The
 * reference returned by lttng_thread_create() is released with
 * lttng_thread_put() once the thread is launched.
 *
 * NOTE(review): the remaining lttng_thread_create() arguments (thread name,
 * cleanup callback, user data) are not visible in this excerpt.
 */
819 void ls::rotation_thread::launch_thread()
821 auto thread
= lttng_thread_create(
824 auto handle
= reinterpret_cast<rotation_thread
*>(ptr
);
826 handle
->_thread_function();
827 return static_cast<void *>(nullptr);
829 shutdown_rotation_thread
,
833 LTTNG_THROW_ERROR("Failed to launch rotation thread");
836 lttng_thread_put(thread
);
/*
 * Arm the automatic size-based rotation of `session` for the next consumed
 * size threshold (`size`).
 *
 * The session lock must be held (asserted) and the session must not already
 * own a rotate_trigger (asserted). The function:
 *   - builds a "session consumed size" condition with the threshold and the
 *     session's name;
 *   - builds a notify action and a trigger combining both; the trigger is
 *     marked hidden and given the session's uid/gid as credentials;
 *   - subscribes the rotation thread's notification channel to the condition
 *     and bumps the subscription-change eventfd;
 *   - registers the trigger with the notification thread.
 *
 * Every failure throws. On success the trigger's ownership is transferred to
 * session.rotate_trigger; the condition, action and (until released) trigger
 * are held by unique wrappers that drop their reference when destroyed.
 */
839 void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session
& session
,
842 const struct lttng_credentials session_creds
= {
843 .uid
= LTTNG_OPTIONAL_INIT_VALUE(session
.uid
),
844 .gid
= LTTNG_OPTIONAL_INIT_VALUE(session
.gid
),
847 ASSERT_LOCKED(session
._lock
);
849 auto rotate_condition
= lttng::make_unique_wrapper
<lttng_condition
, lttng_condition_put
>(
850 lttng_condition_session_consumed_size_create());
851 if (!rotate_condition
) {
852 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno
);
855 auto condition_status
=
856 lttng_condition_session_consumed_size_set_threshold(rotate_condition
.get(), size
);
857 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
858 LTTNG_THROW_ERROR(lttng::format(
859 "Could not set session consumed size condition threshold: size={}", size
));
862 condition_status
= lttng_condition_session_consumed_size_set_session_name(
863 rotate_condition
.get(), session
.name
);
864 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
865 LTTNG_THROW_ERROR(lttng::format(
866 "Could not set session consumed size condition session name: name=`{}`",
870 auto notify_action
= lttng::make_unique_wrapper
<lttng_action
, lttng_action_put
>(
871 lttng_action_notify_create());
872 if (!notify_action
) {
873 LTTNG_THROW_POSIX("Could not create notify action", errno
);
876 LTTNG_ASSERT(!session
.rotate_trigger
);
877 /* trigger acquires its own reference to condition and action on success. */
878 auto trigger
= lttng::make_unique_wrapper
<lttng_trigger
, lttng_trigger_put
>(
879 lttng_trigger_create(rotate_condition
.get(), notify_action
.get()));
881 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno
);
884 /* Ensure this trigger is not visible to external users. */
885 lttng_trigger_set_hidden(trigger
.get());
886 lttng_trigger_set_credentials(trigger
.get(), &session_creds
);
888 auto nc_status
= lttng_notification_channel_subscribe(_notification_channel
.get(),
889 rotate_condition
.get());
890 if (nc_status
!= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
) {
891 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
895 * Ensure any notification queued during the subscription are consumed by queueing an
898 _notification_channel_subscribtion_change_eventfd
.increment();
900 const auto register_ret
= notification_thread_command_register_trigger(
901 &_notification_thread_handle
, trigger
.get(), true);
902 if (register_ret
!= LTTNG_OK
) {
905 "Failed to register trigger for automatic size-based rotation: session_name={}, size={}",
911 /* Ownership transferred to the session. */
912 session
.rotate_trigger
= trigger
.release();
915 void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session
& session
)
917 LTTNG_ASSERT(session
.rotate_trigger
);
919 const auto remove_session_trigger
= lttng::make_scope_exit([&session
]() noexcept
{
920 lttng_trigger_put(session
.rotate_trigger
);
921 session
.rotate_trigger
= nullptr;
924 const auto unsubscribe_status
= lttng_notification_channel_unsubscribe(
925 _notification_channel
.get(),
926 lttng_trigger_get_const_condition(session
.rotate_trigger
));
927 if (unsubscribe_status
!= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
) {
928 LTTNG_THROW_ERROR(lttng::format(
929 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
931 static_cast<int>(unsubscribe_status
)));
935 * Ensure any notification queued during the un-subscription are consumed by queueing an
938 _notification_channel_subscribtion_change_eventfd
.increment();
940 const auto unregister_status
= notification_thread_command_unregister_trigger(
941 &_notification_thread_handle
, session
.rotate_trigger
);
942 if (unregister_status
!= LTTNG_OK
) {
945 "Failed to unregister trigger for automatic size-based rotation: session_name{}",