refactor/enhance libustcomm
authorPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Wed, 10 Feb 2010 22:15:50 +0000 (17:15 -0500)
committerPierre-Marc Fournier <pierre-marc.fournier@polymtl.ca>
Mon, 15 Feb 2010 05:40:36 +0000 (00:40 -0500)
- regroup sending and receiving code
- add \0 at end of messages
- add dynamic allocation of received message buffers
- remove message length limit

include/share.h
libustcomm/ustcomm.c

index f674f31f590719915398d56d464e7684b7ddaf14..1f2195ee4b1311cfb6218c0331bfecf25f33d641 100644 (file)
@@ -1,13 +1,19 @@
 #ifndef UST_SHARE_H
 #define UST_SHARE_H
 
+/* write() */
 #include <unistd.h>
+
+/* send() */
+#include <sys/types.h>
+#include <sys/socket.h>
+
 #include <errno.h>
 
 /* This write is patient because it restarts if it was incomplete.
  */
 
-static inline ssize_t patient_write(int fd, const void *buf, size_t count)
+static __inline__ ssize_t patient_write(int fd, const void *buf, size_t count)
 {
        const char *bufc = (const char *) buf;
        int result;
@@ -31,4 +37,28 @@ static inline ssize_t patient_write(int fd, const void *buf, size_t count)
        return bufc-(const char *)buf;
 }
 
+static __inline__ ssize_t patient_send(int fd, const void *buf, size_t count, int flags)
+{
+       const char *bufc = (const char *) buf;
+       int result;
+
+       for(;;) {
+               result = send(fd, bufc, count, flags);
+               if(result == -1 && errno == EINTR) {
+                       continue;
+               }
+               if(result <= 0) {
+                       return result;
+               }
+               count -= result;
+               bufc += result;
+
+               if(count == 0) {
+                       break;
+               }
+       }
+
+       return bufc-(const char *)buf;
+}
+
 #endif /* UST_SHARE_H */
index 4d6a6509dba71ac4a1333034de403930437f25b6..e1a2d5b9554425283b04a283d93403a5d846ff96 100644 (file)
@@ -32,6 +32,7 @@
 
 #include "ustcomm.h"
 #include "usterr.h"
+#include "share.h"
 
 #define UNIX_PATH_MAX 108
 
@@ -74,19 +75,33 @@ int pid_is_online(pid_t pid) {
        return 1;
 }
 
+/* Send a message
+ *
+ * @fd: file descriptor to send to
+ * @msg: a null-terminated string containing the message to send
+ *
+ * Return value:
+ * -1: error
+ * 0: connection closed
+ * 1: success
+ */
+
 static int send_message_fd(int fd, const char *msg)
 {
        int result;
 
-       result = send(fd, msg, strlen(msg), MSG_NOSIGNAL);
+       /* Send including the final \0 */
+       result = patient_send(fd, msg, strlen(msg)+1, MSG_NOSIGNAL);
        if(result == -1) {
-               PERROR("send");
+               if(errno != EPIPE)
+                       PERROR("send");
                return -1;
        }
        else if(result == 0) {
                return 0;
        }
 
+       DBG("sent message \"%s\"", msg);
        return 1;
 }
 
@@ -146,26 +161,56 @@ int ustcomm_request_consumer(pid_t pid, const char *channel)
  * returns -1 to indicate an error
  */
 
-static int recv_message_fd(int fd, char **msg, struct ustcomm_source *src)
+#define RECV_INCREMENT 1
+
+static int recv_message_fd(int fd, char **msg)
 {
        int result;
+       int buf_alloc_size = 0;
+       char *buf = NULL;
+       int buf_used_size = 0;
 
-       *msg = (char *) malloc(MSG_MAX+1);
+       buf = malloc(10);
+       buf_alloc_size = 16;
 
-       result = recv(fd, *msg, MSG_MAX, 0);
-       if(result == -1) {
-               PERROR("recv");
-               return -1;
-       }
+       for(;;) {
+               if(buf_used_size + RECV_INCREMENT > buf_alloc_size) {
+                       buf_alloc_size *= 2;
+                       buf = (char *) realloc(buf, buf_alloc_size);
+               }
 
-       (*msg)[result] = '\0';
-       
-       DBG("ustcomm_app_recv_message: result is %d, message is %s", result, (*msg));
+               /* FIXME: this is really inefficient; but with count>1 we would
+                * need a buffering mechanism */
+               result = recv(fd, buf+buf_used_size, RECV_INCREMENT, 0);
+               if(result == -1) {
+                       free(buf);
+                       if(errno != ECONNRESET)
+                               PERROR("recv");
+                       return -1;
+               }
+               if(result == 0) {
+                       if(buf_used_size)
+                               goto ret;
+                       else {
+                               free(buf);
+                               return 0;
+                       }
+               }
+
+
+               buf_used_size += result;
+
+               if(buf[buf_used_size-1] == 0) {
+                       goto ret;
+               }
+       }
 
-       if(src)
-               src->fd = fd;
+ret:
+       *msg = buf;
+       DBG("received message \"%s\"", buf);
 
        return 1;
+
 }
 
 int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src)
@@ -270,7 +315,10 @@ int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustco
 
                for(idx=1; idx<n_fds; idx++) {
                        if(fds[idx].revents) {
-                               retval = recv_message_fd(fds[idx].fd, msg, src);
+                               retval = recv_message_fd(fds[idx].fd, msg);
+                               if(src)
+                                       src->fd = fds[idx].fd;
+
                                if(**msg == 0) {
                                        /* connection finished */
                                        close(fds[idx].fd);
@@ -393,29 +441,22 @@ int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char
 {
        int result;
 
-       result = send(conn->fd, req, strlen(req), MSG_NOSIGNAL);
-       if(result == -1) {
-               if(errno != EPIPE)
-                       PERROR("send");
-               return -1;
-       }
+       /* Send including the final \0 */
+       result = send_message_fd(conn->fd, req);
+       if(result != 1)
+               return result;
 
        if(!reply)
                return 1;
 
-       *reply = (char *) malloc(MSG_MAX+1);
-       result = recv(conn->fd, *reply, MSG_MAX, 0);
+       result = recv_message_fd(conn->fd, reply);
        if(result == -1) {
-               if(errno != ECONNRESET)
-                       PERROR("recv");
                return -1;
        }
        else if(result == 0) {
                return 0;
        }
        
-       (*reply)[result] = '\0';
-
        return 1;
 }
 
@@ -449,7 +490,7 @@ int ustcomm_connect_path(const char *path, struct ustcomm_connection *conn, pid_
 
        result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
        if(result == -1) {
-               PERROR("connect");
+               PERROR("connect (path=%s)", path);
                return -1;
        }
 
This page took 0.026784 seconds and 4 git commands to generate.