From: Mathieu Desnoyers
Date: Thu, 3 Mar 2011 17:53:52 +0000 (-0500)
Subject: finish_consuming_dead_subbuffer: fix data_size read race, reread new consumed count
X-Git-Tag: v1.9.1~433
X-Git-Url: http://git.lttng.org./?a=commitdiff_plain;h=c74e3560aa89ea4dac242ceae06355f5a1ddc2c9;p=lttng-ust.git

finish_consuming_dead_subbuffer: fix data_size read race, reread new consumed count

Make sure finish_consuming_dead_subbuffer always sees a data_size that is
non-0xffffffff only when the buffer data is entirely readable; otherwise the
code relies on the commit_seq counter, which is the proper solution.

The consumed count should be re-read in each test within the loop; otherwise
the mathematical formula that computes the amount of data to read does not
work wrt buffer-size wrap-around.

Signed-off-by: Mathieu Desnoyers
---

diff --git a/libust/buffers.c b/libust/buffers.c
index 732e03dc..9dcec2a8 100644
--- a/libust/buffers.c
+++ b/libust/buffers.c
@@ -146,7 +146,13 @@ static void ltt_buffer_begin(struct ust_buffer *buf,
 	header->cycle_count_begin = tsc;
 	header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
 	header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
-	/* FIXME: add memory barrier? */
+	/*
+	 * No memory barrier is needed to order the data_size/sb_size writes
+	 * vs the commit count update, because the commit count update
+	 * contains a compiler barrier that ensures the order of the writes
+	 * is OK from a program POV. It only matters for crash dump recovery,
+	 * which is not executed concurrently, so memory write order does not
+	 * matter.
+	 */
 	ltt_write_trace_header(channel->trace, header);
 }
 
@@ -356,7 +362,6 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
 			subbuf_idx * buf->chan->subbuf_size);
 	u32 data_size = SUBBUF_OFFSET(offset - 1, buf->chan) + 1;
 
-	header->data_size = data_size;
 	header->sb_size = PAGE_ALIGN(data_size);
 	header->cycle_count_end = tsc;
 	header->events_lost = uatomic_read(&buf->events_lost);
@@ -364,6 +369,13 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
 	if(unlikely(header->events_lost > 0)) {
 		DBG("Some events (%d) were lost in %s_%d", header->events_lost, buf->chan->channel_name, buf->cpu);
 	}
+	/*
+	 * Make sure the data_size write happens after the writes of the rest
+	 * of the buffer end data, because data_size is used to identify a
+	 * completely written subbuffer in a crash dump.
+	 */
+	cmm_barrier();
+	header->data_size = data_size;
 }
 
 /*
diff --git a/libustconsumer/lowlevel.c b/libustconsumer/lowlevel.c
index ec3cf89c..a65ed09c 100644
--- a/libustconsumer/lowlevel.c
+++ b/libustconsumer/lowlevel.c
@@ -125,20 +125,19 @@ size_t subbuffer_data_size(void *subbuf)
 void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf)
 {
 	struct ust_buffer *ustbuf = buf->bufstruct_mem;
-	long write_offset = uatomic_read(&ustbuf->offset);
-	long consumed_offset = uatomic_read(&ustbuf->consumed);
 	long i_subbuf;
 	int ret;
 
 	DBG("processing dead buffer (%s)", buf->name);
-	DBG("consumed offset is %ld (%s)", consumed_offset, buf->name);
+	DBG("consumed offset is %ld (%s)", uatomic_read(&ustbuf->consumed),
+		buf->name);
 	DBG("write offset is %ld (%s)", write_offset, buf->name);
 
 	/* First subbuf that we need to consume now. It is not modulo'd.
 	 * Consumed_offset is the next byte to consume. */
-	long first_subbuf = consumed_offset / buf->subbuf_size;
+	long first_subbuf = uatomic_read(&ustbuf->consumed) / buf->subbuf_size;
 
 	/* Last subbuf that we need to consume now. It is not modulo'd.
 	 * Write_offset is the next place to write so write_offset-1 is the
 	 * last place written. */
@@ -165,36 +164,49 @@ void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, st
 		struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size);
 
 		/* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */
-		/* FIXME: not sure data_size = 0xffffffff when the buffer is not full. It might
-		 * take the value of the header size initially */
 		if (((commit_seq - buf->subbuf_size) & commit_seq_mask)
-		    - (USTD_BUFFER_TRUNC(consumed_offset, buf) >> n_subbufs_order) == 0
-		    && header->data_size != 0xffffffff && header->sb_size != 0xffffffff) {
-			/* If it was, we only check the data_size. This is the amount of valid data at
-			 * the beginning of the subbuffer. */
+		    - (USTD_BUFFER_TRUNC(uatomic_read(&ustbuf->consumed), buf) >> n_subbufs_order) == 0
+		    && header->data_size != 0xffffffff) {
+			assert(header->sb_size != 0xffffffff);
+			/*
+			 * If it was, we only check the data_size. This is the
+			 * amount of valid data at the beginning of the
+			 * subbuffer.
+			 */
 			valid_length = header->data_size;
 			DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
 		} else {
-			/* If the subbuffer was not fully written, then we don't check data_size because
-			 * it hasn't been written yet. Instead we check commit_seq and use it to choose
-			 * a value for data_size. The viewer will need this value when parsing.
-			 */
+			/*
+			 * If the subbuffer was not fully written, then we don't
+			 * check data_size because it hasn't been written yet.
+			 * Instead we check commit_seq and use it to choose a
+			 * value for data_size. The viewer will need this value
+			 * when parsing. Generally, this will happen only for
+			 * the last subbuffer. However, if threads are still
+			 * holding reserved slots in previous subbuffers, this
+			 * can also happen for subbuffers prior to the last
+			 * one. Note that when data_size is set, the commit_seq
+			 * count is still at a value that shows the amount of
+			 * valid data to read. It's only _after_ writing
+			 * data_size that commit_seq is updated to include the
+			 * end-of-buffer padding.
+			 */
 			valid_length = commit_seq & (buf->subbuf_size-1);
 			DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
 			header->data_size = valid_length;
 			header->sb_size = PAGE_ALIGN(valid_length);
-			assert(i_subbuf == (last_subbuf % buf->n_subbufs));
 		}
 
-		if(callbacks->on_read_partial_subbuffer)
+		if (callbacks->on_read_partial_subbuffer) {
 			ret = callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length);
-
-		/* Increment the consumed offset */
-		if (ret >= 0)
+			/* Increment the consumed offset */
+			if (ret >= 0)
+				uatomic_add(&ustbuf->consumed, buf->subbuf_size);
+			else
+				break;	/* Error happened */
+		} else
 			uatomic_add(&ustbuf->consumed, buf->subbuf_size);
-
 		if(i_subbuf == last_subbuf % buf->n_subbufs)
 			break;
 	}
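
For readers new to the idiom this commit depends on, the sketch below condenses
the crash-recovery protocol outside the UST code base: the writer arms
data_size/sb_size with the 0xFFFFFFFF sentinel when a subbuffer begins,
publishes data_size last (behind a compiler barrier) when it ends, and a
post-mortem reader falls back to the commit counter whenever the sentinel is
still armed. This is a hypothetical illustration, not the UST API: the struct
layout, SUBBUF_SIZE, and compiler_barrier() (standing in for liburcu's
cmm_barrier()) are invented for the example, and the reader side omits the
commit_seq/consumed bookkeeping that the real finish_consuming_dead_subbuffer
performs.

#include <stdint.h>
#include <stdio.h>

#define SUBBUF_SIZE	4096u
#define DATA_UNWRITTEN	0xFFFFFFFFu	/* sentinel: subbuffer not fully written */

/* Stand-in for liburcu's cmm_barrier(); GCC/Clang syntax. */
#define compiler_barrier()	__asm__ __volatile__("" ::: "memory")

/* Hypothetical, simplified subbuffer header (not the UST layout). */
struct subbuf_header {
	uint32_t data_size;	/* valid payload bytes; sentinel until subbuffer end */
	uint32_t sb_size;	/* page-aligned size; sentinel until subbuffer end */
	uint32_t commit_seq;	/* bytes committed so far (simplified) */
};

/* Writer, subbuffer begin: arm the crash sentinels before any payload. */
static void subbuf_begin(struct subbuf_header *h)
{
	h->data_size = DATA_UNWRITTEN;
	h->sb_size = DATA_UNWRITTEN;
	h->commit_seq = 0;
}

/* Writer, subbuffer end: data_size must be the last field published. */
static void subbuf_end(struct subbuf_header *h, uint32_t bytes)
{
	h->sb_size = (bytes + 4095u) & ~4095u;	/* PAGE_ALIGN for 4 KiB pages */
	/*
	 * Order every other end-of-subbuffer write before data_size, as the
	 * cmm_barrier() added by this commit does: once a crash-dump reader
	 * sees data_size != 0xFFFFFFFF, the rest of the header must already
	 * be coherent. A compiler barrier suffices because recovery never
	 * runs concurrently with the writer; only program order matters.
	 */
	compiler_barrier();
	h->data_size = bytes;
}

/* Crash-dump reader: how many bytes of this subbuffer can be trusted? */
static uint32_t subbuf_valid_length(const struct subbuf_header *h)
{
	if (h->data_size != DATA_UNWRITTEN)
		return h->data_size;	/* subbuffer was completely written */
	/* Partially written subbuffer: fall back to the commit counter. */
	return h->commit_seq & (SUBBUF_SIZE - 1);
}

int main(void)
{
	struct subbuf_header h;

	subbuf_begin(&h);
	h.commit_seq = 100;	/* 100 bytes committed, then the app crashes */
	printf("crashed subbuffer:  %u valid bytes\n", subbuf_valid_length(&h));

	subbuf_end(&h, 100);	/* clean subbuffer end reached instead */
	printf("complete subbuffer: %u valid bytes\n", subbuf_valid_length(&h));
	return 0;
}

This also shows why the commit can drop the sb_size check from the consumer's
"fully written" test and demote it to an assert: once data_size is published
last, seeing data_size != 0xFFFFFFFF already guarantees sb_size was written.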