#include <sys/types.h>
#include <sys/wait.h>
#include <urcu/futex.h>
+ #include <urcu/uatomic.h>
#include <unistd.h>
#include <config.h>
char cmd_unix_sock_path[PATH_MAX];
};
+#include "benchmark.h"
+
/* Const values */
const char default_home_dir[] = DEFAULT_HOME_DIR;
const char default_tracing_group[] = DEFAULT_TRACING_GROUP;
static const char *consumerd32_libdir = CONFIG_CONSUMERD32_LIBDIR;
static const char *consumerd64_libdir = CONFIG_CONSUMERD64_LIBDIR;
+ /*
+ * Consumer daemon state, updated when the consumer is spawned or killed, or
+ * when a fatal error occurs.
+ */
+ enum consumerd_state {
+ CONSUMER_STARTED = 1,
+ CONSUMER_STOPPED = 2,
+ CONSUMER_ERROR = 3,
+ };
+
+ /*
+ * This consumer daemon state is used to validate whether a client command
+ * will be able to reach the consumer. If not, the client is informed. For
+ * instance, doing a "lttng start" when the consumer state is set to ERROR
+ * will return an error to the client.
+ *
+ * The following example shows a possible race condition of this scheme:
+ *
+ * consumer thread error happens
+ * client cmd arrives
+ * client cmd checks state -> still OK
+ * consumer thread exits, sets error
+ * client cmd tries to talk to consumer
+ * ...
+ *
+ * However, since the consumer is a different daemon, we have no way of making
+ * sure the command will reach it safely even with this state flag. This is
+ * why we consider that, up to the state validation during command processing,
+ * the command is safe. After that point, we cannot guarantee the correctness
+ * of the client request vis-a-vis the consumer.
+ */
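+ /*
+ * Shared between the client thread and the consumer management threads;
+ * once those threads are running, the states are accessed through
+ * uatomic_read()/uatomic_set().
+ */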
+ static enum consumerd_state ust_consumerd_state;
+ static enum consumerd_state kernel_consumerd_state;
+
static
void setup_consumerd_path(void)
{
if (ret) {
PERROR("close");
}
-
}
}
for (i = 0; i < 2; i++) {
}
}
+ /* OUTPUT BENCHMARK RESULTS */
+ //bench_init();
+
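+ /*
+ * Each BENCH_* environment variable below selects which benchmark results
+ * are printed; bench_init() is already called at startup in main().
+ */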
+ if (getenv("BENCH_UST_NOTIFY")) {
+ bench_print_ust_notification();
+ }
+
+ if (getenv("BENCH_UST_REGISTER")) {
+ bench_print_ust_register();
+ bench_print_ust_unregister();
+ }
+
+ if (getenv("BENCH_BOOT_PROCESS")) {
+ bench_print_boot_process();
+ }
+
+ if (getenv("BENCH_COMMANDS")) {
+ bench_print_enable_ust_event();
+ }
+
+ bench_close();
+ /* END BENCHMARK */
+
/* <fun> */
DBG("%c[%d;%dm*** assert failed :-) *** ==> %c[%dm%c[%d;%dm"
"Matthew, BEET driven development works!%c[%dm",
goto error;
}
+ tracepoint(ust_notify_apps_start);
+
/* Wake waiting process */
futex_wait_update((int32_t *) wait_shm_mmap, active);
+ tracepoint(ust_notify_apps_stop);
+
/* Apps notified successfully */
return 0;
char tmp;
struct lttng_poll_event events;
+ tracepoint(sessiond_th_kern_start);
+
DBG("Thread manage kernel started");
ret = create_thread_poll_set(&events, 2);
/* Zero the poll events */
lttng_poll_reset(&events);
+ tracepoint(sessiond_th_kern_poll);
+
/* Poll with an infinite timeout */
restart:
ret = lttng_poll_wait(&events, -1);
struct lttng_poll_event events;
struct consumer_data *consumer_data = data;
+ tracepoint(sessiond_th_kcon_start);
+
DBG("[thread] Manage consumer started");
ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
nb_fd = LTTNG_POLL_GETNB(&events);
+ tracepoint(sessiond_th_kcon_poll);
+
/* Infinite blocking call, waiting for transmission */
restart:
ret = lttng_poll_wait(&events, -1);
ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
error:
+ /* Immediately set the consumerd state to ERROR */
+ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
+ uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR);
+ } else if (consumer_data->type == LTTNG_CONSUMER64_UST ||
+ consumer_data->type == LTTNG_CONSUMER32_UST) {
+ uatomic_set(&ust_consumerd_state, CONSUMER_ERROR);
+ } else {
+ /* Code flow error... */
+ assert(0);
+ }
+
if (consumer_data->err_sock >= 0) {
ret = close(consumer_data->err_sock);
if (ret) {
struct ust_command ust_cmd;
struct lttng_poll_event events;
+ tracepoint(sessiond_th_apps_start);
+
DBG("[thread] Manage application started");
rcu_register_thread();
DBG("Apps thread polling on %d fds", nb_fd);
+ tracepoint(sessiond_th_apps_poll);
+
/* Infinite blocking call, waiting for transmission */
restart:
ret = lttng_poll_wait(&events, -1);
ERR("Apps command pipe error");
goto error;
} else if (revents & LPOLLIN) {
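+ /*
+ * Drop the kernel caches (page cache, dentries and inodes) so the
+ * registration read below is timed from a cold cache. The return
+ * value of system() is not checked here.
+ */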
+ system("sysctl vm.drop_caches=3");
+ tracepoint(ust_register_read_start);
/* Empty pipe */
ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
if (ret < 0 || ret < sizeof(ust_cmd)) {
PERROR("read apps cmd pipe");
goto error;
}
+ tracepoint(ust_register_read_stop);
+ tracepoint(ust_register_add_start);
/* Register application to the session daemon */
ret = ust_app_register(&ust_cmd.reg_msg,
ust_cmd.sock);
} else if (ret < 0) {
break;
}
+ tracepoint(ust_register_add_stop);
/*
* Validate UST version compatibility.
update_ust_app(ust_cmd.sock);
}
+ tracepoint(ust_register_done_start);
ret = ust_app_register_done(ust_cmd.sock);
if (ret < 0) {
/*
*/
ust_app_unregister(ust_cmd.sock);
} else {
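+ /* The matching _stop event is only recorded on the success path. */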
+ tracepoint(ust_register_done_stop);
/*
* We just need to monitor the close of the UST
* socket here; the poll set monitors that by default.
DBG("Apps with sock %d added to poll set",
ust_cmd.sock);
}
-
break;
}
} else {
* the event at poll_wait.
*/
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ tracepoint(ust_unregister_start);
+
/* Removing from the poll set */
ret = lttng_poll_del(&events, pollfd);
if (ret < 0) {
/* Socket closed on remote end. */
ust_app_unregister(pollfd);
+
+ tracepoint(ust_unregister_stop);
break;
}
}
struct cds_wfq_node *node;
struct ust_command *ust_cmd = NULL;
+ tracepoint(sessiond_th_dispatch_start);
+
DBG("[thread] Dispatch UST command started");
while (!dispatch_thread_exit) {
futex_nto1_prepare(&ust_cmd_queue.futex);
do {
+ tracepoint(sessiond_th_dispatch_block);
+
/* Dequeue command for registration */
node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue);
if (node == NULL) {
break;
}
+ tracepoint(ust_dispatch_register_start);
+
ust_cmd = caa_container_of(node, struct ust_command, node);
DBG("Dispatching UST registration pid:%d ppid:%d uid:%d"
free(ust_cmd);
} while (node != NULL);
+ tracepoint(ust_dispatch_register_stop);
+
/* Futex wait on queue. Blocking call on futex() */
futex_nto1_wait(&ust_cmd_queue.futex);
}
*/
struct ust_command *ust_cmd = NULL;
+ tracepoint(sessiond_th_reg_start);
+
DBG("[thread] Manage application registration started");
ret = lttcomm_listen_unix_sock(apps_sock);
while (1) {
DBG("Accepting application registration");
+ tracepoint(sessiond_th_reg_poll);
+
nb_fd = LTTNG_POLL_GETNB(&events);
/* Infinite blocking call, waiting for transmission */
ERR("Register apps socket poll error");
goto error;
} else if (revents & LPOLLIN) {
+ /* Registration starts here; start recording benchmark cycles */
+ tracepoint(ust_register_start);
+
sock = lttcomm_accept_unix_sock(apps_sock);
if (sock < 0) {
goto error;
* barrier with the exchange in cds_wfq_enqueue.
*/
futex_nto1_wake(&ust_cmd_queue.futex);
+
+ tracepoint(ust_register_stop);
}
}
}
}
}
+ /* Consumer is in an ERROR state. Report back to client */
+ if (uatomic_read(&kernel_consumerd_state) == CONSUMER_ERROR) {
+ ret = LTTCOMM_NO_KERNCONSUMERD;
+ goto error;
+ }
+
/* Need a session for kernel command */
if (need_tracing_session) {
if (cmd_ctx->session->kernel_session == NULL) {
ret = LTTCOMM_KERN_CONSUMER_FAIL;
goto error;
}
+ uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED);
} else {
pthread_mutex_unlock(&kconsumer_data.pid_mutex);
}
}
+
break;
case LTTNG_DOMAIN_UST:
{
+ /* Consumer is in an ERROR state. Report back to client */
+ if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) {
+ ret = LTTCOMM_NO_USTCONSUMERD;
+ goto error;
+ }
+
if (need_tracing_session) {
if (cmd_ctx->session->ust_session == NULL) {
ret = create_ust_session(cmd_ctx->session,
}
ust_consumerd64_fd = ustconsumer64_data.cmd_sock;
+ uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
} else {
pthread_mutex_unlock(&ustconsumer64_data.pid_mutex);
}
ust_consumerd32_fd = -EINVAL;
goto error;
}
+
ust_consumerd32_fd = ustconsumer32_data.cmd_sock;
+ uatomic_set(&ust_consumerd_state, CONSUMER_STARTED);
} else {
pthread_mutex_unlock(&ustconsumer32_data.pid_mutex);
}
}
skip_domain:
+ /* Validate the consumer daemon state for start/stop trace commands */
+ if (cmd_ctx->lsm->cmd_type == LTTNG_START_TRACE ||
+ cmd_ctx->lsm->cmd_type == LTTNG_STOP_TRACE) {
+ switch (cmd_ctx->lsm->domain.type) {
+ case LTTNG_DOMAIN_UST:
+ if (uatomic_read(&ust_consumerd_state) != CONSUMER_STARTED) {
+ ret = LTTCOMM_NO_USTCONSUMERD;
+ goto error;
+ }
+ break;
+ case LTTNG_DOMAIN_KERNEL:
+ if (uatomic_read(&kernel_consumerd_state) != CONSUMER_STARTED) {
+ ret = LTTCOMM_NO_KERNCONSUMERD;
+ goto error;
+ }
+ break;
+ }
+ }
+
/*
* Check that the UID or GID match that of the tracing session.
* The root user can interact with all sessions.
}
case LTTNG_ENABLE_CHANNEL:
{
+ tracepoint(enable_ust_channel_start);
ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type,
&cmd_ctx->lsm->u.channel.chan);
+ tracepoint(enable_ust_channel_end);
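+ /* Channel enabling results are printed right away, not at shutdown */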
+ bench_print_enable_ust_channel();
break;
}
case LTTNG_ENABLE_EVENT:
{
+ tracepoint(enable_ust_event_start);
ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type,
cmd_ctx->lsm->u.enable.channel_name,
&cmd_ctx->lsm->u.enable.event);
+ tracepoint(enable_ust_event_end);
break;
}
case LTTNG_ENABLE_ALL_EVENT:
{
DBG("Enabling all events");
+ tracepoint(enable_ust_event_start);
ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type,
cmd_ctx->lsm->u.enable.channel_name,
cmd_ctx->lsm->u.enable.event.type);
+ tracepoint(enable_ust_event_end);
break;
}
case LTTNG_LIST_TRACEPOINTS:
}
case LTTNG_START_TRACE:
{
+ tracepoint(start_ust_start);
ret = cmd_start_trace(cmd_ctx->session);
+ tracepoint(start_ust_end);
+ bench_print_start_ust();
break;
}
case LTTNG_STOP_TRACE:
}
case LTTNG_CREATE_SESSION:
{
+ tracepoint(create_session_start);
ret = cmd_create_session(cmd_ctx->lsm->session.name,
cmd_ctx->lsm->session.path, &cmd_ctx->creds);
+ tracepoint(create_session_end);
+ bench_print_create_session();
break;
}
case LTTNG_DESTROY_SESSION:
{
+ tracepoint(destroy_session_start);
ret = cmd_destroy_session(cmd_ctx->session,
cmd_ctx->lsm->session.name);
+ tracepoint(destroy_session_end);
/*
* Set session to NULL so we do not unlock it after
* free.
struct command_ctx *cmd_ctx = NULL;
struct lttng_poll_event events;
+ tracepoint(sessiond_th_cli_start);
+
DBG("[thread] Manage client started");
rcu_register_thread();
while (1) {
DBG("Accepting client command ...");
+ tracepoint(sessiond_th_cli_poll);
+
nb_fd = LTTNG_POLL_GETNB(&events);
/* Infinite blocking call, waiting for transmission */
void *status;
const char *home_path;
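+ /*
+ * The boot benchmark window opens here and is closed by the
+ * sessiond_boot_end tracepoint once the threads are created, just
+ * before joining them.
+ */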
+ tracepoint(sessiond_boot_start);
+
init_kernel_workarounds();
rcu_register_thread();
}
}
+ /* Set the initial state of both consumer daemons */
+ kernel_consumerd_state = CONSUMER_STOPPED;
+ ust_consumerd_state = CONSUMER_STOPPED;
+
DBG("Client socket path %s", client_unix_sock_path);
DBG("Application socket path %s", apps_unix_sock_path);
DBG("LTTng run directory path: %s", rundir);
/* Set up max poll set size */
lttng_poll_set_max_size();
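+ /* Initialize the benchmark subsystem before the instrumented threads start */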
+ bench_init();
+
/* Create thread to manage the client socket */
ret = pthread_create(&client_thread, NULL,
thread_manage_clients, (void *) NULL);
goto exit_kernel;
}
+ tracepoint(sessiond_boot_end);
+
ret = pthread_join(kernel_thread, &status);
if (ret != 0) {
PERROR("pthread_join");