Fix: sessiond: size-based notification occasionally not triggered
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
1 /*
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * SPDX-License-Identifier: GPL-2.0-only
6 *
7 */
8
9 #define _LGPL_SOURCE
10 #include "cmd.hpp"
11 #include "health-sessiond.hpp"
12 #include "lttng-sessiond.hpp"
13 #include "notification-thread-commands.hpp"
14 #include "rotate.hpp"
15 #include "rotation-thread.hpp"
16 #include "session.hpp"
17 #include "thread.hpp"
18 #include "timer.hpp"
19 #include "utils.hpp"
20
21 #include <common/align.hpp>
22 #include <common/config/session-config.hpp>
23 #include <common/defaults.hpp>
24 #include <common/error.hpp>
25 #include <common/futex.hpp>
26 #include <common/hashtable/utils.hpp>
27 #include <common/kernel-ctl/kernel-ctl.hpp>
28 #include <common/time.hpp>
29 #include <common/urcu.hpp>
30 #include <common/utils.hpp>
31
32 #include <lttng/condition/condition-internal.hpp>
33 #include <lttng/location-internal.hpp>
34 #include <lttng/notification/channel-internal.hpp>
35 #include <lttng/notification/notification-internal.hpp>
36 #include <lttng/rotate-internal.hpp>
37 #include <lttng/trigger/trigger.h>
38
39 #include <inttypes.h>
40 #include <signal.h>
41 #include <sys/eventfd.h>
42 #include <sys/stat.h>
43 #include <time.h>
44 #include <urcu.h>
45 #include <urcu/list.h>
46
47 struct lttng_notification_channel *rotate_notification_channel = nullptr;
48 /*
49 * This eventfd is used to wake-up the rotation thread whenever a command
50 * completes on the notification channel. This ensures that any notification
51 * that was queued while waiting for a reply to the command is eventually
52 * consumed.
53 */
54 int rotate_notification_channel_subscription_change_eventfd = -1;
55
56 struct rotation_thread {
57 struct lttng_poll_event events;
58 };
59
60 /*
61 * The timer thread enqueues jobs and wakes up the rotation thread.
62 * When the rotation thread wakes up, it empties the queue.
63 */
64 struct rotation_thread_timer_queue {
65 struct lttng_pipe *event_pipe;
66 struct cds_list_head list;
67 pthread_mutex_t lock;
68 };
69
70 struct rotation_thread_handle {
71 struct rotation_thread_timer_queue *rotation_timer_queue;
72 /* Access to the notification thread cmd_queue */
73 struct notification_thread_handle *notification_thread_handle;
74 /* Thread-specific quit pipe. */
75 struct lttng_pipe *quit_pipe;
76 };
77
78 namespace {
79 struct rotation_thread_job {
80 enum rotation_thread_job_type type;
81 struct ltt_session *session;
82 /* List member in struct rotation_thread_timer_queue. */
83 struct cds_list_head head;
84 };
85 } /* namespace */
86
87 static const char *get_job_type_str(enum rotation_thread_job_type job_type)
88 {
89 switch (job_type) {
90 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
91 return "CHECK_PENDING_ROTATION";
92 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
93 return "SCHEDULED_ROTATION";
94 default:
95 abort();
96 }
97 }
98
99 struct rotation_thread_timer_queue *rotation_thread_timer_queue_create()
100 {
101 struct rotation_thread_timer_queue *queue = nullptr;
102
103 queue = zmalloc<rotation_thread_timer_queue>();
104 if (!queue) {
105 PERROR("Failed to allocate timer rotate queue");
106 goto end;
107 }
108
109 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
110 CDS_INIT_LIST_HEAD(&queue->list);
111 pthread_mutex_init(&queue->lock, nullptr);
112 end:
113 return queue;
114 }
115
116 void rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
117 {
118 if (!queue) {
119 return;
120 }
121
122 lttng_pipe_destroy(queue->event_pipe);
123
124 pthread_mutex_lock(&queue->lock);
125 LTTNG_ASSERT(cds_list_empty(&queue->list));
126 pthread_mutex_unlock(&queue->lock);
127 pthread_mutex_destroy(&queue->lock);
128 free(queue);
129 }
130
131 /*
132 * Destroy the thread data previously created by the init function.
133 */
134 void rotation_thread_handle_destroy(struct rotation_thread_handle *handle)
135 {
136 lttng_pipe_destroy(handle->quit_pipe);
137 free(handle);
138 }
139
140 struct rotation_thread_handle *
141 rotation_thread_handle_create(struct rotation_thread_timer_queue *rotation_timer_queue,
142 struct notification_thread_handle *notification_thread_handle)
143 {
144 struct rotation_thread_handle *handle;
145
146 handle = zmalloc<rotation_thread_handle>();
147 if (!handle) {
148 goto end;
149 }
150
151 handle->rotation_timer_queue = rotation_timer_queue;
152 handle->notification_thread_handle = notification_thread_handle;
153 handle->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
154 if (!handle->quit_pipe) {
155 goto error;
156 }
157
158 end:
159 return handle;
160 error:
161 rotation_thread_handle_destroy(handle);
162 return nullptr;
163 }
164
165 /*
166 * Called with the rotation_thread_timer_queue lock held.
167 * Return true if the same timer job already exists in the queue, false if not.
168 */
169 static bool timer_job_exists(const struct rotation_thread_timer_queue *queue,
170 enum rotation_thread_job_type job_type,
171 struct ltt_session *session)
172 {
173 bool exists = false;
174 struct rotation_thread_job *job;
175
176 cds_list_for_each_entry (job, &queue->list, head) {
177 if (job->session == session && job->type == job_type) {
178 exists = true;
179 goto end;
180 }
181 }
182 end:
183 return exists;
184 }
185
186 void rotation_thread_enqueue_job(struct rotation_thread_timer_queue *queue,
187 enum rotation_thread_job_type job_type,
188 struct ltt_session *session)
189 {
190 int ret;
191 const char dummy = '!';
192 struct rotation_thread_job *job = nullptr;
193 const char *job_type_str = get_job_type_str(job_type);
194
195 pthread_mutex_lock(&queue->lock);
196 if (timer_job_exists(queue, job_type, session)) {
197 /*
198 * This timer job is already pending, we don't need to add
199 * it.
200 */
201 goto end;
202 }
203
204 job = zmalloc<rotation_thread_job>();
205 if (!job) {
206 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
207 job_type_str,
208 session->name);
209 goto end;
210 }
211 /* No reason for this to fail as the caller must hold a reference. */
212 (void) session_get(session);
213
214 job->session = session;
215 job->type = job_type;
216 cds_list_add_tail(&job->head, &queue->list);
217
218 ret = lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
219 if (ret < 0) {
220 /*
221 * We do not want to block in the timer handler, the job has
222 * been enqueued in the list, the wakeup pipe is probably full,
223 * the job will be processed when the rotation_thread catches
224 * up.
225 */
226 DIAGNOSTIC_PUSH
227 DIAGNOSTIC_IGNORE_LOGICAL_OP
228 if (errno == EAGAIN || errno == EWOULDBLOCK) {
229 DIAGNOSTIC_POP
230 /*
231 * Not an error, but would be surprising and indicate
232 * that the rotation thread can't keep up with the
233 * current load.
234 */
235 DBG("Wake-up pipe of rotation thread job queue is full");
236 goto end;
237 }
238 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
239 job_type_str,
240 session->name);
241 goto end;
242 }
243
244 end:
245 pthread_mutex_unlock(&queue->lock);
246 }
247
248 static int init_poll_set(struct lttng_poll_event *poll_set, struct rotation_thread_handle *handle)
249 {
250 int ret;
251
252 /*
253 * Create pollset with size 3:
254 * - rotation thread quit pipe,
255 * - rotation thread timer queue pipe,
256 * - notification channel sock,
257 */
258 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
259 if (ret < 0) {
260 goto error;
261 }
262
263 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->quit_pipe), LPOLLIN);
264 if (ret < 0) {
265 ERR("Failed to add quit pipe read fd to poll set");
266 goto error;
267 }
268
269 ret = lttng_poll_add(
270 poll_set, lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe), LPOLLIN);
271 if (ret < 0) {
272 ERR("Failed to add rotate_pending fd to poll set");
273 goto error;
274 }
275
276 return ret;
277 error:
278 lttng_poll_clean(poll_set);
279 return ret;
280 }
281
282 static void fini_thread_state(struct rotation_thread *state)
283 {
284 lttng_poll_clean(&state->events);
285 if (rotate_notification_channel) {
286 lttng_notification_channel_destroy(rotate_notification_channel);
287 }
288
289 if (rotate_notification_channel_subscription_change_eventfd >= 0) {
290 const int close_ret = close(rotate_notification_channel_subscription_change_eventfd);
291
292 if (close_ret) {
293 PERROR("Failed to close rotation thread notification channel subscription change eventfd");
294 }
295 }
296 }
297
298 static int init_thread_state(struct rotation_thread_handle *handle, struct rotation_thread *state)
299 {
300 int ret;
301
302 memset(state, 0, sizeof(*state));
303 lttng_poll_init(&state->events);
304
305 ret = init_poll_set(&state->events, handle);
306 if (ret) {
307 ERR("Failed to initialize rotation thread poll set");
308 goto end;
309 }
310
311 rotate_notification_channel =
312 lttng_notification_channel_create(lttng_session_daemon_notification_endpoint);
313 if (!rotate_notification_channel) {
314 ERR("Could not create notification channel");
315 ret = -1;
316 goto end;
317 }
318 ret = lttng_poll_add(&state->events, rotate_notification_channel->socket, LPOLLIN);
319 if (ret < 0) {
320 ERR("Failed to add notification fd to pollset");
321 goto end;
322 }
323
324 rotate_notification_channel_subscription_change_eventfd =
325 eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
326 if (rotate_notification_channel_subscription_change_eventfd < 0) {
327 PERROR("Failed to create rotation thread notification channel subscription change eventfd");
328 ret = -1;
329 goto end;
330 }
331 ret = lttng_poll_add(
332 &state->events, rotate_notification_channel_subscription_change_eventfd, LPOLLIN);
333 if (ret < 0) {
334 ERR("Failed to add rotation thread notification channel subscription change eventfd to pollset");
335 goto end;
336 }
337
338 end:
339 return ret;
340 }
341
342 static void check_session_rotation_pending_on_consumers(struct ltt_session *session,
343 bool *_rotation_completed)
344 {
345 int ret = 0;
346 struct consumer_socket *socket;
347 struct cds_lfht_iter iter;
348 enum consumer_trace_chunk_exists_status exists_status;
349 uint64_t relayd_id;
350 bool chunk_exists_on_peer = false;
351 enum lttng_trace_chunk_status chunk_status;
352 lttng::urcu::read_lock_guard read_lock;
353
354 LTTNG_ASSERT(session->chunk_being_archived);
355
356 /*
357 * Check for a local pending rotation on all consumers (32-bit
358 * user space, 64-bit user space, and kernel).
359 */
360 if (!session->ust_session) {
361 goto skip_ust;
362 }
363
364 cds_lfht_for_each_entry (
365 session->ust_session->consumer->socks->ht, &iter, socket, node.node) {
366 relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
367 -1ULL :
368 session->ust_session->consumer->net_seq_index;
369
370 pthread_mutex_lock(socket->lock);
371 ret = consumer_trace_chunk_exists(socket,
372 relayd_id,
373 session->id,
374 session->chunk_being_archived,
375 &exists_status);
376 if (ret) {
377 pthread_mutex_unlock(socket->lock);
378 ERR("Error occurred while checking rotation status on consumer daemon");
379 goto end;
380 }
381
382 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
383 pthread_mutex_unlock(socket->lock);
384 chunk_exists_on_peer = true;
385 goto end;
386 }
387 pthread_mutex_unlock(socket->lock);
388 }
389
390 skip_ust:
391 if (!session->kernel_session) {
392 goto skip_kernel;
393 }
394 cds_lfht_for_each_entry (
395 session->kernel_session->consumer->socks->ht, &iter, socket, node.node) {
396 pthread_mutex_lock(socket->lock);
397 relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
398 -1ULL :
399 session->kernel_session->consumer->net_seq_index;
400
401 ret = consumer_trace_chunk_exists(socket,
402 relayd_id,
403 session->id,
404 session->chunk_being_archived,
405 &exists_status);
406 if (ret) {
407 pthread_mutex_unlock(socket->lock);
408 ERR("Error occurred while checking rotation status on consumer daemon");
409 goto end;
410 }
411
412 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
413 pthread_mutex_unlock(socket->lock);
414 chunk_exists_on_peer = true;
415 goto end;
416 }
417 pthread_mutex_unlock(socket->lock);
418 }
419 skip_kernel:
420 end:
421
422 if (!chunk_exists_on_peer) {
423 uint64_t chunk_being_archived_id;
424
425 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
426 &chunk_being_archived_id);
427 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
428 DBG("Rotation of trace archive %" PRIu64
429 " of session \"%s\" is complete on all consumers",
430 chunk_being_archived_id,
431 session->name);
432 }
433 *_rotation_completed = !chunk_exists_on_peer;
434 if (ret) {
435 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
436 if (ret) {
437 ERR("Failed to reset rotation state of session \"%s\"", session->name);
438 }
439 }
440 }
441
442 /*
443 * Check if the last rotation was completed, called with session lock held.
444 * Should only return non-zero in the event of a fatal error. Doing so will
445 * shutdown the thread.
446 */
447 static int
448 check_session_rotation_pending(struct ltt_session *session,
449 struct notification_thread_handle *notification_thread_handle)
450 {
451 int ret;
452 struct lttng_trace_archive_location *location;
453 enum lttng_trace_chunk_status chunk_status;
454 bool rotation_completed = false;
455 const char *archived_chunk_name;
456 uint64_t chunk_being_archived_id;
457
458 if (!session->chunk_being_archived) {
459 ret = 0;
460 goto end;
461 }
462
463 chunk_status =
464 lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
465 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
466
467 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
468 session->name,
469 chunk_being_archived_id);
470
471 /*
472 * The rotation-pending check timer of a session is launched in
473 * one-shot mode. If the rotation is incomplete, the rotation
474 * thread will re-enable the pending-check timer.
475 *
476 * The timer thread can't stop the timer itself since it is involved
477 * in the check for the timer's quiescence.
478 */
479 ret = timer_session_rotation_pending_check_stop(session);
480 if (ret) {
481 goto check_ongoing_rotation;
482 }
483
484 check_session_rotation_pending_on_consumers(session, &rotation_completed);
485 if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
486 goto check_ongoing_rotation;
487 }
488
489 /*
490 * Now we can clear the "ONGOING" state in the session. New
491 * rotations can start now.
492 */
493 chunk_status = lttng_trace_chunk_get_name(
494 session->chunk_being_archived, &archived_chunk_name, nullptr);
495 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
496 free(session->last_archived_chunk_name);
497 session->last_archived_chunk_name = strdup(archived_chunk_name);
498 if (!session->last_archived_chunk_name) {
499 PERROR("Failed to duplicate archived chunk name");
500 }
501 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
502
503 if (!session->quiet_rotation) {
504 location = session_get_trace_archive_location(session);
505 ret = notification_thread_command_session_rotation_completed(
506 notification_thread_handle,
507 session->id,
508 session->last_archived_chunk_id.value,
509 location);
510 lttng_trace_archive_location_put(location);
511 if (ret != LTTNG_OK) {
512 ERR("Failed to notify notification thread of completed rotation for session %s",
513 session->name);
514 }
515 }
516
517 ret = 0;
518 check_ongoing_rotation:
519 if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
520 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
521 &chunk_being_archived_id);
522 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
523
524 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
525 chunk_being_archived_id,
526 session->name);
527 ret = timer_session_rotation_pending_check_start(session,
528 DEFAULT_ROTATE_PENDING_TIMER);
529 if (ret) {
530 ERR("Failed to re-enable rotation pending timer");
531 ret = -1;
532 goto end;
533 }
534 }
535
536 end:
537 return ret;
538 }
539
540 /* Call with the session and session_list locks held. */
541 static int launch_session_rotation(struct ltt_session *session)
542 {
543 int ret;
544 struct lttng_rotate_session_return rotation_return;
545
546 DBG("Launching scheduled time-based rotation on session \"%s\"", session->name);
547
548 ret = cmd_rotate_session(
549 session, &rotation_return, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
550 if (ret == LTTNG_OK) {
551 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
552 session->name);
553 } else {
554 /* Don't consider errors as fatal. */
555 DBG("Scheduled time-based rotation aborted for session %s: %s",
556 session->name,
557 lttng_strerror(ret));
558 }
559 return 0;
560 }
561
562 static int run_job(struct rotation_thread_job *job,
563 struct ltt_session *session,
564 struct notification_thread_handle *notification_thread_handle)
565 {
566 int ret;
567
568 switch (job->type) {
569 case ROTATION_THREAD_JOB_TYPE_SCHEDULED_ROTATION:
570 ret = launch_session_rotation(session);
571 break;
572 case ROTATION_THREAD_JOB_TYPE_CHECK_PENDING_ROTATION:
573 ret = check_session_rotation_pending(session, notification_thread_handle);
574 break;
575 default:
576 abort();
577 }
578 return ret;
579 }
580
581 static int handle_job_queue(struct rotation_thread_handle *handle,
582 struct rotation_thread *state __attribute__((unused)),
583 struct rotation_thread_timer_queue *queue)
584 {
585 int ret = 0;
586
587 for (;;) {
588 struct ltt_session *session;
589 struct rotation_thread_job *job;
590
591 /* Take the queue lock only to pop an element from the list. */
592 pthread_mutex_lock(&queue->lock);
593 if (cds_list_empty(&queue->list)) {
594 pthread_mutex_unlock(&queue->lock);
595 break;
596 }
597 job = cds_list_first_entry(&queue->list, typeof(*job), head);
598 cds_list_del(&job->head);
599 pthread_mutex_unlock(&queue->lock);
600
601 session_lock_list();
602 session = job->session;
603 if (!session) {
604 DBG("Session \"%s\" not found", session->name != NULL ? session->name : "");
605 /*
606 * This is a non-fatal error, and we cannot report it to
607 * the user (timer), so just print the error and
608 * continue the processing.
609 *
610 * While the timer thread will purge pending signals for
611 * a session on the session's destruction, it is
612 * possible for a job targeting that session to have
613 * already been queued before it was destroyed.
614 */
615 free(job);
616 session_put(session);
617 session_unlock_list();
618 continue;
619 }
620
621 session_lock(session);
622 ret = run_job(job, session, handle->notification_thread_handle);
623 session_unlock(session);
624 /* Release reference held by the job. */
625 session_put(session);
626 session_unlock_list();
627 free(job);
628 if (ret) {
629 goto end;
630 }
631 }
632
633 ret = 0;
634
635 end:
636 return ret;
637 }
638
639 static int handle_condition(const struct lttng_notification *notification,
640 struct notification_thread_handle *notification_thread_handle)
641 {
642 int ret = 0;
643 const char *condition_session_name = nullptr;
644 enum lttng_condition_type condition_type;
645 enum lttng_condition_status condition_status;
646 enum lttng_evaluation_status evaluation_status;
647 uint64_t consumed;
648 struct ltt_session *session;
649 const struct lttng_condition *condition =
650 lttng_notification_get_const_condition(notification);
651 const struct lttng_evaluation *evaluation =
652 lttng_notification_get_const_evaluation(notification);
653
654 condition_type = lttng_condition_get_type(condition);
655
656 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
657 ret = -1;
658 ERR("Condition type and session usage type are not the same");
659 goto end;
660 }
661
662 /* Fetch info to test */
663 condition_status = lttng_condition_session_consumed_size_get_session_name(
664 condition, &condition_session_name);
665 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
666 ERR("Session name could not be fetched");
667 ret = -1;
668 goto end;
669 }
670 evaluation_status =
671 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
672 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
673 ERR("Failed to get evaluation");
674 ret = -1;
675 goto end;
676 }
677
678 session_lock_list();
679 session = session_find_by_name(condition_session_name);
680 if (!session) {
681 DBG("Failed to find session while handling notification: notification type = %s, session name = `%s`",
682 lttng_condition_type_str(condition_type),
683 condition_session_name);
684 /*
685 * Not a fatal error: a session can be destroyed before we get
686 * the chance to handle the notification.
687 */
688 ret = 0;
689 session_unlock_list();
690 goto end;
691 }
692 session_lock(session);
693
694 if (!lttng_trigger_is_equal(session->rotate_trigger,
695 lttng_notification_get_const_trigger(notification))) {
696 /* Notification does not originate from our rotation trigger. */
697 ret = 0;
698 goto end_unlock;
699 }
700
701 ret = unsubscribe_session_consumed_size_rotation(session, notification_thread_handle);
702 if (ret) {
703 goto end_unlock;
704 }
705
706 ret = cmd_rotate_session(
707 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
708 switch (ret) {
709 case LTTNG_OK:
710 break;
711 case -LTTNG_ERR_ROTATION_PENDING:
712 DBG("Rotate already pending, subscribe to the next threshold value");
713 break;
714 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
715 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
716 break;
717 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
718 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
719 break;
720 default:
721 ERR("Failed to rotate on size notification with error: %s", lttng_strerror(ret));
722 ret = -1;
723 goto end_unlock;
724 }
725
726 ret = subscribe_session_consumed_size_rotation(
727 session, consumed + session->rotate_size, notification_thread_handle);
728 if (ret) {
729 ERR("Failed to subscribe to session consumed size condition");
730 goto end_unlock;
731 }
732 ret = 0;
733
734 end_unlock:
735 session_unlock(session);
736 session_put(session);
737 session_unlock_list();
738 end:
739 return ret;
740 }
741
742 static int handle_notification_channel(int fd __attribute__((unused)),
743 struct rotation_thread_handle *handle,
744 struct rotation_thread *state __attribute__((unused)))
745 {
746 int ret;
747 bool notification_pending = true;
748 struct lttng_notification *notification = nullptr;
749 enum lttng_notification_channel_status status;
750
751 /*
752 * A notification channel may have multiple notifications queued-up internally in
753 * its buffers. This is because a notification channel multiplexes command replies
754 * and notifications. The current protocol specifies that multiple notifications can be
755 * received before the reply to a command.
756 *
757 * In such cases, the notification channel client implementation internally queues them and
758 * provides them on the next calls to lttng_notification_channel_get_next_notification().
759 * This is correct with respect to the public API, which is intended to be used in "blocking
760 * mode".
761 *
762 * However, this internal user relies on poll/epoll to wake-up when data is available
763 * on the notification channel's socket. As such, it can't assume that a wake-up means only
764 * one notification is available for consumption since many of them may have been queued in
765 * the channel's internal buffers.
766 */
767 while (notification_pending) {
768 status = lttng_notification_channel_has_pending_notification(
769 rotate_notification_channel, &notification_pending);
770 if (status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
771 ERR("Error occurred while checking for pending notification");
772 ret = -1;
773 goto end;
774 }
775
776 if (!notification_pending) {
777 ret = 0;
778 goto end;
779 }
780
781 /* Receive the next notification. */
782 status = lttng_notification_channel_get_next_notification(
783 rotate_notification_channel, &notification);
784 switch (status) {
785 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
786 break;
787 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
788 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
789 ret = 0;
790 goto end;
791 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
792 ERR("Notification channel was closed");
793 ret = -1;
794 goto end;
795 default:
796 /* Unhandled conditions / errors. */
797 ERR("Unknown notification channel status");
798 ret = -1;
799 goto end;
800 }
801
802 ret = handle_condition(notification, handle->notification_thread_handle);
803 lttng_notification_destroy(notification);
804 if (ret) {
805 goto end;
806 }
807 }
808 end:
809 return ret;
810 }
811
812 static void *thread_rotation(void *data)
813 {
814 int ret;
815 struct rotation_thread_handle *handle = (rotation_thread_handle *) data;
816 struct rotation_thread thread;
817 int queue_pipe_fd;
818
819 DBG("Started rotation thread");
820 rcu_register_thread();
821 rcu_thread_online();
822 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
823 health_code_update();
824
825 if (!handle) {
826 ERR("Invalid thread context provided");
827 goto end;
828 }
829
830 queue_pipe_fd = lttng_pipe_get_readfd(handle->rotation_timer_queue->event_pipe);
831
832 ret = init_thread_state(handle, &thread);
833 if (ret) {
834 goto error;
835 }
836
837 while (true) {
838 int fd_count, i;
839
840 health_poll_entry();
841 DBG("Entering poll wait");
842 ret = lttng_poll_wait(&thread.events, -1);
843 DBG("Poll wait returned (%i)", ret);
844 health_poll_exit();
845 if (ret < 0) {
846 /*
847 * Restart interrupted system call.
848 */
849 if (errno == EINTR) {
850 continue;
851 }
852 ERR("Error encountered during lttng_poll_wait (%i)", ret);
853 goto error;
854 }
855
856 fd_count = ret;
857 for (i = 0; i < fd_count; i++) {
858 int fd = LTTNG_POLL_GETFD(&thread.events, i);
859 uint32_t revents = LTTNG_POLL_GETEV(&thread.events, i);
860
861 DBG("Handling fd (%i) activity (%u)", fd, revents);
862
863 if (revents & LPOLLERR) {
864 ERR("Polling returned an error on fd %i", fd);
865 goto error;
866 }
867
868 if (fd == rotate_notification_channel->socket ||
869 fd == rotate_notification_channel_subscription_change_eventfd) {
870 ret = handle_notification_channel(fd, handle, &thread);
871 if (ret) {
872 ERR("Error occurred while handling activity on notification channel socket");
873 goto error;
874 }
875
876 if (fd == rotate_notification_channel_subscription_change_eventfd) {
877 uint64_t eventfd_value;
878 const int read_ret = lttng_read(fd, &eventfd_value, sizeof(eventfd_value));
879
880 if (read_ret != sizeof(eventfd_value)) {
881 PERROR("Failed to read value from rotation thread as writing to the rotation thread notification channel subscription change eventfd");
882 goto error;
883 }
884 }
885 } else {
886 /* Job queue or quit pipe activity. */
887
888 /*
889 * The job queue is serviced if there is
890 * activity on the quit pipe to ensure it is
891 * flushed and all references held in the queue
892 * are released.
893 */
894 ret = handle_job_queue(
895 handle, &thread, handle->rotation_timer_queue);
896 if (ret) {
897 ERR("Failed to handle rotation timer pipe event");
898 goto error;
899 }
900
901 if (fd == queue_pipe_fd) {
902 char buf;
903
904 ret = lttng_read(fd, &buf, 1);
905 if (ret != 1) {
906 ERR("Failed to read from wakeup pipe (fd = %i)",
907 fd);
908 goto error;
909 }
910 } else {
911 DBG("Quit pipe activity");
912 goto exit;
913 }
914 }
915 }
916 }
917 exit:
918 error:
919 DBG("Thread exit");
920 fini_thread_state(&thread);
921 end:
922 health_unregister(the_health_sessiond);
923 rcu_thread_offline();
924 rcu_unregister_thread();
925 return nullptr;
926 }
927
928 static bool shutdown_rotation_thread(void *thread_data)
929 {
930 struct rotation_thread_handle *handle = (rotation_thread_handle *) thread_data;
931 const int write_fd = lttng_pipe_get_writefd(handle->quit_pipe);
932
933 return notify_thread_pipe(write_fd) == 1;
934 }
935
936 bool launch_rotation_thread(struct rotation_thread_handle *handle)
937 {
938 struct lttng_thread *thread;
939
940 thread = lttng_thread_create(
941 "Rotation", thread_rotation, shutdown_rotation_thread, nullptr, handle);
942 if (!thread) {
943 goto error;
944 }
945 lttng_thread_put(thread);
946 return true;
947 error:
948 return false;
949 }
This page took 0.08088 seconds and 4 git commands to generate.