#include "agent-thread.h"
#include "save.h"
#include "load-session-thread.h"
+#include "notification-thread.h"
+#include "notification-thread-commands.h"
#include "syscall.h"
#include "agent.h"
#include "ht-cleanup.h"
#define CONSUMERD_FILE "lttng-consumerd"
const char *progname;
-static const char *tracing_group_name = DEFAULT_TRACING_GROUP;
+const char *tracing_group_name = DEFAULT_TRACING_GROUP;
static int tracing_group_name_override;
static char *opt_pidfile;
static int opt_sig_parent;
.cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
.cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH,
.err_sock = -1,
.cmd_sock = -1,
+ .channel_monitor_pipe = -1,
.pid_mutex = PTHREAD_MUTEX_INITIALIZER,
.lock = PTHREAD_MUTEX_INITIALIZER,
.cond = PTHREAD_COND_INITIALIZER,
static pthread_t ht_cleanup_thread;
static pthread_t agent_reg_thread;
static pthread_t load_session_thread;
+static pthread_t notification_thread;
/*
* UST registration command queue. This queue is tied with a futex and uses a N
/* Load session thread information to operate. */
struct load_session_thread_data *load_info;
+/* Notification thread handle. */
+struct notification_thread_handle *notification_thread_handle;
+
/* Global hash tables */
struct lttng_ht *agent_apps_ht_by_sock = NULL;
/*
- * Whether sessiond is ready for commands/health check requests.
+ * Whether sessiond is ready for commands/notification channel/health check
+ * requests.
* NR_LTTNG_SESSIOND_READY must match the number of calls to
* sessiond_notify_ready().
*/
-#define NR_LTTNG_SESSIOND_READY 3
+#define NR_LTTNG_SESSIOND_READY 4
int lttng_sessiond_ready = NR_LTTNG_SESSIOND_READY;
int sessiond_check_thread_quit_pipe(int fd, uint32_t events)
PERROR("UST consumerd64 cmd_sock close");
}
}
+ if (kconsumer_data.channel_monitor_pipe >= 0) {
+ ret = close(kconsumer_data.channel_monitor_pipe);
+ if (ret < 0) {
+ PERROR("kernel consumer channel monitor pipe close");
+ }
+ }
+ if (ustconsumer32_data.channel_monitor_pipe >= 0) {
+ ret = close(ustconsumer32_data.channel_monitor_pipe);
+ if (ret < 0) {
+ PERROR("UST consumerd32 channel monitor pipe close");
+ }
+ }
+ if (ustconsumer64_data.channel_monitor_pipe >= 0) {
+ ret = close(ustconsumer64_data.channel_monitor_pipe);
+ if (ret < 0) {
+ PERROR("UST consumerd64 channel monitor pipe close");
+ }
+ }
}
/*
enum lttcomm_return_code code;
struct lttng_poll_event events;
struct consumer_data *consumer_data = data;
+ struct consumer_socket *cmd_socket_wrapper = NULL;
DBG("[thread] Manage consumer started");
}
health_code_update();
- if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
- /* Connect both socket, command and metadata. */
- consumer_data->cmd_sock =
- lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
- consumer_data->metadata_fd =
- lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
- if (consumer_data->cmd_sock < 0
- || consumer_data->metadata_fd < 0) {
- PERROR("consumer connect cmd socket");
- /* On error, signal condition and quit. */
- signal_consumer_condition(consumer_data, -1);
- goto error;
- }
- consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
- /* Create metadata socket lock. */
- consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
- if (consumer_data->metadata_sock.lock == NULL) {
- PERROR("zmalloc pthread mutex");
- goto error;
- }
- pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
-
- signal_consumer_condition(consumer_data, 1);
- DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
- DBG("Consumer metadata socket ready (fd: %d)",
- consumer_data->metadata_fd);
- } else {
+ if (code != LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
ERR("consumer error when waiting for SOCK_READY : %s",
lttcomm_get_readable_code(-code));
goto error;
}
- /* Remove the consumerd error sock since we've established a connexion */
+ /* Connect both command and metadata sockets. */
+ consumer_data->cmd_sock =
+ lttcomm_connect_unix_sock(
+ consumer_data->cmd_unix_sock_path);
+ consumer_data->metadata_fd =
+ lttcomm_connect_unix_sock(
+ consumer_data->cmd_unix_sock_path);
+ if (consumer_data->cmd_sock < 0 || consumer_data->metadata_fd < 0) {
+ PERROR("consumer connect cmd socket");
+ /* On error, signal condition and quit. */
+ signal_consumer_condition(consumer_data, -1);
+ goto error;
+ }
+
+ consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd;
+
+ /* Create metadata socket lock. */
+ consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+ if (consumer_data->metadata_sock.lock == NULL) {
+ PERROR("zmalloc pthread mutex");
+ goto error;
+ }
+ pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
+ DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
+ DBG("Consumer metadata socket ready (fd: %d)",
+ consumer_data->metadata_fd);
+
+ /*
+ * Remove the consumerd error sock since we've established a connection.
+ */
ret = lttng_poll_del(&events, consumer_data->err_sock);
if (ret < 0) {
goto error;
health_code_update();
+ /*
+ * Transfer the write-end of the channel monitoring pipe to the
+ * by issuing a SET_CHANNEL_MONITOR_PIPE command.
+ */
+ cmd_socket_wrapper = consumer_allocate_socket(&consumer_data->cmd_sock);
+ if (!cmd_socket_wrapper) {
+ goto error;
+ }
+
+ ret = consumer_send_channel_monitor_pipe(cmd_socket_wrapper,
+ consumer_data->channel_monitor_pipe);
+ if (ret) {
+ goto error;
+ }
+ /* Discard the socket wrapper as it is no longer needed. */
+ consumer_destroy_socket(cmd_socket_wrapper);
+ cmd_socket_wrapper = NULL;
+
+ /* The thread is completely initialized, signal that it is ready. */
+ signal_consumer_condition(consumer_data, 1);
+
/* Infinite blocking call, waiting for transmission */
restart_poll:
while (1) {
free(consumer_data->metadata_sock.lock);
}
lttng_poll_clean(&events);
+
+ if (cmd_socket_wrapper) {
+ consumer_destroy_socket(cmd_socket_wrapper);
+ }
error_poll:
if (err) {
health_error();
case LTTNG_SET_SESSION_SHM_PATH:
case LTTNG_REGENERATE_METADATA:
case LTTNG_REGENERATE_STATEDUMP:
+ case LTTNG_REGISTER_TRIGGER:
+ case LTTNG_UNREGISTER_TRIGGER:
need_domain = 0;
break;
default:
case LTTNG_LIST_SYSCALLS:
case LTTNG_LIST_TRACEPOINT_FIELDS:
case LTTNG_SAVE_SESSION:
+ case LTTNG_REGISTER_TRIGGER:
+ case LTTNG_UNREGISTER_TRIGGER:
need_tracing_session = 0;
break;
default:
}
case LTTNG_ENABLE_CHANNEL:
{
+ cmd_ctx->lsm->u.channel.chan.attr.extended.ptr =
+ (struct lttng_channel_extended *) &cmd_ctx->lsm->u.channel.extended;
ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain,
- &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]);
+ &cmd_ctx->lsm->u.channel.chan,
+ kernel_poll_pipe[1]);
break;
}
case LTTNG_TRACK_PID:
ret = cmd_regenerate_statedump(cmd_ctx->session);
break;
}
+ case LTTNG_REGISTER_TRIGGER:
+ {
+ ret = cmd_register_trigger(cmd_ctx, sock,
+ notification_thread_handle);
+ break;
+ }
+ case LTTNG_UNREGISTER_TRIGGER:
+ {
+ ret = cmd_unregister_trigger(cmd_ctx, sock,
+ notification_thread_handle);
+ break;
+ }
default:
ret = LTTNG_ERR_UND;
break;
int ret = 0, retval = 0;
void *status;
const char *home_path, *env_app_timeout;
+ struct lttng_pipe *ust32_channel_monitor_pipe = NULL,
+ *ust64_channel_monitor_pipe = NULL,
+ *kernel_channel_monitor_pipe = NULL;
init_kernel_workarounds();
kconsumer_data.err_unix_sock_path);
DBG2("Kernel consumer cmd path: %s",
kconsumer_data.cmd_unix_sock_path);
+ kernel_channel_monitor_pipe = lttng_pipe_open(0);
+ if (!kernel_channel_monitor_pipe) {
+ ERR("Failed to create kernel consumer channel monitor pipe");
+ retval = -1;
+ goto exit_init_data;
+ }
+ kconsumer_data.channel_monitor_pipe =
+ lttng_pipe_release_writefd(
+ kernel_channel_monitor_pipe);
+ if (kconsumer_data.channel_monitor_pipe < 0) {
+ retval = -1;
+ goto exit_init_data;
+ }
} else {
home_path = utils_get_home_dir();
if (home_path == NULL) {
ustconsumer32_data.err_unix_sock_path);
DBG2("UST consumer 32 bits cmd path: %s",
ustconsumer32_data.cmd_unix_sock_path);
+ ust32_channel_monitor_pipe = lttng_pipe_open(0);
+ if (!ust32_channel_monitor_pipe) {
+ ERR("Failed to create 32-bit user space consumer channel monitor pipe");
+ retval = -1;
+ goto exit_init_data;
+ }
+ ustconsumer32_data.channel_monitor_pipe = lttng_pipe_release_writefd(
+ ust32_channel_monitor_pipe);
+ if (ustconsumer32_data.channel_monitor_pipe < 0) {
+ retval = -1;
+ goto exit_init_data;
+ }
/* 64 bits consumerd path setup */
ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX,
ustconsumer64_data.err_unix_sock_path);
DBG2("UST consumer 64 bits cmd path: %s",
ustconsumer64_data.cmd_unix_sock_path);
+ ust64_channel_monitor_pipe = lttng_pipe_open(0);
+ if (!ust64_channel_monitor_pipe) {
+ ERR("Failed to create 64-bit user space consumer channel monitor pipe");
+ retval = -1;
+ goto exit_init_data;
+ }
+ ustconsumer64_data.channel_monitor_pipe = lttng_pipe_release_writefd(
+ ust64_channel_monitor_pipe);
+ if (ustconsumer64_data.channel_monitor_pipe < 0) {
+ retval = -1;
+ goto exit_init_data;
+ }
/*
* See if daemon already exist.
goto exit_health;
}
+ /* notification_thread_data acquires the pipes' read side. */
+ notification_thread_handle = notification_thread_handle_create(
+ ust32_channel_monitor_pipe,
+ ust64_channel_monitor_pipe,
+ kernel_channel_monitor_pipe);
+ if (!notification_thread_handle) {
+ retval = -1;
+ ERR("Failed to create notification thread shared data");
+ stop_threads();
+ goto exit_notification;
+ }
+
+ /* Create notification thread. */
+ ret = pthread_create(¬ification_thread, default_pthread_attr(),
+ thread_notification, notification_thread_handle);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_create notification");
+ retval = -1;
+ stop_threads();
+ goto exit_notification;
+ }
+
/* Create thread to manage the client socket */
ret = pthread_create(&client_thread, default_pthread_attr(),
thread_manage_clients, (void *) NULL);
errno = ret;
PERROR("pthread_create clients");
retval = -1;
+ stop_threads();
goto exit_client;
}
errno = ret;
PERROR("pthread_create dispatch");
retval = -1;
+ stop_threads();
goto exit_dispatch;
}
errno = ret;
PERROR("pthread_create registration");
retval = -1;
+ stop_threads();
goto exit_reg_apps;
}
errno = ret;
PERROR("pthread_create apps");
retval = -1;
+ stop_threads();
goto exit_apps;
}
errno = ret;
PERROR("pthread_create notify");
retval = -1;
+ stop_threads();
goto exit_apps_notify;
}
errno = ret;
PERROR("pthread_create agent");
retval = -1;
+ stop_threads();
goto exit_agent_reg;
}
errno = ret;
PERROR("pthread_create kernel");
retval = -1;
+ stop_threads();
goto exit_kernel;
}
}
errno = ret;
PERROR("pthread_create load_session_thread");
retval = -1;
+ stop_threads();
goto exit_load_session;
}
PERROR("pthread_join");
retval = -1;
}
-exit_client:
+exit_client:
+exit_notification:
ret = pthread_join(health_thread, &status);
if (ret) {
errno = ret;
PERROR("pthread_join health thread");
retval = -1;
}
-exit_health:
+exit_health:
exit_init_data:
/*
* Wait for all pending call_rcu work to complete before tearing
*/
rcu_thread_online();
sessiond_cleanup();
- rcu_thread_offline();
- rcu_unregister_thread();
/*
* Ensure all prior call_rcu are done. call_rcu callbacks may push
*/
rcu_barrier();
+ /*
+ * The teardown of the notification system is performed after the
+ * session daemon's teardown in order to allow it to be notified
+ * of the active session and channels at the moment of the teardown.
+ */
+ if (notification_thread_handle) {
+ notification_thread_command_quit(notification_thread_handle);
+ notification_thread_handle_destroy(notification_thread_handle);
+ }
+
+ ret = pthread_join(notification_thread, &status);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join notification thread");
+ retval = -1;
+ }
+
+ rcu_thread_offline();
+ rcu_unregister_thread();
+
ret = fini_ht_cleanup_thread(&ht_cleanup_thread);
if (ret) {
retval = -1;
}
+ lttng_pipe_destroy(ust32_channel_monitor_pipe);
+ lttng_pipe_destroy(ust64_channel_monitor_pipe);
+ lttng_pipe_destroy(kernel_channel_monitor_pipe);
exit_ht_cleanup:
health_app_destroy(health_sessiond);
sessiond_cleanup_options();
exit_set_signal_handler:
-
if (!retval) {
exit(EXIT_SUCCESS);
} else {