X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=ltt%2Fbranches%2Fpoly%2Flttd%2Flttd.c;h=2601d805df0187e82019d1460ffbfd5efeee219f;hb=f5eafc51d456f751a9c22e7d62b1f5c74b84c15f;hp=737822e168d533b0a9d5ad0b8434aa1648d036ec;hpb=02ba971eed95c26127ba243871eb0de89592ba1c;p=lttv.git diff --git a/ltt/branches/poly/lttd/lttd.c b/ltt/branches/poly/lttd/lttd.c index 737822e1..2601d805 100644 --- a/ltt/branches/poly/lttd/lttd.c +++ b/ltt/branches/poly/lttd/lttd.c @@ -14,7 +14,9 @@ #include #endif +#define _REENTRANT #define _GNU_SOURCE +#include #include #include #include @@ -27,6 +29,7 @@ #include #include #include +#include /* Relayfs IOCTL */ #include @@ -35,7 +38,7 @@ /* Get the next sub buffer that can be read. */ #define RELAYFS_GET_SUBBUF _IOR(0xF4, 0x00,__u32) /* Release the oldest reserved (by "get") sub buffer. */ -#define RELAYFS_PUT_SUBBUF _IO(0xF4, 0x01) +#define RELAYFS_PUT_SUBBUF _IOW(0xF4, 0x01,__u32) /* returns the number of sub buffers in the per cpu channel. */ #define RELAYFS_GET_N_SUBBUFS _IOR(0xF4, 0x02,__u32) /* returns the size of the sub buffers. */ @@ -56,6 +59,7 @@ struct fd_pair { unsigned int n_subbufs; unsigned int subbuf_size; void *mmap; + pthread_mutex_t mutex; }; struct channel_trace_fd { @@ -67,7 +71,7 @@ static char *trace_name = NULL; static char *channel_name = NULL; static int daemon_mode = 0; static int append_mode = 0; -static int sig_parent = 0; +static unsigned long num_threads = 1; volatile static int quit_program = 0; /* For signal handler */ /* Args : @@ -76,7 +80,7 @@ volatile static int quit_program = 0; /* For signal handler */ * -c directory Root directory of the relayfs trace channels. * -d Run in background (daemon). * -a Trace append mode. - * -s Send SIGIO to parent when ready for IO. + * -s Send SIGUSR1 to parent when ready for IO. */ void show_arguments(void) { @@ -87,7 +91,7 @@ void show_arguments(void) printf("-c directory Root directory of the relayfs trace channels.\n"); printf("-d Run in background (daemon).\n"); printf("-a Append to an possibly existing trace.\n"); - printf("-s Send SIGIO to parent when ready for IO.\n"); + printf("-N Number of threads to start.\n"); printf("\n"); } @@ -133,8 +137,11 @@ int parse_arguments(int argc, char **argv) case 'a': append_mode = 1; break; - case 's': - sig_parent = 1; + case 'N': + if(argn+1 < argc) { + num_threads = strtoul(argv[argn+1], NULL, 0); + argn++; + } break; default: printf("Invalid argument '%s'.\n", argv[argn]); @@ -209,9 +216,7 @@ int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name, printf("Creating trace subdirectory %s\n", subtrace_name); ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO); if(ret == -1) { - if(errno == EEXIST && append_mode) { - printf("Appending to directory %s as resquested\n", subtrace_name); - } else { + if(errno != EEXIST) { perror(subtrace_name); open_ret = -1; goto end; @@ -304,35 +309,47 @@ end: int read_subbuffer(struct fd_pair *pair) { - unsigned int subbuf_index; - int err, ret; + unsigned int consumed_old; + int err, ret=0; err = ioctl(pair->channel, RELAYFS_GET_SUBBUF, - &subbuf_index); - printf("index : %u\n", subbuf_index); + &consumed_old); + printf("cookie : %u\n", consumed_old); if(err != 0) { - perror("Error in reserving sub buffer"); - ret = -EPERM; + ret = errno; + perror("Reserving sub buffer failed (everything is normal)"); goto get_error; } err = TEMP_FAILURE_RETRY(write(pair->trace, - pair->mmap + (subbuf_index * pair->subbuf_size), + pair->mmap + + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)), pair->subbuf_size)); if(err < 0) { + ret = errno; perror("Error in writing to file"); - ret = err; goto write_error; } - - +#if 0 + err = fsync(pair->trace); + if(err < 0) { + ret = errno; + perror("Error in writing to file"); + goto write_error; + } +#endif //0 write_error: - err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF); + err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old); if(err != 0) { - perror("Error in unreserving sub buffer"); - ret = -EPERM; + ret = errno; + if(errno == -EFAULT) { + perror("Error in unreserving sub buffer\n"); + } else if(errno == -EIO) { + perror("Reader has been pushed by the writer, last subbuffer corrupted."); + /* FIXME : we may delete the last written buffer if we wish. */ + } goto get_error; } @@ -341,29 +358,11 @@ get_error: } -/* read_channels - * - * Read the realyfs channels and write them in the paired tracefiles. - * - * @fd_pairs : paired channels and trace files. - * - * returns 0 on success, -1 on error. - * - * Note that the high priority polled channels are consumed first. We then poll - * again to see if these channels are still in priority. Only when no - * high priority channel is left, we start reading low priority channels. - * - * Note that a channel is considered high priority when the buffer is almost - * full. - */ -int read_channels(struct channel_trace_fd *fd_pairs) +int map_channels(struct channel_trace_fd *fd_pairs) { - struct pollfd *pollfd; int i,j; - int num_rdy, num_hup; - int high_prio; - int ret; + int ret=0; if(fd_pairs->num_pairs <= 0) { printf("No channel to read\n"); @@ -387,6 +386,11 @@ int read_channels(struct channel_trace_fd *fd_pairs) perror("Error in getting the size of the subbuffers"); goto end; } + ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */ + if(ret != 0) { + perror("Error in mutex init"); + goto end; + } } /* Mmap each FD */ @@ -401,6 +405,82 @@ int read_channels(struct channel_trace_fd *fd_pairs) } } + goto end; /* success */ + + /* Error handling */ + /* munmap only the successfully mmapped indexes */ +munmap: + /* Munmap each FD */ + for(j=0;jpair[j]; + int err_ret; + + err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); + if(err_ret != 0) { + perror("Error in munmap"); + } + ret |= err_ret; + } + +end: + return ret; + + +} + + +int unmap_channels(struct channel_trace_fd *fd_pairs) +{ + int j; + int ret=0; + + /* Munmap each FD */ + for(j=0;jnum_pairs;j++) { + struct fd_pair *pair = &fd_pairs->pair[j]; + int err_ret; + + err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); + if(err_ret != 0) { + perror("Error in munmap"); + } + ret |= err_ret; + err_ret = pthread_mutex_destroy(&pair->mutex); + if(err_ret != 0) { + perror("Error in mutex destroy"); + } + ret |= err_ret; + } + + return ret; +} + + +/* read_channels + * + * Thread worker. + * + * Read the relayfs channels and write them in the paired tracefiles. + * + * @fd_pairs : paired channels and trace files. + * + * returns (void*)0 on success, (void*)-1 on error. + * + * Note that the high priority polled channels are consumed first. We then poll + * again to see if these channels are still in priority. Only when no + * high priority channel is left, we start reading low priority channels. + * + * Note that a channel is considered high priority when the buffer is almost + * full. + */ + +void * read_channels(void *arg) +{ + struct pollfd *pollfd; + int i,j; + int num_rdy, num_hup; + int high_prio; + int ret = 0; + struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg; /* Start polling the FD */ @@ -412,9 +492,6 @@ int read_channels(struct channel_trace_fd *fd_pairs) pollfd[i].events = POLLIN|POLLPRI; } - /* Signal the parent that ready for IO */ - if(sig_parent) kill(getppid(), SIGIO); - while(1) { high_prio = 0; num_hup = 0; @@ -451,10 +528,18 @@ int read_channels(struct channel_trace_fd *fd_pairs) num_hup++; break; case POLLPRI: - printf("Urgent read on fd %d\n", pollfd[i].fd); - /* Take care of high priority channels first. */ - high_prio = 1; - ret |= read_subbuffer(&fd_pairs->pair[i]); + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + printf("Urgent read on fd %d\n", pollfd[i].fd); + /* Take care of high priority channels first. */ + high_prio = 1; + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } break; } } @@ -465,9 +550,17 @@ int read_channels(struct channel_trace_fd *fd_pairs) for(i=0;inum_pairs;i++) { switch(pollfd[i].revents) { case POLLIN: - /* Take care of low priority channels. */ - printf("Normal read on fd %d\n", pollfd[i].fd); - ret |= read_subbuffer(&fd_pairs->pair[i]); + if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) { + /* Take care of low priority channels. */ + printf("Normal read on fd %d\n", pollfd[i].fd); + /* it's ok to have an unavailable subbuffer */ + ret = read_subbuffer(&fd_pairs->pair[i]); + if(ret == -EAGAIN) ret = 0; + + ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex); + if(ret) + printf("Error in mutex unlock : %s\n", strerror(ret)); + } break; } } @@ -478,23 +571,8 @@ int read_channels(struct channel_trace_fd *fd_pairs) free_fd: free(pollfd); - /* munmap only the successfully mmapped indexes */ - i = fd_pairs->num_pairs; -munmap: - /* Munmap each FD */ - for(j=0;jpair[j]; - int err_ret; - - err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs); - if(err_ret != 0) { - perror("Error in munmap"); - } - ret |= err_ret; - } - end: - return ret; + return (void*)ret; } @@ -514,10 +592,12 @@ void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs) int main(int argc, char ** argv) { - int ret; - pid_t pid; + int ret = 0; struct channel_trace_fd fd_pairs = { NULL, 0 }; struct sigaction act; + pthread_t *tids; + unsigned int i; + void *tret; ret = parse_arguments(argc, argv); @@ -528,18 +608,13 @@ int main(int argc, char ** argv) show_info(); if(daemon_mode) { - pid = fork(); - - if(pid > 0) { - /* parent */ - return 0; - } else if(pid < 0) { - /* error */ - printf("An error occured while forking.\n"); - return -1; - } - /* else, we are the child, continue... */ - } + ret = daemon(0, 0); + + if(ret == -1) { + perror("An error occured while daemonizing."); + exit(-1); + } + } /* Connect the signal handlers */ act.sa_handler = handler; @@ -552,11 +627,36 @@ int main(int argc, char ** argv) sigaction(SIGQUIT, &act, NULL); sigaction(SIGINT, &act, NULL); - //return 0; + if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs)) goto close_channel; - ret = read_channels(&fd_pairs); + if(ret = map_channels(&fd_pairs)) + goto close_channel; + + tids = malloc(sizeof(pthread_t) * num_threads); + for(i=0; i