{
int ret, err = -1, last_seen_data_fd = -1;
uint32_t nb_fd;
- struct relay_connection *conn;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
struct relay_index *index;
+ struct relay_connection *destroy_conn = NULL;
DBG("[thread] Relay worker started");
ERR("Relay connection pipe error");
goto error;
} else if (revents & LPOLLIN) {
+ struct relay_connection *conn;
+
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
DBG("Connection socket %d added", conn->sock->fd);
}
} else {
+ struct relay_connection *ctrl_conn;
+
rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ ctrl_conn = connection_find_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
- assert(conn);
+ assert(ctrl_conn);
if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, ctrl_conn);
if (last_seen_data_fd == pollfd) {
last_seen_data_fd = last_notdel_data_fd;
}
} else if (revents & LPOLLIN) {
- if (conn->type == RELAY_CONTROL) {
- ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr,
+ if (ctrl_conn->type == RELAY_CONTROL) {
+ ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
sizeof(recv_hdr), 0);
if (ret <= 0) {
/* Connection closed */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, ctrl_conn);
DBG("Control connection closed with %d", pollfd);
} else {
- ret = relay_process_control(&recv_hdr, conn);
+ ret = relay_process_control(&recv_hdr, ctrl_conn);
if (ret < 0) {
/* Clear the session on error. */
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, ctrl_conn);
DBG("Connection closed with %d", pollfd);
}
seen_control = 1;
/* Fetch the poll data. */
uint32_t revents = LTTNG_POLL_GETEV(&events, i);
int pollfd = LTTNG_POLL_GETFD(&events, i);
+ struct relay_connection *data_conn;
health_code_update();
}
rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- if (!conn) {
+ data_conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ if (!data_conn) {
/* Skip it. Might be removed before. */
rcu_read_unlock();
continue;
}
if (revents & LPOLLIN) {
- if (conn->type != RELAY_DATA) {
+ if (data_conn->type != RELAY_DATA) {
rcu_read_unlock();
continue;
}
- ret = relay_process_data(conn);
+ ret = relay_process_data(data_conn);
/* Connection closed */
if (ret < 0) {
cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, data_conn);
DBG("Data connection closed with %d", pollfd);
/*
* Every goto restart call sets the last seen fd where
/* Cleanup reamaining connection object. */
rcu_read_lock();
- cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, conn,
+ cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
+ destroy_conn,
sock_n.node) {
health_code_update();
- destroy_connection(relay_connections_ht, conn);
+ destroy_connection(relay_connections_ht, destroy_conn);
}
rcu_read_unlock();
error_poll_create: