finish_consuming_dead_subbuffer: fix data_size read race, reread new consumed count
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 3 Mar 2011 17:53:52 +0000 (12:53 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 3 Mar 2011 17:53:52 +0000 (12:53 -0500)
Make sure finish_consuming_dead_subbuffer always see a data_size that is non
0xffffffff only when the buffer data is entirely readable, else the code rely on
the commit_seq counter which is the proper solution.

The consumed count should be re-read in each test within the loop, otherwise the
mathematic formula to get the amount of data to read does not work wrt
buffer-size wrap-around.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
libust/buffers.c
libustconsumer/lowlevel.c

index 732e03dce7263b33e19958d763607b8ff09f306f..9dcec2a825d953ab51a07adfdaf69e244c51e4b9 100644 (file)
@@ -146,7 +146,13 @@ static void ltt_buffer_begin(struct ust_buffer *buf,
        header->cycle_count_begin = tsc;
        header->data_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
        header->sb_size = 0xFFFFFFFF; /* for recognizing crashed buffers */
-       /* FIXME: add memory barrier? */
+       /*
+        * No memory barrier needed to order data_data/sb_size vs commit count
+        * update, because commit count update contains a compiler barrier that
+        * ensures the order of the writes are OK from a program POV. It only
+        * matters for crash dump recovery which is not executed concurrently,
+        * so memory write order does not matter.
+        */
        ltt_write_trace_header(channel->trace, header);
 }
 
@@ -356,7 +362,6 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
                                subbuf_idx * buf->chan->subbuf_size);
        u32 data_size = SUBBUF_OFFSET(offset - 1, buf->chan) + 1;
 
-       header->data_size = data_size;
        header->sb_size = PAGE_ALIGN(data_size);
        header->cycle_count_end = tsc;
        header->events_lost = uatomic_read(&buf->events_lost);
@@ -364,6 +369,13 @@ static notrace void ltt_buffer_end(struct ust_buffer *buf,
        if(unlikely(header->events_lost > 0)) {
                DBG("Some events (%d) were lost in %s_%d", header->events_lost, buf->chan->channel_name, buf->cpu);
        }
+       /*
+        * Makes sure data_size write happens after write of the rest of the
+        * buffer end data, because data_size is used to identify a completely
+        * written subbuffer in a crash dump.
+        */
+       cmm_barrier();
+       header->data_size = data_size;
 }
 
 /*
index ec3cf89cd539e9bca6670b5f17c93df57bb5537f..a65ed09c59a3d453d2b4b35d7252a17b268e74c7 100644 (file)
@@ -125,20 +125,19 @@ size_t subbuffer_data_size(void *subbuf)
 void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, struct buffer_info *buf)
 {
        struct ust_buffer *ustbuf = buf->bufstruct_mem;
-
        long write_offset = uatomic_read(&ustbuf->offset);
-       long consumed_offset = uatomic_read(&ustbuf->consumed);
 
        long i_subbuf;
        int ret;
 
        DBG("processing dead buffer (%s)", buf->name);
-       DBG("consumed offset is %ld (%s)", consumed_offset, buf->name);
+       DBG("consumed offset is %ld (%s)", uatomic_read(&ustbuf->consumed),
+           buf->name);
        DBG("write offset is %ld (%s)", write_offset, buf->name);
 
        /* First subbuf that we need to consume now. It is not modulo'd.
         * Consumed_offset is the next byte to consume.  */
-       long first_subbuf = consumed_offset / buf->subbuf_size;
+       long first_subbuf = uatomic_read(&ustbuf->consumed) / buf->subbuf_size;
        /* Last subbuf that we need to consume now. It is not modulo'd. 
         * Write_offset is the next place to write so write_offset-1 is the
         * last place written. */
@@ -165,36 +164,49 @@ void finish_consuming_dead_subbuffer(struct ustconsumer_callbacks *callbacks, st
                struct ltt_subbuffer_header *header = (struct ltt_subbuffer_header *)((char *)buf->mem+i_subbuf*buf->subbuf_size);
 
                /* Check if subbuf was fully written. This is from Mathieu's algorithm/paper. */
-               /* FIXME: not sure data_size = 0xffffffff when the buffer is not full. It might
-                * take the value of the header size initially */
                if (((commit_seq - buf->subbuf_size) & commit_seq_mask)
-                   - (USTD_BUFFER_TRUNC(consumed_offset, buf) >> n_subbufs_order) == 0
-                    && header->data_size != 0xffffffff && header->sb_size != 0xffffffff) {
-                       /* If it was, we only check the data_size. This is the amount of valid data at
-                        * the beginning of the subbuffer. */
+                   - (USTD_BUFFER_TRUNC(uatomic_read(&ustbuf->consumed), buf) >> n_subbufs_order) == 0
+                    && header->data_size != 0xffffffff) {
+                       assert(header->sb_size != 0xffffffff);
+                       /*
+                        * If it was, we only check the data_size. This is the
+                        * amount of valid data at the beginning of the
+                        * subbuffer.
+                        */
                        valid_length = header->data_size;
                        DBG("writing full subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
                }
                else {
-                       /* If the subbuffer was not fully written, then we don't check data_size because
-                        * it hasn't been written yet. Instead we check commit_seq and use it to choose
-                        * a value for data_size. The viewer will need this value when parsing.
+                       /*
+                        * If the subbuffer was not fully written, then we don't
+                        * check data_size because it hasn't been written yet.
+                        * Instead we check commit_seq and use it to choose a
+                        * value for data_size. The viewer will need this value
+                        * when parsing. Generally, this will happen only for
+                        * the last subbuffer. However, if we have threads still
+                        * holding reserved slots in the previous subbuffers,
+                        * which could happen for other subbuffers prior to the
+                        * last one. Note that when data_size is set, the
+                        * commit_seq count is still at a value that shows the
+                        * amount of valid data to read. It's only _after_
+                        * writing data_size that commit_seq is updated to
+                        * include the end-of-buffer padding.
                         */
-
                        valid_length = commit_seq & (buf->subbuf_size-1);
                        DBG("writing unfull subbuffer (%ld) with valid_length = %ld", i_subbuf, valid_length);
                        header->data_size = valid_length;
                        header->sb_size = PAGE_ALIGN(valid_length);
-                       assert(i_subbuf == (last_subbuf % buf->n_subbufs));
                }
 
-               if(callbacks->on_read_partial_subbuffer)
+               if (callbacks->on_read_partial_subbuffer) {
                        ret = callbacks->on_read_partial_subbuffer(callbacks, buf, i_subbuf, valid_length);
-
-               /* Increment the consumed offset */
-               if (ret >= 0)
+                       /* Increment the consumed offset */
+                       if (ret >= 0)
+                               uatomic_add(&ustbuf->consumed, buf->subbuf_size);
+                       else
+                               break;  /* Error happened */
+               } else
                        uatomic_add(&ustbuf->consumed, buf->subbuf_size);
-
                if(i_subbuf == last_subbuf % buf->n_subbufs)
                        break;
        }
This page took 0.028702 seconds and 4 git commands to generate.