#endif
;
+enum relay_connection_status {
+ RELAY_CONNECTION_STATUS_OK,
+ /* An error occured while processing an event on the connection. */
+ RELAY_CONNECTION_STATUS_ERROR,
+ /* Connection closed/shutdown cleanly. */
+ RELAY_CONNECTION_STATUS_CLOSED,
+};
+
/* command line options */
char *opt_output_path;
static int opt_daemon, opt_background;
return ret;
}
-static int relay_process_control_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_payload(
+ struct relay_connection *conn)
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
struct ctrl_connection_state_receive_payload *state =
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive command payload on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive command payload on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
ret = relay_process_control_command(conn,
&state->header, &payload_view);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = connection_reset_protocol_state(conn);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
end:
- return ret;
+ return status;
}
-static int relay_process_control_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control_receive_header(
+ struct relay_connection *conn)
{
int ret = 0;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct lttcomm_relayd_hdr header;
struct lttng_dynamic_buffer *reception_buffer =
&conn->protocol.ctrl.reception_buffer;
reception_buffer->data + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive control command header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive control command header on sock %d",
+ conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
state->received, state->left_to_receive,
conn->sock->fd);
- ret = 0;
goto end;
}
if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
header.data_size);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = lttng_dynamic_buffer_set_size(reception_buffer,
header.data_size);
if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
* Manually invoke the next state as the poll loop
* will not wake-up to allow us to proceed further.
*/
- ret = relay_process_control_receive_payload(conn);
+ status = relay_process_control_receive_payload(conn);
}
end:
- return ret;
+ return status;
}
/*
* Process the commands received on the control socket
*/
-static int relay_process_control(struct relay_connection *conn)
+static enum relay_connection_status relay_process_control(
+ struct relay_connection *conn)
{
- int ret = 0;
+ enum relay_connection_status status;
switch (conn->protocol.ctrl.state_id) {
case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
- ret = relay_process_control_receive_header(conn);
+ status = relay_process_control_receive_header(conn);
break;
case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
- ret = relay_process_control_receive_payload(conn);
+ status = relay_process_control_receive_payload(conn);
break;
default:
ERR("Unknown control connection protocol state encountered.");
abort();
}
- return ret;
+ return status;
}
/*
return ret;
}
-static int relay_process_data_receive_header(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_header(
+ struct relay_connection *conn)
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct data_connection_state_receive_header *state =
&conn->protocol.data.state.receive_header;
struct lttcomm_relayd_data_hdr header;
state->header_reception_buffer + state->received,
state->left_to_receive, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Unable to receive data header on sock %d", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end;
} else if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_CLOSED;
goto end;
}
if (!stream) {
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
header.stream_id);
- ret = 0;
+ /* Protocol error. */
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
&new_id, &stream->stream_fd->fd);
if (ret < 0) {
ERR("Failed to rotate stream output file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
end:
- return ret;
+ return status;
}
-static int relay_process_data_receive_payload(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data_receive_payload(
+ struct relay_connection *conn)
{
int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
struct relay_stream *stream;
struct data_connection_state_receive_payload *state =
&conn->protocol.data.state.receive_payload;
stream = stream_get_by_id(state->header.stream_id);
if (!stream) {
+ /* Protocol error. */
DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
state->header.stream_id);
- ret = 0;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end;
}
ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
recv_size, MSG_DONTWAIT);
if (ret < 0) {
- ERR("Socket %d error %d", conn->sock->fd, ret);
- ret = -1;
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Socket %d error", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
goto end_stream_unlock;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
break;
} else if (ret < (int) recv_size) {
/*
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
- ret = -1;
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- ret = 0;
goto end_stream_unlock;
}
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
}
ret = try_rotate_stream(stream);
if (ret < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
goto end_stream_unlock;
}
stream_put(stream);
end:
- return ret;
+ return status;
}
/*
* relay_process_data: Process the data received on the data socket
*/
-static int relay_process_data(struct relay_connection *conn)
+static enum relay_connection_status relay_process_data(
+ struct relay_connection *conn)
{
- int ret;
+ enum relay_connection_status status;
switch (conn->protocol.data.state_id) {
case DATA_CONNECTION_STATE_RECEIVE_HEADER:
- ret = relay_process_data_receive_header(conn);
+ status = relay_process_data_receive_header(conn);
break;
case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
- ret = relay_process_data_receive_payload(conn);
+ status = relay_process_data_receive_payload(conn);
break;
default:
ERR("Unexpected data connection communication state.");
abort();
}
- return ret;
+ return status;
}
static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
- ret = relay_process_control(ctrl_conn);
- if (ret < 0) {
- /* Clear the connection on error. */
+ enum relay_connection_status status;
+
+ status = relay_process_control(ctrl_conn);
+ if (status != RELAY_CONNECTION_STATUS_OK) {
+ /* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
ctrl_conn);
assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
- ret = relay_process_data(data_conn);
- /* Connection closed */
- if (ret < 0) {
+ enum relay_connection_status status;
+
+ status = relay_process_data(data_conn);
+ /* Connection closed or error. */
+ if (status != RELAY_CONNECTION_STATUS_OK) {
relay_thread_close_connection(&events, pollfd,
data_conn);
/*