From: Jérémie Galarneau Date: Tue, 4 Apr 2023 17:45:24 +0000 (-0400) Subject: Cleanup: rotation-thread: enforce conding standard following fix X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=0038180de36c422cfaeade1145fa9fbc9436b8ad;p=lttng-tools.git Cleanup: rotation-thread: enforce conding standard following fix A fix introducing rotate_notification_channel_subscription_change_eventfd didn't follow the current coding standard so as to make it easier to backport to the stable branches. Clean-up the affected code to follow the current standard: - Replace the use of a raw eventfd to use the eventfd utility, - Subscribe and unsubscribe functions made use of global variables to communicate with the rotation thread: replace that with the rotation_thread class to centralize the interface, - Make the code using eventfd exception-safe (automatic memory management, use of various RAII utils), - Replacement of non-null pointers by references. Change-Id: I7e363e21b829fd0939a336aca2570fdbcc346967 Signed-off-by: Jérémie Galarneau --- diff --git a/include/lttng/notification/channel-internal.hpp b/include/lttng/notification/channel-internal.hpp index 3364704ee..44fd9cf16 100644 --- a/include/lttng/notification/channel-internal.hpp +++ b/include/lttng/notification/channel-internal.hpp @@ -8,12 +8,15 @@ #ifndef LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H #define LTTNG_NOTIFICATION_CHANNEL_INTERNAL_H -#include #include +#include #include -#include -#include + +#include + #include +#include +#include #include /* @@ -82,6 +85,10 @@ struct pending_notification { * in the pending_notifications list. */ struct lttng_notification_channel { + using uptr = std::unique_ptr< + lttng_notification_channel, + lttng::details::create_unique_class::deleter>; + pthread_mutex_t lock; int socket; struct { diff --git a/include/lttng/notification/notification-internal.hpp b/include/lttng/notification/notification-internal.hpp index 9557c38b8..9837c359b 100644 --- a/include/lttng/notification/notification-internal.hpp +++ b/include/lttng/notification/notification-internal.hpp @@ -8,16 +8,25 @@ #ifndef LTTNG_NOTIFICATION_INTERNAL_H #define LTTNG_NOTIFICATION_INTERNAL_H -#include #include -#include +#include + +#include + +#include #include +#include #include struct lttng_payload; struct lttng_payload_view; struct lttng_notification { + using uptr = std::unique_ptr< + lttng_notification, + lttng::details::create_unique_class::deleter>; + struct lttng_trigger *trigger; struct lttng_evaluation *evaluation; }; diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 7d618afe2..e9b6ac1f6 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -39,7 +39,6 @@ liblttng_sessiond_common_la_SOURCES = utils.cpp utils.hpp \ notification-thread-commands.hpp notification-thread-commands.cpp \ notification-thread-events.hpp notification-thread-events.cpp \ sessiond-config.hpp sessiond-config.cpp \ - rotate.hpp rotate.cpp \ rotation-thread.hpp rotation-thread.cpp \ timer.cpp timer.hpp \ globals.cpp \ diff --git a/src/bin/lttng-sessiond/client.cpp b/src/bin/lttng-sessiond/client.cpp index 123d07f85..0680e14fd 100644 --- a/src/bin/lttng-sessiond/client.cpp +++ b/src/bin/lttng-sessiond/client.cpp @@ -1784,7 +1784,7 @@ skip_domain: } case LTTCOMM_SESSIOND_COMMAND_DESTROY_SESSION: { - ret = cmd_destroy_session(cmd_ctx->session, the_notification_thread_handle, sock); + ret = cmd_destroy_session(cmd_ctx->session, sock); break; } case LTTCOMM_SESSIOND_COMMAND_LIST_DOMAINS: @@ -2217,8 +2217,7 @@ skip_domain: ret = cmd_rotation_set_schedule(cmd_ctx->session, set_schedule, schedule_type, - value, - the_notification_thread_handle); + value); if (ret != LTTNG_OK) { goto error; } diff --git a/src/bin/lttng-sessiond/cmd.cpp b/src/bin/lttng-sessiond/cmd.cpp index 3f066a002..413bb6500 100644 --- a/src/bin/lttng-sessiond/cmd.cpp +++ b/src/bin/lttng-sessiond/cmd.cpp @@ -22,7 +22,6 @@ #include "lttng-syscall.hpp" #include "notification-thread-commands.hpp" #include "notification-thread.hpp" -#include "rotate.hpp" #include "rotation-thread.hpp" #include "session.hpp" #include "timer.hpp" @@ -3392,7 +3391,6 @@ error: * Called with session lock held. */ int cmd_destroy_session(struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle, int *sock_fd) { int ret; @@ -3435,7 +3433,15 @@ int cmd_destroy_session(struct ltt_session *session, } if (session->rotate_size) { - unsubscribe_session_consumed_size_rotation(session, notification_thread_handle); + try { + the_rotation_thread_handle->unsubscribe_session_consumed_size_rotation( + *session); + } catch (std::exception& e) { + /* Continue the destruction of the session anyway. */ + ERR("Failed to unsubscribe rotation thread notification channel from consumed size condition during session destruction: %s", + e.what()); + } + session->rotate_size = 0; } @@ -5619,7 +5625,7 @@ end: ret = (cmd_ret == LTTNG_OK) ? cmd_ret : -((int) cmd_ret); return ret; error: - if (session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR)) { + if (session_reset_rotation_state(*session, LTTNG_ROTATION_STATE_ERROR)) { ERR("Failed to reset rotation state of session \"%s\"", session->name); } goto end; @@ -5786,8 +5792,7 @@ end: int cmd_rotation_set_schedule(struct ltt_session *session, bool activate, enum lttng_rotation_schedule_type schedule_type, - uint64_t new_value, - struct notification_thread_handle *notification_thread_handle) + uint64_t new_value) { int ret; uint64_t *parameter_value; @@ -5889,18 +5894,22 @@ int cmd_rotation_set_schedule(struct ltt_session *session, break; case LTTNG_ROTATION_SCHEDULE_TYPE_SIZE_THRESHOLD: if (activate) { - ret = subscribe_session_consumed_size_rotation( - session, new_value, notification_thread_handle); - if (ret) { - ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command"); + try { + the_rotation_thread_handle->subscribe_session_consumed_size_rotation( + *session, new_value); + } catch (std::exception& e) { + ERR("Failed to enable consumed-size notification in ROTATION_SET_SCHEDULE command: %s", + e.what()); ret = LTTNG_ERR_UNK; goto end; } } else { - ret = unsubscribe_session_consumed_size_rotation( - session, notification_thread_handle); - if (ret) { - ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command"); + try { + the_rotation_thread_handle + ->unsubscribe_session_consumed_size_rotation(*session); + } catch (std::exception& e) { + ERR("Failed to disable consumed-size notification in ROTATION_SET_SCHEDULE command: %s", + e.what()); ret = LTTNG_ERR_UNK; goto end; } diff --git a/src/bin/lttng-sessiond/cmd.hpp b/src/bin/lttng-sessiond/cmd.hpp index 445eda80c..6defe0049 100644 --- a/src/bin/lttng-sessiond/cmd.hpp +++ b/src/bin/lttng-sessiond/cmd.hpp @@ -40,9 +40,7 @@ void cmd_init(void); /* Session commands */ enum lttng_error_code cmd_create_session(struct command_ctx *cmd_ctx, int sock, struct lttng_session_descriptor **return_descriptor); -int cmd_destroy_session(struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle, - int *sock_fd); +int cmd_destroy_session(struct ltt_session *session, int *sock_fd); /* Channel commands */ int cmd_disable_channel(struct ltt_session *session, @@ -171,8 +169,7 @@ int cmd_rotate_get_info(struct ltt_session *session, uint64_t rotate_id); int cmd_rotation_set_schedule(struct ltt_session *session, bool activate, enum lttng_rotation_schedule_type schedule_type, - uint64_t value, - struct notification_thread_handle *notification_thread_handle); + uint64_t value); const struct cmd_completion_handler *cmd_pop_completion_handler(void); int start_kernel_session(struct ltt_kernel_session *ksess); diff --git a/src/bin/lttng-sessiond/globals.cpp b/src/bin/lttng-sessiond/globals.cpp index 3add6c9eb..ad9a8bf85 100644 --- a/src/bin/lttng-sessiond/globals.cpp +++ b/src/bin/lttng-sessiond/globals.cpp @@ -21,6 +21,7 @@ long the_page_size; struct health_app *the_health_sessiond; struct notification_thread_handle *the_notification_thread_handle; +lttng::sessiond::rotation_thread::uptr the_rotation_thread_handle; struct lttng_ht *the_agent_apps_ht_by_sock = nullptr; struct lttng_ht *the_trigger_agents_ht_by_domain = nullptr; diff --git a/src/bin/lttng-sessiond/kernel.cpp b/src/bin/lttng-sessiond/kernel.cpp index 6e2bdb4c6..2914475d5 100644 --- a/src/bin/lttng-sessiond/kernel.cpp +++ b/src/bin/lttng-sessiond/kernel.cpp @@ -16,7 +16,6 @@ #include "lttng-syscall.hpp" #include "modprobe.hpp" #include "notification-thread-commands.hpp" -#include "rotate.hpp" #include "sessiond-config.hpp" #include "tracker.hpp" #include "utils.hpp" diff --git a/src/bin/lttng-sessiond/lttng-sessiond.hpp b/src/bin/lttng-sessiond/lttng-sessiond.hpp index e94dd7a56..a6b17c2a8 100644 --- a/src/bin/lttng-sessiond/lttng-sessiond.hpp +++ b/src/bin/lttng-sessiond/lttng-sessiond.hpp @@ -9,19 +9,20 @@ #ifndef _LTT_SESSIOND_H #define _LTT_SESSIOND_H -#include -#include +#include "notification-thread.hpp" +#include "rotation-thread.hpp" +#include "session.hpp" +#include "sessiond-config.hpp" +#include "ust-app.hpp" -#include -#include #include #include +#include +#include #include -#include "session.hpp" -#include "ust-app.hpp" -#include "notification-thread.hpp" -#include "sessiond-config.hpp" +#include +#include /* * Consumer daemon state which is changed when spawning it, killing it or in @@ -67,6 +68,9 @@ extern struct lttng_kernel_abi_tracer_abi_version the_kernel_tracer_abi_version; /* Notification thread handle. */ extern struct notification_thread_handle *the_notification_thread_handle; +/* Rotation thread handle. */ +extern lttng::sessiond::rotation_thread::uptr the_rotation_thread_handle; + /* * This contains extra data needed for processing a command received by the * session daemon from the lttng client. diff --git a/src/bin/lttng-sessiond/main.cpp b/src/bin/lttng-sessiond/main.cpp index cc3285619..312c243cf 100644 --- a/src/bin/lttng-sessiond/main.cpp +++ b/src/bin/lttng-sessiond/main.cpp @@ -1303,7 +1303,7 @@ static void destroy_all_sessions_and_wait() goto unlock_session; } (void) cmd_stop_trace(session); - (void) cmd_destroy_session(session, the_notification_thread_handle, nullptr); + (void) cmd_destroy_session(session, nullptr); unlock_session: session_unlock(session); session_put(session); @@ -1411,10 +1411,8 @@ int main(int argc, char **argv) *ust64_channel_monitor_pipe = nullptr, *kernel_channel_monitor_pipe = nullptr; struct timer_thread_parameters timer_thread_parameters; - /* Rotation thread handle. */ - struct rotation_thread_handle *rotation_thread_handle = nullptr; /* Queue of rotation jobs populated by the sessiond-timer. */ - struct rotation_thread_timer_queue *rotation_timer_queue = nullptr; + lttng::sessiond::rotation_thread_timer_queue *rotation_timer_queue = nullptr; struct lttng_thread *client_thread = nullptr; struct lttng_thread *notification_thread = nullptr; struct lttng_thread *register_apps_thread = nullptr; @@ -1600,7 +1598,7 @@ int main(int argc, char **argv) * sessiond timer thread and the rotation thread. The main thread keeps * its ownership and destroys it when both threads have been joined. */ - rotation_timer_queue = rotation_thread_timer_queue_create(); + rotation_timer_queue = lttng::sessiond::rotation_thread_timer_queue_create(); if (!rotation_timer_queue) { retval = -1; goto stop_threads; @@ -1771,18 +1769,21 @@ int main(int argc, char **argv) goto stop_threads; } - /* rotation_thread_data acquires the pipes' read side. */ - rotation_thread_handle = - rotation_thread_handle_create(rotation_timer_queue, the_notification_thread_handle); - if (!rotation_thread_handle) { + try { + the_rotation_thread_handle = + lttng::make_unique( + *rotation_timer_queue, *the_notification_thread_handle); + } catch (const std::exception& e) { retval = -1; - ERR("Failed to create rotation thread shared data"); + ERR("Failed to create rotation thread: %s", e.what()); goto stop_threads; } - /* Create rotation thread. */ - if (!launch_rotation_thread(rotation_thread_handle)) { + try { + the_rotation_thread_handle->launch_thread(); + } catch (const std::exception& e) { retval = -1; + ERR("Failed to launch rotation thread: %s", e.what()); goto stop_threads; } @@ -1943,10 +1944,6 @@ stop_threads: rcu_thread_offline(); rcu_unregister_thread(); - if (rotation_thread_handle) { - rotation_thread_handle_destroy(rotation_thread_handle); - } - /* * After the rotation and timer thread have quit, we can safely destroy * the rotation_timer_queue. diff --git a/src/bin/lttng-sessiond/rotate.cpp b/src/bin/lttng-sessiond/rotate.cpp deleted file mode 100644 index 56a61fe2a..000000000 --- a/src/bin/lttng-sessiond/rotate.cpp +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright (C) 2017 Julien Desfossez - * Copyright (C) 2018 Jérémie Galarneau - * - * SPDX-License-Identifier: GPL-2.0-only - * - */ - -#define _LGPL_SOURCE -#include "cmd.hpp" -#include "health-sessiond.hpp" -#include "lttng-sessiond.hpp" -#include "notification-thread-commands.hpp" -#include "rotate.hpp" -#include "rotation-thread.hpp" -#include "session.hpp" -#include "utils.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -int subscribe_session_consumed_size_rotation( - struct ltt_session *session, - uint64_t size, - struct notification_thread_handle *notification_thread_handle) -{ - int ret; - enum lttng_condition_status condition_status; - enum lttng_notification_channel_status nc_status; - const uint64_t eventfd_increment_value = 1; - struct lttng_condition *rotate_condition = nullptr; - struct lttng_action *notify_action = nullptr; - const struct lttng_credentials session_creds = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid), - }; - - rotate_condition = lttng_condition_session_consumed_size_create(); - if (!rotate_condition) { - ERR("Failed to create session consumed size condition object"); - ret = -1; - goto end; - } - - condition_status = - lttng_condition_session_consumed_size_set_threshold(rotate_condition, size); - if (condition_status != LTTNG_CONDITION_STATUS_OK) { - ERR("Could not set session consumed size condition threshold (size = %" PRIu64 ")", - size); - ret = -1; - goto end; - } - - condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition, - session->name); - if (condition_status != LTTNG_CONDITION_STATUS_OK) { - ERR("Could not set session consumed size condition session name (name = %s)", - session->name); - ret = -1; - goto end; - } - - notify_action = lttng_action_notify_create(); - if (!notify_action) { - ERR("Could not create notify action"); - ret = -1; - goto end; - } - - LTTNG_ASSERT(!session->rotate_trigger); - session->rotate_trigger = lttng_trigger_create(rotate_condition, notify_action); - if (!session->rotate_trigger) { - ERR("Could not create size-based rotation trigger"); - ret = -1; - goto end; - } - - /* Ensure this trigger is not visible to external users. */ - lttng_trigger_set_hidden(session->rotate_trigger); - lttng_trigger_set_credentials(session->rotate_trigger, &session_creds); - - nc_status = - lttng_notification_channel_subscribe(rotate_notification_channel, rotate_condition); - if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Could not subscribe to session consumed size notification"); - ret = -1; - goto end; - } - - ret = lttng_write(rotate_notification_channel_subscription_change_eventfd, - &eventfd_increment_value, - sizeof(eventfd_increment_value)); - if (ret != sizeof(eventfd_increment_value)) { - PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed"); - ret = -1; - goto end; - } - - ret = notification_thread_command_register_trigger( - notification_thread_handle, session->rotate_trigger, true); - if (ret < 0 && ret != -LTTNG_ERR_TRIGGER_EXISTS) { - ERR("Register trigger, %s", lttng_strerror(ret)); - ret = -1; - goto end; - } - - ret = 0; - -end: - lttng_condition_put(rotate_condition); - lttng_action_put(notify_action); - if (ret) { - lttng_trigger_put(session->rotate_trigger); - } - return ret; -} - -int unsubscribe_session_consumed_size_rotation( - struct ltt_session *session, struct notification_thread_handle *notification_thread_handle) -{ - int ret = 0; - enum lttng_notification_channel_status status; - const uint64_t eventfd_increment_value = 1; - - LTTNG_ASSERT(session->rotate_trigger); - status = lttng_notification_channel_unsubscribe( - rotate_notification_channel, - lttng_trigger_get_const_condition(session->rotate_trigger)); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Session unsubscribe error: %d", (int) status); - ret = -1; - goto end; - } - - ret = lttng_write(rotate_notification_channel_subscription_change_eventfd, - &eventfd_increment_value, - sizeof(eventfd_increment_value)); - if (ret != sizeof(eventfd_increment_value)) { - PERROR("Failed to wake up rotation thread as writing to the rotation thread notification channel subscription change eventfd failed"); - ret = -1; - goto end; - } - - ret = notification_thread_command_unregister_trigger(notification_thread_handle, - session->rotate_trigger); - if (ret != LTTNG_OK) { - ERR("Session unregister trigger error: %d", ret); - goto end; - } - - lttng_trigger_put(session->rotate_trigger); - session->rotate_trigger = nullptr; - - ret = 0; -end: - return ret; -} diff --git a/src/bin/lttng-sessiond/rotate.hpp b/src/bin/lttng-sessiond/rotate.hpp deleted file mode 100644 index 965a8c76c..000000000 --- a/src/bin/lttng-sessiond/rotate.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2017 Julien Desfossez - * Copyright (C) 2018 Jérémie Galarneau - * - * SPDX-License-Identifier: GPL-2.0-only - * - */ - -#ifndef ROTATE_H -#define ROTATE_H - -#include "rotation-thread.hpp" -#include - -/* - * Subscribe/unsubscribe the notification_channel from the rotation_thread to - * session usage notifications to perform size-based rotations. - */ -int subscribe_session_consumed_size_rotation(struct ltt_session *session, - uint64_t size, - struct notification_thread_handle *notification_thread_handle); -int unsubscribe_session_consumed_size_rotation(struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle); - -#endif /* ROTATE_H */ diff --git a/src/bin/lttng-sessiond/rotation-thread.cpp b/src/bin/lttng-sessiond/rotation-thread.cpp index 815b4e95f..6f58a178d 100644 --- a/src/bin/lttng-sessiond/rotation-thread.cpp +++ b/src/bin/lttng-sessiond/rotation-thread.cpp @@ -11,7 +11,6 @@ #include "health-sessiond.hpp" #include "lttng-sessiond.hpp" #include "notification-thread-commands.hpp" -#include "rotate.hpp" #include "rotation-thread.hpp" #include "session.hpp" #include "thread.hpp" @@ -22,13 +21,22 @@ #include #include #include +#include +#include +#include +#include #include #include #include +#include +#include +#include +#include #include #include #include +#include #include #include #include @@ -37,6 +45,7 @@ #include #include +#include #include #include #include @@ -44,131 +53,49 @@ #include #include -struct lttng_notification_channel *rotate_notification_channel = nullptr; -/* - * This eventfd is used to wake-up the rotation thread whenever a command - * completes on the notification channel. This ensures that any notification - * that was queued while waiting for a reply to the command is eventually - * consumed. - */ -int rotate_notification_channel_subscription_change_eventfd = -1; - -struct rotation_thread { - struct lttng_poll_event events; -}; +namespace ls = lttng::sessiond; /* * The timer thread enqueues jobs and wakes up the rotation thread. * When the rotation thread wakes up, it empties the queue. */ -struct rotation_thread_timer_queue { +struct ls::rotation_thread_timer_queue { struct lttng_pipe *event_pipe; struct cds_list_head list; pthread_mutex_t lock; }; -struct rotation_thread_handle { - struct rotation_thread_timer_queue *rotation_timer_queue; - /* Access to the notification thread cmd_queue */ - struct notification_thread_handle *notification_thread_handle; - /* Thread-specific quit pipe. */ - struct lttng_pipe *quit_pipe; -}; - namespace { struct rotation_thread_job { - enum rotation_thread_job_type type; + using uptr = std::unique_ptr< + rotation_thread_job, + lttng::details::create_unique_class>; + + enum ls::rotation_thread_job_type type; struct ltt_session *session; /* List member in struct rotation_thread_timer_queue. */ struct cds_list_head head; }; -} /* namespace */ -static const char *get_job_type_str(enum rotation_thread_job_type job_type) +const char *get_job_type_str(enum ls::rotation_thread_job_type job_type) { switch (job_type) { - case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION: + case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION: return "CHECK_PENDING_ROTATION"; - case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION: + case ls::rotation_thread_job_type::SCHEDULED_ROTATION: return "SCHEDULED_ROTATION"; default: abort(); } } -struct rotation_thread_timer_queue *rotation_thread_timer_queue_create() -{ - struct rotation_thread_timer_queue *queue = nullptr; - - queue = zmalloc(); - if (!queue) { - PERROR("Failed to allocate timer rotate queue"); - goto end; - } - - queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); - CDS_INIT_LIST_HEAD(&queue->list); - pthread_mutex_init(&queue->lock, nullptr); -end: - return queue; -} - -void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue) -{ - if (!queue) { - return; - } - - lttng_pipe_destroy(queue->event_pipe); - - pthread_mutex_lock(&queue->lock); - LTTNG_ASSERT(cds_list_empty(&queue->list)); - pthread_mutex_unlock(&queue->lock); - pthread_mutex_destroy(&queue->lock); - free(queue); -} - -/* - * Destroy the thread data previously created by the init function. - */ -void rotation_thread_handle_destroy(struct rotation_thread_handle *handle) -{ - lttng_pipe_destroy(handle->quit_pipe); - free(handle); -} - -struct rotation_thread_handle * -rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue, - struct notification_thread_handle *notification_thread_handle) -{ - struct rotation_thread_handle *handle; - - handle = zmalloc(); - if (!handle) { - goto end; - } - - handle->rotation_timer_queue = rotation_timer_queue; - handle->notification_thread_handle = notification_thread_handle; - handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC); - if (!handle->quit_pipe) { - goto error; - } - -end: - return handle; -error: - rotation_thread_handle_destroy(handle); - return nullptr; -} - /* * Called with the rotation_thread_timer_queue lock held. * Return true if the same timer job already exists in the queue, false if not. */ -static bool timer_job_exists(const struct rotation_thread_timer_queue *queue, - enum rotation_thread_job_type job_type, - struct ltt_session *session) +bool timer_job_exists(const ls::rotation_thread_timer_queue *queue, + ls::rotation_thread_job_type job_type, + ltt_session *session) { bool exists = false; struct rotation_thread_job *job; @@ -183,164 +110,7 @@ end: return exists; } -void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue, - enum rotation_thread_job_type job_type, - struct ltt_session *session) -{ - int ret; - const char dummy = '!'; - struct rotation_thread_job *job = nullptr; - const char *job_type_str = get_job_type_str(job_type); - - pthread_mutex_lock(&queue->lock); - if (timer_job_exists(queue, job_type, session)) { - /* - * This timer job is already pending, we don't need to add - * it. - */ - goto end; - } - - job = zmalloc(); - if (!job) { - PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"", - job_type_str, - session->name); - goto end; - } - /* No reason for this to fail as the caller must hold a reference. */ - (void) session_get(session); - - job->session = session; - job->type = job_type; - cds_list_add_tail(&job->head, &queue->list); - - ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy)); - if (ret < 0) { - /* - * We do not want to block in the timer handler, the job has - * been enqueued in the list, the wakeup pipe is probably full, - * the job will be processed when the rotation_thread catches - * up. - */ - DIAGNOSTIC_PUSH - DIAGNOSTIC_IGNORE_LOGICAL_OP - if (errno == EAGAIN || errno == EWOULDBLOCK) { - DIAGNOSTIC_POP - /* - * Not an error, but would be surprising and indicate - * that the rotation thread can't keep up with the - * current load. - */ - DBG("Wake-up pipe of rotation thread job queue is full"); - goto end; - } - PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"", - job_type_str, - session->name); - goto end; - } - -end: - pthread_mutex_unlock(&queue->lock); -} - -static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle) -{ - int ret; - - /* - * Create pollset with size 3: - * - rotation thread quit pipe, - * - rotation thread timer queue pipe, - * - notification channel sock, - */ - ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC); - if (ret < 0) { - goto error; - } - - ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN); - if (ret < 0) { - ERR("Failed to add quit pipe read fd to poll set"); - goto error; - } - - ret = lttng_poll_add( - poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN); - if (ret < 0) { - ERR("Failed to add rotate_pending fd to poll set"); - goto error; - } - - return ret; -error: - lttng_poll_clean(poll_set); - return ret; -} - -static void fini_thread_state(struct rotation_thread *state) -{ - lttng_poll_clean(&state->events); - if (rotate_notification_channel) { - lttng_notification_channel_destroy(rotate_notification_channel); - } - - if (rotate_notification_channel_subscription_change_eventfd >= 0) { - const int close_ret = close(rotate_notification_channel_subscription_change_eventfd); - - if (close_ret) { - PERROR("Failed to close rotation thread notification channel subscription change eventfd"); - } - } -} - -static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state) -{ - int ret; - - memset(state, 0, sizeof(*state)); - lttng_poll_init(&state->events); - - ret = init_poll_set(&state->events, handle); - if (ret) { - ERR("Failed to initialize rotation thread poll set"); - goto end; - } - - rotate_notification_channel = - lttng_notification_channel_create(lttng_session_daemon_notification_endpoint); - if (!rotate_notification_channel) { - ERR("Could not create notification channel"); - ret = -1; - goto end; - } - ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN); - if (ret < 0) { - ERR("Failed to add notification fd to pollset"); - goto end; - } - - rotate_notification_channel_subscription_change_eventfd = - eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); - if (rotate_notification_channel_subscription_change_eventfd < 0) { - PERROR("Failed to create rotation thread notification channel subscription change eventfd"); - ret = -1; - goto end; - } - ret = lttng_poll_add( - &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN); - if (ret < 0) { - ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset"); - goto end; - } - -end: - return ret; -} - -static void check_session_rotation_pending_on_consumers(struct ltt_session *session, - bool *_rotation_completed) +void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed) { int ret = 0; struct consumer_socket *socket; @@ -351,70 +121,66 @@ static void check_session_rotation_pending_on_consumers(struct ltt_session *sess enum lttng_trace_chunk_status chunk_status; lttng::urcu::read_lock_guard read_lock; - LTTNG_ASSERT(session->chunk_being_archived); + LTTNG_ASSERT(session.chunk_being_archived); /* * Check for a local pending rotation on all consumers (32-bit * user space, 64-bit user space, and kernel). */ - if (!session->ust_session) { + if (!session.ust_session) { goto skip_ust; } cds_lfht_for_each_entry ( - session->ust_session->consumer->socks->ht, &iter, socket, node.node) { - relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ? + session.ust_session->consumer->socks->ht, &iter, socket, node.node) { + relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ? -1ULL : - session->ust_session->consumer->net_seq_index; + session.ust_session->consumer->net_seq_index; - pthread_mutex_lock(socket->lock); + lttng::pthread::lock_guard socket_lock(*socket->lock); ret = consumer_trace_chunk_exists(socket, relayd_id, - session->id, - session->chunk_being_archived, + session.id, + session.chunk_being_archived, &exists_status); if (ret) { - pthread_mutex_unlock(socket->lock); ERR("Error occurred while checking rotation status on consumer daemon"); goto end; } if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { - pthread_mutex_unlock(socket->lock); chunk_exists_on_peer = true; goto end; } - pthread_mutex_unlock(socket->lock); } skip_ust: - if (!session->kernel_session) { + if (!session.kernel_session) { goto skip_kernel; } + cds_lfht_for_each_entry ( - session->kernel_session->consumer->socks->ht, &iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ? + session.kernel_session->consumer->socks->ht, &iter, socket, node.node) { + lttng::pthread::lock_guard socket_lock(*socket->lock); + + relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ? -1ULL : - session->kernel_session->consumer->net_seq_index; + session.kernel_session->consumer->net_seq_index; ret = consumer_trace_chunk_exists(socket, relayd_id, - session->id, - session->chunk_being_archived, + session.id, + session.chunk_being_archived, &exists_status); if (ret) { - pthread_mutex_unlock(socket->lock); ERR("Error occurred while checking rotation status on consumer daemon"); goto end; } if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) { - pthread_mutex_unlock(socket->lock); chunk_exists_on_peer = true; goto end; } - pthread_mutex_unlock(socket->lock); } skip_kernel: end: @@ -422,19 +188,20 @@ end: if (!chunk_exists_on_peer) { uint64_t chunk_being_archived_id; - chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, + chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Rotation of trace archive %" PRIu64 " of session \"%s\" is complete on all consumers", chunk_being_archived_id, - session->name); + session.name); } - *_rotation_completed = !chunk_exists_on_peer; + + _rotation_completed = !chunk_exists_on_peer; if (ret) { ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR); if (ret) { - ERR("Failed to reset rotation state of session \"%s\"", session->name); + ERR("Failed to reset rotation state of session \"%s\"", session.name); } } } @@ -444,9 +211,8 @@ end: * Should only return non-zero in the event of a fatal error. Doing so will * shutdown the thread. */ -static int -check_session_rotation_pending(struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle) +int check_session_rotation_pending(ltt_session& session, + notification_thread_handle& notification_thread_handle) { int ret; struct lttng_trace_archive_location *location; @@ -455,17 +221,17 @@ check_session_rotation_pending(struct ltt_session *session, const char *archived_chunk_name; uint64_t chunk_being_archived_id; - if (!session->chunk_being_archived) { + if (!session.chunk_being_archived) { ret = 0; goto end; } chunk_status = - lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id); + lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64, - session->name, + session.name, chunk_being_archived_id); /* @@ -481,8 +247,8 @@ check_session_rotation_pending(struct ltt_session *session, goto check_ongoing_rotation; } - check_session_rotation_pending_on_consumers(session, &rotation_completed); - if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) { + check_session_rotation_pending_on_consumers(session, rotation_completed); + if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) { goto check_ongoing_rotation; } @@ -491,40 +257,41 @@ check_session_rotation_pending(struct ltt_session *session, * rotations can start now. */ chunk_status = lttng_trace_chunk_get_name( - session->chunk_being_archived, &archived_chunk_name, nullptr); + session.chunk_being_archived, &archived_chunk_name, nullptr); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - free(session->last_archived_chunk_name); - session->last_archived_chunk_name = strdup(archived_chunk_name); - if (!session->last_archived_chunk_name) { + free(session.last_archived_chunk_name); + session.last_archived_chunk_name = strdup(archived_chunk_name); + if (!session.last_archived_chunk_name) { PERROR("Failed to duplicate archived chunk name"); } + session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED); - if (!session->quiet_rotation) { - location = session_get_trace_archive_location(session); + if (!session.quiet_rotation) { + location = session_get_trace_archive_location(&session); ret = notification_thread_command_session_rotation_completed( - notification_thread_handle, - session->id, - session->last_archived_chunk_id.value, + ¬ification_thread_handle, + session.id, + session.last_archived_chunk_id.value, location); lttng_trace_archive_location_put(location); if (ret != LTTNG_OK) { ERR("Failed to notify notification thread of completed rotation for session %s", - session->name); + session.name); } } ret = 0; check_ongoing_rotation: - if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) { - chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, + if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) { + chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s", chunk_being_archived_id, - session->name); - ret = timer_session_rotation_pending_check_start(session, + session.name); + ret = timer_session_rotation_pending_check_start(&session, DEFAULT_ROTATE_PENDING_TIMER); if (ret) { ERR("Failed to re-enable rotation pending timer"); @@ -538,173 +305,317 @@ end: } /* Call with the session and session_list locks held. */ -static int launch_session_rotation(struct ltt_session *session) +int launch_session_rotation(ltt_session& session) { int ret; struct lttng_rotate_session_return rotation_return; - DBG("Launching scheduled time-based rotation on session \"%s\"", session->name); + DBG("Launching scheduled time-based rotation on session \"%s\"", session.name); - ret = cmd_rotate_session( - session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + ASSERT_SESSION_LIST_LOCKED(); + ASSERT_LOCKED(session.lock); + + ret = cmd_rotate_session(&session, + &rotation_return, + false, + LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); if (ret == LTTNG_OK) { DBG("Scheduled time-based rotation successfully launched on session \"%s\"", - session->name); + session.name); } else { /* Don't consider errors as fatal. */ DBG("Scheduled time-based rotation aborted for session %s: %s", - session->name, + session.name, lttng_strerror(ret)); } + return 0; } -static int run_job(struct rotation_thread_job *job, - struct ltt_session *session, - struct notification_thread_handle *notification_thread_handle) +int run_job(const rotation_thread_job& job, + ltt_session& session, + notification_thread_handle& notification_thread_handle) { int ret; - switch (job->type) { - case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION: + switch (job.type) { + case ls::rotation_thread_job_type::SCHEDULED_ROTATION: ret = launch_session_rotation(session); break; - case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION: + case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION: ret = check_session_rotation_pending(session, notification_thread_handle); break; default: abort(); } + return ret; } -static int handle_job_queue(struct rotation_thread_handle *handle, - struct rotation_thread *state __attribute__((unused)), - struct rotation_thread_timer_queue *queue) +bool shutdown_rotation_thread(void *thread_data) { - int ret = 0; + auto *handle = reinterpret_cast(thread_data); - for (;;) { - struct ltt_session *session; - struct rotation_thread_job *job; + return handle->shutdown(); +} +} /* namespace */ - /* Take the queue lock only to pop an element from the list. */ - pthread_mutex_lock(&queue->lock); - if (cds_list_empty(&queue->list)) { - pthread_mutex_unlock(&queue->lock); - break; +ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create() +{ + auto queue = zmalloc(); + if (!queue) { + PERROR("Failed to allocate timer rotate queue"); + goto end; + } + + queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK); + CDS_INIT_LIST_HEAD(&queue->list); + pthread_mutex_init(&queue->lock, nullptr); +end: + return queue; +} + +void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue) +{ + if (!queue) { + return; + } + + lttng_pipe_destroy(queue->event_pipe); + + { + lttng::pthread::lock_guard queue_lock(queue->lock); + + LTTNG_ASSERT(cds_list_empty(&queue->list)); + } + + pthread_mutex_destroy(&queue->lock); + free(queue); +} + +ls::rotation_thread::rotation_thread( + rotation_thread_timer_queue& rotation_timer_queue, + notification_thread_handle& notification_thread_handle) : + _rotation_timer_queue{ rotation_timer_queue }, + _notification_thread_handle{ notification_thread_handle } +{ + _quit_pipe.reset([]() { + auto raw_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!raw_pipe) { + LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno); } - job = cds_list_first_entry(&queue->list, typeof(*job), head); - cds_list_del(&job->head); - pthread_mutex_unlock(&queue->lock); - session_lock_list(); - session = job->session; - if (!session) { - DBG("Session \"%s\" not found", session->name != NULL ? session->name : ""); + return raw_pipe; + }()); + + _notification_channel.reset([]() { + auto channel = lttng_notification_channel_create( + lttng_session_daemon_notification_endpoint); + if (!channel) { + LTTNG_THROW_ERROR( + "Failed to create notification channel of rotation thread"); + } + + return channel; + }()); + + lttng_poll_init(&_events); + + /* + * Create pollset with size 4: + * - rotation thread quit pipe, + * - rotation thread timer queue pipe, + * - notification channel sock, + * - subscribtion change event fd + */ + if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) { + LTTNG_THROW_ERROR("Failed to create poll object for rotation thread"); + } + + if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set"); + } + + if (lttng_poll_add(&_events, + lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe), + LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set"); + } + + if (lttng_poll_add(&_events, + _notification_channel_subscribtion_change_eventfd.fd(), + LPOLLIN) < 0) { + LTTNG_THROW_ERROR( + "Failed to add rotation thread notification channel subscription change eventfd to poll set"); + } + + if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) { + LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset"); + } +} + +ls::rotation_thread::~rotation_thread() +{ + lttng_poll_clean(&_events); +} + +void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue, + ls::rotation_thread_job_type job_type, + ltt_session *session) +{ + const char dummy = '!'; + struct rotation_thread_job *job = nullptr; + const char *job_type_str = get_job_type_str(job_type); + lttng::pthread::lock_guard queue_lock(queue->lock); + + if (timer_job_exists(queue, job_type, session)) { + /* + * This timer job is already pending, we don't need to add + * it. + */ + return; + } + + job = zmalloc(); + if (!job) { + PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"", + job_type_str, + session->name); + return; + } + + /* No reason for this to fail as the caller must hold a reference. */ + (void) session_get(session); + + job->session = session; + job->type = job_type; + cds_list_add_tail(&job->head, &queue->list); + + const int write_ret = + lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy)); + if (write_ret < 0) { + /* + * We do not want to block in the timer handler, the job has + * been enqueued in the list, the wakeup pipe is probably full, + * the job will be processed when the rotation_thread catches + * up. + */ + DIAGNOSTIC_PUSH + DIAGNOSTIC_IGNORE_LOGICAL_OP + if (errno == EAGAIN || errno == EWOULDBLOCK) { + DIAGNOSTIC_POP /* - * This is a non-fatal error, and we cannot report it to - * the user (timer), so just print the error and - * continue the processing. - * - * While the timer thread will purge pending signals for - * a session on the session's destruction, it is - * possible for a job targeting that session to have - * already been queued before it was destroyed. + * Not an error, but would be surprising and indicate + * that the rotation thread can't keep up with the + * current load. */ - free(job); - session_put(session); - session_unlock_list(); - continue; + DBG("Wake-up pipe of rotation thread job queue is full"); + return; } - session_lock(session); - ret = run_job(job, session, handle->notification_thread_handle); - session_unlock(session); - /* Release reference held by the job. */ - session_put(session); - session_unlock_list(); - free(job); - if (ret) { - goto end; - } + PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"", + job_type_str, + session->name); + return; } +} - ret = 0; +void ls::rotation_thread::_handle_job_queue() +{ + for (;;) { + rotation_thread_job::uptr job; + + { + /* Take the queue lock only to pop an element from the list. */ + lttng::pthread::lock_guard rotation_timer_queue_lock( + _rotation_timer_queue.lock); + if (cds_list_empty(&_rotation_timer_queue.list)) { + break; + } -end: - return ret; + job.reset(cds_list_first_entry( + &_rotation_timer_queue.list, typeof(rotation_thread_job), head)); + cds_list_del(&job->head); + } + + session_lock_list(); + const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); }); + + /* locked_ptr will unlock the session and release the ref held by the job. */ + session_lock(job->session); + auto session = ltt_session::locked_ptr(job->session); + + if (run_job(*job, *session, _notification_thread_handle)) { + return; + } + } } -static int handle_condition(const struct lttng_notification *notification, - struct notification_thread_handle *notification_thread_handle) +void ls::rotation_thread::_handle_notification(const lttng_notification ¬ification) { int ret = 0; const char *condition_session_name = nullptr; - enum lttng_condition_type condition_type; enum lttng_condition_status condition_status; enum lttng_evaluation_status evaluation_status; uint64_t consumed; - struct ltt_session *session; - const struct lttng_condition *condition = - lttng_notification_get_const_condition(notification); - const struct lttng_evaluation *evaluation = - lttng_notification_get_const_evaluation(notification); - - condition_type = lttng_condition_get_type(condition); + auto *condition = lttng_notification_get_const_condition(¬ification); + auto *evaluation = lttng_notification_get_const_evaluation(¬ification); + const auto condition_type = lttng_condition_get_type(condition); if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) { - ret = -1; - ERR("Condition type and session usage type are not the same"); - goto end; + LTTNG_THROW_ERROR("Unexpected condition type"); } - /* Fetch info to test */ + /* Fetch info to test. */ condition_status = lttng_condition_session_consumed_size_get_session_name( condition, &condition_session_name); if (condition_status != LTTNG_CONDITION_STATUS_OK) { - ERR("Session name could not be fetched"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Session name could not be fetched from notification"); } + evaluation_status = lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed); if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) { - ERR("Failed to get evaluation"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Failed to get consumed size from evaluation"); } + DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}", + condition_session_name, + consumed); + session_lock_list(); - session = session_find_by_name(condition_session_name); + const auto unlock_list = lttng::make_scope_exit([]() noexcept { session_unlock_list(); }); + + ltt_session::locked_ptr session{ [&condition_session_name]() { + auto raw_session_ptr = session_find_by_name(condition_session_name); + + if (raw_session_ptr) { + session_lock(raw_session_ptr); + } + + return raw_session_ptr; + }() }; if (!session) { - DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`", - lttng_condition_type_str(condition_type), - condition_session_name); + DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`", + lttng_condition_type_str(condition_type), + condition_session_name); /* * Not a fatal error: a session can be destroyed before we get * the chance to handle the notification. */ - ret = 0; - session_unlock_list(); - goto end; + return; } - session_lock(session); if (!lttng_trigger_is_equal(session->rotate_trigger, - lttng_notification_get_const_trigger(notification))) { - /* Notification does not originate from our rotation trigger. */ - ret = 0; - goto end_unlock; + lttng_notification_get_const_trigger(¬ification))) { + DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping"); + return; } - ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle); - if (ret) { - goto end_unlock; - } + unsubscribe_session_consumed_size_rotation(*session); ret = cmd_rotate_session( - session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); + session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED); switch (ret) { case LTTNG_OK: break; @@ -718,35 +629,16 @@ static int handle_condition(const struct lttng_notification *notification, DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value"); break; default: - ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret)); - ret = -1; - goto end_unlock; - } - - ret = subscribe_session_consumed_size_rotation( - session, consumed + session->rotate_size, notification_thread_handle); - if (ret) { - ERR("Failed to subscribe to session consumed size condition"); - goto end_unlock; + LTTNG_THROW_CTL("Failed to rotate on consumed size notification", + static_cast(-ret)); } - ret = 0; -end_unlock: - session_unlock(session); - session_put(session); - session_unlock_list(); -end: - return ret; + subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size); } -static int handle_notification_channel(int fd __attribute__((unused)), - struct rotation_thread_handle *handle, - struct rotation_thread *state __attribute__((unused))) +void ls::rotation_thread::_handle_notification_channel_activity() { - int ret; bool notification_pending = true; - struct lttng_notification *notification = nullptr; - enum lttng_notification_channel_status status; /* * A notification channel may have multiple notifications queued-up internally in @@ -765,122 +657,120 @@ static int handle_notification_channel(int fd __attribute__((unused)), * the channel's internal buffers. */ while (notification_pending) { - status = lttng_notification_channel_has_pending_notification( - rotate_notification_channel, ¬ification_pending); - if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { - ERR("Error occurred while checking for pending notification"); - ret = -1; - goto end; + const auto pending_status = lttng_notification_channel_has_pending_notification( + _notification_channel.get(), ¬ification_pending); + if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR("Error occurred while checking for pending notification"); } if (!notification_pending) { - ret = 0; - goto end; + return; } /* Receive the next notification. */ - status = lttng_notification_channel_get_next_notification( - rotate_notification_channel, ¬ification); - switch (status) { + lttng_notification::uptr notification; + enum lttng_notification_channel_status next_notification_status; + + { + struct lttng_notification *raw_notification_ptr; + + next_notification_status = lttng_notification_channel_get_next_notification( + _notification_channel.get(), &raw_notification_ptr); + notification.reset(raw_notification_ptr); + } + + switch (next_notification_status) { case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK: break; case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED: WARN("Dropped notification detected on notification channel used by the rotation management thread."); - ret = 0; - goto end; + return; case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED: - ERR("Notification channel was closed"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Notification channel was closed"); default: /* Unhandled conditions / errors. */ - ERR("Unknown notification channel status"); - ret = -1; - goto end; + LTTNG_THROW_ERROR("Unknown notification channel status"); } - ret = handle_condition(notification, handle->notification_thread_handle); - lttng_notification_destroy(notification); - if (ret) { - goto end; - } + _handle_notification(*notification); } -end: - return ret; } -static void *thread_rotation(void *data) +void ls::rotation_thread::_thread_function() noexcept { - int ret; - struct rotation_thread_handle *handle = (rotation_thread_handle *) data; - struct rotation_thread thread; - int queue_pipe_fd; - DBG("Started rotation thread"); + + try { + _run(); + } catch (const std::exception& e) { + ERR_FMT("Fatal rotation thread error: {}", e.what()); + } + + DBG("Thread exit"); +} + +void ls::rotation_thread::_run() +{ rcu_register_thread(); + const auto unregister_rcu_thread = + lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); }); + rcu_thread_online(); + const auto offline_rcu_thread = + lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); }); + health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION); health_code_update(); + const auto unregister_health = + lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); }); - if (!handle) { - ERR("Invalid thread context provided"); - goto end; - } - - queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe); - - ret = init_thread_state(handle, &thread); - if (ret) { - goto error; - } + const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe); while (true) { - int fd_count, i; - health_poll_entry(); DBG("Entering poll wait"); - ret = lttng_poll_wait(&thread.events, -1); - DBG("Poll wait returned (%i)", ret); + auto poll_wait_ret = lttng_poll_wait(&_events, -1); + DBG_FMT("Poll wait returned: ret={}", poll_wait_ret); health_poll_exit(); - if (ret < 0) { + if (poll_wait_ret < 0) { /* * Restart interrupted system call. */ if (errno == EINTR) { continue; } - ERR("Error encountered during lttng_poll_wait (%i)", ret); - goto error; + + LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno); } - fd_count = ret; - for (i = 0; i < fd_count; i++) { - int fd = LTTNG_POLL_GETFD(&thread.events, i); - uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i); + const auto fd_count = poll_wait_ret; + for (int i = 0; i < fd_count; i++) { + const auto fd = LTTNG_POLL_GETFD(&_events, i); + const auto revents = LTTNG_POLL_GETEV(&_events, i); - DBG("Handling fd (%i) activity (%u)", fd, revents); + DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents); if (revents & LPOLLERR) { - ERR("Polling returned an error on fd %i", fd); - goto error; + LTTNG_THROW_ERROR( + fmt::format("Polling returned an error on fd: fd={}", fd)); } - if (fd == rotate_notification_channel->socket || - fd == rotate_notification_channel_subscription_change_eventfd) { - ret = handle_notification_channel(fd, handle, &thread); - if (ret) { - ERR("Error occurred while handling activity on notification channel socket"); - goto error; + if (fd == _notification_channel->socket || + fd == _notification_channel_subscribtion_change_eventfd.fd()) { + try { + _handle_notification_channel_activity(); + } catch (const lttng::ctl::error& e) { + /* + * The only non-fatal error (rotation failed), others + * are caught at the top-level. + */ + DBG_FMT("Control error occurred while handling activity on notification channel socket: {}", + e.what()); + continue; } - if (fd == rotate_notification_channel_subscription_change_eventfd) { - uint64_t eventfd_value; - const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value)); - - if (read_ret != sizeof(eventfd_value)) { - PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd"); - goto error; - } + if (fd == _notification_channel_subscribtion_change_eventfd.fd()) { + _notification_channel_subscribtion_change_eventfd.decrement(); } } else { /* Job queue or quit pipe activity. */ @@ -891,59 +781,162 @@ static void *thread_rotation(void *data) * flushed and all references held in the queue * are released. */ - ret = handle_job_queue( - handle, &thread, handle->rotation_timer_queue); - if (ret) { - ERR("Failed to handle rotation timer pipe event"); - goto error; - } - + _handle_job_queue(); if (fd == queue_pipe_fd) { char buf; - ret = lttng_read(fd, &buf, 1); - if (ret != 1) { - ERR("Failed to read from wakeup pipe (fd = %i)", - fd); - goto error; + if (lttng_read(fd, &buf, 1) != 1) { + LTTNG_THROW_POSIX( + fmt::format( + "Failed to read from wakeup pipe: fd={}", + fd), + errno); } } else { DBG("Quit pipe activity"); - goto exit; + return; } } } } -exit: -error: - DBG("Thread exit"); - fini_thread_state(&thread); -end: - health_unregister(the_health_sessiond); - rcu_thread_offline(); - rcu_unregister_thread(); - return nullptr; } -static bool shutdown_rotation_thread(void *thread_data) +bool ls::rotation_thread::shutdown() const noexcept { - struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data; - const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe); + const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get()); return notify_thread_pipe(write_fd) == 1; } -bool launch_rotation_thread(struct rotation_thread_handle *handle) +void ls::rotation_thread::launch_thread() { - struct lttng_thread *thread; - - thread = lttng_thread_create( - "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle); + auto thread = lttng_thread_create( + "Rotation", + [](void *ptr) { + auto handle = reinterpret_cast(ptr); + + handle->_thread_function(); + return static_cast(nullptr); + }, + shutdown_rotation_thread, + nullptr, + this); if (!thread) { - goto error; + LTTNG_THROW_ERROR("Failed to launch rotation thread"); } + lttng_thread_put(thread); - return true; -error: - return false; +} + +void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session, + std::uint64_t size) +{ + const struct lttng_credentials session_creds = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid), + }; + + ASSERT_LOCKED(session.lock); + + auto rotate_condition = lttng::make_unique_wrapper( + lttng_condition_session_consumed_size_create()); + if (!rotate_condition) { + LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno); + } + + auto condition_status = + lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size); + if (condition_status != LTTNG_CONDITION_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Could not set session consumed size condition threshold: size={}", size)); + } + + condition_status = lttng_condition_session_consumed_size_set_session_name(rotate_condition.get(), + session.name); + if (condition_status != LTTNG_CONDITION_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Could not set session consumed size condition session name: name=`{}`", + session.name)); + } + + auto notify_action = lttng::make_unique_wrapper( + lttng_action_notify_create()); + if (!notify_action) { + LTTNG_THROW_POSIX("Could not create notify action", errno); + } + + LTTNG_ASSERT(!session.rotate_trigger); + /* trigger acquires its own reference to condition and action on success. */ + auto trigger = lttng::make_unique_wrapper( + lttng_trigger_create(rotate_condition.get(), notify_action.get())); + if (!trigger) + { + LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno); + } + + /* Ensure this trigger is not visible to external users. */ + lttng_trigger_set_hidden(trigger.get()); + lttng_trigger_set_credentials(trigger.get(), &session_creds); + + auto nc_status = + lttng_notification_channel_subscribe(_notification_channel.get(), rotate_condition.get()); + if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification"); + } + + /* + * Ensure any notification queued during the subscription are consumed by queueing an + * event. + */ + _notification_channel_subscribtion_change_eventfd.increment(); + + const auto register_ret = notification_thread_command_register_trigger( + &_notification_thread_handle, trigger.get(), true); + if (register_ret != LTTNG_OK) { + LTTNG_THROW_CTL( + fmt::format( + "Failed to register trigger for automatic size-based rotation: session_name{}, size={}", + session.name, + size), + register_ret); + } + + /* Ownership transferred to the session. */ + session.rotate_trigger = trigger.release(); +} + +void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session) +{ + LTTNG_ASSERT(session.rotate_trigger); + + const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept { + lttng_trigger_put(session.rotate_trigger); + session.rotate_trigger = nullptr; + }); + + const auto unsubscribe_status = lttng_notification_channel_unsubscribe( + _notification_channel.get(), + lttng_trigger_get_const_condition(session.rotate_trigger)); + if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) { + LTTNG_THROW_ERROR(fmt::format( + "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}", + session.name, + static_cast(unsubscribe_status))); + } + + /* + * Ensure any notification queued during the un-subscription are consumed by queueing an + * event. + */ + _notification_channel_subscribtion_change_eventfd.increment(); + + const auto unregister_status = notification_thread_command_unregister_trigger( + &_notification_thread_handle, session.rotate_trigger); + if (unregister_status != LTTNG_OK) { + LTTNG_THROW_CTL( + fmt::format( + "Failed to unregister trigger for automatic size-based rotation: session_name{}", + session.name), + unregister_status); + } } diff --git a/src/bin/lttng-sessiond/rotation-thread.hpp b/src/bin/lttng-sessiond/rotation-thread.hpp index 6dc7c97ac..02ff2b2fe 100644 --- a/src/bin/lttng-sessiond/rotation-thread.hpp +++ b/src/bin/lttng-sessiond/rotation-thread.hpp @@ -9,44 +9,84 @@ #ifndef ROTATION_THREAD_H #define ROTATION_THREAD_H -#include -#include -#include -#include -#include +#include "notification-thread.hpp" +#include "session.hpp" + #include +#include #include +#include +#include + +#include +#include + +#include #include #include -#include "session.hpp" -#include "notification-thread.hpp" +#include +#include +#include -extern struct lttng_notification_channel *rotate_notification_channel; -extern int rotate_notification_channel_subscription_change_eventfd; +namespace lttng { +namespace sessiond { -enum rotation_thread_job_type { - ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION, - ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION +enum class rotation_thread_job_type { + SCHEDULED_ROTATION, + CHECK_PENDING_ROTATION }; struct rotation_thread_timer_queue; -struct rotation_thread_handle; -struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void); -void rotation_thread_timer_queue_destroy( - struct rotation_thread_timer_queue *queue); +class rotation_thread { +public: + using uptr = std::unique_ptr; -struct rotation_thread_handle *rotation_thread_handle_create( - struct rotation_thread_timer_queue *rotation_timer_queue, - struct notification_thread_handle *notification_thread_handle); + rotation_thread(rotation_thread_timer_queue& rotation_timer_queue, + notification_thread_handle& notification_thread_handle); + ~rotation_thread(); -void rotation_thread_handle_destroy( - struct rotation_thread_handle *handle); + /* Only use through the lttng_thread facilities. */ + void launch_thread(); + bool shutdown() const noexcept; + /* + * Subscribe/unsubscribe the rotation_thread's notification_channel to/from + * session usage notifications to perform size-based rotations. + */ + void subscribe_session_consumed_size_rotation(ltt_session& session, std::uint64_t size); + void unsubscribe_session_consumed_size_rotation(ltt_session& session); + +private: + void _thread_function() noexcept; + void _run(); + void _handle_job_queue(); + void _handle_notification(const lttng_notification& notification); + void _handle_notification_channel_activity(); + + struct rotation_thread_timer_queue& _rotation_timer_queue; + /* Access to the notification thread cmd_queue */ + notification_thread_handle& _notification_thread_handle; + /* Thread-specific quit pipe. */ + lttng_pipe::uptr _quit_pipe; + lttng_notification_channel::uptr _notification_channel; + /* + * Use an event_fd to wake-up the rotation thread whenever a command + * completes on the notification channel. This ensures that any + * notification that was queued while waiting for a reply to the command is + * eventually consumed. + */ + lttng::eventfd _notification_channel_subscribtion_change_eventfd; + lttng_poll_event _events; +}; + +struct rotation_thread_timer_queue *rotation_thread_timer_queue_create(void); +void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue); void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue, - enum rotation_thread_job_type job_type, - struct ltt_session *session); + enum rotation_thread_job_type job_type, + struct ltt_session *session); -bool launch_rotation_thread(struct rotation_thread_handle *handle); +} /* namespace sessiond */ +} /* namespace lttng */ #endif /* ROTATION_THREAD_H */ diff --git a/src/bin/lttng-sessiond/session.cpp b/src/bin/lttng-sessiond/session.cpp index 75d767c25..0532a7686 100644 --- a/src/bin/lttng-sessiond/session.cpp +++ b/src/bin/lttng-sessiond/session.cpp @@ -158,7 +158,7 @@ void session_list_wait_empty() /* * Acquire session list lock */ -void session_lock_list() +void session_lock_list() noexcept { pthread_mutex_lock(&the_session_list.lock); } @@ -166,7 +166,7 @@ void session_lock_list() /* * Try to acquire session list lock */ -int session_trylock_list() +int session_trylock_list() noexcept { return pthread_mutex_trylock(&the_session_list.lock); } @@ -174,7 +174,7 @@ int session_trylock_list() /* * Release session list lock */ -void session_unlock_list() +void session_unlock_list() noexcept { pthread_mutex_unlock(&the_session_list.lock); } @@ -928,19 +928,19 @@ static void session_notify_destruction(const struct ltt_session *session) /* * Fire each clear notifier once, and remove them from the array. */ -void session_notify_clear(struct ltt_session *session) +void session_notify_clear(ltt_session &session) { size_t i; - const size_t count = lttng_dynamic_array_get_count(&session->clear_notifiers); + const size_t count = lttng_dynamic_array_get_count(&session.clear_notifiers); for (i = 0; i < count; i++) { const struct ltt_session_clear_notifier_element *element = (ltt_session_clear_notifier_element *) lttng_dynamic_array_get_element( - &session->clear_notifiers, i); + &session.clear_notifiers, i); - element->notifier(session, element->user_data); + element->notifier(&session, element->user_data); } - lttng_dynamic_array_clear(&session->clear_notifiers); + lttng_dynamic_array_clear(&session.clear_notifiers); } static void session_release(struct urcu_ref *ref) @@ -1359,26 +1359,26 @@ bool session_access_ok(struct ltt_session *session, uid_t uid) * * Must be called with the session and session_list locks held. */ -int session_reset_rotation_state(struct ltt_session *session, enum lttng_rotation_state result) +int session_reset_rotation_state(ltt_session &session, enum lttng_rotation_state result) { int ret = 0; ASSERT_LOCKED(the_session_list.lock); - ASSERT_LOCKED(session->lock); + ASSERT_LOCKED(session.lock); - session->rotation_state = result; - if (session->rotation_pending_check_timer_enabled) { + session.rotation_state = result; + if (session.rotation_pending_check_timer_enabled) { ret = timer_session_rotation_pending_check_stop(session); } - if (session->chunk_being_archived) { + if (session.chunk_being_archived) { uint64_t chunk_id; enum lttng_trace_chunk_status chunk_status; - chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_id); + chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_id); LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK); - LTTNG_OPTIONAL_SET(&session->last_archived_chunk_id, chunk_id); - lttng_trace_chunk_put(session->chunk_being_archived); - session->chunk_being_archived = nullptr; + LTTNG_OPTIONAL_SET(&session.last_archived_chunk_id, chunk_id); + lttng_trace_chunk_put(session.chunk_being_archived); + session.chunk_being_archived = nullptr; /* * Fire the clear reply notifiers if we are completing a clear * rotation. @@ -1429,6 +1429,10 @@ end: void ls::details::locked_session_release(ltt_session *session) { + if (!session) { + return; + } + session_unlock(session); session_put(session); } diff --git a/src/bin/lttng-sessiond/session.hpp b/src/bin/lttng-sessiond/session.hpp index 0c4f71d47..49064037d 100644 --- a/src/bin/lttng-sessiond/session.hpp +++ b/src/bin/lttng-sessiond/session.hpp @@ -226,9 +226,9 @@ void session_unlock(struct ltt_session *session); * In other words, it prevents tracer configurations from changing while they * are being transmitted to the various applications. */ -void session_lock_list(); -int session_trylock_list(); -void session_unlock_list(); +void session_lock_list() noexcept; +int session_trylock_list() noexcept; +void session_unlock_list() noexcept; void session_destroy(struct ltt_session *session); int session_add_destroy_notifier(struct ltt_session *session, @@ -236,7 +236,7 @@ int session_add_destroy_notifier(struct ltt_session *session, int session_add_clear_notifier(struct ltt_session *session, ltt_session_clear_notifier notifier, void *user_data); -void session_notify_clear(struct ltt_session *session); +void session_notify_clear(ltt_session &session); bool session_get(struct ltt_session *session); void session_put(struct ltt_session *session); @@ -259,7 +259,7 @@ void session_list_wait_empty(); bool session_access_ok(struct ltt_session *session, uid_t uid); -int session_reset_rotation_state(struct ltt_session *session, +int session_reset_rotation_state(ltt_session &session, enum lttng_rotation_state result); /* Create a new trace chunk object from the session's configuration. */ diff --git a/src/bin/lttng-sessiond/timer.cpp b/src/bin/lttng-sessiond/timer.cpp index 243b56219..d87824704 100644 --- a/src/bin/lttng-sessiond/timer.cpp +++ b/src/bin/lttng-sessiond/timer.cpp @@ -233,25 +233,25 @@ end: /* * Call with session and session_list locks held. */ -int timer_session_rotation_pending_check_stop(struct ltt_session *session) +int timer_session_rotation_pending_check_stop(ltt_session &session) { int ret; - LTTNG_ASSERT(session); - LTTNG_ASSERT(session->rotation_pending_check_timer_enabled); + LTTNG_ASSERT(session.rotation_pending_check_timer_enabled); - DBG("Disabling session rotation pending check timer on session %" PRIu64, session->id); - ret = timer_stop(&session->rotation_pending_check_timer, + DBG("Disabling session rotation pending check timer on session %" PRIu64, session.id); + ret = timer_stop(&session.rotation_pending_check_timer, LTTNG_SESSIOND_SIG_PENDING_ROTATION_CHECK); if (ret == -1) { ERR("Failed to stop rotate_pending_check timer"); } else { - session->rotation_pending_check_timer_enabled = false; + session.rotation_pending_check_timer_enabled = false; /* * The timer's reference to the session can be released safely. */ - session_put(session); + session_put(&session); } + return ret; } @@ -379,13 +379,15 @@ static void *thread_timer(void *data) struct ltt_session *session = (struct ltt_session *) info.si_value.sival_ptr; - rotation_thread_enqueue_job(ctx->rotation_thread_job_queue, - ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION, - session); + rotation_thread_enqueue_job( + ctx->rotation_thread_job_queue, + lttng::sessiond::rotation_thread_job_type::CHECK_PENDING_ROTATION, + session); } else if (signr == LTTNG_SESSIOND_SIG_SCHEDULED_ROTATION) { - rotation_thread_enqueue_job(ctx->rotation_thread_job_queue, - ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION, - (struct ltt_session *) info.si_value.sival_ptr); + rotation_thread_enqueue_job( + ctx->rotation_thread_job_queue, + lttng::sessiond::rotation_thread_job_type::SCHEDULED_ROTATION, + (struct ltt_session *) info.si_value.sival_ptr); /* * The scheduled periodic rotation timer is not in * "one-shot" mode. The reference to the session is not diff --git a/src/bin/lttng-sessiond/timer.hpp b/src/bin/lttng-sessiond/timer.hpp index f746f0ed9..3cdebbc13 100644 --- a/src/bin/lttng-sessiond/timer.hpp +++ b/src/bin/lttng-sessiond/timer.hpp @@ -9,13 +9,14 @@ #ifndef SESSIOND_TIMER_H #define SESSIOND_TIMER_H +#include "rotation-thread.hpp" +#include "session.hpp" + #include #include -#include "session.hpp" - struct timer_thread_parameters { - struct rotation_thread_timer_queue *rotation_thread_job_queue; + lttng::sessiond::rotation_thread_timer_queue *rotation_thread_job_queue; }; int timer_signal_init(void); @@ -24,7 +25,7 @@ int timer_signal_init(void); int timer_session_rotation_pending_check_start(struct ltt_session *session, unsigned int interval_us); /* Stop a session's rotation pending check timer. */ -int timer_session_rotation_pending_check_stop(struct ltt_session *session); +int timer_session_rotation_pending_check_stop(ltt_session &session); /* Start a session's rotation schedule timer. */ int timer_session_rotation_schedule_timer_start(struct ltt_session *session, diff --git a/src/bin/lttng-sessiond/ust-app.cpp b/src/bin/lttng-sessiond/ust-app.cpp index aaab0a3b2..bdc7d6d20 100644 --- a/src/bin/lttng-sessiond/ust-app.cpp +++ b/src/bin/lttng-sessiond/ust-app.cpp @@ -19,7 +19,6 @@ #include "lttng-ust-ctl.hpp" #include "lttng-ust-error.hpp" #include "notification-thread-commands.hpp" -#include "rotate.hpp" #include "session.hpp" #include "ust-app.hpp" #include "ust-consumer.hpp" diff --git a/src/common/error.hpp b/src/common/error.hpp index 8b8e43645..390c4dccc 100644 --- a/src/common/error.hpp +++ b/src/common/error.hpp @@ -9,14 +9,16 @@ #define _ERROR_H #include -#include +#include +#include +#include +#include + +#include #include +#include #include -#include #include -#include -#include -#include #ifndef _GNU_SOURCE #error "lttng-tools error.h needs _GNU_SOURCE" @@ -252,6 +254,10 @@ static inline void __lttng_print_check_abort(enum lttng_error_level type) } while (0); #endif +#define DBG_FMT(format_str, args...) DBG("%s", fmt::format(format_str, ##args).c_str()) +#define WARN_FMT(format_str, args...) WARN("%s", fmt::format(format_str, ##args).c_str()) +#define ERR_FMT(format_str, args...) ERR("%s", fmt::format(format_str, ##args).c_str()) + const char *error_get_str(int32_t code); /* diff --git a/src/common/exception.cpp b/src/common/exception.cpp index 05c30fc6d..028d253f2 100644 --- a/src/common/exception.cpp +++ b/src/common/exception.cpp @@ -24,12 +24,15 @@ format_throw_location(const char *file_name, const char *function_name, unsigned } } /* namespace */ -lttng::ctl::error::error(lttng_error_code error_code, +lttng::ctl::error::error(const std::string& msg, + lttng_error_code error_code, const char *file_name, const char *function_name, unsigned int line_number) : - runtime_error( - std::string(error_get_str(error_code)), file_name, function_name, line_number), + runtime_error(msg + ": " + std::string(error_get_str(error_code)), + file_name, + function_name, + line_number), _error_code{ error_code } { } diff --git a/src/common/exception.hpp b/src/common/exception.hpp index 1efd80359..a3b0a83e2 100644 --- a/src/common/exception.hpp +++ b/src/common/exception.hpp @@ -14,12 +14,11 @@ #include -#define LTTNG_THROW_CTL(error_code) \ +#define LTTNG_THROW_CTL(msg, error_code) \ throw lttng::ctl::error(msg, error_code, __FILE__, __func__, __LINE__) #define LTTNG_THROW_POSIX(msg, errno_code) \ throw lttng::posix_error(msg, errno_code, __FILE__, __func__, __LINE__) -#define LTTNG_THROW_ERROR(msg) \ - throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__) +#define LTTNG_THROW_ERROR(msg) throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__) #define LTTNG_THROW_UNSUPPORTED_ERROR(msg) \ throw lttng::runtime_error(msg, __FILE__, __func__, __LINE__) #define LTTNG_THROW_COMMUNICATION_ERROR(msg) \ @@ -50,10 +49,11 @@ namespace ctl { /* Wrap lttng_error_code errors which may be reported through liblttng-ctl's interface. */ class error : public runtime_error { public: - explicit error(lttng_error_code error_code, - const char *file_name, - const char *function_name, - unsigned int line_number); + explicit error(const std::string& msg, + lttng_error_code error_code, + const char *file_name, + const char *function_name, + unsigned int line_number); lttng_error_code code() const noexcept { diff --git a/src/common/file-descriptor.cpp b/src/common/file-descriptor.cpp index 9491a13af..4042c61ce 100644 --- a/src/common/file-descriptor.cpp +++ b/src/common/file-descriptor.cpp @@ -49,7 +49,7 @@ lttng::file_descriptor::~file_descriptor() noexcept _cleanup(); } -int lttng::file_descriptor::_fd() const noexcept +int lttng::file_descriptor::fd() const noexcept { LTTNG_ASSERT(is_valid_fd(_raw_fd)); return _raw_fd; @@ -87,9 +87,9 @@ void lttng::file_descriptor::write(const void *buffer, std::size_t size) max_supported_write_size)); } - const auto write_ret = lttng_write(_fd(), buffer, size); + const auto write_ret = lttng_write(fd(), buffer, size); if (write_ret < 0 || static_cast(write_ret) != size) { - LTTNG_THROW_POSIX(fmt::format("Failed to write to file descriptor: fd={}", _fd()), + LTTNG_THROW_POSIX(fmt::format("Failed to write to file descriptor: fd={}", fd()), errno); } } @@ -111,9 +111,9 @@ void lttng::file_descriptor::read(void *buffer, std::size_t size) max_supported_read_size)); } - const auto read_ret = lttng_read(_fd(), buffer, size); + const auto read_ret = lttng_read(fd(), buffer, size); if (read_ret < 0 || static_cast(read_ret) != size) { - LTTNG_THROW_POSIX(fmt::format("Failed to read from file descriptor: fd={}", _fd()), + LTTNG_THROW_POSIX(fmt::format("Failed to read from file descriptor: fd={}", fd()), errno); } } diff --git a/src/common/file-descriptor.hpp b/src/common/file-descriptor.hpp index 164842f86..68686a4ad 100644 --- a/src/common/file-descriptor.hpp +++ b/src/common/file-descriptor.hpp @@ -42,8 +42,8 @@ public: */ void write(const void *buffer, std::size_t size); + int fd() const noexcept; protected: - int _fd() const noexcept; void _cleanup() noexcept; private: diff --git a/src/common/pipe.hpp b/src/common/pipe.hpp index d46e65d68..4e6afc567 100644 --- a/src/common/pipe.hpp +++ b/src/common/pipe.hpp @@ -8,8 +8,10 @@ #ifndef LTTNG_PIPE_H #define LTTNG_PIPE_H -#include #include +#include + +#include #include enum lttng_pipe_state { @@ -17,7 +19,19 @@ enum lttng_pipe_state { LTTNG_PIPE_STATE_CLOSED = 2, }; +/* Close both side of pipe. */ +int lttng_pipe_close(struct lttng_pipe *pipe); + struct lttng_pipe { + static void _lttng_pipe_close_wrapper(lttng_pipe *pipe) + { + lttng_pipe_close(pipe); + } + + using uptr = std::unique_ptr< + lttng_pipe, + lttng::details::create_unique_class::deleter>; + /* Read: 0, Write: 1. */ int fd[2]; /* @@ -69,8 +83,6 @@ struct lttng_pipe *lttng_pipe_named_open(const char *path, mode_t mode, int flags); int lttng_pipe_write_close(struct lttng_pipe *pipe); int lttng_pipe_read_close(struct lttng_pipe *pipe); -/* Close both side of pipe. */ -int lttng_pipe_close(struct lttng_pipe *pipe); void lttng_pipe_destroy(struct lttng_pipe *pipe); ssize_t lttng_pipe_read(struct lttng_pipe *pipe, void *buf, size_t count);