/*
 * ring_buffer_iterator.c
 *
 * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Author:
 *	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * Dual LGPL v2.1/GPL v2 license.
 */
16 #include "../../wrapper/ringbuffer/iterator.h"
17 #include <linux/jiffies.h>
18 #include <linux/delay.h>
19 #include <linux/module.h>
/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worse-case latency.
 */
#define MAX_SYSTEM_LATENCY	250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta.
 * (value is in nanoseconds: jiffies_to_usecs(1) * 1000)
 */
#define MAX_CLOCK_DELTA		(jiffies_to_usecs(1) * 1000)
33 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
37 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
38 * buffer is empty and finalized. The buffer must already be opened for reading.
40 ssize_t
lib_ring_buffer_get_next_record(struct channel
*chan
,
41 struct lib_ring_buffer
*buf
)
43 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
44 struct lib_ring_buffer_iter
*iter
= &buf
->iter
;
48 switch (iter
->state
) {
50 ret
= lib_ring_buffer_get_next_subbuf(buf
);
51 if (ret
&& !ACCESS_ONCE(buf
->finalized
)
52 && config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
54 * Use "pull" scheme for global buffers. The reader
55 * itself flushes the buffer to "pull" data not visible
56 * to readers yet. Flush current subbuffer and re-try.
58 * Per-CPU buffers rather use a "push" scheme because
59 * the IPI needed to flush all CPU's buffers is too
60 * costly. In the "push" scheme, the reader waits for
61 * the writer periodic deferrable timer to flush the
62 * buffers (keeping track of a quiescent state
63 * timestamp). Therefore, the writer "pushes" data out
64 * of the buffers rather than letting the reader "pull"
65 * data from the buffer.
67 lib_ring_buffer_switch_slow(buf
, SWITCH_ACTIVE
);
68 ret
= lib_ring_buffer_get_next_subbuf(buf
);
72 iter
->consumed
= buf
->cons_snapshot
;
73 iter
->data_size
= lib_ring_buffer_get_read_data_size(config
, buf
);
74 iter
->read_offset
= iter
->consumed
;
76 iter
->read_offset
+= config
->cb
.subbuffer_header_size();
77 iter
->state
= ITER_TEST_RECORD
;
79 case ITER_TEST_RECORD
:
80 if (iter
->read_offset
- iter
->consumed
>= iter
->data_size
) {
81 iter
->state
= ITER_PUT_SUBBUF
;
83 CHAN_WARN_ON(chan
, !config
->cb
.record_get
);
84 config
->cb
.record_get(config
, chan
, buf
,
89 iter
->read_offset
+= iter
->header_len
;
90 subbuffer_consume_record(config
, &buf
->backend
);
91 iter
->state
= ITER_NEXT_RECORD
;
92 return iter
->payload_len
;
95 case ITER_NEXT_RECORD
:
96 iter
->read_offset
+= iter
->payload_len
;
97 iter
->state
= ITER_TEST_RECORD
;
100 lib_ring_buffer_put_next_subbuf(buf
);
101 iter
->state
= ITER_GET_SUBBUF
;
104 CHAN_WARN_ON(chan
, 1); /* Should not happen */
108 EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record
);
110 static int buf_is_higher(void *a
, void *b
)
112 struct lib_ring_buffer
*bufa
= a
;
113 struct lib_ring_buffer
*bufb
= b
;
115 /* Consider lowest timestamps to be at the top of the heap */
116 return (bufa
->iter
.timestamp
< bufb
->iter
.timestamp
);
120 void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config
*config
,
121 struct channel
*chan
)
123 struct lttng_ptr_heap
*heap
= &chan
->iter
.heap
;
124 struct lib_ring_buffer
*buf
, *tmp
;
127 list_for_each_entry_safe(buf
, tmp
, &chan
->iter
.empty_head
,
129 len
= lib_ring_buffer_get_next_record(chan
, buf
);
132 * Deal with -EAGAIN and -ENODATA.
133 * len >= 0 means record contains data.
134 * -EBUSY should never happen, because we support only one
139 /* Keep node in empty list */
143 * Buffer is finalized. Don't add to list of empty
144 * buffer, because it has no more data to provide, ever.
146 list_del(&buf
->iter
.empty_node
);
149 CHAN_WARN_ON(chan
, 1);
153 * Insert buffer into the heap, remove from empty buffer
156 CHAN_WARN_ON(chan
, len
< 0);
157 list_del(&buf
->iter
.empty_node
);
158 CHAN_WARN_ON(chan
, lttng_heap_insert(heap
, buf
));
164 void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config
*config
,
165 struct channel
*chan
)
168 unsigned long wait_msecs
;
171 * No need to wait if no empty buffers are present.
173 if (list_empty(&chan
->iter
.empty_head
))
176 timestamp_qs
= config
->cb
.ring_buffer_clock_read(chan
);
178 * We need to consider previously empty buffers.
179 * Do a get next buf record on each of them. Add them to
180 * the heap if they have data. If at least one of them
181 * don't have data, we need to wait for
182 * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
183 * buffers have been switched either by the timer or idle entry) and
184 * check them again, adding them if they have data.
186 lib_ring_buffer_get_empty_buf_records(config
, chan
);
189 * No need to wait if no empty buffers are present.
191 if (list_empty(&chan
->iter
.empty_head
))
195 * We need to wait for the buffer switch timer to run. If the
196 * CPU is idle, idle entry performed the switch.
197 * TODO: we could optimize further by skipping the sleep if all
198 * empty buffers belong to idle or offline cpus.
200 wait_msecs
= jiffies_to_msecs(chan
->switch_timer_interval
);
201 wait_msecs
+= MAX_SYSTEM_LATENCY
;
203 lib_ring_buffer_get_empty_buf_records(config
, chan
);
205 * Any buffer still in the empty list here cannot possibly
206 * contain an event with a timestamp prior to "timestamp_qs".
207 * The new quiescent state timestamp is the one we grabbed
208 * before waiting for buffer data. It is therefore safe to
209 * ignore empty buffers up to last_qs timestamp for fusion
212 chan
->iter
.last_qs
= timestamp_qs
;
216 * channel_get_next_record - Get the next record in a channel.
218 * @ret_buf: the buffer in which the event is located (output)
220 * Returns the size of new current event, -EAGAIN if all buffers are empty,
221 * -ENODATA if all buffers are empty and finalized. The channel must already be
222 * opened for reading.
225 ssize_t
channel_get_next_record(struct channel
*chan
,
226 struct lib_ring_buffer
**ret_buf
)
228 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
229 struct lib_ring_buffer
*buf
;
230 struct lttng_ptr_heap
*heap
;
233 if (config
->alloc
== RING_BUFFER_ALLOC_GLOBAL
) {
234 *ret_buf
= channel_get_ring_buffer(config
, chan
, 0);
235 return lib_ring_buffer_get_next_record(chan
, *ret_buf
);
238 heap
= &chan
->iter
.heap
;
241 * get next record for topmost buffer.
243 buf
= lttng_heap_maximum(heap
);
245 len
= lib_ring_buffer_get_next_record(chan
, buf
);
247 * Deal with -EAGAIN and -ENODATA.
248 * len >= 0 means record contains data.
252 buf
->iter
.timestamp
= 0;
253 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
254 /* Remove topmost buffer from the heap */
255 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
259 * Buffer is finalized. Remove buffer from heap and
260 * don't add to list of empty buffer, because it has no
261 * more data to provide, ever.
263 CHAN_WARN_ON(chan
, lttng_heap_remove(heap
) != buf
);
266 CHAN_WARN_ON(chan
, 1);
270 * Reinsert buffer into the heap. Note that heap can be
271 * partially empty, so we need to use
272 * lttng_heap_replace_max().
274 CHAN_WARN_ON(chan
, len
< 0);
275 CHAN_WARN_ON(chan
, lttng_heap_replace_max(heap
, buf
) != buf
);
280 buf
= lttng_heap_maximum(heap
);
281 if (!buf
|| buf
->iter
.timestamp
> chan
->iter
.last_qs
) {
283 * Deal with buffers previously showing no data.
284 * Add buffers containing data to the heap, update
287 lib_ring_buffer_wait_for_qs(config
, chan
);
290 *ret_buf
= buf
= lttng_heap_maximum(heap
);
293 * If this warning triggers, you probably need to check your
294 * system interrupt latency. Typical causes: too many printk()
295 * output going to a serial console with interrupts off.
296 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
297 * Observed on SMP KVM setups with trace_clock().
299 if (chan
->iter
.last_timestamp
300 > (buf
->iter
.timestamp
+ MAX_CLOCK_DELTA
)) {
301 printk(KERN_WARNING
"ring_buffer: timestamps going "
302 "backward. Last time %llu ns, cpu %d, "
303 "current time %llu ns, cpu %d, "
305 chan
->iter
.last_timestamp
, chan
->iter
.last_cpu
,
306 buf
->iter
.timestamp
, buf
->backend
.cpu
,
307 chan
->iter
.last_timestamp
- buf
->iter
.timestamp
);
308 CHAN_WARN_ON(chan
, 1);
310 chan
->iter
.last_timestamp
= buf
->iter
.timestamp
;
311 chan
->iter
.last_cpu
= buf
->backend
.cpu
;
312 return buf
->iter
.payload_len
;
315 if (list_empty(&chan
->iter
.empty_head
))
316 return -ENODATA
; /* All buffers finalized */
318 return -EAGAIN
; /* Temporarily empty */
321 EXPORT_SYMBOL_GPL(channel_get_next_record
);
324 void lib_ring_buffer_iterator_init(struct channel
*chan
, struct lib_ring_buffer
*buf
)
326 if (buf
->iter
.allocated
)
329 buf
->iter
.allocated
= 1;
330 if (chan
->iter
.read_open
&& !buf
->iter
.read_open
) {
331 CHAN_WARN_ON(chan
, lib_ring_buffer_open_read(buf
) != 0);
332 buf
->iter
.read_open
= 1;
335 /* Add to list of buffers without any current record */
336 if (chan
->backend
.config
.alloc
== RING_BUFFER_ALLOC_PER_CPU
)
337 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
#ifdef CONFIG_HOTPLUG_CPU
/*
 * CPU hotplug notifier: initialize the iterator of a newly-onlined CPU's
 * buffer. Only relevant for per-cpu channels.
 */
static
int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
					   unsigned long action,
					   void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel *chan = container_of(nb, struct channel,
					    hp_iter_notifier);
	struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
	const struct lib_ring_buffer_config *config = &chan->backend.config;

	if (!chan->hp_iter_enable)
		return NOTIFY_DONE;

	CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_DOWN_FAILED:
	case CPU_DOWN_FAILED_FROZEN:
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		lib_ring_buffer_iterator_init(chan, buf);
		return NOTIFY_OK;
	default:
		return NOTIFY_DONE;
	}
}
#endif
370 int channel_iterator_init(struct channel
*chan
)
372 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
373 struct lib_ring_buffer
*buf
;
375 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
378 INIT_LIST_HEAD(&chan
->iter
.empty_head
);
379 ret
= lttng_heap_init(&chan
->iter
.heap
,
381 GFP_KERNEL
, buf_is_higher
);
385 * In case of non-hotplug cpu, if the ring-buffer is allocated
386 * in early initcall, it will not be notified of secondary cpus.
387 * In that off case, we need to allocate for all possible cpus.
389 #ifdef CONFIG_HOTPLUG_CPU
390 chan
->hp_iter_notifier
.notifier_call
=
391 channel_iterator_cpu_hotplug
;
392 chan
->hp_iter_notifier
.priority
= 10;
393 register_cpu_notifier(&chan
->hp_iter_notifier
);
395 for_each_online_cpu(cpu
) {
396 buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
397 lib_ring_buffer_iterator_init(chan
, buf
);
399 chan
->hp_iter_enable
= 1;
402 for_each_possible_cpu(cpu
) {
403 buf
= per_cpu_ptr(chan
->backend
.buf
, cpu
);
404 lib_ring_buffer_iterator_init(chan
, buf
);
408 buf
= channel_get_ring_buffer(config
, chan
, 0);
409 lib_ring_buffer_iterator_init(chan
, buf
);
414 void channel_iterator_unregister_notifiers(struct channel
*chan
)
416 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
418 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
419 chan
->hp_iter_enable
= 0;
420 unregister_cpu_notifier(&chan
->hp_iter_notifier
);
424 void channel_iterator_free(struct channel
*chan
)
426 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
428 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
429 lttng_heap_free(&chan
->iter
.heap
);
432 int lib_ring_buffer_iterator_open(struct lib_ring_buffer
*buf
)
434 struct channel
*chan
= buf
->backend
.chan
;
435 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
436 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
437 return lib_ring_buffer_open_read(buf
);
439 EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open
);
/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
	lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
452 int channel_iterator_open(struct channel
*chan
)
454 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
455 struct lib_ring_buffer
*buf
;
458 CHAN_WARN_ON(chan
, config
->output
!= RING_BUFFER_ITERATOR
);
460 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
462 /* Allow CPU hotplug to keep track of opened reader */
463 chan
->iter
.read_open
= 1;
464 for_each_channel_cpu(cpu
, chan
) {
465 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
466 ret
= lib_ring_buffer_iterator_open(buf
);
469 buf
->iter
.read_open
= 1;
473 buf
= channel_get_ring_buffer(config
, chan
, 0);
474 ret
= lib_ring_buffer_iterator_open(buf
);
478 /* Error should always happen on CPU 0, hence no close is required. */
479 CHAN_WARN_ON(chan
, cpu
!= 0);
483 EXPORT_SYMBOL_GPL(channel_iterator_open
);
485 void channel_iterator_release(struct channel
*chan
)
487 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
488 struct lib_ring_buffer
*buf
;
491 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
) {
493 for_each_channel_cpu(cpu
, chan
) {
494 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
495 if (buf
->iter
.read_open
) {
496 lib_ring_buffer_iterator_release(buf
);
497 buf
->iter
.read_open
= 0;
500 chan
->iter
.read_open
= 0;
503 buf
= channel_get_ring_buffer(config
, chan
, 0);
504 lib_ring_buffer_iterator_release(buf
);
507 EXPORT_SYMBOL_GPL(channel_iterator_release
);
509 void lib_ring_buffer_iterator_reset(struct lib_ring_buffer
*buf
)
511 struct channel
*chan
= buf
->backend
.chan
;
513 if (buf
->iter
.state
!= ITER_GET_SUBBUF
)
514 lib_ring_buffer_put_next_subbuf(buf
);
515 buf
->iter
.state
= ITER_GET_SUBBUF
;
516 /* Remove from heap (if present). */
517 if (lttng_heap_cherrypick(&chan
->iter
.heap
, buf
))
518 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
519 buf
->iter
.timestamp
= 0;
520 buf
->iter
.header_len
= 0;
521 buf
->iter
.payload_len
= 0;
522 buf
->iter
.consumed
= 0;
523 buf
->iter
.read_offset
= 0;
524 buf
->iter
.data_size
= 0;
525 /* Don't reset allocated and read_open */
528 void channel_iterator_reset(struct channel
*chan
)
530 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
531 struct lib_ring_buffer
*buf
;
534 /* Empty heap, put into empty_head */
535 while ((buf
= lttng_heap_remove(&chan
->iter
.heap
)) != NULL
)
536 list_add(&buf
->iter
.empty_node
, &chan
->iter
.empty_head
);
538 for_each_channel_cpu(cpu
, chan
) {
539 buf
= channel_get_ring_buffer(config
, chan
, cpu
);
540 lib_ring_buffer_iterator_reset(buf
);
542 /* Don't reset read_open */
543 chan
->iter
.last_qs
= 0;
544 chan
->iter
.last_timestamp
= 0;
545 chan
->iter
.last_cpu
= 0;
546 chan
->iter
.len_left
= 0;
550 * Ring buffer payload extraction read() implementation.
553 ssize_t
channel_ring_buffer_file_read(struct file
*filp
,
554 char __user
*user_buf
,
557 struct channel
*chan
,
558 struct lib_ring_buffer
*buf
,
561 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
562 size_t read_count
= 0, read_offset
;
566 if (!access_ok(VERIFY_WRITE
, user_buf
, count
))
569 /* Finish copy of previous record */
571 if (read_count
< count
) {
572 len
= chan
->iter
.len_left
;
574 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
576 buf
= lttng_heap_maximum(&chan
->iter
.heap
);
577 CHAN_WARN_ON(chan
, !buf
);
582 while (read_count
< count
) {
583 size_t copy_len
, space_left
;
586 len
= channel_get_next_record(chan
, &buf
);
588 len
= lib_ring_buffer_get_next_record(chan
, buf
);
592 * Check if buffer is finalized (end of file).
594 if (len
== -ENODATA
) {
595 /* A 0 read_count will tell about end of file */
598 if (filp
->f_flags
& O_NONBLOCK
) {
600 read_count
= -EAGAIN
;
606 * No data available at the moment, return what
613 * Wait for returned len to be >= 0 or -ENODATA.
616 error
= wait_event_interruptible(
618 ((len
= channel_get_next_record(chan
,
619 &buf
)), len
!= -EAGAIN
));
621 error
= wait_event_interruptible(
623 ((len
= lib_ring_buffer_get_next_record(
624 chan
, buf
)), len
!= -EAGAIN
));
625 CHAN_WARN_ON(chan
, len
== -EBUSY
);
630 CHAN_WARN_ON(chan
, len
< 0 && len
!= -ENODATA
);
634 read_offset
= buf
->iter
.read_offset
;
636 space_left
= count
- read_count
;
637 if (len
<= space_left
) {
639 chan
->iter
.len_left
= 0;
642 copy_len
= space_left
;
643 chan
->iter
.len_left
= len
- copy_len
;
644 *ppos
= read_offset
+ copy_len
;
646 if (__lib_ring_buffer_copy_to_user(&buf
->backend
, read_offset
,
647 &user_buf
[read_count
],
650 * Leave the len_left and ppos values at their current
651 * state, as we currently have a valid event to read.
655 read_count
+= copy_len
;
661 chan
->iter
.len_left
= 0;
666 * lib_ring_buffer_file_read - Read buffer record payload.
667 * @filp: file structure pointer.
668 * @buffer: user buffer to read data into.
669 * @count: number of bytes to read.
670 * @ppos: file read position.
672 * Returns a negative value on error, or the number of bytes read on success.
673 * ppos is used to save the position _within the current record_ between calls
677 ssize_t
lib_ring_buffer_file_read(struct file
*filp
,
678 char __user
*user_buf
,
682 struct inode
*inode
= filp
->f_dentry
->d_inode
;
683 struct lib_ring_buffer
*buf
= inode
->i_private
;
684 struct channel
*chan
= buf
->backend
.chan
;
686 return channel_ring_buffer_file_read(filp
, user_buf
, count
, ppos
,
691 * channel_file_read - Read channel record payload.
692 * @filp: file structure pointer.
693 * @buffer: user buffer to read data into.
694 * @count: number of bytes to read.
695 * @ppos: file read position.
697 * Returns a negative value on error, or the number of bytes read on success.
698 * ppos is used to save the position _within the current record_ between calls
702 ssize_t
channel_file_read(struct file
*filp
,
703 char __user
*user_buf
,
707 struct inode
*inode
= filp
->f_dentry
->d_inode
;
708 struct channel
*chan
= inode
->i_private
;
709 const struct lib_ring_buffer_config
*config
= &chan
->backend
.config
;
711 if (config
->alloc
== RING_BUFFER_ALLOC_PER_CPU
)
712 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
713 ppos
, chan
, NULL
, 1);
715 struct lib_ring_buffer
*buf
=
716 channel_get_ring_buffer(config
, chan
, 0);
717 return channel_ring_buffer_file_read(filp
, user_buf
, count
,
723 int lib_ring_buffer_file_open(struct inode
*inode
, struct file
*file
)
725 struct lib_ring_buffer
*buf
= inode
->i_private
;
728 ret
= lib_ring_buffer_iterator_open(buf
);
732 file
->private_data
= buf
;
733 ret
= nonseekable_open(inode
, file
);
739 lib_ring_buffer_iterator_release(buf
);
744 int lib_ring_buffer_file_release(struct inode
*inode
, struct file
*file
)
746 struct lib_ring_buffer
*buf
= inode
->i_private
;
748 lib_ring_buffer_iterator_release(buf
);
753 int channel_file_open(struct inode
*inode
, struct file
*file
)
755 struct channel
*chan
= inode
->i_private
;
758 ret
= channel_iterator_open(chan
);
762 file
->private_data
= chan
;
763 ret
= nonseekable_open(inode
, file
);
769 channel_iterator_release(chan
);
774 int channel_file_release(struct inode
*inode
, struct file
*file
)
776 struct channel
*chan
= inode
->i_private
;
778 channel_iterator_release(chan
);
782 const struct file_operations channel_payload_file_operations
= {
783 .owner
= THIS_MODULE
,
784 .open
= channel_file_open
,
785 .release
= channel_file_release
,
786 .read
= channel_file_read
,
787 .llseek
= lib_ring_buffer_no_llseek
,
789 EXPORT_SYMBOL_GPL(channel_payload_file_operations
);
791 const struct file_operations lib_ring_buffer_payload_file_operations
= {
792 .owner
= THIS_MODULE
,
793 .open
= lib_ring_buffer_file_open
,
794 .release
= lib_ring_buffer_file_release
,
795 .read
= lib_ring_buffer_file_read
,
796 .llseek
= lib_ring_buffer_no_llseek
,
798 EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations
);