Fix: consumerd: consumed size miscomputed during statistics sampling
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.cpp
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include "health-sessiond.hpp"
10 #include "kernel.hpp"
11 #include "lttng-sessiond.hpp"
12 #include "notification-thread-commands.hpp"
13 #include "notification-thread-events.hpp"
14 #include "notification-thread.hpp"
15 #include "testpoint.hpp"
16 #include "thread.hpp"
17
18 #include <common/align.hpp>
19 #include <common/config/session-config.hpp>
20 #include <common/defaults.hpp>
21 #include <common/error.hpp>
22 #include <common/kernel-ctl/kernel-ctl.hpp>
23 #include <common/time.hpp>
24 #include <common/utils.hpp>
25
26 #include <lttng/condition/buffer-usage-internal.hpp>
27 #include <lttng/condition/condition-internal.hpp>
28 #include <lttng/notification/channel-internal.hpp>
29 #include <lttng/notification/notification-internal.hpp>
30 #include <lttng/trigger/trigger.h>
31
32 #include <signal.h>
33 #include <sys/eventfd.h>
34 #include <sys/stat.h>
35 #include <time.h>
36 #include <urcu.h>
37 #include <urcu/list.h>
38 #include <urcu/rculfhash.h>
39
40 /*
41 * Flag used to temporarily pause data consumption from testpoints.
42 *
43 * This variable is dlsym-ed from a test, so needs to be exported.
44 */
45 LTTNG_EXPORT int notifier_consumption_paused;
46
47 /*
48 * Destroy the thread data previously created by the init function.
49 */
50 void notification_thread_handle_destroy(struct notification_thread_handle *handle)
51 {
52 int ret;
53
54 if (!handle) {
55 goto end;
56 }
57
58 LTTNG_ASSERT(cds_list_empty(&handle->cmd_queue.list));
59 pthread_mutex_destroy(&handle->cmd_queue.lock);
60 sem_destroy(&handle->ready);
61
62 if (handle->cmd_queue.event_fd >= 0) {
63 ret = close(handle->cmd_queue.event_fd);
64 if (ret < 0) {
65 PERROR("Failed to close notification command queue event fd");
66 }
67 }
68 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
69 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
70 if (ret) {
71 PERROR("close 32-bit consumer channel monitoring pipe");
72 }
73 }
74 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
75 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
76 if (ret) {
77 PERROR("close 64-bit consumer channel monitoring pipe");
78 }
79 }
80 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
81 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
82 if (ret) {
83 PERROR("close kernel consumer channel monitoring pipe");
84 }
85 }
86
87 end:
88 free(handle);
89 }
90
91 struct notification_thread_handle *
92 notification_thread_handle_create(struct lttng_pipe *ust32_channel_monitor_pipe,
93 struct lttng_pipe *ust64_channel_monitor_pipe,
94 struct lttng_pipe *kernel_channel_monitor_pipe)
95 {
96 int ret;
97 struct notification_thread_handle *handle;
98 int event_fd = -1;
99
100 handle = zmalloc<notification_thread_handle>();
101 if (!handle) {
102 goto end;
103 }
104
105 sem_init(&handle->ready, 0, 0);
106
107 event_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
108 if (event_fd < 0) {
109 PERROR("event_fd creation");
110 goto error;
111 }
112
113 handle->cmd_queue.event_fd = event_fd;
114
115 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
116 ret = pthread_mutex_init(&handle->cmd_queue.lock, nullptr);
117 if (ret) {
118 goto error;
119 }
120
121 if (ust32_channel_monitor_pipe) {
122 handle->channel_monitoring_pipes.ust32_consumer =
123 lttng_pipe_release_readfd(ust32_channel_monitor_pipe);
124 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
125 goto error;
126 }
127 } else {
128 handle->channel_monitoring_pipes.ust32_consumer = -1;
129 }
130 if (ust64_channel_monitor_pipe) {
131 handle->channel_monitoring_pipes.ust64_consumer =
132 lttng_pipe_release_readfd(ust64_channel_monitor_pipe);
133 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
134 goto error;
135 }
136 } else {
137 handle->channel_monitoring_pipes.ust64_consumer = -1;
138 }
139 if (kernel_channel_monitor_pipe) {
140 handle->channel_monitoring_pipes.kernel_consumer =
141 lttng_pipe_release_readfd(kernel_channel_monitor_pipe);
142 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
143 goto error;
144 }
145 } else {
146 handle->channel_monitoring_pipes.kernel_consumer = -1;
147 }
148
149 end:
150 return handle;
151 error:
152 notification_thread_handle_destroy(handle);
153 return nullptr;
154 }
155
156 static char *get_notification_channel_sock_path()
157 {
158 int ret;
159 bool is_root = !getuid();
160 char *sock_path;
161
162 sock_path = calloc<char>(LTTNG_PATH_MAX);
163 if (!sock_path) {
164 goto error;
165 }
166
167 if (is_root) {
168 ret = snprintf(
169 sock_path, LTTNG_PATH_MAX, DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
170 if (ret < 0) {
171 goto error;
172 }
173 } else {
174 const char *home_path = utils_get_home_dir();
175
176 if (!home_path) {
177 ERR("Can't get HOME directory for socket creation");
178 goto error;
179 }
180
181 ret = snprintf(sock_path,
182 LTTNG_PATH_MAX,
183 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
184 home_path);
185 if (ret < 0) {
186 goto error;
187 }
188 }
189
190 return sock_path;
191 error:
192 free(sock_path);
193 return nullptr;
194 }
195
196 static void notification_channel_socket_destroy(int fd)
197 {
198 int ret;
199 char *sock_path = get_notification_channel_sock_path();
200
201 DBG("Destroying notification channel socket");
202
203 if (sock_path) {
204 ret = unlink(sock_path);
205 free(sock_path);
206 if (ret < 0) {
207 PERROR("unlink notification channel socket");
208 }
209 }
210
211 ret = close(fd);
212 if (ret) {
213 PERROR("close notification channel socket");
214 }
215 }
216
217 static int notification_channel_socket_create()
218 {
219 int fd = -1, ret;
220 char *sock_path = get_notification_channel_sock_path();
221
222 DBG("Creating notification channel UNIX socket at %s", sock_path);
223
224 ret = lttcomm_create_unix_sock(sock_path);
225 if (ret < 0) {
226 ERR("Failed to create notification socket");
227 goto error;
228 }
229 fd = ret;
230
231 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
232 if (ret < 0) {
233 ERR("Set file permissions failed: %s", sock_path);
234 PERROR("chmod notification channel socket");
235 goto error;
236 }
237
238 if (getuid() == 0) {
239 gid_t gid;
240
241 ret = utils_get_group_id(the_config.tracing_group_name.value, true, &gid);
242 if (ret) {
243 /* Default to root group. */
244 gid = 0;
245 }
246
247 ret = chown(sock_path, 0, gid);
248 if (ret) {
249 ERR("Failed to set the notification channel socket's group");
250 ret = -1;
251 goto error;
252 }
253 }
254
255 DBG("Notification channel UNIX socket created (fd = %i)", fd);
256 free(sock_path);
257 return fd;
258 error:
259 if (fd >= 0 && close(fd) < 0) {
260 PERROR("close notification channel socket");
261 }
262 free(sock_path);
263 return ret;
264 }
265
266 static int init_poll_set(struct lttng_poll_event *poll_set,
267 struct notification_thread_handle *handle,
268 int notification_channel_socket)
269 {
270 int ret;
271
272 /*
273 * Create pollset with size 5:
274 * - notification channel socket (listen for new connections),
275 * - command queue event fd (internal sessiond commands),
276 * - consumerd (32-bit user space) channel monitor pipe,
277 * - consumerd (64-bit user space) channel monitor pipe,
278 * - consumerd (kernel) channel monitor pipe.
279 */
280 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
281 if (ret < 0) {
282 goto end;
283 }
284
285 ret = lttng_poll_add(poll_set, notification_channel_socket, LPOLLIN | LPOLLRDHUP);
286 if (ret < 0) {
287 ERR("Failed to add notification channel socket to pollset");
288 goto error;
289 }
290 ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd, LPOLLIN);
291 if (ret < 0) {
292 ERR("Failed to add notification command queue event fd to pollset");
293 goto error;
294 }
295 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust32_consumer, LPOLLIN);
296 if (ret < 0) {
297 ERR("Failed to add ust-32 channel monitoring pipe fd to pollset");
298 goto error;
299 }
300 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.ust64_consumer, LPOLLIN);
301 if (ret < 0) {
302 ERR("Failed to add ust-64 channel monitoring pipe fd to pollset");
303 goto error;
304 }
305 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
306 goto end;
307 }
308 ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.kernel_consumer, LPOLLIN);
309 if (ret < 0) {
310 ERR("Failed to add kernel channel monitoring pipe fd to pollset");
311 goto error;
312 }
313 end:
314 return ret;
315 error:
316 lttng_poll_clean(poll_set);
317 return ret;
318 }
319
320 static void fini_thread_state(struct notification_thread_state *state)
321 {
322 int ret;
323
324 if (state->client_socket_ht) {
325 ret = handle_notification_thread_client_disconnect_all(state);
326 LTTNG_ASSERT(!ret);
327 ret = cds_lfht_destroy(state->client_socket_ht, nullptr);
328 LTTNG_ASSERT(!ret);
329 }
330 if (state->client_id_ht) {
331 ret = cds_lfht_destroy(state->client_id_ht, nullptr);
332 LTTNG_ASSERT(!ret);
333 }
334 if (state->triggers_ht) {
335 ret = handle_notification_thread_trigger_unregister_all(state);
336 LTTNG_ASSERT(!ret);
337 ret = cds_lfht_destroy(state->triggers_ht, nullptr);
338 LTTNG_ASSERT(!ret);
339 }
340 if (state->channel_triggers_ht) {
341 ret = cds_lfht_destroy(state->channel_triggers_ht, nullptr);
342 LTTNG_ASSERT(!ret);
343 }
344 if (state->channel_state_ht) {
345 ret = cds_lfht_destroy(state->channel_state_ht, nullptr);
346 LTTNG_ASSERT(!ret);
347 }
348 if (state->notification_trigger_clients_ht) {
349 ret = cds_lfht_destroy(state->notification_trigger_clients_ht, nullptr);
350 LTTNG_ASSERT(!ret);
351 }
352 if (state->channels_ht) {
353 ret = cds_lfht_destroy(state->channels_ht, nullptr);
354 LTTNG_ASSERT(!ret);
355 }
356 if (state->sessions_ht) {
357 ret = cds_lfht_destroy(state->sessions_ht, nullptr);
358 LTTNG_ASSERT(!ret);
359 }
360 if (state->triggers_by_name_uid_ht) {
361 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, nullptr);
362 LTTNG_ASSERT(!ret);
363 }
364 if (state->trigger_tokens_ht) {
365 ret = cds_lfht_destroy(state->trigger_tokens_ht, nullptr);
366 LTTNG_ASSERT(!ret);
367 }
368 /*
369 * Must be destroyed after all channels have been destroyed.
370 * See comment in struct lttng_session_trigger_list.
371 */
372 if (state->session_triggers_ht) {
373 ret = cds_lfht_destroy(state->session_triggers_ht, nullptr);
374 LTTNG_ASSERT(!ret);
375 }
376 if (state->notification_channel_socket >= 0) {
377 notification_channel_socket_destroy(state->notification_channel_socket);
378 }
379
380 LTTNG_ASSERT(cds_list_empty(&state->tracer_event_sources_list));
381
382 if (state->executor) {
383 action_executor_destroy(state->executor);
384 }
385 lttng_poll_clean(&state->events);
386 }
387
388 static void mark_thread_as_ready(struct notification_thread_handle *handle)
389 {
390 DBG("Marking notification thread as ready");
391 sem_post(&handle->ready);
392 }
393
394 static void wait_until_thread_is_ready(struct notification_thread_handle *handle)
395 {
396 DBG("Waiting for notification thread to be ready");
397 sem_wait(&handle->ready);
398 DBG("Notification thread is ready");
399 }
400
401 static int init_thread_state(struct notification_thread_handle *handle,
402 struct notification_thread_state *state)
403 {
404 int ret;
405
406 memset(state, 0, sizeof(*state));
407 state->notification_channel_socket = -1;
408 state->trigger_id.next_tracer_token = 1;
409 lttng_poll_init(&state->events);
410
411 ret = notification_channel_socket_create();
412 if (ret < 0) {
413 goto end;
414 }
415 state->notification_channel_socket = ret;
416
417 ret = init_poll_set(&state->events, handle, state->notification_channel_socket);
418 if (ret) {
419 goto end;
420 }
421
422 DBG("Listening on notification channel socket");
423 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
424 if (ret < 0) {
425 ERR("Listen failed on notification channel socket");
426 goto error;
427 }
428
429 state->client_socket_ht = cds_lfht_new(
430 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
431 if (!state->client_socket_ht) {
432 goto error;
433 }
434
435 state->client_id_ht = cds_lfht_new(
436 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
437 if (!state->client_id_ht) {
438 goto error;
439 }
440
441 state->channel_triggers_ht = cds_lfht_new(
442 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
443 if (!state->channel_triggers_ht) {
444 goto error;
445 }
446
447 state->session_triggers_ht = cds_lfht_new(
448 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
449 if (!state->session_triggers_ht) {
450 goto error;
451 }
452
453 state->channel_state_ht = cds_lfht_new(
454 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
455 if (!state->channel_state_ht) {
456 goto error;
457 }
458
459 state->notification_trigger_clients_ht = cds_lfht_new(
460 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
461 if (!state->notification_trigger_clients_ht) {
462 goto error;
463 }
464
465 state->channels_ht = cds_lfht_new(
466 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
467 if (!state->channels_ht) {
468 goto error;
469 }
470 state->sessions_ht = cds_lfht_new(
471 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
472 if (!state->sessions_ht) {
473 goto error;
474 }
475 state->triggers_ht = cds_lfht_new(
476 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
477 if (!state->triggers_ht) {
478 goto error;
479 }
480 state->triggers_by_name_uid_ht = cds_lfht_new(
481 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
482 if (!state->triggers_by_name_uid_ht) {
483 goto error;
484 }
485
486 state->trigger_tokens_ht = cds_lfht_new(
487 DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, nullptr);
488 if (!state->trigger_tokens_ht) {
489 goto error;
490 }
491
492 CDS_INIT_LIST_HEAD(&state->tracer_event_sources_list);
493
494 state->executor = action_executor_create(handle);
495 if (!state->executor) {
496 goto error;
497 }
498
499 state->restart_poll = false;
500
501 mark_thread_as_ready(handle);
502 end:
503 return 0;
504 error:
505 fini_thread_state(state);
506 return -1;
507 }
508
509 static int handle_channel_monitoring_pipe(int fd,
510 uint32_t revents,
511 struct notification_thread_handle *handle,
512 struct notification_thread_state *state)
513 {
514 int ret = 0;
515 enum lttng_domain_type domain;
516
517 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
518 fd == handle->channel_monitoring_pipes.ust64_consumer) {
519 domain = LTTNG_DOMAIN_UST;
520 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
521 domain = LTTNG_DOMAIN_KERNEL;
522 } else {
523 abort();
524 }
525
526 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
527 ret = lttng_poll_del(&state->events, fd);
528 if (ret) {
529 ERR("Failed to remove consumer monitoring pipe from poll set");
530 }
531 goto end;
532 }
533
534 ret = handle_notification_thread_channel_sample(state, fd, domain);
535 if (ret) {
536 ERR("Consumer sample handling error occurred");
537 ret = -1;
538 goto end;
539 }
540 end:
541 return ret;
542 }
543
544 static int handle_event_notification_pipe(int event_source_fd,
545 enum lttng_domain_type domain,
546 uint32_t revents,
547 struct notification_thread_state *state)
548 {
549 int ret = 0;
550
551 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
552 ret = handle_notification_thread_tracer_event_source_died(state, event_source_fd);
553 if (ret) {
554 ERR("Failed to remove event notification pipe from poll set: fd = %d",
555 event_source_fd);
556 }
557 goto end;
558 }
559
560 if (testpoint(sessiond_handle_notifier_event_pipe)) {
561 ret = 0;
562 goto end;
563 }
564
565 if (caa_unlikely(notifier_consumption_paused)) {
566 DBG("Event notifier notification consumption paused, sleeping...");
567 sleep(1);
568 goto end;
569 }
570
571 ret = handle_notification_thread_event_notification(state, event_source_fd, domain);
572 if (ret) {
573 ERR("Event notification handling error occurred for fd: %d", event_source_fd);
574 ret = -1;
575 goto end;
576 }
577
578 end:
579 return ret;
580 }
581
582 /*
583 * Return the event source domain type via parameter.
584 */
585 static bool fd_is_event_notification_source(const struct notification_thread_state *state,
586 int fd,
587 enum lttng_domain_type *domain)
588 {
589 struct notification_event_tracer_event_source_element *source_element;
590
591 LTTNG_ASSERT(domain);
592
593 cds_list_for_each_entry (source_element, &state->tracer_event_sources_list, node) {
594 if (source_element->fd != fd) {
595 continue;
596 }
597
598 *domain = source_element->domain;
599 return true;
600 }
601
602 return false;
603 }
604
605 /*
606 * This thread services notification channel clients and commands received
607 * from various lttng-sessiond components over a command queue.
608 */
609 static void *thread_notification(void *data)
610 {
611 int ret;
612 struct notification_thread_handle *handle = (notification_thread_handle *) data;
613 struct notification_thread_state state;
614 enum lttng_domain_type domain;
615
616 DBG("Started notification thread");
617
618 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
619 rcu_register_thread();
620 rcu_thread_online();
621
622 if (!handle) {
623 ERR("Invalid thread context provided");
624 goto end;
625 }
626
627 health_code_update();
628
629 ret = init_thread_state(handle, &state);
630 if (ret) {
631 goto end;
632 }
633
634 if (testpoint(sessiond_thread_notification)) {
635 goto end;
636 }
637
638 while (true) {
639 int fd_count, i;
640
641 health_poll_entry();
642 DBG("Entering poll wait");
643 ret = lttng_poll_wait(&state.events, -1);
644 DBG("Poll wait returned (%i)", ret);
645 health_poll_exit();
646 if (ret < 0) {
647 /*
648 * Restart interrupted system call.
649 */
650 if (errno == EINTR) {
651 continue;
652 }
653 ERR("Error encountered during lttng_poll_wait (%i)", ret);
654 goto error;
655 }
656
657 /*
658 * Reset restart_poll flag so that calls below might turn it
659 * on.
660 */
661 state.restart_poll = false;
662
663 fd_count = ret;
664 for (i = 0; i < fd_count; i++) {
665 int fd = LTTNG_POLL_GETFD(&state.events, i);
666 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
667
668 DBG("Handling fd (%i) activity (%u)", fd, revents);
669
670 if (fd == state.notification_channel_socket) {
671 if (revents & LPOLLIN) {
672 ret = handle_notification_thread_client_connect(&state);
673 if (ret < 0) {
674 goto error;
675 }
676 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
677 ERR("Notification socket poll error");
678 goto error;
679 } else {
680 ERR("Unexpected poll events %u for notification socket %i",
681 revents,
682 fd);
683 goto error;
684 }
685 } else if (fd == handle->cmd_queue.event_fd) {
686 ret = handle_notification_thread_command(handle, &state);
687 if (ret < 0) {
688 DBG("Error encountered while servicing command queue");
689 goto error;
690 } else if (ret > 0) {
691 goto exit;
692 }
693 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
694 fd == handle->channel_monitoring_pipes.ust64_consumer ||
695 fd == handle->channel_monitoring_pipes.kernel_consumer) {
696 ret = handle_channel_monitoring_pipe(fd, revents, handle, &state);
697 if (ret) {
698 goto error;
699 }
700 } else if (fd_is_event_notification_source(&state, fd, &domain)) {
701 ret = handle_event_notification_pipe(fd, domain, revents, &state);
702 if (ret) {
703 goto error;
704 }
705 } else {
706 /* Activity on a client's socket. */
707 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
708 /*
709 * It doesn't matter if a command was
710 * pending on the client socket at this
711 * point since it now has no way to
712 * receive the notifications to which
713 * it was subscribing or unsubscribing.
714 */
715 ret = handle_notification_thread_client_disconnect(fd,
716 &state);
717 if (ret) {
718 goto error;
719 }
720 } else {
721 if (revents & LPOLLIN) {
722 ret = handle_notification_thread_client_in(&state,
723 fd);
724 if (ret) {
725 goto error;
726 }
727 }
728
729 if (revents & LPOLLOUT) {
730 ret = handle_notification_thread_client_out(&state,
731 fd);
732 if (ret) {
733 goto error;
734 }
735 }
736 }
737 }
738
739 /*
740 * Calls above might have changed the state of the
741 * FDs in `state.events`. Call _poll_wait() again to
742 * ensure we have a consistent state.
743 */
744 if (state.restart_poll) {
745 break;
746 }
747 }
748 }
749 exit:
750 error:
751 fini_thread_state(&state);
752 end:
753 rcu_thread_offline();
754 rcu_unregister_thread();
755 health_unregister(the_health_sessiond);
756 return nullptr;
757 }
758
759 static bool shutdown_notification_thread(void *thread_data)
760 {
761 struct notification_thread_handle *handle = (notification_thread_handle *) thread_data;
762
763 notification_thread_command_quit(handle);
764 return true;
765 }
766
767 struct lttng_thread *launch_notification_thread(struct notification_thread_handle *handle)
768 {
769 struct lttng_thread *thread;
770
771 thread = lttng_thread_create(
772 "Notification", thread_notification, shutdown_notification_thread, nullptr, handle);
773 if (!thread) {
774 goto error;
775 }
776
777 /*
778 * Wait for the thread to be marked as "ready" before returning
779 * as other subsystems depend on the notification subsystem
780 * (e.g. rotation thread).
781 */
782 wait_until_thread_is_ready(handle);
783 return thread;
784 error:
785 return nullptr;
786 }
This page took 0.083459 seconds and 4 git commands to generate.