/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */

#include <ust/ringbuffer-config.h>
/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @buf: the buffer struct
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: need extra subbuffer for reader
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
				     struct lib_ring_buffer_backend *bufb,
				     size_t size, size_t num_subbuf,
				     int extra_reader_sb,
				     struct shm_handle *handle,
				     struct shm_object *shmobj)
{
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	unsigned long subbuf_size, mmap_offset = 0;
	unsigned long num_subbuf_alloc;
	unsigned long i;

	subbuf_size = chanb->subbuf_size;
	num_subbuf_alloc = num_subbuf;
	if (extra_reader_sb)
		num_subbuf_alloc++;

	align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
	set_shmp(bufb->array, zalloc_shm(shmobj,
			sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc));
	if (unlikely(!shmp(handle, bufb->array)))
		goto array_error;

	/*
	 * This is the largest element (the buffer pages) which needs to
	 * be aligned on PAGE_SIZE.
	 */
	align_shm(shmobj, PAGE_SIZE);
	set_shmp(bufb->memory_map, zalloc_shm(shmobj,
			subbuf_size * num_subbuf_alloc));
	if (unlikely(!shmp(handle, bufb->memory_map)))
		goto memory_map_error;

	/* Allocate backend pages array elements */
	for (i = 0; i < num_subbuf_alloc; i++) {
		align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_pages));
		set_shmp(shmp(handle, bufb->array)[i].shmp,
			 zalloc_shm(shmobj,
				sizeof(struct lib_ring_buffer_backend_pages)));
		if (!shmp(handle, shmp(handle, bufb->array)[i].shmp))
			goto free_array;
	}

	/* Allocate write-side subbuffer table */
	align_shm(shmobj, __alignof__(struct lib_ring_buffer_backend_subbuffer));
	set_shmp(bufb->buf_wsb, zalloc_shm(shmobj,
			sizeof(struct lib_ring_buffer_backend_subbuffer)
			* num_subbuf));
	if (unlikely(!shmp(handle, bufb->buf_wsb)))
		goto free_array;

	for (i = 0; i < num_subbuf; i++)
		shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);

	/* Assign read-side subbuffer table */
	if (extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	/* Assign pages to page index */
	for (i = 0; i < num_subbuf_alloc; i++) {
		struct shm_ref ref;

		ref.index = bufb->memory_map._ref.index;
		ref.offset = bufb->memory_map._ref.offset;
		ref.offset += i * subbuf_size;

		set_shmp(shmp(handle, shmp(handle, bufb->array)[i].shmp)->p,
			 ref);
		if (config->output == RING_BUFFER_MMAP) {
			shmp(handle, shmp(handle, bufb->array)[i].shmp)->mmap_offset = mmap_offset;
			mmap_offset += subbuf_size;
		}
	}
	return 0;

free_array:
	/* bufb->array[i] will be freed by shm teardown */
memory_map_error:
	/* bufb->array will be freed by shm teardown */
array_error:
	return -ENOMEM;
}
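
/*
 * Example (illustrative values only): with subbuf_size = 8192, num_subbuf = 4
 * and an extra reader subbuffer (overwrite mode), the allocation above carves
 * the shm object as follows:
 *
 *	num_subbuf_alloc = 4 + 1 = 5
 *	bufb->array      : 5 * sizeof(struct lib_ring_buffer_backend_pages_shmp)
 *	bufb->memory_map : PAGE_SIZE-aligned, 5 * 8192 = 40960 bytes
 *	per-slot pages   : 5 * sizeof(struct lib_ring_buffer_backend_pages)
 *	bufb->buf_wsb    : 4 * sizeof(struct lib_ring_buffer_backend_subbuffer)
 *
 * Slot i's backend pages reference memory_map + i * 8192, so each
 * subbuffer's contents stay contiguous within the shared memory object.
 */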
int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
				   struct channel_backend *chanb, int cpu,
				   struct shm_handle *handle,
				   struct shm_object *shmobj)
{
	const struct lib_ring_buffer_config *config = chanb->config;

	set_shmp(bufb->chan, handle->chan._ref);
	bufb->cpu = cpu;

	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
						chanb->num_subbuf,
						chanb->extra_reader_sb,
						handle, shmobj);
}
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	/* bufb->buf_wsb will be freed by shm teardown */
	/* bufb->array[i] will be freed by shm teardown */
	/* bufb->array will be freed by shm teardown */
	bufb->allocated = 0;
}
void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb,
				   struct shm_handle *handle)
{
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long num_subbuf_alloc;
	unsigned long i;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	for (i = 0; i < chanb->num_subbuf; i++)
		shmp(handle, bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i);
	if (chanb->extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	for (i = 0; i < num_subbuf_alloc; i++) {
		/* Don't reset mmap_offset */
		v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_commit, 0);
		v_set(config, &shmp(handle, shmp(handle, bufb->array)[i].shmp)->records_unread, 0);
		shmp(handle, shmp(handle, bufb->array)[i].shmp)->data_size = 0;
		/* Don't reset backend page and virt addresses */
	}
	/* Don't reset num_pages_per_subbuf, cpu, allocated */
	v_set(config, &bufb->records_read, 0);
}
/*
 * The frontend is responsible for also calling ring_buffer_backend_reset for
 * each buffer when calling channel_backend_reset.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	const struct lib_ring_buffer_config *config = chanb->config;

	/*
	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
	 * priv, notifiers, config, cpumask and name.
	 */
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}
/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @parent: dentry of parent directory, %NULL for root directory
 * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2)
 * @num_subbuf: number of sub-buffers (power of 2)
 * @shm_handle: shared memory handle
 *
 * Returns 0 on success, a negative error value otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified. The created channel buffer files will be named
 * name_0...name_N-1. File permissions will be %S_IRUSR.
 *
 * Called with CPU hotplug disabled.
 */
int channel_backend_init(struct channel_backend *chanb,
			 const char *name,
			 const struct lib_ring_buffer_config *config,
			 void *priv, size_t subbuf_size, size_t num_subbuf,
			 struct shm_handle *handle)
{
	struct channel *chan = caa_container_of(chanb, struct channel, backend);
	int i;
	int ret;
	size_t shmsize = 0, bufshmsize = 0, num_subbuf_alloc;

	if (!(subbuf_size && num_subbuf))
		return -EPERM;

	/* Check that the subbuffer size is larger than a page. */
	if (subbuf_size < PAGE_SIZE)
		return -EINVAL;

	/*
	 * Make sure the number of subbuffers and the subbuffer size are
	 * powers of 2.
	 */
	CHAN_WARN_ON(chanb, hweight32(subbuf_size) != 1);
	CHAN_WARN_ON(chanb, hweight32(num_subbuf) != 1);

	ret = subbuffer_id_check_index(config, num_subbuf);
	if (ret)
		return ret;

	chanb->priv = priv;
	chanb->buf_size = num_subbuf * subbuf_size;
	chanb->subbuf_size = subbuf_size;
	chanb->buf_size_order = get_count_order(chanb->buf_size);
	chanb->subbuf_size_order = get_count_order(subbuf_size);
	chanb->num_subbuf_order = get_count_order(num_subbuf);
	chanb->extra_reader_sb =
		(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
	chanb->num_subbuf = num_subbuf;
	strncpy(chanb->name, name, NAME_MAX);
	chanb->name[NAME_MAX - 1] = '\0';
	chanb->config = config;

	/* Per-cpu buffer size: control (prior to backend) */
	shmsize = offset_align(shmsize, __alignof__(struct lib_ring_buffer));
	shmsize += sizeof(struct lib_ring_buffer);

	/* Per-cpu buffer size: backend */
	/* num_subbuf + 1 is the worst case */
	num_subbuf_alloc = num_subbuf + 1;
	shmsize += offset_align(shmsize, __alignof__(struct lib_ring_buffer_backend_pages_shmp));
	shmsize += sizeof(struct lib_ring_buffer_backend_pages_shmp) * num_subbuf_alloc;
	shmsize += offset_align(bufshmsize, PAGE_SIZE);
	shmsize += subbuf_size * num_subbuf_alloc;
	shmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_pages));
	shmsize += sizeof(struct lib_ring_buffer_backend_pages) * num_subbuf_alloc;
	shmsize += offset_align(bufshmsize, __alignof__(struct lib_ring_buffer_backend_subbuffer));
	shmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf;
	/* Per-cpu buffer size: control (after backend) */
	shmsize += offset_align(shmsize, __alignof__(struct commit_counters_hot));
	shmsize += sizeof(struct commit_counters_hot) * num_subbuf;
	shmsize += offset_align(shmsize, __alignof__(struct commit_counters_cold));
	shmsize += sizeof(struct commit_counters_cold) * num_subbuf;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		struct lib_ring_buffer *buf;
		/*
		 * We need to allocate for all possible cpus.
		 */
		for_each_possible_cpu(i) {
			struct shm_object *shmobj;

			shmobj = shm_object_table_append(handle->table, shmsize);
			align_shm(shmobj, __alignof__(struct lib_ring_buffer));
			set_shmp(chanb->buf[i].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
			buf = shmp(handle, chanb->buf[i].shmp);
			set_shmp(buf->self, chanb->buf[i].shmp._ref);
			ret = lib_ring_buffer_create(buf, chanb, i,
						     handle, shmobj);
			if (ret)
				goto free_bufs;	/* cpu hotplug locked */
		}
	} else {
		struct shm_object *shmobj;
		struct lib_ring_buffer *buf;

		shmobj = shm_object_table_append(handle->table, shmsize);
		align_shm(shmobj, __alignof__(struct lib_ring_buffer));
		set_shmp(chanb->buf[0].shmp, zalloc_shm(shmobj, sizeof(struct lib_ring_buffer)));
		buf = shmp(handle, chanb->buf[0].shmp);
		ret = lib_ring_buffer_create(buf, chanb, -1,
					     handle, shmobj);
		if (ret)
			goto free_bufs;
	}
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

	return 0;

free_bufs:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf, handle);
		}
	}
	/* We only free the buffer data upon shm teardown */
	return -ENOMEM;
}
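
/*
 * Example (hypothetical client values): a frontend holding an already
 * initialized struct shm_handle *handle and a struct channel embedding this
 * backend could initialize it as:
 *
 *	ret = channel_backend_init(&chan->backend, "chan0", client_config,
 *				   priv, 2 * PAGE_SIZE, 8, handle);
 *
 * where "chan0", client_config and priv are client-supplied, the sub-buffer
 * size (2 * PAGE_SIZE) is at least PAGE_SIZE, and both it and the sub-buffer
 * count (8) are powers of two, as checked above.
 */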
/**
 * channel_backend_free - destroy the channel
 * @chanb: channel backend
 *
 * Destroys all channel buffers and frees the channel.
 */
void channel_backend_free(struct channel_backend *chanb,
			  struct shm_handle *handle)
{
	const struct lib_ring_buffer_config *config = chanb->config;
	int i;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = shmp(handle, chanb->buf[i].shmp);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf, handle);
		}
	} else {
		struct lib_ring_buffer *buf = shmp(handle, chanb->buf[0].shmp);

		CHAN_WARN_ON(chanb, !buf->backend.allocated);
		lib_ring_buffer_free(buf, handle);
	}
	/* We only free the buffer data upon shm teardown */
}
/**
 * lib_ring_buffer_read - read data from the ring buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
			    void *dest, size_t len, struct shm_handle *handle)
{
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t orig_len;
	struct lib_ring_buffer_backend_pages_shmp *rpages;
	unsigned long sb_bindex, id;

	orig_len = len;
	offset &= chanb->buf_size - 1;

	if (unlikely(!len))
		return 0;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = &shmp(handle, bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	memcpy(dest, shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1)), len);
	return orig_len;
}
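
/*
 * Example (hypothetical reader-side values): once the reader owns a
 * sub-buffer (the get_subbuf/put_subbuf protection mentioned above), copying
 * out a 256-byte record located at read_offset could look like:
 *
 *	char payload[256];
 *
 *	lib_ring_buffer_read(&buf->backend, read_offset,
 *			     payload, sizeof(payload), handle);
 *
 * where buf, read_offset and handle come from the reader's frontend state;
 * the call returns the number of bytes copied (here 256).
 */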
/**
 * lib_ring_buffer_read_cstr - read a C-style string from the ring buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Return string's length.
 * Should be protected by get_subbuf/put_subbuf.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
			      void *dest, size_t len, struct shm_handle *handle)
{
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	ssize_t string_len, orig_offset;
	char *str;
	struct lib_ring_buffer_backend_pages_shmp *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	orig_offset = offset;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = &shmp(handle, bufb->array)[sb_bindex];
	/*
	 * Underlying layer should never ask for reads across
	 * subbuffers.
	 */
	CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	str = (char *)shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
	string_len = strnlen(str, len);
	if (dest && len) {
		memcpy(dest, str, string_len);
		((char *)dest)[0] = 0;
	}
	return offset - orig_offset;
}
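
/*
 * Example (hypothetical values): scanning for a string of at most
 * sizeof(name) bytes at read_offset within a sub-buffer owned by the reader:
 *
 *	char name[NAME_MAX];
 *
 *	lib_ring_buffer_read_cstr(&buf->backend, read_offset,
 *				  name, sizeof(name), handle);
 *
 * As above, buf, read_offset and handle are reader-side state, the
 * destination length bounds the strnlen() scan, and the call must be
 * protected by get_subbuf/put_subbuf.
 */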
/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
					  size_t offset,
					  struct shm_handle *handle)
{
	struct lib_ring_buffer_backend_pages_shmp *rpages;
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = &shmp(handle, bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
}
/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
				     size_t offset,
				     struct shm_handle *handle)
{
	size_t sbidx;
	struct lib_ring_buffer_backend_pages_shmp *rpages;
	struct channel_backend *chanb = &shmp(handle, bufb->chan)->backend;
	const struct lib_ring_buffer_config *config = chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	sbidx = offset >> chanb->subbuf_size_order;
	id = shmp(handle, bufb->buf_wsb)[sbidx].id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = &shmp(handle, bufb->array)[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return shmp(handle, shmp(handle, rpages->shmp)->p) + (offset & ~(chanb->subbuf_size - 1));
}
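
/*
 * Example (illustrative values only): with subbuf_size = 8192 (so
 * subbuf_size_order = 13) and buf_size = 4 * 8192 = 32768, an offset of
 * 20000 is first masked to 20000 & (32768 - 1) = 20000, then selects
 * write-side sub-buffer index sbidx = 20000 >> 13 = 2; that entry's id leads
 * to the backend pages backing the sub-buffer containing the offset.
 */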