/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 */
10 #include "health-sessiond.hpp"
11 #include "manage-consumer.hpp"
12 #include "testpoint.hpp"
14 #include "ust-consumer.hpp"
17 #include <common/pipe.hpp>
18 #include <common/utils.hpp>
/*
 * State shared between the launcher and the consumer management thread.
 *
 * Allocated by launch_consumer_management_thread() and released by
 * cleanup_consumer_management_thread() once the thread is torn down.
 */
struct thread_notifiers {
	/* Written to by the shutdown callback to ask the thread to quit. Owned. */
	struct lttng_pipe *quit_pipe;
	/* Consumer daemon instance this thread manages. Not owned. */
	struct consumer_data *consumer_data;
	/*
	 * Posted exactly once, after initialization_result is set; the launcher
	 * blocks on it in wait_until_thread_is_ready().
	 * NOTE(review): member restored — it is referenced as notifiers->ready
	 * throughout this file but was missing from the declaration.
	 */
	sem_t ready;
	/* 0 when the thread initialized successfully, -1 on failure. */
	int initialization_result;
};
32 static void mark_thread_as_ready(struct thread_notifiers
*notifiers
)
34 DBG("Marking consumer management thread as ready");
35 notifiers
->initialization_result
= 0;
36 sem_post(¬ifiers
->ready
);
39 static void mark_thread_intialization_as_failed(struct thread_notifiers
*notifiers
)
41 ERR("Consumer management thread entering error state");
42 notifiers
->initialization_result
= -1;
43 sem_post(¬ifiers
->ready
);
46 static void wait_until_thread_is_ready(struct thread_notifiers
*notifiers
)
48 DBG("Waiting for consumer management thread to be ready");
49 sem_wait(¬ifiers
->ready
);
50 DBG("Consumer management thread is ready");
54 * This thread manage the consumer error sent back to the session daemon.
56 static void *thread_consumer_management(void *data
)
58 int sock
= -1, i
, ret
, err
= -1, should_quit
= 0;
60 enum lttcomm_return_code code
;
61 struct lttng_poll_event events
;
62 struct thread_notifiers
*notifiers
= (thread_notifiers
*) data
;
63 struct consumer_data
*consumer_data
= notifiers
->consumer_data
;
64 const auto thread_quit_pipe_fd
= lttng_pipe_get_readfd(notifiers
->quit_pipe
);
65 struct consumer_socket
*cmd_socket_wrapper
= nullptr;
67 DBG("[thread] Manage consumer started");
69 rcu_register_thread();
72 health_register(the_health_sessiond
, HEALTH_SESSIOND_TYPE_CONSUMER
);
77 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
78 * metadata_sock. Nothing more will be added to this poll set.
80 ret
= lttng_poll_create(&events
, 3, LTTNG_CLOEXEC
);
82 mark_thread_intialization_as_failed(notifiers
);
86 ret
= lttng_poll_add(&events
, thread_quit_pipe_fd
, LPOLLIN
);
88 mark_thread_intialization_as_failed(notifiers
);
93 * The error socket here is already in a listening state which was done
94 * just before spawning this thread to avoid a race between the consumer
95 * daemon exec trying to connect and the listen() call.
97 ret
= lttng_poll_add(&events
, consumer_data
->err_sock
, LPOLLIN
| LPOLLRDHUP
);
99 mark_thread_intialization_as_failed(notifiers
);
103 health_code_update();
105 /* Infinite blocking call, waiting for transmission */
108 if (testpoint(sessiond_thread_manage_consumer
)) {
109 mark_thread_intialization_as_failed(notifiers
);
113 ret
= lttng_poll_wait(&events
, -1);
116 mark_thread_intialization_as_failed(notifiers
);
122 for (i
= 0; i
< nb_fd
; i
++) {
123 /* Fetch once the poll data */
124 const auto revents
= LTTNG_POLL_GETEV(&events
, i
);
125 const auto pollfd
= LTTNG_POLL_GETFD(&events
, i
);
127 health_code_update();
129 /* Activity on thread quit pipe, exiting. */
130 if (pollfd
== thread_quit_pipe_fd
) {
131 DBG("Activity on thread quit pipe");
133 mark_thread_intialization_as_failed(notifiers
);
135 } else if (pollfd
== consumer_data
->err_sock
) {
136 /* Event on the registration socket */
137 if (revents
& LPOLLIN
) {
139 } else if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
140 ERR("consumer err socket poll error");
141 mark_thread_intialization_as_failed(notifiers
);
144 ERR("Unexpected poll events %u for sock %d", revents
, pollfd
);
145 mark_thread_intialization_as_failed(notifiers
);
151 sock
= lttcomm_accept_unix_sock(consumer_data
->err_sock
);
153 mark_thread_intialization_as_failed(notifiers
);
158 * Set the CLOEXEC flag. Return code is useless because either way, the
161 (void) utils_set_fd_cloexec(sock
);
163 health_code_update();
165 DBG2("Receiving code from consumer err_sock");
167 /* Getting status code from consumerd */
169 std::int32_t comm_code
= 0;
171 ret
= lttcomm_recv_unix_sock(sock
, &comm_code
, sizeof(comm_code
));
172 code
= static_cast<decltype(code
)>(comm_code
);
175 mark_thread_intialization_as_failed(notifiers
);
179 health_code_update();
180 if (code
!= LTTCOMM_CONSUMERD_COMMAND_SOCK_READY
) {
181 ERR("consumer error when waiting for SOCK_READY : %s",
182 lttcomm_get_readable_code((lttcomm_return_code
) -code
));
183 mark_thread_intialization_as_failed(notifiers
);
187 /* Connect both command and metadata sockets. */
188 consumer_data
->cmd_sock
= lttcomm_connect_unix_sock(consumer_data
->cmd_unix_sock_path
);
189 consumer_data
->metadata_fd
= lttcomm_connect_unix_sock(consumer_data
->cmd_unix_sock_path
);
190 if (consumer_data
->cmd_sock
< 0 || consumer_data
->metadata_fd
< 0) {
191 PERROR("consumer connect cmd socket");
192 mark_thread_intialization_as_failed(notifiers
);
196 consumer_data
->metadata_sock
.fd_ptr
= &consumer_data
->metadata_fd
;
198 /* Create metadata socket lock. */
199 consumer_data
->metadata_sock
.lock
= zmalloc
<pthread_mutex_t
>();
200 if (consumer_data
->metadata_sock
.lock
== nullptr) {
201 PERROR("zmalloc pthread mutex");
202 mark_thread_intialization_as_failed(notifiers
);
205 pthread_mutex_init(consumer_data
->metadata_sock
.lock
, nullptr);
207 DBG("Consumer command socket ready (fd: %d)", consumer_data
->cmd_sock
);
208 DBG("Consumer metadata socket ready (fd: %d)", consumer_data
->metadata_fd
);
211 * Remove the consumerd error sock since we've established a connection.
213 ret
= lttng_poll_del(&events
, consumer_data
->err_sock
);
215 mark_thread_intialization_as_failed(notifiers
);
219 /* Add new accepted error socket. */
220 ret
= lttng_poll_add(&events
, sock
, LPOLLIN
| LPOLLRDHUP
);
222 mark_thread_intialization_as_failed(notifiers
);
226 /* Add metadata socket that is successfully connected. */
227 ret
= lttng_poll_add(&events
, consumer_data
->metadata_fd
, LPOLLIN
| LPOLLRDHUP
);
229 mark_thread_intialization_as_failed(notifiers
);
233 health_code_update();
236 * Transfer the write-end of the channel monitoring pipe to the consumer
237 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
239 cmd_socket_wrapper
= consumer_allocate_socket(&consumer_data
->cmd_sock
);
240 if (!cmd_socket_wrapper
) {
241 mark_thread_intialization_as_failed(notifiers
);
244 cmd_socket_wrapper
->lock
= &consumer_data
->lock
;
246 pthread_mutex_lock(cmd_socket_wrapper
->lock
);
247 ret
= consumer_init(cmd_socket_wrapper
, the_sessiond_uuid
);
249 ERR("Failed to send sessiond uuid to consumer daemon");
250 mark_thread_intialization_as_failed(notifiers
);
251 pthread_mutex_unlock(cmd_socket_wrapper
->lock
);
254 pthread_mutex_unlock(cmd_socket_wrapper
->lock
);
256 ret
= consumer_send_channel_monitor_pipe(cmd_socket_wrapper
,
257 consumer_data
->channel_monitor_pipe
);
259 mark_thread_intialization_as_failed(notifiers
);
263 /* Discard the socket wrapper as it is no longer needed. */
264 consumer_destroy_socket(cmd_socket_wrapper
);
265 cmd_socket_wrapper
= nullptr;
267 /* The thread is completely initialized, signal that it is ready. */
268 mark_thread_as_ready(notifiers
);
270 /* Infinite blocking call, waiting for transmission */
272 health_code_update();
274 /* Exit the thread because the thread quit pipe has been triggered. */
276 /* Not a health error. */
282 ret
= lttng_poll_wait(&events
, -1);
290 for (i
= 0; i
< nb_fd
; i
++) {
291 /* Fetch once the poll data */
292 const auto revents
= LTTNG_POLL_GETEV(&events
, i
);
293 const auto pollfd
= LTTNG_POLL_GETFD(&events
, i
);
295 health_code_update();
298 * Thread quit pipe has been triggered, flag that we should stop
299 * but continue the current loop to handle potential data from
302 if (pollfd
== thread_quit_pipe_fd
) {
304 } else if (pollfd
== sock
) {
305 /* Event on the consumerd socket */
306 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
) &&
307 !(revents
& LPOLLIN
)) {
308 ERR("consumer err socket second poll error");
311 health_code_update();
312 /* Wait for any consumerd error */
314 std::int32_t comm_code
= 0;
316 ret
= lttcomm_recv_unix_sock(
317 sock
, &comm_code
, sizeof(comm_code
));
318 code
= static_cast<decltype(code
)>(comm_code
);
321 ERR("consumer closed the command socket");
325 ERR("consumer return code : %s",
326 lttcomm_get_readable_code((lttcomm_return_code
) -code
));
329 } else if (pollfd
== consumer_data
->metadata_fd
) {
330 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
) &&
331 !(revents
& LPOLLIN
)) {
332 ERR("consumer err metadata socket second poll error");
335 /* UST metadata requests */
336 ret
= ust_consumer_metadata_request(&consumer_data
->metadata_sock
);
338 ERR("Handling metadata request");
342 /* No need for an else branch all FDs are tested prior. */
344 health_code_update();
350 * We lock here because we are about to close the sockets and some other
351 * thread might be using them so get exclusive access which will abort all
352 * other consumer command by other threads.
354 pthread_mutex_lock(&consumer_data
->lock
);
356 /* Immediately set the consumerd state to stopped */
357 if (consumer_data
->type
== LTTNG_CONSUMER_KERNEL
) {
358 uatomic_set(&the_kernel_consumerd_state
, CONSUMER_ERROR
);
359 } else if (consumer_data
->type
== LTTNG_CONSUMER64_UST
||
360 consumer_data
->type
== LTTNG_CONSUMER32_UST
) {
361 uatomic_set(&the_ust_consumerd_state
, CONSUMER_ERROR
);
363 /* Code flow error... */
367 if (consumer_data
->err_sock
>= 0) {
368 ret
= close(consumer_data
->err_sock
);
372 consumer_data
->err_sock
= -1;
374 if (consumer_data
->cmd_sock
>= 0) {
375 ret
= close(consumer_data
->cmd_sock
);
379 consumer_data
->cmd_sock
= -1;
381 if (consumer_data
->metadata_sock
.fd_ptr
&& *consumer_data
->metadata_sock
.fd_ptr
>= 0) {
382 ret
= close(*consumer_data
->metadata_sock
.fd_ptr
);
394 unlink(consumer_data
->err_unix_sock_path
);
395 unlink(consumer_data
->cmd_unix_sock_path
);
396 pthread_mutex_unlock(&consumer_data
->lock
);
398 /* Cleanup metadata socket mutex. */
399 if (consumer_data
->metadata_sock
.lock
) {
400 pthread_mutex_destroy(consumer_data
->metadata_sock
.lock
);
401 free(consumer_data
->metadata_sock
.lock
);
403 lttng_poll_clean(&events
);
405 if (cmd_socket_wrapper
) {
406 consumer_destroy_socket(cmd_socket_wrapper
);
411 ERR("Health error occurred in %s", __func__
);
413 health_unregister(the_health_sessiond
);
414 DBG("consumer thread cleanup completed");
416 rcu_thread_offline();
417 rcu_unregister_thread();
422 static bool shutdown_consumer_management_thread(void *data
)
424 struct thread_notifiers
*notifiers
= (thread_notifiers
*) data
;
425 const int write_fd
= lttng_pipe_get_writefd(notifiers
->quit_pipe
);
427 return notify_thread_pipe(write_fd
) == 1;
430 static void cleanup_consumer_management_thread(void *data
)
432 struct thread_notifiers
*notifiers
= (thread_notifiers
*) data
;
434 lttng_pipe_destroy(notifiers
->quit_pipe
);
438 bool launch_consumer_management_thread(struct consumer_data
*consumer_data
)
440 struct lttng_pipe
*quit_pipe
;
441 struct thread_notifiers
*notifiers
= nullptr;
442 struct lttng_thread
*thread
;
444 notifiers
= zmalloc
<thread_notifiers
>();
449 quit_pipe
= lttng_pipe_open(FD_CLOEXEC
);
453 notifiers
->quit_pipe
= quit_pipe
;
454 notifiers
->consumer_data
= consumer_data
;
455 sem_init(¬ifiers
->ready
, 0, 0);
457 thread
= lttng_thread_create("Consumer management",
458 thread_consumer_management
,
459 shutdown_consumer_management_thread
,
460 cleanup_consumer_management_thread
,
465 wait_until_thread_is_ready(notifiers
);
466 lttng_thread_put(thread
);
467 return notifiers
->initialization_result
== 0;
469 cleanup_consumer_management_thread(notifiers
);