void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf)
{
- struct ust_buffer *ustbuf = buf->bufstruct_mem;
- long write_offset = uatomic_read(&ustbuf->offset);
-
- long i_subbuf;
+ struct ust_buffer *ust_buf = buf->bufstruct_mem;
+ unsigned long n_subbufs_order = get_count_order(buf->n_subbufs);
+ unsigned long commit_seq_mask = (~0UL >> n_subbufs_order);
+ unsigned long cons_off;
int ret;
DBG("processing dead buffer (%s)", buf->name);
- DBG("consumed offset is %ld (%s)", uatomic_read(&ustbuf->consumed),
- buf->name);
- DBG("write offset is %ld (%s)", write_offset, buf->name);
-
- /* First subbuf that we need to consume now. It is not modulo'd.
- * Consumed_offset is the next byte to consume. */
- long first_subbuf = uatomic_read(&ustbuf->consumed) / buf->subbuf_size;
- /* Last subbuf that we need to consume now. It is not modulo'd.
- * Write_offset is the next place to write so write_offset-1 is the
- * last place written. */
- long last_subbuf = (write_offset - 1) / buf->subbuf_size;
-
- DBG("first_subbuf=%ld", first_subbuf);
- DBG("last_subbuf=%ld", last_subbuf);
-
- if(last_subbuf - first_subbuf >= buf->n_subbufs) {
- DBG("an overflow has occurred, nothing can be recovered");
- return;
- }
+ DBG("consumed offset is %ld (%s)", uatomic_read(&ust_buf->consumed),
+ buf->name);
+ DBG("write offset is %ld (%s)", uatomic_read(&ust_buf->offset),
+ buf->name);
- /* Iterate on subbuffers to recover. */
- for(i_subbuf = first_subbuf % buf->n_subbufs; ; i_subbuf++, i_subbuf %= buf->n_subbufs) {
- /* commit_seq is the offset in the buffer of the end of the last sequential commit.
- * Bytes beyond this limit cannot be recovered. This is a free-running counter. */
- long commit_seq = uatomic_read(&ustbuf->commit_seq[i_subbuf]);
-
- unsigned long valid_length = buf->subbuf_size;
- long n_subbufs_order = get_count_order(buf->n_subbufs);
- long commit_seq_mask = (~0UL >> n_subbufs_order);
-
- struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size);
+ /*
+ * Iterate on subbuffers to recover, including the one the writer
+ * just wrote data into. Using write position - 1 since the writer
+ * position points into the position that is going to be written.
+ */
+ for (cons_off = uatomic_read(&ust_buf->consumed);
+ (long) (SUBBUF_TRUNC(uatomic_read(&ust_buf->offset) - 1, buf)
+ - cons_off) >= 0;
+ cons_off = SUBBUF_ALIGN(cons_off, buf)) {
+ /*
+ * commit_seq is the offset in the buffer of the end of the last sequential commit.
+ * Bytes beyond this limit cannot be recovered. This is a free-running counter.
+ */
+ unsigned long commit_seq =
+ uatomic_read(&ust_buf->commit_seq[SUBBUF_INDEX(cons_off, buf)]);
+ struct ltt_subbuffer_header *header =
+ (struct ltt_subbuffer_header *)((char *) buf->mem
+ + SUBBUF_INDEX(cons_off, buf) * buf->subbuf_size);
+ unsigned long valid_length;
/* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */
if (((commit_seq - buf->subbuf_size) & commit_seq_mask)
- - (USTD_BUFFER_TRUNC(uatomic_read(&ustbuf->consumed), buf) >> n_subbufs_order) == 0
+ - (USTD_BUFFER_TRUNC(uatomic_read(&ust_buf->consumed), buf) >> n_subbufs_order) == 0
&& header->data_size != 0xffffffff) {
assert(header->sb_size != 0xffffffff);
/*
- * If it was, we only check the data_size. This is the
- * amount of valid data at the beginning of the
- * subbuffer.
+ * If it was fully written, we only check the data_size.
+ * This is the amount of valid data at the beginning of
+ * the subbuffer.
*/
valid_length = header->data_size;
- DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
- }
- else {
+ DBG("writing full subbuffer (%ld) with valid_length = %ld",
+ SUBBUF_INDEX(cons_off, buf), valid_length);
+ } else {
/*
* If the subbuffer was not fully written, then we don't
* check data_size because it hasn't been written yet.
* writing data_size that commit_seq is updated to
* include the end-of-buffer padding.
*/
- valid_length = commit_seq & (buf->subbuf_size-1);
- DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
+ valid_length = commit_seq & (buf->subbuf_size - 1);
+ DBG("writing unfull subbuffer (%ld) with valid_length = %ld",
+ SUBBUF_INDEX(cons_off, buf), valid_length);
header->data_size = valid_length;
header->sb_size = PAGE_ALIGN(valid_length);
}
if (callbacks->on_read_partial_subbuffer) {
- ret = callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length);
- /* Increment the consumed offset */
- if (ret >= 0)
- uatomic_add(&ustbuf->consumed, buf->subbuf_size);
- else
+ ret = callbacks->on_read_partial_subbuffer(callbacks, buf,
+ SUBBUF_INDEX(cons_off, buf),
+ valid_length);
+ if (ret < 0)
break; /* Error happened */
- } else
- uatomic_add(&ustbuf->consumed, buf->subbuf_size);
- if(i_subbuf == last_subbuf % buf->n_subbufs)
- break;
+ }
}
-
+ /* Increment the consumed offset */
+ uatomic_set(&ust_buf->consumed, cons_off);
ltt_relay_print_buffer_errors(buf, buf->channel_cpu);
}