}
}
+static
+void deferred_free_stream(struct rcu_head *head)
+{
+ struct relay_stream *stream =
+ caa_container_of(head, struct relay_stream, rcu_node);
+ free(stream);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
DBG("Relay deleting session %lu", cmd->session->id);
free(cmd->session->sock);
+ rcu_read_lock();
cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
close(stream->fd);
ret = lttng_ht_del(streams_ht, &iter);
assert(!ret);
- free(stream);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
}
}
}
+ rcu_read_unlock();
}
/*
goto end_no_session;
}
+ rcu_read_lock();
stream->stream_handle = ++last_relay_stream_id;
stream->seq = 0;
stream->session = session;
if (send_ret < 0) {
ERR("Relay sending stream id");
}
+ rcu_read_unlock();
end_no_session:
return ret;
/*
* Get stream from stream id.
+ * Need to be called with RCU read-side lock held.
*/
static
struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
goto end;
}
metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
+
+ rcu_read_lock();
metadata_stream = relay_stream_from_stream_id(
be64toh(metadata_struct->stream_id), streams_ht);
if (!metadata_stream) {
ret = -1;
- goto end;
+ goto end_unlock;
}
do {
if (ret < (payload_size)) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end;
+ goto end_unlock;
}
DBG2("Relay metadata written");
+end_unlock:
+ rcu_read_lock();
end:
return ret;
}
}
stream_id = be64toh(data_hdr.stream_id);
+
+ rcu_read_lock();
stream = relay_stream_from_stream_id(stream_id, streams_ht);
if (!stream) {
ret = -1;
- goto end;
+ goto end_unlock;
}
data_size = be32toh(data_hdr.data_size);
if (!data_buffer) {
ERR("Allocating data buffer");
ret = -1;
- goto end;
+ goto end_unlock;
}
data_buffer_size = data_size;
}
ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
if (ret <= 0) {
ret = -1;
- goto end;
+ goto end_unlock;
}
do {
if (ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
- goto end;
+ goto end_unlock;
}
DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
+end_unlock:
+ rcu_read_unlock();
end:
return ret;
}
static
-void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events,
- struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter)
+void relay_cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
{
int ret;
- ret = lttng_ht_del(relay_connections_ht, iter);
- assert(!ret);
lttng_poll_del(events, pollfd);
ret = close(pollfd);
int relay_add_connection(int fd, struct lttng_poll_event *events,
struct lttng_ht *relay_connections_ht)
{
- int ret;
struct relay_command *relay_connection;
+ int ret;
relay_connection = zmalloc(sizeof(struct relay_command));
if (relay_connection == NULL) {
PERROR("Relay command zmalloc");
- ret = -1;
- goto end;
+ goto error;
}
ret = read(fd, relay_connection, sizeof(struct relay_command));
if (ret < 0 || ret < sizeof(relay_connection)) {
PERROR("read relay cmd pipe");
- ret = -1;
- goto end;
+ goto error_read;
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
(unsigned long) relay_connection->sock->fd);
+ rcu_read_lock();
lttng_ht_add_unique_ulong(relay_connections_ht,
&relay_connection->sock_n);
- ret = lttng_poll_add(events,
+ rcu_read_unlock();
+ return lttng_poll_add(events,
relay_connection->sock->fd,
LPOLLIN | LPOLLRDHUP);
-end:
- return ret;
+error_read:
+ free(relay_connection);
+error:
+ return -1;
+}
+
+static
+void deferred_free_connection(struct rcu_head *head)
+{
+ struct relay_command *relay_connection =
+ caa_container_of(head, struct relay_command, rcu_node);
+ free(relay_connection);
+}
+
+static
+void relay_del_connection(struct lttng_ht *relay_connections_ht,
+ struct lttng_ht *streams_ht, struct lttng_ht_iter *iter,
+ struct relay_command *relay_connection)
+{
+ int ret;
+
+ ret = lttng_ht_del(relay_connections_ht, iter);
+ assert(!ret);
+ if (relay_connection->type == RELAY_CONTROL) {
+ relay_delete_session(relay_connection, streams_ht);
+ }
+ call_rcu(&relay_connection->rcu_node,
+ deferred_free_connection);
}
/*
DBG("[thread] Relay worker started");
+ rcu_register_thread();
+
/* table of connections indexed on socket */
relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (!relay_connections_ht) {
}
}
} else if (revents > 0) {
+ rcu_read_lock();
lttng_ht_lookup(relay_connections_ht,
(void *)((unsigned long) pollfd),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
DBG2("Relay sock %d not found", pollfd);
+ rcu_read_unlock();
goto error;
}
relay_connection = caa_container_of(node,
if (revents & (LPOLLERR)) {
ERR("POLL ERROR");
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- if (relay_connection->type == RELAY_CONTROL) {
- relay_delete_session(relay_connection, streams_ht);
- }
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Socket %d hung up", pollfd);
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- if (relay_connection->type == RELAY_CONTROL) {
- relay_delete_session(relay_connection, streams_ht);
- }
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
} else if (revents & LPOLLIN) {
/* control socket */
if (relay_connection->type == RELAY_CONTROL) {
sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL);
/* connection closed */
if (ret <= 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- relay_delete_session(relay_connection, streams_ht);
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
DBG("Control connection closed with %d", pollfd);
} else {
if (relay_connection->session) {
* command: clear the session
* */
if (ret < 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- relay_delete_session(relay_connection, streams_ht);
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
DBG("Connection closed with %d", pollfd);
}
}
ret = relay_process_data(relay_connection, streams_ht);
/* connection closed */
if (ret < 0) {
- relay_cleanup_connection(relay_connections_ht,
- &events, streams_ht, pollfd, &iter);
- relay_delete_session(relay_connection, streams_ht);
- free(relay_connection);
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
DBG("Data connection closed with %d", pollfd);
}
}
}
+ rcu_read_unlock();
}
}
}
lttng_poll_clean(&events);
/* empty the hash table and free the memory */
+ rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
if (node) {
relay_connection = caa_container_of(node,
struct relay_command, sock_n);
- if (relay_connection->type == RELAY_CONTROL) {
- relay_delete_session(relay_connection, streams_ht);
- }
- free(relay_connection);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
}
- ret = lttng_ht_del(relay_connections_ht, &iter);
- assert(!ret);
}
+ rcu_read_unlock();
error_poll_create:
lttng_ht_destroy(streams_ht);
streams_ht_error:
DBG("Worker thread cleanup complete");
free(data_buffer);
stop_threads();
+ rcu_unregister_thread();
return NULL;
}