From: Julien Desfossez Date: Thu, 28 Mar 2013 04:09:48 +0000 (-0400) Subject: Introduce the relayd socket object X-Git-Tag: v2.2.0-rc1~6 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=6151a90fe7fa3dea52c57771df9083e56de7a60b;p=lttng-tools.git Introduce the relayd socket object Used to store the version that the relayd socket supports so we can adapt the communication to the lowest version between the session daemon and relayd. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 995714faf..5f6f9cfd8 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1268,22 +1268,25 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, ret = -1; goto end; } - reply.major = htobe32(reply.major); - reply.minor = htobe32(reply.minor); - ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, - sizeof(struct lttcomm_relayd_version), 0); - if (ret < 0) { - ERR("Relay sending version"); - } /* Major versions must be the same */ if (reply.major != be32toh(msg.major)) { - DBG("Incompatible major versions, deleting session"); + DBG("Incompatible major versions (%u vs %u), deleting session", + reply.major, be32toh(msg.major)); relay_delete_session(cmd, streams_ht); ret = 0; goto end; } + reply.major = htobe32(reply.major); + reply.minor = htobe32(reply.minor); + ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_version), 0); + if (ret < 0) { + ERR("Relay sending version"); + } + +#if 0 cmd->session->major = reply.major; /* We adapt to the lowest compatible version */ if (reply.minor <= be32toh(msg.minor)) { @@ -1294,6 +1297,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, DBG("Version check done using protocol %u.%u", cmd->session->major, cmd->session->minor); +#endif end: return ret; diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index ca7e7dac1..687c06d42 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -478,24 +478,15 @@ error: * On success, the relayd_sock pointer is set to the created socket. * Else, it's stays untouched and a lttcomm error code is returned. */ -static int create_connect_relayd(struct consumer_output *output, - const char *session_name, struct lttng_uri *uri, - struct lttcomm_sock **relayd_sock, - struct ltt_session *session) +static int create_connect_relayd(struct lttng_uri *uri, + struct lttcomm_relayd_sock **relayd_sock) { int ret; - struct lttcomm_sock *sock; - uint32_t minor; + struct lttcomm_relayd_sock *rsock; - /* Create socket object from URI */ - sock = lttcomm_alloc_sock_from_uri(uri); - if (sock == NULL) { - ret = LTTNG_ERR_FATAL; - goto error; - } - - ret = lttcomm_create_sock(sock); - if (ret < 0) { + rsock = lttcomm_alloc_relayd_sock(uri, RELAYD_VERSION_COMM_MAJOR, + RELAYD_VERSION_COMM_MINOR); + if (!rsock) { ret = LTTNG_ERR_FATAL; goto error; } @@ -506,7 +497,7 @@ static int create_connect_relayd(struct consumer_output *output, * state to be in poll execution. */ health_poll_entry(); - ret = relayd_connect(sock); + ret = relayd_connect(rsock); health_poll_exit(); if (ret < 0) { ERR("Unable to reach lttng-relayd"); @@ -519,14 +510,11 @@ static int create_connect_relayd(struct consumer_output *output, DBG3("Creating relayd stream socket from URI"); /* Check relayd version */ - ret = relayd_version_check(sock, RELAYD_VERSION_COMM_MAJOR, - RELAYD_VERSION_COMM_MINOR, &minor); + ret = relayd_version_check(rsock); if (ret < 0) { ret = LTTNG_ERR_RELAYD_VERSION_FAIL; goto close_sock; } - session->major = RELAYD_VERSION_COMM_MAJOR; - session->minor = minor; } else if (uri->stype == LTTNG_STREAM_DATA) { DBG3("Creating relayd data socket from URI"); } else { @@ -536,18 +524,15 @@ static int create_connect_relayd(struct consumer_output *output, goto close_sock; } - *relayd_sock = sock; + *relayd_sock = rsock; return LTTNG_OK; close_sock: - if (sock) { - (void) relayd_close(sock); - } + /* The returned value is not useful since we are on an error path. */ + (void) relayd_close(rsock); free_sock: - if (sock) { - lttcomm_destroy_sock(sock); - } + free(rsock); error: return ret; } @@ -560,14 +545,14 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, struct consumer_socket *consumer_sock) { int ret; - struct lttcomm_sock *sock = NULL; + struct lttcomm_relayd_sock *rsock = NULL; /* Connect to relayd and make version check if uri is the control. */ - ret = create_connect_relayd(consumer, session->name, relayd_uri, - &sock, session); + ret = create_connect_relayd(relayd_uri, &rsock); if (ret != LTTNG_OK) { - goto close_sock; + goto error; } + assert(rsock); /* If the control socket is connected, network session is ready */ if (relayd_uri->stype == LTTNG_STREAM_CONTROL) { @@ -587,8 +572,8 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, } /* Send relayd socket to consumer. */ - ret = consumer_send_relayd_socket(consumer_sock, sock, - consumer, relayd_uri->stype, session->id); + ret = consumer_send_relayd_socket(consumer_sock, rsock, consumer, + relayd_uri->stype, session->id); if (ret < 0) { ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL; goto close_sock; @@ -609,11 +594,10 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session, */ close_sock: - if (sock) { - (void) relayd_close(sock); - lttcomm_destroy_sock(sock); - } + (void) relayd_close(rsock); + free(rsock); +error: if (ret != LTTNG_OK) { /* * On error, nullify the consumer sequence index so streams are not @@ -621,7 +605,6 @@ close_sock: */ uatomic_set(&consumer->net_seq_index, -1); } - return ret; } diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index f803f7b05..0cf43d2ca 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -805,14 +805,14 @@ error: * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, + struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id) { int ret; struct lttcomm_consumer_msg msg; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); assert(consumer); assert(consumer_sock); @@ -831,13 +831,13 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, msg.u.relayd_sock.net_index = consumer->net_seq_index; msg.u.relayd_sock.type = type; msg.u.relayd_sock.session_id = session_id; - memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); + memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock)); DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd); ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg)); if (ret < 0) { /* The above call will print a PERROR on error. */ - DBG("Error when sending relayd sockets on sock %d", sock->fd); + DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd); goto error; } @@ -847,7 +847,7 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, } DBG3("Sending relayd socket file descriptor to consumer"); - ret = consumer_send_fds(consumer_sock, &sock->fd, 1); + ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1); if (ret < 0) { goto error; } diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 798eaa265..09f4545a0 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -171,7 +171,7 @@ int consumer_send_stream(struct consumer_socket *sock, int consumer_send_channel(struct consumer_socket *sock, struct lttcomm_consumer_msg *msg); int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, - struct lttcomm_sock *sock, struct consumer_output *consumer, + struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, enum lttng_stream_type type, uint64_t session_id); int consumer_send_destroy_relayd(struct consumer_socket *sock, struct consumer_output *consumer); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index a56473783..fd23ab05e 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -86,9 +86,6 @@ struct ltt_session { /* Did a start command occured before the kern/ust session creation? */ unsigned int started; - /* Procotol version to use with the relayd */ - uint32_t major; - uint32_t minor; }; /* Prototypes */ diff --git a/src/common/consumer.c b/src/common/consumer.c index b6e440a48..a9070b1c9 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -783,7 +783,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, } /* Metadata are always sent on the control socket. */ - outfd = relayd->control_sock.fd; + outfd = relayd->control_sock.sock.fd; } else { /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); @@ -808,7 +808,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, ++stream->next_net_seq_num; /* Set to go on data socket */ - outfd = relayd->data_sock.fd; + outfd = relayd->data_sock.sock.fd; } error: @@ -1300,7 +1300,12 @@ int lttng_create_output_file(struct lttng_consumer_stream *stream) char *path; assert(stream); - assert(stream->net_seq_idx == (uint64_t) -1ULL); + + /* Don't create anything if this is set for streaming. */ + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = 0; + goto end; + } ret = snprintf(full_path, sizeof(full_path), "%s/%s", stream->chan->pathname, stream->name); @@ -1337,6 +1342,7 @@ int lttng_create_output_file(struct lttng_consumer_stream *stream) error_open: free(path_name_id); error: +end: return ret; } @@ -3075,13 +3081,16 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, - unsigned int sessiond_id) + struct pollfd *consumer_sockpoll, + struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + assert(ctx); + assert(relayd_sock); + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); /* First send a status message before receiving the fds. */ @@ -3131,11 +3140,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->control_sock); + lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->control_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.fd >= 0) { - if (close(relayd->control_sock.fd)) { + if (relayd->control_sock.sock.fd >= 0) { + if (close(relayd->control_sock.sock.fd)) { PERROR("close relayd control socket"); } } @@ -3145,7 +3154,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->control_sock.fd = fd; + relayd->control_sock.sock.fd = fd; + /* Assign version values. */ + relayd->control_sock.major = relayd_sock->major; + relayd->control_sock.minor = relayd_sock->minor; /* * Create a session on the relayd and store the returned id. Lock the @@ -3173,11 +3185,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->data_sock); + lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->data_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.fd >= 0) { - if (close(relayd->data_sock.fd)) { + if (relayd->data_sock.sock.fd >= 0) { + if (close(relayd->data_sock.sock.fd)) { PERROR("close relayd data socket"); } } @@ -3187,7 +3199,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->data_sock.fd = fd; + relayd->data_sock.sock.fd = fd; + /* Assign version values. */ + relayd->data_sock.major = relayd_sock->major; + relayd->data_sock.minor = relayd_sock->minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type); diff --git a/src/common/consumer.h b/src/common/consumer.h index 6fd4e2e5c..0931250b0 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -269,14 +269,14 @@ struct consumer_relayd_sock_pair { pthread_mutex_t ctrl_sock_mutex; /* Control socket. Command and metadata are passed over it */ - struct lttcomm_sock control_sock; + struct lttcomm_relayd_sock control_sock; /* * We don't need a mutex at this point since we only splice or write single * large chunk of data with a header appended at the begining. Moreover, * this socket is for now only used in a single thread. */ - struct lttcomm_sock data_sock; + struct lttcomm_relayd_sock data_sock; struct lttng_ht_node_u64 node; /* Session id on both sides for the sockets. */ @@ -518,7 +518,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, + struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id); void consumer_flag_relayd_for_destroy( struct consumer_relayd_sock_pair *relayd); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index da54939f9..bed0933d1 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -32,7 +32,7 @@ /* * Send command. Fill up the header and append the data. */ -static int send_command(struct lttcomm_sock *sock, +static int send_command(struct lttcomm_relayd_sock *rsock, enum lttcomm_relayd_command cmd, void *data, size_t size, int flags) { @@ -65,7 +65,7 @@ static int send_command(struct lttcomm_sock *sock, memcpy(buf + sizeof(header), data, size); } - ret = sock->ops->sendmsg(sock, buf, buf_size, flags); + ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags); if (ret < 0) { ret = -errno; goto error; @@ -83,20 +83,20 @@ alloc_error: * Receive reply data on socket. This MUST be call after send_command or else * could result in unexpected behavior(s). */ -static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size) +static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size) { int ret; DBG3("Relayd waiting for reply of size %ld", size); - ret = sock->ops->recvmsg(sock, data, size, 0); + ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0); if (ret <= 0 || ret != size) { if (ret == 0) { /* Orderly shutdown. */ - DBG("Socket %d has performed an orderly shutdown", sock->fd); + DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd); } else { DBG("Receiving reply failed on sock %d for size %lu with ret %d", - sock->fd, size, ret); + rsock->sock.fd, size, ret); } /* Always return -1 here and the caller can use errno. */ ret = -1; @@ -114,24 +114,24 @@ error: * On success, return 0 else a negative value which is either an errno error or * a lttng error code from the relayd. */ -int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id) +int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id) { int ret; struct lttcomm_relayd_status_session reply; - assert(sock); + assert(rsock); assert(session_id); DBG("Relayd create session"); /* Send command */ - ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0); + ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -160,7 +160,7 @@ error: * * On success return 0 else return ret_code negative value. */ -int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, +int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name, const char *pathname, uint64_t *stream_id) { int ret; @@ -168,7 +168,7 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, struct lttcomm_relayd_status_stream reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); assert(channel_name); assert(pathname); @@ -178,13 +178,13 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, strncpy(msg.pathname, pathname, sizeof(msg.pathname)); /* Send command */ - ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Waiting for reply */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -217,29 +217,29 @@ error: * * Return 0 if compatible else negative value. */ -int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, - uint32_t minor, uint32_t *minor_to_use) +int relayd_version_check(struct lttcomm_relayd_sock *rsock) { int ret; struct lttcomm_relayd_version msg; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); - DBG("Relayd version check for major.minor %u.%u", major, minor); + DBG("Relayd version check for major.minor %u.%u", rsock->major, + rsock->minor); /* Prepare network byte order before transmission. */ - msg.major = htobe32(major); - msg.minor = htobe32(minor); + msg.major = htobe32(rsock->major); + msg.minor = htobe32(rsock->minor); /* Send command */ - ret = send_command(sock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &msg, sizeof(msg)); + ret = recv_reply(rsock, (void *) &msg, sizeof(msg)); if (ret < 0) { goto error; } @@ -254,30 +254,26 @@ int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, * other. If the minor version differs, the lowest version is used by both * sides. */ - if (msg.major != major) { + if (msg.major != rsock->major) { /* Not compatible */ ret = -1; DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)", - msg.major, major); + msg.major, rsock->major); goto error; } /* - * After 2.1.0 release, for the 2.2 release, at this point will have to - * check the minor version in order for the session daemon to know which - * structure to use to communicate with the relayd. If the relayd's minor - * version is higher, it will adapt to our version so we can continue to - * use the latest relayd communication data structure. + * If the relayd's minor version is higher, it will adapt to our version so + * we can continue to use the latest relayd communication data structure. + * If the received minor version is higher, the relayd should adapt to us. */ - if (minor <= msg.minor) { - *minor_to_use = minor; - } else { - *minor_to_use = msg.minor; + if (rsock->minor > msg.minor) { + rsock->minor = msg.minor; } /* Version number compatible */ DBG2("Relayd version is compatible, using protocol version %u.%u", - major, *minor_to_use); + rsock->major, rsock->minor); ret = 0; error: @@ -289,17 +285,17 @@ error: * * On success return 0 else return ret_code negative value. */ -int relayd_send_metadata(struct lttcomm_sock *sock, size_t len) +int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len) { int ret; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd sending metadata of size %zu", len); /* Send command */ - ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0); + ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0); if (ret < 0) { goto error; } @@ -317,20 +313,20 @@ error: } /* - * Connect to relay daemon with an allocated lttcomm_sock. + * Connect to relay daemon with an allocated lttcomm_relayd_sock. */ -int relayd_connect(struct lttcomm_sock *sock) +int relayd_connect(struct lttcomm_relayd_sock *rsock) { /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG3("Relayd connect ..."); - return sock->ops->connect(sock); + return rsock->sock.ops->connect(&rsock->sock); } /* - * Close relayd socket with an allocated lttcomm_sock. + * Close relayd socket with an allocated lttcomm_relayd_sock. * * If no socket operations are found, simply return 0 meaning that everything * is fine. Without operations, the socket can not possibly be opened or used. @@ -342,26 +338,26 @@ int relayd_connect(struct lttcomm_sock *sock) * Return the close returned value. On error, a negative value is usually * returned back from close(2). */ -int relayd_close(struct lttcomm_sock *sock) +int relayd_close(struct lttcomm_relayd_sock *rsock) { int ret; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); /* An invalid fd is fine, return success. */ - if (sock->fd < 0) { + if (rsock->sock.fd < 0) { ret = 0; goto end; } - DBG3("Relayd closing socket %d", sock->fd); + DBG3("Relayd closing socket %d", rsock->sock.fd); - if (sock->ops) { - ret = sock->ops->close(sock); + if (rsock->sock.ops) { + ret = rsock->sock.ops->close(&rsock->sock); } else { /* Default call if no specific ops found. */ - ret = close(sock->fd); + ret = close(rsock->sock.fd); if (ret < 0) { PERROR("relayd_close default close"); } @@ -374,13 +370,13 @@ end: /* * Send data header structure to the relayd. */ -int relayd_send_data_hdr(struct lttcomm_sock *sock, +int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock, struct lttcomm_relayd_data_hdr *hdr, size_t size) { int ret; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); assert(hdr); DBG3("Relayd sending data header of size %ld", size); @@ -391,7 +387,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock, } /* Only send data header. */ - ret = sock->ops->sendmsg(sock, hdr, size, 0); + ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0); if (ret < 0) { ret = -errno; goto error; @@ -409,7 +405,7 @@ error: /* * Send close stream command to the relayd. */ -int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, +int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t last_net_seq_num) { int ret; @@ -417,7 +413,7 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd closing stream id %" PRIu64, stream_id); @@ -425,13 +421,13 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, msg.last_net_seq_num = htobe64(last_net_seq_num); /* Send command */ - ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -458,7 +454,7 @@ error: * * Return 0 if NOT pending, 1 if so and a negative value on error. */ -int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, +int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t last_net_seq_num) { int ret; @@ -466,7 +462,7 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd data pending for stream id %" PRIu64, stream_id); @@ -474,14 +470,14 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, msg.last_net_seq_num = htobe64(last_net_seq_num); /* Send command */ - ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg, + ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -506,7 +502,7 @@ error: /* * Check on the relayd side for a quiescent state on the control socket. */ -int relayd_quiescent_control(struct lttcomm_sock *sock, +int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock, uint64_t metadata_stream_id) { int ret; @@ -514,20 +510,20 @@ int relayd_quiescent_control(struct lttcomm_sock *sock, struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd checking quiescent control state"); msg.stream_id = htobe64(metadata_stream_id); /* Send command */ - ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -551,27 +547,27 @@ error: /* * Begin a data pending command for a specific session id. */ -int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id) +int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id) { int ret; struct lttcomm_relayd_begin_data_pending msg; struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd begin data pending"); msg.session_id = htobe64(id); /* Send command */ - ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } @@ -597,7 +593,7 @@ error: * Return 0 on success and set is_data_inflight to 0 if no data is being * streamed or 1 if it is the case. */ -int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, +int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id, unsigned int *is_data_inflight) { int ret; @@ -605,20 +601,20 @@ int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, struct lttcomm_relayd_generic_reply reply; /* Code flow error. Safety net. */ - assert(sock); + assert(rsock); DBG("Relayd end data pending"); msg.session_id = htobe64(id); /* Send command */ - ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0); + ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0); if (ret < 0) { goto error; } /* Receive response */ - ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); if (ret < 0) { goto error; } diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 22bfce829..bbb6f9385 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -23,25 +23,24 @@ #include #include -int relayd_connect(struct lttcomm_sock *sock); -int relayd_close(struct lttcomm_sock *sock); -int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id); -int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, +int relayd_connect(struct lttcomm_relayd_sock *sock); +int relayd_close(struct lttcomm_relayd_sock *sock); +int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id); +int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id); -int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, +int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); -int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, - uint32_t minor, uint32_t *minor_to_use); -int relayd_start_data(struct lttcomm_sock *sock); -int relayd_send_metadata(struct lttcomm_sock *sock, size_t len); -int relayd_send_data_hdr(struct lttcomm_sock *sock, +int relayd_version_check(struct lttcomm_relayd_sock *sock); +int relayd_start_data(struct lttcomm_relayd_sock *sock); +int relayd_send_metadata(struct lttcomm_relayd_sock *sock, size_t len); +int relayd_send_data_hdr(struct lttcomm_relayd_sock *sock, struct lttcomm_relayd_data_hdr *hdr, size_t size); -int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id, +int relayd_data_pending(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); -int relayd_quiescent_control(struct lttcomm_sock *sock, +int relayd_quiescent_control(struct lttcomm_relayd_sock *sock, uint64_t metadata_stream_id); -int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id); -int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id, +int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id); +int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id, unsigned int *is_data_inflight); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/sessiond-comm.c b/src/common/sessiond-comm/sessiond-comm.c index ef51e0a85..195c8b550 100644 --- a/src/common/sessiond-comm/sessiond-comm.c +++ b/src/common/sessiond-comm/sessiond-comm.c @@ -313,3 +313,53 @@ void lttcomm_destroy_sock(struct lttcomm_sock *sock) { free(sock); } + +/* + * Allocate and return a relayd socket object using a given URI to initialize + * it and the major/minor version of the supported protocol. + * + * On error, NULL is returned. + */ +struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock(struct lttng_uri *uri, + uint32_t major, uint32_t minor) +{ + int ret; + struct lttcomm_sock *tmp_sock = NULL; + struct lttcomm_relayd_sock *rsock = NULL; + + assert(uri); + + rsock = zmalloc(sizeof(*rsock)); + if (!rsock) { + PERROR("zmalloc relayd sock"); + goto error; + } + + /* Allocate socket object from URI */ + tmp_sock = lttcomm_alloc_sock_from_uri(uri); + if (tmp_sock == NULL) { + goto error_free; + } + + /* + * Create socket object which basically sets the ops according to the + * socket protocol. + */ + lttcomm_copy_sock(&rsock->sock, tmp_sock); + /* Temporary socket pointer not needed anymore. */ + lttcomm_destroy_sock(tmp_sock); + ret = lttcomm_create_sock(&rsock->sock); + if (ret < 0) { + goto error_free; + } + + rsock->major = major; + rsock->minor = minor; + + return rsock; + +error_free: + free(rsock); +error: + return NULL; +} diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 5980ddfca..ebb896b58 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -171,6 +171,16 @@ struct lttcomm_sock { const struct lttcomm_proto_ops *ops; } LTTNG_PACKED; +/* + * Relayd sock. Adds the protocol version to use for the communications with + * the relayd. + */ +struct lttcomm_relayd_sock { + struct lttcomm_sock sock; + uint32_t major; + uint32_t minor; +} LTTNG_PACKED; + struct lttcomm_net_family { int family; int (*create) (struct lttcomm_sock *sock, int type, int proto); @@ -303,7 +313,7 @@ struct lttcomm_consumer_msg { uint64_t net_index; enum lttng_stream_type type; /* Open socket to the relayd */ - struct lttcomm_sock sock; + struct lttcomm_relayd_sock sock; /* Tracing session id associated to the relayd. */ uint64_t session_id; } LTTNG_PACKED relayd_sock; @@ -425,4 +435,8 @@ extern struct lttcomm_sock *lttcomm_alloc_copy_sock(struct lttcomm_sock *src); extern void lttcomm_copy_sock(struct lttcomm_sock *dst, struct lttcomm_sock *src); +/* Relayd socket object. */ +extern struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock( + struct lttng_uri *uri, uint32_t major, uint32_t minor); + #endif /* _LTTNG_SESSIOND_COMM_H */