multithreaded lttd
authorcompudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
Thu, 2 Feb 2006 15:03:25 +0000 (15:03 +0000)
committercompudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
Thu, 2 Feb 2006 15:03:25 +0000 (15:03 +0000)
git-svn-id: http://ltt.polymtl.ca/svn@1508 04897980-b3bd-0310-b5e0-8ef037075253

ltt/branches/poly/lttctl/lttctl.c
ltt/branches/poly/lttd/lttd.c

index 105aacb61efb627cdb6a957e6f9cec0b30a73559..c14e3c9003df1605c78d281b5d821eaf5f7174ac 100644 (file)
@@ -48,6 +48,7 @@ static enum trace_mode mode = LTT_TRACE_NORMAL;
 static enum trace_ctl_op op = CTL_OP_NONE;
 static char *channel_root = NULL;
 static char *trace_root = NULL;
+static char *num_threads = "1";
 
 static int sigchld_received = 0;
 
@@ -85,6 +86,7 @@ void show_arguments(void)
        printf("-x            Number of subbuffers\n");
        printf("-e            Get XML facilities description\n");
        printf("-a            Append to trace\n");
+       printf("-N            Number of ltt threads\n");
        printf("\n");
 }
 
@@ -208,6 +210,12 @@ int parse_arguments(int argc, char **argv)
           case 'a':
             append_trace = 1;
             break;
+          case 'N':
+                                               if(argn+1 < argc) {
+                                                       num_threads = argv[argn+1];
+                                                       argn++;
+                                               }
+                                               break;
                                        default:
                                                printf("Invalid argument '%s'.\n", argv[argn]);
                                                printf("\n");
@@ -411,10 +419,10 @@ int lttctl_daemon(struct lttctl_handle *handle, char *trace_name)
     int ret;
     if(append_trace) 
                ret =   execlp(lttd_path, lttd_path, "-t", trace_root, "-c",
-                       channel_path, "-d", "-a", NULL);
+                       channel_path, "-d", "-a", "-N", num_threads, NULL);
     else
                ret =   execlp(lttd_path, lttd_path, "-t", trace_root, "-c",
-                       channel_path, "-d", NULL);
+                       channel_path, "-d", "-N", num_threads, NULL);
                if(ret) {
       ret = errno;
                        perror("Error in executing the lttd daemon");
index 2fa7fe8db4f5214bc6b78880c15fa688e3bb2b99..230159185df229e0e3d3b220784184910489da2b 100644 (file)
@@ -59,6 +59,7 @@ struct fd_pair {
        unsigned int n_subbufs;
        unsigned int subbuf_size;
        void *mmap;
+       pthread_mutex_t mutex;
 };
 
 struct channel_trace_fd {
@@ -90,7 +91,7 @@ void show_arguments(void)
        printf("-c directory  Root directory of the relayfs trace channels.\n");
        printf("-d            Run in background (daemon).\n");
        printf("-a            Append to an possibly existing trace.\n");
-       printf("-n            Number of threads to start.\n");
+       printf("-N            Number of threads to start.\n");
        printf("\n");
 }
 
@@ -136,7 +137,7 @@ int parse_arguments(int argc, char **argv)
                                        case 'a':
                                                append_mode = 1;
                                                break;
-                                       case 'n':
+                                       case 'N':
                                                if(argn+1 < argc) {
                                                        num_threads = strtoul(argv[argn+1], NULL, 0);
                                                        argn++;
@@ -309,15 +310,15 @@ end:
 int read_subbuffer(struct fd_pair *pair)
 {
        unsigned int    consumed_old;
-       int err, ret;
+       int err, ret=0;
 
 
        err = ioctl(pair->channel, RELAYFS_GET_SUBBUF, 
                                                                &consumed_old);
        printf("cookie : %u\n", consumed_old);
        if(err != 0) {
+               ret = errno;
                perror("Error in reserving sub buffer");
-               ret = -EPERM;
                goto get_error;
        }
        
@@ -327,8 +328,8 @@ int read_subbuffer(struct fd_pair *pair)
                                pair->subbuf_size));
 
        if(err < 0) {
+               ret = errno;
                perror("Error in writing to file");
-               ret = err;
                goto write_error;
        }
 
@@ -336,13 +337,12 @@ int read_subbuffer(struct fd_pair *pair)
 write_error:
        err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old);
        if(err != 0) {
+               ret = errno;
                if(errno == -EFAULT) {
-                       perror("Error in unreserving sub buffer");
-                       ret = -EFAULT;
+                       perror("Error in unreserving sub buffer\n");
                } else if(errno == -EIO) {
                        perror("Reader has been pushed by the writer, last subbuffer corrupted.");
                        /* FIXME : we may delete the last written buffer if we wish. */
-                       ret = -EIO;
                }
                goto get_error;
        }
@@ -380,6 +380,11 @@ int map_channels(struct channel_trace_fd *fd_pairs)
                        perror("Error in getting the size of the subbuffers");
                        goto end;
                }
+               ret = pthread_mutex_init(&pair->mutex, NULL);   /* Fast mutex */
+               if(ret != 0) {
+                       perror("Error in mutex init");
+                       goto end;
+               }
        }
 
        /* Mmap each FD */
@@ -394,6 +399,7 @@ int map_channels(struct channel_trace_fd *fd_pairs)
                }
        }
 
+       goto end; /* success */
 
        /* Error handling */
        /* munmap only the successfully mmapped indexes */
@@ -432,6 +438,11 @@ int unmap_channels(struct channel_trace_fd *fd_pairs)
                        perror("Error in munmap");
                }
                ret |= err_ret;
+               err_ret = pthread_mutex_destroy(&pair->mutex);
+               if(err_ret != 0) {
+                       perror("Error in mutex destroy");
+               }
+               ret |= err_ret;
        }
 
        return ret;
@@ -439,12 +450,14 @@ int unmap_channels(struct channel_trace_fd *fd_pairs)
 
 
 /* read_channels
+ *
+ * Thread worker.
  *
  * Read the relayfs channels and write them in the paired tracefiles.
  *
  * @fd_pairs : paired channels and trace files.
  *
- * returns 0 on success, -1 on error.
+ * returns (void*)0 on success, (void*)-1 on error.
  *
  * Note that the high priority polled channels are consumed first. We then poll
  * again to see if these channels are still in priority. Only when no
@@ -454,13 +467,14 @@ int unmap_channels(struct channel_trace_fd *fd_pairs)
  * full.
  */
 
-int read_channels(struct channel_trace_fd *fd_pairs)
+void * read_channels(void *arg)
 {
        struct pollfd *pollfd;
        int i,j;
        int num_rdy, num_hup;
        int high_prio;
-       int ret;
+       int ret = 0;
+       struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg;
 
        /* Start polling the FD */
        
@@ -508,10 +522,19 @@ int read_channels(struct channel_trace_fd *fd_pairs)
                                        num_hup++;
                                        break;
                                case POLLPRI:
-                                       printf("Urgent read on fd %d\n", pollfd[i].fd);
-                                       /* Take care of high priority channels first. */
-                                       high_prio = 1;
-                                       ret |= read_subbuffer(&fd_pairs->pair[i]);
+                                       if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+                                               printf("Urgent read on fd %d\n", pollfd[i].fd);
+                                               /* Take care of high priority channels first. */
+                                               high_prio = 1;
+                                               /* it's ok to have an unavailable subbuffer */
+                                               ret = read_subbuffer(&fd_pairs->pair[i]);
+                                               if(ret == -EAGAIN) ret = 0;
+                                               else if(ret)
+                                                       printf("Error in read_subbuffer : %s\n", strerror(ret));
+                                               ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+                                               if(ret)
+                                                       printf("Error in mutex unlock : %s\n", strerror(ret));
+                                       }
                                        break;
                        }
                }
@@ -522,9 +545,18 @@ int read_channels(struct channel_trace_fd *fd_pairs)
                        for(i=0;i<fd_pairs->num_pairs;i++) {
                                switch(pollfd[i].revents) {
                                        case POLLIN:
-                                               /* Take care of low priority channels. */
-                                               printf("Normal read on fd %d\n", pollfd[i].fd);
-                                               ret |= read_subbuffer(&fd_pairs->pair[i]);
+                                               if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+                                                       /* Take care of low priority channels. */
+                                                       printf("Normal read on fd %d\n", pollfd[i].fd);
+                                                       /* it's ok to have an unavailable subbuffer */
+                                                       ret = read_subbuffer(&fd_pairs->pair[i]);
+                                                       if(ret == -EAGAIN) ret = 0;
+                                                       else if(ret)
+                                                               printf("Error in read_subbuffer : %s\n", strerror(ret));
+                                                       ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+                                                       if(ret)
+                                                               printf("Error in mutex unlock : %s\n", strerror(ret));
+                                               }
                                                break;
                                }
                        }
@@ -536,7 +568,7 @@ free_fd:
        free(pollfd);
 
 end:
-       return ret;
+       return (void*)ret;
 }
 
 
@@ -559,6 +591,9 @@ int main(int argc, char ** argv)
        int ret = 0;
        struct channel_trace_fd fd_pairs = { NULL, 0 };
        struct sigaction act;
+       pthread_t *tids;
+       unsigned int i;
+       void *tret;
        
        ret = parse_arguments(argc, argv);
 
@@ -595,8 +630,28 @@ int main(int argc, char ** argv)
        if(ret = map_channels(&fd_pairs))
                goto close_channel;
        
-       ret = read_channels(&fd_pairs);
+       tids = malloc(sizeof(pthread_t) * num_threads);
+       for(i=0; i<num_threads; i++) {
+               ret = pthread_create(&tids[i], NULL, read_channels, &fd_pairs);
+               if(ret) {
+                       perror("Error creating thread");
+                       break;
+               }
+       }
 
+       for(i=0; i<num_threads; i++) {
+               ret = pthread_join(tids[i], &tret);
+               if(ret) {
+                       perror("Error joining thread");
+                       break;
+               }
+               if((int)tret != 0) {
+                       printf("Error %s occured in thread %u\n", strerror(-(int)tret), i);
+               }
+       }
+
+       free(tids);
+                       
        ret |= unmap_channels(&fd_pairs);
 
 close_channel:
This page took 0.028305 seconds and 4 git commands to generate.