From: Julien Desfossez Date: Fri, 27 May 2011 22:07:48 +0000 (+0200) Subject: Don't quit when all FD hang up X-Git-Tag: v2.0-pre1~96 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=9d26659ae085e96066881c70d93a4929a9726070;p=lttng-tools.git Don't quit when all FD hang up We now wait for the STOP command to exit the polling thread. When we receive this command and all FD has hung up (all data is consumed) then we can exit cleanly. We also close every fd as soon as they report an error. Signed-off-by: Julien Desfossez --- diff --git a/include/lttng-kconsumerd.h b/include/lttng-kconsumerd.h index fa10ee25d..4f6fc8dc5 100644 --- a/include/lttng-kconsumerd.h +++ b/include/lttng-kconsumerd.h @@ -30,8 +30,8 @@ /* Commands for kconsumerd */ enum kconsumerd_command { ADD_STREAM, - UPDATE_STREAM, /* pause, delete, start depending on fd state */ - STOP, /* delete all */ + UPDATE_STREAM, /* pause, delete, active depending on fd state */ + STOP, /* inform the kconsumerd to quit when all fd has hang up */ }; /* State of each fd in consumerd */ diff --git a/kconsumerd/kconsumerd.c b/kconsumerd/kconsumerd.c index 69aa47a6e..5a1fe89a7 100644 --- a/kconsumerd/kconsumerd.c +++ b/kconsumerd/kconsumerd.c @@ -70,6 +70,9 @@ static int error_socket = -1; /* to count the number of time the user pressed ctrl+c */ static int sigintcount = 0; +/* flag to inform the polling thread to quit when all fd hung up */ +static int quit = 0; + /* Argument variables */ int opt_quiet; int opt_verbose; @@ -486,6 +489,7 @@ static int consumerd_recv_fd(int sfd, int size, } end: + DBG("consumerd_recv_fd thread exiting"); if (buf != NULL) { free(buf); buf = NULL; @@ -509,43 +513,50 @@ static void *thread_receive_fds(void *data) client_socket = lttcomm_create_unix_sock(command_sock_path); if (client_socket < 0) { ERR("Cannot create command socket"); - goto error; + goto end; } ret = lttcomm_listen_unix_sock(client_socket); if (ret < 0) { - goto error; + goto end; } DBG("Sending ready command to ltt-sessiond"); ret = send_error(KCONSUMERD_COMMAND_SOCK_READY); if (ret < 0) { ERR("Error sending ready command to ltt-sessiond"); - goto error; + goto end; } /* Blocking call, waiting for transmission */ sock = lttcomm_accept_unix_sock(client_socket); if (sock <= 0) { WARN("On accept"); - goto error; + goto end; } while (1) { /* We first get the number of fd we are about to receive */ ret = lttcomm_recv_unix_sock(sock, &tmp, sizeof(struct lttcomm_kconsumerd_header)); if (ret <= 0) { - ERR("Receiving the lttcomm_kconsumerd_header, exiting"); - goto error; + ERR("Communication interrupted on command socket"); + goto end; } + if (tmp.cmd_type == STOP) { + DBG("Received STOP command"); + quit = 1; + goto end; + } + /* we received a command to add or update fds */ ret = consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type); if (ret <= 0) { ERR("Receiving the FD, exiting"); - goto error; + goto end; } } -error: +end: + DBG("thread_receive_fds exiting"); return NULL; } @@ -575,8 +586,6 @@ static int update_poll_array(struct pollfd **pollfd, (*pollfd)[i].events = POLLIN | POLLPRI; local_kconsumerd_fd[i] = iter; i++; - } else if (iter->state == DELETE_FD) { - del_fd(iter); } } /* @@ -681,16 +690,20 @@ static void *thread_poll_fds(void *data) switch(pollfd[i].revents) { case POLLERR: ERR("Error returned in polling fd %d.", pollfd[i].fd); + del_fd(local_kconsumerd_fd[i]); + update_fd_array = 1; num_hup++; - send_error(KCONSUMERD_POLL_ERROR); break; case POLLHUP: ERR("Polling fd %d tells it has hung up.", pollfd[i].fd); + del_fd(local_kconsumerd_fd[i]); + update_fd_array = 1; num_hup++; break; case POLLNVAL: ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - send_error(KCONSUMERD_POLL_NVAL); + del_fd(local_kconsumerd_fd[i]); + update_fd_array = 1; num_hup++; break; case POLLPRI: @@ -708,8 +721,10 @@ static void *thread_poll_fds(void *data) /* If every buffer FD has hung up, we end the read loop here */ if (nb_fd > 0 && num_hup == nb_fd) { DBG("every buffer FD has hung up\n"); - send_error(KCONSUMERD_POLL_HUP); - goto end; + if (quit == 1) { + goto end; + } + continue; } /* Take care of low priority channels. */ @@ -727,6 +742,7 @@ static void *thread_poll_fds(void *data) } } end: + DBG("polling thread exiting"); if (pollfd != NULL) { free(pollfd); pollfd = NULL; diff --git a/liblttsessiondcomm/liblttsessiondcomm.h b/liblttsessiondcomm/liblttsessiondcomm.h index b563f6a7b..cb9e26dd8 100644 --- a/liblttsessiondcomm/liblttsessiondcomm.h +++ b/liblttsessiondcomm/liblttsessiondcomm.h @@ -169,8 +169,7 @@ struct lttcomm_lttng_msg { */ struct lttcomm_kconsumerd_header { u32 payload_size; - u32 cmd_type; /* enum lttcomm_consumerd_command */ - u32 ret_code; /* enum lttcomm_return_code */ + u32 cmd_type; /* enum kconsumerd_command */ }; /* lttcomm_kconsumerd_msg represents a file descriptor to consume the