From 32f2b04a6c569bc0c62836827501dd402c5f7b7a Mon Sep 17 00:00:00 2001
From: compudj
Date: Wed, 8 Mar 2006 05:34:11 +0000
Subject: [PATCH] reader starts to work

git-svn-id: http://ltt.polymtl.ca/svn@1614 04897980-b3bd-0310-b5e0-8ef037075253
---
 usertrace-fast/ltt-usertrace-fast.c | 183 +++++++++++++++++++++++++---
 usertrace-fast/ltt-usertrace-fast.h | 140 +++++++++++++++++++++
 2 files changed, 308 insertions(+), 15 deletions(-)

diff --git a/usertrace-fast/ltt-usertrace-fast.c b/usertrace-fast/ltt-usertrace-fast.c
index 5f8c5e08..990b0211 100644
--- a/usertrace-fast/ltt-usertrace-fast.c
+++ b/usertrace-fast/ltt-usertrace-fast.c
@@ -41,6 +41,7 @@
  *
  */
 
+#define _GNU_SOURCE
 #include
 #include
 #include
@@ -58,13 +59,14 @@
 #include
 #include
 #include
-#include
 #include
+#include
 #include //for get_cycles()
 
 #include "ltt-usertrace-fast.h"
 
+enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
 
 /* Writer (the traced application) */
 
@@ -119,9 +121,152 @@ static void handler_sigalarm(int signo)
 }
 
 /* Do a buffer switch. Don't switch if buffer is completely empty */
-static void flush_buffer(struct ltt_buf *ltt_buf)
+static void flush_buffer(struct ltt_buf *ltt_buf, enum force_switch_mode mode)
 {
+	uint64_t tsc;
+	int offset_begin, offset_end, offset_old;
+	int reserve_commit_diff;
+	int consumed_old, consumed_new;
+	int commit_count, reserve_count;
+	int end_switch_old;
 
+	do {
+		offset_old = atomic_read(&ltt_buf->offset);
+		offset_begin = offset_old;
+		end_switch_old = 0;
+		tsc = ltt_get_timestamp();
+		if(tsc == 0) {
+			/* Error getting the timestamp: should not happen. It would mean
+			 * we were called from an NMI during a write seqlock on xtime. */
+			return;
+		}
+
+		if(SUBBUF_OFFSET(offset_begin, ltt_buf) != 0) {
+			offset_begin = SUBBUF_ALIGN(offset_begin, ltt_buf);
+			end_switch_old = 1;
+		} else {
+			/* We do not have to switch: the buffer is empty */
+			return;
+		}
+		if(mode == FORCE_ACTIVE)
+			offset_begin += ltt_subbuf_header_len(ltt_buf);
+		/* Always begin_switch in FORCE_ACTIVE mode */
+
+		/* Test new buffer integrity */
+		reserve_commit_diff =
+			atomic_read(
+				&ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])
+			- atomic_read(
+				&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+		if(reserve_commit_diff == 0) {
+			/* Next buffer not corrupted. */
+			if(mode == FORCE_ACTIVE
+					&& (offset_begin - atomic_read(&ltt_buf->consumed))
+						>= ltt_buf->alloc_size) {
+				/* We do not overwrite non-consumed buffers and we are full:
+				 * ignore the switch while tracing is active. */
+				return;
+			}
+		} else {
+			/* Next sub-buffer corrupted. Force pushing the reader even in
+			 * normal mode. */
+		}
+
+		offset_end = offset_begin;
+	} while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
+			!= offset_old);
+
+
+	if(mode == FORCE_ACTIVE) {
+		/* Push the reader if necessary */
+		do {
+			consumed_old = atomic_read(&ltt_buf->consumed);
+			/* If the buffer is in overwrite mode, push the reader consumed
+			 * count if the write position has reached it and we are not at
+			 * the first iteration (don't push the reader farther than the
+			 * writer). This operation can be done concurrently by many
+			 * writers in the same buffer; the writer at the farthest write
+			 * position sub-buffer index in the buffer is the one which will
+			 * win this loop. */
+			/* If the buffer is not in overwrite mode, pushing the reader
+			 * only happens if a sub-buffer is corrupted. */
+			if((SUBBUF_TRUNC(offset_end, ltt_buf)
+					- SUBBUF_TRUNC(consumed_old, ltt_buf))
+						>= ltt_buf->alloc_size)
+				consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
+			else {
+				consumed_new = consumed_old;
+				break;
+			}
+		} while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
+				!= consumed_old);
+
+		if(consumed_old != consumed_new) {
+			/* Reader pushed: we are the winner of the push, so we can
+			 * re-equilibrate reserve and commit. Atomic increment of the
+			 * commit count permits other writers to play around with this
+			 * variable before us. We keep track of corrupted_subbuffers even
+			 * in overwrite mode: we never want to write over a sub-buffer
+			 * that is not completely committed. Possible causes: the buffer
+			 * size is too low compared to the unordered data input, or a
+			 * writer died between the reserve and the commit. */
+			if(reserve_commit_diff) {
+				/* We have to alter the sub-buffer commit count: a sub-buffer
+				 * is corrupted. */
+				atomic_add(reserve_commit_diff,
+					&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+				atomic_inc(&ltt_buf->corrupted_subbuffers);
+			}
+		}
+	}
+
+	/* Always switch */
+
+	if(end_switch_old) {
+		/* Old sub-buffer */
+		/* Concurrency-safe because we are the last and only thread to alter
+		 * this sub-buffer. As long as it is not delivered and read, no other
+		 * thread can alter the offset, alter the reserve_count or call the
+		 * client_buffer_end_callback on this sub-buffer.
+		 * The only remaining threads could be the ones with pending commits;
+		 * they will have to do the delivery themselves.
+		 * Not concurrency-safe in overwrite mode: we detect corrupted
+		 * sub-buffers with the commit and reserve counts, keep a count of
+		 * corrupted sub-buffers, and push the readers across them.
+		 * Not concurrency-safe if a writer is stalled in a sub-buffer and
+		 * another writer switches in, finding out it's corrupted. The result
+		 * is that the old (uncommitted) sub-buffer is declared corrupted, and
+		 * that the new sub-buffer is declared corrupted too because of the
+		 * commit count adjustment.
+		 * offset_old should never be 0. */
+		ltt_buffer_end_callback(ltt_buf, tsc, offset_old,
+			SUBBUF_INDEX((offset_old), ltt_buf));
+		/* Setting this reserve_count will allow the sub-buffer to be
+		 * delivered by the last committer. */
+		reserve_count = atomic_add_return((SUBBUF_OFFSET((offset_old - 1),
+				ltt_buf) + 1),
+				&ltt_buf->reserve_count[SUBBUF_INDEX((offset_old), ltt_buf)]);
+		if(reserve_count == atomic_read(
+				&ltt_buf->commit_count[SUBBUF_INDEX((offset_old), ltt_buf)])) {
+			ltt_deliver_callback(ltt_buf, SUBBUF_INDEX((offset_old), ltt_buf),
+				NULL);
+		}
+	}
+
+	if(mode == FORCE_ACTIVE) {
+		/* New sub-buffer */
+		/* This code can be executed unordered: writers may already have
+		 * written to the sub-buffer before this code gets executed; caution. */
+		/* The commit makes sure that this code is executed before the
+		 * delivery of this sub-buffer. */
+		ltt_buffer_begin_callback(ltt_buf, tsc,
+			SUBBUF_INDEX(offset_begin, ltt_buf));
+		commit_count = atomic_add_return(ltt_subbuf_header_len(ltt_buf),
+			&ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+		/* Check if the written buffer has to be delivered */
+		if(commit_count == atomic_read(
+				&ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])) {
+			ltt_deliver_callback(ltt_buf, SUBBUF_INDEX(offset_begin, ltt_buf),
+				NULL);
+		}
+	}
 }
 
 
@@ -163,7 +308,8 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
 	} else {
 		if(atomic_read(&ltt_buf->full) == 1) {
 			/* tell the client that buffer is now unfull */
-			ret = futex(&ltt_buf->full, FUTEX_WAKE, 1, NULL, NULL, 0);
+			ret = futex((unsigned long)&ltt_buf->full,
+					FUTEX_WAKE, 1, 0, 0, 0);
 			if(ret != 1) {
 				printf("LTT warning : race condition : writer not waiting or too many writers\n");
 			}
@@ -182,7 +328,7 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
 *{
 	if(buffer_is_full) {
 		atomic_set(&ltt_buf->full, 1);
-		ret = futex(&ltt_buf->full, 1, NULL, NULL, 0);
+		ret = do_futex((unsigned long)&ltt_buf->full, 1, 0, 0, 0);
 	}
 }
 
@@ -190,11 +336,12 @@ static inline int ltt_buffer_put(struct ltt_buf *ltt_buf,
 
 static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
 {
+	unsigned int consumed_old;
 	int err;
 
 	printf("LTT read buffer\n");
 
-	err = ltt_buffer_get(&shared_trace_info->channel.cpu, &consumed_old);
+	err = ltt_buffer_get(ltt_buf, &consumed_old);
 	if(err != -EAGAIN && err != 0) {
 		printf("LTT Reserving sub buffer failed\n");
 		goto get_error;
@@ -218,11 +365,11 @@ static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
 	}
 #endif //0
 write_error:
-	err = ltt_buffer_put(&shared_trace_info->channel.cpu, consumed_old);
+	err = ltt_buffer_put(ltt_buf, consumed_old);
 
 	if(err != 0) {
 		if(err == -EIO) {
-			perror("Reader has been pushed by the writer, last subbuffer corrupted.");
+			printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
 			/* FIXME : we may delete the last written buffer if we wish. */
 		}
 		goto get_error;
@@ -304,30 +451,32 @@ static void ltt_usertrace_fast_daemon(struct ltt_trace_info *shared_trace_info,
 			printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
 
 			do {
-				ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+				ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
 			} while(ret == 0);
 
 			do {
-				ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+				ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
 			} while(ret == 0);
 		}
 
-	/* Buffer force switch (flush) */
-	flush_buffer(&shared_trace_info->channel.cpu);
+	/* The parent thread is dead and we have finished with the buffer */
+
+	/* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we
+	 * know there is no writer. */
+	flush_buffer(&shared_trace_info->channel.cpu, FORCE_FLUSH);
 	do {
-		ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+		ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
 	} while(ret == 0);
 
-	flush_buffer(&shared_trace_info->channel.facilities);
+	flush_buffer(&shared_trace_info->channel.facilities, FORCE_FLUSH);
 	do {
-		ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+		ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
 	} while(ret == 0);
 
 	close(fd_fac);
 	close(fd_cpu);
 
-	/* The parent thread is dead and we have finished with the buffer */
-
 	munmap(shared_trace_info, sizeof(*shared_trace_info));
 
 	exit(0);
@@ -358,9 +507,13 @@ void ltt_rw_init(void)
 	atomic_set(&shared_trace_info->channel.facilities.full, 0);
 	shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
 	shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
+	shared_trace_info->channel.facilities.start =
+		shared_trace_info->channel.facilities_buf;
+
 	atomic_set(&shared_trace_info->channel.cpu.full, 0);
 	shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
 	shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
+	shared_trace_info->channel.cpu.start = shared_trace_info->channel.cpu_buf;
 	shared_trace_info->init = 1;
 
 	/* Disable signals */
diff --git a/usertrace-fast/ltt-usertrace-fast.h b/usertrace-fast/ltt-usertrace-fast.h
index 811a2537..9953c15a 100644
--- a/usertrace-fast/ltt-usertrace-fast.h
+++ b/usertrace-fast/ltt-usertrace-fast.h
@@ -11,6 +11,14 @@
 #include
 #include
 #include
+#include
+#include
+#include
+
+#ifndef futex
+static inline _syscall6(long, futex, unsigned long, uaddr, int, op, int, val,
+		unsigned long, timeout, unsigned long, uaddr2, int, val2)
+#endif //futex
 
 
 #ifndef LTT_N_SUBBUFS
@@ -46,13 +54,59 @@
 	(BUFFER_OFFSET(offset,buf)/buf->subbuf_size)
 
 
+#define LTT_TRACER_MAGIC_NUMBER		0x00D6B7ED
+#define LTT_TRACER_VERSION_MAJOR	0
+#define LTT_TRACER_VERSION_MINOR	7
+
+#ifndef atomic_cmpxchg
+#define atomic_cmpxchg(v, old, new) ((int)cmpxchg(&((v)->counter), old, new))
+#endif //atomic_cmpxchg
+
+struct ltt_trace_header {
+	uint32_t	magic_number;
+	uint32_t	arch_type;
+	uint32_t	arch_variant;
+	uint32_t	float_word_order;	/* Only useful for user space traces */
+	uint8_t		arch_size;
+	//uint32_t	system_type;
+	uint8_t		major_version;
+	uint8_t		minor_version;
+	uint8_t		flight_recorder;
+	uint8_t		has_heartbeat;
+	uint8_t		has_alignment;	/* Event header alignment */
+	uint32_t	freq_scale;
+	uint64_t	start_freq;
+	uint64_t	start_tsc;
+	uint64_t	start_monotonic;
+	uint64_t	start_time_sec;
+	uint64_t	start_time_usec;
+} __attribute((packed));
+
+
+struct ltt_block_start_header {
+	struct {
+		uint64_t	cycle_count;
+		uint64_t	freq;	/* kHz */
+	} begin;
+	struct {
+		uint64_t	cycle_count;
+		uint64_t	freq;	/* kHz */
+	} end;
+	uint32_t	lost_size;	/* Size unused at the end of the buffer */
+	uint32_t	buf_size;	/* The size of this sub-buffer */
+	struct ltt_trace_header	trace;
+} __attribute((packed));
+
+
+
 struct ltt_buf {
+	void		*start;
 	atomic_t	offset;
 	atomic_t	consumed;
 	atomic_t	reserve_count[LTT_N_SUBBUFS];
 	atomic_t	commit_count[LTT_N_SUBBUFS];
 
 	atomic_t	events_lost;
+	atomic_t	corrupted_subbuffers;
 	atomic_t	full;	/* futex on which the writer waits : 1 : full */
 	unsigned int	alloc_size;
 	unsigned int	subbuf_size;
@@ -71,10 +125,96 @@ struct ltt_trace_info {
 	} channel;
 };
 
+
+
 extern __thread struct ltt_trace_info *thread_trace_info;
 
 void ltt_thread_init(void);
 
 void ltt_usertrace_fast_buffer_switch(void);
 
+
+
+static inline uint64_t ltt_get_timestamp()
+{
+	return get_cycles();
+}
+
+static inline unsigned int ltt_subbuf_header_len(struct ltt_buf *buf)
+{
+	return sizeof(struct ltt_block_start_header);
+}
+
+
+
+static inline void ltt_write_trace_header(struct ltt_trace_header *header)
+{
+	header->magic_number = LTT_TRACER_MAGIC_NUMBER;
+	header->major_version = LTT_TRACER_VERSION_MAJOR;
+	header->minor_version = LTT_TRACER_VERSION_MINOR;
+	header->float_word_order = 0;	//FIXME
+	header->arch_type = 0;	//FIXME LTT_ARCH_TYPE;
+	header->arch_size = sizeof(void*);
+	header->arch_variant = 0;	//FIXME LTT_ARCH_VARIANT;
+	header->flight_recorder = 0;
+	header->has_heartbeat = 0;
+
+#ifdef CONFIG_LTT_ALIGNMENT
+	header->has_alignment = sizeof(void*);
+#else
+	header->has_alignment = 0;
+#endif
+
+	//FIXME
+	header->freq_scale = 0;
+	header->start_freq = 0;
+	header->start_tsc = 0;
+	header->start_monotonic = 0;
+	header->start_time_sec = 0;
+	header->start_time_usec = 0;
+}
+
+
+static inline void ltt_buffer_begin_callback(struct ltt_buf *buf,
+		uint64_t tsc, unsigned int subbuf_idx)
+{
+	struct ltt_block_start_header *header =
+		(struct ltt_block_start_header*)
+			(buf->start + (subbuf_idx*buf->subbuf_size));
+
+	header->begin.cycle_count = tsc;
+	header->begin.freq = 0;	//ltt_frequency();
+
+	header->lost_size = 0xFFFFFFFF;	// for debugging...
+
+	header->buf_size = buf->subbuf_size;
+
+	ltt_write_trace_header(&header->trace);
+
+}
+
+
+
+static inline void ltt_buffer_end_callback(struct ltt_buf *buf,
+		uint64_t tsc, unsigned int offset, unsigned int subbuf_idx)
+{
+	struct ltt_block_start_header *header =
+		(struct ltt_block_start_header*)
+			(buf->start + (subbuf_idx*buf->subbuf_size));
+	/* offset is assumed never to be 0 here: never deliver a completely
+	 * empty sub-buffer. */
+	/* The lost size is between 0 and subbuf_size-1 */
+	header->lost_size = SUBBUF_OFFSET((buf->subbuf_size - offset),
+			buf);
+	header->end.cycle_count = tsc;
+	header->end.freq = 0;	//ltt_frequency();
+}
+
+
+static inline void ltt_deliver_callback(struct ltt_buf *buf,
+		unsigned subbuf_idx,
+		void *subbuf)
+{
+	ltt_usertrace_fast_buffer_switch();
+}
 #endif //_LTT_USERTRACE_FAST_H
-- 
2.34.1
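
Editor's note on the cmpxchg retry loop: flush_buffer() above publishes a
buffer switch by snapshotting the write offset, computing the switched
offset, and installing it with atomic_cmpxchg(), retrying whenever a
concurrent writer moved the offset first. The stand-alone sketch below shows
the same reserve-by-cmpxchg pattern in isolation. It is a minimal
illustration, not code from the patch: it uses a GCC __sync builtin instead
of the kernel-style atomic_t helpers, and demo_buf/demo_reserve are
hypothetical names.

#include <stdio.h>

struct demo_buf {
	unsigned int offset;	/* current write offset into the buffer */
};

/* Reserve len bytes; returns the start offset of the reserved slot. */
static unsigned int demo_reserve(struct demo_buf *buf, unsigned int len)
{
	unsigned int old, new;

	do {
		old = buf->offset;	/* snapshot */
		new = old + len;	/* tentative post-reserve offset */
		/* Install the new offset only if nobody moved it since the
		 * snapshot; otherwise loop and retry with a fresh snapshot. */
	} while (__sync_val_compare_and_swap(&buf->offset, old, new) != old);

	return old;
}

int main(void)
{
	struct demo_buf buf = { 0 };

	printf("slot at %u\n", demo_reserve(&buf, 16));	/* slot at 0 */
	printf("slot at %u\n", demo_reserve(&buf, 16));	/* slot at 16 */
	return 0;
}

The real flush_buffer() additionally bails out before the cmpxchg (buffer
empty, or full in FORCE_ACTIVE mode), which is why its return statements sit
inside the do/while.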
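Editor's note on the futex handshake: ltt_buffer_put() wakes a writer that
blocked on a full buffer by calling futex(..., FUTEX_WAKE, 1, ...) on
&ltt_buf->full, and the commented-out writer path sets full to 1 and waits.
The sketch below shows that wait/wake protocol on a plain int. It is a demo
under stated assumptions, not the patch's code: it goes through
syscall(SYS_futex, ...) as on current glibc rather than the
_syscall6-generated wrapper, and reader()/full are hypothetical names. Build
with -pthread.

#define _GNU_SOURCE
#include <linux/futex.h>
#include <sys/syscall.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>

static int full = 1;	/* 1 : buffer full, writer must wait */

static void *reader(void *arg)
{
	(void)arg;
	usleep(10000);	/* pretend to consume a sub-buffer */
	__sync_lock_test_and_set(&full, 0);	/* mark the buffer no longer full */
	/* wake at most one writer sleeping on &full */
	syscall(SYS_futex, &full, FUTEX_WAKE, 1, NULL, NULL, 0);
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, reader, NULL);
	/* Sleep only while the word still reads 1 (full); if it changed
	 * before we got here, FUTEX_WAIT returns immediately with EAGAIN. */
	while (__sync_fetch_and_add(&full, 0) == 1)
		syscall(SYS_futex, &full, FUTEX_WAIT, 1, NULL, NULL, 0);
	printf("writer resumed: buffer no longer full\n");
	pthread_join(t, NULL);
	return 0;
}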
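Editor's note on lost_size: ltt_buffer_end_callback() records, in the block
start header, how many bytes at the tail of the sub-buffer were never
written, computed as SUBBUF_OFFSET(subbuf_size - offset, buf). Assuming
SUBBUF_OFFSET reduces to a modulo by the (power-of-two) sub-buffer size,
which the header's other macros suggest but the elided definitions do not
show, the arithmetic works out as in this small sketch:

#include <stdio.h>

int main(void)
{
	unsigned int subbuf_size = 4096;	/* power of two */
	unsigned int used = 3000;		/* bytes written in the sub-buffer */

	/* Unused tail, kept in [0, subbuf_size - 1]: an exactly full
	 * sub-buffer (used == subbuf_size) wraps to lost_size == 0, and
	 * used == 0 never reaches this computation, per the "never deliver
	 * a completely empty sub-buffer" assumption in the header. */
	unsigned int lost_size = (subbuf_size - used) % subbuf_size;

	printf("lost_size = %u\n", lost_size);	/* prints 1096 */
	return 0;
}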