From: Mathieu Desnoyers Date: Mon, 1 Jul 2013 21:01:56 +0000 (-0400) Subject: Fix: ring buffer: handle concurrent update in nested buffer wrap around check X-Git-Tag: v2.1.3~3 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=0348b08949345b263015bb95bcaae8f7a607fb77;p=lttng-modules.git Fix: ring buffer: handle concurrent update in nested buffer wrap around check With stress-test loads that trigger sub-buffer switch very frequently (small 4kB sub-buffers, frequent flush), we currently observe this kind of warnings once every few minutes: [65335.896208] ring buffer relay-overwrite-mmap, cpu 5: records were lost. Caused by: [65335.896208] [ 0 buffer full, 1 nest buffer wrap-around, 0 event too big ] It appears that the check for nested buffer wrap-around does not take into account that a concurrent execution contexts (either nested for per-cpu buffers, or from another CPU or nested for global buffers) can update the commit_count value concurrently. What we really want to do with this check is to ensure that if we enter a sub-buffer that had an unbalanced reserve/commit count, assuming there is no hope that this gets rebalanced promptly, we detect this and drop the current event. However, in the case where the commit counter has been concurrently updated by another reserve or a switch, we want to retry the entire reserve operation. One way to detect this is to sample the reserve offset twice, around the commit counter read, along with the appropriate memory barriers. Therefore, we can detect if the mismatch between reserve and commit counter is actually caused by a concurrent update, which necessarily has updated the reserve counter. Signed-off-by: Mathieu Desnoyers --- diff --git a/lib/ringbuffer/frontend_internal.h b/lib/ringbuffer/frontend_internal.h index bc284333..dbebdeec 100644 --- a/lib/ringbuffer/frontend_internal.h +++ b/lib/ringbuffer/frontend_internal.h @@ -331,6 +331,12 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * The subbuffer size is least 2 bytes (minimum size: 1 page). * This guarantees that old_commit_count + 1 != commit_count. */ + + /* + * Order prior updates to reserve count prior to the + * commit_cold cc_sb update. + */ + smp_wmb(); if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { @@ -373,6 +379,11 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, /* End of exclusive subbuffer access */ v_set(config, &buf->commit_cold[idx].cc_sb, commit_count); + /* + * Order later updates to reserve count after + * the commit_cold cc_sb update. + */ + smp_wmb(); lib_ring_buffer_vmcore_check_deliver(config, buf, commit_count, idx); diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 942e14d3..28525a69 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1572,9 +1572,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, struct lib_ring_buffer_ctx *ctx) { const struct lib_ring_buffer_config *config = &chan->backend.config; - unsigned long reserve_commit_diff; + unsigned long reserve_commit_diff, offset_cmp; - offsets->begin = v_read(config, &buf->offset); +retry: + offsets->begin = offset_cmp = v_read(config, &buf->offset); offsets->old = offsets->begin; offsets->switch_new_start = 0; offsets->switch_new_end = 0; @@ -1606,7 +1607,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, } } if (unlikely(offsets->switch_new_start)) { - unsigned long sb_index; + unsigned long sb_index, commit_count; /* * We are typically not filling the previous buffer completely. @@ -1617,12 +1618,31 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, + config->cb.subbuffer_header_size(); /* Test new buffer integrity */ sb_index = subbuf_index(offsets->begin, chan); + /* + * Read buf->offset before buf->commit_cold[sb_index].cc_sb. + * lib_ring_buffer_check_deliver() has the matching + * memory barriers required around commit_cold cc_sb + * updates to ensure reserve and commit counter updates + * are not seen reordered when updated by another CPU. + */ + smp_rmb(); + commit_count = v_read(config, + &buf->commit_cold[sb_index].cc_sb); + /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */ + smp_rmb(); + if (unlikely(offset_cmp != v_read(config, &buf->offset))) { + /* + * The reserve counter have been concurrently updated + * while we read the commit counter. This means the + * commit counter we read might not match buf->offset + * due to concurrent update. We therefore need to retry. + */ + goto retry; + } reserve_commit_diff = (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - - ((unsigned long) v_read(config, - &buf->commit_cold[sb_index].cc_sb) - & chan->commit_count_mask); + - (commit_count & chan->commit_count_mask); if (likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ if (unlikely(config->mode != RING_BUFFER_OVERWRITE && @@ -1647,9 +1667,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, } else { /* * Next subbuffer reserve offset does not match the - * commit offset. Drop record in producer-consumer and - * overwrite mode. Caused by either a writer OOPS or too - * many nested writes over a reserve/commit pair. + * commit offset, and this did not involve update to the + * reserve counter. Drop record in producer-consumer and + * overwrite mode. Caused by either a writer OOPS or + * too many nested writes over a reserve/commit pair. */ v_inc(config, &buf->records_lost_wrap); return -EIO;