1 /* Copyright (C) 2009 Pierre-Marc Fournier
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
8 * This library is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 /* API used by UST components to communicate with each other via sockets. */
21 #include <sys/types.h>
24 #include <sys/socket.h>
38 #include "multipoll.h"
40 #define UNIX_PATH_MAX 108
42 static int mkdir_p(const char *path
, mode_t mode
)
51 tmp
= zmalloc(strlen(path
) + 1);
60 while (*path_p
!= '/') {
66 strncpy(tmp
, path
, path_p
- path
);
67 tmp
[path_p
-path
] = '\0';
68 if (tmp
[path_p
- path
- 1] != '/') {
69 result
= mkdir(tmp
, mode
);
71 if (!(errno
== EEXIST
|| errno
== EACCES
|| errno
== EROFS
)) {
72 /* Then this is a real error */
82 result
= mkdir(path
, mode
);
94 static int signal_process(pid_t pid
)
99 void ustcomm_init_connection(struct ustcomm_connection
*conn
)
101 conn
->recv_buf
= NULL
;
102 conn
->recv_buf_size
= 0;
103 conn
->recv_buf_alloc
= 0;
106 int pid_is_online(pid_t pid
) {
112 * @fd: file descriptor to send to
113 * @msg: a null-terminated string containing the message to send
117 * 0: connection closed
121 static int send_message_fd(int fd
, const char *msg
)
125 /* Send including the final \0 */
126 result
= patient_send(fd
, msg
, strlen(msg
)+1, MSG_NOSIGNAL
);
132 else if(result
== 0) {
136 DBG("sent message \"%s\"", msg
);
140 /* Called by an app to ask the consumer daemon to connect to it. */
142 int ustcomm_request_consumer(pid_t pid
, const char *channel
)
144 char path
[UNIX_PATH_MAX
];
148 struct ustcomm_connection conn
;
149 char *explicit_daemon_socket_path
;
151 explicit_daemon_socket_path
= getenv("UST_DAEMON_SOCKET");
152 if(explicit_daemon_socket_path
) {
153 /* user specified explicitly a socket path */
154 result
= snprintf(path
, UNIX_PATH_MAX
, "%s", explicit_daemon_socket_path
);
157 /* just use the default path */
158 result
= snprintf(path
, UNIX_PATH_MAX
, "%s/ustd", SOCK_DIR
);
161 if(result
>= UNIX_PATH_MAX
) {
162 ERR("string overflow allocating socket name");
166 if (asprintf(&msg
, "collect %d %s", pid
, channel
) < 0) {
167 ERR("ustcomm_request_consumer : asprintf failed (collect %d/%s)",
172 /* don't signal it because it's the daemon */
173 result
= ustcomm_connect_path(path
, &conn
, -1);
175 WARN("ustcomm_connect_path failed");
180 result
= ustcomm_send_request(&conn
, msg
, NULL
);
182 WARN("ustcomm_send_request failed");
188 ustcomm_disconnect(&conn
);
195 /* returns 1 to indicate a message was received
196 * returns 0 to indicate no message was received (end of stream)
197 * returns -1 to indicate an error
200 #define RECV_INCREMENT 1000
201 #define RECV_INITIAL_BUF_SIZE 10
203 static int recv_message_fd(int fd
, char **recv_buf
, int *recv_buf_size
, int *recv_buf_alloc
, char **msg
)
207 /* 1. Check if there is a message in the buf */
209 2.1 receive chunk and put it in buffer
210 2.2 process full message if there is one
211 -- while no message arrived
218 /* Search for full message in buffer */
219 for(i
=0; i
<*recv_buf_size
; i
++) {
220 if((*recv_buf
)[i
] == '\0') {
226 /* Process found message */
232 WARN("received empty message");
234 *msg
= strndup(*recv_buf
, i
);
236 /* Remove processed message from buffer */
237 newbuf
= (char *) malloc(*recv_buf_size
- (i
+1));
238 memcpy(newbuf
, *recv_buf
+ (i
+1), *recv_buf_size
- (i
+1));
241 *recv_buf_size
-= (i
+1);
242 *recv_buf_alloc
-= (i
+1);
247 /* Receive a chunk from the fd */
248 if(*recv_buf_alloc
- *recv_buf_size
< RECV_INCREMENT
) {
249 *recv_buf_alloc
+= RECV_INCREMENT
- (*recv_buf_alloc
- *recv_buf_size
);
250 *recv_buf
= (char *) realloc(*recv_buf
, *recv_buf_alloc
);
253 result
= recv(fd
, *recv_buf
+*recv_buf_size
, RECV_INCREMENT
, 0);
255 if(errno
== ECONNRESET
) {
259 else if(errno
== EINTR
) {
270 *recv_buf_size
+= result
;
272 /* Go back to the beginning to check if there is a full message in the buffer */
275 DBG("received message \"%s\"", *recv_buf
);
281 static int recv_message_conn(struct ustcomm_connection
*conn
, char **msg
)
283 return recv_message_fd(conn
->fd
, &conn
->recv_buf
, &conn
->recv_buf_size
, &conn
->recv_buf_alloc
, msg
);
286 int ustcomm_send_reply(struct ustcomm_server
*server
, char *msg
, struct ustcomm_source
*src
)
290 result
= send_message_fd(src
->fd
, msg
);
292 ERR("error in send_message_fd");
299 /* Called after a fork. */
301 int ustcomm_close_all_connections(struct ustcomm_server
*server
)
303 struct ustcomm_connection
*conn
;
304 struct ustcomm_connection
*deletable_conn
= NULL
;
306 list_for_each_entry(conn
, &server
->connections
, list
) {
307 free(deletable_conn
);
308 deletable_conn
= conn
;
309 ustcomm_close_app(conn
);
310 list_del(&conn
->list
);
316 /* @timeout: max blocking time in milliseconds, -1 means infinity
318 * returns 1 to indicate a message was received
319 * returns 0 to indicate no message was received
320 * returns -1 to indicate an error
323 int ustcomm_recv_message(struct ustcomm_server
*server
, char **msg
, struct ustcomm_source
*src
, int timeout
)
326 struct ustcomm_connection
**conn_table
;
327 struct ustcomm_connection
*conn
;
335 list_for_each_entry(conn
, &server
->connections
, list
) {
339 fds
= (struct pollfd
*) zmalloc(n_fds
* sizeof(struct pollfd
));
341 ERR("zmalloc returned NULL");
345 conn_table
= (struct ustcomm_connection
**) zmalloc(n_fds
* sizeof(struct ustcomm_connection
*));
346 if(conn_table
== NULL
) {
347 ERR("zmalloc returned NULL");
349 goto free_fds_return
;
352 /* special idx 0 is for listening socket */
353 fds
[idx
].fd
= server
->listen_fd
;
354 fds
[idx
].events
= POLLIN
;
357 list_for_each_entry(conn
, &server
->connections
, list
) {
358 fds
[idx
].fd
= conn
->fd
;
359 fds
[idx
].events
= POLLIN
;
360 conn_table
[idx
] = conn
;
364 result
= poll(fds
, n_fds
, timeout
);
365 if(result
== -1 && errno
== EINTR
) {
366 /* That's ok. ustd receives signals to notify it must shutdown. */
368 goto free_conn_table_return
;
370 else if(result
== -1) {
373 goto free_conn_table_return
;
375 else if(result
== 0) {
377 goto free_conn_table_return
;
381 struct ustcomm_connection
*newconn
;
384 result
= newfd
= accept(server
->listen_fd
, NULL
, NULL
);
388 goto free_conn_table_return
;
391 newconn
= (struct ustcomm_connection
*) zmalloc(sizeof(struct ustcomm_connection
));
392 if(newconn
== NULL
) {
393 ERR("zmalloc returned NULL");
397 ustcomm_init_connection(newconn
);
400 list_add(&newconn
->list
, &server
->connections
);
403 for(idx
=1; idx
<n_fds
; idx
++) {
404 if(fds
[idx
].revents
) {
405 retval
= recv_message_conn(conn_table
[idx
], msg
);
407 src
->fd
= fds
[idx
].fd
;
410 /* connection finished */
411 list_for_each_entry(conn
, &server
->connections
, list
) {
412 if(conn
->fd
== fds
[idx
].fd
) {
413 ustcomm_close_app(conn
);
414 list_del(&conn
->list
);
421 goto free_conn_table_return
;
430 free_conn_table_return
:
437 int ustcomm_ustd_recv_message(struct ustcomm_ustd
*ustd
, char **msg
, struct ustcomm_source
*src
, int timeout
)
439 return ustcomm_recv_message(&ustd
->server
, msg
, src
, timeout
);
442 int ustcomm_app_recv_message(struct ustcomm_app
*app
, char **msg
, struct ustcomm_source
*src
, int timeout
)
444 return ustcomm_recv_message(&app
->server
, msg
, src
, timeout
);
447 /* This removes src from the list of active connections of app.
450 int ustcomm_app_detach_client(struct ustcomm_app
*app
, struct ustcomm_source
*src
)
452 struct ustcomm_server
*server
= (struct ustcomm_server
*)app
;
453 struct ustcomm_connection
*conn
;
455 list_for_each_entry(conn
, &server
->connections
, list
) {
456 if(conn
->fd
== src
->fd
) {
457 list_del(&conn
->list
);
467 static int init_named_socket(const char *name
, char **path_out
)
472 struct sockaddr_un addr
;
474 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
480 addr
.sun_family
= AF_UNIX
;
482 strncpy(addr
.sun_path
, name
, UNIX_PATH_MAX
);
483 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
485 result
= access(name
, F_OK
);
488 result
= unlink(name
);
490 PERROR("unlink of socket file");
493 DBG("socket already exists; overwriting");
496 result
= bind(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
502 result
= listen(fd
, 1);
509 *path_out
= strdup(addr
.sun_path
);
522 * 0: Success, but no reply because recv() returned 0
526 * On error, the error message is printed, except on
527 * ECONNRESET, which is normal when the application dies.
530 int ustcomm_send_request(struct ustcomm_connection
*conn
, const char *req
, char **reply
)
534 /* Send including the final \0 */
535 result
= send_message_fd(conn
->fd
, req
);
542 result
= recv_message_conn(conn
, reply
);
546 else if(result
== 0) {
558 int ustcomm_connect_path(const char *path
, struct ustcomm_connection
*conn
, pid_t signalpid
)
562 struct sockaddr_un addr
;
564 ustcomm_init_connection(conn
);
566 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
572 addr
.sun_family
= AF_UNIX
;
574 result
= snprintf(addr
.sun_path
, UNIX_PATH_MAX
, "%s", path
);
575 if(result
>= UNIX_PATH_MAX
) {
576 ERR("string overflow allocating socket name");
581 result
= signal_process(signalpid
);
583 ERR("could not signal process");
588 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
590 PERROR("connect (path=%s)", path
);
599 int ustcomm_disconnect(struct ustcomm_connection
*conn
)
601 return close(conn
->fd
);
604 /* Open a connection to a traceable app.
611 int ustcomm_connect_app(pid_t pid
, struct ustcomm_connection
*conn
)
614 char path
[UNIX_PATH_MAX
];
617 result
= snprintf(path
, UNIX_PATH_MAX
, "%s/%d", SOCK_DIR
, pid
);
618 if(result
>= UNIX_PATH_MAX
) {
619 ERR("string overflow allocating socket name");
623 return ustcomm_connect_path(path
, conn
, pid
);
626 /* Close a connection to a traceable app. It frees the
627 * resources. It however does not free the
628 * ustcomm_connection itself.
631 int ustcomm_close_app(struct ustcomm_connection
*conn
)
634 free(conn
->recv_buf
);
639 static int ensure_dir_exists(const char *dir
)
647 result
= stat(dir
, &st
);
648 if(result
== -1 && errno
!= ENOENT
) {
651 else if(result
== -1) {
655 /* mkdir mode to 0777 */
656 result
= mkdir_p(dir
, S_IRWXU
| S_IRWXG
| S_IRWXO
);
658 ERR("executing in recursive creation of directory %s", dir
);
666 /* Called by an application to initialize its server so daemons can
670 int ustcomm_init_app(pid_t pid
, struct ustcomm_app
*handle
)
675 result
= asprintf(&name
, "%s/%d", SOCK_DIR
, (int)pid
);
676 if(result
>= UNIX_PATH_MAX
) {
677 ERR("string overflow allocating socket name");
681 result
= ensure_dir_exists(SOCK_DIR
);
683 ERR("Unable to create socket directory %s", SOCK_DIR
);
687 handle
->server
.listen_fd
= init_named_socket(name
, &(handle
->server
.socketpath
));
688 if(handle
->server
.listen_fd
< 0) {
689 ERR("Error initializing named socket (%s). Check that directory exists and that it is writable.", name
);
694 INIT_LIST_HEAD(&handle
->server
.connections
);
703 /* Used by the daemon to initialize its server so applications
707 int ustcomm_init_ustd(struct ustcomm_ustd
*handle
, const char *sock_path
)
713 if (asprintf(&name
, "%s", sock_path
) < 0) {
714 ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
722 /* Only check if socket dir exists if we are using the default directory */
723 result
= ensure_dir_exists(SOCK_DIR
);
725 ERR("Unable to create socket directory %s", SOCK_DIR
);
729 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustd") < 0) {
730 ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
736 handle
->server
.listen_fd
= init_named_socket(name
, &handle
->server
.socketpath
);
737 if(handle
->server
.listen_fd
< 0) {
738 ERR("error initializing named socket at %s", name
);
743 INIT_LIST_HEAD(&handle
->server
.connections
);
751 static void ustcomm_fini_server(struct ustcomm_server
*server
, int keep_socket_file
)
756 if(!keep_socket_file
) {
758 result
= stat(server
->socketpath
, &st
);
760 PERROR("stat (%s)", server
->socketpath
);
764 /* Paranoid check before deleting. */
765 result
= S_ISSOCK(st
.st_mode
);
767 ERR("The socket we are about to delete is not a socket.");
771 result
= unlink(server
->socketpath
);
777 free(server
->socketpath
);
779 result
= close(server
->listen_fd
);
786 /* Free a traceable application server */
788 void ustcomm_fini_app(struct ustcomm_app
*handle
, int keep_socket_file
)
790 ustcomm_fini_server(&handle
->server
, keep_socket_file
);
793 /* Free a ustd server */
795 void ustcomm_fini_ustd(struct ustcomm_ustd
*handle
)
797 ustcomm_fini_server(&handle
->server
, 0);
800 static const char *find_tok(const char *str
)
812 static const char *find_sep(const char *str
)
824 int nth_token_is(const char *str
, const char *token
, int tok_no
)
830 for(i
=0; i
<=tok_no
; i
++) {
844 if(end
-start
!= strlen(token
))
847 if(strncmp(start
, token
, end
-start
))
853 char *nth_token(const char *str
, int tok_no
)
855 static char *retval
= NULL
;
860 for(i
=0; i
<=tok_no
; i
++) {
879 if (asprintf(&retval
, "%.*s", (int)(end
-start
), start
) < 0) {
880 ERR("nth_token : asprintf failed (%.*s)",
881 (int)(end
-start
), start
);
888 /* Callback from multipoll.
889 * Receive a new connection on the listening socket.
892 static int process_mp_incoming_conn(void *priv
, int fd
, short events
)
894 struct ustcomm_connection
*newconn
;
895 struct ustcomm_server
*server
= (struct ustcomm_server
*) priv
;
899 result
= newfd
= accept(server
->listen_fd
, NULL
, NULL
);
905 newconn
= (struct ustcomm_connection
*) zmalloc(sizeof(struct ustcomm_connection
));
906 if(newconn
== NULL
) {
907 ERR("zmalloc returned NULL");
911 ustcomm_init_connection(newconn
);
914 list_add(&newconn
->list
, &server
->connections
);
919 /* Callback from multipoll.
920 * Receive a message on an existing connection.
923 static int process_mp_conn_msg(void *priv
, int fd
, short revents
)
925 struct ustcomm_multipoll_conn_info
*mpinfo
= (struct ustcomm_multipoll_conn_info
*) priv
;
928 struct ustcomm_source src
;
933 result
= recv_message_conn(mpinfo
->conn
, &msg
);
935 ERR("error in recv_message_conn");
938 else if(result
== 0) {
939 /* connection finished */
940 ustcomm_close_app(mpinfo
->conn
);
941 list_del(&mpinfo
->conn
->list
);
945 mpinfo
->cb(msg
, &src
);
953 int free_ustcomm_client_poll(void *data
)
959 void ustcomm_mp_add_app_clients(struct mpentries
*ent
, struct ustcomm_app
*app
, int (*cb
)(char *recvbuf
, struct ustcomm_source
*src
))
961 struct ustcomm_connection
*conn
;
963 /* add listener socket */
964 multipoll_add(ent
, app
->server
.listen_fd
, POLLIN
, process_mp_incoming_conn
, &app
->server
, NULL
);
966 list_for_each_entry(conn
, &app
->server
.connections
, list
) {
967 struct ustcomm_multipoll_conn_info
*mpinfo
= (struct ustcomm_multipoll_conn_info
*) zmalloc(sizeof(struct ustcomm_multipoll_conn_info
));
970 multipoll_add(ent
, conn
->fd
, POLLIN
, process_mp_conn_msg
, mpinfo
, free_ustcomm_client_poll
);