Fix: consumerd: consumed size miscomputed during statistics sampling
[lttng-tools.git] / src / bin / lttng-sessiond / register.cpp
CommitLineData
1785d7f2 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
1785d7f2 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
1785d7f2 7 *
1785d7f2
JG
8 */
9
28ab034a
JG
10#include "fd-limit.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "register.hpp"
14#include "testpoint.hpp"
15#include "thread.hpp"
16#include "utils.hpp"
17
c9e313bc
SM
18#include <common/futex.hpp>
19#include <common/macros.hpp>
20#include <common/shm.hpp>
21#include <common/utils.hpp>
1785d7f2 22
671e39d7 23#include <fcntl.h>
28ab034a
JG
24#include <stddef.h>
25#include <stdlib.h>
26#include <sys/stat.h>
27#include <urcu.h>
1785d7f2 28
f1494934 29namespace {
a13091b7 30struct thread_state {
1785d7f2
JG
31 struct lttng_pipe *quit_pipe;
32 struct ust_cmd_queue *ust_cmd_queue;
9c9d917c 33 sem_t ready;
86d0f119 34 bool running;
a13091b7 35 int application_socket;
1785d7f2 36};
f1494934 37} /* namespace */
1785d7f2
JG
38
39/*
40 * Creates the application socket.
41 */
cd9adb8b 42static int create_application_socket()
1785d7f2
JG
43{
44 int ret = 0;
45 int apps_sock;
1785d7f2
JG
46
47 /* Create the application unix socket */
28ab034a 48 apps_sock = lttcomm_create_unix_sock(the_config.apps_unix_sock_path.value);
1785d7f2 49 if (apps_sock < 0) {
28ab034a 50 ERR("Create unix sock failed: %s", the_config.apps_unix_sock_path.value);
1785d7f2
JG
51 ret = -1;
52 goto end;
53 }
54
55 /* Set the cloexec flag */
56 ret = utils_set_fd_cloexec(apps_sock);
57 if (ret < 0) {
58 ERR("Unable to set CLOEXEC flag to the app Unix socket (fd: %d). "
28ab034a
JG
59 "Continuing but note that the consumer daemon will have a "
60 "reference to this socket on exec()",
61 apps_sock);
1785d7f2
JG
62 }
63
64 /* File permission MUST be 666 */
412d7227 65 ret = chmod(the_config.apps_unix_sock_path.value,
28ab034a 66 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1785d7f2 67 if (ret < 0) {
28ab034a 68 PERROR("Set file permissions failed on %s", the_config.apps_unix_sock_path.value);
d9c6b5f2 69 goto error_close_socket;
1785d7f2
JG
70 }
71
72 DBG3("Session daemon application socket created (fd = %d) ", apps_sock);
73 ret = apps_sock;
74end:
1785d7f2 75 return ret;
d9c6b5f2
JG
76error_close_socket:
77 if (close(apps_sock)) {
78 PERROR("Failed to close application socket in error path");
79 }
80 apps_sock = -1;
81 ret = -1;
82 goto end;
1785d7f2
JG
83}
84
85/*
86 * Notify UST applications using the shm mmap futex.
87 */
88static int notify_ust_apps(int active, bool is_root)
89{
90 char *wait_shm_mmap;
91
92 DBG("Notifying applications of session daemon state: %d", active);
93
94 /* See shm.c for this call implying mmap, shm and futex calls */
28ab034a 95 wait_shm_mmap = shm_ust_get_mmap(the_config.wait_shm_path.value, is_root);
cd9adb8b 96 if (wait_shm_mmap == nullptr) {
1785d7f2
JG
97 goto error;
98 }
99
100 /* Wake waiting process */
101 futex_wait_update((int32_t *) wait_shm_mmap, active);
102
103 /* Apps notified successfully */
104 return 0;
105
106error:
107 return -1;
108}
109
110static void cleanup_application_registration_thread(void *data)
111{
7966af57 112 struct thread_state *thread_state = (struct thread_state *) data;
1785d7f2 113
a13091b7
JG
114 if (!data) {
115 return;
116 }
117
118 lttng_pipe_destroy(thread_state->quit_pipe);
119 free(thread_state);
1785d7f2
JG
120}
121
a13091b7 122static void set_thread_status(struct thread_state *thread_state, bool running)
9c9d917c 123{
86d0f119 124 DBG("Marking application registration thread's state as %s", running ? "running" : "error");
a13091b7
JG
125 thread_state->running = running;
126 sem_post(&thread_state->ready);
9c9d917c
JG
127}
128
a13091b7 129static bool wait_thread_status(struct thread_state *thread_state)
9c9d917c
JG
130{
131 DBG("Waiting for application registration thread to be ready");
a13091b7
JG
132 sem_wait(&thread_state->ready);
133 if (thread_state->running) {
86d0f119
JG
134 DBG("Application registration thread is ready");
135 } else {
136 ERR("Initialization of application registration thread failed");
137 }
138
a13091b7 139 return thread_state->running;
86d0f119
JG
140}
141
142static void thread_init_cleanup(void *data)
143{
7966af57 144 struct thread_state *thread_state = (struct thread_state *) data;
86d0f119 145
a13091b7 146 set_thread_status(thread_state, false);
9c9d917c
JG
147}
148
1785d7f2
JG
149/*
150 * This thread manage application registration.
151 */
152static void *thread_application_registration(void *data)
153{
8a00688e
MJ
154 int sock = -1, i, ret, err = -1;
155 uint32_t nb_fd;
1785d7f2
JG
156 struct lttng_poll_event events;
157 /*
158 * Gets allocated in this thread, enqueued to a global queue, dequeued
159 * and freed in the manage apps thread.
160 */
cd9adb8b 161 struct ust_command *ust_cmd = nullptr;
1785d7f2 162 const bool is_root = (getuid() == 0);
7966af57 163 struct thread_state *thread_state = (struct thread_state *) data;
a13091b7 164 const int application_socket = thread_state->application_socket;
28ab034a 165 const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(thread_state->quit_pipe);
1785d7f2
JG
166
167 DBG("[thread] Manage application registration started");
168
a0b34569 169 pthread_cleanup_push(thread_init_cleanup, thread_state);
412d7227 170 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG);
1785d7f2 171
a13091b7 172 ret = lttcomm_listen_unix_sock(application_socket);
1785d7f2
JG
173 if (ret < 0) {
174 goto error_listen;
175 }
176
177 /*
178 * Pass 2 as size here for the thread quit pipe and apps_sock. Nothing
179 * more will be added to this poll set.
180 */
181 ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
182 if (ret < 0) {
183 goto error_create_poll;
184 }
185
186 /* Add the application registration socket */
a13091b7 187 ret = lttng_poll_add(&events, application_socket, LPOLLIN | LPOLLRDHUP);
1785d7f2
JG
188 if (ret < 0) {
189 goto error_poll_add;
190 }
191
192 /* Add the application registration socket */
8a00688e 193 ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLRDHUP);
1785d7f2
JG
194 if (ret < 0) {
195 goto error_poll_add;
196 }
197
a13091b7
JG
198 set_thread_status(thread_state, true);
199 pthread_cleanup_pop(0);
1785d7f2 200
9ad83bb6
JR
201 if (testpoint(sessiond_thread_registration_apps)) {
202 goto error_poll_add;
203 }
204
cd9adb8b 205 while (true) {
1785d7f2
JG
206 DBG("Accepting application registration");
207
208 /* Inifinite blocking call, waiting for transmission */
209 restart:
210 health_poll_entry();
211 ret = lttng_poll_wait(&events, -1);
212 health_poll_exit();
213 if (ret < 0) {
214 /*
215 * Restart interrupted system call.
216 */
217 if (errno == EINTR) {
218 goto restart;
219 }
220 goto error;
221 }
222
223 nb_fd = ret;
224
225 for (i = 0; i < nb_fd; i++) {
226 health_code_update();
227
228 /* Fetch once the poll data */
8a00688e
MJ
229 const auto revents = LTTNG_POLL_GETEV(&events, i);
230 const auto pollfd = LTTNG_POLL_GETFD(&events, i);
1785d7f2 231
8a00688e
MJ
232 /* Activity on thread quit pipe, closing. */
233 if (pollfd == thread_quit_pipe_fd) {
1785d7f2
JG
234 err = 0;
235 goto exit;
8a00688e 236 }
1785d7f2 237
8a00688e
MJ
238 /* Event on the registration socket. */
239 if (revents & LPOLLIN) {
240 sock = lttcomm_accept_unix_sock(application_socket);
241 if (sock < 0) {
242 goto error;
243 }
1785d7f2 244
8a00688e
MJ
245 /*
246 * Set socket timeout for both receiving and ending.
247 * app_socket_timeout is in seconds, whereas
248 * lttcomm_setsockopt_rcv_timeout and
249 * lttcomm_setsockopt_snd_timeout expect msec as
250 * parameter.
251 */
252 if (the_config.app_socket_timeout >= 0) {
28ab034a
JG
253 (void) lttcomm_setsockopt_rcv_timeout(
254 sock, the_config.app_socket_timeout * 1000);
255 (void) lttcomm_setsockopt_snd_timeout(
256 sock, the_config.app_socket_timeout * 1000);
8a00688e 257 }
1785d7f2 258
8a00688e
MJ
259 /*
260 * Set the CLOEXEC flag. Return code is useless because
261 * either way, the show must go on.
262 */
263 (void) utils_set_fd_cloexec(sock);
264
265 /* Create UST registration command for enqueuing */
266 ust_cmd = zmalloc<ust_command>();
cd9adb8b 267 if (ust_cmd == nullptr) {
8a00688e
MJ
268 PERROR("ust command zmalloc");
269 ret = close(sock);
270 if (ret) {
271 PERROR("close");
1785d7f2 272 }
8a00688e
MJ
273 sock = -1;
274 goto error;
275 }
1785d7f2 276
8a00688e
MJ
277 /*
278 * Using message-based transmissions to ensure we don't
279 * have to deal with partially received messages.
280 */
281 ret = lttng_fd_get(LTTNG_FD_APPS, 1);
282 if (ret < 0) {
283 ERR("Exhausted file descriptors allowed for applications.");
284 free(ust_cmd);
285 ret = close(sock);
286 if (ret) {
287 PERROR("close");
1785d7f2 288 }
1785d7f2 289 sock = -1;
8a00688e
MJ
290 continue;
291 }
1785d7f2 292
8a00688e
MJ
293 health_code_update();
294 ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg);
295 if (ret < 0) {
296 free(ust_cmd);
297 /* Close socket of the application. */
298 ret = close(sock);
299 if (ret) {
300 PERROR("close");
301 }
302 lttng_fd_put(LTTNG_FD_APPS, 1);
303 sock = -1;
304 continue;
1785d7f2 305 }
8a00688e
MJ
306 health_code_update();
307
308 ust_cmd->sock = sock;
309 sock = -1;
310
311 DBG("UST registration received with pid:%d ppid:%d uid:%d"
28ab034a
JG
312 " gid:%d sock:%d name:%s (version %d.%d)",
313 ust_cmd->reg_msg.pid,
314 ust_cmd->reg_msg.ppid,
315 ust_cmd->reg_msg.uid,
316 ust_cmd->reg_msg.gid,
317 ust_cmd->sock,
318 ust_cmd->reg_msg.name,
319 ust_cmd->reg_msg.major,
320 ust_cmd->reg_msg.minor);
8a00688e
MJ
321
322 /*
323 * Lock free enqueue the registration request. The red pill
324 * has been taken! This apps will be part of the *system*.
325 */
326 cds_wfcq_head_ptr_t head;
327 head.h = &thread_state->ust_cmd_queue->head;
28ab034a
JG
328 cds_wfcq_enqueue(
329 head, &thread_state->ust_cmd_queue->tail, &ust_cmd->node);
8a00688e
MJ
330
331 /*
332 * Wake the registration queue futex. Implicit memory
333 * barrier with the exchange in cds_wfcq_enqueue.
334 */
335 futex_nto1_wake(&thread_state->ust_cmd_queue->futex);
336 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
337 ERR("Register apps socket poll error");
338 goto error;
339 } else {
340 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
341 goto error;
1785d7f2
JG
342 }
343 }
344 }
345
346exit:
347error:
348 /* Notify that the registration thread is gone */
349 notify_ust_apps(0, is_root);
350
a13091b7
JG
351 ret = close(application_socket);
352 if (ret) {
353 PERROR("Failed to close application registration socket");
1785d7f2
JG
354 }
355 if (sock >= 0) {
356 ret = close(sock);
357 if (ret) {
a13091b7 358 PERROR("Failed to close application socket");
1785d7f2
JG
359 }
360 lttng_fd_put(LTTNG_FD_APPS, 1);
361 }
412d7227 362 unlink(the_config.apps_unix_sock_path.value);
1785d7f2
JG
363
364error_poll_add:
365 lttng_poll_clean(&events);
366error_listen:
367error_create_poll:
1785d7f2
JG
368 DBG("UST Registration thread cleanup complete");
369 if (err) {
370 health_error();
371 ERR("Health error occurred in %s", __func__);
372 }
412d7227 373 health_unregister(the_health_sessiond);
cd9adb8b 374 return nullptr;
1785d7f2
JG
375}
376
377static bool shutdown_application_registration_thread(void *data)
378{
7966af57 379 struct thread_state *thread_state = (struct thread_state *) data;
a13091b7 380 const int write_fd = lttng_pipe_get_writefd(thread_state->quit_pipe);
1785d7f2
JG
381
382 return notify_thread_pipe(write_fd) == 1;
383}
384
28ab034a 385struct lttng_thread *launch_application_registration_thread(struct ust_cmd_queue *cmd_queue)
1785d7f2 386{
a13091b7 387 int ret;
1785d7f2 388 struct lttng_pipe *quit_pipe;
cd9adb8b
JG
389 struct thread_state *thread_state = nullptr;
390 struct lttng_thread *thread = nullptr;
a13091b7
JG
391 const bool is_root = (getuid() == 0);
392 int application_socket = -1;
1785d7f2 393
64803277 394 thread_state = zmalloc<struct thread_state>();
a13091b7 395 if (!thread_state) {
21fa020e
JG
396 goto error_alloc;
397 }
398 quit_pipe = lttng_pipe_open(FD_CLOEXEC);
399 if (!quit_pipe) {
1785d7f2
JG
400 goto error;
401 }
a13091b7
JG
402 thread_state->quit_pipe = quit_pipe;
403 thread_state->ust_cmd_queue = cmd_queue;
404 application_socket = create_application_socket();
405 if (application_socket < 0) {
406 goto error;
407 }
408 thread_state->application_socket = application_socket;
409 sem_init(&thread_state->ready, 0, 0);
1785d7f2
JG
410
411 thread = lttng_thread_create("UST application registration",
28ab034a
JG
412 thread_application_registration,
413 shutdown_application_registration_thread,
414 cleanup_application_registration_thread,
415 thread_state);
1785d7f2
JG
416 if (!thread) {
417 goto error;
418 }
a13091b7
JG
419 /*
420 * The application registration thread now owns the application socket
421 * and the global thread state. The thread state is used to wait for
422 * the thread's status, but its ownership now belongs to the thread.
423 */
424 application_socket = -1;
425 if (!wait_thread_status(thread_state)) {
cd9adb8b 426 thread_state = nullptr;
a13091b7 427 goto error;
86d0f119 428 }
a13091b7
JG
429
430 /* Notify all applications to register. */
431 ret = notify_ust_apps(1, is_root);
432 if (ret < 0) {
433 ERR("Failed to notify applications or create the wait shared memory.\n"
28ab034a
JG
434 "Execution continues but there might be problems for already\n"
435 "running applications that wishes to register.");
a13091b7
JG
436 }
437
bd9addf7 438 return thread;
1785d7f2 439error:
a13091b7
JG
440 lttng_thread_put(thread);
441 cleanup_application_registration_thread(thread_state);
442 if (application_socket >= 0) {
443 if (close(application_socket)) {
444 PERROR("Failed to close application registration socket");
445 }
446 }
21fa020e 447error_alloc:
cd9adb8b 448 return nullptr;
1785d7f2 449}
This page took 0.086516 seconds and 4 git commands to generate.