#include "lttng-sessiond.h"
#include "kernel.h"
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
goto error;
}
+ client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
ret = lttng_poll_add(&state->events, client->socket,
- LPOLLIN | LPOLLERR |
- LPOLLHUP | LPOLLRDHUP);
+ client->communication.current_poll_events);
if (ret < 0) {
ERR("Failed to add notification channel client socket to poll set");
ret = 0;
return error_occurred ? -1 : 0;
}
+static
+bool client_has_outbound_data_left(
+ const struct notification_client *client)
+{
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const bool has_data = pv.buffer.size != 0;
+ const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+ return has_data || has_fds;
+}
+
static
int client_handle_transmission_status(
struct notification_client *client,
switch (transmission_status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN);
- if (ret) {
- goto end;
- }
-
- break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ {
+ int current_poll_events;
+ int new_poll_events;
/*
* We want to be notified whenever there is buffer space
- * available to send the rest of the payload.
+ * available to send the rest of the payload if we are
+ * waiting to send data to the client.
+ *
+ * The state of the outbound queue being sampled here is
+ * fine since:
+ * - it is okay to wake-up "for nothing" in case we see
+ * that data is left, but another thread succeeds in
+ * flushing it before us when handling the client "out"
+ * event. We will simply stop monitoring that event the next
+ * time it wakes us up and we see no data left to be sent,
+ * - if another thread fails to flush the entire client
+ * outgoing queue, it will issue a "communication update"
+ * command and cause the client's (e)poll mask to be
+ * re-evaluated.
+ *
+ * The situation we seek to avoid would be to disable the
+ * monitoring of "out" client events indefinitely when there is
+ * data to be sent, which can't happen because of the
+ * aforementioned "communication update" mechanism.
*/
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN_OUT);
- if (ret) {
- goto end;
+ pthread_mutex_lock(&client->lock);
+ current_poll_events = client->communication.current_poll_events;
+ new_poll_events = client_has_outbound_data_left(client) ?
+ CLIENT_POLL_EVENTS_IN_OUT :
+ CLIENT_POLL_EVENTS_IN;
+ client->communication.current_poll_events = new_poll_events;
+ pthread_mutex_unlock(&client->lock);
+
+ /* Update the monitored event set only if it changed. */
+ if (current_poll_events != new_poll_events) {
+ ret = lttng_poll_mod(&state->events, client->socket,
+ new_poll_events);
+ if (ret) {
+ goto end;
+ }
}
+
break;
+ }
case CLIENT_TRANSMISSION_STATUS_FAIL:
ret = notification_thread_client_disconnect(client, state);
if (ret) {
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-static
-bool client_has_outbound_data_left(
- const struct notification_client *client)
-{
- const struct lttng_payload_view pv = lttng_payload_view_from_payload(
- &client->communication.outbound.payload, 0, -1);
- const bool has_data = pv.buffer.size != 0;
- const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
- return has_data || has_fds;
-}
-
/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
}
pthread_mutex_lock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
+ if (!client_has_outbound_data_left(client)) {
+ /*
+ * A client "out" event can be received when no payload is left
+ * to send under some circumstances.
+ *
+ * Many threads can flush a client's outgoing queue and, if they
+ * had to queue their message (socket was full), will use the
+ * "communication update" command to signal the (e)poll thread
+ * to monitor for space being made available in the socket.
+ *
+ * Commands are sent over an internal pipe serviced by the same
+ * thread as the client sockets.
+ *
+ * When space is made available in the socket, there is a race
+ * between the (e)poll thread and the other threads that may
+ * wish to use the client's socket to flush its outgoing queue.
+ *
+ * A non-(e)poll thread may attempt (and succeed) in flushing
+ * the queue before the (e)poll thread gets a chance to service
+ * the client's "out" event.
+ *
+ * In this situation, the (e)poll thread processing the client
+ * out event will see an empty payload: there is nothing to do
+ * except unsubscribing (e)poll "out" events.
+ *
+ * Note that this thread is the (e)poll thread so it can modify
+ * the (e)poll mask directly without using a communication
+ * update command. Other threads that flush the outgoing queue
+ * will use the "communication update" command to wake up this
+ * thread and force it to monitor "out" events.
+ *
+ * When other threads succeed in emptying the outgoing queue,
+ * they don't need to update the (e)poll mask: if the "out"
+ * event is monitored, it will fire once and the (e)poll
+ * thread will reach this condition, causing the event to
+ * stop being monitored.
+ */
+ transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ } else {
+ transmission_status = client_flush_outgoing_queue(client);
+ }
pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(