* The subbuffer size is least 2 bytes (minimum size: 1 page).
* This guarantees that old_commit_count + 1 != commit_count.
*/
+
+ /*
+ * Order prior updates to reserve count prior to the
+ * commit_cold cc_sb update.
+ */
+ cmm_smp_wmb();
if (caa_likely(v_cmpxchg(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
/* End of exclusive subbuffer access */
v_set(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
commit_count);
+ /*
+ * Order later updates to reserve count after
+ * the commit cold cc_sb update.
+ */
+ cmm_smp_wmb();
lib_ring_buffer_vmcore_check_deliver(config, buf,
commit_count, idx, handle);
{
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle = ctx->handle;
- unsigned long reserve_commit_diff;
+ unsigned long reserve_commit_diff, offset_cmp;
- offsets->begin = v_read(config, &buf->offset);
+retry:
+ offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->old = offsets->begin;
offsets->switch_new_start = 0;
offsets->switch_new_end = 0;
}
}
if (caa_unlikely(offsets->switch_new_start)) {
- unsigned long sb_index;
+ unsigned long sb_index, commit_count;
/*
* We are typically not filling the previous buffer completely.
+ config->cb.subbuffer_header_size();
/* Test new buffer integrity */
sb_index = subbuf_index(offsets->begin, chan);
+ /*
+ * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+ * lib_ring_buffer_check_deliver() has the matching
+ * memory barriers required around commit_cold cc_sb
+ * updates to ensure reserve and commit counter updates
+ * are not seen reordered when updated by another CPU.
+ */
+ cmm_smp_rmb();
+ commit_count = v_read(config,
+ &shmp_index(handle, buf->commit_cold,
+ sb_index)->cc_sb);
+ /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+ cmm_smp_rmb();
+ if (caa_unlikely(offset_cmp != v_read(config, &buf->offset))) {
+ /*
+ * The reserve counter have been concurrently updated
+ * while we read the commit counter. This means the
+ * commit counter we read might not match buf->offset
+ * due to concurrent update. We therefore need to retry.
+ */
+ goto retry;
+ }
reserve_commit_diff =
(buf_trunc(offsets->begin, chan)
>> chan->backend.num_subbuf_order)
- - ((unsigned long) v_read(config,
- &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb)
- & chan->commit_count_mask);
+ - (commit_count & chan->commit_count_mask);
if (caa_likely(reserve_commit_diff == 0)) {
/* Next subbuffer not being written to. */
if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE &&
/*
* Next subbuffer reserve offset does not match the
- * commit offset. Drop record in producer-consumer and
+ * commit offset, and this did not involve update to the
+ * reserve counter. Drop record in producer-consumer and
* overwrite mode. Caused by either a writer OOPS or too
* many nested writes over a reserve/commit pair.
*/