/*
 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <lttng/trigger/trigger.h>
#include <common/error.h>
#include <common/config/session-config.h>
#include <common/defaults.h>
#include <common/utils.h>
#include <common/futex.h>
#include <common/align.h>
#include <common/time.h>
#include <common/hashtable/utils.h>
#include <sys/eventfd.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <common/kernel-ctl/kernel-ctl.h>
#include <lttng/notification/channel-internal.h>
#include <lttng/rotate-internal.h>

#include "rotation-thread.h"
#include "lttng-sessiond.h"
#include "health-sessiond.h"
#include "sessiond-timer.h"

#include <urcu/list.h>
#include <urcu/rculfhash.h>
51 * Store a struct rotation_channel_info for each channel that is currently
52 * being rotated by the consumer.
54 struct cds_lfht
*channel_pending_rotate_ht
;
56 struct lttng_notification_channel
*rotate_notification_channel
= NULL
;
58 struct rotation_thread_state
{
59 struct lttng_poll_event events
;
63 void channel_rotation_info_destroy(struct rotation_channel_info
*channel_info
)
70 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
72 struct rotation_channel_key
*channel_key
= (struct rotation_channel_key
*) key
;
73 struct rotation_channel_info
*channel_info
;
75 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
76 rotate_channels_ht_node
);
78 return !!((channel_key
->key
== channel_info
->channel_key
.key
) &&
79 (channel_key
->domain
== channel_info
->channel_key
.domain
));
83 struct rotation_channel_info
*lookup_channel_pending(uint64_t key
,
84 enum lttng_domain_type domain
)
86 struct cds_lfht_iter iter
;
87 struct cds_lfht_node
*node
;
88 struct rotation_channel_info
*channel_info
= NULL
;
89 struct rotation_channel_key channel_key
= { .key
= key
,
92 cds_lfht_lookup(channel_pending_rotate_ht
,
93 hash_channel_key(&channel_key
),
96 node
= cds_lfht_iter_get_node(&iter
);
101 channel_info
= caa_container_of(node
, struct rotation_channel_info
,
102 rotate_channels_ht_node
);
103 cds_lfht_del(channel_pending_rotate_ht
, node
);
109 * Destroy the thread data previously created by the init function.
111 void rotation_thread_handle_destroy(
112 struct rotation_thread_handle
*handle
)
120 if (handle
->ust32_consumer
>= 0) {
121 ret
= close(handle
->ust32_consumer
);
123 PERROR("close 32-bit consumer channel rotation pipe");
126 if (handle
->ust64_consumer
>= 0) {
127 ret
= close(handle
->ust64_consumer
);
129 PERROR("close 64-bit consumer channel rotation pipe");
132 if (handle
->kernel_consumer
>= 0) {
133 ret
= close(handle
->kernel_consumer
);
135 PERROR("close kernel consumer channel rotation pipe");
143 struct rotation_thread_handle
*rotation_thread_handle_create(
144 struct lttng_pipe
*ust32_channel_rotate_pipe
,
145 struct lttng_pipe
*ust64_channel_rotate_pipe
,
146 struct lttng_pipe
*kernel_channel_rotate_pipe
,
147 int thread_quit_pipe
,
148 struct rotation_thread_timer_queue
*rotation_timer_queue
,
149 struct notification_thread_handle
*notification_thread_handle
,
150 sem_t
*notification_thread_ready
)
152 struct rotation_thread_handle
*handle
;
154 handle
= zmalloc(sizeof(*handle
));
159 if (ust32_channel_rotate_pipe
) {
160 handle
->ust32_consumer
=
161 lttng_pipe_release_readfd(
162 ust32_channel_rotate_pipe
);
163 if (handle
->ust32_consumer
< 0) {
167 handle
->ust32_consumer
= -1;
169 if (ust64_channel_rotate_pipe
) {
170 handle
->ust64_consumer
=
171 lttng_pipe_release_readfd(
172 ust64_channel_rotate_pipe
);
173 if (handle
->ust64_consumer
< 0) {
177 handle
->ust64_consumer
= -1;
179 if (kernel_channel_rotate_pipe
) {
180 handle
->kernel_consumer
=
181 lttng_pipe_release_readfd(
182 kernel_channel_rotate_pipe
);
183 if (handle
->kernel_consumer
< 0) {
187 handle
->kernel_consumer
= -1;
189 handle
->thread_quit_pipe
= thread_quit_pipe
;
190 handle
->rotation_timer_queue
= rotation_timer_queue
;
191 handle
->notification_thread_handle
= notification_thread_handle
;
192 handle
->notification_thread_ready
= notification_thread_ready
;
197 rotation_thread_handle_destroy(handle
);
202 int init_poll_set(struct lttng_poll_event
*poll_set
,
203 struct rotation_thread_handle
*handle
)
208 * Create pollset with size 5:
209 * - sessiond quit pipe
210 * - sessiond timer pipe,
211 * - consumerd (32-bit user space) channel rotate pipe,
212 * - consumerd (64-bit user space) channel rotate pipe,
213 * - consumerd (kernel) channel rotate pipe,
215 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
220 ret
= lttng_poll_add(poll_set
, handle
->thread_quit_pipe
,
223 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
226 ret
= lttng_poll_add(poll_set
,
227 lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
),
230 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
233 ret
= lttng_poll_add(poll_set
, handle
->ust32_consumer
,
236 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
239 ret
= lttng_poll_add(poll_set
, handle
->ust64_consumer
,
242 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
245 if (handle
->kernel_consumer
>= 0) {
246 ret
= lttng_poll_add(poll_set
, handle
->kernel_consumer
,
249 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
257 lttng_poll_clean(poll_set
);
262 void fini_thread_state(struct rotation_thread_state
*state
)
266 lttng_poll_clean(&state
->events
);
267 ret
= cds_lfht_destroy(channel_pending_rotate_ht
, NULL
);
269 if (rotate_notification_channel
) {
270 lttng_notification_channel_destroy(rotate_notification_channel
);
275 int init_thread_state(struct rotation_thread_handle
*handle
,
276 struct rotation_thread_state
*state
)
280 memset(state
, 0, sizeof(*state
));
281 lttng_poll_init(&state
->events
);
283 ret
= init_poll_set(&state
->events
, handle
);
285 ERR("[rotation-thread] Failed to initialize rotation thread poll set");
289 channel_pending_rotate_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
290 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
291 if (!channel_pending_rotate_ht
) {
292 ERR("[rotation-thread] Failed to create channel pending rotation hash table");
298 * We wait until the notification thread is ready to create the
299 * notification channel and add it to the poll_set.
301 sem_wait(handle
->notification_thread_ready
);
302 rotate_notification_channel
= lttng_notification_channel_create(
303 lttng_session_daemon_notification_endpoint
);
304 if (!rotate_notification_channel
) {
305 ERR("[rotation-thread] Could not create notification channel");
309 ret
= lttng_poll_add(&state
->events
, rotate_notification_channel
->socket
,
312 ERR("[rotation-thread] Failed to add notification fd to pollset");
321 int handle_channel_rotation_pipe(int fd
, uint32_t revents
,
322 struct rotation_thread_handle
*handle
,
323 struct rotation_thread_state
*state
)
326 enum lttng_domain_type domain
;
327 struct rotation_channel_info
*channel_info
;
328 struct ltt_session
*session
= NULL
;
331 if (fd
== handle
->ust32_consumer
||
332 fd
== handle
->ust64_consumer
) {
333 domain
= LTTNG_DOMAIN_UST
;
334 } else if (fd
== handle
->kernel_consumer
) {
335 domain
= LTTNG_DOMAIN_KERNEL
;
337 ERR("[rotation-thread] Unknown channel rotation pipe fd %d",
342 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
343 ret
= lttng_poll_del(&state
->events
, fd
);
345 ERR("[rotation-thread] Failed to remove consumer "
346 "rotation pipe from poll set");
352 ret
= read(fd
, &key
, sizeof(key
));
353 } while (ret
== -1 && errno
== EINTR
);
354 if (ret
!= sizeof(key
)) {
355 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
361 DBG("[rotation-thread] Received notification for chan %" PRIu64
362 ", domain %d", key
, domain
);
364 channel_info
= lookup_channel_pending(key
, domain
);
366 ERR("[rotation-thread] Failed to find channel_info (key = %"
373 session
= session_find_by_id(channel_info
->session_id
);
376 * The session may have been destroyed before we had a chance to
377 * perform this action, return gracefully.
379 DBG("[rotation-thread] Session %" PRIu64
" not found",
380 channel_info
->session_id
);
382 goto end_unlock_session_list
;
385 session_lock(session
);
386 if (--session
->nr_chan_rotate_pending
== 0) {
387 time_t now
= time(NULL
);
389 if (now
== (time_t) -1) {
390 session
->rotation_state
= LTTNG_ROTATION_STATE_ERROR
;
392 goto end_unlock_session
;
395 ret
= rename_complete_chunk(session
, now
);
397 ERR("Failed to rename completed rotation chunk");
398 goto end_unlock_session
;
400 session
->rotate_pending
= false;
401 session
->last_chunk_start_ts
= session
->current_chunk_start_ts
;
402 if (session
->rotate_pending_relay
) {
403 ret
= sessiond_timer_rotate_pending_start(
405 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
407 ERR("Failed to enable rotate pending timer");
409 goto end_unlock_session
;
412 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
414 DBG("Rotation completed for session %s", session
->name
);
420 channel_rotation_info_destroy(channel_info
);
421 session_unlock(session
);
422 end_unlock_session_list
:
423 session_unlock_list();
430 * Process the rotate_pending check, called with session lock held.
433 int rotate_pending_relay_timer(struct ltt_session
*session
)
437 DBG("[rotation-thread] Check rotate pending on session %" PRIu64
,
439 ret
= relay_rotate_pending(session
, session
->current_archive_id
- 1);
441 ERR("[rotation-thread] Check relay rotate pending");
445 DBG("[rotation-thread] Rotation completed on the relay for "
446 "session %" PRIu64
, session
->id
);
448 * Now we can clear the pending flag in the session. New
449 * rotations can start now.
451 session
->rotate_pending_relay
= false;
452 session
->rotation_state
= LTTNG_ROTATION_STATE_COMPLETED
;
453 } else if (ret
== 1) {
454 DBG("[rotation-thread] Rotation still pending on the relay for "
455 "session %" PRIu64
, session
->id
);
456 ret
= sessiond_timer_rotate_pending_start(session
,
457 DEFAULT_ROTATE_PENDING_RELAY_TIMER
);
459 ERR("Re-enabling rotate pending timer");
472 * Process the rotate_timer, called with session lock held.
475 int rotate_timer(struct ltt_session
*session
)
480 * Complete _at most_ one scheduled rotation on a stopped session.
482 if (!session
->active
&& session
->rotate_timer_enabled
&&
483 session
->rotated_after_last_stop
) {
488 /* Ignore this timer if a rotation is already in progress. */
489 if (session
->rotate_pending
|| session
->rotate_pending_relay
) {
494 DBG("[rotation-thread] Rotate timer on session %s", session
->name
);
496 ret
= cmd_rotate_session(session
, NULL
);
497 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
498 DBG("Scheduled rotation aborted since a rotation is already in progress");
501 } else if (ret
!= LTTNG_OK
) {
502 ERR("[rotation-thread] Automatic time-triggered rotation failed with error code %i", ret
);
514 int handle_rotate_timer_pipe(uint32_t revents
,
515 struct rotation_thread_handle
*handle
,
516 struct rotation_thread_state
*state
,
517 struct rotation_thread_timer_queue
*queue
)
520 int fd
= lttng_pipe_get_readfd(queue
->event_pipe
);
521 struct ltt_session
*session
;
524 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
525 ret
= lttng_poll_del(&state
->events
, fd
);
527 ERR("[rotation-thread] Failed to remove consumer "
528 "rotate pending pipe from poll set");
533 ret
= lttng_read(fd
, buf
, 1);
535 ERR("[rotation-thread] Failed to read from wakeup pipe (fd = %i)", fd
);
541 struct sessiond_rotation_timer
*timer_data
;
544 * Take the queue lock only to pop elements from the list.
546 pthread_mutex_lock(&queue
->lock
);
547 if (cds_list_empty(&queue
->list
)) {
548 pthread_mutex_unlock(&queue
->lock
);
551 timer_data
= cds_list_first_entry(&queue
->list
,
552 struct sessiond_rotation_timer
, head
);
553 cds_list_del(&timer_data
->head
);
554 pthread_mutex_unlock(&queue
->lock
);
557 * session lock to lookup the session ID.
560 session
= session_find_by_id(timer_data
->session_id
);
562 DBG("[rotation-thread] Session %" PRIu64
" not found",
563 timer_data
->session_id
);
565 * This is a non-fatal error, and we cannot report it to the
566 * user (timer), so just print the error and continue the
569 session_unlock_list();
575 * Take the session lock and release the session_list lock.
577 session_lock(session
);
578 session_unlock_list();
580 if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_PENDING
) {
581 ret
= rotate_pending_relay_timer(session
);
582 } else if (timer_data
->signal
== LTTNG_SESSIOND_SIG_ROTATE_TIMER
) {
583 ret
= rotate_timer(session
);
585 ERR("Unknown signal in rotate timer %d", timer_data
->signal
);
588 session_unlock(session
);
591 ERR("Error processing timer");
602 int handle_condition(
603 const struct lttng_condition
*condition
,
604 const struct lttng_evaluation
*evaluation
,
605 struct notification_thread_handle
*notification_thread_handle
)
608 const char *condition_session_name
= NULL
;
609 enum lttng_condition_type condition_type
;
610 enum lttng_condition_status condition_status
;
611 enum lttng_evaluation_status evaluation_status
;
613 struct ltt_session
*session
;
615 condition_type
= lttng_condition_get_type(condition
);
617 if (condition_type
!= LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
) {
619 ERR("[rotation-thread] Condition type and session usage type are not the same");
623 /* Fetch info to test */
624 condition_status
= lttng_condition_session_consumed_size_get_session_name(
625 condition
, &condition_session_name
);
626 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
627 ERR("[rotation-thread] Session name could not be fetched");
631 evaluation_status
= lttng_evaluation_session_consumed_size_get_consumed_size(evaluation
,
633 if (evaluation_status
!= LTTNG_EVALUATION_STATUS_OK
) {
634 ERR("[rotation-thread] Failed to get evaluation");
640 session
= session_find_by_name(condition_session_name
);
643 session_unlock_list();
644 ERR("[rotation-thread] Session \"%s\" not found",
645 condition_session_name
);
648 session_lock(session
);
649 session_unlock_list();
651 ret
= unsubscribe_session_consumed_size_rotation(session
,
652 notification_thread_handle
);
657 ret
= cmd_rotate_session(session
, NULL
);
658 if (ret
== -LTTNG_ERR_ROTATION_PENDING
) {
659 DBG("Rotate already pending, subscribe to the next threshold value");
661 } else if (ret
!= LTTNG_OK
) {
662 ERR("[rotation-thread] Failed to rotate on size notification with error: %s",
663 lttng_strerror(ret
));
667 ret
= subscribe_session_consumed_size_rotation(session
,
668 consumed
+ session
->rotate_size
,
669 notification_thread_handle
);
671 ERR("[rotation-thread] Failed to subscribe to session consumed size condition");
677 session_unlock(session
);
683 int handle_notification_channel(int fd
, uint32_t revents
,
684 struct rotation_thread_handle
*handle
,
685 struct rotation_thread_state
*state
)
688 bool notification_pending
;
689 struct lttng_notification
*notification
= NULL
;
690 enum lttng_notification_channel_status status
;
691 const struct lttng_evaluation
*notification_evaluation
;
692 const struct lttng_condition
*notification_condition
;
694 status
= lttng_notification_channel_has_pending_notification(
695 rotate_notification_channel
, ¬ification_pending
);
696 if (status
!= LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
) {
697 ERR("[rotation-thread ]Error occured while checking for pending notification");
702 if (!notification_pending
) {
707 /* Receive the next notification. */
708 status
= lttng_notification_channel_get_next_notification(
709 rotate_notification_channel
,
713 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
:
715 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED
:
716 /* Not an error, we will wait for the next one */
719 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED
:
720 ERR("Notification channel was closed");
724 /* Unhandled conditions / errors. */
725 ERR("Unknown notification channel status");
730 notification_condition
= lttng_notification_get_condition(notification
);
731 notification_evaluation
= lttng_notification_get_evaluation(notification
);
733 ret
= handle_condition(notification_condition
, notification_evaluation
,
734 handle
->notification_thread_handle
);
737 lttng_notification_destroy(notification
);
741 void *thread_rotation(void *data
)
744 struct rotation_thread_handle
*handle
= data
;
745 struct rotation_thread_state state
;
747 DBG("[rotation-thread] Started rotation thread");
750 ERR("[rotation-thread] Invalid thread context provided");
754 rcu_register_thread();
757 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_ROTATION
);
758 health_code_update();
760 ret
= init_thread_state(handle
, &state
);
765 /* Ready to handle client connections. */
766 sessiond_notify_ready();
772 DBG("[rotation-thread] Entering poll wait");
773 ret
= lttng_poll_wait(&state
.events
, -1);
774 DBG("[rotation-thread] Poll wait returned (%i)", ret
);
778 * Restart interrupted system call.
780 if (errno
== EINTR
) {
783 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret
);
788 for (i
= 0; i
< fd_count
; i
++) {
789 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
790 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
792 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
795 if (fd
== handle
->thread_quit_pipe
) {
796 DBG("[rotation-thread] Quit pipe activity");
798 } else if (fd
== lttng_pipe_get_readfd(handle
->rotation_timer_queue
->event_pipe
)) {
799 ret
= handle_rotate_timer_pipe(revents
,
800 handle
, &state
, handle
->rotation_timer_queue
);
802 ERR("[rotation-thread] Failed to handle rotation timer pipe event");
805 } else if (fd
== handle
->ust32_consumer
||
806 fd
== handle
->ust64_consumer
||
807 fd
== handle
->kernel_consumer
) {
808 ret
= handle_channel_rotation_pipe(fd
,
809 revents
, handle
, &state
);
811 ERR("[rotation-thread] Failed to handle channel rotation pipe");
814 } else if (fd
== rotate_notification_channel
->socket
) {
815 ret
= handle_notification_channel(fd
, revents
,
818 ERR("[rotation-thread] Error occured while handling activity on notification channel socket");
826 DBG("[rotation-thread] Exit");
827 fini_thread_state(&state
);
828 health_unregister(health_sessiond
);
829 rcu_thread_offline();
830 rcu_unregister_thread();