--- /dev/null
+/*
+ * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the Free
+ * Software Foundation; only version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _LTT_POLL_H
+#define _LTT_POLL_H
+
+#include <string.h>
+#include <unistd.h>
+
+#include <lttng-share.h>
+
+/*
+ * Value taken from the hard limit allowed by the kernel when using setrlimit
+ * with RLIMIT_NOFILE on an Intel i7 CPU and Linux 3.0.3.
+ */
+#define LTTNG_POLL_DEFAULT_SIZE 65535
+
+/*
+ * Maximum number of fd we can monitor.
+ *
+ * For epoll(7), /proc/sys/fs/epoll/max_user_watches (since Linux 2.6.28) will
+ * be used for the maximum size of the poll set. If this interface is not
+ * available, according to the manpage, the max_user_watches value is 1/25 (4%)
+ * of the available low memory divided by the registration cost in bytes which
+ * is 90 bytes on a 32-bit kernel and 160 bytes on a 64-bit kernel.
+ *
+ * For poll(2), the max fds must not exceed RLIMIT_NOFILE given by
+ * getrlimit(2).
+ */
+extern unsigned int poll_max_size;
+
+/*
+ * Used by lttng_poll_clean to free the events structure in a lttng_poll_event.
+ */
+static inline void __lttng_poll_free(void *events)
+{
+ free(events);
+}
+
+/*
+ * epoll(7) implementation.
+ */
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+
+/* See man epoll(7) for this define path */
+#define LTTNG_EPOLL_PROC_PATH "/proc/sys/fs/epoll/max_user_watches"
+
+enum {
+ /* Polling variables compatibility for epoll */
+ LPOLLIN = EPOLLIN,
+ LPOLLPRI = EPOLLPRI,
+ LPOLLOUT = EPOLLOUT,
+ LPOLLRDNORM = EPOLLRDNORM,
+ LPOLLRDBAND = EPOLLRDBAND,
+ LPOLLWRNORM = EPOLLWRNORM,
+ LPOLLWRBAND = EPOLLWRBAND,
+ LPOLLMSG = EPOLLMSG,
+ LPOLLERR = EPOLLERR,
+ LPOLLHUP = EPOLLHUP,
+ LPOLLNVAL = EPOLLHUP,
+ LPOLLRDHUP = EPOLLRDHUP,
+ /* Close on exec feature of epoll */
+ LTTNG_CLOEXEC = EPOLL_CLOEXEC,
+};
+
+struct compat_epoll_event {
+ int epfd;
+ uint32_t nb_fd; /* Current number of fd in events */
+ uint32_t events_size; /* Size of events array */
+ struct epoll_event *events;
+};
+#define lttng_poll_event compat_epoll_event
+
+/*
+ * For the following calls, consider 'e' to be a lttng_poll_event pointer and i
+ * being the index of the events array.
+ */
+#define LTTNG_POLL_GETFD(e, i) LTTNG_REF(e)->events[i].data.fd
+#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].events
+#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd
+#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size
+
+/*
+ * Create the epoll set. No memory allocation is done here.
+ */
+extern int compat_epoll_create(struct lttng_poll_event *events,
+ int size, int flags);
+#define lttng_poll_create(events, size, flags) \
+ compat_epoll_create(events, size, flags);
+
+/*
+ * Wait on epoll set with the number of fd registered to the lttng_poll_event
+ * data structure (events).
+ */
+extern int compat_epoll_wait(struct lttng_poll_event *events, int timeout);
+#define lttng_poll_wait(events, timeout) \
+ compat_epoll_wait(events, timeout);
+
+/*
+ * Add a fd to the epoll set and resize the epoll_event structure if needed.
+ */
+extern int compat_epoll_add(struct lttng_poll_event *events,
+ int fd, uint32_t req_events);
+#define lttng_poll_add(events, fd, req_events) \
+ compat_epoll_add(events, fd, req_events);
+
+/*
+ * Remove a fd from the epoll set.
+ */
+extern int compat_epoll_del(struct lttng_poll_event *events, int fd);
+#define lttng_poll_del(events, fd) \
+ compat_epoll_del(events, fd);
+
+/*
+ * Set up the poll set limits variable poll_max_size
+ */
+extern void compat_epoll_set_max_size(void);
+#define lttng_poll_set_max_size(void) \
+ compat_epoll_set_max_size(void);
+
+/*
+ * This function memset with zero the structure since it can be reused at each
+ * round of a main loop. Being in a loop and using a non static number of fds,
+ * this function must be called to insure coherent events with associted fds.
+ */
+static inline void lttng_poll_reset(struct lttng_poll_event *events)
+{
+ if (events && events->events) {
+ memset(events->events, 0,
+ events->nb_fd * sizeof(struct epoll_event));
+ }
+}
+
+/*
+ * Clean the events structure of a lttng_poll_event. It's the caller
+ * responsability to free the lttng_poll_event memory.
+ */
+static inline void lttng_poll_clean(struct lttng_poll_event *events)
+{
+ if (events) {
+ close(events->epfd);
+ __lttng_poll_free((void *) events->events);
+ }
+}
+
+#else /* HAVE_EPOLL */
+/*
+ * Fallback on poll(2) API
+ */
+
+/* Needed for some poll event values */
+#ifndef __USE_XOPEN
+#define __USE_XOPEN
+#endif
+
+/* Needed for some poll event values */
+#ifndef __USE_GNU
+#define __USE_GNU
+#endif
+
+#include <poll.h>
+#include <stdint.h>
+
+enum {
+ /* Polling variables compatibility for poll */
+ LPOLLIN = POLLIN,
+ LPOLLPRI = POLLPRI,
+ LPOLLOUT = POLLOUT,
+ LPOLLRDNORM = POLLRDNORM,
+ LPOLLRDBAND = POLLRDBAND,
+ LPOLLWRNORM = POLLWRNORM,
+ LPOLLWRBAND = POLLWRBAND,
+ LPOLLMSG = POLLMSG,
+ LPOLLERR = POLLERR,
+ LPOLLHUP = POLLHUP | POLLNVAL,
+ LPOLLRDHUP = POLLRDHUP,
+ /* Close on exec feature does not exist for poll(2) */
+ LTTNG_CLOEXEC = 0xdead,
+};
+
+struct compat_poll_event {
+ uint32_t nb_fd; /* Current number of fd in events */
+ uint32_t events_size; /* Size of events array */
+ struct pollfd *events;
+};
+#define lttng_poll_event compat_poll_event
+
+/*
+ * For the following calls, consider 'e' to be a lttng_poll_event pointer and i
+ * being the index of the events array.
+ */
+#define LTTNG_POLL_GETFD(e, i) LTTNG_REF(e)->events[i].fd
+#define LTTNG_POLL_GETEV(e, i) LTTNG_REF(e)->events[i].revents
+#define LTTNG_POLL_GETNB(e) LTTNG_REF(e)->nb_fd
+#define LTTNG_POLL_GETSZ(e) LTTNG_REF(e)->events_size
+
+/*
+ * Create a pollfd structure of size 'size'.
+ */
+extern int compat_poll_create(struct lttng_poll_event *events, int size);
+#define lttng_poll_create(events, size, flags) \
+ compat_poll_create(events, size);
+
+/*
+ * Wait on poll(2) event with nb_fd registered to the lttng_poll_event data
+ * structure.
+ */
+extern int compat_poll_wait(struct lttng_poll_event *events, int timeout);
+#define lttng_poll_wait(events, timeout) \
+ compat_poll_wait(events, timeout);
+
+/*
+ * Add the fd to the pollfd structure. Resize if needed.
+ */
+extern int compat_poll_add(struct lttng_poll_event *events,
+ int fd, uint32_t req_events);
+#define lttng_poll_add(events, fd, req_events) \
+ compat_poll_add(events, fd, req_events);
+
+/*
+ * Remove the fd from the pollfd. Memory allocation is done to recreate a new
+ * pollfd, data is copied from the old pollfd to the new and, finally, the old
+ * one is freed().
+ */
+extern int compat_poll_del(struct lttng_poll_event *events, int fd);
+#define lttng_poll_del(events, fd) \
+ compat_poll_del(events, fd);
+
+/*
+ * Set up the poll set limits variable poll_max_size
+ */
+extern void compat_poll_set_max_size(void);
+#define lttng_poll_set_max_size(void) \
+ compat_poll_set_max_size(void);
+
+/*
+ * No need to reset a pollfd structure for poll(2)
+ */
+static inline void lttng_poll_reset(struct lttng_poll_event *events)
+{}
+
+/*
+ * Clean the events structure of a lttng_poll_event. It's the caller
+ * responsability to free the lttng_poll_event memory.
+ */
+static inline void lttng_poll_clean(struct lttng_poll_event *events)
+{
+ if (events) {
+ __lttng_poll_free((void *) events->events);
+ }
+}
+
+#endif /* HAVE_EPOLL */
+
+#endif /* _LTT_POLL_H */
#include <getopt.h>
#include <grp.h>
#include <limits.h>
-#include <poll.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <lttng/lttng-kconsumerd.h>
#include <lttngerr.h>
+#include "compat/poll.h"
#include "context.h"
#include "futex.h"
#include "kernel-ctl.h"
static int is_root; /* Set to 1 if the daemon is running as root */
static pid_t ppid; /* Parent PID for --sig-parent option */
static pid_t kconsumerd_pid;
-static struct pollfd *kernel_pollfd;
static int dispatch_thread_exit;
static char apps_unix_sock_path[PATH_MAX]; /* Global application Unix socket path */
static pthread_t dispatch_thread;
static sem_t kconsumerd_sem;
-static pthread_mutex_t kconsumerd_pid_mutex; /* Mutex to control kconsumerd pid assignation */
+
+/* Mutex to control kconsumerd pid assignation */
+static pthread_mutex_t kconsumerd_pid_mutex;
/*
* UST registration command queue. This queue is tied with a futex and uses a N
*/
static struct ltt_session_list *session_list_ptr;
+/*
+ * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
+ */
+static int create_thread_poll_set(struct lttng_poll_event *events,
+ unsigned int size)
+{
+ int ret;
+
+ if (events == NULL || size == 0) {
+ ret = -1;
+ goto error;
+ }
+
+ ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Add quit pipe */
+ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN);
+ if (ret < 0) {
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * Check if the thread quit pipe was triggered.
+ *
+ * Return 1 if it was triggered else 0;
+ */
+static int check_thread_quit_pipe(int fd, uint32_t events)
+{
+ if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
+ return 1;
+ }
+
+ return 0;
+}
+
/*
* Remove modules in reverse load order.
*/
}
/*
- * Init quit pipe.
+ * Init thread quit pipe.
*
* Return -1 on error or 0 if all pipes are created.
*/
*/
static void stop_threads(void)
{
+ int ret;
+
/* Stopping all threads */
DBG("Terminating all threads");
- close(thread_quit_pipe[0]);
- close(thread_quit_pipe[1]);
+ ret = write(thread_quit_pipe[1], "!", 1);
+ if (ret < 0) {
+ ERR("write error on thread quit pipe");
+ }
+
/* Dispatch thread */
dispatch_thread_exit = 1;
futex_nto1_wake(&ust_cmd_queue.futex);
27, 1, 31, 27, 0, 27, 1, 33, 27, 0);
/* </fun> */
- DBG("Removing %s directory", LTTNG_RUNDIR);
- ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR);
- if (ret < 0) {
- ERR("asprintf failed. Something is really wrong!");
- }
+ if (is_root) {
+ DBG("Removing %s directory", LTTNG_RUNDIR);
+ ret = asprintf(&cmd, "rm -rf " LTTNG_RUNDIR);
+ if (ret < 0) {
+ ERR("asprintf failed. Something is really wrong!");
+ }
- /* Remove lttng run directory */
- ret = system(cmd);
- if (ret < 0) {
- ERR("Unable to clean " LTTNG_RUNDIR);
+ /* Remove lttng run directory */
+ ret = system(cmd);
+ if (ret < 0) {
+ ERR("Unable to clean " LTTNG_RUNDIR);
+ }
}
DBG("Cleaning up all session");
DBG("Unloading kernel modules");
modprobe_remove_kernel_modules();
}
+
+ close(thread_quit_pipe[0]);
+ close(thread_quit_pipe[1]);
}
/*
}
/*
- * Update the kernel pollfd set of all channel fd available over all tracing
+ * Update the kernel poll set of all channel fd available over all tracing
* session. Add the wakeup pipe at the end of the set.
*/
-static int update_kernel_pollfd(void)
+static int update_kernel_poll(struct lttng_poll_event *events)
{
- int i = 0;
- /*
- * The wakup pipe and the quit pipe are needed so the number of fds starts
- * at 2 for those pipes.
- */
- unsigned int nb_fd = 2;
+ int ret;
struct ltt_session *session;
struct ltt_kernel_channel *channel;
- DBG("Updating kernel_pollfd");
+ DBG("Updating kernel poll set");
- /* Get the number of channel of all kernel session */
lock_session_list();
cds_list_for_each_entry(session, &session_list_ptr->head, list) {
lock_session(session);
unlock_session(session);
continue;
}
- nb_fd += session->kernel_session->channel_count;
- unlock_session(session);
- }
-
- DBG("Resizing kernel_pollfd to size %d", nb_fd);
- kernel_pollfd = realloc(kernel_pollfd, nb_fd * sizeof(struct pollfd));
- if (kernel_pollfd == NULL) {
- perror("malloc kernel_pollfd");
- goto error;
- }
-
- cds_list_for_each_entry(session, &session_list_ptr->head, list) {
- lock_session(session);
- if (session->kernel_session == NULL) {
- unlock_session(session);
- continue;
- }
- if (i >= nb_fd) {
- ERR("To much channel for kernel_pollfd size");
- unlock_session(session);
- break;
- }
cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) {
- kernel_pollfd[i].fd = channel->fd;
- kernel_pollfd[i].events = POLLIN | POLLRDNORM;
- i++;
+ /* Add channel fd to the kernel poll set */
+ ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM);
+ if (ret < 0) {
+ unlock_session(session);
+ goto error;
+ }
+ DBG("Channel fd %d added to kernel set", channel->fd);
}
unlock_session(session);
}
unlock_session_list();
- /* Adding wake up pipe */
- kernel_pollfd[nb_fd - 2].fd = kernel_poll_pipe[0];
- kernel_pollfd[nb_fd - 2].events = POLLIN;
-
- /* Adding the quit pipe */
- kernel_pollfd[nb_fd - 1].fd = thread_quit_pipe[0];
- kernel_pollfd[nb_fd - 1].events =
- POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI;
-
- return nb_fd;
+ return 0;
error:
unlock_session_list();
session->kernel_session->consumer_fd = kconsumerd_cmd_sock;
}
- cds_list_for_each_entry(channel, &session->kernel_session->channel_list.head, list) {
+ cds_list_for_each_entry(channel,
+ &session->kernel_session->channel_list.head, list) {
if (channel->fd == fd) {
DBG("Channel found, updating kernel streams");
ret = kernel_open_channel_stream(channel);
}
/*
- * Have we already sent fds to the consumer? If yes, it means that
- * tracing is started so it is safe to send our updated stream fds.
+ * Have we already sent fds to the consumer? If yes, it means
+ * that tracing is started so it is safe to send our updated
+ * stream fds.
*/
if (session->kernel_session->kconsumer_fds_sent == 1) {
- ret = send_kconsumerd_channel_fds(session->kernel_session->consumer_fd,
- channel);
+ ret = send_kconsumerd_channel_fds(
+ session->kernel_session->consumer_fd, channel);
if (ret < 0) {
goto error;
}
*/
static void *thread_manage_kernel(void *data)
{
- int ret, i, nb_fd = 0;
+ int ret, i, pollfd, update_poll_flag = 1;
+ uint32_t revents, nb_fd;
char tmp;
- int update_poll_flag = 1;
+ struct lttng_poll_event events;
DBG("Thread manage kernel started");
+ ret = create_thread_poll_set(&events, 2);
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN);
+ if (ret < 0) {
+ goto error;
+ }
+
while (1) {
if (update_poll_flag == 1) {
- nb_fd = update_kernel_pollfd();
- if (nb_fd < 0) {
+ ret = update_kernel_poll(&events);
+ if (ret < 0) {
goto error;
}
update_poll_flag = 0;
}
- DBG("Polling on %d fds", nb_fd);
+ nb_fd = LTTNG_POLL_GETNB(&events);
+
+ DBG("Thread kernel polling on %d fds", nb_fd);
+
+ /* Zeroed the poll events */
+ lttng_poll_reset(&events);
/* Poll infinite value of time */
- ret = poll(kernel_pollfd, nb_fd, -1);
+ ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
- perror("poll kernel thread");
goto error;
} else if (ret == 0) {
/* Should not happen since timeout is infinite */
continue;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (kernel_pollfd[nb_fd - 1].revents == POLLNVAL) {
- goto error;
- }
-
- DBG("Kernel poll event triggered");
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
- /*
- * Check if the wake up pipe was triggered. If so, the kernel_pollfd
- * must be updated.
- */
- switch (kernel_pollfd[nb_fd - 2].revents) {
- case POLLIN:
- ret = read(kernel_poll_pipe[0], &tmp, 1);
- update_poll_flag = 1;
- continue;
- case POLLERR:
- goto error;
- default:
- break;
- }
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
+ goto error;
+ }
- for (i = 0; i < nb_fd; i++) {
- switch (kernel_pollfd[i].revents) {
- /*
- * New CPU detected by the kernel. Adding kernel stream to kernel
- * session and updating the kernel consumer
- */
- case POLLIN | POLLRDNORM:
- ret = update_kernel_stream(kernel_pollfd[i].fd);
- if (ret < 0) {
- continue;
+ /* Check for data on kernel pipe */
+ if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) {
+ ret = read(kernel_poll_pipe[0], &tmp, 1);
+ update_poll_flag = 1;
+ continue;
+ } else {
+ /*
+ * New CPU detected by the kernel. Adding kernel stream to
+ * kernel session and updating the kernel consumer
+ */
+ if (revents & LPOLLIN) {
+ ret = update_kernel_stream(pollfd);
+ if (ret < 0) {
+ continue;
+ }
+ break;
+ /*
+ * TODO: We might want to handle the LPOLLERR | LPOLLHUP
+ * and unregister kernel stream at this point.
+ */
}
- break;
}
}
}
error:
DBG("Kernel thread dying");
- if (kernel_pollfd) {
- free(kernel_pollfd);
- }
-
close(kernel_poll_pipe[0]);
close(kernel_poll_pipe[1]);
+
+ lttng_poll_clean(&events);
+
return NULL;
}
*/
static void *thread_manage_kconsumerd(void *data)
{
- int sock = 0, ret;
+ int sock = 0, i, ret, pollfd;
+ uint32_t revents, nb_fd;
enum lttcomm_return_code code;
- struct pollfd pollfd[2];
+ struct lttng_poll_event events;
DBG("[thread] Manage kconsumerd started");
goto error;
}
- /* First fd is always the quit pipe */
- pollfd[0].fd = thread_quit_pipe[0];
+ /*
+ * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock.
+ * Nothing more will be added to this poll set.
+ */
+ ret = create_thread_poll_set(&events, 2);
+ if (ret < 0) {
+ goto error;
+ }
- /* Apps socket */
- pollfd[1].fd = kconsumerd_err_sock;
- pollfd[1].events = POLLIN;
+ ret = lttng_poll_add(&events, kconsumerd_err_sock, LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ goto error;
+ }
+
+ nb_fd = LTTNG_POLL_GETNB(&events);
/* Inifinite blocking call, waiting for transmission */
- ret = poll(pollfd, 2, -1);
+ ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
- perror("poll kconsumerd thread");
goto error;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd[0].revents == POLLNVAL) {
- goto error;
- } else if (pollfd[1].revents == POLLERR) {
- ERR("Kconsumerd err socket poll error");
- goto error;
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
+ goto error;
+ }
+
+ /* Event on the registration socket */
+ if (pollfd == kconsumerd_err_sock) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Kconsumerd err socket poll error");
+ goto error;
+ }
+ }
}
sock = lttcomm_accept_unix_sock(kconsumerd_err_sock);
goto error;
}
- /* Kconsumerd err socket */
- pollfd[1].fd = sock;
- pollfd[1].events = POLLIN;
-
- /* Inifinite blocking call, waiting for transmission */
- ret = poll(pollfd, 2, -1);
+ /* Remove the kconsumerd error socket since we have established a connexion */
+ ret = lttng_poll_del(&events, kconsumerd_err_sock);
if (ret < 0) {
- perror("poll kconsumerd thread");
goto error;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd[0].revents == POLLNVAL) {
+ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
goto error;
- } else if (pollfd[1].revents == POLLERR) {
- ERR("Kconsumerd err socket second poll error");
+ }
+
+ /* Update number of fd */
+ nb_fd = LTTNG_POLL_GETNB(&events);
+
+ /* Inifinite blocking call, waiting for transmission */
+ ret = lttng_poll_wait(&events, -1);
+ if (ret < 0) {
goto error;
}
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
+ goto error;
+ }
+
+ /* Event on the kconsumerd socket */
+ if (pollfd == sock) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Kconsumerd err socket second poll error");
+ goto error;
+ }
+ }
+ }
+
/* Wait for any kconsumerd error */
ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code));
if (ret <= 0) {
error:
DBG("Kconsumerd thread dying");
- if (kconsumerd_err_sock) {
- close(kconsumerd_err_sock);
- }
- if (kconsumerd_cmd_sock) {
- close(kconsumerd_cmd_sock);
- }
- if (sock) {
- close(sock);
- }
+ close(kconsumerd_err_sock);
+ close(kconsumerd_cmd_sock);
+ close(sock);
unlink(kconsumerd_err_unix_sock_path);
unlink(kconsumerd_cmd_unix_sock_path);
-
kconsumerd_pid = 0;
- return NULL;
-}
-/*
- * Reallocate the apps command pollfd structure of nb_fd size.
- *
- * The first two fds must be there at all time.
- */
-static int update_apps_cmd_pollfd(unsigned int nb_fd, unsigned int old_nb_fd,
- struct pollfd **pollfd)
-{
- int i, count;
- struct pollfd *old_pollfd = NULL;
+ lttng_poll_clean(&events);
- /* Can't accept pollfd less than 2 */
- if (nb_fd < 2) {
- goto end;
- }
-
- if (*pollfd) {
- /* Save pointer */
- old_pollfd = *pollfd;
- }
-
- *pollfd = malloc(nb_fd * sizeof(struct pollfd));
- if (*pollfd == NULL) {
- perror("malloc manage apps pollfd");
- goto error;
- }
-
- /* First fd is always the quit pipe */
- (*pollfd)[0].fd = thread_quit_pipe[0];
- (*pollfd)[0].events =
- POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI;
-
- /* Apps command pipe */
- (*pollfd)[1].fd = apps_cmd_pipe[0];
- (*pollfd)[1].events = POLLIN;
-
- /* Start count after the two pipes below */
- count = 2;
- for (i = 2; i < old_nb_fd; i++) {
- /* Add to new pollfd */
- if (old_pollfd[i].fd != -1) {
- (*pollfd)[count].fd = old_pollfd[i].fd;
- (*pollfd)[count].events = POLLHUP | POLLNVAL | POLLERR;
- count++;
- }
-
- if (count > nb_fd) {
- ERR("Updating poll fd wrong size");
- goto error;
- }
- }
-
- if (nb_fd < 2) {
- /*
- * There should *always* be at least two fds in the pollfd. This safety
- * check make sure the poll() will actually try on those two pipes at
- * best which are the thread_quit_pipe and apps_cmd_pipe.
- */
- nb_fd = 2;
- MSG("nb_fd < 2 --> Not good! Continuing...");
- }
-
- /* Destroy old pollfd */
- free(old_pollfd);
-
- DBG("Apps cmd pollfd realloc of size %d", nb_fd);
-
-end:
- return 0;
-
-error:
- /* Destroy old pollfd */
- free(old_pollfd);
- return -1;
+ return NULL;
}
/*
*/
static void *thread_manage_apps(void *data)
{
- int i, ret, current_nb_fd;
- unsigned int nb_fd = 2;
- int update_poll_flag = 1;
- struct pollfd *pollfd = NULL;
+ int i, ret, pollfd;
+ uint32_t revents, nb_fd;
struct ust_command ust_cmd;
+ struct lttng_poll_event events;
DBG("[thread] Manage application started");
- ust_cmd.sock = -1;
- current_nb_fd = nb_fd;
+ ret = create_thread_poll_set(&events, 2);
+ if (ret < 0) {
+ goto error;
+ }
- while (1) {
- /* See if we have a valid socket to add to pollfd */
- if (ust_cmd.sock != -1) {
- nb_fd++;
- update_poll_flag = 1;
- }
+ ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ goto error;
+ }
- /* The pollfd struct must be updated */
- if (update_poll_flag) {
- ret = update_apps_cmd_pollfd(nb_fd, current_nb_fd, &pollfd);
- if (ret < 0) {
- /* malloc failed so we quit */
- goto error;
- }
+ while (1) {
+ /* Zeroed the events structure */
+ lttng_poll_reset(&events);
- if (ust_cmd.sock != -1) {
- /* Update pollfd with the new UST socket */
- DBG("Adding sock %d to apps cmd pollfd", ust_cmd.sock);
- pollfd[nb_fd - 1].fd = ust_cmd.sock;
- pollfd[nb_fd - 1].events = POLLHUP | POLLNVAL | POLLERR;
- ust_cmd.sock = -1;
- }
- }
+ nb_fd = LTTNG_POLL_GETNB(&events);
DBG("Apps thread polling on %d fds", nb_fd);
/* Inifinite blocking call, waiting for transmission */
- ret = poll(pollfd, nb_fd, -1);
+ ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
- perror("poll apps thread");
goto error;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd[0].revents == POLLNVAL) {
- goto error;
- } else {
- /* apps_cmd_pipe pipe events */
- switch (pollfd[1].revents) {
- case POLLERR:
- ERR("Apps command pipe poll error");
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
goto error;
- case POLLIN:
- /* Empty pipe */
- ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
- if (ret < 0 || ret < sizeof(ust_cmd)) {
- perror("read apps cmd pipe");
- goto error;
- }
+ }
- /* Register applicaton to the session daemon */
- ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock);
- if (ret < 0) {
- /* Only critical ENOMEM error can be returned here */
+ /* Inspect the apps cmd pipe */
+ if (pollfd == apps_cmd_pipe[0]) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Apps command pipe error");
goto error;
- }
+ } else if (revents & LPOLLIN) {
+ /* Empty pipe */
+ ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd));
+ if (ret < 0 || ret < sizeof(ust_cmd)) {
+ perror("read apps cmd pipe");
+ goto error;
+ }
- ret = ustctl_register_done(ust_cmd.sock);
- if (ret < 0) {
- /*
- * If the registration is not possible, we simply unregister
- * the apps and continue
- */
- unregister_traceable_app(ust_cmd.sock);
+ /* Register applicaton to the session daemon */
+ ret = register_traceable_app(&ust_cmd.reg_msg, ust_cmd.sock);
+ if (ret < 0) {
+ /* Only critical ENOMEM error can be returned here */
+ goto error;
+ }
+
+ ret = ustctl_register_done(ust_cmd.sock);
+ if (ret < 0) {
+ /*
+ * If the registration is not possible, we simply
+ * unregister the apps and continue
+ */
+ unregister_traceable_app(ust_cmd.sock);
+ } else {
+ /*
+ * We just need here to monitor the close of the UST
+ * socket and poll set monitor those by default.
+ */
+ ret = lttng_poll_add(&events, ust_cmd.sock, 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG("Apps with sock %d added to poll set", ust_cmd.sock);
+ }
+ break;
}
- break;
- }
- }
+ } else {
+ /*
+ * At this point, we know that a registered application made the
+ * event at poll_wait.
+ */
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ /* Removing from the poll set */
+ ret = lttng_poll_del(&events, pollfd);
+ if (ret < 0) {
+ goto error;
+ }
- current_nb_fd = nb_fd;
- for (i = 2; i < current_nb_fd; i++) {
- /* Apps socket is closed/hungup */
- switch (pollfd[i].revents) {
- case POLLERR:
- case POLLHUP:
- case POLLNVAL:
- /* Pipe closed */
- unregister_traceable_app(pollfd[i].fd);
- /* Indicate to remove this fd from the pollfd */
- pollfd[i].fd = -1;
- nb_fd--;
- break;
+ /* Socket closed */
+ unregister_traceable_app(pollfd);
+ break;
+ }
}
}
-
- if (nb_fd != current_nb_fd) {
- update_poll_flag = 1;
- }
}
error:
close(apps_cmd_pipe[0]);
close(apps_cmd_pipe[1]);
- free(pollfd);
+ lttng_poll_clean(&events);
return NULL;
}
*/
static void *thread_registration_apps(void *data)
{
- int sock = 0, ret;
- struct pollfd pollfd[2];
+ int sock = 0, i, ret, pollfd;
+ uint32_t revents, nb_fd;
+ struct lttng_poll_event events;
/*
* Get allocated in this thread, enqueued to a global queue, dequeued and
* freed in the manage apps thread.
goto error;
}
- /* First fd is always the quit pipe */
- pollfd[0].fd = thread_quit_pipe[0];
- pollfd[0].events =
- POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI;
+ /*
+ * Pass 2 as size here for the thread quit pipe and apps socket. Nothing
+ * more will be added to this poll set.
+ */
+ ret = create_thread_poll_set(&events, 2);
+ if (ret < 0) {
+ goto error;
+ }
- /* Apps socket */
- pollfd[1].fd = apps_sock;
- pollfd[1].events = POLLIN;
+ /* Add the application registration socket */
+ ret = lttng_poll_add(&events, apps_sock, LPOLLIN | LPOLLRDHUP);
+ if (ret < 0) {
+ goto error;
+ }
/* Notify all applications to register */
ret = notify_ust_apps(1);
while (1) {
DBG("Accepting application registration");
+ nb_fd = LTTNG_POLL_GETNB(&events);
+
/* Inifinite blocking call, waiting for transmission */
- ret = poll(pollfd, 2, -1);
+ ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
- perror("poll register apps thread");
goto error;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd[0].revents == POLLNVAL) {
- goto error;
- }
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
- switch (pollfd[1].revents) {
- case POLLNVAL:
- case POLLHUP:
- case POLLRDHUP:
- case POLLERR:
- ERR("Register apps socket poll error");
- goto error;
- case POLLIN:
- sock = lttcomm_accept_unix_sock(apps_sock);
- if (sock < 0) {
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
goto error;
}
- /* Create UST registration command for enqueuing */
- ust_cmd = malloc(sizeof(struct ust_command));
- if (ust_cmd == NULL) {
- perror("ust command malloc");
- goto error;
- }
+ /* Event on the registration socket */
+ if (pollfd == apps_sock) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Register apps socket poll error");
+ goto error;
+ } else if (revents & LPOLLIN) {
+ sock = lttcomm_accept_unix_sock(apps_sock);
+ if (sock < 0) {
+ goto error;
+ }
- /*
- * Using message-based transmissions to ensure we don't have to deal
- * with partially received messages.
- */
- ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
- sizeof(struct ust_register_msg));
- if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
- if (ret < 0) {
- perror("lttcomm_recv_unix_sock register apps");
- } else {
- ERR("Wrong size received on apps register");
- }
- free(ust_cmd);
- close(sock);
- continue;
- }
+ /* Create UST registration command for enqueuing */
+ ust_cmd = malloc(sizeof(struct ust_command));
+ if (ust_cmd == NULL) {
+ perror("ust command malloc");
+ goto error;
+ }
- ust_cmd->sock = sock;
+ /*
+ * Using message-based transmissions to ensure we don't
+ * have to deal with partially received messages.
+ */
+ ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
+ sizeof(struct ust_register_msg));
+ if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
+ if (ret < 0) {
+ perror("lttcomm_recv_unix_sock register apps");
+ } else {
+ ERR("Wrong size received on apps register");
+ }
+ free(ust_cmd);
+ close(sock);
+ continue;
+ }
- DBG("UST registration received with pid:%d ppid:%d uid:%d"
- " gid:%d sock:%d name:%s (version %d.%d)",
- ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
- ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
- ust_cmd->sock, ust_cmd->reg_msg.name,
- ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
- /*
- * Lock free enqueue the registration request.
- * The red pill has been taken! This apps will be part of the *system*
- */
- cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node);
+ ust_cmd->sock = sock;
- /*
- * Wake the registration queue futex.
- * Implicit memory barrier with the exchange in cds_wfq_enqueue.
- */
- futex_nto1_wake(&ust_cmd_queue.futex);
- break;
+ DBG("UST registration received with pid:%d ppid:%d uid:%d"
+ " gid:%d sock:%d name:%s (version %d.%d)",
+ ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid,
+ ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid,
+ ust_cmd->sock, ust_cmd->reg_msg.name,
+ ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor);
+ /*
+ * Lock free enqueue the registration request. The red pill
+ * has been taken! This apps will be part of the *system* :)
+ */
+ cds_wfq_enqueue(&ust_cmd_queue.queue, &ust_cmd->node);
+
+ /*
+ * Wake the registration queue futex. Implicit memory
+ * barrier with the exchange in cds_wfq_enqueue.
+ */
+ futex_nto1_wake(&ust_cmd_queue.futex);
+ }
+ }
}
}
close(apps_sock);
close(sock);
-
unlink(apps_unix_sock_path);
+ lttng_poll_clean(&events);
+
return NULL;
}
}
/*
- * Notify kernel thread to update it's pollfd.
+ * Notify kernel thread to update it's poll set.
*/
-static int notify_kernel_pollfd(void)
+static int notify_kernel_channels_update(void)
{
int ret;
switch (cmd_ctx->lsm->domain.type) {
case LTTNG_DOMAIN_KERNEL:
- kchan = trace_kernel_get_channel_by_name(cmd_ctx->lsm->u.enable.channel_name,
+ kchan = trace_kernel_get_channel_by_name(
+ cmd_ctx->lsm->u.enable.channel_name,
cmd_ctx->session->kernel_session);
if (kchan == NULL) {
/* Channel not found, creating it */
- DBG("Creating kernel channel");
+ DBG("Creating kernel channel %s",
+ cmd_ctx->lsm->u.enable.channel_name);
ret = kernel_create_channel(cmd_ctx->session->kernel_session,
&cmd_ctx->lsm->u.channel.chan,
}
/* Notify kernel thread that there is a new channel */
- ret = notify_kernel_pollfd();
+ ret = notify_kernel_channels_update();
if (ret < 0) {
ret = LTTCOMM_FATAL;
goto error;
ret = LTTCOMM_FATAL;
goto error;
}
+
+ ret = notify_kernel_channels_update();
+ if (ret < 0) {
+ ret = LTTCOMM_FATAL;
+ goto error;
+ }
}
kevent = trace_kernel_get_event_by_name(cmd_ctx->lsm->u.enable.event.name, kchan);
ret = LTTCOMM_FATAL;
goto error;
}
+
+ ret = notify_kernel_channels_update();
+ if (ret < 0) {
+ ret = LTTCOMM_FATAL;
+ goto error;
+ }
}
/* For each event in the kernel session */
}
/*
- * Must notify the kernel thread here to update it's pollfd in order to
- * remove the channel(s)' fd just destroyed.
+ * Must notify the kernel thread here to update it's poll setin order
+ * to remove the channel(s)' fd just destroyed.
*/
- ret = notify_kernel_pollfd();
+ ret = notify_kernel_channels_update();
if (ret < 0) {
ret = LTTCOMM_FATAL;
goto error;
*/
static void *thread_manage_clients(void *data)
{
- int sock = 0, ret;
+ int sock = 0, ret, i, pollfd;
+ uint32_t revents, nb_fd;
struct command_ctx *cmd_ctx = NULL;
- struct pollfd pollfd[2];
+ struct lttng_poll_event events;
DBG("[thread] Manage client started");
goto error;
}
- /* First fd is always the quit pipe */
- pollfd[0].fd = thread_quit_pipe[0];
- pollfd[0].events =
- POLLHUP | POLLNVAL | POLLERR | POLLIN | POLLRDHUP | POLLPRI;
+ /*
+ * Pass 2 as size here for the thread quit pipe and client_sock. Nothing
+ * more will be added to this poll set.
+ */
+ ret = create_thread_poll_set(&events, 2);
+ if (ret < 0) {
+ goto error;
+ }
- /* Apps socket */
- pollfd[1].fd = client_sock;
- pollfd[1].events = POLLIN;
+ /* Add the application registration socket */
+ ret = lttng_poll_add(&events, client_sock, LPOLLIN | LPOLLPRI);
+ if (ret < 0) {
+ goto error;
+ }
/*
* Notify parent pid that we are ready to accept command for client side.
while (1) {
DBG("Accepting client command ...");
+ nb_fd = LTTNG_POLL_GETNB(&events);
+
/* Inifinite blocking call, waiting for transmission */
- ret = poll(pollfd, 2, -1);
+ ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
- perror("poll client thread");
goto error;
}
- /* Thread quit pipe has been closed. Killing thread. */
- if (pollfd[0].revents == POLLNVAL) {
+ for (i = 0; i < nb_fd; i++) {
+ /* Fetch once the poll data */
+ revents = LTTNG_POLL_GETEV(&events, i);
+ pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Thread quit pipe has been closed. Killing thread. */
+ ret = check_thread_quit_pipe(pollfd, revents);
+ if (ret) {
+ goto error;
+ }
+
+ /* Event on the registration socket */
+ if (pollfd == client_sock) {
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Client socket poll error");
+ goto error;
+ }
+ }
+ }
+
+ DBG("Wait for client response");
+
+ sock = lttcomm_accept_unix_sock(client_sock);
+ if (sock < 0) {
goto error;
}
- switch (pollfd[1].revents) {
- case POLLNVAL:
- case POLLHUP:
- case POLLERR:
- ERR("Client socket poll error");
+ /* Allocate context command to process the client request */
+ cmd_ctx = malloc(sizeof(struct command_ctx));
+ if (cmd_ctx == NULL) {
+ perror("malloc cmd_ctx");
goto error;
- case POLLIN:
- sock = lttcomm_accept_unix_sock(client_sock);
- if (sock < 0) {
- goto error;
- }
+ }
- /* Allocate context command to process the client request */
- cmd_ctx = malloc(sizeof(struct command_ctx));
+ /* Allocate data buffer for reception */
+ cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg));
+ if (cmd_ctx->lsm == NULL) {
+ perror("malloc cmd_ctx->lsm");
+ goto error;
+ }
- /* Allocate data buffer for reception */
- cmd_ctx->lsm = malloc(sizeof(struct lttcomm_session_msg));
- cmd_ctx->llm = NULL;
- cmd_ctx->session = NULL;
+ cmd_ctx->llm = NULL;
+ cmd_ctx->session = NULL;
- /*
- * Data is received from the lttng client. The struct
- * lttcomm_session_msg (lsm) contains the command and data request of
- * the client.
- */
- DBG("Receiving data from client ...");
- ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm, sizeof(struct lttcomm_session_msg));
- if (ret <= 0) {
- continue;
- }
+ /*
+ * Data is received from the lttng client. The struct
+ * lttcomm_session_msg (lsm) contains the command and data request of
+ * the client.
+ */
+ DBG("Receiving data from client ...");
+ ret = lttcomm_recv_unix_sock(sock, cmd_ctx->lsm,
+ sizeof(struct lttcomm_session_msg));
+ if (ret <= 0) {
+ DBG("Nothing recv() from client... continuing");
+ close(sock);
+ free(cmd_ctx);
+ continue;
+ }
- // TODO: Validate cmd_ctx including sanity check for security purpose.
+ // TODO: Validate cmd_ctx including sanity check for
+ // security purpose.
+ /*
+ * This function dispatch the work to the kernel or userspace tracer
+ * libs and fill the lttcomm_lttng_msg data structure of all the needed
+ * informations for the client. The command context struct contains
+ * everything this function may needs.
+ */
+ ret = process_client_msg(cmd_ctx);
+ if (ret < 0) {
/*
- * This function dispatch the work to the kernel or userspace tracer
- * libs and fill the lttcomm_lttng_msg data structure of all the needed
- * informations for the client. The command context struct contains
- * everything this function may needs.
+ * TODO: Inform client somehow of the fatal error. At
+ * this point, ret < 0 means that a malloc failed
+ * (ENOMEM). Error detected but still accept command.
*/
- ret = process_client_msg(cmd_ctx);
- if (ret < 0) {
- /* TODO: Inform client somehow of the fatal error. At this point,
- * ret < 0 means that a malloc failed (ENOMEM). */
- /* Error detected but still accept command */
- clean_command_ctx(&cmd_ctx);
- continue;
- }
-
- DBG("Sending response (size: %d, retcode: %d)",
- cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code);
- ret = send_unix_sock(sock, cmd_ctx->llm, cmd_ctx->lttng_msg_size);
- if (ret < 0) {
- ERR("Failed to send data back to client");
- }
-
clean_command_ctx(&cmd_ctx);
+ continue;
+ }
- /* End of transmission */
- close(sock);
- break;
+ DBG("Sending response (size: %d, retcode: %d)",
+ cmd_ctx->lttng_msg_size, cmd_ctx->llm->ret_code);
+ ret = send_unix_sock(sock, cmd_ctx->llm,
+ cmd_ctx->lttng_msg_size);
+ if (ret < 0) {
+ ERR("Failed to send data back to client");
}
- }
-error:
- DBG("Client thread dying");
- if (client_sock) {
- close(client_sock);
- }
- if (sock) {
+ clean_command_ctx(&cmd_ctx);
+
+ /* End of transmission */
close(sock);
}
+error:
+ DBG("Client thread dying");
unlink(client_unix_sock_path);
+ close(client_sock);
+ close(sock);
+ lttng_poll_clean(&events);
clean_command_ctx(&cmd_ctx);
return NULL;
}
*/
session_list_ptr = get_session_list();
+ /* Set up max poll set size */
+ lttng_poll_set_max_size();
+
/* Create thread to manage the client socket */
ret = pthread_create(&client_thread, NULL,
thread_manage_clients, (void *) NULL);