From 00e2e675d54dc726a7c8f8887c889cc8ef022003 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 11 Jul 2012 13:20:39 -0400 Subject: [PATCH] Network streaming support This is a huge commit integrating a lot of new feature for network streaming support with the lttng-relayd previously merged. Please note that this commit is the initial import of the network streaming feature meaning that this is NOT stable and SHOULD be considered experimental hence not suited for production deployment. API Changes: lttng_create_session_uri(), lttng_enable_consumer(), lttng_set_consumer_uri() and lttng_disable_consumer() calls are added. The lttng command line now supports these new calls by introducing two new commands and one modified. (Please look at the --help option for more details). $ lttng enable-consumer $ lttng disable-consumer The "lttng create" command can now take new arguments to specify the destination URI for network streaming. There is also a new flag which allows the user to disable the consumer for the session. At this point, the now old deprecated lttng_create_session() call is still supported but SHOULD be replaced by lttng_create_session_uri(). More calls will be added to the API in order to facilitate the creation of lttng URI. Network streaming: Both kernel and user space tracing are supported for network streaming. For now, only IPv4 and IPv6 protocol over TCP is supported. Two network socket are used where one is for control commands and the second one is for data transmission. The port are respectively 5342 and 5343. BUGS: Streaming high throughput traces (e.g lttng enable-event -a -k) with low bandwitdh available to the relayd, the tracer does not behave well when the metadata are not emptied enough quickly so data are missing on the remote target and the tracing is simply not stoppable. This is a known bug and future iterations will try to fix it. WARNING: At this stage, there is _NO_ security features meaning no remote authentication on the relayd, no transport layer encryption or network secure handshake of some sort. So, uses only for in-vitro experiment or on _trusted_ networks. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- configure.ac | 1 + include/lttng/lttng.h | 65 +- src/bin/lttng-sessiond/Makefile.am | 4 +- src/bin/lttng-sessiond/consumer.c | 333 +++++++ src/bin/lttng-sessiond/consumer.h | 83 ++ src/bin/lttng-sessiond/kernel-consumer.c | 320 +++++-- src/bin/lttng-sessiond/kernel-consumer.h | 25 +- src/bin/lttng-sessiond/kernel.c | 25 +- src/bin/lttng-sessiond/main.c | 900 ++++++++++++++++++- src/bin/lttng-sessiond/session.c | 24 +- src/bin/lttng-sessiond/session.h | 12 + src/bin/lttng-sessiond/trace-kernel.c | 59 +- src/bin/lttng-sessiond/trace-kernel.h | 18 +- src/bin/lttng-sessiond/trace-ust.c | 20 + src/bin/lttng-sessiond/trace-ust.h | 11 + src/bin/lttng-sessiond/ust-app.c | 23 +- src/bin/lttng-sessiond/ust-consumer.c | 228 +++-- src/bin/lttng-sessiond/ust-consumer.h | 9 +- src/bin/lttng/Makefile.am | 4 +- src/bin/lttng/command.h | 2 + src/bin/lttng/commands/create.c | 215 ++++- src/bin/lttng/commands/disable_consumer.c | 165 ++++ src/bin/lttng/commands/enable_consumer.c | 410 +++++++++ src/bin/lttng/lttng.c | 32 +- src/common/Makefile.am | 6 +- src/common/consumer.c | 310 ++++++- src/common/consumer.h | 49 +- src/common/defaults.h | 4 + src/common/kernel-consumer/Makefile.am | 4 +- src/common/kernel-consumer/kernel-consumer.c | 312 ++++++- src/common/kernel-ctl/kernel-ctl.c | 13 + src/common/kernel-ctl/kernel-ctl.h | 2 + src/common/kernel-ctl/kernel-ioctl.h | 5 + src/common/relayd/Makefile.am | 6 + src/common/relayd/relayd.c | 387 ++++++++ src/common/relayd/relayd.h | 41 + src/common/uri.c | 107 ++- src/common/ust-consumer/Makefile.am | 4 +- src/common/ust-consumer/ust-consumer.c | 172 +++- src/lib/lttng-ctl/lttng-ctl.c | 127 ++- tests/tools/Makefile.am | 12 +- tests/tools/test_kernel_data_trace.c | 5 +- tests/tools/test_sessions.c | 5 +- 43 files changed, 4097 insertions(+), 462 deletions(-) create mode 100644 src/bin/lttng-sessiond/consumer.c create mode 100644 src/bin/lttng/commands/disable_consumer.c create mode 100644 src/bin/lttng/commands/enable_consumer.c create mode 100644 src/common/relayd/Makefile.am create mode 100644 src/common/relayd/relayd.c create mode 100644 src/common/relayd/relayd.h diff --git a/configure.ac b/configure.ac index 7f4912321..4e7c2570a 100644 --- a/configure.ac +++ b/configure.ac @@ -188,6 +188,7 @@ AC_CONFIG_FILES([ src/common/hashtable/Makefile src/common/sessiond-comm/Makefile src/common/compat/Makefile + src/common/relayd/Makefile src/lib/Makefile src/lib/lttng-ctl/Makefile src/bin/Makefile diff --git a/include/lttng/lttng.h b/include/lttng/lttng.h index 73f291466..4741b6877 100644 --- a/include/lttng/lttng.h +++ b/include/lttng/lttng.h @@ -87,21 +87,21 @@ enum lttng_loglevel_type { * Available loglevels. */ enum lttng_loglevel { - LTTNG_LOGLEVEL_EMERG = 0, - LTTNG_LOGLEVEL_ALERT = 1, - LTTNG_LOGLEVEL_CRIT = 2, - LTTNG_LOGLEVEL_ERR = 3, - LTTNG_LOGLEVEL_WARNING = 4, - LTTNG_LOGLEVEL_NOTICE = 5, - LTTNG_LOGLEVEL_INFO = 6, - LTTNG_LOGLEVEL_DEBUG_SYSTEM = 7, - LTTNG_LOGLEVEL_DEBUG_PROGRAM = 8, - LTTNG_LOGLEVEL_DEBUG_PROCESS = 9, - LTTNG_LOGLEVEL_DEBUG_MODULE = 10, - LTTNG_LOGLEVEL_DEBUG_UNIT = 11, - LTTNG_LOGLEVEL_DEBUG_FUNCTION = 12, - LTTNG_LOGLEVEL_DEBUG_LINE = 13, - LTTNG_LOGLEVEL_DEBUG = 14, + LTTNG_LOGLEVEL_EMERG = 0, + LTTNG_LOGLEVEL_ALERT = 1, + LTTNG_LOGLEVEL_CRIT = 2, + LTTNG_LOGLEVEL_ERR = 3, + LTTNG_LOGLEVEL_WARNING = 4, + LTTNG_LOGLEVEL_NOTICE = 5, + LTTNG_LOGLEVEL_INFO = 6, + LTTNG_LOGLEVEL_DEBUG_SYSTEM = 7, + LTTNG_LOGLEVEL_DEBUG_PROGRAM = 8, + LTTNG_LOGLEVEL_DEBUG_PROCESS = 9, + LTTNG_LOGLEVEL_DEBUG_MODULE = 10, + LTTNG_LOGLEVEL_DEBUG_UNIT = 11, + LTTNG_LOGLEVEL_DEBUG_FUNCTION = 12, + LTTNG_LOGLEVEL_DEBUG_LINE = 13, + LTTNG_LOGLEVEL_DEBUG = 14, }; /* @@ -133,9 +133,9 @@ enum lttng_calibrate_type { /* Destination type of lttng URI */ enum lttng_dst_type { - LTTNG_DST_IPV4, /* IPv4 protocol */ - LTTNG_DST_IPV6, /* IPv6 protocol */ - LTTNG_DST_PATH, /* Local file system */ + LTTNG_DST_IPV4 = 1, + LTTNG_DST_IPV6 = 2, + LTTNG_DST_PATH = 3, }; /* Type of lttng URI where it is a final destination or a hop */ @@ -179,6 +179,7 @@ struct lttng_uri { enum lttng_proto_type proto; in_port_t port; char padding[LTTNG_URI_PADDING1_LEN]; + char subdir[PATH_MAX]; union { char ipv4[INET_ADDRSTRLEN]; char ipv6[INET6_ADDRSTRLEN]; @@ -414,6 +415,14 @@ extern void lttng_destroy_handle(struct lttng_handle *handle); */ extern int lttng_create_session(const char *name, const char *path); +/* + * Create a tracing sessioin using a name, URIs and a consumer enable flag. + * The control URI is mandatory for consumer local or network. + */ +extern int lttng_create_session_uri(const char *name, + struct lttng_uri *ctrl_uri, struct lttng_uri *data_uri, + unsigned int enable_consumer); + /* * Destroy a tracing session. * @@ -578,4 +587,24 @@ extern int lttng_calibrate(struct lttng_handle *handle, extern void lttng_channel_set_default_attr(struct lttng_domain *domain, struct lttng_channel_attr *attr); +/* + * Set URI for a consumer for a session and domain. + * + * For network streaming, both data and control stream type MUST be defined + * with a specific URIs. Default port are 5342 and 5343 respectively for + * control and data which uses the TCP protocol. + */ +extern int lttng_set_consumer_uri(struct lttng_handle *handle, + struct lttng_uri *uri); + +/* + * Enable the consumer for a session and domain. + */ +extern int lttng_enable_consumer(struct lttng_handle *handle); + +/* + * Disable consumer for a session and domain. + */ +extern int lttng_disable_consumer(struct lttng_handle *handle); + #endif /* _LTTNG_H */ diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 9b9713b2b..c9201f5da 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -17,6 +17,7 @@ lttng_sessiond_SOURCES = utils.c utils.h \ modprobe.c modprobe.h kern-modules.h \ lttng-ust-ctl.h lttng-ust-abi.h \ fd-limit.c fd-limit.h \ + consumer.c consumer.h \ kernel-consumer.c kernel-consumer.h \ consumer.h @@ -34,7 +35,8 @@ lttng_sessiond_LDADD = -lrt -lurcu-common -lurcu \ $(top_builddir)/src/common/kernel-ctl/libkernel-ctl.la \ $(top_builddir)/src/common/hashtable/libhashtable.la \ $(top_builddir)/src/common/libcommon.la \ - $(top_builddir)/src/common/compat/libcompat.la + $(top_builddir)/src/common/compat/libcompat.la \ + $(top_builddir)/src/common/relayd/librelayd.la if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_LDADD += -llttng-ust-ctl diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c new file mode 100644 index 000000000..56d938114 --- /dev/null +++ b/src/bin/lttng-sessiond/consumer.c @@ -0,0 +1,333 @@ +/* + * Copyright (C) 2012 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "consumer.h" + +/* + * Allocate and assign data to a consumer_output object. + * + * Return pointer to structure. + */ +struct consumer_output *consumer_create_output(enum consumer_dst_type type) +{ + struct consumer_output *output = NULL; + + output = zmalloc(sizeof(struct consumer_output)); + if (output == NULL) { + PERROR("zmalloc consumer_output"); + goto error; + } + + /* By default, consumer output is enabled */ + output->enabled = 1; + output->type = type; + output->net_seq_index = -1; + /* + * Important to keep it to a negative value on creation since it was zeroed + * during allocation and the file descriptor 0 is a valid one. + */ + output->sock = -1; + +error: + return output; +} + +/* + * Delete the consumer_output object from the list and free the ptr. + */ +void consumer_destroy_output(struct consumer_output *obj) +{ + if (obj == NULL) { + return; + } + + if (obj->sock >= 0) { + (void) close(obj->sock); + } + free(obj); +} + +/* + * Copy consumer output and returned the newly allocated copy. + */ +struct consumer_output *consumer_copy_output(struct consumer_output *obj) +{ + struct consumer_output *output; + + assert(obj); + + output = consumer_create_output(obj->type); + if (output == NULL) { + goto error; + } + + memcpy(output, obj, sizeof(struct consumer_output)); + +error: + return output; +} + +/* + * Set network URI to the consumer output object. + * + * Return 0 on success. Negative value on error. + */ +int consumer_set_network_uri(struct consumer_output *obj, + struct lttng_uri *uri) +{ + int ret; + char tmp_path[PATH_MAX]; + char hostname[HOST_NAME_MAX]; + struct lttng_uri *dst_uri = NULL; + + /* Code flow error safety net. */ + assert(obj); + assert(uri); + + switch (uri->stype) { + case LTTNG_STREAM_CONTROL: + dst_uri = &obj->dst.net.control; + obj->dst.net.control_isset = 1; + if (uri->port == 0) { + /* Assign default port. */ + uri->port = DEFAULT_NETWORK_CONTROL_PORT; + } + break; + case LTTNG_STREAM_DATA: + dst_uri = &obj->dst.net.data; + obj->dst.net.data_isset = 1; + if (uri->port == 0) { + /* Assign default port. */ + uri->port = DEFAULT_NETWORK_DATA_PORT; + } + break; + default: + ERR("Set network uri type unknown %d", uri->stype); + goto error; + } + + ret = uri_compare(dst_uri, uri); + if (!ret) { + /* Same URI, don't touch it and return success. */ + DBG3("URI network compare are the same"); + goto end; + } + + /* URIs were not equal, replacing it. */ + memset(dst_uri, 0, sizeof(struct lttng_uri)); + memcpy(dst_uri, uri, sizeof(struct lttng_uri)); + obj->type = CONSUMER_DST_NET; + + /* Handle subdir and add hostname in front. */ + if (dst_uri->stype == LTTNG_STREAM_CONTROL) { + /* Get hostname to append it in the pathname */ + ret = gethostname(hostname, sizeof(hostname)); + if (ret < 0) { + PERROR("gethostname. Fallback on default localhost"); + strncpy(hostname, "localhost", sizeof(hostname)); + } + hostname[sizeof(hostname) - 1] = '\0'; + + /* Setup consumer subdir if none present in the control URI */ + if (strlen(dst_uri->subdir) == 0) { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + hostname, obj->subdir); + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + hostname, dst_uri->subdir); + } + if (ret < 0) { + PERROR("snprintf set consumer uri subdir"); + goto error; + } + + strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); + DBG3("Consumer set network uri subdir path %s", tmp_path); + } + +end: + return 0; + +error: + return -1; +} + +/* + * Send file descriptor to consumer via sock. + */ +int consumer_send_fds(int sock, int *fds, size_t nb_fd) +{ + int ret; + + assert(fds); + assert(nb_fd > 0); + + ret = lttcomm_send_fds_unix_sock(sock, fds, nb_fd); + if (ret < 0) { + PERROR("send consumer fds"); + goto error; + } + +error: + return ret; +} + +/* + * Consumer send channel communication message structure to consumer. + */ +int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg) +{ + int ret; + + assert(msg); + assert(sock >= 0); + + ret = lttcomm_send_unix_sock(sock, msg, + sizeof(struct lttcomm_consumer_msg)); + if (ret < 0) { + PERROR("send consumer channel"); + goto error; + } + +error: + return ret; +} + +/* + * Init channel communication message structure. + */ +void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + int channel_key, + uint64_t max_sb_size, + uint64_t mmap_len, + const char *name) +{ + assert(msg); + + /* TODO: Args validation */ + + /* Zeroed structure */ + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + /* Send channel */ + msg->cmd_type = cmd; + msg->u.channel.channel_key = channel_key; + msg->u.channel.max_sb_size = max_sb_size; + msg->u.channel.mmap_len = mmap_len; +} + +/* + * Init stream communication message structure. + */ +void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + int channel_key, + int stream_key, + uint32_t state, + enum lttng_event_output output, + uint64_t mmap_len, + uid_t uid, + gid_t gid, + int net_index, + unsigned int metadata_flag, + const char *name, + const char *pathname) +{ + assert(msg); + + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + /* TODO: Args validation */ + + msg->cmd_type = cmd; + msg->u.stream.channel_key = channel_key; + msg->u.stream.stream_key = stream_key; + msg->u.stream.state = state; + msg->u.stream.output = output; + msg->u.stream.mmap_len = mmap_len; + msg->u.stream.uid = uid; + msg->u.stream.gid = gid; + msg->u.stream.net_index = net_index; + msg->u.stream.metadata_flag = metadata_flag; + strncpy(msg->u.stream.name, name, sizeof(msg->u.stream.name)); + msg->u.stream.name[sizeof(msg->u.stream.name) - 1] = '\0'; + strncpy(msg->u.stream.path_name, pathname, + sizeof(msg->u.stream.path_name)); + msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; +} + +/* + * Send stream communication structure to the consumer. + */ +int consumer_send_stream(int sock, struct consumer_output *dst, + struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd) +{ + int ret; + + assert(msg); + assert(dst); + + switch (dst->type) { + case CONSUMER_DST_NET: + /* Consumer should send the stream on the network. */ + msg->u.stream.net_index = dst->net_seq_index; + break; + case CONSUMER_DST_LOCAL: + /* Add stream file name to stream path */ + strncat(msg->u.stream.path_name, "/", sizeof(msg->u.stream.path_name)); + strncat(msg->u.stream.path_name, msg->u.stream.name, + sizeof(msg->u.stream.path_name)); + msg->u.stream.path_name[sizeof(msg->u.stream.path_name) - 1] = '\0'; + /* Indicate that the stream is NOT network */ + msg->u.stream.net_index = -1; + break; + default: + ERR("Consumer unknown output type (%d)", dst->type); + ret = -1; + goto error; + } + + /* Send on socket */ + ret = lttcomm_send_unix_sock(sock, msg, + sizeof(struct lttcomm_consumer_msg)); + if (ret < 0) { + PERROR("send consumer stream"); + goto error; + } + + ret = consumer_send_fds(sock, fds, nb_fd); + if (ret < 0) { + goto error; + } + +error: + return ret; +} diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 21d762a15..2eb9d7433 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -21,6 +21,12 @@ #include #include +#include + +enum consumer_dst_type { + CONSUMER_DST_LOCAL, + CONSUMER_DST_NET, +}; struct consumer_data { enum lttng_consumer_type type; @@ -40,4 +46,81 @@ struct consumer_data { char cmd_unix_sock_path[PATH_MAX]; }; +/* + * Network URIs + */ +struct consumer_net { + /* + * Indicate if URI type is set. Those flags should only be set when the + * created URI is done AND valid. + */ + int control_isset; + int data_isset; + + /* + * The following two URIs MUST have the same destination address for + * network streaming to work. Network hop are not yet supported. + */ + + /* Control path for network streaming. */ + struct lttng_uri control; + + /* Data path for network streaming. */ + struct lttng_uri data; +}; + +/* + * Consumer output object describing where and how to send data. + */ +struct consumer_output { + /* Consumer socket file descriptor */ + int sock; + /* If the consumer is enabled meaning that should be used */ + unsigned int enabled; + enum consumer_dst_type type; + /* + * The net_seq_index is the index of the network stream on the consumer + * side. It's basically the relayd socket file descriptor value so the + * consumer can identify which streams goes with which socket. + */ + int net_seq_index; + /* + * Subdirectory path name used for both local and network consumer. + */ + char subdir[PATH_MAX]; + union { + char trace_path[PATH_MAX]; + struct consumer_net net; + } dst; +}; + +struct consumer_output *consumer_create_output(enum consumer_dst_type type); +struct consumer_output *consumer_copy_output(struct consumer_output *obj); +void consumer_destroy_output(struct consumer_output *obj); +int consumer_set_network_uri(struct consumer_output *obj, + struct lttng_uri *uri); +int consumer_send_fds(int sock, int *fds, size_t nb_fd); +int consumer_send_stream(int sock, struct consumer_output *dst, + struct lttcomm_consumer_msg *msg, int *fds, size_t nb_fd); +int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg); +void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + int channel_key, + int stream_key, + uint32_t state, + enum lttng_event_output output, + uint64_t mmap_len, + uid_t uid, + gid_t gid, + int net_index, + unsigned int metadata_flag, + const char *name, + const char *pathname); +void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + int channel_key, + uint64_t max_sb_size, + uint64_t mmap_len, + const char *name); + #endif /* _CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index bbf50d57b..5d690548c 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -24,34 +24,186 @@ #include #include -#include +#include "consumer.h" #include "kernel-consumer.h" +/* + * Sending a single channel to the consumer with command ADD_CHANNEL. + */ +int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel) +{ + int ret; + struct lttcomm_consumer_msg lkm; + + /* Safety net */ + assert(channel); + + DBG("Kernel consumer adding channel %s to kernel consumer", + channel->channel->name); + + /* Prep channel message structure */ + consumer_init_channel_comm_msg(&lkm, + LTTNG_CONSUMER_ADD_CHANNEL, + channel->fd, + channel->channel->attr.subbuf_size, + 0, /* Kernel */ + channel->channel->name); + + ret = consumer_send_channel(sock, &lkm); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. + */ +int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session) +{ + int ret; + const char *pathname; + struct lttcomm_consumer_msg lkm; + struct consumer_output *output; + + /* Safety net */ + assert(session); + assert(session->consumer); + + DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd); + + /* Get consumer output pointer */ + output = session->consumer; + + /* Get correct path name destination */ + if (output->type == CONSUMER_DST_LOCAL) { + pathname = output->dst.trace_path; + } else { + pathname = output->subdir; + } + + /* Prep channel message structure */ + consumer_init_channel_comm_msg(&lkm, + LTTNG_CONSUMER_ADD_CHANNEL, + session->metadata->fd, + session->metadata->conf->attr.subbuf_size, + 0, /* for kernel */ + "metadata"); + + ret = consumer_send_channel(sock, &lkm); + if (ret < 0) { + goto error; + } + + /* Prep stream message structure */ + consumer_init_stream_comm_msg(&lkm, + LTTNG_CONSUMER_ADD_STREAM, + session->metadata->fd, + session->metadata_stream_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_KERNEL_CHANNEL_OUTPUT, + 0, /* Kernel */ + session->uid, + session->gid, + output->net_seq_index, + 1, /* Metadata flag set */ + "metadata", + pathname); + + /* Send stream and file descriptor */ + ret = consumer_send_stream(sock, output, &lkm, + &session->metadata_stream_fd, 1); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * Sending a single stream to the consumer with command ADD_STREAM. + */ +int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, struct ltt_kernel_session *session) +{ + int ret; + const char *pathname; + struct lttcomm_consumer_msg lkm; + struct consumer_output *output; + + assert(channel); + assert(stream); + assert(session); + assert(session->consumer); + + DBG("Sending stream %d of channel %s to kernel consumer", + stream->fd, channel->channel->name); + + /* Get consumer output pointer */ + output = session->consumer; + + /* Get correct path name destination */ + if (output->type == CONSUMER_DST_LOCAL) { + pathname = output->dst.trace_path; + DBG3("Consumer is local to %s", pathname); + } else { + pathname = output->subdir; + DBG3("Consumer is network to subdir %s", pathname); + } + + /* Prep stream consumer message */ + consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM, + channel->fd, + stream->fd, + stream->state, + channel->channel->attr.output, + 0, /* Kernel */ + session->uid, + session->gid, + output->net_seq_index, + 0, /* Metadata flag unset */ + stream->name, + pathname); + + /* Send stream and file descriptor */ + ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + /* * Send all stream fds of kernel channel to the consumer. */ -int kernel_consumer_send_channel_stream(struct consumer_data *consumer_data, - struct ltt_kernel_channel *channel, uid_t uid, gid_t gid) +int kernel_consumer_send_channel_stream(int sock, + struct ltt_kernel_channel *channel, struct ltt_kernel_session *session) { - int ret, count = 0, consumer_sock; + int ret; struct ltt_kernel_stream *stream; - struct lttcomm_consumer_msg lkm; + + /* Safety net */ + assert(channel); + assert(session); + assert(session->consumer); + + /* Bail out if consumer is disabled */ + if (!session->consumer->enabled) { + ret = LTTCOMM_OK; + goto error; + } DBG("Sending streams of channel %s to kernel consumer", channel->channel->name); - consumer_sock = consumer_data->cmd_sock; - - /* Send channel */ - lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; - lkm.u.channel.channel_key = channel->fd; - lkm.u.channel.max_sb_size = channel->channel->attr.subbuf_size; - lkm.u.channel.mmap_len = 0; /* for kernel */ - DBG("Sending channel %d to consumer", lkm.u.channel.channel_key); - ret = lttcomm_send_unix_sock(consumer_sock, &lkm, sizeof(lkm)); + ret = kernel_consumer_add_channel(sock, channel); if (ret < 0) { - PERROR("send consumer channel"); goto error; } @@ -60,37 +212,14 @@ int kernel_consumer_send_channel_stream(struct consumer_data *consumer_data, if (!stream->fd) { continue; } - /* Reset consumer message structure */ - memset(&lkm, 0, sizeof(lkm)); - lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM; - lkm.u.stream.channel_key = channel->fd; - lkm.u.stream.stream_key = stream->fd; - lkm.u.stream.state = stream->state; - lkm.u.stream.output = channel->channel->attr.output; - lkm.u.stream.mmap_len = 0; /* for kernel */ - lkm.u.stream.uid = uid; - lkm.u.stream.gid = gid; - strncpy(lkm.u.stream.path_name, stream->pathname, PATH_MAX - 1); - lkm.u.stream.path_name[PATH_MAX - 1] = '\0'; - count++; - - DBG("Sending stream %d to consumer", lkm.u.stream.stream_key); - ret = lttcomm_send_unix_sock(consumer_sock, &lkm, sizeof(lkm)); - if (ret < 0) { - PERROR("send consumer stream"); - goto error; - } - ret = lttcomm_send_fds_unix_sock(consumer_sock, &stream->fd, 1); + + /* Add stream on the kernel consumer side. */ + ret = kernel_consumer_add_stream(sock, channel, stream, session); if (ret < 0) { - PERROR("send consumer stream ancillary data"); goto error; } } - DBG("consumer channel streams sent"); - - return 0; - error: return ret; } @@ -98,69 +227,42 @@ error: /* * Send all stream fds of the kernel session to the consumer. */ -int kernel_consumer_send_session(struct consumer_data *consumer_data, - struct ltt_kernel_session *session) +int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session) { int ret; struct ltt_kernel_channel *chan; - struct lttcomm_consumer_msg lkm; - int sock = session->consumer_fd; - DBG("Sending metadata stream fd"); + /* Safety net */ + assert(session); + assert(session->consumer); - /* Extra protection. It's NOT supposed to be set to -1 at this point */ - if (session->consumer_fd < 0) { - session->consumer_fd = consumer_data->cmd_sock; + /* Bail out if consumer is disabled */ + if (!session->consumer->enabled) { + ret = LTTCOMM_OK; + goto error; } + DBG("Sending session stream to kernel consumer"); + if (session->metadata_stream_fd >= 0) { - /* Send metadata channel fd */ - lkm.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; - lkm.u.channel.channel_key = session->metadata->fd; - lkm.u.channel.max_sb_size = session->metadata->conf->attr.subbuf_size; - lkm.u.channel.mmap_len = 0; /* for kernel */ - DBG("Sending metadata channel %d to consumer", lkm.u.channel.channel_key); - ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); + ret = kernel_consumer_add_metadata(sock, session); if (ret < 0) { - PERROR("send consumer channel"); goto error; } - /* Send metadata stream fd */ - lkm.cmd_type = LTTNG_CONSUMER_ADD_STREAM; - lkm.u.stream.channel_key = session->metadata->fd; - lkm.u.stream.stream_key = session->metadata_stream_fd; - lkm.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM; - lkm.u.stream.output = DEFAULT_KERNEL_CHANNEL_OUTPUT; - lkm.u.stream.mmap_len = 0; /* for kernel */ - lkm.u.stream.uid = session->uid; - lkm.u.stream.gid = session->gid; - strncpy(lkm.u.stream.path_name, session->metadata->pathname, - PATH_MAX - 1); - lkm.u.stream.path_name[PATH_MAX - 1] = '\0'; - - DBG("Sending metadata stream %d to consumer", lkm.u.stream.stream_key); - ret = lttcomm_send_unix_sock(sock, &lkm, sizeof(lkm)); - if (ret < 0) { - PERROR("send consumer stream"); - goto error; - } - ret = lttcomm_send_fds_unix_sock(sock, &session->metadata_stream_fd, 1); - if (ret < 0) { - PERROR("send consumer stream"); - goto error; - } + /* Flag that at least the metadata has been sent to the consumer. */ + session->consumer_fds_sent = 1; } + /* Send channel and streams of it */ cds_list_for_each_entry(chan, &session->channel_list.head, list) { - ret = kernel_consumer_send_channel_stream(consumer_data, chan, - session->uid, session->gid); + ret = kernel_consumer_send_channel_stream(sock, chan, session); if (ret < 0) { goto error; } } - DBG("consumer fds (metadata and channel streams) sent"); + DBG("Kernel consumer FDs of metadata and channel streams sent"); return 0; @@ -168,3 +270,53 @@ error: return ret; } +/* + * Send relayd socket to consumer associated with a session name. + * + * On success return positive value. On error, negative value. + */ +int kernel_consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + assert(sock); + assert(consumer); + + /* Bail out if consumer is disabled */ + if (!consumer->enabled) { + ret = LTTCOMM_OK; + goto error; + } + + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; + /* + * Assign network consumer output index using the temporary consumer since + * this call should only be made from within a set_consumer_uri() function + * call in the session daemon. + */ + msg.u.relayd_sock.net_index = consumer->net_seq_index; + msg.u.relayd_sock.type = type; + memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); + + DBG2("Sending relayd sock info to consumer"); + ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer relayd socket info"); + goto error; + } + + DBG2("Sending relayd socket file descriptor to consumer"); + ret = consumer_send_fds(consumer_sock, &sock->fd, 1); + if (ret < 0) { + goto error; + } + + DBG("Kernel consumer relayd socket sent"); + +error: + return ret; +} diff --git a/src/bin/lttng-sessiond/kernel-consumer.h b/src/bin/lttng-sessiond/kernel-consumer.h index cf4ef256e..02e3ec008 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.h +++ b/src/bin/lttng-sessiond/kernel-consumer.h @@ -15,17 +15,24 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _KERNEL_CONSUMER_H -#define _KERNEL_CONSUMER_H - #include -#include "consumer.h" +#include + #include "trace-kernel.h" -int kernel_consumer_send_channel_stream(struct consumer_data *consumer_data, - struct ltt_kernel_channel *channel, uid_t uid, gid_t gid); -int kernel_consumer_send_session(struct consumer_data *consumer_data, - struct ltt_kernel_session *session); +int kernel_consumer_send_channel_stream(int sock, + struct ltt_kernel_channel *channel, struct ltt_kernel_session *session); + +int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session); + +int kernel_consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type); + +int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, struct ltt_kernel_session *session); + +int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session); -#endif /* _KERNEL_CONSUMER_H */ +int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel); diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 39006ab2f..484ea304e 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -169,8 +169,7 @@ int kernel_create_channel(struct ltt_kernel_session *session, cds_list_add(&lkc->list, &session->channel_list.head); session->channel_count++; - DBG("Kernel channel %s created (fd: %d and path: %s)", - lkc->channel->name, lkc->fd, lkc->pathname); + DBG("Kernel channel %s created (fd: %d)", lkc->channel->name, lkc->fd); return 0; @@ -377,7 +376,7 @@ int kernel_open_metadata(struct ltt_kernel_session *session, char *path) session->metadata = lkm; - DBG("Kernel metadata opened (fd: %d and path: %s)", lkm->fd, lkm->pathname); + DBG("Kernel metadata opened (fd: %d)", lkm->fd); return 0; @@ -448,7 +447,7 @@ int kernel_metadata_flush_buffer(int fd) ret = kernctl_buffer_flush(fd); if (ret < 0) { - ERR("Fail to flush metadata buffers %d (ret: %d", fd, ret); + ERR("Fail to flush metadata buffers %d (ret: %d)", fd, ret); } return 0; @@ -505,11 +504,11 @@ error: */ int kernel_open_channel_stream(struct ltt_kernel_channel *channel) { - int ret; + int ret, count = 0; struct ltt_kernel_stream *lks; while ((ret = kernctl_create_stream(channel->fd)) >= 0) { - lks = trace_kernel_create_stream(); + lks = trace_kernel_create_stream(channel->channel->name, count); if (lks == NULL) { ret = close(ret); if (ret) { @@ -525,19 +524,15 @@ int kernel_open_channel_stream(struct ltt_kernel_channel *channel) PERROR("fcntl session fd"); } - ret = asprintf(&lks->pathname, "%s/%s_%d", - channel->pathname, channel->channel->name, channel->stream_count); - if (ret < 0) { - PERROR("asprintf kernel create stream"); - goto error; - } - /* Add stream to channe stream list */ cds_list_add(&lks->list, &channel->stream_list.head); channel->stream_count++; - DBG("Kernel stream %d created (fd: %d, state: %d, path: %s)", - channel->stream_count, lks->fd, lks->state, lks->pathname); + /* Increment counter which represent CPU number. */ + count++; + + DBG("Kernel stream %s created (fd: %d, state: %d)", lks->name, lks->fd, + lks->state); } return channel->stream_count; diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 6ea615e10..a15721d76 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -42,11 +42,12 @@ #include #include #include -#include #include +#include #include "lttng-sessiond.h" #include "channel.h" +#include "consumer.h" #include "context.h" #include "event.h" #include "kernel.h" @@ -54,6 +55,7 @@ #include "modprobe.h" #include "shm.h" #include "ust-ctl.h" +#include "ust-consumer.h" #include "utils.h" #include "fd-limit.h" @@ -132,7 +134,6 @@ static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; - /* * UST registration command queue. This queue is tied with a futex and uses a N * wakers / 1 waiter implemented and detailed in futex.c/.h @@ -196,6 +197,16 @@ enum consumerd_state { static enum consumerd_state ust_consumerd_state; static enum consumerd_state kernel_consumerd_state; +/* + * Used to keep a unique index for each relayd socket created where this value + * is associated with streams on the consumer so it can match the right relayd + * to send to. + * + * This value should be incremented atomically for safety purposes and future + * possible concurrent access. + */ +static unsigned int relayd_net_seq_idx; + static void setup_consumerd_path(void) { @@ -641,9 +652,11 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1) { - ret = kernel_consumer_send_channel_stream(consumer_data, - channel, session->uid, session->gid); + if (session->kernel_session->consumer_fds_sent == 1 && + session->kernel_session->consumer != NULL) { + ret = kernel_consumer_send_channel_stream( + session->kernel_session->consumer_fd, channel, + session->kernel_session); if (ret < 0) { goto error; } @@ -1745,7 +1758,7 @@ static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; - if (session->consumer_fds_sent == 0) { + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { /* * Assign default kernel consumer socket if no consumer assigned to the * kernel session. At this point, it's NOT supposed to be -1 but this is @@ -1755,15 +1768,280 @@ static int init_kernel_tracing(struct ltt_kernel_session *session) session->consumer_fd = kconsumer_data.cmd_sock; } - ret = kernel_consumer_send_session(&kconsumer_data, session); + ret = kernel_consumer_send_session(session->consumer_fd, session); if (ret < 0) { ret = LTTCOMM_KERN_CONSUMER_FAIL; goto error; } + } - session->consumer_fds_sent = 1; +error: + return ret; +} + +/* + * Create a socket to the relayd using the URI. + * + * On success, the relayd_sock pointer is set to the created socket. + * Else, it is untouched and an lttcomm error code is returned. + */ +static int create_connect_relayd(struct consumer_output *output, + const char *session_name, struct lttng_uri *uri, + struct lttcomm_sock **relayd_sock) +{ + int ret; + struct lttcomm_sock *sock; + + /* Create socket object from URI */ + sock = lttcomm_alloc_sock_from_uri(uri); + if (sock == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + + ret = lttcomm_create_sock(sock); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Connect to relayd so we can proceed with a session creation. */ + ret = relayd_connect(sock); + if (ret < 0) { + ERR("Unable to reach lttng-relayd"); + ret = LTTCOMM_RELAYD_SESSION_FAIL; + goto free_sock; } + /* Create socket for control stream. */ + if (uri->stype == LTTNG_STREAM_CONTROL) { + DBG3("Creating relayd stream socket from URI"); + + /* Check relayd version */ + ret = relayd_version_check(sock, LTTNG_UST_COMM_MAJOR, 0); + if (ret < 0) { + ret = LTTCOMM_RELAYD_VERSION_FAIL; + goto close_sock; + } + } else if (uri->stype == LTTNG_STREAM_DATA) { + DBG3("Creating relayd data socket from URI"); + } else { + /* Command is not valid */ + ERR("Relayd invalid stream type: %d", uri->stype); + ret = LTTCOMM_INVALID; + goto close_sock; + } + + *relayd_sock = sock; + + return LTTCOMM_OK; + +close_sock: + if (sock) { + (void) relayd_close(sock); + } +free_sock: + if (sock) { + lttcomm_destroy_sock(sock); + } +error: + return ret; +} + +/* + * Connect to the relayd using URI and send the socket to the right consumer. + */ +static int send_socket_relayd_consumer(int domain, struct ltt_session *session, + struct lttng_uri *relayd_uri, struct consumer_output *consumer, + int consumer_fd) +{ + int ret; + struct lttcomm_sock *sock = NULL; + + /* Set the network sequence index if not set. */ + if (consumer->net_seq_index == -1) { + /* + * Increment net_seq_idx because we are about to transfer the + * new relayd socket to the consumer. + */ + uatomic_inc(&relayd_net_seq_idx); + /* Assign unique key so the consumer can match streams */ + consumer->net_seq_index = uatomic_read(&relayd_net_seq_idx); + } + + /* Connect to relayd and make version check if uri is the control. */ + ret = create_connect_relayd(consumer, session->name, relayd_uri, &sock); + if (ret != LTTCOMM_OK) { + goto close_sock; + } + + /* If the control socket is connected, network session is ready */ + if (relayd_uri->stype == LTTNG_STREAM_CONTROL) { + session->net_handle = 1; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + /* Send relayd socket to consumer. */ + ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; + } + break; + case LTTNG_DOMAIN_UST: + /* Send relayd socket to consumer. */ + ret = ust_consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; + } + break; + } + + ret = LTTCOMM_OK; + + /* + * Close socket which was dup on the consumer side. The session daemon does + * NOT keep track of the relayd socket(s) once transfer to the consumer. + */ + +close_sock: + if (sock) { + (void) relayd_close(sock); + lttcomm_destroy_sock(sock); + } + + return ret; +} + +/* + * Send both relayd sockets to a specific consumer and domain. This is a + * helper function to facilitate sending the information to the consumer for a + * session. + */ +static int send_sockets_relayd_consumer(int domain, + struct ltt_session *session, struct consumer_output *consumer, int fd) +{ + int ret; + + /* Sending control relayd socket. */ + ret = send_socket_relayd_consumer(domain, session, + &consumer->dst.net.control, consumer, fd); + if (ret != LTTCOMM_OK) { + goto error; + } + + /* Sending data relayd socket. */ + ret = send_socket_relayd_consumer(domain, session, + &consumer->dst.net.data, consumer, fd); + if (ret != LTTCOMM_OK) { + goto error; + } + +error: + return ret; +} + +/* + * Setup relayd connections for a tracing session. First creates the socket to + * the relayd and send them to the right domain consumer. Consumer type MUST be + * network. + */ +static int setup_relayd(struct ltt_session *session) +{ + int ret = LTTCOMM_OK; + struct ltt_ust_session *usess; + struct ltt_kernel_session *ksess; + + assert(session); + + usess = session->ust_session; + ksess = session->kernel_session; + + DBG2("Setting relayd for session %s", session->name); + + if (usess && usess->consumer->sock == -1 && + usess->consumer->type == CONSUMER_DST_NET && + usess->consumer->enabled) { + /* Setup relayd for 64 bits consumer */ + if (ust_consumerd64_fd >= 0) { + send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, + usess->consumer, ust_consumerd64_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + } + + /* Setup relayd for 32 bits consumer */ + if (ust_consumerd32_fd >= 0) { + send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, + usess->consumer, ust_consumerd32_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + } + } else if (ksess && ksess->consumer->sock == -1 && + ksess->consumer->type == CONSUMER_DST_NET && + ksess->consumer->enabled) { + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, ksess->consumer_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + } + +error: + return ret; +} + +/* + * Copy consumer output from the tracing session to the domain session. The + * function also applies the right modification on a per domain basis for the + * trace files destination directory. + */ +static int copy_session_consumer(int domain, struct ltt_session *session) +{ + int ret; + const char *dir_name; + struct consumer_output *consumer; + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + DBG3("Copying tracing session consumer output in kernel session"); + session->kernel_session->consumer = + consumer_copy_output(session->consumer); + /* Ease our life a bit for the next part */ + consumer = session->kernel_session->consumer; + dir_name = DEFAULT_KERNEL_TRACE_DIR; + break; + case LTTNG_DOMAIN_UST: + DBG3("Copying tracing session consumer output in UST session"); + session->ust_session->consumer = + consumer_copy_output(session->consumer); + /* Ease our life a bit for the next part */ + consumer = session->ust_session->consumer; + dir_name = DEFAULT_UST_TRACE_DIR; + break; + default: + ret = LTTCOMM_UNKNOWN_DOMAIN; + goto error; + } + + /* Append correct directory to subdir */ + strncat(consumer->subdir, dir_name, sizeof(consumer->subdir)); + DBG3("Copy session consumer subdir %s", consumer->subdir); + + /* Add default trace directory name */ + if (consumer->type == CONSUMER_DST_LOCAL) { + strncat(consumer->dst.trace_path, dir_name, + sizeof(consumer->dst.trace_path)); + } + + ret = LTTCOMM_OK; + error: return ret; } @@ -1774,13 +2052,17 @@ error: static int create_ust_session(struct ltt_session *session, struct lttng_domain *domain) { - struct ltt_ust_session *lus = NULL; int ret; + struct ltt_ust_session *lus = NULL; + + assert(session); + assert(session->consumer); switch (domain->type) { case LTTNG_DOMAIN_UST: break; default: + ERR("Unknown UST domain on create session %d", domain->type); ret = LTTCOMM_UNKNOWN_DOMAIN; goto error; } @@ -1793,33 +2075,33 @@ static int create_ust_session(struct ltt_session *session, goto error; } - ret = run_as_mkdir_recursive(lus->pathname, S_IRWXU | S_IRWXG, - session->uid, session->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - ret = LTTCOMM_UST_SESS_FAIL; - goto error; + if (session->consumer->type == CONSUMER_DST_LOCAL) { + ret = run_as_mkdir_recursive(lus->pathname, S_IRWXU | S_IRWXG, + session->uid, session->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + ret = LTTCOMM_UST_SESS_FAIL; + goto error; + } } } - /* The domain type dictate different actions on session creation */ - switch (domain->type) { - case LTTNG_DOMAIN_UST: - /* No ustctl for the global UST domain */ - break; - default: - ERR("Unknown UST domain on create session %d", domain->type); - goto error; - } lus->uid = session->uid; lus->gid = session->gid; session->ust_session = lus; + /* Copy session output to the newly created UST session */ + ret = copy_session_consumer(domain->type, session); + if (ret != LTTCOMM_OK) { + goto error; + } + return LTTCOMM_OK; error: free(lus); + session->ust_session = NULL; return ret; } @@ -1843,18 +2125,33 @@ static int create_kernel_session(struct ltt_session *session) session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; } - ret = run_as_mkdir_recursive(session->kernel_session->trace_path, - S_IRWXU | S_IRWXG, session->uid, session->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - goto error; + /* Copy session output to the newly created Kernel session */ + ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); + if (ret != LTTCOMM_OK) { + goto error; + } + + /* Create directory(ies) on local filesystem. */ + if (session->consumer->type == CONSUMER_DST_LOCAL) { + ret = run_as_mkdir_recursive( + session->kernel_session->consumer->dst.trace_path, + S_IRWXU | S_IRWXG, session->uid, session->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + goto error; + } } } + session->kernel_session->uid = session->uid; session->kernel_session->gid = session->gid; + return LTTCOMM_OK; + error: + trace_kernel_destroy_session(session->kernel_session); + session->kernel_session = NULL; return ret; } @@ -1871,6 +2168,9 @@ static int session_access_ok(struct ltt_session *session, uid_t uid, gid_t gid) } } +/* + * Count number of session permitted by uid/gid. + */ static unsigned int lttng_sessions_count(uid_t uid, gid_t gid) { unsigned int i = 0; @@ -2752,8 +3052,9 @@ static int cmd_start_trace(struct ltt_session *session) int ret; struct ltt_kernel_session *ksession; struct ltt_ust_session *usess; + struct ltt_kernel_channel *kchan; - /* Short cut */ + /* Ease our life a bit ;) */ ksession = session->kernel_session; usess = session->ust_session; @@ -2765,13 +3066,18 @@ static int cmd_start_trace(struct ltt_session *session) session->enabled = 1; + ret = setup_relayd(session); + if (ret != LTTCOMM_OK) { + ERR("Error setting up relayd for session %s", session->name); + goto error; + } + /* Kernel tracing */ if (ksession != NULL) { - struct ltt_kernel_channel *kchan; - /* Open kernel metadata */ if (ksession->metadata == NULL) { - ret = kernel_open_metadata(ksession, ksession->trace_path); + ret = kernel_open_metadata(ksession, + ksession->consumer->dst.trace_path); if (ret < 0) { ret = LTTCOMM_KERN_META_FAIL; goto error; @@ -2861,12 +3167,15 @@ static int cmd_stop_trace(struct ltt_session *session) if (ksession != NULL) { DBG("Stop kernel tracing"); - /* Flush all buffers before stopping */ - ret = kernel_metadata_flush_buffer(ksession->metadata_stream_fd); - if (ret < 0) { - ERR("Kernel metadata flush failed"); + /* Flush metadata if exist */ + if (ksession->metadata_stream_fd >= 0) { + ret = kernel_metadata_flush_buffer(ksession->metadata_stream_fd); + if (ret < 0) { + ERR("Kernel metadata flush failed"); + } } + /* Flush all buffers before stopping */ cds_list_for_each_entry(kchan, &ksession->channel_list.head, list) { ret = kernel_flush_buffer(kchan); if (ret < 0) { @@ -2900,19 +3209,108 @@ error: } /* - * Command LTTNG_CREATE_SESSION processed by the client thread. + * Command LTTNG_CREATE_SESSION_URI processed by the client thread. */ -static int cmd_create_session(char *name, char *path, lttng_sock_cred *creds) +static int cmd_create_session_uri(char *name, struct lttng_uri *ctrl_uri, + struct lttng_uri *data_uri, unsigned int enable_consumer, + lttng_sock_cred *creds) { int ret; + char *path = NULL; + struct ltt_session *session; + struct consumer_output *consumer; + + /* Verify if the session already exist */ + session = session_find_by_name(name); + if (session != NULL) { + ret = LTTCOMM_EXIST_SESS; + goto error; + } + + /* TODO: validate URIs */ + + /* Create default consumer output */ + consumer = consumer_create_output(CONSUMER_DST_LOCAL); + if (consumer == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + strncpy(consumer->subdir, ctrl_uri->subdir, sizeof(consumer->subdir)); + DBG2("Consumer subdir set to %s", consumer->subdir); + + switch (ctrl_uri->dtype) { + case LTTNG_DST_IPV4: + case LTTNG_DST_IPV6: + /* Set control URI into consumer output object */ + ret = consumer_set_network_uri(consumer, ctrl_uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Set data URI into consumer output object */ + ret = consumer_set_network_uri(consumer, data_uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Empty path since the session is network */ + path = ""; + break; + case LTTNG_DST_PATH: + /* Very volatile pointer. Only used for the create session. */ + path = ctrl_uri->dst.path; + strncpy(consumer->dst.trace_path, path, + sizeof(consumer->dst.trace_path)); + break; + } + + /* Set if the consumer is enabled or not */ + consumer->enabled = enable_consumer; ret = session_create(name, path, LTTNG_SOCK_GET_UID_CRED(creds), LTTNG_SOCK_GET_GID_CRED(creds)); if (ret != LTTCOMM_OK) { - goto error; + goto consumer_error; } - ret = LTTCOMM_OK; + /* Get the newly created session pointer back */ + session = session_find_by_name(name); + assert(session); + + /* Assign consumer to session */ + session->consumer = consumer; + + return LTTCOMM_OK; + +consumer_error: + consumer_destroy_output(consumer); +error: + return ret; +} + +/* + * Command LTTNG_CREATE_SESSION processed by the client thread. + */ +static int cmd_create_session(char *name, char *path, lttng_sock_cred *creds) +{ + int ret; + struct lttng_uri uri; + + /* Zeroed temporary URI */ + memset(&uri, 0, sizeof(uri)); + + uri.dtype = LTTNG_DST_PATH; + uri.utype = LTTNG_URI_DST; + strncpy(uri.dst.path, path, sizeof(uri.dst.path)); + + /* TODO: Strip date-time from path and put it in uri's subdir */ + + ret = cmd_create_session_uri(name, &uri, NULL, 1, creds); + if (ret != LTTCOMM_OK) { + goto error; + } error: return ret; @@ -3146,6 +3544,384 @@ error: return ret; } +/* + * Command LTTNG_SET_CONSUMER_URI processed by the client thread. + */ +static int cmd_set_consumer_uri(int domain, struct ltt_session *session, + struct lttng_uri *uri) +{ + int ret; + struct ltt_kernel_session *ksess = session->kernel_session; + struct ltt_ust_session *usess = session->ust_session; + struct consumer_output *consumer; + + /* Can't enable consumer after session started. */ + if (session->enabled) { + ret = LTTCOMM_TRACE_ALREADY_STARTED; + goto error; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + /* Code flow error if we don't have a kernel session here. */ + assert(ksess); + + /* Create consumer output if none exists */ + consumer = ksess->tmp_consumer; + if (consumer == NULL) { + consumer = consumer_copy_output(ksess->consumer); + if (consumer == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + /* Reassign new pointer */ + ksess->tmp_consumer = consumer; + } + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + case LTTNG_DST_IPV6: + DBG2("Setting network URI for kernel session %s", session->name); + + /* Set URI into consumer output object */ + ret = consumer_set_network_uri(consumer, uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* On a new subdir, reappend the default trace dir. */ + if (strlen(uri->subdir) != 0) { + strncat(consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, + sizeof(consumer->subdir)); + } + + ret = send_socket_relayd_consumer(domain, session, uri, consumer, + ksess->consumer_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + break; + case LTTNG_DST_PATH: + DBG2("Setting trace directory path from URI to %s", uri->dst.path); + memset(consumer->dst.trace_path, 0, + sizeof(consumer->dst.trace_path)); + strncpy(consumer->dst.trace_path, uri->dst.path, + sizeof(consumer->dst.trace_path)); + /* Append default kernel trace dir */ + strncat(consumer->dst.trace_path, DEFAULT_KERNEL_TRACE_DIR, + sizeof(consumer->dst.trace_path)); + break; + } + + /* All good! */ + break; + case LTTNG_DOMAIN_UST: + /* Code flow error if we don't have a kernel session here. */ + assert(usess); + + /* Create consumer output if none exists */ + consumer = usess->tmp_consumer; + if (consumer == NULL) { + consumer = consumer_copy_output(usess->consumer); + if (consumer == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + /* Reassign new pointer */ + usess->tmp_consumer = consumer; + } + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + case LTTNG_DST_IPV6: + { + DBG2("Setting network URI for UST session %s", session->name); + + /* Set URI into consumer object */ + ret = consumer_set_network_uri(consumer, uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* On a new subdir, reappend the default trace dir. */ + if (strlen(uri->subdir) != 0) { + strncat(consumer->subdir, DEFAULT_UST_TRACE_DIR, + sizeof(consumer->subdir)); + } + + if (ust_consumerd64_fd >= 0) { + ret = send_socket_relayd_consumer(domain, session, uri, + consumer, ust_consumerd64_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + } + + if (ust_consumerd32_fd >= 0) { + ret = send_socket_relayd_consumer(domain, session, uri, + consumer, ust_consumerd32_fd); + if (ret != LTTCOMM_OK) { + goto error; + } + } + + break; + } + case LTTNG_DST_PATH: + DBG2("Setting trace directory path from URI to %s", uri->dst.path); + memset(consumer->dst.trace_path, 0, + sizeof(consumer->dst.trace_path)); + strncpy(consumer->dst.trace_path, uri->dst.path, + sizeof(consumer->dst.trace_path)); + /* Append default UST trace dir */ + strncat(consumer->dst.trace_path, DEFAULT_UST_TRACE_DIR, + sizeof(consumer->dst.trace_path)); + break; + } + break; + } + + /* All good! */ + ret = LTTCOMM_OK; + +error: + return ret; +} + +/* + * Command LTTNG_DISABLE_CONSUMER processed by the client thread. + */ +static int cmd_disable_consumer(int domain, struct ltt_session *session) +{ + int ret; + struct ltt_kernel_session *ksess = session->kernel_session; + struct ltt_ust_session *usess = session->ust_session; + struct consumer_output *consumer; + + if (session->enabled) { + /* Can't disable consumer on an already started session */ + ret = LTTCOMM_TRACE_ALREADY_STARTED; + goto error; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + /* Code flow error if we don't have a kernel session here. */ + assert(ksess); + + DBG("Disabling kernel consumer"); + consumer = ksess->consumer; + + break; + case LTTNG_DOMAIN_UST: + /* Code flow error if we don't have a UST session here. */ + assert(usess); + + DBG("Disabling UST consumer"); + consumer = usess->consumer; + + break; + default: + ret = LTTCOMM_UNKNOWN_DOMAIN; + goto error; + } + + assert(consumer); + consumer->enabled = 0; + + /* Success at this point */ + ret = LTTCOMM_OK; + +error: + return ret; +} + +/* + * Command LTTNG_ENABLE_CONSUMER processed by the client thread. + */ +static int cmd_enable_consumer(int domain, struct ltt_session *session) +{ + int ret; + struct ltt_kernel_session *ksess = session->kernel_session; + struct ltt_ust_session *usess = session->ust_session; + struct consumer_output *tmp_out; + + /* Can't enable consumer after session started. */ + if (session->enabled) { + ret = LTTCOMM_TRACE_ALREADY_STARTED; + goto error; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + /* Code flow error if we don't have a kernel session here. */ + assert(ksess); + + /* + * Check if we have already sent fds to the consumer. In that case, + * the enable-consumer command can't be used because a start trace + * had previously occured. + */ + if (ksess->consumer_fds_sent) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto error; + } + + tmp_out = ksess->tmp_consumer; + if (tmp_out == NULL) { + /* No temp. consumer output exists. Using the current one. */ + DBG3("No temporary consumer. Using default"); + ret = LTTCOMM_OK; + goto error; + } + + switch (tmp_out->type) { + case CONSUMER_DST_LOCAL: + DBG2("Consumer output is local. Creating directory(ies)"); + + /* Create directory(ies) */ + ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + S_IRWXU | S_IRWXG, session->uid, session->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + ret = LTTCOMM_FATAL; + goto error; + } + } + break; + case CONSUMER_DST_NET: + DBG2("Consumer output is network. Validating URIs"); + /* Validate if we have both control and data path set. */ + if (!tmp_out->dst.net.control_isset) { + ret = LTTCOMM_URI_CTRL_MISS; + goto error; + } + + if (!tmp_out->dst.net.data_isset) { + ret = LTTCOMM_URI_DATA_MISS; + goto error; + } + + /* Check established network session state */ + if (session->net_handle == 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + ERR("Session network handle is not set on enable-consumer"); + goto error; + } + + /* Append default kernel trace dir to subdir */ + strncat(ksess->consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, + sizeof(ksess->consumer->subdir)); + + break; + } + + /* + * @session-lock + * This is race free for now since the session lock is acquired before + * ending up in this function. No other threads can access this kernel + * session without this lock hence freeing the consumer output object + * is valid. + */ + consumer_destroy_output(ksess->consumer); + ksess->consumer = tmp_out; + ksess->tmp_consumer = NULL; + + break; + case LTTNG_DOMAIN_UST: + /* Code flow error if we don't have a UST session here. */ + assert(usess); + + /* + * Check if we have already sent fds to the consumer. In that case, + * the enable-consumer command can't be used because a start trace + * had previously occured. + */ + if (usess->start_trace) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto error; + } + + tmp_out = usess->tmp_consumer; + if (tmp_out == NULL) { + /* No temp. consumer output exists. Using the current one. */ + DBG3("No temporary consumer. Using default"); + ret = LTTCOMM_OK; + goto error; + } + + switch (tmp_out->type) { + case CONSUMER_DST_LOCAL: + DBG2("Consumer output is local. Creating directory(ies)"); + + /* Create directory(ies) */ + ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + S_IRWXU | S_IRWXG, session->uid, session->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + ret = LTTCOMM_FATAL; + goto error; + } + } + break; + case CONSUMER_DST_NET: + DBG2("Consumer output is network. Validating URIs"); + /* Validate if we have both control and data path set. */ + if (!tmp_out->dst.net.control_isset) { + ret = LTTCOMM_URI_CTRL_MISS; + goto error; + } + + if (!tmp_out->dst.net.data_isset) { + ret = LTTCOMM_URI_DATA_MISS; + goto error; + } + + /* Check established network session state */ + if (session->net_handle == 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + DBG2("Session network handle is not set on enable-consumer"); + goto error; + } + + if (tmp_out->net_seq_index == -1) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + DBG2("Network index is not set on the consumer"); + goto error; + } + + /* Append default kernel trace dir to subdir */ + strncat(usess->consumer->subdir, DEFAULT_UST_TRACE_DIR, + sizeof(usess->consumer->subdir)); + + break; + } + + /* + * @session-lock + * This is race free for now since the session lock is acquired before + * ending up in this function. No other threads can access this kernel + * session without this lock hence freeing the consumer output object + * is valid. + */ + consumer_destroy_output(usess->consumer); + usess->consumer = tmp_out; + usess->tmp_consumer = NULL; + + break; + } + + /* Success at this point */ + ret = LTTCOMM_OK; + +error: + return ret; +} + /* * Process the command requested by the lttng client within the command * context structure. This function make sure that the return structure (llm) @@ -3163,6 +3939,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: + case LTTNG_CREATE_SESSION_URI: case LTTNG_DESTROY_SESSION: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_DOMAINS: @@ -3209,6 +3986,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) /* Commands that DO NOT need a session. */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: + case LTTNG_CREATE_SESSION_URI: case LTTNG_CALIBRATE: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_TRACEPOINTS: @@ -3288,6 +4066,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx) goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); + + /* Set consumer fd of the session */ + cmd_ctx->session->kernel_session->consumer_fd = + kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } @@ -3310,6 +4092,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx) goto error; } } + /* Start the UST consumer daemons */ /* 64-bit */ pthread_mutex_lock(&ustconsumer64_data.pid_mutex); @@ -3417,12 +4200,22 @@ skip_domain: cmd_ctx->lsm->u.disable.channel_name); break; } + case LTTNG_DISABLE_CONSUMER: + { + ret = cmd_disable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); + break; + } case LTTNG_ENABLE_CHANNEL: { ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, &cmd_ctx->lsm->u.channel.chan); break; } + case LTTNG_ENABLE_CONSUMER: + { + ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); + break; + } case LTTNG_ENABLE_EVENT: { ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, @@ -3499,7 +4292,12 @@ skip_domain: ret = LTTCOMM_OK; break; } - + case LTTNG_SET_CONSUMER_URI: + { + ret = cmd_set_consumer_uri(cmd_ctx->lsm->domain.type, cmd_ctx->session, + &cmd_ctx->lsm->u.uri); + break; + } case LTTNG_START_TRACE: { ret = cmd_start_trace(cmd_ctx->session); @@ -3516,6 +4314,14 @@ skip_domain: cmd_ctx->lsm->session.path, &cmd_ctx->creds); break; } + case LTTNG_CREATE_SESSION_URI: + { + ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, + &cmd_ctx->lsm->u.create_uri.ctrl_uri, + &cmd_ctx->lsm->u.create_uri.data_uri, + cmd_ctx->lsm->u.create_uri.enable_consumer, &cmd_ctx->creds); + break; + } case LTTNG_DESTROY_SESSION: { ret = cmd_destroy_session(cmd_ctx->session, @@ -4560,6 +5366,12 @@ int main(int argc, char **argv) /* Set up max poll set size */ lttng_poll_set_max_size(); + /* + * Set network sequence index to 1 for streams to match a relayd socket on + * the consumer side. + */ + uatomic_set(&relayd_net_seq_idx, 1); + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, NULL, thread_manage_clients, (void *) NULL); diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index bcdd78d50..f9d20cc56 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -166,12 +166,6 @@ int session_create(char *name, char *path, uid_t uid, gid_t gid) int ret; struct ltt_session *new_session; - new_session = session_find_by_name(name); - if (new_session != NULL) { - ret = LTTCOMM_EXIST_SESS; - goto error_exist; - } - /* Allocate session data structure */ new_session = zmalloc(sizeof(struct ltt_session)); if (new_session == NULL) { @@ -214,13 +208,16 @@ int session_create(char *name, char *path, uid_t uid, gid_t gid) new_session->uid = uid; new_session->gid = gid; - ret = run_as_mkdir_recursive(new_session->path, S_IRWXU | S_IRWXG, - new_session->uid, new_session->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - ret = LTTCOMM_CREATE_DIR_FAIL; - goto error; + /* Mkdir if we have a valid path length */ + if (strlen(new_session->path) > 0) { + ret = run_as_mkdir_recursive(new_session->path, S_IRWXU | S_IRWXG, + new_session->uid, new_session->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + ret = LTTCOMM_CREATE_DIR_FAIL; + goto error; + } } } @@ -241,7 +238,6 @@ error_asprintf: free(new_session); } -error_exist: error_malloc: return ret; } diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 8294a83fd..ebb65bfd8 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -70,6 +70,18 @@ struct ltt_session { /* UID/GID of the user owning the session */ uid_t uid; gid_t gid; + /* + * Network session handle. A value of 0 means that there is no remote + * session established. + */ + uint64_t net_handle; + /* + * This consumer is only set when the create_session_uri call is made. + * This contains the temporary information for a consumer output. Upon + * creation of the UST or kernel session, this consumer, if available, is + * copied into those sessions. + */ + struct consumer_output *consumer; }; /* Prototypes */ diff --git a/src/bin/lttng-sessiond/trace-kernel.c b/src/bin/lttng-sessiond/trace-kernel.c index 293ca381b..c1104fac2 100644 --- a/src/bin/lttng-sessiond/trace-kernel.c +++ b/src/bin/lttng-sessiond/trace-kernel.c @@ -24,6 +24,7 @@ #include #include +#include "consumer.h" #include "trace-kernel.h" /* @@ -103,6 +104,27 @@ struct ltt_kernel_session *trace_kernel_create_session(char *path) lks->consumer_fd = -1; CDS_INIT_LIST_HEAD(&lks->channel_list.head); + /* Create default consumer output object */ + lks->consumer = consumer_create_output(CONSUMER_DST_LOCAL); + if (lks->consumer == NULL) { + goto error; + } + + /* + * The tmp_consumer stays NULL until a set_consumer_uri command is + * executed. At this point, the consumer should be nullify until an + * enable_consumer command. This assignment is symbolic since we've zmalloc + * the struct. + */ + lks->tmp_consumer = NULL; + + /* Use the default consumer output which is the tracing session path. */ + ret = snprintf(lks->consumer->dst.trace_path, PATH_MAX, "%s/kernel", path); + if (ret < 0) { + PERROR("snprintf consumer trace path"); + goto error; + } + /* Set session path */ ret = asprintf(&lks->trace_path, "%s/kernel", path); if (ret < 0) { @@ -121,9 +143,9 @@ error: * * Return pointer to structure or NULL. */ -struct ltt_kernel_channel *trace_kernel_create_channel(struct lttng_channel *chan, char *path) +struct ltt_kernel_channel *trace_kernel_create_channel( + struct lttng_channel *chan, char *path) { - int ret; struct ltt_kernel_channel *lkc; lkc = zmalloc(sizeof(struct ltt_kernel_channel)); @@ -147,12 +169,6 @@ struct ltt_kernel_channel *trace_kernel_create_channel(struct lttng_channel *cha /* Init linked list */ CDS_INIT_LIST_HEAD(&lkc->events_list.head); CDS_INIT_LIST_HEAD(&lkc->stream_list.head); - /* Set default trace output path */ - ret = asprintf(&lkc->pathname, "%s", path); - if (ret < 0) { - PERROR("asprintf kernel create channel"); - goto error; - } return lkc; @@ -240,7 +256,6 @@ error: */ struct ltt_kernel_metadata *trace_kernel_create_metadata(char *path) { - int ret; struct ltt_kernel_metadata *lkm; struct lttng_channel *chan; @@ -262,12 +277,6 @@ struct ltt_kernel_metadata *trace_kernel_create_metadata(char *path) /* Init metadata */ lkm->fd = -1; lkm->conf = chan; - /* Set default metadata path */ - ret = asprintf(&lkm->pathname, "%s/metadata", path); - if (ret < 0) { - PERROR("asprintf kernel metadata"); - goto error; - } return lkm; @@ -283,8 +292,10 @@ error: * * Return pointer to structure or NULL. */ -struct ltt_kernel_stream *trace_kernel_create_stream(void) +struct ltt_kernel_stream *trace_kernel_create_stream(const char *name, + unsigned int count) { + int ret; struct ltt_kernel_stream *lks; lks = zmalloc(sizeof(struct ltt_kernel_stream)); @@ -293,9 +304,16 @@ struct ltt_kernel_stream *trace_kernel_create_stream(void) goto error; } + /* Set name */ + ret = snprintf(lks->name, sizeof(lks->name), "%s_%d", name, count); + if (ret < 0) { + PERROR("snprintf stream name"); + goto error; + } + lks->name[sizeof(lks->name) - 1] = '\0'; + /* Init stream */ lks->fd = -1; - lks->pathname = NULL; lks->state = 0; return lks; @@ -322,7 +340,6 @@ void trace_kernel_destroy_stream(struct ltt_kernel_stream *stream) /* Remove from stream list */ cds_list_del(&stream->list); - free(stream->pathname); free(stream); } @@ -383,7 +400,6 @@ void trace_kernel_destroy_channel(struct ltt_kernel_channel *channel) /* Remove from channel list */ cds_list_del(&channel->list); - free(channel->pathname); free(channel->channel); free(channel->ctx); free(channel); @@ -406,7 +422,6 @@ void trace_kernel_destroy_metadata(struct ltt_kernel_metadata *metadata) } free(metadata->conf); - free(metadata->pathname); free(metadata); } @@ -443,6 +458,10 @@ void trace_kernel_destroy_session(struct ltt_kernel_session *session) trace_kernel_destroy_channel(channel); } + /* Wipe consumer output object */ + consumer_destroy_output(session->consumer); + consumer_destroy_output(session->tmp_consumer); + free(session->trace_path); free(session); } diff --git a/src/bin/lttng-sessiond/trace-kernel.h b/src/bin/lttng-sessiond/trace-kernel.h index d6f67265e..bc4c0e763 100644 --- a/src/bin/lttng-sessiond/trace-kernel.h +++ b/src/bin/lttng-sessiond/trace-kernel.h @@ -23,6 +23,8 @@ #include #include +#include "consumer.h" + /* Kernel event list */ struct ltt_kernel_event_list { struct cds_list_head head; @@ -55,7 +57,6 @@ struct ltt_kernel_event { struct ltt_kernel_channel { int fd; int enabled; - char *pathname; unsigned int stream_count; unsigned int event_count; /* @@ -72,15 +73,15 @@ struct ltt_kernel_channel { /* Metadata */ struct ltt_kernel_metadata { int fd; - char *pathname; struct lttng_channel *conf; }; /* Channel stream */ struct ltt_kernel_stream { int fd; - char *pathname; int state; + /* Format is %s_%d respectively channel name and CPU number. */ + char name[LTTNG_SYMBOL_NAME_LEN]; struct cds_list_head list; }; @@ -98,6 +99,14 @@ struct ltt_kernel_session { /* UID/GID of the user owning the session */ uid_t uid; gid_t gid; + /* + * Two consumer_output object are needed where one is needed for the + * current output object and the second one is the temporary object used to + * store URI being set by the lttng_set_consumer_uri call. Once + * lttng_enable_consumer is called, the two pointers are swapped. + */ + struct consumer_output *consumer; + struct consumer_output *tmp_consumer; }; /* @@ -115,7 +124,8 @@ struct ltt_kernel_session *trace_kernel_create_session(char *path); struct ltt_kernel_channel *trace_kernel_create_channel(struct lttng_channel *chan, char *path); struct ltt_kernel_event *trace_kernel_create_event(struct lttng_event *ev); struct ltt_kernel_metadata *trace_kernel_create_metadata(char *path); -struct ltt_kernel_stream *trace_kernel_create_stream(void); +struct ltt_kernel_stream *trace_kernel_create_stream(const char *name, + unsigned int count); /* * Destroy functions free() the data structure and remove from linked list if diff --git a/src/bin/lttng-sessiond/trace-ust.c b/src/bin/lttng-sessiond/trace-ust.c index ed635f960..cd1660d59 100644 --- a/src/bin/lttng-sessiond/trace-ust.c +++ b/src/bin/lttng-sessiond/trace-ust.c @@ -109,6 +109,26 @@ struct ltt_ust_session *trace_ust_create_session(char *path, /* Alloc UST global domain channels' HT */ lus->domain_global.channels = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); + lus->consumer = consumer_create_output(CONSUMER_DST_LOCAL); + if (lus->consumer == NULL) { + goto error; + } + + /* + * The tmp_consumer stays NULL until a set_consumer_uri command is + * executed. At this point, the consumer should be nullify until an + * enable_consumer command. This assignment is symbolic since we've zmalloc + * the struct. + */ + lus->tmp_consumer = NULL; + + /* Use the default consumer output which is the tracing session path. */ + ret = snprintf(lus->consumer->dst.trace_path, PATH_MAX, "%s/ust", path); + if (ret < 0) { + PERROR("snprintf UST consumer trace path"); + goto error; + } + /* Set session path */ ret = snprintf(lus->pathname, PATH_MAX, "%s/ust", path); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/trace-ust.h b/src/bin/lttng-sessiond/trace-ust.h index c59490b2a..f8bcb3356 100644 --- a/src/bin/lttng-sessiond/trace-ust.h +++ b/src/bin/lttng-sessiond/trace-ust.h @@ -25,6 +25,7 @@ #include #include +#include "consumer.h" #include "ust-ctl.h" /* UST Stream list */ @@ -51,6 +52,8 @@ struct ltt_ust_event { struct ltt_ust_stream { int handle; char pathname[PATH_MAX]; + /* Format is %s_%d respectively channel name and CPU number. */ + char name[LTTNG_SYMBOL_NAME_LEN]; struct lttng_ust_object_data *obj; /* Using a list of streams to keep order. */ struct cds_list_head list; @@ -111,6 +114,14 @@ struct ltt_ust_session { /* UID/GID of the user owning the session */ uid_t uid; gid_t gid; + /* + * Two consumer_output object are needed where one is for the current + * output object and the second one is the temporary object used to store + * URI being set by the lttng_set_consumer_uri call. Once + * lttng_enable_consumer is called, the two pointers are swapped. + */ + struct consumer_output *consumer; + struct consumer_output *tmp_consumer; }; #ifdef HAVE_LIBLTTNG_UST_CTL diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 956c5dafe..c8c653134 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -797,8 +797,8 @@ static void shadow_copy_session(struct ust_app_session *ua_sess, ua_sess->uid = usess->uid; ua_sess->gid = usess->gid; - ret = snprintf(ua_sess->path, PATH_MAX, "%s/%s-%d-%s", usess->pathname, - app->name, app->pid, datetime); + ret = snprintf(ua_sess->path, PATH_MAX, "%s-%d-%s/", app->name, app->pid, + datetime); if (ret < 0) { PERROR("asprintf UST shadow copy session"); /* TODO: We cannot return an error from here.. */ @@ -1225,13 +1225,6 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, goto error; } - ret = run_as_mkdir(ua_sess->path, S_IRWXU | S_IRWXG, - ua_sess->uid, ua_sess->gid); - if (ret < 0) { - PERROR("mkdir UST metadata"); - goto error; - } - ret = snprintf(ua_sess->metadata->pathname, PATH_MAX, "%s/metadata", ua_sess->path); if (ret < 0) { @@ -1548,7 +1541,6 @@ int ust_app_list_event_fields(struct lttng_event_field **fields) goto rcu_error; } } - memcpy(tmp[count].field_name, uiter.field_name, LTTNG_UST_SYM_NAME_LEN); tmp[count].type = uiter.type; @@ -2100,9 +2092,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) /* Order is important */ cds_list_add_tail(&ustream->list, &ua_chan->streams.head); - ret = snprintf(ustream->pathname, PATH_MAX, "%s/%s_%u", - ua_sess->path, ua_chan->name, - ua_chan->streams.count++); + ret = snprintf(ustream->name, sizeof(ustream->name), "%s_%u", + ua_chan->name, ua_chan->streams.count++); if (ret < 0) { PERROR("asprintf UST create stream"); /* @@ -2111,8 +2102,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) */ continue; } - DBG2("UST stream %d ready at %s", ua_chan->streams.count, - ustream->pathname); + DBG2("UST stream %d ready (handle: %d)", ua_chan->streams.count, + ustream->handle); } } @@ -2129,7 +2120,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) } /* Setup UST consumer socket and send fds to it */ - ret = ust_consumer_send_session(consumerd_fd, ua_sess); + ret = ust_consumer_send_session(consumerd_fd, ua_sess, usess->consumer); if (ret < 0) { goto error_rcu_unlock; } diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 03b0c7d03..fa5f071f0 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -25,86 +25,96 @@ #include #include #include -#include +#include "consumer.h" #include "ust-consumer.h" /* * Send all stream fds of UST channel to the consumer. */ static int send_channel_streams(int sock, - struct ust_app_channel *uchan, - uid_t uid, gid_t gid) + struct ust_app_channel *uchan, const char *path, + uid_t uid, gid_t gid, struct consumer_output *consumer) { int ret, fd; + char tmp_path[PATH_MAX]; + const char *pathname; struct lttcomm_consumer_msg lum; struct ltt_ust_stream *stream, *tmp; DBG("Sending streams of channel %s to UST consumer", uchan->name); - /* Send channel */ - lum.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; - - /* - * We need to keep shm_fd open while we transfer the stream file - * descriptors to make sure this key stays unique within the - * session daemon. We can free the channel shm_fd without - * problem after we finished sending stream fds for that - * channel. - */ - lum.u.channel.channel_key = uchan->obj->shm_fd; - lum.u.channel.max_sb_size = uchan->attr.subbuf_size; - lum.u.channel.mmap_len = uchan->obj->memory_map_size; - DBG("Sending channel %d to consumer", lum.u.channel.channel_key); - ret = lttcomm_send_unix_sock(sock, &lum, sizeof(lum)); + consumer_init_channel_comm_msg(&lum, + LTTNG_CONSUMER_ADD_CHANNEL, + uchan->obj->shm_fd, + uchan->attr.subbuf_size, + uchan->obj->memory_map_size, + uchan->name); + + ret = consumer_send_channel(sock, &lum); if (ret < 0) { - PERROR("send consumer channel"); goto error; } + fd = uchan->obj->shm_fd; - ret = lttcomm_send_fds_unix_sock(sock, &fd, 1); + ret = consumer_send_fds(sock, &fd, 1); if (ret < 0) { - PERROR("send consumer channel ancillary data"); goto error; } + /* Get the right path name destination */ + if (consumer->type == CONSUMER_DST_LOCAL) { + /* Set application path to the destination path */ + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->dst.trace_path, path); + if (ret < 0) { + PERROR("snprintf stream path"); + goto error; + } + pathname = tmp_path; + DBG3("UST local consumer tracefile path: %s", pathname); + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->subdir, path); + if (ret < 0) { + PERROR("snprintf stream path"); + goto error; + } + pathname = tmp_path; + DBG3("UST network consumer subdir path: %s", pathname); + } + cds_list_for_each_entry_safe(stream, tmp, &uchan->streams.head, list) { int fds[2]; if (!stream->obj->shm_fd) { continue; } - lum.cmd_type = LTTNG_CONSUMER_ADD_STREAM; - lum.u.stream.channel_key = uchan->obj->shm_fd; - lum.u.stream.stream_key = stream->obj->shm_fd; - lum.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM; - /* - * FIXME Hack alert! we force MMAP for now. Mixup - * between EVENT and UST enums elsewhere. - */ - lum.u.stream.output = DEFAULT_UST_CHANNEL_OUTPUT; - lum.u.stream.mmap_len = stream->obj->memory_map_size; - lum.u.stream.uid = uid; - lum.u.stream.gid = gid; - strncpy(lum.u.stream.path_name, stream->pathname, PATH_MAX - 1); - lum.u.stream.path_name[PATH_MAX - 1] = '\0'; - DBG("Sending stream %d to consumer", lum.u.stream.stream_key); - ret = lttcomm_send_unix_sock(sock, &lum, sizeof(lum)); - if (ret < 0) { - PERROR("send consumer stream"); - goto error; - } + consumer_init_stream_comm_msg(&lum, + LTTNG_CONSUMER_ADD_STREAM, + uchan->obj->shm_fd, + stream->obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + stream->obj->memory_map_size, + uid, + gid, + consumer->net_seq_index, + 0, /* Metadata flag unset */ + stream->name, + pathname); + + /* Send stream and file descriptor */ fds[0] = stream->obj->shm_fd; fds[1] = stream->obj->wait_fd; - ret = lttcomm_send_fds_unix_sock(sock, fds, 2); + ret = consumer_send_stream(sock, consumer, &lum, fds, 2); if (ret < 0) { - PERROR("send consumer stream ancillary data"); goto error; } } - DBG("consumer channel streams sent"); + DBG("UST consumer channel streams sent"); return 0; @@ -115,10 +125,13 @@ error: /* * Send all stream fds of the UST session to the consumer. */ -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess) +int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, + struct consumer_output *consumer) { int ret = 0; int sock = consumer_fd; + char tmp_path[PATH_MAX]; + const char *pathname; struct lttng_ht_iter iter; struct lttcomm_consumer_msg lum; struct ust_app_channel *ua_chan; @@ -134,46 +147,73 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess) int fd; int fds[2]; - /* Send metadata channel fd */ - lum.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL; - lum.u.channel.channel_key = usess->metadata->obj->shm_fd; - lum.u.channel.max_sb_size = usess->metadata->attr.subbuf_size; - lum.u.channel.mmap_len = usess->metadata->obj->memory_map_size; - DBG("Sending metadata channel %d to consumer", lum.u.channel.channel_key); - ret = lttcomm_send_unix_sock(sock, &lum, sizeof(lum)); + consumer_init_channel_comm_msg(&lum, + LTTNG_CONSUMER_ADD_CHANNEL, + usess->metadata->obj->shm_fd, + usess->metadata->attr.subbuf_size, + usess->metadata->obj->memory_map_size, + "metadata"); + + ret = consumer_send_channel(sock, &lum); if (ret < 0) { - PERROR("send consumer channel"); goto error; } + fd = usess->metadata->obj->shm_fd; - ret = lttcomm_send_fds_unix_sock(sock, &fd, 1); + ret = consumer_send_fds(sock, &fd, 1); if (ret < 0) { - PERROR("send consumer metadata channel"); goto error; } - /* Send metadata stream fd */ - lum.cmd_type = LTTNG_CONSUMER_ADD_STREAM; - lum.u.stream.channel_key = usess->metadata->obj->shm_fd; - lum.u.stream.stream_key = usess->metadata->stream_obj->shm_fd; - lum.u.stream.state = LTTNG_CONSUMER_ACTIVE_STREAM; - lum.u.stream.output = DEFAULT_UST_CHANNEL_OUTPUT; - lum.u.stream.mmap_len = usess->metadata->stream_obj->memory_map_size; - lum.u.stream.uid = usess->uid; - lum.u.stream.gid = usess->gid; - strncpy(lum.u.stream.path_name, usess->metadata->pathname, PATH_MAX - 1); - lum.u.stream.path_name[PATH_MAX - 1] = '\0'; - DBG("Sending metadata stream %d to consumer", lum.u.stream.stream_key); - ret = lttcomm_send_unix_sock(sock, &lum, sizeof(lum)); - if (ret < 0) { - PERROR("send consumer metadata stream"); - goto error; + /* Get correct path name destination */ + if (consumer->type == CONSUMER_DST_LOCAL) { + /* Set application path to the destination path */ + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->dst.trace_path, usess->path); + if (ret < 0) { + PERROR("snprintf stream path"); + goto error; + } + pathname = tmp_path; + + /* Create directory */ + ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG, + usess->uid, usess->gid); + if (ret < 0) { + if (ret != -EEXIST) { + ERR("Trace directory creation error"); + goto error; + } + } + } else { + ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s", + consumer->subdir, usess->path); + if (ret < 0) { + PERROR("snprintf metadata path"); + goto error; + } + pathname = tmp_path; } + + consumer_init_stream_comm_msg(&lum, + LTTNG_CONSUMER_ADD_STREAM, + usess->metadata->obj->shm_fd, + usess->metadata->stream_obj->shm_fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + DEFAULT_UST_CHANNEL_OUTPUT, + usess->metadata->stream_obj->memory_map_size, + usess->uid, + usess->gid, + consumer->net_seq_index, + 1, /* Flag metadata set */ + "metadata", + pathname); + + /* Send stream and file descriptor */ fds[0] = usess->metadata->stream_obj->shm_fd; fds[1] = usess->metadata->stream_obj->wait_fd; - ret = lttcomm_send_fds_unix_sock(sock, fds, 2); + ret = consumer_send_stream(sock, consumer, &lum, fds, 2); if (ret < 0) { - PERROR("send consumer stream"); goto error; } } @@ -190,7 +230,8 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess) continue; } - ret = send_channel_streams(sock, ua_chan, usess->uid, usess->gid); + ret = send_channel_streams(sock, ua_chan, usess->path, usess->uid, + usess->gid, consumer); if (ret < 0) { rcu_read_unlock(); goto error; @@ -205,3 +246,42 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess) error: return ret; } + +/* + * Send relayd socket to consumer associated with a session name. + * + * On success return positive value. On error, negative value. + */ +int ust_consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type) +{ + int ret; + struct lttcomm_consumer_msg msg; + + /* Code flow error. Safety net. */ + assert(sock); + + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; + msg.u.relayd_sock.net_index = consumer->net_seq_index; + msg.u.relayd_sock.type = type; + memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); + + DBG2("Sending relayd sock info to consumer"); + ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); + if (ret < 0) { + PERROR("send consumer relayd socket info"); + goto error; + } + + DBG2("Sending relayd socket file descriptor to consumer"); + ret = consumer_send_fds(consumer_sock, &sock->fd, 1); + if (ret < 0) { + goto error; + } + + DBG("UST consumer relayd socket sent"); + +error: + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index 0afb08f3d..dcc51dcee 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -18,8 +18,15 @@ #ifndef _UST_CONSUMER_H #define _UST_CONSUMER_H +#include + +#include "consumer.h" #include "ust-app.h" -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess); +int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, + struct consumer_output *consumer); +int ust_consumer_send_relayd_socket(int consumer_sock, + struct lttcomm_sock *sock, struct consumer_output *consumer, + enum lttng_stream_type type); #endif /* _UST_CONSUMER_H */ diff --git a/src/bin/lttng/Makefile.am b/src/bin/lttng/Makefile.am index e2040d8c3..0381aa656 100644 --- a/src/bin/lttng/Makefile.am +++ b/src/bin/lttng/Makefile.am @@ -9,6 +9,8 @@ lttng_SOURCES = command.h conf.c conf.h commands/start.c \ commands/disable_channels.c commands/add_context.c \ commands/set_session.c commands/version.c \ commands/calibrate.c commands/view.c \ + commands/enable_consumer.c commands/disable_consumer.c \ utils.c utils.h lttng.c -lttng_LDADD = $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la +lttng_LDADD = $(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la \ + $(top_builddir)/src/common/libcommon.la diff --git a/src/bin/lttng/command.h b/src/bin/lttng/command.h index 27de0d5a0..68ce903de 100644 --- a/src/bin/lttng/command.h +++ b/src/bin/lttng/command.h @@ -52,5 +52,7 @@ extern int cmd_set_session(int argc, const char **argv); extern int cmd_version(int argc, const char **argv); extern int cmd_calibrate(int argc, const char **argv); extern int cmd_view(int argc, const char **argv); +extern int cmd_enable_consumer(int argc, const char **argv); +extern int cmd_disable_consumer(int argc, const char **argv); #endif /* _LTTNG_CMD_H */ diff --git a/src/bin/lttng/commands/create.c b/src/bin/lttng/commands/create.c index eac261ac7..f89e2778c 100644 --- a/src/bin/lttng/commands/create.c +++ b/src/bin/lttng/commands/create.c @@ -28,10 +28,16 @@ #include "../command.h" #include "../utils.h" +#include #include +#include static char *opt_output_path; static char *opt_session_name; +static char *opt_uris; +static char *opt_ctrl_uris; +static char *opt_data_uris; +static int opt_no_consumer = 1; enum { OPT_HELP = 1, @@ -43,6 +49,10 @@ static struct poptOption long_options[] = { {"help", 'h', POPT_ARG_NONE, NULL, OPT_HELP, NULL, NULL}, {"output", 'o', POPT_ARG_STRING, &opt_output_path, 0, NULL, NULL}, {"list-options", 0, POPT_ARG_NONE, NULL, OPT_LIST_OPTIONS, NULL, NULL}, + {"set-uri", 'U', POPT_ARG_STRING, &opt_uris, 0, 0, 0}, + {"ctrl-uri", 'C', POPT_ARG_STRING, &opt_ctrl_uris, 0, 0, 0}, + {"data-uri", 'D', POPT_ARG_STRING, &opt_data_uris, 0, 0, 0}, + {"no-consumer", 0, POPT_ARG_NONE, &opt_no_consumer, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0} }; @@ -57,9 +67,81 @@ static void usage(FILE *ofp) fprintf(ofp, " -h, --help Show this help\n"); fprintf(ofp, " --list-options Simple listing of options\n"); fprintf(ofp, " -o, --output PATH Specify output path for traces\n"); + fprintf(ofp, " -U, --set-uri=URI Set URI for the enable-consumer destination.\n"); + fprintf(ofp, " It is persistent for the session lifetime.\n"); + fprintf(ofp, " Redo the command to change it.\n"); + fprintf(ofp, " This will set both data and control URI for network.\n"); + fprintf(ofp, " -C, --ctrl-uri=URI Set control path URI.\n"); + fprintf(ofp, " -D, --data-uri=URI Set data path URI.\n"); + fprintf(ofp, " --no-consumer Disable consumer for entire tracing session.\n"); fprintf(ofp, "\n"); } +/* + * Parse URI from string to lttng_uri object array. + */ +static ssize_t parse_uri_from_str(const char *str_uri, struct lttng_uri **uris) +{ + int i; + ssize_t size; + struct lttng_uri *uri; + + if (*uris != NULL) { + free(*uris); + } + + size = uri_parse(str_uri, uris); + if (size < 1) { + ERR("Bad URI %s. Either the hostname or IP is invalid", str_uri); + size = -1; + } + + for (i = 0; i < size; i++) { + uri = (struct lttng_uri *) &uris[i]; + /* Set default port if none was given */ + if (uri->port == 0) { + if (uri->stype == LTTNG_STREAM_CONTROL) { + uri->port = DEFAULT_NETWORK_CONTROL_PORT; + } else if (uri->stype == LTTNG_STREAM_DATA) { + uri->port = DEFAULT_NETWORK_DATA_PORT; + } + } + } + + return size; +} + +/* + * Print URI message. + */ +static void print_uri_msg(struct lttng_uri *uri) +{ + char *dst; + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + dst = uri->dst.ipv4; + break; + case LTTNG_DST_IPV6: + dst = uri->dst.ipv6; + break; + case LTTNG_DST_PATH: + dst = uri->dst.path; + MSG("Consumer destination set to %s", dst); + goto end; + default: + DBG("Unknown URI destination"); + goto end; + } + + MSG("Consumer %s stream set to %s with the %s protocol on port %d", + uri->stype == LTTNG_STREAM_CONTROL ? "control" : "data", + dst, uri->proto == LTTNG_TCP ? "TCP" : "UNK", uri->port); + +end: + return; +} + /* * Create a tracing session. * If no name is specified, a default name is generated. @@ -68,11 +150,13 @@ static void usage(FILE *ofp) */ static int create_session() { - int ret, have_name = 0; + int ret, have_name = 0, i; char datetime[16]; char *session_name, *traces_path = NULL, *alloc_path = NULL; time_t rawtime; + ssize_t size; struct tm *timeinfo; + struct lttng_uri *uris = NULL, *ctrl_uri = NULL, *data_uri = NULL; /* Get date and time for automatic session name/path */ time(&rawtime); @@ -92,38 +176,139 @@ static int create_session() have_name = 1; } - /* Auto output path */ - if (opt_output_path == NULL) { + if (opt_output_path != NULL) { + traces_path = expand_full_path(opt_output_path); + if (traces_path == NULL) { + ret = CMD_ERROR; + goto error; + } + + ret = asprintf(&alloc_path, "file://%s", traces_path); + if (ret < 0) { + PERROR("asprintf expand path"); + ret = CMD_FATAL; + goto error; + } + + ret = uri_parse(alloc_path, &ctrl_uri); + if (ret < 1) { + ret = CMD_FATAL; + goto error; + } + } else if (opt_uris) { /* Handling URIs (-U opt) */ + size = parse_uri_from_str(opt_uris, &uris); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } else if (size == 1 && uris[0].dtype != LTTNG_DST_PATH) { + ERR("Only net:// and file:// are supported. " + "Use -C and -D for more fine grained control"); + ret = CMD_ERROR; + goto error; + } else if (size == 2) { + uris[0].stype = LTTNG_STREAM_CONTROL; + uris[1].stype = LTTNG_STREAM_DATA; + + for (i = 0; i < size; i++) { + /* Set default port if none was given */ + if (uris[i].port == 0) { + if (uris[i].stype == LTTNG_STREAM_CONTROL) { + uris[i].port = DEFAULT_NETWORK_CONTROL_PORT; + } else { + uris[i].port = DEFAULT_NETWORK_DATA_PORT; + } + } + } + + ctrl_uri = &uris[0]; + print_uri_msg(ctrl_uri); + data_uri = &uris[1]; + print_uri_msg(data_uri); + } else { + ctrl_uri = &uris[0]; + print_uri_msg(ctrl_uri); + } + } else if (opt_ctrl_uris || opt_data_uris) { + /* Setting up control URI (-C opt) */ + if (opt_ctrl_uris) { + size = parse_uri_from_str(opt_ctrl_uris, &uris); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + ctrl_uri = &uris[0]; + ctrl_uri->stype = LTTNG_STREAM_CONTROL; + /* Set default port if none specified */ + if (ctrl_uri->port == 0) { + ctrl_uri->port = DEFAULT_NETWORK_CONTROL_PORT; + } + print_uri_msg(ctrl_uri); + } + + /* Setting up data URI (-D opt) */ + if (opt_data_uris) { + size = parse_uri_from_str(opt_data_uris, &uris); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + data_uri = &uris[0]; + data_uri->stype = LTTNG_STREAM_DATA; + /* Set default port if none specified */ + if (data_uri->port == 0) { + data_uri->port = DEFAULT_NETWORK_DATA_PORT; + } + print_uri_msg(data_uri); + } + } else { + /* Auto output path */ alloc_path = config_get_default_path(); if (alloc_path == NULL) { ERR("HOME path not found.\n \ - Please specify an output path using -o, --output PATH"); + Please specify an output path using -o, --output PATH"); ret = CMD_FATAL; goto error; } alloc_path = strdup(alloc_path); if (have_name) { - ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME + ret = asprintf(&traces_path, "file://%s/" DEFAULT_TRACE_DIR_NAME "/%s-%s", alloc_path, session_name, datetime); } else { - ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME + ret = asprintf(&traces_path, "file://%s/" DEFAULT_TRACE_DIR_NAME "/%s", alloc_path, session_name); } - if (ret < 0) { - perror("asprintf trace dir name"); + PERROR("asprintf trace dir name"); + ret = CMD_FATAL; goto error; } - } else { - traces_path = expand_full_path(opt_output_path); - if (traces_path == NULL) { - ret = CMD_ERROR; + + ret = uri_parse(traces_path, &ctrl_uri); + if (ret < 1) { + ret = CMD_FATAL; + goto error; + } + } + + /* If there is no subdir specified and the URI are network */ + if (strlen(ctrl_uri->subdir) == 0) { + if (have_name) { + ret = snprintf(ctrl_uri->subdir, sizeof(ctrl_uri->subdir), "%s-%s", + session_name, datetime); + } else { + ret = snprintf(ctrl_uri->subdir, sizeof(ctrl_uri->subdir), "%s", + session_name); + } + if (ret < 0) { + PERROR("snprintf subdir"); goto error; } + DBG("Subdir update to %s", ctrl_uri->subdir); } - ret = lttng_create_session(session_name, traces_path); + ret = lttng_create_session_uri(session_name, ctrl_uri, data_uri, + opt_no_consumer); if (ret < 0) { /* Don't set ret so lttng can interpret the sessiond error. */ switch (-ret) { @@ -142,7 +327,9 @@ static int create_session() } MSG("Session %s created.", session_name); - MSG("Traces will be written in %s" , traces_path); + if (ctrl_uri->dtype == LTTNG_DST_PATH) { + MSG("Traces will be written in %s" , ctrl_uri->dst.path); + } ret = CMD_SUCCESS; diff --git a/src/bin/lttng/commands/disable_consumer.c b/src/bin/lttng/commands/disable_consumer.c new file mode 100644 index 000000000..58361db1e --- /dev/null +++ b/src/bin/lttng/commands/disable_consumer.c @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2012 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../command.h" +#include "../utils.h" + +#include +#include +#include + +static int opt_kernel; +static int opt_userspace; +static char *opt_session_name; + +static struct lttng_handle *handle; + +enum { + OPT_HELP = 1, + OPT_LIST_OPTIONS, +}; + +static struct poptOption long_options[] = { + /* longName, shortName, argInfo, argPtr, value, descrip, argDesc */ + {"help", 'h', POPT_ARG_NONE, NULL, OPT_HELP, NULL, NULL}, + {"list-options", 0, POPT_ARG_NONE, NULL, OPT_LIST_OPTIONS, NULL, NULL}, + {"session", 's', POPT_ARG_STRING, &opt_session_name, 0, 0, 0}, + {"kernel", 'k', POPT_ARG_VAL, &opt_kernel, 1, 0, 0}, + {"userspace", 'u', POPT_ARG_VAL, &opt_userspace, 1, 0, 0}, + {0, 0, 0, 0, 0, 0, 0} +}; + +/* + * usage + */ +static void usage(FILE *ofp) +{ + fprintf(ofp, "usage: lttng disable-consumer [-u|-k] [OPTIONS]\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "Disable the consumer for a tracing session. This call can\n"); + fprintf(ofp, "be done BEFORE tracing has started.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "Options:\n"); + fprintf(ofp, " -h, --help Show this help\n"); + fprintf(ofp, " --list-options Simple listing of options\n"); + fprintf(ofp, " -s, --session=NAME Apply to session name\n"); + fprintf(ofp, " -k, --kernel Apply to the kernel tracer\n"); + fprintf(ofp, " -u, --userspace Apply to the user-space tracer\n"); + fprintf(ofp, "\n"); +} + +/* + * Disable consumer command. + */ +static int disable_consumer(char *session_name) +{ + int ret = CMD_SUCCESS; + struct lttng_domain dom; + + memset(&dom, 0, sizeof(dom)); + + /* Create lttng domain */ + if (opt_kernel) { + dom.type = LTTNG_DOMAIN_KERNEL; + } else if (opt_userspace) { + dom.type = LTTNG_DOMAIN_UST; + } else { + ERR("Please specify a tracer (-k/--kernel or -u/--userspace)"); + ret = CMD_ERROR; + goto error; + } + + handle = lttng_create_handle(session_name, &dom); + if (handle == NULL) { + ret = -1; + goto error; + } + + ret = lttng_disable_consumer(handle); + if (ret < 0) { + ERR("Disabling consumer for session %s: %s", session_name, + lttng_strerror(ret)); + goto error; + } + + MSG("Consumer disabled successfully"); + +error: + lttng_destroy_handle(handle); + return ret; +} + +/* + * The 'disable-consumer ' first level command + * + * Returns one of the CMD_* result constants. + */ +int cmd_disable_consumer(int argc, const char **argv) +{ + int opt, ret = CMD_SUCCESS; + static poptContext pc; + char *session_name = NULL; + + pc = poptGetContext(NULL, argc, argv, long_options, 0); + poptReadDefaultConfig(pc, 0); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + case OPT_HELP: + usage(stdout); + goto end; + case OPT_LIST_OPTIONS: + list_cmd_options(stdout, long_options); + goto end; + default: + usage(stderr); + ret = CMD_UNDEFINED; + goto end; + } + } + + /* Get session name */ + if (!opt_session_name) { + session_name = get_session_name(); + if (session_name == NULL) { + ret = CMD_ERROR; + goto end; + } + } else { + session_name = opt_session_name; + } + + ret = disable_consumer(session_name); + +end: + if (opt_session_name == NULL) { + free(session_name); + } + + poptFreeContext(pc); + return ret; +} diff --git a/src/bin/lttng/commands/enable_consumer.c b/src/bin/lttng/commands/enable_consumer.c new file mode 100644 index 000000000..67594529f --- /dev/null +++ b/src/bin/lttng/commands/enable_consumer.c @@ -0,0 +1,410 @@ +/* + * Copyright (C) 2012 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../command.h" +#include "../utils.h" + +#include +#include +#include + +static int opt_kernel; +static int opt_userspace; +static int opt_enable; +static char *opt_session_name; +static char *opt_uris; +static char *opt_ctrl_uris; +static char *opt_data_uris; +static char *opt_uris_arg; + +static struct lttng_handle *handle; + +enum { + OPT_HELP = 1, + OPT_LIST_OPTIONS, +}; + +static struct poptOption long_options[] = { + /* longName, shortName, argInfo, argPtr, value, descrip, argDesc */ + {"help", 'h', POPT_ARG_NONE, NULL, OPT_HELP, NULL, NULL}, + {"list-options", 0, POPT_ARG_NONE, NULL, OPT_LIST_OPTIONS, NULL, NULL}, + {"session", 's', POPT_ARG_STRING, &opt_session_name, 0, 0, 0}, + {"kernel", 'k', POPT_ARG_VAL, &opt_kernel, 1, 0, 0}, + {"userspace", 'u', POPT_ARG_VAL, &opt_userspace, 1, 0, 0}, + {"set-uri", 'U', POPT_ARG_STRING, &opt_uris, 0, 0, 0}, + {"ctrl-uri", 'C', POPT_ARG_STRING, &opt_ctrl_uris, 0, 0, 0}, + {"data-uri", 'D', POPT_ARG_STRING, &opt_data_uris, 0, 0, 0}, + {"enable", 'e', POPT_ARG_VAL, &opt_enable, 1, 0, 0}, + {0, 0, 0, 0, 0, 0, 0} +}; + +/* + * usage + */ +static void usage(FILE *ofp) +{ + fprintf(ofp, "usage: lttng enable-consumer [-u|-k] [URI] [OPTIONS]\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "The default behavior is to enable a consumer to the current URI.\n"); + fprintf(ofp, "The default URI is the local filesystem at the path of the session.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "The enable-consumer feature supports both local and network transport.\n"); + fprintf(ofp, "You must have a running lttng-relayd for network transmission.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "You can optionally specify two URIs for respectively the\n"); + fprintf(ofp, "control and data channel. URI supported:\n"); + fprintf(ofp, " > file://PATH\n"); + fprintf(ofp, " Local file full system path.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, " > net://DST[:CTRL_PORT[:DATA_PORT]] and net6://...\n"); + fprintf(ofp, " This will use the default network transport layer which is\n"); + fprintf(ofp, " TCP for both control and data port. The default ports are\n"); + fprintf(ofp, " respectively 5342 and 5343.\n"); + fprintf(ofp, " Example:\n"); + fprintf(ofp, " # lttng enable-consumer net://192.168.1.42 -k\n"); + fprintf(ofp, " Uses TCP and default ports for the given destination.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, " > tcp://DST:PORT and tcp6://DST:PORT\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "Options:\n"); + fprintf(ofp, " -h, --help Show this help\n"); + fprintf(ofp, " --list-options Simple listing of options\n"); + fprintf(ofp, " -s, --session=NAME Apply to session name\n"); + fprintf(ofp, " -k, --kernel Apply to the kernel tracer\n"); + fprintf(ofp, " -u, --userspace Apply to the user-space tracer\n"); + //fprintf(ofp, " -U, --set-uri=URI1[,URI2,...]\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "Extended Options:\n"); + fprintf(ofp, "\n"); + fprintf(ofp, "Using these options, each API call is controlled individually.\n"); + fprintf(ofp, "For instance, -C does not enable the consumer automatically.\n"); + fprintf(ofp, "\n"); + fprintf(ofp, " -U, --set-uri=URI Set URI for the enable-consumer destination.\n"); + fprintf(ofp, " It is persistent for the session lifetime.\n"); + fprintf(ofp, " Redo the command to change it.\n"); + fprintf(ofp, " This will set both data and control URI for network.\n"); + //fprintf(ofp, " -C, --ctrl-uri=URI1[,URI2,...]\n"); + fprintf(ofp, " -C, --ctrl-uri=URI Set control path URI.\n"); + //fprintf(ofp, " -D, --data-uri=URI1[,URI2,...]\n"); + fprintf(ofp, " -D, --data-uri=URI Set data path URI.\n"); + fprintf(ofp, " -e, --enable Enable consumer\n"); + fprintf(ofp, "\n"); +} + +/* + * Print URI message. + */ +static void print_uri_msg(struct lttng_uri *uri) +{ + char *dst; + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + dst = uri->dst.ipv4; + break; + case LTTNG_DST_IPV6: + dst = uri->dst.ipv6; + break; + case LTTNG_DST_PATH: + dst = uri->dst.path; + MSG("Consumer destination set to %s", dst); + goto end; + default: + DBG("Unknown URI destination"); + goto end; + } + + MSG("Consumer %s stream set to %s with the %s protocol on port %d", + uri->stype == LTTNG_STREAM_CONTROL ? "control" : "data", + dst, uri->proto == LTTNG_TCP ? "TCP" : "UNK", uri->port); + +end: + return; +} + +/* + * Setting URIs taking from the command line arguments. There is some + * manipulations and special cases using the default args. + */ +static int set_consumer_arg_uris(struct lttng_uri *uri, size_t size) +{ + int ret, i; + + if (size == 2) { + /* URIs are the control and data stream respectively for net:// */ + uri[0].stype = LTTNG_STREAM_CONTROL; + uri[1].stype = LTTNG_STREAM_DATA; + + for (i = 0; i < size; i++) { + ret = lttng_set_consumer_uri(handle, &uri[i]); + if (ret < 0) { + ERR("Setting %s stream URI: %s", + uri[i].stype == LTTNG_STREAM_DATA ? "data" : "control", + lttng_strerror(ret)); + goto error; + } + /* Set default port if none was given */ + if (uri[i].port == 0) { + if (uri[i].stype == LTTNG_STREAM_CONTROL) { + uri[i].port = DEFAULT_NETWORK_CONTROL_PORT; + } else { + uri[i].port = DEFAULT_NETWORK_DATA_PORT; + } + } + print_uri_msg(&uri[i]); + } + } else if (size == 1 && uri[0].dtype == LTTNG_DST_PATH) { + /* Set URI if it's file:// */ + ret = lttng_set_consumer_uri(handle, &uri[0]); + if (ret < 0) { + ERR("Failed to set URI %s: %s", opt_uris_arg, + lttng_strerror(ret)); + goto error; + } + print_uri_msg(&uri[0]); + } else { + ERR("Only net:// and file:// are supported. " + "Use -D or -U for more fine grained control"); + ret = CMD_ERROR; + goto error; + } + +error: + return ret; +} + +/* + * Parse URI from string to lttng_uri object array. + */ +static ssize_t parse_uri_from_str(const char *str_uri, struct lttng_uri **uris) +{ + ssize_t size; + + if (*uris != NULL) { + free(*uris); + } + + size = uri_parse(str_uri, uris); + if (size < 1) { + ERR("Bad URI %s. Either the hostname or IP is invalid", str_uri); + size = -1; + } + + return size; +} + +/* + * Enable consumer command. + */ +static int enable_consumer(char *session_name) +{ + int ret = CMD_SUCCESS; + int run_enable_cmd = 1; + ssize_t size; + struct lttng_domain dom; + struct lttng_uri *uri = NULL; + + memset(&dom, 0, sizeof(dom)); + + /* Create lttng domain */ + if (opt_kernel) { + dom.type = LTTNG_DOMAIN_KERNEL; + } else if (opt_userspace) { + dom.type = LTTNG_DOMAIN_UST; + } else { + ERR("Please specify a tracer (-k/--kernel or -u/--userspace)"); + ret = CMD_ERROR; + goto error; + } + + handle = lttng_create_handle(session_name, &dom); + if (handle == NULL) { + ret = -1; + goto error; + } + + /* Handle trailing arguments */ + if (opt_uris_arg) { + size = parse_uri_from_str(opt_uris_arg, &uri); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + + ret = set_consumer_arg_uris(uri, size); + if (ret < 0) { + goto free_uri; + } + } + + /* Handling URIs (-U opt) */ + if (opt_uris) { + size = parse_uri_from_str(opt_uris, &uri); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + + ret = set_consumer_arg_uris(uri, size); + if (ret < 0) { + goto free_uri; + } + + /* opt_enable will tell us to run or not the enable_consumer cmd. */ + run_enable_cmd = 0; + } + + /* Setting up control URI (-C opt) */ + if (opt_ctrl_uris) { + size = parse_uri_from_str(opt_ctrl_uris, &uri); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + + /* Set default port if none specified */ + if (uri[0].port == 0) { + uri[0].port = DEFAULT_NETWORK_CONTROL_PORT; + } + + uri[0].stype = LTTNG_STREAM_CONTROL; + + ret = lttng_set_consumer_uri(handle, &uri[0]); + if (ret < 0) { + ERR("Failed to set control URI %s: %s", opt_ctrl_uris, + lttng_strerror(ret)); + goto free_uri; + } + print_uri_msg(&uri[0]); + + /* opt_enable will tell us to run or not the enable_consumer cmd. */ + run_enable_cmd = 0; + } + + /* Setting up data URI (-D opt) */ + if (opt_data_uris) { + size = parse_uri_from_str(opt_data_uris, &uri); + if (size < 1) { + ret = CMD_ERROR; + goto error; + } + + /* Set default port if none specified */ + if (uri[0].port == 0) { + uri[0].port = DEFAULT_NETWORK_DATA_PORT; + } + + uri[0].stype = LTTNG_STREAM_DATA; + + ret = lttng_set_consumer_uri(handle, &uri[0]); + if (ret < 0) { + ERR("Failed to set data URI %s: %s", opt_data_uris, + lttng_strerror(ret)); + goto free_uri; + } + print_uri_msg(&uri[0]); + + /* opt_enable will tell us to run or not the enable_consumer cmd. */ + run_enable_cmd = 0; + } + + /* Enable consumer (-e opt) */ + if (opt_enable || run_enable_cmd) { + ret = lttng_enable_consumer(handle); + if (ret < 0) { + ERR("Enabling consumer for session %s: %s", session_name, + lttng_strerror(ret)); + if (ret == -LTTCOMM_ENABLE_CONSUMER_FAIL) { + ERR("Perhaps the session was previously started?"); + } + goto free_uri; + } + + MSG("Consumer enabled successfully"); + } + +free_uri: + free(uri); + +error: + lttng_destroy_handle(handle); + return ret; +} + +/* + * The 'enable-consumer ' first level command + * + * Returns one of the CMD_* result constants. + */ +int cmd_enable_consumer(int argc, const char **argv) +{ + int opt, ret = CMD_SUCCESS; + static poptContext pc; + char *session_name = NULL; + + pc = poptGetContext(NULL, argc, argv, long_options, 0); + poptReadDefaultConfig(pc, 0); + + while ((opt = poptGetNextOpt(pc)) != -1) { + switch (opt) { + case OPT_HELP: + usage(stdout); + goto end; + case OPT_LIST_OPTIONS: + list_cmd_options(stdout, long_options); + goto end; + default: + usage(stderr); + ret = CMD_UNDEFINED; + goto end; + } + } + + opt_uris_arg = (char *) poptGetArg(pc); + DBG("URIs: %s", opt_uris_arg); + + /* Get session name */ + if (!opt_session_name) { + session_name = get_session_name(); + if (session_name == NULL) { + ret = CMD_ERROR; + goto end; + } + } else { + session_name = opt_session_name; + } + + ret = enable_consumer(session_name); + +end: + if (opt_session_name == NULL) { + free(session_name); + } + + poptFreeContext(pc); + return ret; +} diff --git a/src/bin/lttng/lttng.c b/src/bin/lttng/lttng.c index 75a467606..f41e9767d 100644 --- a/src/bin/lttng/lttng.c +++ b/src/bin/lttng/lttng.c @@ -74,6 +74,8 @@ static struct cmd_struct commands[] = { { "version", cmd_version}, { "calibrate", cmd_calibrate}, { "view", cmd_view}, + { "enable-consumer", cmd_enable_consumer}, + { "disable-consumer", cmd_disable_consumer}, { NULL, NULL} /* Array closure */ }; @@ -93,20 +95,22 @@ static void usage(FILE *ofp) fprintf(ofp, " --sessiond-path PATH Session daemon full path\n"); fprintf(ofp, "\n"); fprintf(ofp, "Commands:\n"); - fprintf(ofp, " add-context Add context to event and/or channel\n"); - fprintf(ofp, " calibrate Quantify LTTng overhead\n"); - fprintf(ofp, " create Create tracing session\n"); - fprintf(ofp, " destroy Tear down tracing session\n"); - fprintf(ofp, " enable-channel Enable tracing channel\n"); - fprintf(ofp, " enable-event Enable tracing event\n"); - fprintf(ofp, " disable-channel Disable tracing channel\n"); - fprintf(ofp, " disable-event Disable tracing event\n"); - fprintf(ofp, " list List possible tracing options\n"); - fprintf(ofp, " set-session Set current session name\n"); - fprintf(ofp, " start Start tracing\n"); - fprintf(ofp, " stop Stop tracing\n"); - fprintf(ofp, " version Show version information\n"); - fprintf(ofp, " view Start trace viewer\n"); + fprintf(ofp, " add-context Add context to event and/or channel\n"); + fprintf(ofp, " calibrate Quantify LTTng overhead\n"); + fprintf(ofp, " create Create tracing session\n"); + fprintf(ofp, " destroy Tear down tracing session\n"); + fprintf(ofp, " enable-channel Enable tracing channel\n"); + fprintf(ofp, " enable-event Enable tracing event\n"); + fprintf(ofp, " disable-channel Disable tracing channel\n"); + fprintf(ofp, " disable-event Disable tracing event\n"); + fprintf(ofp, " enable-consumer Enable local or streaming consumer\n"); + fprintf(ofp, " disable-consumer Disable consumer\n"); + fprintf(ofp, " list List possible tracing options\n"); + fprintf(ofp, " set-session Set current session name\n"); + fprintf(ofp, " start Start tracing\n"); + fprintf(ofp, " stop Stop tracing\n"); + fprintf(ofp, " version Show version information\n"); + fprintf(ofp, " view Start trace viewer\n"); fprintf(ofp, "\n"); fprintf(ofp, "Each command also has its own -h, --help option.\n"); fprintf(ofp, "\n"); diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 1cf707204..5c694c1c5 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -1,11 +1,12 @@ AM_CPPFLAGS = -SUBDIRS = compat hashtable kernel-ctl sessiond-comm kernel-consumer ust-consumer +SUBDIRS = compat hashtable kernel-ctl sessiond-comm relayd kernel-consumer ust-consumer AM_CFLAGS = -fno-strict-aliasing noinst_HEADERS = lttng-kernel.h defaults.h macros.h error.h futex.h uri.h +# Common library noinst_LTLIBRARIES = libcommon.la libcommon_la_SOURCES = runas.c runas.h common.h futex.c futex.h uri.c uri.h @@ -19,7 +20,8 @@ libconsumer_la_LIBADD = \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ $(top_builddir)/src/common/kernel-consumer/libkernel-consumer.la \ $(top_builddir)/src/common/hashtable/libhashtable.la \ - $(top_builddir)/src/common/compat/libcompat.la + $(top_builddir)/src/common/compat/libcompat.la \ + $(top_builddir)/src/common/relayd/librelayd.la if HAVE_LIBLTTNG_UST_CTL libconsumer_la_LIBADD += \ diff --git a/src/common/consumer.c b/src/common/consumer.c index dd8c621f1..88e8d9d28 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers + * 2012 - David Goulet * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -29,8 +30,10 @@ #include #include +#include #include #include +#include #include #include "consumer.h" @@ -151,6 +154,45 @@ void consumer_free_stream(struct rcu_head *head) free(stream); } +/* + * RCU protected relayd socket pair free. + */ +static void consumer_rcu_free_relayd(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct consumer_relayd_sock_pair *relayd = + caa_container_of(node, struct consumer_relayd_sock_pair, node); + + free(relayd); +} + +/* + * Destroy and free relayd socket pair object. + * + * This function MUST be called with the consumer_data lock acquired. + */ +void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret; + struct lttng_ht_iter iter; + + DBG("Consumer destroy and close relayd socket pair"); + + iter.iter.node = &relayd->node.node; + ret = lttng_ht_del(consumer_data.relayd_ht, &iter); + assert(!ret); + + /* Close all sockets */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + (void) relayd_close(&relayd->data_sock); + + /* RCU free() call */ + call_rcu(&relayd->node.head, consumer_rcu_free_relayd); +} + /* * Remove a stream from the global list protected by a mutex. This * function is also responsible for freeing its data structures. @@ -160,6 +202,9 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) int ret; struct lttng_ht_iter iter; struct lttng_consumer_channel *free_chan = NULL; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); pthread_mutex_lock(&consumer_data.lock); @@ -214,8 +259,23 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) PERROR("close"); } } - if (!--stream->chan->refcount) + + /* Check and cleanup relayd */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* We are about to modify the relayd refcount */ + rcu_read_lock(); + if (!--relayd->refcount) { + /* Refcount of the relayd struct is 0, destroy it */ + consumer_destroy_relayd(relayd); + } + rcu_read_unlock(); + } + + if (!--stream->chan->refcount) { free_chan = stream->chan; + } + call_rcu(&stream->node.head, consumer_free_stream); end: @@ -234,7 +294,9 @@ struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_event_output output, const char *path_name, uid_t uid, - gid_t gid) + gid_t gid, + int net_index, + int metadata_flag) { struct lttng_consumer_stream *stream; int ret; @@ -261,9 +323,12 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->output = output; stream->uid = uid; stream->gid = gid; - strncpy(stream->path_name, path_name, PATH_MAX - 1); - stream->path_name[PATH_MAX - 1] = '\0'; + stream->net_seq_idx = net_index; + stream->metadata_flag = metadata_flag; + strncpy(stream->path_name, path_name, sizeof(stream->path_name)); + stream->path_name[sizeof(stream->path_name) - 1] = '\0'; lttng_ht_node_init_ulong(&stream->node, stream->key); + lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -282,12 +347,13 @@ struct lttng_consumer_stream *consumer_allocate_stream( assert(0); goto end; } - DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d)", + DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, stream->shm_fd, stream->wait_fd, (unsigned long long) stream->mmap_len, - stream->out_fd); + stream->out_fd, + stream->net_seq_idx); end: return stream; } @@ -300,6 +366,7 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) int ret = 0; struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; + struct consumer_relayd_sock_pair *relayd; pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ @@ -317,6 +384,17 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); rcu_read_unlock(); + + /* Check and cleanup relayd */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* We are about to modify the relayd refcount */ + rcu_read_lock(); + relayd->refcount++; + rcu_read_unlock(); + } + + /* Update consumer data */ consumer_data.stream_count++; consumer_data.need_update = 1; @@ -340,6 +418,153 @@ end: return ret; } +/* + * Add relayd socket to global consumer data hashtable. + */ +int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd) +{ + int ret = 0; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + + if (relayd == NULL) { + ret = -1; + goto end; + } + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.relayd_ht, + (void *)((unsigned long) relayd->net_seq_idx), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + rcu_read_unlock(); + /* Relayd already exist. Ignore the insertion */ + goto end; + } + lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node); + + rcu_read_unlock(); + +end: + return ret; +} + +/* + * Allocate and return a consumer relayd socket. + */ +struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( + int net_seq_idx) +{ + struct consumer_relayd_sock_pair *obj = NULL; + + /* Negative net sequence index is a failure */ + if (net_seq_idx < 0) { + goto error; + } + + obj = zmalloc(sizeof(struct consumer_relayd_sock_pair)); + if (obj == NULL) { + PERROR("zmalloc relayd sock"); + goto error; + } + + obj->net_seq_idx = net_seq_idx; + obj->refcount = 0; + lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); + pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); + +error: + return obj; +} + +/* + * Find a relayd socket pair in the global consumer data. + * + * Return the object if found else NULL. + */ +struct consumer_relayd_sock_pair *consumer_find_relayd(int key) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Negative keys are lookup failures */ + if (key < 0) { + goto error; + } + + rcu_read_lock(); + + lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node); + } + + rcu_read_unlock(); + +error: + return relayd; +} + +/* + * Handle stream for relayd transmission if the stream applies for network + * streaming where the net sequence index is set. + * + * Return destination file descriptor or negative value on error. + */ +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, + size_t data_size) +{ + int outfd = -1, ret; + struct consumer_relayd_sock_pair *relayd; + struct lttcomm_relayd_data_hdr data_hdr; + + /* Safety net */ + assert(stream); + + /* Reset data header */ + memset(&data_hdr, 0, sizeof(data_hdr)); + + /* Get relayd reference of the stream. */ + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + /* Stream is either local or corrupted */ + goto error; + } + + DBG("Consumer found relayd socks with index %d", stream->net_seq_idx); + if (stream->metadata_flag) { + /* Caller MUST acquire the relayd control socket lock */ + ret = relayd_send_metadata(&relayd->control_sock, data_size); + if (ret < 0) { + goto error; + } + + /* Metadata are always sent on the control socket. */ + outfd = relayd->control_sock.fd; + } else { + /* Set header with stream information */ + data_hdr.stream_id = htobe64(stream->relayd_stream_id); + data_hdr.data_size = htobe32(data_size); + /* Other fields are zeroed previously */ + + ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, + sizeof(data_hdr)); + if (ret < 0) { + goto error; + } + + /* Set to go on data socket */ + outfd = relayd->data_sock.fd; + } + +error: + return outfd; +} + /* * Update a stream according to what we just received. */ @@ -464,9 +689,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( goto end; } DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)", - channel->key, - channel->shm_fd, - channel->wait_fd, + channel->key, channel->shm_fd, channel->wait_fd, (unsigned long long) channel->mmap_len, (unsigned long long) channel->max_sb_size); end: @@ -512,7 +735,8 @@ end: */ int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_stream) + struct lttng_consumer_stream **local_stream, + struct lttng_ht *metadata_ht) { int i = 0; struct lttng_ht_iter iter; @@ -528,6 +752,10 @@ int consumer_update_poll_array( DBG("Active FD %d", stream->wait_fd); (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; + if (stream->metadata_flag && metadata_ht) { + lttng_ht_add_unique_ulong(metadata_ht, &stream->waitfd_node); + DBG("Active FD added to metadata hash table"); + } local_stream[i] = stream; i++; } @@ -584,7 +812,6 @@ void lttng_consumer_set_error_sock( /* * Set the command socket path. */ - void lttng_consumer_set_command_sock_path( struct lttng_consumer_local_data *ctx, char *sock) { @@ -654,8 +881,8 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) } } -void lttng_consumer_sync_trace_file( - struct lttng_consumer_stream *stream, off_t orig_offset) +void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, + off_t orig_offset) { int outfd = stream->out_fd; @@ -952,6 +1179,13 @@ void *lttng_consumer_thread_poll_fds(void *data) /* local view of consumer_data.fds_count */ int nb_fd = 0; struct lttng_consumer_local_data *ctx = data; + struct lttng_ht *metadata_ht; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_stream *metadata_stream; + ssize_t len; + + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); rcu_register_thread(); @@ -992,7 +1226,8 @@ void *lttng_consumer_thread_poll_fds(void *data) pthread_mutex_unlock(&consumer_data.lock); goto end; } - ret = consumer_update_poll_array(ctx, &pollfd, local_stream); + ret = consumer_update_poll_array(ctx, &pollfd, local_stream, + metadata_ht); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, CONSUMERD_POLL_ERROR); @@ -1029,10 +1264,9 @@ void *lttng_consumer_thread_poll_fds(void *data) } /* - * If the consumer_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. + * If the consumer_poll_pipe triggered poll go directly to the + * beginning of the loop to update the array. We want to prioritize + * array update over low-priority reads. */ if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) { size_t pipe_readlen; @@ -1048,9 +1282,24 @@ void *lttng_consumer_thread_poll_fds(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents & POLLPRI) { - ssize_t len; - + /* Lookup for metadata which is the highest priority */ + lttng_ht_lookup(metadata_ht, + (void *)((unsigned long) pollfd[i].fd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL && + (pollfd[i].revents & (POLLIN | POLLPRI))) { + DBG("Urgent metadata read on fd %d", pollfd[i].fd); + metadata_stream = caa_container_of(node, + struct lttng_consumer_stream, waitfd_node); + high_prio = 1; + len = ctx->on_buffer_ready(metadata_stream, ctx); + /* it's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN) { + goto end; + } else if (len > 0) { + metadata_stream->data_read = 1; + } + } else if (pollfd[i].revents & POLLPRI) { DBG("Urgent read on fd %d", pollfd[i].fd); high_prio = 1; len = ctx->on_buffer_ready(local_stream[i], ctx); @@ -1075,8 +1324,6 @@ void *lttng_consumer_thread_poll_fds(void *data) for (i = 0; i < nb_fd; i++) { if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done) { - ssize_t len; - DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */ @@ -1108,18 +1355,33 @@ void *lttng_consumer_thread_poll_fds(void *data) if ((pollfd[i].revents & POLLHUP)) { DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLERR) { ERR("Error returned in polling fd %d.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } } else if (pollfd[i].revents & POLLNVAL) { ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); if (!local_stream[i]->data_read) { + if (local_stream[i]->metadata_flag) { + iter.iter.node = &local_stream[i]->waitfd_node.node; + ret = lttng_ht_del(metadata_ht, &iter); + assert(!ret); + } consumer_del_stream(local_stream[i]); num_hup++; } @@ -1303,5 +1565,5 @@ void lttng_consumer_init(void) { consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); } - diff --git a/src/common/consumer.h b/src/common/consumer.h index 6ac781605..8bcf27d2c 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez - * Copyright (C) 2011 - Mathieu Desnoyers + * Mathieu Desnoyers + * 2012 - David Goulet * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -27,6 +28,7 @@ #include #include +#include /* * When the receiving thread dies, we need to have a way to make the polling @@ -51,6 +53,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ LTTNG_CONSUMER_STOP, + LTTNG_CONSUMER_ADD_RELAYD_SOCKET, }; /* State of each fd in consumer */ @@ -92,6 +95,7 @@ struct lttng_ust_lib_ring_buffer; */ struct lttng_consumer_stream { struct lttng_ht_node_ulong node; + struct lttng_ht_node_ulong waitfd_node; struct lttng_consumer_channel *chan; /* associated channel */ /* * key is the key used by the session daemon to refer to the @@ -118,6 +122,27 @@ struct lttng_consumer_stream { /* UID/GID of the user owning the session to which stream belongs */ uid_t uid; gid_t gid; + /* Network sequence number. Indicating on which relayd socket it goes. */ + int net_seq_idx; + /* Identify if the stream is the metadata */ + unsigned int metadata_flag; + /* Used when the stream is set for network streaming */ + uint64_t relayd_stream_id; +}; + +/* + * Internal representation of a relayd socket pair. + */ +struct consumer_relayd_sock_pair { + /* Network sequence number. */ + int net_seq_idx; + /* Number of stream associated with this relayd */ + unsigned int refcount; + pthread_mutex_t ctrl_sock_mutex; + /* Sockets information */ + struct lttcomm_sock control_sock; + struct lttcomm_sock data_sock; + struct lttng_ht_node_ulong node; }; /* @@ -184,7 +209,6 @@ struct lttng_consumer_local_data { * Library-level data. One instance per process. */ struct lttng_consumer_global_data { - /* * At this time, this lock is used to ensure coherence between the count * and number of element in the hash table. It's also a protection for @@ -210,6 +234,12 @@ struct lttng_consumer_global_data { */ unsigned int need_update; enum lttng_consumer_type type; + + /* + * Relayd socket(s) hashtable indexed by network sequence number. Each + * stream has an index which associate the right relayd socket to use. + */ + struct lttng_ht *relayd_ht; }; /* @@ -263,7 +293,8 @@ extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); extern int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, - struct lttng_consumer_stream **local_consumer_streams); + struct lttng_consumer_stream **local_consumer_streams, + struct lttng_ht *metadata_ht); extern struct lttng_consumer_stream *consumer_allocate_stream( int channel_key, int stream_key, @@ -273,7 +304,9 @@ extern struct lttng_consumer_stream *consumer_allocate_stream( enum lttng_event_output output, const char *path_name, uid_t uid, - gid_t gid); + gid_t gid, + int net_index, + int metadata_flag); extern int consumer_add_stream(struct lttng_consumer_stream *stream); extern void consumer_del_stream(struct lttng_consumer_stream *stream); extern void consumer_change_stream_state(int stream_key, @@ -286,6 +319,14 @@ extern struct lttng_consumer_channel *consumer_allocate_channel( uint64_t max_sb_size); int consumer_add_channel(struct lttng_consumer_channel *channel); +/* lttng-relayd consumer command */ +int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd); +struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( + int net_seq_idx); +struct consumer_relayd_sock_pair *consumer_find_relayd(int key); +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, + size_t data_size); + extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, diff --git a/src/common/defaults.h b/src/common/defaults.h index 53caf62ac..5c5998dde 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -40,6 +40,10 @@ #define DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH "/lttng-ust-apps-wait" #define DEFAULT_HOME_APPS_WAIT_SHM_PATH "/lttng-ust-apps-wait-%u" +/* Default directory where the trace are written in per domain */ +#define DEFAULT_KERNEL_TRACE_DIR "/kernel" +#define DEFAULT_UST_TRACE_DIR "/ust" + /* Default consumer paths */ #define DEFAULT_CONSUMERD_RUNDIR "%s" diff --git a/src/common/kernel-consumer/Makefile.am b/src/common/kernel-consumer/Makefile.am index 655f77ef0..008041e1d 100644 --- a/src/common/kernel-consumer/Makefile.am +++ b/src/common/kernel-consumer/Makefile.am @@ -4,5 +4,5 @@ noinst_LTLIBRARIES = libkernel-consumer.la libkernel_consumer_la_SOURCES = kernel-consumer.c kernel-consumer.h libkernel_consumer_la_LIBADD = \ - $(top_builddir)/src/common/kernel-ctl/libkernel-ctl.la - + $(top_builddir)/src/common/kernel-ctl/libkernel-ctl.la \ + $(top_builddir)/src/common/relayd/librelayd.la diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 3c9687306..8ed279f44 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -31,7 +31,9 @@ #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -52,7 +54,18 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( ssize_t ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } /* get the offset inside the fd to mmap */ ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); @@ -63,6 +76,49 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( goto end; } + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + if (stream->metadata_flag) { + /* Metadata requires the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { ret = write(outfd, stream->mmap_base + mmap_offset, len); if (ret < 0) { @@ -84,14 +140,26 @@ ssize_t lttng_kconsumer_on_read_subbuffer_mmap( len -= ret; mmap_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); + end: + /* Unlock only if ctrl socket used */ + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } @@ -104,54 +172,125 @@ ssize_t lttng_kconsumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len) { - ssize_t ret = 0, written = 0; + ssize_t ret = 0, written = 0, ret_splice = 0; loff_t offset = 0; off_t orig_offset = stream->out_fd_offset; int fd = stream->wait_fd; + /* Default is on the disk */ int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Write metadata stream id before payload */ + if (stream->metadata_flag && relayd) { + /* + * Lock the control socket for the complete duration of the function + * since from this point on we will use the socket. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + + do { + metadata_id = htobe64(stream->relayd_stream_id); + ret = write(ctx->consumer_thread_pipe[1], + (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, + DBG("splice chan to pipe offset %lu of len %lu (fd : %d)", + (unsigned long)offset, len, fd); + ret_splice = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %zd", ret); - if (ret < 0) { + DBG("splice chan to pipe, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in relay splice"); if (written == 0) { - written = ret; + written = ret_splice; } ret = errno; goto splice_error; } - ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %zd", ret); - if (ret < 0) { + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Update counter to fit the spliced data */ + ret_splice += sizeof(stream->relayd_stream_id); + len += sizeof(stream->relayd_stream_id); + /* + * We do this so the return value can match the len passed as + * argument to this function. + */ + written -= sizeof(stream->relayd_stream_id); + } + + ret = consumer_handle_stream_before_relayd(stream, ret_splice); + if (ret >= 0) { + /* Use the returned socket. */ + outfd = ret; + } else { + if (outfd == -1) { + ERR("Remote relayd disconnected. Stopping"); + goto end; + } + } + } + + DBG3("Kernel consumer splice data in %d to out %d", + ctx->consumer_thread_pipe[0], outfd); + ret_splice = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, + ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file, ret %zd", ret_splice); + if (ret_splice < 0) { perror("Error in file splice"); if (written == 0) { - written = ret; + written = ret_splice; } ret = errno; goto splice_error; } - if (ret > len) { + if (ret_splice > len) { errno = EINVAL; - perror("Wrote more data than requested"); - written += ret; + PERROR("Wrote more data than requested %zd (len: %lu)", + ret_splice, len); + written += ret_splice; ret = errno; goto splice_error; } - len -= ret; - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - written += ret; + len -= ret_splice; + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret_splice, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret_splice; + } + written += ret_splice; } lttng_consumer_sync_trace_file(stream, orig_offset); + ret = ret_splice; + goto end; splice_error: @@ -172,6 +311,12 @@ splice_error: } end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + + rcu_read_unlock(); + return written; } @@ -233,6 +378,85 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -260,21 +484,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - struct lttng_consumer_stream *new_stream; int fd; + struct consumer_relayd_sock_pair *relayd = NULL; + struct lttng_consumer_stream *new_stream; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; } + + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } - DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name, - fd); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -283,11 +508,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -298,6 +545,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("Kernel consumer_add_stream (%d)", fd); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -380,7 +629,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * display the error but continue processing to try * to release the subbuffer */ - ERR("Error splicing to tracefile"); + ERR("Error splicing to tracefile (ret: %ld != len: %ld)", + ret, len); } break; @@ -428,7 +678,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO, diff --git a/src/common/kernel-ctl/kernel-ctl.c b/src/common/kernel-ctl/kernel-ctl.c index 8ade0830d..1396cd9bb 100644 --- a/src/common/kernel-ctl/kernel-ctl.c +++ b/src/common/kernel-ctl/kernel-ctl.c @@ -187,3 +187,16 @@ int kernctl_put_subbuf(int fd) { return ioctl(fd, RING_BUFFER_PUT_SUBBUF); } + +/* Set the stream_id */ +int kernctl_set_stream_id(int fd, unsigned long *stream_id) +{ + return ioctl(fd, RING_BUFFER_SET_STREAM_ID, stream_id); +} + +/* Get the offset of the stream_id in the packet header */ +int kernctl_get_net_stream_id_offset(int fd, unsigned long *offset) +{ + return ioctl(fd, LTTNG_KERNEL_STREAM_ID_OFFSET, offset); + +} diff --git a/src/common/kernel-ctl/kernel-ctl.h b/src/common/kernel-ctl/kernel-ctl.h index f9b81c334..18712d96e 100644 --- a/src/common/kernel-ctl/kernel-ctl.h +++ b/src/common/kernel-ctl/kernel-ctl.h @@ -65,5 +65,7 @@ int kernctl_get_subbuf(int fd, unsigned long *pos); int kernctl_put_subbuf(int fd); int kernctl_buffer_flush(int fd); +int kernctl_set_stream_id(int fd, unsigned long *stream_id); +int kernctl_get_net_stream_id_offset(int fd, unsigned long *offset); #endif /* _LTTNG_KERNEL_CTL_H */ diff --git a/src/common/kernel-ctl/kernel-ioctl.h b/src/common/kernel-ctl/kernel-ioctl.h index 70408987b..35942befa 100644 --- a/src/common/kernel-ctl/kernel-ioctl.h +++ b/src/common/kernel-ctl/kernel-ioctl.h @@ -46,6 +46,9 @@ #define RING_BUFFER_GET_MMAP_READ_OFFSET _IOR(0xF6, 0x0B, unsigned long) /* flush the current sub-buffer */ #define RING_BUFFER_FLUSH _IO(0xF6, 0x0C) +/* map stream to stream id for network streaming */ +#define RING_BUFFER_SET_STREAM_ID _IOW(0xF6, 0x0D, unsigned long) + /* LTTng file descriptor ioctl */ #define LTTNG_KERNEL_SESSION _IO(0xF6, 0x40) @@ -68,6 +71,8 @@ #define LTTNG_KERNEL_STREAM _IO(0xF6, 0x60) #define LTTNG_KERNEL_EVENT \ _IOW(0xF6, 0x61, struct lttng_kernel_event) +#define LTTNG_KERNEL_STREAM_ID_OFFSET \ + _IOR(0xF6, 0x62, unsigned long) /* Event and Channel FD ioctl */ #define LTTNG_KERNEL_CONTEXT \ diff --git a/src/common/relayd/Makefile.am b/src/common/relayd/Makefile.am new file mode 100644 index 000000000..84eee1b8e --- /dev/null +++ b/src/common/relayd/Makefile.am @@ -0,0 +1,6 @@ +# Relayd library +noinst_LTLIBRARIES = librelayd.la + +librelayd_la_SOURCES = relayd.c relayd.h + +librelayd_la_LIBADD = $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c new file mode 100644 index 000000000..8945afea3 --- /dev/null +++ b/src/common/relayd/relayd.c @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2012 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "relayd.h" + +/* + * Send command. Fill up the header and append the data. + */ +static int send_command(struct lttcomm_sock *sock, + enum lttcomm_sessiond_command cmd, void *data, size_t size, + int flags) +{ + int ret; + struct lttcomm_relayd_hdr header; + char *buf; + uint64_t buf_size = sizeof(header); + + if (data) { + buf_size += size; + } + + buf = zmalloc(buf_size); + if (buf == NULL) { + PERROR("zmalloc relayd send command buf"); + ret = -1; + goto alloc_error; + } + + header.cmd = htobe32(cmd); + header.data_size = htobe64(size); + + /* Zeroed for now since not used. */ + header.cmd_version = 0; + header.circuit_id = 0; + + /* Prepare buffer to send. */ + memcpy(buf, &header, sizeof(header)); + if (data) { + memcpy(buf + sizeof(header), data, size); + } + + ret = sock->ops->sendmsg(sock, buf, buf_size, flags); + if (ret < 0) { + goto error; + } + + DBG3("Relayd sending command %d", cmd); + +error: + free(buf); +alloc_error: + return ret; +} + +/* + * Receive reply data on socket. This MUST be call after send_command or else + * could result in unexpected behavior(s). + */ +static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size) +{ + int ret; + + DBG3("Relayd waiting for reply..."); + + ret = sock->ops->recvmsg(sock, data, size, 0); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +#if 0 +/* + * Create session on the relayd. + * + * On error, return ret_code negative value else return 0. + */ +int relayd_create_session(struct lttcomm_sock *sock, const char *hostname, + const char *session_name) +{ + int ret; + struct lttcomm_relayd_create_session msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + assert(hostname); + assert(session_name); + + DBG("Relayd creating session for hostname %s and session name %s", + hostname, session_name); + + strncpy(msg.hostname, hostname, sizeof(msg.hostname)); + strncpy(msg.session_name, session_name, sizeof(msg.session_name)); + + /* Send command */ + ret = send_command(sock, RELAYD_CREATE_SESSION, (void *) &msg, + sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTCOMM_OK) { + ret = -reply.ret_code; + } else { + /* Success */ + ret = 0; + } + + DBG2("Relayd created session for %s", session_name); + +error: + return ret; +} +#endif + +/* + * Add stream on the relayd and assign stream handle to the stream_id argument. + * + * On success return 0 else return ret_code negative value. + */ +int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, + const char *pathname, uint64_t *stream_id) +{ + int ret; + struct lttcomm_relayd_add_stream msg; + struct lttcomm_relayd_status_stream reply; + + /* Code flow error. Safety net. */ + assert(sock); + assert(channel_name); + assert(pathname); + + DBG("Relayd adding stream for channel name %s", channel_name); + + strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name)); + strncpy(msg.pathname, pathname, sizeof(msg.pathname)); + + /* Send command */ + ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Back to host bytes order. */ + reply.handle = be64toh(reply.handle); + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTCOMM_OK) { + ret = -reply.ret_code; + ERR("Relayd add stream replied error %d", ret); + } else { + /* Success */ + ret = 0; + *stream_id = reply.handle; + } + + DBG("Relayd stream added successfully with handle %zu", reply.handle); + +error: + return ret; +} + +/* + * Check version numbers on the relayd. + * + * Return 0 if compatible else negative value. + */ +int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, + uint32_t minor) +{ + int ret; + struct lttcomm_relayd_version reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd version check for major.minor %u.%u", major, minor); + + /* Send command */ + ret = send_command(sock, RELAYD_VERSION, NULL, 0, 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Set back to host bytes order */ + reply.major = be32toh(reply.major); + reply.minor = be32toh(reply.minor); + + /* Validate version */ + if (reply.major <= major) { + if (reply.minor <= minor) { + /* Compatible */ + ret = 0; + DBG2("Relayd version is compatible"); + goto error; + } + } + + /* Version number not compatible */ + DBG2("Relayd version is NOT compatible %u.%u > %u.%u", reply.major, + reply.minor, major, minor); + ret = -1; + +error: + return ret; +} + +#if 0 +/* + * Start data command on the relayd. + * + * On success return 0 else return ret_code negative value. + */ +int relayd_start_data(struct lttcomm_sock *sock) +{ + int ret; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd start data command"); + + /* Send command */ + ret = send_command(sock, RELAYD_START_DATA, NULL, 0, 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTCOMM_OK) { + ret = -reply.ret_code; + } else { + /* Success */ + ret = 0; + } + +error: + return ret; +} +#endif + +/* + * Add stream on the relayd and assign stream handle to the stream_id argument. + * + * On success return 0 else return ret_code negative value. + */ +int relayd_send_metadata(struct lttcomm_sock *sock, size_t len) +{ + int ret; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd sending metadata of size %lu", len); + + /* Send command */ + ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0); + if (ret < 0) { + goto error; + } + + DBG2("Relayd metadata added successfully"); + + /* + * After that call, the metadata data MUST be sent to the relayd so the + * receive size on the other end matches the len of the metadata packet + * header. + */ + +error: + return ret; +} + +/* + * Connect to relay daemon with an allocated lttcomm_sock. + */ +int relayd_connect(struct lttcomm_sock *sock) +{ + /* Code flow error. Safety net. */ + assert(sock); + + DBG3("Relayd connect ..."); + + return sock->ops->connect(sock); +} + +/* + * Close relayd socket with an allocated lttcomm_sock. + */ +int relayd_close(struct lttcomm_sock *sock) +{ + /* Code flow error. Safety net. */ + assert(sock); + + DBG3("Relayd closing socket %d", sock->fd); + + return sock->ops->close(sock); +} + +/* + * Send data header structure to the relayd. + */ +int relayd_send_data_hdr(struct lttcomm_sock *sock, + struct lttcomm_relayd_data_hdr *hdr, size_t size) +{ + int ret; + + /* Code flow error. Safety net. */ + assert(sock); + assert(hdr); + + DBG3("Relayd sending data header..."); + + /* Again, safety net */ + if (size == 0) { + size = sizeof(struct lttcomm_relayd_data_hdr); + } + + /* Only send data header. */ + ret = sock->ops->sendmsg(sock, hdr, size, 0); + if (ret < 0) { + goto error; + } + + /* + * The data MUST be sent right after that command for the receive on the + * other end to match the size in the header. + */ + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h new file mode 100644 index 000000000..12efe1ec0 --- /dev/null +++ b/src/common/relayd/relayd.h @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2012 - David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _RELAYD_H +#define _RELAYD_H + +#include + +#include +#include + +int relayd_connect(struct lttcomm_sock *sock); +int relayd_close(struct lttcomm_sock *sock); +#if 0 +int relayd_create_session(struct lttcomm_sock *sock, const char *hostname, + const char *session_name); +#endif +int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, + const char *pathname, uint64_t *stream_id); +int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, + uint32_t minor); +int relayd_start_data(struct lttcomm_sock *sock); +int relayd_send_metadata(struct lttcomm_sock *sock, size_t len); +int relayd_send_data_hdr(struct lttcomm_sock *sock, + struct lttcomm_relayd_data_hdr *hdr, size_t size); + +#endif /* _RELAYD_H */ diff --git a/src/common/uri.c b/src/common/uri.c index cb04d1963..f97b0bba8 100644 --- a/src/common/uri.c +++ b/src/common/uri.c @@ -74,6 +74,39 @@ end: return supported; } +/* + * Set network address from string into dst. Supports both IP string and + * hostname. + */ +static int set_ip_address(const char *addr, int af, char *dst, size_t size) +{ + int ret; + unsigned char buf[sizeof(struct in6_addr)]; + struct hostent *record; + + /* Network protocol */ + ret = inet_pton(af, addr, buf); + if (ret < 1) { + /* We consider the dst to be an hostname or an invalid IP char */ + record = gethostbyname2(addr, af); + if (record == NULL) { + /* At this point, the IP or the hostname is bad */ + printf("bad hostname\n"); + goto error; + } + + /* Translate IP to string */ + (void) inet_ntop(af, record->h_addr_list[0], dst, size); + } else { + memcpy(dst, addr, size); + } + + return 0; + +error: + return -1; +} + /* * Compare two URIs. * @@ -110,41 +143,23 @@ struct lttng_uri *uri_create(void) return uri; } -static int set_ip_address(const char *addr, int af, char *dst, size_t size) -{ - int ret; - unsigned char buf[sizeof(struct in6_addr)]; - struct hostent *record; - - /* Network protocol */ - ret = inet_pton(af, addr, buf); - if (ret < 1) { - /* We consider the dst to be an hostname or an invalid IP char */ - record = gethostbyname2(addr, af); - if (record == NULL) { - /* At this point, the IP or the hostname is bad */ - printf("bad hostname\n"); - goto error; - } - - /* Translate IP to string */ - (void) inet_ntop(af, record->h_addr_list[0], dst, size); - } else { - memcpy(dst, addr, size); - } - - return 0; - -error: - return -1; -} - +/* + * Parses a string URI to a lttng_uri. This function can potentially return + * more than one URI in uris so the size of the array is returned and uris is + * allocated and populated. Caller must free(3) the array. + * + * This function can not detect the stream type of the URI so the caller has to + * make sure the correct type (stype) is set on the return URI(s). The default + * port must also be set by the caller if the returned URI has its port set to + * zero. + */ ssize_t uri_parse(const char *str_uri, struct lttng_uri **uris) { int ret; + size_t str_offset = 0; /* Size of the uris array. Default is 1 */ ssize_t size = 1; - char net[6], dst[LTTNG_MAX_DNNAME + 1]; + char net[6], dst[LTTNG_MAX_DNNAME + 1], subdir[PATH_MAX]; unsigned int ctrl_port = 0; unsigned int data_port = 0; struct lttng_uri *uri; @@ -164,7 +179,7 @@ ssize_t uri_parse(const char *str_uri, struct lttng_uri **uris) goto error; } - DBG3("URI protocol : %s", str_uri); + DBG3("URI string: %s", str_uri); proto = validate_protocol(net); if (proto == NULL) { @@ -178,12 +193,27 @@ ssize_t uri_parse(const char *str_uri, struct lttng_uri **uris) size = 2; } + memset(subdir, 0, sizeof(subdir)); + str_offset += strlen(net); + /* Parse the rest of the URI */ - ret = sscanf(str_uri + strlen(net), "://%255[^:]:%u:%u", dst, - &ctrl_port, &data_port); - if (ret < 0) { - printf("bad URI\n"); - goto error; + if (sscanf(str_uri + str_offset, "://%255[^:]:%u:%u/%s", dst, &ctrl_port, + &data_port, subdir) == 4) { + /* All set */ + } else if (sscanf(str_uri + str_offset, "://%255[^:]:%u:%u", dst, + &ctrl_port, &data_port) == 3) { + } else if (sscanf(str_uri + str_offset, "://%255[^:]:%u/%s", dst, + &ctrl_port, subdir) == 3) { + } else if (sscanf(str_uri + str_offset, "://%255[^:]:%u", dst, + &ctrl_port) == 2) { + } else if (sscanf(str_uri + str_offset, "://%255[^/]/%s", dst, + subdir) == 2) { + } else { + ret = sscanf(str_uri + str_offset, "://%255[^:]", dst); + if (ret < 0) { + ERR("Bad URI"); + goto error; + } } /* We have enough valid information to create URI(s) object */ @@ -199,9 +229,10 @@ ssize_t uri_parse(const char *str_uri, struct lttng_uri **uris) uri[0].dtype = proto->dtype; uri[0].proto = proto->type; uri[0].port = ctrl_port; + strncpy(uri[0].subdir, subdir, sizeof(uri[0].subdir)); - DBG3("URI dtype: %d, proto: %d, port: %d", proto->dtype, proto->type, - ctrl_port); + DBG3("URI dtype: %d, proto: %d, host: %s, subdir: %s, ctrl: %d, data: %d", + proto->dtype, proto->type, dst, subdir, ctrl_port, data_port); switch (proto->code) { case P_FILE: diff --git a/src/common/ust-consumer/Makefile.am b/src/common/ust-consumer/Makefile.am index 798ad9aa6..c79c4a55e 100644 --- a/src/common/ust-consumer/Makefile.am +++ b/src/common/ust-consumer/Makefile.am @@ -3,6 +3,8 @@ noinst_LTLIBRARIES = libust-consumer.la libust_consumer_la_SOURCES = ust-consumer.c ust-consumer.h -libust_consumer_la_LIBADD = -llttng-ust-ctl +libust_consumer_la_LIBADD = \ + -llttng-ust-ctl \ + $(top_builddir)/src/common/relayd/librelayd.la endif diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 47c0d460f..f6add2c2c 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -31,6 +31,7 @@ #include #include +#include #include #include "ust-consumer.h" @@ -52,6 +53,16 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( long ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + goto end; + } + } /* get the offset inside the fd to mmap */ ret = ustctl_get_mmap_read_offset(stream->chan->handle, @@ -62,6 +73,37 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( written = ret; goto end; } + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Only lock if metadata since we use the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + } while (errno == EINTR); + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { ret = write(outfd, stream->mmap_base + mmap_offset, len); if (ret < 0) { @@ -76,21 +118,30 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( goto end; } } else if (ret > len) { - PERROR("Error in file write"); + PERROR("ret %ld > len %lu", ret, len); written += ret; goto end; } else { len -= ret; mmap_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + DBG("UST mmap write() ret %ld (len %lu)", ret, len); + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; + } written += ret; } lttng_consumer_sync_trace_file(stream, orig_offset); + end: + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } return written; } @@ -163,6 +214,84 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("UST Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -206,6 +335,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int fds[2]; size_t nb_fd = 2; + struct consumer_relayd_sock_pair *relayd = NULL; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -217,8 +347,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } - DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, - fds[0], fds[1]); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, @@ -228,11 +356,33 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Add stream on the relayd */ + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -243,6 +393,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu", + msg.u.stream.path_name, fds[0], fds[1], + new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -412,7 +566,7 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (stream->path_name != NULL && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO, diff --git a/src/lib/lttng-ctl/lttng-ctl.c b/src/lib/lttng-ctl/lttng-ctl.c index a1256e12d..a05a8049e 100644 --- a/src/lib/lttng-ctl/lttng-ctl.c +++ b/src/lib/lttng-ctl/lttng-ctl.c @@ -75,19 +75,19 @@ static void copy_lttng_domain(struct lttng_domain *dst, struct lttng_domain *src { if (src && dst) { switch (src->type) { - case LTTNG_DOMAIN_KERNEL: - case LTTNG_DOMAIN_UST: - /* - case LTTNG_DOMAIN_UST_EXEC_NAME: - case LTTNG_DOMAIN_UST_PID: - case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN: - */ - memcpy(dst, src, sizeof(struct lttng_domain)); - break; - default: - memset(dst, 0, sizeof(struct lttng_domain)); - dst->type = LTTNG_DOMAIN_KERNEL; - break; + case LTTNG_DOMAIN_KERNEL: + case LTTNG_DOMAIN_UST: + /* + case LTTNG_DOMAIN_UST_EXEC_NAME: + case LTTNG_DOMAIN_UST_PID: + case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN: + */ + memcpy(dst, src, sizeof(struct lttng_domain)); + break; + default: + memset(dst, 0, sizeof(struct lttng_domain)); + dst->type = LTTNG_DOMAIN_KERNEL; + break; } } } @@ -736,6 +736,39 @@ int lttng_create_session(const char *name, const char *path) return ask_sessiond(&lsm, NULL); } +/* + * Create a new tracing session using a name, URIs and a consumer enable flag. + */ +int lttng_create_session_uri(const char *name, struct lttng_uri *ctrl_uri, + struct lttng_uri *data_uri, unsigned int enable_consumer) +{ + struct lttcomm_session_msg lsm; + + /* Name and ctrl_uri are mandatory */ + if (name == NULL || ctrl_uri == NULL) { + return -1; + } + + lsm.cmd_type = LTTNG_CREATE_SESSION_URI; + + copy_string(lsm.session.name, name, sizeof(lsm.session.name)); + /* Anything bigger than zero, the consumer(s) will be enabled */ + lsm.u.create_uri.enable_consumer = enable_consumer; + memcpy(&lsm.u.create_uri.ctrl_uri, ctrl_uri, + sizeof(lsm.u.create_uri.ctrl_uri)); + if (data_uri) { + /* + * The only possible scenario where data_uri is NULL is for a local + * consumer where the output is at a specified path name on the + * filesystem. + */ + memcpy(&lsm.u.create_uri.data_uri, data_uri, + sizeof(lsm.u.create_uri.data_uri)); + } + + return ask_sessiond(&lsm, NULL); +} + /* * Destroy session using name. * Returns size of returned session payload data or a negative error code. @@ -980,6 +1013,74 @@ int lttng_session_daemon_alive(void) return 1; } +/* + * Set URI for a consumer for a session and domain. + * + * Return 0 on success, else a negative value. + */ +int lttng_set_consumer_uri(struct lttng_handle *handle, struct lttng_uri *uri) +{ + struct lttcomm_session_msg lsm; + + if (handle == NULL || uri == NULL) { + return -1; + } + + lsm.cmd_type = LTTNG_SET_CONSUMER_URI; + + copy_string(lsm.session.name, handle->session_name, + sizeof(lsm.session.name)); + copy_lttng_domain(&lsm.domain, &handle->domain); + + memcpy(&lsm.u.uri, uri, sizeof(lsm.u.uri)); + + return ask_sessiond(&lsm, NULL); +} + +/* + * Enable consumer for a session and domain. + * + * Return 0 on success, else a negative value. + */ +int lttng_enable_consumer(struct lttng_handle *handle) +{ + struct lttcomm_session_msg lsm; + + if (handle == NULL) { + return -1; + } + + lsm.cmd_type = LTTNG_ENABLE_CONSUMER; + + copy_string(lsm.session.name, handle->session_name, + sizeof(lsm.session.name)); + copy_lttng_domain(&lsm.domain, &handle->domain); + + return ask_sessiond(&lsm, NULL); +} + +/* + * Disable consumer for a session and domain. + * + * Return 0 on success, else a negative value. + */ +int lttng_disable_consumer(struct lttng_handle *handle) +{ + struct lttcomm_session_msg lsm; + + if (handle == NULL) { + return -1; + } + + lsm.cmd_type = LTTNG_DISABLE_CONSUMER; + + copy_string(lsm.session.name, handle->session_name, + sizeof(lsm.session.name)); + copy_lttng_domain(&lsm.domain, &handle->domain); + + return ask_sessiond(&lsm, NULL); +} + /* * lib constructor */ diff --git a/tests/tools/Makefile.am b/tests/tools/Makefile.am index 9f0ba7347..0f4b69c04 100644 --- a/tests/tools/Makefile.am +++ b/tests/tools/Makefile.am @@ -7,9 +7,12 @@ noinst_PROGRAMS = test_sessions test_kernel_data_trace UTILS=../utils.h SESSIONS=$(top_srcdir)/src/bin/lttng-sessiond/session.c -KERN_DATA_TRACE=$(top_srcdir)/src/bin/lttng-sessiond/trace-kernel.c +KERN_DATA_TRACE=$(top_srcdir)/src/bin/lttng-sessiond/trace-kernel.c \ + $(top_srcdir)/src/bin/lttng-sessiond/consumer.c \ + $(top_srcdir)/src/common/uri.c COMMON=$(top_builddir)/src/common/libcommon.la HASHTABLE=$(top_builddir)/src/common/hashtable/libhashtable.la +SESSIOND_COMM=$(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la # Tracing sessions unit tests test_sessions_SOURCES = test_sessions.c $(UTILS) $(SESSIONS) @@ -17,11 +20,14 @@ test_sessions_LDADD = $(COMMON) $(HASHTABLE) # Kernel trace data unit tests test_kernel_data_trace_SOURCES = test_kernel_data_trace.c $(UTILS) $(KERN_DATA_TRACE) +test_kernel_data_trace_LDADD = $(SESSIOND_COMM) if HAVE_LIBLTTNG_UST_CTL noinst_PROGRAMS += test_ust_data_trace -UST_DATA_TRACE=$(top_srcdir)/src/bin/lttng-sessiond/trace-ust.c +UST_DATA_TRACE=$(top_srcdir)/src/bin/lttng-sessiond/trace-ust.c \ + $(top_srcdir)/src/bin/lttng-sessiond/consumer.c \ + $(top_srcdir)/src/common/uri.c # UST trace data unit tests test_ust_data_trace_SOURCES = test_ust_data_trace.c $(UTILS) $(UST_DATA_TRACE) -test_ust_data_trace_LDADD = $(COMMON) $(HASHTABLE) +test_ust_data_trace_LDADD = $(COMMON) $(HASHTABLE) $(SESSIOND_COMM) endif diff --git a/tests/tools/test_kernel_data_trace.c b/tests/tools/test_kernel_data_trace.c index 6d7f291be..3e9dc442a 100644 --- a/tests/tools/test_kernel_data_trace.c +++ b/tests/tools/test_kernel_data_trace.c @@ -96,7 +96,6 @@ static void create_kernel_metadata(void) printf("Validating kernel session metadata: "); assert(kern->metadata->fd == -1); - assert(strlen(kern->metadata->pathname)); assert(kern->metadata->conf != NULL); assert(kern->metadata->conf->attr.overwrite == DEFAULT_CHANNEL_OVERWRITE); @@ -130,7 +129,6 @@ static void create_kernel_channel(void) printf("Validating kernel channel: "); assert(chan->fd == -1); assert(chan->enabled == 1); - assert(strcmp(PATH1, chan->pathname) == 0); assert(chan->stream_count == 0); assert(chan->ctx == NULL); assert(chan->channel->attr.overwrite == attr.attr.overwrite); @@ -174,13 +172,12 @@ static void create_kernel_stream(void) struct ltt_kernel_stream *stream; printf("Creating kernel stream: "); - stream = trace_kernel_create_stream(); + stream = trace_kernel_create_stream("stream1", 0); assert(stream != NULL); PRINT_OK(); printf("Validating kernel stream: "); assert(stream->fd == -1); - assert(stream->pathname == NULL); assert(stream->state == 0); PRINT_OK(); diff --git a/tests/tools/test_sessions.c b/tests/tools/test_sessions.c index e55593561..ce716e8c1 100644 --- a/tests/tools/test_sessions.c +++ b/tests/tools/test_sessions.c @@ -221,6 +221,7 @@ static int fuzzing_destroy_args(void) static int two_session_same_name(void) { int ret; + struct ltt_session *sess; ret = create_one_session(SESSION1, PATH1); if (ret < 0) { @@ -228,8 +229,8 @@ static int two_session_same_name(void) return -1; } - ret = create_one_session(SESSION1, PATH1); - if (ret < 0) { + sess = session_find_by_name(SESSION1); + if (sess) { /* Success */ return 0; } -- 2.34.1