From 5a693d30f34648874fb7e0972d04011eb9906023 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 2 Sep 2015 22:57:40 -0400 Subject: [PATCH] Fix: Relay daemon ownership and reference counting MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The ownership and reference counting of the relay daemon is unclear and buggy in many ways. It is the cause of memory corruptions, double-free, leaks, segmentation faults, observed in various conditions. Fix this situation by introducing a clear ownership and reference counting scheme for this daemon. See doc/relayd-architecture.txt for details. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- doc/Makefile.am | 3 +- doc/relayd-architecture.txt | 101 +++ src/bin/lttng-relayd/Makefile.am | 4 +- src/bin/lttng-relayd/cmd-2-1.c | 25 +- src/bin/lttng-relayd/cmd-2-1.h | 10 +- src/bin/lttng-relayd/cmd-2-2.c | 31 +- src/bin/lttng-relayd/cmd-2-2.h | 10 +- src/bin/lttng-relayd/cmd-2-4.c | 19 +- src/bin/lttng-relayd/cmd-2-4.h | 10 +- src/bin/lttng-relayd/cmd-generic.c | 1 + src/bin/lttng-relayd/cmd-generic.h | 7 +- src/bin/lttng-relayd/cmd.h | 7 +- src/bin/lttng-relayd/connection.c | 125 ++- src/bin/lttng-relayd/connection.h | 71 +- src/bin/lttng-relayd/ctf-trace.c | 198 +++-- src/bin/lttng-relayd/ctf-trace.h | 66 +- src/bin/lttng-relayd/index.c | 346 +++++--- src/bin/lttng-relayd/index.h | 60 +- src/bin/lttng-relayd/live.c | 1082 +++++++++++------------ src/bin/lttng-relayd/live.h | 10 +- src/bin/lttng-relayd/lttng-relayd.h | 20 +- src/bin/lttng-relayd/main.c | 1179 ++++++++++--------------- src/bin/lttng-relayd/session.c | 215 +++-- src/bin/lttng-relayd/session.h | 115 ++- src/bin/lttng-relayd/stream-fd.c | 58 ++ src/bin/lttng-relayd/stream-fd.h | 32 + src/bin/lttng-relayd/stream.c | 353 ++++++-- src/bin/lttng-relayd/stream.h | 138 +-- src/bin/lttng-relayd/utils.h | 7 +- src/bin/lttng-relayd/viewer-session.c | 177 ++++ src/bin/lttng-relayd/viewer-session.h | 53 ++ src/bin/lttng-relayd/viewer-stream.c | 316 ++++--- src/bin/lttng-relayd/viewer-stream.h | 90 +- src/common/index/ctf-index.h | 2 +- src/common/index/index.c | 15 +- src/common/utils.c | 115 ++- src/common/utils.h | 2 + 37 files changed, 2977 insertions(+), 2096 deletions(-) create mode 100644 doc/relayd-architecture.txt create mode 100644 src/bin/lttng-relayd/stream-fd.c create mode 100644 src/bin/lttng-relayd/stream-fd.h create mode 100644 src/bin/lttng-relayd/viewer-session.c create mode 100644 src/bin/lttng-relayd/viewer-session.h diff --git a/doc/Makefile.am b/doc/Makefile.am index 4ff71ebbf..7ae930356 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,7 +1,8 @@ SUBDIRS = man EXTRA_DIST = quickstart.txt streaming-howto.txt python-howto.txt \ snapshot-howto.txt calibrate.txt kernel-CodingStyle.txt \ - live-reading-howto.txt live-reading-protocol.txt + live-reading-howto.txt live-reading-protocol.txt \ + relayd-architecture.txt dist_doc_DATA = quickstart.txt streaming-howto.txt python-howto.txt \ snapshot-howto.txt calibrate.txt live-reading-howto.txt \ diff --git a/doc/relayd-architecture.txt b/doc/relayd-architecture.txt new file mode 100644 index 000000000..1491da90f --- /dev/null +++ b/doc/relayd-architecture.txt @@ -0,0 +1,101 @@ +LTTng Relay Daemon Architecture +Mathieu Desnoyers, August 2015 + +This document describes the object model and architecture of the relay +daemon, after the refactoring done within the commit "Fix: Relay daemon +ownership and reference counting". + +We have the following object composition hierarchy: + +relay connection (main.c, for sessiond/consumer) + | + \-> 0 or 1 session + | + \-> 0 or many ctf-trace + | + \-> 0 or many stream + | | + | \-> 0 or many index + | + \-------> 0 or 1 viewer stream + +live connection (live.c, for client) + | + \-> 1 viewer session + | + \-> 0 or many session (actually a reference to session as created + | by the relay connection) + | + \-> ..... (ctf-trace, stream, index, viewer stream) + +There are global tables declared in lttng-relayd.h for sessions +(sessions_ht, indexed by session id), streams (relay_streams_ht, indexed +by stream handle), and viewer streams (viewer_streams_ht, indexed by +stream handle). The purpose of those tables is to allow fast lookup of +those objects using the IDs received in the communication protocols. + +There is also one connection hash table per worker thread. There is one +worker thread to receive data (main.c), and one worker thread to +interact with viewer clients (live.c). Those tables are indexed by +socket file descriptor. + +A RCU lookup+refcounting scheme has been introduced for all objects +(except viewer session which is still an exception at the moment). This +scheme allows looking up the objects or doing a traversal on the RCU +linked list or hash table in combination with a getter on the object. +This getter validates that there is still at least one reference to the +object, else the lookup acts just as if the object does not exist. This +scheme is protected by a "reflock" mutex in each object. "reflock" +mutexes can be nested from the innermost object to the outermost object. +IOW, the session reflock can nest within the ctf-trace reflock. + +The relay_connection (connection between the sessiond/consumer and the +relayd) is the outermost object of its hierarchy. + +The live connection (connection between a live client and the relayd) +is the outermost object of its hierarchy. + +There is also a "lock" mutex in each object. Those are used to +synchronize between threads (currently the main.c relay thread and +live.c client thread) when objects are shared. Locks can be nested from +the outermost object to the innermost object. IOW, the ctf-trace lock can +nest within the session lock. + +A "lock" should never nest within a "reflock". + +RCU linked lists are used to iterate using RCU, and are protected by +their own mutex for modifications. Iterations should be confirmed using +the object "getter" to ensure its refcount is not 0 (except in cases +where the caller actually owns the objects and therefore can assume its +refcount is not 0). + +RCU hash tables are used to iterate using RCU. Iteration should be +confirmed using the object "getter" to ensure its refcount is not 0 +(except again if we have ownership and can assume the object refcount is +not 0). + +Object creation has a refcount of 1. Each getter increments the +refcount, and needs to be paired with a "put" to decrement it. A final +put on "self" (ownership) will allow refcount to reach 0, therefore +triggering release, and thus free through call_rcu. + +In the composition scheme, we find back references from each composite +to its container. Therefore, each composite holds a reference (refcount) +on its container. This allows following pointers from e.g. viewer stream +to stream to ctf-trace to session without performing any validation, +due to transitive refcounting of those back-references. + +In addition to those back references, there are a few key ownership +references held. The connection in the relay worker thread (main.c) +holds ownership on the session, and on each stream it contains. The +connection in the live worker thread (live.c) holds ownership on each +viewer stream it creates. The rest is ensured by back references from +composite to container objects. When a connection is closed, it puts all +the ownership references it is holding. This will then eventually +trigger destruction of the session, streams, and viewer streams +associated with the connection when all the back references reach 0. + +RCU read-side locks are now only held during iteration on RCU lists and +hash tables, and within the internals of the get (lookup) and put +functions. Those functions then use refcounting to ensure existence of +the object when returned to their caller. diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 126cdafc4..428f35202 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -17,7 +17,9 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ viewer-stream.h viewer-stream.c \ session.c session.h \ stream.c stream.h \ - connection.c connection.h + stream-fd.c stream-fd.h \ + connection.c connection.h \ + viewer-session.c viewer-session.h # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ diff --git a/src/bin/lttng-relayd/cmd-2-1.c b/src/bin/lttng-relayd/cmd-2-1.c index 0cd9b5ab9..32a94386f 100644 --- a/src/bin/lttng-relayd/cmd-2-1.c +++ b/src/bin/lttng-relayd/cmd-2-1.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -28,14 +29,16 @@ #include "cmd-2-1.h" #include "utils.h" +/* + * cmd_recv_stream_2_1 allocates path_name and channel_name. + */ int cmd_recv_stream_2_1(struct relay_connection *conn, - struct relay_stream *stream) + char **ret_path_name, char **ret_channel_name) { int ret; struct lttcomm_relayd_add_stream stream_info; - - assert(conn); - assert(stream); + char *path_name = NULL; + char *channel_name = NULL; ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info)); if (ret < 0) { @@ -43,21 +46,25 @@ int cmd_recv_stream_2_1(struct relay_connection *conn, goto error; } - stream->path_name = create_output_path(stream_info.pathname); - if (stream->path_name == NULL) { + path_name = create_output_path(stream_info.pathname); + if (!path_name) { PERROR("Path name allocation"); ret = -ENOMEM; goto error; } - stream->channel_name = strdup(stream_info.channel_name); - if (stream->channel_name == NULL) { + channel_name = strdup(stream_info.channel_name); + if (!channel_name) { ret = -errno; PERROR("Path name allocation"); goto error; } - ret = 0; + *ret_path_name = path_name; + *ret_channel_name = channel_name; + return 0; error: + free(path_name); + free(channel_name); return ret; } diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h index bab8190bf..46283dc5a 100644 --- a/src/bin/lttng-relayd/cmd-2-1.h +++ b/src/bin/lttng-relayd/cmd-2-1.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_CMD_2_1_H +#define RELAYD_CMD_2_1_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,13 +20,9 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_CMD_2_1_H -#define RELAYD_CMD_2_1_H - #include "lttng-relayd.h" -#include "stream.h" int cmd_recv_stream_2_1(struct relay_connection *conn, - struct relay_stream *stream); + char **path_name, char **channel_name); #endif /* RELAYD_CMD_2_1_H */ diff --git a/src/bin/lttng-relayd/cmd-2-2.c b/src/bin/lttng-relayd/cmd-2-2.c index 7dd99ad5f..f82166cf3 100644 --- a/src/bin/lttng-relayd/cmd-2-2.c +++ b/src/bin/lttng-relayd/cmd-2-2.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -30,14 +31,17 @@ #include "cmd-2-1.h" #include "utils.h" +/* + * cmd_recv_stream_2_2 allocates path_name and channel_name. + */ int cmd_recv_stream_2_2(struct relay_connection *conn, - struct relay_stream *stream) + char **ret_path_name, char **ret_channel_name, + uint64_t *tracefile_size, uint64_t *tracefile_count) { int ret; struct lttcomm_relayd_add_stream_2_2 stream_info; - - assert(conn); - assert(stream); + char *path_name = NULL; + char *channel_name = NULL; ret = cmd_recv(conn->sock, &stream_info, sizeof(stream_info)); if (ret < 0) { @@ -45,24 +49,27 @@ int cmd_recv_stream_2_2(struct relay_connection *conn, goto error; } - stream->path_name = create_output_path(stream_info.pathname); - if (stream->path_name == NULL) { + path_name = create_output_path(stream_info.pathname); + if (!path_name) { PERROR("Path name allocation"); ret = -ENOMEM; goto error; } - stream->channel_name = strdup(stream_info.channel_name); - if (stream->channel_name == NULL) { + channel_name = strdup(stream_info.channel_name); + if (!channel_name) { ret = -errno; PERROR("Path name allocation"); goto error; } - stream->tracefile_size = be64toh(stream_info.tracefile_size); - stream->tracefile_count = be64toh(stream_info.tracefile_count); - ret = 0; - + *tracefile_size = be64toh(stream_info.tracefile_size); + *tracefile_count = be64toh(stream_info.tracefile_count); + *ret_path_name = path_name; + *ret_channel_name = channel_name; + return 0; error: + free(path_name); + free(channel_name); return ret; } diff --git a/src/bin/lttng-relayd/cmd-2-2.h b/src/bin/lttng-relayd/cmd-2-2.h index bd1cd1414..894a63a1e 100644 --- a/src/bin/lttng-relayd/cmd-2-2.h +++ b/src/bin/lttng-relayd/cmd-2-2.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_CMD_2_2_H +#define RELAYD_CMD_2_2_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,12 +20,10 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_CMD_2_2_H -#define RELAYD_CMD_2_2_H - #include "lttng-relayd.h" int cmd_recv_stream_2_2(struct relay_connection *conn, - struct relay_stream *stream); + char **path_name, char **channel_name, + uint64_t *tracefile_size, uint64_t *tracefile_count); #endif /* RELAYD_CMD_2_2_H */ diff --git a/src/bin/lttng-relayd/cmd-2-4.c b/src/bin/lttng-relayd/cmd-2-4.c index d8aa73704..a3290cb25 100644 --- a/src/bin/lttng-relayd/cmd-2-4.c +++ b/src/bin/lttng-relayd/cmd-2-4.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -30,26 +31,24 @@ #include "lttng-relayd.h" int cmd_create_session_2_4(struct relay_connection *conn, - struct relay_session *session) + char *session_name, char *hostname, + uint32_t *live_timer, bool *snapshot) { int ret; struct lttcomm_relayd_create_session_2_4 session_info; - assert(conn); - assert(session); - ret = cmd_recv(conn->sock, &session_info, sizeof(session_info)); if (ret < 0) { ERR("Unable to recv session info version 2.4"); goto error; } - strncpy(session->session_name, session_info.session_name, - sizeof(session->session_name)); - strncpy(session->hostname, session_info.hostname, - sizeof(session->hostname)); - session->live_timer = be32toh(session_info.live_timer); - session->snapshot = be32toh(session_info.snapshot); + strncpy(session_name, session_info.session_name, + sizeof(session_info.session_name)); + strncpy(hostname, session_info.hostname, + sizeof(session_info.hostname)); + *live_timer = be32toh(session_info.live_timer); + *snapshot = be32toh(session_info.snapshot); ret = 0; diff --git a/src/bin/lttng-relayd/cmd-2-4.h b/src/bin/lttng-relayd/cmd-2-4.h index aaf572a1c..ab7347897 100644 --- a/src/bin/lttng-relayd/cmd-2-4.h +++ b/src/bin/lttng-relayd/cmd-2-4.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_CMD_2_4_H +#define RELAYD_CMD_2_4_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,12 +20,10 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_CMD_2_4_H -#define RELAYD_CMD_2_4_H - #include "lttng-relayd.h" int cmd_create_session_2_4(struct relay_connection *conn, - struct relay_session *session); + char *session_name, char *hostname, + uint32_t *live_timer, bool *snapshot); #endif /* RELAYD_CMD_2_4_H */ diff --git a/src/bin/lttng-relayd/cmd-generic.c b/src/bin/lttng-relayd/cmd-generic.c index 417d6d33f..276e85bb3 100644 --- a/src/bin/lttng-relayd/cmd-generic.c +++ b/src/bin/lttng-relayd/cmd-generic.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as diff --git a/src/bin/lttng-relayd/cmd-generic.h b/src/bin/lttng-relayd/cmd-generic.h index 640fed7e4..4551f0aa1 100644 --- a/src/bin/lttng-relayd/cmd-generic.h +++ b/src/bin/lttng-relayd/cmd-generic.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_CMD_GENERIC_H +#define RELAYD_CMD_GENERIC_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,9 +20,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_CMD_GENERIC_H -#define RELAYD_CMD_GENERIC_H - #include #include "connection.h" diff --git a/src/bin/lttng-relayd/cmd.h b/src/bin/lttng-relayd/cmd.h index c8b37d5e5..88db09aed 100644 --- a/src/bin/lttng-relayd/cmd.h +++ b/src/bin/lttng-relayd/cmd.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_CMD_H +#define RELAYD_CMD_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,9 +20,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_CMD_H -#define RELAYD_CMD_H - #include "cmd-generic.h" #include "cmd-2-1.h" #include "cmd-2-2.h" diff --git a/src/bin/lttng-relayd/connection.c b/src/bin/lttng-relayd/connection.c index 76e48a6ab..fce6c84d5 100644 --- a/src/bin/lttng-relayd/connection.c +++ b/src/bin/lttng-relayd/connection.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -19,91 +20,129 @@ #define _GNU_SOURCE #define _LGPL_SOURCE #include +#include #include "connection.h" #include "stream.h" +#include "viewer-session.h" -static void rcu_free_connection(struct rcu_head *head) +bool connection_get(struct relay_connection *conn) { - struct relay_connection *conn = - caa_container_of(head, struct relay_connection, rcu_node); + bool has_ref = false; - lttcomm_destroy_sock(conn->sock); - connection_free(conn); + pthread_mutex_lock(&conn->reflock); + if (conn->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&conn->ref); + } + pthread_mutex_unlock(&conn->reflock); + + return has_ref; } -/* - * Must be called with a read side lock held. The read side lock must be - * kept until the returned relay_connection is no longer in use. - */ -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, int sock) +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht, + int sock) { struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct relay_connection *conn = NULL; - assert(ht); assert(sock >= 0); - lttng_ht_lookup(ht, (void *)((unsigned long) sock), &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_connections_ht, (void *)((unsigned long) sock), + &iter); node = lttng_ht_iter_get_node_ulong(&iter); if (!node) { DBG2("Relay connection by sock %d not found", sock); goto end; } conn = caa_container_of(node, struct relay_connection, sock_n); - + if (!connection_get(conn)) { + conn = NULL; + } end: + rcu_read_unlock(); return conn; } -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn) +struct relay_connection *connection_create(struct lttcomm_sock *sock, + enum connection_type type) { - int ret; - struct lttng_ht_iter iter; - - assert(ht); - assert(conn); + struct relay_connection *conn; - iter.iter.node = &conn->sock_n.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + conn = zmalloc(sizeof(*conn)); + if (!conn) { + PERROR("zmalloc relay connection"); + goto end; + } + pthread_mutex_init(&conn->reflock, NULL); + urcu_ref_init(&conn->ref); + conn->type = type; + conn->sock = sock; + lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); +end: + return conn; } -void connection_destroy(struct relay_connection *conn) +static void rcu_free_connection(struct rcu_head *head) { - assert(conn); + struct relay_connection *conn = + caa_container_of(head, struct relay_connection, rcu_node); + lttcomm_destroy_sock(conn->sock); + if (conn->viewer_session) { + viewer_session_destroy(conn->viewer_session); + conn->viewer_session = NULL; + } + free(conn); +} + +static void destroy_connection(struct relay_connection *conn) +{ call_rcu(&conn->rcu_node, rcu_free_connection); } -struct relay_connection *connection_create(void) +static void connection_release(struct urcu_ref *ref) { - struct relay_connection *conn; + struct relay_connection *conn = + caa_container_of(ref, struct relay_connection, ref); - conn = zmalloc(sizeof(*conn)); - if (!conn) { - PERROR("zmalloc relay connection"); - goto error; + if (conn->in_socket_ht) { + struct lttng_ht_iter iter; + int ret; + + iter.iter.node = &conn->sock_n.node; + ret = lttng_ht_del(conn->socket_ht, &iter); + assert(!ret); } -error: - return conn; + if (conn->session) { + if (session_close(conn->session)) { + ERR("session_close"); + } + conn->session = NULL; + } + if (conn->viewer_session) { + viewer_session_close(conn->viewer_session); + } + destroy_connection(conn); } -void connection_init(struct relay_connection *conn) +void connection_put(struct relay_connection *conn) { - assert(conn); - assert(conn->sock); - - CDS_INIT_LIST_HEAD(&conn->recv_head); - lttng_ht_node_init_ulong(&conn->sock_n, (unsigned long) conn->sock->fd); + rcu_read_lock(); + pthread_mutex_lock(&conn->reflock); + urcu_ref_put(&conn->ref, connection_release); + pthread_mutex_unlock(&conn->reflock); + rcu_read_unlock(); } -void connection_free(struct relay_connection *conn) +void connection_ht_add(struct lttng_ht *relay_connections_ht, + struct relay_connection *conn) { - assert(conn); - - free(conn->viewer_session); - free(conn); + assert(!conn->in_socket_ht); + lttng_ht_add_unique_ulong(relay_connections_ht, &conn->sock_n); + conn->in_socket_ht = 1; + conn->socket_ht = relay_connections_ht; } diff --git a/src/bin/lttng-relayd/connection.h b/src/bin/lttng-relayd/connection.h index 70fe4ba3d..443863813 100644 --- a/src/bin/lttng-relayd/connection.h +++ b/src/bin/lttng-relayd/connection.h @@ -1,6 +1,10 @@ +#ifndef _CONNECTION_H +#define _CONNECTION_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,9 +20,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _CONNECTION_H -#define _CONNECTION_H - #include #include #include @@ -32,6 +33,7 @@ #include "session.h" enum connection_type { + RELAY_CONNECTION_UNKNOWN = 0, RELAY_DATA = 1, RELAY_CONTROL = 2, RELAY_VIEWER_COMMAND = 3, @@ -41,39 +43,60 @@ enum connection_type { /* * Internal structure to map a socket with the corresponding session. * A hashtable indexed on the socket FD is used for the lookups. + * + * Connections are assumed to be accessed from a single thread. Live + * connections between the relay and a live client are only accessed + * from the live worker thread. + * + * The connections between the consumerd/sessiond and the relayd are only + * handled by the "main" worker thread (as in, the worker thread in main.c). + * + * This is why there are no back references to connections from the + * sessions and session list. */ struct relay_connection { struct lttcomm_sock *sock; - struct relay_session *session; - struct relay_viewer_session *viewer_session; struct cds_wfcq_node qnode; - struct lttng_ht_node_ulong sock_n; - struct rcu_head rcu_node; + enum connection_type type; - /* Protocol version to use for this connection. */ + /* + * session is only ever set for RELAY_CONTROL connection type. + */ + struct relay_session *session; + /* + * viewer_session is only ever set for RELAY_VIEWER_COMMAND + * connection type. + */ + struct relay_viewer_session *viewer_session; + + /* + * Protocol version to use for this connection. Only valid for + * RELAY_CONTROL connection type. + */ uint32_t major; uint32_t minor; - uint64_t session_id; + + struct urcu_ref ref; + pthread_mutex_t reflock; + + bool version_check_done; /* - * This contains streams that are received on that connection. It's used to - * store them until we get the streams sent command where they are removed - * and flagged ready for the viewer. This is ONLY used by the control - * thread thus any action on it should happen in that thread. + * Node member of connection within global socket hash table. */ - struct cds_list_head recv_head; - unsigned int version_check_done:1; - - /* Pointer to the sessions HT that this connection can use. */ - struct lttng_ht *sessions_ht; + struct lttng_ht_node_ulong sock_n; + bool in_socket_ht; + struct lttng_ht *socket_ht; /* HACK: Contained within this hash table. */ + struct rcu_head rcu_node; /* For call_rcu teardown. */ }; -struct relay_connection *connection_find_by_sock(struct lttng_ht *ht, +struct relay_connection *connection_create(struct lttcomm_sock *sock, + enum connection_type type); +struct relay_connection *connection_get_by_sock(struct lttng_ht *relay_connections_ht, int sock); -struct relay_connection *connection_create(void); -void connection_init(struct relay_connection *conn); -void connection_delete(struct lttng_ht *ht, struct relay_connection *conn); -void connection_destroy(struct relay_connection *conn); -void connection_free(struct relay_connection *conn); +bool connection_get(struct relay_connection *connection); +void connection_put(struct relay_connection *connection); +void connection_ht_add(struct lttng_ht *relay_connections_ht, + struct relay_connection *conn); #endif /* _CONNECTION_H */ diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c index 02a8b2bf1..d965cec8c 100644 --- a/src/bin/lttng-relayd/ctf-trace.c +++ b/src/bin/lttng-relayd/ctf-trace.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -22,148 +23,191 @@ #include #include +#include #include "ctf-trace.h" #include "lttng-relayd.h" #include "stream.h" static uint64_t last_relay_ctf_trace_id; +static pthread_mutex_t last_relay_ctf_trace_id_lock = PTHREAD_MUTEX_INITIALIZER; -static void rcu_destroy_ctf_trace(struct rcu_head *head) +static void rcu_destroy_ctf_trace(struct rcu_head *rcu_head) { - struct lttng_ht_node_str *node = - caa_container_of(head, struct lttng_ht_node_str, head); - struct ctf_trace *trace= - caa_container_of(node, struct ctf_trace, node); + struct ctf_trace *trace = + caa_container_of(rcu_head, struct ctf_trace, rcu_node); free(trace); } -static void rcu_destroy_stream(struct rcu_head *head) -{ - struct relay_stream *stream = - caa_container_of(head, struct relay_stream, rcu_node); - - stream_destroy(stream); -} - /* * Destroy a ctf trace and all stream contained in it. * * MUST be called with the RCU read side lock. */ -void ctf_trace_destroy(struct ctf_trace *obj) +void ctf_trace_destroy(struct ctf_trace *trace) { - struct relay_stream *stream, *tmp_stream; - - assert(obj); /* - * Getting to this point, every stream referenced to that object have put - * back their ref since the've been closed by the control side. + * Getting to this point, every stream referenced by that trace + * have put back their ref since the've been closed by the + * control side. */ - assert(!obj->refcount); + assert(cds_list_empty(&trace->stream_list)); + session_put(trace->session); + trace->session = NULL; + call_rcu(&trace->rcu_node, rcu_destroy_ctf_trace); +} - cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list, - trace_list) { - stream_delete(relay_streams_ht, stream); - call_rcu(&stream->rcu_node, rcu_destroy_stream); - } +void ctf_trace_release(struct urcu_ref *ref) +{ + struct ctf_trace *trace = + caa_container_of(ref, struct ctf_trace, ref); + int ret; + struct lttng_ht_iter iter; - call_rcu(&obj->node.head, rcu_destroy_ctf_trace); + iter.iter.node = &trace->node.node; + ret = lttng_ht_del(trace->session->ctf_traces_ht, &iter); + assert(!ret); + ctf_trace_destroy(trace); } -void ctf_trace_try_destroy(struct relay_session *session, - struct ctf_trace *ctf_trace) +/* + * Should be called with RCU read-side lock held. + */ +bool ctf_trace_get(struct ctf_trace *trace) { - assert(session); - assert(ctf_trace); + bool has_ref = false; - /* - * Considering no viewer attach to the session and the trace having no more - * stream attached, wipe the trace. - */ - if (uatomic_read(&session->viewer_refcount) == 0 && - uatomic_read(&ctf_trace->refcount) == 0) { - ctf_trace_delete(session->ctf_traces_ht, ctf_trace); - ctf_trace_destroy(ctf_trace); + /* Confirm that the trace refcount has not reached 0. */ + pthread_mutex_lock(&trace->reflock); + if (trace->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&trace->ref); } + pthread_mutex_unlock(&trace->reflock); + + return has_ref; } /* - * Create and return an allocated ctf_trace object. NULL on error. + * Create and return an allocated ctf_trace. NULL on error. + * There is no "open" and "close" for a ctf_trace, but rather just a + * create and refcounting. Whenever all the streams belonging to a trace + * put their reference, its refcount drops to 0. */ -struct ctf_trace *ctf_trace_create(char *path_name) +static struct ctf_trace *ctf_trace_create(struct relay_session *session, + char *path_name) { - struct ctf_trace *obj; + struct ctf_trace *trace; - assert(path_name); - - obj = zmalloc(sizeof(*obj)); - if (!obj) { + trace = zmalloc(sizeof(*trace)); + if (!trace) { PERROR("ctf_trace alloc"); goto error; } - CDS_INIT_LIST_HEAD(&obj->stream_list); + if (!session_get(session)) { + ERR("Cannot get session"); + free(trace); + trace = NULL; + goto error; + } + trace->session = session; + + CDS_INIT_LIST_HEAD(&trace->stream_list); + + pthread_mutex_lock(&last_relay_ctf_trace_id_lock); + trace->id = ++last_relay_ctf_trace_id; + pthread_mutex_unlock(&last_relay_ctf_trace_id_lock); - obj->id = ++last_relay_ctf_trace_id; - lttng_ht_node_init_str(&obj->node, path_name); + lttng_ht_node_init_str(&trace->node, path_name); + trace->session = session; + urcu_ref_init(&trace->ref); + pthread_mutex_init(&trace->lock, NULL); + pthread_mutex_init(&trace->reflock, NULL); + pthread_mutex_init(&trace->stream_list_lock, NULL); + lttng_ht_add_str(session->ctf_traces_ht, &trace->node); - DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name); + DBG("Created ctf_trace %" PRIu64 " with path: %s", trace->id, path_name); error: - return obj; + return trace; } /* - * Return a ctf_trace object if found by id in the given hash table else NULL. - * - * Must be called with rcu_read_lock() taken. + * Return a ctf_trace if found by id in the given hash table else NULL. + * Hold a reference on the ctf_trace, and must be paired with + * ctf_trace_put(). */ -struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht, +struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session, char *path_name) { struct lttng_ht_node_str *node; struct lttng_ht_iter iter; struct ctf_trace *trace = NULL; - assert(ht); - - lttng_ht_lookup(ht, (void *) path_name, &iter); + rcu_read_lock(); + lttng_ht_lookup(session->ctf_traces_ht, (void *) path_name, &iter); node = lttng_ht_iter_get_node_str(&iter); if (!node) { DBG("CTF Trace path %s not found", path_name); goto end; } trace = caa_container_of(node, struct ctf_trace, node); - + if (!ctf_trace_get(trace)) { + trace = NULL; + } end: + rcu_read_unlock(); + if (!trace) { + /* Try to create */ + trace = ctf_trace_create(session, path_name); + } return trace; } -/* - * Add stream to a given hash table. - */ -void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace) +void ctf_trace_put(struct ctf_trace *trace) { - assert(ht); - assert(trace); - - lttng_ht_add_str(ht, &trace->node); + rcu_read_lock(); + pthread_mutex_lock(&trace->reflock); + urcu_ref_put(&trace->ref, ctf_trace_release); + pthread_mutex_unlock(&trace->reflock); + rcu_read_unlock(); } -/* - * Delete stream from a given hash table. - */ -void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace) +int ctf_trace_close(struct ctf_trace *trace) { - int ret; - struct lttng_ht_iter iter; + struct relay_stream *stream; + + rcu_read_lock(); + cds_list_for_each_entry_rcu(stream, &trace->stream_list, + stream_node) { + /* + * Close the stream. + */ + stream_close(stream); + } + rcu_read_unlock(); + /* + * Since all references to the trace are held by its streams, we + * don't need to do any self-ref put. + */ + return 0; +} - assert(ht); - assert(trace); +struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace) +{ + struct relay_viewer_stream *vstream; - iter.iter.node = &trace->node.node; - ret = lttng_ht_del(ht, &iter); - assert(!ret); + rcu_read_lock(); + vstream = rcu_dereference(trace->viewer_metadata_stream); + if (!vstream) { + goto end; + } + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } +end: + rcu_read_unlock(); + return vstream; } diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h index 489c5f1d5..d051f8083 100644 --- a/src/bin/lttng-relayd/ctf-trace.h +++ b/src/bin/lttng-relayd/ctf-trace.h @@ -1,6 +1,10 @@ +#ifndef _CTF_TRACE_H +#define _CTF_TRACE_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,49 +20,53 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _CTF_TRACE_H -#define _CTF_TRACE_H - #include +#include #include #include "session.h" #include "stream.h" +#include "viewer-stream.h" struct ctf_trace { - int refcount; - unsigned int invalid_flag:1; + /* + * The ctf_trace reflock nests inside the stream reflock. + */ + pthread_mutex_t reflock; /* Protects refcounting */ + struct urcu_ref ref; /* Every stream has a ref on the trace. */ + struct relay_session *session; /* Back ref to trace session */ + + /* + * The ctf_trace lock nests inside the session lock. + */ + pthread_mutex_t lock; uint64_t id; - uint64_t metadata_received; - uint64_t metadata_sent; - struct relay_stream *metadata_stream; - struct relay_viewer_stream *viewer_metadata_stream; - /* Node indexed by stream path name in the corresponding session. */ - struct lttng_ht_node_str node; + struct relay_viewer_stream *viewer_metadata_stream; /* RCU protected */ - /* Relay stream associated with this ctf trace. */ + /* + * Relay streams associated with this ctf trace. + * Updates are protected by the stream_list lock. + * Traversals are protected by RCU. + */ struct cds_list_head stream_list; + pthread_mutex_t stream_list_lock; + + /* + * Node within session trace hash table. Node is indexed by + * stream path name. + */ + struct lttng_ht_node_str node; + struct rcu_head rcu_node; /* For call_rcu teardown. */ }; -static inline void ctf_trace_get_ref(struct ctf_trace *trace) -{ - uatomic_inc(&trace->refcount); -} +struct ctf_trace *ctf_trace_get_by_path_or_create(struct relay_session *session, + char *path_name); +bool ctf_trace_get(struct ctf_trace *trace); +void ctf_trace_put(struct ctf_trace *trace); -static inline void ctf_trace_put_ref(struct ctf_trace *trace) -{ - uatomic_add(&trace->refcount, -1); -} +int ctf_trace_close(struct ctf_trace *trace); -void ctf_trace_assign(struct relay_stream *stream); -struct ctf_trace *ctf_trace_create(char *path_name); -void ctf_trace_destroy(struct ctf_trace *obj); -void ctf_trace_try_destroy(struct relay_session *session, - struct ctf_trace *ctf_trace); -struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht, - char *path_name); -void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace); -void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace); +struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace); #endif /* _CTF_TRACE_H */ diff --git a/src/bin/lttng-relayd/index.c b/src/bin/lttng-relayd/index.c index b7507a022..7182e36cc 100644 --- a/src/bin/lttng-relayd/index.c +++ b/src/bin/lttng-relayd/index.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -24,190 +25,311 @@ #include #include "lttng-relayd.h" +#include "stream.h" #include "index.h" /* - * Deferred free of a relay index object. MUST only be called by a call RCU. + * Allocate a new relay index object. Pass the stream in which it is + * contained as parameter. The sequence number will be used as the hash + * table key. + * + * Called with stream mutex held. + * Return allocated object or else NULL on error. */ -static void deferred_free_relay_index(struct rcu_head *head) +static struct relay_index *relay_index_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct relay_index *index = - caa_container_of(head, struct relay_index, rcu_node); + struct relay_index *index; - if (index->to_close_fd >= 0) { - int ret; + DBG2("Creating relay index for stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, net_seq_num); - ret = close(index->to_close_fd); - if (ret < 0) { - PERROR("Relay index to close fd %d", index->to_close_fd); - } + index = zmalloc(sizeof(*index)); + if (!index) { + PERROR("Relay index zmalloc"); + goto end; + } + if (!stream_get(stream)) { + ERR("Cannot get stream"); + free(index); + index = NULL; + goto end; } + index->stream = stream; - relay_index_free(index); + lttng_ht_node_init_u64(&index->index_n, net_seq_num); + pthread_mutex_init(&index->lock, NULL); + pthread_mutex_init(&index->reflock, NULL); + urcu_ref_init(&index->ref); + +end: + return index; } /* - * Allocate a new relay index object using the given stream ID and sequence - * number as the hash table key. + * Add unique relay index to the given hash table. In case of a collision, the + * already existing object is put in the given _index variable. * - * Return allocated object or else NULL on error. + * RCU read side lock MUST be acquired. */ -struct relay_index *relay_index_create(uint64_t stream_id, - uint64_t net_seq_num) +static struct relay_index *relay_index_add_unique(struct relay_stream *stream, + struct relay_index *index) { - struct relay_index *index; + struct cds_lfht_node *node_ptr; + struct relay_index *_index; - DBG2("Creating relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - stream_id, net_seq_num); + DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, + stream->stream_handle, index->index_n.key); - index = zmalloc(sizeof(*index)); - if (index == NULL) { - PERROR("Relay index zmalloc"); - goto error; + node_ptr = cds_lfht_add_unique(stream->indexes_ht->ht, + stream->indexes_ht->hash_fct(&index->index_n, lttng_ht_seed), + stream->indexes_ht->match_fct, &index->index_n, + &index->index_n.node); + if (node_ptr != &index->index_n.node) { + _index = caa_container_of(node_ptr, struct relay_index, + index_n.node); + } else { + _index = NULL; } + return _index; +} + +/* + * Should be called with RCU read-side lock held. + */ +static bool relay_index_get(struct relay_index *index) +{ + bool has_ref = false; - index->to_close_fd = -1; - lttng_ht_node_init_two_u64(&index->index_n, stream_id, net_seq_num); + DBG2("index get for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); -error: - return index; + /* Confirm that the index refcount has not reached 0. */ + pthread_mutex_lock(&index->reflock); + if (index->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&index->ref); + } + pthread_mutex_unlock(&index->reflock); + + return has_ref; } /* - * Find a relayd index in the given hash table. + * Get a relayd index in within the given stream, or create it if not + * present. * + * Called with stream mutex held. * Return index object or else NULL on error. */ -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num) +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, + uint64_t net_seq_num) { - struct lttng_ht_node_two_u64 *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct lttng_ht_two_u64 key; struct relay_index *index = NULL; DBG3("Finding index for stream id %" PRIu64 " and seq_num %" PRIu64, - stream_id, net_seq_num); - - key.key1 = stream_id; - key.key2 = net_seq_num; + stream->stream_handle, net_seq_num); - lttng_ht_lookup(indexes_ht, (void *)(&key), &iter); - node = lttng_ht_iter_get_node_two_u64(&iter); - if (node == NULL) { - goto end; + rcu_read_lock(); + lttng_ht_lookup(stream->indexes_ht, &net_seq_num, &iter); + node = lttng_ht_iter_get_node_u64(&iter); + if (node) { + index = caa_container_of(node, struct relay_index, index_n); + } else { + struct relay_index *oldindex; + + index = relay_index_create(stream, net_seq_num); + if (!index) { + ERR("Cannot create index for stream id %" PRIu64 " and seq_num %" PRIu64, + index->stream->stream_handle, net_seq_num); + goto end; + } + oldindex = relay_index_add_unique(stream, index); + if (oldindex) { + /* Added concurrently, keep old. */ + relay_index_put(index); + index = oldindex; + if (!relay_index_get(index)) { + index = NULL; + } + } else { + stream->indexes_in_flight++; + index->in_hash_table = true; + } } - index = caa_container_of(node, struct relay_index, index_n); - end: - DBG2("Index %sfound in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, - (index == NULL) ? "NOT " : "", stream_id, net_seq_num); + rcu_read_unlock(); + DBG2("Index %sfound or created in HT for stream ID %" PRIu64 " and seqnum %" PRIu64, + (index == NULL) ? "NOT " : "", index->stream->stream_handle, net_seq_num); return index; } -/* - * Add unique relay index to the given hash table. In case of a collision, the - * already existing object is put in the given _index variable. - * - * RCU read side lock MUST be acquired. - */ -void relay_index_add(struct relay_index *index, struct relay_index **_index) +int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd, + uint64_t data_offset) { - struct cds_lfht_node *node_ptr; + int ret = 0; - assert(index); + pthread_mutex_lock(&index->lock); + if (index->index_fd) { + ret = -1; + goto end; + } + stream_fd_get(index_fd); + index->index_fd = index_fd; + index->index_data.offset = data_offset; +end: + pthread_mutex_unlock(&index->lock); + return ret; +} - DBG2("Adding relay index with stream id %" PRIu64 " and seqnum %" PRIu64, - index->index_n.key.key1, index->index_n.key.key2); +int relay_index_set_data(struct relay_index *index, + const struct ctf_packet_index *data) +{ + int ret = 0; - node_ptr = cds_lfht_add_unique(indexes_ht->ht, - indexes_ht->hash_fct((void *) &index->index_n.key, lttng_ht_seed), - indexes_ht->match_fct, (void *) &index->index_n.key, - &index->index_n.node); - if (node_ptr != &index->index_n.node) { - *_index = caa_container_of(node_ptr, struct relay_index, index_n.node); + pthread_mutex_lock(&index->lock); + if (index->has_index_data) { + ret = -1; + goto end; } + /* Set everything except data_offset. */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; + index->has_index_data = true; +end: + pthread_mutex_unlock(&index->lock); + return ret; } -/* - * Write index on disk to the given fd. Once done error or not, it is removed - * from the hash table and destroy the object. - * - * MUST be called with a RCU read side lock held. - * - * Return 0 on success else a negative value. - */ -int relay_index_write(int fd, struct relay_index *index) +static void index_destroy(struct relay_index *index) { - int ret; - struct lttng_ht_iter iter; - - DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 - " on fd %d", index->index_n.key.key1, - index->index_n.key.key2, fd); + free(index); +} - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); - call_rcu(&index->rcu_node, deferred_free_relay_index); +static void index_destroy_rcu(struct rcu_head *rcu_head) +{ + struct relay_index *index = + caa_container_of(rcu_head, struct relay_index, rcu_node); - return index_write(fd, &index->index_data, sizeof(index->index_data)); + index_destroy(index); } -/* - * Free the given index. - */ -void relay_index_free(struct relay_index *index) +/* Stream lock must be held by the caller. */ +static void index_release(struct urcu_ref *ref) { - free(index); + struct relay_index *index = caa_container_of(ref, struct relay_index, ref); + struct relay_stream *stream = index->stream; + int ret; + struct lttng_ht_iter iter; + + if (index->index_fd) { + stream_fd_put(index->index_fd); + index->index_fd = NULL; + } + if (index->in_hash_table) { + /* Delete index from hash table. */ + iter.iter.node = &index->index_n.node; + ret = lttng_ht_del(stream->indexes_ht, &iter); + assert(!ret); + stream->indexes_in_flight--; + } + + stream_put(index->stream); + index->stream = NULL; + + call_rcu(&index->rcu_node, index_destroy_rcu); } /* - * Safely free the given index using a call RCU. + * Called with stream mutex held. + * + * Stream lock must be held by the caller. */ -void relay_index_free_safe(struct relay_index *index) +void relay_index_put(struct relay_index *index) { - if (!index) { - return; - } - - call_rcu(&index->rcu_node, deferred_free_relay_index); + DBG2("index put for stream id %" PRIu64 " and seqnum %" PRIu64 " refcount %d", + index->stream->stream_handle, index->index_n.key, + (int) index->ref.refcount); + /* + * Ensure existance of index->lock for index unlock. + */ + rcu_read_lock(); + /* + * Index lock ensures that concurrent test and update of stream + * ref is atomic. + */ + pthread_mutex_lock(&index->reflock); + assert(index->ref.refcount != 0); + urcu_ref_put(&index->ref, index_release); + pthread_mutex_unlock(&index->reflock); + rcu_read_unlock(); } /* - * Delete index from the given hash table. + * Try to flush index to disk. Releases self-reference to index once + * flush succeeds. * - * RCU read side lock MUST be acquired. + * Stream lock must be held by the caller. + * Return 0 on successful flush, a negative value on error, or positive + * value if no flush was performed. */ -void relay_index_delete(struct relay_index *index) +int relay_index_try_flush(struct relay_index *index) { - int ret; - struct lttng_ht_iter iter; + int ret = 1; + bool flushed = false; + int fd; - DBG3("Relay index with stream ID %" PRIu64 " and seq num %" PRIu64 - " deleted.", index->index_n.key.key1, - index->index_n.key.key2); + pthread_mutex_lock(&index->lock); + if (index->flushed) { + goto skip; + } + /* Check if we are ready to flush. */ + if (!index->has_index_data || !index->index_fd) { + goto skip; + } + fd = index->index_fd->fd; + DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64 + " on fd %d", index->stream->stream_handle, + index->index_n.key, fd); + flushed = true; + index->flushed = true; + ret = index_write(fd, &index->index_data, sizeof(index->index_data)); + if (ret == sizeof(index->index_data)) { + ret = 0; + } else { + ret = -1; + } +skip: + pthread_mutex_unlock(&index->lock); - /* Delete index from hash table. */ - iter.iter.node = &index->index_n.node; - ret = lttng_ht_del(indexes_ht, &iter); - assert(!ret); + if (flushed) { + /* Put self-ref from index now that it has been flushed. */ + relay_index_put(index); + } + return ret; } /* - * Destroy every relay index with the given stream id as part of the key. + * Close every relay index within a given stream, without flushing + * them. */ -void relay_index_destroy_by_stream_id(uint64_t stream_id) +void relay_index_close_all(struct relay_stream *stream) { struct lttng_ht_iter iter; struct relay_index *index; rcu_read_lock(); - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - if (index->index_n.key.key1 == stream_id) { - relay_index_delete(index); - relay_index_free_safe(index); - } + cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, + index, index_n.node) { + /* Put self-ref from index. */ + relay_index_put(index); } rcu_read_unlock(); } diff --git a/src/bin/lttng-relayd/index.h b/src/bin/lttng-relayd/index.h index e7f9cdbd0..e882ed97d 100644 --- a/src/bin/lttng-relayd/index.h +++ b/src/bin/lttng-relayd/index.h @@ -1,6 +1,10 @@ +#ifndef _RELAY_INDEX_H +#define _RELAY_INDEX_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,42 +20,56 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _RELAY_INDEX_H -#define _RELAY_INDEX_H - #include #include #include #include +#include "stream-fd.h" + +struct relay_stream; + struct relay_index { - /* FD on which to write the index data. */ - int fd; /* - * When destroying this object, this fd is checked and if valid, close it - * so this is basically a lazy close of the previous fd corresponding to - * the same stream id. This is used for the rotate file feature. + * index lock nests inside stream lock. */ - int to_close_fd; + pthread_mutex_t reflock; /* Protects refcounting. */ + struct urcu_ref ref; /* Reference from getters. */ + struct relay_stream *stream; /* Back ref to stream */ + + pthread_mutex_t lock; + /* + * FD on which to write the index data. May differ from + * stream->index_fd due to tracefile rotation. + */ + struct stream_fd *index_fd; /* Index packet data. This is the data that is written on disk. */ struct ctf_packet_index index_data; - /* key1 = stream_id, key2 = net_seq_num */ - struct lttng_ht_node_two_u64 index_n; - struct rcu_head rcu_node; - pthread_mutex_t mutex; + bool has_index_data; + bool flushed; + bool in_hash_table; + + /* + * Node within indexes_ht that corresponds to this struct + * relay_index. Indexed by net_seq_num, which is unique for this + * index across the stream. + */ + struct lttng_ht_node_u64 index_n; + struct rcu_head rcu_node; /* For call_rcu teardown. */ }; -struct relay_index *relay_index_create(uint64_t stream_id, +struct relay_index *relay_index_get_by_id_or_create(struct relay_stream *stream, uint64_t net_seq_num); -struct relay_index *relay_index_find(uint64_t stream_id, uint64_t net_seq_num); -void relay_index_add(struct relay_index *index, struct relay_index **_index); -int relay_index_write(int fd, struct relay_index *index); -void relay_index_free(struct relay_index *index); -void relay_index_free_safe(struct relay_index *index); -void relay_index_delete(struct relay_index *index); -void relay_index_destroy_by_stream_id(uint64_t stream_id); +void relay_index_put(struct relay_index *index); +int relay_index_set_fd(struct relay_index *index, struct stream_fd *index_fd, + uint64_t data_offset); +int relay_index_set_data(struct relay_index *index, + const struct ctf_packet_index *data); +int relay_index_try_flush(struct relay_index *index); + +void relay_index_close_all(struct relay_stream *stream); #endif /* _RELAY_INDEX_H */ diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 562a7fa52..cacad2916 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -36,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -65,6 +67,9 @@ #include "session.h" #include "ctf-trace.h" #include "connection.h" +#include "viewer-session.h" + +#define SESSION_BUF_DEFAULT_COUNT 16 static struct lttng_uri *live_uri; @@ -90,6 +95,8 @@ static pthread_t live_worker_thread; static struct relay_conn_queue viewer_conn_queue; static uint64_t last_relay_viewer_session_id; +static pthread_mutex_t last_relay_viewer_session_id_lock = + PTHREAD_MUTEX_INITIALIZER; /* * Cleanup the daemon @@ -114,9 +121,6 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; - assert(sock); - assert(buf); - ret = sock->ops->recvmsg(sock, buf, size, 0); if (ret < 0 || ret != size) { if (ret == 0) { @@ -143,9 +147,6 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; - assert(sock); - assert(buf); - ret = sock->ops->sendmsg(sock, buf, size, 0); if (ret < 0) { ERR("Relayd failed to send response."); @@ -171,17 +172,22 @@ int check_new_streams(struct relay_connection *conn) if (!conn->viewer_session) { goto end; } - cds_list_for_each_entry(session, - &conn->viewer_session->sessions_head, - viewer_session_list) { + rcu_read_lock(); + cds_list_for_each_entry_rcu(session, + &conn->viewer_session->session_list, + viewer_session_node) { + if (!session_get(session)) { + continue; + } current_val = uatomic_cmpxchg(&session->new_streams, 1, 0); ret = current_val; + session_put(session); if (ret == 1) { goto end; } } - end: + rcu_read_unlock(); return ret; } @@ -200,8 +206,6 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, struct lttng_ht_iter iter; struct relay_viewer_stream *vstream; - assert(session); - rcu_read_lock(); cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, @@ -210,30 +214,39 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, health_code_update(); + if (!viewer_stream_get(vstream)) { + continue; + } + + pthread_mutex_lock(&vstream->stream->lock); /* Ignore if not the same session. */ - if (vstream->session_id != session->id || + if (vstream->stream->trace->session->id != session->id || (!ignore_sent_flag && vstream->sent_flag)) { + pthread_mutex_unlock(&vstream->stream->lock); + viewer_stream_put(vstream); continue; } - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - vstream->path_name); - assert(ctf_trace); - - send_stream.id = htobe64(vstream->stream_handle); + ctf_trace = vstream->stream->trace; + send_stream.id = htobe64(vstream->stream->stream_handle); send_stream.ctf_trace_id = htobe64(ctf_trace->id); - send_stream.metadata_flag = htobe32(vstream->metadata_flag); + send_stream.metadata_flag = htobe32( + vstream->stream->is_metadata); strncpy(send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name)); strncpy(send_stream.channel_name, vstream->channel_name, sizeof(send_stream.channel_name)); - DBG("Sending stream %" PRIu64 " to viewer", vstream->stream_handle); + DBG("Sending stream %" PRIu64 " to viewer", + vstream->stream->stream_handle); + vstream->sent_flag = 1; + pthread_mutex_unlock(&vstream->stream->lock); + ret = send_response(sock, &send_stream, sizeof(send_stream)); + viewer_stream_put(vstream); if (ret < 0) { goto end_unlock; } - vstream->sent_flag = 1; } ret = 0; @@ -263,17 +276,14 @@ int make_viewer_streams(struct relay_session *session, assert(session); /* - * This is to make sure we create viewer streams for a full received - * channel. For instance, if we have 8 streams for a channel that are - * concurrently being flagged ready, we can end up creating just a subset - * of the 8 streams (the ones that are flagged). This lock avoids this - * limbo state. + * Hold the session lock to ensure that we see either none or + * all initial streams for a session, but no intermediate state. */ - pthread_mutex_lock(&session->viewer_ready_lock); + pthread_mutex_lock(&session->lock); /* - * Create viewer streams for relay streams that are ready to be used for a - * the given session id only. + * Create viewer streams for relay streams that are ready to be + * used for a the given session id only. */ rcu_read_lock(); cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, @@ -282,47 +292,59 @@ int make_viewer_streams(struct relay_session *session, health_code_update(); - if (ctf_trace->invalid_flag) { + if (!ctf_trace_get(ctf_trace)) { continue; } - cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) { + cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) { struct relay_viewer_stream *vstream; - if (!stream->viewer_ready) { + if (!stream_get(stream)) { continue; } - - vstream = viewer_stream_find_by_id(stream->stream_handle); + /* + * stream published is protected by the session + * lock. + */ + if (!stream->published) { + goto next; + } + vstream = viewer_stream_get_by_id(stream->stream_handle); if (!vstream) { - vstream = viewer_stream_create(stream, seek_t, ctf_trace); + vstream = viewer_stream_create(stream, seek_t); if (!vstream) { ret = -1; + ctf_trace_put(ctf_trace); + stream_put(stream); goto error_unlock; } - /* Acquire reference to ctf_trace. */ - ctf_trace_get_ref(ctf_trace); if (nb_created) { /* Update number of created stream counter. */ (*nb_created)++; } - } else if (!vstream->sent_flag && nb_unsent) { - /* Update number of unsent stream counter. */ - (*nb_unsent)++; + } else { + if (!vstream->sent_flag && nb_unsent) { + /* Update number of unsent stream counter. */ + (*nb_unsent)++; + } + viewer_stream_put(vstream); } /* Update number of total stream counter. */ if (nb_total) { (*nb_total)++; } + next: + stream_put(stream); } + ctf_trace_put(ctf_trace); } ret = 0; error_unlock: rcu_read_unlock(); - pthread_mutex_unlock(&session->viewer_ready_lock); + pthread_mutex_unlock(&session->lock); return ret; } @@ -505,22 +527,18 @@ restart: goto error; } else if (revents & LPOLLIN) { /* - * Get allocated in this thread, enqueued to a global queue, - * dequeued and freed in the worker thread. + * A new connection is requested, therefore a + * viewer connection is allocated in this + * thread, enqueued to a global queue and + * dequeued (and freed) in the worker thread. */ int val = 1; struct relay_connection *new_conn; struct lttcomm_sock *newsock; - new_conn = connection_create(); - if (!new_conn) { - goto error; - } - newsock = live_control_sock->ops->accept(live_control_sock); if (!newsock) { PERROR("accepting control sock"); - connection_free(new_conn); goto error; } DBG("Relay viewer connection accepted socket %d", newsock->fd); @@ -530,18 +548,24 @@ restart: if (ret < 0) { PERROR("setsockopt inet"); lttcomm_destroy_sock(newsock); - connection_free(new_conn); goto error; } - new_conn->sock = newsock; + new_conn = connection_create(newsock, RELAY_CONNECTION_UNKNOWN); + if (!new_conn) { + lttcomm_destroy_sock(newsock); + goto error; + } + /* Ownership assumed by the connection. */ + newsock = NULL; /* Enqueue request for the dispatcher thread. */ cds_wfcq_enqueue(&viewer_conn_queue.head, &viewer_conn_queue.tail, &new_conn->qnode); /* - * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfcq_enqueue. + * Wake the dispatch queue futex. + * Implicit memory barrier with the + * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); } @@ -618,14 +642,15 @@ void *thread_dispatcher(void *data) conn->sock->fd); /* - * Inform worker thread of the new request. This call is blocking - * so we can be assured that the data will be read at some point in - * time or wait to the end of the world :) + * Inform worker thread of the new request. This + * call is blocking so we can be assured that + * the data will be read at some point in time + * or wait to the end of the world :) */ ret = lttng_write(live_conn_pipe[1], &conn, sizeof(conn)); if (ret < 0) { PERROR("write conn pipe"); - connection_destroy(conn); + connection_put(conn); goto error; } } while (node != NULL); @@ -664,8 +689,6 @@ int viewer_connect(struct relay_connection *conn) int ret; struct lttng_viewer_connect reply, msg; - assert(conn); - conn->version_check_done = 1; health_code_update(); @@ -713,10 +736,13 @@ int viewer_connect(struct relay_connection *conn) reply.minor = htobe32(reply.minor); if (conn->type == RELAY_VIEWER_COMMAND) { /* - * Increment outside of htobe64 macro, because can be used more than once - * within the macro, and thus the operation may be undefined. + * Increment outside of htobe64 macro, because the argument can + * be used more than once within the macro, and thus the + * operation may be undefined. */ + pthread_mutex_lock(&last_relay_viewer_session_id_lock); last_relay_viewer_session_id++; + pthread_mutex_unlock(&last_relay_viewer_session_id_lock); reply.viewer_session_id = htobe64(last_relay_viewer_session_id); } @@ -738,6 +764,9 @@ end: /* * Send the viewer the list of current sessions. + * We need to create a copy of the hash table content because otherwise + * we cannot assume the number of entries stays the same between getting + * the number of HT elements and iteration over the HT. * * Return 0 on success or else a negative value. */ @@ -746,156 +775,89 @@ int viewer_list_sessions(struct relay_connection *conn) { int ret; struct lttng_viewer_list_sessions session_list; - unsigned long count; - long approx_before, approx_after; struct lttng_ht_iter iter; - struct lttng_viewer_session send_session; struct relay_session *session; + struct lttng_viewer_session *send_session_buf = NULL; + uint32_t buf_count = SESSION_BUF_DEFAULT_COUNT; + uint32_t count = 0; DBG("List sessions received"); - rcu_read_lock(); - cds_lfht_count_nodes(conn->sessions_ht->ht, &approx_before, &count, - &approx_after); - session_list.sessions_count = htobe32(count); - - health_code_update(); - - ret = send_response(conn->sock, &session_list, sizeof(session_list)); - if (ret < 0) { - goto end_unlock; + send_session_buf = zmalloc(SESSION_BUF_DEFAULT_COUNT * sizeof(*send_session_buf)); + if (!send_session_buf) { + return -1; } - health_code_update(); - - cds_lfht_for_each_entry(conn->sessions_ht->ht, &iter.iter, session, + rcu_read_lock(); + cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session, session_n.node) { - health_code_update(); - - strncpy(send_session.session_name, session->session_name, - sizeof(send_session.session_name)); - strncpy(send_session.hostname, session->hostname, - sizeof(send_session.hostname)); - send_session.id = htobe64(session->id); - send_session.live_timer = htobe32(session->live_timer); - send_session.clients = htobe32(session->viewer_refcount); - send_session.streams = htobe32(session->stream_count); + struct lttng_viewer_session *send_session; health_code_update(); - ret = send_response(conn->sock, &send_session, sizeof(send_session)); - if (ret < 0) { - goto end_unlock; - } - } - health_code_update(); + if (count >= buf_count) { + struct lttng_viewer_session *newbuf; + uint32_t new_buf_count = buf_count << 1; - ret = 0; -end_unlock: - rcu_read_unlock(); - return ret; -} - -/* - * Check if a connection is attached to a session. - * Return 1 if attached, 0 if not attached, a negative value on error. - */ -static -int session_attached(struct relay_connection *conn, uint64_t session_id) -{ - struct relay_session *session; - int found = 0; - - if (!conn->viewer_session) { - goto end; - } - cds_list_for_each_entry(session, - &conn->viewer_session->sessions_head, - viewer_session_list) { - if (session->id == session_id) { - found = 1; - goto end; + newbuf = realloc(send_session_buf, + new_buf_count * sizeof(*send_session_buf)); + if (!newbuf) { + ret = -1; + rcu_read_unlock(); + goto end_free; + } + send_session_buf = newbuf; + buf_count = new_buf_count; } - } - -end: - return found; -} - -/* - * Delete all streams for a specific session ID. - */ -static void destroy_viewer_streams_by_session(struct relay_session *session) -{ - struct relay_viewer_stream *stream; - struct lttng_ht_iter iter; - - assert(session); - - rcu_read_lock(); - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - struct ctf_trace *ctf_trace; - - health_code_update(); - if (stream->session_id != session->id) { - continue; + send_session = &send_session_buf[count]; + strncpy(send_session->session_name, session->session_name, + sizeof(send_session->session_name)); + strncpy(send_session->hostname, session->hostname, + sizeof(send_session->hostname)); + send_session->id = htobe64(session->id); + send_session->live_timer = htobe32(session->live_timer); + if (session->viewer_attached) { + send_session->clients = htobe32(1); + } else { + send_session->clients = htobe32(0); } + send_session->streams = htobe32(session->stream_count); + count++; + } + rcu_read_unlock(); - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); - - viewer_stream_delete(stream); + session_list.sessions_count = htobe32(count); - if (stream->metadata_flag) { - ctf_trace->metadata_sent = 0; - ctf_trace->viewer_metadata_stream = NULL; - } + health_code_update(); - viewer_stream_destroy(ctf_trace, stream); + ret = send_response(conn->sock, &session_list, sizeof(session_list)); + if (ret < 0) { + goto end_free; } - rcu_read_unlock(); -} - -static void try_destroy_streams(struct relay_session *session) -{ - struct ctf_trace *ctf_trace; - struct lttng_ht_iter iter; - assert(session); + health_code_update(); - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, - node.node) { - /* Attempt to destroy the ctf trace of that session. */ - ctf_trace_try_destroy(session, ctf_trace); + ret = send_response(conn->sock, send_session_buf, + count * sizeof(*send_session_buf)); + if (ret < 0) { + goto end_free; } -} + health_code_update(); -/* - * Cleanup a session. - */ -static void cleanup_session(struct relay_connection *conn, - struct relay_session *session) -{ - /* - * Very important that this is done before destroying the session so we - * can put back every viewer stream reference from the ctf_trace. - */ - destroy_viewer_streams_by_session(session); - try_destroy_streams(session); - cds_list_del(&session->viewer_session_list); - session_viewer_try_destroy(conn->sessions_ht, session); + ret = 0; +end_free: + free(send_session_buf); + return ret; } /* - * Send the viewer the list of current sessions. + * Send the viewer the list of current streams. */ static int viewer_get_new_streams(struct relay_connection *conn) { int ret, send_streams = 0; - uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0; + uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0; struct lttng_viewer_new_streams_request request; struct lttng_viewer_new_streams_response response; struct relay_session *session; @@ -918,15 +880,14 @@ int viewer_get_new_streams(struct relay_connection *conn) memset(&response, 0, sizeof(response)); - rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, session_id); + session = session_get_by_id(session_id); if (!session) { DBG("Relay session %" PRIu64 " not found", session_id); response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; } - if (!session_attached(conn, session_id)) { + if (!viewer_session_is_attached(conn->viewer_session, session)) { send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR); goto send_reply; @@ -935,10 +896,10 @@ int viewer_get_new_streams(struct relay_connection *conn) send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent, + ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent, &nb_created); if (ret < 0) { - goto end_unlock; + goto end_put_session; } /* Only send back the newly created streams with the unsent ones. */ nb_streams = nb_created + nb_unsent; @@ -949,15 +910,9 @@ int viewer_get_new_streams(struct relay_connection *conn) * it means that the viewer has already received the whole trace * for this session and should now close it. */ - if (nb_streams == 0 && session->close_flag) { + if (nb_total == 0 && session->connection_closed) { send_streams = 0; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP); - /* - * Remove the session from the attached list of the connection - * and try to destroy it. - */ - cds_list_del(&session->viewer_session_list); - cleanup_session(conn, session); goto send_reply; } @@ -965,30 +920,33 @@ send_reply: health_code_update(); ret = send_response(conn->sock, &response, sizeof(response)); if (ret < 0) { - goto end_unlock; + goto end_put_session; } health_code_update(); /* - * Unknown or empty session, just return gracefully, the viewer knows what - * is happening. + * Unknown or empty session, just return gracefully, the viewer + * knows what is happening. */ if (!send_streams || !nb_streams) { ret = 0; - goto end_unlock; + goto end_put_session; } /* - * Send stream and *DON'T* ignore the sent flag so every viewer streams - * that were not sent from that point will be sent to the viewer. + * Send stream and *DON'T* ignore the sent flag so every viewer + * streams that were not sent from that point will be sent to + * the viewer. */ ret = send_viewer_streams(conn->sock, session, 0); if (ret < 0) { - goto end_unlock; + goto end_put_session; } -end_unlock: - rcu_read_unlock(); +end_put_session: + if (session) { + session_put(session); + } error: return ret; } @@ -1005,7 +963,7 @@ int viewer_attach_session(struct relay_connection *conn) enum lttng_viewer_seek seek_type; struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; - struct relay_session *session; + struct relay_session *session = NULL; assert(conn); @@ -1027,37 +985,34 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - rcu_read_lock(); - session = session_find_by_id(conn->sessions_ht, - be64toh(request.session_id)); + session = session_get_by_id(be64toh(request.session_id)); if (!session) { DBG("Relay session %" PRIu64 " not found", be64toh(request.session_id)); response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK); goto send_reply; } - session_viewer_attach(session); - DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id)); + DBG("Attach session ID %" PRIu64 " received", + be64toh(request.session_id)); - if (uatomic_read(&session->viewer_refcount) > 1) { - DBG("Already a viewer attached"); - response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY); - session_viewer_detach(session); - goto send_reply; - } else if (session->live_timer == 0) { + if (session->live_timer == 0) { DBG("Not live session"); response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE); goto send_reply; - } else { - send_streams = 1; - response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); - cds_list_add(&session->viewer_session_list, - &conn->viewer_session->sessions_head); + } + + send_streams = 1; + ret = viewer_session_attach(conn->viewer_session, session); + if (ret) { + DBG("Already a viewer attached"); + response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY); + goto send_reply; } switch (be32toh(request.seek)) { case LTTNG_VIEWER_SEEK_BEGINNING: case LTTNG_VIEWER_SEEK_LAST: + response.status = htobe32(LTTNG_VIEWER_ATTACH_OK); seek_type = be32toh(request.seek); break; default: @@ -1069,7 +1024,7 @@ int viewer_attach_session(struct relay_connection *conn) ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL); if (ret < 0) { - goto end_unlock; + goto end_put_session; } response.streams_count = htobe32(nb_streams); @@ -1077,27 +1032,29 @@ send_reply: health_code_update(); ret = send_response(conn->sock, &response, sizeof(response)); if (ret < 0) { - goto end_unlock; + goto end_put_session; } health_code_update(); /* - * Unknown or empty session, just return gracefully, the viewer knows what - * is happening. + * Unknown or empty session, just return gracefully, the viewer + * knows what is happening. */ if (!send_streams || !nb_streams) { ret = 0; - goto end_unlock; + goto end_put_session; } /* Send stream and ignore the sent flag. */ ret = send_viewer_streams(conn->sock, session, 1); if (ret < 0) { - goto end_unlock; + goto end_put_session; } -end_unlock: - rcu_read_unlock(); +end_put_session: + if (session) { + session_put(session); + } error: return ret; } @@ -1105,38 +1062,42 @@ error: /* * Open the index file if needed for the given vstream. * - * If an index file is successfully opened, the index_read_fd of the stream is - * set with it. + * If an index file is successfully opened, the vstream index_fd set with + * it. * * Return 0 on success, a negative value on error (-ENOENT if not ready yet). + * + * Called with rstream lock held. */ static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream) { int ret = 0; - assert(vstream); - assert(rstream); - - if (vstream->index_read_fd >= 0) { + if (vstream->index_fd) { goto end; } /* - * First time, we open the index file and at least one index is ready. The - * race between the read and write of the total_index_received is - * acceptable here since the client will be notified to simply come back - * and get the next index. + * First time, we open the index file and at least one index is ready. */ - if (rstream->total_index_received <= 0) { + if (rstream->total_index_received == 0) { ret = -ENOENT; goto end; } ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + vstream->stream->tracefile_count, + vstream->current_tracefile_id); if (ret >= 0) { - vstream->index_read_fd = ret; - ret = 0; + vstream->index_fd = stream_fd_create(ret); + if (!vstream->index_fd) { + if (close(ret)) { + PERROR("close"); + } + ret = -1; + } else { + ret = 0; + } goto end; } @@ -1145,13 +1106,15 @@ end: } /* - * Check the status of the index for the given stream. This function updates - * the index structure if needed and can destroy the vstream also for the HUP - * situation. + * Check the status of the index for the given stream. This function + * updates the index structure if needed and can put (close) the vstream + * in the HUP situation. + * + * Return 0 means that we can proceed with the index. A value of 1 means + * that the index has been updated and is ready to be sent to the + * client. A negative value indicates an error that can't be handled. * - * Return 0 means that we can proceed with the index. A value of 1 means that - * the index has been updated and is ready to be send to the client. A negative - * value indicates an error that can't be handled. + * Called with rstream lock held. */ static int check_index_status(struct relay_viewer_stream *vstream, struct relay_stream *rstream, struct ctf_trace *trace, @@ -1159,68 +1122,101 @@ static int check_index_status(struct relay_viewer_stream *vstream, { int ret; - assert(vstream); - assert(rstream); - assert(index); - assert(trace); - - if (!rstream->close_flag) { - /* Rotate on abort (overwrite). */ - if (vstream->abort_flag) { - DBG("Viewer stream %" PRIu64 " rotate because of overwrite", - vstream->stream_handle); - ret = viewer_stream_rotate(vstream, rstream); + if (trace->session->connection_closed + && rstream->total_index_received + == vstream->last_sent_index) { + /* Last index sent and session connection is closed. */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } else if (rstream->beacon_ts_end != -1ULL && + rstream->total_index_received + == vstream->last_sent_index) { + /* + * We've received a synchronization beacon and the last index + * available has been sent, the index for now is inactive. + * + * In this case, we have received a beacon which allows us to + * inform the client of a time interval during which we can + * guarantee that there are no events to read (and never will + * be). + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); + index->timestamp_end = htobe64(rstream->beacon_ts_end); + index->stream_id = htobe64(rstream->ctf_stream_id); + goto index_ready; + } else if (rstream->total_index_received <= vstream->last_sent_index) { + /* + * This actually checks the case where recv == last_sent. + * In this case, we have not received a beacon. Therefore, we + * can only ask the client to retry later. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto index_ready; + } else if (!viewer_stream_is_tracefile_seq_readable(vstream, + vstream->current_tracefile_seq)) { + /* + * The producer has overwritten our current file. We + * need to rotate. + */ + DBG("Viewer stream %" PRIu64 " rotation due to overwrite", + vstream->stream->stream_handle); + ret = viewer_stream_rotate(vstream); + if (ret < 0) { + goto end; + } else if (ret == 1) { + /* EOF across entire stream. */ + index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); + goto hup; + } + assert(viewer_stream_is_tracefile_seq_readable(vstream, + vstream->current_tracefile_seq)); + /* ret == 0 means successful so we continue. */ + ret = 0; + } else { + ssize_t read_ret; + char tmp[1]; + + /* + * Use EOF on current index file to find out when we + * need to rotate. + */ + read_ret = lttng_read(vstream->index_fd->fd, tmp, 1); + if (read_ret == 1) { + off_t seek_ret; + + /* There is still data to read. Rewind position. */ + seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR); + if (seek_ret < 0) { + ret = -1; + goto end; + } + ret = 0; + } else if (read_ret == 0) { + /* EOF. We need to rotate. */ + DBG("Viewer stream %" PRIu64 " rotation due to EOF", + vstream->stream->stream_handle); + ret = viewer_stream_rotate(vstream); if (ret < 0) { - goto error; + goto end; } else if (ret == 1) { - /* EOF */ + /* EOF across entire stream. */ index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); goto hup; } + assert(viewer_stream_is_tracefile_seq_readable(vstream, + vstream->current_tracefile_seq)); /* ret == 0 means successful so we continue. */ + ret = 0; + } else { + /* Error reading index. */ + ret = -1; } - - /* Check if we are in the same trace file at this point. */ - if (rstream->tracefile_count_current == vstream->tracefile_count_current) { - if (rstream->beacon_ts_end != -1ULL && - vstream->last_sent_index == rstream->total_index_received) { - /* - * We've received a synchronization beacon and the last index - * available has been sent, the index for now is inactive. - */ - index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); - index->timestamp_end = htobe64(rstream->beacon_ts_end); - index->stream_id = htobe64(rstream->ctf_stream_id); - goto index_ready; - } else if (rstream->total_index_received <= vstream->last_sent_index - && !vstream->close_write_flag) { - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ - index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - goto index_ready; - } - } - /* Nothing to do with the index, continue with it. */ - ret = 0; - } else if (rstream->close_flag && vstream->close_write_flag && - vstream->total_index_received == vstream->last_sent_index) { - /* Last index sent and current tracefile closed in write */ - index->status = htobe32(LTTNG_VIEWER_INDEX_HUP); - goto hup; - } else { - vstream->close_write_flag = 1; - ret = 0; } - -error: +end: return ret; hup: - viewer_stream_delete(vstream); - viewer_stream_destroy(trace, vstream); + viewer_stream_put(vstream); index_ready: return 1; } @@ -1238,15 +1234,16 @@ int viewer_get_next_index(struct relay_connection *conn) struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; - struct relay_viewer_stream *vstream; - struct relay_stream *rstream; - struct ctf_trace *ctf_trace; - struct relay_session *session; + struct relay_viewer_stream *vstream = NULL; + struct relay_stream *rstream = NULL; + struct ctf_trace *ctf_trace = NULL; + struct relay_viewer_stream *metadata_viewer_stream = NULL; assert(conn); DBG("Viewer get next index"); + memset(&viewer_index, 0, sizeof(viewer_index)); health_code_update(); ret = recv_request(conn->sock, &request_index, sizeof(request_index)); @@ -1255,42 +1252,38 @@ int viewer_get_next_index(struct relay_connection *conn) } health_code_update(); - rcu_read_lock(); - vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); + vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { ret = -1; - goto end_unlock; + goto end; } - session = session_find_by_id(conn->sessions_ht, vstream->session_id); - if (!session) { - ret = -1; - goto end_unlock; - } + /* Use back. ref. Protected by refcounts. */ + rstream = vstream->stream; + ctf_trace = rstream->trace; - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name); - assert(ctf_trace); + /* metadata_viewer_stream may be NULL. */ + metadata_viewer_stream = + ctf_trace_get_viewer_metadata_stream(ctf_trace); - memset(&viewer_index, 0, sizeof(viewer_index)); + pthread_mutex_lock(&rstream->lock); /* * The viewer should not ask for index on metadata stream. */ - if (vstream->metadata_flag) { + if (rstream->is_metadata) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); goto send_reply; } - rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); - assert(rstream); - /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); if (ret < 0) { if (ret == -ENOENT) { /* - * The index is created only when the first data packet arrives, it - * might not be ready at the beginning of the session + * The index is created only when the first data + * packet arrives, it might not be ready at the + * beginning of the session */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); } else { @@ -1300,80 +1293,70 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (ret < 0) { - goto end_unlock; + goto error_put; } else if (ret == 1) { /* - * This means the viewer index data structure has been populated by the - * check call thus we now send back the reply to the client. + * We have no index to send and check_index_status has populated + * viewer_index's status. */ goto send_reply; } - /* At this point, ret MUST be 0 thus we continue with the get. */ + /* At this point, ret is 0 thus we will be able to read the index. */ assert(!ret); - if (!ctf_trace->metadata_received || - ctf_trace->metadata_received > ctf_trace->metadata_sent) { - viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; + /* + * vstream->stream_fd may be NULL if it has been closed by + * tracefile rotation, or if we are at the beginning of the + * stream. We open the data stream file here to protect against + * overwrite caused by tracefile rotation (in association with + * unlink performed before overwrite). + */ + if (!vstream->stream_fd) { + char fullpath[PATH_MAX]; + + if (vstream->stream->tracefile_count > 0) { + ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, + vstream->path_name, + vstream->channel_name, + vstream->current_tracefile_id); + } else { + ret = snprintf(fullpath, PATH_MAX, "%s/%s", + vstream->path_name, + vstream->channel_name); + } + if (ret < 0) { + goto error_put; + } + ret = open(fullpath, O_RDONLY); + if (ret < 0) { + PERROR("Relay opening trace file"); + goto error_put; + } + vstream->stream_fd = stream_fd_create(ret); + if (!vstream->stream_fd) { + if (close(ret)) { + PERROR("close"); + } + goto error_put; + } } ret = check_new_streams(conn); if (ret < 0) { - goto end_unlock; + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); + goto send_reply; } else if (ret == 1) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - pthread_mutex_lock(&vstream->overwrite_lock); - if (vstream->abort_flag) { - /* The file is being overwritten by the writer, we cannot use it. */ - pthread_mutex_unlock(&vstream->overwrite_lock); - ret = viewer_stream_rotate(vstream, rstream); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - if (ret < 0) { - goto end_unlock; - } else if (ret == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - } else { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - } - goto send_reply; - } - - read_ret = lttng_read(vstream->index_read_fd, &packet_index, + read_ret = lttng_read(vstream->index_fd->fd, &packet_index, sizeof(packet_index)); - pthread_mutex_unlock(&vstream->overwrite_lock); - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - if (read_ret < 0) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - goto send_reply; - } else if (read_ret < sizeof(packet_index)) { - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); - if (vstream->close_write_flag) { - ret = viewer_stream_rotate(vstream, rstream); - if (ret < 0) { - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - goto end_unlock; - } else if (ret == 1) { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); - viewer_stream_delete(vstream); - viewer_stream_destroy(ctf_trace, vstream); - } else { - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); - } - } else { - ERR("Relay reading index file %d", vstream->index_read_fd); - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); - } - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); + if (read_ret < sizeof(packet_index)) { + ERR("Relay reading index file %d returned %zd", + vstream->index_fd->fd, read_ret); + viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK); @@ -1383,6 +1366,9 @@ int viewer_get_next_index(struct relay_connection *conn) /* * Indexes are stored in big endian, no need to switch before sending. */ + DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64, + rstream->stream_handle, + be64toh(packet_index.offset)); viewer_index.offset = packet_index.offset; viewer_index.packet_size = packet_index.packet_size; viewer_index.content_size = packet_index.content_size; @@ -1392,22 +1378,49 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.stream_id = packet_index.stream_id; send_reply: + pthread_mutex_unlock(&rstream->lock); + + if (metadata_viewer_stream) { + pthread_mutex_lock(&metadata_viewer_stream->stream->lock); + DBG("get next index metadata check: recv %" PRIu64 + " sent %" PRIu64, + metadata_viewer_stream->stream->metadata_received, + metadata_viewer_stream->metadata_sent); + if (!metadata_viewer_stream->stream->metadata_received || + metadata_viewer_stream->stream->metadata_received > + metadata_viewer_stream->metadata_sent) { + viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; + } + pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); + } + viewer_index.flags = htobe32(viewer_index.flags); health_code_update(); ret = send_response(conn->sock, &viewer_index, sizeof(viewer_index)); if (ret < 0) { - goto end_unlock; + goto end; } health_code_update(); DBG("Index %" PRIu64 " for stream %" PRIu64 " sent", - vstream->last_sent_index, vstream->stream_handle); - -end_unlock: - rcu_read_unlock(); - + vstream->last_sent_index, + vstream->stream->stream_handle); end: + if (metadata_viewer_stream) { + viewer_stream_put(metadata_viewer_stream); + } + if (vstream) { + viewer_stream_put(vstream); + } + return ret; + +error_put: + pthread_mutex_unlock(&rstream->lock); + if (metadata_viewer_stream) { + viewer_stream_put(metadata_viewer_stream); + } + viewer_stream_put(vstream); return ret; } @@ -1425,17 +1438,16 @@ int viewer_get_packet(struct relay_connection *conn) ssize_t read_len; struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply; - struct relay_viewer_stream *stream; - struct relay_session *session; + struct relay_viewer_stream *vstream = NULL; struct ctf_trace *ctf_trace; - - assert(conn); + struct relay_viewer_stream *metadata_viewer_stream = NULL; DBG2("Relay get data packet"); health_code_update(); - ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info)); + ret = recv_request(conn->sock, &get_packet_info, + sizeof(get_packet_info)); if (ret < 0) { goto end; } @@ -1444,59 +1456,57 @@ int viewer_get_packet(struct relay_connection *conn) /* From this point on, the error label can be reached. */ memset(&reply, 0, sizeof(reply)); - rcu_read_lock(); - stream = viewer_stream_find_by_id(be64toh(get_packet_info.stream_id)); - if (!stream) { + vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id)); + if (!vstream) { goto error; } - session = session_find_by_id(conn->sessions_ht, stream->session_id); - if (!session) { - ret = -1; - goto error; - } + ctf_trace = vstream->stream->trace; - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); + /* metadata_viewer_stream may be NULL. */ + metadata_viewer_stream = + ctf_trace_get_viewer_metadata_stream(ctf_trace); - /* - * First time we read this stream, we need open the tracefile, we should - * only arrive here if an index has already been sent to the viewer, so the - * tracefile must exist, if it does not it is a fatal error. - */ - if (stream->read_fd < 0) { - char fullpath[PATH_MAX]; + if (metadata_viewer_stream) { + bool get_packet_err = false; - if (stream->tracefile_count > 0) { - ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name, - stream->channel_name, - stream->tracefile_count_current); - } else { - ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name, - stream->channel_name); - } - if (ret < 0) { - goto error; + pthread_mutex_lock(&metadata_viewer_stream->stream->lock); + DBG("get packet metadata check: recv %" PRIu64 " sent %" PRIu64, + metadata_viewer_stream->stream->metadata_received, + metadata_viewer_stream->metadata_sent); + if (!metadata_viewer_stream->stream->metadata_received || + metadata_viewer_stream->stream->metadata_received > + metadata_viewer_stream->metadata_sent) { + /* + * We prevent the client from reading a data stream as + * long as there is metadata left to consume. This + * ensures that the client won't receive data of which + * it can't make sense. + */ + get_packet_err = true; } - ret = open(fullpath, O_RDONLY); - if (ret < 0) { - PERROR("Relay opening trace file"); - goto error; + pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); + viewer_stream_put(metadata_viewer_stream); + if (get_packet_err) { + reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); + reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; + goto send_reply_nolock; } - stream->read_fd = ret; - } - - if (!ctf_trace->metadata_received || - ctf_trace->metadata_received > ctf_trace->metadata_sent) { - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); - reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; - goto send_reply; } + pthread_mutex_lock(&vstream->stream->lock); + /* + * The vstream->stream_fd used here has been opened by + * get_next_index. It is opened there because this is what + * allows us to grab a reference to the file with stream lock + * held, thus protecting us against overwrite caused by + * tracefile rotation. Since tracefile rotation unlinks the old + * data file, we are ensured that we won't have our data + * overwritten under us. + */ ret = check_new_streams(conn); if (ret < 0) { - goto end_unlock; + goto end_free; } else if (ret == 1) { reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; @@ -1510,34 +1520,19 @@ int viewer_get_packet(struct relay_connection *conn) goto error; } - ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET); + ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), + SEEK_SET); if (ret < 0) { - /* - * If the read fd was closed by the streaming side, the - * abort_flag will be set to 1, otherwise it is an error. - */ - if (stream->abort_flag == 0) { - PERROR("lseek"); - goto error; - } - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF); - goto send_reply; + PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd, + be64toh(get_packet_info.offset)); + goto error; } - read_len = lttng_read(stream->read_fd, data, len); + read_len = lttng_read(vstream->stream_fd->fd, data, len); if (read_len < len) { - /* - * If the read fd was closed by the streaming side, the - * abort_flag will be set to 1, otherwise it is an error. - */ - if (stream->abort_flag == 0) { - PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, - stream->read_fd, - be64toh(get_packet_info.offset)); - goto error; - } else { - reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF); - goto send_reply; - } + PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, + vstream->stream_fd->fd, + be64toh(get_packet_info.offset)); + goto error; } reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); reply.len = htobe32(len); @@ -1548,13 +1543,17 @@ error: reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); send_reply: + if (vstream) { + pthread_mutex_unlock(&vstream->stream->lock); + } +send_reply_nolock: reply.flags = htobe32(reply.flags); health_code_update(); ret = send_response(conn->sock, &reply, sizeof(reply)); if (ret < 0) { - goto end_unlock; + goto end_free; } health_code_update(); @@ -1562,7 +1561,7 @@ send_reply: health_code_update(); ret = send_response(conn->sock, data, len); if (ret < 0) { - goto end_unlock; + goto end_free; } health_code_update(); } @@ -1570,11 +1569,12 @@ send_reply: DBG("Sent %u bytes for stream %" PRIu64, len, be64toh(get_packet_info.stream_id)); -end_unlock: +end_free: free(data); - rcu_read_unlock(); - end: + if (vstream) { + viewer_stream_put(vstream); + } return ret; } @@ -1592,9 +1592,7 @@ int viewer_get_metadata(struct relay_connection *conn) char *data = NULL; struct lttng_viewer_get_metadata request; struct lttng_viewer_metadata_packet reply; - struct relay_viewer_stream *stream; - struct ctf_trace *ctf_trace; - struct relay_session *session; + struct relay_viewer_stream *vstream = NULL; assert(conn); @@ -1610,36 +1608,31 @@ int viewer_get_metadata(struct relay_connection *conn) memset(&reply, 0, sizeof(reply)); - rcu_read_lock(); - stream = viewer_stream_find_by_id(be64toh(request.stream_id)); - if (!stream || !stream->metadata_flag) { - ERR("Invalid metadata stream"); - goto error; + vstream = viewer_stream_get_by_id(be64toh(request.stream_id)); + if (!vstream) { + reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); + goto send_reply; } - - session = session_find_by_id(conn->sessions_ht, stream->session_id); - if (!session) { - ret = -1; + pthread_mutex_lock(&vstream->stream->lock); + if (!vstream->stream->is_metadata) { + ERR("Invalid metadata stream"); goto error; } - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); - assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received); + assert(vstream->metadata_sent <= vstream->stream->metadata_received); - len = ctf_trace->metadata_received - ctf_trace->metadata_sent; + len = vstream->stream->metadata_received - vstream->metadata_sent; if (len == 0) { reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); goto send_reply; } /* first time, we open the metadata file */ - if (stream->read_fd < 0) { + if (!vstream->stream_fd) { char fullpath[PATH_MAX]; - ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name, - stream->channel_name); + ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name, + vstream->channel_name); if (ret < 0) { goto error; } @@ -1648,7 +1641,13 @@ int viewer_get_metadata(struct relay_connection *conn) PERROR("Relay opening metadata file"); goto error; } - stream->read_fd = ret; + vstream->stream_fd = stream_fd_create(ret); + if (!vstream->stream_fd) { + if (close(ret)) { + PERROR("close"); + } + goto error; + } } reply.len = htobe64(len); @@ -1658,13 +1657,20 @@ int viewer_get_metadata(struct relay_connection *conn) goto error; } - read_len = lttng_read(stream->read_fd, data, len); + read_len = lttng_read(vstream->stream_fd->fd, data, len); if (read_len < len) { PERROR("Relay reading metadata file"); goto error; } - ctf_trace->metadata_sent += read_len; + vstream->metadata_sent += read_len; + if (vstream->metadata_sent == vstream->stream->metadata_received + && vstream->stream->closed) { + /* Release ownership for the viewer metadata stream. */ + viewer_stream_put(vstream); + } + reply.status = htobe32(LTTNG_VIEWER_METADATA_OK); + goto send_reply; error: @@ -1672,16 +1678,19 @@ error: send_reply: health_code_update(); + if (vstream) { + pthread_mutex_unlock(&vstream->stream->lock); + } ret = send_response(conn->sock, &reply, sizeof(reply)); if (ret < 0) { - goto end_unlock; + goto end_free; } health_code_update(); if (len > 0) { ret = send_response(conn->sock, data, len); if (ret < 0) { - goto end_unlock; + goto end_free; } } @@ -1690,10 +1699,12 @@ send_reply: DBG("Metadata sent"); -end_unlock: +end_free: free(data); - rcu_read_unlock(); end: + if (vstream) { + viewer_stream_put(vstream); + } return ret; } @@ -1712,13 +1723,12 @@ int viewer_create_session(struct relay_connection *conn) memset(&resp, 0, sizeof(resp)); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); - conn->viewer_session = zmalloc(sizeof(*conn->viewer_session)); + conn->viewer_session = viewer_session_create(); if (!conn->viewer_session) { ERR("Allocation viewer session"); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR); goto send_reply; } - CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head); send_reply: health_code_update(); @@ -1757,9 +1767,6 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, int ret = 0; uint32_t msg_value; - assert(recv_hdr); - assert(conn); - msg_value = be32toh(recv_hdr->cmd); /* @@ -1798,7 +1805,8 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_create_session(conn); break; default: - ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); + ERR("Received unknown viewer command (%u)", + be32toh(recv_hdr->cmd)); live_relay_unknown_command(conn); ret = -1; goto end; @@ -1813,8 +1821,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) { int ret; - assert(events); - (void) lttng_poll_del(events, pollfd); ret = close(pollfd); @@ -1823,38 +1829,6 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) } } -/* - * Delete and destroy a connection. - * - * RCU read side lock MUST be acquired. - */ -static void destroy_connection(struct lttng_ht *relay_connections_ht, - struct relay_connection *conn) -{ - struct relay_session *session, *tmp_session; - - assert(relay_connections_ht); - assert(conn); - - connection_delete(relay_connections_ht, conn); - - if (!conn->viewer_session) { - goto end; - } - - rcu_read_lock(); - cds_list_for_each_entry_safe(session, tmp_session, - &conn->viewer_session->sessions_head, - viewer_session_list) { - DBG("Cleaning connection of session ID %" PRIu64, session->id); - cleanup_session(conn, session); - } - rcu_read_unlock(); - -end: - connection_destroy(conn); -} - /* * This thread does the actual work */ @@ -1864,11 +1838,9 @@ void *thread_worker(void *data) int ret, err = -1; uint32_t nb_fd; struct lttng_poll_event events; - struct lttng_ht *relay_connections_ht; + struct lttng_ht *viewer_connections_ht; struct lttng_ht_iter iter; struct lttng_viewer_cmd recv_hdr; - struct relay_local_data *relay_ctx = (struct relay_local_data *) data; - struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; struct relay_connection *destroy_conn; DBG("[thread] Live viewer relay worker started"); @@ -1882,9 +1854,9 @@ void *thread_worker(void *data) } /* table of connections indexed on socket */ - relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); - if (!relay_connections_ht) { - goto relay_connections_ht_error; + viewer_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + if (!viewer_connections_ht) { + goto viewer_connections_ht_error; } ret = create_thread_poll_set(&events, 2); @@ -1944,7 +1916,7 @@ restart: goto exit; } - /* Inspect the relay conn pipe for new connection */ + /* Inspect the relay conn pipe for new connection. */ if (pollfd == live_conn_pipe[0]) { if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Relay live pipe error"); @@ -1952,50 +1924,51 @@ restart: } else if (revents & LPOLLIN) { struct relay_connection *conn; - ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn)); + ret = lttng_read(live_conn_pipe[0], + &conn, sizeof(conn)); if (ret < 0) { goto error; } - conn->sessions_ht = sessions_ht; - connection_init(conn); lttng_poll_add(&events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); - rcu_read_lock(); - lttng_ht_add_unique_ulong(relay_connections_ht, - &conn->sock_n); - rcu_read_unlock(); - DBG("Connection socket %d added", conn->sock->fd); + connection_ht_add(viewer_connections_ht, conn); + DBG("Connection socket %d added to poll", conn->sock->fd); } } else { + /* Connection activity. */ struct relay_connection *conn; - rcu_read_lock(); - conn = connection_find_by_sock(relay_connections_ht, pollfd); - /* If not found, there is a synchronization issue. */ - assert(conn); + conn = connection_get_by_sock(viewer_connections_ht, pollfd); + if (!conn) { + continue; + } if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); + /* Put "create" ownership reference. */ + connection_put(conn); } else if (revents & LPOLLIN) { ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { - /* Connection closed */ + /* Connection closed. */ cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); + /* Put "create" ownership reference. */ + connection_put(conn); DBG("Viewer control conn closed with %d", pollfd); } else { ret = process_control(&recv_hdr, conn); if (ret < 0) { /* Clear the session on error. */ cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); + /* Put "create" ownership reference. */ + connection_put(conn); DBG("Viewer connection closed with %d", pollfd); } } } - rcu_read_unlock(); + /* Put local "get_by_sock" reference. */ + connection_put(conn); } } } @@ -2006,16 +1979,16 @@ error: /* Cleanup reamaining connection object. */ rcu_read_lock(); - cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, + cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); - destroy_connection(relay_connections_ht, destroy_conn); + connection_put(destroy_conn); } rcu_read_unlock(); error_poll_create: - lttng_ht_destroy(relay_connections_ht); -relay_connections_ht_error: + lttng_ht_destroy(viewer_connections_ht); +viewer_connections_ht_error: /* Close relay conn pipes */ utils_close_pipe(live_conn_pipe); if (err) { @@ -2078,8 +2051,7 @@ int relayd_live_join(void) /* * main */ -int relayd_live_create(struct lttng_uri *uri, - struct relay_local_data *relay_ctx) +int relayd_live_create(struct lttng_uri *uri) { int ret = 0, retval = 0; void *status; @@ -2129,7 +2101,7 @@ int relayd_live_create(struct lttng_uri *uri, /* Setup the worker thread */ ret = pthread_create(&live_worker_thread, NULL, - thread_worker, relay_ctx); + thread_worker, NULL); if (ret) { errno = ret; PERROR("pthread_create viewer worker"); diff --git a/src/bin/lttng-relayd/live.h b/src/bin/lttng-relayd/live.h index 5db940b3b..2b8a3a050 100644 --- a/src/bin/lttng-relayd/live.h +++ b/src/bin/lttng-relayd/live.h @@ -1,6 +1,10 @@ +#ifndef LTTNG_RELAYD_LIVE_H +#define LTTNG_RELAYD_LIVE_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,15 +20,11 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef LTTNG_RELAYD_LIVE_H -#define LTTNG_RELAYD_LIVE_H - #include #include "lttng-relayd.h" -int relayd_live_create(struct lttng_uri *live_uri, - struct relay_local_data *relay_ctx); +int relayd_live_create(struct lttng_uri *live_uri); int relayd_live_stop(void); int relayd_live_join(void); diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 245c5fd26..e4e29e781 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -1,6 +1,10 @@ +#ifndef LTTNG_RELAYD_H +#define LTTNG_RELAYD_H + /* * Copyright (C) 2012 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -16,9 +20,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef LTTNG_RELAYD_H -#define LTTNG_RELAYD_H - #include #include #include @@ -34,24 +35,17 @@ struct relay_conn_queue { int32_t futex; }; -struct relay_local_data { - struct lttng_ht *sessions_ht; -}; - -extern char *opt_output_path; - /* * Contains stream indexed by ID. This is important since many commands lookup * streams only by ID thus also keeping them in this hash table makes the - * search O(1) instead of iterating over the ctf_traces_ht of the session. + * search O(1). */ +extern struct lttng_ht *sessions_ht; extern struct lttng_ht *relay_streams_ht; - extern struct lttng_ht *viewer_streams_ht; -extern struct lttng_ht *indexes_ht; +extern char *opt_output_path; extern const char *tracing_group_name; - extern const char * const config_section_name; extern int thread_quit_pipe[2]; diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index f55901a4c..057ac4046 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -2,6 +2,7 @@ * Copyright (C) 2012 - Julien Desfossez * David Goulet * 2013 - Jérémie Galarneau + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -56,6 +57,7 @@ #include #include #include +#include #include "cmd.h" #include "ctf-trace.h" @@ -114,6 +116,11 @@ static pthread_t dispatcher_thread; static pthread_t worker_thread; static pthread_t health_thread; +/* + * last_relay_stream_id_lock protects last_relay_stream_id increment + * atomicity on 32-bit architectures. + */ +static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER; static uint64_t last_relay_stream_id; /* @@ -128,18 +135,14 @@ static struct relay_conn_queue relay_conn_queue; static char *data_buffer; static unsigned int data_buffer_size; -/* We need those values for the file/dir creation. */ -static uid_t relayd_uid; -static gid_t relayd_gid; - /* Global relay stream hash table. */ struct lttng_ht *relay_streams_ht; /* Global relay viewer stream hash table. */ struct lttng_ht *viewer_streams_ht; -/* Global hash table that stores relay index object. */ -struct lttng_ht *indexes_ht; +/* Global relay sessions hash table. */ +struct lttng_ht *sessions_ht; /* Relayd health monitoring */ struct health_app *health_relayd; @@ -163,8 +166,7 @@ static const char *config_ignore_options[] = { "help", "config" }; /* * usage function on stderr */ -static -void usage(void) +static void usage(void) { fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); fprintf(stderr, " -h, --help Display this usage.\n"); @@ -185,8 +187,7 @@ void usage(void) * * Return 0 on success else a negative value. */ -static -int set_option(int opt, const char *arg, const char *optname) +static int set_option(int opt, const char *arg, const char *optname) { int ret; @@ -308,8 +309,7 @@ end: * See config_entry_handler_cb comment in common/config/config.h for the * return value conventions. */ -static -int config_entry_handler(const struct config_entry *entry, void *unused) +static int config_entry_handler(const struct config_entry *entry, void *unused) { int ret = 0, i; @@ -332,9 +332,9 @@ int config_entry_handler(const struct config_entry *entry, void *unused) } /* - * If the option takes no argument on the command line, we have to - * check if the value is "true". We support non-zero numeric values, - * true, on and yes. + * If the option takes no argument on the command line, + * we have to check if the value is "true". We support + * non-zero numeric values, true, on and yes. */ if (!long_options[i].has_arg) { ret = config_parse_value(entry->value); @@ -359,8 +359,7 @@ end: return ret; } -static -int set_options(int argc, char **argv) +static int set_options(int argc, char **argv) { int c, ret = 0, option_index = 0, retval = 0; int orig_optopt = optopt, orig_optind = optind; @@ -483,21 +482,32 @@ exit: return retval; } +static void print_global_objects(void) +{ + rcu_register_thread(); + + print_viewer_streams(); + print_relay_streams(); + print_sessions(); + + rcu_unregister_thread(); +} + /* * Cleanup the daemon */ -static -void relayd_cleanup(struct relay_local_data *relay_ctx) +static void relayd_cleanup(void) { + print_global_objects(); + DBG("Cleaning up"); if (viewer_streams_ht) lttng_ht_destroy(viewer_streams_ht); if (relay_streams_ht) lttng_ht_destroy(relay_streams_ht); - if (relay_ctx && relay_ctx->sessions_ht) - lttng_ht_destroy(relay_ctx->sessions_ht); - free(relay_ctx); + if (sessions_ht) + lttng_ht_destroy(sessions_ht); /* free the dynamically allocated opt_output_path */ free(opt_output_path); @@ -517,8 +527,7 @@ void relayd_cleanup(struct relay_local_data *relay_ctx) /* * Write to writable pipe used to notify a thread. */ -static -int notify_thread_pipe(int wpipe) +static int notify_thread_pipe(int wpipe) { ssize_t ret; @@ -532,8 +541,7 @@ end: return ret; } -static -int notify_health_quit_pipe(int *pipe) +static int notify_health_quit_pipe(int *pipe) { ssize_t ret; @@ -582,8 +590,7 @@ int lttng_relay_stop_threads(void) * Simply stop all worker threads, leaving main() return gracefully after * joining all threads and calling cleanup(). */ -static -void sighandler(int sig) +static void sighandler(int sig) { switch (sig) { case SIGPIPE: @@ -613,8 +620,7 @@ void sighandler(int sig) * Setup signal handler for : * SIGINT, SIGTERM, SIGPIPE */ -static -int set_signal_handler(void) +static int set_signal_handler(void) { int ret = 0; struct sigaction sa; @@ -668,8 +674,7 @@ void lttng_relay_notify_ready(void) * * Return -1 on error or 0 if all pipes are created. */ -static -int init_thread_quit_pipe(void) +static int init_thread_quit_pipe(void) { int ret; @@ -681,8 +686,7 @@ int init_thread_quit_pipe(void) /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ -static -int create_thread_poll_set(struct lttng_poll_event *events, int size) +static int create_thread_poll_set(struct lttng_poll_event *events, int size) { int ret; @@ -713,8 +717,7 @@ error: * * Return 1 if it was triggered else 0; */ -static -int check_thread_quit_pipe(int fd, uint32_t events) +static int check_thread_quit_pipe(int fd, uint32_t events) { if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { return 1; @@ -726,8 +729,7 @@ int check_thread_quit_pipe(int fd, uint32_t events) /* * Create and init socket from uri. */ -static -struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri) +static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri) { int ret; struct lttcomm_sock *sock = NULL; @@ -764,64 +766,10 @@ error: return NULL; } -/* - * Return nonzero if stream needs to be closed. - */ -static -int close_stream_check(struct relay_stream *stream) -{ - if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) { - /* - * We are about to close the stream so set the data pending flag to 1 - * which will make the end data pending command skip the stream which - * is now closed and ready. Note that after proceeding to a file close, - * the written file is ready for reading. - */ - stream->data_pending_check_done = 1; - return 1; - } - return 0; -} - -static void try_close_stream(struct relay_session *session, - struct relay_stream *stream) -{ - int ret; - struct ctf_trace *ctf_trace; - - assert(session); - assert(stream); - - if (!close_stream_check(stream)) { - /* Can't close it, not ready for that. */ - goto end; - } - - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); - - pthread_mutex_lock(&session->viewer_ready_lock); - ctf_trace->invalid_flag = 1; - pthread_mutex_unlock(&session->viewer_ready_lock); - - ret = stream_close(session, stream); - if (ret || session->snapshot) { - /* Already close thus the ctf trace is being or has been destroyed. */ - goto end; - } - - ctf_trace_try_destroy(session, ctf_trace); - -end: - return; -} - /* * This thread manages the listening for new connections on the network */ -static -void *relay_thread_listener(void *data) +static void *relay_thread_listener(void *data) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; @@ -834,18 +782,19 @@ void *relay_thread_listener(void *data) health_code_update(); - control_sock = relay_init_sock(control_uri); + control_sock = relay_socket_create(control_uri); if (!control_sock) { goto error_sock_control; } - data_sock = relay_init_sock(data_uri); + data_sock = relay_socket_create(data_uri); if (!data_sock) { goto error_sock_relay; } /* - * Pass 3 as size here for the thread quit pipe, control and data socket. + * Pass 3 as size here for the thread quit pipe, control and + * data socket. */ ret = create_thread_poll_set(&events, 3); if (ret < 0) { @@ -900,7 +849,10 @@ restart: pollfd = LTTNG_POLL_GETFD(&events, i); if (!revents) { - /* No activity for this FD (poll implementation). */ + /* + * No activity for this FD (poll + * implementation). + */ continue; } @@ -916,33 +868,30 @@ restart: goto error; } else if (revents & LPOLLIN) { /* - * Get allocated in this thread, enqueued to a global queue, - * dequeued and freed in the worker thread. + * A new connection is requested, therefore a + * sessiond/consumerd connection is allocated in + * this thread, enqueued to a global queue and + * dequeued (and freed) in the worker thread. */ int val = 1; struct relay_connection *new_conn; struct lttcomm_sock *newsock; - - new_conn = connection_create(); - if (!new_conn) { - goto error; - } + enum connection_type type; if (pollfd == data_sock->fd) { - new_conn->type = RELAY_DATA; + type = RELAY_DATA; newsock = data_sock->ops->accept(data_sock); DBG("Relay data connection accepted, socket %d", newsock->fd); } else { assert(pollfd == control_sock->fd); - new_conn->type = RELAY_CONTROL; + type = RELAY_CONTROL; newsock = control_sock->ops->accept(control_sock); DBG("Relay control connection accepted, socket %d", newsock->fd); } if (!newsock) { PERROR("accepting sock"); - connection_free(new_conn); goto error; } @@ -951,18 +900,22 @@ restart: if (ret < 0) { PERROR("setsockopt inet"); lttcomm_destroy_sock(newsock); - connection_free(new_conn); goto error; } - new_conn->sock = newsock; + new_conn = connection_create(newsock, type); + if (!new_conn) { + lttcomm_destroy_sock(newsock); + goto error; + } /* Enqueue request for the dispatcher thread. */ cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail, &new_conn->qnode); /* - * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfcq_enqueue. + * Wake the dispatch queue futex. + * Implicit memory barrier with the + * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); } @@ -1004,8 +957,7 @@ error_sock_control: /* * This thread manages the dispatching of the requests to worker threads */ -static -void *relay_thread_dispatcher(void *data) +static void *relay_thread_dispatcher(void *data) { int err = -1; ssize_t ret; @@ -1044,14 +996,15 @@ void *relay_thread_dispatcher(void *data) DBG("Dispatching request waiting on sock %d", new_conn->sock->fd); /* - * Inform worker thread of the new request. This call is blocking - * so we can be assured that the data will be read at some point in - * time or wait to the end of the world :) + * Inform worker thread of the new request. This + * call is blocking so we can be assured that + * the data will be read at some point in time + * or wait to the end of the world :) */ ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn)); if (ret < 0) { PERROR("write connection pipe"); - connection_destroy(new_conn); + connection_put(new_conn); goto error; } } while (node != NULL); @@ -1077,72 +1030,27 @@ error_testpoint: return NULL; } -static void try_close_streams(struct relay_session *session) -{ - struct ctf_trace *ctf_trace; - struct lttng_ht_iter iter; - - assert(session); - - pthread_mutex_lock(&session->viewer_ready_lock); - rcu_read_lock(); - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, - node.node) { - struct relay_stream *stream; - - /* Close streams. */ - cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) { - stream_close(session, stream); - } - - ctf_trace->invalid_flag = 1; - ctf_trace_try_destroy(session, ctf_trace); - } - rcu_read_unlock(); - pthread_mutex_unlock(&session->viewer_ready_lock); -} - /* - * Try to destroy a session within a connection. + * Set index data from the control port to a given index object. */ -static void destroy_session(struct relay_session *session, - struct lttng_ht *sessions_ht) -{ - assert(session); - assert(sessions_ht); - - /* Indicate that this session can be destroyed from now on. */ - session->close_flag = 1; - - try_close_streams(session); - - /* - * This will try to delete and destroy the session if no viewer is attached - * to it meaning the refcount is down to zero. - */ - session_try_destroy(sessions_ht, session); -} - -/* - * Copy index data from the control port to a given index object. - */ -static void copy_index_control_data(struct relay_index *index, +static int set_index_control_data(struct relay_index *index, struct lttcomm_relayd_index *data) { - assert(index); - assert(data); + struct ctf_packet_index index_data; /* - * The index on disk is encoded in big endian, so we don't need to convert - * the data received on the network. The data_offset value is NEVER - * modified here and is updated by the data thread. + * The index on disk is encoded in big endian, so we don't need + * to convert the data received on the network. The data_offset + * value is NEVER modified here and is updated by the data + * thread. */ - index->index_data.packet_size = data->packet_size; - index->index_data.content_size = data->content_size; - index->index_data.timestamp_begin = data->timestamp_begin; - index->index_data.timestamp_end = data->timestamp_end; - index->index_data.events_discarded = data->events_discarded; - index->index_data.stream_id = data->stream_id; + index_data.packet_size = data->packet_size; + index_data.content_size = data->content_size; + index_data.timestamp_begin = data->timestamp_begin; + index_data.timestamp_end = data->timestamp_end; + index_data.events_discarded = data->events_discarded; + index_data.stream_id = data->stream_id; + return relay_index_set_data(index, &index_data); } /* @@ -1150,31 +1058,22 @@ static void copy_index_control_data(struct relay_index *index, * * On success, send back the session id or else return a negative value. */ -static -int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret = 0, send_ret; struct relay_session *session; struct lttcomm_relayd_status_session reply; + char session_name[NAME_MAX]; + char hostname[HOST_NAME_MAX]; + uint32_t live_timer = 0; + bool snapshot = false; - assert(recv_hdr); - assert(conn); + memset(session_name, 0, NAME_MAX); + memset(hostname, 0, HOST_NAME_MAX); memset(&reply, 0, sizeof(reply)); - session = session_create(); - if (!session) { - ret = -1; - goto error; - } - session->minor = conn->minor; - session->major = conn->major; - conn->session_id = session->id; - conn->session = session; - - reply.session_id = htobe64(session->id); - switch (conn->minor) { case 1: case 2: @@ -1182,13 +1081,26 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, break; case 4: /* LTTng sessiond 2.4 */ default: - ret = cmd_create_session_2_4(conn, session); + ret = cmd_create_session_2_4(conn, session_name, + hostname, &live_timer, &snapshot); + } + if (ret < 0) { + goto send_reply; } - lttng_ht_add_unique_u64(conn->sessions_ht, &session->session_n); + session = session_create(session_name, hostname, live_timer, + snapshot, conn->major, conn->minor); + if (!session) { + ret = -1; + goto send_reply; + } + assert(!conn->session); + conn->session = session; DBG("Created session %" PRIu64, session->id); -error: + reply.session_id = htobe64(session->id); + +send_reply: if (ret < 0) { reply.ret_code = htobe32(LTTNG_ERR_FATAL); } else { @@ -1208,47 +1120,47 @@ error: * When we have received all the streams and the metadata for a channel, * we make them visible to the viewer threads. */ -static -void set_viewer_ready_flag(struct relay_connection *conn) +static void publish_connection_local_streams(struct relay_connection *conn) { - struct relay_stream *stream, *tmp_stream; + struct relay_stream *stream; + struct relay_session *session = conn->session; - pthread_mutex_lock(&conn->session->viewer_ready_lock); - cds_list_for_each_entry_safe(stream, tmp_stream, &conn->recv_head, - recv_list) { - stream->viewer_ready = 1; - cds_list_del(&stream->recv_list); + /* + * We publish all streams belonging to a session atomically wrt + * session lock. + */ + pthread_mutex_lock(&session->lock); + rcu_read_lock(); + cds_list_for_each_entry_rcu(stream, &session->recv_list, + recv_node) { + stream_publish(stream); } - pthread_mutex_unlock(&conn->session->viewer_ready_lock); - return; -} - -/* - * Add a recv handle node to the connection recv list with the given stream - * handle. A new node is allocated thus must be freed when the node is deleted - * from the list. - */ -static void queue_stream(struct relay_stream *stream, - struct relay_connection *conn) -{ - assert(conn); - assert(stream); + rcu_read_unlock(); - cds_list_add(&stream->recv_list, &conn->recv_head); + /* + * Inform the viewer that there are new streams in the session. + */ + if (session->viewer_attached) { + uatomic_set(&session->new_streams, 1); + } + pthread_mutex_unlock(&session->lock); } /* * relay_add_stream: allocate a new stream for a session */ -static -int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret, send_ret; + int ret; + ssize_t send_ret; struct relay_session *session = conn->session; struct relay_stream *stream = NULL; struct lttcomm_relayd_status_stream reply; struct ctf_trace *trace = NULL; + uint64_t stream_handle = -1ULL; + char *path_name = NULL, *channel_name = NULL; + uint64_t tracefile_size = 0, tracefile_count = 0; if (!session || conn->version_check_done == 0) { ERR("Trying to add a stream before version check"); @@ -1256,108 +1168,48 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - stream = zmalloc(sizeof(struct relay_stream)); - if (stream == NULL) { - PERROR("relay stream zmalloc"); - ret = -1; - goto end_no_session; - } - - switch (conn->minor) { - case 1: /* LTTng sessiond 2.1 */ - ret = cmd_recv_stream_2_1(conn, stream); + switch (session->minor) { + case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */ + ret = cmd_recv_stream_2_1(conn, &path_name, + &channel_name); break; - case 2: /* LTTng sessiond 2.2 */ + case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */ default: - ret = cmd_recv_stream_2_2(conn, stream); + ret = cmd_recv_stream_2_2(conn, &path_name, + &channel_name, &tracefile_size, &tracefile_count); break; } if (ret < 0) { - goto err_free_stream; - } - - stream->stream_handle = ++last_relay_stream_id; - stream->prev_seq = -1ULL; - stream->session_id = session->id; - stream->index_fd = -1; - stream->read_index_fd = -1; - stream->ctf_stream_id = -1ULL; - lttng_ht_node_init_u64(&stream->node, stream->stream_handle); - pthread_mutex_init(&stream->lock, NULL); - - ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, - -1, -1); - if (ret < 0) { - ERR("relay creating output directory"); - goto err_free_stream; + goto send_reply; } - /* - * No need to use run_as API here because whatever we receives, the relayd - * uses its own credentials for the stream files. - */ - ret = utils_create_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL); - if (ret < 0) { - ERR("Create output file"); - goto err_free_stream; - } - stream->fd = ret; - if (stream->tracefile_size) { - DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); - } else { - DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); - } - - /* Protect access to "trace" */ - rcu_read_lock(); - trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); + trace = ctf_trace_get_by_path_or_create(session, path_name); if (!trace) { - trace = ctf_trace_create(stream->path_name); - if (!trace) { - ret = -1; - goto end; - } - ctf_trace_add(session->ctf_traces_ht, trace); + goto send_reply; } - ctf_trace_get_ref(trace); + /* This stream here has one reference on the trace. */ - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { - stream->metadata_flag = 1; - /* Assign quick reference to the metadata stream in the trace. */ - trace->metadata_stream = stream; - } + pthread_mutex_lock(&last_relay_stream_id_lock); + stream_handle = ++last_relay_stream_id; + pthread_mutex_unlock(&last_relay_stream_id_lock); - /* - * Add the stream in the recv list of the connection. Once the end stream - * message is received, this list is emptied and streams are set with the - * viewer ready flag. - */ - queue_stream(stream, conn); + /* We pass ownership of path_name and channel_name. */ + stream = stream_create(trace, stream_handle, path_name, + channel_name, tracefile_size, tracefile_count); + path_name = NULL; + channel_name = NULL; /* - * Both in the ctf_trace object and the global stream ht since the data - * side of the relayd does not have the concept of session. - * - * rcu_read_lock() is kept to protect the stream which is now part of - * the relay_streams_ht. + * Streams are the owners of their trace. Reference to trace is + * kept within stream_create(). */ - lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); - cds_list_add_tail(&stream->trace_list, &trace->stream_list); - - session->stream_count++; + ctf_trace_put(trace); - DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, - stream->stream_handle); - -end: +send_reply: memset(&reply, 0, sizeof(reply)); - reply.handle = htobe64(stream->stream_handle); - /* send the session id to the client or a negative return code on error */ - if (ret < 0) { + reply.handle = htobe64(stream_handle); + if (!stream) { reply.ret_code = htobe32(LTTNG_ERR_UNK); - /* stream was not properly added to the ht, so free it */ - stream_destroy(stream); } else { reply.ret_code = htobe32(LTTNG_OK); } @@ -1366,29 +1218,19 @@ end: sizeof(struct lttcomm_relayd_status_stream), 0); if (send_ret < 0) { ERR("Relay sending stream id"); - ret = send_ret; + ret = (int) send_ret; } - /* - * rcu_read_lock() was held to protect either "trace" OR the "stream" at - * this point. - */ - rcu_read_unlock(); - trace = NULL; - stream = NULL; end_no_session: - return ret; - -err_free_stream: - stream_destroy(stream); + free(path_name); + free(channel_name); return ret; } /* * relay_close_stream: close a specific stream */ -static -int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret, send_ret; @@ -1418,24 +1260,36 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - rcu_read_lock(); - stream = stream_find_by_id(relay_streams_ht, - be64toh(stream_info.stream_id)); + stream = stream_get_by_id(be64toh(stream_info.stream_id)); if (!stream) { ret = -1; - goto end_unlock; + goto end; } - + pthread_mutex_lock(&stream->lock); + stream->closed = true; stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); - stream->close_flag = 1; - session->stream_count--; - - /* Check if we can close it or else the data will do it. */ - try_close_stream(session, stream); + if (stream->is_metadata) { + struct relay_viewer_stream *vstream; -end_unlock: - rcu_read_unlock(); + vstream = viewer_stream_get_by_id(stream->stream_handle); + if (vstream) { + if (vstream->metadata_sent == stream->metadata_received) { + /* + * Since all the metadata has been sent to the + * viewer and that we have a request to close + * its stream, we can safely teardown the + * corresponding metadata viewer stream. + */ + viewer_stream_put(vstream); + } + /* Put local reference. */ + viewer_stream_put(vstream); + } + } + pthread_mutex_unlock(&stream->lock); + stream_put(stream); +end: memset(&reply, 0, sizeof(reply)); if (ret < 0) { reply.ret_code = htobe32(LTTNG_ERR_UNK); @@ -1456,8 +1310,7 @@ end_no_session: /* * relay_unknown_command: send -1 if received unknown command */ -static -void relay_unknown_command(struct relay_connection *conn) +static void relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; int ret; @@ -1475,8 +1328,7 @@ void relay_unknown_command(struct relay_connection *conn) * relay_start: send an acknowledgment to the client to tell if we are * ready to receive data. We are ready if a session is established. */ -static -int relay_start(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_start(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret = htobe32(LTTNG_OK); @@ -1530,10 +1382,9 @@ end: } /* - * relay_recv_metadata: receive the metada for the session. + * relay_recv_metadata: receive the metadata for the session. */ -static -int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret = htobe32(LTTNG_OK); @@ -1542,7 +1393,6 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload *metadata_struct; struct relay_stream *metadata_stream; uint64_t data_size, payload_size; - struct ctf_trace *ctf_trace; if (!session) { ERR("Metadata sent before version check"); @@ -1587,38 +1437,37 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, } metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; - rcu_read_lock(); - metadata_stream = stream_find_by_id(relay_streams_ht, - be64toh(metadata_struct->stream_id)); + metadata_stream = stream_get_by_id(be64toh(metadata_struct->stream_id)); if (!metadata_stream) { ret = -1; - goto end_unlock; + goto end; } - size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload, + pthread_mutex_lock(&metadata_stream->lock); + + size_ret = lttng_write(metadata_stream->stream_fd->fd, metadata_struct->payload, payload_size); if (size_ret < payload_size) { ERR("Relay error writing metadata on file"); ret = -1; - goto end_unlock; + goto end_put; } - ret = write_padding_to_file(metadata_stream->fd, + ret = write_padding_to_file(metadata_stream->stream_fd->fd, be32toh(metadata_struct->padding_size)); if (ret < 0) { - goto end_unlock; + goto end_put; } - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - metadata_stream->path_name); - assert(ctf_trace); - ctf_trace->metadata_received += + metadata_stream->metadata_received += payload_size + be32toh(metadata_struct->padding_size); + DBG2("Relay metadata written. Updated metadata_received %" PRIu64, + metadata_stream->metadata_received); - DBG2("Relay metadata written"); +end_put: + pthread_mutex_unlock(&metadata_stream->lock); + stream_put(metadata_stream); -end_unlock: - rcu_read_unlock(); end: return ret; } @@ -1626,15 +1475,12 @@ end: /* * relay_send_version: send relayd version number */ -static -int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret; struct lttcomm_relayd_version reply, msg; - assert(conn); - conn->version_check_done = 1; /* Get version from the other side. */ @@ -1658,7 +1504,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions (%u vs %u), deleting session", reply.major, be32toh(msg.major)); - destroy_session(conn->session, conn->sessions_ht); + connection_put(conn); ret = 0; goto end; } @@ -1689,8 +1535,7 @@ end: /* * Check for data pending for a given stream id from the session daemon. */ -static -int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { struct relay_session *session = conn->session; @@ -1724,13 +1569,14 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, stream_id = be64toh(msg.stream_id); last_net_seq_num = be64toh(msg.last_net_seq_num); - rcu_read_lock(); - stream = stream_find_by_id(relay_streams_ht, stream_id); + stream = stream_get_by_id(stream_id); if (stream == NULL) { ret = -1; - goto end_unlock; + goto end; } + pthread_mutex_lock(&stream->lock); + DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 " and last_seq %" PRIu64, stream_id, stream->prev_seq, last_net_seq_num); @@ -1744,11 +1590,11 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, ret = 1; } - /* Pending check is now done. */ - stream->data_pending_check_done = 1; + stream->data_pending_check_done = true; + pthread_mutex_unlock(&stream->lock); -end_unlock: - rcu_read_unlock(); + stream_put(stream); +end: memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(ret); @@ -1764,18 +1610,17 @@ end_no_session: /* * Wait for the control socket to reach a quiescent state. * - * Note that for now, when receiving this command from the session daemon, this - * means that every subsequent commands or data received on the control socket - * has been handled. So, this is why we simply return OK here. + * Note that for now, when receiving this command from the session + * daemon, this means that every subsequent commands or data received on + * the control socket has been handled. So, this is why we simply return + * OK here. */ -static -int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret; uint64_t stream_id; struct relay_stream *stream; - struct lttng_ht_iter iter; struct lttcomm_relayd_quiescent_control msg; struct lttcomm_relayd_generic_reply reply; @@ -1801,19 +1646,16 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, } stream_id = be64toh(msg.stream_id); - - rcu_read_lock(); - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - node.node) { - if (stream->stream_handle == stream_id) { - stream->data_pending_check_done = 1; - DBG("Relay quiescent control pending flag set to %" PRIu64, - stream_id); - break; - } - } - rcu_read_unlock(); - + stream = stream_get_by_id(stream_id); + if (!stream) { + goto reply; + } + pthread_mutex_lock(&stream->lock); + stream->data_pending_check_done = true; + pthread_mutex_unlock(&stream->lock); + DBG("Relay quiescent control pending flag set to %" PRIu64, stream_id); + stream_put(stream); +reply: memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_OK); ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0); @@ -1826,14 +1668,13 @@ end_no_session: } /* - * Initialize a data pending command. This means that a client is about to ask - * for data pending for each stream he/she holds. Simply iterate over all - * streams of a session and set the data_pending_check_done flag. + * Initialize a data pending command. This means that a consumer is about + * to ask for data pending for each stream it holds. Simply iterate over + * all streams of a session and set the data_pending_check_done flag. * * This command returns to the client a LTTNG_OK code. */ -static -int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret; @@ -1870,18 +1711,25 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, session_id = be64toh(msg.session_id); /* - * Iterate over all streams to set the begin data pending flag. For now, the - * streams are indexed by stream handle so we have to iterate over all - * streams to find the one associated with the right session_id. + * Iterate over all streams to set the begin data pending flag. + * For now, the streams are indexed by stream handle so we have + * to iterate over all streams to find the one associated with + * the right session_id. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (stream->session_id == session_id) { - stream->data_pending_check_done = 0; + if (!stream_get(stream)) { + continue; + } + if (stream->trace->session->id == session_id) { + pthread_mutex_lock(&stream->lock); + stream->data_pending_check_done = false; + pthread_mutex_unlock(&stream->lock); DBG("Set begin data pending flag to stream %" PRIu64, stream->stream_handle); } + stream_put(stream); } rcu_read_unlock(); @@ -1899,16 +1747,15 @@ end_no_session: } /* - * End data pending command. This will check, for a given session id, if each - * stream associated with it has its data_pending_check_done flag set. If not, - * this means that the client lost track of the stream but the data is still - * being streamed on our side. In this case, we inform the client that data is - * inflight. + * End data pending command. This will check, for a given session id, if + * each stream associated with it has its data_pending_check_done flag + * set. If not, this means that the client lost track of the stream but + * the data is still being streamed on our side. In this case, we inform + * the client that data is in flight. * * Return to the client if there is data in flight or not with a ret_code. */ -static -int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret; @@ -1919,9 +1766,6 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, uint64_t session_id; uint32_t is_data_inflight = 0; - assert(recv_hdr); - assert(conn); - DBG("End data pending command"); if (!conn->session || conn->version_check_done == 0) { @@ -1945,17 +1789,33 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, session_id = be64toh(msg.session_id); - /* Iterate over all streams to see if the begin data pending flag is set. */ + /* + * Iterate over all streams to see if the begin data pending + * flag is set. + */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, node.node) { - if (stream->session_id == session_id && - !stream->data_pending_check_done && !stream->terminated_flag) { - is_data_inflight = 1; - DBG("Data is still in flight for stream %" PRIu64, - stream->stream_handle); - break; + if (!stream_get(stream)) { + continue; + } + if (stream->trace->session->id != session_id) { + stream_put(stream); + continue; + } + pthread_mutex_lock(&stream->lock); + if (!stream->data_pending_check_done) { + if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) { + is_data_inflight = 1; + DBG("Data is still in flight for stream %" PRIu64, + stream->stream_handle); + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + break; + } } + pthread_mutex_unlock(&stream->lock); + stream_put(stream); } rcu_read_unlock(); @@ -1977,14 +1837,13 @@ end_no_session: * * Return 0 on success else a negative value. */ -static -int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { - int ret, send_ret, index_created = 0; + int ret, send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_index index_info; - struct relay_index *index, *wr_index = NULL; + struct relay_index *index; struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; uint64_t net_seq_num; @@ -2014,76 +1873,66 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, net_seq_num = be64toh(index_info.net_seq_num); - rcu_read_lock(); - stream = stream_find_by_id(relay_streams_ht, - be64toh(index_info.relay_stream_id)); + stream = stream_get_by_id(be64toh(index_info.relay_stream_id)); if (!stream) { + ERR("stream_get_by_id not found"); ret = -1; - goto end_rcu_unlock; + goto end; } + pthread_mutex_lock(&stream->lock); /* Live beacon handling */ if (index_info.packet_size == 0) { - DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); + DBG("Received live beacon for stream %" PRIu64, + stream->stream_handle); /* - * Only flag a stream inactive when it has already received data - * and no indexes are in flight. + * Only flag a stream inactive when it has already + * received data and no indexes are in flight. */ - if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { - stream->beacon_ts_end = be64toh(index_info.timestamp_end); + if (stream->total_index_received > 0 + && stream->indexes_in_flight == 0) { + stream->beacon_ts_end = + be64toh(index_info.timestamp_end); } ret = 0; - goto end_rcu_unlock; + goto end_stream_put; } else { stream->beacon_ts_end = -1ULL; } - index = relay_index_find(stream->stream_handle, net_seq_num); - if (!index) { - /* A successful creation will add the object to the HT. */ - index = relay_index_create(stream->stream_handle, net_seq_num); - if (!index) { - goto end_rcu_unlock; - } - index_created = 1; - stream->indexes_in_flight++; - } - - copy_index_control_data(index, &index_info); if (stream->ctf_stream_id == -1ULL) { stream->ctf_stream_id = be64toh(index_info.stream_id); } - - if (index_created) { - /* - * Try to add the relay index object to the hash table. If an object - * already exist, destroy back the index created, set the data in this - * object and write it on disk. - */ - relay_index_add(index, &wr_index); - if (wr_index) { - copy_index_control_data(wr_index, &index_info); - free(index); - } - } else { - /* The index already exists so write it on disk. */ - wr_index = index; + index = relay_index_get_by_id_or_create(stream, net_seq_num); + if (!index) { + ret = -1; + ERR("relay_index_get_by_id_or_create index NULL"); + goto end_stream_put; } - - /* Do we have a writable ready index to write on disk. */ - if (wr_index) { - ret = relay_index_write(wr_index->fd, wr_index); - if (ret < 0) { - goto end_rcu_unlock; - } + if (set_index_control_data(index, &index_info)) { + ERR("set_index_control_data error"); + relay_index_put(index); + ret = -1; + goto end_stream_put; + } + ret = relay_index_try_flush(index); + if (ret == 0) { stream->total_index_received++; - stream->indexes_in_flight--; - assert(stream->indexes_in_flight >= 0); + } else if (ret > 0) { + /* no flush. */ + ret = 0; + } else { + ERR("relay_index_try_flush error %d", ret); + relay_index_put(index); + ret = -1; } -end_rcu_unlock: - rcu_read_unlock(); +end_stream_put: + pthread_mutex_unlock(&stream->lock); + stream_put(stream); + +end: memset(&reply, 0, sizeof(reply)); if (ret < 0) { @@ -2106,8 +1955,7 @@ end_no_session: * * Return 0 on success else a negative value. */ -static -int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret, send_ret; @@ -2124,17 +1972,10 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, } /* - * Flag every pending stream in the connection recv list that they are - * ready to be used by the viewer. - */ - set_viewer_ready_flag(conn); - - /* - * Inform the viewer that there are new streams in the session. + * Publish every pending stream in the connection recv list which are + * now ready to be used by the viewer. */ - if (conn->session->viewer_refcount) { - uatomic_set(&conn->session->new_streams, 1); - } + publish_connection_local_streams(conn); memset(&reply, 0, sizeof(reply)); reply.ret_code = htobe32(LTTNG_OK); @@ -2154,8 +1995,7 @@ end_no_session: /* * Process the commands received on the control socket */ -static -int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, +static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn) { int ret = 0; @@ -2212,93 +2052,91 @@ end: /* * Handle index for a data stream. * - * RCU read side lock MUST be acquired. + * Called with the stream lock held. * * Return 0 on success else a negative value. */ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, int rotate_index) { - int ret = 0, index_created = 0; - uint64_t stream_id, data_offset; - struct relay_index *index, *wr_index = NULL; - - assert(stream); + int ret = 0; + uint64_t data_offset; + struct relay_index *index; - stream_id = stream->stream_handle; /* Get data offset because we are about to update the index. */ data_offset = htobe64(stream->tracefile_size_current); + DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64, + stream->stream_handle, stream->tracefile_size_current); + /* - * Lookup for an existing index for that stream id/sequence number. If on - * exists, the control thread already received the data for it thus we need - * to write it on disk. + * Lookup for an existing index for that stream id/sequence + * number. If it exists, the control thread has already received the + * data for it, thus we need to write it to disk. */ - index = relay_index_find(stream_id, net_seq_num); + index = relay_index_get_by_id_or_create(stream, net_seq_num); if (!index) { - /* A successful creation will add the object to the HT. */ - index = relay_index_create(stream_id, net_seq_num); - if (!index) { - ret = -1; - goto error; - } - index_created = 1; - stream->indexes_in_flight++; + ret = -1; + goto end; } - if (rotate_index || stream->index_fd < 0) { - index->to_close_fd = stream->index_fd; - ret = index_create_file(stream->path_name, stream->channel_name, - relayd_uid, relayd_gid, stream->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - /* This will close the stream's index fd if one. */ - relay_index_free_safe(index); - goto error; + if (rotate_index || !stream->index_fd) { + int fd; + + /* Put ref on previous index_fd. */ + if (stream->index_fd) { + stream_fd_put(stream->index_fd); + stream->index_fd = NULL; } - stream->index_fd = ret; - } - index->fd = stream->index_fd; - index->index_data.offset = data_offset; - if (index_created) { - /* - * Try to add the relay index object to the hash table. If an object - * already exist, destroy back the index created and set the data. - */ - relay_index_add(index, &wr_index); - if (wr_index) { - /* Copy back data from the created index. */ - wr_index->fd = index->fd; - wr_index->to_close_fd = index->to_close_fd; - wr_index->index_data.offset = data_offset; - free(index); + fd = index_create_file(stream->path_name, stream->channel_name, + -1, -1, stream->tracefile_size, + stream->current_tracefile_id); + if (fd < 0) { + ret = -1; + /* Put self-ref for this index due to error. */ + relay_index_put(index); + goto end; + } + stream->index_fd = stream_fd_create(fd); + if (!stream->index_fd) { + ret = -1; + if (close(fd)) { + PERROR("Error closing FD %d", fd); + } + /* Put self-ref for this index due to error. */ + relay_index_put(index); + /* Will put the local ref. */ + goto end; } - } else { - /* The index already exists so write it on disk. */ - wr_index = index; } - /* Do we have a writable ready index to write on disk. */ - if (wr_index) { - ret = relay_index_write(wr_index->fd, wr_index); - if (ret < 0) { - goto error; - } - stream->total_index_received++; - stream->indexes_in_flight--; - assert(stream->indexes_in_flight >= 0); + if (relay_index_set_fd(index, stream->index_fd, data_offset)) { + ret = -1; + /* Put self-ref for this index due to error. */ + relay_index_put(index); + goto end; } -error: + ret = relay_index_try_flush(index); + if (ret == 0) { + stream->total_index_received++; + } else if (ret > 0) { + /* No flush. */ + ret = 0; + } else { + /* Put self-ref for this index due to error. */ + relay_index_put(index); + ret = -1; + } +end: return ret; } /* * relay_process_data: Process the data received on the data socket */ -static -int relay_process_data(struct relay_connection *conn) +static int relay_process_data(struct relay_connection *conn) { int ret = 0, rotate_index = 0; ssize_t size_ret; @@ -2309,8 +2147,6 @@ int relay_process_data(struct relay_connection *conn) uint32_t data_size; struct relay_session *session; - assert(conn); - ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); if (ret <= 0) { @@ -2325,17 +2161,12 @@ int relay_process_data(struct relay_connection *conn) } stream_id = be64toh(data_hdr.stream_id); - - rcu_read_lock(); - stream = stream_find_by_id(relay_streams_ht, stream_id); + stream = stream_get_by_id(stream_id); if (!stream) { ret = -1; - goto end_rcu_unlock; + goto end; } - - session = session_find_by_id(conn->sessions_ht, stream->session_id); - assert(session); - + session = stream->trace->session; data_size = be32toh(data_hdr.data_size); if (data_buffer_size < data_size) { char *tmp_data_ptr; @@ -2345,7 +2176,7 @@ int relay_process_data(struct relay_connection *conn) ERR("Allocating data buffer"); free(data_buffer); ret = -1; - goto end_rcu_unlock; + goto end_stream_put; } data_buffer = tmp_data_ptr; data_buffer_size = data_size; @@ -2363,114 +2194,95 @@ int relay_process_data(struct relay_connection *conn) DBG("Socket %d did an orderly shutdown", conn->sock->fd); } ret = -1; - goto end_rcu_unlock; + goto end_stream_put; } + pthread_mutex_lock(&stream->lock); + /* Check if a rotation is needed. */ if (stream->tracefile_size > 0 && (stream->tracefile_size_current + data_size) > stream->tracefile_size) { - struct relay_viewer_stream *vstream; uint64_t new_id; - new_id = (stream->tracefile_count_current + 1) % + new_id = (stream->current_tracefile_id + 1) % stream->tracefile_count; /* - * When we wrap-around back to 0, we start overwriting old - * trace data. + * Move viewer oldest available data position forward if + * we are overwriting a tracefile. */ - if (!stream->tracefile_overwrite && new_id == 0) { - stream->tracefile_overwrite = 1; - } - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - if (stream->tracefile_overwrite) { + if (new_id == stream->oldest_tracefile_id) { stream->oldest_tracefile_id = (stream->oldest_tracefile_id + 1) % stream->tracefile_count; } - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (vstream) { - /* - * The viewer is reading a file about to be - * overwritten. Close the FDs it is - * currently using and let it handle the fault. - */ - if (vstream->tracefile_count_current == new_id) { - pthread_mutex_lock(&vstream->overwrite_lock); - vstream->abort_flag = 1; - pthread_mutex_unlock(&vstream->overwrite_lock); - DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n", - stream->channel_name, new_id); - } else if (vstream->tracefile_count_current == - stream->tracefile_count_current) { - /* - * The reader and writer were in the - * same trace file, inform the viewer - * that no new index will ever be added - * to this file. - */ - vstream->close_write_flag = 1; - } - } - ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, - stream->tracefile_size, stream->tracefile_count, - relayd_uid, relayd_gid, stream->fd, - &(stream->tracefile_count_current), &stream->fd); - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); + ret = utils_rotate_stream_file(stream->path_name, + stream->channel_name, stream->tracefile_size, + stream->tracefile_count, -1, + -1, stream->stream_fd->fd, + &stream->current_tracefile_id, + &stream->stream_fd->fd); if (ret < 0) { ERR("Rotating stream output file"); - goto end_rcu_unlock; + goto end_stream_unlock; + } + stream->current_tracefile_seq++; + if (stream->current_tracefile_seq + - stream->oldest_tracefile_seq >= + stream->tracefile_count) { + stream->oldest_tracefile_seq++; } - /* Reset current size because we just perform a stream rotation. */ + /* + * Reset current size because we just performed a stream + * rotation. + */ stream->tracefile_size_current = 0; rotate_index = 1; } /* - * Index are handled in protocol version 2.4 and above. Also, snapshot and - * index are NOT supported. + * Index are handled in protocol version 2.4 and above. Also, + * snapshot and index are NOT supported. */ if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { - goto end_rcu_unlock; + goto end_stream_unlock; } } /* Write data to stream output fd. */ - size_ret = lttng_write(stream->fd, data_buffer, data_size); + size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size); if (size_ret < data_size) { ERR("Relay error writing data to file"); ret = -1; - goto end_rcu_unlock; + goto end_stream_unlock; } - DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64, - ret, stream->stream_handle); + DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64, + size_ret, stream->stream_handle); - ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size)); + ret = write_padding_to_file(stream->stream_fd->fd, + be32toh(data_hdr.padding_size)); if (ret < 0) { - goto end_rcu_unlock; + goto end_stream_unlock; } - stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size); - + stream->tracefile_size_current += + data_size + be32toh(data_hdr.padding_size); stream->prev_seq = net_seq_num; - try_close_stream(session, stream); - -end_rcu_unlock: - rcu_read_unlock(); +end_stream_unlock: + pthread_mutex_unlock(&stream->lock); +end_stream_put: + stream_put(stream); end: return ret; } -static -void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) +static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) { int ret; - assert(events); - (void) lttng_poll_del(events, pollfd); ret = close(pollfd); @@ -2479,27 +2291,36 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) } } -static void destroy_connection(struct lttng_ht *relay_connections_ht, - struct relay_connection *conn) +static void relay_thread_close_connection(struct lttng_poll_event *events, + int pollfd, struct relay_connection *conn) { - assert(relay_connections_ht); - assert(conn); + const char *type_str; - connection_delete(relay_connections_ht, conn); - - /* For the control socket, we try to destroy the session. */ - if (conn->type == RELAY_CONTROL && conn->session) { - destroy_session(conn->session, conn->sessions_ht); + switch (conn->type) { + case RELAY_DATA: + type_str = "Data"; + break; + case RELAY_CONTROL: + type_str = "Control"; + break; + case RELAY_VIEWER_COMMAND: + type_str = "Viewer Command"; + break; + case RELAY_VIEWER_NOTIFICATION: + type_str = "Viewer Notification"; + break; + default: + type_str = "Unknown"; } - - connection_destroy(conn); + cleanup_connection_pollfd(events, pollfd); + connection_put(conn); + DBG("%s connection closed with %d", type_str, pollfd); } /* * This thread does the actual work */ -static -void *relay_thread_worker(void *data) +static void *relay_thread_worker(void *data) { int ret, err = -1, last_seen_data_fd = -1; uint32_t nb_fd; @@ -2507,9 +2328,6 @@ void *relay_thread_worker(void *data) struct lttng_ht *relay_connections_ht; struct lttng_ht_iter iter; struct lttcomm_relayd_hdr recv_hdr; - struct relay_local_data *relay_ctx = (struct relay_local_data *) data; - struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; - struct relay_index *index; struct relay_connection *destroy_conn = NULL; DBG("[thread] Relay worker started"); @@ -2530,12 +2348,6 @@ void *relay_thread_worker(void *data) goto relay_connections_ht_error; } - /* Tables of received indexes indexed by index handle and net_seq_num. */ - indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64); - if (!indexes_ht) { - goto indexes_ht_error; - } - ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -2570,9 +2382,9 @@ restart: nb_fd = ret; /* - * Process control. The control connection is prioritised so we don't - * starve it with high throughout put tracing data on the data - * connection. + * Process control. The control connection is + * prioritized so we don't starve it with high + * throughput tracing data on the data connection. */ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ @@ -2582,7 +2394,10 @@ restart: health_code_update(); if (!revents) { - /* No activity for this FD (poll implementation). */ + /* + * No activity for this FD (poll + * implementation). + */ continue; } @@ -2605,27 +2420,20 @@ restart: if (ret < 0) { goto error; } - conn->sessions_ht = sessions_ht; - connection_init(conn); lttng_poll_add(&events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); - rcu_read_lock(); - lttng_ht_add_unique_ulong(relay_connections_ht, - &conn->sock_n); - rcu_read_unlock(); + connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); } } else { struct relay_connection *ctrl_conn; - rcu_read_lock(); - ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd); + ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd); /* If not found, there is a synchronization issue. */ assert(ctrl_conn); if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, ctrl_conn); + relay_thread_close_connection(&events, pollfd, ctrl_conn); if (last_seen_data_fd == pollfd) { last_seen_data_fd = last_notdel_data_fd; } @@ -2635,16 +2443,14 @@ restart: sizeof(recv_hdr), 0); if (ret <= 0) { /* Connection closed */ - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, ctrl_conn); - DBG("Control connection closed with %d", pollfd); + relay_thread_close_connection(&events, pollfd, + ctrl_conn); } else { ret = relay_process_control(&recv_hdr, ctrl_conn); if (ret < 0) { /* Clear the session on error. */ - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, ctrl_conn); - DBG("Connection closed with %d", pollfd); + relay_thread_close_connection(&events, pollfd, + ctrl_conn); } seen_control = 1; } @@ -2659,7 +2465,7 @@ restart: } else { ERR("Unknown poll events %u for sock %d", revents, pollfd); } - rcu_read_unlock(); + connection_put(ctrl_conn); } } @@ -2703,26 +2509,22 @@ restart: continue; } - rcu_read_lock(); - data_conn = connection_find_by_sock(relay_connections_ht, pollfd); + data_conn = connection_get_by_sock(relay_connections_ht, pollfd); if (!data_conn) { /* Skip it. Might be removed before. */ - rcu_read_unlock(); continue; } if (revents & LPOLLIN) { if (data_conn->type != RELAY_DATA) { - rcu_read_unlock(); - continue; + goto put_connection; } ret = relay_process_data(data_conn); /* Connection closed */ if (ret < 0) { - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, data_conn); - DBG("Data connection closed with %d", pollfd); + relay_thread_close_connection(&events, pollfd, + data_conn); /* * Every goto restart call sets the last seen fd where * here we don't really care since we gracefully @@ -2731,11 +2533,12 @@ restart: } else { /* Keep last seen port. */ last_seen_data_fd = pollfd; - rcu_read_unlock(); + connection_put(data_conn); goto restart; } } - rcu_read_unlock(); + put_connection: + connection_put(data_conn); } last_seen_data_fd = -1; } @@ -2745,28 +2548,23 @@ restart: exit: error: - lttng_poll_clean(&events); - /* Cleanup reamaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); - destroy_connection(relay_connections_ht, destroy_conn); + /* + * No need to grab another ref, because we own + * destroy_conn. + */ + relay_thread_close_connection(&events, destroy_conn->sock->fd, + destroy_conn); } rcu_read_unlock(); + + lttng_poll_clean(&events); error_poll_create: - rcu_read_lock(); - cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, - index_n.node) { - health_code_update(); - relay_index_delete(index); - relay_index_free_safe(index); - } - rcu_read_unlock(); - lttng_ht_destroy(indexes_ht); -indexes_ht_error: lttng_ht_destroy(relay_connections_ht); relay_connections_ht_error: /* Close relay conn pipes */ @@ -2807,7 +2605,6 @@ int main(int argc, char **argv) { int ret = 0, retval = 0; void *status; - struct relay_local_data *relay_ctx = NULL; /* Parse arguments */ progname = argv[0]; @@ -2874,12 +2671,8 @@ int main(int argc, char **argv) goto exit_init_data; } - /* We need those values for the file/dir creation. */ - relayd_uid = getuid(); - relayd_gid = getgid(); - /* Check if daemon is UID = 0 */ - if (relayd_uid == 0) { + if (!getuid()) { if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); retval = -1; @@ -2906,16 +2699,9 @@ int main(int argc, char **argv) lttcomm_init(); lttcomm_inet_init(); - relay_ctx = zmalloc(sizeof(struct relay_local_data)); - if (!relay_ctx) { - PERROR("relay_ctx"); - retval = -1; - goto exit_init_data; - } - /* tables of sessions indexed by session ID */ - relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); - if (!relay_ctx->sessions_ht) { + sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!sessions_ht) { retval = -1; goto exit_init_data; } @@ -2962,7 +2748,7 @@ int main(int argc, char **argv) /* Setup the worker thread */ ret = pthread_create(&worker_thread, NULL, - relay_thread_worker, (void *) relay_ctx); + relay_thread_worker, NULL); if (ret) { errno = ret; PERROR("pthread_create worker"); @@ -2980,7 +2766,7 @@ int main(int argc, char **argv) goto exit_listener_thread; } - ret = relayd_live_create(live_uri, relay_ctx); + ret = relayd_live_create(live_uri); if (ret) { ERR("Starting live viewer threads"); retval = -1; @@ -3037,7 +2823,10 @@ exit_init_data: health_app_destroy(health_relayd); exit_health_app_create: exit_options: - relayd_cleanup(relay_ctx); + relayd_cleanup(); + + /* Ensure all prior call_rcu are done. */ + rcu_barrier(); if (!retval) { exit(EXIT_SUCCESS); diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c index 46d9cc66a..33d7d43c0 100644 --- a/src/bin/lttng-relayd/session.c +++ b/src/bin/lttng-relayd/session.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -19,28 +20,25 @@ #define _GNU_SOURCE #define _LGPL_SOURCE #include +#include +#include "lttng-relayd.h" #include "ctf-trace.h" #include "session.h" #include "stream.h" /* Global session id used in the session creation. */ static uint64_t last_relay_session_id; - -static void rcu_destroy_session(struct rcu_head *head) -{ - struct relay_session *session = - caa_container_of(head, struct relay_session, rcu_node); - - free(session); -} +static pthread_mutex_t last_relay_session_id_lock = PTHREAD_MUTEX_INITIALIZER; /* * Create a new session by assigning a new session ID. * * Return allocated session or else NULL. */ -struct relay_session *session_create(void) +struct relay_session *session_create(const char *session_name, + const char *hostname, uint32_t live_timer, + bool snapshot, uint32_t major, uint32_t minor) { struct relay_session *session; @@ -57,30 +55,62 @@ struct relay_session *session_create(void) goto error; } - pthread_mutex_init(&session->viewer_ready_lock, NULL); + pthread_mutex_lock(&last_relay_session_id_lock); session->id = ++last_relay_session_id; + pthread_mutex_unlock(&last_relay_session_id_lock); + + session->major = major; + session->minor = minor; lttng_ht_node_init_u64(&session->session_n, session->id); + urcu_ref_init(&session->ref); + CDS_INIT_LIST_HEAD(&session->recv_list); + pthread_mutex_init(&session->lock, NULL); + pthread_mutex_init(&session->reflock, NULL); + pthread_mutex_init(&session->recv_list_lock, NULL); + + strncpy(session->session_name, session_name, + sizeof(session->session_name)); + strncpy(session->hostname, hostname, + sizeof(session->hostname)); + session->live_timer = live_timer; + session->snapshot = snapshot; + + lttng_ht_add_unique_u64(sessions_ht, &session->session_n); error: return session; } +/* Should be called with RCU read-side lock held. */ +bool session_get(struct relay_session *session) +{ + bool has_ref = false; + + pthread_mutex_lock(&session->reflock); + if (session->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&session->ref); + } + pthread_mutex_unlock(&session->reflock); + + return has_ref; +} + /* - * Lookup a session within the given hash table and session id. RCU read side - * lock MUST be acquired before calling this and as long as the caller has a - * reference to the object. + * Lookup a session within the session hash table using the session id + * as key. A session reference is taken when a session is returned. + * session_put() must be called on that session. * * Return session or NULL if not found. */ -struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id) +struct relay_session *session_get_by_id(uint64_t id) { struct relay_session *session = NULL; struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - assert(ht); - - lttng_ht_lookup(ht, &id, &iter); + rcu_read_lock(); + lttng_ht_lookup(sessions_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { DBG("Session find by ID %" PRIu64 " id NOT found", id); @@ -88,97 +118,128 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id) } session = caa_container_of(node, struct relay_session, session_n); DBG("Session find by ID %" PRIu64 " id found", id); - + if (!session_get(session)) { + session = NULL; + } end: + rcu_read_unlock(); return session; } +static void rcu_destroy_session(struct rcu_head *rcu_head) +{ + struct relay_session *session = + caa_container_of(rcu_head, struct relay_session, + rcu_node); + + free(session); +} + /* * Delete session from the given hash table. * * Return lttng ht del error code being 0 on success and 1 on failure. */ -int session_delete(struct lttng_ht *ht, struct relay_session *session) +static int session_delete(struct relay_session *session) { struct lttng_ht_iter iter; - assert(ht); - assert(session); - iter.iter.node = &session->session_n.node; - return lttng_ht_del(ht, &iter); + return lttng_ht_del(sessions_ht, &iter); } -/* - * The caller MUST be from the viewer thread since the viewer refcount is - * decremented. With this calue down to 0, it will try to destroy the session. - */ -void session_viewer_try_destroy(struct lttng_ht *ht, - struct relay_session *session) + +static void destroy_session(struct relay_session *session) +{ + int ret; + + ret = session_delete(session); + assert(!ret); + /* + * Since each trace has a reference on the session, it means + * that if we are at the point where we teardown the session, no + * trace belonging to that session exist at this point. + */ + lttng_ht_destroy(session->ctf_traces_ht); + call_rcu(&session->rcu_node, rcu_destroy_session); +} + +void session_release(struct urcu_ref *ref) { - unsigned long ret_ref; + struct relay_session *session = + caa_container_of(ref, struct relay_session, ref); - assert(session); + destroy_session(session); +} - ret_ref = uatomic_add_return(&session->viewer_refcount, -1); - if (ret_ref == 0) { - session_try_destroy(ht, session); - } +void session_put(struct relay_session *session) +{ + rcu_read_lock(); + pthread_mutex_lock(&session->reflock); + urcu_ref_put(&session->ref, session_release); + pthread_mutex_unlock(&session->reflock); + rcu_read_unlock(); } -/* - * Should only be called from the main streaming thread since it does not touch - * the viewer refcount. If this refcount is down to 0, destroy the session only - * and only if the session deletion succeeds. This is done because the viewer - * *and* the streaming thread can both concurently try to destroy the session - * thus the first come first serve. - */ -void session_try_destroy(struct lttng_ht *ht, struct relay_session *session) +int session_close(struct relay_session *session) { int ret = 0; - unsigned long ret_ref; - - assert(session); + struct ctf_trace *trace; + struct lttng_ht_iter iter; + struct relay_stream *stream; + + pthread_mutex_lock(&session->lock); + DBG("closing session %" PRIu64 ": is conn already closed %d", + session->id, session->connection_closed); + if (session->connection_closed) { + ret = -1; + goto unlock; + } + session->connection_closed = true; +unlock: + pthread_mutex_unlock(&session->lock); + if (ret) { + return ret; + } - ret_ref = uatomic_read(&session->viewer_refcount); - if (ret_ref == 0 && session->close_flag) { - if (ht) { - ret = session_delete(ht, session); - } - if (!ret) { - /* Only destroy the session if the deletion was successful. */ - session_destroy(session); + rcu_read_lock(); + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, + &iter.iter, trace, node.node) { + ret = ctf_trace_close(trace); + if (ret) { + goto rcu_unlock; } } + cds_list_for_each_entry_rcu(stream, &session->recv_list, + recv_node) { + stream_close(stream); + } +rcu_unlock: + rcu_read_unlock(); + if (ret) { + return ret; + } + /* Put self-reference from create. */ + session_put(session); + return ret; } -/* - * Destroy a session object. - * - * This function must *NOT* be called with an RCU read lock held since - * the session's ctf_traces_ht is destroyed. - */ -void session_destroy(struct relay_session *session) +void print_sessions(void) { - struct ctf_trace *ctf_trace; struct lttng_ht_iter iter; + struct relay_session *session; - assert(session); - - DBG("Relay destroying session %" PRIu64, session->id); - - /* - * Empty the ctf trace hash table which will destroy the stream contained - * in that table. - */ rcu_read_lock(); - cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, - node.node) { - ctf_trace_delete(session->ctf_traces_ht, ctf_trace); - ctf_trace_destroy(ctf_trace); + cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session, + session_n.node) { + if (!session_get(session)) { + continue; + } + DBG("session %p refcount %ld session %" PRIu64, + session, + session->ref.refcount, + session->id); + session_put(session); } rcu_read_unlock(); - lttng_ht_destroy(session->ctf_traces_ht); - - call_rcu(&session->rcu_node, rcu_destroy_session); } diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h index cb125be18..1a37cfee2 100644 --- a/src/bin/lttng-relayd/session.h +++ b/src/bin/lttng-relayd/session.h @@ -1,6 +1,10 @@ +#ifndef _SESSION_H +#define _SESSION_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,13 +20,11 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _SESSION_H -#define _SESSION_H - #include #include #include #include +#include #include @@ -31,35 +33,56 @@ */ struct relay_session { /* - * This session id is used to identify a set of stream to a tracing session - * but also make sure we have a unique session id associated with a session - * daemon which can provide multiple data source. + * This session id is generated by the relay daemon to guarantee + * its uniqueness even when serving multiple session daemons. + * It is used to match a set of streams to their session. */ uint64_t id; char session_name[NAME_MAX]; char hostname[HOST_NAME_MAX]; uint32_t live_timer; - struct lttng_ht_node_u64 session_n; - struct rcu_head rcu_node; - uint32_t stream_count; - /* Tell if this session is for a snapshot or not. */ - unsigned int snapshot:1; - /* Tell if the session has been closed on the streaming side. */ - unsigned int close_flag:1; - /* Number of viewer using it. Set to 0, it should be destroyed. */ - int viewer_refcount; + /* Session in snapshot mode. */ + bool snapshot; + + /* + * Session has no back reference to its connection because it + * has a life-time that can be longer than the consumer connection's + * life-time; a reference can still be held by the viewer + * connection through the viewer streams. + */ + + struct urcu_ref ref; + /* session reflock nests inside ctf_trace reflock. */ + pthread_mutex_t reflock; + + pthread_mutex_t lock; + + /* major/minor version used for this session. */ + uint32_t major; + uint32_t minor; + + bool viewer_attached; + /* Tell if the session connection has been closed on the streaming side. */ + bool connection_closed; /* Contains ctf_trace object of that session indexed by path name. */ struct lttng_ht *ctf_traces_ht; /* - * Indicate version protocol for this session. This is especially useful - * for the data thread that has no idea which version it operates on since - * linking control/data sockets is non trivial. + * This contains streams that are received on that connection. + * It's used to store them until we get the streams sent + * command. When this is received, we remove those streams from + * the list and publish them. + * + * Updates are protected by the recv_list_lock. + * Traversals are protected by RCU. + * recv_list_lock also protects stream_count. */ - uint64_t minor; - uint64_t major; + struct cds_list_head recv_list; /* RCU list. */ + uint32_t stream_count; + pthread_mutex_t recv_list_lock; + /* * Flag checked and exchanged with uatomic_cmpxchg to tell the * viewer-side if new streams got added since the last check. @@ -67,50 +90,26 @@ struct relay_session { unsigned long new_streams; /* - * Used to synchronize the process where we flag every streams readiness - * for the viewer when the streams_sent message is received and the viewer - * process of sending those streams. + * Node in the global session hash table. */ - pthread_mutex_t viewer_ready_lock; - + struct lttng_ht_node_u64 session_n; /* * Member of the session list in struct relay_viewer_session. + * Updates are protected by the relay_viewer_session + * session_list_lock. Traversals are protected by RCU. */ - struct cds_list_head viewer_session_list; -}; - -struct relay_viewer_session { - struct cds_list_head sessions_head; + struct cds_list_head viewer_session_node; + struct rcu_head rcu_node; /* For call_rcu teardown. */ }; -static inline void session_viewer_attach(struct relay_session *session) -{ - uatomic_inc(&session->viewer_refcount); -} - -static inline void session_viewer_detach(struct relay_session *session) -{ - uatomic_add(&session->viewer_refcount, -1); -} +struct relay_session *session_create(const char *session_name, + const char *hostname, uint32_t live_timer, + bool snapshot, uint32_t major, uint32_t minor); +struct relay_session *session_get_by_id(uint64_t id); +bool session_get(struct relay_session *session); +void session_put(struct relay_session *session); -struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id); -struct relay_session *session_create(void); -int session_delete(struct lttng_ht *ht, struct relay_session *session); - -/* - * Direct destroy without reading the refcount. - */ -void session_destroy(struct relay_session *session); - -/* - * Destroy the session if the refcount is down to 0. - */ -void session_try_destroy(struct lttng_ht *ht, struct relay_session *session); - -/* - * Decrement the viewer refcount and destroy it if down to 0. - */ -void session_viewer_try_destroy(struct lttng_ht *ht, - struct relay_session *session); +int session_close(struct relay_session *session); +void print_sessions(void); #endif /* _SESSION_H */ diff --git a/src/bin/lttng-relayd/stream-fd.c b/src/bin/lttng-relayd/stream-fd.c new file mode 100644 index 000000000..57324d7a0 --- /dev/null +++ b/src/bin/lttng-relayd/stream-fd.c @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2015 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#define _LGPL_SOURCE +#include + +#include "stream-fd.h" + +struct stream_fd *stream_fd_create(int fd) +{ + struct stream_fd *sf; + + sf = zmalloc(sizeof(*sf)); + if (!sf) { + goto end; + } + urcu_ref_init(&sf->ref); + sf->fd = fd; +end: + return sf; +} + +void stream_fd_get(struct stream_fd *sf) +{ + urcu_ref_get(&sf->ref); +} + +static void stream_fd_release(struct urcu_ref *ref) +{ + struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref); + int ret; + + ret = close(sf->fd); + if (ret) { + PERROR("Error closing stream FD %d", sf->fd); + } + free(sf); +} + +void stream_fd_put(struct stream_fd *sf) +{ + urcu_ref_put(&sf->ref, stream_fd_release); +} diff --git a/src/bin/lttng-relayd/stream-fd.h b/src/bin/lttng-relayd/stream-fd.h new file mode 100644 index 000000000..64f3b16a5 --- /dev/null +++ b/src/bin/lttng-relayd/stream-fd.h @@ -0,0 +1,32 @@ +#ifndef _STREAM_FD_H +#define _STREAM_FD_H + +/* + * Copyright (C) 2015 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include + +struct stream_fd { + int fd; + struct urcu_ref ref; +}; + +struct stream_fd *stream_fd_create(int fd); +void stream_fd_get(struct stream_fd *sf); +void stream_fd_put(struct stream_fd *sf); + +#endif /* _STREAM_FD_H */ diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 17a5bcd4f..af4ef1bbb 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -19,128 +20,342 @@ #define _GNU_SOURCE #define _LGPL_SOURCE #include +#include +#include +#include +#include +#include "lttng-relayd.h" #include "index.h" #include "stream.h" #include "viewer-stream.h" +/* Should be called with RCU read-side lock held. */ +bool stream_get(struct relay_stream *stream) +{ + bool has_ref = false; + + pthread_mutex_lock(&stream->reflock); + if (stream->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&stream->ref); + } + pthread_mutex_unlock(&stream->reflock); + + return has_ref; +} + /* - * Get stream from stream id from the given hash table. Return stream if found - * else NULL. - * - * Need to be called with RCU read-side lock held. + * Get stream from stream id from the streams hash table. Return stream + * if found else NULL. A stream reference is taken when a stream is + * returned. stream_put() must be called on that stream. */ -struct relay_stream *stream_find_by_id(struct lttng_ht *ht, - uint64_t stream_id) +struct relay_stream *stream_get_by_id(uint64_t stream_id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; struct relay_stream *stream = NULL; - assert(ht); - - lttng_ht_lookup(ht, &stream_id, &iter); + rcu_read_lock(); + lttng_ht_lookup(relay_streams_ht, &stream_id, &iter); node = lttng_ht_iter_get_node_u64(&iter); - if (node == NULL) { + if (!node) { DBG("Relay stream %" PRIu64 " not found", stream_id); goto end; } stream = caa_container_of(node, struct relay_stream, node); - + if (!stream_get(stream)) { + stream = NULL; + } end: + rcu_read_unlock(); return stream; } /* - * Close a given stream. If an assosiated viewer stream exists it is updated. - * - * RCU read side lock MUST be acquired. - * - * Return 0 if close was successful or 1 if already closed. + * We keep ownership of path_name and channel_name. */ -int stream_close(struct relay_session *session, struct relay_stream *stream) +struct relay_stream *stream_create(struct ctf_trace *trace, + uint64_t stream_handle, char *path_name, + char *channel_name, uint64_t tracefile_size, + uint64_t tracefile_count) { - int delret, ret; - struct relay_viewer_stream *vstream; - struct ctf_trace *ctf_trace; + int ret; + struct relay_stream *stream = NULL; + struct relay_session *session = trace->session; - assert(stream); + stream = zmalloc(sizeof(struct relay_stream)); + if (stream == NULL) { + PERROR("relay stream zmalloc"); + ret = -1; + goto error_no_alloc; + } - pthread_mutex_lock(&stream->lock); + stream->stream_handle = stream_handle; + stream->prev_seq = -1ULL; + stream->ctf_stream_id = -1ULL; + stream->tracefile_size = tracefile_size; + stream->tracefile_count = tracefile_count; + stream->path_name = path_name; + stream->channel_name = channel_name; + lttng_ht_node_init_u64(&stream->node, stream->stream_handle); + pthread_mutex_init(&stream->lock, NULL); + pthread_mutex_init(&stream->reflock, NULL); + urcu_ref_init(&stream->ref); + ctf_trace_get(trace); + stream->trace = trace; - if (stream->terminated_flag) { - /* This stream is already closed. Ignore. */ - ret = 1; - goto end_unlock; + stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); + if (!stream->indexes_ht) { + ERR("Cannot created indexes_ht"); + ret = -1; + goto end; } - DBG("Closing stream id %" PRIu64, stream->stream_handle); + ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG, + -1, -1); + if (ret < 0) { + ERR("relay creating output directory"); + goto end; + } - if (stream->fd >= 0) { - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); + /* + * No need to use run_as API here because whatever we receive, + * the relayd uses its own credentials for the stream files. + */ + ret = utils_create_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, 0, -1, -1, NULL); + if (ret < 0) { + ERR("Create output file"); + goto end; + } + stream->stream_fd = stream_fd_create(ret); + if (!stream->stream_fd) { + if (close(ret)) { + PERROR("Error closing file %d", ret); } + ret = -1; + goto end; } + if (stream->tracefile_size) { + DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); + } else { + DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); + } + + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { + stream->is_metadata = 1; + } + + stream->in_recv_list = true; + + /* + * Add the stream in the recv list of the session. Once the end stream + * message is received, all session streams are published. + */ + pthread_mutex_lock(&session->recv_list_lock); + cds_list_add_rcu(&stream->recv_node, &session->recv_list); + session->stream_count++; + pthread_mutex_unlock(&session->recv_list_lock); + + /* + * Both in the ctf_trace object and the global stream ht since the data + * side of the relayd does not have the concept of session. + */ + lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, + stream->stream_handle); + ret = 0; + +end: + if (ret) { + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; } + stream_put(stream); + stream = NULL; } + return stream; - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - vstream->total_index_received = stream->total_index_received; - vstream->tracefile_count_last = stream->tracefile_count_current; - vstream->close_write_flag = 1; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); +error_no_alloc: + /* + * path_name and channel_name need to be freed explicitly here + * because we cannot rely on stream_put(). + */ + free(path_name); + free(channel_name); + return NULL; +} + +/* + * Called with the session lock held. + */ +void stream_publish(struct relay_stream *stream) +{ + struct relay_session *session; + + pthread_mutex_lock(&stream->lock); + if (stream->published) { + goto unlock; } - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle); + session = stream->trace->session; - ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, - stream->path_name); - assert(ctf_trace); - ctf_trace_put_ref(ctf_trace); + pthread_mutex_lock(&session->recv_list_lock); + if (stream->in_recv_list) { + cds_list_del_rcu(&stream->recv_node); + stream->in_recv_list = false; + } + pthread_mutex_unlock(&session->recv_list_lock); - stream->close_flag = 1; - stream->terminated_flag = 1; - ret = 0; + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list); + pthread_mutex_unlock(&stream->trace->stream_list_lock); -end_unlock: + stream->published = true; +unlock: pthread_mutex_unlock(&stream->lock); - return ret; } -void stream_delete(struct lttng_ht *ht, struct relay_stream *stream) +/* + * Only called from destroy. No stream lock needed, since there is a + * single user at this point. This is ensured by having the refcount + * reaching 0. + */ +static void stream_unpublish(struct relay_stream *stream) { + if (!stream->published) { + return; + } + pthread_mutex_lock(&stream->trace->stream_list_lock); + cds_list_del_rcu(&stream->stream_node); + pthread_mutex_unlock(&stream->trace->stream_list_lock); + + stream->published = false; +} + +static void stream_destroy(struct relay_stream *stream) +{ + if (stream->indexes_ht) { + lttng_ht_destroy(stream->indexes_ht); + } + free(stream->path_name); + free(stream->channel_name); + free(stream); +} + +static void stream_destroy_rcu(struct rcu_head *rcu_head) +{ + struct relay_stream *stream = + caa_container_of(rcu_head, struct relay_stream, rcu_node); + + stream_destroy(stream); +} + +/* + * No need to take stream->lock since this is only called on the final + * stream_put which ensures that a single thread may act on the stream. + * + * At that point, the object is also protected by the reflock which + * guarantees that no other thread may share ownership of this stream. + */ +static void stream_release(struct urcu_ref *ref) +{ + struct relay_stream *stream = + caa_container_of(ref, struct relay_stream, ref); + struct relay_session *session; int ret; struct lttng_ht_iter iter; - assert(ht); - assert(stream); + session = stream->trace->session; + + DBG("Releasing stream id %" PRIu64, stream->stream_handle); + + pthread_mutex_lock(&session->recv_list_lock); + session->stream_count--; + if (stream->in_recv_list) { + cds_list_del_rcu(&stream->recv_node); + stream->in_recv_list = false; + } + pthread_mutex_unlock(&session->recv_list_lock); iter.iter.node = &stream->node.node; - ret = lttng_ht_del(ht, &iter); + ret = lttng_ht_del(relay_streams_ht, &iter); assert(!ret); - cds_list_del(&stream->trace_list); + stream_unpublish(stream); + + if (stream->stream_fd) { + stream_fd_put(stream->stream_fd); + stream->stream_fd = NULL; + } + if (stream->index_fd) { + stream_fd_put(stream->index_fd); + stream->index_fd = NULL; + } + if (stream->trace) { + ctf_trace_put(stream->trace); + stream->trace = NULL; + } + + call_rcu(&stream->rcu_node, stream_destroy_rcu); } -void stream_destroy(struct relay_stream *stream) +void stream_put(struct relay_stream *stream) { - assert(stream); - free(stream->path_name); - free(stream->channel_name); - free(stream); + DBG("stream put for stream id %" PRIu64, stream->stream_handle); + /* + * Ensure existence of stream->reflock for stream unlock. + */ + rcu_read_lock(); + /* + * Stream reflock ensures that concurrent test and update of + * stream ref is atomic. + */ + pthread_mutex_lock(&stream->reflock); + assert(stream->ref.refcount != 0); + /* + * Wait until we have processed all the stream packets before + * actually putting our last stream reference. + */ + DBG("stream put stream id %" PRIu64 " refcount %d", + stream->stream_handle, + (int) stream->ref.refcount); + urcu_ref_put(&stream->ref, stream_release); + pthread_mutex_unlock(&stream->reflock); + rcu_read_unlock(); +} + +void stream_close(struct relay_stream *stream) +{ + DBG("closing stream %" PRIu64, stream->stream_handle); + pthread_mutex_lock(&stream->lock); + relay_index_close_all(stream); + pthread_mutex_unlock(&stream->lock); + stream_put(stream); +} + +void print_relay_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_stream *stream; + + rcu_read_lock(); + cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, + node.node) { + if (!stream_get(stream)) { + continue; + } + DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + stream, + stream->ref.refcount, + stream->stream_handle, + stream->trace->id, + stream->trace->session->id); + stream_put(stream); + } + rcu_read_unlock(); } diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index 4dd2e627e..7e2b1334e 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -1,6 +1,10 @@ +#ifndef _STREAM_H +#define _STREAM_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,9 +20,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _STREAM_H -#define _STREAM_H - #include #include #include @@ -27,100 +28,115 @@ #include #include "session.h" +#include "stream-fd.h" /* * Represents a stream in the relay */ struct relay_stream { uint64_t stream_handle; - uint64_t prev_seq; /* previous data sequence number encountered */ - struct lttng_ht_node_u64 node; + /* - * When we receive a stream, it gets stored in a list (on a per connection - * basis) until we have all the streams of the same channel and the metadata - * associated with it, then it gets flagged with viewer_ready. + * reflock used to synchronize the closing of this stream. + * stream reflock nests inside viewer stream reflock. + * stream reflock nests inside index reflock. */ - struct cds_list_head recv_list; + pthread_mutex_t reflock; + struct urcu_ref ref; + /* Back reference to trace. Protected by refcount on trace object. */ + struct ctf_trace *trace; - /* Added to the corresponding ctf_trace. */ - struct cds_list_head trace_list; - struct rcu_head rcu_node; - uint64_t session_id; - int fd; + /* + * To protect from concurrent read/update. The viewer stream + * lock nests inside the stream lock. The stream lock nests + * inside the ctf_trace lock. + */ + pthread_mutex_t lock; + uint64_t prev_seq; /* previous data sequence number encountered. */ + uint64_t last_net_seq_num; /* seq num to encounter before closing. */ + + /* FD on which to write the stream data. */ + struct stream_fd *stream_fd; /* FD on which to write the index data. */ - int index_fd; - /* FD on which to read the index data for the viewer. */ - int read_index_fd; + struct stream_fd *index_fd; char *path_name; char *channel_name; - /* on-disk circular buffer of tracefiles */ + + /* On-disk circular buffer of tracefiles. */ uint64_t tracefile_size; uint64_t tracefile_size_current; uint64_t tracefile_count; - uint64_t tracefile_count_current; + uint64_t current_tracefile_id; + + uint64_t current_tracefile_seq; /* Free-running counter. */ + uint64_t oldest_tracefile_seq; /* Free-running counter. */ + /* To inform the viewer up to where it can go back in time. */ uint64_t oldest_tracefile_id; uint64_t total_index_received; - uint64_t last_net_seq_num; - /* - * To protect from concurrent read/update. Also used to synchronize the - * closing of this stream. - */ - pthread_mutex_t lock; + bool closed; /* Stream is closed. */ /* - * If the stream is inactive, this field is updated with the live beacon - * timestamp end, when it is active, this field == -1ULL. - */ - uint64_t beacon_ts_end; - /* - * Number of indexes that are supposed to be complete soon. - * Avoid sending the inactivity beacon to the client when data is in - * transit. + * Counts number of indexes in indexes_ht. Redundant info. + * Protected by stream lock. */ int indexes_in_flight; + struct lttng_ht *indexes_ht; + /* - * CTF stream ID, -1ULL when unset. + * If the stream is inactive, this field is updated with the + * live beacon timestamp end, when it is active, this + * field == -1ULL. */ + uint64_t beacon_ts_end; + + /* CTF stream ID, -1ULL when unset (first packet not received yet). */ uint64_t ctf_stream_id; - /* - * To protect the update of the close_write_flag and the checks of - * the tracefile_count_current. - * It is taken before checking whenever we need to know if the - * writer and reader are working in the same tracefile. - */ - pthread_mutex_t viewer_stream_rotation_lock; - /* Information telling us when to close the stream */ - unsigned int close_flag:1; + /* Indicate if the stream was initialized for a data pending command. */ + bool data_pending_check_done; + + /* Is this stream a metadata stream ? */ + int32_t is_metadata; + /* Amount of metadata received (bytes). */ + uint64_t metadata_received; + /* - * Indicates if the stream has been effectively closed thus having the - * information in it invalidated but NOT freed. The stream lock MUST be - * held to read/update that value. + * Member of the stream list in struct ctf_trace. + * Updates are protected by the stream_list_lock. + * Traversals are protected by RCU. */ - unsigned int terminated_flag:1; - /* Indicate if the stream was initialized for a data pending command. */ - unsigned int data_pending_check_done:1; - unsigned int metadata_flag:1; + struct cds_list_head stream_node; /* - * To detect when we start overwriting old data, it is used to - * update the oldest_tracefile_id. + * Temporary list belonging to the connection until all streams + * are received for that connection. + * Member of the stream recv list in the connection. + * Updates are protected by the stream_recv_list_lock. + * Traversals are protected by RCU. */ - unsigned int tracefile_overwrite:1; + bool in_recv_list; + struct cds_list_head recv_node; + bool published; /* Protected by session lock. */ /* - * Can this stream be used by a viewer or are we waiting for additional - * information. + * Node of stream within global stream hash table. */ - unsigned int viewer_ready:1; + struct lttng_ht_node_u64 node; + struct rcu_head rcu_node; /* For call_rcu teardown. */ }; -struct relay_stream *stream_find_by_id(struct lttng_ht *ht, - uint64_t stream_id); -int stream_close(struct relay_session *session, struct relay_stream *stream); -void stream_delete(struct lttng_ht *ht, struct relay_stream *stream); -void stream_destroy(struct relay_stream *stream); +struct relay_stream *stream_create(struct ctf_trace *trace, + uint64_t stream_handle, char *path_name, + char *channel_name, uint64_t tracefile_size, + uint64_t tracefile_count); + +struct relay_stream *stream_get_by_id(uint64_t stream_id); +bool stream_get(struct relay_stream *stream); +void stream_put(struct relay_stream *stream); +void stream_close(struct relay_stream *stream); +void stream_publish(struct relay_stream *stream); +void print_relay_streams(void); #endif /* _STREAM_H */ diff --git a/src/bin/lttng-relayd/utils.h b/src/bin/lttng-relayd/utils.h index de1521d6f..4a56980e4 100644 --- a/src/bin/lttng-relayd/utils.h +++ b/src/bin/lttng-relayd/utils.h @@ -1,6 +1,10 @@ +#ifndef RELAYD_UTILS_H +#define RELAYD_UTILS_H + /* * Copyright (C) 2012 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -16,9 +20,6 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef RELAYD_UTILS_H -#define RELAYD_UTILS_H - char *create_output_path(char *path_name); #endif /* RELAYD_UTILS_H */ diff --git a/src/bin/lttng-relayd/viewer-session.c b/src/bin/lttng-relayd/viewer-session.c new file mode 100644 index 000000000..657d3fc36 --- /dev/null +++ b/src/bin/lttng-relayd/viewer-session.c @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * 2015 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#define _LGPL_SOURCE +#include +#include + +#include "lttng-relayd.h" +#include "ctf-trace.h" +#include "session.h" +#include "viewer-session.h" +#include "viewer-stream.h" +#include "stream.h" + +struct relay_viewer_session *viewer_session_create(void) +{ + struct relay_viewer_session *vsession; + + vsession = zmalloc(sizeof(*vsession)); + if (!vsession) { + goto end; + } + CDS_INIT_LIST_HEAD(&vsession->session_list); +end: + return vsession; +} + +/* The existence of session must be guaranteed by the caller. */ +int viewer_session_attach(struct relay_viewer_session *vsession, + struct relay_session *session) +{ + int ret = 0; + + /* Will not fail, as per the ownership guarantee. */ + if (!session_get(session)) { + ret = -1; + goto end; + } + pthread_mutex_lock(&session->lock); + if (session->viewer_attached) { + ret = -1; + } else { + session->viewer_attached = true; + } + + if (!ret) { + pthread_mutex_lock(&vsession->session_list_lock); + /* Ownership is transfered to the list. */ + cds_list_add_rcu(&session->viewer_session_node, + &vsession->session_list); + pthread_mutex_unlock(&vsession->session_list_lock); + } else { + /* Put our local ref. */ + session_put(session); + } + /* Safe since we know the session exists. */ + pthread_mutex_unlock(&session->lock); +end: + return ret; +} + +/* The existence of session must be guaranteed by the caller. */ +static int viewer_session_detach(struct relay_viewer_session *vsession, + struct relay_session *session) +{ + int ret = 0; + + pthread_mutex_lock(&session->lock); + if (!session->viewer_attached) { + ret = -1; + } else { + session->viewer_attached = false; + } + + if (!ret) { + pthread_mutex_lock(&vsession->session_list_lock); + cds_list_del_rcu(&session->viewer_session_node); + pthread_mutex_unlock(&vsession->session_list_lock); + /* Release reference held by the list. */ + session_put(session); + } + /* Safe since we know the session exists. */ + pthread_mutex_unlock(&session->lock); + return ret; +} + +void viewer_session_destroy(struct relay_viewer_session *vsession) +{ + free(vsession); +} + +void viewer_session_close(struct relay_viewer_session *vsession) +{ + struct relay_session *session; + + rcu_read_lock(); + cds_list_for_each_entry_rcu(session, + &vsession->session_list, viewer_session_node) { + struct lttng_ht_iter iter; + struct relay_viewer_stream *vstream; + + /* + * TODO: improvement: create more efficient list of + * vstream per session. + */ + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, + vstream, stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + if (vstream->stream->trace->session != session) { + viewer_stream_put(vstream); + continue; + } + /* Put local reference. */ + viewer_stream_put(vstream); + /* + * We have reached one of the viewer stream's lifetime + * end condition. + */ + viewer_stream_put(vstream); + } + + viewer_session_detach(vsession, session); + } + rcu_read_unlock(); +} + +/* + * Check if a connection is attached to a session. + * Return 1 if attached, 0 if not attached, a negative value on error. + */ +int viewer_session_is_attached(struct relay_viewer_session *vsession, + struct relay_session *session) +{ + struct relay_session *iter; + int found = 0; + + pthread_mutex_lock(&session->lock); + if (!vsession) { + goto end; + } + if (!session->viewer_attached) { + goto end; + } + rcu_read_lock(); + cds_list_for_each_entry_rcu(iter, + &vsession->session_list, + viewer_session_node) { + if (session == iter) { + found = 1; + goto end_rcu_unlock; + } + } +end_rcu_unlock: + rcu_read_unlock(); +end: + pthread_mutex_unlock(&session->lock); + return found; +} diff --git a/src/bin/lttng-relayd/viewer-session.h b/src/bin/lttng-relayd/viewer-session.h new file mode 100644 index 000000000..4013b3587 --- /dev/null +++ b/src/bin/lttng-relayd/viewer-session.h @@ -0,0 +1,53 @@ +#ifndef _VIEWER_SESSION_H +#define _VIEWER_SESSION_H + +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * 2015 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include +#include +#include +#include + +#include + +#include "session.h" + +struct relay_viewer_session { + /* + * Session list. Updates are protected by the session_list_lock. + * Traversals are protected by RCU. + * This list limits the design to having the sessions in at most + * one viewer session. + */ + struct cds_list_head session_list; /* RCU list. */ + pthread_mutex_t session_list_lock; /* Protects list updates. */ +}; + +struct relay_viewer_session *viewer_session_create(void); +void viewer_session_destroy(struct relay_viewer_session *vsession); +void viewer_session_close(struct relay_viewer_session *vsession); + +int viewer_session_attach(struct relay_viewer_session *vsession, + struct relay_session *session); +int viewer_session_is_attached(struct relay_viewer_session *vsession, + struct relay_session *session); + +#endif /* _VIEWER_SESSION_H */ diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 37486293f..1d02ee329 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -24,39 +25,32 @@ #include "lttng-relayd.h" #include "viewer-stream.h" -static void free_stream(struct relay_viewer_stream *stream) +static void viewer_stream_destroy(struct relay_viewer_stream *vstream) { - assert(stream); - - free(stream->path_name); - free(stream->channel_name); - free(stream); + free(vstream->path_name); + free(vstream->channel_name); + free(vstream); } -static void deferred_free_viewer_stream(struct rcu_head *head) +static void viewer_stream_destroy_rcu(struct rcu_head *head) { - struct relay_viewer_stream *stream = + struct relay_viewer_stream *vstream = caa_container_of(head, struct relay_viewer_stream, rcu_node); - free_stream(stream); + viewer_stream_destroy(vstream); } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) + enum lttng_viewer_seek seek_t) { struct relay_viewer_stream *vstream; - assert(stream); - assert(ctf_trace); - vstream = zmalloc(sizeof(*vstream)); if (!vstream) { PERROR("relay viewer stream zmalloc"); goto error; } - vstream->session_id = stream->session_id; - vstream->stream_handle = stream->stream_handle; vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); if (vstream->path_name == NULL) { PERROR("relay viewer path_name alloc"); @@ -68,216 +62,274 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, PERROR("relay viewer channel_name alloc"); goto error; } - vstream->tracefile_count = stream->tracefile_count; - vstream->metadata_flag = stream->metadata_flag; - vstream->tracefile_count_last = -1ULL; switch (seek_t) { case LTTNG_VIEWER_SEEK_BEGINNING: - vstream->tracefile_count_current = stream->oldest_tracefile_id; + vstream->current_tracefile_id = stream->oldest_tracefile_id; break; case LTTNG_VIEWER_SEEK_LAST: - vstream->tracefile_count_current = stream->tracefile_count_current; + vstream->current_tracefile_id = stream->current_tracefile_id; break; default: - assert(0); goto error; } - - if (vstream->metadata_flag) { - ctf_trace->viewer_metadata_stream = vstream; + if (!stream_get(stream)) { + ERR("Cannot get stream"); + goto error; } + vstream->stream = stream; - /* Globally visible after the add unique. */ - lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); - lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); - - vstream->index_read_fd = -1; - vstream->read_fd = -1; - - /* - * This is to avoid a race between the initialization of this object and - * the close of the given stream. If the stream is unable to find this - * viewer stream when closing, this copy will at least take the latest - * value. We also need that for the seek_last. - */ - vstream->total_index_received = stream->total_index_received; - + pthread_mutex_lock(&stream->lock); /* * If we never received an index for the current stream, delay the opening * of the index, otherwise open it right now. */ - if (vstream->tracefile_count_current == stream->tracefile_count_current - && vstream->total_index_received == 0) { - vstream->index_read_fd = -1; + if (vstream->current_tracefile_id == stream->current_tracefile_id + && stream->total_index_received == 0) { + vstream->index_fd = NULL; } else { int read_fd; read_fd = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + stream->tracefile_count, + vstream->current_tracefile_id); if (read_fd < 0) { - goto error; + goto error_unlock; + } + vstream->index_fd = stream_fd_create(read_fd); + if (!vstream->index_fd) { + if (close(read_fd)) { + PERROR("close"); + } + goto error_unlock; } - vstream->index_read_fd = read_fd; } - if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_read_fd >= 0) { + if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_fd) { off_t lseek_ret; - lseek_ret = lseek(vstream->index_read_fd, - vstream->total_index_received * sizeof(struct ctf_packet_index), - SEEK_CUR); + lseek_ret = lseek(vstream->index_fd->fd, 0, SEEK_END); if (lseek_ret < 0) { - goto error; + goto error_unlock; } - vstream->last_sent_index = vstream->total_index_received; + vstream->last_sent_index = stream->total_index_received; + } + pthread_mutex_unlock(&stream->lock); + + if (stream->is_metadata) { + rcu_assign_pointer(stream->trace->viewer_metadata_stream, + vstream); } + /* Globally visible after the add unique. */ + lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); + lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n); + + pthread_mutex_init(&vstream->reflock, NULL); + urcu_ref_init(&vstream->ref); + return vstream; +error_unlock: + pthread_mutex_unlock(&stream->lock); error: if (vstream) { - free_stream(vstream); + viewer_stream_destroy(vstream); } return NULL; } -void viewer_stream_delete(struct relay_viewer_stream *stream) +static void viewer_stream_unpublish(struct relay_viewer_stream *vstream) { int ret; struct lttng_ht_iter iter; - iter.iter.node = &stream->stream_n.node; + iter.iter.node = &vstream->stream_n.node; ret = lttng_ht_del(viewer_streams_ht, &iter); assert(!ret); } -void viewer_stream_destroy(struct ctf_trace *ctf_trace, - struct relay_viewer_stream *stream) +static void viewer_stream_release(struct urcu_ref *ref) { - int ret; - - assert(stream); + struct relay_viewer_stream *vstream = caa_container_of(ref, + struct relay_viewer_stream, ref); - if (ctf_trace) { - ctf_trace_put_ref(ctf_trace); + if (vstream->stream->is_metadata) { + rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL); } - if (stream->read_fd >= 0) { - ret = close(stream->read_fd); - if (ret < 0) { - PERROR("close read_fd"); - } + viewer_stream_unpublish(vstream); + + if (vstream->stream_fd) { + stream_fd_put(vstream->stream_fd); + vstream->stream_fd = NULL; } - if (stream->index_read_fd >= 0) { - ret = close(stream->index_read_fd); - if (ret < 0) { - PERROR("close index_read_fd"); - } + if (vstream->index_fd) { + stream_fd_put(vstream->index_fd); + vstream->index_fd = NULL; + } + if (vstream->stream) { + stream_put(vstream->stream); + vstream->stream = NULL; + } + call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu); +} + +/* Must be called with RCU read-side lock held. */ +bool viewer_stream_get(struct relay_viewer_stream *vstream) +{ + bool has_ref = false; + + pthread_mutex_lock(&vstream->reflock); + if (vstream->ref.refcount != 0) { + has_ref = true; + urcu_ref_get(&vstream->ref); } + pthread_mutex_unlock(&vstream->reflock); - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); + return has_ref; } /* - * Find viewer stream by id. RCU read side lock MUST be acquired. + * Get viewer stream by id. * - * Return stream if found else NULL. + * Return viewer stream if found else NULL. */ -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id) +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id) { struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; - struct relay_viewer_stream *stream = NULL; + struct relay_viewer_stream *vstream = NULL; + rcu_read_lock(); lttng_ht_lookup(viewer_streams_ht, &id, &iter); node = lttng_ht_iter_get_node_u64(&iter); if (!node) { DBG("Relay viewer stream %" PRIu64 " not found", id); goto end; } - stream = caa_container_of(node, struct relay_viewer_stream, stream_n); - + vstream = caa_container_of(node, struct relay_viewer_stream, stream_n); + if (!viewer_stream_get(vstream)) { + vstream = NULL; + } end: - return stream; + rcu_read_unlock(); + return vstream; +} + +void viewer_stream_put(struct relay_viewer_stream *vstream) +{ + rcu_read_lock(); + pthread_mutex_lock(&vstream->reflock); + urcu_ref_put(&vstream->ref, viewer_stream_release); + pthread_mutex_unlock(&vstream->reflock); + rcu_read_unlock(); +} + +/* + * Returns whether the current tracefile is readable. If not, it has + * been overwritten. + * Must be called with rstream lock held. + */ +bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream, + uint64_t seq) +{ + struct relay_stream *stream = vstream->stream; + + if (seq >= stream->oldest_tracefile_seq + && seq <= stream->current_tracefile_seq) { + /* seq is a readable file. */ + return true; + } else { + /* seq is not readable. */ + return false; + } } /* * Rotate a stream to the next tracefile. * - * Must be called with viewer_stream_rotation_lock held. + * Must be called with the rstream lock held. * Returns 0 on success, 1 on EOF, a negative value on error. */ -int viewer_stream_rotate(struct relay_viewer_stream *vstream, - struct relay_stream *stream) +int viewer_stream_rotate(struct relay_viewer_stream *vstream) { int ret; - uint64_t tracefile_id; + struct relay_stream *stream = vstream->stream; - assert(vstream); - assert(stream); - - if (vstream->tracefile_count == 0) { - /* Ignore rotation, there is none to do. */ - ret = 0; + /* Detect the last tracefile to open. */ + if (stream->total_index_received == vstream->last_sent_index + && stream->trace->session->connection_closed) { + ret = 1; goto end; } - tracefile_id = (vstream->tracefile_count_current + 1) % - vstream->tracefile_count; - - /* Detect the last tracefile to open. */ - if (vstream->tracefile_count_last != -1ULL && - vstream->tracefile_count_last == - vstream->tracefile_count_current) { - ret = 1; + if (stream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; goto end; } - /* - * The writer and the reader are not working in the same tracefile, we can - * read up to EOF, we don't care about the total_index_received. - */ - if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { - vstream->close_write_flag = 1; + if (!viewer_stream_is_tracefile_seq_readable(vstream, + vstream->current_tracefile_seq + 1)) { + vstream->current_tracefile_id = + stream->oldest_tracefile_id; + vstream->current_tracefile_seq = + stream->oldest_tracefile_seq; } else { - /* - * We are opening a file that is still open in write, make sure we - * limit our reading to the number of indexes received. - */ - vstream->close_write_flag = 0; - if (stream->close_flag) { - vstream->total_index_received = stream->total_index_received; - } + vstream->current_tracefile_id = + (vstream->current_tracefile_id + 1) + % stream->tracefile_count; + vstream->current_tracefile_seq++; } - vstream->tracefile_count_current = tracefile_id; - ret = close(vstream->index_read_fd); - if (ret < 0) { - PERROR("close index file %d", vstream->index_read_fd); + if (vstream->index_fd) { + stream_fd_put(vstream->index_fd); + vstream->index_fd = NULL; } - vstream->index_read_fd = -1; - - ret = close(vstream->read_fd); - if (ret < 0) { - PERROR("close tracefile %d", vstream->read_fd); + if (vstream->stream_fd) { + stream_fd_put(vstream->stream_fd); + vstream->stream_fd = NULL; } - vstream->read_fd = -1; - - pthread_mutex_lock(&vstream->overwrite_lock); - vstream->abort_flag = 0; - pthread_mutex_unlock(&vstream->overwrite_lock); ret = index_open(vstream->path_name, vstream->channel_name, - vstream->tracefile_count, vstream->tracefile_count_current); + stream->tracefile_count, + vstream->current_tracefile_id); if (ret < 0) { - goto error; + goto end; + } + vstream->index_fd = stream_fd_create(ret); + if (vstream->index_fd) { + ret = 0; + } else { + if (close(ret)) { + PERROR("close"); + } + ret = -1; } - vstream->index_read_fd = ret; - - ret = 0; - end: -error: return ret; } + +void print_viewer_streams(void) +{ + struct lttng_ht_iter iter; + struct relay_viewer_stream *vstream; + + rcu_read_lock(); + cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, + stream_n.node) { + if (!viewer_stream_get(vstream)) { + continue; + } + DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64 + " session %" PRIu64, + vstream, + vstream->ref.refcount, + vstream->stream->stream_handle, + vstream->stream->trace->id, + vstream->stream->trace->session->id); + viewer_stream_put(vstream); + } + rcu_read_unlock(); +} diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h index 003b1197c..cc46db4e2 100644 --- a/src/bin/lttng-relayd/viewer-stream.h +++ b/src/bin/lttng-relayd/viewer-stream.h @@ -1,6 +1,10 @@ +#ifndef _VIEWER_STREAM_H +#define _VIEWER_STREAM_H + /* * Copyright (C) 2013 - Julien Desfossez * David Goulet + * 2015 - Mathieu Desnoyers * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License, version 2 only, as @@ -16,9 +20,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef _VIEWER_STREAM_H -#define _VIEWER_STREAM_H - #include #include #include @@ -29,61 +30,58 @@ #include "lttng-viewer-abi.h" #include "stream.h" -/* Stub */ struct relay_stream; /* - * Shadow copy of the relay_stream structure for the viewer side. The only - * fields updated by the writer (streaming side) after allocation are : - * total_index_received and close_flag. Everything else is updated by the - * reader (viewer side). + * The viewer stream's lifetime is the intersection of their viewer connection's + * lifetime and the duration during which at least: + * a) their input source is still active + * b) they still have data left to send to the client. + * + * This means that both the sessiond/consumerd connection or the viewer + * connection may tear down (and unpublish) a relay_viewer_stream. + * + * Viewer stream updates are protected by their associated stream's lock. */ struct relay_viewer_stream { - uint64_t stream_handle; - uint64_t session_id; - int read_fd; - int index_read_fd; + struct urcu_ref ref; + pthread_mutex_t reflock; + + /* Back ref to stream. */ + struct relay_stream *stream; + + /* FD from which to read the stream data. */ + struct stream_fd *stream_fd; + /* FD from which to read the index data. */ + struct stream_fd *index_fd; + char *path_name; char *channel_name; + + uint64_t current_tracefile_id; + /* Free-running counter. */ + uint64_t current_tracefile_seq; + uint64_t last_sent_index; - uint64_t total_index_received; - uint64_t tracefile_count; - uint64_t tracefile_count_current; - /* Stop after reading this tracefile. */ - uint64_t tracefile_count_last; + + /* Indicates if this stream has been sent to a viewer client. */ + bool sent_flag; + /* For metadata stream, how much metadata has been sent. */ + uint64_t metadata_sent; + struct lttng_ht_node_u64 stream_n; struct rcu_head rcu_node; - struct ctf_trace *ctf_trace; - /* - * This lock blocks only when the writer is about to start overwriting - * a file currently read by the reader. - * - * This is nested INSIDE the viewer_stream_rotation_lock. - */ - pthread_mutex_t overwrite_lock; - /* Information telling us if the stream is a metadata stream. */ - unsigned int metadata_flag:1; - /* - * Information telling us that the stream is closed in write, so - * we don't expect new indexes and we can read up to EOF. - */ - unsigned int close_write_flag:1; - /* - * If the streaming side closes a FD in use in the viewer side, - * it sets this flag to inform that it is a normal error. - */ - unsigned int abort_flag:1; - /* Indicates if this stream has been sent to a viewer client. */ - unsigned int sent_flag:1; }; struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace); -struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id); -void viewer_stream_destroy(struct ctf_trace *ctf_trace, - struct relay_viewer_stream *stream); -void viewer_stream_delete(struct relay_viewer_stream *stream); -int viewer_stream_rotate(struct relay_viewer_stream *vstream, - struct relay_stream *stream); + enum lttng_viewer_seek seek_t); + +struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id); +bool viewer_stream_get(struct relay_viewer_stream *vstream); +void viewer_stream_put(struct relay_viewer_stream *vstream); +int viewer_stream_rotate(struct relay_viewer_stream *vstream); +bool viewer_stream_is_tracefile_seq_readable(struct relay_viewer_stream *vstream, + uint64_t seq); +void print_viewer_streams(void); #endif /* _VIEWER_STREAM_H */ diff --git a/src/common/index/ctf-index.h b/src/common/index/ctf-index.h index 0efa8887d..1f38d9ab0 100644 --- a/src/common/index/ctf-index.h +++ b/src/common/index/ctf-index.h @@ -44,7 +44,7 @@ struct ctf_packet_index_file_hdr { } __attribute__((__packed__)); /* - * Packet index generated for each trace packet store in a trace file. + * Packet index generated for each trace packet stored in a trace file. * All integer fields are stored in big endian. */ struct ctf_packet_index { diff --git a/src/common/index/index.c b/src/common/index/index.c index 35cff533c..46f8bcb1f 100644 --- a/src/common/index/index.c +++ b/src/common/index/index.c @@ -51,7 +51,7 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid, } /* Create index directory if necessary. */ - ret = run_as_mkdir(fullpath, S_IRWXU | S_IRWXG, uid, gid); + ret = utils_mkdir(fullpath, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { if (ret != -EEXIST) { PERROR("Index trace directory creation error"); @@ -59,6 +59,19 @@ int index_create_file(char *path_name, char *stream_name, int uid, int gid, } } + /* + * For tracefile rotation. We need to unlink the old + * file if present to synchronize with the tail of the + * live viewer which could be working on this same file. + * By doing so, any reference to the old index file + * stays valid even if we re-create a new file with the + * same name afterwards. + */ + ret = utils_unlink_stream_file(fullpath, stream_name, size, count, uid, + gid, DEFAULT_INDEX_FILE_SUFFIX); + if (ret < 0 && errno != ENOENT) { + goto error; + } ret = utils_create_stream_file(fullpath, stream_name, size, count, uid, gid, DEFAULT_INDEX_FILE_SUFFIX); if (ret < 0) { diff --git a/src/common/utils.c b/src/common/utils.c index db2ed8e7d..337713eb7 100644 --- a/src/common/utils.c +++ b/src/common/utils.c @@ -649,22 +649,20 @@ int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid) } /* - * Create the stream tracefile on disk. * path is the output parameter. It needs to be PATH_MAX len. * * Return 0 on success or else a negative value. */ -LTTNG_HIDDEN -int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size, - uint64_t count, int uid, int gid, char *suffix) +static int utils_stream_file_name(char *path, + const char *path_name, const char *file_name, + uint64_t size, uint64_t count, + const char *suffix) { - int ret, out_fd, flags, mode; - char full_path[PATH_MAX], *path_name_suffix = NULL, *path; + int ret; + char full_path[PATH_MAX]; + char *path_name_suffix = NULL; char *extra = NULL; - assert(path_name); - assert(file_name); - ret = snprintf(full_path, sizeof(full_path), "%s/%s", path_name, file_name); if (ret < 0) { @@ -686,8 +684,8 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si } /* - * If we split the trace in multiple files, we have to add the count at the - * end of the tracefile name + * If we split the trace in multiple files, we have to add the count at + * the end of the tracefile name. */ if (extra) { ret = asprintf(&path_name_suffix, "%s%s", full_path, extra); @@ -695,9 +693,37 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si PERROR("Allocating path name with extra string"); goto error_free_suffix; } - path = path_name_suffix; + strncpy(path, path_name_suffix, PATH_MAX - 1); + path[PATH_MAX - 1] = '\0'; } else { - path = full_path; + strncpy(path, full_path, PATH_MAX - 1); + } + path[PATH_MAX - 1] = '\0'; + ret = 0; + + free(path_name_suffix); +error_free_suffix: + free(extra); +error: + return ret; +} + +/* + * Create the stream file on disk. + * + * Return 0 on success or else a negative value. + */ +LTTNG_HIDDEN +int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size, + uint64_t count, int uid, int gid, char *suffix) +{ + int ret, flags, mode; + char path[PATH_MAX]; + + ret = utils_stream_file_name(path, path_name, file_name, + size, count, suffix); + if (ret < 0) { + goto error; } flags = O_WRONLY | O_CREAT | O_TRUNC; @@ -705,21 +731,48 @@ int utils_create_stream_file(const char *path_name, char *file_name, uint64_t si mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP; if (uid < 0 || gid < 0) { - out_fd = open(path, flags, mode); + ret = open(path, flags, mode); } else { - out_fd = run_as_open(path, flags, mode, uid, gid); + ret = run_as_open(path, flags, mode, uid, gid); } - if (out_fd < 0) { + if (ret < 0) { PERROR("open stream path %s", path); - goto error_open; } - ret = out_fd; +error: + return ret; +} -error_open: - free(path_name_suffix); -error_free_suffix: - free(extra); +/* + * Unlink the stream tracefile from disk. + * + * Return 0 on success or else a negative value. + */ +LTTNG_HIDDEN +int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size, + uint64_t count, int uid, int gid, char *suffix) +{ + int ret; + char path[PATH_MAX]; + + ret = utils_stream_file_name(path, path_name, file_name, + size, count, suffix); + if (ret < 0) { + goto error; + } + if (uid < 0 || gid < 0) { + ret = unlink(path); + } else { + ret = run_as_unlink(path, uid, gid); + if (ret < 0) { + errno = -ret; + ret = -1; + } + } + if (ret < 0) { + goto error; + } error: + DBG("utils_unlink_stream_file %s returns %d", path, ret); return ret; } @@ -749,7 +802,25 @@ int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, } if (count > 0) { + /* + * In tracefile rotation, for the relay daemon we need + * to unlink the old file if present, because it may + * still be open in reading by the live thread, and we + * need to ensure that we do not overwrite the content + * between get_index and get_packet. Since we have no + * way to verify integrity of the data content compared + * to the associated index, we need to ensure the reader + * has exclusive access to the file content, and that + * the open of the data file is performed in get_index. + * Unlinking the old file rather than overwriting it + * achieves this. + */ *new_count = (*new_count + 1) % count; + ret = utils_unlink_stream_file(path_name, file_name, + size, *new_count, uid, gid, 0); + if (ret < 0 && errno != ENOENT) { + goto error; + } } else { (*new_count)++; } diff --git a/src/common/utils.h b/src/common/utils.h index e5af6fadd..4f82ebd44 100644 --- a/src/common/utils.h +++ b/src/common/utils.h @@ -41,6 +41,8 @@ int utils_mkdir(const char *path, mode_t mode, int uid, int gid); int utils_mkdir_recursive(const char *path, mode_t mode, int uid, int gid); int utils_create_stream_file(const char *path_name, char *file_name, uint64_t size, uint64_t count, int uid, int gid, char *suffix); +int utils_unlink_stream_file(const char *path_name, char *file_name, uint64_t size, + uint64_t count, int uid, int gid, char *suffix); int utils_rotate_stream_file(char *path_name, char *file_name, uint64_t size, uint64_t count, int uid, int gid, int out_fd, uint64_t *new_count, int *stream_fd); -- 2.34.1