client->socket = -1;
}
client->communication.active = false;
- lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
- lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
+ lttng_payload_reset(&client->communication.inbound.payload);
+ lttng_payload_reset(&client->communication.outbound.payload);
pthread_mutex_destroy(&client->lock);
call_rcu(&client->rcu_node, free_notification_client_rcu);
}
{
int ret;
- ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer, 0);
- assert(!ret);
+
+ lttng_payload_clear(&client->communication.inbound.payload);
client->communication.inbound.bytes_to_receive =
sizeof(struct lttng_notification_channel_message);
LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer,
+ &client->communication.inbound.payload.buffer,
client->communication.inbound.bytes_to_receive);
+
return ret;
}
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
- lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
- lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
+ lttng_payload_init(&client->communication.inbound.payload);
+ lttng_payload_init(&client->communication.outbound.payload);
client->communication.inbound.expect_creds = true;
ret = client_reset_inbound_state(client);
ssize_t ret;
size_t to_send_count;
enum client_transmission_status status;
+ struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const int fds_to_send_count =
+ lttng_payload_view_get_fd_handle_count(&pv);
ASSERT_LOCKED(client->lock);
goto end;
}
- assert(client->communication.outbound.buffer.size != 0);
- to_send_count = client->communication.outbound.buffer.size;
+ if (pv.buffer.size == 0) {
+ /*
+ * If both data and fds are equal to zero, we are in an invalid
+ * state.
+ */
+ assert(fds_to_send_count != 0);
+ goto send_fds;
+ }
+
+ /* Send data. */
+ to_send_count = pv.buffer.size;
DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
client->socket);
ret = lttcomm_send_unix_sock_non_block(client->socket,
- client->communication.outbound.buffer.data,
+ pv.buffer.data,
to_send_count);
if ((ret >= 0 && ret < to_send_count)) {
DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
client->socket);
to_send_count -= max(ret, 0);
- memcpy(client->communication.outbound.buffer.data,
- client->communication.outbound.buffer.data +
- client->communication.outbound.buffer.size - to_send_count,
+ memcpy(client->communication.outbound.payload.buffer.data,
+ pv.buffer.data +
+ pv.buffer.size - to_send_count,
to_send_count);
ret = lttng_dynamic_buffer_set_size(
- &client->communication.outbound.buffer,
+ &client->communication.outbound.payload.buffer,
to_send_count);
if (ret) {
goto error;
}
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
+ goto end;
} else if (ret < 0) {
/* Generic error, disable the client's communication. */
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
client->communication.active = false;
status = CLIENT_TRANSMISSION_STATUS_FAIL;
+ goto end;
} else {
- /* No error and flushed the queue completely. */
+ /*
+ * No error and flushed the queue completely.
+ *
+ * The payload buffer size is used later to
+ * check if there is notifications queued. So albeit that the
+ * direct caller knows that the transmission is complete, we
+ * need to set the buffer size to zero.
+ */
ret = lttng_dynamic_buffer_set_size(
- &client->communication.outbound.buffer, 0);
+ &client->communication.outbound.payload.buffer, 0);
if (ret) {
goto error;
}
+ }
- client->communication.outbound.queued_command_reply = false;
- client->communication.outbound.dropped_notification = false;
+send_fds:
+ /* No fds to send, transmission is complete. */
+ if (fds_to_send_count == 0) {
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ goto end;
}
+
+ ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
+ client->socket, &pv);
+ if (ret < 0) {
+ /* Generic error, disable the client's communication. */
+ ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
+ client->socket);
+ client->communication.active = false;
+ status = CLIENT_TRANSMISSION_STATUS_FAIL;
+ goto end;
+ } else if (ret == 0) {
+ /* Nothing could be sent. */
+ status = CLIENT_TRANSMISSION_STATUS_QUEUED;
+ } else {
+ /* Fd passing is an all or nothing kind of thing. */
+ status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ /*
+ * The payload _fd_array count is used later to
+ * check if there is notifications queued. So although the
+ * direct caller knows that the transmission is complete, we
+ * need to clear the _fd_array for the queuing check.
+ */
+ lttng_dynamic_pointer_array_clear(
+ &client->communication.outbound.payload
+ ._fd_handles);
+ }
+
end:
+ if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
+ client->communication.outbound.queued_command_reply = false;
+ client->communication.outbound.dropped_notification = false;
+ lttng_payload_clear(&client->communication.outbound.payload);
+ }
+
return status;
error:
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
+static
+bool client_has_outbound_data_left(
+ const struct notification_client *client)
+{
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const bool has_data = pv.buffer.size != 0;
+ const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+ return has_data || has_fds;
+}
+
/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer,
+ &client->communication.outbound.payload.buffer,
buffer, sizeof(buffer));
if (ret) {
goto error_unlock;
}
transmission_status = client_flush_outgoing_queue(client);
- if (client->communication.outbound.buffer.size != 0) {
+
+ if (client_has_outbound_data_left(client)) {
/* Queue could not be emptied. */
client->communication.outbound.queued_command_reply = true;
}
*/
const struct lttng_notification_channel_message *msg;
- assert(sizeof(*msg) == client->communication.inbound.buffer.size);
+ assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
msg = (const struct lttng_notification_channel_message *)
- client->communication.inbound.buffer.data;
+ client->communication.inbound.payload.buffer.data;
if (msg->size == 0 ||
msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
}
client->communication.inbound.bytes_to_receive = msg->size;
+ client->communication.inbound.fds_to_receive = msg->fds;
client->communication.inbound.msg_type =
(enum lttng_notification_channel_message_type) msg->type;
ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer, msg->size);
+ &client->communication.inbound.payload.buffer, msg->size);
+
+ /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
+ msg = NULL;
end:
return ret;
}
handshake_client =
(struct lttng_notification_channel_command_handshake *)
- client->communication.inbound.buffer
+ client->communication.inbound.payload.buffer
.data;
client->major = handshake_client->major;
client->minor = handshake_client->minor;
pthread_mutex_lock(&client->lock);
/* Outgoing queue will be flushed when the command reply is sent. */
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer, send_buffer,
+ &client->communication.outbound.payload.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
struct lttng_payload_view condition_view =
- lttng_payload_view_from_dynamic_buffer(
- &client->communication.inbound.buffer,
+ lttng_payload_view_from_payload(
+ &client->communication.inbound.payload,
0, -1);
size_t expected_condition_size;
* other thread accessing clients (action executor) only uses the
* outbound state.
*/
- expected_condition_size = client->communication.inbound.buffer.size;
+ expected_condition_size = client->communication.inbound.payload.buffer.size;
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
struct notification_client *client;
ssize_t recv_ret;
size_t offset;
- bool message_is_complete = false;
rcu_read_lock();
client = get_client_from_socket(socket, state);
goto end;
}
- offset = client->communication.inbound.buffer.size -
+ offset = client->communication.inbound.payload.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
recv_ret = lttcomm_recv_creds_unix_sock(socket,
- client->communication.inbound.buffer.data + offset,
+ client->communication.inbound.payload.buffer.data + offset,
client->communication.inbound.bytes_to_receive,
&client->communication.inbound.creds);
if (recv_ret > 0) {
}
} else {
recv_ret = lttcomm_recv_unix_sock_non_block(socket,
- client->communication.inbound.buffer.data + offset,
+ client->communication.inbound.payload.buffer.data + offset,
client->communication.inbound.bytes_to_receive);
}
if (recv_ret >= 0) {
client->communication.inbound.bytes_to_receive -= recv_ret;
- message_is_complete = client->communication.inbound
- .bytes_to_receive == 0;
+ } else {
+ goto error_disconnect_client;
}
- if (recv_ret < 0) {
- goto error_disconnect_client;
+ if (client->communication.inbound.bytes_to_receive != 0) {
+ /* Message incomplete wait for more data. */
+ ret = 0;
+ goto end;
}
- if (message_is_complete) {
- ret = client_dispatch_message(client, state);
- if (ret) {
+ assert(client->communication.inbound.bytes_to_receive == 0);
+
+ /* Receive fds. */
+ if (client->communication.inbound.fds_to_receive != 0) {
+ ret = lttcomm_recv_payload_fds_unix_sock_non_block(
+ client->socket,
+ client->communication.inbound.fds_to_receive,
+ &client->communication.inbound.payload);
+ if (ret > 0) {
/*
- * Only returns an error if this client must be
- * disconnected.
+ * Fds received. non blocking fds passing is all
+ * or nothing.
*/
+ ssize_t expected_size;
+
+ expected_size = sizeof(int) *
+ client->communication.inbound
+ .fds_to_receive;
+ assert(ret == expected_size);
+ client->communication.inbound.fds_to_receive = 0;
+ } else if (ret == 0) {
+ /* Received nothing. */
+ ret = 0;
+ goto end;
+ } else {
goto error_disconnect_client;
}
}
+
+ /* At this point the message is complete.*/
+ assert(client->communication.inbound.bytes_to_receive == 0 &&
+ client->communication.inbound.fds_to_receive == 0);
+ ret = client_dispatch_message(client, state);
+ if (ret) {
+ /*
+ * Only returns an error if this client must be
+ * disconnected.
+ */
+ goto error_disconnect_client;
+ }
+
end:
rcu_read_unlock();
return ret;
+
error_disconnect_client:
ret = notification_thread_client_disconnect(client, state);
goto end;
client->communication.outbound.dropped_notification = true;
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer, &msg,
+ &client->communication.outbound.payload.buffer, &msg,
sizeof(msg));
if (ret) {
PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
->size = (uint32_t)(
msg_payload.buffer.size - sizeof(msg_header));
+ /* Update the payload number of fds. */
+ {
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &msg_payload, 0, -1);
+
+ ((struct lttng_notification_channel_message *)
+ msg_payload.buffer.data)->fds = (uint32_t)
+ lttng_payload_view_get_fd_handle_count(&pv);
+ }
+
pthread_mutex_lock(&client_list->lock);
cds_list_for_each_entry_safe(client_list_element, tmp,
&client_list->list, node) {
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
client->socket, msg_payload.buffer.size);
- if (client->communication.outbound.buffer.size) {
+
+ if (client_has_outbound_data_left(client)) {
/*
* Outgoing data is already buffered for this client;
* drop the notification and enqueue a "dropped
}
}
- ret = lttng_dynamic_buffer_append_buffer(
- &client->communication.outbound.buffer,
- &msg_payload.buffer);
+ ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
if (ret) {
/* Fatal error. */
goto skip_client;
#include <common/dynamic-buffer.h>
#include <common/utils.h>
#include <common/defaults.h>
+#include <common/payload.h>
+#include <common/payload-view.h>
+#include <common/unix.h>
#include <assert.h>
#include "lttng-ctl-helper.h"
#include <common/compat/poll.h>
ssize_t ret;
struct lttng_notification_channel_message msg;
- if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
- ret = -1;
- goto end;
- }
+ lttng_payload_clear(&channel->reception_payload);
ret = lttcomm_recv_unix_sock(channel->socket, &msg, sizeof(msg));
if (ret <= 0) {
}
/* Add message header at buffer's start. */
- ret = lttng_dynamic_buffer_append(&channel->reception_buffer, &msg,
+ ret = lttng_dynamic_buffer_append(&channel->reception_payload.buffer, &msg,
sizeof(msg));
if (ret) {
goto error;
}
/* Reserve space for the payload. */
- ret = lttng_dynamic_buffer_set_size(&channel->reception_buffer,
- channel->reception_buffer.size + msg.size);
+ ret = lttng_dynamic_buffer_set_size(&channel->reception_payload.buffer,
+ channel->reception_payload.buffer.size + msg.size);
if (ret) {
goto error;
}
/* Receive message payload. */
ret = lttcomm_recv_unix_sock(channel->socket,
- channel->reception_buffer.data + sizeof(msg), msg.size);
+ channel->reception_payload.buffer.data + sizeof(msg), msg.size);
if (ret < (ssize_t) msg.size) {
ret = -1;
goto error;
}
+
+ /* Receive message fds. */
+ if (msg.fds != 0) {
+ ret = lttcomm_recv_payload_fds_unix_sock(channel->socket,
+ msg.fds, &channel->reception_payload);
+ if (ret < sizeof(int) * msg.fds) {
+ ret = -1;
+ goto error;
+ }
+ }
ret = 0;
end:
return ret;
error:
- if (lttng_dynamic_buffer_set_size(&channel->reception_buffer, 0)) {
- ret = -1;
- }
+ lttng_payload_clear(&channel->reception_payload);
goto end;
}
{
struct lttng_notification_channel_message *msg;
- assert(channel->reception_buffer.size >= sizeof(*msg));
+ assert(channel->reception_payload.buffer.size >= sizeof(*msg));
msg = (struct lttng_notification_channel_message *)
- channel->reception_buffer.data;
+ channel->reception_payload.buffer.data;
return (enum lttng_notification_channel_message_type) msg->type;
}
ssize_t ret;
struct lttng_notification *notification = NULL;
- if (channel->reception_buffer.size <=
+ if (channel->reception_payload.buffer.size <=
sizeof(struct lttng_notification_channel_message)) {
goto end;
}
{
- struct lttng_payload_view view = lttng_payload_view_from_dynamic_buffer(
- &channel->reception_buffer,
+ struct lttng_payload_view view = lttng_payload_view_from_payload(
+ &channel->reception_payload,
sizeof(struct lttng_notification_channel_message),
-1);
&view, ¬ification);
}
- if (ret != channel->reception_buffer.size -
+ if (ret != channel->reception_payload.buffer.size -
sizeof(struct lttng_notification_channel_message)) {
lttng_notification_destroy(notification);
notification = NULL;
}
channel->socket = -1;
pthread_mutex_init(&channel->lock, NULL);
- lttng_dynamic_buffer_init(&channel->reception_buffer);
+ lttng_payload_init(&channel->reception_payload);
CDS_INIT_LIST_HEAD(&channel->pending_notifications.list);
is_root = (getuid() == 0);
struct lttng_notification_channel_command_handshake *handshake;
handshake = (struct lttng_notification_channel_command_handshake *)
- (channel->reception_buffer.data +
+ (channel->reception_payload.buffer.data +
sizeof(struct lttng_notification_channel_message));
channel->version.major = handshake->major;
channel->version.minor = handshake->minor;
}
exit_loop:
- if (channel->reception_buffer.size <
+ if (channel->reception_payload.buffer.size <
(sizeof(struct lttng_notification_channel_message) +
sizeof(*reply))) {
/* Invalid message received. */
}
reply = (struct lttng_notification_channel_command_reply *)
- (channel->reception_buffer.data +
+ (channel->reception_payload.buffer.data +
sizeof(struct lttng_notification_channel_message));
*status = (enum lttng_notification_channel_status) reply->status;
end:
pthread_mutex_lock(&channel->lock);
socket = channel->socket;
+
if (!lttng_condition_validate(condition)) {
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_INVALID;
goto end_unlock;
((struct lttng_notification_channel_message *) payload.buffer.data)->size =
(uint32_t) (payload.buffer.size - sizeof(cmd_header));
- ret = lttcomm_send_unix_sock(
- socket, payload.buffer.data, payload.buffer.size);
- if (ret < 0) {
- status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
- goto end_unlock;
+ {
+ struct lttng_payload_view pv =
+ lttng_payload_view_from_payload(
+ &payload, 0, -1);
+ const int fd_count =
+ lttng_payload_view_get_fd_handle_count(&pv);
+
+ /* Update fd count. */
+ ((struct lttng_notification_channel_message *) payload.buffer.data)->fds =
+ (uint32_t) fd_count;
+
+ ret = lttcomm_send_unix_sock(
+ socket, pv.buffer.data, pv.buffer.size);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_unlock;
+ }
+
+ /* Pass fds if present. */
+ if (fd_count > 0) {
+ ret = lttcomm_send_payload_view_fds_unix_sock(socket,
+ &pv);
+ if (ret < 0) {
+ status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR;
+ goto end_unlock;
+ }
+ }
}
ret = receive_command_reply(channel, &status);
(void) lttcomm_close_unix_sock(channel->socket);
}
pthread_mutex_destroy(&channel->lock);
- lttng_dynamic_buffer_reset(&channel->reception_buffer);
+ lttng_payload_reset(&channel->reception_payload);
free(channel);
}