sessiond: introduce ltt_session::locked_ref look-up functions
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
28ab034a
JG
14#include "rotation-thread.hpp"
15#include "session.hpp"
16#include "thread.hpp"
17#include "timer.hpp"
18#include "utils.hpp"
19
20#include <common/align.hpp>
c9e313bc
SM
21#include <common/config/session-config.hpp>
22#include <common/defaults.hpp>
28ab034a 23#include <common/error.hpp>
0038180d
JG
24#include <common/eventfd.hpp>
25#include <common/exception.hpp>
26#include <common/file-descriptor.hpp>
27#include <common/format.hpp>
c9e313bc 28#include <common/futex.hpp>
c9e313bc 29#include <common/hashtable/utils.hpp>
c9e313bc 30#include <common/kernel-ctl/kernel-ctl.hpp>
0038180d
JG
31#include <common/locked-reference.hpp>
32#include <common/make-unique-wrapper.hpp>
33#include <common/pthread-lock.hpp>
34#include <common/scope-exit.hpp>
28ab034a 35#include <common/time.hpp>
56047f5a 36#include <common/urcu.hpp>
28ab034a
JG
37#include <common/utils.hpp>
38
0038180d 39#include <lttng/action/action-internal.hpp>
c9e313bc 40#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
41#include <lttng/location-internal.hpp>
42#include <lttng/notification/channel-internal.hpp>
c08136a3 43#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
44#include <lttng/rotate-internal.hpp>
45#include <lttng/trigger/trigger.h>
db66e574 46
671e39d7 47#include <fcntl.h>
28ab034a 48#include <inttypes.h>
0038180d 49#include <memory>
28ab034a 50#include <signal.h>
dc65dda3 51#include <sys/eventfd.h>
28ab034a
JG
52#include <sys/stat.h>
53#include <time.h>
db66e574
JD
54#include <urcu.h>
55#include <urcu/list.h>
db66e574 56
0038180d 57namespace ls = lttng::sessiond;
db66e574 58
92816cc3
JG
59/*
60 * The timer thread enqueues jobs and wakes up the rotation thread.
61 * When the rotation thread wakes up, it empties the queue.
62 */
0038180d 63struct ls::rotation_thread_timer_queue {
92816cc3
JG
64 struct lttng_pipe *event_pipe;
65 struct cds_list_head list;
66 pthread_mutex_t lock;
67};
68
f1494934
JG
69namespace {
70struct rotation_thread_job {
303ac4ed
JG
71 using uptr =
72 std::unique_ptr<rotation_thread_job,
73 lttng::memory::create_deleter_class<rotation_thread_job,
74 lttng::memory::free>::deleter>;
0038180d
JG
75
76 enum ls::rotation_thread_job_type type;
f1494934
JG
77 struct ltt_session *session;
78 /* List member in struct rotation_thread_timer_queue. */
79 struct cds_list_head head;
80};
f1494934 81
0038180d 82const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
db66e574 83{
92816cc3 84 switch (job_type) {
0038180d 85 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
92816cc3 86 return "CHECK_PENDING_ROTATION";
0038180d 87 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
92816cc3
JG
88 return "SCHEDULED_ROTATION";
89 default:
90 abort();
91 }
db66e574
JD
92}
93
92816cc3
JG
94/*
95 * Called with the rotation_thread_timer_queue lock held.
96 * Return true if the same timer job already exists in the queue, false if not.
97 */
0038180d
JG
98bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
99 ls::rotation_thread_job_type job_type,
100 ltt_session *session)
92816cc3
JG
101{
102 bool exists = false;
103 struct rotation_thread_job *job;
104
28ab034a 105 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 106 if (job->session == session && job->type == job_type) {
92816cc3
JG
107 exists = true;
108 goto end;
db66e574 109 }
db66e574 110 }
92816cc3
JG
111end:
112 return exists;
113}
114
0038180d 115void check_session_rotation_pending_on_consumers(ltt_session& session, bool& _rotation_completed)
92816cc3 116{
db582e11 117 int ret = 0;
92816cc3
JG
118 struct consumer_socket *socket;
119 struct cds_lfht_iter iter;
d2956687
JG
120 enum consumer_trace_chunk_exists_status exists_status;
121 uint64_t relayd_id;
122 bool chunk_exists_on_peer = false;
123 enum lttng_trace_chunk_status chunk_status;
56047f5a 124 lttng::urcu::read_lock_guard read_lock;
d2956687 125
0038180d 126 LTTNG_ASSERT(session.chunk_being_archived);
92816cc3
JG
127
128 /*
129 * Check for a local pending rotation on all consumers (32-bit
130 * user space, 64-bit user space, and kernel).
131 */
0038180d 132 if (!session.ust_session) {
92816cc3
JG
133 goto skip_ust;
134 }
56047f5a 135
28ab034a 136 cds_lfht_for_each_entry (
0038180d
JG
137 session.ust_session->consumer->socks->ht, &iter, socket, node.node) {
138 relayd_id = session.ust_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 139 -1ULL :
0038180d 140 session.ust_session->consumer->net_seq_index;
d2956687 141
0038180d 142 lttng::pthread::lock_guard socket_lock(*socket->lock);
d2956687 143 ret = consumer_trace_chunk_exists(socket,
28ab034a 144 relayd_id,
0038180d
JG
145 session.id,
146 session.chunk_being_archived,
28ab034a 147 &exists_status);
d2956687 148 if (ret) {
83ed9e90 149 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3 150 goto end;
db66e574 151 }
d2956687 152
16100d7a 153 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
154 chunk_exists_on_peer = true;
155 goto end;
16100d7a 156 }
16100d7a 157 }
db66e574 158
92816cc3 159skip_ust:
0038180d 160 if (!session.kernel_session) {
92816cc3 161 goto skip_kernel;
db66e574 162 }
0038180d 163
28ab034a 164 cds_lfht_for_each_entry (
0038180d
JG
165 session.kernel_session->consumer->socks->ht, &iter, socket, node.node) {
166 lttng::pthread::lock_guard socket_lock(*socket->lock);
167
168 relayd_id = session.kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 169 -1ULL :
0038180d 170 session.kernel_session->consumer->net_seq_index;
d2956687
JG
171
172 ret = consumer_trace_chunk_exists(socket,
28ab034a 173 relayd_id,
0038180d
JG
174 session.id,
175 session.chunk_being_archived,
28ab034a 176 &exists_status);
d2956687 177 if (ret) {
83ed9e90 178 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3
JG
179 goto end;
180 }
d2956687 181
16100d7a 182 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
183 chunk_exists_on_peer = true;
184 goto end;
16100d7a 185 }
92816cc3
JG
186 }
187skip_kernel:
188end:
db66e574 189
d2956687
JG
190 if (!chunk_exists_on_peer) {
191 uint64_t chunk_being_archived_id;
192
0038180d 193 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 194 &chunk_being_archived_id);
a0377dfe 195 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
28ab034a
JG
196 DBG("Rotation of trace archive %" PRIu64
197 " of session \"%s\" is complete on all consumers",
198 chunk_being_archived_id,
0038180d 199 session.name);
db66e574 200 }
0038180d
JG
201
202 _rotation_completed = !chunk_exists_on_peer;
92816cc3 203 if (ret) {
28ab034a 204 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
2961f09e 205 if (ret) {
0038180d 206 ERR("Failed to reset rotation state of session \"%s\"", session.name);
2961f09e 207 }
db66e574 208 }
db66e574
JD
209}
210
d88744a4 211/*
92816cc3 212 * Check if the last rotation was completed, called with session lock held.
d2956687
JG
213 * Should only return non-zero in the event of a fatal error. Doing so will
214 * shutdown the thread.
d88744a4 215 */
0038180d
JG
216int check_session_rotation_pending(ltt_session& session,
217 notification_thread_handle& notification_thread_handle)
d88744a4
JD
218{
219 int ret;
92816cc3 220 struct lttng_trace_archive_location *location;
d2956687
JG
221 enum lttng_trace_chunk_status chunk_status;
222 bool rotation_completed = false;
223 const char *archived_chunk_name;
224 uint64_t chunk_being_archived_id;
225
0038180d 226 if (!session.chunk_being_archived) {
dc1d5967
FD
227 ret = 0;
228 goto end;
229 }
230
28ab034a 231 chunk_status =
0038180d 232 lttng_trace_chunk_get_id(session.chunk_being_archived, &chunk_being_archived_id);
a0377dfe 233 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d88744a4 234
bd0514a5 235 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
0038180d 236 session.name,
28ab034a 237 chunk_being_archived_id);
d2956687 238
faf1bdcf
JG
239 /*
240 * The rotation-pending check timer of a session is launched in
241 * one-shot mode. If the rotation is incomplete, the rotation
242 * thread will re-enable the pending-check timer.
243 *
244 * The timer thread can't stop the timer itself since it is involved
245 * in the check for the timer's quiescence.
246 */
247 ret = timer_session_rotation_pending_check_stop(session);
248 if (ret) {
6ae1bf46 249 goto check_ongoing_rotation;
faf1bdcf
JG
250 }
251
0038180d
JG
252 check_session_rotation_pending_on_consumers(session, rotation_completed);
253 if (!rotation_completed || session.rotation_state == LTTNG_ROTATION_STATE_ERROR) {
6ae1bf46 254 goto check_ongoing_rotation;
92816cc3
JG
255 }
256
92816cc3
JG
257 /*
258 * Now we can clear the "ONGOING" state in the session. New
259 * rotations can start now.
260 */
28ab034a 261 chunk_status = lttng_trace_chunk_get_name(
0038180d 262 session.chunk_being_archived, &archived_chunk_name, nullptr);
a0377dfe 263 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
0038180d
JG
264 free(session.last_archived_chunk_name);
265 session.last_archived_chunk_name = strdup(archived_chunk_name);
266 if (!session.last_archived_chunk_name) {
d2956687
JG
267 PERROR("Failed to duplicate archived chunk name");
268 }
0038180d 269
d2956687 270 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
92816cc3 271
0038180d
JG
272 if (!session.quiet_rotation) {
273 location = session_get_trace_archive_location(&session);
7fdbed1c 274 ret = notification_thread_command_session_rotation_completed(
0038180d
JG
275 &notification_thread_handle,
276 session.id,
277 session.last_archived_chunk_id.value,
28ab034a 278 location);
d3740619 279 lttng_trace_archive_location_put(location);
7fdbed1c 280 if (ret != LTTNG_OK) {
bd0514a5 281 ERR("Failed to notify notification thread of completed rotation for session %s",
0038180d 282 session.name);
7fdbed1c 283 }
92816cc3
JG
284 }
285
286 ret = 0;
6ae1bf46 287check_ongoing_rotation:
0038180d
JG
288 if (session.rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
289 chunk_status = lttng_trace_chunk_get_id(session.chunk_being_archived,
28ab034a 290 &chunk_being_archived_id);
a0377dfe 291 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 292
bd0514a5 293 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
28ab034a 294 chunk_being_archived_id,
0038180d
JG
295 session.name);
296 ret = timer_session_rotation_pending_check_start(&session,
28ab034a 297 DEFAULT_ROTATE_PENDING_TIMER);
92816cc3 298 if (ret) {
d2956687 299 ERR("Failed to re-enable rotation pending timer");
92816cc3
JG
300 ret = -1;
301 goto end;
302 }
303 }
304
6ae1bf46 305end:
d88744a4
JD
306 return ret;
307}
308
ed1e52a3 309/* Call with the session and session_list locks held. */
0038180d 310int launch_session_rotation(ltt_session& session)
259c2674
JD
311{
312 int ret;
92816cc3 313 struct lttng_rotate_session_return rotation_return;
259c2674 314
0038180d 315 DBG("Launching scheduled time-based rotation on session \"%s\"", session.name);
259c2674 316
0038180d 317 ASSERT_SESSION_LIST_LOCKED();
d9a970b7 318 ASSERT_LOCKED(session._lock);
0038180d
JG
319
320 ret = cmd_rotate_session(&session,
321 &rotation_return,
322 false,
323 LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
92816cc3 324 if (ret == LTTNG_OK) {
bd0514a5 325 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
0038180d 326 session.name);
92816cc3
JG
327 } else {
328 /* Don't consider errors as fatal. */
bd0514a5 329 DBG("Scheduled time-based rotation aborted for session %s: %s",
0038180d 330 session.name,
28ab034a 331 lttng_strerror(ret));
259c2674 332 }
0038180d 333
92816cc3
JG
334 return 0;
335}
259c2674 336
0038180d
JG
337int run_job(const rotation_thread_job& job,
338 ltt_session& session,
339 notification_thread_handle& notification_thread_handle)
92816cc3
JG
340{
341 int ret;
259c2674 342
0038180d
JG
343 switch (job.type) {
344 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
16100d7a 345 ret = launch_session_rotation(session);
92816cc3 346 break;
0038180d 347 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
28ab034a 348 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
349 break;
350 default:
351 abort();
259c2674 352 }
0038180d 353
259c2674
JD
354 return ret;
355}
356
0038180d 357bool shutdown_rotation_thread(void *thread_data)
d88744a4 358{
0038180d 359 auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
d88744a4 360
0038180d
JG
361 return handle->shutdown();
362}
363} /* namespace */
d88744a4 364
0038180d
JG
365ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
366{
367 auto queue = zmalloc<ls::rotation_thread_timer_queue>();
368 if (!queue) {
369 PERROR("Failed to allocate timer rotate queue");
370 goto end;
371 }
372
373 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
374 CDS_INIT_LIST_HEAD(&queue->list);
375 pthread_mutex_init(&queue->lock, nullptr);
376end:
377 return queue;
378}
379
380void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
381{
382 if (!queue) {
383 return;
384 }
385
386 lttng_pipe_destroy(queue->event_pipe);
387
388 {
389 lttng::pthread::lock_guard queue_lock(queue->lock);
390
391 LTTNG_ASSERT(cds_list_empty(&queue->list));
392 }
393
394 pthread_mutex_destroy(&queue->lock);
395 free(queue);
396}
397
28f23191
JG
398ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
399 notification_thread_handle& notification_thread_handle) :
83885b70
MJ
400 _rotation_timer_queue(rotation_timer_queue),
401 _notification_thread_handle(notification_thread_handle)
0038180d
JG
402{
403 _quit_pipe.reset([]() {
404 auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
405 if (!raw_pipe) {
406 LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
d88744a4 407 }
d88744a4 408
0038180d
JG
409 return raw_pipe;
410 }());
411
412 _notification_channel.reset([]() {
413 auto channel = lttng_notification_channel_create(
414 lttng_session_daemon_notification_endpoint);
415 if (!channel) {
416 LTTNG_THROW_ERROR(
417 "Failed to create notification channel of rotation thread");
418 }
419
420 return channel;
421 }());
422
423 lttng_poll_init(&_events);
424
425 /*
426 * Create pollset with size 4:
427 * - rotation thread quit pipe,
428 * - rotation thread timer queue pipe,
429 * - notification channel sock,
430 * - subscribtion change event fd
431 */
432 if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
433 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
434 }
435
436 if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
437 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
438 }
439
440 if (lttng_poll_add(&_events,
441 lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
442 LPOLLIN) < 0) {
443 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
444 }
445
446 if (lttng_poll_add(&_events,
447 _notification_channel_subscribtion_change_eventfd.fd(),
448 LPOLLIN) < 0) {
449 LTTNG_THROW_ERROR(
450 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
451 }
452
453 if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
454 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
455 }
456}
457
458ls::rotation_thread::~rotation_thread()
459{
460 lttng_poll_clean(&_events);
461}
462
463void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
28f23191
JG
464 ls::rotation_thread_job_type job_type,
465 ltt_session *session)
0038180d
JG
466{
467 const char dummy = '!';
468 struct rotation_thread_job *job = nullptr;
469 const char *job_type_str = get_job_type_str(job_type);
470 lttng::pthread::lock_guard queue_lock(queue->lock);
471
472 if (timer_job_exists(queue, job_type, session)) {
473 /*
474 * This timer job is already pending, we don't need to add
475 * it.
476 */
477 return;
478 }
479
480 job = zmalloc<rotation_thread_job>();
481 if (!job) {
482 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
483 job_type_str,
484 session->name);
485 return;
486 }
487
488 /* No reason for this to fail as the caller must hold a reference. */
489 (void) session_get(session);
490
491 job->session = session;
492 job->type = job_type;
493 cds_list_add_tail(&job->head, &queue->list);
494
495 const int write_ret =
496 lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
497 if (write_ret < 0) {
498 /*
499 * We do not want to block in the timer handler, the job has
500 * been enqueued in the list, the wakeup pipe is probably full,
501 * the job will be processed when the rotation_thread catches
502 * up.
503 */
504 DIAGNOSTIC_PUSH
505 DIAGNOSTIC_IGNORE_LOGICAL_OP
506 if (errno == EAGAIN || errno == EWOULDBLOCK) {
507 DIAGNOSTIC_POP
d88744a4 508 /*
0038180d
JG
509 * Not an error, but would be surprising and indicate
510 * that the rotation thread can't keep up with the
511 * current load.
d88744a4 512 */
0038180d
JG
513 DBG("Wake-up pipe of rotation thread job queue is full");
514 return;
d88744a4
JD
515 }
516
0038180d
JG
517 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
518 job_type_str,
519 session->name);
520 return;
d88744a4 521 }
0038180d 522}
d88744a4 523
0038180d
JG
524void ls::rotation_thread::_handle_job_queue()
525{
526 for (;;) {
527 rotation_thread_job::uptr job;
528
529 {
530 /* Take the queue lock only to pop an element from the list. */
531 lttng::pthread::lock_guard rotation_timer_queue_lock(
532 _rotation_timer_queue.lock);
533 if (cds_list_empty(&_rotation_timer_queue.list)) {
534 break;
535 }
d88744a4 536
0038180d
JG
537 job.reset(cds_list_first_entry(
538 &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
539 cds_list_del(&job->head);
540 }
541
d9a970b7 542 const auto list_lock = lttng::sessiond::lock_session_list();
0038180d 543
77682be9 544 /* locked_ref will unlock the session and release the ref held by the job. */
0038180d 545 session_lock(job->session);
77682be9 546 auto session = ltt_session::locked_ref(job->session);
0038180d
JG
547
548 if (run_job(*job, *session, _notification_thread_handle)) {
549 return;
550 }
551 }
d88744a4
JD
552}
553
28f23191 554void ls::rotation_thread::_handle_notification(const lttng_notification& notification)
90936dcf
JD
555{
556 int ret = 0;
cd9adb8b 557 const char *condition_session_name = nullptr;
90936dcf
JD
558 enum lttng_condition_status condition_status;
559 enum lttng_evaluation_status evaluation_status;
560 uint64_t consumed;
0038180d
JG
561 auto *condition = lttng_notification_get_const_condition(&notification);
562 auto *evaluation = lttng_notification_get_const_evaluation(&notification);
563 const auto condition_type = lttng_condition_get_type(condition);
90936dcf
JD
564
565 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
0038180d 566 LTTNG_THROW_ERROR("Unexpected condition type");
90936dcf
JD
567 }
568
0038180d 569 /* Fetch info to test. */
90936dcf 570 condition_status = lttng_condition_session_consumed_size_get_session_name(
28ab034a 571 condition, &condition_session_name);
90936dcf 572 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
0038180d 573 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
90936dcf 574 }
0038180d 575
28ab034a
JG
576 evaluation_status =
577 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
90936dcf 578 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
0038180d 579 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
90936dcf
JD
580 }
581
0038180d
JG
582 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
583 condition_session_name,
584 consumed);
585
d9a970b7
JG
586 /*
587 * Mind the order of the declaration of list_lock vs session:
588 * the session list lock must always be released _after_ the release of
589 * a session's reference (the destruction of a ref/locked_ref) to ensure
590 * since the reference's release may unpublish the session from the list of
591 * sessions.
592 */
593 const auto list_lock = lttng::sessiond::lock_session_list();
594 ltt_session::locked_ref session;
595 try {
596 session = ltt_session::find_locked_session(condition_session_name);
597 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
0038180d
JG
598 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
599 lttng_condition_type_str(condition_type),
600 condition_session_name);
eb2827a4
JG
601 /*
602 * Not a fatal error: a session can be destroyed before we get
603 * the chance to handle the notification.
604 */
0038180d 605 return;
90936dcf 606 }
90936dcf 607
c08136a3 608 if (!lttng_trigger_is_equal(session->rotate_trigger,
0038180d
JG
609 lttng_notification_get_const_trigger(&notification))) {
610 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
611 return;
c08136a3
JG
612 }
613
0038180d 614 unsubscribe_session_consumed_size_rotation(*session);
90936dcf 615
2545db87 616 ret = cmd_rotate_session(
0038180d 617 session.get(), nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
2545db87
JG
618 switch (ret) {
619 case LTTNG_OK:
620 break;
621 case -LTTNG_ERR_ROTATION_PENDING:
90936dcf 622 DBG("Rotate already pending, subscribe to the next threshold value");
2545db87
JG
623 break;
624 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
625 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
626 break;
627 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
628 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
629 break;
630 default:
0038180d
JG
631 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
632 static_cast<lttng_error_code>(-ret));
90936dcf 633 }
90936dcf 634
0038180d 635 subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
90936dcf
JD
636}
637
0038180d 638void ls::rotation_thread::_handle_notification_channel_activity()
90936dcf 639{
dc65dda3 640 bool notification_pending = true;
90936dcf 641
dc65dda3
JG
642 /*
643 * A notification channel may have multiple notifications queued-up internally in
644 * its buffers. This is because a notification channel multiplexes command replies
645 * and notifications. The current protocol specifies that multiple notifications can be
646 * received before the reply to a command.
647 *
648 * In such cases, the notification channel client implementation internally queues them and
649 * provides them on the next calls to lttng_notification_channel_get_next_notification().
650 * This is correct with respect to the public API, which is intended to be used in "blocking
651 * mode".
652 *
653 * However, this internal user relies on poll/epoll to wake-up when data is available
654 * on the notification channel's socket. As such, it can't assume that a wake-up means only
655 * one notification is available for consumption since many of them may have been queued in
656 * the channel's internal buffers.
657 */
658 while (notification_pending) {
0038180d
JG
659 const auto pending_status = lttng_notification_channel_has_pending_notification(
660 _notification_channel.get(), &notification_pending);
661 if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
662 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
dc65dda3 663 }
d73ee93f 664
dc65dda3 665 if (!notification_pending) {
0038180d 666 return;
dc65dda3 667 }
d73ee93f 668
dc65dda3 669 /* Receive the next notification. */
0038180d
JG
670 lttng_notification::uptr notification;
671 enum lttng_notification_channel_status next_notification_status;
672
673 {
674 struct lttng_notification *raw_notification_ptr;
675
676 next_notification_status = lttng_notification_channel_get_next_notification(
677 _notification_channel.get(), &raw_notification_ptr);
678 notification.reset(raw_notification_ptr);
679 }
680
681 switch (next_notification_status) {
dc65dda3
JG
682 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
683 break;
684 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
685 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
0038180d 686 return;
dc65dda3 687 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
0038180d 688 LTTNG_THROW_ERROR("Notification channel was closed");
dc65dda3
JG
689 default:
690 /* Unhandled conditions / errors. */
0038180d 691 LTTNG_THROW_ERROR("Unknown notification channel status");
dc65dda3 692 }
90936dcf 693
0038180d 694 _handle_notification(*notification);
90936dcf 695 }
90936dcf
JD
696}
697
0038180d 698void ls::rotation_thread::_thread_function() noexcept
db66e574 699{
bd0514a5 700 DBG("Started rotation thread");
0038180d
JG
701
702 try {
703 _run();
704 } catch (const std::exception& e) {
705 ERR_FMT("Fatal rotation thread error: {}", e.what());
706 }
707
708 DBG("Thread exit");
709}
710
711void ls::rotation_thread::_run()
712{
f620cc28 713 rcu_register_thread();
0038180d
JG
714 const auto unregister_rcu_thread =
715 lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
716
f620cc28 717 rcu_thread_online();
0038180d
JG
718 const auto offline_rcu_thread =
719 lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
720
412d7227 721 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
f620cc28 722 health_code_update();
0038180d
JG
723 const auto unregister_health =
724 lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
db66e574 725
0038180d 726 const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
db66e574 727
db66e574 728 while (true) {
db66e574 729 health_poll_entry();
bd0514a5 730 DBG("Entering poll wait");
0038180d
JG
731 auto poll_wait_ret = lttng_poll_wait(&_events, -1);
732 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
db66e574 733 health_poll_exit();
0038180d 734 if (poll_wait_ret < 0) {
db66e574
JD
735 /*
736 * Restart interrupted system call.
737 */
738 if (errno == EINTR) {
739 continue;
740 }
0038180d
JG
741
742 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
db66e574
JD
743 }
744
0038180d
JG
745 const auto fd_count = poll_wait_ret;
746 for (int i = 0; i < fd_count; i++) {
747 const auto fd = LTTNG_POLL_GETFD(&_events, i);
748 const auto revents = LTTNG_POLL_GETEV(&_events, i);
db66e574 749
0038180d 750 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
db66e574 751
92816cc3 752 if (revents & LPOLLERR) {
f9a41357
JG
753 LTTNG_THROW_ERROR(lttng::format(
754 "Polling returned an error on fd: fd={}", fd));
92816cc3
JG
755 }
756
0038180d
JG
757 if (fd == _notification_channel->socket ||
758 fd == _notification_channel_subscribtion_change_eventfd.fd()) {
759 try {
760 _handle_notification_channel_activity();
761 } catch (const lttng::ctl::error& e) {
762 /*
763 * The only non-fatal error (rotation failed), others
764 * are caught at the top-level.
765 */
766 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
767 e.what());
768 continue;
85e17b27 769 }
dc65dda3 770
0038180d 771 if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
28f23191
JG
772 _notification_channel_subscribtion_change_eventfd
773 .decrement();
dc65dda3 774 }
85e17b27
JG
775 } else {
776 /* Job queue or quit pipe activity. */
85e17b27
JG
777
778 /*
779 * The job queue is serviced if there is
780 * activity on the quit pipe to ensure it is
781 * flushed and all references held in the queue
782 * are released.
783 */
0038180d 784 _handle_job_queue();
64d9b072
JG
785 if (fd == queue_pipe_fd) {
786 char buf;
787
0038180d
JG
788 if (lttng_read(fd, &buf, 1) != 1) {
789 LTTNG_THROW_POSIX(
f9a41357 790 lttng::format(
0038180d
JG
791 "Failed to read from wakeup pipe: fd={}",
792 fd),
793 errno);
64d9b072
JG
794 }
795 } else {
bd0514a5 796 DBG("Quit pipe activity");
0038180d 797 return;
90936dcf 798 }
db66e574
JD
799 }
800 }
801 }
db66e574 802}
64d9b072 803
0038180d 804bool ls::rotation_thread::shutdown() const noexcept
64d9b072 805{
0038180d 806 const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
64d9b072
JG
807
808 return notify_thread_pipe(write_fd) == 1;
809}
810
0038180d 811void ls::rotation_thread::launch_thread()
64d9b072 812{
0038180d
JG
813 auto thread = lttng_thread_create(
814 "Rotation",
815 [](void *ptr) {
816 auto handle = reinterpret_cast<rotation_thread *>(ptr);
817
818 handle->_thread_function();
819 return static_cast<void *>(nullptr);
820 },
821 shutdown_rotation_thread,
822 nullptr,
823 this);
64d9b072 824 if (!thread) {
0038180d 825 LTTNG_THROW_ERROR("Failed to launch rotation thread");
64d9b072 826 }
0038180d 827
64d9b072 828 lttng_thread_put(thread);
0038180d
JG
829}
830
831void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
28f23191 832 std::uint64_t size)
0038180d
JG
833{
834 const struct lttng_credentials session_creds = {
835 .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
836 .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
837 };
838
d9a970b7 839 ASSERT_LOCKED(session._lock);
0038180d
JG
840
841 auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
842 lttng_condition_session_consumed_size_create());
843 if (!rotate_condition) {
844 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
845 }
846
847 auto condition_status =
848 lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
849 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
f9a41357 850 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
851 "Could not set session consumed size condition threshold: size={}", size));
852 }
853
28f23191
JG
854 condition_status = lttng_condition_session_consumed_size_set_session_name(
855 rotate_condition.get(), session.name);
0038180d 856 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
f9a41357 857 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
858 "Could not set session consumed size condition session name: name=`{}`",
859 session.name));
860 }
861
862 auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
863 lttng_action_notify_create());
864 if (!notify_action) {
865 LTTNG_THROW_POSIX("Could not create notify action", errno);
866 }
867
868 LTTNG_ASSERT(!session.rotate_trigger);
869 /* trigger acquires its own reference to condition and action on success. */
870 auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
871 lttng_trigger_create(rotate_condition.get(), notify_action.get()));
28f23191 872 if (!trigger) {
0038180d
JG
873 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
874 }
875
876 /* Ensure this trigger is not visible to external users. */
877 lttng_trigger_set_hidden(trigger.get());
878 lttng_trigger_set_credentials(trigger.get(), &session_creds);
879
28f23191
JG
880 auto nc_status = lttng_notification_channel_subscribe(_notification_channel.get(),
881 rotate_condition.get());
0038180d
JG
882 if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
883 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
884 }
885
886 /*
887 * Ensure any notification queued during the subscription are consumed by queueing an
888 * event.
889 */
890 _notification_channel_subscribtion_change_eventfd.increment();
891
892 const auto register_ret = notification_thread_command_register_trigger(
893 &_notification_thread_handle, trigger.get(), true);
894 if (register_ret != LTTNG_OK) {
895 LTTNG_THROW_CTL(
f9a41357 896 lttng::format(
0038180d
JG
897 "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
898 session.name,
899 size),
900 register_ret);
901 }
902
903 /* Ownership transferred to the session. */
904 session.rotate_trigger = trigger.release();
905}
906
907void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
908{
909 LTTNG_ASSERT(session.rotate_trigger);
910
911 const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
912 lttng_trigger_put(session.rotate_trigger);
913 session.rotate_trigger = nullptr;
914 });
915
916 const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
917 _notification_channel.get(),
918 lttng_trigger_get_const_condition(session.rotate_trigger));
919 if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
f9a41357 920 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
921 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
922 session.name,
923 static_cast<int>(unsubscribe_status)));
924 }
925
926 /*
927 * Ensure any notification queued during the un-subscription are consumed by queueing an
928 * event.
929 */
930 _notification_channel_subscribtion_change_eventfd.increment();
931
932 const auto unregister_status = notification_thread_command_unregister_trigger(
933 &_notification_thread_handle, session.rotate_trigger);
934 if (unregister_status != LTTNG_OK) {
935 LTTNG_THROW_CTL(
f9a41357 936 lttng::format(
0038180d
JG
937 "Failed to unregister trigger for automatic size-based rotation: session_name{}",
938 session.name),
939 unregister_status);
940 }
64d9b072 941}
This page took 0.120349 seconds and 4 git commands to generate.