static
void *relay_thread_worker(void *data)
{
- int i, ret, pollfd, err = -1;
- uint32_t revents, nb_fd;
+ int ret, err = -1, last_seen_data_fd = -1;
+ uint32_t nb_fd;
struct relay_command *relay_connection;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
goto error;
}
+restart:
while (1) {
+ int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+
/* Infinite blocking call, waiting for transmission */
- restart:
DBG3("Relayd worker thread polling...");
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
nb_fd = ret;
+ /*
+ * Process control. The control connection is prioritised so we don't
+ * starve it with high throughout put tracing data on the data
+ * connection.
+ */
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
goto error;
}
}
- } else if (revents > 0) {
+ } else if (revents) {
rcu_read_lock();
lttng_ht_lookup(relay_connections_ht,
(void *)((unsigned long) pollfd),
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Socket %d hung up", pollfd);
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else if (revents & LPOLLIN) {
/* control socket */
if (relay_connection->type == RELAY_CONTROL) {
ret = relay_process_control(&recv_hdr,
relay_connection,
streams_ht);
- /*
- * there was an error in processing a control
- * command: clear the session
- * */
if (ret < 0) {
+ /* Clear the session on error. */
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
DBG("Connection closed with %d", pollfd);
}
+ seen_control = 1;
}
- /* data socket */
- } else if (relay_connection->type == RELAY_DATA) {
- ret = relay_process_data(relay_connection, streams_ht);
- /* connection closed */
- if (ret < 0) {
- relay_cleanup_poll_connection(&events, pollfd);
- relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
- DBG("Data connection closed with %d", pollfd);
- }
+ } else {
+ /*
+ * Flag the last seen data fd not deleted. It will be
+ * used as the last seen fd if any fd gets deleted in
+ * this first loop.
+ */
+ last_notdel_data_fd = pollfd;
+ }
+ }
+ rcu_read_unlock();
+ }
+ }
+
+ /*
+ * The last loop handled a control request, go back to poll to make
+ * sure we prioritise the control socket.
+ */
+ if (seen_control) {
+ continue;
+ }
+
+ if (last_seen_data_fd >= 0) {
+ for (i = 0; i < nb_fd; i++) {
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (last_seen_data_fd == pollfd) {
+ idx = i;
+ break;
+ }
+ }
+ }
+
+ /* Process data connection. */
+ for (i = idx + 1; i < nb_fd; i++) {
+ /* Fetch the poll data. */
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Skip the command pipe. It's handled in the first loop. */
+ if (pollfd == relay_cmd_pipe[0]) {
+ continue;
+ }
+
+ if (revents) {
+ rcu_read_lock();
+ lttng_ht_lookup(relay_connections_ht,
+ (void *)((unsigned long) pollfd),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+ relay_connection = caa_container_of(node,
+ struct relay_command, sock_n);
+
+ if (revents & LPOLLIN) {
+ if (relay_connection->type != RELAY_DATA) {
+ continue;
+ }
+
+ ret = relay_process_data(relay_connection, streams_ht);
+ /* connection closed */
+ if (ret < 0) {
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
}
}
rcu_read_unlock();
}
}
+ last_seen_data_fd = -1;
}
exit:
#ifndef _LTT_POLL_H
#define _LTT_POLL_H
+#include <assert.h>
#include <string.h>
#include <unistd.h>
};
#define lttng_poll_event compat_epoll_event
+static inline int __lttng_epoll_get_prev_fd(struct lttng_poll_event *events,
+ int index, uint32_t nb_fd)
+{
+ assert(events);
+ assert(index != nb_fd);
+
+ if (index == 0 || nb_fd == 0) {
+ return -1;
+ } else {
+ return events->events[index - 1].data.fd;
+ }
+}
+
/*
* For the following calls, consider 'e' to be a lttng_poll_event pointer and i
* being the index of the events array.
#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].events
#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd
#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size
+#define LTTNG_POLL_GET_PREV_FD(e, i, nb_fd) \
+ __lttng_epoll_get_prev_fd(LTTNG_REF(e), i, nb_fd)
/*
* Create the epoll set. No memory allocation is done here.
};
#define lttng_poll_event compat_poll_event
+static inline int __lttng_poll_get_prev_fd(struct lttng_poll_event *events,
+ int index, uint32_t nb_fd)
+{
+ assert(events);
+ assert(index != nb_fd);
+
+ if (index == 0 || nb_fd == 0) {
+ return -1;
+ } else {
+ return events->current.events[index - 1].fd;
+ }
+}
+
/*
* For the following calls, consider 'e' to be a lttng_poll_event pointer and i
* being the index of the events array.
#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->wait.events[i].revents
#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->wait.nb_fd
#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->wait.events_size
+#define LTTNG_POLL_GET_PREV_FD(e, i, nb_fd) \
+ __lttng_poll_get_prev_fd(LTTNG_REF(e), i, nb_fd)
/*
* Create a pollfd structure of size 'size'.