lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
lttng_kvfree(buf->commit_hot);
lttng_kvfree(buf->commit_cold);
+ lttng_kvfree(buf->ts_end);
lib_ring_buffer_backend_free(&buf->backend);
}
v_set(config, &buf->commit_hot[i].cc, 0);
v_set(config, &buf->commit_hot[i].seq, 0);
v_set(config, &buf->commit_cold[i].cc_sb, 0);
+ buf->ts_end[i] = 0;
}
atomic_long_set(&buf->consumed, 0);
atomic_set(&buf->record_disabled, 0);
goto free_commit;
}
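+ /*
+  * One end timestamp per sub-buffer: stored by the last space
+  * reservation performed in that sub-buffer, then read at delivery
+  * time and passed to the buffer_end callback.
+  */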
+ buf->ts_end =
+ lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end)
+ * chan->backend.num_subbuf,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL | __GFP_NOWARN,
+ cpu_to_node(max(cpu, 0)));
+ if (!buf->ts_end) {
+ ret = -ENOMEM;
+ goto free_commit_cold;
+ }
+
init_waitqueue_head(&buf->read_wait);
init_waitqueue_head(&buf->write_wait);
raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
/* Error handling */
free_init:
+ lttng_kvfree(buf->ts_end);
+free_commit_cold:
lttng_kvfree(buf->commit_cold);
free_commit:
lttng_kvfree(buf->commit_hot);
unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
unsigned long commit_count, padding_size, data_size;
struct commit_counters_hot *cc_hot;
+ u64 *ts_end;
data_size = subbuf_offset(offsets->old - 1, chan) + 1;
padding_size = chan->backend.subbuf_size - data_size;
subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
+ ts_end = &buf->ts_end[oldidx];
/*
- * Order all writes to buffer before the commit count update that will
- * determine that the subbuffer is full.
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
+
+ /*
+ * Order all writes to buffer and store to ts_end before the commit
+ * count update that will determine that the subbuffer is full.
*/
if (config->ipi == RING_BUFFER_IPI_BARRIER) {
/*
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
unsigned long endidx, data_size;
+ u64 *ts_end;
endidx = subbuf_index(offsets->end - 1, chan);
data_size = subbuf_offset(offsets->end - 1, chan) + 1;
subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+ ts_end = &buf->ts_end[endidx];
+ /*
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
}
/*
if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
+ u64 *ts_end;
+
/*
* Start of exclusive subbuffer access. We are
* guaranteed to be the last writer in this subbuffer
* and any other writer trying to access this subbuffer
* in this state is required to drop records.
+ *
+ * We can read the ts_end for the current sub-buffer
+ * which has been saved by the very last space
+ * reservation for the current sub-buffer.
+ *
+ * Order increment of commit counter before reading ts_end.
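+ * This pairs with the write-side ordering of the ts_end store
+ * before the commit count update.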
*/
+ smp_mb();
+ ts_end = &buf->ts_end[idx];
deliver_count_events(config, buf, idx);
- config->cb.buffer_end(buf, tsc, idx,
+ config->cb.buffer_end(buf, *ts_end, idx,
lib_ring_buffer_get_data_size(config,
buf,
idx));
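
For reference, a minimal standalone C11 sketch of the publication scheme used above, outside the patch itself: the last reserver stores ts_end and then publishes the commit count, and the deliverer observes the commit count before loading ts_end. Acquire/release atomics stand in for the kernel's commit-counter primitives and smp_mb(), and all names here (subbuf_state, last_reservation, try_deliver) are hypothetical.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical stand-ins for one sub-buffer's state: a plain ts_end
 * slot and an atomic commit count acting as the publication point.
 */
struct subbuf_state {
	uint64_t ts_end;		/* written only by the last reserver */
	atomic_ulong commit_count;	/* publication point */
};

/* Writer side: last space reservation in the sub-buffer. */
static void last_reservation(struct subbuf_state *sb, uint64_t tsc)
{
	/* Exclusive access: no concurrent reader until delivery. */
	sb->ts_end = tsc;
	/* Order the ts_end store before the commit count update. */
	atomic_fetch_add_explicit(&sb->commit_count, 1, memory_order_release);
}

/* Reader side: delivery, only once the sub-buffer is seen as full. */
static int try_deliver(struct subbuf_state *sb, unsigned long full_count,
		       uint64_t *timestamp_end)
{
	/* Order the commit count read before the ts_end read. */
	if (atomic_load_explicit(&sb->commit_count, memory_order_acquire)
	    != full_count)
		return 0;	/* sub-buffer not full yet */
	*timestamp_end = sb->ts_end;
	return 1;
}

int main(void)
{
	struct subbuf_state sb;
	uint64_t ts_end;

	sb.ts_end = 0;
	atomic_init(&sb.commit_count, 0);

	last_reservation(&sb, 42);
	if (try_deliver(&sb, 1, &ts_end))
		printf("timestamp_end = %llu\n", (unsigned long long)ts_end);
	return 0;
}

A plain (non-atomic) ts_end slot is enough in this model because the last reservation has exclusive write access and delivery is postponed until its commit count update, mirroring the comments in the patch above.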