From: Julien Desfossez Date: Tue, 10 Jul 2012 21:40:27 +0000 (-0400) Subject: Initial import of the new binary lttng-relayd X-Git-Tag: v2.1.0-rc1~98 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=b8aa16822f579a6e15b41d2761801a0a65d5f2a5;p=lttng-tools.git Initial import of the new binary lttng-relayd The lttng-relayd listens on the network and receives traces streamed by the consumer. At this first commit, the relayd supports IPv4 and IPv6 over TCP. The following commits will add the support to use it with the lttng command line interface, control library, consumer and session daemon. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- diff --git a/.gitignore b/.gitignore index 8a0d08796..411aaa0f6 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ config/ src/bin/lttng-sessiond/lttng-sessiond src/bin/lttng/lttng src/bin/lttng-consumerd/lttng-consumerd +src/bin/lttng-relayd/lttng-relayd # Tests test_sessions diff --git a/AUTHORS b/AUTHORS index 29de56f75..531f1e306 100644 --- a/AUTHORS +++ b/AUTHORS @@ -5,6 +5,7 @@ Authors: Julien Desfossez -) Kernel Consumer and Control. + -) Relayd daemon. Mathieu Desnoyers -) Helped designed the lttng-tools infrastructure and doing a lot of code diff --git a/README b/README index aabdf5a62..45c44187f 100644 --- a/README +++ b/README @@ -92,6 +92,9 @@ PACKAGE CONTENTS: - libhashtable (internal) Library wrapper over URCU hashtables. + - lttng-relayd + The relay daemon used for network streaming + - lttng-consumerd The consumer daemon which uses libconsumer. diff --git a/configure.ac b/configure.ac index d5d37094d..7f4912321 100644 --- a/configure.ac +++ b/configure.ac @@ -193,6 +193,7 @@ AC_CONFIG_FILES([ src/bin/Makefile src/bin/lttng-consumerd/Makefile src/bin/lttng-sessiond/Makefile + src/bin/lttng-relayd/Makefile src/bin/lttng/Makefile tests/Makefile tests/kernel/Makefile diff --git a/src/bin/Makefile.am b/src/bin/Makefile.am index 07b694918..ce5057dd7 100644 --- a/src/bin/Makefile.am +++ b/src/bin/Makefile.am @@ -4,5 +4,6 @@ SUBDIRS = lttng-consumerd if ! BUILD_CONSUMERD_ONLY SUBDIRS += lttng \ - lttng-sessiond + lttng-sessiond \ + lttng-relayd endif diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am new file mode 100644 index 000000000..cb625e5c1 --- /dev/null +++ b/src/bin/lttng-relayd/Makefile.am @@ -0,0 +1,16 @@ +AM_CPPFLAGS = -DINSTALL_BIN_PATH=\""$(lttnglibexecdir)"\" \ + -DINSTALL_LIB_PATH=\""$(libdir)"\" + +AM_CFLAGS = -fno-strict-aliasing + +bin_PROGRAMS = lttng-relayd + +lttng_relayd_SOURCES = main.c lttng-relayd.h + +# link on liblttngctl for check if relayd is already alive. +lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ + $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la \ + $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ + $(top_builddir)/src/common/hashtable/libhashtable.la \ + $(top_builddir)/src/common/libcommon.la \ + $(top_builddir)/src/common/compat/libcompat.la diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h new file mode 100644 index 000000000..dfbc63f39 --- /dev/null +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2012 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _LTT_RELAYD_H +#define _LTT_RELAYD_H + +#define _LGPL_SOURCE +#include +#include + +/* + * Queue used to enqueue relay requests + */ +struct relay_cmd_queue { + int32_t futex; + struct cds_wfq_queue queue; +}; + +enum connection_type { + RELAY_DATA, + RELAY_CONTROL, +}; + +/* + * Represents a session for the relay point of view + */ +struct relay_session { + uint64_t id; + struct lttcomm_sock *sock; + unsigned int version_check_done:1; +}; + +/* + * Represents a stream in the relay + */ +struct relay_stream { + uint64_t stream_handle; + struct lttng_ht_node_ulong stream_n; + int fd; + uint64_t seq; + struct relay_session *session; +}; + +/* + * Internal structure to map a socket with the corresponding session. + * A hashtable indexed on the socket FD is used for the lookups. + */ +struct relay_command { + struct lttcomm_sock *sock; + struct cds_wfq_node node; + struct lttng_ht_node_ulong sock_n; + enum connection_type type; + struct relay_session *session; +}; + +#endif diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c new file mode 100644 index 000000000..338341dda --- /dev/null +++ b/src/bin/lttng-relayd/main.c @@ -0,0 +1,1571 @@ +/* + * Copyright (C) 2012 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "lttng-relayd.h" + +/* command line options */ +static int opt_daemon; +static char *opt_output_path; +static struct lttng_uri *control_uri = NULL; +static struct lttng_uri *data_uri = NULL; + +const char *progname; +static int is_root; /* Set to 1 if the daemon is running as root */ + +/* + * Quit pipe for all threads. This permits a single cancellation point + * for all threads when receiving an event on the pipe. + */ +static int thread_quit_pipe[2] = { -1, -1 }; + +/* + * This pipe is used to inform the worker thread that a command is queued and + * ready to be processed. + */ +static int relay_cmd_pipe[2] = { -1, -1 }; + +static int dispatch_thread_exit; + +static pthread_t listener_thread; +static pthread_t dispatcher_thread; +static pthread_t worker_thread; + +static uint64_t last_relay_stream_id = 0; +static uint64_t last_relay_session_id = 0; + +/* + * Relay command queue. + * + * The relay_thread_listener and relay_thread_dispatcher communicate with this + * queue. + */ +static struct relay_cmd_queue relay_cmd_queue; + +/* buffer allocated at startup, used to store the trace data */ +static char *data_buffer = NULL; +static unsigned int data_buffer_size = 0; + +/* + * usage function on stderr + */ +static +void usage(void) +{ + fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); + fprintf(stderr, " -h, --help Display this usage.\n"); + fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); + fprintf(stderr, " -C, --control-port Control port listening (URI)\n"); + fprintf(stderr, " -D, --data-port Data port listening (URI)\n"); + fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); +} + +static +int parse_args(int argc, char **argv) +{ + int c; + int ret = 0; + char *default_address; + + static struct option long_options[] = { + { "control-port", 1, 0, 'C' }, + { "data-port", 1, 0, 'D' }, + { "daemonize", 0, 0, 'd' }, + { "output", 1, 0, 'o' }, + { "verbose", 0, 0, 'v' }, + { NULL, 0, 0, 0 } + }; + + while (1) { + int option_index = 0; + c = getopt_long(argc, argv, "dhv" "C:D:o:", + long_options, &option_index); + if (c == -1) { + break; + } + + switch (c) { + case 0: + fprintf(stderr, "option %s", long_options[option_index].name); + if (optarg) { + fprintf(stderr, " with arg %s\n", optarg); + } + break; + case 'C': + ret = uri_parse(optarg, &control_uri); + if (ret < 0) { + ERR("Invalid control URI specified"); + goto exit; + } + if (control_uri->port == 0) { + control_uri->port = DEFAULT_NETWORK_CONTROL_PORT; + } + break; + case 'D': + ret = uri_parse(optarg, &data_uri); + if (ret < 0) { + ERR("Invalid data URI specified"); + goto exit; + } + if (data_uri->port == 0) { + data_uri->port = DEFAULT_NETWORK_DATA_PORT; + } + break; + case 'd': + opt_daemon = 1; + break; + case 'h': + usage(); + exit(EXIT_FAILURE); + case 'o': + ret = asprintf(&opt_output_path, "%s", optarg); + if (ret < 0) { + PERROR("asprintf opt_output_path"); + goto exit; + } + break; + case 'v': + /* Verbose level can increase using multiple -v */ + lttng_opt_verbose += 1; + break; + default: + /* Unknown option or other error. + * Error is printed by getopt, just return */ + ret = -1; + goto exit; + } + } + + /* assign default values */ + if (control_uri == NULL) { + ret = asprintf(&default_address, "tcp://0.0.0.0:%d", + DEFAULT_NETWORK_CONTROL_PORT); + if (ret < 0) { + PERROR("asprintf default data address"); + goto exit; + } + + ret = uri_parse(default_address, &control_uri); + free(default_address); + if (ret < 0) { + ERR("Invalid control URI specified"); + goto exit; + } + } + if (data_uri == NULL) { + ret = asprintf(&default_address, "tcp://0.0.0.0:%d", + DEFAULT_NETWORK_DATA_PORT); + if (ret < 0) { + PERROR("asprintf default data address"); + goto exit; + } + + ret = uri_parse(default_address, &data_uri); + free(default_address); + if (ret < 0) { + ERR("Invalid data URI specified"); + goto exit; + } + } + +exit: + return ret; +} + +/* + * Cleanup the daemon + */ +static +void cleanup(void) +{ + int i, ret; + + DBG("Cleaning up"); + + for (i = 0; i < 2; i++) { + if (thread_quit_pipe[i] >= 0) { + ret = close(thread_quit_pipe[i]); + if (ret) { + PERROR("close"); + } + } + } +} + +/* + * Write to writable pipe used to notify a thread. + */ +static +int notify_thread_pipe(int wpipe) +{ + int ret; + + ret = write(wpipe, "!", 1); + if (ret < 0) { + PERROR("write poll pipe"); + } + + return ret; +} + +/* + * Stop all threads by closing the thread quit pipe. + */ +static +void stop_threads(void) +{ + int ret; + + /* Stopping all threads */ + DBG("Terminating all threads"); + ret = notify_thread_pipe(thread_quit_pipe[1]); + if (ret < 0) { + ERR("write error on thread quit pipe"); + } + + /* Dispatch thread */ + dispatch_thread_exit = 1; + futex_nto1_wake(&relay_cmd_queue.futex); +} + +/* + * Signal handler for the daemon + * + * Simply stop all worker threads, leaving main() return gracefully after + * joining all threads and calling cleanup(). + */ +static +void sighandler(int sig) +{ + switch (sig) { + case SIGPIPE: + DBG("SIGPIPE caught"); + return; + case SIGINT: + DBG("SIGINT caught"); + stop_threads(); + break; + case SIGTERM: + DBG("SIGTERM caught"); + stop_threads(); + break; + default: + break; + } +} + +/* + * Setup signal handler for : + * SIGINT, SIGTERM, SIGPIPE + */ +static +int set_signal_handler(void) +{ + int ret = 0; + struct sigaction sa; + sigset_t sigset; + + if ((ret = sigemptyset(&sigset)) < 0) { + PERROR("sigemptyset"); + return ret; + } + + sa.sa_handler = sighandler; + sa.sa_mask = sigset; + sa.sa_flags = 0; + if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) { + PERROR("sigaction"); + return ret; + } + + if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) { + PERROR("sigaction"); + return ret; + } + + if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) { + PERROR("sigaction"); + return ret; + } + + DBG("Signal handler set for SIGTERM, SIGPIPE and SIGINT"); + + return ret; +} + +/* + * Init thread quit pipe. + * + * Return -1 on error or 0 if all pipes are created. + */ +static +int init_thread_quit_pipe(void) +{ + int ret, i; + + ret = pipe(thread_quit_pipe); + if (ret < 0) { + PERROR("thread quit pipe"); + goto error; + } + + for (i = 0; i < 2; i++) { + ret = fcntl(thread_quit_pipe[i], F_SETFD, FD_CLOEXEC); + if (ret < 0) { + PERROR("fcntl"); + goto error; + } + } + +error: + return ret; +} + +/* + * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. + */ +static +int create_thread_poll_set(struct lttng_poll_event *events, int size) +{ + int ret; + + if (events == NULL || size == 0) { + ret = -1; + goto error; + } + + ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); + if (ret < 0) { + goto error; + } + + /* Add quit pipe */ + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + return 0; + +error: + return ret; +} + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static +int check_thread_quit_pipe(int fd, uint32_t events) +{ + if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + +/* + * Create and init socket from uri. + */ +static +struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri) +{ + int ret; + struct lttcomm_sock *sock = NULL; + + sock = lttcomm_alloc_sock_from_uri(uri); + if (sock == NULL) { + ERR("Allocating socket"); + goto error; + } + + ret = lttcomm_create_sock(sock); + if (ret < 0) { + goto error; + } + DBG("Listening on sock %d", sock->fd); + + ret = sock->ops->bind(sock); + if (ret < 0) { + goto error; + } + + ret = sock->ops->listen(sock, -1); + if (ret < 0) { + goto error; + + } + + return sock; + +error: + if (sock) { + lttcomm_destroy_sock(sock); + } + return NULL; +} + +/* + * This thread manages the listening for new connections on the network + */ +static +void *relay_thread_listener(void *data) +{ + int i, ret, pollfd; + int val = 1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct lttcomm_sock *control_sock, *data_sock; + + /* + * Get allocated in this thread, enqueued to a global queue, dequeued and + * freed in the worker thread. + */ + struct relay_command *relay_cmd = NULL; + + DBG("[thread] Relay listener started"); + + control_sock = relay_init_sock(control_uri); + if (!control_sock) { + goto error_sock; + } + + data_sock = relay_init_sock(data_uri); + if (!data_sock) { + goto error_sock; + } + + /* + * Pass 3 as size here for the thread quit pipe, control and data socket. + */ + ret = create_thread_poll_set(&events, 3); + if (ret < 0) { + goto error_create_poll; + } + + /* Add the control socket */ + ret = lttng_poll_add(&events, control_sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error_poll_add; + } + + /* Add the data socket */ + ret = lttng_poll_add(&events, data_sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error_poll_add; + } + + while (1) { + DBG("Listener accepting connections"); + + nb_fd = LTTNG_POLL_GETNB(&events); + +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + DBG("Relay new connection received"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else if (revents & LPOLLIN) { + struct lttcomm_sock *newsock = NULL; + + relay_cmd = zmalloc(sizeof(struct relay_command)); + if (relay_cmd == NULL) { + PERROR("relay command zmalloc"); + goto error; + } + + if (pollfd == data_sock->fd) { + newsock = data_sock->ops->accept(data_sock); + if (newsock < 0) { + PERROR("accepting data sock"); + goto error; + } + relay_cmd->type = RELAY_DATA; + DBG("Relay data connection accepted, socket %d", newsock->fd); + } else if (pollfd == control_sock->fd) { + newsock = control_sock->ops->accept(control_sock); + if (newsock < 0) { + PERROR("accepting control sock"); + goto error; + } + relay_cmd->type = RELAY_CONTROL; + DBG("Relay control connection accepted, socket %d", newsock->fd); + } + ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, + &val, sizeof(int)); + if (ret < 0) { + PERROR("setsockopt inet"); + goto error; + } + relay_cmd->sock = newsock; + /* + * Lock free enqueue the request. + */ + cds_wfq_enqueue(&relay_cmd_queue.queue, &relay_cmd->node); + + /* + * Wake the dispatch queue futex. Implicit memory + * barrier with the exchange in cds_wfq_enqueue. + */ + futex_nto1_wake(&relay_cmd_queue.futex); + } + } + } + +error: +error_poll_add: + lttng_poll_clean(&events); +error_create_poll: + if (control_sock->fd >= 0) { + ret = control_sock->ops->close(control_sock); + if (ret) { + PERROR("close"); + } + lttcomm_destroy_sock(control_sock); + } + if (data_sock->fd >= 0) { + ret = data_sock->ops->close(data_sock); + if (ret) { + PERROR("close"); + } + lttcomm_destroy_sock(data_sock); + } + + DBG("Relay listener thread cleanup complete"); + stop_threads(); +error_sock: + return NULL; +} + +/* + * This thread manages the dispatching of the requests to worker threads + */ +static +void *relay_thread_dispatcher(void *data) +{ + int ret; + struct cds_wfq_node *node; + struct relay_command *relay_cmd = NULL; + + DBG("[thread] Relay dispatcher started"); + + while (!dispatch_thread_exit) { + /* Atomically prepare the queue futex */ + futex_nto1_prepare(&relay_cmd_queue.futex); + + do { + /* Dequeue commands */ + node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue); + if (node == NULL) { + DBG("Woken up but nothing in the relay command queue"); + /* Continue thread execution */ + break; + } + + relay_cmd = caa_container_of(node, struct relay_command, node); + DBG("Dispatching request waiting on sock %d", relay_cmd->sock->fd); + + /* + * Inform worker thread of the new request. This + * call is blocking so we can be assured that the data will be read + * at some point in time or wait to the end of the world :) + */ + ret = write(relay_cmd_pipe[1], relay_cmd, + sizeof(struct relay_command)); + free(relay_cmd); + if (ret < 0) { + PERROR("write cmd pipe"); + goto error; + } + } while (node != NULL); + + /* Futex wait on queue. Blocking call on futex() */ + futex_nto1_wait(&relay_cmd_queue.futex); + } + +error: + DBG("Dispatch thread dying"); + stop_threads(); + return NULL; +} + +/* + * Return the realpath(3) of the path even if the last directory token does not + * exist. For example, with /tmp/test1/test2, if test2/ does not exist but the + * /tmp/test1 does, the real path is returned. In normal time, realpath(3) + * fails if the end point directory does not exist. + */ +static +char *expand_full_path(const char *path) +{ + const char *end_path = path; + char *next, *cut_path, *expanded_path; + + /* Find last token delimited by '/' */ + while ((next = strpbrk(end_path + 1, "/"))) { + end_path = next; + } + + /* Cut last token from original path */ + cut_path = strndup(path, end_path - path); + + expanded_path = malloc(PATH_MAX); + if (expanded_path == NULL) { + goto error; + } + + expanded_path = realpath((char *)cut_path, expanded_path); + if (expanded_path == NULL) { + switch (errno) { + case ENOENT: + ERR("%s: No such file or directory", cut_path); + break; + default: + PERROR("realpath"); + break; + } + goto error; + } + + /* Add end part to expanded path */ + strcat(expanded_path, end_path); + + free(cut_path); + return expanded_path; + +error: + free(cut_path); + return NULL; +} + + +/* + * config_get_default_path + * + * Returns the HOME directory path. Caller MUST NOT free(3) the return pointer. + */ +static +char *config_get_default_path(void) +{ + return getenv("HOME"); +} + +/* + * Create recursively directory using the FULL path. + */ +static +int mkdir_recursive(char *path, mode_t mode) +{ + char *p, tmp[PATH_MAX]; + struct stat statbuf; + size_t len; + int ret; + + ret = snprintf(tmp, sizeof(tmp), "%s", path); + if (ret < 0) { + PERROR("snprintf mkdir"); + goto error; + } + + len = ret; + if (tmp[len - 1] == '/') { + tmp[len - 1] = 0; + } + + for (p = tmp + 1; *p; p++) { + if (*p == '/') { + *p = 0; + if (tmp[strlen(tmp) - 1] == '.' && + tmp[strlen(tmp) - 2] == '.' && + tmp[strlen(tmp) - 3] == '/') { + ERR("Using '/../' is not permitted in the trace path (%s)", + tmp); + ret = -1; + goto error; + } + ret = stat(tmp, &statbuf); + if (ret < 0) { + ret = mkdir(tmp, mode); + if (ret < 0) { + if (!(errno == EEXIST)) { + PERROR("mkdir recursive"); + ret = -errno; + goto error; + } + } + } + *p = '/'; + } + } + + ret = mkdir(tmp, mode); + if (ret < 0) { + if (!(errno == EEXIST)) { + PERROR("mkdir recursive last piece"); + ret = -errno; + } else { + ret = 0; + } + } + +error: + return ret; +} + +/* + * create_output_path: create the output trace directory + */ +static +char *create_output_path(char *path_name) +{ + int ret = 0; + char *alloc_path = NULL; + char *traces_path = NULL; + char *full_path = NULL; + + /* Auto output path */ + if (opt_output_path == NULL) { + alloc_path = strdup(config_get_default_path()); + if (alloc_path == NULL) { + ERR("Home path not found.\n \ + Please specify an output path using -o, --output PATH"); + ret = -1; + goto exit; + } + + ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME + "/%s", alloc_path, path_name); + if (ret < 0) { + PERROR("asprintf trace dir name"); + goto exit; + } + } else { + full_path = expand_full_path(opt_output_path); + ret = asprintf(&traces_path, "%s/%s", full_path, path_name); + if (ret < 0) { + PERROR("asprintf trace dir name"); + goto exit; + } + } + free(alloc_path); + free(full_path); + +exit: + return traces_path; +} + +/* + * relay_delete_session: Free all memory associated with a session and + * close all the FDs + */ +static +void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct relay_stream *stream; + int ret; + + if (!cmd->session) + return; + + DBG("Relay deleting session %lu", cmd->session->id); + free(cmd->session->sock); + + cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) { + node = lttng_ht_iter_get_node_ulong(&iter); + if (node) { + stream = caa_container_of(node, + struct relay_stream, stream_n); + if (stream->session == cmd->session) { + close(stream->fd); + ret = lttng_ht_del(streams_ht, &iter); + assert(!ret); + free(stream); + } + } + } +} + +/* + * relay_add_stream: allocate a new stream for a session + */ +static +int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + struct relay_session *session = cmd->session; + struct lttcomm_relayd_add_stream stream_info; + struct relay_stream *stream = NULL; + struct lttcomm_relayd_status_stream reply; + char *path = NULL, *root_path = NULL; + int ret, send_ret; + + if (!session || session->version_check_done == 0) { + ERR("Trying to add a stream before version check"); + ret = -1; + goto end_no_session; + } + + /* FIXME : use data_size for something ? */ + ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, + sizeof(struct lttcomm_relayd_add_stream), MSG_WAITALL); + if (ret < sizeof(struct lttcomm_relayd_add_stream)) { + ERR("Relay didn't receive valid add_stream struct size : %d", ret); + ret = -1; + goto end_no_session; + } + stream = zmalloc(sizeof(struct relay_stream)); + if (stream == NULL) { + PERROR("relay stream zmalloc"); + ret = -1; + goto end_no_session; + } + + stream->stream_handle = ++last_relay_stream_id; + stream->seq = 0; + stream->session = session; + + root_path = create_output_path(stream_info.pathname); + if (!root_path) { + ret = -1; + goto end; + } + ret = mkdir_recursive(root_path, S_IRWXU | S_IRWXG); + if (ret < 0) { + free(root_path); + ERR("relay creating output directory"); + goto end; + } + + ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name); + if (ret < 0) { + PERROR("asprintf stream path"); + goto end; + } + + ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + PERROR("Relay creating trace file"); + goto end; + } + + stream->fd = ret; + DBG("Tracefile %s created", path); + + lttng_ht_node_init_ulong(&stream->stream_n, + (unsigned long) stream->stream_handle); + lttng_ht_add_unique_ulong(streams_ht, + &stream->stream_n); + + DBG("Relay new stream added %s", stream_info.channel_name); + +end: + free(path); + free(root_path); + /* send the session id to the client or a negative return code on error */ + if (ret < 0) { + reply.ret_code = htobe32(LTTCOMM_ERR); + } else { + reply.ret_code = htobe32(LTTCOMM_OK); + } + reply.handle = htobe64(stream->stream_handle); + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_status_stream), 0); + if (send_ret < 0) { + ERR("Relay sending stream id"); + } + +end_no_session: + return ret; +} + +/* + * relay_unknown_command: send -1 if received unknown command + */ +static +void relay_unknown_command(struct relay_command *cmd) +{ + struct lttcomm_relayd_generic_reply reply; + int ret; + + reply.ret_code = htobe32(LTTCOMM_ERR); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (ret < 0) { + ERR("Relay sending unknown command"); + } +} + +/* + * relay_start: send an acknowledgment to the client to tell if we are + * ready to receive data. We are ready if a session is established. + */ +static +int relay_start(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd) +{ + int ret = htobe32(LTTCOMM_OK); + struct lttcomm_relayd_generic_reply reply; + struct relay_session *session = cmd->session; + + if (!session) { + DBG("Trying to start the streaming without a session established"); + ret = htobe32(LTTCOMM_ERR); + } + + reply.ret_code = ret; + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (ret < 0) { + ERR("Relay sending start ack"); + } + + return ret; +} + +/* + * Get stream from stream id. + */ +static +struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id, + struct lttng_ht *streams_ht) +{ + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct relay_stream *ret; + + lttng_ht_lookup(streams_ht, + (void *)((unsigned long) stream_id), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay stream %lu not found", stream_id); + ret = NULL; + goto end; + } + + ret = caa_container_of(node, struct relay_stream, stream_n); + +end: + return ret; +} + +/* + * relay_recv_metadata: receive the metada for the session. + */ +static +int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + int ret = htobe32(LTTCOMM_OK); + struct relay_session *session = cmd->session; + struct lttcomm_relayd_metadata_payload *metadata_struct; + struct relay_stream *metadata_stream; + uint64_t data_size, payload_size; + + if (!session) { + ERR("Metadata sent before version check"); + ret = -1; + goto end; + } + + data_size = be64toh(recv_hdr->data_size); + payload_size = data_size - sizeof(uint64_t); + if (data_buffer_size < data_size) { + data_buffer = realloc(data_buffer, data_size); + if (!data_buffer) { + ERR("Allocating data buffer"); + ret = -1; + goto end; + } + data_buffer_size = data_size; + } + memset(data_buffer, 0, data_size); + DBG2("Relay receiving metadata, waiting for %lu bytes", data_size); + ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL); + if (ret < 0 || ret != data_size) { + ret = -1; + ERR("Relay didn't receive the whole metadata"); + goto end; + } + metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; + metadata_stream = relay_stream_from_stream_id( + be64toh(metadata_struct->stream_id), streams_ht); + if (!metadata_stream) { + ret = -1; + goto end; + } + + ret = write(metadata_stream->fd, metadata_struct->payload, + payload_size); + if (ret < (payload_size)) { + ERR("Relay error writing metadata on file"); + ret = -1; + goto end; + } + DBG2("Relay metadata written"); + +end: + return ret; +} + +/* + * relay_send_version: send relayd version number + */ +static +int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd) +{ + int ret = htobe32(LTTCOMM_OK); + struct lttcomm_relayd_version reply; + struct relay_session *session = NULL; + + if (cmd->session == NULL) { + session = zmalloc(sizeof(struct relay_session)); + if (session == NULL) { + PERROR("relay session zmalloc"); + ret = -1; + goto end; + } + session->id = ++last_relay_session_id; + DBG("Created session %lu", session->id); + cmd->session = session; + } + session->version_check_done = 1; + + sscanf(VERSION, "%u.%u", &reply.major, &reply.minor); + reply.major = htobe32(reply.major); + reply.minor = htobe32(reply.minor); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_version), 0); + if (ret < 0) { + ERR("Relay sending version"); + } + DBG("Version check done"); + +end: + return ret; +} + +/* + * relay_process_control: Process the commands received on the control socket + */ +static +int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + int ret = 0; + + switch (be32toh(recv_hdr->cmd)) { + /* + case RELAYD_CREATE_SESSION: + ret = relay_create_session(recv_hdr, cmd); + break; + */ + case RELAYD_ADD_STREAM: + ret = relay_add_stream(recv_hdr, cmd, streams_ht); + break; + case RELAYD_START_DATA: + ret = relay_start(recv_hdr, cmd); + break; + case RELAYD_SEND_METADATA: + ret = relay_recv_metadata(recv_hdr, cmd, streams_ht); + break; + case RELAYD_VERSION: + ret = relay_send_version(recv_hdr, cmd); + break; + case RELAYD_UPDATE_SYNC_INFO: + default: + ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); + relay_unknown_command(cmd); + ret = -1; + goto end; + } + +end: + return ret; +} + +/* + * relay_process_data: Process the data received on the data socket + */ +static +int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + int ret = 0; + struct relay_stream *stream; + struct lttcomm_relayd_data_hdr data_hdr; + uint64_t stream_id; + uint32_t data_size; + + ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, + sizeof(struct lttcomm_relayd_data_hdr), MSG_WAITALL); + if (ret <= 0) { + ERR("Connections seems to be closed"); + ret = -1; + goto end; + } + + stream_id = be64toh(data_hdr.stream_id); + stream = relay_stream_from_stream_id(stream_id, streams_ht); + if (!stream) { + ret = -1; + goto end; + } + + data_size = be32toh(data_hdr.data_size); + if (data_buffer_size < data_size) { + data_buffer = realloc(data_buffer, data_size); + if (!data_buffer) { + ERR("Allocating data buffer"); + ret = -1; + goto end; + } + data_buffer_size = data_size; + } + memset(data_buffer, 0, data_size); + + ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL); + if (ret <= 0) { + ret = -1; + goto end; + } + + ret = write(stream->fd, data_buffer, data_size); + if (ret < data_size) { + ERR("Relay error writing data to file"); + ret = -1; + goto end; + } + DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle); + +end: + return ret; +} + +static +void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events, + struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter) +{ + int ret; + + ret = lttng_ht_del(relay_connections_ht, iter); + assert(!ret); + lttng_poll_del(events, pollfd); + + ret = close(pollfd); + if (ret < 0) { + ERR("Closing pollfd %d", pollfd); + } +} + +static +int relay_add_connection(int fd, struct lttng_poll_event *events, + struct lttng_ht *relay_connections_ht) +{ + int ret; + struct relay_command *relay_connection; + + relay_connection = zmalloc(sizeof(struct relay_command)); + if (relay_connection == NULL) { + PERROR("Relay command zmalloc"); + ret = -1; + goto end; + } + ret = read(fd, relay_connection, sizeof(struct relay_command)); + if (ret < 0 || ret < sizeof(relay_connection)) { + PERROR("read relay cmd pipe"); + ret = -1; + goto end; + } + + lttng_ht_node_init_ulong(&relay_connection->sock_n, + (unsigned long) relay_connection->sock->fd); + lttng_ht_add_unique_ulong(relay_connections_ht, + &relay_connection->sock_n); + ret = lttng_poll_add(events, + relay_connection->sock->fd, + LPOLLIN | LPOLLRDHUP); + +end: + return ret; +} + +/* + * This thread does the actual work + */ +static +void *relay_thread_worker(void *data) +{ + int i, ret, pollfd; + uint32_t revents, nb_fd; + struct relay_command *relay_connection; + struct lttng_poll_event events; + struct lttng_ht *relay_connections_ht; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct lttng_ht *streams_ht; + struct lttcomm_relayd_hdr recv_hdr; + + DBG("[thread] Relay worker started"); + + /* table of connections indexed on socket */ + relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + /* tables of streams indexed by stream ID */ + streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error_poll_create; + } + + ret = lttng_poll_add(&events, relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + + while (1) { + /* Zeroed the events structure */ + lttng_poll_reset(&events); + + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Infinite blocking call, waiting for transmission */ + restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + goto error; + } + + /* Inspect the relay cmd pipe for new connection */ + if (pollfd == relay_cmd_pipe[0]) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay pipe error"); + goto error; + } else if (revents & LPOLLIN) { + DBG("Relay command received"); + ret = relay_add_connection(relay_cmd_pipe[0], + &events, relay_connections_ht); + if (ret < 0) { + goto error; + } + } + } else if (revents > 0) { + lttng_ht_lookup(relay_connections_ht, + (void *)((unsigned long) pollfd), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG2("Relay sock %d not found", pollfd); + goto error; + } + relay_connection = caa_container_of(node, + struct relay_command, sock_n); + + if (revents & (LPOLLERR)) { + ERR("POLL ERROR"); + relay_cleanup_connection(relay_connections_ht, + &events, streams_ht, pollfd, &iter); + free(relay_connection); + } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { + DBG("Socket %d hung up", pollfd); + relay_cleanup_connection(relay_connections_ht, + &events, streams_ht, pollfd, &iter); + if (relay_connection->type == RELAY_CONTROL) { + relay_delete_session(relay_connection, streams_ht); + } + free(relay_connection); + } else if (revents & LPOLLIN) { + /* control socket */ + if (relay_connection->type == RELAY_CONTROL) { + ret = relay_connection->sock->ops->recvmsg( + relay_connection->sock, &recv_hdr, + sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL); + /* connection closed */ + if (ret <= 0) { + relay_cleanup_connection(relay_connections_ht, + &events, streams_ht, pollfd, &iter); + relay_delete_session(relay_connection, streams_ht); + free(relay_connection); + DBG("Control connection closed with %d", pollfd); + } else { + if (relay_connection->session) { + DBG2("Relay worker receiving data for session : %lu", + relay_connection->session->id); + } + ret = relay_process_control(&recv_hdr, + relay_connection, + streams_ht); + /* + * there was an error in processing a control + * command: clear the session + * */ + if (ret < 0) { + relay_cleanup_connection(relay_connections_ht, + &events, streams_ht, pollfd, &iter); + free(relay_connection); + DBG("Connection closed with %d", pollfd); + } + } + /* data socket */ + } else if (relay_connection->type == RELAY_DATA) { + ret = relay_process_data(relay_connection, streams_ht); + /* connection closed */ + if (ret < 0) { + relay_cleanup_connection(relay_connections_ht, + &events, streams_ht, pollfd, &iter); + relay_delete_session(relay_connection, streams_ht); + DBG("Data connection closed with %d", pollfd); + } + } + } + } + } + } + +error: + lttng_poll_clean(&events); + + /* empty the hash table and free the memory */ + cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) { + node = lttng_ht_iter_get_node_ulong(&iter); + if (node) { + relay_connection = caa_container_of(node, + struct relay_command, sock_n); + free(relay_connection); + } + ret = lttng_ht_del(relay_connections_ht, &iter); + assert(!ret); + } +error_poll_create: + free(data_buffer); + lttng_ht_destroy(relay_connections_ht); + DBG("Worker thread cleanup complete"); + stop_threads(); + return NULL; +} + +/* + * Create the relay command pipe to wake thread_manage_apps. + * Closed in cleanup(). + */ +static int create_relay_cmd_pipe(void) +{ + int ret, i; + + ret = pipe(relay_cmd_pipe); + if (ret < 0) { + PERROR("relay cmd pipe"); + goto error; + } + + for (i = 0; i < 2; i++) { + ret = fcntl(relay_cmd_pipe[i], F_SETFD, FD_CLOEXEC); + if (ret < 0) { + PERROR("fcntl relay_cmd_pipe"); + goto error; + } + } + +error: + return ret; +} + +/* + * main + */ +int main(int argc, char **argv) +{ + int ret = 0; + void *status; + + /* Create thread quit pipe */ + if ((ret = init_thread_quit_pipe()) < 0) { + goto error; + } + + /* Parse arguments */ + progname = argv[0]; + if ((ret = parse_args(argc, argv) < 0)) { + goto error; + } + + if ((ret = set_signal_handler()) < 0) { + goto exit; + } + + /* Daemonize */ + if (opt_daemon) { + ret = daemon(0, 0); + if (ret < 0) { + PERROR("daemon"); + goto error; + } + } + + /* Check if daemon is UID = 0 */ + is_root = !getuid(); + + if (!is_root) { + if (control_uri->port < 1024 || data_uri->port < 1024) { + ERR("Need to be root to use ports < 1024"); + ret = -1; + goto error; + } + } + + /* Setup the thread apps communication pipe. */ + if ((ret = create_relay_cmd_pipe()) < 0) { + goto exit; + } + + /* Init relay command queue. */ + cds_wfq_init(&relay_cmd_queue.queue); + + /* Set up max poll set size */ + lttng_poll_set_max_size(); + + /* Setup the dispatcher thread */ + ret = pthread_create(&dispatcher_thread, NULL, + relay_thread_dispatcher, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create dispatcher"); + goto exit_dispatcher; + } + + /* Setup the worker thread */ + ret = pthread_create(&worker_thread, NULL, + relay_thread_worker, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create worker"); + goto exit_worker; + } + + /* Setup the listener thread */ + ret = pthread_create(&listener_thread, NULL, + relay_thread_listener, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create listener"); + goto exit_listener; + } + +exit_listener: + ret = pthread_join(listener_thread, &status); + if (ret != 0) { + PERROR("pthread_join"); + goto error; /* join error, exit without cleanup */ + } + +exit_worker: + ret = pthread_join(worker_thread, &status); + if (ret != 0) { + PERROR("pthread_join"); + goto error; /* join error, exit without cleanup */ + } + +exit_dispatcher: + ret = pthread_join(dispatcher_thread, &status); + if (ret != 0) { + PERROR("pthread_join"); + goto error; /* join error, exit without cleanup */ + } + +exit: + cleanup(); + if (!ret) { + exit(EXIT_SUCCESS); + } +error: + exit(EXIT_FAILURE); +} diff --git a/src/common/sessiond-comm/Makefile.am b/src/common/sessiond-comm/Makefile.am index 1c73dcb9e..2a70d54ce 100644 --- a/src/common/sessiond-comm/Makefile.am +++ b/src/common/sessiond-comm/Makefile.am @@ -2,5 +2,5 @@ noinst_LTLIBRARIES = libsessiond-comm.la libsessiond_comm_la_SOURCES = sessiond-comm.c sessiond-comm.h \ - unix.c unix.h inet.c inet.h inet6.c inet6.h - + unix.c unix.h inet.c inet.h inet6.c inet6.h \ + relayd.h diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h new file mode 100644 index 000000000..67b06371f --- /dev/null +++ b/src/common/sessiond-comm/relayd.h @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2012 - David Goulet + * Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _RELAYD_COMM +#define _RELAYD_COMM + +#define _GNU_SOURCE + +#include +#include + +#include + +/* + * lttng-relayd communication header. + */ +struct lttcomm_relayd_hdr { + /* Circuit ID not used for now so always ignored */ + uint64_t circuit_id; + uint64_t data_size; /* data size following this header */ + uint32_t cmd; /* enum lttcomm_sessiond_command */ + uint32_t cmd_version; /* command version */ +} __attribute__ ((__packed__)); + +/* + * lttng-relayd data header. + */ +struct lttcomm_relayd_data_hdr { + /* Circuit ID not used for now so always ignored */ + uint64_t circuit_id; + uint64_t stream_id; /* Stream ID known by the relayd */ + uint64_t net_seq_num; /* Network seq. number for UDP. */ + uint32_t data_size; /* data size following this header */ +} __attribute__ ((__packed__)); + +#if 0 +/* + * Used to create a session between the relay and the sessiond. + */ +struct lttcomm_relayd_create_session { + char hostname[LTTNG_MAX_DNNAME]; + char session_name[NAME_MAX]; +}; +#endif + +/* + * Used to add a stream on the relay daemon. + */ +struct lttcomm_relayd_add_stream { + char channel_name[LTTNG_SYMBOL_NAME_LEN]; + char pathname[PATH_MAX]; +} __attribute__ ((__packed__)); + +/* + * Answer from an add stream command. + */ +struct lttcomm_relayd_status_stream { + uint64_t handle; + uint32_t ret_code; +} __attribute__ ((__packed__)); + +/* + * Used to return command code for command not needing special data. + */ +struct lttcomm_relayd_generic_reply { + uint32_t ret_code; +} __attribute__ ((__packed__)); + +/* + * Used to update synchronization information. + */ +struct lttcomm_relayd_update_sync_info { + /* TODO: fill the structure */ +} __attribute__ ((__packed__)); + +/* + * Version command. + */ +struct lttcomm_relayd_version { + uint32_t major; + uint32_t minor; +} __attribute__ ((__packed__)); + +/* + * Metadata payload used when metadata command is sent. + */ +struct lttcomm_relayd_metadata_payload { + uint64_t stream_id; + char payload[]; +} __attribute__ ((__packed__)); + +#endif /* _RELAYD_COMM */