2 * Copyright (C) 2011 EfficiOS Inc.
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 * SPDX-License-Identifier: GPL-2.0-only
10 #include "dispatch.hpp"
11 #include "fd-limit.hpp"
12 #include "health-sessiond.hpp"
13 #include "lttng-sessiond.hpp"
14 #include "testpoint.hpp"
16 #include "ust-app.hpp"
18 #include <common/futex.hpp>
19 #include <common/macros.hpp>
26 struct thread_notifiers
{
27 struct ust_cmd_queue
*ust_cmd_queue
;
28 int apps_cmd_pipe_write_fd
;
29 int apps_cmd_notify_pipe_write_fd
;
30 int dispatch_thread_exit
;
35 * For each tracing session, update newly registered apps. The session list
36 * lock MUST be acquired before calling this.
38 static void update_ust_app(int app_sock
)
40 struct ltt_session
*sess
, *stmp
;
41 const struct ltt_session_list
*session_list
= session_get_list();
44 /* Consumer is in an ERROR state. Stop any application update. */
45 if (uatomic_read(&the_ust_consumerd_state
) == CONSUMER_ERROR
) {
46 /* Stop the update process since the consumer is dead. */
51 LTTNG_ASSERT(app_sock
>= 0);
52 app
= ust_app_find_by_sock(app_sock
);
55 * Application can be unregistered before so
56 * this is possible hence simply stopping the
59 DBG3("UST app update failed to find app sock %d", app_sock
);
63 /* Update all event notifiers for the app. */
64 ust_app_global_update_event_notifier_rules(app
);
66 /* For all tracing session(s) */
67 cds_list_for_each_entry_safe (sess
, stmp
, &session_list
->head
, list
) {
68 if (!session_get(sess
)) {
72 if (!sess
->active
|| !sess
->ust_session
|| !sess
->ust_session
->active
) {
76 ust_app_global_update(sess
->ust_session
, app
);
87 * Sanitize the wait queue of the dispatch registration thread meaning removing
88 * invalid nodes from it. This is to avoid memory leaks for the case the UST
89 * notify socket is never received.
91 static void sanitize_wait_queue(struct ust_reg_wait_queue
*wait_queue
)
93 int ret
, nb_fd
= 0, i
;
94 unsigned int fd_added
= 0;
95 struct lttng_poll_event events
;
96 struct ust_reg_wait_node
*wait_node
= nullptr, *tmp_wait_node
;
98 LTTNG_ASSERT(wait_queue
);
100 lttng_poll_init(&events
);
102 /* Just skip everything for an empty queue. */
103 if (!wait_queue
->count
) {
107 ret
= lttng_poll_create(&events
, wait_queue
->count
, LTTNG_CLOEXEC
);
112 cds_list_for_each_entry_safe (wait_node
, tmp_wait_node
, &wait_queue
->head
, head
) {
113 LTTNG_ASSERT(wait_node
->app
);
114 ret
= lttng_poll_add(&events
, wait_node
->app
->sock
, LPOLLIN
);
127 * Poll but don't block so we can quickly identify the faulty events and
128 * clean them afterwards from the wait queue.
130 ret
= lttng_poll_wait(&events
, 0);
136 for (i
= 0; i
< nb_fd
; i
++) {
138 uint32_t revents
= LTTNG_POLL_GETEV(&events
, i
);
139 int pollfd
= LTTNG_POLL_GETFD(&events
, i
);
141 cds_list_for_each_entry_safe (wait_node
, tmp_wait_node
, &wait_queue
->head
, head
) {
142 if (pollfd
== wait_node
->app
->sock
&& (revents
& (LPOLLHUP
| LPOLLERR
))) {
143 cds_list_del(&wait_node
->head
);
145 ust_app_destroy(wait_node
->app
);
148 * Silence warning of use-after-free in
149 * cds_list_for_each_entry_safe which uses
150 * __typeof__(*wait_node).
155 ERR("Unexpected poll events %u for sock %d", revents
, pollfd
);
162 DBG("Wait queue sanitized, %d node were cleaned up", nb_fd
);
166 lttng_poll_clean(&events
);
170 lttng_poll_clean(&events
);
172 ERR("Unable to sanitize wait queue");
177 * Send a socket to a thread This is called from the dispatch UST registration
178 * thread once all sockets are set for the application.
180 * The sock value can be invalid, we don't really care, the thread will handle
181 * it and make the necessary cleanup if so.
183 * On success, return 0 else a negative value being the errno message of the
186 static int send_socket_to_thread(int fd
, int sock
)
191 * It's possible that the FD is set as invalid with -1 concurrently just
192 * before calling this function being a shutdown state of the thread.
199 ret
= lttng_write(fd
, &sock
, sizeof(sock
));
200 if (ret
< sizeof(sock
)) {
201 PERROR("write apps pipe %d", fd
);
208 /* All good. Don't send back the write positive ret value. */
214 static void cleanup_ust_dispatch_thread(void *data
)
220 * Dispatch request from the registration threads to the application
221 * communication thread.
223 static void *thread_dispatch_ust_registration(void *data
)
226 struct cds_wfcq_node
*node
;
227 struct ust_command
*ust_cmd
= nullptr;
228 struct ust_reg_wait_node
*wait_node
= nullptr, *tmp_wait_node
;
229 struct ust_reg_wait_queue wait_queue
= {
233 struct thread_notifiers
*notifiers
= (thread_notifiers
*) data
;
235 rcu_register_thread();
237 health_register(the_health_sessiond
, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH
);
239 if (testpoint(sessiond_thread_app_reg_dispatch
)) {
240 goto error_testpoint
;
243 health_code_update();
245 CDS_INIT_LIST_HEAD(&wait_queue
.head
);
247 DBG("[thread] Dispatch UST command started");
250 health_code_update();
252 /* Atomically prepare the queue futex */
253 futex_nto1_prepare(¬ifiers
->ust_cmd_queue
->futex
);
255 if (CMM_LOAD_SHARED(notifiers
->dispatch_thread_exit
)) {
260 struct ust_app
*app
= nullptr;
264 * Make sure we don't have node(s) that have hung up before receiving
265 * the notify socket. This is to clean the list in order to avoid
266 * memory leaks from notify socket that are never seen.
268 sanitize_wait_queue(&wait_queue
);
270 health_code_update();
271 /* Dequeue command for registration */
272 node
= cds_wfcq_dequeue_blocking(¬ifiers
->ust_cmd_queue
->head
,
273 ¬ifiers
->ust_cmd_queue
->tail
);
274 if (node
== nullptr) {
275 DBG("Woken up but nothing in the UST command queue");
276 /* Continue thread execution */
280 ust_cmd
= lttng::utils::container_of(node
, &ust_command::node
);
282 DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
283 " gid:%d sock:%d name:%s (version %d.%d)",
284 ust_cmd
->reg_msg
.pid
,
285 ust_cmd
->reg_msg
.ppid
,
286 ust_cmd
->reg_msg
.uid
,
287 ust_cmd
->reg_msg
.gid
,
289 ust_cmd
->reg_msg
.name
,
290 ust_cmd
->reg_msg
.major
,
291 ust_cmd
->reg_msg
.minor
);
293 if (ust_cmd
->reg_msg
.type
== LTTNG_UST_CTL_SOCKET_CMD
) {
294 wait_node
= zmalloc
<ust_reg_wait_node
>();
296 PERROR("zmalloc wait_node dispatch");
297 ret
= close(ust_cmd
->sock
);
299 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
301 lttng_fd_put(LTTNG_FD_APPS
, 1);
306 CDS_INIT_LIST_HEAD(&wait_node
->head
);
308 /* Create application object if socket is CMD. */
309 wait_node
->app
= ust_app_create(&ust_cmd
->reg_msg
, ust_cmd
->sock
);
310 if (!wait_node
->app
) {
311 ret
= close(ust_cmd
->sock
);
313 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
315 lttng_fd_put(LTTNG_FD_APPS
, 1);
323 * Add application to the wait queue so we can set the notify
324 * socket before putting this object in the global ht.
326 cds_list_add(&wait_node
->head
, &wait_queue
.head
);
332 * We have to continue here since we don't have the notify
333 * socket and the application MUST be added to the hash table
334 * only at that moment.
339 * Look for the application in the local wait queue and set the
340 * notify socket if found.
342 cds_list_for_each_entry_safe (
343 wait_node
, tmp_wait_node
, &wait_queue
.head
, head
) {
344 health_code_update();
345 if (wait_node
->app
->pid
== ust_cmd
->reg_msg
.pid
) {
346 wait_node
->app
->notify_sock
= ust_cmd
->sock
;
347 cds_list_del(&wait_node
->head
);
349 app
= wait_node
->app
;
352 DBG3("UST app notify socket %d is set",
359 * With no application at this stage the received socket is
360 * basically useless so close it before we free the cmd data
361 * structure for good.
364 ret
= close(ust_cmd
->sock
);
366 PERROR("close ust sock dispatch %d", ust_cmd
->sock
);
368 lttng_fd_put(LTTNG_FD_APPS
, 1);
378 * Lock the global session list so from the register up to the
379 * registration done message, no thread can see the application
380 * and change its state.
386 * Add application to the global hash table. This needs to be
387 * done before the update to the UST registry can locate the
392 /* Set app version. This call will print an error if needed. */
393 (void) ust_app_version(app
);
395 (void) ust_app_setup_event_notifier_group(app
);
397 /* Send notify socket through the notify pipe. */
398 ret
= send_socket_to_thread(
399 notifiers
->apps_cmd_notify_pipe_write_fd
, app
->notify_sock
);
402 session_unlock_list();
404 * No notify thread, stop the UST tracing. However, this is
405 * not an internal error of the this thread thus setting
406 * the health error code to a normal exit.
413 * Update newly registered application with the tracing
414 * registry info already enabled information.
416 update_ust_app(app
->sock
);
419 * Don't care about return value. Let the manage apps threads
420 * handle app unregistration upon socket close.
422 (void) ust_app_register_done(app
);
425 * Even if the application socket has been closed, send the app
426 * to the thread and unregistration will take place at that
429 ret
= send_socket_to_thread(notifiers
->apps_cmd_pipe_write_fd
,
433 session_unlock_list();
435 * No apps. thread, stop the UST tracing. However, this is
436 * not an internal error of the this thread thus setting
437 * the health error code to a normal exit.
444 session_unlock_list();
446 } while (node
!= nullptr);
449 /* Futex wait on queue. Blocking call on futex() */
450 futex_nto1_wait(¬ifiers
->ust_cmd_queue
->futex
);
453 /* Normal exit, no error */
457 /* Clean up wait queue. */
458 cds_list_for_each_entry_safe (wait_node
, tmp_wait_node
, &wait_queue
.head
, head
) {
459 cds_list_del(&wait_node
->head
);
464 /* Empty command queue. */
466 /* Dequeue command for registration */
467 node
= cds_wfcq_dequeue_blocking(¬ifiers
->ust_cmd_queue
->head
,
468 ¬ifiers
->ust_cmd_queue
->tail
);
469 if (node
== nullptr) {
472 ust_cmd
= lttng::utils::container_of(node
, &ust_command::node
);
473 ret
= close(ust_cmd
->sock
);
475 PERROR("close ust sock exit dispatch %d", ust_cmd
->sock
);
477 lttng_fd_put(LTTNG_FD_APPS
, 1);
482 DBG("Dispatch thread dying");
485 ERR("Health error occurred in %s", __func__
);
487 health_unregister(the_health_sessiond
);
488 rcu_unregister_thread();
492 static bool shutdown_ust_dispatch_thread(void *data
)
494 struct thread_notifiers
*notifiers
= (thread_notifiers
*) data
;
496 CMM_STORE_SHARED(notifiers
->dispatch_thread_exit
, 1);
497 futex_nto1_wake(¬ifiers
->ust_cmd_queue
->futex
);
501 bool launch_ust_dispatch_thread(struct ust_cmd_queue
*cmd_queue
,
502 int apps_cmd_pipe_write_fd
,
503 int apps_cmd_notify_pipe_write_fd
)
505 struct lttng_thread
*thread
;
506 struct thread_notifiers
*notifiers
;
508 notifiers
= zmalloc
<thread_notifiers
>();
512 notifiers
->ust_cmd_queue
= cmd_queue
;
513 notifiers
->apps_cmd_pipe_write_fd
= apps_cmd_pipe_write_fd
;
514 notifiers
->apps_cmd_notify_pipe_write_fd
= apps_cmd_notify_pipe_write_fd
;
516 thread
= lttng_thread_create("UST registration dispatch",
517 thread_dispatch_ust_registration
,
518 shutdown_ust_dispatch_thread
,
519 cleanup_ust_dispatch_thread
,
524 lttng_thread_put(thread
);