int fds[2];
int result;
- ltt_buf->commit_count =
- zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs);
- if (!ltt_buf->commit_count)
- return -ENOMEM;
+//ust// ltt_buf->commit_count =
+//ust// zmalloc(sizeof(ltt_buf->commit_count) * n_subbufs);
+//ust// if (!ltt_buf->commit_count)
+//ust// return -ENOMEM;
kref_get(&trace->kref);
kref_get(&trace->ltt_transport_kref);
kref_get(<t_chan->kref);
kref_put(<t_chan->trace->ltt_transport_kref,
ltt_release_transport);
ltt_relay_print_buffer_errors(ltt_chan);
- kfree(ltt_buf->commit_count);
- ltt_buf->commit_count = NULL;
+//ust// kfree(ltt_buf->commit_count);
+//ust// ltt_buf->commit_count = NULL;
kref_put(<t_chan->kref, ltt_relay_release_channel);
kref_put(&trace->kref, ltt_release_trace);
//ust// wake_up_interruptible(&trace->kref_wq);
int result;
/* Get one page */
+ /* FIXME: increase size if we have a commit_count array that overflows the page */
size_t size = PAGE_ALIGN(1);
result = ltt_chan->buf_shmid = shmget(getpid(), size, IPC_CREAT | IPC_EXCL | 0700);
* sub-buffer before this code gets executed, caution. The commit makes sure
* that this code is executed before the deliver of this sub-buffer.
*/
-static inline void ltt_reserve_switch_new_subbuf(
+static /*inline*/ void ltt_reserve_switch_new_subbuf(
struct ltt_channel_struct *ltt_channel,
struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
struct rchan_buf *buf,
* fill the subbuffer completely (so the subbuf index stays in the previous
* subbuffer).
*/
-#ifdef CONFIG_LTT_VMCORE
-static inline void ltt_write_commit_counter(struct rchan_buf *buf,
+//ust// #ifdef CONFIG_LTT_VMCORE
+static /*inline*/ void ltt_write_commit_counter(struct rchan_buf *buf,
long buf_offset, size_t slot_size)
{
struct ltt_channel_struct *ltt_channel =
(struct ltt_channel_struct *)buf->chan->private_data;
- struct ltt_channel_buf_struct *ltt_buf =
- percpu_ptr(ltt_channel->buf, buf->cpu);
+ struct ltt_channel_buf_struct *ltt_buf = ltt_channel->buf;
struct ltt_subbuffer_header *header;
long offset, subbuf_idx, commit_count;
uint32_t lost_old, lost_new;
}
}
}
-#else
-static inline void ltt_write_commit_counter(struct rchan_buf *buf,
- long buf_offset, size_t slot_size)
-{
-}
-#endif
+//ust// #else
+//ust// static inline void ltt_write_commit_counter(struct rchan_buf *buf,
+//ust// long buf_offset, size_t slot_size)
+//ust// {
+//ust// }
+//ust// #endif
/*
* Atomic unordered slot commit. Increments the commit count in the
* ltt buffers from vmcore, after crash.
*/
ltt_write_commit_counter(buf, buf_offset, slot_size);
+
+ DBG("commited slot. now commit count is %ld", commit_count);
}
/*
struct ltt_channel_buf_struct {
/* First 32 bytes cache-hot cacheline */
local_t offset; /* Current offset in the buffer */
- local_t *commit_count; /* Commit count per sub-buffer */
+//ust// local_t *commit_count; /* Commit count per sub-buffer */
atomic_long_t consumed; /*
* Current offset in the buffer
* standard atomic access (shared)
int data_ready_fd_write;
/* the reading end of the pipe */
int data_ready_fd_read;
+
+ /* commit count per subbuffer; must be at end of struct */
+ local_t commit_count[0] ____cacheline_aligned;
} ____cacheline_aligned;
int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old);
#include "compiler.h"
#include <string.h>
+#include <sys/time.h>
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
+CFLAGS="-g -Wall"
+
all: ustd
-ustd: ustd.c
- gcc -g -Wall -I ../libustcomm -I. -I ../../../../libkcompat -lpthread -o ustd ustd.c ../libustcomm/ustcomm.c
+lowlevel.o: lowlevel.c
+ gcc -g -Wall -I ../libustcomm -I. -I ../../../../libkcompat -I ../libtracing -I ../share -I ../../../../urcu -I ../libmarkers -lpthread -c lowlevel.c
+
+ustd.o: ustd.c
+ gcc -g -Wall -I ../libustcomm -I. -I ../../../../libkcompat -lpthread -c ustd.c
+
+ustcomm.o: ../libustcomm/ustcomm.c
+ gcc -g -Wall -I ../share ../libustcomm/ustcomm.c
+
+ustd: ustd.o lowlevel.o ustcomm.o
+ gcc -g -Wall -lpthread -o ustd ustd.o lowlevel.o ustcomm.o
.PHONY: ustd
--- /dev/null
+#include "tracer.h"
+#include "ustd.h"
+#include "localerr.h"
+
+#define USTD_BUFFER_TRUNC(offset, bufinfo) \
+ ((offset) & (~(((bufinfo)->subbuf_size*(bufinfo)->n_subbufs)-1)))
+
+void finish_consuming_dead_subbuffer(struct buffer_info *buf)
+{
+ struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
+
+ long write_offset = local_read(<t_buf->offset);
+ long consumed_offset = atomic_long_read(<t_buf->consumed);
+
+ long i_subbuf;
+
+ DBG("processing died buffer");
+ DBG("consumed offset is %ld", consumed_offset);
+ DBG("write offset is %ld", write_offset);
+
+ long first_subbuf = write_offset / buf->subbuf_size;
+ long last_subbuf = consumed_offset / buf->subbuf_size;
+
+ if(last_subbuf - first_subbuf > buf->n_subbufs) {
+ DBG("an overflow has occurred, nothing can be recovered");
+ return;
+ }
+
+ for(i_subbuf=first_subbuf; ; i_subbuf++, i_subbuf %= buf->n_subbufs) {
+ long commit_count = local_read(<t_buf->commit_count[i_subbuf]);
+
+ unsigned long valid_length = buf->subbuf_size;
+ long n_subbufs_order = get_count_order(buf->n_subbufs);
+ long commit_count_mask = (~0UL >> n_subbufs_order);
+
+ /* check if subbuf was fully written */
+ if (((commit_count - buf->subbuf_size) & commit_count_mask)
+ - (USTD_BUFFER_TRUNC(consumed_offset, buf) >> n_subbufs_order)
+ != 0) {
+ struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem)+i_subbuf*buf->subbuf_size;
+ valid_length = buf->subbuf_size - header->lost_size;
+ }
+
+ patient_write(buf->file_fd, buf->mem + i_subbuf * buf->subbuf_size, buf->subbuf_size);
+
+ if(i_subbuf == last_subbuf)
+ break;
+ }
+}
+
#include <stdio.h>
#include <string.h>
+#include "ustd.h"
#include "localerr.h"
#include "ustcomm.h"
struct list_head buffers = LIST_HEAD_INIT(buffers);
-struct buffer_info {
- char *name;
- pid_t pid;
- struct ustcomm_connection conn;
-
- int shmid;
- int bufstruct_shmid;
-
- /* the buffer memory */
- void *mem;
- /* buffer size */
- int memlen;
- /* number of subbuffers in buffer */
- int n_subbufs;
- /* size of each subbuffer */
- int subbuf_size;
-
- /* the buffer information struct */
- void *bufstruct_mem;
-
- int file_fd; /* output file */
-
- struct list_head list;
-
- long consumed_old;
-};
-
/* return value: 0 = subbuffer is finished, it won't produce data anymore
* 1 = got subbuffer successfully
* <0 = error
return bufc-(const char *)buf;
}
-int get_subbuffer_died(struct buffer_info *buf)
-{
- return 0;
-}
-
-//int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
-//{
-// struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
-//
-////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
-// long consumed_old, consumed_idx, commit_count, write_offset;
-// consumed_old = atomic_long_read(<t_buf->consumed);
-// consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
-// commit_count = local_read(<t_buf->commit_count[consumed_idx]);
-// /*
-// * Make sure we read the commit count before reading the buffer
-// * data and the write offset. Correct consumed offset ordering
-// * wrt commit count is insured by the use of cmpxchg to update
-// * the consumed offset.
-// */
-// smp_rmb();
-// write_offset = local_read(<t_buf->offset);
-// /*
-// * Check that the subbuffer we are trying to consume has been
-// * already fully committed.
-// */
-// if (((commit_count - buf->chan->subbuf_size)
-// & ltt_channel->commit_count_mask)
-// - (BUFFER_TRUNC(consumed_old, buf->chan)
-// >> ltt_channel->n_subbufs_order)
-// != 0) {
-// return -EAGAIN;
-// }
-// /*
-// * Check that we are not about to read the same subbuffer in
-// * which the writer head is.
-// */
-// if ((SUBBUF_TRUNC(write_offset, buf->chan)
-// - SUBBUF_TRUNC(consumed_old, buf->chan))
-// == 0) {
-// return -EAGAIN;
-// }
-//
-// *pconsumed_old = consumed_old;
-// return 0;
-//}
-
void *consumer_thread(void *arg)
{
struct buffer_info *buf = (struct buffer_info *) arg;
int result;
- int died = 0;
for(;;) {
/* get the subbuffer */
- if(died == 0) {
- result = get_subbuffer(buf);
- if(result == -1) {
- ERR("error getting subbuffer");
- continue;
- }
- else if(result == GET_SUBBUF_DONE) {
- /* this is done */
- break;
- }
- else if(result == GET_SUBBUF_DIED) {
- died = 1;
- }
+ result = get_subbuffer(buf);
+ if(result == -1) {
+ ERR("error getting subbuffer");
+ continue;
}
- if(died == 1) {
- result = get_subbuffer_died(buf);
- if(result <= 0) {
- break;
- }
+ else if(result == GET_SUBBUF_DONE) {
+ /* this is done */
+ break;
+ }
+ else if(result == GET_SUBBUF_DIED) {
+ finish_consuming_dead_subbuffer(buf);
+ break;
}
/* write data to file */
}
/* put the subbuffer */
- if(died == 0) {
- result = put_subbuffer(buf);
- if(result == -1) {
- ERR("error putting subbuffer");
- break;
- }
- }
- else {
-// result = put_subbuffer_died(buf);
+ result = put_subbuffer(buf);
+ if(result == -1) {
+ ERR("error putting subbuffer");
+ break;
}
}
--- /dev/null
+#ifndef USTD_H
+#define USTD_H
+
+#include "ustcomm.h"
+
+struct buffer_info {
+ char *name;
+ pid_t pid;
+ struct ustcomm_connection conn;
+
+ int shmid;
+ int bufstruct_shmid;
+
+ /* the buffer memory */
+ void *mem;
+ /* buffer size */
+ int memlen;
+ /* number of subbuffers in buffer */
+ int n_subbufs;
+ /* size of each subbuffer */
+ int subbuf_size;
+
+ /* the buffer information struct */
+ void *bufstruct_mem;
+
+ int file_fd; /* output file */
+
+ struct list_head list;
+
+ long consumed_old;
+};
+
+ssize_t patient_write(int fd, const void *buf, size_t count);
+
+void finish_consuming_dead_subbuffer(struct buffer_info *buf);
+
+#endif /* USTD_H */