dispatch.c dispatch.h \
register.c register.h \
manage-apps.c manage-apps.h \
- manage-kernel.c manage-kernel.h
+ manage-kernel.c manage-kernel.h \
+ manage-consumer.c manage-consumer.h
if HAVE_LIBLTTNG_UST_CTL
lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
#include "health-sessiond.h"
#include "testpoint.h"
#include "utils.h"
+#include "manage-consumer.h"
static bool is_root;
/*
- * Start the thread_manage_consumer. This must be done after a lttng-consumerd
- * exec or it will fails.
+ * Start the consumer management thread. This must be done after an
+ * lttng-consumerd exec, or it will fail.
*/
static int spawn_consumer_thread(struct consumer_data *consumer_data)
{
- int ret, clock_ret;
- struct timespec timeout;
-
- /*
- * Make sure we set the readiness flag to 0 because we are NOT ready.
- * This access to consumer_thread_is_ready does not need to be
- * protected by consumer_data.cond_mutex (yet) since the consumer
- * management thread has not been started at this point.
- */
- consumer_data->consumer_thread_is_ready = 0;
-
- /* Setup pthread condition */
- ret = pthread_condattr_init(&consumer_data->condattr);
- if (ret) {
- errno = ret;
- PERROR("pthread_condattr_init consumer data");
- goto error;
- }
-
- /*
- * Set the monotonic clock in order to make sure we DO NOT jump in time
- * between the clock_gettime() call and the timedwait call. See bug #324
- * for a more details and how we noticed it.
- */
- ret = pthread_condattr_setclock(&consumer_data->condattr, CLOCK_MONOTONIC);
- if (ret) {
- errno = ret;
- PERROR("pthread_condattr_setclock consumer data");
- goto error;
- }
-
- ret = pthread_cond_init(&consumer_data->cond, &consumer_data->condattr);
- if (ret) {
- errno = ret;
- PERROR("pthread_cond_init consumer data");
- goto error;
- }
-
- ret = pthread_create(&consumer_data->thread, default_pthread_attr(),
- thread_manage_consumer, consumer_data);
- if (ret) {
- errno = ret;
- PERROR("pthread_create consumer");
- ret = -1;
- goto error;
- }
-
- /* We are about to wait on a pthread condition */
- pthread_mutex_lock(&consumer_data->cond_mutex);
-
- /* Get time for sem_timedwait absolute timeout */
- clock_ret = lttng_clock_gettime(CLOCK_MONOTONIC, &timeout);
- /*
- * Set the timeout for the condition timed wait even if the clock gettime
- * call fails since we might loop on that call and we want to avoid to
- * increment the timeout too many times.
- */
- timeout.tv_sec += DEFAULT_SEM_WAIT_TIMEOUT;
-
- /*
- * The following loop COULD be skipped in some conditions so this is why we
- * set ret to 0 in order to make sure at least one round of the loop is
- * done.
- */
- ret = 0;
-
- /*
- * Loop until the condition is reached or when a timeout is reached. Note
- * that the pthread_cond_timedwait(P) man page specifies that EINTR can NOT
- * be returned but the pthread_cond(3), from the glibc-doc, says that it is
- * possible. This loop does not take any chances and works with both of
- * them.
- */
- while (!consumer_data->consumer_thread_is_ready && ret != ETIMEDOUT) {
- if (clock_ret < 0) {
- PERROR("clock_gettime spawn consumer");
- /* Infinite wait for the consumerd thread to be ready */
- ret = pthread_cond_wait(&consumer_data->cond,
- &consumer_data->cond_mutex);
- } else {
- ret = pthread_cond_timedwait(&consumer_data->cond,
- &consumer_data->cond_mutex, &timeout);
- }
- }
-
- /* Release the pthread condition */
- pthread_mutex_unlock(&consumer_data->cond_mutex);
-
- if (ret != 0) {
- errno = ret;
- if (ret == ETIMEDOUT) {
- int pth_ret;
-
- /*
- * Call has timed out so we kill the kconsumerd_thread and return
- * an error.
- */
- ERR("Condition timed out. The consumer thread was never ready."
- " Killing it");
- pth_ret = pthread_cancel(consumer_data->thread);
- if (pth_ret < 0) {
- PERROR("pthread_cancel consumer thread");
- }
- } else {
- PERROR("pthread_cond_wait failed consumer thread");
- }
- /* Caller is expecting a negative value on failure. */
- ret = -1;
- goto error;
- }
-
- pthread_mutex_lock(&consumer_data->pid_mutex);
- if (consumer_data->pid == 0) {
- ERR("Consumerd did not start");
- pthread_mutex_unlock(&consumer_data->pid_mutex);
- goto error;
- }
- pthread_mutex_unlock(&consumer_data->pid_mutex);
-
- return 0;
-
-error:
- return ret;
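+ /*
+ * The readiness handshake now lives in manage-consumer.c:
+ * launch_consumer_management_thread() blocks until the thread
+ * reports the success or failure of its initialization.
+ */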
+ return launch_consumer_management_thread(consumer_data) ? 0 : -1;
}
/*
return ret;
}
-/*
- * Join consumer thread
- */
-static int join_consumer_thread(struct consumer_data *consumer_data)
-{
- void *status;
-
- /* Consumer pid must be a real one. */
- if (consumer_data->pid > 0) {
- int ret;
- ret = kill(consumer_data->pid, SIGTERM);
- if (ret) {
- PERROR("Error killing consumer daemon");
- return ret;
- }
- return pthread_join(consumer_data->thread, &status);
- } else {
- return 0;
- }
-}
-
/*
* Version of setup_lttng_msg() without command header.
*/
DBG("Client thread dying");
rcu_unregister_thread();
-
- /*
- * Since we are creating the consumer threads, we own them, so we need
- * to join them before our thread exits.
- */
- ret = join_consumer_thread(&kconsumer_data);
- if (ret) {
- errno = ret;
- PERROR("join_consumer");
- }
-
- ret = join_consumer_thread(&ustconsumer32_data);
- if (ret) {
- errno = ret;
- PERROR("join_consumer ust32");
- }
-
- ret = join_consumer_thread(&ustconsumer64_data);
- if (ret) {
- errno = ret;
- PERROR("join_consumer ust64");
- }
return NULL;
}
struct consumer_data {
enum lttng_consumer_type type;
- pthread_t thread; /* Worker thread interacting with the consumer */
-
- /* Conditions used by the consumer thread to indicate readiness. */
- pthread_cond_t cond;
- pthread_condattr_t condattr;
- pthread_mutex_t cond_mutex;
-
- /*
- * This is a flag condition indicating that the consumer thread is ready
- * and connected to the lttng-consumerd daemon. This flag MUST only be
- * updated by locking the condition mutex above or before spawning a
- * consumer thread.
- *
- * A value of 0 means that the thread is NOT ready. A value of 1 means that
- * the thread consumer did connect successfully to the lttng-consumerd
- * daemon. A negative value indicates that there is been an error and the
- * thread has likely quit.
- */
- int consumer_thread_is_ready;
-
/* Mutex to control consumerd pid assignation */
pthread_mutex_t pid_mutex;
pid_t pid;
.channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
- .cond = PTHREAD_COND_INITIALIZER,
- .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
};
struct consumer_data ustconsumer64_data = {
.channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
- .cond = PTHREAD_COND_INITIALIZER,
- .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
};
struct consumer_data ustconsumer32_data = {
.channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
- .cond = PTHREAD_COND_INITIALIZER,
- .cond_mutex = PTHREAD_MUTEX_INITIALIZER,
};
enum consumerd_state ust_consumerd_state;
run_as_destroy_worker();
}
-/*
- * Signal pthread condition of the consumer data that the thread.
- */
-static void signal_consumer_condition(struct consumer_data *data, int state)
-{
- pthread_mutex_lock(&data->cond_mutex);
-
- /*
- * The state is set before signaling. It can be any value, it's the waiter
- * job to correctly interpret this condition variable associated to the
- * consumer pthread_cond.
- *
- * A value of 0 means that the corresponding thread of the consumer data
- * was not started. 1 indicates that the thread has started and is ready
- * for action. A negative value means that there was an error during the
- * thread bootstrap.
- */
- data->consumer_thread_is_ready = state;
- (void) pthread_cond_signal(&data->cond);
-
- pthread_mutex_unlock(&data->cond_mutex);
-}
-
-/*
- * This thread manage the consumer error sent back to the session daemon.
- */
-void *thread_manage_consumer(void *data)
-{
- int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
- uint32_t revents, nb_fd;
- enum lttcomm_return_code code;
- struct lttng_poll_event events;
- struct consumer_data *consumer_data = data;
- struct consumer_socket *cmd_socket_wrapper = NULL;
-
- DBG("[thread] Manage consumer started");
-
- rcu_register_thread();
- rcu_thread_online();
-
- health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
-
- health_code_update();
-
- /*
- * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
- * metadata_sock. Nothing more will be added to this poll set.
- */
- ret = sessiond_set_thread_pollset(&events, 3);
- if (ret < 0) {
- goto error_poll;
- }
-
- /*
- * The error socket here is already in a listening state which was done
- * just before spawning this thread to avoid a race between the consumer
- * daemon exec trying to connect and the listen() call.
- */
- ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- health_code_update();
-
- /* Infinite blocking call, waiting for transmission */
-restart:
- health_poll_entry();
-
- if (testpoint(sessiond_thread_manage_consumer)) {
- goto error;
- }
-
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Thread quit pipe has been closed. Killing thread. */
- ret = sessiond_check_thread_quit_pipe(pollfd, revents);
- if (ret) {
- err = 0;
- goto exit;
- }
-
- /* Event on the registration socket */
- if (pollfd == consumer_data->err_sock) {
- if (revents & LPOLLIN) {
- continue;
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("consumer err socket poll error");
- goto error;
- } else {
- ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- goto error;
- }
- }
- }
-
- sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
- if (sock < 0) {
- goto error;
- }
-
- /*
- * Set the CLOEXEC flag. Return code is useless because either way, the
- * show must go on.
- */
- (void) utils_set_fd_cloexec(sock);
-
- health_code_update();
-
- DBG2("Receiving code from consumer err_sock");
-
- /* Getting status code from kconsumerd */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- goto error;
- }
-
- health_code_update();
- if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
- ERR("consumer error when waiting for SOCK_READY : %s",
- lttcomm_get_readable_code(-code));
- goto error;
- }
-
- /* Connect both command and metadata sockets. */
- consumer_data->cmd_sock =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
- consumer_data->metadata_fd =
- lttcomm_connect_unix_sock(
- consumer_data->cmd_unix_sock_path);
- if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
- PERROR("consumer connect cmd socket");
- /* On error, signal condition and quit. */
- signal_consumer_condition(consumer_data, -1);
- goto error;
- }
-
- consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
-
- /* Create metadata socket lock. */
- consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
- if (consumer_data->metadata_sock.lock == NULL) {
- PERROR("zmalloc pthread mutex");
- goto error;
- }
- pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
- DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
- DBG("Consumer metadata socket ready (fd: %d)",
- consumer_data->metadata_fd);
-
- /*
- * Remove the consumerd error sock since we've established a connection.
- */
- ret = lttng_poll_del(&events, consumer_data->err_sock);
- if (ret < 0) {
- goto error;
- }
-
- /* Add new accepted error socket. */
- ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- /* Add metadata socket that is successfully connected. */
- ret = lttng_poll_add(&events, consumer_data->metadata_fd,
- LPOLLIN | LPOLLRDHUP);
- if (ret < 0) {
- goto error;
- }
-
- health_code_update();
-
- /*
- * Transfer the write-end of the channel monitoring and rotate pipe
- * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
- */
- cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
- if (!cmd_socket_wrapper) {
- goto error;
- }
- cmd_socket_wrapper->lock = &consumer_data->lock;
-
- ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
- consumer_data->channel_monitor_pipe);
- if (ret) {
- goto error;
- }
-
- /* Discard the socket wrapper as it is no longer needed. */
- consumer_destroy_socket(cmd_socket_wrapper);
- cmd_socket_wrapper = NULL;
-
- /* The thread is completely initialized, signal that it is ready. */
- signal_consumer_condition(consumer_data, 1);
-
- /* Infinite blocking call, waiting for transmission */
-restart_poll:
- while (1) {
- health_code_update();
-
- /* Exit the thread because the thread quit pipe has been triggered. */
- if (should_quit) {
- /* Not a health error. */
- err = 0;
- goto exit;
- }
-
- health_poll_entry();
- ret = lttng_poll_wait(&events, -1);
- health_poll_exit();
- if (ret < 0) {
- /*
- * Restart interrupted system call.
- */
- if (errno == EINTR) {
- goto restart_poll;
- }
- goto error;
- }
-
- nb_fd = ret;
-
- for (i = 0; i < nb_fd; i++) {
- /* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /*
- * Thread quit pipe has been triggered, flag that we should stop
- * but continue the current loop to handle potential data from
- * consumer.
- */
- should_quit = sessiond_check_thread_quit_pipe(pollfd, revents);
-
- if (pollfd == sock) {
- /* Event on the consumerd socket */
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err socket second poll error");
- goto error;
- }
- health_code_update();
- /* Wait for any kconsumerd error */
- ret = lttcomm_recv_unix_sock(sock, &code,
- sizeof(enum lttcomm_return_code));
- if (ret <= 0) {
- ERR("consumer closed the command socket");
- goto error;
- }
-
- ERR("consumer return code : %s",
- lttcomm_get_readable_code(-code));
-
- goto exit;
- } else if (pollfd == consumer_data->metadata_fd) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
- && !(revents & LPOLLIN)) {
- ERR("consumer err metadata socket second poll error");
- goto error;
- }
- /* UST metadata requests */
- ret = ust_consumer_metadata_request(
- &consumer_data->metadata_sock);
- if (ret < 0) {
- ERR("Handling metadata request");
- goto error;
- }
- }
- /* No need for an else branch all FDs are tested prior. */
- }
- health_code_update();
- }
-
-exit:
-error:
- /*
- * We lock here because we are about to close the sockets and some other
- * thread might be using them so get exclusive access which will abort all
- * other consumer command by other threads.
- */
- pthread_mutex_lock(&consumer_data->lock);
-
- /* Immediately set the consumerd state to stopped */
- if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
- uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
- } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
- consumer_data->type == LTTNG_CONSUMER32_UST) {
- uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
- } else {
- /* Code flow error... */
- assert(0);
- }
-
- if (consumer_data->err_sock >= 0) {
- ret = close(consumer_data->err_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->err_sock = -1;
- }
- if (consumer_data->cmd_sock >= 0) {
- ret = close(consumer_data->cmd_sock);
- if (ret) {
- PERROR("close");
- }
- consumer_data->cmd_sock = -1;
- }
- if (consumer_data->metadata_sock.fd_ptr &&
- *consumer_data->metadata_sock.fd_ptr >= 0) {
- ret = close(*consumer_data->metadata_sock.fd_ptr);
- if (ret) {
- PERROR("close");
- }
- }
- if (sock >= 0) {
- ret = close(sock);
- if (ret) {
- PERROR("close");
- }
- }
-
- unlink(consumer_data->err_unix_sock_path);
- unlink(consumer_data->cmd_unix_sock_path);
- pthread_mutex_unlock(&consumer_data->lock);
-
- /* Cleanup metadata socket mutex. */
- if (consumer_data->metadata_sock.lock) {
- pthread_mutex_destroy(consumer_data->metadata_sock.lock);
- free(consumer_data->metadata_sock.lock);
- }
- lttng_poll_clean(&events);
-
- if (cmd_socket_wrapper) {
- consumer_destroy_socket(cmd_socket_wrapper);
- }
-error_poll:
- if (err) {
- health_error();
- ERR("Health error occurred in %s", __func__);
- }
- health_unregister(health_sessiond);
- DBG("consumer thread cleanup completed");
-
- rcu_thread_offline();
- rcu_unregister_thread();
-
- return NULL;
-}
-
/*
* Setup necessary data for kernel tracer action.
*/
--- /dev/null
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <signal.h>
+
+#include <common/pipe.h>
+#include <common/utils.h>
+
+#include "manage-consumer.h"
+#include "testpoint.h"
+#include "health-sessiond.h"
+#include "utils.h"
+#include "thread.h"
+#include "ust-consumer.h"
+
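+/*
+ * State shared between the launcher and the consumer management thread.
+ * The thread sets initialization_result before posting 'ready' (exactly
+ * once), which lets the launcher block in sem_wait() without the timeout
+ * logic the old pthread-condition handshake required.
+ */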
+struct thread_notifiers {
+ struct lttng_pipe *quit_pipe;
+ struct consumer_data *consumer_data;
+ sem_t ready;
+ int initialization_result;
+};
+
+static void mark_thread_as_ready(struct thread_notifiers *notifiers)
+{
+ DBG("Marking consumer management thread as ready");
+ notifiers->initialization_result = 0;
+ sem_post(&notifiers->ready);
+}
+
+static void mark_thread_initialization_as_failed(
+ struct thread_notifiers *notifiers)
+{
+ ERR("Consumer management thread entering error state");
+ notifiers->initialization_result = -1;
+ sem_post(&notifiers->ready);
+}
+
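+/*
+ * Note: this wait has no timeout. A failed initialization is reported
+ * through initialization_result rather than detected by a timed wait
+ * and pthread_cancel(), as the previous implementation did.
+ */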
+static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
+{
+ DBG("Waiting for consumer management thread to be ready");
+ sem_wait(&notifiers->ready);
+ DBG("Consumer management thread is ready");
+}
+
+/*
+ * This thread manages the consumer errors sent back to the session daemon.
+ */
+void *thread_consumer_management(void *data)
+{
+ int sock = -1, i, ret, pollfd, err = -1, should_quit = 0;
+ uint32_t revents, nb_fd;
+ enum lttcomm_return_code code;
+ struct lttng_poll_event events;
+ struct thread_notifiers *notifiers = data;
+ struct consumer_data *consumer_data = notifiers->consumer_data;
+ const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);
+ struct consumer_socket *cmd_socket_wrapper = NULL;
+
+ DBG("[thread] Manage consumer started");
+
+ rcu_register_thread();
+ rcu_thread_online();
+
+ health_register(health_sessiond, HEALTH_SESSIOND_TYPE_CONSUMER);
+
+ health_code_update();
+
+ /*
+ * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
+ * metadata_sock. Nothing more will be added to this poll set.
+ */
+ ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error_poll;
+ }
+
+ ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /*
+ * The error socket is already in a listening state; listen() was called
+ * just before spawning this thread to avoid a race between the exec'd
+ * consumer daemon's connect() and our listen() call.
+ */
+ ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ health_code_update();
+
+ /* Infinite blocking call, waiting for transmission */
+ health_poll_entry();
+
+ if (testpoint(sessiond_thread_manage_consumer)) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ nb_fd = ret;
+
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ if (pollfd == quit_pipe_read_fd) {
+ err = 0;
+ mark_thread_initialization_as_failed(notifiers);
+ goto exit;
+ } else if (pollfd == consumer_data->err_sock) {
+ /* Event on the registration socket */
+ if (revents & LPOLLIN) {
+ continue;
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("consumer err socket poll error");
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+ }
+ }
+
+ sock = lttcomm_accept_unix_sock(consumer_data->err_sock);
+ if (sock < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /*
+ * Set the CLOEXEC flag. Return code is useless because either way, the
+ * show must go on.
+ */
+ (void) utils_set_fd_cloexec(sock);
+
+ health_code_update();
+
+ DBG2("Receiving code from consumer err_sock");
+
+ /* Getting status code from kconsumerd */
+ ret = lttcomm_recv_unix_sock(sock, &code,
+ sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ health_code_update();
+ if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
+ ERR("consumer error when waiting for SOCK_READY : %s",
+ lttcomm_get_readable_code(-code));
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /* Connect both command and metadata sockets. */
+ consumer_data->cmd_sock =
+ lttcomm_connect_unix_sock(
+ consumer_data->cmd_unix_sock_path);
+ consumer_data->metadata_fd =
+ lttcomm_connect_unix_sock(
+ consumer_data->cmd_unix_sock_path);
+ if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
+ PERROR("consumer connect cmd socket");
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
+
+ /* Create metadata socket lock. */
+ consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+ if (consumer_data->metadata_sock.lock == NULL) {
+ PERROR("zmalloc pthread mutex");
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+ pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
+ DBG("Consumer command socket ready (fd: %d)", consumer_data->cmd_sock);
+ DBG("Consumer metadata socket ready (fd: %d)",
+ consumer_data->metadata_fd);
+
+ /*
+ * Remove the consumerd error sock since we've established a connection.
+ */
+ ret = lttng_poll_del(&events, consumer_data->err_sock);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /* Add new accepted error socket. */
+ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /* Add metadata socket that is successfully connected. */
+ ret = lttng_poll_add(&events, consumer_data->metadata_fd,
+ LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ health_code_update();
+
+ /*
+ * Transfer the write-end of the channel monitoring and rotate pipe
+ * to the consumer by issuing a SET_CHANNEL_MONITOR_PIPE command.
+ */
+ cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
+ if (!cmd_socket_wrapper) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+ cmd_socket_wrapper->lock = &consumer_data->lock;
+
+ ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
+ consumer_data->channel_monitor_pipe);
+ if (ret) {
+ mark_thread_initialization_as_failed(notifiers);
+ goto error;
+ }
+
+ /* Discard the socket wrapper as it is no longer needed. */
+ consumer_destroy_socket(cmd_socket_wrapper);
+ cmd_socket_wrapper = NULL;
+
+ /* The thread is completely initialized, signal that it is ready. */
+ mark_thread_as_ready(notifiers);
+
+ /* Infinite blocking call, waiting for transmission */
+ while (1) {
+ health_code_update();
+
+ /* Exit the thread because the thread quit pipe has been triggered. */
+ if (should_quit) {
+ /* Not a health error. */
+ err = 0;
+ goto exit;
+ }
+
+ health_poll_entry();
+ ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
+ if (ret < 0) {
+ goto error;
+ }
+
+ nb_fd = ret;
+
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ health_code_update();
+
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
+ /*
+ * Thread quit pipe has been triggered, flag that we should stop
+ * but continue the current loop to handle potential data from
+ * consumer.
+ */
+ if (pollfd == quit_pipe_read_fd) {
+ should_quit = 1;
+ } else if (pollfd == sock) {
+ /* Event on the consumerd socket */
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
+ ERR("consumer err socket second poll error");
+ goto error;
+ }
+ health_code_update();
+ /* Wait for any kconsumerd error */
+ ret = lttcomm_recv_unix_sock(sock, &code,
+ sizeof(enum lttcomm_return_code));
+ if (ret <= 0) {
+ ERR("consumer closed the command socket");
+ goto error;
+ }
+
+ ERR("consumer return code : %s",
+ lttcomm_get_readable_code(-code));
+
+ goto exit;
+ } else if (pollfd == consumer_data->metadata_fd) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+ && !(revents & LPOLLIN)) {
+ ERR("consumer err metadata socket second poll error");
+ goto error;
+ }
+ /* UST metadata requests */
+ ret = ust_consumer_metadata_request(
+ &consumer_data->metadata_sock);
+ if (ret < 0) {
+ ERR("Handling metadata request");
+ goto error;
+ }
+ }
+ /* No need for an else branch; all FDs are tested above. */
+ }
+ health_code_update();
+ }
+
+exit:
+error:
+ /*
+ * We lock here because we are about to close the sockets; some other
+ * thread might be using them, so take exclusive access, which will cause
+ * any in-flight consumer commands from other threads to abort.
+ */
+ pthread_mutex_lock(&consumer_data->lock);
+
+ /* Immediately set the consumerd state to stopped */
+ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
+ uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
+ } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
+ consumer_data->type == LTTNG_CONSUMER32_UST) {
+ uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
+ } else {
+ /* Code flow error... */
+ assert(0);
+ }
+
+ if (consumer_data->err_sock >= 0) {
+ ret = close(consumer_data->err_sock);
+ if (ret) {
+ PERROR("close");
+ }
+ consumer_data->err_sock = -1;
+ }
+ if (consumer_data->cmd_sock >= 0) {
+ ret = close(consumer_data->cmd_sock);
+ if (ret) {
+ PERROR("close");
+ }
+ consumer_data->cmd_sock = -1;
+ }
+ if (consumer_data->metadata_sock.fd_ptr &&
+ *consumer_data->metadata_sock.fd_ptr >= 0) {
+ ret = close(*consumer_data->metadata_sock.fd_ptr);
+ if (ret) {
+ PERROR("close");
+ }
+ }
+ if (sock >= 0) {
+ ret = close(sock);
+ if (ret) {
+ PERROR("close");
+ }
+ }
+
+ unlink(consumer_data->err_unix_sock_path);
+ unlink(consumer_data->cmd_unix_sock_path);
+ pthread_mutex_unlock(&consumer_data->lock);
+
+ /* Cleanup metadata socket mutex. */
+ if (consumer_data->metadata_sock.lock) {
+ pthread_mutex_destroy(consumer_data->metadata_sock.lock);
+ free(consumer_data->metadata_sock.lock);
+ }
+ lttng_poll_clean(&events);
+
+ if (cmd_socket_wrapper) {
+ consumer_destroy_socket(cmd_socket_wrapper);
+ }
+error_poll:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_sessiond);
+ DBG("consumer thread cleanup completed");
+
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
+ return NULL;
+}
+
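+/*
+ * Shutdown callback registered with lttng_thread_create(). Writing the
+ * wake-up byte to the quit pipe causes the thread's lttng_poll_wait()
+ * to return; notify_thread_pipe() returns the number of bytes written,
+ * hence the comparison against 1.
+ */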
+static bool shutdown_consumer_management_thread(void *data)
+{
+ struct thread_notifiers *notifiers = data;
+ const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);
+
+ return notify_thread_pipe(write_fd) == 1;
+}
+
+static void cleanup_consumer_management_thread(void *data)
+{
+ struct thread_notifiers *notifiers = data;
+
+ if (!notifiers) {
+ /* Nothing was allocated; launch failed early. */
+ return;
+ }
+ lttng_pipe_destroy(notifiers->quit_pipe);
+ free(notifiers);
+}
+
+bool launch_consumer_management_thread(struct consumer_data *consumer_data)
+{
+ struct lttng_pipe *quit_pipe;
+ struct thread_notifiers *notifiers = NULL;
+ struct lttng_thread *thread;
+
+ quit_pipe = lttng_pipe_open(FD_CLOEXEC);
+ if (!quit_pipe) {
+ goto error;
+ }
+
+ notifiers = zmalloc(sizeof(*notifiers));
+ if (!notifiers) {
+ /* 'notifiers' is NULL; destroy the pipe here as cleanup won't see it. */
+ lttng_pipe_destroy(quit_pipe);
+ goto error;
+ }
+ notifiers->quit_pipe = quit_pipe;
+ notifiers->consumer_data = consumer_data;
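+ /*
+ * Process-private semaphore with an initial count of 0: sem_wait() in
+ * wait_until_thread_is_ready() blocks until the thread posts it.
+ */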
+ sem_init(&notifiers->ready, 0, 0);
+
+ thread = lttng_thread_create("Consumer management",
+ thread_consumer_management,
+ shutdown_consumer_management_thread,
+ cleanup_consumer_management_thread,
+ notifiers);
+ if (!thread) {
+ goto error;
+ }
+ wait_until_thread_is_ready(notifiers);
+ lttng_thread_put(thread);
+ if (notifiers->initialization_result) {
+ goto error;
+ }
+ return true;
+error:
+ cleanup_consumer_management_thread(notifiers);
+ return false;
+}
--- /dev/null
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef SESSIOND_CONSUMER_MANAGEMENT_THREAD_H
+#define SESSIOND_CONSUMER_MANAGEMENT_THREAD_H
+
+#include <stdbool.h>
+#include "lttng-sessiond.h"
+
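+/*
+ * Launch the consumer management thread and block until it has completed
+ * (or failed) its initialization. Returns true on success. The thread is
+ * owned by the lttng_thread subsystem and is stopped through its quit
+ * pipe on shutdown.
+ */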
+bool launch_consumer_management_thread(struct consumer_data *consumer_data);
+
+#endif /* SESSIOND_CONSUMER_MANAGEMENT_THREAD_H */