lttng-tools.git: src/bin/lttng-sessiond/manage-consumer.cpp
/*
 * Copyright (C) 2011 EfficiOS Inc.
 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#include "health-sessiond.hpp"
#include "manage-consumer.hpp"
#include "testpoint.hpp"
#include "thread.hpp"
#include "ust-consumer.hpp"
#include "utils.hpp"

#include <common/pipe.hpp>
#include <common/utils.hpp>

#include <signal.h>

namespace {
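/*
 * State shared between the thread launcher and the consumer management
 * thread: the quit pipe requests shutdown, while the ready semaphore and
 * initialization_result implement the start-up handshake reported back to
 * launch_consumer_management_thread().
 */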
struct thread_notifiers {
	struct lttng_pipe *quit_pipe;
	struct consumer_data *consumer_data;
	sem_t ready;
	int initialization_result;
};
} /* namespace */

static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
	DBG("Marking consumer management thread as ready");
	notifiers->initialization_result = 0;
	sem_post(&notifiers->ready);
}

static void mark_thread_intialization_as_failed(struct thread_notifiers *notifiers)
{
	ERR("Consumer management thread entering error state");
	notifiers->initialization_result = -1;
	sem_post(&notifiers->ready);
}

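/*
 * Invoked from the launcher thread; blocks until the management thread posts
 * the ready semaphore from either mark_thread_as_ready() or
 * mark_thread_intialization_as_failed().
 */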
static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
	DBG("Waiting for consumer management thread to be ready");
	sem_wait(&notifiers->ready);
	DBG("Consumer management thread is ready");
}

/*
 * This thread manages the consumer errors sent back to the session daemon.
 */
static void *thread_consumer_management(void *data)
{
	int sock = -1, i, ret, err = -1, should_quit = 0;
	uint32_t nb_fd;
	enum lttcomm_return_code code;
	struct lttng_poll_event events;
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	struct consumer_data *consumer_data = notifiers->consumer_data;
	const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
	struct consumer_socket *cmd_socket_wrapper = nullptr;

	DBG("[thread] Manage consumer started");

	rcu_register_thread();
	rcu_thread_online();

	health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);

	health_code_update();

	/*
	 * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
	 * metadata_sock. Nothing more will be added to this poll set.
	 */
	ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error_poll;
	}

	ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * The error socket here is already in a listening state which was done
	 * just before spawning this thread to avoid a race between the consumer
	 * daemon exec trying to connect and the listen() call.
	 */
	ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/* Infinite blocking call, waiting for transmission */
	health_poll_entry();

	if (testpoint(sessiond_thread_manage_consumer)) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

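	/*
	 * This first wait only completes once the consumer daemon connects to
	 * the listening error socket, the quit pipe is triggered, or an error
	 * occurs on one of the polled file descriptors.
	 */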
	ret = lttng_poll_wait(&events, -1);
	health_poll_exit();
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	nb_fd = ret;

	for (i = 0; i < nb_fd; i++) {
		/* Fetch once the poll data */
		const auto revents = LTTNG_POLL_GETEV(&events, i);
		const auto pollfd = LTTNG_POLL_GETFD(&events, i);

		health_code_update();

		/* Activity on thread quit pipe, exiting. */
		if (pollfd == thread_quit_pipe_fd) {
			DBG("Activity on thread quit pipe");
			err = 0;
			mark_thread_intialization_as_failed(notifiers);
			goto exit;
		} else if (pollfd == consumer_data->err_sock) {
			/* Event on the registration socket */
			if (revents & LPOLLIN) {
				continue;
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				ERR("consumer err socket poll error");
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				mark_thread_intialization_as_failed(notifiers);
				goto error;
			}
		}
	}

	sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
	if (sock < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/*
	 * Set the CLOEXEC flag. Return code is useless because either way, the
	 * show must go on.
	 */
	(void) utils_set_fd_cloexec(sock);

	health_code_update();

	DBG2("Receiving code from consumer err_sock");

	/* Getting status code from kconsumerd */
	ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
	if (ret <= 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();
	if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
		ERR("consumer error when waiting for SOCK_READY : %s",
		    lttcomm_get_readable_code((lttcomm_return_code) -code));
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Connect both command and metadata sockets. */
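	/*
	 * Note: both connections intentionally target the same command socket
	 * path; the consumer daemon uses the second connection it accepts on
	 * that socket as its dedicated metadata socket.
	 */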
	consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
	consumer_data->metadata_fd = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
	if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
		PERROR("consumer connect cmd socket");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;

	/* Create metadata socket lock. */
	consumer_data->metadata_sock.lock = zmalloc<pthread_mutex_t>();
	if (consumer_data->metadata_sock.lock == nullptr) {
		PERROR("zmalloc pthread mutex");
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	pthread_mutex_init(consumer_data->metadata_sock.lock, nullptr);

	DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
	DBG("Consumer metadata socket ready (fd: %d)", consumer_data->metadata_fd);

	/*
	 * Remove the consumerd error sock since we've established a connection.
	 */
	ret = lttng_poll_del(&events, consumer_data->err_sock);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add new accepted error socket. */
	ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Add metadata socket that is successfully connected. */
	ret = lttng_poll_add(&events, consumer_data->metadata_fd, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	health_code_update();

	/*
	 * Transfer the write-end of the channel monitoring pipe to the consumer
	 * by issuing a SET_CHANNEL_MONITOR_PIPE command.
	 */
	cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
	if (!cmd_socket_wrapper) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}
	cmd_socket_wrapper->lock = &consumer_data->lock;

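	/*
	 * consumer_init() transmits the session daemon's UUID to the
	 * freshly-connected consumer daemon before any other command is
	 * issued on the command socket.
	 */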
	pthread_mutex_lock(cmd_socket_wrapper->lock);
	ret = consumer_init(cmd_socket_wrapper, the_sessiond_uuid);
	if (ret) {
		ERR("Failed to send sessiond uuid to consumer daemon");
		mark_thread_intialization_as_failed(notifiers);
		pthread_mutex_unlock(cmd_socket_wrapper->lock);
		goto error;
	}
	pthread_mutex_unlock(cmd_socket_wrapper->lock);

	ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
						 consumer_data->channel_monitor_pipe);
	if (ret) {
		mark_thread_intialization_as_failed(notifiers);
		goto error;
	}

	/* Discard the socket wrapper as it is no longer needed. */
	consumer_destroy_socket(cmd_socket_wrapper);
	cmd_socket_wrapper = nullptr;

	/* The thread is completely initialized, signal that it is ready. */
	mark_thread_as_ready(notifiers);

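	/*
	 * Main event loop: watch the quit pipe, the accepted error socket and
	 * the metadata socket until the consumer reports an error or a
	 * shutdown is requested.
	 */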
	/* Infinite blocking call, waiting for transmission */
	while (true) {
		health_code_update();

		/* Exit the thread because the thread quit pipe has been triggered. */
		if (should_quit) {
			/* Not a health error. */
			err = 0;
			goto exit;
		}

		health_poll_entry();
		ret = lttng_poll_wait(&events, -1);
		health_poll_exit();
		if (ret < 0) {
			goto error;
		}

		nb_fd = ret;

		for (i = 0; i < nb_fd; i++) {
			/* Fetch once the poll data */
			const auto revents = LTTNG_POLL_GETEV(&events, i);
			const auto pollfd = LTTNG_POLL_GETFD(&events, i);

			health_code_update();

			/*
			 * Thread quit pipe has been triggered, flag that we should stop
			 * but continue the current loop to handle potential data from
			 * consumer.
			 */
			if (pollfd == thread_quit_pipe_fd) {
				should_quit = 1;
			} else if (pollfd == sock) {
				/* Event on the consumerd socket */
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
				    !(revents & LPOLLIN)) {
					ERR("consumer err socket second poll error");
					goto error;
				}
				health_code_update();
				/* Wait for any kconsumerd error */
				ret = lttcomm_recv_unix_sock(
					sock, &code, sizeof(enum lttcomm_return_code));
				if (ret <= 0) {
					ERR("consumer closed the command socket");
					goto error;
				}

				ERR("consumer return code : %s",
				    lttcomm_get_readable_code((lttcomm_return_code) -code));

				goto exit;
			} else if (pollfd == consumer_data->metadata_fd) {
				if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) &&
				    !(revents & LPOLLIN)) {
					ERR("consumer err metadata socket second poll error");
					goto error;
				}
				/* UST metadata requests */
				ret = ust_consumer_metadata_request(&consumer_data->metadata_sock);
				if (ret < 0) {
					ERR("Handling metadata request");
					goto error;
				}
			}
			/* No need for an else branch; all FDs are tested prior. */
		}
		health_code_update();
	}

exit:
error:
	/*
	 * We lock here because we are about to close the sockets and some other
	 * thread might be using them, so take exclusive access, which will abort
	 * all other consumer commands issued by other threads.
	 */
	pthread_mutex_lock(&consumer_data->lock);

	/* Immediately set the consumerd state to error. */
	if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
		uatomic_set(&the_kernel_consumerd_state, CONSUMER_ERROR);
	} else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
		   consumer_data->type == LTTNG_CONSUMER32_UST) {
		uatomic_set(&the_ust_consumerd_state, CONSUMER_ERROR);
	} else {
		/* Code flow error... */
		abort();
	}

	if (consumer_data->err_sock >= 0) {
		ret = close(consumer_data->err_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->err_sock = -1;
	}
	if (consumer_data->cmd_sock >= 0) {
		ret = close(consumer_data->cmd_sock);
		if (ret) {
			PERROR("close");
		}
		consumer_data->cmd_sock = -1;
	}
	if (consumer_data->metadata_sock.fd_ptr && *consumer_data->metadata_sock.fd_ptr >= 0) {
		ret = close(*consumer_data->metadata_sock.fd_ptr);
		if (ret) {
			PERROR("close");
		}
	}
	if (sock >= 0) {
		ret = close(sock);
		if (ret) {
			PERROR("close");
		}
	}

	unlink(consumer_data->err_unix_sock_path);
	unlink(consumer_data->cmd_unix_sock_path);
	pthread_mutex_unlock(&consumer_data->lock);

	/* Cleanup metadata socket mutex. */
	if (consumer_data->metadata_sock.lock) {
		pthread_mutex_destroy(consumer_data->metadata_sock.lock);
		free(consumer_data->metadata_sock.lock);
	}
	lttng_poll_clean(&events);

	if (cmd_socket_wrapper) {
		consumer_destroy_socket(cmd_socket_wrapper);
	}
error_poll:
	if (err) {
		health_error();
		ERR("Health error occurred in %s", __func__);
	}
	health_unregister(the_health_sessiond);
	DBG("consumer thread cleanup completed");

	rcu_thread_offline();
	rcu_unregister_thread();

	return nullptr;
}

static bool shutdown_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);

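	/* Wake the management thread's poll loop by writing to the quit pipe. */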
	return notify_thread_pipe(write_fd) == 1;
}

static void cleanup_consumer_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	lttng_pipe_destroy(notifiers->quit_pipe);
	free(notifiers);
}

bool launch_consumer_management_thread(struct consumer_data *consumer_data)
{
	struct lttng_pipe *quit_pipe;
	struct thread_notifiers *notifiers = nullptr;
	struct lttng_thread *thread;

	notifiers = zmalloc<thread_notifiers>();
	if (!notifiers) {
		goto error_alloc;
	}

	quit_pipe = lttng_pipe_open(FD_CLOEXEC);
	if (!quit_pipe) {
		goto error;
	}
	notifiers->quit_pipe = quit_pipe;
	notifiers->consumer_data = consumer_data;
	sem_init(&notifiers->ready, 0, 0);

	thread = lttng_thread_create("Consumer management",
				     thread_consumer_management,
				     shutdown_consumer_management_thread,
				     cleanup_consumer_management_thread,
				     notifiers);
	if (!thread) {
		goto error;
	}
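	/*
	 * Block until the management thread has either completed its
	 * initialization or failed; the handshake result determines the
	 * return value.
	 */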
	wait_until_thread_is_ready(notifiers);
	lttng_thread_put(thread);
	return notifiers->initialization_result == 0;
error:
	cleanup_consumer_management_thread(notifiers);
error_alloc:
	return false;
}