health-relayd.c health-relayd.h \
lttng-viewer-abi.h testpoint.h \
viewer-stream.h viewer-stream.c \
- session.c session.h
+ session.c session.h \
+ stream.c stream.h
# link on liblttngctl for check if relayd is already alive.
lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \
#define RELAYD_CMD_2_1_H
#include "lttng-relayd.h"
+#include "stream.h"
int cmd_recv_stream_2_1(struct relay_command *cmd, struct relay_stream *stream);
#include <common/utils.h>
#include "ctf-trace.h"
+#include "lttng-relayd.h"
+#include "stream.h"
static uint64_t last_relay_ctf_trace_id;
+static void rcu_destroy_ctf_trace(struct rcu_head *head)
+{
+ struct lttng_ht_node_str *node =
+ caa_container_of(head, struct lttng_ht_node_str, head);
+ struct ctf_trace *trace=
+ caa_container_of(node, struct ctf_trace, node);
+
+ free(trace);
+}
+
/*
- * Try to destroy a ctf_trace object meaning that the refcount is decremented
- * and checked if down to 0 which will free it.
+ * Destroy a ctf trace and all stream contained in it.
+ *
+ * MUST be called with the RCU read side lock.
*/
-void ctf_trace_try_destroy(struct ctf_trace *obj)
+void ctf_trace_destroy(struct ctf_trace *obj)
{
- unsigned long ret_ref;
-
- if (!obj) {
- return;
+ struct relay_stream *stream, *tmp_stream;
+
+ assert(obj);
+ /*
+ * Getting to this point, every stream referenced to that object have put
+ * back their ref since the've been closed by the control side.
+ */
+ assert(!obj->refcount);
+
+ cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list,
+ trace_list) {
+ stream_delete(relay_streams_ht, stream);
+ stream_destroy(stream);
}
- ret_ref = uatomic_add_return(&obj->refcount, -1);
- if (ret_ref == 0) {
- DBG("Freeing ctf_trace %" PRIu64, obj->id);
- free(obj);
+ call_rcu(&obj->node.head, rcu_destroy_ctf_trace);
+}
+
+void ctf_trace_try_destroy(struct relay_session *session,
+ struct ctf_trace *ctf_trace)
+{
+ assert(session);
+ assert(ctf_trace);
+
+ /*
+ * Considering no viewer attach to the session and the trace having no more
+ * stream attached, wipe the trace.
+ */
+ if (uatomic_read(&session->viewer_refcount) == 0 &&
+ uatomic_read(&ctf_trace->refcount) == 0) {
+ ctf_trace_destroy(ctf_trace);
}
}
/*
* Create and return an allocated ctf_trace object. NULL on error.
*/
-struct ctf_trace *ctf_trace_create(void)
+struct ctf_trace *ctf_trace_create(char *path_name)
{
struct ctf_trace *obj;
+ assert(path_name);
+
obj = zmalloc(sizeof(*obj));
if (!obj) {
PERROR("ctf_trace alloc");
goto error;
}
+ CDS_INIT_LIST_HEAD(&obj->stream_list);
+
obj->id = ++last_relay_ctf_trace_id;
- DBG("Created ctf_trace %" PRIu64, obj->id);
+ lttng_ht_node_init_str(&obj->node, path_name);
+
+ DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name);
error:
return obj;
}
/*
- * Check if we can assign the ctf_trace id and metadata stream to one or all
- * the streams with the same path_name (our unique ID for ctf traces).
- *
- * The given stream MUST be new and NOT visible (in any hash table).
+ * Return a ctf_trace object if found by id in the given hash table else NULL.
*/
-void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream)
+struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+ char *path_name)
{
+ struct lttng_ht_node_str *node;
struct lttng_ht_iter iter;
- struct relay_stream *tmp_stream;
+ struct ctf_trace *trace = NULL;
assert(ht);
- assert(stream);
-
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct((void *) stream->path_name, lttng_ht_seed),
- ht->match_fct, (void *) stream->path_name,
- &iter.iter, tmp_stream, ctf_trace_node.node) {
- if (stream->metadata_flag) {
- /*
- * The new stream is the metadata stream for this trace,
- * assign the ctf_trace pointer to all the streams in
- * this bucket.
- */
- pthread_mutex_lock(&tmp_stream->lock);
- tmp_stream->ctf_trace = stream->ctf_trace;
- uatomic_inc(&tmp_stream->ctf_trace->refcount);
- pthread_mutex_unlock(&tmp_stream->lock);
- DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64,
- tmp_stream->ctf_trace->id, tmp_stream->stream_handle);
- } else if (tmp_stream->ctf_trace) {
- /*
- * The ctf_trace already exists for this bucket,
- * just assign the pointer to the new stream and exit.
- */
- stream->ctf_trace = tmp_stream->ctf_trace;
- uatomic_inc(&stream->ctf_trace->refcount);
- DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64,
- tmp_stream->ctf_trace->id, tmp_stream->stream_handle);
- goto end;
- } else {
- /*
- * We don't know yet the ctf_trace ID (no metadata has been added),
- * so leave it there until the metadata stream arrives.
- */
- goto end;
- }
+
+ lttng_ht_lookup(ht, (void *) path_name, &iter);
+ node = lttng_ht_iter_get_node_str(&iter);
+ if (!node) {
+ DBG("CTF Trace path %s not found", path_name);
+ goto end;
}
+ trace = caa_container_of(node, struct ctf_trace, node);
end:
- rcu_read_unlock();
- return;
+ return trace;
}
+/*
+ * Add stream to a given hash table.
+ */
+void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace)
+{
+ assert(ht);
+ assert(trace);
+
+ lttng_ht_add_str(ht, &trace->node);
+}
+
+/*
+ * Delete stream from a given hash table.
+ */
+void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ assert(ht);
+ assert(trace);
+
+ iter.iter.node = &trace->node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+}
#include <common/hashtable/hashtable.h>
#include "lttng-relayd.h"
+#include "session.h"
struct ctf_trace {
int refcount;
+ unsigned int invalid_flag:1;
uint64_t id;
uint64_t metadata_received;
uint64_t metadata_sent;
struct relay_stream *metadata_stream;
struct relay_viewer_stream *viewer_metadata_stream;
+ /* Node indexed by stream path name in the corresponding session. */
+ struct lttng_ht_node_str node;
+
+ /* Relay stream associated with this ctf trace. */
+ struct cds_list_head stream_list;
};
-void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream);
-struct ctf_trace *ctf_trace_create(void);
-void ctf_trace_try_destroy(struct ctf_trace *obj);
+static inline void ctf_trace_get_ref(struct ctf_trace *trace)
+{
+ uatomic_inc(&trace->refcount);
+}
+
+static inline void ctf_trace_put_ref(struct ctf_trace *trace)
+{
+ uatomic_add(&trace->refcount, -1);
+}
+
+void ctf_trace_assign(struct relay_stream *stream);
+struct ctf_trace *ctf_trace_create(char *path_name);
+void ctf_trace_destroy(struct ctf_trace *obj);
+void ctf_trace_try_destroy(struct relay_session *session,
+ struct ctf_trace *ctf_trace);
+struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht,
+ char *path_name);
+void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace);
+void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace);
#endif /* _CTF_TRACE_H */
#include "health-relayd.h"
#include "testpoint.h"
#include "viewer-stream.h"
+#include "stream.h"
+#include "session.h"
+#include "ctf-trace.h"
static struct lttng_uri *live_uri;
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
stream_n.node) {
+ struct ctf_trace *ctf_trace;
+
health_code_update();
/* Ignore if not the same session. */
continue;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ vstream->path_name);
+ assert(ctf_trace);
+
send_stream.id = htobe64(vstream->stream_handle);
- send_stream.ctf_trace_id = htobe64(vstream->ctf_trace->id);
+ send_stream.ctf_trace_id = htobe64(ctf_trace->id);
send_stream.metadata_flag = htobe32(vstream->metadata_flag);
strncpy(send_stream.path_name, vstream->path_name,
sizeof(send_stream.path_name));
uint32_t *nb_created)
{
int ret;
- struct relay_stream *stream;
struct lttng_ht_iter iter;
+ struct ctf_trace *ctf_trace;
assert(session);
* Create viewer streams for relay streams that are ready to be used for a
* the given session id only.
*/
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- struct relay_viewer_stream *vstream;
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ struct relay_stream *stream;
health_code_update();
- if (stream->session->id != session->id ||
- !stream->ctf_trace || !stream->viewer_ready) {
- /*
- * Ignore stream from a different session. Don't create streams
- * with no ctf_trace or not ready for the viewer.
- */
+ if (ctf_trace->invalid_flag) {
continue;
}
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (!vstream) {
- vstream = viewer_stream_create(stream, seek_t);
+ cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+ struct relay_viewer_stream *vstream;
+
+ if (!stream->viewer_ready) {
+ continue;
+ }
+
+ vstream = viewer_stream_find_by_id(stream->stream_handle);
if (!vstream) {
- ret = -1;
- goto error_unlock;
+ vstream = viewer_stream_create(stream, seek_t, ctf_trace);
+ if (!vstream) {
+ ret = -1;
+ goto error_unlock;
+ }
+ /* Acquire reference to ctf_trace. */
+ ctf_trace_get_ref(ctf_trace);
+
+ if (nb_created) {
+ /* Update number of created stream counter. */
+ (*nb_created)++;
+ }
+ } else if (!vstream->sent_flag && nb_unsent) {
+ /* Update number of unsent stream counter. */
+ (*nb_unsent)++;
}
- if (nb_created) {
- /* Update number of created stream counter. */
- (*nb_created)++;
+ /* Update number of total stream counter. */
+ if (nb_total) {
+ (*nb_total)++;
}
- } else if (!vstream->sent_flag && nb_unsent) {
- /* Update number of unsent stream counter. */
- (*nb_unsent)++;
- }
- /* Update number of total stream counter. */
- if (nb_total) {
- (*nb_total)++;
}
}
ret = 0;
error_unlock:
+ rcu_read_unlock();
pthread_mutex_unlock(&session->viewer_ready_lock);
return ret;
}
/* Stopping all threads */
DBG("Terminating all live threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
+ ret = notify_thread_pipe(live_conn_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
}
/* Add quit pipe */
- ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR);
if (ret < 0) {
goto error;
}
* Return 1 if it was triggered else 0;
*/
static
-int check_thread_quit_pipe(int fd, uint32_t events)
+int check_live_conn_pipe(int fd, uint32_t events)
{
- if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
+ if (fd == live_conn_pipe[0] && (events & LPOLLIN)) {
return 1;
}
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_thread_quit_pipe(pollfd, revents);
+ ret = check_live_conn_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;
health_code_update();
-
ret = send_response(cmd->sock, &reply, sizeof(reply));
if (ret < 0) {
goto end;
sizeof(send_session.hostname));
send_session.id = htobe64(session->id);
send_session.live_timer = htobe32(session->live_timer);
- send_session.clients = htobe32(session->viewer_attached);
+ send_session.clients = htobe32(session->viewer_refcount);
send_session.streams = htobe32(session->stream_count);
health_code_update();
assert(cmd);
assert(sessions_ht);
- DBG("Attach session received");
-
health_code_update();
/* Receive the request from the connected client. */
response.status = htobe32(VIEWER_ATTACH_UNK);
goto send_reply;
}
+ session_viewer_attach(session);
+ DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id));
- if (cmd->session_id == session->id) {
- /* Same viewer already attached, just send the stream list. */
- send_streams = 1;
- response.status = htobe32(VIEWER_ATTACH_OK);
- } else if (session->viewer_attached != 0) {
+ if (uatomic_read(&session->viewer_refcount) > 1) {
DBG("Already a viewer attached");
response.status = htobe32(VIEWER_ATTACH_ALREADY);
+ session_viewer_detach(session);
goto send_reply;
} else if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
} else {
- session->viewer_attached++;
send_streams = 1;
response.status = htobe32(VIEWER_ATTACH_OK);
cmd->session_id = session->id;
struct ctf_packet_index packet_index;
struct relay_viewer_stream *vstream;
struct relay_stream *rstream;
+ struct ctf_trace *ctf_trace;
+ struct relay_session *session;
assert(cmd);
assert(sessions_ht);
health_code_update();
rcu_read_lock();
+ session = session_find_by_id(sessions_ht, cmd->session_id);
+ if (!session) {
+ ret = -1;
+ goto end_unlock;
+ }
+
vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id));
if (!vstream) {
ret = -1;
goto end_unlock;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name);
+ assert(ctf_trace);
+
memset(&viewer_index, 0, sizeof(viewer_index));
/*
vstream->index_read_fd = ret;
}
- rstream = relay_stream_find_by_id(vstream->stream_handle);
- if (rstream) {
+ rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
+ assert(rstream);
+
+ if (!rstream->close_flag) {
if (vstream->abort_flag) {
/* Rotate on abort (overwrite). */
DBG("Viewer rotate because of overwrite");
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
+ /* ret == 0 means successful so we continue. */
}
+
pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
if (rstream->beacon_ts_end != -1ULL &&
viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto send_reply;
- /*
- * Reader and writer are working in the same tracefile, so we care
- * about the number of index received and sent. Otherwise, we read
- * up to EOF.
- */
} else if (rstream->total_index_received <= vstream->last_sent_index
&& !vstream->close_write_flag) {
+ /*
+ * Reader and writer are working in the same tracefile, so we care
+ * about the number of index received and sent. Otherwise, we read
+ * up to EOF.
+ */
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
/* No new index to send, retry later. */
viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
}
}
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
- } else if (!rstream && vstream->close_write_flag &&
+ } else if (rstream->close_flag && vstream->close_write_flag &&
vstream->total_index_received == vstream->last_sent_index) {
/* Last index sent and current tracefile closed in write */
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
} else {
vstream->close_write_flag = 1;
}
- if (!vstream->ctf_trace->metadata_received ||
- vstream->ctf_trace->metadata_received >
- vstream->ctf_trace->metadata_sent) {
+ if (!ctf_trace->metadata_received ||
+ ctf_trace->metadata_received > ctf_trace->metadata_sent) {
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
pthread_mutex_lock(&vstream->overwrite_lock);
if (vstream->abort_flag) {
/*
- * The file is being overwritten by the writer, we cannot
- * use it.
+ * The file is being overwritten by the writer, we cannot * use it.
*/
viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
pthread_mutex_unlock(&vstream->overwrite_lock);
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
goto send_reply;
} else if (ret == 1) {
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
- viewer_stream_destroy(vstream);
+ viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
} else {
struct lttng_viewer_get_packet get_packet_info;
struct lttng_viewer_trace_packet reply;
struct relay_viewer_stream *stream;
+ struct ctf_trace *ctf_trace;
assert(cmd);
if (!stream) {
goto error;
}
- assert(stream->ctf_trace);
+
+ ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
/*
* First time we read this stream, we need open the tracefile, we should
stream->read_fd = ret;
}
- if (!stream->ctf_trace->metadata_received ||
- stream->ctf_trace->metadata_received >
- stream->ctf_trace->metadata_sent) {
+ if (!ctf_trace->metadata_received ||
+ ctf_trace->metadata_received > ctf_trace->metadata_sent) {
reply.status = htobe32(VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
goto send_reply;
struct lttng_viewer_get_metadata request;
struct lttng_viewer_metadata_packet reply;
struct relay_viewer_stream *stream;
+ struct ctf_trace *ctf_trace;
assert(cmd);
ERR("Invalid metadata stream");
goto error;
}
- assert(stream->ctf_trace);
- assert(stream->ctf_trace->metadata_sent <=
- stream->ctf_trace->metadata_received);
- len = stream->ctf_trace->metadata_received -
- stream->ctf_trace->metadata_sent;
+ ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+ assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received);
+
+ len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
if (len == 0) {
reply.status = htobe32(VIEWER_NO_NEW_METADATA);
goto send_reply;
PERROR("Relay reading metadata file");
goto error;
}
- stream->ctf_trace->metadata_sent += read_len;
+ ctf_trace->metadata_sent += read_len;
reply.status = htobe32(VIEWER_METADATA_OK);
goto send_reply;
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
- if (relay_connection->session &&
- relay_connection->session->viewer_attached > 0) {
- relay_connection->session->viewer_attached--;
- }
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
/*
* Delete all streams for a specific session ID.
*/
-static
-void viewer_del_streams(uint64_t session_id)
+static void destroy_viewer_streams_by_session(struct relay_session *session)
{
struct relay_viewer_stream *stream;
struct lttng_ht_iter iter;
+ assert(session);
+
rcu_read_lock();
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
stream_n.node) {
- health_code_update();
+ struct ctf_trace *ctf_trace;
- if (stream->session_id != session_id) {
+ health_code_update();
+ if (stream->session_id != session->id) {
continue;
}
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+
viewer_stream_delete(stream);
- assert(stream->ctf_trace);
if (stream->metadata_flag) {
- /*
- * The metadata viewer stream is destroyed once the refcount on the
- * ctf trace goes to 0 in the destroy stream function thus there is
- * no explicit call to that function here.
- */
- stream->ctf_trace->metadata_sent = 0;
- stream->ctf_trace->viewer_metadata_stream = NULL;
- } else {
- viewer_stream_destroy(stream);
+ ctf_trace->metadata_sent = 0;
+ ctf_trace->viewer_metadata_stream = NULL;
}
+
+ viewer_stream_destroy(ctf_trace, stream);
}
rcu_read_unlock();
}
+static void try_destroy_streams(struct relay_session *session)
+{
+ struct ctf_trace *ctf_trace;
+ struct lttng_ht_iter iter;
+
+ assert(session);
+
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ /* Attempt to destroy the ctf trace of that session. */
+ ctf_trace_try_destroy(session, ctf_trace);
+ }
+}
+
/*
* Delete and free a connection.
*
*/
static
void del_connection(struct lttng_ht *relay_connections_ht,
- struct lttng_ht_iter *iter, struct relay_command *relay_connection)
+ struct lttng_ht_iter *iter, struct relay_command *relay_connection,
+ struct lttng_ht *sessions_ht)
{
int ret;
+ struct relay_session *session;
assert(relay_connections_ht);
assert(iter);
assert(relay_connection);
+ assert(sessions_ht);
DBG("Cleaning connection of session ID %" PRIu64,
relay_connection->session_id);
+ rcu_read_lock();
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
- viewer_del_streams(relay_connection->session_id);
+ session = session_find_by_id(sessions_ht, relay_connection->session_id);
+ if (session) {
+ /*
+ * Very important that this is done before destroying the session so we
+ * can put back every viewer stream reference from the ctf_trace.
+ */
+ destroy_viewer_streams_by_session(session);
+ try_destroy_streams(session);
+ session_viewer_try_destroy(sessions_ht, session);
+ }
+ rcu_read_unlock();
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
health_code_update();
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_thread_quit_pipe(pollfd, revents);
+ ret = check_live_conn_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;
if (revents & (LPOLLERR)) {
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Viewer socket %d hung up", pollfd);
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
} else if (revents & LPOLLIN) {
ret = relay_connection->sock->ops->recvmsg(
relay_connection->sock, &recv_hdr,
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
DBG("Viewer control connection closed with %d",
pollfd);
} else {
/* Clear the session on error. */
cleanup_poll_connection(&events, pollfd);
del_connection(relay_connections_ht, &iter,
- relay_connection);
+ relay_connection, relay_ctx->sessions_ht);
DBG("Viewer connection closed with %d", pollfd);
}
}
relay_connection = caa_container_of(node, struct relay_command,
sock_n);
- del_connection(relay_connections_ht, &iter, relay_connection);
+ del_connection(relay_connections_ht, &iter, relay_connection,
+ relay_ctx->sessions_ht);
}
rcu_read_unlock();
error_poll_create:
RELAY_VIEWER_NOTIFICATION = 4,
};
-/*
- * When we receive a stream, it gets stored in a list (on a per connection
- * basis) until we have all the streams of the same channel and the metadata
- * associated with it, then it gets flagged with viewer_ready.
- */
-struct relay_stream_recv_handle {
- uint64_t id; /* stream handle */
- struct cds_list_head node;
-};
-
-/*
- * Represents a stream in the relay
- */
-struct relay_stream {
- uint64_t stream_handle;
- uint64_t prev_seq; /* previous data sequence number encountered */
- struct lttng_ht_node_ulong stream_n;
- struct relay_session *session;
- struct rcu_head rcu_node;
- int fd;
- /* FD on which to write the index data. */
- int index_fd;
- /* FD on which to read the index data for the viewer. */
- int read_index_fd;
-
- char *path_name;
- char *channel_name;
- /* on-disk circular buffer of tracefiles */
- uint64_t tracefile_size;
- uint64_t tracefile_size_current;
- uint64_t tracefile_count;
- uint64_t tracefile_count_current;
- /* To inform the viewer up to where it can go back in time. */
- uint64_t oldest_tracefile_id;
-
- uint64_t total_index_received;
- uint64_t last_net_seq_num;
-
- /*
- * This node is added to the *control* connection hash table and the
- * pointer is copied in here so we can access it when deleting this object.
- * When deleting this, the ctf trace ht MUST NOT be destroyed. This happens
- * at connection deletion.
- */
- struct lttng_ht_node_str ctf_trace_node;
- struct lttng_ht *ctf_traces_ht;
-
- /*
- * To protect from concurrent read/update between the
- * streaming-side and the viewer-side.
- * This lock must be held, we reading/updating the
- * ctf_trace pointer.
- */
- pthread_mutex_t lock;
-
- struct ctf_trace *ctf_trace;
- /*
- * If the stream is inactive, this field is updated with the live beacon
- * timestamp end, when it is active, this field == -1ULL.
- */
- uint64_t beacon_ts_end;
- /*
- * To protect the update of the close_write_flag and the checks of
- * the tracefile_count_current.
- * It is taken before checking whenever we need to know if the
- * writer and reader are working in the same tracefile.
- */
- pthread_mutex_t viewer_stream_rotation_lock;
-
- /* Information telling us when to close the stream */
- unsigned int close_flag:1;
- /* Indicate if the stream was initialized for a data pending command. */
- unsigned int data_pending_check_done:1;
- unsigned int metadata_flag:1;
- /*
- * To detect when we start overwriting old data, it is used to
- * update the oldest_tracefile_id.
- */
- unsigned int tracefile_overwrite:1;
- /*
- * Can this stream be used by a viewer or are we waiting for additional
- * information.
- */
- unsigned int viewer_ready:1;
-};
-
/*
* Internal structure to map a socket with the corresponding session.
* A hashtable indexed on the socket FD is used for the lookups.
/* protocol version to use for this session */
uint32_t major;
uint32_t minor;
- struct lttng_ht *ctf_traces_ht; /* indexed by path name */
uint64_t session_id;
struct cds_list_head recv_head;
unsigned int version_check_done:1;
extern char *opt_output_path;
+/*
+ * Contains stream indexed by ID. This is important since many commands lookup
+ * streams only by ID thus also keeping them in this hash table makes the
+ * search O(1) instead of iterating over the ctf_traces_ht of the session.
+ */
extern struct lttng_ht *relay_streams_ht;
+
extern struct lttng_ht *viewer_streams_ht;
extern struct lttng_ht *indexes_ht;
extern const char *tracing_group_name;
+extern const char * const config_section_name;
+
extern int thread_quit_pipe[2];
-struct relay_stream *relay_stream_find_by_id(uint64_t stream_id);
void lttng_relay_notify_ready(void);
#endif /* LTTNG_RELAYD_H */
#include "health-relayd.h"
#include "testpoint.h"
#include "viewer-stream.h"
+#include "session.h"
+#include "stream.h"
/* command line options */
char *opt_output_path;
static pthread_t health_thread;
static uint64_t last_relay_stream_id;
-static uint64_t last_relay_session_id;
/*
* Relay command queue.
static
int close_stream_check(struct relay_stream *stream)
{
-
if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
/*
* We are about to close the stream so set the data pending flag to 1
return 0;
}
+static void try_close_stream(struct relay_session *session,
+ struct relay_stream *stream)
+{
+ int ret;
+ struct ctf_trace *ctf_trace;
+
+ assert(session);
+ assert(stream);
+
+ if (!close_stream_check(stream)) {
+ /* Can't close it, not ready for that. */
+ goto end;
+ }
+
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+
+ pthread_mutex_lock(&session->viewer_ready_lock);
+ ctf_trace->invalid_flag = 1;
+ pthread_mutex_unlock(&session->viewer_ready_lock);
+
+ ret = stream_close(session, stream);
+ if (!ret) {
+ /* Already close thus the ctf trace is being or has been destroyed. */
+ goto end;
+ }
+
+ ctf_trace_try_destroy(session, ctf_trace);
+
+end:
+ return;
+}
+
/*
* This thread manages the listening for new connections on the network
*/
return NULL;
}
-/*
- * Get stream from stream id.
- * Need to be called with RCU read-side lock held.
- */
-struct relay_stream *relay_stream_find_by_id(uint64_t stream_id)
+static void try_close_streams(struct relay_session *session)
{
- struct lttng_ht_node_ulong *node;
+ struct ctf_trace *ctf_trace;
struct lttng_ht_iter iter;
- struct relay_stream *ret;
-
- lttng_ht_lookup(relay_streams_ht,
- (void *)((unsigned long) stream_id),
- &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node == NULL) {
- DBG("Relay stream %" PRIu64 " not found", stream_id);
- ret = NULL;
- goto end;
- }
- ret = caa_container_of(node, struct relay_stream, stream_n);
+ assert(session);
-end:
- return ret;
-}
-
-static
-void deferred_free_stream(struct rcu_head *head)
-{
- struct relay_stream *stream =
- caa_container_of(head, struct relay_stream, rcu_node);
-
- free(stream->path_name);
- free(stream->channel_name);
- free(stream);
-}
-
-static
-void deferred_free_session(struct rcu_head *head)
-{
- struct relay_session *session =
- caa_container_of(head, struct relay_session, rcu_node);
- free(session);
-}
-
-/*
- * Close a given stream. The stream is freed using a call RCU.
- *
- * RCU read side lock MUST be acquired. If NO close_stream_check() was called
- * BEFORE the stream lock MUST be acquired.
- */
-static void destroy_stream(struct relay_stream *stream)
-{
- int delret;
- struct relay_viewer_stream *vstream;
- struct lttng_ht_iter iter;
-
- assert(stream);
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
+ pthread_mutex_lock(&session->viewer_ready_lock);
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ struct relay_stream *stream;
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
+ /* Close streams. */
+ cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) {
+ stream_close(session, stream);
}
- }
-
- vstream = viewer_stream_find_by_id(stream->stream_handle);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- vstream->total_index_received = stream->total_index_received;
- vstream->tracefile_count_last = stream->tracefile_count_current;
- vstream->close_write_flag = 1;
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- }
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle);
-
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!delret);
- iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(stream->ctf_traces_ht, &iter);
- assert(!delret);
-
- if (stream->ctf_trace) {
- ctf_trace_try_destroy(stream->ctf_trace);
+ ctf_trace->invalid_flag = 1;
+ ctf_trace_try_destroy(session, ctf_trace);
}
-
- call_rcu(&stream->rcu_node, deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ rcu_read_unlock();
+ pthread_mutex_unlock(&session->viewer_ready_lock);
}
/*
- * relay_delete_session: Free all memory associated with a session and
- * close all the FDs
+ * Try to destroy a session within a connection.
*/
static
void relay_delete_session(struct relay_command *cmd,
struct lttng_ht *sessions_ht)
{
- struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
- struct relay_stream *stream;
- int ret;
+ assert(cmd);
+ assert(sessions_ht);
- if (!cmd->session) {
- return;
- }
+ /* Indicate that this session can be destroyed from now on. */
+ cmd->session->close_flag = 1;
- DBG("Relay deleting session %" PRIu64, cmd->session->id);
+ try_close_streams(cmd->session);
- rcu_read_lock();
- cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (!node) {
- continue;
- }
- stream = caa_container_of(node, struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- destroy_stream(stream);
- cmd->session->stream_count--;
- assert(cmd->session->stream_count >= 0);
- }
- }
-
- /* Make this session not visible anymore. */
- iter.iter.node = &cmd->session->session_n.node;
- ret = lttng_ht_del(sessions_ht, &iter);
- assert(!ret);
- call_rcu(&cmd->session->rcu_node, deferred_free_session);
- rcu_read_unlock();
+ /*
+ * This will try to delete and destroy the session if no viewer is attached
+ * to it meaning the refcount is down to zero.
+ */
+ session_try_destroy(sessions_ht, cmd->session);
}
/*
memset(&reply, 0, sizeof(reply));
- session = zmalloc(sizeof(struct relay_session));
- if (session == NULL) {
- PERROR("relay session zmalloc");
+ session = session_create();
+ if (!session) {
ret = -1;
goto error;
}
-
- session->id = ++last_relay_session_id;
- session->sock = cmd->sock;
session->minor = cmd->minor;
session->major = cmd->major;
- pthread_mutex_init(&session->viewer_ready_lock, NULL);
+ cmd->session_id = session->id;
cmd->session = session;
reply.session_id = htobe64(session->id);
switch (cmd->minor) {
- case 1:
- case 2:
- case 3:
- break;
- case 4: /* LTTng sessiond 2.4 */
- default:
- ret = cmd_create_session_2_4(cmd, session);
- break;
+ case 1:
+ case 2:
+ case 3:
+ break;
+ case 4: /* LTTng sessiond 2.4 */
+ default:
+ ret = cmd_create_session_2_4(cmd, session);
+ break;
}
- lttng_ht_node_init_ulong(&session->session_n,
- (unsigned long) session->id);
- lttng_ht_add_unique_ulong(sessions_ht,
- &session->session_n);
-
+ lttng_ht_add_unique_u64(sessions_ht, &session->session_n);
DBG("Created session %" PRIu64, session->id);
error:
static
void set_viewer_ready_flag(struct relay_command *cmd)
{
- struct relay_stream_recv_handle *node, *tmp_node;
+ struct relay_stream *stream, *tmp_stream;
pthread_mutex_lock(&cmd->session->viewer_ready_lock);
-
- cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) {
- struct relay_stream *stream;
-
- rcu_read_lock();
- stream = relay_stream_find_by_id(node->id);
- if (!stream) {
- /*
- * Stream is most probably being cleaned up by the data thread thus
- * simply continue to the next one.
- */
- rcu_read_unlock();
- continue;
- }
-
+ cds_list_for_each_entry_safe(stream, tmp_stream, &cmd->recv_head,
+ recv_list) {
stream->viewer_ready = 1;
- rcu_read_unlock();
-
- /* Clean stream handle node. */
- cds_list_del(&node->node);
- free(node);
+ cds_list_del(&stream->recv_list);
}
-
pthread_mutex_unlock(&cmd->session->viewer_ready_lock);
return;
}
* handle. A new node is allocated thus must be freed when the node is deleted
* from the list.
*/
-static void queue_stream_handle(uint64_t handle, struct relay_command *cmd)
+static void queue_stream(struct relay_stream *stream, struct relay_command *cmd)
{
- struct relay_stream_recv_handle *node;
-
assert(cmd);
+ assert(stream);
- node = zmalloc(sizeof(*node));
- if (!node) {
- PERROR("zmalloc queue stream handle");
- return;
- }
-
- node->id = handle;
- cds_list_add(&node->node, &cmd->recv_head);
+ cds_list_add(&stream->recv_list, &cmd->recv_head);
}
/*
int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct relay_stream *stream = NULL;
struct lttcomm_relayd_status_stream reply;
- int ret, send_ret;
+ struct ctf_trace *trace;
if (!session || cmd->version_check_done == 0) {
ERR("Trying to add a stream before version check");
rcu_read_lock();
stream->stream_handle = ++last_relay_stream_id;
stream->prev_seq = -1ULL;
- stream->session = session;
+ stream->session_id = session->id;
stream->index_fd = -1;
stream->read_index_fd = -1;
- stream->ctf_trace = NULL;
+ lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
}
- if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
- stream->metadata_flag = 1;
- /*
- * When we receive a new metadata stream, we create a new
- * ctf_trace and we assign this ctf_trace to all streams with
- * the same path.
- *
- * If later on we receive a new stream for the same ctf_trace,
- * we copy the information from the first hit in the HT to the
- * new stream.
- */
- stream->ctf_trace = ctf_trace_create();
- if (!stream->ctf_trace) {
+ trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name);
+ if (!trace) {
+ trace = ctf_trace_create(stream->path_name);
+ if (!trace) {
ret = -1;
goto end;
}
- stream->ctf_trace->refcount++;
- stream->ctf_trace->metadata_stream = stream;
+ ctf_trace_add(session->ctf_traces_ht, trace);
+ }
+ ctf_trace_get_ref(trace);
+
+ if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) {
+ stream->metadata_flag = 1;
+ /* Assign quick reference to the metadata stream in the trace. */
+ trace->metadata_stream = stream;
}
- ctf_trace_assign(cmd->ctf_traces_ht, stream);
- stream->ctf_traces_ht = cmd->ctf_traces_ht;
/*
- * Add the stream handle in the recv list of the connection. Once the end
- * stream message is received, this list is emptied and streams are set
- * with the viewer ready flag.
+ * Add the stream in the recv list of the connection. Once the end stream
+ * message is received, this list is emptied and streams are set with the
+ * viewer ready flag.
*/
- queue_stream_handle(stream->stream_handle, cmd);
+ queue_stream(stream, cmd);
- lttng_ht_node_init_ulong(&stream->stream_n,
- (unsigned long) stream->stream_handle);
- lttng_ht_add_unique_ulong(relay_streams_ht,
- &stream->stream_n);
+ /*
+ * Both in the ctf_trace object and the global stream ht since the data
+ * side of the relayd does not have the concept of session.
+ */
+ lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
+ cds_list_add_tail(&stream->trace_list, &trace->stream_list);
- lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name);
- lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node);
session->stream_count++;
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
}
rcu_read_lock();
- stream = relay_stream_find_by_id(be64toh(stream_info.stream_id));
+ stream = stream_find_by_id(relay_streams_ht,
+ be64toh(stream_info.stream_id));
if (!stream) {
ret = -1;
goto end_unlock;
session->stream_count--;
assert(session->stream_count >= 0);
- if (close_stream_check(stream)) {
- destroy_stream(stream);
- }
+ /* Check if we can close it or else the data will do it. */
+ try_close_stream(session, stream);
end_unlock:
rcu_read_unlock();
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
uint64_t data_size, payload_size;
+ struct ctf_trace *ctf_trace;
if (!session) {
ERR("Metadata sent before version check");
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
rcu_read_lock();
- metadata_stream = relay_stream_find_by_id(
+ metadata_stream = stream_find_by_id(relay_streams_ht,
be64toh(metadata_struct->stream_id));
if (!metadata_stream) {
ret = -1;
if (ret < 0) {
goto end_unlock;
}
- metadata_stream->ctf_trace->metadata_received +=
+
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ metadata_stream->path_name);
+ assert(ctf_trace);
+ ctf_trace->metadata_received +=
payload_size + be32toh(metadata_struct->padding_size);
DBG2("Relay metadata written");
last_net_seq_num = be64toh(msg.last_net_seq_num);
rcu_read_lock();
- stream = relay_stream_find_by_id(stream_id);
+ stream = stream_find_by_id(relay_streams_ht, stream_id);
if (stream == NULL) {
ret = -1;
goto end_unlock;
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
+ node.node) {
if (stream->stream_handle == stream_id) {
stream->data_pending_check_done = 1;
DBG("Relay quiescent control pending flag set to %" PRIu64,
*/
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- if (stream->session->id == session_id) {
+ node.node) {
+ if (stream->session_id == session_id) {
stream->data_pending_check_done = 0;
DBG("Set begin data pending flag to stream %" PRIu64,
stream->stream_handle);
/* Iterate over all streams to see if the begin data pending flag is set. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
- stream_n.node) {
- if (stream->session->id == session_id &&
+ node.node) {
+ if (stream->session_id == session_id &&
!stream->data_pending_check_done) {
is_data_inflight = 1;
DBG("Data is still in flight for stream %" PRIu64,
net_seq_num = be64toh(index_info.net_seq_num);
rcu_read_lock();
- stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id));
+ stream = stream_find_by_id(relay_streams_ht,
+ be64toh(index_info.relay_stream_id));
if (!stream) {
ret = -1;
goto end_rcu_unlock;
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd)
+int relay_process_data(struct relay_command *cmd, struct lttng_ht *sessions_ht)
{
int ret = 0, rotate_index = 0;
ssize_t size_ret;
uint64_t stream_id;
uint64_t net_seq_num;
uint32_t data_size;
+ struct relay_session *session;
ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
stream_id = be64toh(data_hdr.stream_id);
rcu_read_lock();
- stream = relay_stream_find_by_id(stream_id);
+ stream = stream_find_by_id(relay_streams_ht, stream_id);
if (!stream) {
ret = -1;
goto end_rcu_unlock;
}
+ session = session_find_by_id(sessions_ht, stream->session_id);
+ assert(session);
+
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
char *tmp_data_ptr;
* Index are handled in protocol version 2.4 and above. Also, snapshot and
* index are NOT supported.
*/
- if (stream->session->minor >= 4 && !stream->session->snapshot) {
+ if (session->minor >= 4 && !session->snapshot) {
ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
goto end_rcu_unlock;
stream->prev_seq = net_seq_num;
- /* Check if we need to close the FD */
- if (close_stream_check(stream)) {
- destroy_stream(stream);
- }
+ try_close_stream(session, stream);
end_rcu_unlock:
rcu_read_unlock();
}
CDS_INIT_LIST_HEAD(&relay_connection->recv_head);
- /*
- * Only used by the control side and the reference is copied inside each
- * stream from that connection. Thus a destroy HT must be done after every
- * stream has been destroyed.
- */
- if (relay_connection->type == RELAY_CONTROL) {
- relay_connection->ctf_traces_ht = lttng_ht_new(0,
- LTTNG_HT_TYPE_STRING);
- if (!relay_connection->ctf_traces_ht) {
- goto error_read;
- }
- }
-
lttng_ht_node_init_ulong(&relay_connection->sock_n,
(unsigned long) relay_connection->sock->fd);
rcu_read_lock();
assert(!ret);
if (relay_connection->type == RELAY_CONTROL) {
- struct relay_stream_recv_handle *node, *tmp_node;
-
- relay_delete_session(relay_connection, sessions_ht);
- lttng_ht_destroy(relay_connection->ctf_traces_ht);
+ struct relay_stream *stream, *tmp_stream;
/* Clean up recv list. */
- cds_list_for_each_entry_safe(node, tmp_node,
- &relay_connection->recv_head, node) {
- cds_list_del(&node->node);
- free(node);
+ cds_list_for_each_entry_safe(stream, tmp_stream,
+ &relay_connection->recv_head, recv_list) {
+ cds_list_del(&stream->recv_list);
}
+
+ relay_delete_session(relay_connection, sessions_ht);
+
}
call_rcu(&relay_connection->rcu_node, deferred_free_connection);
continue;
}
- ret = relay_process_data(relay_connection);
+ ret = relay_process_data(relay_connection,
+ sessions_ht);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
}
/* tables of sessions indexed by session ID */
- relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_ctx->sessions_ht) {
goto exit_relay_ctx_sessions;
}
/* tables of streams indexed by stream ID */
- relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_streams_ht) {
goto exit_relay_ctx_streams;
}
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+#define _GNU_SOURCE
+#include <common/common.h>
+
+#include "ctf-trace.h"
#include "session.h"
+#include "stream.h"
+
+/* Global session id used in the session creation. */
+static uint64_t last_relay_session_id;
+
+static void rcu_destroy_session(struct rcu_head *head)
+{
+ struct relay_session *session =
+ caa_container_of(head, struct relay_session, rcu_node);
+
+ free(session);
+}
+
+/*
+ * Create a new session by assigning a new session ID.
+ *
+ * Return allocated session or else NULL.
+ */
+struct relay_session *session_create(void)
+{
+ struct relay_session *session;
+
+ session = zmalloc(sizeof(*session));
+ if (!session) {
+ PERROR("relay session zmalloc");
+ goto error;
+ }
+
+ session->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
+ if (!session->ctf_traces_ht) {
+ free(session);
+ goto error;
+ }
+
+ pthread_mutex_init(&session->viewer_ready_lock, NULL);
+ session->id = ++last_relay_session_id;
+ lttng_ht_node_init_u64(&session->session_n, session->id);
+
+error:
+ return session;
+}
/*
* Lookup a session within the given hash table and session id. RCU read side
struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id)
{
struct relay_session *session = NULL;
- struct lttng_ht_node_ulong *node;
+ struct lttng_ht_node_u64 *node;
struct lttng_ht_iter iter;
assert(ht);
- lttng_ht_lookup(ht, (void *)((unsigned long) id), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
+ lttng_ht_lookup(ht, &id, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
if (!node) {
+ DBG("Session find by ID %" PRIu64 " id NOT found", id);
goto end;
}
session = caa_container_of(node, struct relay_session, session_n);
+ DBG("Session find by ID %" PRIu64 " id found", id);
end:
return session;
}
+
+/*
+ * Delete session from the given hash table.
+ *
+ * Return lttng ht del error code being 0 on success and 1 on failure.
+ */
+int session_delete(struct lttng_ht *ht, struct relay_session *session)
+{
+ struct lttng_ht_iter iter;
+
+ assert(ht);
+ assert(session);
+
+ iter.iter.node = &session->session_n.node;
+ return lttng_ht_del(ht, &iter);
+}
+
+/*
+ * The caller MUST be from the viewer thread since the viewer refcount is
+ * decremented. With this calue down to 0, it will try to destroy the session.
+ */
+void session_viewer_try_destroy(struct lttng_ht *ht,
+ struct relay_session *session)
+{
+ unsigned long ret_ref;
+
+ assert(session);
+
+ ret_ref = uatomic_add_return(&session->viewer_refcount, -1);
+ if (ret_ref == 0) {
+ session_try_destroy(ht, session);
+ }
+}
+
+/*
+ * Should only be called from the main streaming thread since it does not touch
+ * the viewer refcount. If this refcount is down to 0, destroy the session only
+ * and only if the session deletion succeeds. This is done because the viewer
+ * *and* the streaming thread can both concurently try to destroy the session
+ * thus the first come first serve.
+ */
+void session_try_destroy(struct lttng_ht *ht, struct relay_session *session)
+{
+ int ret = 0;
+ unsigned long ret_ref;
+
+ assert(session);
+
+ ret_ref = uatomic_read(&session->viewer_refcount);
+ if (ret_ref == 0 && session->close_flag) {
+ if (ht) {
+ ret = session_delete(ht, session);
+ }
+ if (!ret) {
+ /* Only destroy the session if the deletion was successful. */
+ session_destroy(session);
+ }
+ }
+}
+
+/*
+ * Destroy a session object.
+ */
+void session_destroy(struct relay_session *session)
+{
+ struct ctf_trace *ctf_trace;
+ struct lttng_ht_iter iter;
+
+ assert(session);
+
+ DBG("Relay destroying session %" PRIu64, session->id);
+
+ /*
+ * Empty the ctf trace hash table which will destroy the stream contained
+ * in that table.
+ */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
+ node.node) {
+ ctf_trace_delete(session->ctf_traces_ht, ctf_trace);
+ ctf_trace_destroy(ctf_trace);
+ }
+ lttng_ht_destroy(session->ctf_traces_ht);
+ rcu_read_unlock();
+
+ call_rcu(&session->rcu_node, rcu_destroy_session);
+}
* daemon which can provide multiple data source.
*/
uint64_t id;
- struct lttcomm_sock *sock;
char session_name[NAME_MAX];
char hostname[HOST_NAME_MAX];
uint32_t live_timer;
- struct lttng_ht_node_ulong session_n;
+ struct lttng_ht_node_u64 session_n;
struct rcu_head rcu_node;
- uint32_t viewer_attached;
uint32_t stream_count;
/* Tell if this session is for a snapshot or not. */
unsigned int snapshot:1;
+ /* Tell if the session has been closed on the streaming side. */
+ unsigned int close_flag:1;
+
+ /* Number of viewer using it. Set to 0, it should be destroyed. */
+ int viewer_refcount;
+
+ /* Contains ctf_trace object of that session indexed by path name. */
+ struct lttng_ht *ctf_traces_ht;
/*
* Indicate version protocol for this session. This is especially useful
pthread_mutex_t viewer_ready_lock;
};
+static inline void session_viewer_attach(struct relay_session *session)
+{
+ uatomic_inc(&session->viewer_refcount);
+}
+
+static inline void session_viewer_detach(struct relay_session *session)
+{
+ uatomic_add(&session->viewer_refcount, -1);
+}
+
struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id);
+struct relay_session *session_create(void);
+int session_delete(struct lttng_ht *ht, struct relay_session *session);
+
+/*
+ * Direct destroy without reading the refcount.
+ */
+void session_destroy(struct relay_session *session);
+
+/*
+ * Destroy the session if the refcount is down to 0.
+ */
+void session_try_destroy(struct lttng_ht *ht, struct relay_session *session);
+
+/*
+ * Decrement the viewer refcount and destroy it if down to 0.
+ */
+void session_viewer_try_destroy(struct lttng_ht *ht,
+ struct relay_session *session);
#endif /* _SESSION_H */
--- /dev/null
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
+ * David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _GNU_SOURCE
+#include <common/common.h>
+
+#include "index.h"
+#include "stream.h"
+#include "viewer-stream.h"
+
+static void rcu_destroy_stream(struct rcu_head *head)
+{
+ struct relay_stream *stream =
+ caa_container_of(head, struct relay_stream, rcu_node);
+
+ free(stream->path_name);
+ free(stream->channel_name);
+ free(stream);
+}
+
+/*
+ * Get stream from stream id from the given hash table. Return stream if found
+ * else NULL.
+ *
+ * Need to be called with RCU read-side lock held.
+ */
+struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
+ uint64_t stream_id)
+{
+ struct lttng_ht_node_u64 *node;
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream = NULL;
+
+ assert(ht);
+
+ lttng_ht_lookup(ht, &stream_id, &iter);
+ node = lttng_ht_iter_get_node_u64(&iter);
+ if (node == NULL) {
+ DBG("Relay stream %" PRIu64 " not found", stream_id);
+ goto end;
+ }
+ stream = caa_container_of(node, struct relay_stream, node);
+
+end:
+ return stream;
+}
+
+/*
+ * Close a given stream. If an assosiated viewer stream exists it is updated.
+ *
+ * RCU read side lock MUST be acquired.
+ *
+ * Return 0 if close was successful or 1 if already closed.
+ */
+int stream_close(struct relay_session *session, struct relay_stream *stream)
+{
+ int delret, ret;
+ struct relay_viewer_stream *vstream;
+ struct ctf_trace *ctf_trace;
+
+ assert(stream);
+
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->terminated_flag) {
+ /* This stream is already closed. Ignore. */
+ ret = 1;
+ goto end_unlock;
+ }
+
+ DBG("Closing stream id %" PRIu64, stream->stream_handle);
+
+ if (stream->fd >= 0) {
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = viewer_stream_find_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
+ vstream->total_index_received = stream->total_index_received;
+ vstream->tracefile_count_last = stream->tracefile_count_current;
+ vstream->close_write_flag = 1;
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ }
+
+ /* Cleanup index of that stream. */
+ relay_index_destroy_by_stream_id(stream->stream_handle);
+
+ ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
+ stream->path_name);
+ assert(ctf_trace);
+ ctf_trace_put_ref(ctf_trace);
+
+ stream->terminated_flag = 1;
+ ret = 0;
+
+end_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
+
+void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ assert(ht);
+ assert(stream);
+
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ cds_list_del(&stream->trace_list);
+}
+
+void stream_destroy(struct relay_stream *stream)
+{
+ assert(stream);
+
+ call_rcu(&stream->rcu_node, rcu_destroy_stream);
+}
--- /dev/null
+/*
+ * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
+ * David Goulet <dgoulet@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef _STREAM_H
+#define _STREAM_H
+
+#include <limits.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <urcu/list.h>
+
+#include <common/hashtable/hashtable.h>
+
+#include "session.h"
+
+/*
+ * Represents a stream in the relay
+ */
+struct relay_stream {
+ uint64_t stream_handle;
+ uint64_t prev_seq; /* previous data sequence number encountered */
+ struct lttng_ht_node_u64 node;
+ /*
+ * When we receive a stream, it gets stored in a list (on a per connection
+ * basis) until we have all the streams of the same channel and the metadata
+ * associated with it, then it gets flagged with viewer_ready.
+ */
+ struct cds_list_head recv_list;
+
+ /* Added to the corresponding ctf_trace. */
+ struct cds_list_head trace_list;
+ struct rcu_head rcu_node;
+ uint64_t session_id;
+ int fd;
+ /* FD on which to write the index data. */
+ int index_fd;
+ /* FD on which to read the index data for the viewer. */
+ int read_index_fd;
+
+ char *path_name;
+ char *channel_name;
+ /* on-disk circular buffer of tracefiles */
+ uint64_t tracefile_size;
+ uint64_t tracefile_size_current;
+ uint64_t tracefile_count;
+ uint64_t tracefile_count_current;
+ /* To inform the viewer up to where it can go back in time. */
+ uint64_t oldest_tracefile_id;
+
+ uint64_t total_index_received;
+ uint64_t last_net_seq_num;
+
+ /*
+ * To protect from concurrent read/update. Also used to synchronize the
+ * closing of this stream.
+ */
+ pthread_mutex_t lock;
+
+ /*
+ * If the stream is inactive, this field is updated with the live beacon
+ * timestamp end, when it is active, this field == -1ULL.
+ */
+ uint64_t beacon_ts_end;
+ /*
+ * To protect the update of the close_write_flag and the checks of
+ * the tracefile_count_current.
+ * It is taken before checking whenever we need to know if the
+ * writer and reader are working in the same tracefile.
+ */
+ pthread_mutex_t viewer_stream_rotation_lock;
+
+ /* Information telling us when to close the stream */
+ unsigned int close_flag:1;
+ /*
+ * Indicates if the stream has been effectively closed thus having the
+ * information in it invalidated but NOT freed. The stream lock MUST be
+ * held to read/update that value.
+ */
+ unsigned int terminated_flag:1;
+ /* Indicate if the stream was initialized for a data pending command. */
+ unsigned int data_pending_check_done:1;
+ unsigned int metadata_flag:1;
+ /*
+ * To detect when we start overwriting old data, it is used to
+ * update the oldest_tracefile_id.
+ */
+ unsigned int tracefile_overwrite:1;
+ /*
+ * Can this stream be used by a viewer or are we waiting for additional
+ * information.
+ */
+ unsigned int viewer_ready:1;
+};
+
+struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
+ uint64_t stream_id);
+int stream_close(struct relay_session *session, struct relay_stream *stream);
+void stream_delete(struct lttng_ht *ht, struct relay_stream *stream);
+void stream_destroy(struct relay_stream *stream);
+
+#endif /* _STREAM_H */
}
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t)
+ enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace)
{
struct relay_viewer_stream *vstream;
assert(stream);
+ assert(ctf_trace);
vstream = zmalloc(sizeof(*vstream));
if (!vstream) {
goto error;
}
- vstream->session_id = stream->session->id;
+ vstream->session_id = stream->session_id;
vstream->stream_handle = stream->stream_handle;
vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
vstream->channel_name = strndup(stream->channel_name,
goto error;
}
- vstream->ctf_trace = stream->ctf_trace;
if (vstream->metadata_flag) {
- vstream->ctf_trace->viewer_metadata_stream = vstream;
+ ctf_trace->viewer_metadata_stream = vstream;
}
- uatomic_inc(&vstream->ctf_trace->refcount);
/* Globally visible after the add unique. */
lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
assert(!ret);
}
-void viewer_stream_destroy(struct relay_viewer_stream *stream)
+void viewer_stream_destroy(struct ctf_trace *ctf_trace,
+ struct relay_viewer_stream *stream)
{
int ret;
- unsigned long ret_ref;
assert(stream);
- ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1);
- assert(ret_ref >= 0);
+
+ if (ctf_trace) {
+ ctf_trace_put_ref(ctf_trace);
+ }
if (stream->read_fd >= 0) {
ret = close(stream->read_fd);
}
}
- /*
- * If the only stream left in the HT is the metadata stream,
- * we need to remove it because we won't detect a EOF for this
- * stream.
- */
- if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) {
- viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream);
- viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream);
- stream->ctf_trace->metadata_stream = NULL;
- DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id);
- /*
- * The streaming-side is already closed and we can't receive a new
- * stream concurrently at this point (since the session is being
- * destroyed), so when we detect the refcount equals 0, we are the
- * only owners of the ctf_trace and we can free it ourself.
- */
- free(stream->ctf_trace);
- }
-
call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
}
uint64_t tracefile_id;
assert(vstream);
+ assert(stream);
+
+ if (vstream->tracefile_count == 0) {
+ /* Ignore rotation, there is none to do. */
+ ret = 0;
+ goto end;
+ }
tracefile_id = (vstream->tracefile_count_current + 1) %
vstream->tracefile_count;
}
/*
- * If the stream on the streaming side still exists, lock to execute
- * rotation in order to avoid races between a modification on the index
- * values.
+ * Lock to execute rotation in order to avoid races between a modification
+ * on the index values.
*/
- if (stream) {
- pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
- }
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
/*
* The writer and the reader are not working in the same tracefile, we can
* read up to EOF, we don't care about the total_index_received.
*/
- if (!stream || (stream->tracefile_count_current != tracefile_id)) {
+ if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) {
vstream->close_write_flag = 1;
} else {
/*
* limit our reading to the number of indexes received.
*/
vstream->close_write_flag = 0;
- if (stream) {
+ if (stream->close_flag) {
vstream->total_index_received = stream->total_index_received;
}
}
vstream->abort_flag = 0;
pthread_mutex_unlock(&vstream->overwrite_lock);
- if (stream) {
- pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
- }
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
ret = index_open(vstream->path_name, vstream->channel_name,
vstream->tracefile_count, vstream->tracefile_count_current);
#include "ctf-trace.h"
#include "lttng-viewer-abi.h"
+#include "stream.h"
/* Stub */
struct relay_stream;
};
struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
- enum lttng_viewer_seek seek_t);
+ enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace);
struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id);
-void viewer_stream_destroy(struct relay_viewer_stream *stream);
+void viewer_stream_destroy(struct ctf_trace *ctf_trace,
+ struct relay_viewer_stream *stream);
void viewer_stream_delete(struct relay_viewer_stream *stream);
int viewer_stream_rotate(struct relay_viewer_stream *vstream,
struct relay_stream *stream);