/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
 *
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */
12 #include <ringbuffer/iterator.h>
13 #include <wrapper/cpu.h>
14 #include <wrapper/file.h>
15 #include <wrapper/uaccess.h>
16 #include <linux/jiffies.h>
17 #include <linux/delay.h>
18 #include <linux/module.h>
/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worse-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
32 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
36 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
37 * buffer is empty and finalized. The buffer must already be opened for reading.
39 ssize_t
lib_ring_buffer_get_next_record(struct lttng_kernel_ring_buffer_channel
*chan
,
40 struct lttng_kernel_ring_buffer
*buf
)
42 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
43 struct lttng_kernel_ring_buffer_iter
*iter
= &buf
->iter
;
47 switch (iter
->state
) {
49 ret
= lib_ring_buffer_get_next_subbuf(buf
);
50 if (ret
&& !LTTNG_READ_ONCE(buf
->finalized
)
51 && config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
53 * Use "pull" scheme for global buffers. The reader
54 * itself flushes the buffer to "pull" data not visible
55 * to readers yet. Flush current subbuffer and re-try.
57 * Per-CPU buffers rather use a "push" scheme because
58 * the IPI needed to flush all CPU's buffers is too
59 * costly. In the "push" scheme, the reader waits for
60 * the writer periodic timer to flush the
61 * buffers (keeping track of a quiescent state
62 * timestamp). Therefore, the writer "pushes" data out
63 * of the buffers rather than letting the reader "pull"
64 * data from the buffer.
66 lib_ring_buffer_switch_slow(buf
, SWITCH_ACTIVE
);
67 ret
= lib_ring_buffer_get_next_subbuf(buf
);
71 iter
->consumed
= buf
->cons_snapshot
;
72 iter
->data_size
= lib_ring_buffer_get_read_data_size(config
, buf
);
73 iter
->read_offset
= iter
->consumed
;
75 iter
->read_offset
+= config
->cb
.subbuffer_header_size();
76 iter
->state
= ITER_TEST_RECORD
;
78 case ITER_TEST_RECORD
:
79 if (iter
->read_offset
- iter
->consumed
>= iter
->data_size
) {
80 iter
->state
= ITER_PUT_SUBBUF
;
82 CHAN_WARN_ON(chan
, !config
->cb
.record_get
);
83 config
->cb
.record_get(config
, chan
, buf
,
88 iter
->read_offset
+= iter
->header_len
;
89 subbuffer_consume_record(config
, &buf
->backend
);
90 iter
->state
= ITER_NEXT_RECORD
;
91 return iter
->payload_len
;
94 case ITER_NEXT_RECORD
:
95 iter
->read_offset
+= iter
->payload_len
;
96 iter
->state
= ITER_TEST_RECORD
;
99 lib_ring_buffer_put_next_subbuf(buf
);
100 iter
->state
= ITER_GET_SUBBUF
;
103 CHAN_WARN_ON(chan
, 1); /* Should not happen */
107 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record
);
109 void lib_ring_buffer_put_current_record(struct lttng_kernel_ring_buffer
*buf
)
111 struct lttng_kernel_ring_buffer_iter
*iter
;
116 if (iter
->state
!= ITER_NEXT_RECORD
)
118 iter
->read_offset
+= iter
->payload_len
;
119 iter
->state
= ITER_TEST_RECORD
;
120 if (iter
->read_offset
- iter
->consumed
>= iter
->data_size
) {
121 lib_ring_buffer_put_next_subbuf(buf
);
122 iter
->state
= ITER_GET_SUBBUF
;
125 EXPORT_SYMBOL_GPL(lib_ring_buffer_put_current_record
);
127 static int buf_is_higher(void *a
, void *b
)
129 struct lttng_kernel_ring_buffer
*bufa
= a
;
130 struct lttng_kernel_ring_buffer
*bufb
= b
;
132 /* Consider lowest timestamps to be at the top of the heap */
133 return (bufa
->iter
.timestamp
< bufb
->iter
.timestamp
);
137 void lib_ring_buffer_get_empty_buf_records(const struct lttng_kernel_ring_buffer_config
*config
,
138 struct lttng_kernel_ring_buffer_channel
*chan
)
140 struct lttng_ptr_heap
*heap
= &chan
->iter
.heap
;
141 struct lttng_kernel_ring_buffer
*buf
, *tmp
;
144 list_for_each_entry_safe(buf
, tmp
, &chan
->iter
.empty_head
,
146 len
= lib_ring_buffer_get_next_record(chan
, buf
);
149 * Deal with -EAGAIN and -ENODATA.
150 * len >= 0 means record contains data.
151 * -EBUSY should never happen, because we support only one
156 /* Keep node in empty list */
160 * Buffer is finalized. Don't add to list of empty
161 * buffer, because it has no more data to provide, ever.
163 list_del(&buf
->iter
.empty_node
);
166 CHAN_WARN_ON(chan
, 1);
170 * Insert buffer into the heap, remove from empty buffer
173 CHAN_WARN_ON(chan
, len
< 0);
174 list_del(&buf
->iter
.empty_node
);
175 CHAN_WARN_ON(chan
, lttng_heap_insert(heap
, buf
));
181 void lib_ring_buffer_wait_for_qs(const struct lttng_kernel_ring_buffer_config
*config
,
182 struct lttng_kernel_ring_buffer_channel
*chan
)
185 unsigned long wait_msecs
;
188 * No need to wait if no empty buffers are present.
190 if (list_empty(&chan
->iter
.empty_head
))
193 timestamp_qs
= config
->cb
.ring_buffer_clock_read(chan
);
195 * We need to consider previously empty buffers.
196 * Do a get next buf record on each of them. Add them to
197 * the heap if they have data. If at least one of them
198 * don't have data, we need to wait for
199 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
200 * buffers have been switched either by the timer or idle entry) and
201 * check them again, adding them if they have data.
203 lib_ring_buffer_get_empty_buf_records(config
, chan
);
206 * No need to wait if no empty buffers are present.
208 if (list_empty(&chan
->iter
.empty_head
))
212 * We need to wait for the buffer switch timer to run. If the
213 * CPU is idle, idle entry performed the switch.
214 * TODO: we could optimize further by skipping the sleep if all
215 * empty buffers belong to idle or offline cpus.
217 wait_msecs
= jiffies_to_msecs(chan
->switch_timer_interval
);
218 wait_msecs
+= MAX_SYSTEM_LATENCY
;
220 lib_ring_buffer_get_empty_buf_records(config
, chan
);
222 * Any buffer still in the empty list here cannot possibly
223 * contain an event with a timestamp prior to "timestamp_qs".
224 * The new quiescent state timestamp is the one we grabbed
225 * before waiting for buffer data. It is therefore safe to
226 * ignore empty buffers up to last_qs timestamp for fusion
229 chan
->iter
.last_qs
= timestamp_qs
;
233 * channel_get_next_record - Get the next record in a channel.
235 * @ret_buf: the buffer in which the event is located (output)
237 * Returns the size of new current event, -EAGAIN if all buffers are empty,
238 * -ENODATA if all buffers are empty and finalized. The channel must already be
239 * opened for reading.
242 ssize_t
channel_get_next_record(struct lttng_kernel_ring_buffer_channel
*chan
,
243 struct lttng_kernel_ring_buffer
**ret_buf
)
245 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
246 struct lttng_kernel_ring_buffer
*buf
;
247 struct lttng_ptr_heap
*heap
;
250 if (config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
251 *ret_buf
= channel_get_ring_buffer(config
, chan
, 0);
252 return lib_ring_buffer_get_next_record(chan
, *ret_buf
);
255 heap
= &chan
->iter
.heap
;
258 * get next record for topmost buffer.
260 buf
= lttng_heap_maximum(heap
);
262 len
= lib_ring_buffer_get_next_record(chan
, buf
);
264 * Deal with -EAGAIN and -ENODATA.
265 * len >= 0 means record contains data.
269 buf
->iter
.timestamp
= 0;
270 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
271 /* Remove topmost buffer from the heap */
272 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
276 * Buffer is finalized. Remove buffer from heap and
277 * don't add to list of empty buffer, because it has no
278 * more data to provide, ever.
280 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
283 CHAN_WARN_ON(chan
, 1);
287 * Reinsert buffer into the heap. Note that heap can be
288 * partially empty, so we need to use
289 * lttng_heap_replace_max().
291 CHAN_WARN_ON(chan
, len
< 0);
292 CHAN_WARN_ON(chan
, lttng_heap_replace_max(heap
, buf
) != buf
);
297 buf
= lttng_heap_maximum(heap
);
298 if (!buf
|| buf
->iter
.timestamp
> chan
->iter
.last_qs
) {
300 * Deal with buffers previously showing no data.
301 * Add buffers containing data to the heap, update
304 lib_ring_buffer_wait_for_qs(config
, chan
);
307 *ret_buf
= buf
= lttng_heap_maximum(heap
);
310 * If this warning triggers, you probably need to check your
311 * system interrupt latency. Typical causes: too many printk()
312 * output going to a serial console with interrupts off.
313 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
314 * Observed on SMP KVM setups with trace_clock().
316 if (chan
->iter
.last_timestamp
317 > (buf
->iter
.timestamp
+ MAX_CLOCK_DELTA
)) {
318 printk(KERN_WARNING
"LTTng: ring_buffer: timestamps going "
319 "backward. Last time %llu ns, cpu %d, "
320 "current time %llu ns, cpu %d, "
322 chan
->iter
.last_timestamp
, chan
->iter
.last_cpu
,
323 buf
->iter
.timestamp
, buf
->backend
.cpu
,
324 chan
->iter
.last_timestamp
- buf
->iter
.timestamp
);
325 CHAN_WARN_ON(chan
, 1);
327 chan
->iter
.last_timestamp
= buf
->iter
.timestamp
;
328 chan
->iter
.last_cpu
= buf
->backend
.cpu
;
329 return buf
->iter
.payload_len
;
332 if (list_empty(&chan
->iter
.empty_head
))
333 return -ENODATA
; /* All buffers finalized */
335 return -EAGAIN
; /* Temporarily empty */
338 EXPORT_SYMBOL_GPL(channel_get_next_record
);
341 void lib_ring_buffer_iterator_init(struct lttng_kernel_ring_buffer_channel
*chan
, struct lttng_kernel_ring_buffer
*buf
)
343 if (buf
->iter
.allocated
)
346 buf
->iter
.allocated
= 1;
347 if (chan
->iter
.read_open
&& !buf
->iter
.read_open
) {
348 CHAN_WARN_ON(chan
, lib_ring_buffer_open_read(buf
) != 0);
349 buf
->iter
.read_open
= 1;
352 /* Add to list of buffers without any current record */
353 if (chan
->backend
.config
.alloc
== RING_BUFFER_ALLOC_PER_CPU
)
354 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
357 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
359 int lttng_cpuhp_rb_iter_online(unsigned int cpu
,
360 struct lttng_cpuhp_node
*node
)
362 struct lttng_kernel_ring_buffer_channel
*chan
= container_of(node
, struct lttng_kernel_ring_buffer_channel
,
364 struct lttng_kernel_ring_buffer
*buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
365 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
367 CHAN_WARN_ON(chan
, config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
);
369 lib_ring_buffer_iterator_init(chan
, buf
);
372 EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_iter_online
);
374 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
376 #ifdef CONFIG_HOTPLUG_CPU
378 int channel_iterator_cpu_hotplug(struct notifier_block
*nb
,
379 unsigned long action
,
382 unsigned int cpu
= (unsigned long)hcpu
;
383 struct lttng_kernel_ring_buffer_channel
*chan
= container_of(nb
, struct lttng_kernel_ring_buffer_channel
,
385 struct lttng_kernel_ring_buffer
*buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
386 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
388 if (!chan
->hp_iter_enable
)
391 CHAN_WARN_ON(chan
, config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
);
394 case CPU_DOWN_FAILED
:
395 case CPU_DOWN_FAILED_FROZEN
:
397 case CPU_ONLINE_FROZEN
:
398 lib_ring_buffer_iterator_init(chan
, buf
);
406 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
408 int channel_iterator_init(struct lttng_kernel_ring_buffer_channel
*chan
)
410 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
411 struct lttng_kernel_ring_buffer
*buf
;
413 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
416 INIT_LIST_HEAD(&chan
->iter
.empty_head
);
417 ret
= lttng_heap_init(&chan
->iter
.heap
,
419 GFP_KERNEL
, buf_is_higher
);
423 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
424 chan
->cpuhp_iter_online
.component
= LTTNG_RING_BUFFER_ITER
;
425 ret
= cpuhp_state_add_instance(lttng_rb_hp_online
,
426 &chan
->cpuhp_iter_online
.node
);
429 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
434 * In case of non-hotplug cpu, if the ring-buffer is allocated
435 * in early initcall, it will not be notified of secondary cpus.
436 * In that off case, we need to allocate for all possible cpus.
438 #ifdef CONFIG_HOTPLUG_CPU
439 chan
->hp_iter_notifier
.notifier_call
=
440 channel_iterator_cpu_hotplug
;
441 chan
->hp_iter_notifier
.priority
= 10;
442 register_cpu_notifier(&chan
->hp_iter_notifier
);
444 lttng_cpus_read_lock();
445 for_each_online_cpu(cpu
) {
446 buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
447 lib_ring_buffer_iterator_init(chan
, buf
);
449 chan
->hp_iter_enable
= 1;
450 lttng_cpus_read_unlock();
452 for_each_possible_cpu(cpu
) {
453 buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
454 lib_ring_buffer_iterator_init(chan
, buf
);
458 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
460 buf
= channel_get_ring_buffer(config
, chan
, 0);
461 lib_ring_buffer_iterator_init(chan
, buf
);
466 void channel_iterator_unregister_notifiers(struct lttng_kernel_ring_buffer_channel
*chan
)
468 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
470 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
471 #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0))
475 ret
= cpuhp_state_remove_instance(lttng_rb_hp_online
,
476 &chan
->cpuhp_iter_online
.node
);
479 #else /* #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
480 chan
->hp_iter_enable
= 0;
481 unregister_cpu_notifier(&chan
->hp_iter_notifier
);
482 #endif /* #else #if (LTTNG_LINUX_VERSION_CODE >= LTTNG_KERNEL_VERSION(4,10,0)) */
486 void channel_iterator_free(struct lttng_kernel_ring_buffer_channel
*chan
)
488 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
490 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
491 lttng_heap_free(&chan
->iter
.heap
);
494 int lib_ring_buffer_iterator_open(struct lttng_kernel_ring_buffer
*buf
)
496 struct lttng_kernel_ring_buffer_channel
*chan
= buf
->backend
.chan
;
497 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
498 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
499 return lib_ring_buffer_open_read(buf
);
501 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open
);
/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lttng_kernel_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
514 int channel_iterator_open(struct lttng_kernel_ring_buffer_channel
*chan
)
516 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
517 struct lttng_kernel_ring_buffer
*buf
;
520 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
522 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
523 lttng_cpus_read_lock();
524 /* Allow CPU hotplug to keep track of opened reader */
525 chan
->iter
.read_open
= 1;
526 for_each_channel_cpu(cpu
, chan
) {
527 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
528 ret
= lib_ring_buffer_iterator_open(buf
);
531 buf
->iter
.read_open
= 1;
533 lttng_cpus_read_unlock();
535 buf
= channel_get_ring_buffer(config
, chan
, 0);
536 ret
= lib_ring_buffer_iterator_open(buf
);
540 /* Error should always happen on CPU 0, hence no close is required. */
541 CHAN_WARN_ON(chan
, cpu
!= 0);
542 lttng_cpus_read_unlock();
545 EXPORT_SYMBOL_GPL(channel_iterator_open
);
547 void channel_iterator_release(struct lttng_kernel_ring_buffer_channel
*chan
)
549 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
550 struct lttng_kernel_ring_buffer
*buf
;
553 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
554 lttng_cpus_read_lock();
555 for_each_channel_cpu(cpu
, chan
) {
556 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
557 if (buf
->iter
.read_open
) {
558 lib_ring_buffer_iterator_release(buf
);
559 buf
->iter
.read_open
= 0;
562 chan
->iter
.read_open
= 0;
563 lttng_cpus_read_unlock();
565 buf
= channel_get_ring_buffer(config
, chan
, 0);
566 lib_ring_buffer_iterator_release(buf
);
569 EXPORT_SYMBOL_GPL(channel_iterator_release
);
571 void lib_ring_buffer_iterator_reset(struct lttng_kernel_ring_buffer
*buf
)
573 struct lttng_kernel_ring_buffer_channel
*chan
= buf
->backend
.chan
;
575 if (buf
->iter
.state
!= ITER_GET_SUBBUF
)
576 lib_ring_buffer_put_next_subbuf(buf
);
577 buf
->iter
.state
= ITER_GET_SUBBUF
;
578 /* Remove from heap (if present). */
579 if (lttng_heap_cherrypick(&chan
->iter
.heap
, buf
))
580 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
581 buf
->iter
.timestamp
= 0;
582 buf
->iter
.header_len
= 0;
583 buf
->iter
.payload_len
= 0;
584 buf
->iter
.consumed
= 0;
585 buf
->iter
.read_offset
= 0;
586 buf
->iter
.data_size
= 0;
587 /* Don't reset allocated and read_open */
590 void channel_iterator_reset(struct lttng_kernel_ring_buffer_channel
*chan
)
592 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
593 struct lttng_kernel_ring_buffer
*buf
;
596 /* Empty heap, put into empty_head */
597 while ((buf
= lttng_heap_remove(&chan
->iter
.heap
)) != NULL
)
598 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
600 for_each_channel_cpu(cpu
, chan
) {
601 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
602 lib_ring_buffer_iterator_reset(buf
);
604 /* Don't reset read_open */
605 chan
->iter
.last_qs
= 0;
606 chan
->iter
.last_timestamp
= 0;
607 chan
->iter
.last_cpu
= 0;
608 chan
->iter
.len_left
= 0;
612 * Ring buffer payload extraction read() implementation.
615 ssize_t
channel_ring_buffer_file_read(struct file
*filp
,
616 char __user
*user_buf
,
619 struct lttng_kernel_ring_buffer_channel
*chan
,
620 struct lttng_kernel_ring_buffer
*buf
,
623 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
624 size_t read_count
= 0, read_offset
;
628 if (!lttng_access_ok(VERIFY_WRITE
, user_buf
, count
))
631 /* Finish copy of previous record */
633 if (read_count
< count
) {
634 len
= chan
->iter
.len_left
;
636 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
638 buf
= lttng_heap_maximum(&chan
->iter
.heap
);
639 CHAN_WARN_ON(chan
, !buf
);
644 while (read_count
< count
) {
645 size_t copy_len
, space_left
;
648 len
= channel_get_next_record(chan
, &buf
);
650 len
= lib_ring_buffer_get_next_record(chan
, buf
);
654 * Check if buffer is finalized (end of file).
656 if (len
== -ENODATA
) {
657 /* A 0 read_count will tell about end of file */
660 if (filp
->f_flags
& O_NONBLOCK
) {
662 read_count
= -EAGAIN
;
668 * No data available at the moment, return what
675 * Wait for returned len to be >= 0 or -ENODATA.
678 error
= wait_event_interruptible(
680 ((len
= channel_get_next_record(chan
,
681 &buf
)), len
!= -EAGAIN
));
683 error
= wait_event_interruptible(
685 ((len
= lib_ring_buffer_get_next_record(
686 chan
, buf
)), len
!= -EAGAIN
));
687 CHAN_WARN_ON(chan
, len
== -EBUSY
);
692 CHAN_WARN_ON(chan
, len
< 0 && len
!= -ENODATA
);
696 read_offset
= buf
->iter
.read_offset
;
698 space_left
= count
- read_count
;
699 if (len
<= space_left
) {
701 chan
->iter
.len_left
= 0;
704 copy_len
= space_left
;
705 chan
->iter
.len_left
= len
- copy_len
;
706 *ppos
= read_offset
+ copy_len
;
708 if (__lib_ring_buffer_copy_to_user(&buf
->backend
, read_offset
,
709 &user_buf
[read_count
],
712 * Leave the len_left and ppos values at their current
713 * state, as we currently have a valid event to read.
717 read_count
+= copy_len
;
723 chan
->iter
.len_left
= 0;
725 lib_ring_buffer_put_current_record(buf
);
730 * lib_ring_buffer_file_read - Read buffer record payload.
731 * @filp: file structure pointer.
732 * @buffer: user buffer to read data into.
733 * @count: number of bytes to read.
734 * @ppos: file read position.
736 * Returns a negative value on error, or the number of bytes read on success.
737 * ppos is used to save the position _within the current record_ between calls
741 ssize_t
lib_ring_buffer_file_read(struct file
*filp
,
742 char __user
*user_buf
,
746 struct inode
*inode
= filp
->lttng_f_dentry
->d_inode
;
747 struct lttng_kernel_ring_buffer
*buf
= inode
->i_private
;
748 struct lttng_kernel_ring_buffer_channel
*chan
= buf
->backend
.chan
;
750 return channel_ring_buffer_file_read(filp
, user_buf
, count
, ppos
,
755 * channel_file_read - Read channel record payload.
756 * @filp: file structure pointer.
757 * @buffer: user buffer to read data into.
758 * @count: number of bytes to read.
759 * @ppos: file read position.
761 * Returns a negative value on error, or the number of bytes read on success.
762 * ppos is used to save the position _within the current record_ between calls
766 ssize_t
channel_file_read(struct file
*filp
,
767 char __user
*user_buf
,
771 struct inode
*inode
= filp
->lttng_f_dentry
->d_inode
;
772 struct lttng_kernel_ring_buffer_channel
*chan
= inode
->i_private
;
773 const struct lttng_kernel_ring_buffer_config
*config
= &chan
->backend
.config
;
775 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
776 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
777 ppos
, chan
, NULL
, 1);
779 struct lttng_kernel_ring_buffer
*buf
=
780 channel_get_ring_buffer(config
, chan
, 0);
781 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
787 int lib_ring_buffer_file_open(struct inode
*inode
, struct file
*file
)
789 struct lttng_kernel_ring_buffer
*buf
= inode
->i_private
;
792 ret
= lib_ring_buffer_iterator_open(buf
);
796 file
->private_data
= buf
;
797 ret
= nonseekable_open(inode
, file
);
803 lib_ring_buffer_iterator_release(buf
);
808 int lib_ring_buffer_file_release(struct inode
*inode
, struct file
*file
)
810 struct lttng_kernel_ring_buffer
*buf
= inode
->i_private
;
812 lib_ring_buffer_iterator_release(buf
);
817 int channel_file_open(struct inode
*inode
, struct file
*file
)
819 struct lttng_kernel_ring_buffer_channel
*chan
= inode
->i_private
;
822 ret
= channel_iterator_open(chan
);
826 file
->private_data
= chan
;
827 ret
= nonseekable_open(inode
, file
);
833 channel_iterator_release(chan
);
838 int channel_file_release(struct inode
*inode
, struct file
*file
)
840 struct lttng_kernel_ring_buffer_channel
*chan
= inode
->i_private
;
842 channel_iterator_release(chan
);
846 const struct file_operations channel_payload_file_operations
= {
847 .owner
= THIS_MODULE
,
848 .open
= channel_file_open
,
849 .release
= channel_file_release
,
850 .read
= channel_file_read
,
851 .llseek
= vfs_lib_ring_buffer_no_llseek
,
853 EXPORT_SYMBOL_GPL(channel_payload_file_operations
);
855 const struct file_operations lib_ring_buffer_payload_file_operations
= {
856 .owner
= THIS_MODULE
,
857 .open
= lib_ring_buffer_file_open
,
858 .release
= lib_ring_buffer_file_release
,
859 .read
= lib_ring_buffer_file_read
,
860 .llseek
= vfs_lib_ring_buffer_no_llseek
,
862 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations
);