*
*/
+#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/param.h>
-#include <linux/futex.h>
#include <sys/time.h>
+#include <asm/atomic.h>
#include <asm/timex.h> //for get_cycles()
#include "ltt-usertrace-fast.h"
+enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
/* Writer (the traced application) */
}
/* Do a buffer switch. Don't switch if buffer is completely empty */
-static void flush_buffer(struct ltt_buf *ltt_buf)
+static void flush_buffer(struct ltt_buf *ltt_buf, enum force_switch_mode mode)
{
+ uint64_t tsc;
+ int offset_begin, offset_end, offset_old;
+ int reserve_commit_diff;
+ int consumed_old, consumed_new;
+ int commit_count, reserve_count;
+ int end_switch_old;
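+ /* Compute the buffer switch, and retry the whole computation if another
+ * writer concurrently updated the offset (atomic_cmpxchg loop). */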
+ do {
+ offset_old = atomic_read(&ltt_buf->offset);
+ offset_begin = offset_old;
+ end_switch_old = 0;
+ tsc = ltt_get_timestamp();
+ if(tsc == 0) {
+ /* Error in getting the timestamp : should not happen : it would
+ * mean we are called from an NMI during a write seqlock on xtime. */
+ return;
+ }
+
+ if(SUBBUF_OFFSET(offset_begin, ltt_buf) != 0) {
+ offset_begin = SUBBUF_ALIGN(offset_begin, ltt_buf);
+ end_switch_old = 1;
+ } else {
+ /* we do not have to switch : buffer is empty */
+ return;
+ }
+ if(mode == FORCE_ACTIVE)
+ offset_begin += ltt_subbuf_header_len(ltt_buf);
+ /* Always begin_switch in FORCE_ACTIVE mode */
+
+ /* Test new buffer integrity */
+ reserve_commit_diff =
+ atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])
+ - atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+ if(reserve_commit_diff == 0) {
+ /* Next buffer not corrupted. */
+ if(mode == FORCE_ACTIVE
+ && (offset_begin-atomic_read(&ltt_buf->consumed))
+ >= ltt_buf->alloc_size) {
+ /* We do not overwrite non-consumed buffers and the buffer is full :
+ ignore the switch while tracing is active. */
+ return;
+ }
+ } else {
+ /* Next subbuffer corrupted. Force pushing reader even in normal mode */
+ }
+
+ offset_end = offset_begin;
+ } while(atomic_cmpxchg(&ltt_buf->offset, offset_old, offset_end)
+ != offset_old);
+
+
+ if(mode == FORCE_ACTIVE) {
+ /* Push the reader if necessary */
+ do {
+ consumed_old = atomic_read(&ltt_buf->consumed);
+ /* If the buffer is in overwrite mode, push the reader consumed count if
+ the write position has reached it and we are not at the first
+ iteration (don't push the reader farther than the writer).
+ This operation can be done concurrently by many writers in the
+ same buffer ; the writer at the farthest write position (sub-buffer
+ index) in the buffer is the one which will win this loop. */
+ /* If the buffer is not in overwrite mode, pushing the reader only
+ happens if a sub-buffer is corrupted. */
+ if((SUBBUF_TRUNC(offset_end, ltt_buf)
+ - SUBBUF_TRUNC(consumed_old, ltt_buf))
+ >= ltt_buf->alloc_size)
+ consumed_new = SUBBUF_ALIGN(consumed_old, ltt_buf);
+ else {
+ consumed_new = consumed_old;
+ break;
+ }
+ } while(atomic_cmpxchg(&ltt_buf->consumed, consumed_old, consumed_new)
+ != consumed_old);
+
+ if(consumed_old != consumed_new) {
+ /* Reader pushed : we are the winner of the push, so we can
+ re-equilibrate the reserve and commit counts. Using an atomic add on
+ the commit count lets other writers keep updating this variable
+ concurrently before us. We keep track of corrupted_subbuffers even in
+ overwrite mode :
+ we never want to write over a sub-buffer that is not completely committed.
+ Possible causes : the buffer size is too small for the unordered data
+ input, or a writer died between the reserve and the
+ commit. */
+ if(reserve_commit_diff) {
+ /* We have to alter the sub-buffer commit count : a sub-buffer is
+ corrupted */
+ atomic_add(reserve_commit_diff,
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+ atomic_inc(&ltt_buf->corrupted_subbuffers);
+ }
+ }
+ }
+
+ /* Always switch */
+
+ if(end_switch_old) {
+ /* old subbuffer */
+ /* Concurrency safe because we are the last and only thread to alter this
+ sub-buffer. As long as it is not delivered and read, no other thread can
+ alter the offset, alter the reserve_count or call the
+ client_buffer_end_callback on this sub-buffer.
+ The only remaining threads could be the ones with pending commits ; they
+ will have to do the delivery themselves.
+ Not concurrency safe in overwrite mode. We detect corrupted subbuffers with
+ commit and reserve counts. We keep a corrupted sub-buffers count and push
+ the readers across these sub-buffers.
+ Not concurrency safe if a writer is stalled in a subbuffer and
+ another writer switches in, finding out it's corrupted. The result will be
+ that the old (uncommitted) subbuffer will be declared corrupted, and that
+ the new subbuffer will be declared corrupted too because of the commit
+ count adjustment.
+ Offset old should never be 0. */
+ ltt_buffer_end_callback(ltt_buf, tsc, offset_old,
+ SUBBUF_INDEX((offset_old), ltt_buf));
+ /* Setting this reserve_count will allow the sub-buffer to be delivered by
+ the last committer. */
+ reserve_count = atomic_add_return((SUBBUF_OFFSET((offset_old-1),
+ ltt_buf) + 1),
+ &ltt_buf->reserve_count[SUBBUF_INDEX((offset_old),
+ ltt_buf)]);
+ if(reserve_count == atomic_read(
+ &ltt_buf->commit_count[SUBBUF_INDEX((offset_old), ltt_buf)])) {
+ ltt_deliver_callback(ltt_buf, SUBBUF_INDEX((offset_old), ltt_buf), NULL);
+ }
+ }
+
+ if(mode == FORCE_ACTIVE) {
+ /* New sub-buffer */
+ /* This code can be executed unordered : writers may already have written
+ to the sub-buffer before this code gets executed, caution. */
+ /* The commit makes sure that this code is executed before the delivery
+ of this sub-buffer */
+ ltt_buffer_begin_callback(ltt_buf, tsc, SUBBUF_INDEX(offset_begin, ltt_buf));
+ commit_count = atomic_add_return(ltt_subbuf_header_len(ltt_buf),
+ &ltt_buf->commit_count[SUBBUF_INDEX(offset_begin, ltt_buf)]);
+ /* Check if the written buffer has to be delivered */
+ if(commit_count == atomic_read(
+ &ltt_buf->reserve_count[SUBBUF_INDEX(offset_begin, ltt_buf)])) {
+ ltt_deliver_callback(ltt_buf, SUBBUF_INDEX(offset_begin, ltt_buf), NULL);
+ }
+ }
}
} else {
if(atomic_read(&ltt_buf->full) == 1) {
/* tell the client that buffer is now unfull */
- ret = futex(&ltt_buf->full, FUTEX_WAKE, 1, NULL, NULL, 0);
+ ret = futex((unsigned long)&ltt_buf->full,
+ FUTEX_WAKE, 1, 0, 0, 0);
if(ret != 1) {
printf("LTT warning : race condition : writer not waiting or too many writers\n");
}
*{
if(buffer_is_full) {
atomic_set(&ltt_buf->full, 1);
- ret = futex(&ltt_buf->full, 1, NULL, NULL, 0);
+ ret = do_futex((unsigned long)&ltt_buf->full, 1, 0, 0, 0);
}
}
static int read_subbuffer(struct ltt_buf *ltt_buf, int fd)
{
+ unsigned int consumed_old;
int err;
printf("LTT read buffer\n");
- err = ltt_buffer_get(&shared_trace_info->channel.cpu, &consumed_old);
+ err = ltt_buffer_get(ltt_buf, &consumed_old);
if(err != -EAGAIN && err != 0) {
printf("LTT Reserving sub buffer failed\n");
goto get_error;
}
#endif //0
write_error:
- err = ltt_buffer_put(&shared_trace_info->channel.cpu, consumed_old);
+ err = ltt_buffer_put(ltt_buf, consumed_old);
if(err != 0) {
if(err == -EIO) {
- perror("Reader has been pushed by the writer, last subbuffer corrupted.");
+ printf("Reader has been pushed by the writer, last subbuffer corrupted.\n");
/* FIXME : we may delete the last written buffer if we wish. */
}
goto get_error;
printf("LTT Doing a buffer switch read. pid is : %lu\n", getpid());
do {
- ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+ ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
} while(ret == 0);
do {
- ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+ ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
} while(ret == 0);
}
- /* Buffer force switch (flush) */
- flush_buffer(&shared_trace_info->channel.cpu);
+ /* The parent thread is dead and we have finished with the buffer */
+
+ /* Buffer force switch (flush). Using FLUSH instead of ACTIVE because we know
+ * there is no writer. */
+ flush_buffer(&shared_trace_info->channel.cpu, FORCE_FLUSH);
do {
- ret = read_buffer(&shared_trace_info->channel.cpu, fd_cpu);
+ ret = read_subbuffer(&shared_trace_info->channel.cpu, fd_cpu);
} while(ret == 0);
- flush_buffer(&shared_trace_info->channel.facilities);
+ flush_buffer(&shared_trace_info->channel.facilities, FORCE_FLUSH);
do {
- ret = read_buffer(&shared_trace_info->channel.facilities, fd_fac);
+ ret = read_subbuffer(&shared_trace_info->channel.facilities, fd_fac);
} while(ret == 0);
close(fd_fac);
close(fd_cpu);
- /* The parent thread is dead and we have finished with the buffer */
munmap(shared_trace_info, sizeof(*shared_trace_info));
exit(0);
atomic_set(&shared_trace_info->channel.facilities.full, 0);
shared_trace_info->channel.facilities.alloc_size = LTT_BUF_SIZE_FACILITIES;
shared_trace_info->channel.facilities.subbuf_size = LTT_SUBBUF_SIZE_FACILITIES;
+ shared_trace_info->channel.facilities.start =
+ shared_trace_info->channel.facilities_buf;
+
atomic_set(&shared_trace_info->channel.cpu.full, 0);
shared_trace_info->channel.cpu.alloc_size = LTT_BUF_SIZE_CPU;
shared_trace_info->channel.cpu.subbuf_size = LTT_SUBBUF_SIZE_CPU;
+ shared_trace_info->channel.cpu.start = shared_trace_info->channel.cpu_buf;
shared_trace_info->init = 1;
/* Disable signals */
#include <errno.h>
#include <asm/atomic.h>
#include <pthread.h>
+#include <stdint.h>
+#include <syscall.h>
+#include <linux/futex.h>
+
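+/* Raw futex(2) syscall stub built with the legacy _syscall6 macro, since
+ * glibc does not provide a futex() wrapper. */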
+#ifndef futex
+static inline _syscall6(long, futex, unsigned long, uaddr, int, op, int, val,
+ unsigned long, timeout, unsigned long, uaddr2, int, val2)
+#endif //futex
#ifndef LTT_N_SUBBUFS
(BUFFER_OFFSET(offset,buf)/buf->subbuf_size)
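+/* SUBBUF_OFFSET(), SUBBUF_ALIGN(), SUBBUF_TRUNC() and SUBBUF_INDEX() (defined
+ * in this header) respectively give the offset within the current sub-buffer,
+ * the offset rounded up to the next sub-buffer boundary, the offset rounded
+ * down to a sub-buffer boundary, and the sub-buffer index in the buffer. */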
+#define LTT_TRACER_MAGIC_NUMBER 0x00D6B7ED
+#define LTT_TRACER_VERSION_MAJOR 0
+#define LTT_TRACER_VERSION_MINOR 7
+
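+/* Fallback definition : implement atomic_cmpxchg() with cmpxchg() on the
+ * atomic_t counter when the architecture headers do not provide it. */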
+#ifndef atomic_cmpxchg
+#define atomic_cmpxchg(v, old, new) ((int)cmpxchg(&((v)->counter), old, new))
+#endif //atomic_cmpxchg
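+/* Trace header, copied into the block start header of every sub-buffer
+ * (see ltt_write_trace_header()). */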
+struct ltt_trace_header {
+ uint32_t magic_number;
+ uint32_t arch_type;
+ uint32_t arch_variant;
+ uint32_t float_word_order; /* Only useful for user space traces */
+ uint8_t arch_size;
+ //uint32_t system_type;
+ uint8_t major_version;
+ uint8_t minor_version;
+ uint8_t flight_recorder;
+ uint8_t has_heartbeat;
+ uint8_t has_alignment; /* Event header alignment */
+ uint32_t freq_scale;
+ uint64_t start_freq;
+ uint64_t start_tsc;
+ uint64_t start_monotonic;
+ uint64_t start_time_sec;
+ uint64_t start_time_usec;
+} __attribute((packed));
+
+
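+/* Header written at the beginning of each sub-buffer : begin/end timestamps,
+ * unused (lost) size, sub-buffer size, and a copy of the trace header. */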
+struct ltt_block_start_header {
+ struct {
+ uint64_t cycle_count;
+ uint64_t freq; /* khz */
+ } begin;
+ struct {
+ uint64_t cycle_count;
+ uint64_t freq; /* khz */
+ } end;
+ uint32_t lost_size; /* Size unused at the end of the buffer */
+ uint32_t buf_size; /* The size of this sub-buffer */
+ struct ltt_trace_header trace;
+} __attribute((packed));
+
+
+
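+/* Per-channel buffer state, shared between the traced application (writer)
+ * and the reader through the mapped shared_trace_info. */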
struct ltt_buf {
+ void *start;
atomic_t offset;
atomic_t consumed;
atomic_t reserve_count[LTT_N_SUBBUFS];
atomic_t commit_count[LTT_N_SUBBUFS];
atomic_t events_lost;
+ atomic_t corrupted_subbuffers;
atomic_t full; /* futex on which the writer waits : 1 : full */
unsigned int alloc_size;
unsigned int subbuf_size;
} channel;
};
+
+
extern __thread struct ltt_trace_info *thread_trace_info;
void ltt_thread_init(void);
void ltt_usertrace_fast_buffer_switch(void);
+
+
+static inline uint64_t ltt_get_timestamp()
+{
+ return get_cycles();
+}
+
+static inline unsigned int ltt_subbuf_header_len(struct ltt_buf *buf)
+{
+ return sizeof(struct ltt_block_start_header);
+}
+
+
+
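+/* Fill the per-trace header copied at the start of every sub-buffer. Several
+ * fields are still placeholders (see the FIXMEs below). */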
+static inline void ltt_write_trace_header(struct ltt_trace_header *header)
+{
+ header->magic_number = LTT_TRACER_MAGIC_NUMBER;
+ header->major_version = LTT_TRACER_VERSION_MAJOR;
+ header->minor_version = LTT_TRACER_VERSION_MINOR;
+ header->float_word_order = 0; //FIXME
+ header->arch_type = 0; //FIXME LTT_ARCH_TYPE;
+ header->arch_size = sizeof(void*);
+ header->arch_variant = 0; //FIXME LTT_ARCH_VARIANT;
+ header->flight_recorder = 0;
+ header->has_heartbeat = 0;
+
+#ifdef CONFIG_LTT_ALIGNMENT
+ header->has_alignment = sizeof(void*);
+#else
+ header->has_alignment = 0;
+#endif
+
+ //FIXME
+ header->freq_scale = 0;
+ header->start_freq = 0;
+ header->start_tsc = 0;
+ header->start_monotonic = 0;
+ header->start_time_sec = 0;
+ header->start_time_usec = 0;
+}
+
+
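+/* Called when a new sub-buffer is opened : fill in its block start header. */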
+static inline void ltt_buffer_begin_callback(struct ltt_buf *buf,
+ uint64_t tsc, unsigned int subbuf_idx)
+{
+ struct ltt_block_start_header *header =
+ (struct ltt_block_start_header*)
+ (buf->start + (subbuf_idx*buf->subbuf_size));
+
+ header->begin.cycle_count = tsc;
+ header->begin.freq = 0; //ltt_frequency();
+
+ header->lost_size = 0xFFFFFFFF; // for debugging...
+
+ header->buf_size = buf->subbuf_size;
+
+ ltt_write_trace_header(&header->trace);
+
+}
+
+
+
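+/* Called when a sub-buffer is closed : record the end timestamp and the size
+ * left unused (lost) at the end of the sub-buffer. */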
+static inline void ltt_buffer_end_callback(struct ltt_buf *buf,
+ uint64_t tsc, unsigned int offset, unsigned int subbuf_idx)
+{
+ struct ltt_block_start_header *header =
+ (struct ltt_block_start_header*)
+ (buf->start + (subbuf_idx*buf->subbuf_size));
+ /* offset is assumed to never be 0 here : never deliver a completely
+ * empty subbuffer. */
+ /* The lost size is between 0 and subbuf_size-1 */
+ header->lost_size = SUBBUF_OFFSET((buf->subbuf_size - offset),
+ buf);
+ header->end.cycle_count = tsc;
+ header->end.freq = 0; //ltt_frequency();
+}
+
+
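+/* Deliver a complete sub-buffer : notify the reader side through
+ * ltt_usertrace_fast_buffer_switch(). The subbuf parameter is unused here. */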
+static inline void ltt_deliver_callback(struct ltt_buf *buf,
+ unsigned subbuf_idx,
+ void *subbuf)
+{
+ ltt_usertrace_fast_buffer_switch();
+}
#endif //_LTT_USERTRACE_FAST_H