From: Mathieu Desnoyers Date: Wed, 9 Sep 2015 15:56:33 +0000 (-0400) Subject: Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket X-Git-Tag: v2.6.1~15 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=584fc280d529740882a24c87a431e547b5acf8e0;p=lttng-tools.git Fix: LPOLLHUP and LPOLLERR when there is still data in pipe/socket The event mask returned by poll/epoll is a bitwise mask made of all the events observed. On bidirectional sockets, there are cases where combinations of LPOLLHUP/LPOLLERR and LPOLLIN/LPOLLPRI can be raised at the same time. Currently the overall behavior in sessiond, consumerd and relayd is to handle LPOLLHUP or LPOLLERR immediately, whether or not there is still data to read in the socket. Unfortunately, this behavior may discard the last information made available on the pipe or socket. Audit all uses of LPOLLHUP and LPOLLERR on sockets on which we expect data to ensure that we deal with LPOLLIN or LPOLLPRI, and catch the hangup when read or recvmsg returns 0. Keep the LPOLLHUP and LPOLLERR handling, but only when LPOLLIN is not raised, just in case some unforeseen error happens when sending the reply. This is one correct case where we can handle LPOLLHUP and LPOLLERR directly without caring about LPOLLIN: sockets where we are expected to write and then read the reply (e.g. command sockets). It is then OK for a dedicated thread to watch for LPOLLHUP and LPOLLERR. Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-consumerd/health-consumerd.c b/src/bin/lttng-consumerd/health-consumerd.c index e8afd963f..93737c5d3 100644 --- a/src/bin/lttng-consumerd/health-consumerd.c +++ b/src/bin/lttng-consumerd/health-consumerd.c @@ -274,7 +274,8 @@ restart: /* Event on the registration socket */ if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { ERR("Health socket poll error"); goto error; } diff --git a/src/bin/lttng-relayd/health-relayd.c b/src/bin/lttng-relayd/health-relayd.c index 1feaaeb37..0deb3740a 100644 --- a/src/bin/lttng-relayd/health-relayd.c +++ b/src/bin/lttng-relayd/health-relayd.c @@ -344,9 +344,14 @@ restart: /* Event on the registration socket */ if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Health socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index f8d8ec975..8a7485399 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -541,10 +541,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * viewer connection is allocated in this @@ -587,6 +584,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&viewer_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1907,10 +1910,7 @@ restart: /* Inspect the relay conn pipe for new connection. */ if (pollfd == live_conn_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Relay live pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { struct relay_connection *conn; ret = lttng_read(live_conn_pipe[0], @@ -1922,6 +1922,12 @@ restart: LPOLLIN | LPOLLRDHUP); connection_ht_add(viewer_connections_ht, conn); DBG("Connection socket %d added to poll", conn->sock->fd); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Relay live pipe error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } else { /* Connection activity. */ @@ -1932,11 +1938,7 @@ restart: continue; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - cleanup_connection_pollfd(&events, pollfd); - /* Put "create" ownership reference. */ - connection_put(conn); - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { @@ -1955,6 +1957,14 @@ restart: DBG("Viewer connection closed with %d", pollfd); } } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + cleanup_connection_pollfd(&events, pollfd); + /* Put "create" ownership reference. */ + connection_put(conn); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + connection_put(conn); + goto error; } /* Put local "get_by_sock" reference. */ connection_put(conn); diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 936a259b0..30c7bd167 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -832,10 +832,7 @@ restart: goto exit; } - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { /* * A new connection is requested, therefore a * sessiond/consumerd connection is allocated in @@ -887,6 +884,12 @@ restart: * exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index de5fd8de8..31e3e8d3c 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -295,35 +295,22 @@ restart: goto exit; } - /* - * Check first if this is a POLLERR since POLLIN is also included - * in an error value thus checking first. - */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - /* Removing from the poll set */ - ret = lttng_poll_del(&events, pollfd); - if (ret < 0) { - goto error; - } - - agent_destroy_app_by_sock(pollfd); - } else if (revents & (LPOLLIN)) { + if (revents & LPOLLIN) { int new_fd; struct agent_app *app = NULL; - /* Pollin event of agent app socket should NEVER happen. */ assert(pollfd == reg_sock->fd); - new_fd = handle_registration(reg_sock, &app); if (new_fd < 0) { - WARN("[agent-thread] agent registration failed. Ignoring."); - /* Somehow the communication failed. Just continue. */ continue; } /* Should not have a NULL app on success. */ assert(app); - /* Only add poll error event to only detect shutdown. */ + /* + * Since this is a command socket (write then read), + * only add poll error event to only detect shutdown. + */ ret = lttng_poll_add(&events, new_fd, LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { @@ -335,10 +322,26 @@ restart: update_agent_app(app); /* On failure, the poll will detect it and clean it up. */ - (void) agent_send_registration_done(app); + ret = agent_send_registration_done(app); + if (ret < 0) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, new_fd); + if (ret < 0) { + goto error; + } + agent_destroy_app_by_sock(new_fd); + continue; + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } + agent_destroy_app_by_sock(pollfd); } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); - continue; + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } diff --git a/src/bin/lttng-sessiond/ht-cleanup.c b/src/bin/lttng-sessiond/ht-cleanup.c index 57d6aec11..c477e1a64 100644 --- a/src/bin/lttng-sessiond/ht-cleanup.c +++ b/src/bin/lttng-sessiond/ht-cleanup.c @@ -106,32 +106,31 @@ restart: } assert(pollfd == ht_cleanup_pipe[0]); - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + /* Get socket from dispatch thread. */ + size_ret = lttng_read(ht_cleanup_pipe[0], &ht, + sizeof(ht)); + if (size_ret < sizeof(ht)) { + PERROR("ht cleanup notify pipe"); + goto error; + } + health_code_update(); + /* + * The whole point of this thread is to call + * lttng_ht_destroy from a context that is NOT: + * 1) a read-side RCU lock, + * 2) a call_rcu thread. + */ + lttng_ht_destroy(ht); + + health_code_update(); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("ht cleanup pipe error"); goto error; - } else if (!(revents & LPOLLIN)) { - /* No POLLIN and not a catched error, stop the thread. */ - ERR("ht cleanup failed. revent: %u", revents); - goto error; - } - - /* Get socket from dispatch thread. */ - size_ret = lttng_read(ht_cleanup_pipe[0], &ht, - sizeof(ht)); - if (size_ret < sizeof(ht)) { - PERROR("ht cleanup notify pipe"); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); goto error; } - health_code_update(); - /* - * The whole point of this thread is to call - * lttng_ht_destroy from a context that is NOT: - * 1) a read-side RCU lock, - * 2) a call_rcu thread. - */ - lttng_ht_destroy(ht); - - health_code_update(); } } diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 022adf33e..b87915536 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -1051,31 +1051,33 @@ static void *thread_manage_kernel(void *data) } /* Check for data on kernel pipe */ - if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) { - (void) lttng_read(kernel_poll_pipe[0], - &tmp, 1); - /* - * Ret value is useless here, if this pipe gets any actions an - * update is required anyway. - */ - update_poll_flag = 1; - continue; - } else { - /* - * New CPU detected by the kernel. Adding kernel stream to - * kernel session and updating the kernel consumer - */ - if (revents & LPOLLIN) { + if (revents & LPOLLIN) { + if (pollfd == kernel_poll_pipe[0]) { + (void) lttng_read(kernel_poll_pipe[0], + &tmp, 1); + /* + * Ret value is useless here, if this pipe gets any actions an + * update is required anyway. + */ + update_poll_flag = 1; + continue; + } else { + /* + * New CPU detected by the kernel. Adding kernel stream to + * kernel session and updating the kernel consumer + */ ret = update_kernel_stream(&kconsumer_data, pollfd); if (ret < 0) { continue; } break; - /* - * TODO: We might want to handle the LPOLLERR | LPOLLHUP - * and unregister kernel stream at this point. - */ } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + update_poll_flag = 1; + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1205,9 +1207,14 @@ restart: /* Event on the registration socket */ if (pollfd == consumer_data->err_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("consumer err socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -1337,7 +1344,8 @@ restart_poll: if (pollfd == sock) { /* Event on the consumerd socket */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { ERR("consumer err socket second poll error"); goto error; } @@ -1355,6 +1363,11 @@ restart_poll: goto exit; } else if (pollfd == consumer_data->metadata_fd) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP) + && !(revents & LPOLLIN)) { + ERR("consumer err metadata socket second poll error"); + goto error; + } /* UST metadata requests */ ret = ust_consumer_metadata_request( &consumer_data->metadata_sock); @@ -1523,10 +1536,7 @@ static void *thread_manage_apps(void *data) /* Inspect the apps cmd pipe */ if (pollfd == apps_cmd_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Apps command pipe error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { int sock; /* Empty pipe */ @@ -1539,9 +1549,8 @@ static void *thread_manage_apps(void *data) health_code_update(); /* - * We only monitor the error events of the socket. This - * thread does not handle any incoming data from UST - * (POLLIN). + * Since this is a command socket (write then read), + * we only monitor the error events of the socket. */ ret = lttng_poll_add(&events, sock, LPOLLERR | LPOLLHUP | LPOLLRDHUP); @@ -1550,6 +1559,12 @@ static void *thread_manage_apps(void *data) } DBG("Apps with sock %d added to poll set", sock); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps command pipe error"); + goto error; + } else { + ERR("Unknown poll events %u for sock %d", revents, pollfd); + goto error; } } else { /* @@ -1565,6 +1580,9 @@ static void *thread_manage_apps(void *data) /* Socket closed on remote end. */ ust_app_unregister(pollfd); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } @@ -1706,6 +1724,9 @@ static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) ust_app_destroy(wait_node->app); free(wait_node); break; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -2066,10 +2087,7 @@ static void *thread_registration_apps(void *data) /* Event on the registration socket */ if (pollfd == apps_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Register apps socket poll error"); - goto error; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { sock = lttcomm_accept_unix_sock(apps_sock); if (sock < 0) { goto error; @@ -2156,6 +2174,12 @@ static void *thread_registration_apps(void *data) * barrier with the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&ust_cmd_queue.futex); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Register apps socket poll error"); + goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -3958,9 +3982,14 @@ restart: /* Event on the registration socket */ if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Health socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } @@ -4135,9 +4164,14 @@ static void *thread_manage_clients(void *data) /* Event on the registration socket */ if (pollfd == client_sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + continue; + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Client socket poll error"); goto error; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } } } diff --git a/src/bin/lttng-sessiond/ust-thread.c b/src/bin/lttng-sessiond/ust-thread.c index 35f82e391..272720fb9 100644 --- a/src/bin/lttng-sessiond/ust-thread.c +++ b/src/bin/lttng-sessiond/ust-thread.c @@ -56,7 +56,8 @@ void *ust_thread_manage_notify(void *data) } /* Add notify pipe to the pollset. */ - ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&events, apps_cmd_notify_pipe[0], + LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); if (ret < 0) { goto error; } @@ -108,45 +109,56 @@ restart: if (pollfd == apps_cmd_notify_pipe[0]) { int sock; - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & LPOLLIN) { + /* Get socket from dispatch thread. */ + size_ret = lttng_read(apps_cmd_notify_pipe[0], + &sock, sizeof(sock)); + if (size_ret < sizeof(sock)) { + PERROR("read apps notify pipe"); + goto error; + } + health_code_update(); + + ret = lttng_poll_add(&events, sock, + LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + /* + * It's possible we've reached the max poll fd allowed. + * Let's close the socket but continue normal execution. + */ + ret = close(sock); + if (ret) { + PERROR("close notify socket %d", sock); + } + lttng_fd_put(LTTNG_FD_APPS, 1); + continue; + } + DBG3("UST thread notify added sock %d to pollset", sock); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Apps notify command pipe error"); goto error; - } else if (!(revents & LPOLLIN)) { - /* No POLLIN and not a catched error, stop the thread. */ - ERR("Notify command pipe failed. revent: %u", revents); - goto error; - } - - /* Get socket from dispatch thread. */ - size_ret = lttng_read(apps_cmd_notify_pipe[0], - &sock, sizeof(sock)); - if (size_ret < sizeof(sock)) { - PERROR("read apps notify pipe"); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); goto error; } - health_code_update(); - - ret = lttng_poll_add(&events, sock, - LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP); - if (ret < 0) { - /* - * It's possible we've reached the max poll fd allowed. - * Let's close the socket but continue normal execution. - */ - ret = close(sock); - if (ret) { - PERROR("close notify socket %d", sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - continue; - } - DBG3("UST thread notify added sock %d to pollset", sock); } else { /* * At this point, we know that a registered application * triggered the event. */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + if (revents & (LPOLLIN | LPOLLPRI)) { + ret = ust_app_recv_notify(pollfd); + if (ret < 0) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } + + /* The socket is closed after a grace period here. */ + ust_app_notify_sock_unregister(pollfd); + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { /* Removing from the poll set */ ret = lttng_poll_del(&events, pollfd); if (ret < 0) { @@ -155,22 +167,9 @@ restart: /* The socket is closed after a grace period here. */ ust_app_notify_sock_unregister(pollfd); - } else if (revents & (LPOLLIN | LPOLLPRI)) { - ret = ust_app_recv_notify(pollfd); - if (ret < 0) { - /* - * If the notification failed either the application is - * dead or an internal error happened. In both cases, - * we can only continue here. If the application is - * dead, an unregistration will follow or else the - * application will notice that we are not responding - * on that socket and will close it. - */ - continue; - } } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); - continue; + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; } health_code_update(); } diff --git a/src/common/consumer.c b/src/common/consumer.c index 0b59c511c..eed346d06 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -2217,26 +2217,22 @@ restart: } if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { - if (revents & (LPOLLERR | LPOLLHUP )) { - DBG("Metadata thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, - lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); - lttng_pipe_read_close(ctx->consumer_metadata_pipe); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { ssize_t pipe_len; pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, &stream, sizeof(stream)); if (pipe_len < sizeof(stream)) { - PERROR("read metadata stream"); + if (pipe_len < 0) { + PERROR("read metadata stream"); + } /* - * Continue here to handle the rest of the streams. + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); continue; } @@ -2253,6 +2249,19 @@ restart: /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI | LPOLLHUP); + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Metadata thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2271,8 +2280,30 @@ restart: stream = caa_container_of(node, struct lttng_consumer_stream, node); - /* Check for error event */ - if (revents & (LPOLLERR | LPOLLHUP)) { + if (revents & (LPOLLIN | LPOLLPRI)) { + /* Get the data out of the metadata file descriptor */ + DBG("Metadata available on fd %d", pollfd); + assert(stream->wait_fd == pollfd); + + do { + health_code_update(); + + len = ctx->on_buffer_ready(stream, ctx); + /* + * We don't check the return value here since if we get + * a negative len, it means an error occured thus we + * simply remove it from the poll set and free the + * stream. + */ + } while (len > 0); + + /* It's ok to have an unavailable sub-buffer */ + if (len < 0 && len != -EAGAIN && len != -ENODATA) { + /* Clean up stream from consumer and free it. */ + lttng_poll_del(&events, stream->wait_fd); + consumer_del_metadata_stream(stream, metadata_ht); + } + } else if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Metadata fd %d is hup|err.", pollfd); if (!stream->hangup_flush_done && (consumer_data.type == LTTNG_CONSUMER32_UST @@ -2300,31 +2331,11 @@ restart: * and securely free the stream. */ consumer_del_metadata_stream(stream, metadata_ht); - } else if (revents & (LPOLLIN | LPOLLPRI)) { - /* Get the data out of the metadata file descriptor */ - DBG("Metadata available on fd %d", pollfd); - assert(stream->wait_fd == pollfd); - - do { - health_code_update(); - - len = ctx->on_buffer_ready(stream, ctx); - /* - * We don't check the return value here since if we get - * a negative len, it means an error occured thus we - * simply remove it from the poll set and free the - * stream. - */ - } while (len > 0); - - /* It's ok to have an unavailable sub-buffer */ - if (len < 0 && len != -EAGAIN && len != -ENODATA) { - /* Clean up stream from consumer and free it. */ - lttng_poll_del(&events, stream->wait_fd); - consumer_del_metadata_stream(stream, metadata_ht); - } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock; + goto end; } - /* Release RCU lock for the stream looked up */ rcu_read_unlock(); } @@ -2789,21 +2800,16 @@ restart: } if (pollfd == ctx->consumer_channel_pipe[0]) { - if (revents & (LPOLLERR | LPOLLHUP)) { - DBG("Channel thread pipe hung up"); - /* - * Remove the pipe from the poll set and continue the loop - * since their might be data to consume. - */ - lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); - continue; - } else if (revents & LPOLLIN) { + if (revents & LPOLLIN) { enum consumer_channel_action action; uint64_t key; ret = read_channel_pipe(ctx, &chan, &key, &action); if (ret <= 0) { - ERR("Error reading channel pipe"); + if (ret < 0) { + ERR("Error reading channel pipe"); + } + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); continue; } @@ -2820,7 +2826,7 @@ restart: rcu_read_unlock(); /* Add channel to the global poll events list */ lttng_poll_add(&events, chan->wait_fd, - LPOLLIN | LPOLLPRI); + LPOLLERR | LPOLLHUP); break; case CONSUMER_CHANNEL_DEL: { @@ -2880,6 +2886,17 @@ restart: ERR("Unknown action"); break; } + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Channel thread pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, ctx->consumer_channel_pipe[0]); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; } /* Handle other stream */ @@ -2918,6 +2935,10 @@ restart: && !uatomic_read(&chan->nb_init_stream_left)) { consumer_del_channel(chan); } + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + rcu_read_unlock(); + goto end; } /* Release RCU lock for the channel looked up */