ret = lttng_write(wpipe, "!", 1);
if (ret < 1) {
PERROR("write poll pipe");
+ goto end;
}
-
+ ret = 0;
+end:
return ret;
}
-static void notify_health_quit_pipe(int *pipe)
+static
+int notify_health_quit_pipe(int *pipe)
{
ssize_t ret;
ret = lttng_write(pipe[1], "4", 1);
if (ret < 1) {
PERROR("write relay health quit");
+ goto end;
}
+ ret = 0;
+end:
+ return ret;
}
/*
- * Stop all threads by closing the thread quit pipe.
+ * Stop all relayd and relayd-live threads.
*/
-static
-void stop_threads(void)
+int lttng_relay_stop_threads(void)
{
- int ret;
+ int retval = 0;
/* Stopping all threads */
DBG("Terminating all threads");
- ret = notify_thread_pipe(thread_quit_pipe[1]);
- if (ret < 0) {
+ if (notify_thread_pipe(thread_quit_pipe[1])) {
ERR("write error on thread quit pipe");
+ retval = -1;
}
- notify_health_quit_pipe(health_quit_pipe);
+ if (notify_health_quit_pipe(health_quit_pipe)) {
+ ERR("write error on health quit pipe");
+ }
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_conn_queue.futex);
- ret = relayd_live_stop();
- if (ret) {
+ if (relayd_live_stop()) {
ERR("Error stopping live threads");
+ retval = -1;
}
+ return retval;
}
/*
return;
case SIGINT:
DBG("SIGINT caught");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
break;
case SIGTERM:
DBG("SIGTERM caught");
- stop_threads();
+ if (lttng_relay_stop_threads()) {
+ ERR("Error stopping threads");
+ }
break;
case SIGUSR1:
CMM_STORE_SHARED(recv_child_signal, 1);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
}
health_unregister(health_relayd);
DBG("Relay listener thread cleanup complete");
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
}
health_unregister(health_relayd);
DBG("Dispatch thread dying");
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_conn_pipe[0]) {
continue;
}
- if (revents) {
- rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- if (!conn) {
- /* Skip it. Might be removed before. */
+ rcu_read_lock();
+ conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ if (!conn) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+
+ if (revents & LPOLLIN) {
+ if (conn->type != RELAY_DATA) {
rcu_read_unlock();
continue;
}
- if (revents & LPOLLIN) {
- if (conn->type != RELAY_DATA) {
- rcu_read_unlock();
- continue;
- }
-
- ret = relay_process_data(conn);
- /* Connection closed */
- if (ret < 0) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
- DBG("Data connection closed with %d", pollfd);
- /*
- * Every goto restart call sets the last seen fd where
- * here we don't really care since we gracefully
- * continue the loop after the connection is deleted.
- */
- } else {
- /* Keep last seen port. */
- last_seen_data_fd = pollfd;
- rcu_read_unlock();
- goto restart;
- }
+ ret = relay_process_data(conn);
+ /* Connection closed */
+ if (ret < 0) {
+ cleanup_connection_pollfd(&events, pollfd);
+ destroy_connection(relay_connections_ht, conn);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
}
- rcu_read_unlock();
}
+ rcu_read_unlock();
}
last_seen_data_fd = -1;
}
}
health_unregister(health_relayd);
rcu_unregister_thread();
- stop_threads();
+ lttng_relay_stop_threads();
return NULL;
}
cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
/* Set up max poll set size */
- lttng_poll_set_max_size();
+ if (lttng_poll_set_max_size()) {
+ retval = -1;
+ goto exit_init_data;
+ }
/* Initialize communication library */
lttcomm_init();