Use single callsite for send/recv ops. for consumer in sessiond
authorDavid Goulet <dgoulet@efficios.com>
Mon, 26 Aug 2013 17:11:39 +0000 (13:11 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Mon, 26 Aug 2013 19:35:15 +0000 (15:35 -0400)
Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/ust-consumer.c

index 39a1946380120db04419809e52a147a08b999726..ee6f3c3ff8e9ddb5c614d6b9c60d5f2d592e21ff 100644 (file)
 #include "ust-app.h"
 #include "utils.h"
 
+/*
+ * Send a data payload using a given consumer socket of size len.
+ *
+ * The consumer socket lock MUST be acquired before calling this since this
+ * function can change the fd value.
+ *
+ * Return 0 on success else a negative value on error.
+ */
+int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len)
+{
+       int fd;
+       ssize_t size;
+
+       assert(socket);
+       assert(socket->fd);
+       assert(msg);
+
+       /* Consumer socket is invalid. Stopping. */
+       fd = *socket->fd;
+       if (fd < 0) {
+               goto error;
+       }
+
+       size = lttcomm_send_unix_sock(fd, msg, len);
+       if (size < 0) {
+               /* The above call will print a PERROR on error. */
+               DBG("Error when sending data to consumer on sock %d", fd);
+               /*
+                * At this point, the socket is not usable anymore thus flagging it
+                * invalid and closing it.
+                */
+
+               /* This call will PERROR on error. */
+               (void) lttcomm_close_unix_sock(fd);
+               *socket->fd = -1;
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
+}
+
+/*
+ * Receive a data payload using a given consumer socket of size len.
+ *
+ * The consumer socket lock MUST be acquired before calling this since this
+ * function can change the fd value.
+ *
+ * Return 0 on success else a negative value on error.
+ */
+int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len)
+{
+       int fd;
+       ssize_t size;
+
+       assert(socket);
+       assert(socket->fd);
+       assert(msg);
+
+       /* Consumer socket is invalid. Stopping. */
+       fd = *socket->fd;
+       if (fd < 0) {
+               goto error;
+       }
+
+       size = lttcomm_recv_unix_sock(fd, msg, len);
+       if (size <= 0) {
+               /* The above call will print a PERROR on error. */
+               DBG("Error when receiving data from the consumer socket %d", fd);
+               /*
+                * At this point, the socket is not usable anymore thus flagging it
+                * invalid and closing it.
+                */
+
+               /* This call will PERROR on error. */
+               (void) lttcomm_close_unix_sock(fd);
+               *socket->fd = -1;
+               goto error;
+       }
+
+       return 0;
+
+error:
+       return -1;
+}
+
 /*
  * Receive a reply command status message from the consumer. Consumer socket
  * lock MUST be acquired before calling this function.
@@ -48,14 +136,8 @@ int consumer_recv_status_reply(struct consumer_socket *sock)
 
        assert(sock);
 
-       ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
-       if (ret <= 0) {
-               if (ret == 0) {
-                       /* Orderly shutdown. Don't return 0 which means success. */
-                       ret = -1;
-               }
-               /* The above call will print a PERROR on error. */
-               DBG("Fail to receive status reply on sock %d", *sock->fd);
+       ret = consumer_socket_recv(sock, &reply, sizeof(reply));
+       if (ret < 0) {
                goto end;
        }
 
@@ -89,14 +171,8 @@ int consumer_recv_status_channel(struct consumer_socket *sock,
        assert(stream_count);
        assert(key);
 
-       ret = lttcomm_recv_unix_sock(*sock->fd, &reply, sizeof(reply));
-       if (ret <= 0) {
-               if (ret == 0) {
-                       /* Orderly shutdown. Don't return 0 which means success. */
-                       ret = -1;
-               }
-               /* The above call will print a PERROR on error. */
-               DBG("Fail to receive status reply on sock %d", *sock->fd);
+       ret = consumer_socket_recv(sock, &reply, sizeof(reply));
+       if (ret < 0) {
                goto end;
        }
 
@@ -129,22 +205,13 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd);
 
-       /* Bail out if consumer is disabled */
-       if (!consumer->enabled) {
-               ret = LTTNG_OK;
-               DBG3("Consumer is disabled");
-               goto error;
-       }
-
        msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
        msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
 
        pthread_mutex_lock(sock->lock);
-       ret = lttcomm_send_unix_sock(*sock->fd, &msg, sizeof(msg));
+       ret = consumer_socket_send(sock, &msg, sizeof(msg));
        if (ret < 0) {
-               /* Indicate that the consumer is probably closing at this point. */
-               DBG("send consumer destroy relayd command");
-               goto error_send;
+               goto error;
        }
 
        /* Don't check the return value. The caller will do it. */
@@ -152,9 +219,8 @@ int consumer_send_destroy_relayd(struct consumer_socket *sock,
 
        DBG2("Consumer send destroy relayd command done");
 
-error_send:
-       pthread_mutex_unlock(sock->lock);
 error:
+       pthread_mutex_unlock(sock->lock);
        return ret;
 }
 
@@ -676,11 +742,8 @@ int consumer_send_msg(struct consumer_socket *sock,
        assert(sock);
        assert(sock->fd);
 
-       ret = lttcomm_send_unix_sock(*sock->fd, msg,
-                       sizeof(struct lttcomm_consumer_msg));
+       ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg));
        if (ret < 0) {
-               /* The above call will print a PERROR on error. */
-               DBG("Error when sending consumer channel on sock %d", *sock->fd);
                goto error;
        }
 
@@ -702,16 +765,11 @@ int consumer_send_channel(struct consumer_socket *sock,
        assert(sock);
        assert(sock->fd);
 
-       ret = lttcomm_send_unix_sock(*sock->fd, msg,
-                       sizeof(struct lttcomm_consumer_msg));
+       ret = consumer_send_msg(sock, msg);
        if (ret < 0) {
-               /* The above call will print a PERROR on error. */
-               DBG("Error when sending consumer channel on sock %d", *sock->fd);
                goto error;
        }
 
-       ret = consumer_recv_status_reply(sock);
-
 error:
        return ret;
 }
@@ -860,16 +918,7 @@ int consumer_send_stream(struct consumer_socket *sock,
        assert(sock->fd);
        assert(fds);
 
-       /* Send on socket */
-       ret = lttcomm_send_unix_sock(*sock->fd, msg,
-                       sizeof(struct lttcomm_consumer_msg));
-       if (ret < 0) {
-               /* The above call will print a PERROR on error. */
-               DBG("Error when sending consumer stream on sock %d", *sock->fd);
-               goto error;
-       }
-
-       ret = consumer_recv_status_reply(sock);
+       ret = consumer_send_msg(sock, msg);
        if (ret < 0) {
                goto error;
        }
@@ -919,14 +968,7 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock,
        memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
 
        DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd);
-       ret = lttcomm_send_unix_sock(*consumer_sock->fd, &msg, sizeof(msg));
-       if (ret < 0) {
-               /* The above call will print a PERROR on error. */
-               DBG("Error when sending relayd sockets on sock %d", rsock->sock.fd);
-               goto error;
-       }
-
-       ret = consumer_recv_status_reply(consumer_sock);
+       ret = consumer_send_msg(consumer_sock, &msg);
        if (ret < 0) {
                goto error;
        }
@@ -1023,11 +1065,8 @@ int consumer_is_data_pending(uint64_t session_id,
                assert(socket->fd);
 
                pthread_mutex_lock(socket->lock);
-
-               ret = lttcomm_send_unix_sock(*socket->fd, &msg, sizeof(msg));
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
                if (ret < 0) {
-                       /* The above call will print a PERROR on error. */
-                       DBG("Error on consumer is data pending on sock %d", *socket->fd);
                        pthread_mutex_unlock(socket->lock);
                        goto error_unlock;
                }
@@ -1037,18 +1076,11 @@ int consumer_is_data_pending(uint64_t session_id,
                 * the reply status message.
                 */
 
-               ret = lttcomm_recv_unix_sock(*socket->fd, &ret_code, sizeof(ret_code));
-               if (ret <= 0) {
-                       if (ret == 0) {
-                               /* Orderly shutdown. Don't return 0 which means success. */
-                               ret = -1;
-                       }
-                       /* The above call will print a PERROR on error. */
-                       DBG("Error on recv consumer is data pending on sock %d", *socket->fd);
+               ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+               if (ret < 0) {
                        pthread_mutex_unlock(socket->lock);
                        goto error_unlock;
                }
-
                pthread_mutex_unlock(socket->lock);
 
                if (ret_code == 1) {
@@ -1194,7 +1226,7 @@ int consumer_push_metadata(struct consumer_socket *socket,
 
        DBG3("Consumer pushing metadata on sock %d of len %zu", *socket->fd, len);
 
-       ret = lttcomm_send_unix_sock(*socket->fd, metadata_str, len);
+       ret = consumer_socket_send(socket, metadata_str, len);
        if (ret < 0) {
                goto end;
        }
index 7ff3e5dcf70cbe06f28c371480ffd4391eec75a1..eaa115333584d88e6edcfa3785b46ec3addf78d3 100644 (file)
@@ -177,6 +177,10 @@ void consumer_destroy_socket(struct consumer_socket *sock);
 int consumer_copy_sockets(struct consumer_output *dst,
                struct consumer_output *src);
 void consumer_destroy_output_sockets(struct consumer_output *obj);
+int consumer_socket_send(struct consumer_socket *socket, void *msg,
+               size_t len);
+int consumer_socket_recv(struct consumer_socket *socket, void *msg,
+               size_t len);
 
 struct consumer_output *consumer_create_output(enum consumer_dst_type type);
 struct consumer_output *consumer_copy_output(struct consumer_output *obj);
index 2eb8cda253a096c89f03d0e7cfd6bd869ed675f2..f0e3ce92a192cbaad2e2b9f42a29f8a1e407eb42 100644 (file)
@@ -166,7 +166,7 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
 
        health_code_update();
 
-       ret = lttcomm_send_unix_sock(*socket->fd, &msg, sizeof(msg));
+       ret = consumer_socket_send(socket, &msg, sizeof(msg));
        if (ret < 0) {
                goto error;
        }
@@ -445,10 +445,8 @@ int ust_consumer_metadata_request(struct consumer_socket *socket)
        health_code_update();
 
        /* Wait for a metadata request */
-       ret = lttcomm_recv_unix_sock(*socket->fd, &request, sizeof(request));
-       if (ret <= 0) {
-               ERR("Consumer closed the metadata socket");
-               ret = -1;
+       ret = consumer_socket_recv(socket, &request, sizeof(request));
+       if (ret < 0) {
                goto end;
        }
 
This page took 0.031063 seconds and 4 git commands to generate.