Cleanup: rotation-thread: enforce conding standard following fix
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 4 Apr 2023 17:45:24 +0000 (13:45 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 12 Apr 2023 14:48:33 +0000 (10:48 -0400)
A fix introducing
rotate_notification_channel_subscription_change_eventfd didn't follow
the current coding standard so as to make it easier to backport to the
stable branches.

Clean-up the affected code to follow the current standard:
  - Replace the use of a raw eventfd to use the eventfd utility,
  - Subscribe and unsubscribe functions made use of global variables to
    communicate with the rotation thread: replace that with the
    rotation_thread class to centralize the interface,
  - Make the code using eventfd exception-safe (automatic memory
    management, use of various RAII utils),
  - Replacement of non-null pointers by references.

Change-Id: I7e363e21b829fd0939a336aca2570fdbcc346967
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
25 files changed:
include/lttng/notification/channel-internal.hpp
include/lttng/notification/notification-internal.hpp
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/client.cpp
src/bin/lttng-sessiond/cmd.cpp
src/bin/lttng-sessiond/cmd.hpp
src/bin/lttng-sessiond/globals.cpp
src/bin/lttng-sessiond/kernel.cpp
src/bin/lttng-sessiond/lttng-sessiond.hpp
src/bin/lttng-sessiond/main.cpp
src/bin/lttng-sessiond/rotate.cpp [deleted file]
src/bin/lttng-sessiond/rotate.hpp [deleted file]
src/bin/lttng-sessiond/rotation-thread.cpp
src/bin/lttng-sessiond/rotation-thread.hpp
src/bin/lttng-sessiond/session.cpp
src/bin/lttng-sessiond/session.hpp
src/bin/lttng-sessiond/timer.cpp
src/bin/lttng-sessiond/timer.hpp
src/bin/lttng-sessiond/ust-app.cpp
src/common/error.hpp
src/common/exception.cpp
src/common/exception.hpp
src/common/file-descriptor.cpp
src/common/file-descriptor.hpp
src/common/pipe.hpp

index 3364704eed12ccad98f410876970df69d92c3edd..44fd9cf1637c8e2c217964c1e58ec2f2f984e57b 100644 (file)
@@ -8,12 +8,15 @@
 #ifndef LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H
 #define LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H
 
-#include <lttng/notification/channel.h>
 #include <common/macros.hpp>
+#include <common/make-unique-wrapper.hpp>
 #include <common/payload.hpp>
-#include <stdint.h>
-#include <stdbool.h>
+
+#include <lttng/notification/channel.h>
+
 #include <pthread.h>
+#include <stdbool.h>
+#include <stdint.h>
 #include <urcu/list.h>
 
 /*
@@ -82,6 +85,10 @@ struct pending_notification {
  * in the pending_notifications list.
  */
 struct lttng_notification_channel {
+       using uptr = std::unique_ptr<
+               lttng_notification_channel,
+               lttng::details::create_unique_class<lttng_notification_channel, lttng_notification_channel_destroy>::deleter>;
+
        pthread_mutex_t lock;
        int socket;
        struct {
index 9557c38b88cb738e17ead59d3cc0217ee80474ed..9837c359bf104408987d8dfec44be5ea9439c7b0 100644 (file)
@@ -8,16 +8,25 @@
 #ifndef LTTNG_NOTIFICATION_INTERNAL_H
 #define LTTNG_NOTIFICATION_INTERNAL_H
 
-#include <lttng/notification/notification.h>
 #include <common/macros.hpp>
-#include <stdint.h>
+#include <common/make-unique-wrapper.hpp>
+
+#include <lttng/notification/notification.h>
+
+#include <memory>
 #include <stdbool.h>
+#include <stdint.h>
 #include <sys/types.h>
 
 struct lttng_payload;
 struct lttng_payload_view;
 
 struct lttng_notification {
+       using uptr = std::unique_ptr<
+               lttng_notification,
+               lttng::details::create_unique_class<lttng_notification,
+                                                   lttng_notification_destroy>::deleter>;
+
        struct lttng_trigger *trigger;
        struct lttng_evaluation *evaluation;
 };
index 7d618afe21b9ebb12a233cbddac52e46204d51de..e9b6ac1f68639ca7dbfc215381f58956a378b179 100644 (file)
@@ -39,7 +39,6 @@ liblttng_sessiond_common_la_SOURCES = utils.cpp utils.hpp \
                        notification-thread-commands.hpp notification-thread-commands.cpp \
                        notification-thread-events.hpp notification-thread-events.cpp \
                        sessiond-config.hpp sessiond-config.cpp \
-                       rotate.hpp rotate.cpp \
                        rotation-thread.hpp rotation-thread.cpp \
                        timer.cpp timer.hpp \
                        globals.cpp \
index 123d07f857c39b905e8d6bee338cb9f942d4dc74..0680e14fd581eb28817baf0e077659845bded1bf 100644 (file)
@@ -1784,7 +1784,7 @@ skip_domain:
        }
        case LTTCOMM_SESSIOND_COMMAND_DESTROY_SESSION:
        {
-               ret = cmd_destroy_session(cmd_ctx->session, the_notification_thread_handle, sock);
+               ret = cmd_destroy_session(cmd_ctx->session, sock);
                break;
        }
        case LTTCOMM_SESSIOND_COMMAND_LIST_DOMAINS:
@@ -2217,8 +2217,7 @@ skip_domain:
                ret = cmd_rotation_set_schedule(cmd_ctx->session,
                                                set_schedule,
                                                schedule_type,
-                                               value,
-                                               the_notification_thread_handle);
+                                               value);
                if (ret != LTTNG_OK) {
                        goto error;
                }
index 3f066a002d8d585d43cf3401556cc8a3eabca9ac..413bb6500353ba6e72536e97513f66ef16e00cd8 100644 (file)
@@ -22,7 +22,6 @@
 #include "lttng-syscall.hpp"
 #include "notification-thread-commands.hpp"
 #include "notification-thread.hpp"
-#include "rotate.hpp"
 #include "rotation-thread.hpp"
 #include "session.hpp"
 #include "timer.hpp"
@@ -3392,7 +3391,6 @@ error:
  * Called with session lock held.
  */
 int cmd_destroy_session(struct ltt_session *session,
-                       struct notification_thread_handle *notification_thread_handle,
                        int *sock_fd)
 {
        int ret;
@@ -3435,7 +3433,15 @@ int cmd_destroy_session(struct ltt_session *session,
        }
 
        if (session->rotate_size) {
-               unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
+               try {
+                       the_rotation_thread_handle->unsubscribe_session_consumed_size_rotation(
+                               *session);
+               } catch (std::exception& e) {
+                       /* Continue the destruction of the session anyway. */
+                       ERR("Failed to unsubscribe rotation thread notification channel from consumed size condition during session destruction: %s",
+                           e.what());
+               }
+
                session->rotate_size = 0;
        }
 
@@ -5619,7 +5625,7 @@ end:
        ret = (cmd_ret == LTTNG_OK) ? cmd_ret : -((int) cmd_ret);
        return ret;
 error:
-       if (session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR)) {
+       if (session_reset_rotation_state(*session, LTTNG_ROTATION_STATE_ERROR)) {
                ERR("Failed to reset rotation state of session \"%s\"", session->name);
        }
        goto end;
@@ -5786,8 +5792,7 @@ end:
 int cmd_rotation_set_schedule(struct ltt_session *session,
                              bool activate,
                              enum lttng_rotation_schedule_type schedule_type,
-                             uint64_t new_value,
-                             struct notification_thread_handle *notification_thread_handle)
+                             uint64_t new_value)
 {
        int ret;
        uint64_t *parameter_value;
@@ -5889,18 +5894,22 @@ int cmd_rotation_set_schedule(struct ltt_session *session,
                break;
        case LTTNG_ROTATION_SCHEDULE_TYPE_SIZE_THRESHOLD:
                if (activate) {
-                       ret = subscribe_session_consumed_size_rotation(
-                               session, new_value, notification_thread_handle);
-                       if (ret) {
-                               ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command");
+                       try {
+                               the_rotation_thread_handle->subscribe_session_consumed_size_rotation(
+                                       *session, new_value);
+                       } catch (std::exception& e) {
+                               ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command: %s",
+                                   e.what());
                                ret = LTTNG_ERR_UNK;
                                goto end;
                        }
                } else {
-                       ret = unsubscribe_session_consumed_size_rotation(
-                               session, notification_thread_handle);
-                       if (ret) {
-                               ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command");
+                       try {
+                               the_rotation_thread_handle
+                                       ->unsubscribe_session_consumed_size_rotation(*session);
+                       } catch (std::exception& e) {
+                               ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command: %s",
+                                   e.what());
                                ret = LTTNG_ERR_UNK;
                                goto end;
                        }
index 445eda80c95aece5d4e9bf66791b659e94e298d4..6defe0049d6a55966ba9fcd1a7ab34af98ebf88b 100644 (file)
@@ -40,9 +40,7 @@ void cmd_init(void);
 /* Session commands */
 enum lttng_error_code cmd_create_session(struct command_ctx *cmd_ctx, int sock,
                struct lttng_session_descriptor **return_descriptor);
-int cmd_destroy_session(struct ltt_session *session,
-               struct notification_thread_handle *notification_thread_handle,
-               int *sock_fd);
+int cmd_destroy_session(struct ltt_session *session, int *sock_fd);
 
 /* Channel commands */
 int cmd_disable_channel(struct ltt_session *session,
@@ -171,8 +169,7 @@ int cmd_rotate_get_info(struct ltt_session *session,
                uint64_t rotate_id);
 int cmd_rotation_set_schedule(struct ltt_session *session,
                bool activate, enum lttng_rotation_schedule_type schedule_type,
-               uint64_t value,
-               struct notification_thread_handle *notification_thread_handle);
+               uint64_t value);
 
 const struct cmd_completion_handler *cmd_pop_completion_handler(void);
 int start_kernel_session(struct ltt_kernel_session *ksess);
index 3add6c9eb492dfa3ca36ba9c3f4000cab7ee8b99..ad9a8bf850d461fc217ed3381ded5c3819661770 100644 (file)
@@ -21,6 +21,7 @@ long the_page_size;
 struct health_app *the_health_sessiond;
 
 struct notification_thread_handle *the_notification_thread_handle;
+lttng::sessiond::rotation_thread::uptr the_rotation_thread_handle;
 
 struct lttng_ht *the_agent_apps_ht_by_sock = nullptr;
 struct lttng_ht *the_trigger_agents_ht_by_domain = nullptr;
index 6e2bdb4c6cf4d5badbb79866397217762a623e8f..2914475d55f2af0ba9052844095db1f372939214 100644 (file)
@@ -16,7 +16,6 @@
 #include "lttng-syscall.hpp"
 #include "modprobe.hpp"
 #include "notification-thread-commands.hpp"
-#include "rotate.hpp"
 #include "sessiond-config.hpp"
 #include "tracker.hpp"
 #include "utils.hpp"
index e94dd7a56eb825710cfbcf482615edf026f6f2db..a6b17c2a823ade6ba087aea3e2395b695347385d 100644 (file)
@@ -9,19 +9,20 @@
 #ifndef _LTT_SESSIOND_H
 #define _LTT_SESSIOND_H
 
-#include <urcu.h>
-#include <urcu/wfcqueue.h>
+#include "notification-thread.hpp"
+#include "rotation-thread.hpp"
+#include "session.hpp"
+#include "sessiond-config.hpp"
+#include "ust-app.hpp"
 
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/payload.hpp>
 #include <common/compat/poll.hpp>
 #include <common/compat/socket.hpp>
+#include <common/payload.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
 #include <common/uuid.hpp>
 
-#include "session.hpp"
-#include "ust-app.hpp"
-#include "notification-thread.hpp"
-#include "sessiond-config.hpp"
+#include <urcu.h>
+#include <urcu/wfcqueue.h>
 
 /*
  * Consumer daemon state which is changed when spawning it, killing it or in
@@ -67,6 +68,9 @@ extern struct lttng_kernel_abi_tracer_abi_version the_kernel_tracer_abi_version;
 /* Notification thread handle. */
 extern struct notification_thread_handle *the_notification_thread_handle;
 
+/* Rotation thread handle. */
+extern lttng::sessiond::rotation_thread::uptr the_rotation_thread_handle;
+
 /*
  * This contains extra data needed for processing a command received by the
  * session daemon from the lttng client.
index cc32856198c3de9f0aac3c7bda84caf1d5b89564..312c243cf1a21cbd899285242e55fa636f73651a 100644 (file)
@@ -1303,7 +1303,7 @@ static void destroy_all_sessions_and_wait()
                        goto unlock_session;
                }
                (void) cmd_stop_trace(session);
-               (void) cmd_destroy_session(session, the_notification_thread_handle, nullptr);
+               (void) cmd_destroy_session(session, nullptr);
        unlock_session:
                session_unlock(session);
                session_put(session);
@@ -1411,10 +1411,8 @@ int main(int argc, char **argv)
                          *ust64_channel_monitor_pipe = nullptr,
                          *kernel_channel_monitor_pipe = nullptr;
        struct timer_thread_parameters timer_thread_parameters;
-       /* Rotation thread handle. */
-       struct rotation_thread_handle *rotation_thread_handle = nullptr;
        /* Queue of rotation jobs populated by the sessiond-timer. */
-       struct rotation_thread_timer_queue *rotation_timer_queue = nullptr;
+       lttng::sessiond::rotation_thread_timer_queue *rotation_timer_queue = nullptr;
        struct lttng_thread *client_thread = nullptr;
        struct lttng_thread *notification_thread = nullptr;
        struct lttng_thread *register_apps_thread = nullptr;
@@ -1600,7 +1598,7 @@ int main(int argc, char **argv)
         * sessiond timer thread and the rotation thread. The main thread keeps
         * its ownership and destroys it when both threads have been joined.
         */
-       rotation_timer_queue = rotation_thread_timer_queue_create();
+       rotation_timer_queue = lttng::sessiond::rotation_thread_timer_queue_create();
        if (!rotation_timer_queue) {
                retval = -1;
                goto stop_threads;
@@ -1771,18 +1769,21 @@ int main(int argc, char **argv)
                goto stop_threads;
        }
 
-       /* rotation_thread_data acquires the pipes' read side. */
-       rotation_thread_handle =
-               rotation_thread_handle_create(rotation_timer_queue, the_notification_thread_handle);
-       if (!rotation_thread_handle) {
+       try {
+               the_rotation_thread_handle =
+                       lttng::make_unique<lttng::sessiond::rotation_thread>(
+                               *rotation_timer_queue, *the_notification_thread_handle);
+       } catch (const std::exception& e) {
                retval = -1;
-               ERR("Failed to create rotation thread shared data");
+               ERR("Failed to create rotation thread: %s", e.what());
                goto stop_threads;
        }
 
-       /* Create rotation thread. */
-       if (!launch_rotation_thread(rotation_thread_handle)) {
+       try {
+               the_rotation_thread_handle->launch_thread();
+       } catch (const std::exception& e) {
                retval = -1;
+               ERR("Failed to launch rotation thread: %s", e.what());
                goto stop_threads;
        }
 
@@ -1943,10 +1944,6 @@ stop_threads:
        rcu_thread_offline();
        rcu_unregister_thread();
 
-       if (rotation_thread_handle) {
-               rotation_thread_handle_destroy(rotation_thread_handle);
-       }
-
        /*
         * After the rotation and timer thread have quit, we can safely destroy
         * the rotation_timer_queue.
diff --git a/src/bin/lttng-sessiond/rotate.cpp b/src/bin/lttng-sessiond/rotate.cpp
deleted file mode 100644 (file)
index 56a61fe..0000000
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
- * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#define _LGPL_SOURCE
-#include "cmd.hpp"
-#include "health-sessiond.hpp"
-#include "lttng-sessiond.hpp"
-#include "notification-thread-commands.hpp"
-#include "rotate.hpp"
-#include "rotation-thread.hpp"
-#include "session.hpp"
-#include "utils.hpp"
-
-#include <common/align.hpp>
-#include <common/config/session-config.hpp>
-#include <common/credentials.hpp>
-#include <common/defaults.hpp>
-#include <common/error.hpp>
-#include <common/futex.hpp>
-#include <common/hashtable/utils.hpp>
-#include <common/kernel-ctl/kernel-ctl.hpp>
-#include <common/time.hpp>
-#include <common/utils.hpp>
-
-#include <lttng/action/action-internal.hpp>
-#include <lttng/condition/condition-internal.hpp>
-#include <lttng/notification/channel-internal.hpp>
-#include <lttng/rotate-internal.hpp>
-#include <lttng/trigger/trigger.h>
-
-#include <inttypes.h>
-#include <signal.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <urcu.h>
-#include <urcu/list.h>
-#include <urcu/rculfhash.h>
-
-int subscribe_session_consumed_size_rotation(
-       struct ltt_session *session,
-       uint64_t size,
-       struct notification_thread_handle *notification_thread_handle)
-{
-       int ret;
-       enum lttng_condition_status condition_status;
-       enum lttng_notification_channel_status nc_status;
-       const uint64_t eventfd_increment_value = 1;
-       struct lttng_condition *rotate_condition = nullptr;
-       struct lttng_action *notify_action = nullptr;
-       const struct lttng_credentials session_creds = {
-               .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
-               .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
-       };
-
-       rotate_condition = lttng_condition_session_consumed_size_create();
-       if (!rotate_condition) {
-               ERR("Failed to create session consumed size condition object");
-               ret = -1;
-               goto end;
-       }
-
-       condition_status =
-               lttng_condition_session_consumed_size_set_threshold(rotate_condition, size);
-       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               ERR("Could not set session consumed size condition threshold (size = %" PRIu64 ")",
-                   size);
-               ret = -1;
-               goto end;
-       }
-
-       condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition,
-                                                                                 session->name);
-       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               ERR("Could not set session consumed size condition session name (name = %s)",
-                   session->name);
-               ret = -1;
-               goto end;
-       }
-
-       notify_action = lttng_action_notify_create();
-       if (!notify_action) {
-               ERR("Could not create notify action");
-               ret = -1;
-               goto end;
-       }
-
-       LTTNG_ASSERT(!session->rotate_trigger);
-       session->rotate_trigger = lttng_trigger_create(rotate_condition, notify_action);
-       if (!session->rotate_trigger) {
-               ERR("Could not create size-based rotation trigger");
-               ret = -1;
-               goto end;
-       }
-
-       /* Ensure this trigger is not visible to external users. */
-       lttng_trigger_set_hidden(session->rotate_trigger);
-       lttng_trigger_set_credentials(session->rotate_trigger, &session_creds);
-
-       nc_status =
-               lttng_notification_channel_subscribe(rotate_notification_channel, rotate_condition);
-       if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Could not subscribe to session consumed size notification");
-               ret = -1;
-               goto end;
-       }
-
-       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
-                         &eventfd_increment_value,
-                         sizeof(eventfd_increment_value));
-       if (ret != sizeof(eventfd_increment_value)) {
-               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
-               ret = -1;
-               goto end;
-       }
-
-       ret = notification_thread_command_register_trigger(
-               notification_thread_handle, session->rotate_trigger, true);
-       if (ret < 0 && ret != -LTTNG_ERR_TRIGGER_EXISTS) {
-               ERR("Register trigger, %s", lttng_strerror(ret));
-               ret = -1;
-               goto end;
-       }
-
-       ret = 0;
-
-end:
-       lttng_condition_put(rotate_condition);
-       lttng_action_put(notify_action);
-       if (ret) {
-               lttng_trigger_put(session->rotate_trigger);
-       }
-       return ret;
-}
-
-int unsubscribe_session_consumed_size_rotation(
-       struct ltt_session *session, struct notification_thread_handle *notification_thread_handle)
-{
-       int ret = 0;
-       enum lttng_notification_channel_status status;
-       const uint64_t eventfd_increment_value = 1;
-
-       LTTNG_ASSERT(session->rotate_trigger);
-       status = lttng_notification_channel_unsubscribe(
-               rotate_notification_channel,
-               lttng_trigger_get_const_condition(session->rotate_trigger));
-       if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-               ERR("Session unsubscribe error: %d", (int) status);
-               ret = -1;
-               goto end;
-       }
-
-       ret = lttng_write(rotate_notification_channel_subscription_change_eventfd,
-                         &eventfd_increment_value,
-                         sizeof(eventfd_increment_value));
-       if (ret != sizeof(eventfd_increment_value)) {
-               PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed");
-               ret = -1;
-               goto end;
-       }
-
-       ret = notification_thread_command_unregister_trigger(notification_thread_handle,
-                                                            session->rotate_trigger);
-       if (ret != LTTNG_OK) {
-               ERR("Session unregister trigger error: %d", ret);
-               goto end;
-       }
-
-       lttng_trigger_put(session->rotate_trigger);
-       session->rotate_trigger = nullptr;
-
-       ret = 0;
-end:
-       return ret;
-}
diff --git a/src/bin/lttng-sessiond/rotate.hpp b/src/bin/lttng-sessiond/rotate.hpp
deleted file mode 100644 (file)
index 965a8c7..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
- * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * SPDX-License-Identifier: GPL-2.0-only
- *
- */
-
-#ifndef ROTATE_H
-#define ROTATE_H
-
-#include "rotation-thread.hpp"
-#include <stdint.h>
-
-/*
- * Subscribe/unsubscribe the notification_channel from the rotation_thread to
- * session usage notifications to perform size-based rotations.
- */
-int subscribe_session_consumed_size_rotation(struct ltt_session *session,
-               uint64_t size,
-               struct notification_thread_handle *notification_thread_handle);
-int unsubscribe_session_consumed_size_rotation(struct ltt_session *session,
-               struct notification_thread_handle *notification_thread_handle);
-
-#endif /* ROTATE_H */
index 815b4e95f1a5644d78fd18282c4f8cd0b52e6887..6f58a178d3abfe9aa34df0b669765341a66d9d10 100644 (file)
@@ -11,7 +11,6 @@
 #include "health-sessiond.hpp"
 #include "lttng-sessiond.hpp"
 #include "notification-thread-commands.hpp"
-#include "rotate.hpp"
 #include "rotation-thread.hpp"
 #include "session.hpp"
 #include "thread.hpp"
 #include <common/config/session-config.hpp>
 #include <common/defaults.hpp>
 #include <common/error.hpp>
+#include <common/eventfd.hpp>
+#include <common/exception.hpp>
+#include <common/file-descriptor.hpp>
+#include <common/format.hpp>
 #include <common/futex.hpp>
 #include <common/hashtable/utils.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/locked-reference.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pthread-lock.hpp>
+#include <common/scope-exit.hpp>
 #include <common/time.hpp>
 #include <common/urcu.hpp>
 #include <common/utils.hpp>
 
+#include <lttng/action/action-internal.hpp>
 #include <lttng/condition/condition-internal.hpp>
 #include <lttng/location-internal.hpp>
 #include <lttng/notification/channel-internal.hpp>
@@ -37,6 +45,7 @@
 #include <lttng/trigger/trigger.h>
 
 #include <inttypes.h>
+#include <memory>
 #include <signal.h>
 #include <sys/eventfd.h>
 #include <sys/stat.h>
 #include <urcu.h>
 #include <urcu/list.h>
 
-struct lttng_notification_channel *rotate_notification_channel = nullptr;
-/*
- * This eventfd is used to wake-up the rotation thread whenever a command
- * completes on the notification channel. This ensures that any notification
- * that was queued while waiting for a reply to the command is eventually
- * consumed.
- */
-int rotate_notification_channel_subscription_change_eventfd = -1;
-
-struct rotation_thread {
-       struct lttng_poll_event events;
-};
+namespace ls = lttng::sessiond;
 
 /*
  * The timer thread enqueues jobs and wakes up the rotation thread.
  * When the rotation thread wakes up, it empties the queue.
  */
-struct rotation_thread_timer_queue {
+struct ls::rotation_thread_timer_queue {
        struct lttng_pipe *event_pipe;
        struct cds_list_head list;
        pthread_mutex_t lock;
 };
 
-struct rotation_thread_handle {
-       struct rotation_thread_timer_queue *rotation_timer_queue;
-       /* Access to the notification thread cmd_queue */
-       struct notification_thread_handle *notification_thread_handle;
-       /* Thread-specific quit pipe. */
-       struct lttng_pipe *quit_pipe;
-};
-
 namespace {
 struct rotation_thread_job {
-       enum rotation_thread_job_type type;
+       using uptr = std::unique_ptr<
+               rotation_thread_job,
+               lttng::details::create_unique_class<rotation_thread_job, lttng::free>>;
+
+       enum ls::rotation_thread_job_type type;
        struct ltt_session *session;
        /* List member in struct rotation_thread_timer_queue. */
        struct cds_list_head head;
 };
-} /* namespace */
 
-static const char *get_job_type_str(enum rotation_thread_job_type job_type)
+const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
 {
        switch (job_type) {
-       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+       case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                return "CHECK_PENDING_ROTATION";
-       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+       case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
                return "SCHEDULED_ROTATION";
        default:
                abort();
        }
 }
 
-struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
-{
-       struct rotation_thread_timer_queue *queue = nullptr;
-
-       queue = zmalloc<rotation_thread_timer_queue>();
-       if (!queue) {
-               PERROR("Failed to allocate timer rotate queue");
-               goto end;
-       }
-
-       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
-       CDS_INIT_LIST_HEAD(&queue->list);
-       pthread_mutex_init(&queue->lock, nullptr);
-end:
-       return queue;
-}
-
-void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
-{
-       if (!queue) {
-               return;
-       }
-
-       lttng_pipe_destroy(queue->event_pipe);
-
-       pthread_mutex_lock(&queue->lock);
-       LTTNG_ASSERT(cds_list_empty(&queue->list));
-       pthread_mutex_unlock(&queue->lock);
-       pthread_mutex_destroy(&queue->lock);
-       free(queue);
-}
-
-/*
- * Destroy the thread data previously created by the init function.
- */
-void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
-{
-       lttng_pipe_destroy(handle->quit_pipe);
-       free(handle);
-}
-
-struct rotation_thread_handle *
-rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
-                             struct notification_thread_handle *notification_thread_handle)
-{
-       struct rotation_thread_handle *handle;
-
-       handle = zmalloc<rotation_thread_handle>();
-       if (!handle) {
-               goto end;
-       }
-
-       handle->rotation_timer_queue = rotation_timer_queue;
-       handle->notification_thread_handle = notification_thread_handle;
-       handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
-       if (!handle->quit_pipe) {
-               goto error;
-       }
-
-end:
-       return handle;
-error:
-       rotation_thread_handle_destroy(handle);
-       return nullptr;
-}
-
 /*
  * Called with the rotation_thread_timer_queue lock held.
  * Return true if the same timer job already exists in the queue, false if not.
  */
-static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
-                            enum rotation_thread_job_type job_type,
-                            struct ltt_session *session)
+bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
+                     ls::rotation_thread_job_type job_type,
+                     ltt_session *session)
 {
        bool exists = false;
        struct rotation_thread_job *job;
@@ -183,164 +110,7 @@ end:
        return exists;
 }
 
-void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
-                                enum rotation_thread_job_type job_type,
-                                struct ltt_session *session)
-{
-       int ret;
-       const char dummy = '!';
-       struct rotation_thread_job *job = nullptr;
-       const char *job_type_str = get_job_type_str(job_type);
-
-       pthread_mutex_lock(&queue->lock);
-       if (timer_job_exists(queue, job_type, session)) {
-               /*
-                * This timer job is already pending, we don't need to add
-                * it.
-                */
-               goto end;
-       }
-
-       job = zmalloc<rotation_thread_job>();
-       if (!job) {
-               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
-                      job_type_str,
-                      session->name);
-               goto end;
-       }
-       /* No reason for this to fail as the caller must hold a reference. */
-       (void) session_get(session);
-
-       job->session = session;
-       job->type = job_type;
-       cds_list_add_tail(&job->head, &queue->list);
-
-       ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
-       if (ret < 0) {
-               /*
-                * We do not want to block in the timer handler, the job has
-                * been enqueued in the list, the wakeup pipe is probably full,
-                * the job will be processed when the rotation_thread catches
-                * up.
-                */
-               DIAGNOSTIC_PUSH
-               DIAGNOSTIC_IGNORE_LOGICAL_OP
-               if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                       DIAGNOSTIC_POP
-                       /*
-                        * Not an error, but would be surprising and indicate
-                        * that the rotation thread can't keep up with the
-                        * current load.
-                        */
-                       DBG("Wake-up pipe of rotation thread job queue is full");
-                       goto end;
-               }
-               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
-                      job_type_str,
-                      session->name);
-               goto end;
-       }
-
-end:
-       pthread_mutex_unlock(&queue->lock);
-}
-
-static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
-{
-       int ret;
-
-       /*
-        * Create pollset with size 3:
-        *      - rotation thread quit pipe,
-        *      - rotation thread timer queue pipe,
-        *      - notification channel sock,
-        */
-       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
-       if (ret < 0) {
-               goto error;
-       }
-
-       ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add quit pipe read fd to poll set");
-               goto error;
-       }
-
-       ret = lttng_poll_add(
-               poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add rotate_pending fd to poll set");
-               goto error;
-       }
-
-       return ret;
-error:
-       lttng_poll_clean(poll_set);
-       return ret;
-}
-
-static void fini_thread_state(struct rotation_thread *state)
-{
-       lttng_poll_clean(&state->events);
-       if (rotate_notification_channel) {
-               lttng_notification_channel_destroy(rotate_notification_channel);
-       }
-
-       if (rotate_notification_channel_subscription_change_eventfd >= 0) {
-               const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
-
-               if (close_ret) {
-                       PERROR("Failed to close rotation thread notification channel subscription change eventfd");
-               }
-       }
-}
-
-static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
-{
-       int ret;
-
-       memset(state, 0, sizeof(*state));
-       lttng_poll_init(&state->events);
-
-       ret = init_poll_set(&state->events, handle);
-       if (ret) {
-               ERR("Failed to initialize rotation thread poll set");
-               goto end;
-       }
-
-       rotate_notification_channel =
-               lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
-       if (!rotate_notification_channel) {
-               ERR("Could not create notification channel");
-               ret = -1;
-               goto end;
-       }
-       ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add notification fd to pollset");
-               goto end;
-       }
-
-       rotate_notification_channel_subscription_change_eventfd =
-               eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
-       if (rotate_notification_channel_subscription_change_eventfd < 0) {
-               PERROR("Failed to create rotation thread notification channel subscription change eventfd");
-               ret = -1;
-               goto end;
-       }
-       ret = lttng_poll_add(
-               &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
-       if (ret < 0) {
-               ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
-               goto end;
-       }
-
-end:
-       return ret;
-}
-
-static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
-                                                       bool *_rotation_completed)
+void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
 {
        int ret = 0;
        struct consumer_socket *socket;
@@ -351,70 +121,66 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess
        enum lttng_trace_chunk_status chunk_status;
        lttng::urcu::read_lock_guard read_lock;
 
-       LTTNG_ASSERT(session->chunk_being_archived);
+       LTTNG_ASSERT(session.chunk_being_archived);
 
        /*
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       if (!session->ust_session) {
+       if (!session.ust_session) {
                goto skip_ust;
        }
 
        cds_lfht_for_each_entry (
-               session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
-               relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+               session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
+               relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session->ust_session->consumer->net_seq_index;
+                       session.ust_session->consumer->net_seq_index;
 
-               pthread_mutex_lock(socket->lock);
+               lttng::pthread::lock_guard socket_lock(*socket->lock);
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session->id,
-                                                 session->chunk_being_archived,
+                                                 session.id,
+                                                 session.chunk_being_archived,
                                                  &exists_status);
                if (ret) {
-                       pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
                        goto end;
                }
 
                if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
-                       pthread_mutex_unlock(socket->lock);
                        chunk_exists_on_peer = true;
                        goto end;
                }
-               pthread_mutex_unlock(socket->lock);
        }
 
 skip_ust:
-       if (!session->kernel_session) {
+       if (!session.kernel_session) {
                goto skip_kernel;
        }
+
        cds_lfht_for_each_entry (
-               session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
-               pthread_mutex_lock(socket->lock);
-               relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+               session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
+               lttng::pthread::lock_guard socket_lock(*socket->lock);
+
+               relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session->kernel_session->consumer->net_seq_index;
+                       session.kernel_session->consumer->net_seq_index;
 
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session->id,
-                                                 session->chunk_being_archived,
+                                                 session.id,
+                                                 session.chunk_being_archived,
                                                  &exists_status);
                if (ret) {
-                       pthread_mutex_unlock(socket->lock);
                        ERR("Error occurred while checking rotation status on consumer daemon");
                        goto end;
                }
 
                if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
-                       pthread_mutex_unlock(socket->lock);
                        chunk_exists_on_peer = true;
                        goto end;
                }
-               pthread_mutex_unlock(socket->lock);
        }
 skip_kernel:
 end:
@@ -422,19 +188,20 @@ end:
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
 
-               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
                DBG("Rotation of trace archive %" PRIu64
                    " of session \"%s\" is complete on all consumers",
                    chunk_being_archived_id,
-                   session->name);
+                   session.name);
        }
-       *_rotation_completed = !chunk_exists_on_peer;
+
+       _rotation_completed = !chunk_exists_on_peer;
        if (ret) {
                ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
                if (ret) {
-                       ERR("Failed to reset rotation state of session \"%s\"", session->name);
+                       ERR("Failed to reset rotation state of session \"%s\"", session.name);
                }
        }
 }
@@ -444,9 +211,8 @@ end:
  * Should only return non-zero in the event of a fatal error. Doing so will
  * shutdown the thread.
  */
-static int
-check_session_rotation_pending(struct ltt_session *session,
-                              struct notification_thread_handle *notification_thread_handle)
+int check_session_rotation_pending(ltt_session& session,
+                                  notification_thread_handle& notification_thread_handle)
 {
        int ret;
        struct lttng_trace_archive_location *location;
@@ -455,17 +221,17 @@ check_session_rotation_pending(struct ltt_session *session,
        const char *archived_chunk_name;
        uint64_t chunk_being_archived_id;
 
-       if (!session->chunk_being_archived) {
+       if (!session.chunk_being_archived) {
                ret = 0;
                goto end;
        }
 
        chunk_status =
-               lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
+               lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
        DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
-           session->name,
+           session.name,
            chunk_being_archived_id);
 
        /*
@@ -481,8 +247,8 @@ check_session_rotation_pending(struct ltt_session *session,
                goto check_ongoing_rotation;
        }
 
-       check_session_rotation_pending_on_consumers(session, &rotation_completed);
-       if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+       check_session_rotation_pending_on_consumers(session, rotation_completed);
+       if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
                goto check_ongoing_rotation;
        }
 
@@ -491,40 +257,41 @@ check_session_rotation_pending(struct ltt_session *session,
         * rotations can start now.
         */
        chunk_status = lttng_trace_chunk_get_name(
-               session->chunk_being_archived, &archived_chunk_name, nullptr);
+               session.chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-       free(session->last_archived_chunk_name);
-       session->last_archived_chunk_name = strdup(archived_chunk_name);
-       if (!session->last_archived_chunk_name) {
+       free(session.last_archived_chunk_name);
+       session.last_archived_chunk_name = strdup(archived_chunk_name);
+       if (!session.last_archived_chunk_name) {
                PERROR("Failed to duplicate archived chunk name");
        }
+
        session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
 
-       if (!session->quiet_rotation) {
-               location = session_get_trace_archive_location(session);
+       if (!session.quiet_rotation) {
+               location = session_get_trace_archive_location(&session);
                ret = notification_thread_command_session_rotation_completed(
-                       notification_thread_handle,
-                       session->id,
-                       session->last_archived_chunk_id.value,
+                       &notification_thread_handle,
+                       session.id,
+                       session.last_archived_chunk_id.value,
                        location);
                lttng_trace_archive_location_put(location);
                if (ret != LTTNG_OK) {
                        ERR("Failed to notify notification thread of completed rotation for session %s",
-                           session->name);
+                           session.name);
                }
        }
 
        ret = 0;
 check_ongoing_rotation:
-       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
-               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
+       if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
                DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
                    chunk_being_archived_id,
-                   session->name);
-               ret = timer_session_rotation_pending_check_start(session,
+                   session.name);
+               ret = timer_session_rotation_pending_check_start(&session,
                                                                 DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Failed to re-enable rotation pending timer");
@@ -538,173 +305,317 @@ end:
 }
 
 /* Call with the session and session_list locks held. */
-static int launch_session_rotation(struct ltt_session *session)
+int launch_session_rotation(ltt_session& session)
 {
        int ret;
        struct lttng_rotate_session_return rotation_return;
 
-       DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
+       DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
 
-       ret = cmd_rotate_session(
-               session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       ASSERT_SESSION_LIST_LOCKED();
+       ASSERT_LOCKED(session.lock);
+
+       ret = cmd_rotate_session(&session,
+                                &rotation_return,
+                                false,
+                                LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        if (ret == LTTNG_OK) {
                DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
-                   session->name);
+                   session.name);
        } else {
                /* Don't consider errors as fatal. */
                DBG("Scheduled time-based rotation aborted for session %s: %s",
-                   session->name,
+                   session.name,
                    lttng_strerror(ret));
        }
+
        return 0;
 }
 
-static int run_job(struct rotation_thread_job *job,
-                  struct ltt_session *session,
-                  struct notification_thread_handle *notification_thread_handle)
+int run_job(const rotation_thread_job& job,
+           ltt_session& session,
+           notification_thread_handle& notification_thread_handle)
 {
        int ret;
 
-       switch (job->type) {
-       case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
+       switch (job.type) {
+       case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
                ret = launch_session_rotation(session);
                break;
-       case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
+       case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                ret = check_session_rotation_pending(session, notification_thread_handle);
                break;
        default:
                abort();
        }
+
        return ret;
 }
 
-static int handle_job_queue(struct rotation_thread_handle *handle,
-                           struct rotation_thread *state __attribute__((unused)),
-                           struct rotation_thread_timer_queue *queue)
+bool shutdown_rotation_thread(void *thread_data)
 {
-       int ret = 0;
+       auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
 
-       for (;;) {
-               struct ltt_session *session;
-               struct rotation_thread_job *job;
+       return handle->shutdown();
+}
+} /* namespace */
 
-               /* Take the queue lock only to pop an element from the list. */
-               pthread_mutex_lock(&queue->lock);
-               if (cds_list_empty(&queue->list)) {
-                       pthread_mutex_unlock(&queue->lock);
-                       break;
+ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
+{
+       auto queue = zmalloc<ls::rotation_thread_timer_queue>();
+       if (!queue) {
+               PERROR("Failed to allocate timer rotate queue");
+               goto end;
+       }
+
+       queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
+       CDS_INIT_LIST_HEAD(&queue->list);
+       pthread_mutex_init(&queue->lock, nullptr);
+end:
+       return queue;
+}
+
+void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
+{
+       if (!queue) {
+               return;
+       }
+
+       lttng_pipe_destroy(queue->event_pipe);
+
+       {
+               lttng::pthread::lock_guard queue_lock(queue->lock);
+
+               LTTNG_ASSERT(cds_list_empty(&queue->list));
+       }
+
+       pthread_mutex_destroy(&queue->lock);
+       free(queue);
+}
+
+ls::rotation_thread::rotation_thread(
+       rotation_thread_timer_queue& rotation_timer_queue,
+       notification_thread_handle& notification_thread_handle) :
+       _rotation_timer_queue{ rotation_timer_queue },
+       _notification_thread_handle{ notification_thread_handle }
+{
+       _quit_pipe.reset([]() {
+               auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
+               if (!raw_pipe) {
+                       LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
                }
-               job = cds_list_first_entry(&queue->list, typeof(*job), head);
-               cds_list_del(&job->head);
-               pthread_mutex_unlock(&queue->lock);
 
-               session_lock_list();
-               session = job->session;
-               if (!session) {
-                       DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
+               return raw_pipe;
+       }());
+
+       _notification_channel.reset([]() {
+               auto channel = lttng_notification_channel_create(
+                       lttng_session_daemon_notification_endpoint);
+               if (!channel) {
+                       LTTNG_THROW_ERROR(
+                               "Failed to create notification channel of rotation thread");
+               }
+
+               return channel;
+       }());
+
+       lttng_poll_init(&_events);
+
+       /*
+        * Create pollset with size 4:
+        *      - rotation thread quit pipe,
+        *      - rotation thread timer queue pipe,
+        *      - notification channel sock,
+        *      - subscribtion change event fd
+        */
+       if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
+               LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
+       }
+
+       if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
+       }
+
+       if (lttng_poll_add(&_events,
+                          lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
+                          LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
+       }
+
+       if (lttng_poll_add(&_events,
+                          _notification_channel_subscribtion_change_eventfd.fd(),
+                          LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR(
+                       "Failed to add rotation thread notification channel subscription change eventfd to poll set");
+       }
+
+       if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
+               LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
+       }
+}
+
+ls::rotation_thread::~rotation_thread()
+{
+       lttng_poll_clean(&_events);
+}
+
+void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
+                                ls::rotation_thread_job_type job_type,
+                                ltt_session *session)
+{
+       const char dummy = '!';
+       struct rotation_thread_job *job = nullptr;
+       const char *job_type_str = get_job_type_str(job_type);
+       lttng::pthread::lock_guard queue_lock(queue->lock);
+
+       if (timer_job_exists(queue, job_type, session)) {
+               /*
+                * This timer job is already pending, we don't need to add
+                * it.
+                */
+               return;
+       }
+
+       job = zmalloc<rotation_thread_job>();
+       if (!job) {
+               PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
+                      job_type_str,
+                      session->name);
+               return;
+       }
+
+       /* No reason for this to fail as the caller must hold a reference. */
+       (void) session_get(session);
+
+       job->session = session;
+       job->type = job_type;
+       cds_list_add_tail(&job->head, &queue->list);
+
+       const int write_ret =
+               lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
+       if (write_ret < 0) {
+               /*
+                * We do not want to block in the timer handler, the job has
+                * been enqueued in the list, the wakeup pipe is probably full,
+                * the job will be processed when the rotation_thread catches
+                * up.
+                */
+               DIAGNOSTIC_PUSH
+               DIAGNOSTIC_IGNORE_LOGICAL_OP
+               if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                       DIAGNOSTIC_POP
                        /*
-                        * This is a non-fatal error, and we cannot report it to
-                        * the user (timer), so just print the error and
-                        * continue the processing.
-                        *
-                        * While the timer thread will purge pending signals for
-                        * a session on the session's destruction, it is
-                        * possible for a job targeting that session to have
-                        * already been queued before it was destroyed.
+                        * Not an error, but would be surprising and indicate
+                        * that the rotation thread can't keep up with the
+                        * current load.
                         */
-                       free(job);
-                       session_put(session);
-                       session_unlock_list();
-                       continue;
+                       DBG("Wake-up pipe of rotation thread job queue is full");
+                       return;
                }
 
-               session_lock(session);
-               ret = run_job(job, session, handle->notification_thread_handle);
-               session_unlock(session);
-               /* Release reference held by the job. */
-               session_put(session);
-               session_unlock_list();
-               free(job);
-               if (ret) {
-                       goto end;
-               }
+               PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
+                      job_type_str,
+                      session->name);
+               return;
        }
+}
 
-       ret = 0;
+void ls::rotation_thread::_handle_job_queue()
+{
+       for (;;) {
+               rotation_thread_job::uptr job;
+
+               {
+                       /* Take the queue lock only to pop an element from the list. */
+                       lttng::pthread::lock_guard rotation_timer_queue_lock(
+                               _rotation_timer_queue.lock);
+                       if (cds_list_empty(&_rotation_timer_queue.list)) {
+                               break;
+                       }
 
-end:
-       return ret;
+                       job.reset(cds_list_first_entry(
+                               &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
+                       cds_list_del(&job->head);
+               }
+
+               session_lock_list();
+               const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+
+               /* locked_ptr will unlock the session and release the ref held by the job. */
+               session_lock(job->session);
+               auto session = ltt_session::locked_ptr(job->session);
+
+               if (run_job(*job, *session, _notification_thread_handle)) {
+                       return;
+               }
+       }
 }
 
-static int handle_condition(const struct lttng_notification *notification,
-                           struct notification_thread_handle *notification_thread_handle)
+void ls::rotation_thread::_handle_notification(const lttng_notification &notification)
 {
        int ret = 0;
        const char *condition_session_name = nullptr;
-       enum lttng_condition_type condition_type;
        enum lttng_condition_status condition_status;
        enum lttng_evaluation_status evaluation_status;
        uint64_t consumed;
-       struct ltt_session *session;
-       const struct lttng_condition *condition =
-               lttng_notification_get_const_condition(notification);
-       const struct lttng_evaluation *evaluation =
-               lttng_notification_get_const_evaluation(notification);
-
-       condition_type = lttng_condition_get_type(condition);
+       auto *condition = lttng_notification_get_const_condition(&notification);
+       auto *evaluation = lttng_notification_get_const_evaluation(&notification);
+       const auto condition_type = lttng_condition_get_type(condition);
 
        if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
-               ret = -1;
-               ERR("Condition type and session usage type are not the same");
-               goto end;
+               LTTNG_THROW_ERROR("Unexpected condition type");
        }
 
-       /* Fetch info to test */
+       /* Fetch info to test. */
        condition_status = lttng_condition_session_consumed_size_get_session_name(
                condition, &condition_session_name);
        if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-               ERR("Session name could not be fetched");
-               ret = -1;
-               goto end;
+               LTTNG_THROW_ERROR("Session name could not be fetched from notification");
        }
+
        evaluation_status =
                lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
        if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
-               ERR("Failed to get evaluation");
-               ret = -1;
-               goto end;
+               LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
        }
 
+       DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
+               condition_session_name,
+               consumed);
+
        session_lock_list();
-       session = session_find_by_name(condition_session_name);
+       const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); });
+
+       ltt_session::locked_ptr session{ [&condition_session_name]() {
+               auto raw_session_ptr = session_find_by_name(condition_session_name);
+
+               if (raw_session_ptr) {
+                       session_lock(raw_session_ptr);
+               }
+
+               return raw_session_ptr;
+       }() };
        if (!session) {
-               DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
-                   lttng_condition_type_str(condition_type),
-                   condition_session_name);
+               DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
+                       lttng_condition_type_str(condition_type),
+                       condition_session_name);
                /*
                 * Not a fatal error: a session can be destroyed before we get
                 * the chance to handle the notification.
                 */
-               ret = 0;
-               session_unlock_list();
-               goto end;
+               return;
        }
-       session_lock(session);
 
        if (!lttng_trigger_is_equal(session->rotate_trigger,
-                                   lttng_notification_get_const_trigger(notification))) {
-               /* Notification does not originate from our rotation trigger. */
-               ret = 0;
-               goto end_unlock;
+                                   lttng_notification_get_const_trigger(&notification))) {
+               DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
+               return;
        }
 
-       ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
-       if (ret) {
-               goto end_unlock;
-       }
+       unsubscribe_session_consumed_size_rotation(*session);
 
        ret = cmd_rotate_session(
-               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
        switch (ret) {
        case LTTNG_OK:
                break;
@@ -718,35 +629,16 @@ static int handle_condition(const struct lttng_notification *notification,
                DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
                break;
        default:
-               ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
-               ret = -1;
-               goto end_unlock;
-       }
-
-       ret = subscribe_session_consumed_size_rotation(
-               session, consumed + session->rotate_size, notification_thread_handle);
-       if (ret) {
-               ERR("Failed to subscribe to session consumed size condition");
-               goto end_unlock;
+               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
+                               static_cast<lttng_error_code>(-ret));
        }
-       ret = 0;
 
-end_unlock:
-       session_unlock(session);
-       session_put(session);
-       session_unlock_list();
-end:
-       return ret;
+       subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
 }
 
-static int handle_notification_channel(int fd __attribute__((unused)),
-                                      struct rotation_thread_handle *handle,
-                                      struct rotation_thread *state __attribute__((unused)))
+void ls::rotation_thread::_handle_notification_channel_activity()
 {
-       int ret;
        bool notification_pending = true;
-       struct lttng_notification *notification = nullptr;
-       enum lttng_notification_channel_status status;
 
        /*
         * A notification channel may have multiple notifications queued-up internally in
@@ -765,122 +657,120 @@ static int handle_notification_channel(int fd __attribute__((unused)),
         * the channel's internal buffers.
         */
        while (notification_pending) {
-               status = lttng_notification_channel_has_pending_notification(
-                       rotate_notification_channel, &notification_pending);
-               if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
-                       ERR("Error occurred while checking for pending notification");
-                       ret = -1;
-                       goto end;
+               const auto pending_status = lttng_notification_channel_has_pending_notification(
+                       _notification_channel.get(), &notification_pending);
+               if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+                       LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
                }
 
                if (!notification_pending) {
-                       ret = 0;
-                       goto end;
+                       return;
                }
 
                /* Receive the next notification. */
-               status = lttng_notification_channel_get_next_notification(
-                       rotate_notification_channel, &notification);
-               switch (status) {
+               lttng_notification::uptr notification;
+               enum lttng_notification_channel_status next_notification_status;
+
+               {
+                       struct lttng_notification *raw_notification_ptr;
+
+                       next_notification_status = lttng_notification_channel_get_next_notification(
+                               _notification_channel.get(), &raw_notification_ptr);
+                       notification.reset(raw_notification_ptr);
+               }
+
+               switch (next_notification_status) {
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
                        break;
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
                        WARN("Dropped notification detected on notification channel used by the rotation management thread.");
-                       ret = 0;
-                       goto end;
+                       return;
                case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
-                       ERR("Notification channel was closed");
-                       ret = -1;
-                       goto end;
+                       LTTNG_THROW_ERROR("Notification channel was closed");
                default:
                        /* Unhandled conditions / errors. */
-                       ERR("Unknown notification channel status");
-                       ret = -1;
-                       goto end;
+                       LTTNG_THROW_ERROR("Unknown notification channel status");
                }
 
-               ret = handle_condition(notification, handle->notification_thread_handle);
-               lttng_notification_destroy(notification);
-               if (ret) {
-                       goto end;
-               }
+               _handle_notification(*notification);
        }
-end:
-       return ret;
 }
 
-static void *thread_rotation(void *data)
+void ls::rotation_thread::_thread_function() noexcept
 {
-       int ret;
-       struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
-       struct rotation_thread thread;
-       int queue_pipe_fd;
-
        DBG("Started rotation thread");
+
+       try {
+               _run();
+       } catch (const std::exception& e) {
+               ERR_FMT("Fatal rotation thread error: {}", e.what());
+       }
+
+       DBG("Thread exit");
+}
+
+void ls::rotation_thread::_run()
+{
        rcu_register_thread();
+       const auto unregister_rcu_thread =
+               lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
+
        rcu_thread_online();
+       const auto offline_rcu_thread =
+               lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
+
        health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
        health_code_update();
+       const auto unregister_health =
+               lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
 
-       if (!handle) {
-               ERR("Invalid thread context provided");
-               goto end;
-       }
-
-       queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
-
-       ret = init_thread_state(handle, &thread);
-       if (ret) {
-               goto error;
-       }
+       const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
 
        while (true) {
-               int fd_count, i;
-
                health_poll_entry();
                DBG("Entering poll wait");
-               ret = lttng_poll_wait(&thread.events, -1);
-               DBG("Poll wait returned (%i)", ret);
+               auto poll_wait_ret = lttng_poll_wait(&_events, -1);
+               DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
                health_poll_exit();
-               if (ret < 0) {
+               if (poll_wait_ret < 0) {
                        /*
                         * Restart interrupted system call.
                         */
                        if (errno == EINTR) {
                                continue;
                        }
-                       ERR("Error encountered during lttng_poll_wait (%i)", ret);
-                       goto error;
+
+                       LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
                }
 
-               fd_count = ret;
-               for (i = 0; i < fd_count; i++) {
-                       int fd = LTTNG_POLL_GETFD(&thread.events, i);
-                       uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
+               const auto fd_count = poll_wait_ret;
+               for (int i = 0; i < fd_count; i++) {
+                       const auto fd = LTTNG_POLL_GETFD(&_events, i);
+                       const auto revents = LTTNG_POLL_GETEV(&_events, i);
 
-                       DBG("Handling fd (%i) activity (%u)", fd, revents);
+                       DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
 
                        if (revents & LPOLLERR) {
-                               ERR("Polling returned an error on fd %i", fd);
-                               goto error;
+                               LTTNG_THROW_ERROR(
+                                       fmt::format("Polling returned an error on fd: fd={}", fd));
                        }
 
-                       if (fd == rotate_notification_channel->socket ||
-                           fd == rotate_notification_channel_subscription_change_eventfd) {
-                               ret = handle_notification_channel(fd, handle, &thread);
-                               if (ret) {
-                                       ERR("Error occurred while handling activity on notification channel socket");
-                                       goto error;
+                       if (fd == _notification_channel->socket ||
+                           fd == _notification_channel_subscribtion_change_eventfd.fd()) {
+                               try {
+                                       _handle_notification_channel_activity();
+                               } catch (const lttng::ctl::error& e) {
+                                       /*
+                                        * The only non-fatal error (rotation failed), others
+                                        * are caught at the top-level.
+                                        */
+                                       DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
+                                               e.what());
+                                       continue;
                                }
 
-                               if (fd == rotate_notification_channel_subscription_change_eventfd) {
-                                       uint64_t eventfd_value;
-                                       const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
-
-                                       if (read_ret != sizeof(eventfd_value)) {
-                                               PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
-                                               goto error;
-                                       }
+                               if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
+                                       _notification_channel_subscribtion_change_eventfd.decrement();
                                }
                        } else {
                                /* Job queue or quit pipe activity. */
@@ -891,59 +781,162 @@ static void *thread_rotation(void *data)
                                 * flushed and all references held in the queue
                                 * are released.
                                 */
-                               ret = handle_job_queue(
-                                       handle, &thread, handle->rotation_timer_queue);
-                               if (ret) {
-                                       ERR("Failed to handle rotation timer pipe event");
-                                       goto error;
-                               }
-
+                               _handle_job_queue();
                                if (fd == queue_pipe_fd) {
                                        char buf;
 
-                                       ret = lttng_read(fd, &buf, 1);
-                                       if (ret != 1) {
-                                               ERR("Failed to read from wakeup pipe (fd = %i)",
-                                                   fd);
-                                               goto error;
+                                       if (lttng_read(fd, &buf, 1) != 1) {
+                                               LTTNG_THROW_POSIX(
+                                                       fmt::format(
+                                                               "Failed to read from wakeup pipe: fd={}",
+                                                               fd),
+                                                       errno);
                                        }
                                } else {
                                        DBG("Quit pipe activity");
-                                       goto exit;
+                                       return;
                                }
                        }
                }
        }
-exit:
-error:
-       DBG("Thread exit");
-       fini_thread_state(&thread);
-end:
-       health_unregister(the_health_sessiond);
-       rcu_thread_offline();
-       rcu_unregister_thread();
-       return nullptr;
 }
 
-static bool shutdown_rotation_thread(void *thread_data)
+bool ls::rotation_thread::shutdown() const noexcept
 {
-       struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
-       const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
+       const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
 
        return notify_thread_pipe(write_fd) == 1;
 }
 
-bool launch_rotation_thread(struct rotation_thread_handle *handle)
+void ls::rotation_thread::launch_thread()
 {
-       struct lttng_thread *thread;
-
-       thread = lttng_thread_create(
-               "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
+       auto thread = lttng_thread_create(
+               "Rotation",
+               [](void *ptr) {
+                       auto handle = reinterpret_cast<rotation_thread *>(ptr);
+
+                       handle->_thread_function();
+                       return static_cast<void *>(nullptr);
+               },
+               shutdown_rotation_thread,
+               nullptr,
+               this);
        if (!thread) {
-               goto error;
+               LTTNG_THROW_ERROR("Failed to launch rotation thread");
        }
+
        lttng_thread_put(thread);
-       return true;
-error:
-       return false;
+}
+
+void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
+                                                                         std::uint64_t size)
+{
+       const struct lttng_credentials session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
+       };
+
+       ASSERT_LOCKED(session.lock);
+
+       auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
+               lttng_condition_session_consumed_size_create());
+       if (!rotate_condition) {
+               LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
+       }
+
+       auto condition_status =
+               lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
+       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Could not set session consumed size condition threshold: size={}", size));
+       }
+
+       condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition.get(),
+                                                                                 session.name);
+       if (condition_status != LTTNG_CONDITION_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Could not set session consumed size condition session name: name=`{}`",
+                       session.name));
+       }
+
+       auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
+               lttng_action_notify_create());
+       if (!notify_action) {
+               LTTNG_THROW_POSIX("Could not create notify action", errno);
+       }
+
+       LTTNG_ASSERT(!session.rotate_trigger);
+       /* trigger acquires its own reference to condition and action on success. */
+       auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
+               lttng_trigger_create(rotate_condition.get(), notify_action.get()));
+       if (!trigger)
+       {
+               LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
+       }
+
+       /* Ensure this trigger is not visible to external users. */
+       lttng_trigger_set_hidden(trigger.get());
+       lttng_trigger_set_credentials(trigger.get(), &session_creds);
+
+       auto nc_status =
+               lttng_notification_channel_subscribe(_notification_channel.get(), rotate_condition.get());
+       if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+               LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
+       }
+
+       /*
+        * Ensure any notification queued during the subscription are consumed by queueing an
+        * event.
+        */
+       _notification_channel_subscribtion_change_eventfd.increment();
+
+       const auto register_ret = notification_thread_command_register_trigger(
+               &_notification_thread_handle, trigger.get(), true);
+       if (register_ret != LTTNG_OK) {
+               LTTNG_THROW_CTL(
+                       fmt::format(
+                               "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
+                               session.name,
+                               size),
+                       register_ret);
+       }
+
+       /* Ownership transferred to the session. */
+       session.rotate_trigger = trigger.release();
+}
+
+void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
+{
+       LTTNG_ASSERT(session.rotate_trigger);
+
+       const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
+               lttng_trigger_put(session.rotate_trigger);
+               session.rotate_trigger = nullptr;
+       });
+
+       const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
+               _notification_channel.get(),
+               lttng_trigger_get_const_condition(session.rotate_trigger));
+       if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
+               LTTNG_THROW_ERROR(fmt::format(
+                       "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
+                       session.name,
+                       static_cast<int>(unsubscribe_status)));
+       }
+
+       /*
+        * Ensure any notification queued during the un-subscription are consumed by queueing an
+        * event.
+        */
+       _notification_channel_subscribtion_change_eventfd.increment();
+
+       const auto unregister_status = notification_thread_command_unregister_trigger(
+               &_notification_thread_handle, session.rotate_trigger);
+       if (unregister_status != LTTNG_OK) {
+               LTTNG_THROW_CTL(
+                       fmt::format(
+                               "Failed to unregister trigger for automatic size-based rotation: session_name{}",
+                               session.name),
+                       unregister_status);
+       }
 }
index 6dc7c97ac9a8b65142f3cf5a59c168ba8f0877ac..02ff2b2fe1549baece8eb03563e5cf86fa39d658 100644 (file)
@@ -9,44 +9,84 @@
 #ifndef ROTATION_THREAD_H
 #define ROTATION_THREAD_H
 
-#include <urcu/list.h>
-#include <urcu.h>
-#include <urcu/rculfhash.h>
-#include <lttng/domain.h>
-#include <common/pipe.hpp>
+#include "notification-thread.hpp"
+#include "session.hpp"
+
 #include <common/compat/poll.hpp>
+#include <common/eventfd.hpp>
 #include <common/hashtable/hashtable.hpp>
+#include <common/make-unique-wrapper.hpp>
+#include <common/pipe.hpp>
+
+#include <lttng/domain.h>
+#include <lttng/notification/channel-internal.hpp>
+
+#include <memory>
 #include <pthread.h>
 #include <semaphore.h>
-#include "session.hpp"
-#include "notification-thread.hpp"
+#include <urcu.h>
+#include <urcu/list.h>
+#include <urcu/rculfhash.h>
 
-extern struct lttng_notification_channel *rotate_notification_channel;
-extern int rotate_notification_channel_subscription_change_eventfd;
+namespace lttng {
+namespace sessiond {
 
-enum rotation_thread_job_type {
-       ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
-       ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION
+enum class rotation_thread_job_type {
+       SCHEDULED_ROTATION,
+       CHECK_PENDING_ROTATION
 };
 
 struct rotation_thread_timer_queue;
-struct rotation_thread_handle;
 
-struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void);
-void rotation_thread_timer_queue_destroy(
-               struct rotation_thread_timer_queue *queue);
+class rotation_thread {
+public:
+       using uptr = std::unique_ptr<rotation_thread>;
 
-struct rotation_thread_handle *rotation_thread_handle_create(
-               struct rotation_thread_timer_queue *rotation_timer_queue,
-               struct notification_thread_handle *notification_thread_handle);
+       rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
+                              notification_thread_handle& notification_thread_handle);
+       ~rotation_thread();
 
-void rotation_thread_handle_destroy(
-               struct rotation_thread_handle *handle);
+       /* Only use through the lttng_thread facilities. */
+       void launch_thread();
+       bool shutdown() const noexcept;
 
+       /*
+        * Subscribe/unsubscribe the rotation_thread's notification_channel to/from
+        * session usage notifications to perform size-based rotations.
+        */
+       void subscribe_session_consumed_size_rotation(ltt_session& session, std::uint64_t size);
+       void unsubscribe_session_consumed_size_rotation(ltt_session& session);
+
+private:
+       void _thread_function() noexcept;
+       void _run();
+       void _handle_job_queue();
+       void _handle_notification(const lttng_notification& notification);
+       void _handle_notification_channel_activity();
+
+       struct rotation_thread_timer_queue& _rotation_timer_queue;
+       /* Access to the notification thread cmd_queue */
+       notification_thread_handle& _notification_thread_handle;
+       /* Thread-specific quit pipe. */
+       lttng_pipe::uptr _quit_pipe;
+       lttng_notification_channel::uptr _notification_channel;
+       /*
+        * Use an event_fd to wake-up the rotation thread whenever a command
+        * completes on the notification channel. This ensures that any
+        * notification that was queued while waiting for a reply to the command is
+        * eventually consumed.
+        */
+       lttng::eventfd _notification_channel_subscribtion_change_eventfd;
+       lttng_poll_event _events;
+};
+
+struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void);
+void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue);
 void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
-               enum rotation_thread_job_type job_type,
-               struct ltt_session *session);
+                                enum rotation_thread_job_type job_type,
+                                struct ltt_session *session);
 
-bool launch_rotation_thread(struct rotation_thread_handle *handle);
+} /* namespace sessiond */
+} /* namespace lttng */
 
 #endif /* ROTATION_THREAD_H */
index 75d767c25644088b364c8a133a21e7ca214b77a9..0532a76867585fe8e7c22f4115a7abc8563e7330 100644 (file)
@@ -158,7 +158,7 @@ void session_list_wait_empty()
 /*
  * Acquire session list lock
  */
-void session_lock_list()
+void session_lock_list() noexcept
 {
        pthread_mutex_lock(&the_session_list.lock);
 }
@@ -166,7 +166,7 @@ void session_lock_list()
 /*
  * Try to acquire session list lock
  */
-int session_trylock_list()
+int session_trylock_list() noexcept
 {
        return pthread_mutex_trylock(&the_session_list.lock);
 }
@@ -174,7 +174,7 @@ int session_trylock_list()
 /*
  * Release session list lock
  */
-void session_unlock_list()
+void session_unlock_list() noexcept
 {
        pthread_mutex_unlock(&the_session_list.lock);
 }
@@ -928,19 +928,19 @@ static void session_notify_destruction(const struct ltt_session *session)
 /*
  * Fire each clear notifier once, and remove them from the array.
  */
-void session_notify_clear(struct ltt_session *session)
+void session_notify_clear(ltt_session &session)
 {
        size_t i;
-       const size_t count = lttng_dynamic_array_get_count(&session->clear_notifiers);
+       const size_t count = lttng_dynamic_array_get_count(&session.clear_notifiers);
 
        for (i = 0; i < count; i++) {
                const struct ltt_session_clear_notifier_element *element =
                        (ltt_session_clear_notifier_element *) lttng_dynamic_array_get_element(
-                               &session->clear_notifiers, i);
+                               &session.clear_notifiers, i);
 
-               element->notifier(session, element->user_data);
+               element->notifier(&session, element->user_data);
        }
-       lttng_dynamic_array_clear(&session->clear_notifiers);
+       lttng_dynamic_array_clear(&session.clear_notifiers);
 }
 
 static void session_release(struct urcu_ref *ref)
@@ -1359,26 +1359,26 @@ bool session_access_ok(struct ltt_session *session, uid_t uid)
  *
  * Must be called with the session and session_list locks held.
  */
-int session_reset_rotation_state(struct ltt_session *session, enum lttng_rotation_state result)
+int session_reset_rotation_state(ltt_session &session, enum lttng_rotation_state result)
 {
        int ret = 0;
 
        ASSERT_LOCKED(the_session_list.lock);
-       ASSERT_LOCKED(session->lock);
+       ASSERT_LOCKED(session.lock);
 
-       session->rotation_state = result;
-       if (session->rotation_pending_check_timer_enabled) {
+       session.rotation_state = result;
+       if (session.rotation_pending_check_timer_enabled) {
                ret = timer_session_rotation_pending_check_stop(session);
        }
-       if (session->chunk_being_archived) {
+       if (session.chunk_being_archived) {
                uint64_t chunk_id;
                enum lttng_trace_chunk_status chunk_status;
 
-               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_id);
+               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-               LTTNG_OPTIONAL_SET(&session->last_archived_chunk_id, chunk_id);
-               lttng_trace_chunk_put(session->chunk_being_archived);
-               session->chunk_being_archived = nullptr;
+               LTTNG_OPTIONAL_SET(&session.last_archived_chunk_id, chunk_id);
+               lttng_trace_chunk_put(session.chunk_being_archived);
+               session.chunk_being_archived = nullptr;
                /*
                 * Fire the clear reply notifiers if we are completing a clear
                 * rotation.
@@ -1429,6 +1429,10 @@ end:
 
 void ls::details::locked_session_release(ltt_session *session)
 {
+       if (!session) {
+               return;
+       }
+
        session_unlock(session);
        session_put(session);
 }
index 0c4f71d47b84bfb264120073e6be66427363a013..49064037d7668db5c3d7e62edbea4d5acdf6e8c4 100644 (file)
@@ -226,9 +226,9 @@ void session_unlock(struct ltt_session *session);
  * In other words, it prevents tracer configurations from changing while they
  * are being transmitted to the various applications.
  */
-void session_lock_list();
-int session_trylock_list();
-void session_unlock_list();
+void session_lock_list() noexcept;
+int session_trylock_list() noexcept;
+void session_unlock_list() noexcept;
 
 void session_destroy(struct ltt_session *session);
 int session_add_destroy_notifier(struct ltt_session *session,
@@ -236,7 +236,7 @@ int session_add_destroy_notifier(struct ltt_session *session,
 
 int session_add_clear_notifier(struct ltt_session *session,
                ltt_session_clear_notifier notifier, void *user_data);
-void session_notify_clear(struct ltt_session *session);
+void session_notify_clear(ltt_session &session);
 
 bool session_get(struct ltt_session *session);
 void session_put(struct ltt_session *session);
@@ -259,7 +259,7 @@ void session_list_wait_empty();
 
 bool session_access_ok(struct ltt_session *session, uid_t uid);
 
-int session_reset_rotation_state(struct ltt_session *session,
+int session_reset_rotation_state(ltt_session &session,
                enum lttng_rotation_state result);
 
 /* Create a new trace chunk object from the session's configuration. */
index 243b562190d12e3656e438c561f25aba4bc8215b..d87824704633ab61fbe54f9178de333990904a17 100644 (file)
@@ -233,25 +233,25 @@ end:
 /*
  * Call with session and session_list locks held.
  */
-int timer_session_rotation_pending_check_stop(struct ltt_session *session)
+int timer_session_rotation_pending_check_stop(ltt_session &session)
 {
        int ret;
 
-       LTTNG_ASSERT(session);
-       LTTNG_ASSERT(session->rotation_pending_check_timer_enabled);
+       LTTNG_ASSERT(session.rotation_pending_check_timer_enabled);
 
-       DBG("Disabling session rotation pending check timer on session %" PRIu64, session->id);
-       ret = timer_stop(&session->rotation_pending_check_timer,
+       DBG("Disabling session rotation pending check timer on session %" PRIu64, session.id);
+       ret = timer_stop(&session.rotation_pending_check_timer,
                         LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK);
        if (ret == -1) {
                ERR("Failed to stop rotate_pending_check timer");
        } else {
-               session->rotation_pending_check_timer_enabled = false;
+               session.rotation_pending_check_timer_enabled = false;
                /*
                 * The timer's reference to the session can be released safely.
                 */
-               session_put(session);
+               session_put(&session);
        }
+
        return ret;
 }
 
@@ -379,13 +379,15 @@ static void *thread_timer(void *data)
                        struct ltt_session *session =
                                (struct ltt_session *) info.si_value.sival_ptr;
 
-                       rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
-                                                   ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION,
-                                                   session);
+                       rotation_thread_enqueue_job(
+                               ctx->rotation_thread_job_queue,
+                               lttng::sessiond::rotation_thread_job_type::CHECK_PENDING_ROTATION,
+                               session);
                } else if (signr == LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION) {
-                       rotation_thread_enqueue_job(ctx->rotation_thread_job_queue,
-                                                   ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION,
-                                                   (struct ltt_session *) info.si_value.sival_ptr);
+                       rotation_thread_enqueue_job(
+                               ctx->rotation_thread_job_queue,
+                               lttng::sessiond::rotation_thread_job_type::SCHEDULED_ROTATION,
+                               (struct ltt_session *) info.si_value.sival_ptr);
                        /*
                         * The scheduled periodic rotation timer is not in
                         * "one-shot" mode. The reference to the session is not
index f746f0ed9c6fba3689894cfcb0eb49b1a921eff9..3cdebbc13590f68bb15baa093d04d6069525dcf0 100644 (file)
@@ -9,13 +9,14 @@
 #ifndef SESSIOND_TIMER_H
 #define SESSIOND_TIMER_H
 
+#include "rotation-thread.hpp"
+#include "session.hpp"
+
 #include <pthread.h>
 #include <stdbool.h>
 
-#include "session.hpp"
-
 struct timer_thread_parameters {
-       struct rotation_thread_timer_queue *rotation_thread_job_queue;
+       lttng::sessiond::rotation_thread_timer_queue *rotation_thread_job_queue;
 };
 
 int timer_signal_init(void);
@@ -24,7 +25,7 @@ int timer_signal_init(void);
 int timer_session_rotation_pending_check_start(struct ltt_session *session,
                unsigned int interval_us);
 /* Stop a session's rotation pending check timer. */
-int timer_session_rotation_pending_check_stop(struct ltt_session *session);
+int timer_session_rotation_pending_check_stop(ltt_session &session);
 
 /* Start a session's rotation schedule timer. */
 int timer_session_rotation_schedule_timer_start(struct ltt_session *session,
index aaab0a3b2512d4a517d584127a5bafc8f513696b..bdc7d6d2065b66eba162f58e620166b706bc5668 100644 (file)
@@ -19,7 +19,6 @@
 #include "lttng-ust-ctl.hpp"
 #include "lttng-ust-error.hpp"
 #include "notification-thread-commands.hpp"
-#include "rotate.hpp"
 #include "session.hpp"
 #include "ust-app.hpp"
 #include "ust-consumer.hpp"
index 8b8e4364530bfcaccc7113697eaed0ff9d8a3bac..390c4dccc1ada97dbbe8e6b886791366eac0ddc1 100644 (file)
@@ -9,14 +9,16 @@
 #define _ERROR_H
 
 #include <common/compat/errno.hpp>
-#include <stdio.h>
+#include <common/compat/time.hpp>
+#include <common/format.hpp>
+#include <common/macros.hpp>
+#include <common/string-utils/format.hpp>
+
+#include <stdbool.h>
 #include <stdint.h>
+#include <stdio.h>
 #include <string.h>
-#include <stdbool.h>
 #include <urcu/tls-compat.h>
-#include <common/compat/time.hpp>
-#include <common/string-utils/format.hpp>
-#include <common/macros.hpp>
 
 #ifndef _GNU_SOURCE
 #error "lttng-tools error.h needs _GNU_SOURCE"
@@ -252,6 +254,10 @@ static inline void __lttng_print_check_abort(enum lttng_error_level type)
        } while (0);
 #endif
 
+#define DBG_FMT(format_str, args...)  DBG("%s", fmt::format(format_str, ##args).c_str())
+#define WARN_FMT(format_str, args...) WARN("%s", fmt::format(format_str, ##args).c_str())
+#define ERR_FMT(format_str, args...)  ERR("%s", fmt::format(format_str, ##args).c_str())
+
 const char *error_get_str(int32_t code);
 
 /*
index 05c30fc6d37d198025d4c8d15433ad269bf96d16..028d253f2d885a6c9d9c29a4dc8c875a40b698f2 100644 (file)
@@ -24,12 +24,15 @@ format_throw_location(const char *file_name, const char *function_name, unsigned
 }
 } /* namespace */
 
-lttng::ctl::error::error(lttng_error_code error_code,
+lttng::ctl::error::error(const std::string& msg,
+                        lttng_error_code error_code,
                         const char *file_name,
                         const char *function_name,
                         unsigned int line_number) :
-       runtime_error(
-               std::string(error_get_str(error_code)), file_name, function_name, line_number),
+       runtime_error(msg + ": " + std::string(error_get_str(error_code)),
+                     file_name,
+                     function_name,
+                     line_number),
        _error_code{ error_code }
 {
 }
index 1efd80359654bb34de8af78708a131cd8b194db2..a3b0a83e210a5246b35e949112eef5a315c79f73 100644 (file)
 
 #include <lttng/lttng-error.h>
 
-#define LTTNG_THROW_CTL(error_code) \
+#define LTTNG_THROW_CTL(msg, error_code) \
        throw lttng::ctl::error(msg, error_code, __FILE__, __func__, __LINE__)
 #define LTTNG_THROW_POSIX(msg, errno_code) \
        throw lttng::posix_error(msg, errno_code, __FILE__, __func__, __LINE__)
-#define LTTNG_THROW_ERROR(msg) \
-       throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__)
+#define LTTNG_THROW_ERROR(msg) throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__)
 #define LTTNG_THROW_UNSUPPORTED_ERROR(msg) \
        throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__)
 #define LTTNG_THROW_COMMUNICATION_ERROR(msg) \
@@ -50,10 +49,11 @@ namespace ctl {
 /* Wrap lttng_error_code errors which may be reported through liblttng-ctl's interface. */
 class error : public runtime_error {
 public:
-       explicit error(lttng_error_code error_code,
-                       const char *file_name,
-                       const char *function_name,
-                       unsigned int line_number);
+       explicit error(const std::string& msg,
+                      lttng_error_code error_code,
+                      const char *file_name,
+                      const char *function_name,
+                      unsigned int line_number);
 
        lttng_error_code code() const noexcept
        {
index 9491a13af444bc7dbc9d1f372c43e8a5b3c84333..4042c61cefc476b907ebb861a5d3b66800a0def9 100644 (file)
@@ -49,7 +49,7 @@ lttng::file_descriptor::~file_descriptor() noexcept
        _cleanup();
 }
 
-int lttng::file_descriptor::_fd() const noexcept
+int lttng::file_descriptor::fd() const noexcept
 {
        LTTNG_ASSERT(is_valid_fd(_raw_fd));
        return _raw_fd;
@@ -87,9 +87,9 @@ void lttng::file_descriptor::write(const void *buffer, std::size_t size)
                        max_supported_write_size));
        }
 
-       const auto write_ret = lttng_write(_fd(), buffer, size);
+       const auto write_ret = lttng_write(fd(), buffer, size);
        if (write_ret < 0 || static_cast<std::size_t>(write_ret) != size) {
-               LTTNG_THROW_POSIX(fmt::format("Failed to write to file descriptor: fd={}", _fd()),
+               LTTNG_THROW_POSIX(fmt::format("Failed to write to file descriptor: fd={}", fd()),
                                  errno);
        }
 }
@@ -111,9 +111,9 @@ void lttng::file_descriptor::read(void *buffer, std::size_t size)
                        max_supported_read_size));
        }
 
-       const auto read_ret = lttng_read(_fd(), buffer, size);
+       const auto read_ret = lttng_read(fd(), buffer, size);
        if (read_ret < 0 || static_cast<std::size_t>(read_ret) != size) {
-               LTTNG_THROW_POSIX(fmt::format("Failed to read from file descriptor: fd={}", _fd()),
+               LTTNG_THROW_POSIX(fmt::format("Failed to read from file descriptor: fd={}", fd()),
                                  errno);
        }
 }
index 164842f86ba8184d2cec46715a43be031ba8e606..68686a4adccc89efb7613a3b805e66052cce6c81 100644 (file)
@@ -42,8 +42,8 @@ public:
         */
        void write(const void *buffer, std::size_t size);
 
+       int fd() const noexcept;
 protected:
-       int _fd() const noexcept;
        void _cleanup() noexcept;
 
 private:
index d46e65d6833a32b97228e7a76573aaca829e2b64..4e6afc567831868c570ab36c2fa8998fd8357ca0 100644 (file)
@@ -8,8 +8,10 @@
 #ifndef LTTNG_PIPE_H
 #define LTTNG_PIPE_H
 
-#include <pthread.h>
 #include <common/macros.hpp>
+#include <common/make-unique-wrapper.hpp>
+
+#include <pthread.h>
 #include <sys/types.h>
 
 enum lttng_pipe_state {
@@ -17,7 +19,19 @@ enum lttng_pipe_state {
        LTTNG_PIPE_STATE_CLOSED = 2,
 };
 
+/* Close both side of pipe. */
+int lttng_pipe_close(struct lttng_pipe *pipe);
+
 struct lttng_pipe {
+       static void _lttng_pipe_close_wrapper(lttng_pipe *pipe)
+       {
+               lttng_pipe_close(pipe);
+       }
+
+       using uptr = std::unique_ptr<
+               lttng_pipe,
+               lttng::details::create_unique_class<lttng_pipe, _lttng_pipe_close_wrapper>::deleter>;
+
        /* Read: 0, Write: 1. */
        int fd[2];
        /*
@@ -69,8 +83,6 @@ struct lttng_pipe *lttng_pipe_named_open(const char *path, mode_t mode,
                int flags);
 int lttng_pipe_write_close(struct lttng_pipe *pipe);
 int lttng_pipe_read_close(struct lttng_pipe *pipe);
-/* Close both side of pipe. */
-int lttng_pipe_close(struct lttng_pipe *pipe);
 void lttng_pipe_destroy(struct lttng_pipe *pipe);
 
 ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count);
This page took 0.065779 seconds and 4 git commands to generate.