ret = -1;
goto end;
}
- reply.major = htobe32(reply.major);
- reply.minor = htobe32(reply.minor);
- ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
- sizeof(struct lttcomm_relayd_version), 0);
- if (ret < 0) {
- ERR("Relay sending version");
- }
/* Major versions must be the same */
if (reply.major != be32toh(msg.major)) {
- DBG("Incompatible major versions, deleting session");
+ DBG("Incompatible major versions (%u vs %u), deleting session",
+ reply.major, be32toh(msg.major));
relay_delete_session(cmd, streams_ht);
ret = 0;
goto end;
}
+ reply.major = htobe32(reply.major);
+ reply.minor = htobe32(reply.minor);
+ ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
+ sizeof(struct lttcomm_relayd_version), 0);
+ if (ret < 0) {
+ ERR("Relay sending version");
+ }
+
+#if 0
cmd->session->major = reply.major;
/* We adapt to the lowest compatible version */
if (reply.minor <= be32toh(msg.minor)) {
DBG("Version check done using protocol %u.%u", cmd->session->major,
cmd->session->minor);
+#endif
end:
return ret;
* On success, the relayd_sock pointer is set to the created socket.
* Else, it's stays untouched and a lttcomm error code is returned.
*/
-static int create_connect_relayd(struct consumer_output *output,
- const char *session_name, struct lttng_uri *uri,
- struct lttcomm_sock **relayd_sock,
- struct ltt_session *session)
+static int create_connect_relayd(struct lttng_uri *uri,
+ struct lttcomm_relayd_sock **relayd_sock)
{
int ret;
- struct lttcomm_sock *sock;
- uint32_t minor;
+ struct lttcomm_relayd_sock *rsock;
- /* Create socket object from URI */
- sock = lttcomm_alloc_sock_from_uri(uri);
- if (sock == NULL) {
- ret = LTTNG_ERR_FATAL;
- goto error;
- }
-
- ret = lttcomm_create_sock(sock);
- if (ret < 0) {
+ rsock = lttcomm_alloc_relayd_sock(uri, RELAYD_VERSION_COMM_MAJOR,
+ RELAYD_VERSION_COMM_MINOR);
+ if (!rsock) {
ret = LTTNG_ERR_FATAL;
goto error;
}
* state to be in poll execution.
*/
health_poll_entry();
- ret = relayd_connect(sock);
+ ret = relayd_connect(rsock);
health_poll_exit();
if (ret < 0) {
ERR("Unable to reach lttng-relayd");
DBG3("Creating relayd stream socket from URI");
/* Check relayd version */
- ret = relayd_version_check(sock, RELAYD_VERSION_COMM_MAJOR,
- RELAYD_VERSION_COMM_MINOR, &minor);
+ ret = relayd_version_check(rsock);
if (ret < 0) {
ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
goto close_sock;
}
- session->major = RELAYD_VERSION_COMM_MAJOR;
- session->minor = minor;
} else if (uri->stype == LTTNG_STREAM_DATA) {
DBG3("Creating relayd data socket from URI");
} else {
goto close_sock;
}
- *relayd_sock = sock;
+ *relayd_sock = rsock;
return LTTNG_OK;
close_sock:
- if (sock) {
- (void) relayd_close(sock);
- }
+ /* The returned value is not useful since we are on an error path. */
+ (void) relayd_close(rsock);
free_sock:
- if (sock) {
- lttcomm_destroy_sock(sock);
- }
+ free(rsock);
error:
return ret;
}
struct consumer_socket *consumer_sock)
{
int ret;
- struct lttcomm_sock *sock = NULL;
+ struct lttcomm_relayd_sock *rsock = NULL;
/* Connect to relayd and make version check if uri is the control. */
- ret = create_connect_relayd(consumer, session->name, relayd_uri,
- &sock, session);
+ ret = create_connect_relayd(relayd_uri, &rsock);
if (ret != LTTNG_OK) {
- goto close_sock;
+ goto error;
}
+ assert(rsock);
/* If the control socket is connected, network session is ready */
if (relayd_uri->stype == LTTNG_STREAM_CONTROL) {
}
/* Send relayd socket to consumer. */
- ret = consumer_send_relayd_socket(consumer_sock, sock,
- consumer, relayd_uri->stype, session->id);
+ ret = consumer_send_relayd_socket(consumer_sock, rsock, consumer,
+ relayd_uri->stype, session->id);
if (ret < 0) {
ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
goto close_sock;
*/
close_sock:
- if (sock) {
- (void) relayd_close(sock);
- lttcomm_destroy_sock(sock);
- }
+ (void) relayd_close(rsock);
+ free(rsock);
+error:
if (ret != LTTNG_OK) {
/*
* On error, nullify the consumer sequence index so streams are not
*/
uatomic_set(&consumer->net_seq_index, -1);
}
-
return ret;
}
* On success return positive value. On error, negative value.
*/
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
- struct lttcomm_sock *sock, struct consumer_output *consumer,
+ struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id)
{
int ret;
struct lttcomm_consumer_msg msg;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
assert(consumer);
assert(consumer_sock);
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
msg.u.relayd_sock.session_id = session_id;
- memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
+ memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
if (ret < 0) {
/* The above call will print a PERROR on error. */
- DBG("Error when sending relayd sockets on sock %d", sock->fd);
+ DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
goto error;
}
}
DBG3("Sending relayd socket file descriptor to consumer");
- ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
+ ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1);
if (ret < 0) {
goto error;
}
int consumer_send_channel(struct consumer_socket *sock,
struct lttcomm_consumer_msg *msg);
int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
- struct lttcomm_sock *sock, struct consumer_output *consumer,
+ struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
/* Did a start command occured before the kern/ust session creation? */
unsigned int started;
- /* Procotol version to use with the relayd */
- uint32_t major;
- uint32_t minor;
};
/* Prototypes */
}
/* Metadata are always sent on the control socket. */
- outfd = relayd->control_sock.fd;
+ outfd = relayd->control_sock.sock.fd;
} else {
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
++stream->next_net_seq_num;
/* Set to go on data socket */
- outfd = relayd->data_sock.fd;
+ outfd = relayd->data_sock.sock.fd;
}
error:
char *path;
assert(stream);
- assert(stream->net_seq_idx == (uint64_t) -1ULL);
+
+ /* Don't create anything if this is set for streaming. */
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
+ ret = 0;
+ goto end;
+ }
ret = snprintf(full_path, sizeof(full_path), "%s/%s",
stream->chan->pathname, stream->name);
error_open:
free(path_name_id);
error:
+end:
return ret;
}
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
- unsigned int sessiond_id)
+ struct pollfd *consumer_sockpoll,
+ struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
+ assert(ctx);
+ assert(relayd_sock);
+
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
/* First send a status message before receiving the fds. */
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
- ret = lttcomm_create_sock(&relayd->control_sock);
+ lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
+ ret = lttcomm_create_sock(&relayd->control_sock.sock);
/* Immediately try to close the created socket if valid. */
- if (relayd->control_sock.fd >= 0) {
- if (close(relayd->control_sock.fd)) {
+ if (relayd->control_sock.sock.fd >= 0) {
+ if (close(relayd->control_sock.sock.fd)) {
PERROR("close relayd control socket");
}
}
}
/* Assign new file descriptor */
- relayd->control_sock.fd = fd;
+ relayd->control_sock.sock.fd = fd;
+ /* Assign version values. */
+ relayd->control_sock.major = relayd_sock->major;
+ relayd->control_sock.minor = relayd_sock->minor;
/*
* Create a session on the relayd and store the returned id. Lock the
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
- ret = lttcomm_create_sock(&relayd->data_sock);
+ lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
+ ret = lttcomm_create_sock(&relayd->data_sock.sock);
/* Immediately try to close the created socket if valid. */
- if (relayd->data_sock.fd >= 0) {
- if (close(relayd->data_sock.fd)) {
+ if (relayd->data_sock.sock.fd >= 0) {
+ if (close(relayd->data_sock.sock.fd)) {
PERROR("close relayd data socket");
}
}
}
/* Assign new file descriptor */
- relayd->data_sock.fd = fd;
+ relayd->data_sock.sock.fd = fd;
+ /* Assign version values. */
+ relayd->data_sock.major = relayd_sock->major;
+ relayd->data_sock.minor = relayd_sock->minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
pthread_mutex_t ctrl_sock_mutex;
/* Control socket. Command and metadata are passed over it */
- struct lttcomm_sock control_sock;
+ struct lttcomm_relayd_sock control_sock;
/*
* We don't need a mutex at this point since we only splice or write single
* large chunk of data with a header appended at the begining. Moreover,
* this socket is for now only used in a single thread.
*/
- struct lttcomm_sock data_sock;
+ struct lttcomm_relayd_sock data_sock;
struct lttng_ht_node_u64 node;
/* Session id on both sides for the sockets. */
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
unsigned int sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
/*
* Send command. Fill up the header and append the data.
*/
-static int send_command(struct lttcomm_sock *sock,
+static int send_command(struct lttcomm_relayd_sock *rsock,
enum lttcomm_relayd_command cmd, void *data, size_t size,
int flags)
{
memcpy(buf + sizeof(header), data, size);
}
- ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
+ ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
if (ret < 0) {
ret = -errno;
goto error;
* Receive reply data on socket. This MUST be call after send_command or else
* could result in unexpected behavior(s).
*/
-static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
+static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
{
int ret;
DBG3("Relayd waiting for reply of size %ld", size);
- ret = sock->ops->recvmsg(sock, data, size, 0);
+ ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
if (ret <= 0 || ret != size) {
if (ret == 0) {
/* Orderly shutdown. */
- DBG("Socket %d has performed an orderly shutdown", sock->fd);
+ DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
} else {
DBG("Receiving reply failed on sock %d for size %lu with ret %d",
- sock->fd, size, ret);
+ rsock->sock.fd, size, ret);
}
/* Always return -1 here and the caller can use errno. */
ret = -1;
* On success, return 0 else a negative value which is either an errno error or
* a lttng error code from the relayd.
*/
-int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id)
{
int ret;
struct lttcomm_relayd_status_session reply;
- assert(sock);
+ assert(rsock);
assert(session_id);
DBG("Relayd create session");
/* Send command */
- ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+ ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
*
* On success return 0 else return ret_code negative value.
*/
-int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
+int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
const char *pathname, uint64_t *stream_id)
{
int ret;
struct lttcomm_relayd_status_stream reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
assert(channel_name);
assert(pathname);
strncpy(msg.pathname, pathname, sizeof(msg.pathname));
/* Send command */
- ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Waiting for reply */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
*
* Return 0 if compatible else negative value.
*/
-int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
- uint32_t minor, uint32_t *minor_to_use)
+int relayd_version_check(struct lttcomm_relayd_sock *rsock)
{
int ret;
struct lttcomm_relayd_version msg;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
- DBG("Relayd version check for major.minor %u.%u", major, minor);
+ DBG("Relayd version check for major.minor %u.%u", rsock->major,
+ rsock->minor);
/* Prepare network byte order before transmission. */
- msg.major = htobe32(major);
- msg.minor = htobe32(minor);
+ msg.major = htobe32(rsock->major);
+ msg.minor = htobe32(rsock->minor);
/* Send command */
- ret = send_command(sock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &msg, sizeof(msg));
+ ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
if (ret < 0) {
goto error;
}
* other. If the minor version differs, the lowest version is used by both
* sides.
*/
- if (msg.major != major) {
+ if (msg.major != rsock->major) {
/* Not compatible */
ret = -1;
DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
- msg.major, major);
+ msg.major, rsock->major);
goto error;
}
/*
- * After 2.1.0 release, for the 2.2 release, at this point will have to
- * check the minor version in order for the session daemon to know which
- * structure to use to communicate with the relayd. If the relayd's minor
- * version is higher, it will adapt to our version so we can continue to
- * use the latest relayd communication data structure.
+ * If the relayd's minor version is higher, it will adapt to our version so
+ * we can continue to use the latest relayd communication data structure.
+ * If the received minor version is higher, the relayd should adapt to us.
*/
- if (minor <= msg.minor) {
- *minor_to_use = minor;
- } else {
- *minor_to_use = msg.minor;
+ if (rsock->minor > msg.minor) {
+ rsock->minor = msg.minor;
}
/* Version number compatible */
DBG2("Relayd version is compatible, using protocol version %u.%u",
- major, *minor_to_use);
+ rsock->major, rsock->minor);
ret = 0;
error:
*
* On success return 0 else return ret_code negative value.
*/
-int relayd_send_metadata(struct lttcomm_sock *sock, size_t len)
+int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
{
int ret;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd sending metadata of size %zu", len);
/* Send command */
- ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0);
+ ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
if (ret < 0) {
goto error;
}
}
/*
- * Connect to relay daemon with an allocated lttcomm_sock.
+ * Connect to relay daemon with an allocated lttcomm_relayd_sock.
*/
-int relayd_connect(struct lttcomm_sock *sock)
+int relayd_connect(struct lttcomm_relayd_sock *rsock)
{
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG3("Relayd connect ...");
- return sock->ops->connect(sock);
+ return rsock->sock.ops->connect(&rsock->sock);
}
/*
- * Close relayd socket with an allocated lttcomm_sock.
+ * Close relayd socket with an allocated lttcomm_relayd_sock.
*
* If no socket operations are found, simply return 0 meaning that everything
* is fine. Without operations, the socket can not possibly be opened or used.
* Return the close returned value. On error, a negative value is usually
* returned back from close(2).
*/
-int relayd_close(struct lttcomm_sock *sock)
+int relayd_close(struct lttcomm_relayd_sock *rsock)
{
int ret;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
/* An invalid fd is fine, return success. */
- if (sock->fd < 0) {
+ if (rsock->sock.fd < 0) {
ret = 0;
goto end;
}
- DBG3("Relayd closing socket %d", sock->fd);
+ DBG3("Relayd closing socket %d", rsock->sock.fd);
- if (sock->ops) {
- ret = sock->ops->close(sock);
+ if (rsock->sock.ops) {
+ ret = rsock->sock.ops->close(&rsock->sock);
} else {
/* Default call if no specific ops found. */
- ret = close(sock->fd);
+ ret = close(rsock->sock.fd);
if (ret < 0) {
PERROR("relayd_close default close");
}
/*
* Send data header structure to the relayd.
*/
-int relayd_send_data_hdr(struct lttcomm_sock *sock,
+int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
struct lttcomm_relayd_data_hdr *hdr, size_t size)
{
int ret;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
assert(hdr);
DBG3("Relayd sending data header of size %ld", size);
}
/* Only send data header. */
- ret = sock->ops->sendmsg(sock, hdr, size, 0);
+ ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
if (ret < 0) {
ret = -errno;
goto error;
/*
* Send close stream command to the relayd.
*/
-int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
uint64_t last_net_seq_num)
{
int ret;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd closing stream id %" PRIu64, stream_id);
msg.last_net_seq_num = htobe64(last_net_seq_num);
/* Send command */
- ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
*
* Return 0 if NOT pending, 1 if so and a negative value on error.
*/
-int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
uint64_t last_net_seq_num)
{
int ret;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd data pending for stream id %" PRIu64, stream_id);
msg.last_net_seq_num = htobe64(last_net_seq_num);
/* Send command */
- ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
+ ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
/*
* Check on the relayd side for a quiescent state on the control socket.
*/
-int relayd_quiescent_control(struct lttcomm_sock *sock,
+int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
uint64_t metadata_stream_id)
{
int ret;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd checking quiescent control state");
msg.stream_id = htobe64(metadata_stream_id);
/* Send command */
- ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
/*
* Begin a data pending command for a specific session id.
*/
-int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
{
int ret;
struct lttcomm_relayd_begin_data_pending msg;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd begin data pending");
msg.session_id = htobe64(id);
/* Send command */
- ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
* Return 0 on success and set is_data_inflight to 0 if no data is being
* streamed or 1 if it is the case.
*/
-int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
unsigned int *is_data_inflight)
{
int ret;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
- assert(sock);
+ assert(rsock);
DBG("Relayd end data pending");
msg.session_id = htobe64(id);
/* Send command */
- ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+ ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
/* Receive response */
- ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
if (ret < 0) {
goto error;
}
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
-int relayd_connect(struct lttcomm_sock *sock);
-int relayd_close(struct lttcomm_sock *sock);
-int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id);
-int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
+int relayd_connect(struct lttcomm_relayd_sock *sock);
+int relayd_close(struct lttcomm_relayd_sock *sock);
+int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id);
+int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
const char *pathname, uint64_t *stream_id);
-int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num);
-int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
- uint32_t minor, uint32_t *minor_to_use);
-int relayd_start_data(struct lttcomm_sock *sock);
-int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
-int relayd_send_data_hdr(struct lttcomm_sock *sock,
+int relayd_version_check(struct lttcomm_relayd_sock *sock);
+int relayd_start_data(struct lttcomm_relayd_sock *sock);
+int relayd_send_metadata(struct lttcomm_relayd_sock *sock, size_t len);
+int relayd_send_data_hdr(struct lttcomm_relayd_sock *sock,
struct lttcomm_relayd_data_hdr *hdr, size_t size);
-int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
uint64_t last_net_seq_num);
-int relayd_quiescent_control(struct lttcomm_sock *sock,
+int relayd_quiescent_control(struct lttcomm_relayd_sock *sock,
uint64_t metadata_stream_id);
-int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id);
-int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id);
+int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id,
unsigned int *is_data_inflight);
#endif /* _RELAYD_H */
{
free(sock);
}
+
+/*
+ * Allocate and return a relayd socket object using a given URI to initialize
+ * it and the major/minor version of the supported protocol.
+ *
+ * On error, NULL is returned.
+ */
+struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock(struct lttng_uri *uri,
+ uint32_t major, uint32_t minor)
+{
+ int ret;
+ struct lttcomm_sock *tmp_sock = NULL;
+ struct lttcomm_relayd_sock *rsock = NULL;
+
+ assert(uri);
+
+ rsock = zmalloc(sizeof(*rsock));
+ if (!rsock) {
+ PERROR("zmalloc relayd sock");
+ goto error;
+ }
+
+ /* Allocate socket object from URI */
+ tmp_sock = lttcomm_alloc_sock_from_uri(uri);
+ if (tmp_sock == NULL) {
+ goto error_free;
+ }
+
+ /*
+ * Create socket object which basically sets the ops according to the
+ * socket protocol.
+ */
+ lttcomm_copy_sock(&rsock->sock, tmp_sock);
+ /* Temporary socket pointer not needed anymore. */
+ lttcomm_destroy_sock(tmp_sock);
+ ret = lttcomm_create_sock(&rsock->sock);
+ if (ret < 0) {
+ goto error_free;
+ }
+
+ rsock->major = major;
+ rsock->minor = minor;
+
+ return rsock;
+
+error_free:
+ free(rsock);
+error:
+ return NULL;
+}
const struct lttcomm_proto_ops *ops;
} LTTNG_PACKED;
+/*
+ * Relayd sock. Adds the protocol version to use for the communications with
+ * the relayd.
+ */
+struct lttcomm_relayd_sock {
+ struct lttcomm_sock sock;
+ uint32_t major;
+ uint32_t minor;
+} LTTNG_PACKED;
+
struct lttcomm_net_family {
int family;
int (*create) (struct lttcomm_sock *sock, int type, int proto);
uint64_t net_index;
enum lttng_stream_type type;
/* Open socket to the relayd */
- struct lttcomm_sock sock;
+ struct lttcomm_relayd_sock sock;
/* Tracing session id associated to the relayd. */
uint64_t session_id;
} LTTNG_PACKED relayd_sock;
extern void lttcomm_copy_sock(struct lttcomm_sock *dst,
struct lttcomm_sock *src);
+/* Relayd socket object. */
+extern struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock(
+ struct lttng_uri *uri, uint32_t major, uint32_t minor);
+
#endif /* _LTTNG_SESSIOND_COMM_H */