Fix: prioritize control socket communication in relayd
authorDavid Goulet <dgoulet@efficios.com>
Tue, 18 Dec 2012 00:04:13 +0000 (19:04 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Tue, 18 Dec 2012 19:22:20 +0000 (14:22 -0500)
Add the LTTNG_POLL_GET_PREV_FD for the relayd listener thread that needs
to access the previous valid fd during a poll loop.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/main.c
src/common/compat/poll.h

index 4de6613a0eff3e8847d31cc65c90228cc467adf8..009621a6f7b1c4701a0857848346f29c66b8511a 100644 (file)
@@ -1841,8 +1841,8 @@ void relay_del_connection(struct lttng_ht *relay_connections_ht,
 static
 void *relay_thread_worker(void *data)
 {
-       int i, ret, pollfd, err = -1;
-       uint32_t revents, nb_fd;
+       int ret, err = -1, last_seen_data_fd = -1;
+       uint32_t nb_fd;
        struct relay_command *relay_connection;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
@@ -1877,9 +1877,11 @@ void *relay_thread_worker(void *data)
                goto error;
        }
 
+restart:
        while (1) {
+               int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+
                /* Infinite blocking call, waiting for transmission */
-       restart:
                DBG3("Relayd worker thread polling...");
                ret = lttng_poll_wait(&events, -1);
                if (ret < 0) {
@@ -1894,10 +1896,15 @@ void *relay_thread_worker(void *data)
 
                nb_fd = ret;
 
+               /*
+                * Process control. The control connection is prioritised so we don't
+                * starve it with high throughout put tracing data on the data
+                * connection.
+                */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
-                       revents = LTTNG_POLL_GETEV(&events, i);
-                       pollfd = LTTNG_POLL_GETFD(&events, i);
+                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+                       int pollfd = LTTNG_POLL_GETFD(&events, i);
 
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
@@ -1919,7 +1926,7 @@ void *relay_thread_worker(void *data)
                                                goto error;
                                        }
                                }
-                       } else if (revents > 0) {
+                       } else if (revents) {
                                rcu_read_lock();
                                lttng_ht_lookup(relay_connections_ht,
                                                (void *)((unsigned long) pollfd),
@@ -1939,12 +1946,18 @@ void *relay_thread_worker(void *data)
                                        relay_del_connection(relay_connections_ht,
                                                        streams_ht, &iter,
                                                        relay_connection);
+                                       if (last_seen_data_fd == pollfd) {
+                                               last_seen_data_fd = last_notdel_data_fd;
+                                       }
                                } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
                                        DBG("Socket %d hung up", pollfd);
                                        relay_cleanup_poll_connection(&events, pollfd);
                                        relay_del_connection(relay_connections_ht,
                                                        streams_ht, &iter,
                                                        relay_connection);
+                                       if (last_seen_data_fd == pollfd) {
+                                               last_seen_data_fd = last_notdel_data_fd;
+                                       }
                                } else if (revents & LPOLLIN) {
                                        /* control socket */
                                        if (relay_connection->type == RELAY_CONTROL) {
@@ -1966,34 +1979,101 @@ void *relay_thread_worker(void *data)
                                                        ret = relay_process_control(&recv_hdr,
                                                                        relay_connection,
                                                                        streams_ht);
-                                                       /*
-                                                        * there was an error in processing a control
-                                                        * command: clear the session
-                                                        * */
                                                        if (ret < 0) {
+                                                               /* Clear the session on error. */
                                                                relay_cleanup_poll_connection(&events, pollfd);
                                                                relay_del_connection(relay_connections_ht,
                                                                                streams_ht, &iter,
                                                                                relay_connection);
                                                                DBG("Connection closed with %d", pollfd);
                                                        }
+                                                       seen_control = 1;
                                                }
-                                               /* data socket */
-                                       } else if (relay_connection->type == RELAY_DATA) {
-                                               ret = relay_process_data(relay_connection, streams_ht);
-                                               /* connection closed */
-                                               if (ret < 0) {
-                                                       relay_cleanup_poll_connection(&events, pollfd);
-                                                       relay_del_connection(relay_connections_ht,
-                                                                       streams_ht, &iter,
-                                                                       relay_connection);
-                                                       DBG("Data connection closed with %d", pollfd);
-                                               }
+                                       } else {
+                                               /*
+                                                * Flag the last seen data fd not deleted. It will be
+                                                * used as the last seen fd if any fd gets deleted in
+                                                * this first loop.
+                                                */
+                                               last_notdel_data_fd = pollfd;
+                                       }
+                               }
+                               rcu_read_unlock();
+                       }
+               }
+
+               /*
+                * The last loop handled a control request, go back to poll to make
+                * sure we prioritise the control socket.
+                */
+               if (seen_control) {
+                       continue;
+               }
+
+               if (last_seen_data_fd >= 0) {
+                       for (i = 0; i < nb_fd; i++) {
+                               int pollfd = LTTNG_POLL_GETFD(&events, i);
+                               if (last_seen_data_fd == pollfd) {
+                                       idx = i;
+                                       break;
+                               }
+                       }
+               }
+
+               /* Process data connection. */
+               for (i = idx + 1; i < nb_fd; i++) {
+                       /* Fetch the poll data. */
+                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+                       int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Skip the command pipe. It's handled in the first loop. */
+                       if (pollfd == relay_cmd_pipe[0]) {
+                               continue;
+                       }
+
+                       if (revents) {
+                               rcu_read_lock();
+                               lttng_ht_lookup(relay_connections_ht,
+                                               (void *)((unsigned long) pollfd),
+                                               &iter);
+                               node = lttng_ht_iter_get_node_ulong(&iter);
+                               if (node == NULL) {
+                                       /* Skip it. Might be removed before. */
+                                       rcu_read_unlock();
+                                       continue;
+                               }
+                               relay_connection = caa_container_of(node,
+                                               struct relay_command, sock_n);
+
+                               if (revents & LPOLLIN) {
+                                       if (relay_connection->type != RELAY_DATA) {
+                                               continue;
+                                       }
+
+                                       ret = relay_process_data(relay_connection, streams_ht);
+                                       /* connection closed */
+                                       if (ret < 0) {
+                                               relay_cleanup_poll_connection(&events, pollfd);
+                                               relay_del_connection(relay_connections_ht,
+                                                               streams_ht, &iter,
+                                                               relay_connection);
+                                               DBG("Data connection closed with %d", pollfd);
+                                               /*
+                                                * Every goto restart call sets the last seen fd where
+                                                * here we don't really care since we gracefully
+                                                * continue the loop after the connection is deleted.
+                                                */
+                                       } else {
+                                               /* Keep last seen port. */
+                                               last_seen_data_fd = pollfd;
+                                               rcu_read_unlock();
+                                               goto restart;
                                        }
                                }
                                rcu_read_unlock();
                        }
                }
+               last_seen_data_fd = -1;
        }
 
 exit:
index 2cfad9a25fa1c7e0339ff8e810796c58f1b5b552..49673cd5af643e40a10aa9173d1ff4cf489d0802 100644 (file)
@@ -18,6 +18,7 @@
 #ifndef _LTT_POLL_H
 #define _LTT_POLL_H
 
+#include <assert.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -84,6 +85,19 @@ struct compat_epoll_event {
 };
 #define lttng_poll_event compat_epoll_event
 
+static inline int __lttng_epoll_get_prev_fd(struct lttng_poll_event *events,
+               int index, uint32_t nb_fd)
+{
+       assert(events);
+       assert(index != nb_fd);
+
+       if (index == 0 || nb_fd == 0) {
+               return -1;
+       } else {
+               return events->events[index - 1].data.fd;
+       }
+}
+
 /*
  * For the following calls, consider 'e' to be a lttng_poll_event pointer and i
  * being the index of the events array.
@@ -92,6 +106,8 @@ struct compat_epoll_event {
 #define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].events
 #define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd
 #define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size
+#define LTTNG_POLL_GET_PREV_FD(e, i, nb_fd) \
+       __lttng_epoll_get_prev_fd(LTTNG_REF(e), i, nb_fd)
 
 /*
  * Create the epoll set. No memory allocation is done here.
@@ -229,6 +245,19 @@ struct compat_poll_event {
 };
 #define lttng_poll_event compat_poll_event
 
+static inline int __lttng_poll_get_prev_fd(struct lttng_poll_event *events,
+               int index, uint32_t nb_fd)
+{
+       assert(events);
+       assert(index != nb_fd);
+
+       if (index == 0 || nb_fd == 0) {
+               return -1;
+       } else {
+               return events->current.events[index - 1].fd;
+       }
+}
+
 /*
  * For the following calls, consider 'e' to be a lttng_poll_event pointer and i
  * being the index of the events array.
@@ -237,6 +266,8 @@ struct compat_poll_event {
 #define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->wait.events[i].revents
 #define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->wait.nb_fd
 #define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->wait.events_size
+#define LTTNG_POLL_GET_PREV_FD(e, i, nb_fd) \
+       __lttng_poll_get_prev_fd(LTTNG_REF(e), i, nb_fd)
 
 /*
  * Create a pollfd structure of size 'size'.
This page took 0.030868 seconds and 4 git commands to generate.