From: David Goulet Date: Sun, 14 Aug 2011 21:24:40 +0000 (-0400) Subject: Rename and export lib kernel consumer X-Git-Tag: v2.0-pre12~26 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=6533b585a3a53a0b52c2da14baec5e874d1bf3bb;p=lttng-tools.git Rename and export lib kernel consumer The old liblttkconsumerd is renamed to liblttngkconsumerd and exported publicly having the public header "lttng/lttng-kconsumerd.h". To link with this library, use -llttngkconsumerd. Every public functions now begins with the "lttng_" prefix to fit the lttng public namespace. Comments are changed also to fit 80 lines standard and the rest of the git tree comments. This complete renaming was motivated by the new tool made by Julien Desfossez called 'lttngtop' which uses the kernel consumer library to create a custom consumer. More works will add consumers in the lttv and babeltrace projects. Please see lttng.org to learn about lttngtop. Signed-off-by: David Goulet --- diff --git a/Makefile.am b/Makefile.am index df3067e02..ec8b7fe95 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,7 +2,7 @@ ACLOCAL_AMFLAGS = -I config SUBDIRS = liblttng-sessiond-comm \ libkernelctl \ - liblttkconsumerd \ + liblttngkconsumerd \ liblttngctl \ libustcomm \ libustctl \ diff --git a/README b/README index afdbf2311..16e1154bd 100644 --- a/README +++ b/README @@ -56,11 +56,11 @@ PACKAGE CONTENTS: - libkernelctl Kernel tracer control and ioctl definitions. - - liblttkconsumerd + - liblttngkconsumerd Library for Kernel trace consumer. - ltt-kconsumerd - The Kernel consumer daemon which uses liblttkconsumerd. + The Kernel consumer daemon which uses liblttngkconsumerd. - ltt-sessiond The LTTng session daemon binary. diff --git a/configure.ac b/configure.ac index 16d3dd6db..066c4e20e 100644 --- a/configure.ac +++ b/configure.ac @@ -43,7 +43,7 @@ AC_CONFIG_FILES([ Makefile include/Makefile libkernelctl/Makefile - liblttkconsumerd/Makefile + liblttngkconsumerd/Makefile liblttngctl/Makefile liblttng-sessiond-comm/Makefile libustctl/Makefile diff --git a/include/Makefile.am b/include/Makefile.am index dc3f0b553..44d6672f0 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,4 +1,4 @@ -lttnginclude_HEADERS = lttng/lttng.h +lttnginclude_HEADERS = lttng/lttng.h lttng/lttng-kconsumerd.h -noinst_HEADERS = lttngerr.h lttng-kernel.h lttng-kconsumerd.h lttng-share.h \ - lttng-sessiond-comm.h +noinst_HEADERS = lttngerr.h lttng-kernel.h ltt-kconsumerd.h lttng-share.h \ + lttng-sessiond-comm.h diff --git a/include/ltt-kconsumerd.h b/include/ltt-kconsumerd.h new file mode 100644 index 000000000..95c500524 --- /dev/null +++ b/include/ltt-kconsumerd.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only verion 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTT_KCONSUMERD_H +#define _LTT_KCONSUMERD_H + +/* Kernel consumer path */ +#define KCONSUMERD_PATH LTTNG_RUNDIR "/kconsumerd" +#define KCONSUMERD_CMD_SOCK_PATH KCONSUMERD_PATH "/command" +#define KCONSUMERD_ERR_SOCK_PATH KCONSUMERD_PATH "/error" + +#endif /* _LTT_KCONSUMERD_H */ diff --git a/include/lttng-kconsumerd.h b/include/lttng-kconsumerd.h deleted file mode 100644 index ec3a9e2b5..000000000 --- a/include/lttng-kconsumerd.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * David Goulet - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only verion 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#ifndef _LTTNG_KCONSUMERD_H -#define _LTTNG_KCONSUMERD_H - -#include -#include "lttng-share.h" - -/* Kernel consumer path */ -#define KCONSUMERD_PATH LTTNG_RUNDIR "/kconsumerd" -#define KCONSUMERD_CMD_SOCK_PATH KCONSUMERD_PATH "/command" -#define KCONSUMERD_ERR_SOCK_PATH KCONSUMERD_PATH "/error" - -/* Commands for kconsumerd */ -enum kconsumerd_command { - ADD_STREAM, - UPDATE_STREAM, /* pause, delete, active depending on fd state */ - STOP, /* inform the kconsumerd to quit when all fd has hang up */ -}; - -/* State of each fd in consumerd */ -enum kconsumerd_fd_state { - ACTIVE_FD, - PAUSE_FD, - DELETE_FD, -}; - -#endif /* _LTTNG_KCONSUMERD_H */ diff --git a/include/lttng/lttng-kconsumerd.h b/include/lttng/lttng-kconsumerd.h new file mode 100644 index 000000000..8d16d5670 --- /dev/null +++ b/include/lttng/lttng-kconsumerd.h @@ -0,0 +1,184 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#ifndef _LTTNG_KCONSUMERD_H +#define _LTTNG_KCONSUMERD_H + +#include +#include + +/* + * When the receiving thread dies, we need to have a way to make the polling + * thread exit eventually. If all FDs hang up (normal case when the + * ltt-sessiond stops), we can exit cleanly, but if there is a problem and for + * whatever reason some FDs remain open, the consumer should still exit + * eventually. + * + * If the timeout is reached, it means that during this period no events + * occurred on the FDs so we need to force an exit. This case should not happen + * but it is a safety to ensure we won't block the consumer indefinitely. + * + * The value of 2 seconds is an arbitrary choice. + */ +#define LTTNG_KCONSUMERD_POLL_GRACE_PERIOD 2000 + +/* Commands for kconsumerd */ +enum lttng_kconsumerd_command { + ADD_STREAM, + UPDATE_STREAM, /* pause, delete, active depending on fd state */ + STOP, /* inform the kconsumerd to quit when all fd has hang up */ +}; + +/* State of each fd in consumerd */ +enum lttng_kconsumerd_fd_state { + ACTIVE_FD, + PAUSE_FD, + DELETE_FD, +}; + +struct lttng_kconsumerd_fd_list { + struct cds_list_head head; +}; + +/* + * Internal representation of the FDs, sessiond_fd is used to identify uniquely + * a fd + */ +struct lttng_kconsumerd_fd { + struct cds_list_head list; + int sessiond_fd; /* used to identify uniquely a fd with sessiond */ + int consumerd_fd; /* fd to consume */ + int out_fd; /* output file to write the data */ + off_t out_fd_offset; /* write position in the output file descriptor */ + char path_name[PATH_MAX]; /* tracefile name */ + enum lttng_kconsumerd_fd_state state; + unsigned long max_sb_size; /* the subbuffer size for this channel */ + void *mmap_base; + size_t mmap_len; + enum lttng_event_output output; /* splice or mmap */ +}; + +/* + * Kernel consumer local data to the program. + */ +struct lttng_kconsumerd_local_data { + /* function to call when data is available on a buffer */ + int (*on_buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd); + /* socket to communicate errors with sessiond */ + int kconsumerd_error_socket; + /* socket to exchange commands with sessiond */ + char *kconsumerd_command_sock_path; + /* communication with splice */ + int kconsumerd_thread_pipe[2]; + /* pipe to wake the poll thread when necessary */ + int kconsumerd_poll_pipe[2]; + /* to let the signal handler wake up the fd receiver thread */ + int kconsumerd_should_quit[2]; +}; + +/* + * Initialise the necessary environnement: + * - create a new context + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) + * - create the thread pipe (for splice) + * + * Takes a function pointer as argument, this function is called when data is + * available on a buffer. This function is responsible to do the + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the + * buffer configuration and then kernctl_put_next_subbuf at the end. + * + * Returns a pointer to the new context or NULL on error. + */ +extern struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)); + +/* + * Close all fds associated with the instance and free the context. + */ +extern void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx); + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written. + */ +extern int lttng_kconsumerd_on_read_subbuffer_mmap( + struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len); + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +extern int lttng_kconsumerd_on_read_subbuffer_splice( + struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len); + +/* + * Send return code to session daemon. + * + * Returns the return code of sendmsg : the number of bytes transmitted or -1 + * on error. + */ +extern int lttng_kconsumerd_send_error( + struct lttng_kconsumerd_local_data *ctx, int cmd); + +/* + * Poll on the should_quit pipe and the command socket return -1 on error and + * should exit, 0 if data is available on the command socket. + */ +extern int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); + +/* + * This thread polls the fds in the ltt_fd_list to consume the data and write + * it to tracefile if necessary. + */ +extern void *lttng_kconsumerd_thread_poll_fds(void *data); + +/* + * This thread listens on the consumerd socket and receives the file + * descriptors from ltt-sessiond. + */ +extern void *lttng_kconsumerd_thread_receive_fds(void *data); + +/* + * Called from signal handler to ensure a clean exit. + */ +extern void lttng_kconsumerd_should_exit( + struct lttng_kconsumerd_local_data *ctx); + +/* + * Cleanup the daemon's socket on exit. + */ +extern void lttng_kconsumerd_cleanup(void); + +/* + * Set the error socket for communication with a session daemon. + */ +extern void lttng_kconsumerd_set_error_sock( + struct lttng_kconsumerd_local_data *ctx, int sock); + +/* + * Set the command socket path for communication with a session daemon. + */ +extern void lttng_kconsumerd_set_command_sock_path( + struct lttng_kconsumerd_local_data *ctx, char *sock); + +#endif /* _LTTNG_KCONSUMERD_H */ diff --git a/include/lttng/lttng.h b/include/lttng/lttng.h index 1bdb66c61..0811d594d 100644 --- a/include/lttng/lttng.h +++ b/include/lttng/lttng.h @@ -298,7 +298,7 @@ extern const char *lttng_get_readable_code(int code); * domain. No consumer will be spawned and all fds/commands will go through the * socket path given (socket_path). * - * NOTE: At the moment, if you use the liblttkconsumerd, you can only use the + * NOTE: At the moment, if you use the liblttngkconsumerd, you can only use the * command socket. The error socket is not supported yet for roaming consumers. */ extern int lttng_register_consumer(struct lttng_handle *handle, diff --git a/liblttkconsumerd/Makefile.am b/liblttkconsumerd/Makefile.am deleted file mode 100644 index a685221c8..000000000 --- a/liblttkconsumerd/Makefile.am +++ /dev/null @@ -1,9 +0,0 @@ -AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/liblttkconsumerd -I$(top_srcdir)/libkernelctl - -lib_LTLIBRARIES = liblttkconsumerd.la - -liblttkconsumerd_la_SOURCES = lttkconsumerd.c lttkconsumerd.h - -liblttkconsumerd_la_LIBADD = \ - $(top_builddir)/libkernelctl/libkernelctl.la \ - $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la diff --git a/liblttkconsumerd/lttkconsumerd.c b/liblttkconsumerd/lttkconsumerd.c deleted file mode 100644 index 29f1f95d5..000000000 --- a/liblttkconsumerd/lttkconsumerd.c +++ /dev/null @@ -1,989 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#define _GNU_SOURCE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "kernelctl.h" -#include "lttkconsumerd.h" -#include "lttngerr.h" - -static -struct kconsumerd_global_data { - /* - * kconsumerd_data.lock protects kconsumerd_data.fd_list, - * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It - * ensures the count matches the number of items in the fd_list. - * It ensures the list updates *always* trigger an fd_array - * update (therefore need to make list update vs - * kconsumerd_data.need_update flag update atomic, and also flag - * read, fd array and flag clear atomic). - */ - pthread_mutex_t lock; - /* - * Number of element for the list below. Protected by - * kconsumerd_data.lock. - */ - unsigned int fds_count; - /* - * List of FDs. Protected by kconsumerd_data.lock. - */ - struct kconsumerd_fd_list fd_list; - /* - * Flag specifying if the local array of FDs needs update in the - * poll function. Protected by kconsumerd_data.lock. - */ - unsigned int need_update; -} kconsumerd_data = { - .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), - .fds_count = 0, - .need_update = 1, -}; - -/* timeout parameter, to control the polling thread grace period */ -static int kconsumerd_poll_timeout = -1; - -/* - * flag to inform the polling thread to quit when all fd hung up. - * Updated by the kconsumerd_thread_receive_fds when it notices that all - * fds has hung up. Also updated by the signal handler - * (kconsumerd_should_exit()). Read by the polling threads. - */ -static volatile int kconsumerd_quit = 0; - -/* - * kconsumerd_set_error_socket - * - * Set the error socket - */ -void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock) -{ - ctx->kconsumerd_error_socket = sock; -} - -/* - * kconsumerd_set_command_socket_path - * - * Set the command socket path - */ -void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx, - char *sock) -{ - ctx->kconsumerd_command_sock_path = sock; -} - -/* - * kconsumerd_find_session_fd - * - * Find a session fd in the global list. - * The kconsumerd_data.lock must be locked during this call - * - * Return 1 if found else 0 - */ -static int kconsumerd_find_session_fd(int fd) -{ - struct kconsumerd_fd *iter; - - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == fd) { - DBG("Duplicate session fd %d", fd); - return 1; - } - } - - return 0; -} - -/* - * kconsumerd_del_fd - * - * Remove a fd from the global list protected by a mutex - */ -static void kconsumerd_del_fd(struct kconsumerd_fd *lcf) -{ - int ret; - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_del(&lcf->list); - if (kconsumerd_data.fds_count > 0) { - kconsumerd_data.fds_count--; - if (lcf != NULL) { - if (lcf->mmap_base != NULL) { - ret = munmap(lcf->mmap_base, lcf->mmap_len); - if (ret != 0) { - perror("munmap"); - } - } - if (lcf->out_fd != 0) { - close(lcf->out_fd); - } - close(lcf->consumerd_fd); - free(lcf); - lcf = NULL; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * kconsumerd_add_fd - * - * Add a fd to the global list protected by a mutex - */ -static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd) -{ - struct kconsumerd_fd *tmp_fd; - int ret = 0; - - pthread_mutex_lock(&kconsumerd_data.lock); - /* Check if already exist */ - ret = kconsumerd_find_session_fd(buf->fd); - if (ret == 1) { - goto end; - } - - tmp_fd = malloc(sizeof(struct kconsumerd_fd)); - tmp_fd->sessiond_fd = buf->fd; - tmp_fd->consumerd_fd = consumerd_fd; - tmp_fd->state = buf->state; - tmp_fd->max_sb_size = buf->max_sb_size; - tmp_fd->out_fd = 0; - tmp_fd->out_fd_offset = 0; - tmp_fd->mmap_len = 0; - tmp_fd->mmap_base = NULL; - tmp_fd->output = buf->output; - strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); - tmp_fd->path_name[PATH_MAX - 1] = '\0'; - - /* Opening the tracefile in write mode */ - if (tmp_fd->path_name != NULL) { - ret = open(tmp_fd->path_name, - O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); - if (ret < 0) { - ERR("Opening %s", tmp_fd->path_name); - perror("open"); - goto end; - } - tmp_fd->out_fd = ret; - DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, - tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); - } - - if (tmp_fd->output == LTTNG_EVENT_MMAP) { - /* get the len of the mmap region */ - ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_len"); - goto end; - } - - tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, - PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); - if (tmp_fd->mmap_base == MAP_FAILED) { - perror("Error mmaping"); - ret = -1; - goto end; - } - } - - cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); - kconsumerd_data.fds_count++; - kconsumerd_data.need_update = 1; -end: - pthread_mutex_unlock(&kconsumerd_data.lock); - return ret; -} - -/* - * kconsumerd_change_fd_state - * - * Update a fd according to what we just received - */ -static void kconsumerd_change_fd_state(int sessiond_fd, - enum kconsumerd_fd_state state) -{ - struct kconsumerd_fd *iter; - - pthread_mutex_lock(&kconsumerd_data.lock); - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->sessiond_fd == sessiond_fd) { - iter->state = state; - break; - } - } - kconsumerd_data.need_update = 1; - pthread_mutex_unlock(&kconsumerd_data.lock); -} - -/* - * kconsumerd_update_poll_array - * - * Allocate the pollfd structure and the local view of the out fds - * to avoid doing a lookup in the linked list and concurrency issues - * when writing is needed. - * Returns the number of fds in the structures - * Called with kconsumerd_data.lock held. - */ -static int kconsumerd_update_poll_array(struct kconsumerd_local_data *ctx, - struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd) -{ - struct kconsumerd_fd *iter; - int i = 0; - - DBG("Updating poll fd array"); - cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { - if (iter->state == ACTIVE_FD) { - DBG("Active FD %d", iter->consumerd_fd); - (*pollfd)[i].fd = iter->consumerd_fd; - (*pollfd)[i].events = POLLIN | POLLPRI; - local_kconsumerd_fd[i] = iter; - i++; - } - } - - /* - * insert the kconsumerd_poll_pipe at the end of the array and don't - * increment i so nb_fd is the number of real FD - */ - (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0]; - (*pollfd)[i].events = POLLIN; - return i; -} - - -/* - * kconsumerd_on_read_subbuffer_mmap - * - * mmap the ring buffer, read it and write the data to the tracefile. - * Returns the number of bytes written - */ -int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx, - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - unsigned long mmap_offset; - char *padding = NULL; - long ret = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - /* get the offset inside the fd to mmap */ - ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); - if (ret != 0) { - ret = errno; - perror("kernctl_get_mmap_read_offset"); - goto end; - } - - while (len > 0) { - ret = write(outfd, kconsumerd_fd->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - ret = errno; - perror("Error in file write"); - goto end; - } - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } - goto end; - -end: - if (padding != NULL) { - free(padding); - } - return ret; -} - -/* - * kconsumerd_on_read_subbuffer - * - * Splice the data from the ring buffer to the tracefile. - * Returns the number of bytes spliced - */ -int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx, - struct kconsumerd_fd *kconsumerd_fd, unsigned long len) -{ - long ret = 0; - loff_t offset = 0; - off_t orig_offset = kconsumerd_fd->out_fd_offset; - int fd = kconsumerd_fd->consumerd_fd; - int outfd = kconsumerd_fd->out_fd; - - while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in relay splice"); - goto splice_error; - } - - ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %ld", ret); - if (ret < 0) { - ret = errno; - perror("Error in file splice"); - goto splice_error; - } - if (ret >= len) { - len = 0; - } - /* This won't block, but will start writeout asynchronously */ - sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - kconsumerd_fd->out_fd_offset += ret; - } - - /* - * This does a blocking write-and-wait on any page that belongs to the - * subbuffer prior to the one we just wrote. - * Don't care about error values, as these are just hints and ways to - * limit the amount of page cache used. - */ - if (orig_offset >= kconsumerd_fd->max_sb_size) { - sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE - | SYNC_FILE_RANGE_WRITE - | SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, - kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); - } - goto end; - -splice_error: - /* send the appropriate error description to sessiond */ - switch(ret) { - case EBADF: - kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF); - break; - case EINVAL: - kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL); - break; - case ENOMEM: - kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM); - break; - case ESPIPE: - kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE); - break; - } - -end: - return ret; -} - -/* - * kconsumerd_poll_socket - * - * Poll on the should_quit pipe and the command socket - * return -1 on error and should exit, 0 if data is - * available on the command socket - */ -int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) -{ - int num_rdy; - - num_rdy = poll(kconsumerd_sockpoll, 2, -1); - if (num_rdy == -1) { - perror("Poll error"); - goto exit; - } - if (kconsumerd_sockpoll[0].revents == POLLIN) { - DBG("kconsumerd_should_quit wake up"); - goto exit; - } - return 0; - -exit: - return -1; -} - -/* - * kconsumerd_consumerd_recv_fd - * - * Receives an array of file descriptors and the associated - * structures describing each fd (path name). - * Returns the size of received data - */ -static int kconsumerd_consumerd_recv_fd(struct kconsumerd_local_data *ctx, - int sfd, struct pollfd *kconsumerd_sockpoll, int size, - enum kconsumerd_command cmd_type) -{ - struct iovec iov[1]; - int ret = 0, i, tmp2; - struct cmsghdr *cmsg; - int nb_fd; - char recv_fd[CMSG_SPACE(sizeof(int))]; - struct lttcomm_kconsumerd_msg lkm; - - /* the number of fds we are about to receive */ - nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); - - /* - * nb_fd is the number of fds we receive. One fd per recvmsg. - */ - for (i = 0; i < nb_fd; i++) { - struct msghdr msg = { 0 }; - - /* Prepare to receive the structures */ - iov[0].iov_base = &lkm; - iov[0].iov_len = sizeof(lkm); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - msg.msg_control = recv_fd; - msg.msg_controllen = sizeof(recv_fd); - - DBG("Waiting to receive fd"); - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - - if ((ret = recvmsg(sfd, &msg, 0)) < 0) { - perror("recvmsg"); - continue; - } - - if (ret != (size / nb_fd)) { - ERR("Received only %d, expected %d", ret, size); - kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - cmsg = CMSG_FIRSTHDR(&msg); - if (!cmsg) { - ERR("Invalid control message header"); - ret = -1; - kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - goto end; - } - - /* if we received fds */ - if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { - switch (cmd_type) { - case ADD_STREAM: - DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]); - ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); - if (ret < 0) { - kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); - goto end; - } - break; - case UPDATE_STREAM: - kconsumerd_change_fd_state(lkm.fd, lkm.state); - break; - default: - break; - } - /* signal the poll thread */ - tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1); - if (tmp2 < 0) { - perror("write kconsumerd poll"); - } - } else { - ERR("Didn't received any fd"); - kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); - ret = -1; - goto end; - } - } - -end: - return ret; -} - -/* - * kconsumerd_thread_poll_fds - * - * This thread polls the fds in the ltt_fd_list to consume the data - * and write it to tracefile if necessary. - */ -void *kconsumerd_thread_poll_fds(void *data) -{ - int num_rdy, num_hup, high_prio, ret, i; - struct pollfd *pollfd = NULL; - /* local view of the fds */ - struct kconsumerd_fd **local_kconsumerd_fd = NULL; - /* local view of kconsumerd_data.fds_count */ - int nb_fd = 0; - char tmp; - int tmp2; - struct kconsumerd_local_data *ctx = data; - - - local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd)); - - while (1) { - high_prio = 0; - num_hup = 0; - - /* - * the ltt_fd_list has been updated, we need to update our - * local array as well - */ - pthread_mutex_lock(&kconsumerd_data.lock); - if (kconsumerd_data.need_update) { - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); - if (pollfd == NULL) { - perror("pollfd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - - /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ - local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * - sizeof(struct kconsumerd_fd)); - if (local_kconsumerd_fd == NULL) { - perror("local_kconsumerd_fd malloc"); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd); - if (ret < 0) { - ERR("Error in allocating pollfd or local_outfds"); - kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); - pthread_mutex_unlock(&kconsumerd_data.lock); - goto end; - } - nb_fd = ret; - kconsumerd_data.need_update = 0; - } - pthread_mutex_unlock(&kconsumerd_data.lock); - - /* poll on the array of fds */ - DBG("polling on %d fd", nb_fd + 1); - num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout); - DBG("poll num_rdy : %d", num_rdy); - if (num_rdy == -1) { - perror("Poll error"); - kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); - goto end; - } else if (num_rdy == 0) { - DBG("Polling thread timed out"); - goto end; - } - - /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */ - if (nb_fd == 0 && kconsumerd_quit == 1) { - goto end; - } - - /* - * If the kconsumerd_poll_pipe triggered poll go - * directly to the beginning of the loop to update the - * array. We want to prioritize array update over - * low-priority reads. - */ - if (pollfd[nb_fd].revents == POLLIN) { - DBG("kconsumerd_poll_pipe wake up"); - tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1); - if (tmp2 < 0) { - perror("read kconsumerd poll"); - } - continue; - } - - /* Take care of high priority channels first. */ - for (i = 0; i < nb_fd; i++) { - switch(pollfd[i].revents) { - case POLLERR: - ERR("Error returned in polling fd %d.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLHUP: - DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLNVAL: - ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); - kconsumerd_del_fd(local_kconsumerd_fd[i]); - num_hup++; - break; - case POLLPRI: - DBG("Urgent read on fd %d", pollfd[i].fd); - high_prio = 1; - ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable sub-buffer */ - if (ret == EAGAIN) { - ret = 0; - } - break; - } - } - - /* If every buffer FD has hung up, we end the read loop here */ - if (nb_fd > 0 && num_hup == nb_fd) { - DBG("every buffer FD has hung up\n"); - if (kconsumerd_quit == 1) { - goto end; - } - continue; - } - - /* Take care of low priority channels. */ - if (high_prio == 0) { - for (i = 0; i < nb_fd; i++) { - if (pollfd[i].revents == POLLIN) { - DBG("Normal read on fd %d", pollfd[i].fd); - ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); - /* it's ok to have an unavailable subbuffer */ - if (ret == EAGAIN) { - ret = 0; - } - } - } - } - } -end: - DBG("polling thread exiting"); - if (pollfd != NULL) { - free(pollfd); - pollfd = NULL; - } - if (local_kconsumerd_fd != NULL) { - free(local_kconsumerd_fd); - local_kconsumerd_fd = NULL; - } - return NULL; -} - -/* - * kconsumerd_create - * - * initialise the necessary environnement : - * - create a new context - * - create the poll_pipe - * - create the should_quit pipe (for signal handler) - * - create the thread pipe (for splice) - * Takes a function pointer as argument, this function is called when data is - * available on a buffer. This function is responsible to do the - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the - * buffer configuration and then kernctl_put_next_subbuf at the end. - * Returns a pointer to the new context or NULL on error. - */ -struct kconsumerd_local_data *kconsumerd_create( - int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd)) -{ - int ret; - struct kconsumerd_local_data *ctx; - - ctx = malloc(sizeof(struct kconsumerd_local_data)); - if (ctx == NULL) { - perror("allocating context"); - goto end; - } - - ctx->on_buffer_ready = buffer_ready; - - ret = pipe(ctx->kconsumerd_poll_pipe); - if (ret < 0) { - perror("Error creating poll pipe"); - ctx = NULL; - goto end; - } - - ret = pipe(ctx->kconsumerd_should_quit); - if (ret < 0) { - perror("Error creating recv pipe"); - ctx = NULL; - goto end; - } - - ret = pipe(ctx->kconsumerd_thread_pipe); - if (ret < 0) { - perror("Error creating thread pipe"); - ctx = NULL; - goto end; - } - -end: - return ctx; -} - -/* - * kconsumerd_destroy - * - * Close all fds associated with the instance and free the context - */ -void kconsumerd_destroy(struct kconsumerd_local_data *ctx) -{ - close(ctx->kconsumerd_error_socket); - close(ctx->kconsumerd_thread_pipe[0]); - close(ctx->kconsumerd_thread_pipe[1]); - close(ctx->kconsumerd_poll_pipe[0]); - close(ctx->kconsumerd_poll_pipe[1]); - close(ctx->kconsumerd_should_quit[0]); - close(ctx->kconsumerd_should_quit[1]); - unlink(ctx->kconsumerd_command_sock_path); - free(ctx); - ctx = NULL; -} - -/* - * kconsumerd_thread_receive_fds - * - * This thread listens on the consumerd socket and - * receives the file descriptors from ltt-sessiond - */ -void *kconsumerd_thread_receive_fds(void *data) -{ - int sock, client_socket, ret; - struct lttcomm_kconsumerd_header tmp; - /* - * structure to poll for incoming data on communication socket - * avoids making blocking sockets - */ - struct pollfd kconsumerd_sockpoll[2]; - struct kconsumerd_local_data *ctx = data; - - - DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path); - unlink(ctx->kconsumerd_command_sock_path); - client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path); - if (client_socket < 0) { - ERR("Cannot create command socket"); - goto end; - } - - ret = lttcomm_listen_unix_sock(client_socket); - if (ret < 0) { - goto end; - } - - DBG("Sending ready command to ltt-sessiond"); - ret = kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY); - if (ret < 0) { - ERR("Error sending ready command to ltt-sessiond"); - goto end; - } - - ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* prepare the FDs to poll : to client socket and the should_quit pipe */ - kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0]; - kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; - kconsumerd_sockpoll[1].fd = client_socket; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Connection on client_socket"); - - /* Blocking call, waiting for transmission */ - sock = lttcomm_accept_unix_sock(client_socket); - if (sock <= 0) { - WARN("On accept"); - goto end; - } - ret = fcntl(sock, F_SETFL, O_NONBLOCK); - if (ret < 0) { - perror("fcntl O_NONBLOCK"); - goto end; - } - - /* update the polling structure to poll on the established socket */ - kconsumerd_sockpoll[1].fd = sock; - kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; - - while (1) { - if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { - goto end; - } - DBG("Incoming fds on sock"); - - /* We first get the number of fd we are about to receive */ - ret = lttcomm_recv_unix_sock(sock, &tmp, - sizeof(struct lttcomm_kconsumerd_header)); - if (ret <= 0) { - ERR("Communication interrupted on command socket"); - goto end; - } - if (tmp.cmd_type == STOP) { - DBG("Received STOP command"); - goto end; - } - if (kconsumerd_quit) { - DBG("kconsumerd_thread_receive_fds received quit from signal"); - goto end; - } - - /* we received a command to add or update fds */ - ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll, - tmp.payload_size, tmp.cmd_type); - if (ret < 0) { - ERR("Receiving the FD, exiting"); - goto end; - } - DBG("received fds on sock"); - } - -end: - DBG("kconsumerd_thread_receive_fds exiting"); - - /* - * when all fds have hung up, the polling thread - * can exit cleanly - */ - kconsumerd_quit = 1; - - /* - * 2s of grace period, if no polling events occur during - * this period, the polling thread will exit even if there - * are still open FDs (should not happen, but safety mechanism). - */ - kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD; - - /* wake up the polling thread */ - ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("poll pipe write"); - } - return NULL; -} - -/* - * kconsumerd_cleanup - * - * Close all the tracefiles and stream fds, should be called when all - * instances are destroyed. - */ -void kconsumerd_cleanup(void) -{ - struct kconsumerd_fd *iter, *tmp; - - /* - * close all outfd. Called when there are no more threads - * running (after joining on the threads), no need to protect - * list iteration with mutex. - */ - cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) { - kconsumerd_del_fd(iter); - } -} - -/* - * kconsumerd_should_exit - * - * Called from signal handler. - */ -void kconsumerd_should_exit(struct kconsumerd_local_data *ctx) -{ - int ret; - kconsumerd_quit = 1; - ret = write(ctx->kconsumerd_should_quit[1], "4", 1); - if (ret < 0) { - perror("write kconsumerd quit"); - } -} - -/* - * kconsumerd_send_error - * - * send return code to ltt-sessiond - */ -int kconsumerd_send_error(struct kconsumerd_local_data *ctx, enum lttcomm_return_code cmd) -{ - if (ctx->kconsumerd_error_socket > 0) { - return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd, - sizeof(enum lttcomm_sessiond_command)); - } - - return 0; -} diff --git a/liblttkconsumerd/lttkconsumerd.h b/liblttkconsumerd/lttkconsumerd.h deleted file mode 100644 index 1dd85d509..000000000 --- a/liblttkconsumerd/lttkconsumerd.h +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright (C) 2011 - Julien Desfossez - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - */ - -#ifndef _LIBLTTKCONSUMERD_H -#define _LIBLTTKCONSUMERD_H - -#include -#include "lttng-kconsumerd.h" - -/* - * When the receiving thread dies, we need to have a way to make - * the polling thread exit eventually. - * If all FDs hang up (normal case when the ltt-sessiond stops), - * we can exit cleanly, but if there is a problem and for whatever - * reason some FDs remain open, the consumer should still exit eventually. - * - * If the timeout is reached, it means that during this period - * no events occurred on the FDs so we need to force an exit. - * This case should not happen but it is a safety to ensure we won't block - * the consumer indefinitely. - * - * The value of 2 seconds is an arbitrary choice. - */ -#define KCONSUMERD_POLL_GRACE_PERIOD 2000 - -struct kconsumerd_fd_list { - struct cds_list_head head; -}; - -/* - * Internal representation of the FDs, - * sessiond_fd is used to identify uniquely a fd - */ -struct kconsumerd_fd { - struct cds_list_head list; - int sessiond_fd; /* used to identify uniquely a fd with sessiond */ - int consumerd_fd; /* fd to consume */ - int out_fd; /* output file to write the data */ - off_t out_fd_offset; /* write position in the output file descriptor */ - char path_name[PATH_MAX]; /* tracefile name */ - enum kconsumerd_fd_state state; - unsigned long max_sb_size; /* the subbuffer size for this channel */ - void *mmap_base; - size_t mmap_len; - enum lttng_event_output output; /* splice or mmap */ -}; - -struct kconsumerd_local_data { - /* function to call when data is available on a buffer */ - int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd); - /* socket to communicate errors with sessiond */ - int kconsumerd_error_socket; - /* socket to exchange commands with sessiond */ - char *kconsumerd_command_sock_path; - /* communication with splice */ - int kconsumerd_thread_pipe[2]; - /* pipe to wake the poll thread when necessary */ - int kconsumerd_poll_pipe[2]; - /* to let the signal handler wake up the fd receiver thread */ - int kconsumerd_should_quit[2]; -}; - -/* - * kconsumerd_create - * initialise the necessary environnement : - * - create a new context - * - create the poll_pipe - * - create the should_quit pipe (for signal handler) - * - create the thread pipe (for splice) - * Takes a function pointer as argument, this function is called when data is - * available on a buffer. This function is responsible to do the - * kernctl_get_next_subbuf, read the data with mmap or splice depending on the - * buffer configuration and then kernctl_put_next_subbuf at the end. - * Returns a pointer to the new context or NULL on error. - */ -struct kconsumerd_local_data *kconsumerd_create( - int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd)); - -/* - * kconsumerd_destroy - * Close all fds associated with the instance and free the context - */ -void kconsumerd_destroy(struct kconsumerd_local_data *ctx); - -/* - * kconsumerd_on_read_subbuffer_mmap - * mmap the ring buffer, read it and write the data to the tracefile. - * Returns the number of bytes written - */ -int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx, - struct kconsumerd_fd *kconsumerd_fd, unsigned long len); - -/* - * kconsumerd_on_read_subbuffer - * - * Splice the data from the ring buffer to the tracefile. - * Returns the number of bytes spliced - */ -int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx, - struct kconsumerd_fd *kconsumerd_fd, unsigned long len); - -/* - * kconsumerd_send_error - * send return code to ltt-sessiond - * returns the return code of sendmsg : the number of bytes transmitted - * or -1 on error. - */ -int kconsumerd_send_error(struct kconsumerd_local_data *ctx, - enum lttcomm_return_code cmd); - -/* - * kconsumerd_poll_socket - * Poll on the should_quit pipe and the command socket - * return -1 on error and should exit, 0 if data is - * available on the command socket - */ -int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll); - -/* - * kconsumerd_thread_poll_fds - * This thread polls the fds in the ltt_fd_list to consume the data - * and write it to tracefile if necessary. - */ -void *kconsumerd_thread_poll_fds(void *data); - -/* - * kconsumerd_thread_receive_fds - * This thread listens on the consumerd socket and - * receives the file descriptors from ltt-sessiond - */ -void *kconsumerd_thread_receive_fds(void *data); - -/* - * kconsumerd_should_exit - * Called from signal handler to ensure a clean exit - */ -void kconsumerd_should_exit(struct kconsumerd_local_data *ctx); - -/* - * kconsumerd_cleanup - * Cleanup the daemon's socket on exit - */ -void kconsumerd_cleanup(void); - -/* - * kconsumerd_set_error_socket - * Set the error socket for communication with a session daemon - */ -void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock); - -/* - * kconsumerd_set_command_socket_path - * Set the command socket path for communication with a session daemon - */ -void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx, char *sock); - -#endif /* _LIBLTTKCONSUMERD_H */ diff --git a/liblttngkconsumerd/Makefile.am b/liblttngkconsumerd/Makefile.am new file mode 100644 index 000000000..edd005930 --- /dev/null +++ b/liblttngkconsumerd/Makefile.am @@ -0,0 +1,9 @@ +AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/libkernelctl + +lib_LTLIBRARIES = liblttngkconsumerd.la + +liblttngkconsumerd_la_SOURCES = lttngkconsumerd.c + +liblttngkconsumerd_la_LIBADD = \ + $(top_builddir)/libkernelctl/libkernelctl.la \ + $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la diff --git a/liblttngkconsumerd/lttngkconsumerd.c b/liblttngkconsumerd/lttngkconsumerd.c new file mode 100644 index 000000000..ba26026c4 --- /dev/null +++ b/liblttngkconsumerd/lttngkconsumerd.c @@ -0,0 +1,965 @@ +/* + * Copyright (C) 2011 - Julien Desfossez + * Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; only version 2 + * of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "kernelctl.h" +#include "lttngerr.h" +#include "lttng-sessiond-comm.h" + +static struct lttng_kconsumerd_global_data { + /* + * kconsumerd_data.lock protects kconsumerd_data.fd_list, + * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It ensures + * the count matches the number of items in the fd_list. It ensures the + * list updates *always* trigger an fd_array update (therefore need to make + * list update vs kconsumerd_data.need_update flag update atomic, and also + * flag read, fd array and flag clear atomic). + */ + pthread_mutex_t lock; + /* + * Number of element for the list below. Protected by kconsumerd_data.lock. + */ + unsigned int fds_count; + /* + * List of FDs. Protected by kconsumerd_data.lock. + */ + struct lttng_kconsumerd_fd_list fd_list; + /* + * Flag specifying if the local array of FDs needs update in the poll + * function. Protected by kconsumerd_data.lock. + */ + unsigned int need_update; +} kconsumerd_data = { + .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head), + .fds_count = 0, + .need_update = 1, +}; + +/* timeout parameter, to control the polling thread grace period. */ +static int kconsumerd_poll_timeout = -1; + +/* + * Flag to inform the polling thread to quit when all fd hung up. Updated by + * the kconsumerd_thread_receive_fds when it notices that all fds has hung up. + * Also updated by the signal handler (kconsumerd_should_exit()). Read by the + * polling threads. + */ +static volatile int kconsumerd_quit = 0; + +/* + * Find a session fd in the global list. The kconsumerd_data.lock must be + * locked during this call. + * + * Return 1 if found else 0. + */ +static int kconsumerd_find_session_fd(int fd) +{ + struct lttng_kconsumerd_fd *iter; + + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->sessiond_fd == fd) { + DBG("Duplicate session fd %d", fd); + return 1; + } + } + + return 0; +} + +/* + * Remove a fd from the global list protected by a mutex. + */ +static void kconsumerd_del_fd(struct lttng_kconsumerd_fd *lcf) +{ + int ret; + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_del(&lcf->list); + if (kconsumerd_data.fds_count > 0) { + kconsumerd_data.fds_count--; + if (lcf != NULL) { + if (lcf->mmap_base != NULL) { + ret = munmap(lcf->mmap_base, lcf->mmap_len); + if (ret != 0) { + perror("munmap"); + } + } + if (lcf->out_fd != 0) { + close(lcf->out_fd); + } + close(lcf->consumerd_fd); + free(lcf); + lcf = NULL; + } + } + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); +} + +/* + * Add a fd to the global list protected by a mutex. + */ +static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, + int consumerd_fd) +{ + struct lttng_kconsumerd_fd *tmp_fd; + int ret = 0; + + pthread_mutex_lock(&kconsumerd_data.lock); + /* Check if already exist */ + ret = kconsumerd_find_session_fd(buf->fd); + if (ret == 1) { + goto end; + } + + tmp_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + tmp_fd->sessiond_fd = buf->fd; + tmp_fd->consumerd_fd = consumerd_fd; + tmp_fd->state = buf->state; + tmp_fd->max_sb_size = buf->max_sb_size; + tmp_fd->out_fd = 0; + tmp_fd->out_fd_offset = 0; + tmp_fd->mmap_len = 0; + tmp_fd->mmap_base = NULL; + tmp_fd->output = buf->output; + strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX); + tmp_fd->path_name[PATH_MAX - 1] = '\0'; + + /* Opening the tracefile in write mode */ + if (tmp_fd->path_name != NULL) { + ret = open(tmp_fd->path_name, + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + ERR("Opening %s", tmp_fd->path_name); + perror("open"); + goto end; + } + tmp_fd->out_fd = ret; + DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name, + tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd); + } + + if (tmp_fd->output == LTTNG_EVENT_MMAP) { + /* get the len of the mmap region */ + ret = kernctl_get_mmap_len(tmp_fd->consumerd_fd, &tmp_fd->mmap_len); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_len"); + goto end; + } + + tmp_fd->mmap_base = mmap(NULL, tmp_fd->mmap_len, + PROT_READ, MAP_PRIVATE, tmp_fd->consumerd_fd, 0); + if (tmp_fd->mmap_base == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto end; + } + } + + cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head); + kconsumerd_data.fds_count++; + kconsumerd_data.need_update = 1; +end: + pthread_mutex_unlock(&kconsumerd_data.lock); + return ret; +} + +/* + * Update a fd according to what we just received. + */ +static void kconsumerd_change_fd_state(int sessiond_fd, + enum lttng_kconsumerd_fd_state state) +{ + struct lttng_kconsumerd_fd *iter; + + pthread_mutex_lock(&kconsumerd_data.lock); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->sessiond_fd == sessiond_fd) { + iter->state = state; + break; + } + } + kconsumerd_data.need_update = 1; + pthread_mutex_unlock(&kconsumerd_data.lock); +} + +/* + * Allocate the pollfd structure and the local view of the out fds to avoid + * doing a lookup in the linked list and concurrency issues when writing is + * needed. Called with kconsumerd_data.lock held. + * + * Returns the number of fds in the structures. + */ +static int kconsumerd_update_poll_array( + struct lttng_kconsumerd_local_data *ctx, struct pollfd **pollfd, + struct lttng_kconsumerd_fd **local_kconsumerd_fd) +{ + struct lttng_kconsumerd_fd *iter; + int i = 0; + + DBG("Updating poll fd array"); + cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) { + if (iter->state == ACTIVE_FD) { + DBG("Active FD %d", iter->consumerd_fd); + (*pollfd)[i].fd = iter->consumerd_fd; + (*pollfd)[i].events = POLLIN | POLLPRI; + local_kconsumerd_fd[i] = iter; + i++; + } + } + + /* + * Insert the kconsumerd_poll_pipe at the end of the array and don't + * increment i so nb_fd is the number of real FD. + */ + (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0]; + (*pollfd)[i].events = POLLIN; + return i; +} + +/* + * Receives an array of file descriptors and the associated structures + * describing each fd (path name). + * + * Returns the size of received data + */ +static int kconsumerd_consumerd_recv_fd( + struct lttng_kconsumerd_local_data *ctx, int sfd, + struct pollfd *kconsumerd_sockpoll, int size, + enum lttng_kconsumerd_command cmd_type) +{ + struct iovec iov[1]; + int ret = 0, i, tmp2; + struct cmsghdr *cmsg; + int nb_fd; + char recv_fd[CMSG_SPACE(sizeof(int))]; + struct lttcomm_kconsumerd_msg lkm; + + /* the number of fds we are about to receive */ + nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg); + + /* + * nb_fd is the number of fds we receive. One fd per recvmsg. + */ + for (i = 0; i < nb_fd; i++) { + struct msghdr msg = { 0 }; + + /* Prepare to receive the structures */ + iov[0].iov_base = &lkm; + iov[0].iov_len = sizeof(lkm); + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + msg.msg_control = recv_fd; + msg.msg_controllen = sizeof(recv_fd); + + DBG("Waiting to receive fd"); + if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + + if ((ret = recvmsg(sfd, &msg, 0)) < 0) { + perror("recvmsg"); + continue; + } + + if (ret != (size / nb_fd)) { + ERR("Received only %d, expected %d", ret, size); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg) { + ERR("Invalid control message header"); + ret = -1; + lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); + goto end; + } + + /* if we received fds */ + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + switch (cmd_type) { + case ADD_STREAM: + DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, + ((int *) CMSG_DATA(cmsg))[0]); + + ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]); + if (ret < 0) { + lttng_kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR); + goto end; + } + break; + case UPDATE_STREAM: + kconsumerd_change_fd_state(lkm.fd, lkm.state); + break; + default: + break; + } + /* signal the poll thread */ + tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1); + if (tmp2 < 0) { + perror("write kconsumerd poll"); + } + } else { + ERR("Didn't received any fd"); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD); + ret = -1; + goto end; + } + } + +end: + return ret; +} + +/* + * Set the error socket. + */ +void lttng_kconsumerd_set_error_sock( + struct lttng_kconsumerd_local_data *ctx, int sock) +{ + ctx->kconsumerd_error_socket = sock; +} + +/* + * Set the command socket path. + */ + +void lttng_kconsumerd_set_command_sock_path( + struct lttng_kconsumerd_local_data *ctx, char *sock) +{ + ctx->kconsumerd_command_sock_path = sock; +} + +/* + * Mmap the ring buffer, read it and write the data to the tracefile. + * + * Returns the number of bytes written + */ +int lttng_kconsumerd_on_read_subbuffer_mmap( + struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len) +{ + unsigned long mmap_offset; + char *padding = NULL; + long ret = 0; + off_t orig_offset = kconsumerd_fd->out_fd_offset; + int fd = kconsumerd_fd->consumerd_fd; + int outfd = kconsumerd_fd->out_fd; + + /* get the offset inside the fd to mmap */ + ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); + if (ret != 0) { + ret = errno; + perror("kernctl_get_mmap_read_offset"); + goto end; + } + + while (len > 0) { + ret = write(outfd, kconsumerd_fd->mmap_base + mmap_offset, len); + if (ret >= len) { + len = 0; + } else if (ret < 0) { + ret = errno; + perror("Error in file write"); + goto end; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + kconsumerd_fd->out_fd_offset += ret; + } + + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } + goto end; + +end: + if (padding != NULL) { + free(padding); + } + return ret; +} + +/* + * Splice the data from the ring buffer to the tracefile. + * + * Returns the number of bytes spliced. + */ +int lttng_kconsumerd_on_read_subbuffer_splice( + struct lttng_kconsumerd_local_data *ctx, + struct lttng_kconsumerd_fd *kconsumerd_fd, unsigned long len) +{ + long ret = 0; + loff_t offset = 0; + off_t orig_offset = kconsumerd_fd->out_fd_offset; + int fd = kconsumerd_fd->consumerd_fd; + int outfd = kconsumerd_fd->out_fd; + + while (len > 0) { + DBG("splice chan to pipe offset %lu (fd : %d)", + (unsigned long)offset, fd); + ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice chan to pipe ret %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in relay splice"); + goto splice_error; + } + + ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret, + SPLICE_F_MOVE | SPLICE_F_MORE); + DBG("splice pipe to file %ld", ret); + if (ret < 0) { + ret = errno; + perror("Error in file splice"); + goto splice_error; + } + if (ret >= len) { + len = 0; + } + /* This won't block, but will start writeout asynchronously */ + sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + kconsumerd_fd->out_fd_offset += ret; + } + + /* + * This does a blocking write-and-wait on any page that belongs to the + * subbuffer prior to the one we just wrote. + * Don't care about error values, as these are just hints and ways to + * limit the amount of page cache used. + */ + if (orig_offset >= kconsumerd_fd->max_sb_size) { + sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, + SYNC_FILE_RANGE_WAIT_BEFORE + | SYNC_FILE_RANGE_WRITE + | SYNC_FILE_RANGE_WAIT_AFTER); + /* + * Give hints to the kernel about how we access the file: + * POSIX_FADV_DONTNEED : we won't re-access data in a near future after + * we write it. + * + * We need to call fadvise again after the file grows because the + * kernel does not seem to apply fadvise to non-existing parts of the + * file. + * + * Call fadvise _after_ having waited for the page writeback to + * complete because the dirty page writeback semantic is not well + * defined. So it can be expected to lead to lower throughput in + * streaming. + */ + posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size, + kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED); + } + goto end; + +splice_error: + /* send the appropriate error description to sessiond */ + switch(ret) { + case EBADF: + lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF); + break; + case EINVAL: + lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL); + break; + case ENOMEM: + lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM); + break; + case ESPIPE: + lttng_kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE); + break; + } + +end: + return ret; +} + +/* + * Poll on the should_quit pipe and the command socket return -1 on error and + * should exit, 0 if data is available on the command socket + */ +int lttng_kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll) +{ + int num_rdy; + + num_rdy = poll(kconsumerd_sockpoll, 2, -1); + if (num_rdy == -1) { + perror("Poll error"); + goto exit; + } + if (kconsumerd_sockpoll[0].revents == POLLIN) { + DBG("kconsumerd_should_quit wake up"); + goto exit; + } + return 0; + +exit: + return -1; +} + +/* + * This thread polls the fds in the ltt_fd_list to consume the data and write + * it to tracefile if necessary. + */ +void *lttng_kconsumerd_thread_poll_fds(void *data) +{ + int num_rdy, num_hup, high_prio, ret, i; + struct pollfd *pollfd = NULL; + /* local view of the fds */ + struct lttng_kconsumerd_fd **local_kconsumerd_fd = NULL; + /* local view of kconsumerd_data.fds_count */ + int nb_fd = 0; + char tmp; + int tmp2; + struct lttng_kconsumerd_local_data *ctx = data; + + + local_kconsumerd_fd = malloc(sizeof(struct lttng_kconsumerd_fd)); + + while (1) { + high_prio = 0; + num_hup = 0; + + /* + * the ltt_fd_list has been updated, we need to update our + * local array as well + */ + pthread_mutex_lock(&kconsumerd_data.lock); + if (kconsumerd_data.need_update) { + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ + pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd)); + if (pollfd == NULL) { + perror("pollfd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + + /* allocate for all fds + 1 for the kconsumerd_poll_pipe */ + local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) * + sizeof(struct lttng_kconsumerd_fd)); + if (local_kconsumerd_fd == NULL) { + perror("local_kconsumerd_fd malloc"); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd); + if (ret < 0) { + ERR("Error in allocating pollfd or local_outfds"); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); + pthread_mutex_unlock(&kconsumerd_data.lock); + goto end; + } + nb_fd = ret; + kconsumerd_data.need_update = 0; + } + pthread_mutex_unlock(&kconsumerd_data.lock); + + /* poll on the array of fds */ + DBG("polling on %d fd", nb_fd + 1); + num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout); + DBG("poll num_rdy : %d", num_rdy); + if (num_rdy == -1) { + perror("Poll error"); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR); + goto end; + } else if (num_rdy == 0) { + DBG("Polling thread timed out"); + goto end; + } + + /* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */ + if (nb_fd == 0 && kconsumerd_quit == 1) { + goto end; + } + + /* + * If the kconsumerd_poll_pipe triggered poll go + * directly to the beginning of the loop to update the + * array. We want to prioritize array update over + * low-priority reads. + */ + if (pollfd[nb_fd].revents == POLLIN) { + DBG("kconsumerd_poll_pipe wake up"); + tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1); + if (tmp2 < 0) { + perror("read kconsumerd poll"); + } + continue; + } + + /* Take care of high priority channels first. */ + for (i = 0; i < nb_fd; i++) { + switch(pollfd[i].revents) { + case POLLERR: + ERR("Error returned in polling fd %d.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLHUP: + DBG("Polling fd %d tells it has hung up.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLNVAL: + ERR("Polling fd %d tells fd is not open.", pollfd[i].fd); + kconsumerd_del_fd(local_kconsumerd_fd[i]); + num_hup++; + break; + case POLLPRI: + DBG("Urgent read on fd %d", pollfd[i].fd); + high_prio = 1; + ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); + /* it's ok to have an unavailable sub-buffer */ + if (ret == EAGAIN) { + ret = 0; + } + break; + } + } + + /* If every buffer FD has hung up, we end the read loop here */ + if (nb_fd > 0 && num_hup == nb_fd) { + DBG("every buffer FD has hung up\n"); + if (kconsumerd_quit == 1) { + goto end; + } + continue; + } + + /* Take care of low priority channels. */ + if (high_prio == 0) { + for (i = 0; i < nb_fd; i++) { + if (pollfd[i].revents == POLLIN) { + DBG("Normal read on fd %d", pollfd[i].fd); + ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]); + /* it's ok to have an unavailable subbuffer */ + if (ret == EAGAIN) { + ret = 0; + } + } + } + } + } +end: + DBG("polling thread exiting"); + if (pollfd != NULL) { + free(pollfd); + pollfd = NULL; + } + if (local_kconsumerd_fd != NULL) { + free(local_kconsumerd_fd); + local_kconsumerd_fd = NULL; + } + return NULL; +} + +/* + * Initialise the necessary environnement : + * - create a new context + * - create the poll_pipe + * - create the should_quit pipe (for signal handler) + * - create the thread pipe (for splice) + * + * Takes a function pointer as argument, this function is called when data is + * available on a buffer. This function is responsible to do the + * kernctl_get_next_subbuf, read the data with mmap or splice depending on the + * buffer configuration and then kernctl_put_next_subbuf at the end. + * + * Returns a pointer to the new context or NULL on error. + */ +struct lttng_kconsumerd_local_data *lttng_kconsumerd_create( + int (*buffer_ready)(struct lttng_kconsumerd_fd *kconsumerd_fd)) +{ + int ret; + struct lttng_kconsumerd_local_data *ctx; + + ctx = malloc(sizeof(struct lttng_kconsumerd_local_data)); + if (ctx == NULL) { + perror("allocating context"); + goto end; + } + + ctx->on_buffer_ready = buffer_ready; + + ret = pipe(ctx->kconsumerd_poll_pipe); + if (ret < 0) { + perror("Error creating poll pipe"); + ctx = NULL; + goto end; + } + + ret = pipe(ctx->kconsumerd_should_quit); + if (ret < 0) { + perror("Error creating recv pipe"); + ctx = NULL; + goto end; + } + + ret = pipe(ctx->kconsumerd_thread_pipe); + if (ret < 0) { + perror("Error creating thread pipe"); + ctx = NULL; + goto end; + } + +end: + return ctx; +} + +/* + * Close all fds associated with the instance and free the context. + */ +void lttng_kconsumerd_destroy(struct lttng_kconsumerd_local_data *ctx) +{ + close(ctx->kconsumerd_error_socket); + close(ctx->kconsumerd_thread_pipe[0]); + close(ctx->kconsumerd_thread_pipe[1]); + close(ctx->kconsumerd_poll_pipe[0]); + close(ctx->kconsumerd_poll_pipe[1]); + close(ctx->kconsumerd_should_quit[0]); + close(ctx->kconsumerd_should_quit[1]); + unlink(ctx->kconsumerd_command_sock_path); + free(ctx); + ctx = NULL; +} + +/* + * This thread listens on the consumerd socket and receives the file + * descriptors from the session daemon. + */ +void *lttng_kconsumerd_thread_receive_fds(void *data) +{ + int sock, client_socket, ret; + struct lttcomm_kconsumerd_header tmp; + /* + * structure to poll for incoming data on communication socket avoids + * making blocking sockets. + */ + struct pollfd kconsumerd_sockpoll[2]; + struct lttng_kconsumerd_local_data *ctx = data; + + + DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path); + unlink(ctx->kconsumerd_command_sock_path); + client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path); + if (client_socket < 0) { + ERR("Cannot create command socket"); + goto end; + } + + ret = lttcomm_listen_unix_sock(client_socket); + if (ret < 0) { + goto end; + } + + DBG("Sending ready command to ltt-sessiond"); + ret = lttng_kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY); + if (ret < 0) { + ERR("Error sending ready command to ltt-sessiond"); + goto end; + } + + ret = fcntl(client_socket, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* prepare the FDs to poll : to client socket and the should_quit pipe */ + kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0]; + kconsumerd_sockpoll[0].events = POLLIN | POLLPRI; + kconsumerd_sockpoll[1].fd = client_socket; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Connection on client_socket"); + + /* Blocking call, waiting for transmission */ + sock = lttcomm_accept_unix_sock(client_socket); + if (sock <= 0) { + WARN("On accept"); + goto end; + } + ret = fcntl(sock, F_SETFL, O_NONBLOCK); + if (ret < 0) { + perror("fcntl O_NONBLOCK"); + goto end; + } + + /* update the polling structure to poll on the established socket */ + kconsumerd_sockpoll[1].fd = sock; + kconsumerd_sockpoll[1].events = POLLIN | POLLPRI; + + while (1) { + if (lttng_kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) { + goto end; + } + DBG("Incoming fds on sock"); + + /* We first get the number of fd we are about to receive */ + ret = lttcomm_recv_unix_sock(sock, &tmp, + sizeof(struct lttcomm_kconsumerd_header)); + if (ret <= 0) { + ERR("Communication interrupted on command socket"); + goto end; + } + if (tmp.cmd_type == STOP) { + DBG("Received STOP command"); + goto end; + } + if (kconsumerd_quit) { + DBG("kconsumerd_thread_receive_fds received quit from signal"); + goto end; + } + + /* we received a command to add or update fds */ + ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll, + tmp.payload_size, tmp.cmd_type); + if (ret < 0) { + ERR("Receiving the FD, exiting"); + goto end; + } + DBG("received fds on sock"); + } + +end: + DBG("kconsumerd_thread_receive_fds exiting"); + + /* + * when all fds have hung up, the polling thread + * can exit cleanly + */ + kconsumerd_quit = 1; + + /* + * 2s of grace period, if no polling events occur during + * this period, the polling thread will exit even if there + * are still open FDs (should not happen, but safety mechanism). + */ + kconsumerd_poll_timeout = LTTNG_KCONSUMERD_POLL_GRACE_PERIOD; + + /* wake up the polling thread */ + ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1); + if (ret < 0) { + perror("poll pipe write"); + } + return NULL; +} + +/* + * Close all the tracefiles and stream fds, should be called when all instances + * are destroyed. + */ +void lttng_kconsumerd_cleanup(void) +{ + struct lttng_kconsumerd_fd *iter, *tmp; + + /* + * close all outfd. Called when there are no more threads + * running (after joining on the threads), no need to protect + * list iteration with mutex. + */ + cds_list_for_each_entry_safe(iter, tmp, + &kconsumerd_data.fd_list.head, list) { + kconsumerd_del_fd(iter); + } +} + +/* + * Called from signal handler. + */ +void lttng_kconsumerd_should_exit(struct lttng_kconsumerd_local_data *ctx) +{ + int ret; + kconsumerd_quit = 1; + ret = write(ctx->kconsumerd_should_quit[1], "4", 1); + if (ret < 0) { + perror("write kconsumerd quit"); + } +} + +/* + * Send return code to the session daemon. + */ +int lttng_kconsumerd_send_error( + struct lttng_kconsumerd_local_data *ctx, int cmd) +{ + if (ctx->kconsumerd_error_socket > 0) { + return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd, + sizeof(enum lttcomm_sessiond_command)); + } + + return 0; +} diff --git a/ltt-kconsumerd/Makefile.am b/ltt-kconsumerd/Makefile.am index ce75694d9..9ba16549b 100644 --- a/ltt-kconsumerd/Makefile.am +++ b/ltt-kconsumerd/Makefile.am @@ -1,10 +1,10 @@ -AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/liblttkconsumerd -I$(top_srcdir)/libkernelctl +AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/libkernelctl bin_PROGRAMS = ltt-kconsumerd ltt_kconsumerd_SOURCES = ltt-kconsumerd.c ltt_kconsumerd_LDADD = \ - $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ - $(top_builddir)/libkernelctl/libkernelctl.la \ - $(top_builddir)/liblttkconsumerd/liblttkconsumerd.la + $(top_builddir)/libkernelctl/libkernelctl.la \ + $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ + $(top_builddir)/liblttngkconsumerd/liblttngkconsumerd.la diff --git a/ltt-kconsumerd/ltt-kconsumerd.c b/ltt-kconsumerd/ltt-kconsumerd.c index 32a726cf5..cd4b00e01 100644 --- a/ltt-kconsumerd/ltt-kconsumerd.c +++ b/ltt-kconsumerd/ltt-kconsumerd.c @@ -37,12 +37,15 @@ #include #include +#include + #include "lttngerr.h" #include "kernelctl.h" -#include "lttkconsumerd.h" +#include "ltt-kconsumerd.h" +#include "lttng-sessiond-comm.h" /* the two threads (receive fd and poll) */ -pthread_t threads[2]; +static pthread_t threads[2]; /* to count the number of time the user pressed ctrl+c */ static int sigintcount = 0; @@ -52,16 +55,14 @@ int opt_quiet; int opt_verbose; static int opt_daemon; static const char *progname; -char command_sock_path[PATH_MAX]; /* Global command socket path */ -char error_sock_path[PATH_MAX]; /* Global error path */ +static char command_sock_path[PATH_MAX]; /* Global command socket path */ +static char error_sock_path[PATH_MAX]; /* Global error path */ -/* the liblttkconsumerd context */ -struct kconsumerd_local_data *ctx; +/* the liblttngkconsumerd context */ +static struct lttng_kconsumerd_local_data *ctx; /* - * sighandler - * - * Signal handler for the daemon + * Signal handler for the daemon */ static void sighandler(int sig) { @@ -70,13 +71,11 @@ static void sighandler(int sig) return; } - kconsumerd_should_exit(ctx); + lttng_kconsumerd_should_exit(ctx); } /* - * set_signal_handler - * - * Setup signal handler for : + * Setup signal handler for : * SIGINT, SIGTERM, SIGPIPE */ static int set_signal_handler(void) @@ -194,11 +193,9 @@ static void parse_args(int argc, char **argv) } /* - * read_subbuffer - * - * Consume data on a file descriptor and write it on a trace file + * Consume data on a file descriptor and write it on a trace file. */ -static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) +static int read_subbuffer(struct lttng_kconsumerd_fd *kconsumerd_fd) { unsigned long len; int err; @@ -226,7 +223,7 @@ static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) } /* splice the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + ret = lttng_kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); if (ret < 0) { /* * display the error but continue processing to try @@ -244,7 +241,7 @@ static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd) goto end; } /* write the subbuffer to the tracefile */ - ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); + ret = lttng_kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len); if (ret < 0) { /* * display the error but continue processing to try @@ -301,12 +298,12 @@ int main(int argc, char **argv) KCONSUMERD_CMD_SOCK_PATH); } /* create the pipe to wake to receiving thread when needed */ - ctx = kconsumerd_create(read_subbuffer); + ctx = lttng_kconsumerd_create(read_subbuffer); if (ctx == NULL) { goto error; } - kconsumerd_set_command_socket_path(ctx, command_sock_path); + lttng_kconsumerd_set_command_sock_path(ctx, command_sock_path); if (strlen(error_sock_path) == 0) { snprintf(error_sock_path, PATH_MAX, KCONSUMERD_ERR_SOCK_PATH); @@ -323,10 +320,10 @@ int main(int argc, char **argv) if (ret < 0) { WARN("Cannot connect to error socket, is ltt-sessiond started ?"); } - kconsumerd_set_error_socket(ctx, ret); + lttng_kconsumerd_set_error_sock(ctx, ret); /* Create the thread to manage the receive of fd */ - ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds, + ret = pthread_create(&threads[0], NULL, lttng_kconsumerd_thread_receive_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -334,7 +331,7 @@ int main(int argc, char **argv) } /* Create thread to manage the polling/writing of traces */ - ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds, + ret = pthread_create(&threads[1], NULL, lttng_kconsumerd_thread_poll_fds, (void *) ctx); if (ret != 0) { perror("pthread_create"); @@ -349,16 +346,16 @@ int main(int argc, char **argv) } } ret = EXIT_SUCCESS; - kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS); goto end; error: ret = EXIT_FAILURE; - kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); + lttng_kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE); end: - kconsumerd_destroy(ctx); - kconsumerd_cleanup(); + lttng_kconsumerd_destroy(ctx); + lttng_kconsumerd_cleanup(); return ret; } diff --git a/ltt-sessiond/main.c b/ltt-sessiond/main.c index 5db41869f..652798d34 100644 --- a/ltt-sessiond/main.c +++ b/ltt-sessiond/main.c @@ -41,6 +41,7 @@ #include /* URCU list library (-lurcu) */ #include +#include #include #include "context.h" @@ -50,7 +51,7 @@ #include "ust-ctl.h" #include "session.h" #include "traceable-app.h" -#include "lttng-kconsumerd.h" +#include "ltt-kconsumerd.h" #include "utils.h" /* Const values */