From 8a00688e1d58cc5a2e77eba206ff23bd6105130c Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Tue, 10 Nov 2020 16:33:36 -0500 Subject: [PATCH] Standardize quit pipes behavior MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Standardize the behavior of the quit pipes to trigger on any poll events. Change-Id: I0beeefcbd1a55b2aa308eb28b617487ffdeb737e Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- src/bin/lttng-consumerd/health-consumerd.cpp | 31 +-- src/bin/lttng-relayd/Makefile.am | 3 +- src/bin/lttng-relayd/health-relayd.cpp | 29 +-- src/bin/lttng-relayd/live.cpp | 76 ++----- src/bin/lttng-relayd/lttng-relayd.hpp | 11 +- src/bin/lttng-relayd/main.cpp | 117 ++--------- src/bin/lttng-relayd/thread-utils.cpp | 101 +++++++++ src/bin/lttng-sessiond/agent-thread.cpp | 17 +- src/bin/lttng-sessiond/client.cpp | 31 +-- src/bin/lttng-sessiond/health.cpp | 45 ++-- src/bin/lttng-sessiond/manage-apps.cpp | 20 +- src/bin/lttng-sessiond/manage-consumer.cpp | 23 ++- src/bin/lttng-sessiond/manage-kernel.cpp | 16 +- src/bin/lttng-sessiond/notify-apps.cpp | 21 +- src/bin/lttng-sessiond/register.cpp | 204 +++++++++---------- src/bin/lttng-sessiond/utils.cpp | 3 +- 16 files changed, 356 insertions(+), 392 deletions(-) create mode 100644 src/bin/lttng-relayd/thread-utils.cpp diff --git a/src/bin/lttng-consumerd/health-consumerd.cpp b/src/bin/lttng-consumerd/health-consumerd.cpp index 8417fc6dc..f8be972c5 100644 --- a/src/bin/lttng-consumerd/health-consumerd.cpp +++ b/src/bin/lttng-consumerd/health-consumerd.cpp @@ -43,22 +43,7 @@ /* Global health check unix path */ static char health_unix_sock_path[PATH_MAX]; -int health_quit_pipe[2]; - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static -int check_health_quit_pipe(int fd, uint32_t events) -{ - if (fd == health_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} +int health_quit_pipe[2] = {-1, -1}; /* * Send data on a unix socket using the liblttsessiondcomm API. @@ -146,8 +131,8 @@ end: */ void *thread_manage_health_consumerd(void *data __attribute__((unused))) { - int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; - uint32_t revents, nb_fd; + int sock = -1, new_sock = -1, ret, i, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct health_comm_msg msg; struct health_comm_reply reply; @@ -252,12 +237,12 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_health_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on health quit pipe, exiting. */ + if (pollfd == health_quit_pipe[0]) { + DBG("Activity on health quit pipe"); err = 0; goto exit; } diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index b473d5c62..fce8c2346 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -25,7 +25,8 @@ lttng_relayd_SOURCES = main.cpp lttng-relayd.hpp utils.hpp utils.cpp cmd.hpp \ tracefile-array.cpp tracefile-array.hpp \ tcp_keep_alive.cpp tcp_keep_alive.hpp \ sessiond-trace-chunks.cpp sessiond-trace-chunks.hpp \ - backward-compatibility-group-by.cpp backward-compatibility-group-by.hpp + backward-compatibility-group-by.cpp backward-compatibility-group-by.hpp \ + thread-utils.cpp # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = $(URCU_LIBS) \ diff --git a/src/bin/lttng-relayd/health-relayd.cpp b/src/bin/lttng-relayd/health-relayd.cpp index 8e22dfe66..af8487bdb 100644 --- a/src/bin/lttng-relayd/health-relayd.cpp +++ b/src/bin/lttng-relayd/health-relayd.cpp @@ -47,21 +47,6 @@ char health_unix_sock_path[PATH_MAX]; int health_quit_pipe[2] = { -1, -1 }; -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static -int check_health_quit_pipe(int fd, uint32_t events) -{ - if (fd == health_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - /* * Send data on a unix socket using the liblttsessiondcomm API. * @@ -261,8 +246,8 @@ end: */ void *thread_manage_health_relayd(void *data __attribute__((unused))) { - int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; - uint32_t revents, nb_fd; + int sock = -1, new_sock = -1, ret, i, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct health_comm_msg msg; struct health_comm_reply reply; @@ -379,12 +364,12 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_health_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == health_quit_pipe[0]) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index abbec8c20..4724a3ada 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -590,54 +590,6 @@ int relayd_live_stop(void) return 0; } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static -int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static -int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - static int create_sock(void *data, int *out_fd) { @@ -768,8 +720,8 @@ error: static void *thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *live_control_sock; @@ -826,15 +778,15 @@ restart: DBG("Relay new viewer connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -2729,14 +2681,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } diff --git a/src/bin/lttng-relayd/lttng-relayd.hpp b/src/bin/lttng-relayd/lttng-relayd.hpp index a92ace77d..32911f09c 100644 --- a/src/bin/lttng-relayd/lttng-relayd.hpp +++ b/src/bin/lttng-relayd/lttng-relayd.hpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -49,11 +50,17 @@ extern const char *tracing_group_name; extern const char * const config_section_name; extern enum relay_group_output_by opt_group_output_by; -extern int thread_quit_pipe[2]; - extern struct fd_tracker *the_fd_tracker; void lttng_relay_notify_ready(void); int lttng_relay_stop_threads(void); +int relayd_init_thread_quit_pipe(void); +int relayd_notify_thread_quit_pipe(void); +void relayd_close_thread_quit_pipe(void); +bool relayd_is_thread_quit_pipe(const int fd); + +int create_named_thread_poll_set(struct lttng_poll_event *events, + int size, const char *name); + #endif /* LTTNG_RELAYD_H */ diff --git a/src/bin/lttng-relayd/main.cpp b/src/bin/lttng-relayd/main.cpp index 318af66f7..528d1451c 100644 --- a/src/bin/lttng-relayd/main.cpp +++ b/src/bin/lttng-relayd/main.cpp @@ -126,12 +126,6 @@ static int tracing_group_name_override; const char * const config_section_name = "relayd"; -/* - * Quit pipe for all threads. This permits a single cancellation point - * for all threads when receiving an event on the pipe. - */ -int thread_quit_pipe[2] = { -1, -1 }; - /* * This pipe is used to inform the worker thread that a command is queued and * ready to be processed. @@ -722,10 +716,7 @@ static void relayd_cleanup(void) (void) fd_tracker_util_pipe_close( the_fd_tracker, health_quit_pipe); } - if (thread_quit_pipe[0] != -1) { - (void) fd_tracker_util_pipe_close( - the_fd_tracker, thread_quit_pipe); - } + relayd_close_thread_quit_pipe(); if (sessiond_trace_chunk_registry) { sessiond_trace_chunk_registry_destroy( sessiond_trace_chunk_registry); @@ -748,23 +739,6 @@ static void relayd_cleanup(void) } } -/* - * Write to writable pipe used to notify a thread. - */ -static int notify_thread_pipe(int wpipe) -{ - ssize_t ret; - - ret = lttng_write(wpipe, "!", 1); - if (ret < 1) { - PERROR("write poll pipe"); - goto end; - } - ret = 0; -end: - return ret; -} - static int notify_health_quit_pipe(int *pipe) { ssize_t ret; @@ -788,7 +762,7 @@ int lttng_relay_stop_threads(void) /* Stopping all threads */ DBG("Terminating all threads"); - if (notify_thread_pipe(thread_quit_pipe[1])) { + if (relayd_notify_thread_quit_pipe()) { ERR("write error on thread quit pipe"); retval = -1; } @@ -892,17 +866,6 @@ void lttng_relay_notify_ready(void) } } -/* - * Init thread quit pipe. - * - * Return -1 on error or 0 if all pipes are created. - */ -static int init_thread_quit_pipe(void) -{ - return fd_tracker_util_pipe_open_cloexec( - the_fd_tracker, "Quit pipe", thread_quit_pipe); -} - /* * Init health quit pipe. * @@ -914,52 +877,6 @@ static int init_health_quit_pipe(void) "Health quit pipe", health_quit_pipe); } -/* - * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. - */ -static int create_named_thread_poll_set(struct lttng_poll_event *events, - int size, const char *name) -{ - int ret; - - if (events == NULL || size == 0) { - ret = -1; - goto error; - } - - ret = fd_tracker_util_poll_create(the_fd_tracker, - name, events, 1, LTTNG_CLOEXEC); - if (ret) { - PERROR("Failed to create \"%s\" poll file descriptor", name); - goto error; - } - - /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); - if (ret < 0) { - goto error; - } - - return 0; - -error: - return ret; -} - -/* - * Check if the thread quit pipe was triggered. - * - * Return 1 if it was triggered else 0; - */ -static int check_thread_quit_pipe(int fd, uint32_t events) -{ - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { - return 1; - } - - return 0; -} - static int create_sock(void *data, int *out_fd) { int ret; @@ -1089,8 +1006,8 @@ end: */ static void *relay_thread_listener(void *data __attribute__((unused))) { - int i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *control_sock, *data_sock; @@ -1161,15 +1078,15 @@ restart: DBG("Relay new connection received"); for (i = 0; i < nb_fd; i++) { - health_code_update(); - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + health_code_update(); + + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -3978,14 +3895,14 @@ restart: */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { + /* Activity on thread quit pipe, exiting. */ + if (relayd_is_thread_quit_pipe(pollfd)) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } @@ -4391,7 +4308,7 @@ int main(int argc, char **argv) } /* Create thread quit pipe */ - if (init_thread_quit_pipe()) { + if (relayd_init_thread_quit_pipe()) { retval = -1; goto exit_options; } diff --git a/src/bin/lttng-relayd/thread-utils.cpp b/src/bin/lttng-relayd/thread-utils.cpp new file mode 100644 index 000000000..d469f90e5 --- /dev/null +++ b/src/bin/lttng-relayd/thread-utils.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2022 EfficiOS Inc. + * + * SPDX-License-Identifier: GPL-2.0-only + * + */ + +#include "lttng-relayd.hpp" + +#include +#include +#include +#include +#include + +/* + * Quit pipe for all threads. This permits a single cancellation point + * for all threads when receiving an event on the pipe. + */ +static int thread_quit_pipe[2] = { -1, -1 }; + +/* + * Write to writable pipe used to notify a thread. + */ +static int notify_thread_pipe(int wpipe) +{ + const auto ret = lttng_write(wpipe, "!", 1); + + if (ret < 1) { + PERROR("Failed to write to thread pipe"); + return -1; + } + + return 0; +} + +/* + * Initialize the thread quit pipe. + * + * Return -1 on error or 0 if all pipes are created. + */ +int relayd_init_thread_quit_pipe(void) +{ + return fd_tracker_util_pipe_open_cloexec( + the_fd_tracker, "Thread quit pipe", thread_quit_pipe); +} + +/* + * Notify the threads to initiate shutdown. + * + * Return 0 on success or -1 on error. + */ +int relayd_notify_thread_quit_pipe(void) +{ + return notify_thread_pipe(thread_quit_pipe[1]); +} + +/* + * Close the thread quit pipe. + */ +void relayd_close_thread_quit_pipe(void) +{ + if (thread_quit_pipe[0] != -1) { + (void) fd_tracker_util_pipe_close( + the_fd_tracker, thread_quit_pipe); + } +} + +/* + * Return 1 if 'fd' is the thread quit pipe read fd. + */ +bool relayd_is_thread_quit_pipe(const int fd) +{ + return (fd == thread_quit_pipe[0]); +} + +/* + * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. + */ +int create_named_thread_poll_set(struct lttng_poll_event *events, + int size, const char *name) +{ + if (events == NULL || size == 0) { + return -1; + } + + const auto create_ret = fd_tracker_util_poll_create(the_fd_tracker, + name, events, 1, LTTNG_CLOEXEC); + if (create_ret) { + PERROR("Failed to create \"%s\" poll file descriptor", name); + return -1; + } + + /* Add thread quit pipe to monitored events. */ + const auto poll_add_ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); + if (poll_add_ret < 0) { + return -1; + } + + return 0; +} diff --git a/src/bin/lttng-sessiond/agent-thread.cpp b/src/bin/lttng-sessiond/agent-thread.cpp index c8fab6164..dc8cb2961 100644 --- a/src/bin/lttng-sessiond/agent-thread.cpp +++ b/src/bin/lttng-sessiond/agent-thread.cpp @@ -355,12 +355,12 @@ void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_agent_management(void *data) { - int i, ret, pollfd; - uint32_t revents, nb_fd; + int i, ret; + uint32_t nb_fd; struct lttng_poll_event events; struct lttcomm_sock *reg_sock; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd( notifiers->quit_pipe); DBG("Manage agent application registration."); @@ -377,7 +377,7 @@ static void *thread_agent_management(void *data) goto error_poll_create; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); if (ret < 0) { goto error_tcp_socket; @@ -439,11 +439,12 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); goto exit; } diff --git a/src/bin/lttng-sessiond/client.cpp b/src/bin/lttng-sessiond/client.cpp index 3a83f06b9..4c4cdf540 100644 --- a/src/bin/lttng-sessiond/client.cpp +++ b/src/bin/lttng-sessiond/client.cpp @@ -2457,9 +2457,9 @@ static void thread_init_cleanup(void *data __attribute__((unused))) */ static void *thread_manage_clients(void *data) { - int sock = -1, ret, i, pollfd, err = -1; + int sock = -1, ret, i, err = -1; int sock_error; - uint32_t revents, nb_fd; + uint32_t nb_fd; struct lttng_poll_event events; const int client_sock = thread_state.client_sock; struct lttng_pipe *quit_pipe = (lttng_pipe *) data; @@ -2551,25 +2551,28 @@ static void *thread_manage_clients(void *data) nb_fd = ret; for (i = 0; i < nb_fd; i++) { - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + /* Fetch once the poll data. */ + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); + /* Activity on thread quit pipe, exiting. */ if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; + } + + /* Event on the registration socket */ + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Client socket poll error"); + goto error; } else { - /* Event on the registration socket */ - if (revents & LPOLLIN) { - continue; - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Client socket poll error"); - goto error; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } diff --git a/src/bin/lttng-sessiond/health.cpp b/src/bin/lttng-sessiond/health.cpp index 0735907a2..1ba991447 100644 --- a/src/bin/lttng-sessiond/health.cpp +++ b/src/bin/lttng-sessiond/health.cpp @@ -54,14 +54,14 @@ static void cleanup_health_management_thread(void *data) static void *thread_manage_health(void *data) { const bool is_root = (getuid() == 0); - int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; - uint32_t revents, nb_fd; + int sock = -1, new_sock = -1, ret, i, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; struct health_comm_msg msg; struct health_comm_reply reply; /* Thread-specific quit pipe. */ struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd( notifiers->quit_pipe); DBG("[thread] Manage health check started"); @@ -70,7 +70,7 @@ static void *thread_manage_health(void *data) /* * Created with a size of two for: - * - client socket + * - health client socket * - thread quit pipe */ ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); @@ -122,12 +122,12 @@ static void *thread_manage_health(void *data) goto error; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } - /* Add the application registration socket */ + /* Add the health client socket. */ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); if (ret < 0) { goto error; @@ -154,25 +154,26 @@ restart: for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - /* Event on the registration socket */ - if (pollfd == sock) { - if (revents & LPOLLIN) { - continue; - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Health socket poll error"); - goto error; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } else { - /* Event on the thread's quit pipe. */ + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } + + /* Event on the health client socket. */ + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; + } } new_sock = lttcomm_accept_unix_sock(sock); diff --git a/src/bin/lttng-sessiond/manage-apps.cpp b/src/bin/lttng-sessiond/manage-apps.cpp index f8297a6df..7698ea442 100644 --- a/src/bin/lttng-sessiond/manage-apps.cpp +++ b/src/bin/lttng-sessiond/manage-apps.cpp @@ -43,12 +43,12 @@ static void cleanup_application_management_thread(void *data) */ static void *thread_application_management(void *data) { - int i, ret, pollfd, err = -1; + int i, ret, err = -1; ssize_t size_ret; - uint32_t revents, nb_fd; + uint32_t nb_fd; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd( notifiers->quit_pipe); DBG("[thread] Manage application started"); @@ -75,7 +75,7 @@ static void *thread_application_management(void *data) goto error; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -110,15 +110,19 @@ static void *thread_application_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; - } else if (pollfd == notifiers->apps_cmd_pipe_read_fd) { + } + + if (pollfd == notifiers->apps_cmd_pipe_read_fd) { /* Inspect the apps cmd pipe */ if (revents & LPOLLIN) { int sock; diff --git a/src/bin/lttng-sessiond/manage-consumer.cpp b/src/bin/lttng-sessiond/manage-consumer.cpp index a3bb816f9..d9d1e670e 100644 --- a/src/bin/lttng-sessiond/manage-consumer.cpp +++ b/src/bin/lttng-sessiond/manage-consumer.cpp @@ -55,13 +55,13 @@ static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) */ static void *thread_consumer_management(void *data) { - int sock = -1, i, ret, pollfd, err = -1, should_quit = 0; - uint32_t revents, nb_fd; + int sock = -1, i, ret, err = -1, should_quit = 0; + uint32_t nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; struct consumer_data *consumer_data = notifiers->consumer_data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); struct consumer_socket *cmd_socket_wrapper = NULL; DBG("[thread] Manage consumer started"); @@ -83,7 +83,7 @@ static void *thread_consumer_management(void *data) goto error_poll; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); if (ret < 0) { mark_thread_intialization_as_failed(notifiers); goto error; @@ -121,13 +121,14 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; mark_thread_intialization_as_failed(notifiers); goto exit; @@ -290,8 +291,8 @@ static void *thread_consumer_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); @@ -300,7 +301,7 @@ static void *thread_consumer_management(void *data) * but continue the current loop to handle potential data from * consumer. */ - if (pollfd == quit_pipe_read_fd) { + if (pollfd == thread_quit_pipe_fd) { should_quit = 1; } else if (pollfd == sock) { /* Event on the consumerd socket */ diff --git a/src/bin/lttng-sessiond/manage-kernel.cpp b/src/bin/lttng-sessiond/manage-kernel.cpp index 8d2795466..092085250 100644 --- a/src/bin/lttng-sessiond/manage-kernel.cpp +++ b/src/bin/lttng-sessiond/manage-kernel.cpp @@ -165,12 +165,12 @@ error: */ static void *thread_kernel_management(void *data) { - int ret, i, pollfd, update_poll_flag = 1, err = -1; - uint32_t revents, nb_fd; + int ret, i, update_poll_flag = 1, err = -1; + uint32_t nb_fd; char tmp; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); DBG("[thread] Thread manage kernel started"); @@ -212,7 +212,7 @@ static void *thread_kernel_management(void *data) } ret = lttng_poll_add(&events, - quit_pipe_read_fd, + thread_quit_pipe_fd, LPOLLIN); if (ret < 0) { goto error; @@ -254,12 +254,14 @@ static void *thread_kernel_management(void *data) for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); health_code_update(); - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; } diff --git a/src/bin/lttng-sessiond/notify-apps.cpp b/src/bin/lttng-sessiond/notify-apps.cpp index 1fff92d07..1b83491d1 100644 --- a/src/bin/lttng-sessiond/notify-apps.cpp +++ b/src/bin/lttng-sessiond/notify-apps.cpp @@ -30,12 +30,12 @@ struct thread_notifiers { */ static void *thread_application_notification(void *data) { - int i, ret, pollfd, err = -1; + int i, ret, err = -1; ssize_t size_ret; - uint32_t revents, nb_fd; + uint32_t nb_fd; struct lttng_poll_event events; struct thread_notifiers *notifiers = (thread_notifiers *) data; - const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); DBG("[ust-thread] Manage application notify command"); @@ -63,7 +63,7 @@ static void *thread_application_notification(void *data) goto error; } - ret = lttng_poll_add(&events, quit_pipe_read_fd, + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLERR); if (ret < 0) { goto error; @@ -97,14 +97,17 @@ restart: health_code_update(); /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, exiting. */ + if (pollfd == thread_quit_pipe_fd) { + DBG("Activity on thread quit pipe"); err = 0; goto exit; - } else if (pollfd == notifiers->apps_cmd_notify_pipe_read_fd) { + } + + if (pollfd == notifiers->apps_cmd_notify_pipe_read_fd) { /* Inspect the apps cmd pipe */ int sock; diff --git a/src/bin/lttng-sessiond/register.cpp b/src/bin/lttng-sessiond/register.cpp index 34efeac40..4478221ff 100644 --- a/src/bin/lttng-sessiond/register.cpp +++ b/src/bin/lttng-sessiond/register.cpp @@ -153,8 +153,8 @@ static void thread_init_cleanup(void *data) */ static void *thread_application_registration(void *data) { - int sock = -1, i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; + int sock = -1, i, ret, err = -1; + uint32_t nb_fd; struct lttng_poll_event events; /* * Gets allocated in this thread, enqueued to a global queue, dequeued @@ -164,7 +164,7 @@ static void *thread_application_registration(void *data) const bool is_root = (getuid() == 0); struct thread_state *thread_state = (struct thread_state *) data; const int application_socket = thread_state->application_socket; - const int quit_pipe_read_fd = lttng_pipe_get_readfd( + const auto thread_quit_pipe_fd = lttng_pipe_get_readfd( thread_state->quit_pipe); DBG("[thread] Manage application registration started"); @@ -193,7 +193,7 @@ static void *thread_application_registration(void *data) } /* Add the application registration socket */ - ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error_poll_add; } @@ -229,116 +229,116 @@ static void *thread_application_registration(void *data) health_code_update(); /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + const auto revents = LTTNG_POLL_GETEV(&events, i); + const auto pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - if (pollfd == quit_pipe_read_fd) { + /* Activity on thread quit pipe, closing. */ + if (pollfd == thread_quit_pipe_fd) { err = 0; goto exit; - } else { - /* Event on the registration socket */ - if (revents & LPOLLIN) { - sock = lttcomm_accept_unix_sock(application_socket); - if (sock < 0) { - goto error; - } + } - /* - * Set socket timeout for both receiving and ending. - * app_socket_timeout is in seconds, whereas - * lttcomm_setsockopt_rcv_timeout and - * lttcomm_setsockopt_snd_timeout expect msec as - * parameter. - */ - if (the_config.app_socket_timeout >= 0) { - (void) lttcomm_setsockopt_rcv_timeout(sock, - the_config.app_socket_timeout * 1000); - (void) lttcomm_setsockopt_snd_timeout(sock, - the_config.app_socket_timeout * 1000); - } + /* Event on the registration socket. */ + if (revents & LPOLLIN) { + sock = lttcomm_accept_unix_sock(application_socket); + if (sock < 0) { + goto error; + } - /* - * Set the CLOEXEC flag. Return code is useless because - * either way, the show must go on. - */ - (void) utils_set_fd_cloexec(sock); - - /* Create UST registration command for enqueuing */ - ust_cmd = zmalloc(); - if (ust_cmd == NULL) { - PERROR("ust command zmalloc"); - ret = close(sock); - if (ret) { - PERROR("close"); - } - sock = -1; - goto error; - } + /* + * Set socket timeout for both receiving and ending. + * app_socket_timeout is in seconds, whereas + * lttcomm_setsockopt_rcv_timeout and + * lttcomm_setsockopt_snd_timeout expect msec as + * parameter. + */ + if (the_config.app_socket_timeout >= 0) { + (void) lttcomm_setsockopt_rcv_timeout(sock, + the_config.app_socket_timeout * 1000); + (void) lttcomm_setsockopt_snd_timeout(sock, + the_config.app_socket_timeout * 1000); + } - /* - * Using message-based transmissions to ensure we don't - * have to deal with partially received messages. - */ - ret = lttng_fd_get(LTTNG_FD_APPS, 1); - if (ret < 0) { - ERR("Exhausted file descriptors allowed for applications."); - free(ust_cmd); - ret = close(sock); - if (ret) { - PERROR("close"); - } - sock = -1; - continue; + /* + * Set the CLOEXEC flag. Return code is useless because + * either way, the show must go on. + */ + (void) utils_set_fd_cloexec(sock); + + /* Create UST registration command for enqueuing */ + ust_cmd = zmalloc(); + if (ust_cmd == NULL) { + PERROR("ust command zmalloc"); + ret = close(sock); + if (ret) { + PERROR("close"); } + sock = -1; + goto error; + } - health_code_update(); - ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); - if (ret < 0) { - free(ust_cmd); - /* Close socket of the application. */ - ret = close(sock); - if (ret) { - PERROR("close"); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - sock = -1; - continue; + /* + * Using message-based transmissions to ensure we don't + * have to deal with partially received messages. + */ + ret = lttng_fd_get(LTTNG_FD_APPS, 1); + if (ret < 0) { + ERR("Exhausted file descriptors allowed for applications."); + free(ust_cmd); + ret = close(sock); + if (ret) { + PERROR("close"); } - health_code_update(); - - ust_cmd->sock = sock; sock = -1; + continue; + } - DBG("UST registration received with pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - - /* - * Lock free enqueue the registration request. The red pill - * has been taken! This apps will be part of the *system*. - */ - cds_wfcq_head_ptr_t head; - head.h = &thread_state->ust_cmd_queue->head; - cds_wfcq_enqueue(head, - &thread_state->ust_cmd_queue->tail, - &ust_cmd->node); - - /* - * Wake the registration queue futex. Implicit memory - * barrier with the exchange in cds_wfcq_enqueue. - */ - futex_nto1_wake(&thread_state->ust_cmd_queue->futex); - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Register apps socket poll error"); - goto error; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; + health_code_update(); + ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); + if (ret < 0) { + free(ust_cmd); + /* Close socket of the application. */ + ret = close(sock); + if (ret) { + PERROR("close"); + } + lttng_fd_put(LTTNG_FD_APPS, 1); + sock = -1; + continue; } + health_code_update(); + + ust_cmd->sock = sock; + sock = -1; + + DBG("UST registration received with pid:%d ppid:%d uid:%d" + " gid:%d sock:%d name:%s (version %d.%d)", + ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, + ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, + ust_cmd->sock, ust_cmd->reg_msg.name, + ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); + + /* + * Lock free enqueue the registration request. The red pill + * has been taken! This apps will be part of the *system*. + */ + cds_wfcq_head_ptr_t head; + head.h = &thread_state->ust_cmd_queue->head; + cds_wfcq_enqueue(head, + &thread_state->ust_cmd_queue->tail, + &ust_cmd->node); + + /* + * Wake the registration queue futex. Implicit memory + * barrier with the exchange in cds_wfcq_enqueue. + */ + futex_nto1_wake(&thread_state->ust_cmd_queue->futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Register apps socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } diff --git a/src/bin/lttng-sessiond/utils.cpp b/src/bin/lttng-sessiond/utils.cpp index f3446bc26..8fec34040 100644 --- a/src/bin/lttng-sessiond/utils.cpp +++ b/src/bin/lttng-sessiond/utils.cpp @@ -30,7 +30,8 @@ int notify_thread_pipe(int wpipe) ret = lttng_write(wpipe, "!", 1); if (ret < 1) { - PERROR("write poll pipe"); + ret = -1; + PERROR("Failed to write to thread pipe"); } return (int) ret; -- 2.34.1