1 /*
2 * ring_buffer_backend.c
3 *
4 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; only
9 * version 2.1 of the License.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 #include <linux/stddef.h>
22 #include <linux/module.h>
23 #include <linux/string.h>
24 #include <linux/bitops.h>
25 #include <linux/delay.h>
26 #include <linux/errno.h>
27 #include <linux/slab.h>
28 #include <linux/cpu.h>
29 #include <linux/mm.h>
30
31 #include <wrapper/vmalloc.h> /* for wrapper_vmalloc_sync_all() */
32 #include <wrapper/ringbuffer/config.h>
33 #include <wrapper/ringbuffer/backend.h>
34 #include <wrapper/ringbuffer/frontend.h>
35
36 /**
37 * lib_ring_buffer_backend_allocate - allocate a channel buffer
38 * @config: ring buffer instance configuration
39 * @bufb: the buffer backend structure
40 * @size: total size of the buffer
41 * @num_subbuf: number of subbuffers
42 * @extra_reader_sb: need extra subbuffer for reader
43 */
44 static
45 int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
46 struct lib_ring_buffer_backend *bufb,
47 size_t size, size_t num_subbuf,
48 int extra_reader_sb)
49 {
50 struct channel_backend *chanb = &bufb->chan->backend;
51 unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
52 unsigned long subbuf_size, mmap_offset = 0;
53 unsigned long num_subbuf_alloc;
54 struct page **pages;
55 unsigned long i;
56
57 num_pages = size >> PAGE_SHIFT;
58 num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
59 subbuf_size = chanb->subbuf_size;
60 num_subbuf_alloc = num_subbuf;
61
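	/*
	 * In overwrite (flight recorder) mode, one extra sub-buffer is
	 * allocated so the reader can hold on to a sub-buffer while the
	 * writer keeps producing into the remaining ones.
	 */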
62 if (extra_reader_sb) {
63 num_pages += num_pages_per_subbuf; /* Add pages for reader */
64 num_subbuf_alloc++;
65 }
66
67 pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
68 1 << INTERNODE_CACHE_SHIFT),
69 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
70 if (unlikely(!pages))
71 goto pages_error;
72
73 bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
74 * num_subbuf_alloc,
75 1 << INTERNODE_CACHE_SHIFT),
76 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
77 if (unlikely(!bufb->array))
78 goto array_error;
79
80 for (i = 0; i < num_pages; i++) {
81 pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
82 GFP_KERNEL | __GFP_ZERO, 0);
83 if (unlikely(!pages[i]))
84 goto depopulate;
85 }
86 bufb->num_pages_per_subbuf = num_pages_per_subbuf;
87
88 /* Allocate backend pages array elements */
89 for (i = 0; i < num_subbuf_alloc; i++) {
90 bufb->array[i] =
91 kzalloc_node(ALIGN(
92 sizeof(struct lib_ring_buffer_backend_pages) +
93 sizeof(struct lib_ring_buffer_backend_page)
94 * num_pages_per_subbuf,
95 1 << INTERNODE_CACHE_SHIFT),
96 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
97 if (!bufb->array[i])
98 goto free_array;
99 }
100
101 /* Allocate write-side subbuffer table */
102 bufb->buf_wsb = kzalloc_node(ALIGN(
103 sizeof(struct lib_ring_buffer_backend_subbuffer)
104 * num_subbuf,
105 1 << INTERNODE_CACHE_SHIFT),
106 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
107 if (unlikely(!bufb->buf_wsb))
108 goto free_array;
109
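	/*
	 * Initially, write-side table entry i maps one-to-one onto backend
	 * sub-buffer i; the read-side entry below points at the extra
	 * sub-buffer when one was allocated for the reader.
	 */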
110 for (i = 0; i < num_subbuf; i++)
111 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
112
113 /* Assign read-side subbuffer table */
114 if (extra_reader_sb)
115 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
116 num_subbuf_alloc - 1);
117 else
118 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
119
120 /* Allocate subbuffer packet counter table */
121 bufb->buf_cnt = kzalloc_node(ALIGN(
122 sizeof(struct lib_ring_buffer_backend_counts)
123 * num_subbuf,
124 1 << INTERNODE_CACHE_SHIFT),
125 GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
126 if (unlikely(!bufb->buf_cnt))
127 goto free_wsb;
128
129 /* Assign pages to page index */
130 for (i = 0; i < num_subbuf_alloc; i++) {
131 for (j = 0; j < num_pages_per_subbuf; j++) {
132 CHAN_WARN_ON(chanb, page_idx > num_pages);
133 bufb->array[i]->p[j].virt = page_address(pages[page_idx]);
134 bufb->array[i]->p[j].pfn = page_to_pfn(pages[page_idx]);
135 page_idx++;
136 }
137 if (config->output == RING_BUFFER_MMAP) {
138 bufb->array[i]->mmap_offset = mmap_offset;
139 mmap_offset += subbuf_size;
140 }
141 }
142
143 /*
144 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
145 * will not fault.
146 */
147 wrapper_vmalloc_sync_all();
148 kfree(pages);
149 return 0;
150
151 free_wsb:
152 kfree(bufb->buf_wsb);
153 free_array:
154 for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
155 kfree(bufb->array[i]);
156 depopulate:
157 /* Free all allocated pages */
158 for (i = 0; (i < num_pages && pages[i]); i++)
159 __free_page(pages[i]);
160 kfree(bufb->array);
161 array_error:
162 kfree(pages);
163 pages_error:
164 return -ENOMEM;
165 }
166
167 int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
168 struct channel_backend *chanb, int cpu)
169 {
170 const struct lib_ring_buffer_config *config = &chanb->config;
171
172 bufb->chan = container_of(chanb, struct channel, backend);
173 bufb->cpu = cpu;
174
175 return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
176 chanb->num_subbuf,
177 chanb->extra_reader_sb);
178 }
179
180 void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
181 {
182 struct channel_backend *chanb = &bufb->chan->backend;
183 unsigned long i, j, num_subbuf_alloc;
184
185 num_subbuf_alloc = chanb->num_subbuf;
186 if (chanb->extra_reader_sb)
187 num_subbuf_alloc++;
188
189 kfree(bufb->buf_wsb);
190 kfree(bufb->buf_cnt);
191 for (i = 0; i < num_subbuf_alloc; i++) {
192 for (j = 0; j < bufb->num_pages_per_subbuf; j++)
193 __free_page(pfn_to_page(bufb->array[i]->p[j].pfn));
194 kfree(bufb->array[i]);
195 }
196 kfree(bufb->array);
197 bufb->allocated = 0;
198 }
199
200 void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
201 {
202 struct channel_backend *chanb = &bufb->chan->backend;
203 const struct lib_ring_buffer_config *config = &chanb->config;
204 unsigned long num_subbuf_alloc;
205 unsigned int i;
206
207 num_subbuf_alloc = chanb->num_subbuf;
208 if (chanb->extra_reader_sb)
209 num_subbuf_alloc++;
210
211 for (i = 0; i < chanb->num_subbuf; i++)
212 bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
213 if (chanb->extra_reader_sb)
214 bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
215 num_subbuf_alloc - 1);
216 else
217 bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);
218
219 for (i = 0; i < num_subbuf_alloc; i++) {
220 /* Don't reset mmap_offset */
221 v_set(config, &bufb->array[i]->records_commit, 0);
222 v_set(config, &bufb->array[i]->records_unread, 0);
223 bufb->array[i]->data_size = 0;
224 /* Don't reset backend page and virt addresses */
225 }
226 /* Don't reset num_pages_per_subbuf, cpu, allocated */
227 v_set(config, &bufb->records_read, 0);
228 }
229
230 /*
231 * The frontend is responsible for also calling ring_buffer_backend_reset for
232 * each buffer when calling channel_backend_reset.
233 */
234 void channel_backend_reset(struct channel_backend *chanb)
235 {
236 struct channel *chan = container_of(chanb, struct channel, backend);
237 const struct lib_ring_buffer_config *config = &chanb->config;
238
239 /*
240 * Don't reset buf_size, subbuf_size, subbuf_size_order,
241 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
242 * priv, notifiers, config, cpumask and name.
243 */
244 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
245 }
246
247 #ifdef CONFIG_HOTPLUG_CPU
248 /**
249 * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
250 * @nb: notifier block
251 * @action: hotplug action to take
252 * @hcpu: CPU number
253 *
254 * Returns %NOTIFY_OK on success, %NOTIFY_BAD on failure.
255 */
256 static
257 int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
258 unsigned long action,
259 void *hcpu)
260 {
261 unsigned int cpu = (unsigned long)hcpu;
262 struct channel_backend *chanb = container_of(nb, struct channel_backend,
263 cpu_hp_notifier);
264 const struct lib_ring_buffer_config *config = &chanb->config;
265 struct lib_ring_buffer *buf;
266 int ret;
267
268 CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
269
270 switch (action) {
271 case CPU_UP_PREPARE:
272 case CPU_UP_PREPARE_FROZEN:
273 buf = per_cpu_ptr(chanb->buf, cpu);
274 ret = lib_ring_buffer_create(buf, chanb, cpu);
275 if (ret) {
276 printk(KERN_ERR
277 "ring_buffer_cpu_hp_callback: cpu %d "
278 "buffer creation failed\n", cpu);
279 return NOTIFY_BAD;
280 }
281 break;
282 case CPU_DEAD:
283 case CPU_DEAD_FROZEN:
284 /* No need to do a buffer switch here, because it will happen
285  * when tracing is stopped, or will be done by the switch timer
286  * CPU_DEAD callback. */
287 break;
288 }
289 return NOTIFY_OK;
290 }
291 #endif
292
293 /**
294 * channel_backend_init - initialize a channel backend
295 * @chanb: channel backend
296 * @name: channel name
297 * @config: client ring buffer configuration
298 * @priv: client private data
300 * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
301 * @num_subbuf: number of sub-buffers (power of 2)
302 *
303 * Returns 0 on success, or a negative error code otherwise.
304 *
305 * Creates per-cpu channel buffers using the sizes and attributes
306 * specified.
308 *
309 * Called with CPU hotplug disabled.
310 */
311 int channel_backend_init(struct channel_backend *chanb,
312 const char *name,
313 const struct lib_ring_buffer_config *config,
314 void *priv, size_t subbuf_size, size_t num_subbuf)
315 {
316 struct channel *chan = container_of(chanb, struct channel, backend);
317 unsigned int i;
318 int ret;
319
320 if (!name)
321 return -EPERM;
322
323 /* Check that the subbuffer size is at least one page. */
324 if (subbuf_size < PAGE_SIZE)
325 return -EINVAL;
326
327 /*
328 * Make sure the number of subbuffers and subbuffer size are
329 * power of 2 and nonzero.
330 */
331 if (!subbuf_size || (subbuf_size & (subbuf_size - 1)))
332 return -EINVAL;
333 if (!num_subbuf || (num_subbuf & (num_subbuf - 1)))
334 return -EINVAL;
335 /*
336 * Overwrite mode buffers require at least 2 subbuffers per
337 * buffer.
338 */
339 if (config->mode == RING_BUFFER_OVERWRITE && num_subbuf < 2)
340 return -EINVAL;
341
342 ret = subbuffer_id_check_index(config, num_subbuf);
343 if (ret)
344 return ret;
345
346 chanb->priv = priv;
347 chanb->buf_size = num_subbuf * subbuf_size;
348 chanb->subbuf_size = subbuf_size;
349 chanb->buf_size_order = get_count_order(chanb->buf_size);
350 chanb->subbuf_size_order = get_count_order(subbuf_size);
351 chanb->num_subbuf_order = get_count_order(num_subbuf);
352 chanb->extra_reader_sb =
353 (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
354 chanb->num_subbuf = num_subbuf;
355 strlcpy(chanb->name, name, NAME_MAX);
356 memcpy(&chanb->config, config, sizeof(chanb->config));
357
358 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
359 if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
360 return -ENOMEM;
361 }
362
363 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
364 /* Allocating the buffer per-cpu structures */
365 chanb->buf = alloc_percpu(struct lib_ring_buffer);
366 if (!chanb->buf)
367 goto free_cpumask;
368
369 /*
370  * When CPU hotplug support is not available, a ring buffer allocated
371  * from an early initcall will never be notified of secondary CPUs.
372  * In that case, we need to allocate for all possible CPUs.
373  */
374 #ifdef CONFIG_HOTPLUG_CPU
375 /*
376 * buf->backend.allocated test takes care of concurrent CPU
377 * hotplug.
378 * Priority higher than frontend, so we create the ring buffer
379 * before we start the timer.
380 */
381 chanb->cpu_hp_notifier.notifier_call =
382 lib_ring_buffer_cpu_hp_callback;
383 chanb->cpu_hp_notifier.priority = 5;
384 register_hotcpu_notifier(&chanb->cpu_hp_notifier);
385
386 get_online_cpus();
387 for_each_online_cpu(i) {
388 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
389 chanb, i);
390 if (ret)
391 goto free_bufs; /* cpu hotplug locked */
392 }
393 put_online_cpus();
394 #else
395 for_each_possible_cpu(i) {
396 ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
397 chanb, i);
398 if (ret)
399 goto free_bufs; /* cpu hotplug locked */
400 }
401 #endif
402 } else {
403 chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
404 if (!chanb->buf)
405 goto free_cpumask;
406 ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
407 if (ret)
408 goto free_bufs;
409 }
410 chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
411
412 return 0;
413
414 free_bufs:
415 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
416 for_each_possible_cpu(i) {
417 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
418
419 if (!buf->backend.allocated)
420 continue;
421 lib_ring_buffer_free(buf);
422 }
423 #ifdef CONFIG_HOTPLUG_CPU
424 put_online_cpus();
425 unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
426 #endif
427 free_percpu(chanb->buf);
428 } else
429 kfree(chanb->buf);
430 free_cpumask:
431 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
432 free_cpumask_var(chanb->cpumask);
433 return -ENOMEM;
434 }
435
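/*
 * Hypothetical usage sketch (not part of this file): a ring buffer client
 * embedding a struct channel_backend would typically initialize it with
 * power-of-two sizing, for example:
 *
 *	ret = channel_backend_init(&chan->backend, "chan_name",
 *				   &client_config, priv,
 *				   2 * PAGE_SIZE, 4);
 *	if (ret)
 *		return ret;
 *
 * where chan, client_config and priv are client-provided names assumed
 * here for illustration only.
 */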
436 /**
437 * channel_backend_unregister_notifiers - unregister notifiers
438 * @chanb: the channel backend
439 *
440 * Holds CPU hotplug.
441 */
442 void channel_backend_unregister_notifiers(struct channel_backend *chanb)
443 {
444 const struct lib_ring_buffer_config *config = &chanb->config;
445
446 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
447 unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
448 }
449
450 /**
451 * channel_backend_free - destroy the channel
452 * @chanb: the channel backend
453 *
454 * Destroys all channel buffers and frees the channel backend resources.
455 */
456 void channel_backend_free(struct channel_backend *chanb)
457 {
458 const struct lib_ring_buffer_config *config = &chanb->config;
459 unsigned int i;
460
461 if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
462 for_each_possible_cpu(i) {
463 struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);
464
465 if (!buf->backend.allocated)
466 continue;
467 lib_ring_buffer_free(buf);
468 }
469 free_cpumask_var(chanb->cpumask);
470 free_percpu(chanb->buf);
471 } else {
472 struct lib_ring_buffer *buf = chanb->buf;
473
474 CHAN_WARN_ON(chanb, !buf->backend.allocated);
475 lib_ring_buffer_free(buf);
476 kfree(buf);
477 }
478 }
479
480 /**
481 * lib_ring_buffer_write - write data to a ring_buffer buffer.
482 * @bufb : buffer backend
483 * @offset : offset within the buffer
484 * @src : source address
485 * @len : length to write
486 * @pagecpy : number of bytes already copied on the current page
487 */
488 void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
489 const void *src, size_t len, size_t pagecpy)
490 {
491 struct channel_backend *chanb = &bufb->chan->backend;
492 const struct lib_ring_buffer_config *config = &chanb->config;
493 size_t sbidx, index;
494 struct lib_ring_buffer_backend_pages *rpages;
495 unsigned long sb_bindex, id;
496
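	/*
	 * Slow path for writes crossing page boundaries: advance past the
	 * bytes already copied (pagecpy), then copy page by page, resolving
	 * the backend page for each chunk until the full length is written.
	 */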
497 do {
498 len -= pagecpy;
499 src += pagecpy;
500 offset += pagecpy;
501 sbidx = offset >> chanb->subbuf_size_order;
502 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
503
504 /*
505 * Underlying layer should never ask for writes across
506 * subbuffers.
507 */
508 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
509
510 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
511 id = bufb->buf_wsb[sbidx].id;
512 sb_bindex = subbuffer_id_get_index(config, id);
513 rpages = bufb->array[sb_bindex];
514 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
515 && subbuffer_id_is_noref(config, id));
516 lib_ring_buffer_do_copy(config,
517 rpages->p[index].virt
518 + (offset & ~PAGE_MASK),
519 src, pagecpy);
520 } while (unlikely(len != pagecpy));
521 }
522 EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);
523
524
525 /**
526 * lib_ring_buffer_memset - write len bytes of c to a ring_buffer buffer.
527 * @bufb : buffer backend
528 * @offset : offset within the buffer
529 * @c : the byte to write
530 * @len : length to write
531 * @pagecpy : number of bytes already copied on the current page
532 */
533 void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
534 size_t offset,
535 int c, size_t len, size_t pagecpy)
536 {
537 struct channel_backend *chanb = &bufb->chan->backend;
538 const struct lib_ring_buffer_config *config = &chanb->config;
539 size_t sbidx, index;
540 struct lib_ring_buffer_backend_pages *rpages;
541 unsigned long sb_bindex, id;
542
543 do {
544 len -= pagecpy;
545 offset += pagecpy;
546 sbidx = offset >> chanb->subbuf_size_order;
547 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
548
549 /*
550 * Underlying layer should never ask for writes across
551 * subbuffers.
552 */
553 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
554
555 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
556 id = bufb->buf_wsb[sbidx].id;
557 sb_bindex = subbuffer_id_get_index(config, id);
558 rpages = bufb->array[sb_bindex];
559 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
560 && subbuffer_id_is_noref(config, id));
561 lib_ring_buffer_do_memset(rpages->p[index].virt
562 + (offset & ~PAGE_MASK),
563 c, pagecpy);
564 } while (unlikely(len != pagecpy));
565 }
566 EXPORT_SYMBOL_GPL(_lib_ring_buffer_memset);
567
568 /**
569 * lib_ring_buffer_strcpy - write string data to a ring_buffer buffer.
570 * @bufb : buffer backend
571 * @offset : offset within the buffer
572 * @src : source address
573 * @len : length to write
574 * @pagecpy : number of bytes already copied on the current page
575 * @pad : character to use for padding
576 */
577 void _lib_ring_buffer_strcpy(struct lib_ring_buffer_backend *bufb,
578 size_t offset, const char *src, size_t len,
579 size_t pagecpy, int pad)
580 {
581 struct channel_backend *chanb = &bufb->chan->backend;
582 const struct lib_ring_buffer_config *config = &chanb->config;
583 size_t sbidx, index;
584 struct lib_ring_buffer_backend_pages *rpages;
585 unsigned long sb_bindex, id;
586 int src_terminated = 0;
587
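	/*
	 * Copy the string page by page; once the source terminator is
	 * reached, the remaining requested length is filled with the pad
	 * character, and a terminating '\0' is always written as the last
	 * byte.
	 */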
588 CHAN_WARN_ON(chanb, !len);
589 offset += pagecpy;
590 do {
591 len -= pagecpy;
592 if (!src_terminated)
593 src += pagecpy;
594 sbidx = offset >> chanb->subbuf_size_order;
595 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
596
597 /*
598 * Underlying layer should never ask for writes across
599 * subbuffers.
600 */
601 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
602
603 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
604 id = bufb->buf_wsb[sbidx].id;
605 sb_bindex = subbuffer_id_get_index(config, id);
606 rpages = bufb->array[sb_bindex];
607 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
608 && subbuffer_id_is_noref(config, id));
609
610 if (likely(!src_terminated)) {
611 size_t count, to_copy;
612
613 to_copy = pagecpy;
614 if (pagecpy == len)
615 to_copy--; /* Final '\0' */
616 count = lib_ring_buffer_do_strcpy(config,
617 rpages->p[index].virt
618 + (offset & ~PAGE_MASK),
619 src, to_copy);
620 offset += count;
621 /* Padding */
622 if (unlikely(count < to_copy)) {
623 size_t pad_len = to_copy - count;
624
625 /* Next pages will have padding */
626 src_terminated = 1;
627 lib_ring_buffer_do_memset(rpages->p[index].virt
628 + (offset & ~PAGE_MASK),
629 pad, pad_len);
630 offset += pad_len;
631 }
632 } else {
633 size_t pad_len;
634
635 pad_len = pagecpy;
636 if (pagecpy == len)
637 pad_len--; /* Final '\0' */
638 lib_ring_buffer_do_memset(rpages->p[index].virt
639 + (offset & ~PAGE_MASK),
640 pad, pad_len);
641 offset += pad_len;
642 }
643 } while (unlikely(len != pagecpy));
644 /* Ending '\0' */
645 lib_ring_buffer_do_memset(rpages->p[index].virt + (offset & ~PAGE_MASK),
646 '\0', 1);
647 }
648 EXPORT_SYMBOL_GPL(_lib_ring_buffer_strcpy);
649
650 /**
651 * lib_ring_buffer_copy_from_user_inatomic - write user data to a ring_buffer buffer.
652 * @bufb : buffer backend
653 * @offset : offset within the buffer
654 * @src : source address
655 * @len : length to write
656 * @pagecpy : number of bytes already copied on the current page
657 *
658 * This function deals with userspace pointers; it should never be called
659 * directly without the src pointer having been checked with access_ok()
660 * beforehand.
661 */
662 void _lib_ring_buffer_copy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
663 size_t offset,
664 const void __user *src, size_t len,
665 size_t pagecpy)
666 {
667 struct channel_backend *chanb = &bufb->chan->backend;
668 const struct lib_ring_buffer_config *config = &chanb->config;
669 size_t sbidx, index;
670 struct lib_ring_buffer_backend_pages *rpages;
671 unsigned long sb_bindex, id;
672 int ret;
673
674 do {
675 len -= pagecpy;
676 src += pagecpy;
677 offset += pagecpy;
678 sbidx = offset >> chanb->subbuf_size_order;
679 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
680
681 /*
682 * Underlying layer should never ask for writes across
683 * subbuffers.
684 */
685 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
686
687 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
688 id = bufb->buf_wsb[sbidx].id;
689 sb_bindex = subbuffer_id_get_index(config, id);
690 rpages = bufb->array[sb_bindex];
691 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
692 && subbuffer_id_is_noref(config, id));
693 ret = lib_ring_buffer_do_copy_from_user_inatomic(rpages->p[index].virt
694 + (offset & ~PAGE_MASK),
695 src, pagecpy) != 0;
696 if (ret > 0) {
697 /* Copy failed. */
698 _lib_ring_buffer_memset(bufb, offset, 0, len, 0);
699 break; /* stop copy */
700 }
701 } while (unlikely(len != pagecpy));
702 }
703 EXPORT_SYMBOL_GPL(_lib_ring_buffer_copy_from_user_inatomic);
704
705 /**
706 * lib_ring_buffer_strcpy_from_user_inatomic - write userspace string data to a ring_buffer buffer.
707 * @bufb : buffer backend
708 * @offset : offset within the buffer
709 * @src : source address
710 * @len : length to write
711 * @pagecpy : number of bytes already copied on the current page
712 * @pad : character to use for padding
713 *
714 * This function deals with userspace pointers; it should never be called
715 * directly without the src pointer having been checked with access_ok()
716 * beforehand.
717 */
718 void _lib_ring_buffer_strcpy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
719 size_t offset, const char __user *src, size_t len,
720 size_t pagecpy, int pad)
721 {
722 struct channel_backend *chanb = &bufb->chan->backend;
723 const struct lib_ring_buffer_config *config = &chanb->config;
724 size_t sbidx, index;
725 struct lib_ring_buffer_backend_pages *rpages;
726 unsigned long sb_bindex, id;
727 int src_terminated = 0;
728
729 offset += pagecpy;
730 do {
731 len -= pagecpy;
732 if (!src_terminated)
733 src += pagecpy;
734 sbidx = offset >> chanb->subbuf_size_order;
735 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
736
737 /*
738 * Underlying layer should never ask for writes across
739 * subbuffers.
740 */
741 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
742
743 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
744 id = bufb->buf_wsb[sbidx].id;
745 sb_bindex = subbuffer_id_get_index(config, id);
746 rpages = bufb->array[sb_bindex];
747 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
748 && subbuffer_id_is_noref(config, id));
749
750 if (likely(!src_terminated)) {
751 size_t count, to_copy;
752
753 to_copy = pagecpy;
754 if (pagecpy == len)
755 to_copy--; /* Final '\0' */
756 count = lib_ring_buffer_do_strcpy_from_user_inatomic(config,
757 rpages->p[index].virt
758 + (offset & ~PAGE_MASK),
759 src, to_copy);
760 offset += count;
761 /* Padding */
762 if (unlikely(count < to_copy)) {
763 size_t pad_len = to_copy - count;
764
765 /* Next pages will have padding */
766 src_terminated = 1;
767 lib_ring_buffer_do_memset(rpages->p[index].virt
768 + (offset & ~PAGE_MASK),
769 pad, pad_len);
770 offset += pad_len;
771 }
772 } else {
773 size_t pad_len;
774
775 pad_len = pagecpy;
776 if (pagecpy == len)
777 pad_len--; /* Final '\0' */
778 lib_ring_buffer_do_memset(rpages->p[index].virt
779 + (offset & ~PAGE_MASK),
780 pad, pad_len);
781 offset += pad_len;
782 }
783 } while (unlikely(len != pagecpy));
784 /* Ending '\0' */
785 lib_ring_buffer_do_memset(rpages->p[index].virt + (offset & ~PAGE_MASK),
786 '\0', 1);
787 }
788 EXPORT_SYMBOL_GPL(_lib_ring_buffer_strcpy_from_user_inatomic);
789
790 /**
791 * lib_ring_buffer_read - read data from a ring_buffer buffer.
792 * @bufb : buffer backend
793 * @offset : offset within the buffer
794 * @dest : destination address
795 * @len : length to copy to destination
796 *
797 * Should be protected by get_subbuf/put_subbuf.
798 * Returns the length copied.
799 */
800 size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
801 void *dest, size_t len)
802 {
803 struct channel_backend *chanb = &bufb->chan->backend;
804 const struct lib_ring_buffer_config *config = &chanb->config;
805 size_t index, pagecpy, orig_len;
806 struct lib_ring_buffer_backend_pages *rpages;
807 unsigned long sb_bindex, id;
808
809 orig_len = len;
810 offset &= chanb->buf_size - 1;
811 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
812 if (unlikely(!len))
813 return 0;
814 for (;;) {
815 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
816 id = bufb->buf_rsb.id;
817 sb_bindex = subbuffer_id_get_index(config, id);
818 rpages = bufb->array[sb_bindex];
819 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
820 && subbuffer_id_is_noref(config, id));
821 memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
822 pagecpy);
823 len -= pagecpy;
824 if (likely(!len))
825 break;
826 dest += pagecpy;
827 offset += pagecpy;
828 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
829 /*
830 * Underlying layer should never ask for reads across
831 * subbuffers.
832 */
833 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
834 }
835 return orig_len;
836 }
837 EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
838
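/*
 * Hypothetical read-side sketch (frontend calls from frontend.h assumed):
 * a consumer takes a sub-buffer reference before reading from the backend,
 * e.g.:
 *
 *	if (!lib_ring_buffer_get_subbuf(buf, consumed)) {
 *		lib_ring_buffer_read(&buf->backend, read_offset, dest, len);
 *		lib_ring_buffer_put_subbuf(buf);
 *	}
 *
 * where buf, consumed, read_offset, dest and len are consumer-provided.
 */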
839 /**
840 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
841 * @bufb : buffer backend
842 * @offset : offset within the buffer
843 * @dest : destination userspace address
844 * @len : length to copy to destination
845 *
846 * Should be protected by get_subbuf/put_subbuf.
847 * access_ok() must have been performed on dest addresses prior to calling this
848 * function.
849 * Returns -EFAULT on error, 0 if ok.
850 */
851 int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
852 size_t offset, void __user *dest, size_t len)
853 {
854 struct channel_backend *chanb = &bufb->chan->backend;
855 const struct lib_ring_buffer_config *config = &chanb->config;
856 size_t index;
857 ssize_t pagecpy;
858 struct lib_ring_buffer_backend_pages *rpages;
859 unsigned long sb_bindex, id;
860
861 offset &= chanb->buf_size - 1;
862 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
863 if (unlikely(!len))
864 return 0;
865 for (;;) {
866 pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
867 id = bufb->buf_rsb.id;
868 sb_bindex = subbuffer_id_get_index(config, id);
869 rpages = bufb->array[sb_bindex];
870 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
871 && subbuffer_id_is_noref(config, id));
872 if (__copy_to_user(dest,
873 rpages->p[index].virt + (offset & ~PAGE_MASK),
874 pagecpy))
875 return -EFAULT;
876 len -= pagecpy;
877 if (likely(!len))
878 break;
879 dest += pagecpy;
880 offset += pagecpy;
881 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
882 /*
883 * Underlying layer should never ask for reads across
884 * subbuffers.
885 */
886 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
887 }
888 return 0;
889 }
890 EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
891
892 /**
893 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
894 * @bufb : buffer backend
895 * @offset : offset within the buffer
896 * @dest : destination address
897 * @len : destination's length
898 *
899 * Return string's length, or -EINVAL on error.
900 * Should be protected by get_subbuf/put_subbuf.
901 * Destination length should be at least 1 to hold '\0'.
902 */
903 int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
904 void *dest, size_t len)
905 {
906 struct channel_backend *chanb = &bufb->chan->backend;
907 const struct lib_ring_buffer_config *config = &chanb->config;
908 size_t index;
909 ssize_t pagecpy, pagelen, strpagelen, orig_offset;
910 char *str;
911 struct lib_ring_buffer_backend_pages *rpages;
912 unsigned long sb_bindex, id;
913
914 offset &= chanb->buf_size - 1;
915 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
916 orig_offset = offset;
917 if (unlikely(!len))
918 return -EINVAL;
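	/*
	 * Scan page by page with strnlen(); copy at most len bytes into
	 * dest and stop at the first page that contains a '\0' terminator.
	 */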
919 for (;;) {
920 id = bufb->buf_rsb.id;
921 sb_bindex = subbuffer_id_get_index(config, id);
922 rpages = bufb->array[sb_bindex];
923 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
924 && subbuffer_id_is_noref(config, id));
925 str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
926 pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
927 strpagelen = strnlen(str, pagelen);
928 if (len) {
929 pagecpy = min_t(size_t, len, strpagelen);
930 if (dest) {
931 memcpy(dest, str, pagecpy);
932 dest += pagecpy;
933 }
934 len -= pagecpy;
935 }
936 offset += strpagelen;
937 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
938 if (strpagelen < pagelen)
939 break;
940 /*
941 * Underlying layer should never ask for reads across
942 * subbuffers.
943 */
944 CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
945 }
946 if (dest && len)
947 ((char *)dest)[0] = 0;
948 return offset - orig_offset;
949 }
950 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);
951
952 /**
953 * lib_ring_buffer_read_get_pfn - Get a page frame number to read from
954 * @bufb : buffer backend
955 * @offset : offset within the buffer
956 * @virt : pointer to page address (output)
957 *
958 * Should be protected by get_subbuf/put_subbuf.
959 * Returns a pointer to the page frame number, stored as an unsigned long.
960 */
961 unsigned long *lib_ring_buffer_read_get_pfn(struct lib_ring_buffer_backend *bufb,
962 size_t offset, void ***virt)
963 {
964 size_t index;
965 struct lib_ring_buffer_backend_pages *rpages;
966 struct channel_backend *chanb = &bufb->chan->backend;
967 const struct lib_ring_buffer_config *config = &chanb->config;
968 unsigned long sb_bindex, id;
969
970 offset &= chanb->buf_size - 1;
971 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
972 id = bufb->buf_rsb.id;
973 sb_bindex = subbuffer_id_get_index(config, id);
974 rpages = bufb->array[sb_bindex];
975 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
976 && subbuffer_id_is_noref(config, id));
977 *virt = &rpages->p[index].virt;
978 return &rpages->p[index].pfn;
979 }
980 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_pfn);
981
982 /**
983 * lib_ring_buffer_read_offset_address - get address of a buffer location
984 * @bufb : buffer backend
985 * @offset : offset within the buffer.
986 *
987 * Return the address where a given offset is located (for read).
988 * Should be used to get the current subbuffer header pointer. Given we know
989 * it's never on a page boundary, it's safe to read/write directly
990 * from/to this address, as long as the read/write is never bigger than a
991 * page size.
992 */
993 void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
994 size_t offset)
995 {
996 size_t index;
997 struct lib_ring_buffer_backend_pages *rpages;
998 struct channel_backend *chanb = &bufb->chan->backend;
999 const struct lib_ring_buffer_config *config = &chanb->config;
1000 unsigned long sb_bindex, id;
1001
1002 offset &= chanb->buf_size - 1;
1003 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
1004 id = bufb->buf_rsb.id;
1005 sb_bindex = subbuffer_id_get_index(config, id);
1006 rpages = bufb->array[sb_bindex];
1007 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
1008 && subbuffer_id_is_noref(config, id));
1009 return rpages->p[index].virt + (offset & ~PAGE_MASK);
1010 }
1011 EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);
1012
1013 /**
1014 * lib_ring_buffer_offset_address - get address of a location within the buffer
1015 * @bufb : buffer backend
1016 * @offset : offset within the buffer.
1017 *
1018 * Return the address where a given offset is located.
1019 * Should be used to get the current subbuffer header pointer. Given we know
1020 * it's always at the beginning of a page, it's safe to write directly to this
1021 * address, as long as the write is never bigger than a page size.
1022 */
1023 void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
1024 size_t offset)
1025 {
1026 size_t sbidx, index;
1027 struct lib_ring_buffer_backend_pages *rpages;
1028 struct channel_backend *chanb = &bufb->chan->backend;
1029 const struct lib_ring_buffer_config *config = &chanb->config;
1030 unsigned long sb_bindex, id;
1031
1032 offset &= chanb->buf_size - 1;
1033 sbidx = offset >> chanb->subbuf_size_order;
1034 index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
1035 id = bufb->buf_wsb[sbidx].id;
1036 sb_bindex = subbuffer_id_get_index(config, id);
1037 rpages = bufb->array[sb_bindex];
1038 CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
1039 && subbuffer_id_is_noref(config, id));
1040 return rpages->p[index].virt + (offset & ~PAGE_MASK);
1041 }
1042 EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);