From: Julien Desfossez Date: Sun, 28 Jul 2013 22:57:41 +0000 (-0400) Subject: Implement the relayd live features X-Git-Tag: v2.4.0-rc1~143 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=d3e2ba59faddb31870e2ce29b6a881f7ad5ad883;p=lttng-tools.git Implement the relayd live features The live-reading feature allows a user to read the trace while it is being recorded. In order to work, this feature requires the trace to be streamed to lttng-relayd. Then, a viewer can connect to the relayd, list the sessions, attach to one, and start reading the data. The live-reading protocol, enforces the viewer to read all the metadata before trying to read the trace, and checks all the streams for new activity at a user-defined rate (configured by the new --live parameter during the session creation). Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index e05a4b150..59397594b 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -367,17 +367,11 @@ int main(int argc, char **argv) lttng_consumer_set_error_sock(ctx, ret); /* - * For UST consumer, we block RT signals used for periodical metadata flush - * in main and create a dedicated thread to handle these signals. + * Block RT signals used for UST periodical metadata flush and the live + * timer in main, and create a dedicated thread to handle these signals. */ - switch (opt_type) { - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - consumer_signal_init(); - break; - default: - break; - } + consumer_signal_init(); + ctx->type = opt_type; /* Initialize communication library */ @@ -415,25 +409,21 @@ int main(int argc, char **argv) goto sessiond_error; } - switch (opt_type) { - case LTTNG_CONSUMER32_UST: - case LTTNG_CONSUMER64_UST: - /* Create the thread to manage the metadata periodic timers */ - ret = pthread_create(&metadata_timer_thread, NULL, - consumer_timer_metadata_thread, (void *) ctx); - if (ret != 0) { - perror("pthread_create"); - goto metadata_timer_error; - } + /* + * Create the thread to manage the UST metadata periodic timer and + * live timer. + */ + ret = pthread_create(&metadata_timer_thread, NULL, + consumer_timer_thread, (void *) ctx); + if (ret != 0) { + perror("pthread_create"); + goto metadata_timer_error; + } - ret = pthread_detach(metadata_timer_thread); - if (ret) { - errno = ret; - perror("pthread_detach"); - } - break; - default: - break; + ret = pthread_detach(metadata_timer_thread); + if (ret) { + errno = ret; + perror("pthread_detach"); } metadata_timer_error: diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 853c21ae9..4f23dbc43 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -7,10 +7,11 @@ AM_CFLAGS = -fno-strict-aliasing bin_PROGRAMS = lttng-relayd lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ - index.c index.h \ + index.c index.h live.c live.h ctf-trace.c ctf-trace.h \ cmd-generic.c cmd-generic.h \ cmd-2-1.c cmd-2-1.h \ - cmd-2-2.c cmd-2-2.h + cmd-2-2.c cmd-2-2.h \ + cmd-2-4.c cmd-2-4.h # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c new file mode 100644 index 000000000..45410e4e8 --- /dev/null +++ b/src/bin/lttng-relayd/cmd-2-4.c @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include + +#include +#include + +#include "cmd-generic.h" +#include "lttng-relayd.h" + +int cmd_create_session_2_4(struct relay_command *cmd, + struct relay_session *session) +{ + int ret; + struct lttcomm_relayd_create_session_2_4 session_info; + + assert(cmd); + assert(session); + + ret = cmd_recv(cmd->sock, &session_info, sizeof(session_info)); + if (ret < 0) { + ERR("Unable to recv session info version 2.4"); + goto error; + } + + strncpy(session->session_name, session_info.session_name, + sizeof(session->session_name)); + strncpy(session->hostname, session_info.hostname, + sizeof(session->hostname)); + session->live_timer = be32toh(session_info.live_timer); + + ret = 0; + +error: + return ret; +} diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h new file mode 100644 index 000000000..fc611345b --- /dev/null +++ b/src/bin/lttng-relayd/cmd-2-4.h @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef RELAYD_CMD_2_4_H +#define RELAYD_CMD_2_4_H + +#include "lttng-relayd.h" + +int cmd_create_session_2_4(struct relay_command *cmd, + struct relay_session *session); + +#endif /* RELAYD_CMD_2_4_H */ diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h index 14634954b..c8b37d5e5 100644 --- a/src/bin/lttng-relayd/cmd.h +++ b/src/bin/lttng-relayd/cmd.h @@ -22,5 +22,6 @@ #include "cmd-generic.h" #include "cmd-2-1.h" #include "cmd-2-2.h" +#include "cmd-2-4.h" #endif /* RELAYD_CMD_H */ diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c new file mode 100644 index 000000000..adfbac3f0 --- /dev/null +++ b/src/bin/lttng-relayd/ctf-trace.c @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include + +#include +#include + +#include "ctf-trace.h" + +static uint64_t last_relay_ctf_trace_id; + +/* + * Try to destroy a ctf_trace object meaning that the refcount is decremented + * and checked if down to 0 which will free it. + */ +void ctf_trace_try_destroy(struct ctf_trace *obj) +{ + unsigned long ret_ref; + + if (!obj) { + return; + } + + ret_ref = uatomic_add_return(&obj->refcount, -1); + assert(ret_ref >= 0); + if (ret_ref == 0) { + DBG("Freeing ctf_trace %" PRIu64, obj->id); + free(obj); + } +} + +/* + * Create and return an allocated ctf_trace object. NULL on error. + */ +struct ctf_trace *ctf_trace_create(void) +{ + struct ctf_trace *obj; + + obj = zmalloc(sizeof(*obj)); + if (!obj) { + PERROR("ctf_trace alloc"); + goto error; + } + + obj->id = ++last_relay_ctf_trace_id; + DBG("Created ctf_trace %" PRIu64, obj->id); + +error: + return obj; +} + +/* + * Check if we can assign the ctf_trace id and metadata stream to one or all + * the streams with the same path_name (our unique ID for ctf traces). + * + * The given stream MUST be new and NOT visible (in any hash table). + */ +void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream) +{ + struct lttng_ht_iter iter; + struct relay_stream *tmp_stream; + + assert(ht); + assert(stream); + + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct((void *) stream->path_name, lttng_ht_seed), + ht->match_fct, (void *) stream->path_name, + &iter.iter, tmp_stream, ctf_trace_node.node) { + if (stream->metadata_flag) { + /* + * The new stream is the metadata stream for this trace, + * assign the ctf_trace pointer to all the streams in + * this bucket. + */ + pthread_mutex_lock(&tmp_stream->lock); + tmp_stream->ctf_trace = stream->ctf_trace; + uatomic_inc(&tmp_stream->ctf_trace->refcount); + pthread_mutex_unlock(&tmp_stream->lock); + DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, + tmp_stream->ctf_trace->id, tmp_stream->stream_handle); + } else if (tmp_stream->ctf_trace) { + /* + * The ctf_trace already exists for this bucket, + * just assign the pointer to the new stream and exit. + */ + stream->ctf_trace = tmp_stream->ctf_trace; + uatomic_inc(&stream->ctf_trace->refcount); + DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, + tmp_stream->ctf_trace->id, tmp_stream->stream_handle); + goto end; + } else { + /* + * We don't know yet the ctf_trace ID (no metadata has been added), + * so leave it there until the metadata stream arrives. + */ + goto end; + } + } + +end: + rcu_read_unlock(); + return; +} + diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h new file mode 100644 index 000000000..6e39af02b --- /dev/null +++ b/src/bin/lttng-relayd/ctf-trace.h @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _CTF_TRACE_H +#define _CTF_TRACE_H + +#include + +#include + +#include "lttng-relayd.h" + +struct ctf_trace { + int refcount; + uint64_t id; + uint64_t metadata_received; + uint64_t metadata_sent; + struct relay_stream *metadata_stream; +}; + +void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream); +struct ctf_trace *ctf_trace_create(void); +void ctf_trace_try_destroy(struct ctf_trace *obj); + +#endif /* _CTF_TRACE_H */ diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c new file mode 100644 index 000000000..03aab2d3c --- /dev/null +++ b/src/bin/lttng-relayd/live.c @@ -0,0 +1,1840 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cmd.h" +#include "live.h" +#include "lttng-relayd.h" +#include "lttng-viewer.h" +#include "utils.h" + +static struct lttng_uri *live_uri; + +/* + * Quit pipe for all threads. This permits a single cancellation point + * for all threads when receiving an event on the pipe. + */ +static int live_thread_quit_pipe[2] = { -1, -1 }; + +/* + * This pipe is used to inform the worker thread that a command is queued and + * ready to be processed. + */ +static int live_relay_cmd_pipe[2] = { -1, -1 }; + +/* Shared between threads */ +static int live_dispatch_thread_exit; + +static pthread_t live_listener_thread; +static pthread_t live_dispatcher_thread; +static pthread_t live_worker_thread; + +/* + * Relay command queue. + * + * The live_thread_listener and live_thread_dispatcher communicate with this + * queue. + */ +static struct relay_cmd_queue viewer_cmd_queue; + +static uint64_t last_relay_viewer_session_id; + +/* + * Cleanup the daemon + */ +static +void cleanup(void) +{ + DBG("Cleaning up"); + + /* Close thread quit pipes */ + utils_close_pipe(live_thread_quit_pipe); + free(live_uri); +} + +/* + * Write to writable pipe used to notify a thread. + */ +static +int notify_thread_pipe(int wpipe) +{ + int ret; + + do { + ret = write(wpipe, "!", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write poll pipe"); + } + + return ret; +} + +/* + * Stop all threads by closing the thread quit pipe. + */ +static +void stop_threads(void) +{ + int ret; + + /* Stopping all threads */ + DBG("Terminating all live threads"); + ret = notify_thread_pipe(live_thread_quit_pipe[1]); + if (ret < 0) { + ERR("write error on thread quit pipe"); + } + + /* Dispatch thread */ + CMM_STORE_SHARED(live_dispatch_thread_exit, 1); + futex_nto1_wake(&viewer_cmd_queue.futex); +} + +/* + * Init thread quit pipe. + * + * Return -1 on error or 0 if all pipes are created. + */ +static +int init_thread_quit_pipe(void) +{ + int ret; + + ret = utils_create_pipe_cloexec(live_thread_quit_pipe); + + return ret; +} + +/* + * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. + */ +static +int create_thread_poll_set(struct lttng_poll_event *events, int size) +{ + int ret; + + if (events == NULL || size == 0) { + ret = -1; + goto error; + } + + ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); + if (ret < 0) { + goto error; + } + + /* Add quit pipe */ + ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + return 0; + +error: + return ret; +} + +/* + * Check if the thread quit pipe was triggered. + * + * Return 1 if it was triggered else 0; + */ +static +int check_thread_quit_pipe(int fd, uint32_t events) +{ + if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) { + return 1; + } + + return 0; +} + +/* + * Create and init socket from uri. + */ +static +struct lttcomm_sock *init_socket(struct lttng_uri *uri) +{ + int ret; + struct lttcomm_sock *sock = NULL; + + sock = lttcomm_alloc_sock_from_uri(uri); + if (sock == NULL) { + ERR("Allocating socket"); + goto error; + } + + ret = lttcomm_create_sock(sock); + if (ret < 0) { + goto error; + } + DBG("Listening on sock %d for live", sock->fd); + + ret = sock->ops->bind(sock); + if (ret < 0) { + goto error; + } + + ret = sock->ops->listen(sock, -1); + if (ret < 0) { + goto error; + + } + + return sock; + +error: + if (sock) { + lttcomm_destroy_sock(sock); + } + return NULL; +} + +/* + * This thread manages the listening for new connections on the network + */ +static +void *thread_listener(void *data) +{ + int i, ret, pollfd, err = -1; + int val = 1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct lttcomm_sock *live_control_sock; + + DBG("[thread] Relay live listener started"); + + live_control_sock = init_socket(live_uri); + if (!live_control_sock) { + goto error_sock_control; + } + + /* + * Pass 3 as size here for the thread quit pipe, control and data socket. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error_create_poll; + } + + /* Add the control socket */ + ret = lttng_poll_add(&events, live_control_sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error_poll_add; + } + + while (1) { + DBG("Listener accepting live viewers connections"); + +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + nb_fd = ret; + + DBG("Relay new viewer connection received"); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else if (revents & LPOLLIN) { + /* + * Get allocated in this thread, enqueued to a global queue, + * dequeued and freed in the worker thread. + */ + struct relay_command *relay_cmd; + struct lttcomm_sock *newsock; + + relay_cmd = zmalloc(sizeof(*relay_cmd)); + if (!relay_cmd) { + PERROR("relay command zmalloc"); + goto error; + } + + assert(pollfd == live_control_sock->fd); + newsock = live_control_sock->ops->accept(live_control_sock); + if (!newsock) { + PERROR("accepting control sock"); + free(relay_cmd); + goto error; + } + DBG("Relay viewer connection accepted socket %d", newsock->fd); + ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, + sizeof(int)); + if (ret < 0) { + PERROR("setsockopt inet"); + lttcomm_destroy_sock(newsock); + free(relay_cmd); + goto error; + } + relay_cmd->sock = newsock; + + /* + * Lock free enqueue the request. + */ + cds_wfq_enqueue(&viewer_cmd_queue.queue, &relay_cmd->node); + + /* + * Wake the dispatch queue futex. Implicit memory + * barrier with the exchange in cds_wfq_enqueue. + */ + futex_nto1_wake(&viewer_cmd_queue.futex); + } + } + } + +exit: +error: +error_poll_add: + lttng_poll_clean(&events); +error_create_poll: + if (live_control_sock->fd >= 0) { + ret = live_control_sock->ops->close(live_control_sock); + if (ret) { + PERROR("close"); + } + } + lttcomm_destroy_sock(live_control_sock); +error_sock_control: + if (err) { + DBG("Live viewer listener thread exited with error"); + } + DBG("Live viewer listener thread cleanup complete"); + stop_threads(); + return NULL; +} + +/* + * This thread manages the dispatching of the requests to worker threads + */ +static +void *thread_dispatcher(void *data) +{ + int ret; + struct cds_wfq_node *node; + struct relay_command *relay_cmd = NULL; + + DBG("[thread] Live viewer relay dispatcher started"); + + while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + /* Atomically prepare the queue futex */ + futex_nto1_prepare(&viewer_cmd_queue.futex); + + do { + /* Dequeue commands */ + node = cds_wfq_dequeue_blocking(&viewer_cmd_queue.queue); + if (node == NULL) { + DBG("Woken up but nothing in the live-viewer " + "relay command queue"); + /* Continue thread execution */ + break; + } + + relay_cmd = caa_container_of(node, struct relay_command, node); + DBG("Dispatching viewer request waiting on sock %d", + relay_cmd->sock->fd); + + /* + * Inform worker thread of the new request. This call is blocking + * so we can be assured that the data will be read at some point in + * time or wait to the end of the world :) + */ + do { + ret = write(live_relay_cmd_pipe[1], relay_cmd, + sizeof(*relay_cmd)); + } while (ret < 0 && errno == EINTR); + free(relay_cmd); + if (ret < 0 || ret != sizeof(struct relay_command)) { + PERROR("write cmd pipe"); + goto error; + } + } while (node != NULL); + + /* Futex wait on queue. Blocking call on futex() */ + futex_nto1_wait(&viewer_cmd_queue.futex); + } + +error: + DBG("Live viewer dispatch thread dying"); + stop_threads(); + return NULL; +} + +/* + * Establish connection with the viewer and check the versions. + * + * Return 0 on success or else negative value. + */ +static +int viewer_connect(struct relay_command *cmd) +{ + int ret; + struct lttng_viewer_connect reply, msg; + + assert(cmd); + + cmd->version_check_done = 1; + + /* Get version from the other side. */ + ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0); + if (ret < 0 || ret != sizeof(msg)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay failed to receive the version values."); + } + ret = -1; + goto end; + } + + reply.major = RELAYD_VERSION_COMM_MAJOR; + reply.minor = RELAYD_VERSION_COMM_MINOR; + + /* Major versions must be the same */ + if (reply.major != be32toh(msg.major)) { + DBG("Incompatible major versions (%u vs %u)", reply.major, + be32toh(msg.major)); + ret = 0; + goto end; + } + + cmd->major = reply.major; + /* We adapt to the lowest compatible version */ + if (reply.minor <= be32toh(msg.minor)) { + cmd->minor = reply.minor; + } else { + cmd->minor = be32toh(msg.minor); + } + + if (be32toh(msg.type) == VIEWER_CLIENT_COMMAND) { + cmd->type = RELAY_VIEWER_COMMAND; + } else if (be32toh(msg.type) == VIEWER_CLIENT_NOTIFICATION) { + cmd->type = RELAY_VIEWER_NOTIFICATION; + } else { + ERR("Unknown connection type : %u", be32toh(msg.type)); + ret = -1; + goto end; + } + + reply.major = htobe32(reply.major); + reply.minor = htobe32(reply.minor); + if (cmd->type == RELAY_VIEWER_COMMAND) { + reply.viewer_session_id = htobe64(++last_relay_viewer_session_id); + } + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttng_viewer_connect), 0); + if (ret < 0) { + ERR("Relay sending version"); + } + + DBG("Version check done using protocol %u.%u", cmd->major, cmd->minor); + ret = 0; + +end: + return ret; +} + +/* + * Send the viewer the list of current sessions. + * + * Return 0 on success or else a negative value. + */ +static +int viewer_list_sessions(struct relay_command *cmd, + struct lttng_ht *sessions_ht) +{ + int ret; + struct lttng_viewer_list_sessions session_list; + unsigned long count; + long approx_before, approx_after; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct lttng_viewer_session send_session; + struct relay_session *session; + + DBG("List sessions received"); + + if (cmd->version_check_done == 0) { + ERR("Trying to list sessions before version check"); + ret = -1; + goto end_no_session; + } + + rcu_read_lock(); + cds_lfht_count_nodes(sessions_ht->ht, &approx_before, &count, &approx_after); + session_list.sessions_count = htobe32(count); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &session_list, + sizeof(session_list), 0); + if (ret < 0) { + ERR("Relay sending sessions list"); + goto end_unlock; + } + + cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, node, node) { + node = lttng_ht_iter_get_node_ulong(&iter); + if (!node) { + goto end_unlock; + } + session = caa_container_of(node, struct relay_session, session_n); + + strncpy(send_session.session_name, session->session_name, + sizeof(send_session.session_name)); + strncpy(send_session.hostname, session->hostname, + sizeof(send_session.hostname)); + send_session.id = htobe64(session->id); + send_session.live_timer = htobe32(session->live_timer); + send_session.clients = htobe32(session->viewer_attached); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &send_session, + sizeof(send_session), 0); + if (ret < 0) { + ERR("Relay sending session info"); + goto end_unlock; + } + } + rcu_read_unlock(); + ret = 0; + goto end; + +end_unlock: + rcu_read_unlock(); + +end: +end_no_session: + return ret; +} + +/* + * Allocate and init a new viewer_stream. + * + * Copies the values from the stream passed in parameter and insert the new + * stream in the viewer_streams_ht. + * + * MUST be called with rcu_read_lock held. + * + * Returns 0 on success or a negative value on error. + */ +static +int init_viewer_stream(struct relay_stream *stream, + struct lttng_ht *viewer_streams_ht) +{ + int ret; + struct relay_viewer_stream *viewer_stream; + + assert(stream); + assert(viewer_streams_ht); + + viewer_stream = zmalloc(sizeof(*viewer_stream)); + if (!viewer_stream) { + PERROR("relay viewer stream zmalloc"); + ret = -1; + goto error; + } + + viewer_stream->read_fd = -1; + viewer_stream->index_read_fd = -1; + viewer_stream->session_id = stream->session->id; + viewer_stream->stream_handle = stream->stream_handle; + viewer_stream->path_name = strndup(stream->path_name, + LTTNG_VIEWER_PATH_MAX); + viewer_stream->channel_name = strndup(stream->channel_name, + LTTNG_VIEWER_NAME_MAX); + viewer_stream->total_index_received = stream->total_index_received; + viewer_stream->tracefile_size = stream->tracefile_size; + viewer_stream->tracefile_count = stream->tracefile_count; + viewer_stream->metadata_flag = stream->metadata_flag; + + /* + * This is to avoid a race between the initialization of this object and + * the close of the given stream. If the stream is unable to find this + * viewer stream when closing, this copy will at least take the latest + * value. + */ + viewer_stream->total_index_received = stream->total_index_received; + + /* + * The deletion of this ctf_trace object is only done in a call RCU of the + * relay stream making it valid as long as we have the read side lock. + */ + viewer_stream->ctf_trace = stream->ctf_trace; + uatomic_inc(&viewer_stream->ctf_trace->refcount); + + lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle); + lttng_ht_add_unique_u64(viewer_streams_ht, &viewer_stream->stream_n); + + ret = 0; + +error: + return ret; +} + +/* + * Send the viewer the list of current sessions. + */ +static +int viewer_attach_session(struct relay_command *cmd, + struct lttng_ht *sessions_ht, + struct lttng_ht *viewer_streams_ht) +{ + int ret, send_streams = 0, nb_streams = 0; + struct lttng_viewer_attach_session_request request; + struct lttng_viewer_attach_session_response response; + struct lttng_viewer_stream send_stream; + struct relay_stream *stream; + struct relay_viewer_stream *viewer_stream; + struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node64; + struct lttng_ht_iter iter; + struct relay_session *session; + + assert(cmd); + assert(sessions_ht); + assert(viewer_streams_ht); + + DBG("Attach session received"); + + if (cmd->version_check_done == 0) { + ERR("Trying to attach session before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &request, sizeof(request), 0); + if (ret < 0 || ret != sizeof(request)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay failed to receive the attach parameters."); + } + ret = -1; + goto error; + } + + rcu_read_lock(); + lttng_ht_lookup(sessions_ht, + (void *)((unsigned long) be64toh(request.session_id)), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay session %" PRIu64 " not found", + be64toh(request.session_id)); + response.status = htobe32(VIEWER_ATTACH_UNK); + goto send_reply; + } + + session = caa_container_of(node, struct relay_session, session_n); + if (cmd->session == session) { + /* Same viewer already attached, just send the stream list. */ + send_streams = 1; + response.status = htobe32(VIEWER_ATTACH_OK); + } else if (session->viewer_attached != 0) { + DBG("Already a viewer attached"); + response.status = htobe32(VIEWER_ATTACH_ALREADY); + goto send_reply; + } else if (session->live_timer == 0) { + DBG("Not live session"); + response.status = htobe32(VIEWER_ATTACH_NOT_LIVE); + goto send_reply; + } else { + session->viewer_attached++; + send_streams = 1; + response.status = htobe32(VIEWER_ATTACH_OK); + cmd->session = session; + } + + switch (be32toh(request.seek)) { + case VIEWER_SEEK_BEGINNING: + /* Default behaviour. */ + break; + case VIEWER_SEEK_LAST: + /* TODO */ + break; + default: + ERR("Wrong seek parameter"); + response.status = htobe32(VIEWER_ATTACH_SEEK_ERR); + send_streams = 0; + goto send_reply; + } + + if (send_streams) { + /* We should only be there if we have a session to attach to. */ + assert(session); + + /* + * Fill the viewer_streams_ht to count the number of streams + * ready to be sent and avoid concurrency issues on the + * relay_streams_ht and don't rely on a total session stream count. + */ + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { + struct relay_viewer_stream *vstream; + + node = lttng_ht_iter_get_node_ulong(&iter); + if (!node) { + continue; + } + stream = caa_container_of(node, struct relay_stream, stream_n); + if (stream->session != cmd->session) { + continue; + } + + /* + * Don't send streams with no ctf_trace, they are not ready to be + * read. + */ + if (!stream->ctf_trace) { + continue; + } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle, + viewer_streams_ht); + if (!vstream) { + ret = init_viewer_stream(stream, viewer_streams_ht); + if (ret < 0) { + goto end_unlock; + } + } + nb_streams++; + } + response.streams_count = htobe32(nb_streams); + } + +send_reply: + ret = cmd->sock->ops->sendmsg(cmd->sock, &response, sizeof(response), 0); + if (ret < 0) { + ERR("Relay sending viewer attach response"); + goto end_unlock; + } + + /* + * Unknown or busy session, just return gracefully, the viewer knows what + * is happening. + */ + if (!send_streams) { + ret = 0; + goto end_unlock; + } + + /* We should only be there if we have a session to attach to. */ + assert(session); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) { + node64 = lttng_ht_iter_get_node_u64(&iter); + if (!node64) { + continue; + } + viewer_stream = caa_container_of(node64, struct relay_viewer_stream, + stream_n); + if (viewer_stream->session_id != cmd->session->id) { + continue; + } + + send_stream.id = htobe64(viewer_stream->stream_handle); + send_stream.ctf_trace_id = htobe64(viewer_stream->ctf_trace->id); + send_stream.metadata_flag = htobe32(viewer_stream->metadata_flag); + strncpy(send_stream.path_name, viewer_stream->path_name, + sizeof(send_stream.path_name)); + strncpy(send_stream.channel_name, viewer_stream->channel_name, + sizeof(send_stream.channel_name)); + + ret = cmd->sock->ops->sendmsg(cmd->sock, &send_stream, + sizeof(send_stream), 0); + if (ret < 0) { + ERR("Relay sending stream %" PRIu64, viewer_stream->stream_handle); + goto end_unlock; + } + DBG("Sent stream %" PRIu64 " to viewer", viewer_stream->stream_handle); + } + ret = 0; + +end_unlock: + rcu_read_unlock(); +end_no_session: +error: + return ret; +} + +/* + * Open index file using a given viewer stream. + * + * Return 0 on success or else a negative value. + */ +static int open_index(struct relay_viewer_stream *stream) +{ + int ret; + char fullpath[PATH_MAX]; + struct lttng_packet_index_file_hdr hdr; + + if (stream->tracefile_size > 0) { + /* For now we don't support on-disk ring buffer. */ + ret = -1; + goto end; + } else { + ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR + "/%s" DEFAULT_INDEX_FILE_SUFFIX, + stream->path_name, stream->channel_name); + if (ret < 0) { + PERROR("snprintf index path"); + goto error; + } + } + + DBG("Opening index file %s in read only", fullpath); + ret = open(fullpath, O_RDONLY); + if (ret < 0) { + if (errno == ENOENT) { + ret = ENOENT; + goto error; + } else { + PERROR("opening index in read-only"); + } + goto error; + } + stream->index_read_fd = ret; + DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret); + + do { + ret = read(stream->index_read_fd, &hdr, sizeof(hdr)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Reading index header"); + goto error; + } + if (strncmp(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic)) != 0) { + ERR("Invalid header magic"); + ret = -1; + goto error; + } + if (be32toh(hdr.index_major) != INDEX_MAJOR || + be32toh(hdr.index_minor) != INDEX_MINOR) { + ERR("Invalid header version"); + ret = -1; + goto error; + } + ret = 0; + +error: +end: + return ret; +} + +/* + * Get viewer stream from stream id. + * + * RCU read side lock MUST be acquired. + */ +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, + struct lttng_ht *viewer_streams_ht) +{ + struct lttng_ht_node_u64 *node; + struct lttng_ht_iter iter; + struct relay_viewer_stream *stream = NULL; + + assert(viewer_streams_ht); + + lttng_ht_lookup(viewer_streams_ht, &stream_id, &iter); + node = lttng_ht_iter_get_node_u64(&iter); + if (node == NULL) { + DBG("Relay viewer stream %" PRIu64 " not found", stream_id); + goto end; + } + stream = caa_container_of(node, struct relay_viewer_stream, stream_n); + +end: + return stream; +} + +/* + * Send the next index for a stream. + * + * Return 0 on success or else a negative value. + */ +static +int viewer_get_next_index(struct relay_command *cmd, + struct lttng_ht *viewer_streams_ht, struct lttng_ht *sessions_ht) +{ + int ret; + struct lttng_viewer_get_next_index request_index; + struct lttng_viewer_index viewer_index; + struct lttng_packet_index packet_index; + struct relay_viewer_stream *vstream; + struct relay_stream *rstream; + + assert(cmd); + assert(viewer_streams_ht); + assert(sessions_ht); + + DBG("Viewer get next index"); + + if (cmd->version_check_done == 0) { + ERR("Trying to request index before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &request_index, + sizeof(request_index), 0); + if (ret < 0 || ret != sizeof(request_index)) { + ret = -1; + ERR("Relay didn't receive the whole packet"); + goto end; + } + + rcu_read_lock(); + vstream = live_find_viewer_stream_by_id(be64toh(request_index.stream_id), + viewer_streams_ht); + if (!vstream) { + ret = -1; + goto end_unlock; + } + + memset(&viewer_index, 0, sizeof(viewer_index)); + + /* + * The viewer should not ask for index on metadata stream. + */ + if (vstream->metadata_flag) { + viewer_index.status = htobe32(VIEWER_INDEX_HUP); + goto send_reply; + } + + /* First time, we open the index file */ + if (vstream->index_read_fd < 0) { + ret = open_index(vstream); + if (ret == ENOENT) { + /* + * The index is created only when the first data packet arrives, it + * might not be ready at the beginning of the session + */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + goto send_reply; + } else if (ret < 0) { + viewer_index.status = htobe32(VIEWER_INDEX_ERR); + goto send_reply; + } + } + + rstream = relay_stream_find_by_id(vstream->stream_handle); + if (rstream) { + if (rstream->beacon_ts_end != -1ULL && + vstream->last_sent_index == rstream->total_index_received) { + viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE); + viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); + goto send_reply; + } + + if (rstream->total_index_received <= vstream->last_sent_index) { + /* No new index to send, retry later. */ + viewer_index.status = htobe32(VIEWER_INDEX_RETRY); + goto send_reply; + } + } else if (!rstream && + vstream->total_index_received == vstream->last_sent_index) { + /* Last index sent and stream closed */ + viewer_index.status = htobe32(VIEWER_INDEX_HUP); + goto send_reply; + } + + if (!vstream->ctf_trace->metadata_received || + vstream->ctf_trace->metadata_received > + vstream->ctf_trace->metadata_sent) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; + } + + do { + ret = read(vstream->index_read_fd, &packet_index, + sizeof(packet_index)); + } while (ret < 0 && errno == EINTR); + if (ret < sizeof(packet_index)) { + PERROR("Relay reading index file"); + viewer_index.status = htobe32(VIEWER_INDEX_ERR); + } else { + viewer_index.status = htobe32(VIEWER_INDEX_OK); + vstream->last_sent_index++; + } + + /* + * Indexes are stored in big endian, no need to switch before sending. + */ + viewer_index.offset = packet_index.offset; + viewer_index.packet_size = packet_index.packet_size; + viewer_index.content_size = packet_index.content_size; + viewer_index.timestamp_begin = packet_index.timestamp_begin; + viewer_index.timestamp_end = packet_index.timestamp_end; + viewer_index.events_discarded = packet_index.events_discarded; + viewer_index.stream_id = packet_index.stream_id; + +send_reply: + viewer_index.flags = htobe32(viewer_index.flags); + ret = cmd->sock->ops->sendmsg(cmd->sock, &viewer_index, + sizeof(viewer_index), 0); + if (ret < 0) { + ERR("Relay index to viewer"); + goto end_unlock; + } + + DBG("Index %" PRIu64 "for stream %" PRIu64 "sent", + vstream->last_sent_index, vstream->stream_handle); + +end_unlock: + rcu_read_unlock(); + +end_no_session: +end: + return ret; +} + +/* + * Send the next index for a stream + * + * Return 0 on success or else a negative value. + */ +static +int viewer_get_packet(struct relay_command *cmd, + struct lttng_ht *viewer_streams_ht) +{ + int ret, send_data = 0; + char *data = NULL; + uint32_t len = 0; + ssize_t read_len; + struct lttng_viewer_get_packet get_packet_info; + struct lttng_viewer_trace_packet reply; + struct relay_viewer_stream *stream; + + assert(cmd); + assert(viewer_streams_ht); + + DBG2("Relay get data packet"); + + if (cmd->version_check_done == 0) { + ERR("Trying to get packet before version check"); + ret = -1; + goto end; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &get_packet_info, + sizeof(get_packet_info), 0); + if (ret < 0 || ret != sizeof(get_packet_info)) { + ret = -1; + ERR("Relay didn't receive the whole packet"); + goto end; + } + + rcu_read_lock(); + stream = live_find_viewer_stream_by_id(be64toh(get_packet_info.stream_id), + viewer_streams_ht); + if (!stream) { + goto error; + } + assert(stream->ctf_trace); + + /* + * First time we read this stream, we need open the tracefile, we should + * only arrive here if an index has already been sent to the viewer, so the + * tracefile must exist, if it does not it is a fatal error. + */ + if (stream->read_fd < 0) { + char fullpath[PATH_MAX]; + + ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name, + stream->channel_name); + if (ret < 0) { + goto error; + } + ret = open(fullpath, O_RDONLY); + if (ret < 0) { + PERROR("Relay opening trace file"); + goto error; + } + stream->read_fd = ret; + } + + memset(&reply, 0, sizeof(reply)); + + if (!stream->ctf_trace->metadata_received || + stream->ctf_trace->metadata_received > + stream->ctf_trace->metadata_sent) { + reply.status = htobe32(VIEWER_GET_PACKET_ERR); + reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; + + goto send_reply; + } + + len = be32toh(get_packet_info.len); + data = zmalloc(len); + if (!data) { + PERROR("relay data zmalloc"); + goto error; + } + + ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET); + if (ret < 0) { + PERROR("lseek"); + goto error; + } + read_len = read(stream->read_fd, data, len); + if (read_len < (ssize_t) len) { + PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, + stream->read_fd, be64toh(get_packet_info.offset)); + goto error; + } + reply.status = htobe32(VIEWER_GET_PACKET_OK); + reply.len = htobe32(len); + send_data = 1; + goto send_reply; + +error: + reply.status = htobe32(VIEWER_GET_PACKET_ERR); + +send_reply: + reply.flags = htobe32(reply.flags); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay data header to viewer"); + goto end_unlock; + } + + if (send_data) { + ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0); + if (ret < 0) { + ERR("Relay send data to viewer"); + goto end_unlock; + } + } + + DBG("Sent %u bytes for stream %" PRIu64, len, + be64toh(get_packet_info.stream_id)); + +end_unlock: + free(data); + rcu_read_unlock(); + +end: + return ret; +} + +/* + * Send the session's metadata + * + * Return 0 on success else a negative value. + */ +static +int viewer_get_metadata(struct relay_command *cmd, + struct lttng_ht *viewer_streams_ht) +{ + int ret = 0; + ssize_t read_len; + uint64_t len = 0; + char *data = NULL; + struct lttng_viewer_get_metadata request; + struct lttng_viewer_metadata_packet reply; + struct relay_viewer_stream *stream; + + assert(cmd); + assert(viewer_streams_ht); + + DBG("Relay get metadata"); + + if (cmd->version_check_done == 0) { + ERR("Trying to get metadata before version check"); + ret = -1; + goto end; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &request, + sizeof(request), 0); + if (ret < 0 || ret != sizeof(request)) { + ret = -1; + ERR("Relay didn't receive the whole packet"); + goto end; + } + + rcu_read_lock(); + stream = live_find_viewer_stream_by_id(be64toh(request.stream_id), + viewer_streams_ht); + if (!stream || !stream->metadata_flag) { + ERR("Invalid metadata stream"); + goto error; + } + assert(stream->ctf_trace); + assert(stream->ctf_trace->metadata_sent <= + stream->ctf_trace->metadata_received); + + len = stream->ctf_trace->metadata_received - + stream->ctf_trace->metadata_sent; + if (len == 0) { + reply.status = htobe32(VIEWER_NO_NEW_METADATA); + goto send_reply; + } + + /* first time, we open the metadata file */ + if (stream->read_fd < 0) { + char fullpath[PATH_MAX]; + + ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name, + stream->channel_name); + if (ret < 0) { + goto error; + } + ret = open(fullpath, O_RDONLY); + if (ret < 0) { + PERROR("Relay opening metadata file"); + goto error; + } + stream->read_fd = ret; + } + + reply.len = htobe64(len); + data = zmalloc(len); + if (!data) { + PERROR("viewer metadata zmalloc"); + goto error; + } + + read_len = read(stream->read_fd, data, len); + if (read_len < (ssize_t) len) { + PERROR("Relay reading metadata file"); + goto error; + } + stream->ctf_trace->metadata_sent += read_len; + reply.status = htobe32(VIEWER_METADATA_OK); + goto send_reply; + +error: + reply.status = htobe32(VIEWER_METADATA_ERR); + +send_reply: + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (ret < 0) { + ERR("Relay data header to viewer"); + goto end_unlock; + } + + if (len > 0) { + ret = cmd->sock->ops->sendmsg(cmd->sock, data, len, 0); + if (ret < 0) { + ERR("Relay send data to viewer"); + goto end_unlock; + } + } + + DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len, + be64toh(request.stream_id)); + + DBG("Metadata sent"); + +end_unlock: + free(data); + rcu_read_unlock(); +end: + return ret; +} + +/* + * live_relay_unknown_command: send -1 if received unknown command + */ +static +void live_relay_unknown_command(struct relay_command *cmd) +{ + struct lttcomm_relayd_generic_reply reply; + int ret; + + reply.ret_code = htobe32(LTTNG_ERR_UNK); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (ret < 0) { + ERR("Relay sending unknown command"); + } +} + +/* + * Process the commands received on the control socket + */ +static +int process_control(struct lttng_viewer_cmd *recv_hdr, + struct relay_command *cmd, struct lttng_ht *sessions_ht, + struct lttng_ht *viewer_streams_ht) +{ + int ret = 0; + + switch (be32toh(recv_hdr->cmd)) { + case VIEWER_CONNECT: + ret = viewer_connect(cmd); + break; + case VIEWER_LIST_SESSIONS: + ret = viewer_list_sessions(cmd, sessions_ht); + break; + case VIEWER_ATTACH_SESSION: + ret = viewer_attach_session(cmd, sessions_ht, + viewer_streams_ht); + break; + case VIEWER_GET_NEXT_INDEX: + ret = viewer_get_next_index(cmd, viewer_streams_ht, sessions_ht); + break; + case VIEWER_GET_PACKET: + ret = viewer_get_packet(cmd, viewer_streams_ht); + break; + case VIEWER_GET_METADATA: + ret = viewer_get_metadata(cmd, viewer_streams_ht); + break; + default: + ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); + live_relay_unknown_command(cmd); + ret = -1; + goto end; + } + +end: + return ret; +} + +static +void cleanup_poll_connection(struct lttng_poll_event *events, int pollfd) +{ + int ret; + + assert(events); + + lttng_poll_del(events, pollfd); + + ret = close(pollfd); + if (ret < 0) { + ERR("Closing pollfd %d", pollfd); + } +} + +/* + * Create and add connection to the given hash table. + * + * Return poll add value or else -1 on error. + */ +static +int add_connection(int fd, struct lttng_poll_event *events, + struct lttng_ht *relay_connections_ht) +{ + int ret; + struct relay_command *relay_connection; + + assert(events); + assert(relay_connections_ht); + + relay_connection = zmalloc(sizeof(struct relay_command)); + if (relay_connection == NULL) { + PERROR("Relay command zmalloc"); + goto error; + } + + do { + ret = read(fd, relay_connection, sizeof(*relay_connection)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret < sizeof(*relay_connection)) { + PERROR("read relay cmd pipe"); + goto error_read; + } + + lttng_ht_node_init_ulong(&relay_connection->sock_n, + (unsigned long) relay_connection->sock->fd); + rcu_read_lock(); + lttng_ht_add_unique_ulong(relay_connections_ht, + &relay_connection->sock_n); + rcu_read_unlock(); + + return lttng_poll_add(events, relay_connection->sock->fd, + LPOLLIN | LPOLLRDHUP); + +error_read: + free(relay_connection); +error: + return -1; +} + +static +void deferred_free_connection(struct rcu_head *head) +{ + struct relay_command *relay_connection = + caa_container_of(head, struct relay_command, rcu_node); + + if (relay_connection->session && + relay_connection->session->viewer_attached > 0) { + relay_connection->session->viewer_attached--; + } + lttcomm_destroy_sock(relay_connection->sock); + free(relay_connection); +} + +static +void deferred_free_viewer_stream(struct rcu_head *head) +{ + struct relay_viewer_stream *stream = + caa_container_of(head, struct relay_viewer_stream, rcu_node); + + if (stream->ctf_trace) { + uatomic_dec(&stream->ctf_trace->refcount); + assert(uatomic_read(&stream->ctf_trace->refcount) >= 0); + if (uatomic_read(&stream->ctf_trace->refcount) == 0) { + DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id); + free(stream->ctf_trace); + } + } + + free(stream->path_name); + free(stream->channel_name); + free(stream); +} + +static +void viewer_del_streams(struct lttng_ht *viewer_streams_ht, + struct relay_session *session) +{ + int ret; + struct relay_viewer_stream *stream; + struct lttng_ht_node_u64 *node; + struct lttng_ht_iter iter; + + assert(viewer_streams_ht); + assert(session); + + rcu_read_lock(); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) { + node = lttng_ht_iter_get_node_u64(&iter); + if (!node) { + continue; + } + + stream = caa_container_of(node, struct relay_viewer_stream, stream_n); + if (stream->session_id != session->id) { + continue; + } + + if (stream->read_fd > 0) { + ret = close(stream->read_fd); + if (ret < 0) { + PERROR("close read_fd"); + } + } + if (stream->index_read_fd > 0) { + ret = close(stream->index_read_fd); + if (ret < 0) { + PERROR("close index_read_fd"); + } + } + if (stream->metadata_flag && stream->ctf_trace) { + stream->ctf_trace->metadata_sent = 0; + } + ret = lttng_ht_del(viewer_streams_ht, &iter); + assert(!ret); + call_rcu(&stream->rcu_node, deferred_free_viewer_stream); + } + rcu_read_unlock(); +} + +/* + * Delete and free a connection. + * + * RCU read side lock MUST be acquired. + */ +static +void del_connection(struct lttng_ht *relay_connections_ht, + struct lttng_ht_iter *iter, struct relay_command *relay_connection, + struct lttng_ht *viewer_streams_ht) +{ + int ret; + + assert(relay_connections_ht); + assert(iter); + assert(relay_connection); + assert(viewer_streams_ht); + + ret = lttng_ht_del(relay_connections_ht, iter); + assert(!ret); + + if (relay_connection->session) { + viewer_del_streams(viewer_streams_ht, relay_connection->session); + } + + call_rcu(&relay_connection->rcu_node, deferred_free_connection); +} + +/* + * This thread does the actual work + */ +static +void *thread_worker(void *data) +{ + int ret, err = -1; + uint32_t nb_fd; + struct relay_command *relay_connection; + struct lttng_poll_event events; + struct lttng_ht *relay_connections_ht; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct lttng_viewer_cmd recv_hdr; + struct relay_local_data *relay_ctx = (struct relay_local_data *) data; + struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; + struct lttng_ht *viewer_streams_ht = relay_ctx->viewer_streams_ht; + + DBG("[thread] Live viewer relay worker started"); + + rcu_register_thread(); + + /* table of connections indexed on socket */ + relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!relay_connections_ht) { + goto relay_connections_ht_error; + } + + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error_poll_create; + } + + ret = lttng_poll_add(&events, live_relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + +restart: + while (1) { + int i; + + /* Infinite blocking call, waiting for transmission */ + DBG3("Relayd live viewer worker thread polling..."); + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + nb_fd = ret; + + /* + * Process control. The control connection is prioritised so we don't + * starve it with high throughput tracing data on the data + * connection. + */ + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + uint32_t revents = LTTNG_POLL_GETEV(&events, i); + int pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Inspect the relay cmd pipe for new connection */ + if (pollfd == live_relay_cmd_pipe[0]) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay live pipe error"); + goto error; + } else if (revents & LPOLLIN) { + DBG("Relay live viewer command received"); + ret = add_connection(live_relay_cmd_pipe[0], + &events, relay_connections_ht); + if (ret < 0) { + goto error; + } + } + } else if (revents) { + rcu_read_lock(); + lttng_ht_lookup(relay_connections_ht, + (void *)((unsigned long) pollfd), &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG2("Relay viewer sock %d not found", pollfd); + rcu_read_unlock(); + goto error; + } + relay_connection = caa_container_of(node, struct relay_command, + sock_n); + + if (revents & (LPOLLERR)) { + ERR("VIEWER POLL ERROR"); + cleanup_poll_connection(&events, pollfd); + del_connection(relay_connections_ht, &iter, + relay_connection, viewer_streams_ht); + } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { + DBG("Viewer socket %d hung up", pollfd); + cleanup_poll_connection(&events, pollfd); + del_connection(relay_connections_ht, &iter, + relay_connection, viewer_streams_ht); + } else if (revents & LPOLLIN) { + ret = relay_connection->sock->ops->recvmsg( + relay_connection->sock, &recv_hdr, + sizeof(struct lttng_viewer_cmd), + 0); + /* connection closed */ + if (ret <= 0) { + cleanup_poll_connection(&events, pollfd); + del_connection( relay_connections_ht, &iter, + relay_connection, viewer_streams_ht); + DBG("Viewer control connection closed with %d", + pollfd); + } else { + if (relay_connection->session) { + DBG2("Relay viewer worker receiving data for " + "session: %" PRIu64, + relay_connection->session->id); + } + ret = process_control(&recv_hdr, relay_connection, + sessions_ht, viewer_streams_ht); + if (ret < 0) { + /* Clear the session on error. */ + cleanup_poll_connection(&events, pollfd); + del_connection(relay_connections_ht, &iter, + relay_connection, viewer_streams_ht); + DBG("Viewer connection closed with %d", pollfd); + } + } + } + rcu_read_unlock(); + } + } + } + +exit: +error: + lttng_poll_clean(&events); + + /* empty the hash table and free the memory */ + rcu_read_lock(); + cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) { + node = lttng_ht_iter_get_node_ulong(&iter); + if (!node) { + continue; + } + + relay_connection = caa_container_of(node, struct relay_command, + sock_n); + del_connection(relay_connections_ht, &iter, relay_connection, + viewer_streams_ht); + } + rcu_read_unlock(); +error_poll_create: + lttng_ht_destroy(relay_connections_ht); +relay_connections_ht_error: + /* Close relay cmd pipes */ + utils_close_pipe(live_relay_cmd_pipe); + if (err) { + DBG("Viewer worker thread exited with error"); + } + DBG("Viewer worker thread cleanup complete"); + stop_threads(); + rcu_unregister_thread(); + return NULL; +} + +/* + * Create the relay command pipe to wake thread_manage_apps. + * Closed in cleanup(). + */ +static int create_relay_cmd_pipe(void) +{ + int ret; + + ret = utils_create_pipe_cloexec(live_relay_cmd_pipe); + + return ret; +} + +void live_stop_threads() +{ + int ret; + void *status; + + stop_threads(); + + ret = pthread_join(live_listener_thread, &status); + if (ret != 0) { + PERROR("pthread_join live listener"); + goto error; /* join error, exit without cleanup */ + } + + ret = pthread_join(live_worker_thread, &status); + if (ret != 0) { + PERROR("pthread_join live worker"); + goto error; /* join error, exit without cleanup */ + } + + ret = pthread_join(live_dispatcher_thread, &status); + if (ret != 0) { + PERROR("pthread_join live dispatcher"); + goto error; /* join error, exit without cleanup */ + } + + cleanup(); + +error: + return; +} + +/* + * main + */ +int live_start_threads(struct lttng_uri *uri, + struct relay_local_data *relay_ctx) +{ + int ret = 0; + void *status; + int is_root; + + assert(uri); + live_uri = uri; + + /* Create thread quit pipe */ + if ((ret = init_thread_quit_pipe()) < 0) { + goto error; + } + + /* Check if daemon is UID = 0 */ + is_root = !getuid(); + + if (!is_root) { + if (live_uri->port < 1024) { + ERR("Need to be root to use ports < 1024"); + ret = -1; + goto exit; + } + } + + /* Setup the thread apps communication pipe. */ + if ((ret = create_relay_cmd_pipe()) < 0) { + goto exit; + } + + /* Init relay command queue. */ + cds_wfq_init(&viewer_cmd_queue.queue); + + /* Set up max poll set size */ + lttng_poll_set_max_size(); + + /* Setup the dispatcher thread */ + ret = pthread_create(&live_dispatcher_thread, NULL, + thread_dispatcher, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create viewer dispatcher"); + goto exit_dispatcher; + } + + /* Setup the worker thread */ + ret = pthread_create(&live_worker_thread, NULL, + thread_worker, relay_ctx); + if (ret != 0) { + PERROR("pthread_create viewer worker"); + goto exit_worker; + } + + /* Setup the listener thread */ + ret = pthread_create(&live_listener_thread, NULL, + thread_listener, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create viewer listener"); + goto exit_listener; + } + + ret = 0; + goto end; + +exit_listener: + ret = pthread_join(live_listener_thread, &status); + if (ret != 0) { + PERROR("pthread_join live listener"); + goto error; /* join error, exit without cleanup */ + } + +exit_worker: + ret = pthread_join(live_worker_thread, &status); + if (ret != 0) { + PERROR("pthread_join live worker"); + goto error; /* join error, exit without cleanup */ + } + +exit_dispatcher: + ret = pthread_join(live_dispatcher_thread, &status); + if (ret != 0) { + PERROR("pthread_join live dispatcher"); + goto error; /* join error, exit without cleanup */ + } + +exit: + cleanup(); + +end: +error: + return ret; +} diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h new file mode 100644 index 000000000..dd6bb6091 --- /dev/null +++ b/src/bin/lttng-relayd/live.h @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef LTTNG_RELAYD_LIVE_H +#define LTTNG_RELAYD_LIVE_H + +#include + +#include "lttng-relayd.h" + +int live_start_threads(struct lttng_uri *live_uri, + struct relay_local_data *relay_ctx); +void live_stop_threads(void); + +struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id, + struct lttng_ht *viewer_streams_ht); + +#endif /* LTTNG_RELAYD_LIVE_H */ diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index c60280e0f..638ecbbea 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -20,11 +20,15 @@ #define LTTNG_RELAYD_H #define _LGPL_SOURCE +#include #include #include + #include #include +#include "ctf-trace.h" + /* * Queue used to enqueue relay requests */ @@ -36,6 +40,8 @@ struct relay_cmd_queue { enum connection_type { RELAY_DATA, RELAY_CONTROL, + RELAY_VIEWER_COMMAND, + RELAY_VIEWER_NOTIFICATION, }; /* @@ -49,6 +55,13 @@ struct relay_session { */ uint64_t id; struct lttcomm_sock *sock; + char session_name[NAME_MAX]; + char hostname[HOST_NAME_MAX]; + uint32_t live_timer; + struct lttng_ht_node_ulong session_n; + struct rcu_head rcu_node; + uint32_t viewer_attached; + uint32_t stream_count; }; /* @@ -63,6 +76,8 @@ struct relay_stream { int fd; /* FD on which to write the index data. */ int index_fd; + /* FD on which to read the index data for the viewer. */ + int read_index_fd; char *path_name; char *channel_name; @@ -72,11 +87,57 @@ struct relay_stream { uint64_t tracefile_count; uint64_t tracefile_count_current; + uint64_t total_index_received; + struct relay_viewer_stream *viewer_stream; + uint64_t last_net_seq_num; + struct lttng_ht_node_str ctf_trace_node; + + /* + * To protect from concurrent read/update between the + * streaming-side and the viewer-side. + * This lock must be held, we reading/updating the + * ctf_trace pointer. + */ + pthread_mutex_t lock; + + struct ctf_trace *ctf_trace; + /* + * If the stream is inactive, this field is updated with the live beacon + * timestamp end, when it is active, this field == -1ULL. + */ + uint64_t beacon_ts_end; + /* Information telling us when to close the stream */ unsigned int close_flag:1; - uint64_t last_net_seq_num; /* Indicate if the stream was initialized for a data pending command. */ unsigned int data_pending_check_done:1; + unsigned int metadata_flag:1; +}; + +/* + * Shadow copy of the relay_stream structure for the viewer side. The only + * fields updated by the writer (streaming side) after allocation are : + * total_index_received and close_flag. Everything else is updated by the + * reader (viewer side). + */ +struct relay_viewer_stream { + uint64_t stream_handle; + uint64_t session_id; + int read_fd; + int index_read_fd; + char *path_name; + char *channel_name; + uint64_t last_sent_index; + uint64_t total_index_received; + uint64_t tracefile_size; + uint64_t tracefile_size_current; + uint64_t tracefile_count; + uint64_t tracefile_count_current; + struct lttng_ht_node_u64 stream_n; + struct rcu_head rcu_node; + struct ctf_trace *ctf_trace; + /* Information telling us if the stream is a metadata stream. */ + unsigned int metadata_flag:1; }; /* @@ -94,8 +155,18 @@ struct relay_command { /* protocol version to use for this session */ uint32_t major; uint32_t minor; + struct lttng_ht *ctf_traces_ht; /* indexed by path name */ +}; + +struct relay_local_data { + struct lttng_ht *sessions_ht; + struct lttng_ht *viewer_streams_ht; }; extern char *opt_output_path; +extern struct lttng_ht *relay_streams_ht; + +struct relay_stream *relay_stream_find_by_id(uint64_t stream_id); + #endif /* LTTNG_RELAYD_H */ diff --git a/src/bin/lttng-relayd/lttng-viewer.h b/src/bin/lttng-relayd/lttng-viewer.h new file mode 100644 index 000000000..f397de7e5 --- /dev/null +++ b/src/bin/lttng-relayd/lttng-viewer.h @@ -0,0 +1,184 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef LTTNG_VIEWER_H +#define LTTNG_VIEWER_H + +#include + +#define LTTNG_VIEWER_PATH_MAX 4096 +#define LTTNG_VIEWER_NAME_MAX 255 +#define LTTNG_VIEWER_HOST_NAME_MAX 64 + +/* Flags in reply to get_next_index and get_packet. */ +/* New metadata is required to read this packet. */ +#define LTTNG_VIEWER_FLAG_NEW_METADATA (1 << 0) +/* New stream got added to the trace */ +#define LTTNG_VIEWER_FLAG_NEW_STREAM (1 << 1) + +enum lttng_viewer_command { + VIEWER_CONNECT = 1, + VIEWER_LIST_SESSIONS = 2, + VIEWER_ATTACH_SESSION = 3, + VIEWER_GET_NEXT_INDEX = 4, + VIEWER_GET_PACKET = 5, + VIEWER_GET_METADATA = 6, +}; + +enum lttng_viewer_attach_return_code { + VIEWER_ATTACH_OK = 1, /* If the attach command succeeded. */ + VIEWER_ATTACH_ALREADY = 2, /* If a viewer is already attached. */ + VIEWER_ATTACH_UNK = 3, /* If the session ID is unknown. */ + VIEWER_ATTACH_NOT_LIVE = 4, /* If the session is not live. */ + VIEWER_ATTACH_SEEK_ERR = 5, /* Seek error. */ +}; + +enum lttng_viewer_next_index_return_code { + VIEWER_INDEX_OK = 1, /* Index is available. */ + VIEWER_INDEX_RETRY = 2, /* Index not yet available. */ + VIEWER_INDEX_HUP = 3, /* Index closed (trace destroyed). */ + VIEWER_INDEX_ERR = 4, /* Unknow error. */ + VIEWER_INDEX_INACTIVE = 5, /* Inactive stream beacon. */ +}; + +enum lttng_viewer_get_packet_return_code { + VIEWER_GET_PACKET_OK = 1, + VIEWER_GET_PACKET_RETRY = 2, + VIEWER_GET_PACKET_ERR = 3, +}; + +enum lttng_viewer_get_metadata_return_code { + VIEWER_METADATA_OK = 1, + VIEWER_NO_NEW_METADATA = 2, + VIEWER_METADATA_ERR = 3, +}; + +enum lttng_viewer_connection_type { + VIEWER_CLIENT_COMMAND = 1, + VIEWER_CLIENT_NOTIFICATION = 2, +}; + +enum lttng_viewer_seek { + VIEWER_SEEK_BEGINNING = 1, /* Receive the trace packets from the beginning. */ + VIEWER_SEEK_LAST = 2, /* Receive the trace packets from now. */ +}; + +struct lttng_viewer_session { + uint64_t id; + char hostname[LTTNG_VIEWER_HOST_NAME_MAX]; + char session_name[LTTNG_VIEWER_NAME_MAX]; + uint32_t live_timer; + uint32_t clients; +} __attribute__((__packed__)); + +struct lttng_viewer_stream { + uint64_t id; + uint64_t ctf_trace_id; + char path_name[LTTNG_VIEWER_PATH_MAX]; + char channel_name[LTTNG_VIEWER_NAME_MAX]; + int metadata_flag; +} __attribute__((__packed__)); + +struct lttng_viewer_cmd { + uint64_t data_size; /* data size following this header */ + uint32_t cmd; /* enum lttcomm_relayd_command */ + uint32_t cmd_version; /* command version */ +} __attribute__((__packed__)); + +/* + * CONNECT payload. + */ +struct lttng_viewer_connect { + uint32_t major; + uint32_t minor; + uint32_t type; /* enum lttng_viewer_connection_type */ + uint64_t viewer_session_id; /* session ID assigned by the relay for command connections */ +} __attribute__((__packed__)); + +/* + * VIEWER_LIST_SESSIONS payload. + */ +struct lttng_viewer_list_sessions { + uint32_t sessions_count; + char session_list[]; /* struct lttng_viewer_session */ +} __attribute__((__packed__)); + +/* + * VIEWER_ATTACH_SESSION payload. + */ +struct lttng_viewer_attach_session_request { + uint64_t session_id; + uint32_t seek; /* enum lttng_viewer_seek */ + uint64_t offset; /* unused for now */ +} __attribute__((__packed__)); + +struct lttng_viewer_attach_session_response { + uint32_t status; /* enum lttng_viewer_attach_return_code */ + uint32_t streams_count; + char stream_list[]; /* struct lttng_viewer_stream */ +} __attribute__((__packed__)); + +/* + * VIEWER_GET_NEXT_INDEX payload. + */ +struct lttng_viewer_get_next_index { + uint64_t stream_id; +} __attribute__ ((__packed__)); + +struct lttng_viewer_index { + uint32_t status; /* enum lttng_viewer_next_index_return_code */ + uint64_t offset; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + uint64_t stream_id; + uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ +} __attribute__ ((__packed__)); + +/* + * VIEWER_GET_PACKET payload. + */ +struct lttng_viewer_get_packet { + uint64_t stream_id; + uint64_t offset; + uint32_t len; +} __attribute__((__packed__)); + +struct lttng_viewer_trace_packet { + uint32_t status; /* enum lttng_viewer_get_packet_return_code */ + uint32_t len; + uint32_t flags; /* LTTNG_VIEWER_FLAG_* */ + char data[]; +} __attribute__((__packed__)); + +/* + * VIEWER_GET_METADATA payload. + */ +struct lttng_viewer_get_metadata { + uint64_t stream_id; +} __attribute__((__packed__)); + +struct lttng_viewer_metadata_packet { + uint32_t status; /* enum lttng_viewer_get_metadata_return_code */ + uint64_t len; + char data[]; +} __attribute__((__packed__)); + +#endif /* LTTNG_VIEWER_H */ diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index ca37b8bc4..43e6f318a 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -52,15 +52,18 @@ #include #include "cmd.h" +#include "ctf-trace.h" #include "index.h" #include "utils.h" #include "lttng-relayd.h" +#include "live.h" /* command line options */ char *opt_output_path; static int opt_daemon; static struct lttng_uri *control_uri; static struct lttng_uri *data_uri; +static struct lttng_uri *live_uri; const char *progname; @@ -105,6 +108,9 @@ static struct lttng_ht *indexes_ht; static uid_t relayd_uid; static gid_t relayd_gid; +/* Global relay stream hash table. */ +struct lttng_ht *relay_streams_ht; + /* * usage function on stderr */ @@ -228,6 +234,21 @@ int parse_args(int argc, char **argv) goto exit; } } + if (live_uri == NULL) { + ret = asprintf(&default_address, "tcp://0.0.0.0:%d", + DEFAULT_NETWORK_VIEWER_PORT); + if (ret < 0) { + PERROR("asprintf default viewer control address"); + goto exit; + } + + ret = uri_parse(default_address, &live_uri); + free(default_address); + if (ret < 0) { + ERR("Invalid viewer control URI specified"); + goto exit; + } + } exit: return ret; @@ -697,15 +718,13 @@ error: * Get stream from stream id. * Need to be called with RCU read-side lock held. */ -static -struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id, - struct lttng_ht *streams_ht) +struct relay_stream *relay_stream_find_by_id(uint64_t stream_id) { struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct relay_stream *ret; - lttng_ht_lookup(streams_ht, + lttng_ht_lookup(relay_streams_ht, (void *)((unsigned long) stream_id), &iter); node = lttng_ht_iter_get_node_ulong(&iter); @@ -726,17 +745,29 @@ void deferred_free_stream(struct rcu_head *head) { struct relay_stream *stream = caa_container_of(head, struct relay_stream, rcu_node); + + ctf_trace_try_destroy(stream->ctf_trace); + free(stream->path_name); free(stream->channel_name); free(stream); } +static +void deferred_free_session(struct rcu_head *head) +{ + struct relay_session *session = + caa_container_of(head, struct relay_session, rcu_node); + free(session); +} + /* * relay_delete_session: Free all memory associated with a session and * close all the FDs */ static -void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht) +void relay_delete_session(struct relay_command *cmd, + struct lttng_ht *sessions_ht) { struct lttng_ht_iter iter; struct lttng_ht_node_ulong *node; @@ -750,7 +781,7 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht DBG("Relay deleting session %" PRIu64, cmd->session->id); rcu_read_lock(); - cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) { + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { node = lttng_ht_iter_get_node_ulong(&iter); if (node) { stream = caa_container_of(node, @@ -760,7 +791,7 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht if (ret < 0) { PERROR("close stream fd on delete session"); } - ret = lttng_ht_del(streams_ht, &iter); + ret = lttng_ht_del(relay_streams_ht, &iter); assert(!ret); call_rcu(&stream->rcu_node, deferred_free_stream); @@ -770,9 +801,12 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht indexes_ht); } } + iter.iter.node = &cmd->session->session_n.node; + ret = lttng_ht_del(sessions_ht, &iter); + assert(!ret); + call_rcu(&cmd->session->rcu_node, + deferred_free_session); rcu_read_unlock(); - - free(cmd->session); } /* @@ -804,7 +838,8 @@ static void copy_index_control_data(struct relay_index *index, */ static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, + struct lttng_ht *sessions_ht) { int ret = 0, send_ret; struct relay_session *session; @@ -828,6 +863,18 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, reply.session_id = htobe64(session->id); + switch (cmd->minor) { + case 4: /* LTTng sessiond 2.4 */ + default: + ret = cmd_create_session_2_4(cmd, session); + break; + } + + lttng_ht_node_init_ulong(&session->session_n, + (unsigned long) session->id); + lttng_ht_add_unique_ulong(sessions_ht, + &session->session_n); + DBG("Created session %" PRIu64, session->id); error: @@ -851,7 +898,7 @@ error: */ static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *sessions_ht) { struct relay_session *session = cmd->session; struct relay_stream *stream = NULL; @@ -889,6 +936,9 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->prev_seq = -1ULL; stream->session = session; stream->index_fd = -1; + stream->read_index_fd = -1; + stream->ctf_trace = NULL; + pthread_mutex_init(&stream->lock, NULL); ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); if (ret < 0) { @@ -913,11 +963,35 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); } + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { + stream->metadata_flag = 1; + /* + * When we receive a new metadata stream, we create a new + * ctf_trace and we assign this ctf_trace to all streams with + * the same path. + * + * If later on we receive a new stream for the same ctf_trace, + * we copy the information from the first hit in the HT to the + * new stream. + */ + stream->ctf_trace = ctf_trace_create(); + if (!stream->ctf_trace) { + ret = -1; + goto end; + } + stream->ctf_trace->refcount++; + stream->ctf_trace->metadata_stream = stream; + } + ctf_trace_assign(cmd->ctf_traces_ht, stream); + lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); - lttng_ht_add_unique_ulong(streams_ht, + lttng_ht_add_unique_ulong(relay_streams_ht, &stream->stream_n); + lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name); + lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node); + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, stream->stream_handle); @@ -955,7 +1029,7 @@ err_free_stream: */ static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *viewer_streams_ht) { struct relay_session *session = cmd->session; struct lttcomm_relayd_close_stream stream_info; @@ -986,8 +1060,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } rcu_read_lock(); - stream = relay_stream_from_stream_id(be64toh(stream_info.stream_id), - streams_ht); + stream = relay_stream_find_by_id(be64toh(stream_info.stream_id)); if (!stream) { ret = -1; goto end_unlock; @@ -998,6 +1071,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, if (close_stream_check(stream)) { int delret; + struct relay_viewer_stream *vstream; delret = close(stream->fd); if (delret < 0) { @@ -1010,8 +1084,24 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, PERROR("close stream index_fd"); } } + + vstream = live_find_viewer_stream_by_id(stream->stream_handle, + viewer_streams_ht); + if (vstream) { + /* + * Set the last good value into the viewer stream. This is done + * right before the stream gets deleted from the hash table. The + * lookup failure on the live thread side of a stream indicates + * that the viewer stream index received value should be used. + */ + vstream->total_index_received = stream->total_index_received; + } + iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(streams_ht, &iter); + delret = lttng_ht_del(relay_streams_ht, &iter); + assert(!delret); + iter.iter.node = &stream->ctf_trace_node.node; + delret = lttng_ht_del(cmd->ctf_traces_ht, &iter); assert(!delret); call_rcu(&stream->rcu_node, deferred_free_stream); @@ -1118,7 +1208,7 @@ end: */ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd) { int ret = htobe32(LTTNG_OK); struct relay_session *session = cmd->session; @@ -1170,8 +1260,8 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; rcu_read_lock(); - metadata_stream = relay_stream_from_stream_id( - be64toh(metadata_struct->stream_id), streams_ht); + metadata_stream = relay_stream_find_by_id( + be64toh(metadata_struct->stream_id)); if (!metadata_stream) { ret = -1; goto end_unlock; @@ -1192,6 +1282,8 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { goto end_unlock; } + metadata_stream->ctf_trace->metadata_received += + payload_size + be32toh(metadata_struct->padding_size); DBG2("Relay metadata written"); @@ -1206,7 +1298,7 @@ end: */ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret; struct lttcomm_relayd_version reply, msg; @@ -1235,7 +1327,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - relay_delete_session(cmd, streams_ht); + relay_delete_session(cmd, sessions_ht); ret = 0; goto end; } @@ -1268,7 +1360,7 @@ end: */ static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd) { struct relay_session *session = cmd->session; struct lttcomm_relayd_data_pending msg; @@ -1302,7 +1394,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, last_net_seq_num = be64toh(msg.last_net_seq_num); rcu_read_lock(); - stream = relay_stream_from_stream_id(stream_id, streams_ht); + stream = relay_stream_find_by_id(stream_id); if (stream == NULL) { ret = -1; goto end_unlock; @@ -1346,7 +1438,7 @@ end_no_session: */ static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd) { int ret; uint64_t stream_id; @@ -1379,7 +1471,8 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, stream_id = be64toh(msg.stream_id); rcu_read_lock(); - cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + stream_n.node) { if (stream->stream_handle == stream_id) { stream->data_pending_check_done = 1; DBG("Relay quiescent control pending flag set to %" PRIu64, @@ -1408,7 +1501,7 @@ end_no_session: */ static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd) { int ret; struct lttng_ht_iter iter; @@ -1419,7 +1512,6 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, assert(recv_hdr); assert(cmd); - assert(streams_ht); DBG("Init streams for data pending"); @@ -1450,7 +1542,8 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, * streams to find the one associated with the right session_id. */ rcu_read_lock(); - cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + stream_n.node) { if (stream->session->id == session_id) { stream->data_pending_check_done = 0; DBG("Set begin data pending flag to stream %" PRIu64, @@ -1482,7 +1575,7 @@ end_no_session: */ static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd) { int ret; struct lttng_ht_iter iter; @@ -1494,7 +1587,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, assert(recv_hdr); assert(cmd); - assert(streams_ht); DBG("End data pending command"); @@ -1521,7 +1613,8 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, /* Iterate over all streams to see if the begin data pending flag is set. */ rcu_read_lock(); - cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) { + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + stream_n.node) { if (stream->session->id == session_id && !stream->data_pending_check_done) { is_data_inflight = 1; @@ -1551,8 +1644,7 @@ end_no_session: */ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht, - struct lttng_ht *indexes_ht) + struct relay_command *cmd, struct lttng_ht *indexes_ht) { int ret, send_ret, index_created = 0; struct relay_session *session = cmd->session; @@ -1563,7 +1655,6 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, uint64_t net_seq_num; assert(cmd); - assert(streams_ht); assert(indexes_ht); DBG("Relay receiving index"); @@ -1590,13 +1681,28 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, net_seq_num = be64toh(index_info.net_seq_num); rcu_read_lock(); - stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id), - streams_ht); + stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id)); if (!stream) { ret = -1; goto end_rcu_unlock; } + /* Live beacon handling */ + if (index_info.packet_size == 0) { + DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); + + /* + * Only flag a stream inactive when it has already received data. + */ + if (stream->total_index_received > 0) { + stream->beacon_ts_end = be64toh(index_info.timestamp_end); + } + ret = 0; + goto end_rcu_unlock; + } else { + stream->beacon_ts_end = -1ULL; + } + index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); if (!index) { /* A successful creation will add the object to the HT. */ @@ -1642,6 +1748,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { goto end_rcu_unlock; } + stream->total_index_received++; } end_rcu_unlock: @@ -1663,49 +1770,47 @@ end_no_session: } /* - * relay_process_control: Process the commands received on the control socket + * Process the commands received on the control socket */ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht, - struct lttng_ht *index_streams_ht, - struct lttng_ht *indexes_ht) + struct relay_command *cmd, struct relay_local_data *ctx) { int ret = 0; switch (be32toh(recv_hdr->cmd)) { case RELAYD_CREATE_SESSION: - ret = relay_create_session(recv_hdr, cmd); + ret = relay_create_session(recv_hdr, cmd, ctx->sessions_ht); break; case RELAYD_ADD_STREAM: - ret = relay_add_stream(recv_hdr, cmd, streams_ht); + ret = relay_add_stream(recv_hdr, cmd, ctx->sessions_ht); break; case RELAYD_START_DATA: ret = relay_start(recv_hdr, cmd); break; case RELAYD_SEND_METADATA: - ret = relay_recv_metadata(recv_hdr, cmd, streams_ht); + ret = relay_recv_metadata(recv_hdr, cmd); break; case RELAYD_VERSION: - ret = relay_send_version(recv_hdr, cmd, streams_ht); + ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht); break; case RELAYD_CLOSE_STREAM: - ret = relay_close_stream(recv_hdr, cmd, streams_ht); + ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht); break; case RELAYD_DATA_PENDING: - ret = relay_data_pending(recv_hdr, cmd, streams_ht); + ret = relay_data_pending(recv_hdr, cmd); break; case RELAYD_QUIESCENT_CONTROL: - ret = relay_quiescent_control(recv_hdr, cmd, streams_ht); + ret = relay_quiescent_control(recv_hdr, cmd); break; case RELAYD_BEGIN_DATA_PENDING: - ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht); + ret = relay_begin_data_pending(recv_hdr, cmd); break; case RELAYD_END_DATA_PENDING: - ret = relay_end_data_pending(recv_hdr, cmd, streams_ht); + ret = relay_end_data_pending(recv_hdr, cmd); break; case RELAYD_SEND_INDEX: - ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht); + ret = relay_recv_index(recv_hdr, cmd, indexes_ht); break; case RELAYD_UPDATE_SYNC_INFO: default: @@ -1723,7 +1828,7 @@ end: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, +int relay_process_data(struct relay_command *cmd, struct lttng_ht *indexes_ht) { int ret = 0, rotate_index = 0, index_created = 0; @@ -1750,7 +1855,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, stream_id = be64toh(data_hdr.stream_id); rcu_read_lock(); - stream = relay_stream_from_stream_id(stream_id, streams_ht); + stream = relay_stream_find_by_id(stream_id); if (!stream) { ret = -1; goto end_rcu_unlock; @@ -1871,6 +1976,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, if (ret < 0) { goto end_rcu_unlock; } + stream->total_index_received++; } do { @@ -1908,7 +2014,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, PERROR("close stream index_fd"); } iter.iter.node = &stream->stream_n.node; - ret = lttng_ht_del(streams_ht, &iter); + ret = lttng_ht_del(relay_streams_ht, &iter); assert(!ret); call_rcu(&stream->rcu_node, deferred_free_stream); @@ -1954,6 +2060,11 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, goto error_read; } + relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); + if (!relay_connection->ctf_traces_ht) { + goto error_read; + } + lttng_ht_node_init_ulong(&relay_connection->sock_n, (unsigned long) relay_connection->sock->fd); rcu_read_lock(); @@ -1976,21 +2087,22 @@ void deferred_free_connection(struct rcu_head *head) struct relay_command *relay_connection = caa_container_of(head, struct relay_command, rcu_node); + lttng_ht_destroy(relay_connection->ctf_traces_ht); lttcomm_destroy_sock(relay_connection->sock); free(relay_connection); } static void relay_del_connection(struct lttng_ht *relay_connections_ht, - struct lttng_ht *streams_ht, struct lttng_ht_iter *iter, - struct relay_command *relay_connection) + struct lttng_ht_iter *iter, struct relay_command *relay_connection, + struct lttng_ht *sessions_ht) { int ret; ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); if (relay_connection->type == RELAY_CONTROL) { - relay_delete_session(relay_connection, streams_ht); + relay_delete_session(relay_connection, sessions_ht); } call_rcu(&relay_connection->rcu_node, @@ -2010,9 +2122,9 @@ void *relay_thread_worker(void *data) struct lttng_ht *relay_connections_ht; struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; - struct lttng_ht *streams_ht; - struct lttng_ht *index_streams_ht; struct lttcomm_relayd_hdr recv_hdr; + struct relay_local_data *relay_ctx = (struct relay_local_data *) data; + struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; DBG("[thread] Relay worker started"); @@ -2024,12 +2136,6 @@ void *relay_thread_worker(void *data) goto relay_connections_ht_error; } - /* tables of streams indexed by stream ID */ - streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (!streams_ht) { - goto streams_ht_error; - } - /* Tables of received indexes indexed by index handle and net_seq_num. */ indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64); if (!indexes_ht) { @@ -2113,8 +2219,7 @@ restart: ERR("POLL ERROR"); relay_cleanup_poll_connection(&events, pollfd); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); if (last_seen_data_fd == pollfd) { last_seen_data_fd = last_notdel_data_fd; } @@ -2122,8 +2227,7 @@ restart: DBG("Socket %d hung up", pollfd); relay_cleanup_poll_connection(&events, pollfd); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); if (last_seen_data_fd == pollfd) { last_seen_data_fd = last_notdel_data_fd; } @@ -2137,8 +2241,7 @@ restart: if (ret <= 0) { relay_cleanup_poll_connection(&events, pollfd); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); DBG("Control connection closed with %d", pollfd); } else { if (relay_connection->session) { @@ -2146,16 +2249,12 @@ restart: relay_connection->session->id); } ret = relay_process_control(&recv_hdr, - relay_connection, - streams_ht, - index_streams_ht, - indexes_ht); + relay_connection, relay_ctx); if (ret < 0) { /* Clear the session on error. */ relay_cleanup_poll_connection(&events, pollfd); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); DBG("Connection closed with %d", pollfd); } seen_control = 1; @@ -2221,14 +2320,12 @@ restart: continue; } - ret = relay_process_data(relay_connection, streams_ht, - indexes_ht); + ret = relay_process_data(relay_connection, indexes_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); DBG("Data connection closed with %d", pollfd); /* * Every goto restart call sets the last seen fd where @@ -2260,16 +2357,13 @@ error: relay_connection = caa_container_of(node, struct relay_command, sock_n); relay_del_connection(relay_connections_ht, - streams_ht, &iter, - relay_connection); + &iter, relay_connection, sessions_ht); } } rcu_read_unlock(); error_poll_create: lttng_ht_destroy(indexes_ht); indexes_ht_error: - lttng_ht_destroy(streams_ht); -streams_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: /* Close relay cmd pipes */ @@ -2304,6 +2398,7 @@ int main(int argc, char **argv) { int ret = 0; void *status; + struct relay_local_data *relay_ctx; /* Create thread quit pipe */ if ((ret = init_thread_quit_pipe()) < 0) { @@ -2370,6 +2465,30 @@ int main(int argc, char **argv) /* Initialize communication library */ lttcomm_init(); + relay_ctx = zmalloc(sizeof(struct relay_local_data)); + if (!relay_ctx) { + PERROR("relay_ctx"); + goto exit; + } + + /* tables of sessions indexed by session ID */ + relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!relay_ctx->sessions_ht) { + goto exit_relay_ctx_sessions; + } + + /* tables of streams indexed by stream ID */ + relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!relay_streams_ht) { + goto exit_relay_ctx_streams; + } + + /* tables of streams indexed by stream ID */ + relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!relay_ctx->viewer_streams_ht) { + goto exit_relay_ctx_viewer_streams; + } + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL); @@ -2380,7 +2499,7 @@ int main(int argc, char **argv) /* Setup the worker thread */ ret = pthread_create(&worker_thread, NULL, - relay_thread_worker, (void *) NULL); + relay_thread_worker, (void *) relay_ctx); if (ret != 0) { PERROR("pthread_create worker"); goto exit_worker; @@ -2394,6 +2513,11 @@ int main(int argc, char **argv) goto exit_listener; } + ret = live_start_threads(live_uri, relay_ctx); + if (ret != 0) { + ERR("Starting live viewer threads"); + } + exit_listener: ret = pthread_join(listener_thread, &status); if (ret != 0) { @@ -2414,8 +2538,19 @@ exit_dispatcher: PERROR("pthread_join"); goto error; /* join error, exit without cleanup */ } + lttng_ht_destroy(relay_ctx->viewer_streams_ht); + +exit_relay_ctx_viewer_streams: + lttng_ht_destroy(relay_streams_ht); + +exit_relay_ctx_streams: + lttng_ht_destroy(relay_ctx->sessions_ht); + +exit_relay_ctx_sessions: + free(relay_ctx); exit: + live_stop_threads(); cleanup(); if (!ret) { exit(EXIT_SUCCESS); diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index bed8f830d..85f396f1c 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -547,7 +547,8 @@ error: */ static int send_consumer_relayd_socket(int domain, unsigned int session_id, struct lttng_uri *relayd_uri, struct consumer_output *consumer, - struct consumer_socket *consumer_sock) + struct consumer_socket *consumer_sock, + char *session_name, char *hostname, int session_live_timer) { int ret; struct lttcomm_relayd_sock *rsock = NULL; @@ -573,7 +574,8 @@ static int send_consumer_relayd_socket(int domain, unsigned int session_id, /* Send relayd socket to consumer. */ ret = consumer_send_relayd_socket(consumer_sock, rsock, consumer, - relayd_uri->stype, session_id); + relayd_uri->stype, session_id, + session_name, hostname, session_live_timer); if (ret < 0) { ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL; goto close_sock; @@ -615,7 +617,8 @@ error: * session. */ static int send_consumer_relayd_sockets(int domain, unsigned int session_id, - struct consumer_output *consumer, struct consumer_socket *sock) + struct consumer_output *consumer, struct consumer_socket *sock, + char *session_name, char *hostname, int session_live_timer) { int ret = LTTNG_OK; @@ -625,7 +628,8 @@ static int send_consumer_relayd_sockets(int domain, unsigned int session_id, /* Sending control relayd socket. */ if (!sock->control_sock_sent) { ret = send_consumer_relayd_socket(domain, session_id, - &consumer->dst.net.control, consumer, sock); + &consumer->dst.net.control, consumer, sock, + session_name, hostname, session_live_timer); if (ret != LTTNG_OK) { goto error; } @@ -634,7 +638,8 @@ static int send_consumer_relayd_sockets(int domain, unsigned int session_id, /* Sending data relayd socket. */ if (!sock->data_sock_sent) { ret = send_consumer_relayd_socket(domain, session_id, - &consumer->dst.net.data, consumer, sock); + &consumer->dst.net.data, consumer, sock, + session_name, hostname, session_live_timer); if (ret != LTTNG_OK) { goto error; } @@ -673,7 +678,9 @@ int cmd_setup_relayd(struct ltt_session *session) socket, node.node) { pthread_mutex_lock(socket->lock); ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_UST, session->id, - usess->consumer, socket); + usess->consumer, socket, + session->name, session->hostname, + session->live_timer); pthread_mutex_unlock(socket->lock); if (ret != LTTNG_OK) { goto error; @@ -689,7 +696,9 @@ int cmd_setup_relayd(struct ltt_session *session) socket, node.node) { pthread_mutex_lock(socket->lock); ret = send_consumer_relayd_sockets(LTTNG_DOMAIN_KERNEL, session->id, - ksess->consumer, socket); + ksess->consumer, socket, + session->name, session->hostname, + session->live_timer); pthread_mutex_unlock(socket->lock); if (ret != LTTNG_OK) { goto error; @@ -2539,7 +2548,9 @@ static int set_relayd_for_snapshot(struct consumer_output *consumer, cds_lfht_for_each_entry(snap_output->consumer->socks->ht, &iter.iter, socket, node.node) { ret = send_consumer_relayd_sockets(0, session->id, - snap_output->consumer, socket); + snap_output->consumer, socket, + session->name, session->hostname, + session->live_timer); if (ret != LTTNG_OK) { rcu_read_unlock(); goto error; diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index befc5d6dc..f0870e6b0 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -28,6 +28,7 @@ #include #include #include +#include #include "consumer.h" #include "health.h" @@ -939,7 +940,8 @@ error: */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id) + enum lttng_stream_type type, uint64_t session_id, + char *session_name, char *hostname, int session_live_timer) { int ret; struct lttcomm_consumer_msg msg; @@ -955,6 +957,17 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, goto error; } + if (type == LTTNG_STREAM_CONTROL) { + ret = relayd_create_session(rsock, + &msg.u.relayd_sock.relayd_session_id, + session_name, hostname, session_live_timer); + if (ret < 0) { + /* Close the control socket. */ + (void) relayd_close(rsock); + goto error; + } + } + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; /* * Assign network consumer output index using the temporary consumer since diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 5b4372221..4d6838f6a 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -202,7 +202,8 @@ int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg); int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id); + enum lttng_stream_type type, uint64_t session_id, + char *session_name, char *hostname, int session_live_timer); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); int consumer_recv_status_reply(struct consumer_socket *sock); diff --git a/src/bin/lttng-sessiond/session.c b/src/bin/lttng-sessiond/session.c index a777b0414..e4c747f7f 100644 --- a/src/bin/lttng-sessiond/session.c +++ b/src/bin/lttng-sessiond/session.c @@ -194,6 +194,11 @@ int session_create(char *name, uid_t uid, gid_t gid) goto error; } + ret = gethostname(new_session->hostname, sizeof(new_session->hostname)); + if (ret && errno == ENAMETOOLONG) { + new_session->hostname[HOST_NAME_MAX - 1] = '\0'; + } + /* Init kernel session */ new_session->kernel_session = NULL; new_session->ust_session = NULL; diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 2bd2fd0ba..de98bde51 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -59,6 +59,8 @@ struct ltt_session_list { */ struct ltt_session { char name[NAME_MAX]; + /* FIXME : size */ + char hostname[PATH_MAX]; /* local hostname, FIXME : could be useful to have that user defined too */ struct ltt_kernel_session *kernel_session; struct ltt_ust_session *ust_session; /* diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index 37c9861b9..e2be05e7e 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -21,7 +21,11 @@ #include #include +#include #include +#include +#include +#include #include "consumer-timer.h" #include "ust-consumer/ust-consumer.h" @@ -46,11 +50,15 @@ static void setmask(sigset_t *mask) } ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH); if (ret) { - PERROR("sigaddset"); + PERROR("sigaddset switch"); } ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN); if (ret) { - PERROR("sigaddset"); + PERROR("sigaddset teardown"); + } + ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE); + if (ret) { + PERROR("sigaddset live"); } } @@ -104,6 +112,165 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } } +static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts) +{ + int ret; + struct lttng_packet_index index; + + memset(&index, 0, sizeof(index)); + index.timestamp_end = htobe64(ts); + ret = consumer_stream_write_index(stream, &index); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +static int check_kernel_stream(struct lttng_consumer_stream *stream) +{ + uint64_t ts; + int ret; + + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. + */ + pthread_mutex_lock(&stream->lock); + ret = kernctl_get_current_timestamp(stream->wait_fd, &ts); + if (ret < 0) { + ERR("Failed to get the current timestamp"); + goto error_unlock; + } + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto error_unlock; + } + ret = kernctl_snapshot(stream->wait_fd); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Taking kernel snapshot"); + ret = -1; + goto error_unlock; + } + DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); + ret = send_empty_index(stream, ts); + if (ret < 0) { + goto error_unlock; + } + } + ret = 0; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + return ret; +} + +static int check_ust_stream(struct lttng_consumer_stream *stream) +{ + uint64_t ts; + int ret; + + assert(stream); + assert(stream->ustream); + /* + * While holding the stream mutex, try to take a snapshot, if it + * succeeds, it means that data is ready to be sent, just let the data + * thread handle that. Otherwise, if the snapshot returns EAGAIN, it + * means that there is no data to read after the flush, so we can + * safely send the empty index. + */ + pthread_mutex_lock(&stream->lock); + ret = ustctl_get_current_timestamp(stream->ustream, &ts); + if (ret < 0) { + ERR("Failed to get the current timestamp"); + goto error_unlock; + } + ustctl_flush_buffer(stream->ustream, 1); + ret = ustctl_snapshot(stream->ustream); + if (ret < 0) { + if (errno != EAGAIN) { + ERR("Taking UST snapshot"); + ret = -1; + goto error_unlock; + } + DBG("Stream %" PRIu64 " empty, sending beacon", stream->key); + ret = send_empty_index(stream, ts); + if (ret < 0) { + goto error_unlock; + } + } + ret = 0; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + return ret; +} + +/* + * Execute action on a live timer + */ +static void live_timer(struct lttng_consumer_local_data *ctx, + int sig, siginfo_t *si, void *uc) +{ + int ret; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + struct lttng_ht *ht; + struct lttng_ht_iter iter; + + channel = si->si_value.sival_ptr; + assert(channel); + + if (channel->switch_timer_error) { + goto error; + } + ht = consumer_data.stream_per_chan_id_ht; + + DBG("Live timer for channel %" PRIu64, channel->key); + + rcu_read_lock(); + switch (ctx->type) { + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_ust_stream(stream); + if (ret < 0) { + goto error_unlock; + } + } + break; + case LTTNG_CONSUMER_KERNEL: + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, &iter.iter, + stream, node_channel_id.node) { + ret = check_kernel_stream(stream); + if (ret < 0) { + goto error_unlock; + } + } + break; + case LTTNG_CONSUMER_UNKNOWN: + assert(0); + break; + } + +error_unlock: + rcu_read_unlock(); + +error: + return; +} + static void consumer_timer_signal_thread_qs(unsigned int signr) { @@ -212,6 +379,63 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) channel->switch_timer_enabled = 0; } +/* + * Set the timer for the live mode. + */ +void consumer_timer_live_start(struct lttng_consumer_channel *channel, + int live_timer_interval) +{ + int ret; + struct sigevent sev; + struct itimerspec its; + + assert(channel); + assert(channel->key); + + if (live_timer_interval == 0) { + return; + } + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE; + sev.sigev_value.sival_ptr = channel; + ret = timer_create(CLOCKID, &sev, &channel->live_timer); + if (ret == -1) { + PERROR("timer_create"); + } + channel->live_timer_enabled = 1; + + its.it_value.tv_sec = live_timer_interval / 1000000; + its.it_value.tv_nsec = live_timer_interval % 1000000; + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + ret = timer_settime(channel->live_timer, 0, &its, NULL); + if (ret == -1) { + PERROR("timer_settime"); + } +} + +/* + * Stop and delete timer. + */ +void consumer_timer_live_stop(struct lttng_consumer_channel *channel) +{ + int ret; + + assert(channel); + + ret = timer_delete(channel->live_timer); + if (ret == -1) { + PERROR("timer_delete"); + } + + consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE); + + channel->live_timer = 0; + channel->live_timer_enabled = 0; +} + /* * Block the RT signals for the entire process. It must be called from the * consumer main before creating the threads @@ -231,11 +455,10 @@ void consumer_signal_init(void) } /* - * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH and - * LTTNG_CONSUMER_SIG_TEARDOWN that are emitted by the periodic timer to check - * if new metadata is available. + * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH, + * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE. */ -void *consumer_timer_metadata_thread(void *data) +void *consumer_timer_thread(void *data) { int signr; sigset_t mask; @@ -260,6 +483,8 @@ void *consumer_timer_metadata_thread(void *data) CMM_STORE_SHARED(timer_signal.qs_done, 1); cmm_smp_mb(); DBG("Signal timer metadata thread teardown"); + } else if (signr == LTTNG_CONSUMER_SIG_LIVE) { + live_timer(ctx, info.si_signo, &info, NULL); } else { ERR("Unexpected signal %d\n", info.si_signo); } diff --git a/src/common/consumer-timer.h b/src/common/consumer-timer.h index 04743abf8..f3fac5d25 100644 --- a/src/common/consumer-timer.h +++ b/src/common/consumer-timer.h @@ -26,6 +26,7 @@ #define LTTNG_CONSUMER_SIG_SWITCH SIGRTMIN + 10 #define LTTNG_CONSUMER_SIG_TEARDOWN SIGRTMIN + 11 +#define LTTNG_CONSUMER_SIG_LIVE SIGRTMIN + 12 #define CLOCKID CLOCK_MONOTONIC @@ -45,7 +46,10 @@ struct timer_signal_data { void consumer_timer_switch_start(struct lttng_consumer_channel *channel, unsigned int switch_timer_interval); void consumer_timer_switch_stop(struct lttng_consumer_channel *channel); -void *consumer_timer_metadata_thread(void *data); +void consumer_timer_live_start(struct lttng_consumer_channel *channel, + int live_timer_interval); +void consumer_timer_live_stop(struct lttng_consumer_channel *channel); +void *consumer_timer_thread(void *data); void consumer_signal_init(void); #endif /* CONSUMER_TIMER_H */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 191367cd0..b8695698d 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -40,6 +40,7 @@ #include #include #include +#include #include "consumer.h" #include "consumer-stream.h" @@ -305,6 +306,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) consumer_stream_destroy(stream, NULL); } + if (channel->live_timer_enabled == 1) { + consumer_timer_live_stop(channel); + } + switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: break; @@ -3112,7 +3117,8 @@ void lttng_consumer_init(void) int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, - struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id) + struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id, + uint64_t relayd_session_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; @@ -3212,29 +3218,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, relayd->control_sock.major = relayd_sock->major; relayd->control_sock.minor = relayd_sock->minor; - /* - * Create a session on the relayd and store the returned id. Lock the - * control socket mutex if the relayd was NOT created before. - */ - if (!relayd_created) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - } - ret = relayd_create_session(&relayd->control_sock, - &relayd->relayd_session_id); - if (!relayd_created) { - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - } - if (ret < 0) { - /* - * Close all sockets of a relayd object. It will be freed if it was - * created at the error code path or else it will be garbage - * collect. - */ - (void) relayd_close(&relayd->control_sock); - (void) relayd_close(&relayd->data_sock); - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - goto error; - } + relayd->relayd_session_id = relayd_session_id; break; case LTTNG_STREAM_DATA: diff --git a/src/common/consumer.h b/src/common/consumer.h index 6eb5b61bb..2bf572303 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -156,11 +156,16 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; - /* For metadata periodical flush */ + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; int switch_timer_error; + /* For the live mode */ + int live_timer_enabled; + timer_t live_timer; + int live_timer_error; + /* On-disk circular buffer */ uint64_t tracefile_size; uint64_t tracefile_count; @@ -622,7 +627,7 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, - uint64_t sessiond_id); + uint64_t sessiond_id, uint64_t relayd_session_id); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); diff --git a/src/common/defaults.h b/src/common/defaults.h index b8db779dc..616f3cd97 100644 --- a/src/common/defaults.h +++ b/src/common/defaults.h @@ -182,6 +182,7 @@ /* Default network ports for trace streaming support */ #define DEFAULT_NETWORK_CONTROL_PORT 5342 #define DEFAULT_NETWORK_DATA_PORT 5343 +#define DEFAULT_NETWORK_VIEWER_PORT 5344 /* * If a thread stalls for this amount of time, it will be considered bogus (bad diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 315af2eb8..4618ccedc 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -39,6 +39,7 @@ #include #include #include +#include #include "kernel-consumer.h" @@ -441,7 +442,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -502,6 +504,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = consumer_add_channel(new_channel, ctx); } + consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval); /* If we received an error in add_channel, we need to report it. */ if (ret < 0) { diff --git a/src/common/kernel-consumer/kernel-consumer.h b/src/common/kernel-consumer/kernel-consumer.h index 41482b13f..d8f74ca1f 100644 --- a/src/common/kernel-consumer/kernel-consumer.h +++ b/src/common/kernel-consumer/kernel-consumer.h @@ -24,6 +24,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos); int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, diff --git a/src/common/kernel-ctl/kernel-ctl.c b/src/common/kernel-ctl/kernel-ctl.c index 495301e5f..5ea3e1ae1 100644 --- a/src/common/kernel-ctl/kernel-ctl.c +++ b/src/common/kernel-ctl/kernel-ctl.c @@ -426,3 +426,9 @@ int kernctl_get_stream_id(int fd, uint64_t *stream_id) { return ioctl(fd, LTTNG_RING_BUFFER_GET_STREAM_ID, stream_id); } + +/* Returns the current timestamp. */ +int kernctl_get_current_timestamp(int fd, uint64_t *ts) +{ + return ioctl(fd, LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP, ts); +} diff --git a/src/common/kernel-ctl/kernel-ctl.h b/src/common/kernel-ctl/kernel-ctl.h index badf609a0..7c5f8bebe 100644 --- a/src/common/kernel-ctl/kernel-ctl.h +++ b/src/common/kernel-ctl/kernel-ctl.h @@ -74,5 +74,6 @@ int kernctl_get_events_discarded(int fd, uint64_t *events_discarded); int kernctl_get_content_size(int fd, uint64_t *content_size); int kernctl_get_packet_size(int fd, uint64_t *packet_size); int kernctl_get_stream_id(int fd, uint64_t *stream_id); +int kernctl_get_current_timestamp(int fd, uint64_t *ts); #endif /* _LTTNG_KERNEL_CTL_H */ diff --git a/src/common/kernel-ctl/kernel-ioctl.h b/src/common/kernel-ctl/kernel-ioctl.h index 1a3b16967..1db964e00 100644 --- a/src/common/kernel-ctl/kernel-ioctl.h +++ b/src/common/kernel-ctl/kernel-ioctl.h @@ -59,6 +59,8 @@ #define LTTNG_RING_BUFFER_GET_PACKET_SIZE _IOR(0xF6, 0x24, uint64_t) /* returns the stream id */ #define LTTNG_RING_BUFFER_GET_STREAM_ID _IOR(0xF6, 0x25, uint64_t) +/* returns the current timestamp */ +#define LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP _IOR(0xF6, 0x26, uint64_t) /* Old ABI (without support for 32/64 bits compat) */ /* LTTng file descriptor ioctl */ diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index bb20a64a0..a4c8a9261 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -116,6 +116,49 @@ error: return ret; } +/* + * Starting at 2.4, RELAYD_CREATE_SESSION takes additional parameters to + * support the live reading capability. + */ +static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock, + uint64_t *session_id, char *session_name, char *hostname, + int session_live_timer) +{ + int ret; + struct lttcomm_relayd_create_session_2_4 msg; + + strncpy(msg.session_name, session_name, sizeof(msg.session_name)); + strncpy(msg.hostname, hostname, sizeof(msg.hostname)); + msg.live_timer = htobe32(session_live_timer); + + /* Send command */ + ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + +/* + * RELAYD_CREATE_SESSION from 2.1 to 2.3. + */ +static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock, + uint64_t *session_id) +{ + int ret; + + /* Send command */ + ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + /* * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and * set session_id of the relayd if we have a successful reply from the relayd. @@ -123,7 +166,8 @@ error: * On success, return 0 else a negative value which is either an errno error or * a lttng error code from the relayd. */ -int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id) +int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id, + char *session_name, char *hostname, int session_live_timer) { int ret; struct lttcomm_relayd_status_session reply; @@ -133,8 +177,18 @@ int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_i DBG("Relayd create session"); - /* Send command */ - ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0); + switch(rsock->minor) { + case 1: + case 2: + case 3: + ret = relayd_create_session_2_1(rsock, session_id); + case 4: + default: + ret = relayd_create_session_2_4(rsock, session_id, + session_name, hostname, + session_live_timer); + } + if (ret < 0) { goto error; } diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index a49bab733..d59f4ae2f 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -25,7 +25,8 @@ int relayd_connect(struct lttcomm_relayd_sock *sock); int relayd_close(struct lttcomm_relayd_sock *sock); -int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id); +int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id, + char *session_name, char *hostname, int session_live_timer); int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 3df68682a..bc27b0ed4 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -164,4 +164,13 @@ struct lttcomm_relayd_index { uint64_t stream_id; } LTTNG_PACKED; +/* + * Create session in 2.4 adds additionnal parameters for live reading. + */ +struct lttcomm_relayd_create_session_2_4 { + char session_name[NAME_MAX]; + char hostname[HOST_NAME_MAX]; + uint32_t live_timer; +} LTTNG_PACKED; + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 39ab69bf9..f86ba7323 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -106,6 +106,8 @@ enum lttcomm_relayd_command { RELAYD_ADD_INDEX = 12, RELAYD_SEND_INDEX = 13, RELAYD_CLOSE_INDEX = 14, + /* Live-reading commands. */ + RELAYD_LIST_SESSIONS = 15, }; /* @@ -349,6 +351,8 @@ struct lttcomm_consumer_msg { struct lttcomm_relayd_sock sock; /* Tracing session id associated to the relayd. */ uint64_t session_id; + /* Relayd session id, only used with control socket. */ + uint64_t relayd_session_id; } LTTNG_PACKED relayd_sock; struct { uint64_t net_seq_idx; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 113ae959e..192217b4e 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1120,7 +1120,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -1254,6 +1255,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, attr.switch_timer_interval = 0; } + consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval); + /* * Add the channel to the internal state AFTER all streams were created * and successfully sent to session daemon. This way, all streams must @@ -1269,6 +1272,9 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } consumer_metadata_cache_destroy(channel); } + if (channel->live_timer_enabled == 1) { + consumer_timer_live_stop(channel); + } goto end_channel_error; } diff --git a/tests/regression/tools/tracefile-limits/test_tracefile_count b/tests/regression/tools/tracefile-limits/test_tracefile_count index 0c23d843b..afe5435bd 100755 --- a/tests/regression/tools/tracefile-limits/test_tracefile_count +++ b/tests/regression/tools/tracefile-limits/test_tracefile_count @@ -94,7 +94,7 @@ function validate_file_count file_pattern="$2" expected_max_count="$3" - count=`find $path -name "$file_pattern" -type f | wc -l` + count=`find $path -name "$file_pattern" -type f \( ! -iname "*.idx" \) | wc -l` if [ "$count" -gt "$expected_max_count" ]; then fail "Validate file count: $file_pattern" diff --git a/tests/unit/Makefile.am b/tests/unit/Makefile.am index f517fdcbb..83bb8d351 100644 --- a/tests/unit/Makefile.am +++ b/tests/unit/Makefile.am @@ -13,6 +13,7 @@ LIBTAP=$(top_builddir)/tests/utils/tap/libtap.la LIBCOMMON=$(top_builddir)/src/common/libcommon.la LIBSESSIOND_COMM=$(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la LIBHASHTABLE=$(top_builddir)/src/common/hashtable/libhashtable.la +LIBRELAYD=$(top_builddir)/src/common/relayd/librelayd.la # Define test programs noinst_PROGRAMS = test_uri test_session test_kernel_data test_utils_parse_size_suffix @@ -36,8 +37,8 @@ SESSIONS=$(top_builddir)/src/bin/lttng-sessiond/session.o \ $(top_builddir)/src/common/.libs/error.o test_session_SOURCES = test_session.c -test_session_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBSESSIOND_COMM) $(LIBHASHTABLE) \ - -lrt +test_session_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBRELAYD) $(LIBSESSIOND_COMM) \ + $(LIBHASHTABLE) -lrt test_session_LDADD += $(SESSIONS) # UST data structures unit test @@ -58,8 +59,8 @@ UST_DATA_TRACE=$(top_builddir)/src/bin/lttng-sessiond/trace-ust.o \ $(top_builddir)/src/common/.libs/utils.o test_ust_data_SOURCES = test_ust_data.c -test_ust_data_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBSESSIOND_COMM) $(LIBHASHTABLE) \ - -lrt -llttng-ust-ctl +test_ust_data_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBRELAYD) $(LIBSESSIOND_COMM)\ + $(LIBHASHTABLE) -lrt -llttng-ust-ctl test_ust_data_LDADD += $(UST_DATA_TRACE) endif @@ -72,8 +73,8 @@ KERN_DATA_TRACE=$(top_builddir)/src/bin/lttng-sessiond/trace-kernel.o \ $(top_builddir)/src/common/.libs/utils.o test_kernel_data_SOURCES = test_kernel_data.c -test_kernel_data_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBSESSIOND_COMM) $(LIBHASHTABLE) \ - -lrt +test_kernel_data_LDADD = $(LIBTAP) $(LIBCOMMON) $(LIBRELAYD) $(LIBSESSIOND_COMM) \ + $(LIBHASHTABLE) -lrt test_kernel_data_LDADD += $(KERN_DATA_TRACE) # parse_size_suffix unit test