From: Julien Desfossez Date: Sun, 17 Jul 2011 16:46:08 +0000 (-0400) Subject: Make the receiving thread non blocking X-Git-Tag: v2.0-pre1~10 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=4de84ad900e5b6e739f3c36aa1b86746081333d7;p=lttng-tools.git Make the receiving thread non blocking Avoids the risk of blocking indefinitely on the command socket when the ltt-sessiond is not available and allow the signal handler to stop the process when necessary. Signed-off-by: Julien Desfossez --- diff --git a/liblttkconsumerd/liblttkconsumerd.c b/liblttkconsumerd/liblttkconsumerd.c index 5735f38b8..e11229abd 100644 --- a/liblttkconsumerd/liblttkconsumerd.c +++ b/liblttkconsumerd/liblttkconsumerd.c @@ -61,7 +61,6 @@ struct kconsumerd_global_data { unsigned int need_update; } kconsumerd_data = { .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), - .need_update = 1, }; /* communication with splice */ @@ -70,12 +69,8 @@ static int kconsumerd_thread_pipe[2]; /* pipe to wake the poll thread when necessary */ static int kconsumerd_poll_pipe[2]; -/* - * TODO: create a should_quit pipe to let the signal handler wake up the - * fd receiver thread. It should be initialized before any signal can be - * received by the library. - */ - +/* to let the signal handler wake up the fd receiver thread */ +static int kconsumerd_should_quit[2]; /* timeout parameter, to control the polling thread grace period */ static int kconsumerd_poll_timeout = -1; @@ -554,6 +549,32 @@ end: return ret; } +/* + * kconsumerd_poll_socket + * + * Poll on the should_quit pipe and the command socket + * return -1 on error and should exit, 0 if data is + * available on the command socket + */ +int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) +{ + int num_rdy; + + num_rdy = poll(kconsumerd_sockpoll, 2, -1); + if (num_rdy == -1) { + perror("Poll error"); + goto exit; + } + if (kconsumerd_sockpoll[0].revents == POLLIN) { + DBG("kconsumerd_should_quit wake up"); + goto exit; + } + return 0; + +exit: + return -1; +} + /* * kconsumerd_consumerd_recv_fd * @@ -561,7 +582,8 @@ end: * structures describing each fd (path name). * Returns the size of received data */ -static int kconsumerd_consumerd_recv_fd(int sfd, int size, +static int kconsumerd_consumerd_recv_fd(int sfd, + struct pollfd *kconsumerd_sockpoll, int size, enum kconsumerd_command cmd_type) { struct msghdr msg; @@ -588,6 +610,10 @@ static int kconsumerd_consumerd_recv_fd(int sfd, int size, msg.msg_controllen = sizeof(recv_fd); DBG("Waiting to receive fd"); + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + if ((ret = recvmsg(sfd, &msg, 0)) < 0) { perror("recvmsg"); continue; @@ -806,13 +832,34 @@ end: } /* - * kconsumerd_create_poll_pipe + * kconsumerd_init(void) * - * create the pipe to wake to polling thread when needed + * initialise the necessary environnement : + * - inform the polling thread to update the polling array + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) */ -int kconsumerd_create_poll_pipe() +int kconsumerd_init(void) { - return pipe(kconsumerd_poll_pipe); + int ret; + + /* need to update the polling array at init time */ + kconsumerd_data.need_update = 1; + + ret = pipe(kconsumerd_poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + goto end; + } + + ret = pipe(kconsumerd_should_quit); + if (ret < 0) { + perror("Error creating recv pipe"); + goto end; + } + +end: + return ret; } /* @@ -825,6 +872,12 @@ void *kconsumerd_thread_receive_fds(void *data) { int sock, client_socket, ret; struct lttcomm_kconsumerd_header tmp; + /* + * structure to poll for incoming data on communication socket + * avoids making blocking sockets + */ + struct pollfd kconsumerd_sockpoll[2]; + DBG("Creating command socket %s", kconsumerd_command_sock_path); unlink(kconsumerd_command_sock_path); @@ -846,18 +899,46 @@ void *kconsumerd_thread_receive_fds(void *data) goto end; } - /* TODO: poll on socket and "should_quit" fd pipe */ - /* TODO: change blocking call into non-blocking call */ + ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* prepare the FDs to poll : to client socket and the should_quit pipe */ + kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0]; + kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; + kconsumerd_sockpoll[1].fd = client_socket; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Connection on client_socket"); + /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); if (sock <= 0) { WARN("On accept"); goto end; } + ret = fcntl(sock, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* update the polling structure to poll on the established socket */ + kconsumerd_sockpoll[1].fd = sock; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + while (1) { + if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Incoming fds on sock"); + /* We first get the number of fd we are about to receive */ - /* TODO: poll on sock and "should_quit" fd pipe */ - /* TODO: change recv into a non-blocking call */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); if (ret <= 0) { @@ -872,12 +953,15 @@ void *kconsumerd_thread_receive_fds(void *data) DBG("kconsumerd_thread_receive_fds received quit from signal"); goto end; } + /* we received a command to add or update fds */ - ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); + ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll, + tmp.payload_size, tmp.cmd_type); if (ret <= 0) { ERR("Receiving the FD, exiting"); goto end; } + DBG("received fds on sock"); } end: @@ -927,15 +1011,15 @@ void kconsumerd_cleanup(void) } /* + * kconsumerd_should_exit + * * Called from signal handler. */ void kconsumerd_should_exit(void) { + int ret; kconsumerd_quit = 1; - /* - * TODO: write into a should_quit pipe to wake up the fd - * receiver thread. - */ + ret = write(kconsumerd_should_quit[1], "4", 1); } /* diff --git a/liblttkconsumerd/liblttkconsumerd.h b/liblttkconsumerd/liblttkconsumerd.h index 5c74db25e..73cd29e27 100644 --- a/liblttkconsumerd/liblttkconsumerd.h +++ b/liblttkconsumerd/liblttkconsumerd.h @@ -57,8 +57,9 @@ struct kconsumerd_fd { unsigned long max_sb_size; /* the subbuffer size for this channel */ }; -int kconsumerd_create_poll_pipe(); +int kconsumerd_init(void); int kconsumerd_send_error(enum lttcomm_return_code cmd); +int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); void *kconsumerd_thread_poll_fds(void *data); void *kconsumerd_thread_receive_fds(void *data); void kconsumerd_should_exit(void); diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index ade19399d..4180f8901 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -227,10 +227,9 @@ int main(int argc, char **argv) goto error; } - /* create the pipe to wake to polling thread when needed */ - ret = kconsumerd_create_poll_pipe(); + /* create the pipe to wake to receiving thread when needed */ + ret = kconsumerd_init(); if (ret < 0) { - perror("Error creating poll pipe"); goto end; }