unsigned int need_update;
} kconsumerd_data = {
.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
- .need_update = 1,
};
/* communication with splice */
/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];
-/*
- * TODO: create a should_quit pipe to let the signal handler wake up the
- * fd receiver thread. It should be initialized before any signal can be
- * received by the library.
- */
-
+/* to let the signal handler wake up the fd receiver thread */
+static int kconsumerd_should_quit[2];
/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;
return ret;
}
+/*
+ * kconsumerd_poll_socket
+ *
+ * Poll on the should_quit pipe and the command socket
+ * return -1 on error and should exit, 0 if data is
+ * available on the command socket
+ */
+int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
+{
+ int num_rdy;
+
+ num_rdy = poll(kconsumerd_sockpoll, 2, -1);
+ if (num_rdy == -1) {
+ perror("Poll error");
+ goto exit;
+ }
+ if (kconsumerd_sockpoll[0].revents == POLLIN) {
+ DBG("kconsumerd_should_quit wake up");
+ goto exit;
+ }
+ return 0;
+
+exit:
+ return -1;
+}
+
/*
* kconsumerd_consumerd_recv_fd
*
* structures describing each fd (path name).
* Returns the size of received data
*/
-static int kconsumerd_consumerd_recv_fd(int sfd, int size,
+static int kconsumerd_consumerd_recv_fd(int sfd,
+ struct pollfd *kconsumerd_sockpoll, int size,
enum kconsumerd_command cmd_type)
{
struct msghdr msg;
msg.msg_controllen = sizeof(recv_fd);
DBG("Waiting to receive fd");
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+
if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
perror("recvmsg");
continue;
}
/*
- * kconsumerd_create_poll_pipe
+ * kconsumerd_init(void)
*
- * create the pipe to wake to polling thread when needed
+ * initialise the necessary environnement :
+ * - inform the polling thread to update the polling array
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
*/
-int kconsumerd_create_poll_pipe()
+int kconsumerd_init(void)
{
- return pipe(kconsumerd_poll_pipe);
+ int ret;
+
+ /* need to update the polling array at init time */
+ kconsumerd_data.need_update = 1;
+
+ ret = pipe(kconsumerd_poll_pipe);
+ if (ret < 0) {
+ perror("Error creating poll pipe");
+ goto end;
+ }
+
+ ret = pipe(kconsumerd_should_quit);
+ if (ret < 0) {
+ perror("Error creating recv pipe");
+ goto end;
+ }
+
+end:
+ return ret;
}
/*
{
int sock, client_socket, ret;
struct lttcomm_kconsumerd_header tmp;
+ /*
+ * structure to poll for incoming data on communication socket
+ * avoids making blocking sockets
+ */
+ struct pollfd kconsumerd_sockpoll[2];
+
DBG("Creating command socket %s", kconsumerd_command_sock_path);
unlink(kconsumerd_command_sock_path);
goto end;
}
- /* TODO: poll on socket and "should_quit" fd pipe */
- /* TODO: change blocking call into non-blocking call */
+ ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto end;
+ }
+
+ /* prepare the FDs to poll : to client socket and the should_quit pipe */
+ kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
+ kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
+ kconsumerd_sockpoll[1].fd = client_socket;
+ kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+ DBG("Connection on client_socket");
+
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
if (sock <= 0) {
WARN("On accept");
goto end;
}
+ ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto end;
+ }
+
+ /* update the polling structure to poll on the established socket */
+ kconsumerd_sockpoll[1].fd = sock;
+ kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
while (1) {
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+ DBG("Incoming fds on sock");
+
/* We first get the number of fd we are about to receive */
- /* TODO: poll on sock and "should_quit" fd pipe */
- /* TODO: change recv into a non-blocking call */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
if (ret <= 0) {
DBG("kconsumerd_thread_receive_fds received quit from signal");
goto end;
}
+
/* we received a command to add or update fds */
- ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
+ ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
+ tmp.payload_size, tmp.cmd_type);
if (ret <= 0) {
ERR("Receiving the FD, exiting");
goto end;
}
+ DBG("received fds on sock");
}
end:
}
/*
+ * kconsumerd_should_exit
+ *
* Called from signal handler.
*/
void kconsumerd_should_exit(void)
{
+ int ret;
kconsumerd_quit = 1;
- /*
- * TODO: write into a should_quit pipe to wake up the fd
- * receiver thread.
- */
+ ret = write(kconsumerd_should_quit[1], "4", 1);
}
/*