fix: lttng-relayd: use appropriate RCU locking with hash tables
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 24 Jul 2012 13:08:15 +0000 (09:08 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 24 Jul 2012 13:08:15 +0000 (09:08 -0400)
RCU hash tables used in lttng-relayd use the auto-resize feature, which
is performed by concurrent call rcu worker threads. Therefore, we need
to protect the calls to the hash table with appropriate RCU read-side
locking, and use call_rcu to defer memory reclaim.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/bin/lttng-relayd/lttng-relayd.h
src/bin/lttng-relayd/main.c

index 8bbc279c3a61f993f322988935b1847976e9ccf0..bec13496ca26ab5944853bb0dacadebeb8b2436f 100644 (file)
@@ -53,6 +53,7 @@ struct relay_stream {
        uint64_t seq;
        struct lttng_ht_node_ulong stream_n;
        struct relay_session *session;
+       struct rcu_head rcu_node;
        int fd;
 };
 
@@ -65,6 +66,7 @@ struct relay_command {
        struct relay_session *session;
        struct cds_wfq_node node;
        struct lttng_ht_node_ulong sock_n;
+       struct rcu_head rcu_node;
        enum connection_type type;
 };
 
index 413b6cb29afe34ed79cc0480d2bf2d7de9ffac36..3613281dd03e971a45a1187495e40b1bc0f95f74 100644 (file)
@@ -840,6 +840,14 @@ char *create_output_path(char *path_name)
        }
 }
 
+static
+void deferred_free_stream(struct rcu_head *head)
+{
+       struct relay_stream *stream =
+               caa_container_of(head, struct relay_stream, rcu_node);
+       free(stream);
+}
+
 /*
  * relay_delete_session: Free all memory associated with a session and
  * close all the FDs
@@ -859,6 +867,7 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
        DBG("Relay deleting session %lu", cmd->session->id);
        free(cmd->session->sock);
 
+       rcu_read_lock();
        cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
                node = lttng_ht_iter_get_node_ulong(&iter);
                if (node) {
@@ -868,10 +877,12 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
                                close(stream->fd);
                                ret = lttng_ht_del(streams_ht, &iter);
                                assert(!ret);
-                               free(stream);
+                               call_rcu(&stream->rcu_node,
+                                       deferred_free_stream);
                        }
                }
        }
+       rcu_read_unlock();
 }
 
 /*
@@ -909,6 +920,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                goto end_no_session;
        }
 
+       rcu_read_lock();
        stream->stream_handle = ++last_relay_stream_id;
        stream->seq = 0;
        stream->session = session;
@@ -961,6 +973,7 @@ end:
        if (send_ret < 0) {
                ERR("Relay sending stream id");
        }
+       rcu_read_unlock();
 
 end_no_session:
        return ret;
@@ -1012,6 +1025,7 @@ int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
 
 /*
  * Get stream from stream id.
+ * Need to be called with RCU read-side lock held.
  */
 static
 struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
@@ -1076,11 +1090,13 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
        metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
+
+       rcu_read_lock();
        metadata_stream = relay_stream_from_stream_id(
                        be64toh(metadata_struct->stream_id), streams_ht);
        if (!metadata_stream) {
                ret = -1;
-               goto end;
+               goto end_unlock;
        }
 
        do {
@@ -1090,10 +1106,12 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        if (ret < (payload_size)) {
                ERR("Relay error writing metadata on file");
                ret = -1;
-               goto end;
+               goto end_unlock;
        }
        DBG2("Relay metadata written");
 
+end_unlock:
+       rcu_read_lock();
 end:
        return ret;
 }
@@ -1196,10 +1214,12 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
        }
 
        stream_id = be64toh(data_hdr.stream_id);
+
+       rcu_read_lock();
        stream = relay_stream_from_stream_id(stream_id, streams_ht);
        if (!stream) {
                ret = -1;
-               goto end;
+               goto end_unlock;
        }
 
        data_size = be32toh(data_hdr.data_size);
@@ -1208,7 +1228,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
                if (!data_buffer) {
                        ERR("Allocating data buffer");
                        ret = -1;
-                       goto end;
+                       goto end_unlock;
                }
                data_buffer_size = data_size;
        }
@@ -1217,7 +1237,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
        ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
        if (ret <= 0) {
                ret = -1;
-               goto end;
+               goto end_unlock;
        }
 
        do {
@@ -1226,22 +1246,21 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
        if (ret < data_size) {
                ERR("Relay error writing data to file");
                ret = -1;
-               goto end;
+               goto end_unlock;
        }
        DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
 
+end_unlock:
+       rcu_read_unlock();
 end:
        return ret;
 }
 
 static
-void relay_cleanup_connection(struct lttng_ht *relay_connections_ht, struct lttng_poll_event *events,
-               struct lttng_ht *streams_ht, int pollfd, struct lttng_ht_iter *iter)
+void relay_cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
 {
        int ret;
 
-       ret = lttng_ht_del(relay_connections_ht, iter);
-       assert(!ret);
        lttng_poll_del(events, pollfd);
 
        ret = close(pollfd);
@@ -1254,32 +1273,58 @@ static
 int relay_add_connection(int fd, struct lttng_poll_event *events,
                struct lttng_ht *relay_connections_ht)
 {
-       int ret;
        struct relay_command *relay_connection;
+       int ret;
 
        relay_connection = zmalloc(sizeof(struct relay_command));
        if (relay_connection == NULL) {
                PERROR("Relay command zmalloc");
-               ret = -1;
-               goto end;
+               goto error;
        }
        ret = read(fd, relay_connection, sizeof(struct relay_command));
        if (ret < 0 || ret < sizeof(relay_connection)) {
                PERROR("read relay cmd pipe");
-               ret = -1;
-               goto end;
+               goto error_read;
        }
 
        lttng_ht_node_init_ulong(&relay_connection->sock_n,
                        (unsigned long) relay_connection->sock->fd);
+       rcu_read_lock();
        lttng_ht_add_unique_ulong(relay_connections_ht,
                        &relay_connection->sock_n);
-       ret = lttng_poll_add(events,
+       rcu_read_unlock();
+       return lttng_poll_add(events,
                        relay_connection->sock->fd,
                        LPOLLIN | LPOLLRDHUP);
 
-end:
-       return ret;
+error_read:
+       free(relay_connection);
+error:
+       return -1;
+}
+
+static
+void deferred_free_connection(struct rcu_head *head)
+{
+       struct relay_command *relay_connection =
+               caa_container_of(head, struct relay_command, rcu_node);
+       free(relay_connection);
+}
+
+static
+void relay_del_connection(struct lttng_ht *relay_connections_ht,
+               struct lttng_ht *streams_ht, struct lttng_ht_iter *iter,
+               struct relay_command *relay_connection)
+{
+       int ret;
+
+       ret = lttng_ht_del(relay_connections_ht, iter);
+       assert(!ret);
+       if (relay_connection->type == RELAY_CONTROL) {
+               relay_delete_session(relay_connection, streams_ht);
+       }
+       call_rcu(&relay_connection->rcu_node,
+               deferred_free_connection);
 }
 
 /*
@@ -1300,6 +1345,8 @@ void *relay_thread_worker(void *data)
 
        DBG("[thread] Relay worker started");
 
+       rcu_register_thread();
+
        /* table of connections indexed on socket */
        relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        if (!relay_connections_ht) {
@@ -1367,12 +1414,14 @@ void *relay_thread_worker(void *data)
                                        }
                                }
                        } else if (revents > 0) {
+                               rcu_read_lock();
                                lttng_ht_lookup(relay_connections_ht,
                                                (void *)((unsigned long) pollfd),
                                                &iter);
                                node = lttng_ht_iter_get_node_ulong(&iter);
                                if (node == NULL) {
                                        DBG2("Relay sock %d not found", pollfd);
+                                       rcu_read_unlock();
                                        goto error;
                                }
                                relay_connection = caa_container_of(node,
@@ -1380,20 +1429,16 @@ void *relay_thread_worker(void *data)
 
                                if (revents & (LPOLLERR)) {
                                        ERR("POLL ERROR");
-                                       relay_cleanup_connection(relay_connections_ht,
-                                                       &events, streams_ht, pollfd, &iter);
-                                       if (relay_connection->type == RELAY_CONTROL) {
-                                               relay_delete_session(relay_connection, streams_ht);
-                                       }
-                                       free(relay_connection);
+                                       relay_cleanup_poll_connection(&events, pollfd);
+                                       relay_del_connection(relay_connections_ht,
+                                                       streams_ht, &iter,
+                                                       relay_connection);
                                } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
                                        DBG("Socket %d hung up", pollfd);
-                                       relay_cleanup_connection(relay_connections_ht,
-                                                       &events, streams_ht, pollfd, &iter);
-                                       if (relay_connection->type == RELAY_CONTROL) {
-                                               relay_delete_session(relay_connection, streams_ht);
-                                       }
-                                       free(relay_connection);
+                                       relay_cleanup_poll_connection(&events, pollfd);
+                                       relay_del_connection(relay_connections_ht,
+                                                       streams_ht, &iter,
+                                                       relay_connection);
                                } else if (revents & LPOLLIN) {
                                        /* control socket */
                                        if (relay_connection->type == RELAY_CONTROL) {
@@ -1402,10 +1447,10 @@ void *relay_thread_worker(void *data)
                                                                sizeof(struct lttcomm_relayd_hdr), MSG_WAITALL);
                                                /* connection closed */
                                                if (ret <= 0) {
-                                                       relay_cleanup_connection(relay_connections_ht,
-                                                                       &events, streams_ht, pollfd, &iter);
-                                                       relay_delete_session(relay_connection, streams_ht);
-                                                       free(relay_connection);
+                                                       relay_cleanup_poll_connection(&events, pollfd);
+                                                       relay_del_connection(relay_connections_ht,
+                                                                       streams_ht, &iter,
+                                                                       relay_connection);
                                                        DBG("Control connection closed with %d", pollfd);
                                                } else {
                                                        if (relay_connection->session) {
@@ -1420,10 +1465,10 @@ void *relay_thread_worker(void *data)
                                                         * command: clear the session
                                                         * */
                                                        if (ret < 0) {
-                                                               relay_cleanup_connection(relay_connections_ht,
-                                                                               &events, streams_ht, pollfd, &iter);
-                                                               relay_delete_session(relay_connection, streams_ht);
-                                                               free(relay_connection);
+                                                               relay_cleanup_poll_connection(&events, pollfd);
+                                                               relay_del_connection(relay_connections_ht,
+                                                                               streams_ht, &iter,
+                                                                               relay_connection);
                                                                DBG("Connection closed with %d", pollfd);
                                                        }
                                                }
@@ -1432,14 +1477,15 @@ void *relay_thread_worker(void *data)
                                                ret = relay_process_data(relay_connection, streams_ht);
                                                /* connection closed */
                                                if (ret < 0) {
-                                                       relay_cleanup_connection(relay_connections_ht,
-                                                                       &events, streams_ht, pollfd, &iter);
-                                                       relay_delete_session(relay_connection, streams_ht);
-                                                       free(relay_connection);
+                                                       relay_cleanup_poll_connection(&events, pollfd);
+                                                       relay_del_connection(relay_connections_ht,
+                                                                       streams_ht, &iter,
+                                                                       relay_connection);
                                                        DBG("Data connection closed with %d", pollfd);
                                                }
                                        }
                                }
+                               rcu_read_unlock();
                        }
                }
        }
@@ -1449,19 +1495,18 @@ error:
        lttng_poll_clean(&events);
 
        /* empty the hash table and free the memory */
+       rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
                node = lttng_ht_iter_get_node_ulong(&iter);
                if (node) {
                        relay_connection = caa_container_of(node,
                                        struct relay_command, sock_n);
-                       if (relay_connection->type == RELAY_CONTROL) {
-                               relay_delete_session(relay_connection, streams_ht);
-                       }
-                       free(relay_connection);
+                       relay_del_connection(relay_connections_ht,
+                                       streams_ht, &iter,
+                                       relay_connection);
                }
-               ret = lttng_ht_del(relay_connections_ht, &iter);
-               assert(!ret);
        }
+       rcu_read_unlock();
 error_poll_create:
        lttng_ht_destroy(streams_ht);
 streams_ht_error:
@@ -1473,6 +1518,7 @@ relay_connections_ht_error:
        DBG("Worker thread cleanup complete");
        free(data_buffer);
        stop_threads();
+       rcu_unregister_thread();
        return NULL;
 }
 
This page took 0.032579 seconds and 4 git commands to generate.