Add health thread to relayd
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Oct 2013 14:21:14 +0000 (10:21 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Oct 2013 15:59:08 +0000 (11:59 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/bin/lttng-relayd/Makefile.am
src/bin/lttng-relayd/health-relayd.c [new file with mode: 0644]
src/bin/lttng-relayd/health-relayd.h
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c
src/common/defaults.h

index fc8e6d055be2286c4df7149747528a0e5742df69..1e675454ccea25d9294f72c19f1a056d9a6dc16e 100644 (file)
@@ -12,7 +12,7 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \
                        cmd-2-1.c cmd-2-1.h \
                        cmd-2-2.c cmd-2-2.h \
                        cmd-2-4.c cmd-2-4.h \
-                       health-relayd.h
+                       health-relayd.c health-relayd.h
 
 # link on liblttngctl for check if relayd is already alive.
 lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
diff --git a/src/bin/lttng-relayd/health-relayd.c b/src/bin/lttng-relayd/health-relayd.c
new file mode 100644 (file)
index 0000000..f2b167c
--- /dev/null
@@ -0,0 +1,391 @@
+/*
+ * Copyright (C) 2013 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#include <fcntl.h>
+#include <getopt.h>
+#include <grp.h>
+#include <limits.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ipc.h>
+#include <sys/resource.h>
+#include <sys/shm.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <urcu/list.h>
+#include <poll.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <assert.h>
+#include <config.h>
+#include <urcu/compiler.h>
+#include <ulimit.h>
+#include <inttypes.h>
+
+#include <common/defaults.h>
+#include <common/common.h>
+#include <common/consumer.h>
+#include <common/consumer-timer.h>
+#include <common/compat/poll.h>
+#include <common/sessiond-comm/sessiond-comm.h>
+#include <common/utils.h>
+
+#include "lttng-relayd.h"
+#include "health-relayd.h"
+
+/* Global health check unix path */
+static char health_unix_sock_path[PATH_MAX];
+
+int health_quit_pipe[2];
+
+/*
+ * Check if the thread quit pipe was triggered.
+ *
+ * Return 1 if it was triggered else 0;
+ */
+static
+int check_health_quit_pipe(int fd, uint32_t events)
+{
+       if (fd == health_quit_pipe[0] && (events & LPOLLIN)) {
+               return 1;
+       }
+
+       return 0;
+}
+
+/*
+ * Send data on a unix socket using the liblttsessiondcomm API.
+ *
+ * Return lttcomm error code.
+ */
+static int send_unix_sock(int sock, void *buf, size_t len)
+{
+       /* Check valid length */
+       if (len == 0) {
+               return -1;
+       }
+
+       return lttcomm_send_unix_sock(sock, buf, len);
+}
+
+static int create_lttng_rundir_with_perm(const char *rundir)
+{
+       int ret;
+
+       DBG3("Creating LTTng run directory: %s", rundir);
+
+       ret = mkdir(rundir, S_IRWXU);
+       if (ret < 0) {
+               if (errno != EEXIST) {
+                       ERR("Unable to create %s", rundir);
+                       goto error;
+               } else {
+                       ret = 0;
+               }
+       } else if (ret == 0) {
+               int is_root = !getuid();
+
+               if (is_root) {
+                       ret = chown(rundir, 0,
+                                       utils_get_group_id(tracing_group_name));
+                       if (ret < 0) {
+                               ERR("Unable to set group on %s", rundir);
+                               PERROR("chown");
+                               ret = -1;
+                               goto error;
+                       }
+
+                       ret = chmod(rundir,
+                                       S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
+                       if (ret < 0) {
+                               ERR("Unable to set permissions on %s", health_unix_sock_path);
+                               PERROR("chmod");
+                               ret = -1;
+                               goto error;
+                       }
+               }
+       }
+
+error:
+       return ret;
+}
+
+static
+int setup_health_path(void)
+{
+       int is_root, ret = 0;
+       char *home_path = NULL, *rundir, *relayd_path;
+
+       is_root = !getuid();
+
+       if (is_root) {
+               rundir = strdup(DEFAULT_LTTNG_RUNDIR);
+       } else {
+               /*
+                * Create rundir from home path. This will create something like
+                * $HOME/.lttng
+                */
+               home_path = utils_get_home_dir();
+
+               if (home_path == NULL) {
+                       /* TODO: Add --socket PATH option */
+                       ERR("Can't get HOME directory for sockets creation.");
+                       ret = -EPERM;
+                       goto end;
+               }
+
+               ret = asprintf(&rundir, DEFAULT_LTTNG_HOME_RUNDIR, home_path);
+               if (ret < 0) {
+                       ret = -ENOMEM;
+                       goto end;
+               }
+       }
+
+       ret = asprintf(&relayd_path, DEFAULT_RELAYD_PATH, rundir);
+       if (ret < 0) {
+               ret = -ENOMEM;
+               goto end;
+       }
+
+       ret = create_lttng_rundir_with_perm(rundir);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = create_lttng_rundir_with_perm(relayd_path);
+       if (ret < 0) {
+               goto end;
+       }
+
+       if (is_root) {
+               if (strlen(health_unix_sock_path) != 0) {
+                       goto end;
+               }
+               snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+                       DEFAULT_GLOBAL_RELAY_HEALTH_UNIX_SOCK,
+                       getpid());
+       } else {
+               /* Set health check Unix path */
+               if (strlen(health_unix_sock_path) != 0) {
+                       goto end;
+               }
+
+               snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+                       DEFAULT_HOME_RELAY_HEALTH_UNIX_SOCK,
+                       home_path, getpid());
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Thread managing health check socket.
+ */
+void *thread_manage_health(void *data)
+{
+       int sock = -1, new_sock = -1, ret, i, pollfd, err = -1;
+       uint32_t revents, nb_fd;
+       struct lttng_poll_event events;
+       struct health_comm_msg msg;
+       struct health_comm_reply reply;
+       int is_root;
+
+       DBG("[thread] Manage health check started");
+
+       setup_health_path();
+
+       rcu_register_thread();
+
+       /* We might hit an error path before this is created. */
+       lttng_poll_init(&events);
+
+       /* Create unix socket */
+       sock = lttcomm_create_unix_sock(health_unix_sock_path);
+       if (sock < 0) {
+               ERR("Unable to create health check Unix socket");
+               ret = -1;
+               goto error;
+       }
+
+       is_root = !getuid();
+       if (is_root) {
+               /* lttng health client socket path permissions */
+               ret = chown(health_unix_sock_path, 0,
+                               utils_get_group_id(tracing_group_name));
+               if (ret < 0) {
+                       ERR("Unable to set group on %s", health_unix_sock_path);
+                       PERROR("chown");
+                       ret = -1;
+                       goto error;
+               }
+
+               ret = chmod(health_unix_sock_path,
+                               S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+               if (ret < 0) {
+                       ERR("Unable to set permissions on %s", health_unix_sock_path);
+                       PERROR("chmod");
+                       ret = -1;
+                       goto error;
+               }
+       }
+
+       /*
+        * Set the CLOEXEC flag. Return code is useless because either way, the
+        * show must go on.
+        */
+       (void) utils_set_fd_cloexec(sock);
+
+       ret = lttcomm_listen_unix_sock(sock);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Size is set to 1 for the consumer_channel pipe */
+       ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
+       if (ret < 0) {
+               ERR("Poll set creation failed");
+               goto error;
+       }
+
+       ret = lttng_poll_add(&events, health_quit_pipe[0], LPOLLIN);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Add the application registration socket */
+       ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI);
+       if (ret < 0) {
+               goto error;
+       }
+
+       while (1) {
+               DBG("Health check ready");
+
+               /* Inifinite blocking call, waiting for transmission */
+restart:
+               ret = lttng_poll_wait(&events, -1);
+               if (ret < 0) {
+                       /*
+                        * Restart interrupted system call.
+                        */
+                       if (errno == EINTR) {
+                               goto restart;
+                       }
+                       goto error;
+               }
+
+               nb_fd = ret;
+
+               for (i = 0; i < nb_fd; i++) {
+                       /* Fetch once the poll data */
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Thread quit pipe has been closed. Killing thread. */
+                       ret = check_health_quit_pipe(pollfd, revents);
+                       if (ret) {
+                               err = 0;
+                               goto exit;
+                       }
+
+                       /* Event on the registration socket */
+                       if (pollfd == sock) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Health socket poll error");
+                                       goto error;
+                               }
+                       }
+               }
+
+               new_sock = lttcomm_accept_unix_sock(sock);
+               if (new_sock < 0) {
+                       goto error;
+               }
+
+               /*
+                * Set the CLOEXEC flag. Return code is useless because either way, the
+                * show must go on.
+                */
+               (void) utils_set_fd_cloexec(new_sock);
+
+               DBG("Receiving data from client for health...");
+               ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg));
+               if (ret <= 0) {
+                       DBG("Nothing recv() from client... continuing");
+                       ret = close(new_sock);
+                       if (ret) {
+                               PERROR("close");
+                       }
+                       new_sock = -1;
+                       continue;
+               }
+
+               rcu_thread_online();
+
+               assert(msg.cmd == HEALTH_CMD_CHECK);
+
+               reply.ret_code = 0;
+               for (i = 0; i < NR_HEALTH_RELAYD_TYPES; i++) {
+                       /*
+                        * health_check_state return 0 if thread is in
+                        * error.
+                        */
+                       if (!health_check_state(health_relayd, i)) {
+                               reply.ret_code |= 1ULL << i;
+                       }
+               }
+
+               DBG2("Health check return value %" PRIx64, reply.ret_code);
+
+               ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply));
+               if (ret < 0) {
+                       ERR("Failed to send health data back to client");
+               }
+
+               /* End of transmission */
+               ret = close(new_sock);
+               if (ret) {
+                       PERROR("close");
+               }
+               new_sock = -1;
+       }
+
+exit:
+error:
+       if (err) {
+               ERR("Health error occurred in %s", __func__);
+       }
+       DBG("Health check thread dying");
+       unlink(health_unix_sock_path);
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+
+       lttng_poll_clean(&events);
+
+       rcu_unregister_thread();
+       return NULL;
+}
index 002ce4b4ddc1802c91447eb261af01af4c6c6008..6bfb73b5a8023fd276e87cf3cb7b23e860ed52d6 100644 (file)
@@ -34,4 +34,8 @@ enum health_type_relayd {
 
 extern struct health_app *health_relayd;
 
+extern int health_quit_pipe[2];
+
+void *thread_manage_health(void *data);
+
 #endif /* HEALTH_RELAYD_H */
index 358615f9ad27476ea8ee0f4bba7cc431c379a293..f264c18679d8fec1dbdc97cad9ae879ea8ffa338 100644 (file)
@@ -179,6 +179,8 @@ extern struct lttng_ht *relay_streams_ht;
 extern struct lttng_ht *viewer_streams_ht;
 extern struct lttng_ht *indexes_ht;
 
+extern const char *tracing_group_name;
+
 struct relay_stream *relay_stream_find_by_id(uint64_t stream_id);
 
 #endif /* LTTNG_RELAYD_H */
index 6727a547de65b2a94706a76b5f2089b5b33956a5..81aa642b1d05a423d603e5e5a39b59afe99bb43c 100644 (file)
@@ -68,6 +68,8 @@ static struct lttng_uri *live_uri;
 
 const char *progname;
 
+const char *tracing_group_name = DEFAULT_TRACING_GROUP;
+
 /*
  * Quit pipe for all threads. This permits a single cancellation point
  * for all threads when receiving an event on the pipe.
@@ -86,6 +88,7 @@ static int dispatch_thread_exit;
 static pthread_t listener_thread;
 static pthread_t dispatcher_thread;
 static pthread_t worker_thread;
+static pthread_t health_thread;
 
 static uint64_t last_relay_stream_id;
 static uint64_t last_relay_session_id;
@@ -131,6 +134,7 @@ void usage(void)
        fprintf(stderr, "  -D, --data-port URL       Data port listening.\n");
        fprintf(stderr, "  -o, --output PATH         Output path for traces. Must use an absolute path.\n");
        fprintf(stderr, "  -v, --verbose             Verbose mode. Activate DBG() macro.\n");
+       fprintf(stderr, "  -g, --group NAME          Specify the tracing group name. (default: tracing)\n");
 }
 
 static
@@ -144,6 +148,7 @@ int parse_args(int argc, char **argv)
                { "control-port", 1, 0, 'C', },
                { "data-port", 1, 0, 'D', },
                { "daemonize", 0, 0, 'd', },
+               { "group", 1, 0, 'g', },
                { "help", 0, 0, 'h', },
                { "output", 1, 0, 'o', },
                { "verbose", 0, 0, 'v', },
@@ -152,7 +157,7 @@ int parse_args(int argc, char **argv)
 
        while (1) {
                int option_index = 0;
-               c = getopt_long(argc, argv, "dhv" "C:D:o:",
+               c = getopt_long(argc, argv, "dhv" "C:D:o:g:",
                                long_options, &option_index);
                if (c == -1) {
                        break;
@@ -188,6 +193,9 @@ int parse_args(int argc, char **argv)
                case 'd':
                        opt_daemon = 1;
                        break;
+               case 'g':
+                       tracing_group_name = optarg;
+                       break;
                case 'h':
                        usage();
                        exit(EXIT_FAILURE);
@@ -297,6 +305,18 @@ int notify_thread_pipe(int wpipe)
        return ret;
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       int ret;
+
+       do {
+               ret = write(pipe[1], "4", 1);
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0 || ret != 1) {
+               PERROR("write relay health quit");
+       }
+}
+
 /*
  * Stop all threads by closing the thread quit pipe.
  */
@@ -312,6 +332,8 @@ void stop_threads(void)
                ERR("write error on thread quit pipe");
        }
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Dispatch thread */
        CMM_STORE_SHARED(dispatch_thread_exit, 1);
        futex_nto1_wake(&relay_cmd_queue.futex);
@@ -2569,6 +2591,19 @@ int main(int argc, char **argv)
                goto exit_health_app_create;
        }
 
+       ret = utils_create_pipe(health_quit_pipe);
+       if (ret < 0) {
+               goto error_health_pipe;
+       }
+
+       /* Create thread to manage the client socket */
+       ret = pthread_create(&health_thread, NULL,
+                       thread_manage_health, (void *) NULL);
+       if (ret != 0) {
+               PERROR("pthread_create health");
+               goto health_error;
+       }
+
        /* Setup the dispatcher thread */
        ret = pthread_create(&dispatcher_thread, NULL,
                        relay_thread_dispatcher, (void *) NULL);
@@ -2623,6 +2658,16 @@ exit_worker:
        }
 
 exit_dispatcher:
+       ret = pthread_join(health_thread, &status);
+       if (ret != 0) {
+               PERROR("pthread_join health thread");
+               goto error;     /* join error, exit without cleanup */
+       }
+
+health_error:
+       utils_close_pipe(health_quit_pipe);
+
+error_health_pipe:
        health_app_destroy(health_relayd);
 
 exit_health_app_create:
index 40b814a16bd1dc370623f79ae041c5e4412cd049..0c9f134a91161369b1ffb21f9111edfc12fd0871 100644 (file)
 #define DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH    DEFAULT_USTCONSUMERD32_PATH "/command"
 #define DEFAULT_USTCONSUMERD32_ERR_SOCK_PATH    DEFAULT_USTCONSUMERD32_PATH "/error"
 
+/* Relayd path */
+#define DEFAULT_RELAYD_RUNDIR                  "%s"
+#define DEFAULT_RELAYD_PATH                    DEFAULT_RELAYD_RUNDIR "/relayd"
+
 /* Default lttng run directory */
 #define DEFAULT_LTTNG_HOME_ENV_VAR              "LTTNG_HOME"
 #define DEFAULT_LTTNG_FALLBACK_HOME_ENV_VAR    "HOME"
 #define DEFAULT_GLOBAL_KCONSUMER_HEALTH_UNIX_SOCK      DEFAULT_LTTNG_RUNDIR "/kconsumerd/health"
 #define DEFAULT_HOME_KCONSUMER_HEALTH_UNIX_SOCK                DEFAULT_LTTNG_HOME_RUNDIR "/kconsumerd/health"
 
+/* Default relay health unix socket path */
+#define DEFAULT_GLOBAL_RELAY_HEALTH_UNIX_SOCK          DEFAULT_LTTNG_RUNDIR "/relayd/health-%d"
+#define DEFAULT_HOME_RELAY_HEALTH_UNIX_SOCK            DEFAULT_LTTNG_HOME_RUNDIR "/relayd/health-%d"
+
 #define DEFAULT_GLOBAL_APPS_UNIX_SOCK \
        DEFAULT_LTTNG_RUNDIR "/" LTTNG_UST_SOCK_FILENAME
 #define DEFAULT_HOME_APPS_UNIX_SOCK \
This page took 0.033174 seconds and 4 git commands to generate.