1 #ifndef _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
2 #define _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H
5 * linux/ringbuffer/frontend_internal.h
7 * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 * Ring Buffer Library Synchronization Header (internal helpers).
12 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
14 * See ring_buffer_frontend.c for more information on wait-free algorithms.
16 * Dual LGPL v2.1/GPL v2 license.
19 #include <urcu/compiler.h>
22 #include "backend_types.h"
23 #include "frontend_types.h"
26 /* Buffer offset macros */
28 /* buf_trunc mask selects only the buffer number. */
30 unsigned long buf_trunc(unsigned long offset
, struct channel
*chan
)
32 return offset
& ~(chan
->backend
.buf_size
- 1);
36 /* Select the buffer number value (counter). */
38 unsigned long buf_trunc_val(unsigned long offset
, struct channel
*chan
)
40 return buf_trunc(offset
, chan
) >> chan
->backend
.buf_size_order
;
43 /* buf_offset mask selects only the offset within the current buffer. */
45 unsigned long buf_offset(unsigned long offset
, struct channel
*chan
)
47 return offset
& (chan
->backend
.buf_size
- 1);
50 /* subbuf_offset mask selects the offset within the current subbuffer. */
52 unsigned long subbuf_offset(unsigned long offset
, struct channel
*chan
)
54 return offset
& (chan
->backend
.subbuf_size
- 1);
57 /* subbuf_trunc mask selects the subbuffer number. */
59 unsigned long subbuf_trunc(unsigned long offset
, struct channel
*chan
)
61 return offset
& ~(chan
->backend
.subbuf_size
- 1);
64 /* subbuf_align aligns the offset to the next subbuffer. */
66 unsigned long subbuf_align(unsigned long offset
, struct channel
*chan
)
68 return (offset
+ chan
->backend
.subbuf_size
)
69 & ~(chan
->backend
.subbuf_size
- 1);
72 /* subbuf_index returns the index of the current subbuffer within the buffer. */
74 unsigned long subbuf_index(unsigned long offset
, struct channel
*chan
)
76 return buf_offset(offset
, chan
) >> chan
->backend
.subbuf_size_order
;
80 * Last TSC comparison functions. Check if the current TSC overflows tsc_bits
81 * bits from the last TSC read. When overflows are detected, the full 64-bit
82 * timestamp counter should be written in the record header. Reads and writes
83 * last_tsc atomically.
86 #if (CAA_BITS_PER_LONG == 32)
88 void save_last_tsc(const struct lib_ring_buffer_config
*config
,
89 struct lib_ring_buffer
*buf
, u64 tsc
)
91 if (config
->tsc_bits
== 0 || config
->tsc_bits
== 64)
95 * Ensure the compiler performs this update in a single instruction.
97 v_set(config
, &buf
->last_tsc
, (unsigned long)(tsc
>> config
->tsc_bits
));
101 int last_tsc_overflow(const struct lib_ring_buffer_config
*config
,
102 struct lib_ring_buffer
*buf
, u64 tsc
)
104 unsigned long tsc_shifted
;
106 if (config
->tsc_bits
== 0 || config
->tsc_bits
== 64)
109 tsc_shifted
= (unsigned long)(tsc
>> config
->tsc_bits
);
110 if (unlikely(tsc_shifted
111 - (unsigned long)v_read(config
, &buf
->last_tsc
)))
118 void save_last_tsc(const struct lib_ring_buffer_config
*config
,
119 struct lib_ring_buffer
*buf
, u64 tsc
)
121 if (config
->tsc_bits
== 0 || config
->tsc_bits
== 64)
124 v_set(config
, &buf
->last_tsc
, (unsigned long)tsc
);
128 int last_tsc_overflow(const struct lib_ring_buffer_config
*config
,
129 struct lib_ring_buffer
*buf
, u64 tsc
)
131 if (config
->tsc_bits
== 0 || config
->tsc_bits
== 64)
134 if (unlikely((tsc
- v_read(config
, &buf
->last_tsc
))
135 >> config
->tsc_bits
))
143 int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx
*ctx
);
146 void lib_ring_buffer_switch_slow(struct lib_ring_buffer
*buf
,
147 enum switch_mode mode
);
149 /* Buffer write helpers */
152 void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer
*buf
,
153 struct channel
*chan
,
154 unsigned long offset
)
156 unsigned long consumed_old
, consumed_new
;
159 consumed_old
= uatomic_read(&buf
->consumed
);
161 * If buffer is in overwrite mode, push the reader consumed
162 * count if the write position has reached it and we are not
163 * at the first iteration (don't push the reader farther than
164 * the writer). This operation can be done concurrently by many
165 * writers in the same buffer, the writer being at the farthest
166 * write position sub-buffer index in the buffer being the one
167 * which will win this loop.
169 if (unlikely(subbuf_trunc(offset
, chan
)
170 - subbuf_trunc(consumed_old
, chan
)
171 >= chan
->backend
.buf_size
))
172 consumed_new
= subbuf_align(consumed_old
, chan
);
175 } while (unlikely(uatomic_cmpxchg(&buf
->consumed
, consumed_old
,
176 consumed_new
) != consumed_old
));
180 void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config
*config
,
181 struct lib_ring_buffer
*buf
,
182 unsigned long commit_count
,
185 if (config
->oops
== RING_BUFFER_OOPS_CONSISTENCY
)
186 v_set(config
, &shmp(buf
->commit_hot
)[idx
].seq
, commit_count
);
190 int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config
*config
,
191 struct lib_ring_buffer
*buf
,
192 struct channel
*chan
)
194 unsigned long consumed_old
, consumed_idx
, commit_count
, write_offset
;
196 consumed_old
= uatomic_read(&buf
->consumed
);
197 consumed_idx
= subbuf_index(consumed_old
, chan
);
198 commit_count
= v_read(config
, &shmp(buf
->commit_cold
)[consumed_idx
].cc_sb
);
200 * No memory barrier here, since we are only interested
201 * in a statistically correct polling result. The next poll will
202 * get the data is we are racing. The mb() that ensures correct
203 * memory order is in get_subbuf.
205 write_offset
= v_read(config
, &buf
->offset
);
208 * Check that the subbuffer we are trying to consume has been
209 * already fully committed.
212 if (((commit_count
- chan
->backend
.subbuf_size
)
213 & chan
->commit_count_mask
)
214 - (buf_trunc(consumed_old
, chan
)
215 >> chan
->backend
.num_subbuf_order
)
220 * Check that we are not about to read the same subbuffer in
221 * which the writer head is.
223 if (subbuf_trunc(write_offset
, chan
) - subbuf_trunc(consumed_old
, chan
)
232 int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config
*config
,
233 struct lib_ring_buffer
*buf
,
234 struct channel
*chan
)
236 return !!subbuf_offset(v_read(config
, &buf
->offset
), chan
);
240 unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config
*config
,
241 struct lib_ring_buffer
*buf
,
244 return subbuffer_get_data_size(config
, &buf
->backend
, idx
);
248 * Check if all space reservation in a buffer have been committed. This helps
249 * knowing if an execution context is nested (for per-cpu buffers only).
250 * This is a very specific ftrace use-case, so we keep this as "internal" API.
253 int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config
*config
,
254 struct lib_ring_buffer
*buf
,
255 struct channel
*chan
)
257 unsigned long offset
, idx
, commit_count
;
259 CHAN_WARN_ON(chan
, config
->alloc
!= RING_BUFFER_ALLOC_PER_CPU
);
260 CHAN_WARN_ON(chan
, config
->sync
!= RING_BUFFER_SYNC_PER_CPU
);
263 * Read offset and commit count in a loop so they are both read
264 * atomically wrt interrupts. By deal with interrupt concurrency by
265 * restarting both reads if the offset has been pushed. Note that given
266 * we only have to deal with interrupt concurrency here, an interrupt
267 * modifying the commit count will also modify "offset", so it is safe
268 * to only check for offset modifications.
271 offset
= v_read(config
, &buf
->offset
);
272 idx
= subbuf_index(offset
, chan
);
273 commit_count
= v_read(config
, &shmp(buf
->commit_hot
)[idx
].cc
);
274 } while (offset
!= v_read(config
, &buf
->offset
));
276 return ((buf_trunc(offset
, chan
) >> chan
->backend
.num_subbuf_order
)
277 - (commit_count
& chan
->commit_count_mask
) == 0);
281 void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config
*config
,
282 struct lib_ring_buffer
*buf
,
283 struct channel
*chan
,
284 unsigned long offset
,
285 unsigned long commit_count
,
288 unsigned long old_commit_count
= commit_count
289 - chan
->backend
.subbuf_size
;
292 /* Check if all commits have been done */
293 if (unlikely((buf_trunc(offset
, chan
) >> chan
->backend
.num_subbuf_order
)
294 - (old_commit_count
& chan
->commit_count_mask
) == 0)) {
296 * If we succeeded at updating cc_sb below, we are the subbuffer
297 * writer delivering the subbuffer. Deals with concurrent
298 * updates of the "cc" value without adding a add_return atomic
299 * operation to the fast path.
301 * We are doing the delivery in two steps:
302 * - First, we cmpxchg() cc_sb to the new value
303 * old_commit_count + 1. This ensures that we are the only
304 * subbuffer user successfully filling the subbuffer, but we
305 * do _not_ set the cc_sb value to "commit_count" yet.
306 * Therefore, other writers that would wrap around the ring
307 * buffer and try to start writing to our subbuffer would
308 * have to drop records, because it would appear as
310 * We therefore have exclusive access to the subbuffer control
311 * structures. This mutual exclusion with other writers is
312 * crucially important to perform record overruns count in
313 * flight recorder mode locklessly.
314 * - When we are ready to release the subbuffer (either for
315 * reading or for overrun by other writers), we simply set the
316 * cc_sb value to "commit_count" and perform delivery.
318 * The subbuffer size is least 2 bytes (minimum size: 1 page).
319 * This guarantees that old_commit_count + 1 != commit_count.
321 if (likely(v_cmpxchg(config
, &shmp(buf
->commit_cold
)[idx
].cc_sb
,
322 old_commit_count
, old_commit_count
+ 1)
323 == old_commit_count
)) {
325 * Start of exclusive subbuffer access. We are
326 * guaranteed to be the last writer in this subbuffer
327 * and any other writer trying to access this subbuffer
328 * in this state is required to drop records.
330 tsc
= config
->cb
.ring_buffer_clock_read(chan
);
332 subbuffer_get_records_count(config
,
334 &buf
->records_count
);
336 subbuffer_count_records_overrun(config
,
339 &buf
->records_overrun
);
340 config
->cb
.buffer_end(buf
, tsc
, idx
,
341 lib_ring_buffer_get_data_size(config
,
346 * Set noref flag and offset for this subbuffer id.
347 * Contains a memory barrier that ensures counter stores
348 * are ordered before set noref and offset.
350 lib_ring_buffer_set_noref_offset(config
, &buf
->backend
, idx
,
351 buf_trunc_val(offset
, chan
));
354 * Order set_noref and record counter updates before the
355 * end of subbuffer exclusive access. Orders with
356 * respect to writers coming into the subbuffer after
357 * wrap around, and also order wrt concurrent readers.
360 /* End of exclusive subbuffer access */
361 v_set(config
, &shmp(buf
->commit_cold
)[idx
].cc_sb
,
363 lib_ring_buffer_vmcore_check_deliver(config
, buf
,
367 * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
369 if (config
->wakeup
== RING_BUFFER_WAKEUP_BY_WRITER
370 && uatomic_read(&buf
->active_readers
)
371 && lib_ring_buffer_poll_deliver(config
, buf
, chan
)) {
372 //wake_up_interruptible(&buf->read_wait);
373 //wake_up_interruptible(&chan->read_wait);
381 * lib_ring_buffer_write_commit_counter
383 * For flight recording. must be called after commit.
384 * This function increments the subbuffer's commit_seq counter each time the
385 * commit count reaches back the reserve offset (modulo subbuffer size). It is
386 * useful for crash dump.
389 void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config
*config
,
390 struct lib_ring_buffer
*buf
,
391 struct channel
*chan
,
393 unsigned long buf_offset
,
394 unsigned long commit_count
,
397 unsigned long offset
, commit_seq_old
;
399 if (config
->oops
!= RING_BUFFER_OOPS_CONSISTENCY
)
402 offset
= buf_offset
+ slot_size
;
405 * subbuf_offset includes commit_count_mask. We can simply
406 * compare the offsets within the subbuffer without caring about
407 * buffer full/empty mismatch because offset is never zero here
408 * (subbuffer header and record headers have non-zero length).
410 if (unlikely(subbuf_offset(offset
- commit_count
, chan
)))
413 commit_seq_old
= v_read(config
, &shmp(buf
->commit_hot
)[idx
].seq
);
414 while ((long) (commit_seq_old
- commit_count
) < 0)
415 commit_seq_old
= v_cmpxchg(config
, &shmp(buf
->commit_hot
)[idx
].seq
,
416 commit_seq_old
, commit_count
);
419 extern int lib_ring_buffer_create(struct lib_ring_buffer
*buf
,
420 struct channel_backend
*chanb
, int cpu
,
421 struct shm_header
*shm_header
);
422 extern void lib_ring_buffer_free(struct lib_ring_buffer
*buf
);
424 /* Keep track of trap nesting inside ring buffer code */
425 extern __thread
unsigned int lib_ring_buffer_nesting
;
427 #endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */