sessiond: propagate the use of ltt_session::locked_ref
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
index ca7237802e61096846d9fca358a7d3cd39767842..71f9bbcb5c10d4cba3998dfb5eecd55ec50b9c39 100644 (file)
@@ -112,7 +112,8 @@ end:
        return exists;
 }
 
-void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
+void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref& session,
+                                                bool& _rotation_completed)
 {
        int ret = 0;
        struct consumer_socket *socket;
@@ -123,27 +124,27 @@ void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _ro
        enum lttng_trace_chunk_status chunk_status;
        lttng::urcu::read_lock_guard read_lock;
 
-       LTTNG_ASSERT(session.chunk_being_archived);
+       LTTNG_ASSERT(session->chunk_being_archived);
 
        /*
         * Check for a local pending rotation on all consumers (32-bit
         * user space, 64-bit user space, and kernel).
         */
-       if (!session.ust_session) {
+       if (!session->ust_session) {
                goto skip_ust;
        }
 
        cds_lfht_for_each_entry (
-               session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
-               relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
+               session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
+               relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session.ust_session->consumer->net_seq_index;
+                       session->ust_session->consumer->net_seq_index;
 
                lttng::pthread::lock_guard socket_lock(*socket->lock);
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session.id,
-                                                 session.chunk_being_archived,
+                                                 session->id,
+                                                 session->chunk_being_archived,
                                                  &exists_status);
                if (ret) {
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -157,22 +158,22 @@ void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _ro
        }
 
 skip_ust:
-       if (!session.kernel_session) {
+       if (!session->kernel_session) {
                goto skip_kernel;
        }
 
        cds_lfht_for_each_entry (
-               session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
+               session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
                lttng::pthread::lock_guard socket_lock(*socket->lock);
 
-               relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
+               relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
                        -1ULL :
-                       session.kernel_session->consumer->net_seq_index;
+                       session->kernel_session->consumer->net_seq_index;
 
                ret = consumer_trace_chunk_exists(socket,
                                                  relayd_id,
-                                                 session.id,
-                                                 session.chunk_being_archived,
+                                                 session->id,
+                                                 session->chunk_being_archived,
                                                  &exists_status);
                if (ret) {
                        ERR("Error occurred while checking rotation status on consumer daemon");
@@ -190,20 +191,20 @@ end:
        if (!chunk_exists_on_peer) {
                uint64_t chunk_being_archived_id;
 
-               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
                DBG("Rotation of trace archive %" PRIu64
                    " of session \"%s\" is complete on all consumers",
                    chunk_being_archived_id,
-                   session.name);
+                   session->name);
        }
 
        _rotation_completed = !chunk_exists_on_peer;
        if (ret) {
                ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
                if (ret) {
-                       ERR("Failed to reset rotation state of session \"%s\"", session.name);
+                       ERR("Failed to reset rotation state of session \"%s\"", session->name);
                }
        }
 }
@@ -213,7 +214,7 @@ end:
  * Should only return non-zero in the event of a fatal error. Doing so will
  * shutdown the thread.
  */
-int check_session_rotation_pending(ltt_session& session,
+int check_session_rotation_pending(const ltt_session::locked_ref& session,
                                   notification_thread_handle& notification_thread_handle)
 {
        int ret;
@@ -223,17 +224,17 @@ int check_session_rotation_pending(ltt_session& session,
        const char *archived_chunk_name;
        uint64_t chunk_being_archived_id;
 
-       if (!session.chunk_being_archived) {
+       if (!session->chunk_being_archived) {
                ret = 0;
                goto end;
        }
 
        chunk_status =
-               lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
+               lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
        DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
-           session.name,
+           session->name,
            chunk_being_archived_id);
 
        /*
@@ -250,7 +251,7 @@ int check_session_rotation_pending(ltt_session& session,
        }
 
        check_session_rotation_pending_on_consumers(session, rotation_completed);
-       if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
+       if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
                goto check_ongoing_rotation;
        }
 
@@ -259,41 +260,41 @@ int check_session_rotation_pending(ltt_session& session,
         * rotations can start now.
         */
        chunk_status = lttng_trace_chunk_get_name(
-               session.chunk_being_archived, &archived_chunk_name, nullptr);
+               session->chunk_being_archived, &archived_chunk_name, nullptr);
        LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
-       free(session.last_archived_chunk_name);
-       session.last_archived_chunk_name = strdup(archived_chunk_name);
-       if (!session.last_archived_chunk_name) {
+       free(session->last_archived_chunk_name);
+       session->last_archived_chunk_name = strdup(archived_chunk_name);
+       if (!session->last_archived_chunk_name) {
                PERROR("Failed to duplicate archived chunk name");
        }
 
        session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
 
-       if (!session.quiet_rotation) {
-               location = session_get_trace_archive_location(&session);
+       if (!session->quiet_rotation) {
+               location = session_get_trace_archive_location(session);
                ret = notification_thread_command_session_rotation_completed(
                        &notification_thread_handle,
-                       session.id,
-                       session.last_archived_chunk_id.value,
+                       session->id,
+                       session->last_archived_chunk_id.value,
                        location);
                lttng_trace_archive_location_put(location);
                if (ret != LTTNG_OK) {
                        ERR("Failed to notify notification thread of completed rotation for session %s",
-                           session.name);
+                           session->name);
                }
        }
 
        ret = 0;
 check_ongoing_rotation:
-       if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
-               chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
+       if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
+               chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
                                                        &chunk_being_archived_id);
                LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
 
                DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
                    chunk_being_archived_id,
-                   session.name);
-               ret = timer_session_rotation_pending_check_start(&session,
+                   session->name);
+               ret = timer_session_rotation_pending_check_start(session,
                                                                 DEFAULT_ROTATE_PENDING_TIMER);
                if (ret) {
                        ERR("Failed to re-enable rotation pending timer");
@@ -307,42 +308,46 @@ end:
 }
 
 /* Call with the session and session_list locks held. */
-int launch_session_rotation(ltt_session& session)
+void launch_session_rotation(const ltt_session::locked_ref& session)
 {
        int ret;
-       struct lttng_rotate_session_return rotation_return;
 
-       DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
+       DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session->name);
 
        ASSERT_SESSION_LIST_LOCKED();
-       ASSERT_LOCKED(session._lock);
 
-       ret = cmd_rotate_session(&session,
-                                &rotation_return,
-                                false,
-                                LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       if (ret == LTTNG_OK) {
-               DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
-                   session.name);
+       ret = cmd_rotate_session(
+               session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+       if (ret != LTTNG_OK) {
+               LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
+                                           session->name),
+                               static_cast<lttng_error_code>(ret));
        } else {
                /* Don't consider errors as fatal. */
-               DBG("Scheduled time-based rotation aborted for session %s: %s",
-                   session.name,
-                   lttng_strerror(ret));
+               DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
+                       session->name,
+                       lttng_strerror(ret));
        }
-
-       return 0;
 }
 
 int run_job(const rotation_thread_job& job,
-           ltt_session& session,
+           const ltt_session::locked_ref& session,
            notification_thread_handle& notification_thread_handle)
 {
-       int ret;
+       int ret = 0;
 
        switch (job.type) {
        case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
-               ret = launch_session_rotation(session);
+               try {
+                       launch_session_rotation(session);
+                       DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
+                           session->name);
+               } catch (const lttng::ctl::error& ctl_ex) {
+                       /* Don't consider errors as fatal. */
+                       DBG("Scheduled time-based rotation aborted for session %s: %s",
+                           session->name,
+                           lttng_strerror(ctl_ex.code()));
+               }
                break;
        case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
                ret = check_session_rotation_pending(session, notification_thread_handle);
@@ -543,9 +548,9 @@ void ls::rotation_thread::_handle_job_queue()
 
                /* locked_ref will unlock the session and release the ref held by the job. */
                session_lock(job->session);
-               auto session = ltt_session::locked_ref(job->session);
+               auto session = ltt_session::locked_ref(*job->session);
 
-               if (run_job(*job, *session, _notification_thread_handle)) {
+               if (run_job(*job, session, _notification_thread_handle)) {
                        return;
                }
        }
@@ -591,9 +596,39 @@ void ls::rotation_thread::_handle_notification(const lttng_notification& notific
         * sessions.
         */
        const auto list_lock = lttng::sessiond::lock_session_list();
-       ltt_session::locked_ref session;
        try {
-               session = ltt_session::find_locked_session(condition_session_name);
+               const auto session = ltt_session::find_locked_session(condition_session_name);
+
+               if (!lttng_trigger_is_equal(session->rotate_trigger,
+                                           lttng_notification_get_const_trigger(&notification))) {
+                       DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
+                       return;
+               }
+
+               unsubscribe_session_consumed_size_rotation(*session);
+
+               ret = cmd_rotate_session(
+                       session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
+               if (ret != LTTNG_OK) {
+                       switch (ret) {
+                       case LTTNG_OK:
+                               break;
+                       case -LTTNG_ERR_ROTATION_PENDING:
+                               DBG("Rotate already pending, subscribe to the next threshold value");
+                               break;
+                       case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
+                               DBG("Rotation already happened since last stop, subscribe to the next threshold value");
+                               break;
+                       case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
+                               DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
+                               break;
+                       default:
+                               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
+                                               static_cast<lttng_error_code>(-ret));
+                       }
+               }
+
+               subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
        } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
                DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
                        lttng_condition_type_str(condition_type),
@@ -604,35 +639,6 @@ void ls::rotation_thread::_handle_notification(const lttng_notification& notific
                 */
                return;
        }
-
-       if (!lttng_trigger_is_equal(session->rotate_trigger,
-                                   lttng_notification_get_const_trigger(&notification))) {
-               DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
-               return;
-       }
-
-       unsubscribe_session_consumed_size_rotation(*session);
-
-       ret = cmd_rotate_session(
-               session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
-       switch (ret) {
-       case LTTNG_OK:
-               break;
-       case -LTTNG_ERR_ROTATION_PENDING:
-               DBG("Rotate already pending, subscribe to the next threshold value");
-               break;
-       case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
-               DBG("Rotation already happened since last stop, subscribe to the next threshold value");
-               break;
-       case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
-               DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
-               break;
-       default:
-               LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
-                               static_cast<lttng_error_code>(-ret));
-       }
-
-       subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
 }
 
 void ls::rotation_thread::_handle_notification_channel_activity()
This page took 0.029133 seconds and 4 git commands to generate.