clock: output clock description in metadata
[lttng-modules.git] / lib / ringbuffer / ring_buffer_splice.c
1 /*
2 * ring_buffer_splice.c
3 *
4 * Copyright (C) 2002-2005 - Tom Zanussi <zanussi@us.ibm.com>, IBM Corp
5 * Copyright (C) 1999-2005 - Karim Yaghmour <karim@opersys.com>
6 * Copyright (C) 2008-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * Re-using content from kernel/relay.c.
9 *
10 * This file is released under the GPL v2.
11 */
12
13 #include <linux/module.h>
14 #include <linux/fs.h>
15
16 #include "../../wrapper/splice.h"
17 #include "../../wrapper/ringbuffer/backend.h"
18 #include "../../wrapper/ringbuffer/frontend.h"
19 #include "../../wrapper/ringbuffer/vfs.h"
20
21 #if 0
22 #define printk_dbg(fmt, args...) printk(fmt, args)
23 #else
24 #define printk_dbg(fmt, args...)
25 #endif
26
27 loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin)
28 {
29 return -ESPIPE;
30 }
31
32 /*
33 * Release pages from the buffer so splice pipe_to_file can move them.
34 * Called after the pipe has been populated with buffer pages.
35 */
36 static void lib_ring_buffer_pipe_buf_release(struct pipe_inode_info *pipe,
37 struct pipe_buffer *pbuf)
38 {
39 __free_page(pbuf->page);
40 }
41
42 static const struct pipe_buf_operations ring_buffer_pipe_buf_ops = {
43 .can_merge = 0,
44 .map = generic_pipe_buf_map,
45 .unmap = generic_pipe_buf_unmap,
46 .confirm = generic_pipe_buf_confirm,
47 .release = lib_ring_buffer_pipe_buf_release,
48 .steal = generic_pipe_buf_steal,
49 .get = generic_pipe_buf_get,
50 };
51
52 /*
53 * Page release operation after splice pipe_to_file ends.
54 */
55 static void lib_ring_buffer_page_release(struct splice_pipe_desc *spd,
56 unsigned int i)
57 {
58 __free_page(spd->pages[i]);
59 }
60
61 /*
62 * subbuf_splice_actor - splice up to one subbuf's worth of data
63 */
64 static int subbuf_splice_actor(struct file *in,
65 loff_t *ppos,
66 struct pipe_inode_info *pipe,
67 size_t len,
68 unsigned int flags)
69 {
70 struct lib_ring_buffer *buf = in->private_data;
71 struct channel *chan = buf->backend.chan;
72 const struct lib_ring_buffer_config *config = &chan->backend.config;
73 unsigned int poff, subbuf_pages, nr_pages;
74 struct page *pages[PIPE_DEF_BUFFERS];
75 struct partial_page partial[PIPE_DEF_BUFFERS];
76 struct splice_pipe_desc spd = {
77 .pages = pages,
78 .nr_pages = 0,
79 .partial = partial,
80 .flags = flags,
81 .ops = &ring_buffer_pipe_buf_ops,
82 .spd_release = lib_ring_buffer_page_release,
83 };
84 unsigned long consumed_old, roffset;
85 unsigned long bytes_avail;
86
87 /*
88 * Check that a GET_SUBBUF ioctl has been done before.
89 */
90 WARN_ON(atomic_long_read(&buf->active_readers) != 1);
91 consumed_old = lib_ring_buffer_get_consumed(config, buf);
92 consumed_old += *ppos;
93
94 /*
95 * Adjust read len, if longer than what is available.
96 * Max read size is 1 subbuffer due to get_subbuf/put_subbuf for
97 * protection.
98 */
99 bytes_avail = chan->backend.subbuf_size;
100 WARN_ON(bytes_avail > chan->backend.buf_size);
101 len = min_t(size_t, len, bytes_avail);
102 subbuf_pages = bytes_avail >> PAGE_SHIFT;
103 nr_pages = min_t(unsigned int, subbuf_pages, PIPE_DEF_BUFFERS);
104 roffset = consumed_old & PAGE_MASK;
105 poff = consumed_old & ~PAGE_MASK;
106 printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
107 len, (ssize_t)*ppos, lib_ring_buffer_get_offset(config, buf));
108
109 for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
110 unsigned int this_len;
111 struct page **page, *new_page;
112 void **virt;
113
114 if (!len)
115 break;
116 printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
117 len, roffset);
118
119 /*
120 * We have to replace the page we are moving into the splice
121 * pipe.
122 */
123 new_page = alloc_pages_node(cpu_to_node(max(buf->backend.cpu,
124 0)),
125 GFP_KERNEL | __GFP_ZERO, 0);
126 if (!new_page)
127 break;
128
129 this_len = PAGE_SIZE - poff;
130 page = lib_ring_buffer_read_get_page(&buf->backend, roffset, &virt);
131 spd.pages[spd.nr_pages] = *page;
132 *page = new_page;
133 *virt = page_address(new_page);
134 spd.partial[spd.nr_pages].offset = poff;
135 spd.partial[spd.nr_pages].len = this_len;
136
137 poff = 0;
138 roffset += PAGE_SIZE;
139 len -= this_len;
140 }
141
142 if (!spd.nr_pages)
143 return 0;
144
145 return wrapper_splice_to_pipe(pipe, &spd);
146 }
147
148 ssize_t lib_ring_buffer_splice_read(struct file *in, loff_t *ppos,
149 struct pipe_inode_info *pipe, size_t len,
150 unsigned int flags)
151 {
152 struct lib_ring_buffer *buf = in->private_data;
153 struct channel *chan = buf->backend.chan;
154 const struct lib_ring_buffer_config *config = &chan->backend.config;
155 ssize_t spliced;
156 int ret;
157
158 if (config->output != RING_BUFFER_SPLICE)
159 return -EINVAL;
160
161 /*
162 * We require ppos and length to be page-aligned for performance reasons
163 * (no page copy). Size is known using the ioctl
164 * RING_BUFFER_GET_PADDED_SUBBUF_SIZE, which is page-size padded.
165 * We fail when the ppos or len passed is not page-sized, because splice
166 * is not allowed to copy more than the length passed as parameter (so
167 * the ABI does not let us silently copy more than requested to include
168 * padding).
169 */
170 if (*ppos != PAGE_ALIGN(*ppos) || len != PAGE_ALIGN(len))
171 return -EINVAL;
172
173 ret = 0;
174 spliced = 0;
175
176 printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n", len,
177 (ssize_t)*ppos);
178 while (len && !spliced) {
179 ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
180 printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
181 if (ret < 0)
182 break;
183 else if (!ret) {
184 if (flags & SPLICE_F_NONBLOCK)
185 ret = -EAGAIN;
186 break;
187 }
188
189 *ppos += ret;
190 if (ret > len)
191 len = 0;
192 else
193 len -= ret;
194 spliced += ret;
195 }
196
197 if (spliced)
198 return spliced;
199
200 return ret;
201 }
202 EXPORT_SYMBOL_GPL(lib_ring_buffer_splice_read);
This page took 0.034697 seconds and 4 git commands to generate.