futex
authorcompudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
Wed, 8 Mar 2006 04:35:33 +0000 (04:35 +0000)
committercompudj <compudj@04897980-b3bd-0310-b5e0-8ef037075253>
Wed, 8 Mar 2006 04:35:33 +0000 (04:35 +0000)
git-svn-id: http://ltt.polymtl.ca/svn@1613 04897980-b3bd-0310-b5e0-8ef037075253

usertrace-fast/ltt-usertrace-fast.c
usertrace-fast/ltt-usertrace-fast.h

index c809aff096f0fb359bdaf141c2b45d3477b103af..5f8c5e08657a2b9e747e7b00af436939e36a8855 100644 (file)
@@ -58,6 +58,8 @@
 #include <fcntl.h>
 #include <stdlib.h>
 #include <sys/param.h>
+#include <linux/futex.h>
+#include <sys/time.h>
 
 #include <asm/timex.h> //for get_cycles()
 
@@ -116,6 +118,119 @@ static void handler_sigalarm(int signo)
        alarm(3);
 }
 
+/* Do a buffer switch. Don't switch if buffer is completely empty */
+static void flush_buffer(struct ltt_buf *ltt_buf)
+{
+
+
+}
+
+static inline int ltt_buffer_get(struct ltt_buf *ltt_buf,
+               unsigned int *offset)
+{
+       unsigned int consumed_old, consumed_idx;
+       consumed_old = atomic_read(&ltt_buf->consumed);
+       consumed_idx = SUBBUF_INDEX(consumed_old, ltt_buf);
+       
+       if(atomic_read(&ltt_buf->commit_count[consumed_idx])
+               != atomic_read(&ltt_buf->reserve_count[consumed_idx])) {
+               return -EAGAIN;
+       }
+       if((SUBBUF_TRUNC(atomic_read(&ltt_buf->offset), ltt_buf)
+                               -SUBBUF_TRUNC(consumed_old, ltt_buf)) == 0) {
+               return -EAGAIN;
+       }
+       
+       *offset = consumed_old;
+
+       return 0;
+}
+
+static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
+               unsigned int offset)
+{
+       unsigned int consumed_old, consumed_new;
+       int ret;
+
+       consumed_old = offset;
+       consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
+       if(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
+                       != consumed_old) {
+               /* We have been pushed by the writer : the last buffer read _is_
+                * corrupted!
+                * It can also happen if this is a buffer we never got. */
+               return -EIO;
+       } else {
+               if(atomic_read(&ltt_buf->full) == 1) {
+                       /* tell the client that buffer is now unfull */
+                       ret = futex(&ltt_buf->full, FUTEX_WAKE, 1, NULL, NULL, 0);
+                       if(ret != 1) {
+                               printf("LTT warning : race condition : writer not waiting or too many writers\n");
+                       }
+                       atomic_set(&ltt_buf->full, 0);
+               }
+       }
+}
+
+/* In the writer :
+ *
+ * if(buffer full condition) {
+ *   put myself in the wait queue
+ *   ltt_buf->full = 1;
+ *   schedule
+ * }
+ *{
+       if(buffer_is_full) {
+               atomic_set(&ltt_buf->full, 1);
+               ret = futex(&ltt_buf->full, 1, NULL, NULL, 0);
+       }
+}
+
+ */
+
+static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
+{
+       int err;
+       printf("LTT read buffer\n");
+
+
+       err = ltt_buffer_get(&shared_trace_info->channel.cpu, &consumed_old);
+       if(err != -EAGAIN && err != 0) {
+               printf("LTT Reserving sub buffer failed\n");
+               goto get_error;
+       }
+
+       err = TEMP_FAILURE_RETRY(write(fd,
+                               ltt_buf->start 
+                                       + (consumed_old & ((ltt_buf->alloc_size)-1)),
+                               ltt_buf->subbuf_size));
+
+       if(err < 0) {
+               perror("Error in writing to file");
+               goto write_error;
+       }
+#if 0
+       err = fsync(pair->trace);
+       if(err < 0) {
+               ret = errno;
+               perror("Error in writing to file");
+               goto write_error;
+       }
+#endif //0
+write_error:
+       err = ltt_buffer_put(&shared_trace_info->channel.cpu, consumed_old);
+
+       if(err != 0) {
+               if(err == -EIO) {
+                       perror("Reader has been pushed by the writer, last subbuffer corrupted.");
+                       /* FIXME : we may delete the last written buffer if we wish. */
+               }
+               goto get_error;
+       }
+
+get_error:
+       return err;
+}
 
 /* This function is called by ltt_rw_init which has signals blocked */
 static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
@@ -187,12 +302,28 @@ static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
                if(traced_pid == 0) break; /* parent died */
                if(parent_exited) break;
                printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
-               //printf("Test parent. pid is : %lu, ppid is %lu\n", getpid(), getppid());
+       
+               do {
+                       ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+               } while(ret == 0);
+
+               do {
+                       ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+               } while(ret == 0);
        }
 
        /* Buffer force switch (flush) */
-       //TODO
-       
+       flush_buffer(&shared_trace_info->channel.cpu);
+       do {
+               ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+       } while(ret == 0);
+
+
+       flush_buffer(&shared_trace_info->channel.facilities);
+       do {
+               ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+       } while(ret == 0);
+
        close(fd_fac);
        close(fd_cpu);
        
@@ -222,6 +353,14 @@ void ltt_rw_init(void)
        shared_trace_info = mmap(0, sizeof(*thread_trace_info),
                        PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, 0, 0);
        memset(shared_trace_info, 0, sizeof(*shared_trace_info));
+       /* Tricky semaphore : is in a shared memory space, so it's ok for a fast
+        * mutex (futex). */
+       atomic_set(&shared_trace_info->channel.facilities.full, 0);
+       shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
+       shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
+       atomic_set(&shared_trace_info->channel.cpu.full, 0);
+       shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
+       shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
        shared_trace_info->init = 1;
 
        /* Disable signals */
index fcd67e53d9ad4d381486933cf31a850ec5be283d..811a2537199a9ef8ad3ecae9e9959bfc6469e29a 100644 (file)
 #include <asm/atomic.h>
 #include <pthread.h>
 
-#ifndef        LTT_BUF_SIZE_CPU
-#define LTT_BUF_SIZE_CPU 1048576
+
+#ifndef        LTT_N_SUBBUFS
+#define LTT_N_SUBBUFS 2
+#endif //LTT_N_SUBBUFS
+
+#ifndef        LTT_SUBBUF_SIZE_CPU
+#define LTT_SUBBUF_SIZE_CPU 1048576
 #endif //LTT_BUF_SIZE_CPU
 
-#ifndef        LTT_BUF_SIZE_FACILITIES
-#define LTT_BUF_SIZE_FACILITIES 4096
+#define LTT_BUF_SIZE_CPU (LTT_SUBBUF_SIZE_CPU * LTT_N_SUBBUFS)
+
+#ifndef        LTT_SUBBUF_SIZE_FACILITIES
+#define LTT_SUBBUF_SIZE_FACILITIES 4096
 #endif //LTT_BUF_SIZE_FACILITIES
 
+#define LTT_BUF_SIZE_FACILITIES  (LTT_SUBBUF_SIZE_FACILITIES * LTT_N_SUBBUFS)
+
 #ifndef LTT_USERTRACE_ROOT
 #define LTT_USERTRACE_ROOT "/tmp/ltt-usertrace"
 #endif //LTT_USERTRACE_ROOT
 
+
+/* Buffer offset macros */
+
+#define BUFFER_OFFSET(offset, buf) (offset & (buf->alloc_size-1))
+#define SUBBUF_OFFSET(offset, buf) (offset & (buf->subbuf_size-1))
+#define SUBBUF_ALIGN(offset, buf) \
+  (((offset) + buf->subbuf_size) & (~(buf->subbuf_size-1)))
+#define SUBBUF_TRUNC(offset, buf) \
+  ((offset) & (~(buf->subbuf_size-1)))
+#define SUBBUF_INDEX(offset, buf) \
+  (BUFFER_OFFSET(offset,buf)/buf->subbuf_size)
+
+
 struct ltt_buf {
        atomic_t        offset;
-       atomic_t        reserve_count;
-       atomic_t        commit_count;
+       atomic_t        consumed;
+       atomic_t        reserve_count[LTT_N_SUBBUFS];
+       atomic_t        commit_count[LTT_N_SUBBUFS];
 
        atomic_t        events_lost;
+       atomic_t        full;   /* futex on which the writer waits : 1 : full */
+       unsigned int    alloc_size;
+       unsigned int    subbuf_size;
 };
 
 struct ltt_trace_info {
        int init;
        int filter;
-#ifndef LTT_USE_THREADS
        pid_t daemon_id;
-#else
-       pthread_t daemon_id;
-#endif //LTT_USE_THREADS
        atomic_t nesting;
        struct {
                struct ltt_buf facilities;
This page took 0.026345 seconds and 4 git commands to generate.