From: Jérémie Galarneau Date: Wed, 5 Dec 2018 04:19:25 +0000 (-0500) Subject: Launch the consumer management thread using lttng_thread X-Git-Tag: v2.12.0-rc1~712 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=4ec029ed8d032ac98d8dc158a751841b7c150639;p=lttng-tools.git Launch the consumer management thread using lttng_thread Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index e1e5a00e8..fba2a5a1c 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -48,7 +48,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \ dispatch.c dispatch.h \ register.c register.h \ manage-apps.c manage-apps.h \ - manage-kernel.c manage-kernel.h + manage-kernel.c manage-kernel.h \ + manage-consumer.c manage-consumer.h if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/client.c b/src/bin/lttng-sessiond/client.c index 119062c15..ea8ffd45f 100644 --- a/src/bin/lttng-sessiond/client.c +++ b/src/bin/lttng-sessiond/client.c @@ -35,6 +35,7 @@ #include "health-sessiond.h" #include "testpoint.h" #include "utils.h" +#include "manage-consumer.h" static bool is_root; @@ -116,133 +117,11 @@ end: /* * Start the thread_manage_consumer. This must be done after a lttng-consumerd - * exec or it will fails. + * exec or it will fail. */ static int spawn_consumer_thread(struct consumer_data *consumer_data) { - int ret, clock_ret; - struct timespec timeout; - - /* - * Make sure we set the readiness flag to 0 because we are NOT ready. - * This access to consumer_thread_is_ready does not need to be - * protected by consumer_data.cond_mutex (yet) since the consumer - * management thread has not been started at this point. - */ - consumer_data->consumer_thread_is_ready = 0; - - /* Setup pthread condition */ - ret = pthread_condattr_init(&consumer_data->condattr); - if (ret) { - errno = ret; - PERROR("pthread_condattr_init consumer data"); - goto error; - } - - /* - * Set the monotonic clock in order to make sure we DO NOT jump in time - * between the clock_gettime() call and the timedwait call. See bug #324 - * for a more details and how we noticed it. - */ - ret = pthread_condattr_setclock(&consumer_data->condattr, CLOCK_MONOTONIC); - if (ret) { - errno = ret; - PERROR("pthread_condattr_setclock consumer data"); - goto error; - } - - ret = pthread_cond_init(&consumer_data->cond, &consumer_data->condattr); - if (ret) { - errno = ret; - PERROR("pthread_cond_init consumer data"); - goto error; - } - - ret = pthread_create(&consumer_data->thread, default_pthread_attr(), - thread_manage_consumer, consumer_data); - if (ret) { - errno = ret; - PERROR("pthread_create consumer"); - ret = -1; - goto error; - } - - /* We are about to wait on a pthread condition */ - pthread_mutex_lock(&consumer_data->cond_mutex); - - /* Get time for sem_timedwait absolute timeout */ - clock_ret = lttng_clock_gettime(CLOCK_MONOTONIC, &timeout); - /* - * Set the timeout for the condition timed wait even if the clock gettime - * call fails since we might loop on that call and we want to avoid to - * increment the timeout too many times. - */ - timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT; - - /* - * The following loop COULD be skipped in some conditions so this is why we - * set ret to 0 in order to make sure at least one round of the loop is - * done. - */ - ret = 0; - - /* - * Loop until the condition is reached or when a timeout is reached. Note - * that the pthread_cond_timedwait(P) man page specifies that EINTR can NOT - * be returned but the pthread_cond(3), from the glibc-doc, says that it is - * possible. This loop does not take any chances and works with both of - * them. - */ - while (!consumer_data->consumer_thread_is_ready && ret != ETIMEDOUT) { - if (clock_ret < 0) { - PERROR("clock_gettime spawn consumer"); - /* Infinite wait for the consumerd thread to be ready */ - ret = pthread_cond_wait(&consumer_data->cond, - &consumer_data->cond_mutex); - } else { - ret = pthread_cond_timedwait(&consumer_data->cond, - &consumer_data->cond_mutex, &timeout); - } - } - - /* Release the pthread condition */ - pthread_mutex_unlock(&consumer_data->cond_mutex); - - if (ret != 0) { - errno = ret; - if (ret == ETIMEDOUT) { - int pth_ret; - - /* - * Call has timed out so we kill the kconsumerd_thread and return - * an error. - */ - ERR("Condition timed out. The consumer thread was never ready." - " Killing it"); - pth_ret = pthread_cancel(consumer_data->thread); - if (pth_ret < 0) { - PERROR("pthread_cancel consumer thread"); - } - } else { - PERROR("pthread_cond_wait failed consumer thread"); - } - /* Caller is expecting a negative value on failure. */ - ret = -1; - goto error; - } - - pthread_mutex_lock(&consumer_data->pid_mutex); - if (consumer_data->pid == 0) { - ERR("Consumerd did not start"); - pthread_mutex_unlock(&consumer_data->pid_mutex); - goto error; - } - pthread_mutex_unlock(&consumer_data->pid_mutex); - - return 0; - -error: - return ret; + return launch_consumer_management_thread(consumer_data) ? 0 : -1; } /* @@ -757,27 +636,6 @@ error: return ret; } -/* - * Join consumer thread - */ -static int join_consumer_thread(struct consumer_data *consumer_data) -{ - void *status; - - /* Consumer pid must be a real one. */ - if (consumer_data->pid > 0) { - int ret; - ret = kill(consumer_data->pid, SIGTERM); - if (ret) { - PERROR("Error killing consumer daemon"); - return ret; - } - return pthread_join(consumer_data->thread, &status); - } else { - return 0; - } -} - /* * Version of setup_lttng_msg() without command header. */ @@ -2490,28 +2348,6 @@ error_create_poll: DBG("Client thread dying"); rcu_unregister_thread(); - - /* - * Since we are creating the consumer threads, we own them, so we need - * to join them before our thread exits. - */ - ret = join_consumer_thread(&kconsumer_data); - if (ret) { - errno = ret; - PERROR("join_consumer"); - } - - ret = join_consumer_thread(&ustconsumer32_data); - if (ret) { - errno = ret; - PERROR("join_consumer ust32"); - } - - ret = join_consumer_thread(&ustconsumer64_data); - if (ret) { - errno = ret; - PERROR("join_consumer ust64"); - } return NULL; } diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index c98ebb34f..0e7ed0017 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -66,26 +66,6 @@ struct consumer_socket { struct consumer_data { enum lttng_consumer_type type; - pthread_t thread; /* Worker thread interacting with the consumer */ - - /* Conditions used by the consumer thread to indicate readiness. */ - pthread_cond_t cond; - pthread_condattr_t condattr; - pthread_mutex_t cond_mutex; - - /* - * This is a flag condition indicating that the consumer thread is ready - * and connected to the lttng-consumerd daemon. This flag MUST only be - * updated by locking the condition mutex above or before spawning a - * consumer thread. - * - * A value of 0 means that the thread is NOT ready. A value of 1 means that - * the thread consumer did connect successfully to the lttng-consumerd - * daemon. A negative value indicates that there is been an error and the - * thread has likely quit. - */ - int consumer_thread_is_ready; - /* Mutex to control consumerd pid assignation */ pthread_mutex_t pid_mutex; pid_t pid; diff --git a/src/bin/lttng-sessiond/globals.c b/src/bin/lttng-sessiond/globals.c index 6ca5a5a71..efe80baaf 100644 --- a/src/bin/lttng-sessiond/globals.c +++ b/src/bin/lttng-sessiond/globals.c @@ -48,8 +48,6 @@ struct consumer_data kconsumer_data = { .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, - .cond = PTHREAD_COND_INITIALIZER, - .cond_mutex = PTHREAD_MUTEX_INITIALIZER, }; struct consumer_data ustconsumer64_data = { @@ -59,8 +57,6 @@ struct consumer_data ustconsumer64_data = { .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, - .cond = PTHREAD_COND_INITIALIZER, - .cond_mutex = PTHREAD_MUTEX_INITIALIZER, }; struct consumer_data ustconsumer32_data = { @@ -70,8 +66,6 @@ struct consumer_data ustconsumer32_data = { .channel_monitor_pipe = -1, .pid_mutex = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER, - .cond = PTHREAD_COND_INITIALIZER, - .cond_mutex = PTHREAD_MUTEX_INITIALIZER, }; enum consumerd_state ust_consumerd_state; diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 6d0fb42e3..1fbfb1b05 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -372,390 +372,6 @@ static void sessiond_cleanup_options(void) run_as_destroy_worker(); } -/* - * Signal pthread condition of the consumer data that the thread. - */ -static void signal_consumer_condition(struct consumer_data *data, int state) -{ - pthread_mutex_lock(&data->cond_mutex); - - /* - * The state is set before signaling. It can be any value, it's the waiter - * job to correctly interpret this condition variable associated to the - * consumer pthread_cond. - * - * A value of 0 means that the corresponding thread of the consumer data - * was not started. 1 indicates that the thread has started and is ready - * for action. A negative value means that there was an error during the - * thread bootstrap. - */ - data->consumer_thread_is_ready = state; - (void) pthread_cond_signal(&data->cond); - - pthread_mutex_unlock(&data->cond_mutex); -} - -/* - * This thread manage the consumer error sent back to the session daemon. - */ -void *thread_manage_consumer(void *data) -{ - int sock = -1, i, ret, pollfd, err = -1, should_quit = 0; - uint32_t revents, nb_fd; - enum lttcomm_return_code code; - struct lttng_poll_event events; - struct consumer_data *consumer_data = data; - struct consumer_socket *cmd_socket_wrapper = NULL; - - DBG("[thread] Manage consumer started"); - - rcu_register_thread(); - rcu_thread_online(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER); - - health_code_update(); - - /* - * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the - * metadata_sock. Nothing more will be added to this poll set. - */ - ret = sessiond_set_thread_pollset(&events, 3); - if (ret < 0) { - goto error_poll; - } - - /* - * The error socket here is already in a listening state which was done - * just before spawning this thread to avoid a race between the consumer - * daemon exec trying to connect and the listen() call. - */ - ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - health_code_update(); - - /* Infinite blocking call, waiting for transmission */ -restart: - health_poll_entry(); - - if (testpoint(sessiond_thread_manage_consumer)) { - goto error; - } - - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Event on the registration socket */ - if (pollfd == consumer_data->err_sock) { - if (revents & LPOLLIN) { - continue; - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("consumer err socket poll error"); - goto error; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - - sock = lttcomm_accept_unix_sock(consumer_data->err_sock); - if (sock < 0) { - goto error; - } - - /* - * Set the CLOEXEC flag. Return code is useless because either way, the - * show must go on. - */ - (void) utils_set_fd_cloexec(sock); - - health_code_update(); - - DBG2("Receiving code from consumer err_sock"); - - /* Getting status code from kconsumerd */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - goto error; - } - - health_code_update(); - if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { - ERR("consumer error when waiting for SOCK_READY : %s", - lttcomm_get_readable_code(-code)); - goto error; - } - - /* Connect both command and metadata sockets. */ - consumer_data->cmd_sock = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); - consumer_data->metadata_fd = - lttcomm_connect_unix_sock( - consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { - PERROR("consumer connect cmd socket"); - /* On error, signal condition and quit. */ - signal_consumer_condition(consumer_data, -1); - goto error; - } - - consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; - - /* Create metadata socket lock. */ - consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); - if (consumer_data->metadata_sock.lock == NULL) { - PERROR("zmalloc pthread mutex"); - goto error; - } - pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); - - DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); - DBG("Consumer metadata socket ready (fd: %d)", - consumer_data->metadata_fd); - - /* - * Remove the consumerd error sock since we've established a connection. - */ - ret = lttng_poll_del(&events, consumer_data->err_sock); - if (ret < 0) { - goto error; - } - - /* Add new accepted error socket. */ - ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - /* Add metadata socket that is successfully connected. */ - ret = lttng_poll_add(&events, consumer_data->metadata_fd, - LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - health_code_update(); - - /* - * Transfer the write-end of the channel monitoring and rotate pipe - * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command. - */ - cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock); - if (!cmd_socket_wrapper) { - goto error; - } - cmd_socket_wrapper->lock = &consumer_data->lock; - - ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, - consumer_data->channel_monitor_pipe); - if (ret) { - goto error; - } - - /* Discard the socket wrapper as it is no longer needed. */ - consumer_destroy_socket(cmd_socket_wrapper); - cmd_socket_wrapper = NULL; - - /* The thread is completely initialized, signal that it is ready. */ - signal_consumer_condition(consumer_data, 1); - - /* Infinite blocking call, waiting for transmission */ -restart_poll: - while (1) { - health_code_update(); - - /* Exit the thread because the thread quit pipe has been triggered. */ - if (should_quit) { - /* Not a health error. */ - err = 0; - goto exit; - } - - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart_poll; - } - goto error; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* - * Thread quit pipe has been triggered, flag that we should stop - * but continue the current loop to handle potential data from - * consumer. - */ - should_quit = sessiond_check_thread_quit_pipe(pollfd, revents); - - if (pollfd == sock) { - /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { - ERR("consumer err socket second poll error"); - goto error; - } - health_code_update(); - /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - ERR("consumer closed the command socket"); - goto error; - } - - ERR("consumer return code : %s", - lttcomm_get_readable_code(-code)); - - goto exit; - } else if (pollfd == consumer_data->metadata_fd) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) - && !(revents & LPOLLIN)) { - ERR("consumer err metadata socket second poll error"); - goto error; - } - /* UST metadata requests */ - ret = ust_consumer_metadata_request( - &consumer_data->metadata_sock); - if (ret < 0) { - ERR("Handling metadata request"); - goto error; - } - } - /* No need for an else branch all FDs are tested prior. */ - } - health_code_update(); - } - -exit: -error: - /* - * We lock here because we are about to close the sockets and some other - * thread might be using them so get exclusive access which will abort all - * other consumer command by other threads. - */ - pthread_mutex_lock(&consumer_data->lock); - - /* Immediately set the consumerd state to stopped */ - if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { - uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR); - } else if (consumer_data->type == LTTNG_CONSUMER64_UST || - consumer_data->type == LTTNG_CONSUMER32_UST) { - uatomic_set(&ust_consumerd_state, CONSUMER_ERROR); - } else { - /* Code flow error... */ - assert(0); - } - - if (consumer_data->err_sock >= 0) { - ret = close(consumer_data->err_sock); - if (ret) { - PERROR("close"); - } - consumer_data->err_sock = -1; - } - if (consumer_data->cmd_sock >= 0) { - ret = close(consumer_data->cmd_sock); - if (ret) { - PERROR("close"); - } - consumer_data->cmd_sock = -1; - } - if (consumer_data->metadata_sock.fd_ptr && - *consumer_data->metadata_sock.fd_ptr >= 0) { - ret = close(*consumer_data->metadata_sock.fd_ptr); - if (ret) { - PERROR("close"); - } - } - if (sock >= 0) { - ret = close(sock); - if (ret) { - PERROR("close"); - } - } - - unlink(consumer_data->err_unix_sock_path); - unlink(consumer_data->cmd_unix_sock_path); - pthread_mutex_unlock(&consumer_data->lock); - - /* Cleanup metadata socket mutex. */ - if (consumer_data->metadata_sock.lock) { - pthread_mutex_destroy(consumer_data->metadata_sock.lock); - free(consumer_data->metadata_sock.lock); - } - lttng_poll_clean(&events); - - if (cmd_socket_wrapper) { - consumer_destroy_socket(cmd_socket_wrapper); - } -error_poll: - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - DBG("consumer thread cleanup completed"); - - rcu_thread_offline(); - rcu_unregister_thread(); - - return NULL; -} - /* * Setup necessary data for kernel tracer action. */ diff --git a/src/bin/lttng-sessiond/manage-consumer.c b/src/bin/lttng-sessiond/manage-consumer.c new file mode 100644 index 000000000..47bfe521c --- /dev/null +++ b/src/bin/lttng-sessiond/manage-consumer.c @@ -0,0 +1,480 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include + +#include +#include + +#include "manage-consumer.h" +#include "testpoint.h" +#include "health-sessiond.h" +#include "utils.h" +#include "thread.h" +#include "ust-consumer.h" + +struct thread_notifiers { + struct lttng_pipe *quit_pipe; + struct consumer_data *consumer_data; + sem_t ready; + int initialization_result; +}; + +static void mark_thread_as_ready(struct thread_notifiers *notifiers) +{ + DBG("Marking consumer management thread as ready"); + notifiers->initialization_result = 0; + sem_post(¬ifiers->ready); +} + +static void mark_thread_intialization_as_failed( + struct thread_notifiers *notifiers) +{ + ERR("Consumer management thread entering error state"); + notifiers->initialization_result = -1; + sem_post(¬ifiers->ready); +} + +static void wait_until_thread_is_ready(struct thread_notifiers *notifiers) +{ + DBG("Waiting for consumer management thread to be ready"); + sem_wait(¬ifiers->ready); + DBG("Consumer management thread is ready"); +} + +/* + * This thread manage the consumer error sent back to the session daemon. + */ +void *thread_consumer_management(void *data) +{ + int sock = -1, i, ret, pollfd, err = -1, should_quit = 0; + uint32_t revents, nb_fd; + enum lttcomm_return_code code; + struct lttng_poll_event events; + struct thread_notifiers *notifiers = data; + struct consumer_data *consumer_data = notifiers->consumer_data; + const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + struct consumer_socket *cmd_socket_wrapper = NULL; + + DBG("[thread] Manage consumer started"); + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER); + + health_code_update(); + + /* + * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * metadata_sock. Nothing more will be added to this poll set. + */ + ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error_poll; + } + + ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* + * The error socket here is already in a listening state which was done + * just before spawning this thread to avoid a race between the consumer + * daemon exec trying to connect and the listen() call. + */ + ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + health_code_update(); + + /* Infinite blocking call, waiting for transmission */ + health_poll_entry(); + + if (testpoint(sessiond_thread_manage_consumer)) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + + /* Thread quit pipe has been closed. Killing thread. */ + if (pollfd == quit_pipe_read_fd) { + err = 0; + mark_thread_intialization_as_failed(notifiers); + goto exit; + } else if (pollfd == consumer_data->err_sock) { + /* Event on the registration socket */ + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("consumer err socket poll error"); + mark_thread_intialization_as_failed(notifiers); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + mark_thread_intialization_as_failed(notifiers); + goto error; + } + } + } + + sock = lttcomm_accept_unix_sock(consumer_data->err_sock); + if (sock < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + + health_code_update(); + + DBG2("Receiving code from consumer err_sock"); + + /* Getting status code from kconsumerd */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + health_code_update(); + if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { + ERR("consumer error when waiting for SOCK_READY : %s", + lttcomm_get_readable_code(-code)); + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* Connect both command and metadata sockets. */ + consumer_data->cmd_sock = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + consumer_data->metadata_fd = + lttcomm_connect_unix_sock( + consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) { + PERROR("consumer connect cmd socket"); + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; + + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + mark_thread_intialization_as_failed(notifiers); + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + + DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_fd); + + /* + * Remove the consumerd error sock since we've established a connection. + */ + ret = lttng_poll_del(&events, consumer_data->err_sock); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* Add new accepted error socket. */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* Add metadata socket that is successfully connected. */ + ret = lttng_poll_add(&events, consumer_data->metadata_fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + health_code_update(); + + /* + * Transfer the write-end of the channel monitoring and rotate pipe + * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command. + */ + cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock); + if (!cmd_socket_wrapper) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + cmd_socket_wrapper->lock = &consumer_data->lock; + + ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper, + consumer_data->channel_monitor_pipe); + if (ret) { + mark_thread_intialization_as_failed(notifiers); + goto error; + } + + /* Discard the socket wrapper as it is no longer needed. */ + consumer_destroy_socket(cmd_socket_wrapper); + cmd_socket_wrapper = NULL; + + /* The thread is completely initialized, signal that it is ready. */ + mark_thread_as_ready(notifiers); + + /* Infinite blocking call, waiting for transmission */ + while (1) { + health_code_update(); + + /* Exit the thread because the thread quit pipe has been triggered. */ + if (should_quit) { + /* Not a health error. */ + err = 0; + goto exit; + } + + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + + /* + * Thread quit pipe has been triggered, flag that we should stop + * but continue the current loop to handle potential data from + * consumer. + */ + if (pollfd == quit_pipe_read_fd) { + should_quit = 1; + } else if (pollfd == sock) { + /* Event on the consumerd socket */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { + ERR("consumer err socket second poll error"); + goto error; + } + health_code_update(); + /* Wait for any kconsumerd error */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + ERR("consumer closed the command socket"); + goto error; + } + + ERR("consumer return code : %s", + lttcomm_get_readable_code(-code)); + + goto exit; + } else if (pollfd == consumer_data->metadata_fd) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { + ERR("consumer err metadata socket second poll error"); + goto error; + } + /* UST metadata requests */ + ret = ust_consumer_metadata_request( + &consumer_data->metadata_sock); + if (ret < 0) { + ERR("Handling metadata request"); + goto error; + } + } + /* No need for an else branch all FDs are tested prior. */ + } + health_code_update(); + } + +exit: +error: + /* + * We lock here because we are about to close the sockets and some other + * thread might be using them so get exclusive access which will abort all + * other consumer command by other threads. + */ + pthread_mutex_lock(&consumer_data->lock); + + /* Immediately set the consumerd state to stopped */ + if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { + uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR); + } else if (consumer_data->type == LTTNG_CONSUMER64_UST || + consumer_data->type == LTTNG_CONSUMER32_UST) { + uatomic_set(&ust_consumerd_state, CONSUMER_ERROR); + } else { + /* Code flow error... */ + assert(0); + } + + if (consumer_data->err_sock >= 0) { + ret = close(consumer_data->err_sock); + if (ret) { + PERROR("close"); + } + consumer_data->err_sock = -1; + } + if (consumer_data->cmd_sock >= 0) { + ret = close(consumer_data->cmd_sock); + if (ret) { + PERROR("close"); + } + consumer_data->cmd_sock = -1; + } + if (consumer_data->metadata_sock.fd_ptr && + *consumer_data->metadata_sock.fd_ptr >= 0) { + ret = close(*consumer_data->metadata_sock.fd_ptr); + if (ret) { + PERROR("close"); + } + } + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + + unlink(consumer_data->err_unix_sock_path); + unlink(consumer_data->cmd_unix_sock_path); + pthread_mutex_unlock(&consumer_data->lock); + + /* Cleanup metadata socket mutex. */ + if (consumer_data->metadata_sock.lock) { + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); + } + lttng_poll_clean(&events); + + if (cmd_socket_wrapper) { + consumer_destroy_socket(cmd_socket_wrapper); + } +error_poll: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_sessiond); + DBG("consumer thread cleanup completed"); + + rcu_thread_offline(); + rcu_unregister_thread(); + + return NULL; +} + +static bool shutdown_consumer_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe); + + return notify_thread_pipe(write_fd) == 1; +} + +static void cleanup_consumer_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + + lttng_pipe_destroy(notifiers->quit_pipe); + free(notifiers); +} + +bool launch_consumer_management_thread(struct consumer_data *consumer_data) +{ + struct lttng_pipe *quit_pipe; + struct thread_notifiers *notifiers = NULL; + struct lttng_thread *thread; + + quit_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!quit_pipe) { + goto error; + } + + notifiers = zmalloc(sizeof(*notifiers)); + if (!notifiers) { + goto error; + } + notifiers->quit_pipe = quit_pipe; + notifiers->consumer_data = consumer_data; + sem_init(¬ifiers->ready, 0, 0); + + thread = lttng_thread_create("Consumer management", + thread_consumer_management, + shutdown_consumer_management_thread, + cleanup_consumer_management_thread, + notifiers); + if (!thread) { + goto error; + } + wait_until_thread_is_ready(notifiers); + lttng_thread_put(thread); + if (notifiers->initialization_result) { + goto error; + } + return true; +error: + cleanup_consumer_management_thread(notifiers); + return false; +} diff --git a/src/bin/lttng-sessiond/manage-consumer.h b/src/bin/lttng-sessiond/manage-consumer.h new file mode 100644 index 000000000..0a0067c29 --- /dev/null +++ b/src/bin/lttng-sessiond/manage-consumer.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef SESSIOND_CONSUMER_MANAGEMENT_THREAD_H +#define SESSIOND_CONSUMER_MANAGEMENT_THREAD_H + +#include +#include "lttng-sessiond.h" + +bool launch_consumer_management_thread(struct consumer_data *consumer_data); + +#endif /* SESSIOND_CONSUMER_MANAGEMENT_THREAD_H */