unsigned int n_subbufs;
unsigned int subbuf_size;
void *mmap;
+ pthread_mutex_t mutex;
};
struct channel_trace_fd {
printf("-c directory Root directory of the relayfs trace channels.\n");
printf("-d Run in background (daemon).\n");
printf("-a Append to an possibly existing trace.\n");
- printf("-n Number of threads to start.\n");
+ printf("-N Number of threads to start.\n");
printf("\n");
}
case 'a':
append_mode = 1;
break;
- case 'n':
+ case 'N':
if(argn+1 < argc) {
num_threads = strtoul(argv[argn+1], NULL, 0);
argn++;
int read_subbuffer(struct fd_pair *pair)
{
unsigned int consumed_old;
- int err, ret;
+ int err, ret=0;
err = ioctl(pair->channel, RELAYFS_GET_SUBBUF,
&consumed_old);
printf("cookie : %u\n", consumed_old);
if(err != 0) {
+ ret = errno;
perror("Error in reserving sub buffer");
- ret = -EPERM;
goto get_error;
}
pair->subbuf_size));
if(err < 0) {
+ ret = errno;
perror("Error in writing to file");
- ret = err;
goto write_error;
}
write_error:
err = ioctl(pair->channel, RELAYFS_PUT_SUBBUF, &consumed_old);
if(err != 0) {
+ ret = errno;
if(errno == -EFAULT) {
- perror("Error in unreserving sub buffer");
- ret = -EFAULT;
+ perror("Error in unreserving sub buffer\n");
} else if(errno == -EIO) {
perror("Reader has been pushed by the writer, last subbuffer corrupted.");
/* FIXME : we may delete the last written buffer if we wish. */
- ret = -EIO;
}
goto get_error;
}
perror("Error in getting the size of the subbuffers");
goto end;
}
+ ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */
+ if(ret != 0) {
+ perror("Error in mutex init");
+ goto end;
+ }
}
/* Mmap each FD */
}
}
+ goto end; /* success */
/* Error handling */
/* munmap only the successfully mmapped indexes */
perror("Error in munmap");
}
ret |= err_ret;
+ err_ret = pthread_mutex_destroy(&pair->mutex);
+ if(err_ret != 0) {
+ perror("Error in mutex destroy");
+ }
+ ret |= err_ret;
}
return ret;
/* read_channels
+ *
+ * Thread worker.
*
* Read the relayfs channels and write them in the paired tracefiles.
*
* @fd_pairs : paired channels and trace files.
*
- * returns 0 on success, -1 on error.
+ * returns (void*)0 on success, (void*)-1 on error.
*
* Note that the high priority polled channels are consumed first. We then poll
* again to see if these channels are still in priority. Only when no
* full.
*/
-int read_channels(struct channel_trace_fd *fd_pairs)
+void * read_channels(void *arg)
{
struct pollfd *pollfd;
int i,j;
int num_rdy, num_hup;
int high_prio;
- int ret;
+ int ret = 0;
+ struct channel_trace_fd *fd_pairs = (struct channel_trace_fd *)arg;
/* Start polling the FD */
num_hup++;
break;
case POLLPRI:
- printf("Urgent read on fd %d\n", pollfd[i].fd);
- /* Take care of high priority channels first. */
- high_prio = 1;
- ret |= read_subbuffer(&fd_pairs->pair[i]);
+ if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+ printf("Urgent read on fd %d\n", pollfd[i].fd);
+ /* Take care of high priority channels first. */
+ high_prio = 1;
+ /* it's ok to have an unavailable subbuffer */
+ ret = read_subbuffer(&fd_pairs->pair[i]);
+ if(ret == -EAGAIN) ret = 0;
+ else if(ret)
+ printf("Error in read_subbuffer : %s\n", strerror(ret));
+ ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+ if(ret)
+ printf("Error in mutex unlock : %s\n", strerror(ret));
+ }
break;
}
}
for(i=0;i<fd_pairs->num_pairs;i++) {
switch(pollfd[i].revents) {
case POLLIN:
- /* Take care of low priority channels. */
- printf("Normal read on fd %d\n", pollfd[i].fd);
- ret |= read_subbuffer(&fd_pairs->pair[i]);
+ if(pthread_mutex_trylock(&fd_pairs->pair[i].mutex) == 0) {
+ /* Take care of low priority channels. */
+ printf("Normal read on fd %d\n", pollfd[i].fd);
+ /* it's ok to have an unavailable subbuffer */
+ ret = read_subbuffer(&fd_pairs->pair[i]);
+ if(ret == -EAGAIN) ret = 0;
+ else if(ret)
+ printf("Error in read_subbuffer : %s\n", strerror(ret));
+ ret = pthread_mutex_unlock(&fd_pairs->pair[i].mutex);
+ if(ret)
+ printf("Error in mutex unlock : %s\n", strerror(ret));
+ }
break;
}
}
free(pollfd);
end:
- return ret;
+ return (void*)ret;
}
int ret = 0;
struct channel_trace_fd fd_pairs = { NULL, 0 };
struct sigaction act;
+ pthread_t *tids;
+ unsigned int i;
+ void *tret;
ret = parse_arguments(argc, argv);
if(ret = map_channels(&fd_pairs))
goto close_channel;
- ret = read_channels(&fd_pairs);
+ tids = malloc(sizeof(pthread_t) * num_threads);
+ for(i=0; i<num_threads; i++) {
+ ret = pthread_create(&tids[i], NULL, read_channels, &fd_pairs);
+ if(ret) {
+ perror("Error creating thread");
+ break;
+ }
+ }
+ for(i=0; i<num_threads; i++) {
+ ret = pthread_join(tids[i], &tret);
+ if(ret) {
+ perror("Error joining thread");
+ break;
+ }
+ if((int)tret != 0) {
+ printf("Error %s occured in thread %u\n", strerror(-(int)tret), i);
+ }
+ }
+
+ free(tids);
+
ret |= unmap_channels(&fd_pairs);
close_channel: