Register the consuming function and add a library context
authorJulien Desfossez <julien.desfossez@polymtl.ca>
Fri, 12 Aug 2011 13:36:47 +0000 (09:36 -0400)
committerDavid Goulet <david.goulet@polymtl.ca>
Fri, 12 Aug 2011 14:56:16 +0000 (10:56 -0400)
The init function of the library now takes a function as argument to
allow a consumer using the library to control the function to be called
when data is ready in a buffer.

The kconsumerd_on_read_subbuffer_mmap and
kconsumerd_on_read_subbuffer_splice are now exported to allow a consumer
to use them directly if needed.

Also the library has now a context, where all local parameters are
registered instead of static variables. That way, we can have multiple
callers using the library within the same process. Only the flag
indicating that all fds are closed remain global to the library and
shared among callers.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Julien Desfossez <julien.desfossez@polymtl.ca>
liblttkconsumerd/lttkconsumerd.c
liblttkconsumerd/lttkconsumerd.h
ltt-kconsumerd/ltt-kconsumerd.c

index 5c22d5ec5b3d43503657b7b30739fa4c8e57a7d3..d4908d16c9c43bd01023698d91e76efab34ebb88 100644 (file)
@@ -62,26 +62,13 @@ struct kconsumerd_global_data {
        unsigned int need_update;
 } kconsumerd_data = {
        .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
+       .fds_count = 0,
+       .need_update = 1,
 };
 
-/* communication with splice */
-static int kconsumerd_thread_pipe[2];
-
-/* pipe to wake the poll thread when necessary */
-static int kconsumerd_poll_pipe[2];
-
-/* to let the signal handler wake up the fd receiver thread */
-static int kconsumerd_should_quit[2];
-
 /* timeout parameter, to control the polling thread grace period */
 static int kconsumerd_poll_timeout = -1;
 
-/* socket to communicate errors with sessiond */
-static int kconsumerd_error_socket;
-
-/* socket to exchange commands with sessiond */
-static char *kconsumerd_command_sock_path;
-
 /*
  * flag to inform the polling thread to quit when all fd hung up.
  * Updated by the kconsumerd_thread_receive_fds when it notices that all
@@ -95,9 +82,9 @@ static volatile int kconsumerd_quit = 0;
  *
  * Set the error socket
  */
-void kconsumerd_set_error_socket(int sock)
+void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock)
 {
-       kconsumerd_error_socket = sock;
+       ctx->kconsumerd_error_socket = sock;
 }
 
 /*
@@ -105,9 +92,10 @@ void kconsumerd_set_error_socket(int sock)
  *
  * Set the command socket path
  */
-void kconsumerd_set_command_socket_path(char *sock)
+void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx,
+               char *sock)
 {
-       kconsumerd_command_sock_path = sock;
+       ctx->kconsumerd_command_sock_path = sock;
 }
 
 /*
@@ -144,7 +132,9 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
        if (kconsumerd_data.fds_count > 0) {
                kconsumerd_data.fds_count--;
                if (lcf != NULL) {
-                       close(lcf->out_fd);
+                       if (lcf->out_fd != 0) {
+                               close(lcf->out_fd);
+                       }
                        close(lcf->consumerd_fd);
                        free(lcf);
                        lcf = NULL;
@@ -161,8 +151,8 @@ static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
  */
 static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
 {
-       int ret;
        struct kconsumerd_fd *tmp_fd;
+       int ret = 0;
 
        pthread_mutex_lock(&kconsumerd_data.lock);
        /* Check if already exist */
@@ -176,22 +166,24 @@ static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_f
        tmp_fd->consumerd_fd = consumerd_fd;
        tmp_fd->state = buf->state;
        tmp_fd->max_sb_size = buf->max_sb_size;
+       tmp_fd->out_fd = 0;
+       tmp_fd->out_fd_offset = 0;
        strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);
        tmp_fd->path_name[PATH_MAX - 1] = '\0';
 
        /* Opening the tracefile in write mode */
-       ret = open(tmp_fd->path_name,
-                       O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
-       if (ret < 0) {
-               ERR("Opening %s", tmp_fd->path_name);
-               perror("open");
-               goto end;
+       if (tmp_fd->path_name != NULL) {
+               ret = open(tmp_fd->path_name,
+                               O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
+               if (ret < 0) {
+                       ERR("Opening %s", tmp_fd->path_name);
+                       perror("open");
+                       goto end;
+               }
+               tmp_fd->out_fd = ret;
+               DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
+                               tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
        }
-       tmp_fd->out_fd = ret;
-       tmp_fd->out_fd_offset = 0;
-
-       DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
-                       tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
 
        cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
        kconsumerd_data.fds_count++;
@@ -231,16 +223,14 @@ static void kconsumerd_change_fd_state(int sessiond_fd,
  * Returns the number of fds in the structures
  * Called with kconsumerd_data.lock held.
  */
-static int kconsumerd_update_poll_array(struct pollfd **pollfd,
-               struct kconsumerd_fd **local_kconsumerd_fd)
+static int kconsumerd_update_poll_array(struct kconsumerd_local_data *ctx,
+               struct pollfd **pollfd, struct kconsumerd_fd **local_kconsumerd_fd)
 {
        struct kconsumerd_fd *iter;
        int i = 0;
 
        DBG("Updating poll fd array");
-
        cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
-               DBG("Inside for each");
                if (iter->state == ACTIVE_FD) {
                        DBG("Active FD %d", iter->consumerd_fd);
                        (*pollfd)[i].fd = iter->consumerd_fd;
@@ -254,7 +244,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
         * insert the kconsumerd_poll_pipe at the end of the array and don't
         * increment i so nb_fd is the number of real FD
         */
-       (*pollfd)[i].fd = kconsumerd_poll_pipe[0];
+       (*pollfd)[i].fd = ctx->kconsumerd_poll_pipe[0];
        (*pollfd)[i].events = POLLIN;
        return i;
 }
@@ -266,7 +256,7 @@ static int kconsumerd_update_poll_array(struct pollfd **pollfd,
  * mmap the ring buffer, read it and write the data to the tracefile.
  * Returns the number of bytes written
  */
-static int kconsumerd_on_read_subbuffer_mmap(
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx,
                struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
 {
        unsigned long mmap_len, mmap_offset, padded_len, padding_len;
@@ -379,7 +369,7 @@ end:
  * Splice the data from the ring buffer to the tracefile.
  * Returns the number of bytes spliced
  */
-static int kconsumerd_on_read_subbuffer(
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx,
                struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
 {
        long ret = 0;
@@ -391,7 +381,7 @@ static int kconsumerd_on_read_subbuffer(
        while (len > 0) {
                DBG("splice chan to pipe offset %lu (fd : %d)",
                                (unsigned long)offset, fd);
-               ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
+               ret = splice(fd, &offset, ctx->kconsumerd_thread_pipe[1], NULL, len,
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice chan to pipe ret %ld", ret);
                if (ret < 0) {
@@ -400,7 +390,7 @@ static int kconsumerd_on_read_subbuffer(
                        goto splice_error;
                }
 
-               ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
+               ret = splice(ctx->kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
                                SPLICE_F_MOVE | SPLICE_F_MORE);
                DBG("splice pipe to file %ld", ret);
                if (ret < 0) {
@@ -452,98 +442,17 @@ splice_error:
        /* send the appropriate error description to sessiond */
        switch(ret) {
        case EBADF:
-               kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
+               kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EBADF);
                break;
        case EINVAL:
-               kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
+               kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_EINVAL);
                break;
        case ENOMEM:
-               kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
+               kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ENOMEM);
                break;
        case ESPIPE:
-               kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
-               break;
-       }
-
-end:
-       return ret;
-}
-
-/*
- * kconsumerd_read_subbuffer
- *
- * Consume data on a file descriptor and write it on a trace file
- */
-static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
-{
-       unsigned long len;
-       int err;
-       long ret = 0;
-       int infd = kconsumerd_fd->consumerd_fd;
-
-       DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
-       /* Get the next subbuffer */
-       err = kernctl_get_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               perror("Reserving sub buffer failed (everything is normal, "
-                               "it is due to concurrency)");
-               goto end;
-       }
-
-       switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
-       case LTTNG_EVENT_SPLICE:
-               /* read the whole subbuffer */
-               err = kernctl_get_padded_subbuf_size(infd, &len);
-               if (err != 0) {
-                       ret = errno;
-                       perror("Getting sub-buffer len failed.");
-                       goto end;
-               }
-
-               /* splice the subbuffer to the tracefile */
-               ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
-               if (ret < 0) {
-                       /*
-                        * display the error but continue processing to try
-                        * to release the subbuffer
-                        */
-                       ERR("Error splicing to tracefile");
-               }
-               break;
-       case LTTNG_EVENT_MMAP:
-               /* read the used subbuffer size */
-               err = kernctl_get_subbuf_size(infd, &len);
-               if (err != 0) {
-                       ret = errno;
-                       perror("Getting sub-buffer len failed.");
-                       goto end;
-               }
-               /* write the subbuffer to the tracefile */
-               ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
-               if (ret < 0) {
-                       /*
-                        * display the error but continue processing to try
-                        * to release the subbuffer
-                        */
-                       ERR("Error writing to tracefile");
-               }
+               kconsumerd_send_error(ctx, KCONSUMERD_SPLICE_ESPIPE);
                break;
-       default:
-               ERR("Unknown output method");
-               ret = -1;
-       }
-
-       err = kernctl_put_next_subbuf(infd);
-       if (err != 0) {
-               ret = errno;
-               if (errno == EFAULT) {
-                       perror("Error in unreserving sub buffer\n");
-               } else if (errno == EIO) {
-                       /* Should never happen with newer LTTng versions */
-                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
-               }
-               goto end;
        }
 
 end:
@@ -583,8 +492,8 @@ exit:
  * structures describing each fd (path name).
  * Returns the size of received data
  */
-static int kconsumerd_consumerd_recv_fd(int sfd,
-               struct pollfd *kconsumerd_sockpoll, int size,
+static int kconsumerd_consumerd_recv_fd(struct kconsumerd_local_data *ctx,
+               int sfd, struct pollfd *kconsumerd_sockpoll, int size,
                enum kconsumerd_command cmd_type)
 {
        struct iovec iov[1];
@@ -624,7 +533,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
 
                if (ret != (size / nb_fd)) {
                        ERR("Received only %d, expected %d", ret, size);
-                       kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
+                       kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
                        goto end;
                }
 
@@ -632,7 +541,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
                if (!cmsg) {
                        ERR("Invalid control message header");
                        ret = -1;
-                       kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
+                       kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
                        goto end;
                }
 
@@ -643,7 +552,7 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
                                DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
                                ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
                                if (ret < 0) {
-                                       kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
+                                       kconsumerd_send_error(ctx, KCONSUMERD_OUTFD_ERROR);
                                        goto end;
                                }
                                break;
@@ -654,13 +563,13 @@ static int kconsumerd_consumerd_recv_fd(int sfd,
                                break;
                        }
                        /* signal the poll thread */
-                       tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
+                       tmp2 = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
                        if (tmp2 < 0) {
                                perror("write kconsumerd poll");
                        }
                } else {
                        ERR("Didn't received any fd");
-                       kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
+                       kconsumerd_send_error(ctx, KCONSUMERD_ERROR_RECV_FD);
                        ret = -1;
                        goto end;
                }
@@ -686,12 +595,8 @@ void *kconsumerd_thread_poll_fds(void *data)
        int nb_fd = 0;
        char tmp;
        int tmp2;
+       struct kconsumerd_local_data *ctx = data;
 
-       ret = pipe(kconsumerd_thread_pipe);
-       if (ret < 0) {
-               perror("Error creating pipe");
-               goto end;
-       }
 
        local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));
 
@@ -730,10 +635,10 @@ void *kconsumerd_thread_poll_fds(void *data)
                                pthread_mutex_unlock(&kconsumerd_data.lock);
                                goto end;
                        }
-                       ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
+                       ret = kconsumerd_update_poll_array(ctx, &pollfd, local_kconsumerd_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
-                               kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
+                               kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
                                pthread_mutex_unlock(&kconsumerd_data.lock);
                                goto end;
                        }
@@ -748,7 +653,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        perror("Poll error");
-                       kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
+                       kconsumerd_send_error(ctx, KCONSUMERD_POLL_ERROR);
                        goto end;
                } else if (num_rdy == 0) {
                        DBG("Polling thread timed out");
@@ -768,7 +673,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                 */
                if (pollfd[nb_fd].revents == POLLIN) {
                        DBG("kconsumerd_poll_pipe wake up");
-                       tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
+                       tmp2 = read(ctx->kconsumerd_poll_pipe[0], &tmp, 1);
                        if (tmp2 < 0) {
                                perror("read kconsumerd poll");
                        }
@@ -796,7 +701,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                        case POLLPRI:
                                DBG("Urgent read on fd %d", pollfd[i].fd);
                                high_prio = 1;
-                               ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+                               ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
                                /* it's ok to have an unavailable sub-buffer */
                                if (ret == EAGAIN) {
                                        ret = 0;
@@ -819,7 +724,7 @@ void *kconsumerd_thread_poll_fds(void *data)
                        for (i = 0; i < nb_fd; i++) {
                                if (pollfd[i].revents == POLLIN) {
                                        DBG("Normal read on fd %d", pollfd[i].fd);
-                                       ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
+                                       ret = ctx->on_buffer_ready(local_kconsumerd_fd[i]);
                                        /* it's ok to have an unavailable subbuffer */
                                        if (ret == EAGAIN) {
                                                ret = 0;
@@ -842,34 +747,75 @@ end:
 }
 
 /*
- * kconsumerd_init(void)
+ * kconsumerd_create
  *
  * initialise the necessary environnement :
- * - inform the polling thread to update the polling array
+ * - create a new context
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
+ * - create the thread pipe (for splice)
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ * Returns a pointer to the new context or NULL on error.
  */
-int kconsumerd_init(void)
+struct kconsumerd_local_data *kconsumerd_create(
+               int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd))
 {
        int ret;
+       struct kconsumerd_local_data *ctx;
 
-       /* need to update the polling array at init time */
-       kconsumerd_data.need_update = 1;
+       ctx = malloc(sizeof(struct kconsumerd_local_data));
+       if (ctx == NULL) {
+               perror("allocating context");
+               goto end;
+       }
+
+       ctx->on_buffer_ready = buffer_ready;
 
-       ret = pipe(kconsumerd_poll_pipe);
+       ret = pipe(ctx->kconsumerd_poll_pipe);
        if (ret < 0) {
                perror("Error creating poll pipe");
+               ctx = NULL;
                goto end;
        }
 
-       ret = pipe(kconsumerd_should_quit);
+       ret = pipe(ctx->kconsumerd_should_quit);
        if (ret < 0) {
                perror("Error creating recv pipe");
+               ctx = NULL;
+               goto end;
+       }
+
+       ret = pipe(ctx->kconsumerd_thread_pipe);
+       if (ret < 0) {
+               perror("Error creating thread pipe");
+               ctx = NULL;
                goto end;
        }
 
 end:
-       return ret;
+       return ctx;
+}
+
+/*
+ * kconsumerd_destroy
+ *
+ * Close all fds associated with the instance and free the context
+ */
+void kconsumerd_destroy(struct kconsumerd_local_data *ctx)
+{
+       close(ctx->kconsumerd_error_socket);
+       close(ctx->kconsumerd_thread_pipe[0]);
+       close(ctx->kconsumerd_thread_pipe[1]);
+       close(ctx->kconsumerd_poll_pipe[0]);
+       close(ctx->kconsumerd_poll_pipe[1]);
+       close(ctx->kconsumerd_should_quit[0]);
+       close(ctx->kconsumerd_should_quit[1]);
+       unlink(ctx->kconsumerd_command_sock_path);
+       free(ctx);
+       ctx = NULL;
 }
 
 /*
@@ -887,11 +833,12 @@ void *kconsumerd_thread_receive_fds(void *data)
         * avoids making blocking sockets
         */
        struct pollfd kconsumerd_sockpoll[2];
+       struct kconsumerd_local_data *ctx = data;
 
 
-       DBG("Creating command socket %s", kconsumerd_command_sock_path);
-       unlink(kconsumerd_command_sock_path);
-       client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
+       DBG("Creating command socket %s", ctx->kconsumerd_command_sock_path);
+       unlink(ctx->kconsumerd_command_sock_path);
+       client_socket = lttcomm_create_unix_sock(ctx->kconsumerd_command_sock_path);
        if (client_socket < 0) {
                ERR("Cannot create command socket");
                goto end;
@@ -903,7 +850,7 @@ void *kconsumerd_thread_receive_fds(void *data)
        }
 
        DBG("Sending ready command to ltt-sessiond");
-       ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
+       ret = kconsumerd_send_error(ctx, KCONSUMERD_COMMAND_SOCK_READY);
        if (ret < 0) {
                ERR("Error sending ready command to ltt-sessiond");
                goto end;
@@ -916,7 +863,7 @@ void *kconsumerd_thread_receive_fds(void *data)
        }
 
        /* prepare the FDs to poll : to client socket and the should_quit pipe */
-       kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
+       kconsumerd_sockpoll[0].fd = ctx->kconsumerd_should_quit[0];
        kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
        kconsumerd_sockpoll[1].fd = client_socket;
        kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
@@ -965,7 +912,7 @@ void *kconsumerd_thread_receive_fds(void *data)
                }
 
                /* we received a command to add or update fds */
-               ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
+               ret = kconsumerd_consumerd_recv_fd(ctx, sock, kconsumerd_sockpoll,
                                tmp.payload_size, tmp.cmd_type);
                if (ret <= 0) {
                        ERR("Receiving the FD, exiting");
@@ -991,7 +938,7 @@ end:
        kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;
 
        /* wake up the polling thread */
-       ret = write(kconsumerd_poll_pipe[1], "4", 1);
+       ret = write(ctx->kconsumerd_poll_pipe[1], "4", 1);
        if (ret < 0) {
                perror("poll pipe write");
        }
@@ -1001,15 +948,13 @@ end:
 /*
  *  kconsumerd_cleanup
  *
- *  Cleanup the daemon's socket on exit
+ *  Close all the tracefiles and stream fds, should be called when all
+ *  instances are destroyed.
  */
 void kconsumerd_cleanup(void)
 {
        struct kconsumerd_fd *iter, *tmp;
 
-       /* remove the socket file */
-       unlink(kconsumerd_command_sock_path);
-
        /*
         * close all outfd. Called when there are no more threads
         * running (after joining on the threads), no need to protect
@@ -1025,11 +970,11 @@ void kconsumerd_cleanup(void)
  *
  * Called from signal handler.
  */
-void kconsumerd_should_exit(void)
+void kconsumerd_should_exit(struct kconsumerd_local_data *ctx)
 {
        int ret;
        kconsumerd_quit = 1;
-       ret = write(kconsumerd_should_quit[1], "4", 1);
+       ret = write(ctx->kconsumerd_should_quit[1], "4", 1);
        if (ret < 0) {
                perror("write kconsumerd quit");
        }
@@ -1040,10 +985,10 @@ void kconsumerd_should_exit(void)
  *
  * send return code to ltt-sessiond
  */
-int kconsumerd_send_error(enum lttcomm_return_code cmd)
+int kconsumerd_send_error(struct kconsumerd_local_data *ctx, enum lttcomm_return_code cmd)
 {
-       if (kconsumerd_error_socket > 0) {
-               return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
+       if (ctx->kconsumerd_error_socket > 0) {
+               return lttcomm_send_unix_sock(ctx->kconsumerd_error_socket, &cmd,
                                sizeof(enum lttcomm_sessiond_command));
        }
 
index cbdedd213ba0c614e3336d91378c780ac26af618..10e4a5504e19148ac7fe69d10783342090019bc4 100644 (file)
@@ -57,15 +57,59 @@ struct kconsumerd_fd {
        unsigned long max_sb_size; /* the subbuffer size for this channel */
 };
 
+struct kconsumerd_local_data {
+       /* function to call when data is available on a buffer */
+       int (*on_buffer_ready)(struct kconsumerd_fd *kconsumerd_fd);
+       /* socket to communicate errors with sessiond */
+       int kconsumerd_error_socket;
+       /* socket to exchange commands with sessiond */
+       char *kconsumerd_command_sock_path;
+       /* communication with splice */
+       int kconsumerd_thread_pipe[2];
+       /* pipe to wake the poll thread when necessary */
+       int kconsumerd_poll_pipe[2];
+       /* to let the signal handler wake up the fd receiver thread */
+       int kconsumerd_should_quit[2];
+};
+
 /*
- * kconsumerd_init(void)
+ * kconsumerd_create
  * initialise the necessary environnement :
- * - inform the polling thread to update the polling array
+ * - create a new context
  * - create the poll_pipe
  * - create the should_quit pipe (for signal handler)
- * returns the return code of pipe, 0 on success, -1 on error
+ * - create the thread pipe (for splice)
+ * Takes a function pointer as argument, this function is called when data is
+ * available on a buffer. This function is responsible to do the
+ * kernctl_get_next_subbuf, read the data with mmap or splice depending on the
+ * buffer configuration and then kernctl_put_next_subbuf at the end.
+ * Returns a pointer to the new context or NULL on error.
+ */
+struct kconsumerd_local_data *kconsumerd_create(
+               int (*buffer_ready)(struct kconsumerd_fd *kconsumerd_fd));
+
+/*
+ * kconsumerd_destroy
+ * Close all fds associated with the instance and free the context
+ */
+void kconsumerd_destroy(struct kconsumerd_local_data *ctx);
+
+/*
+ * kconsumerd_on_read_subbuffer_mmap
+ * mmap the ring buffer, read it and write the data to the tracefile.
+ * Returns the number of bytes written
+ */
+int kconsumerd_on_read_subbuffer_mmap(struct kconsumerd_local_data *ctx,
+               struct kconsumerd_fd *kconsumerd_fd, unsigned long len);
+
+/*
+ * kconsumerd_on_read_subbuffer
+ *
+ * Splice the data from the ring buffer to the tracefile.
+ * Returns the number of bytes spliced
  */
-int kconsumerd_init(void);
+int kconsumerd_on_read_subbuffer_splice(struct kconsumerd_local_data *ctx,
+               struct kconsumerd_fd *kconsumerd_fd, unsigned long len);
 
 /*
  * kconsumerd_send_error
@@ -73,7 +117,8 @@ int kconsumerd_init(void);
  * returns the return code of sendmsg : the number of bytes transmitted
  * or -1 on error.
  */
-int kconsumerd_send_error(enum lttcomm_return_code cmd);
+int kconsumerd_send_error(struct kconsumerd_local_data *ctx,
+               enum lttcomm_return_code cmd);
 
 /*
  * kconsumerd_poll_socket
@@ -101,7 +146,7 @@ void *kconsumerd_thread_receive_fds(void *data);
  * kconsumerd_should_exit
  * Called from signal handler to ensure a clean exit
  */
-void kconsumerd_should_exit(void);
+void kconsumerd_should_exit(struct kconsumerd_local_data *ctx);
 
 /*
  *  kconsumerd_cleanup
@@ -113,12 +158,12 @@ void kconsumerd_cleanup(void);
  * kconsumerd_set_error_socket
  * Set the error socket for communication with a session daemon
  */
-void kconsumerd_set_error_socket(int sock);
+void kconsumerd_set_error_socket(struct kconsumerd_local_data *ctx, int sock);
 
 /*
  * kconsumerd_set_command_socket_path
  * Set the command socket path for communication with a session daemon
  */
-void kconsumerd_set_command_socket_path(char *sock);
+void kconsumerd_set_command_socket_path(struct kconsumerd_local_data *ctx, char *sock);
 
 #endif /* _LIBLTTKCONSUMERD_H */
index 1e2841c015ac8e163e177404c758efd5505a1121..64c5ccfae908499c20eeb59a5cdb305eae342a2c 100644 (file)
@@ -55,6 +55,9 @@ static const char *progname;
 char command_sock_path[PATH_MAX]; /* Global command socket path */
 char error_sock_path[PATH_MAX]; /* Global error path */
 
+/* the liblttkconsumerd context */
+struct kconsumerd_local_data *ctx;
+
 /*
  *  sighandler
  *
@@ -67,7 +70,7 @@ static void sighandler(int sig)
                return;
        }
 
-       kconsumerd_should_exit();
+       kconsumerd_should_exit(ctx);
 }
 
 /*
@@ -190,6 +193,86 @@ static void parse_args(int argc, char **argv)
        }
 }
 
+/*
+ * read_subbuffer
+ *
+ * Consume data on a file descriptor and write it on a trace file
+ */
+static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
+{
+       unsigned long len;
+       int err;
+       long ret = 0;
+       int infd = kconsumerd_fd->consumerd_fd;
+
+       DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
+       /* Get the next subbuffer */
+       err = kernctl_get_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               perror("Reserving sub buffer failed (everything is normal, "
+                               "it is due to concurrency)");
+               goto end;
+       }
+
+       switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
+               case LTTNG_EVENT_SPLICE:
+                       /* read the whole subbuffer */
+                       err = kernctl_get_padded_subbuf_size(infd, &len);
+                       if (err != 0) {
+                               ret = errno;
+                               perror("Getting sub-buffer len failed.");
+                               goto end;
+                       }
+
+                       /* splice the subbuffer to the tracefile */
+                       ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len);
+                       if (ret < 0) {
+                               /*
+                                * display the error but continue processing to try
+                                * to release the subbuffer
+                                */
+                               ERR("Error splicing to tracefile");
+                       }
+                       break;
+               case LTTNG_EVENT_MMAP:
+                       /* read the used subbuffer size */
+                       err = kernctl_get_subbuf_size(infd, &len);
+                       if (err != 0) {
+                               ret = errno;
+                               perror("Getting sub-buffer len failed.");
+                               goto end;
+                       }
+                       /* write the subbuffer to the tracefile */
+                       ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len);
+                       if (ret < 0) {
+                               /*
+                                * display the error but continue processing to try
+                                * to release the subbuffer
+                                */
+                               ERR("Error writing to tracefile");
+                       }
+                       break;
+               default:
+                       ERR("Unknown output method");
+                       ret = -1;
+       }
+
+       err = kernctl_put_next_subbuf(infd);
+       if (err != 0) {
+               ret = errno;
+               if (errno == EFAULT) {
+                       perror("Error in unreserving sub buffer\n");
+               } else if (errno == EIO) {
+                       /* Should never happen with newer LTTng versions */
+                       perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+               }
+               goto end;
+       }
+
+end:
+       return ret;
+}
 
 /*
  * main
@@ -217,7 +300,13 @@ int main(int argc, char **argv)
                snprintf(command_sock_path, PATH_MAX,
                                KCONSUMERD_CMD_SOCK_PATH);
        }
-       kconsumerd_set_command_socket_path(command_sock_path);
+       /* create the pipe to wake to receiving thread when needed */
+       ctx = kconsumerd_create(read_subbuffer);
+       if (ctx == NULL) {
+               goto error;
+       }
+
+       kconsumerd_set_command_socket_path(ctx, command_sock_path);
        if (strlen(error_sock_path) == 0) {
                snprintf(error_sock_path, PATH_MAX,
                                KCONSUMERD_ERR_SOCK_PATH);
@@ -227,12 +316,6 @@ int main(int argc, char **argv)
                goto error;
        }
 
-       /* create the pipe to wake to receiving thread when needed */
-       ret = kconsumerd_init();
-       if (ret < 0) {
-               goto end;
-       }
-
        /* Connect to the socket created by ltt-sessiond to report errors */
        DBG("Connecting to error socket %s", error_sock_path);
        ret = lttcomm_connect_unix_sock(error_sock_path);
@@ -240,11 +323,11 @@ int main(int argc, char **argv)
        if (ret < 0) {
                WARN("Cannot connect to error socket, is ltt-sessiond started ?");
        }
-       kconsumerd_set_error_socket(ret);
+       kconsumerd_set_error_socket(ctx, ret);
 
        /* Create the thread to manage the receive of fd */
        ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds,
-                       (void *) NULL);
+                       (void *) ctx);
        if (ret != 0) {
                perror("pthread_create");
                goto error;
@@ -252,7 +335,7 @@ int main(int argc, char **argv)
 
        /* Create thread to manage the polling/writing of traces */
        ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds,
-                       (void *) NULL);
+                       (void *) ctx);
        if (ret != 0) {
                perror("pthread_create");
                goto error;
@@ -266,14 +349,15 @@ int main(int argc, char **argv)
                }
        }
        ret = EXIT_SUCCESS;
-       kconsumerd_send_error(KCONSUMERD_EXIT_SUCCESS);
+       kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS);
        goto end;
 
 error:
        ret = EXIT_FAILURE;
-       kconsumerd_send_error(KCONSUMERD_EXIT_FAILURE);
+       kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE);
 
 end:
+       kconsumerd_destroy(ctx);
        kconsumerd_cleanup();
 
        return ret;
This page took 0.037556 seconds and 4 git commands to generate.