Introduce the relayd socket object
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 28 Mar 2013 04:09:48 +0000 (00:09 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Thu, 28 Mar 2013 16:55:25 +0000 (12:55 -0400)
Used to store the version that the relayd socket supports so we can
adapt the communication to the lowest version between the session daemon
and relayd.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/session.h
src/common/consumer.c
src/common/consumer.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/sessiond-comm.c
src/common/sessiond-comm/sessiond-comm.h

index 995714faf40b6f6d34b6ed31319834baf99e2b6d..5f6f9cfd8ed78a613972c5b3ed9b999c63c90ecc 100644 (file)
@@ -1268,22 +1268,25 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
                ret = -1;
                goto end;
        }
-       reply.major = htobe32(reply.major);
-       reply.minor = htobe32(reply.minor);
-       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
-                       sizeof(struct lttcomm_relayd_version), 0);
-       if (ret < 0) {
-               ERR("Relay sending version");
-       }
 
        /* Major versions must be the same */
        if (reply.major != be32toh(msg.major)) {
-               DBG("Incompatible major versions, deleting session");
+               DBG("Incompatible major versions (%u vs %u), deleting session",
+                               reply.major, be32toh(msg.major));
                relay_delete_session(cmd, streams_ht);
                ret = 0;
                goto end;
        }
 
+       reply.major = htobe32(reply.major);
+       reply.minor = htobe32(reply.minor);
+       ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
+                       sizeof(struct lttcomm_relayd_version), 0);
+       if (ret < 0) {
+               ERR("Relay sending version");
+       }
+
+#if 0
        cmd->session->major = reply.major;
        /* We adapt to the lowest compatible version */
        if (reply.minor <= be32toh(msg.minor)) {
@@ -1294,6 +1297,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
 
        DBG("Version check done using protocol %u.%u", cmd->session->major,
                        cmd->session->minor);
+#endif
 
 end:
        return ret;
index ca7e7dac17cb4e0e52e275e043b51c749d552096..687c06d42d2b7a3bff15338cec62749a3647ab4b 100644 (file)
@@ -478,24 +478,15 @@ error:
  * On success, the relayd_sock pointer is set to the created socket.
  * Else, it's stays untouched and a lttcomm error code is returned.
  */
-static int create_connect_relayd(struct consumer_output *output,
-               const char *session_name, struct lttng_uri *uri,
-               struct lttcomm_sock **relayd_sock,
-               struct ltt_session *session)
+static int create_connect_relayd(struct lttng_uri *uri,
+               struct lttcomm_relayd_sock **relayd_sock)
 {
        int ret;
-       struct lttcomm_sock *sock;
-       uint32_t minor;
+       struct lttcomm_relayd_sock *rsock;
 
-       /* Create socket object from URI */
-       sock = lttcomm_alloc_sock_from_uri(uri);
-       if (sock == NULL) {
-               ret = LTTNG_ERR_FATAL;
-               goto error;
-       }
-
-       ret = lttcomm_create_sock(sock);
-       if (ret < 0) {
+       rsock = lttcomm_alloc_relayd_sock(uri, RELAYD_VERSION_COMM_MAJOR,
+                       RELAYD_VERSION_COMM_MINOR);
+       if (!rsock) {
                ret = LTTNG_ERR_FATAL;
                goto error;
        }
@@ -506,7 +497,7 @@ static int create_connect_relayd(struct consumer_output *output,
         * state to be in poll execution.
         */
        health_poll_entry();
-       ret = relayd_connect(sock);
+       ret = relayd_connect(rsock);
        health_poll_exit();
        if (ret < 0) {
                ERR("Unable to reach lttng-relayd");
@@ -519,14 +510,11 @@ static int create_connect_relayd(struct consumer_output *output,
                DBG3("Creating relayd stream socket from URI");
 
                /* Check relayd version */
-               ret = relayd_version_check(sock, RELAYD_VERSION_COMM_MAJOR,
-                               RELAYD_VERSION_COMM_MINOR, &minor);
+               ret = relayd_version_check(rsock);
                if (ret < 0) {
                        ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
                        goto close_sock;
                }
-               session->major = RELAYD_VERSION_COMM_MAJOR;
-               session->minor = minor;
        } else if (uri->stype == LTTNG_STREAM_DATA) {
                DBG3("Creating relayd data socket from URI");
        } else {
@@ -536,18 +524,15 @@ static int create_connect_relayd(struct consumer_output *output,
                goto close_sock;
        }
 
-       *relayd_sock = sock;
+       *relayd_sock = rsock;
 
        return LTTNG_OK;
 
 close_sock:
-       if (sock) {
-               (void) relayd_close(sock);
-       }
+       /* The returned value is not useful since we are on an error path. */
+       (void) relayd_close(rsock);
 free_sock:
-       if (sock) {
-               lttcomm_destroy_sock(sock);
-       }
+       free(rsock);
 error:
        return ret;
 }
@@ -560,14 +545,14 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
                struct consumer_socket *consumer_sock)
 {
        int ret;
-       struct lttcomm_sock *sock = NULL;
+       struct lttcomm_relayd_sock *rsock = NULL;
 
        /* Connect to relayd and make version check if uri is the control. */
-       ret = create_connect_relayd(consumer, session->name, relayd_uri,
-                       &sock, session);
+       ret = create_connect_relayd(relayd_uri, &rsock);
        if (ret != LTTNG_OK) {
-               goto close_sock;
+               goto error;
        }
+       assert(rsock);
 
        /* If the control socket is connected, network session is ready */
        if (relayd_uri->stype == LTTNG_STREAM_CONTROL) {
@@ -587,8 +572,8 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
        }
 
        /* Send relayd socket to consumer. */
-       ret = consumer_send_relayd_socket(consumer_sock, sock,
-                       consumer, relayd_uri->stype, session->id);
+       ret = consumer_send_relayd_socket(consumer_sock, rsock, consumer,
+                       relayd_uri->stype, session->id);
        if (ret < 0) {
                ret = LTTNG_ERR_ENABLE_CONSUMER_FAIL;
                goto close_sock;
@@ -609,11 +594,10 @@ static int send_consumer_relayd_socket(int domain, struct ltt_session *session,
         */
 
 close_sock:
-       if (sock) {
-               (void) relayd_close(sock);
-               lttcomm_destroy_sock(sock);
-       }
+       (void) relayd_close(rsock);
+       free(rsock);
 
+error:
        if (ret != LTTNG_OK) {
                /*
                 * On error, nullify the consumer sequence index so streams are not
@@ -621,7 +605,6 @@ close_sock:
                 */
                uatomic_set(&consumer->net_seq_index, -1);
        }
-
        return ret;
 }
 
index f803f7b0553c19c0458d387ae8b03a73a1969417..0cf43d2ca3da002ffb7d68aef558316c75b9006a 100644 (file)
@@ -805,14 +805,14 @@ error:
  * On success return positive value. On error, negative value.
  */
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
-               struct lttcomm_sock *sock, struct consumer_output *consumer,
+               struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
                enum lttng_stream_type type, uint64_t session_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
        assert(consumer);
        assert(consumer_sock);
 
@@ -831,13 +831,13 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        msg.u.relayd_sock.net_index = consumer->net_seq_index;
        msg.u.relayd_sock.type = type;
        msg.u.relayd_sock.session_id = session_id;
-       memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
+       memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
 
        DBG3("Sending relayd sock info to consumer on %d", consumer_sock->fd);
        ret = lttcomm_send_unix_sock(consumer_sock->fd, &msg, sizeof(msg));
        if (ret < 0) {
                /* The above call will print a PERROR on error. */
-               DBG("Error when sending relayd sockets on sock %d", sock->fd);
+               DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
                goto error;
        }
 
@@ -847,7 +847,7 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        }
 
        DBG3("Sending relayd socket file descriptor to consumer");
-       ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
+       ret = consumer_send_fds(consumer_sock, &rsock->sock.fd, 1);
        if (ret < 0) {
                goto error;
        }
index 798eaa265679f960e93f4b334ce7cfc93c43e85c..09f4545a03fdc1e40fb777aaff1c4f9cdf278b2d 100644 (file)
@@ -171,7 +171,7 @@ int consumer_send_stream(struct consumer_socket *sock,
 int consumer_send_channel(struct consumer_socket *sock,
                struct lttcomm_consumer_msg *msg);
 int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
-               struct lttcomm_sock *sock, struct consumer_output *consumer,
+               struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
                enum lttng_stream_type type, uint64_t session_id);
 int consumer_send_destroy_relayd(struct consumer_socket *sock,
                struct consumer_output *consumer);
index a56473783fb7d5f8f93ef11f50a40f628eaf35e5..fd23ab05ed90bb422779446d7dd3961bc862f1da 100644 (file)
@@ -86,9 +86,6 @@ struct ltt_session {
 
        /* Did a start command occured before the kern/ust session creation? */
        unsigned int started;
-       /* Procotol version to use with the relayd */
-       uint32_t major;
-       uint32_t minor;
 };
 
 /* Prototypes */
index b6e440a486fc5995f9a7caca0eac39631bf3340c..a9070b1c9bc3f976ef26a2fbbc183c7fd4b75343 100644 (file)
@@ -783,7 +783,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                }
 
                /* Metadata are always sent on the control socket. */
-               outfd = relayd->control_sock.fd;
+               outfd = relayd->control_sock.sock.fd;
        } else {
                /* Set header with stream information */
                data_hdr.stream_id = htobe64(stream->relayd_stream_id);
@@ -808,7 +808,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream,
                ++stream->next_net_seq_num;
 
                /* Set to go on data socket */
-               outfd = relayd->data_sock.fd;
+               outfd = relayd->data_sock.sock.fd;
        }
 
 error:
@@ -1300,7 +1300,12 @@ int lttng_create_output_file(struct lttng_consumer_stream *stream)
        char *path;
 
        assert(stream);
-       assert(stream->net_seq_idx == (uint64_t) -1ULL);
+
+       /* Don't create anything if this is set for streaming. */
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
+               ret = 0;
+               goto end;
+       }
 
        ret = snprintf(full_path, sizeof(full_path), "%s/%s",
                        stream->chan->pathname, stream->name);
@@ -1337,6 +1342,7 @@ int lttng_create_output_file(struct lttng_consumer_stream *stream)
 error_open:
        free(path_name_id);
 error:
+end:
        return ret;
 }
 
@@ -3075,13 +3081,16 @@ void lttng_consumer_init(void)
  */
 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
-               struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
-               unsigned int sessiond_id)
+               struct pollfd *consumer_sockpoll,
+               struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttng_error_code ret_code = LTTNG_OK;
        struct consumer_relayd_sock_pair *relayd;
 
+       assert(ctx);
+       assert(relayd_sock);
+
        DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
 
        /* First send a status message before receiving the fds. */
@@ -3131,11 +3140,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        switch (sock_type) {
        case LTTNG_STREAM_CONTROL:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
-               ret = lttcomm_create_sock(&relayd->control_sock);
+               lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
+               ret = lttcomm_create_sock(&relayd->control_sock.sock);
                /* Immediately try to close the created socket if valid. */
-               if (relayd->control_sock.fd >= 0) {
-                       if (close(relayd->control_sock.fd)) {
+               if (relayd->control_sock.sock.fd >= 0) {
+                       if (close(relayd->control_sock.sock.fd)) {
                                PERROR("close relayd control socket");
                        }
                }
@@ -3145,7 +3154,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Assign new file descriptor */
-               relayd->control_sock.fd = fd;
+               relayd->control_sock.sock.fd = fd;
+               /* Assign version values. */
+               relayd->control_sock.major = relayd_sock->major;
+               relayd->control_sock.minor = relayd_sock->minor;
 
                /*
                 * Create a session on the relayd and store the returned id. Lock the
@@ -3173,11 +3185,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
-               ret = lttcomm_create_sock(&relayd->data_sock);
+               lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
+               ret = lttcomm_create_sock(&relayd->data_sock.sock);
                /* Immediately try to close the created socket if valid. */
-               if (relayd->data_sock.fd >= 0) {
-                       if (close(relayd->data_sock.fd)) {
+               if (relayd->data_sock.sock.fd >= 0) {
+                       if (close(relayd->data_sock.sock.fd)) {
                                PERROR("close relayd data socket");
                        }
                }
@@ -3187,7 +3199,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                }
 
                /* Assign new file descriptor */
-               relayd->data_sock.fd = fd;
+               relayd->data_sock.sock.fd = fd;
+               /* Assign version values. */
+               relayd->data_sock.major = relayd_sock->major;
+               relayd->data_sock.minor = relayd_sock->minor;
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
index 6fd4e2e5ce5ae08160475b3d6ba02d4069ca95df..0931250b06fafc95e1d6b4648b5d9f3241b3f58e 100644 (file)
@@ -269,14 +269,14 @@ struct consumer_relayd_sock_pair {
        pthread_mutex_t ctrl_sock_mutex;
 
        /* Control socket. Command and metadata are passed over it */
-       struct lttcomm_sock control_sock;
+       struct lttcomm_relayd_sock control_sock;
 
        /*
         * We don't need a mutex at this point since we only splice or write single
         * large chunk of data with a header appended at the begining. Moreover,
         * this socket is for now only used in a single thread.
         */
-       struct lttcomm_sock data_sock;
+       struct lttcomm_relayd_sock data_sock;
        struct lttng_ht_node_u64 node;
 
        /* Session id on both sides for the sockets. */
@@ -518,7 +518,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
-               struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+               struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
                unsigned int sessiond_id);
 void consumer_flag_relayd_for_destroy(
                struct consumer_relayd_sock_pair *relayd);
index da54939f9a28153e4ae343d281ec1352e07b8ef9..bed0933d1102ae4d13454ba3d152fd86d831f377 100644 (file)
@@ -32,7 +32,7 @@
 /*
  * Send command. Fill up the header and append the data.
  */
-static int send_command(struct lttcomm_sock *sock,
+static int send_command(struct lttcomm_relayd_sock *rsock,
                enum lttcomm_relayd_command cmd, void *data, size_t size,
                int flags)
 {
@@ -65,7 +65,7 @@ static int send_command(struct lttcomm_sock *sock,
                memcpy(buf + sizeof(header), data, size);
        }
 
-       ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
+       ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
        if (ret < 0) {
                ret = -errno;
                goto error;
@@ -83,20 +83,20 @@ alloc_error:
  * Receive reply data on socket. This MUST be call after send_command or else
  * could result in unexpected behavior(s).
  */
-static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
+static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
 {
        int ret;
 
        DBG3("Relayd waiting for reply of size %ld", size);
 
-       ret = sock->ops->recvmsg(sock, data, size, 0);
+       ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
        if (ret <= 0 || ret != size) {
                if (ret == 0) {
                        /* Orderly shutdown. */
-                       DBG("Socket %d has performed an orderly shutdown", sock->fd);
+                       DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
                } else {
                        DBG("Receiving reply failed on sock %d for size %lu with ret %d",
-                                       sock->fd, size, ret);
+                                       rsock->sock.fd, size, ret);
                }
                /* Always return -1 here and the caller can use errno. */
                ret = -1;
@@ -114,24 +114,24 @@ error:
  * On success, return 0 else a negative value which is either an errno error or
  * a lttng error code from the relayd.
  */
-int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id)
 {
        int ret;
        struct lttcomm_relayd_status_session reply;
 
-       assert(sock);
+       assert(rsock);
        assert(session_id);
 
        DBG("Relayd create session");
 
        /* Send command */
-       ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+       ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -160,7 +160,7 @@ error:
  *
  * On success return 0 else return ret_code negative value.
  */
-int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
+int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
                const char *pathname, uint64_t *stream_id)
 {
        int ret;
@@ -168,7 +168,7 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
        struct lttcomm_relayd_status_stream reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
        assert(channel_name);
        assert(pathname);
 
@@ -178,13 +178,13 @@ int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
        strncpy(msg.pathname, pathname, sizeof(msg.pathname));
 
        /* Send command */
-       ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Waiting for reply */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -217,29 +217,29 @@ error:
  *
  * Return 0 if compatible else negative value.
  */
-int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
-               uint32_t minor, uint32_t *minor_to_use)
+int relayd_version_check(struct lttcomm_relayd_sock *rsock)
 {
        int ret;
        struct lttcomm_relayd_version msg;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
-       DBG("Relayd version check for major.minor %u.%u", major, minor);
+       DBG("Relayd version check for major.minor %u.%u", rsock->major,
+                       rsock->minor);
 
        /* Prepare network byte order before transmission. */
-       msg.major = htobe32(major);
-       msg.minor = htobe32(minor);
+       msg.major = htobe32(rsock->major);
+       msg.minor = htobe32(rsock->minor);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &msg, sizeof(msg));
+       ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
        if (ret < 0) {
                goto error;
        }
@@ -254,30 +254,26 @@ int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
         * other. If the minor version differs, the lowest version is used by both
         * sides.
         */
-       if (msg.major != major) {
+       if (msg.major != rsock->major) {
                /* Not compatible */
                ret = -1;
                DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
-                               msg.major, major);
+                               msg.major, rsock->major);
                goto error;
        }
 
        /*
-        * After 2.1.0 release, for the 2.2 release, at this point will have to
-        * check the minor version in order for the session daemon to know which
-        * structure to use to communicate with the relayd. If the relayd's minor
-        * version is higher, it will adapt to our version so we can continue to
-        * use the latest relayd communication data structure.
+        * If the relayd's minor version is higher, it will adapt to our version so
+        * we can continue to use the latest relayd communication data structure.
+        * If the received minor version is higher, the relayd should adapt to us.
         */
-       if (minor <= msg.minor) {
-               *minor_to_use = minor;
-       } else {
-               *minor_to_use = msg.minor;
+       if (rsock->minor > msg.minor) {
+               rsock->minor = msg.minor;
        }
 
        /* Version number compatible */
        DBG2("Relayd version is compatible, using protocol version %u.%u",
-                       major, *minor_to_use);
+                       rsock->major, rsock->minor);
        ret = 0;
 
 error:
@@ -289,17 +285,17 @@ error:
  *
  * On success return 0 else return ret_code negative value.
  */
-int relayd_send_metadata(struct lttcomm_sock *sock, size_t len)
+int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd sending metadata of size %zu", len);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0);
+       ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
        if (ret < 0) {
                goto error;
        }
@@ -317,20 +313,20 @@ error:
 }
 
 /*
- * Connect to relay daemon with an allocated lttcomm_sock.
+ * Connect to relay daemon with an allocated lttcomm_relayd_sock.
  */
-int relayd_connect(struct lttcomm_sock *sock)
+int relayd_connect(struct lttcomm_relayd_sock *rsock)
 {
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG3("Relayd connect ...");
 
-       return sock->ops->connect(sock);
+       return rsock->sock.ops->connect(&rsock->sock);
 }
 
 /*
- * Close relayd socket with an allocated lttcomm_sock.
+ * Close relayd socket with an allocated lttcomm_relayd_sock.
  *
  * If no socket operations are found, simply return 0 meaning that everything
  * is fine. Without operations, the socket can not possibly be opened or used.
@@ -342,26 +338,26 @@ int relayd_connect(struct lttcomm_sock *sock)
  * Return the close returned value. On error, a negative value is usually
  * returned back from close(2).
  */
-int relayd_close(struct lttcomm_sock *sock)
+int relayd_close(struct lttcomm_relayd_sock *rsock)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        /* An invalid fd is fine, return success. */
-       if (sock->fd < 0) {
+       if (rsock->sock.fd < 0) {
                ret = 0;
                goto end;
        }
 
-       DBG3("Relayd closing socket %d", sock->fd);
+       DBG3("Relayd closing socket %d", rsock->sock.fd);
 
-       if (sock->ops) {
-               ret = sock->ops->close(sock);
+       if (rsock->sock.ops) {
+               ret = rsock->sock.ops->close(&rsock->sock);
        } else {
                /* Default call if no specific ops found. */
-               ret = close(sock->fd);
+               ret = close(rsock->sock.fd);
                if (ret < 0) {
                        PERROR("relayd_close default close");
                }
@@ -374,13 +370,13 @@ end:
 /*
  * Send data header structure to the relayd.
  */
-int relayd_send_data_hdr(struct lttcomm_sock *sock,
+int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size)
 {
        int ret;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
        assert(hdr);
 
        DBG3("Relayd sending data header of size %ld", size);
@@ -391,7 +387,7 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock,
        }
 
        /* Only send data header. */
-       ret = sock->ops->sendmsg(sock, hdr, size, 0);
+       ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
        if (ret < 0) {
                ret = -errno;
                goto error;
@@ -409,7 +405,7 @@ error:
 /*
  * Send close stream command to the relayd.
  */
-int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
                uint64_t last_net_seq_num)
 {
        int ret;
@@ -417,7 +413,7 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd closing stream id %" PRIu64, stream_id);
 
@@ -425,13 +421,13 @@ int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
        msg.last_net_seq_num = htobe64(last_net_seq_num);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -458,7 +454,7 @@ error:
  *
  * Return 0 if NOT pending, 1 if so and a negative value on error.
  */
-int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
                uint64_t last_net_seq_num)
 {
        int ret;
@@ -466,7 +462,7 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd data pending for stream id %" PRIu64, stream_id);
 
@@ -474,14 +470,14 @@ int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
        msg.last_net_seq_num = htobe64(last_net_seq_num);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_DATA_PENDING, (void *) &msg,
+       ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
                        sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -506,7 +502,7 @@ error:
 /*
  * Check on the relayd side for a quiescent state on the control socket.
  */
-int relayd_quiescent_control(struct lttcomm_sock *sock,
+int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
                uint64_t metadata_stream_id)
 {
        int ret;
@@ -514,20 +510,20 @@ int relayd_quiescent_control(struct lttcomm_sock *sock,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd checking quiescent control state");
 
        msg.stream_id = htobe64(metadata_stream_id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -551,27 +547,27 @@ error:
 /*
  * Begin a data pending command for a specific session id.
  */
-int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
 {
        int ret;
        struct lttcomm_relayd_begin_data_pending msg;
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd begin data pending");
 
        msg.session_id = htobe64(id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
@@ -597,7 +593,7 @@ error:
  * Return 0 on success and set is_data_inflight to 0 if no data is being
  * streamed or 1 if it is the case.
  */
-int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
                unsigned int *is_data_inflight)
 {
        int ret;
@@ -605,20 +601,20 @@ int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
        struct lttcomm_relayd_generic_reply reply;
 
        /* Code flow error. Safety net. */
-       assert(sock);
+       assert(rsock);
 
        DBG("Relayd end data pending");
 
        msg.session_id = htobe64(id);
 
        /* Send command */
-       ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+       ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
        if (ret < 0) {
                goto error;
        }
 
        /* Receive response */
-       ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
index 22bfce82901d3956fa53c55c8a228fbc09d8ff1c..bbb6f9385e72b962d107ab272585827ce4179bb3 100644 (file)
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 
-int relayd_connect(struct lttcomm_sock *sock);
-int relayd_close(struct lttcomm_sock *sock);
-int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id);
-int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
+int relayd_connect(struct lttcomm_relayd_sock *sock);
+int relayd_close(struct lttcomm_relayd_sock *sock);
+int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id);
+int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name,
                const char *pathname, uint64_t *stream_id);
-int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
                uint64_t last_net_seq_num);
-int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
-               uint32_t minor, uint32_t *minor_to_use);
-int relayd_start_data(struct lttcomm_sock *sock);
-int relayd_send_metadata(struct lttcomm_sock *sock, size_t len);
-int relayd_send_data_hdr(struct lttcomm_sock *sock,
+int relayd_version_check(struct lttcomm_relayd_sock *sock);
+int relayd_start_data(struct lttcomm_relayd_sock *sock);
+int relayd_send_metadata(struct lttcomm_relayd_sock *sock, size_t len);
+int relayd_send_data_hdr(struct lttcomm_relayd_sock *sock,
                struct lttcomm_relayd_data_hdr *hdr, size_t size);
-int relayd_data_pending(struct lttcomm_sock *sock, uint64_t stream_id,
+int relayd_data_pending(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
                uint64_t last_net_seq_num);
-int relayd_quiescent_control(struct lttcomm_sock *sock,
+int relayd_quiescent_control(struct lttcomm_relayd_sock *sock,
                uint64_t metadata_stream_id);
-int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id);
-int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+int relayd_begin_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id);
+int relayd_end_data_pending(struct lttcomm_relayd_sock *sock, uint64_t id,
                unsigned int *is_data_inflight);
 
 #endif /* _RELAYD_H */
index ef51e0a85f697ecc5b1902a16294e2c6b4dc0c17..195c8b5506721f64bf9bd204f7db8fa5bda86198 100644 (file)
@@ -313,3 +313,53 @@ void lttcomm_destroy_sock(struct lttcomm_sock *sock)
 {
        free(sock);
 }
+
+/*
+ * Allocate and return a relayd socket object using a given URI to initialize
+ * it and the major/minor version of the supported protocol.
+ *
+ * On error, NULL is returned.
+ */
+struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock(struct lttng_uri *uri,
+               uint32_t major, uint32_t minor)
+{
+       int ret;
+       struct lttcomm_sock *tmp_sock = NULL;
+       struct lttcomm_relayd_sock *rsock = NULL;
+
+       assert(uri);
+
+       rsock = zmalloc(sizeof(*rsock));
+       if (!rsock) {
+               PERROR("zmalloc relayd sock");
+               goto error;
+       }
+
+       /* Allocate socket object from URI */
+       tmp_sock = lttcomm_alloc_sock_from_uri(uri);
+       if (tmp_sock == NULL) {
+               goto error_free;
+       }
+
+       /*
+        * Create socket object which basically sets the ops according to the
+        * socket protocol.
+        */
+       lttcomm_copy_sock(&rsock->sock, tmp_sock);
+       /* Temporary socket pointer not needed anymore. */
+       lttcomm_destroy_sock(tmp_sock);
+       ret = lttcomm_create_sock(&rsock->sock);
+       if (ret < 0) {
+               goto error_free;
+       }
+
+       rsock->major = major;
+       rsock->minor = minor;
+
+       return rsock;
+
+error_free:
+       free(rsock);
+error:
+       return NULL;
+}
index 5980ddfcaccb0ea47e6854723262f7e4338e8fbd..ebb896b585e6fb759a9ea0c85f32c801f4d94ec2 100644 (file)
@@ -171,6 +171,16 @@ struct lttcomm_sock {
        const struct lttcomm_proto_ops *ops;
 } LTTNG_PACKED;
 
+/*
+ * Relayd sock. Adds the protocol version to use for the communications with
+ * the relayd.
+ */
+struct lttcomm_relayd_sock {
+       struct lttcomm_sock sock;
+       uint32_t major;
+       uint32_t minor;
+} LTTNG_PACKED;
+
 struct lttcomm_net_family {
        int family;
        int (*create) (struct lttcomm_sock *sock, int type, int proto);
@@ -303,7 +313,7 @@ struct lttcomm_consumer_msg {
                        uint64_t net_index;
                        enum lttng_stream_type type;
                        /* Open socket to the relayd */
-                       struct lttcomm_sock sock;
+                       struct lttcomm_relayd_sock sock;
                        /* Tracing session id associated to the relayd. */
                        uint64_t session_id;
                } LTTNG_PACKED relayd_sock;
@@ -425,4 +435,8 @@ extern struct lttcomm_sock *lttcomm_alloc_copy_sock(struct lttcomm_sock *src);
 extern void lttcomm_copy_sock(struct lttcomm_sock *dst,
                struct lttcomm_sock *src);
 
+/* Relayd socket object. */
+extern struct lttcomm_relayd_sock *lttcomm_alloc_relayd_sock(
+               struct lttng_uri *uri, uint32_t major, uint32_t minor);
+
 #endif /* _LTTNG_SESSIOND_COMM_H */
This page took 0.047031 seconds and 4 git commands to generate.