From 22acb617fc3d3c8e0b7a95de89679f91662aa387 Mon Sep 17 00:00:00 2001 From: compudj Date: Thu, 2 Feb 2006 15:03:25 +0000 Subject: [PATCH] multithreaded lttd git-svn-id: http://ltt.polymtl.ca/svn@1508 04897980-b3bd-0310-b5e0-8ef037075253 --- ltt/branches/poly/lttctl/lttctl.c | 12 +++- ltt/branches/poly/lttd/lttd.c | 95 ++++++++++++++++++++++++------- 2 files changed, 85 insertions(+), 22 deletions(-) diff --git a/ltt/branches/poly/lttctl/lttctl.c b/ltt/branches/poly/lttctl/lttctl.c index 105aacb6..c14e3c90 100644 --- a/ltt/branches/poly/lttctl/lttctl.c +++ b/ltt/branches/poly/lttctl/lttctl.c @@ -48,6 +48,7 @@ static enum trace_mode mode = LTT_TRACE_NORMAL; static enum trace_ctl_op op = CTL_OP_NONE; static char *channel_root = NULL; static char *trace_root = NULL; +static char *num_threads = "1"; static int sigchld_received = 0; @@ -85,6 +86,7 @@ void show_arguments(void) printf("-x Number of subbuffers\n"); printf("-e Get XML facilities description\n"); printf("-a Append to trace\n"); + printf("-N Number of ltt threads\n"); printf("\n"); } @@ -208,6 +210,12 @@ int parse_arguments(int argc, char **argv) case 'a': append_trace = 1; break; + case 'N': + if(argn+1 < argc) { + num_threads = argv[argn+1]; + argn++; + } + break; default: printf("Invalid argument '%s'.\n", argv[argn]); printf("\n"); @@ -411,10 +419,10 @@ int lttctl_daemon(struct lttctl_handle *handle, char *trace_name) int ret; if(append_trace) ret = execlp(lttd_path, lttd_path, "-t", trace_root, "-c", - channel_path, "-d", "-a", NULL); + channel_path, "-d", "-a", "-N", num_threads, NULL); else ret = execlp(lttd_path, lttd_path, "-t", trace_root, "-c", - channel_path, "-d", NULL); + channel_path, "-d", "-N", num_threads, NULL); if(ret) { ret = errno; perror("Error in executing the lttd daemon"); diff --git a/ltt/branches/poly/lttd/lttd.c b/ltt/branches/poly/lttd/lttd.c index 2fa7fe8d..23015918 100644 --- a/ltt/branches/poly/lttd/lttd.c +++ b/ltt/branches/poly/lttd/lttd.c @@ -59,6 +59,7 @@ struct fd_pair { unsigned int n_subbufs; unsigned int subbuf_size; void *mmap; + pthread_mutex_t mutex; }; struct channel_trace_fd { @@ -90,7 +91,7 @@ void show_arguments(void) printf("-c directory Root directory of the relayfs trace channels.\n"); printf("-d Run in background (daemon).\n"); printf("-a Append to an possibly existing trace.\n"); - printf("-n Number of threads to start.\n"); + printf("-N Number of threads to start.\n"); printf("\n"); } @@ -136,7 +137,7 @@ int parse_arguments(int argc, char **argv) case 'a': append_mode = 1; break; - case 'n': + case 'N': if(argn+1 < argc) { num_threads = strtoul(argv[argn+1], NULL, 0); argn++; @@ -309,15 +310,15 @@ end: int read_subbuffer(struct fd_pair *pair) { unsigned int consumed_old; - int err, ret; + int err, ret=0; err = ioctl(pair->channel, RELAYFS_GET_SUBBUF, &consumed_old); printf("cookie : %u\n", consumed_old); if(err != 0) { + ret = errno; perror("Error in reserving sub buffer"); - ret = -EPERM; goto get_error; } @@ -327,8 +328,8 @@ int read_subbuffer(struct fd_pair *pair) pair->subbuf_size)); if(err < 0) { + ret = errno; perror("Error in writing to file"); - ret = err; goto write_error; } @@ -336,13 +337,12 @@ int read_subbuffer(struct fd_pair *pair) write_error: err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old); if(err != 0) { + ret = errno; if(errno == -EFAULT) { - perror("Error in unreserving sub buffer"); - ret = -EFAULT; + perror("Error in unreserving sub buffer\n"); } else if(errno == -EIO) { perror("Reader has been pushed by the writer, last subbuffer corrupted."); /* FIXME : we may delete the last written buffer if we wish. */ - ret = -EIO; } goto get_error; } @@ -380,6 +380,11 @@ int map_channels(struct channel_trace_fd *fd_pairs) perror("Error in getting the size of the subbuffers"); goto end; } + ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */ + if(ret != 0) { + perror("Error in mutex init"); + goto end; + } } /* Mmap each FD */ @@ -394,6 +399,7 @@ int map_channels(struct channel_trace_fd *fd_pairs) } } + goto end; /* success */ /* Error handling */ /* munmap only the successfully mmapped indexes */ @@ -432,6 +438,11 @@ int unmap_channels(struct channel_trace_fd *fd_pairs) perror("Error in munmap"); } ret |= err_ret; + err_ret = pthread_mutex_destroy(&pair->mutex); + if(err_ret != 0) { + perror("Error in mutex destroy"); + } + ret |= err_ret; } return ret; @@ -439,12 +450,14 @@ int unmap_channels(struct channel_trace_fd *fd_pairs) /* read_channels + * + * Thread worker. * * Read the relayfs channels and write them in the paired tracefiles. * * @fd_pairs : paired channels and trace files. * - * returns 0 on success, -1 on error. + * returns (void*)0 on success, (void*)-1 on error. * * Note that the high priority polled channels are consumed first. We then poll * again to see if these channels are still in priority. Only when no @@ -454,13 +467,14 @@ int unmap_channels(struct channel_trace_fd *fd_pairs) * full. */ -int read_channels(struct channel_trace_fd *fd_pairs) +void * read_channels(void *arg) { struct pollfd *pollfd; int i,j; int num_rdy, num_hup; int high_prio; - int ret; + int ret = 0; + struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg; /* Start polling the FD */ @@ -508,10 +522,19 @@ int read_channels(struct channel_trace_fd *fd_pairs) num_hup++; break; case POLLPRI: - printf("Urgent read on fd %d\n", pollfd[i].fd); - /* Take care of high priority channels first. */ - high_prio = 1; - ret |= read_subbuffer(&fd_pairs->pair[i]); + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + printf("Urgent read on fd %d\n", pollfd[i].fd); + /* Take care of high priority channels first. */ + high_prio = 1; + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + else if(ret) + printf("Error in read_subbuffer : %s\n", strerror(ret)); + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } break; } } @@ -522,9 +545,18 @@ int read_channels(struct channel_trace_fd *fd_pairs) for(i=0;inum_pairs;i++) { switch(pollfd[i].revents) { case POLLIN: - /* Take care of low priority channels. */ - printf("Normal read on fd %d\n", pollfd[i].fd); - ret |= read_subbuffer(&fd_pairs->pair[i]); + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + /* Take care of low priority channels. */ + printf("Normal read on fd %d\n", pollfd[i].fd); + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + else if(ret) + printf("Error in read_subbuffer : %s\n", strerror(ret)); + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } break; } } @@ -536,7 +568,7 @@ free_fd: free(pollfd); end: - return ret; + return (void*)ret; } @@ -559,6 +591,9 @@ int main(int argc, char ** argv) int ret = 0; struct channel_trace_fd fd_pairs = { NULL, 0 }; struct sigaction act; + pthread_t *tids; + unsigned int i; + void *tret; ret = parse_arguments(argc, argv); @@ -595,8 +630,28 @@ int main(int argc, char ** argv) if(ret = map_channels(&fd_pairs)) goto close_channel; - ret = read_channels(&fd_pairs); + tids = malloc(sizeof(pthread_t) * num_threads); + for(i=0; i