Rename consumer threads and spawn them in daemon
authorDavid Goulet <dgoulet@efficios.com>
Thu, 11 Oct 2012 17:10:29 +0000 (13:10 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 16:49:01 +0000 (12:49 -0400)
The metadata thread is now created in the lttng-consumerd daemon so all
thread could be controlled inside the daemon.

This is the first step of a consumer thread refactoring which aims at
moving data and metadata stream operations inside a dedicated thread so
the session daemon thread does not block and is more efficient at adding
streams.

The most important concept is that a stream file descriptor MUST be
opened as quickly as we can then passed to the right thread (for UST
since they are already opened by the session daemon for the kernel).

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-consumerd/lttng-consumerd.c
src/common/consumer.c
src/common/consumer.h

index 5952334cd47fe1c1d882e9c7880e627218fb044b..946fb02af2c1931ba95adb969593f81dcac19ad9 100644 (file)
@@ -356,23 +356,31 @@ int main(int argc, char **argv)
        }
        lttng_consumer_set_error_sock(ctx, ret);
 
-       /* Create the thread to manage the receive of fd */
-       ret = pthread_create(&threads[0], NULL, lttng_consumer_thread_receive_fds,
+       /* Create thread to manage the polling/writing of trace metadata */
+       ret = pthread_create(&threads[0], NULL, consumer_thread_metadata_poll,
+                       (void *) ctx);
+       if (ret != 0) {
+               perror("pthread_create");
+               goto error;
+       }
+
+       /* Create thread to manage the polling/writing of trace data */
+       ret = pthread_create(&threads[1], NULL, consumer_thread_data_poll,
                        (void *) ctx);
        if (ret != 0) {
                perror("pthread_create");
                goto error;
        }
 
-       /* Create thread to manage the polling/writing of traces */
-       ret = pthread_create(&threads[1], NULL, lttng_consumer_thread_poll_fds,
+       /* Create the thread to manage the receive of fd */
+       ret = pthread_create(&threads[2], NULL, consumer_thread_sessiond_poll,
                        (void *) ctx);
        if (ret != 0) {
                perror("pthread_create");
                goto error;
        }
 
-       for (i = 0; i < 2; i++) {
+       for (i = 0; i < 3; i++) {
                ret = pthread_join(threads[i], &status);
                if (ret != 0) {
                        perror("pthread_join");
index 27c870f3ad83fdd546b821f2758c1041d7e8fa95..cf1b7b96f95a6c09897d44b9f6d735013513d035 100644 (file)
@@ -1756,7 +1756,7 @@ error:
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
  */
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
 {
        int ret, i, pollfd;
        uint32_t revents, nb_fd;
@@ -1942,7 +1942,7 @@ end:
  * This thread polls the fds in the set to consume the data and write
  * it to tracefile if necessary.
  */
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
 {
        int num_rdy, num_hup, high_prio, ret, i;
        struct pollfd *pollfd = NULL;
@@ -1952,19 +1952,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
        int nb_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
-       pthread_t metadata_thread;
-       void *status;
 
        rcu_register_thread();
 
-       /* Start metadata polling thread */
-       ret = pthread_create(&metadata_thread, NULL,
-                       lttng_consumer_thread_poll_metadata, (void *) ctx);
-       if (ret < 0) {
-               PERROR("pthread_create metadata thread");
-               goto end;
-       }
-
        local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
 
        while (1) {
@@ -2148,19 +2138,13 @@ end:
 
        /*
         * Close the write side of the pipe so epoll_wait() in
-        * lttng_consumer_thread_poll_metadata can catch it. The thread is
-        * monitoring the read side of the pipe. If we close them both, epoll_wait
-        * strangely does not return and could create a endless wait period if the
-        * pipe is the only tracked fd in the poll set. The thread will take care
-        * of closing the read side.
+        * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+        * read side of the pipe. If we close them both, epoll_wait strangely does
+        * not return and could create a endless wait period if the pipe is the
+        * only tracked fd in the poll set. The thread will take care of closing
+        * the read side.
         */
        close(ctx->consumer_metadata_pipe[1]);
-       if (ret) {
-               ret = pthread_join(metadata_thread, &status);
-               if (ret < 0) {
-                       PERROR("pthread_join metadata thread");
-               }
-       }
 
        rcu_unregister_thread();
        return NULL;
@@ -2170,7 +2154,7 @@ end:
  * This thread listens on the consumerd socket and receives the file
  * descriptors from the session daemon.
  */
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
 {
        int sock, client_socket, ret;
        /*
index d0cd8fd869e2b647581e02e163f75725365124f8..4b225e43c4ced7a0a38527402690a825e27ed9cc 100644 (file)
@@ -385,8 +385,9 @@ extern int lttng_consumer_get_produced_snapshot(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                unsigned long *pos);
-extern void *lttng_consumer_thread_poll_fds(void *data);
-extern void *lttng_consumer_thread_receive_fds(void *data);
+extern void *consumer_thread_metadata_poll(void *data);
+extern void *consumer_thread_data_poll(void *data);
+extern void *consumer_thread_sessiond_poll(void *data);
 extern int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
This page took 0.032364 seconds and 4 git commands to generate.