1 /*
2 * ltt/ltt-relay.c
3 *
4 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
5 *
6 * LTTng lockless buffer space management (reader/writer).
7 *
8 * Author:
9 * Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
10 *
11 * Inspired from LTT :
12 * Karim Yaghmour (karim@opersys.com)
13 * Tom Zanussi (zanussi@us.ibm.com)
14 * Bob Wisniewski (bob@watson.ibm.com)
15 * And from K42 :
16 * Bob Wisniewski (bob@watson.ibm.com)
17 *
18 * Changelog:
19 * 08/10/08, Cleanup.
20 * 19/10/05, Complete lockless mechanism.
21 * 27/05/05, Modular redesign and rewrite.
22 *
23 * Userspace reader semantic :
24 * while (poll fd != POLLHUP) {
25 * - ioctl RELAY_GET_SUBBUF_SIZE
26 * while (1) {
27 * - ioctl GET_SUBBUF
28 * - splice 1 subbuffer worth of data to a pipe
29 * - splice the data from pipe to disk/network
30 * - ioctl PUT_SUBBUF, check error value
31 * if err val < 0, previous subbuffer was corrupted.
32 * }
33 * }
34 */
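/*
 * Illustrative user-space consumer sketch for the loop above (not built as
 * part of this file). It assumes a relay buffer file descriptor buf_fd and an
 * output descriptor out_fd obtained elsewhere, plus the RELAY_* ioctl numbers
 * from ltt-relay.h; all names here are for illustration only.
 *
 *	#include <stdint.h>
 *	#include <unistd.h>
 *	#include <fcntl.h>
 *	#include <sys/ioctl.h>
 *
 *	static int consume_buf(int buf_fd, int out_fd)
 *	{
 *		uint32_t subbuf_size, consumed_old;
 *		int pfd[2];
 *
 *		if (ioctl(buf_fd, RELAY_GET_SUBBUF_SIZE, &subbuf_size) < 0)
 *			return -1;
 *		if (pipe(pfd) < 0)
 *			return -1;
 *		for (;;) {
 *			if (ioctl(buf_fd, RELAY_GET_SUBBUF, &consumed_old) < 0)
 *				break;	/* no fully committed subbuffer yet */
 *			/* move one subbuffer without copying it in user space */
 *			splice(buf_fd, NULL, pfd[1], NULL, subbuf_size, 0);
 *			splice(pfd[0], NULL, out_fd, NULL, subbuf_size, 0);
 *			if (ioctl(buf_fd, RELAY_PUT_SUBBUF, &consumed_old) < 0)
 *				;	/* previous subbuffer was corrupted */
 *		}
 *		close(pfd[0]);
 *		close(pfd[1]);
 *		return 0;
 *	}
 */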
35
36 #include <linux/time.h>
37 #include <linux/ltt-tracer.h>
38 #include <linux/ltt-relay.h>
39 #include <linux/module.h>
40 #include <linux/string.h>
41 #include <linux/slab.h>
42 #include <linux/init.h>
43 #include <linux/rcupdate.h>
44 #include <linux/sched.h>
45 #include <linux/bitops.h>
46 #include <linux/fs.h>
47 #include <linux/smp_lock.h>
48 #include <linux/debugfs.h>
49 #include <linux/stat.h>
50 #include <linux/cpu.h>
51 #include <linux/pipe_fs_i.h>
52 #include <linux/splice.h>
53 #include <asm/atomic.h>
54 #include <asm/local.h>
55
56 #if 0
57 #define printk_dbg(fmt, args...) printk(fmt, args)
58 #else
59 #define printk_dbg(fmt, args...)
60 #endif
61
62 /* LTTng lockless logging buffer info */
63 struct ltt_channel_buf_struct {
64 /* First 32 bytes cache-hot cacheline */
65 local_t offset; /* Current offset in the buffer */
66 local_t *commit_count; /* Commit count per sub-buffer */
67 atomic_long_t consumed; /*
68 * Current offset in the buffer
69 * standard atomic access (shared)
70 */
71 unsigned long last_tsc; /*
72 * Last timestamp written in the buffer.
73 */
74 /* End of first 32 bytes cacheline */
75 atomic_long_t active_readers; /*
76 * Active readers count
77 * standard atomic access (shared)
78 */
79 local_t events_lost;
80 local_t corrupted_subbuffers;
81 spinlock_t full_lock; /*
82 * buffer full condition spinlock, only
83 * for userspace tracing blocking mode
84 * synchronization with reader.
85 */
86 wait_queue_head_t write_wait; /*
87 * Wait queue for blocking user space
88 * writers
89 */
90 atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */
91 } ____cacheline_aligned;
92
93 /*
94 * Last TSC comparison functions. Check if the current TSC overflows
95 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
96 * atomically.
97 */
98
99 #if (BITS_PER_LONG == 32)
100 static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
101 u64 tsc)
102 {
103 ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
104 }
105
106 static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
107 u64 tsc)
108 {
109 unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);
110
111 if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
112 return 1;
113 else
114 return 0;
115 }
116 #else
117 static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
118 u64 tsc)
119 {
120 ltt_buf->last_tsc = (unsigned long)tsc;
121 }
122
123 static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
124 u64 tsc)
125 {
126 if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
127 return 1;
128 else
129 return 0;
130 }
131 #endif
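/*
 * Usage sketch (for illustration) : the writer is expected to pair these two
 * helpers around each reservation, which is exactly what
 * ltt_relay_try_reserve() and ltt_relay_reserve_slot() below do :
 *
 *	tsc = trace_clock_read64();
 *	if (last_tsc_overflow(ltt_buf, tsc))
 *		rflags = LTT_RFLAG_ID_SIZE_TSC;	/* record a full TSC field */
 *	... reserve the slot ...
 *	save_last_tsc(ltt_buf, tsc);
 */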
132
133 //ust// static struct file_operations ltt_file_operations;
134
135 /*
136 * A switch is done during tracing or as a final flush after tracing (so it
137 * won't write in the new sub-buffer).
138 */
139 enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
140
141 static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
142 struct ltt_channel_struct *ltt_chan,
143 struct rchan_buf *buf,
144 unsigned int cpu,
145 unsigned int n_subbufs);
146
147 static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
148 unsigned int cpu);
149
150 static void ltt_force_switch(struct rchan_buf *buf,
151 enum force_switch_mode mode);
152
153 /*
154 * Trace callbacks
155 */
156 static void ltt_buffer_begin_callback(struct rchan_buf *buf,
157 u64 tsc, unsigned int subbuf_idx)
158 {
159 struct ltt_channel_struct *channel =
160 (struct ltt_channel_struct *)buf->chan->private_data;
161 struct ltt_subbuffer_header *header =
162 (struct ltt_subbuffer_header *)
163 ltt_relay_offset_address(buf,
164 subbuf_idx * buf->chan->subbuf_size);
165
166 header->cycle_count_begin = tsc;
167 header->lost_size = 0xFFFFFFFF; /* for debugging */
168 header->buf_size = buf->chan->subbuf_size;
169 ltt_write_trace_header(channel->trace, header);
170 }
171
172 /*
173 * offset is assumed to never be 0 here : never deliver a completely empty
174 * subbuffer. The lost size is between 0 and subbuf_size-1.
175 */
176 static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
177 u64 tsc, unsigned int offset, unsigned int subbuf_idx)
178 {
179 struct ltt_channel_struct *channel =
180 (struct ltt_channel_struct *)buf->chan->private_data;
181 struct ltt_channel_buf_struct *ltt_buf =
182 percpu_ptr(channel->buf, buf->cpu);
183 struct ltt_subbuffer_header *header =
184 (struct ltt_subbuffer_header *)
185 ltt_relay_offset_address(buf,
186 subbuf_idx * buf->chan->subbuf_size);
187
188 header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
189 buf->chan);
190 header->cycle_count_end = tsc;
191 header->events_lost = local_read(&ltt_buf->events_lost);
192 header->subbuf_corrupt = local_read(&ltt_buf->corrupted_subbuffers);
193 }
194
195 static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
196 void *subbuf)
197 {
198 struct ltt_channel_struct *channel =
199 (struct ltt_channel_struct *)buf->chan->private_data;
200 struct ltt_channel_buf_struct *ltt_buf =
201 percpu_ptr(channel->buf, buf->cpu);
202
203 atomic_set(&ltt_buf->wakeup_readers, 1);
204 }
205
206 static struct dentry *ltt_create_buf_file_callback(const char *filename,
207 struct dentry *parent, int mode,
208 struct rchan_buf *buf)
209 {
210 struct ltt_channel_struct *ltt_chan;
211 int err;
212 //ust// struct dentry *dentry;
213
214 ltt_chan = buf->chan->private_data;
215 err = ltt_relay_create_buffer(ltt_chan->trace, ltt_chan,
216 buf, buf->cpu,
217 buf->chan->n_subbufs);
218 if (err)
219 return ERR_PTR(err);
220
221 //ust// dentry = debugfs_create_file(filename, mode, parent, buf,
222 //ust// &ltt_file_operations);
223 //ust// if (!dentry)
224 //ust// goto error;
225 //ust// return dentry;
226 //ust//error:
227 ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
228 return NULL;
229 }
230
231 static int ltt_remove_buf_file_callback(struct dentry *dentry)
232 {
233 struct rchan_buf *buf = dentry->d_inode->i_private;
234 struct ltt_channel_struct *ltt_chan = buf->chan->private_data;
235
236 //ust// debugfs_remove(dentry);
237 ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
238
239 return 0;
240 }
241
242 /*
243 * Wake writers :
244 *
245 * This must be done after the trace is removed from the RCU list so that there
246 * are no stalled writers.
247 */
248 static void ltt_relay_wake_writers(struct ltt_channel_buf_struct *ltt_buf)
249 {
250
251 if (waitqueue_active(&ltt_buf->write_wait))
252 wake_up_interruptible(&ltt_buf->write_wait);
253 }
254
255 /*
256 * This function should not be called from NMI interrupt context
257 */
258 static notrace void ltt_buf_unfull(struct rchan_buf *buf,
259 unsigned int subbuf_idx,
260 long offset)
261 {
262 struct ltt_channel_struct *ltt_channel =
263 (struct ltt_channel_struct *)buf->chan->private_data;
264 struct ltt_channel_buf_struct *ltt_buf =
265 percpu_ptr(ltt_channel->buf, buf->cpu);
266
267 ltt_relay_wake_writers(ltt_buf);
268 }
269
270 /**
271 * ltt_open - open file op for ltt files
272 * @inode: opened inode
273 * @file: opened file
274 *
275 * Open implementation. Makes sure only one open instance of a buffer is
276 * done at a given moment.
277 */
278 static int ltt_open(struct inode *inode, struct file *file)
279 {
280 struct rchan_buf *buf = inode->i_private;
281 struct ltt_channel_struct *ltt_channel =
282 (struct ltt_channel_struct *)buf->chan->private_data;
283 struct ltt_channel_buf_struct *ltt_buf =
284 percpu_ptr(ltt_channel->buf, buf->cpu);
285
286 if (!atomic_long_add_unless(&ltt_buf->active_readers, 1, 1))
287 return -EBUSY;
288 return ltt_relay_file_operations.open(inode, file);
289 }
290
291 /**
292 * ltt_release - release file op for ltt files
293 * @inode: opened inode
294 * @file: opened file
295 *
296 * Release implementation.
297 */
298 static int ltt_release(struct inode *inode, struct file *file)
299 {
300 struct rchan_buf *buf = inode->i_private;
301 struct ltt_channel_struct *ltt_channel =
302 (struct ltt_channel_struct *)buf->chan->private_data;
303 struct ltt_channel_buf_struct *ltt_buf =
304 percpu_ptr(ltt_channel->buf, buf->cpu);
305 int ret;
306
307 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
308 atomic_long_dec(&ltt_buf->active_readers);
309 ret = ltt_relay_file_operations.release(inode, file);
310 WARN_ON(ret);
311 return ret;
312 }
313
314 /**
315 * ltt_poll - file op for ltt files
316 * @filp: the file
317 * @wait: poll table
318 *
319 * Poll implementation.
320 */
321 static unsigned int ltt_poll(struct file *filp, poll_table *wait)
322 {
323 unsigned int mask = 0;
324 struct inode *inode = filp->f_dentry->d_inode;
325 struct rchan_buf *buf = inode->i_private;
326 struct ltt_channel_struct *ltt_channel =
327 (struct ltt_channel_struct *)buf->chan->private_data;
328 struct ltt_channel_buf_struct *ltt_buf =
329 percpu_ptr(ltt_channel->buf, buf->cpu);
330
331 if (filp->f_mode & FMODE_READ) {
332 poll_wait_set_exclusive(wait);
333 poll_wait(filp, &buf->read_wait, wait);
334
335 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
336 if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
337 buf->chan)
338 - SUBBUF_TRUNC(atomic_long_read(&ltt_buf->consumed),
339 buf->chan)
340 == 0) {
341 if (buf->finalized)
342 return POLLHUP;
343 else
344 return 0;
345 } else {
346 struct rchan *rchan =
347 ltt_channel->trans_channel_data;
348 if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
349 buf->chan)
350 - SUBBUF_TRUNC(atomic_long_read(
351 &ltt_buf->consumed),
352 buf->chan)
353 >= rchan->alloc_size)
354 return POLLPRI | POLLRDBAND;
355 else
356 return POLLIN | POLLRDNORM;
357 }
358 }
359 return mask;
360 }
361
 362 static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
 363 {
 	struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
 364 	long consumed_old, consumed_idx, commit_count, write_offset;
365 consumed_old = atomic_long_read(&ltt_buf->consumed);
366 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
367 commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
368 /*
369 * Make sure we read the commit count before reading the buffer
370 * data and the write offset. Correct consumed offset ordering
 371 * wrt commit count is ensured by the use of cmpxchg to update
372 * the consumed offset.
373 */
374 smp_rmb();
375 write_offset = local_read(&ltt_buf->offset);
376 /*
377 * Check that the subbuffer we are trying to consume has been
378 * already fully committed.
379 */
380 if (((commit_count - buf->chan->subbuf_size)
381 & ltt_channel->commit_count_mask)
382 - (BUFFER_TRUNC(consumed_old, buf->chan)
383 >> ltt_channel->n_subbufs_order)
384 != 0) {
385 return -EAGAIN;
386 }
387 /*
388 * Check that we are not about to read the same subbuffer in
389 * which the writer head is.
390 */
391 if ((SUBBUF_TRUNC(write_offset, buf->chan)
392 - SUBBUF_TRUNC(consumed_old, buf->chan))
393 == 0) {
394 return -EAGAIN;
395 }
396
397 *pconsumed_old = consumed_old;
398 return 0;
399 }
400
401 static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
402 {
403 long consumed_new, consumed_old;
404
405 consumed_old = atomic_long_read(&ltt_buf->consumed);
406 consumed_old = consumed_old & (~0xFFFFFFFFL);
407 consumed_old = consumed_old | uconsumed_old;
408 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
409
410 spin_lock(&ltt_buf->full_lock);
411 if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
412 consumed_new)
413 != consumed_old) {
414 /* We have been pushed by the writer : the last
415 * buffer read _is_ corrupted! It can also
416 * happen if this is a buffer we never got. */
417 spin_unlock(&ltt_buf->full_lock);
418 return -EIO;
419 } else {
420 /* tell the client that buffer is now unfull */
421 int index;
422 long data;
423 index = SUBBUF_INDEX(consumed_old, buf->chan);
424 data = BUFFER_OFFSET(consumed_old, buf->chan);
425 ltt_buf_unfull(buf, index, data);
426 spin_unlock(&ltt_buf->full_lock);
427 }
428 return 0;
429 }
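/*
 * Usage sketch (illustrative only) : how a consumer could pair the two helpers
 * above once it holds the rchan_buf and its ltt_channel_buf_struct. Reading
 * the subbuffer contents out (to disk, a pipe, ...) is left to the caller.
 *
 *	long consumed_old;
 *
 *	if (ltt_do_get_subbuf(buf, ltt_buf, &consumed_old) == 0) {
 *		/* read one subbuffer starting at consumed_old from buf */
 *		if (ltt_do_put_subbuf(buf, ltt_buf, (u32)consumed_old) < 0) {
 *			/* -EIO : pushed by the writer, data was corrupted */
 *		}
 *	} else {
 *		/* -EAGAIN : no fully committed subbuffer available yet */
 *	}
 */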
430
431 /**
432 * ltt_ioctl - control on the debugfs file
433 *
434 * @inode: the inode
435 * @filp: the file
436 * @cmd: the command
437 * @arg: command arg
438 *
 439 * This ioctl implements four commands necessary for a minimal
440 * producer/consumer implementation :
441 * RELAY_GET_SUBBUF
442 * Get the next sub buffer that can be read. It never blocks.
443 * RELAY_PUT_SUBBUF
444 * Release the currently read sub-buffer. Parameter is the last
445 * put subbuffer (returned by GET_SUBBUF).
 446 * RELAY_GET_N_SUBBUFS
447 * returns the number of sub buffers in the per cpu channel.
448 * RELAY_GET_SUBBUF_SIZE
449 * returns the size of the sub buffers.
450 */
451 //ust// static int ltt_ioctl(struct inode *inode, struct file *filp,
452 //ust// unsigned int cmd, unsigned long arg)
453 //ust// {
454 //ust// struct rchan_buf *buf = inode->i_private;
455 //ust// struct ltt_channel_struct *ltt_channel =
456 //ust// (struct ltt_channel_struct *)buf->chan->private_data;
457 //ust// struct ltt_channel_buf_struct *ltt_buf =
458 //ust// percpu_ptr(ltt_channel->buf, buf->cpu);
459 //ust// u32 __user *argp = (u32 __user *)arg;
460 //ust//
461 //ust// WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
462 //ust// switch (cmd) {
463 //ust// case RELAY_GET_SUBBUF:
464 //ust// {
465 //ust// int ret;
466 //ust// ret = ltt_do_get_subbuf(buf, ltt_buf, &consumed_old);
467 //ust// if(ret < 0)
468 //ust// return ret;
469 //ust// return put_user((u32)consumed_old, argp);
470 //ust// }
471 //ust// case RELAY_PUT_SUBBUF:
472 //ust// {
473 //ust// int ret;
474 //ust// u32 uconsumed_old;
475 //ust// ret = get_user(uconsumed_old, argp);
476 //ust// if (ret)
477 //ust// return ret; /* will return -EFAULT */
478 //ust// return ltt_do_put_subbuf(buf, ltt_buf, uconsumed_old);
479 //ust// }
480 //ust// case RELAY_GET_N_SUBBUFS:
481 //ust// return put_user((u32)buf->chan->n_subbufs, argp);
482 //ust// break;
483 //ust// case RELAY_GET_SUBBUF_SIZE:
484 //ust// return put_user((u32)buf->chan->subbuf_size, argp);
485 //ust// break;
486 //ust// default:
487 //ust// return -ENOIOCTLCMD;
488 //ust// }
489 //ust// return 0;
490 //ust// }
491
492 //ust// #ifdef CONFIG_COMPAT
493 //ust// static long ltt_compat_ioctl(struct file *file, unsigned int cmd,
494 //ust// unsigned long arg)
495 //ust// {
496 //ust// long ret = -ENOIOCTLCMD;
497 //ust//
498 //ust// lock_kernel();
499 //ust// ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
500 //ust// unlock_kernel();
501 //ust//
502 //ust// return ret;
503 //ust// }
504 //ust// #endif
505
506 //ust// static void ltt_relay_pipe_buf_release(struct pipe_inode_info *pipe,
507 //ust// struct pipe_buffer *pbuf)
508 //ust// {
509 //ust// }
510 //ust//
511 //ust// static struct pipe_buf_operations ltt_relay_pipe_buf_ops = {
512 //ust// .can_merge = 0,
513 //ust// .map = generic_pipe_buf_map,
514 //ust// .unmap = generic_pipe_buf_unmap,
515 //ust// .confirm = generic_pipe_buf_confirm,
516 //ust// .release = ltt_relay_pipe_buf_release,
517 //ust// .steal = generic_pipe_buf_steal,
518 //ust// .get = generic_pipe_buf_get,
519 //ust// };
520
521 //ust// static void ltt_relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
522 //ust// {
523 //ust// }
524
525 /*
526 * subbuf_splice_actor - splice up to one subbuf's worth of data
527 */
528 static int subbuf_splice_actor(struct file *in,
529 loff_t *ppos,
530 struct pipe_inode_info *pipe,
531 size_t len,
532 unsigned int flags)
533 {
534 struct rchan_buf *buf = in->private_data;
535 struct ltt_channel_struct *ltt_channel =
536 (struct ltt_channel_struct *)buf->chan->private_data;
537 struct ltt_channel_buf_struct *ltt_buf =
538 percpu_ptr(ltt_channel->buf, buf->cpu);
539 unsigned int poff, subbuf_pages, nr_pages;
540 struct page *pages[PIPE_BUFFERS];
541 struct partial_page partial[PIPE_BUFFERS];
542 struct splice_pipe_desc spd = {
543 .pages = pages,
544 .nr_pages = 0,
545 .partial = partial,
546 .flags = flags,
547 .ops = &ltt_relay_pipe_buf_ops,
548 .spd_release = ltt_relay_page_release,
549 };
550 long consumed_old, consumed_idx, roffset;
551 unsigned long bytes_avail;
552
553 /*
554 * Check that a GET_SUBBUF ioctl has been done before.
555 */
556 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
557 consumed_old = atomic_long_read(&ltt_buf->consumed);
558 consumed_old += *ppos;
559 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
560
561 /*
562 * Adjust read len, if longer than what is available
563 */
564 bytes_avail = SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
565 - consumed_old;
566 WARN_ON(bytes_avail > buf->chan->alloc_size);
567 len = min_t(size_t, len, bytes_avail);
568 subbuf_pages = bytes_avail >> PAGE_SHIFT;
569 nr_pages = min_t(unsigned int, subbuf_pages, PIPE_BUFFERS);
570 roffset = consumed_old & PAGE_MASK;
571 poff = consumed_old & ~PAGE_MASK;
572 printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
573 len, (ssize_t)*ppos, local_read(&ltt_buf->offset));
574
575 for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
576 unsigned int this_len;
577 struct buf_page *page;
578
579 if (!len)
580 break;
581 printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
582 len, roffset);
583
584 this_len = PAGE_SIZE - poff;
585 page = ltt_relay_read_get_page(buf, roffset);
586 spd.pages[spd.nr_pages] = page->page;
587 spd.partial[spd.nr_pages].offset = poff;
588 spd.partial[spd.nr_pages].len = this_len;
589
590 poff = 0;
591 roffset += PAGE_SIZE;
592 len -= this_len;
593 }
594
595 if (!spd.nr_pages)
596 return 0;
597
598 return splice_to_pipe(pipe, &spd);
599 }
600
601 static ssize_t ltt_relay_file_splice_read(struct file *in,
602 loff_t *ppos,
603 struct pipe_inode_info *pipe,
604 size_t len,
605 unsigned int flags)
606 {
607 ssize_t spliced;
608 int ret;
609
610 ret = 0;
611 spliced = 0;
612
613 printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n",
614 len, (ssize_t)*ppos);
615 while (len && !spliced) {
616 ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
617 printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
618 if (ret < 0)
619 break;
620 else if (!ret) {
621 if (flags & SPLICE_F_NONBLOCK)
622 ret = -EAGAIN;
623 break;
624 }
625
626 *ppos += ret;
627 if (ret > len)
628 len = 0;
629 else
630 len -= ret;
631 spliced += ret;
632 }
633
634 if (spliced)
635 return spliced;
636
637 return ret;
638 }
639
640 static void ltt_relay_print_subbuffer_errors(
641 struct ltt_channel_struct *ltt_chan,
642 long cons_off, unsigned int cpu)
643 {
644 struct rchan *rchan = ltt_chan->trans_channel_data;
645 struct ltt_channel_buf_struct *ltt_buf =
646 percpu_ptr(ltt_chan->buf, cpu);
647 long cons_idx, commit_count, write_offset;
648
649 cons_idx = SUBBUF_INDEX(cons_off, rchan);
650 commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
651 /*
652 * No need to order commit_count and write_offset reads because we
653 * execute after trace is stopped when there are no readers left.
654 */
655 write_offset = local_read(&ltt_buf->offset);
656 printk(KERN_WARNING
657 "LTT : unread channel %s offset is %ld "
658 "and cons_off : %ld (cpu %u)\n",
659 ltt_chan->channel_name, write_offset, cons_off, cpu);
660 /* Check each sub-buffer for non filled commit count */
661 if (((commit_count - rchan->subbuf_size) & ltt_chan->commit_count_mask)
662 - (BUFFER_TRUNC(cons_off, rchan) >> ltt_chan->n_subbufs_order)
663 != 0)
664 printk(KERN_ALERT
665 "LTT : %s : subbuffer %lu has non filled "
666 "commit count %lu.\n",
667 ltt_chan->channel_name, cons_idx, commit_count);
668 printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
669 ltt_chan->channel_name, commit_count,
670 rchan->subbuf_size);
671 }
672
673 static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
674 struct ltt_channel_struct *ltt_chan, int cpu)
675 {
676 struct rchan *rchan = ltt_chan->trans_channel_data;
677 struct ltt_channel_buf_struct *ltt_buf =
678 percpu_ptr(ltt_chan->buf, cpu);
679 long cons_off;
680
681 for (cons_off = atomic_long_read(&ltt_buf->consumed);
682 (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
683 rchan)
684 - cons_off) > 0;
685 cons_off = SUBBUF_ALIGN(cons_off, rchan))
686 ltt_relay_print_subbuffer_errors(ltt_chan, cons_off, cpu);
687 }
688
689 static void ltt_relay_print_buffer_errors(struct ltt_channel_struct *ltt_chan,
690 unsigned int cpu)
691 {
692 struct ltt_trace_struct *trace = ltt_chan->trace;
693 struct ltt_channel_buf_struct *ltt_buf =
694 percpu_ptr(ltt_chan->buf, cpu);
695
696 if (local_read(&ltt_buf->events_lost))
697 printk(KERN_ALERT
698 "LTT : %s : %ld events lost "
699 "in %s channel (cpu %u).\n",
700 ltt_chan->channel_name,
701 local_read(&ltt_buf->events_lost),
702 ltt_chan->channel_name, cpu);
703 if (local_read(&ltt_buf->corrupted_subbuffers))
704 printk(KERN_ALERT
705 "LTT : %s : %ld corrupted subbuffers "
706 "in %s channel (cpu %u).\n",
707 ltt_chan->channel_name,
708 local_read(&ltt_buf->corrupted_subbuffers),
709 ltt_chan->channel_name, cpu);
710
711 ltt_relay_print_errors(trace, ltt_chan, cpu);
712 }
713
714 //ust// static void ltt_relay_remove_dirs(struct ltt_trace_struct *trace)
715 //ust// {
716 //ust// debugfs_remove(trace->dentry.trace_root);
717 //ust// }
718
719 static void ltt_relay_release_channel(struct kref *kref)
720 {
721 struct ltt_channel_struct *ltt_chan = container_of(kref,
722 struct ltt_channel_struct, kref);
723 percpu_free(ltt_chan->buf);
724 }
725
726 /*
727 * Create ltt buffer.
728 */
729 //ust// static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
730 //ust// struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
731 //ust// unsigned int cpu, unsigned int n_subbufs)
732 //ust// {
733 //ust// struct ltt_channel_buf_struct *ltt_buf =
734 //ust// percpu_ptr(ltt_chan->buf, cpu);
735 //ust// unsigned int j;
736 //ust//
737 //ust// ltt_buf->commit_count =
738 //ust// kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
739 //ust// GFP_KERNEL, cpu_to_node(cpu));
740 //ust// if (!ltt_buf->commit_count)
741 //ust// return -ENOMEM;
742 //ust// kref_get(&trace->kref);
743 //ust// kref_get(&trace->ltt_transport_kref);
744 //ust// kref_get(&ltt_chan->kref);
745 //ust// local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
746 //ust// atomic_long_set(&ltt_buf->consumed, 0);
747 //ust// atomic_long_set(&ltt_buf->active_readers, 0);
748 //ust// for (j = 0; j < n_subbufs; j++)
749 //ust// local_set(&ltt_buf->commit_count[j], 0);
750 //ust// init_waitqueue_head(&ltt_buf->write_wait);
751 //ust// atomic_set(&ltt_buf->wakeup_readers, 0);
752 //ust// spin_lock_init(&ltt_buf->full_lock);
753 //ust//
754 //ust// ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
755 //ust// /* atomic_add made on local variable on data that belongs to
756 //ust// * various CPUs : ok because tracing not started (for this cpu). */
757 //ust// local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
758 //ust//
759 //ust// local_set(&ltt_buf->events_lost, 0);
760 //ust// local_set(&ltt_buf->corrupted_subbuffers, 0);
761 //ust//
762 //ust// return 0;
763 //ust// }
764
765 static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
766 struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
767 unsigned int cpu, unsigned int n_subbufs)
768 {
769 struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf;
770 unsigned int j;
771
772 ltt_buf->commit_count =
 773 malloc(sizeof(*ltt_buf->commit_count) * n_subbufs);
774 if (!ltt_buf->commit_count)
775 return -ENOMEM;
776 kref_get(&trace->kref);
777 kref_get(&trace->ltt_transport_kref);
778 kref_get(&ltt_chan->kref);
 779 local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
780 atomic_long_set(&ltt_buf->consumed, 0);
781 atomic_long_set(&ltt_buf->active_readers, 0);
782 for (j = 0; j < n_subbufs; j++)
783 local_set(&ltt_buf->commit_count[j], 0);
784 //ust// init_waitqueue_head(&ltt_buf->write_wait);
785 atomic_set(&ltt_buf->wakeup_readers, 0);
786 spin_lock_init(&ltt_buf->full_lock);
787
788 ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
789
 790 local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
791
 792 local_set(&ltt_buf->events_lost, 0);
 793 local_set(&ltt_buf->corrupted_subbuffers, 0);
794
795 return 0;
796 }
797
798 static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
799 unsigned int cpu)
800 {
801 struct ltt_trace_struct *trace = ltt_chan->trace;
802 struct ltt_channel_buf_struct *ltt_buf =
803 percpu_ptr(ltt_chan->buf, cpu);
804
805 kref_put(&ltt_chan->trace->ltt_transport_kref,
806 ltt_release_transport);
807 ltt_relay_print_buffer_errors(ltt_chan, cpu);
808 kfree(ltt_buf->commit_count);
809 ltt_buf->commit_count = NULL;
810 kref_put(&ltt_chan->kref, ltt_relay_release_channel);
811 kref_put(&trace->kref, ltt_release_trace);
812 wake_up_interruptible(&trace->kref_wq);
813 }
814
815 /*
816 * Create channel.
817 */
818 static int ltt_relay_create_channel(const char *trace_name,
819 struct ltt_trace_struct *trace, struct dentry *dir,
820 const char *channel_name, struct ltt_channel_struct *ltt_chan,
821 unsigned int subbuf_size, unsigned int n_subbufs,
822 int overwrite)
823 {
824 char *tmpname;
825 unsigned int tmpname_len;
826 int err = 0;
827
828 tmpname = kmalloc(PATH_MAX, GFP_KERNEL);
829 if (!tmpname)
830 return EPERM;
831 if (overwrite) {
832 strncpy(tmpname, LTT_FLIGHT_PREFIX, PATH_MAX-1);
833 strncat(tmpname, channel_name,
834 PATH_MAX-1-sizeof(LTT_FLIGHT_PREFIX));
835 } else {
836 strncpy(tmpname, channel_name, PATH_MAX-1);
837 }
838 strncat(tmpname, "_", PATH_MAX-1-strlen(tmpname));
839
840 kref_init(&ltt_chan->kref);
841
842 ltt_chan->trace = trace;
843 ltt_chan->buffer_begin = ltt_buffer_begin_callback;
844 ltt_chan->buffer_end = ltt_buffer_end_callback;
845 ltt_chan->overwrite = overwrite;
846 ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
847 ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
848 ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct),
849 GFP_KERNEL, cpu_possible_map);
850 if (!ltt_chan->buf)
851 goto ltt_percpu_alloc_error;
852 ltt_chan->trans_channel_data = ltt_relay_open(tmpname,
853 dir,
854 subbuf_size,
855 n_subbufs,
856 &trace->callbacks,
857 ltt_chan);
858 tmpname_len = strlen(tmpname);
859 if (tmpname_len > 0) {
860 /* Remove final _ for pretty printing */
861 tmpname[tmpname_len-1] = '\0';
862 }
863 if (ltt_chan->trans_channel_data == NULL) {
864 printk(KERN_ERR "LTT : Can't open %s channel for trace %s\n",
865 tmpname, trace_name);
866 goto relay_open_error;
867 }
868
869 err = 0;
870 goto end;
871
872 relay_open_error:
873 percpu_free(ltt_chan->buf);
874 ltt_percpu_alloc_error:
875 err = EPERM;
876 end:
877 kfree(tmpname);
878 return err;
879 }
880
881 //ust// static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
882 //ust// {
883 //ust// new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
884 //ust// get_ltt_root());
885 //ust// if (new_trace->dentry.trace_root == NULL) {
886 //ust// printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
887 //ust// new_trace->trace_name);
888 //ust// return EEXIST;
889 //ust// }
890 //ust//
891 //ust// new_trace->callbacks.create_buf_file = ltt_create_buf_file_callback;
892 //ust// new_trace->callbacks.remove_buf_file = ltt_remove_buf_file_callback;
893 //ust//
894 //ust// return 0;
895 //ust// }
896
897 /*
898 * LTTng channel flush function.
899 *
900 * Must be called when no tracing is active in the channel, because of
901 * accesses across CPUs.
902 */
903 static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
904 {
905 buf->finalized = 1;
906 ltt_force_switch(buf, FORCE_FLUSH);
907 }
908
909 static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
910 {
911 unsigned int i;
912 struct rchan *rchan = ltt_channel->trans_channel_data;
913
914 for_each_possible_cpu(i) {
915 struct ltt_channel_buf_struct *ltt_buf =
916 percpu_ptr(ltt_channel->buf, i);
917
918 if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
919 atomic_set(&ltt_buf->wakeup_readers, 0);
920 wake_up_interruptible(&rchan->buf[i]->read_wait);
921 }
922 }
923 }
924
925 static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel,
926 unsigned int cpu)
927 {
928 struct rchan *rchan = ltt_channel->trans_channel_data;
929
930 if (rchan->buf[cpu]) {
931 struct ltt_channel_buf_struct *ltt_buf =
932 percpu_ptr(ltt_channel->buf, cpu);
933 ltt_relay_buffer_flush(rchan->buf[cpu]);
934 ltt_relay_wake_writers(ltt_buf);
935 }
936 }
937
938
939 static void ltt_relay_finish_channel(struct ltt_channel_struct *ltt_channel)
940 {
941 unsigned int i;
942
943 for_each_possible_cpu(i)
944 ltt_relay_finish_buffer(ltt_channel, i);
945 }
946
947 static void ltt_relay_remove_channel(struct ltt_channel_struct *channel)
948 {
949 struct rchan *rchan = channel->trans_channel_data;
950
951 ltt_relay_close(rchan);
952 kref_put(&channel->kref, ltt_relay_release_channel);
953 }
954
955 struct ltt_reserve_switch_offsets {
956 long begin, end, old;
957 long begin_switch, end_switch_current, end_switch_old;
958 long commit_count, reserve_commit_diff;
959 size_t before_hdr_pad, size;
960 };
961
962 /*
963 * Returns :
964 * 0 if ok
965 * !0 if execution must be aborted.
966 */
967 static inline int ltt_relay_try_reserve(
968 struct ltt_channel_struct *ltt_channel,
969 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
970 struct rchan_buf *buf,
971 struct ltt_reserve_switch_offsets *offsets, size_t data_size,
972 u64 *tsc, unsigned int *rflags, int largest_align)
973 {
974 offsets->begin = local_read(&ltt_buf->offset);
975 offsets->old = offsets->begin;
976 offsets->begin_switch = 0;
977 offsets->end_switch_current = 0;
978 offsets->end_switch_old = 0;
979
980 *tsc = trace_clock_read64();
981 if (last_tsc_overflow(ltt_buf, *tsc))
982 *rflags = LTT_RFLAG_ID_SIZE_TSC;
983
984 if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
985 offsets->begin_switch = 1; /* For offsets->begin */
986 } else {
987 offsets->size = ltt_get_header_size(ltt_channel,
988 offsets->begin, data_size,
989 &offsets->before_hdr_pad, *rflags);
990 offsets->size += ltt_align(offsets->begin + offsets->size,
991 largest_align)
992 + data_size;
993 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
994 > buf->chan->subbuf_size) {
995 offsets->end_switch_old = 1; /* For offsets->old */
996 offsets->begin_switch = 1; /* For offsets->begin */
997 }
998 }
999 if (offsets->begin_switch) {
1000 long subbuf_index;
1001
1002 if (offsets->end_switch_old)
1003 offsets->begin = SUBBUF_ALIGN(offsets->begin,
1004 buf->chan);
1005 offsets->begin = offsets->begin + ltt_subbuffer_header_size();
1006 /* Test new buffer integrity */
1007 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
1008 offsets->reserve_commit_diff =
1009 (BUFFER_TRUNC(offsets->begin, buf->chan)
1010 >> ltt_channel->n_subbufs_order)
1011 - (local_read(&ltt_buf->commit_count[subbuf_index])
1012 & ltt_channel->commit_count_mask);
1013 if (offsets->reserve_commit_diff == 0) {
1014 /* Next buffer not corrupted. */
1015 if (!ltt_channel->overwrite &&
1016 (SUBBUF_TRUNC(offsets->begin, buf->chan)
1017 - SUBBUF_TRUNC(atomic_long_read(
1018 &ltt_buf->consumed),
1019 buf->chan))
1020 >= rchan->alloc_size) {
1021 /*
1022 * We do not overwrite non consumed buffers
1023 * and we are full : event is lost.
1024 */
1025 local_inc(&ltt_buf->events_lost);
1026 return -1;
1027 } else {
1028 /*
1029 * next buffer not corrupted, we are either in
1030 * overwrite mode or the buffer is not full.
1031 * It's safe to write in this new subbuffer.
1032 */
1033 }
1034 } else {
1035 /*
1036 * Next subbuffer corrupted. Force pushing reader even
1037 * in normal mode. It's safe to write in this new
1038 * subbuffer.
1039 */
1040 }
1041 offsets->size = ltt_get_header_size(ltt_channel,
1042 offsets->begin, data_size,
1043 &offsets->before_hdr_pad, *rflags);
1044 offsets->size += ltt_align(offsets->begin + offsets->size,
1045 largest_align)
1046 + data_size;
1047 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
1048 > buf->chan->subbuf_size) {
1049 /*
1050 * Event too big for subbuffers, report error, don't
1051 * complete the sub-buffer switch.
1052 */
1053 local_inc(&ltt_buf->events_lost);
1054 return -1;
1055 } else {
1056 /*
1057 * We just made a successful buffer switch and the event
1058 * fits in the new subbuffer. Let's write.
1059 */
1060 }
1061 } else {
1062 /*
1063 * Event fits in the current buffer and we are not on a switch
1064 * boundary. It's safe to write.
1065 */
1066 }
1067 offsets->end = offsets->begin + offsets->size;
1068
1069 if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
1070 /*
1071 * The offset_end will fall at the very beginning of the next
1072 * subbuffer.
1073 */
1074 offsets->end_switch_current = 1; /* For offsets->begin */
1075 }
1076 return 0;
1077 }
1078
1079 /*
1080 * Returns :
1081 * 0 if ok
1082 * !0 if execution must be aborted.
1083 */
1084 static inline int ltt_relay_try_switch(
1085 enum force_switch_mode mode,
1086 struct ltt_channel_struct *ltt_channel,
1087 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1088 struct rchan_buf *buf,
1089 struct ltt_reserve_switch_offsets *offsets,
1090 u64 *tsc)
1091 {
1092 long subbuf_index;
1093
1094 offsets->begin = local_read(&ltt_buf->offset);
1095 offsets->old = offsets->begin;
1096 offsets->begin_switch = 0;
1097 offsets->end_switch_old = 0;
1098
1099 *tsc = trace_clock_read64();
1100
1101 if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
1102 offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
1103 offsets->end_switch_old = 1;
1104 } else {
1105 /* we do not have to switch : buffer is empty */
1106 return -1;
1107 }
1108 if (mode == FORCE_ACTIVE)
1109 offsets->begin += ltt_subbuffer_header_size();
1110 /*
1111 * Always begin_switch in FORCE_ACTIVE mode.
1112 * Test new buffer integrity
1113 */
1114 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
1115 offsets->reserve_commit_diff =
1116 (BUFFER_TRUNC(offsets->begin, buf->chan)
1117 >> ltt_channel->n_subbufs_order)
1118 - (local_read(&ltt_buf->commit_count[subbuf_index])
1119 & ltt_channel->commit_count_mask);
1120 if (offsets->reserve_commit_diff == 0) {
1121 /* Next buffer not corrupted. */
1122 if (mode == FORCE_ACTIVE
1123 && !ltt_channel->overwrite
1124 && offsets->begin - atomic_long_read(&ltt_buf->consumed)
1125 >= rchan->alloc_size) {
1126 /*
1127 * We do not overwrite non consumed buffers and we are
1128 * full : ignore switch while tracing is active.
1129 */
1130 return -1;
1131 }
1132 } else {
1133 /*
1134 * Next subbuffer corrupted. Force pushing reader even in normal
1135 * mode
1136 */
1137 }
1138 offsets->end = offsets->begin;
1139 return 0;
1140 }
1141
1142 static inline void ltt_reserve_push_reader(
1143 struct ltt_channel_struct *ltt_channel,
1144 struct ltt_channel_buf_struct *ltt_buf,
1145 struct rchan *rchan,
1146 struct rchan_buf *buf,
1147 struct ltt_reserve_switch_offsets *offsets)
1148 {
1149 long consumed_old, consumed_new;
1150
1151 do {
1152 consumed_old = atomic_long_read(&ltt_buf->consumed);
1153 /*
1154 * If buffer is in overwrite mode, push the reader consumed
1155 * count if the write position has reached it and we are not
1156 * at the first iteration (don't push the reader farther than
1157 * the writer). This operation can be done concurrently by many
1158 * writers in the same buffer, the writer being at the farthest
1159 * write position sub-buffer index in the buffer being the one
1160 * which will win this loop.
1161 * If the buffer is not in overwrite mode, pushing the reader
1162 * only happens if a sub-buffer is corrupted.
1163 */
1164 if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
1165 - SUBBUF_TRUNC(consumed_old, buf->chan))
1166 >= rchan->alloc_size)
1167 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
1168 else {
1169 consumed_new = consumed_old;
1170 break;
1171 }
1172 } while (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
1173 consumed_new) != consumed_old);
1174
1175 if (consumed_old != consumed_new) {
1176 /*
1177 * Reader pushed : we are the winner of the push, we can
1178 * therefore reequilibrate reserve and commit. Atomic increment
1179 * of the commit count permits other writers to play around
1180 * with this variable before us. We keep track of
1181 * corrupted_subbuffers even in overwrite mode :
1182 * we never want to write over a non completely committed
1183 * sub-buffer : possible causes : the buffer size is too low
1184 * compared to the unordered data input, or there is a writer
1185 * that died between the reserve and the commit.
1186 */
1187 if (offsets->reserve_commit_diff) {
1188 /*
1189 * We have to alter the sub-buffer commit count.
1190 * We do not deliver the previous subbuffer, given it
1191 * was either corrupted or not consumed (overwrite
1192 * mode).
1193 */
1194 local_add(offsets->reserve_commit_diff,
1195 &ltt_buf->commit_count[
1196 SUBBUF_INDEX(offsets->begin,
1197 buf->chan)]);
1198 if (!ltt_channel->overwrite
1199 || offsets->reserve_commit_diff
1200 != rchan->subbuf_size) {
1201 /*
1202 * The reserve commit diff was not subbuf_size :
1203 * it means the subbuffer was partly written to
1204 * and is therefore corrupted. If it is multiple
1205 * of subbuffer size and we are in flight
1206 * recorder mode, we are skipping over a whole
1207 * subbuffer.
1208 */
1209 local_inc(&ltt_buf->corrupted_subbuffers);
1210 }
1211 }
1212 }
1213 }
1214
1215
1216 /*
1217 * ltt_reserve_switch_old_subbuf: switch old subbuffer
1218 *
1219 * Concurrency safe because we are the last and only thread to alter this
1220 * sub-buffer. As long as it is not delivered and read, no other thread can
1221 * alter the offset, alter the reserve_count or call the
1222 * client_buffer_end_callback on this sub-buffer.
1223 *
1224 * The only remaining threads could be the ones with pending commits. They will
1225 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1226 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1227 * corrupted sub-buffers count and push the readers across these sub-buffers.
1228 *
1229 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
1230 * switches in, finding out it's corrupted. The result will be than the old
1231 * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer
1232 * will be declared corrupted too because of the commit count adjustment.
1233 *
1234 * Note : offset_old should never be 0 here.
1235 */
1236 static inline void ltt_reserve_switch_old_subbuf(
1237 struct ltt_channel_struct *ltt_channel,
1238 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1239 struct rchan_buf *buf,
1240 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1241 {
1242 long oldidx = SUBBUF_INDEX(offsets->old - 1, rchan);
1243
1244 ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
1245 /* Must write buffer end before incrementing commit count */
1246 smp_wmb();
1247 offsets->commit_count =
1248 local_add_return(rchan->subbuf_size
1249 - (SUBBUF_OFFSET(offsets->old - 1, rchan)
1250 + 1),
1251 &ltt_buf->commit_count[oldidx]);
1252 if ((BUFFER_TRUNC(offsets->old - 1, rchan)
1253 >> ltt_channel->n_subbufs_order)
1254 - ((offsets->commit_count - rchan->subbuf_size)
1255 & ltt_channel->commit_count_mask) == 0)
1256 ltt_deliver(buf, oldidx, NULL);
1257 }
1258
1259 /*
1260 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
1261 *
1262 * This code can be executed unordered : writers may already have written to the
1263 * sub-buffer before this code gets executed, caution. The commit makes sure
1264 * that this code is executed before the deliver of this sub-buffer.
1265 */
1266 static inline void ltt_reserve_switch_new_subbuf(
1267 struct ltt_channel_struct *ltt_channel,
1268 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1269 struct rchan_buf *buf,
1270 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1271 {
1272 long beginidx = SUBBUF_INDEX(offsets->begin, rchan);
1273
1274 ltt_channel->buffer_begin(buf, *tsc, beginidx);
 1275 /* Must write buffer begin before incrementing commit count */
1276 smp_wmb();
1277 offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
1278 &ltt_buf->commit_count[beginidx]);
1279 /* Check if the written buffer has to be delivered */
1280 if ((BUFFER_TRUNC(offsets->begin, rchan)
1281 >> ltt_channel->n_subbufs_order)
1282 - ((offsets->commit_count - rchan->subbuf_size)
1283 & ltt_channel->commit_count_mask) == 0)
1284 ltt_deliver(buf, beginidx, NULL);
1285 }
1286
1287
1288 /*
1289 * ltt_reserve_end_switch_current: finish switching current subbuffer
1290 *
1291 * Concurrency safe because we are the last and only thread to alter this
1292 * sub-buffer. As long as it is not delivered and read, no other thread can
1293 * alter the offset, alter the reserve_count or call the
1294 * client_buffer_end_callback on this sub-buffer.
1295 *
1296 * The only remaining threads could be the ones with pending commits. They will
1297 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1298 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1299 * corrupted sub-buffers count and push the readers across these sub-buffers.
1300 *
1301 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 1302 * switches in, finding out it's corrupted. The result will be that the old
 1303 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
1304 * will be declared corrupted too because of the commit count adjustment.
1305 */
1306 static inline void ltt_reserve_end_switch_current(
1307 struct ltt_channel_struct *ltt_channel,
1308 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1309 struct rchan_buf *buf,
1310 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1311 {
1312 long endidx = SUBBUF_INDEX(offsets->end - 1, rchan);
1313
1314 ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
 1315 /* Must write buffer end before incrementing commit count */
1316 smp_wmb();
1317 offsets->commit_count =
1318 local_add_return(rchan->subbuf_size
1319 - (SUBBUF_OFFSET(offsets->end - 1, rchan)
1320 + 1),
1321 &ltt_buf->commit_count[endidx]);
1322 if ((BUFFER_TRUNC(offsets->end - 1, rchan)
1323 >> ltt_channel->n_subbufs_order)
1324 - ((offsets->commit_count - rchan->subbuf_size)
1325 & ltt_channel->commit_count_mask) == 0)
1326 ltt_deliver(buf, endidx, NULL);
1327 }
1328
1329 /**
1330 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
1331 * @trace: the trace structure to log to.
1332 * @ltt_channel: channel structure
1333 * @transport_data: data structure specific to ltt relay
1334 * @data_size: size of the variable length data to log.
1335 * @slot_size: pointer to total size of the slot (out)
1336 * @buf_offset : pointer to reserved buffer offset (out)
1337 * @tsc: pointer to the tsc at the slot reservation (out)
1338 * @cpu: cpuid
1339 *
1340 * Return : -ENOSPC if not enough space, else returns 0.
1341 * It will take care of sub-buffer switching.
1342 */
1343 static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
1344 struct ltt_channel_struct *ltt_channel, void **transport_data,
1345 size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
1346 unsigned int *rflags, int largest_align, int cpu)
1347 {
1348 struct rchan *rchan = ltt_channel->trans_channel_data;
1349 struct rchan_buf *buf = *transport_data =
1350 rchan->buf[cpu];
1351 struct ltt_channel_buf_struct *ltt_buf =
1352 percpu_ptr(ltt_channel->buf, buf->cpu);
1353 struct ltt_reserve_switch_offsets offsets;
1354
1355 offsets.reserve_commit_diff = 0;
1356 offsets.size = 0;
1357
1358 /*
1359 * Perform retryable operations.
1360 */
1361 if (__get_cpu_var(ltt_nesting) > 4) {
1362 local_inc(&ltt_buf->events_lost);
1363 return -EPERM;
1364 }
1365 do {
1366 if (ltt_relay_try_reserve(ltt_channel, ltt_buf,
1367 rchan, buf, &offsets, data_size, tsc, rflags,
1368 largest_align))
1369 return -ENOSPC;
1370 } while (local_cmpxchg(&ltt_buf->offset, offsets.old,
1371 offsets.end) != offsets.old);
1372
1373 /*
1374 * Atomically update last_tsc. This update races against concurrent
1375 * atomic updates, but the race will always cause supplementary full TSC
1376 * events, never the opposite (missing a full TSC event when it would be
1377 * needed).
1378 */
1379 save_last_tsc(ltt_buf, *tsc);
1380
1381 /*
1382 * Push the reader if necessary
1383 */
1384 ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);
1385
1386 /*
1387 * Switch old subbuffer if needed.
1388 */
1389 if (offsets.end_switch_old)
1390 ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
1391 &offsets, tsc);
1392
1393 /*
1394 * Populate new subbuffer.
1395 */
1396 if (offsets.begin_switch)
1397 ltt_reserve_switch_new_subbuf(ltt_channel, ltt_buf, rchan,
1398 buf, &offsets, tsc);
1399
1400 if (offsets.end_switch_current)
1401 ltt_reserve_end_switch_current(ltt_channel, ltt_buf, rchan,
1402 buf, &offsets, tsc);
1403
1404 *slot_size = offsets.size;
1405 *buf_offset = offsets.begin + offsets.before_hdr_pad;
1406 return 0;
1407 }
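/*
 * Writer-side sketch (illustrative only) : the reserve/commit pair as a probe
 * would typically use it. Writing the event header and payload is done by the
 * tracer core (e.g. through ltt_relay_write()); the exact helpers and offset
 * bookkeeping are assumed here and may differ.
 *
 *	void *transport_data;
 *	size_t slot_size;
 *	long buf_offset;
 *	u64 tsc;
 *	unsigned int rflags = 0;
 *
 *	if (ltt_relay_reserve_slot(trace, ltt_channel, &transport_data,
 *			data_size, &slot_size, &buf_offset, &tsc, &rflags,
 *			largest_align, cpu))
 *		return;		/* no space : the event is lost */
 *	/* write the event header, then the payload, at buf_offset */
 *	ltt_relay_commit_slot(ltt_channel, &transport_data, buf_offset,
 *			slot_size);
 */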
1408
1409 /*
1410 * Force a sub-buffer switch for a per-cpu buffer. This operation is
1411 * completely reentrant : can be called while tracing is active with
1412 * absolutely no lock held.
1413 *
1414 * Note, however, that as a local_cmpxchg is used for some atomic
1415 * operations, this function must be called from the CPU which owns the buffer
1416 * for a ACTIVE flush.
1417 */
1418 static notrace void ltt_force_switch(struct rchan_buf *buf,
1419 enum force_switch_mode mode)
1420 {
1421 struct ltt_channel_struct *ltt_channel =
1422 (struct ltt_channel_struct *)buf->chan->private_data;
1423 struct ltt_channel_buf_struct *ltt_buf =
1424 percpu_ptr(ltt_channel->buf, buf->cpu);
1425 struct rchan *rchan = ltt_channel->trans_channel_data;
1426 struct ltt_reserve_switch_offsets offsets;
1427 u64 tsc;
1428
1429 offsets.reserve_commit_diff = 0;
1430 offsets.size = 0;
1431
1432 /*
1433 * Perform retryable operations.
1434 */
1435 do {
1436 if (ltt_relay_try_switch(mode, ltt_channel, ltt_buf,
1437 rchan, buf, &offsets, &tsc))
1438 return;
1439 } while (local_cmpxchg(&ltt_buf->offset, offsets.old,
1440 offsets.end) != offsets.old);
1441
1442 /*
1443 * Atomically update last_tsc. This update races against concurrent
1444 * atomic updates, but the race will always cause supplementary full TSC
1445 * events, never the opposite (missing a full TSC event when it would be
1446 * needed).
1447 */
1448 save_last_tsc(ltt_buf, tsc);
1449
1450 /*
1451 * Push the reader if necessary
1452 */
1453 if (mode == FORCE_ACTIVE)
1454 ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan,
1455 buf, &offsets);
1456
1457 /*
1458 * Switch old subbuffer if needed.
1459 */
1460 if (offsets.end_switch_old)
1461 ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
1462 &offsets, &tsc);
1463
1464 /*
1465 * Populate new subbuffer.
1466 */
1467 if (mode == FORCE_ACTIVE)
1468 ltt_reserve_switch_new_subbuf(ltt_channel,
1469 ltt_buf, rchan, buf, &offsets, &tsc);
1470 }
1471
1472 /*
 1473 * For flight recording. Must be called after relay_commit.
 1474 * This function decrements the subbuffer's lost_size each time the commit count
 1475 * reaches back the reserve offset (modulo subbuffer size). It is useful for
1476 * crash dump.
1477 * We use slot_size - 1 to make sure we deal correctly with the case where we
1478 * fill the subbuffer completely (so the subbuf index stays in the previous
1479 * subbuffer).
1480 */
1481 #ifdef CONFIG_LTT_VMCORE
1482 static inline void ltt_write_commit_counter(struct rchan_buf *buf,
1483 long buf_offset, size_t slot_size)
1484 {
1485 struct ltt_channel_struct *ltt_channel =
1486 (struct ltt_channel_struct *)buf->chan->private_data;
1487 struct ltt_channel_buf_struct *ltt_buf =
1488 percpu_ptr(ltt_channel->buf, buf->cpu);
1489 struct ltt_subbuffer_header *header;
1490 long offset, subbuf_idx, commit_count;
1491 uint32_t lost_old, lost_new;
1492
1493 subbuf_idx = SUBBUF_INDEX(buf_offset - 1, buf->chan);
1494 offset = buf_offset + slot_size;
1495 header = (struct ltt_subbuffer_header *)
1496 ltt_relay_offset_address(buf,
1497 subbuf_idx * buf->chan->subbuf_size);
1498 for (;;) {
1499 lost_old = header->lost_size;
1500 commit_count =
1501 local_read(&ltt_buf->commit_count[subbuf_idx]);
1502 /* SUBBUF_OFFSET includes commit_count_mask */
1503 if (!SUBBUF_OFFSET(offset - commit_count, buf->chan)) {
1504 lost_new = (uint32_t)buf->chan->subbuf_size
1505 - SUBBUF_OFFSET(commit_count, buf->chan);
1506 lost_old = cmpxchg_local(&header->lost_size, lost_old,
1507 lost_new);
1508 if (lost_old <= lost_new)
1509 break;
1510 } else {
1511 break;
1512 }
1513 }
1514 }
1515 #else
1516 static inline void ltt_write_commit_counter(struct rchan_buf *buf,
1517 long buf_offset, size_t slot_size)
1518 {
1519 }
1520 #endif
1521
1522 /*
1523 * Atomic unordered slot commit. Increments the commit count in the
1524 * specified sub-buffer, and delivers it if necessary.
1525 *
1526 * Parameters:
1527 *
1528 * @ltt_channel : channel structure
1529 * @transport_data: transport-specific data
1530 * @buf_offset : offset following the event header.
1531 * @slot_size : size of the reserved slot.
1532 */
1533 static notrace void ltt_relay_commit_slot(
1534 struct ltt_channel_struct *ltt_channel,
1535 void **transport_data, long buf_offset, size_t slot_size)
1536 {
1537 struct rchan_buf *buf = *transport_data;
1538 struct ltt_channel_buf_struct *ltt_buf =
1539 percpu_ptr(ltt_channel->buf, buf->cpu);
1540 struct rchan *rchan = buf->chan;
1541 long offset_end = buf_offset;
1542 long endidx = SUBBUF_INDEX(offset_end - 1, rchan);
1543 long commit_count;
1544
1545 /* Must write slot data before incrementing commit count */
1546 smp_wmb();
1547 commit_count = local_add_return(slot_size,
1548 &ltt_buf->commit_count[endidx]);
1549 /* Check if all commits have been done */
1550 if ((BUFFER_TRUNC(offset_end - 1, rchan)
1551 >> ltt_channel->n_subbufs_order)
1552 - ((commit_count - rchan->subbuf_size)
1553 & ltt_channel->commit_count_mask) == 0)
1554 ltt_deliver(buf, endidx, NULL);
1555 /*
1556 * Update lost_size for each commit. It's needed only for extracting
1557 * ltt buffers from vmcore, after crash.
1558 */
1559 ltt_write_commit_counter(buf, buf_offset, slot_size);
1560 }
1561
1562 /*
1563 * This is called with preemption disabled when user space has requested
1564 * blocking mode. If one of the active traces has free space below a
1565 * specific threshold value, we reenable preemption and block.
1566 */
1567 static int ltt_relay_user_blocking(struct ltt_trace_struct *trace,
1568 unsigned int chan_index, size_t data_size,
1569 struct user_dbg_data *dbg)
1570 {
1571 struct rchan *rchan;
1572 struct ltt_channel_buf_struct *ltt_buf;
1573 struct ltt_channel_struct *channel;
1574 struct rchan_buf *relay_buf;
1575 int cpu;
1576 DECLARE_WAITQUEUE(wait, current);
1577
1578 channel = &trace->channels[chan_index];
1579 rchan = channel->trans_channel_data;
1580 cpu = smp_processor_id();
1581 relay_buf = rchan->buf[cpu];
1582 ltt_buf = percpu_ptr(channel->buf, cpu);
1583
1584 /*
1585 * Check if data is too big for the channel : do not
1586 * block for it.
1587 */
1588 if (LTT_RESERVE_CRITICAL + data_size > relay_buf->chan->subbuf_size)
1589 return 0;
1590
1591 /*
1592 * If free space too low, we block. We restart from the
1593 * beginning after we resume (cpu id may have changed
1594 * while preemption is active).
1595 */
1596 spin_lock(&ltt_buf->full_lock);
1597 if (!channel->overwrite) {
1598 dbg->write = local_read(&ltt_buf->offset);
1599 dbg->read = atomic_long_read(&ltt_buf->consumed);
1600 dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
1601 - SUBBUF_TRUNC(dbg->read,
1602 relay_buf->chan);
1603 if (dbg->avail_size > rchan->alloc_size) {
1604 __set_current_state(TASK_INTERRUPTIBLE);
1605 add_wait_queue(&ltt_buf->write_wait, &wait);
1606 spin_unlock(&ltt_buf->full_lock);
1607 preempt_enable();
1608 schedule();
1609 __set_current_state(TASK_RUNNING);
1610 remove_wait_queue(&ltt_buf->write_wait, &wait);
1611 if (signal_pending(current))
1612 return -ERESTARTSYS;
1613 preempt_disable();
1614 return 1;
1615 }
1616 }
1617 spin_unlock(&ltt_buf->full_lock);
1618 return 0;
1619 }
1620
1621 static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
1622 unsigned int chan_index, size_t data_size,
1623 struct user_dbg_data *dbg, int cpu)
1624 {
1625 struct rchan *rchan;
1626 struct ltt_channel_buf_struct *ltt_buf;
1627 struct ltt_channel_struct *channel;
1628 struct rchan_buf *relay_buf;
1629
1630 channel = &trace->channels[chan_index];
1631 rchan = channel->trans_channel_data;
1632 relay_buf = rchan->buf[cpu];
1633 ltt_buf = percpu_ptr(channel->buf, cpu);
1634
1635 printk(KERN_ERR "Error in LTT usertrace : "
1636 "buffer full : event lost in blocking "
1637 "mode. Increase LTT_RESERVE_CRITICAL.\n");
1638 printk(KERN_ERR "LTT nesting level is %u.\n",
1639 per_cpu(ltt_nesting, cpu));
1640 printk(KERN_ERR "LTT avail size %lu.\n",
1641 dbg->avail_size);
1642 printk(KERN_ERR "avai write : %lu, read : %lu\n",
1643 dbg->write, dbg->read);
1644
1645 dbg->write = local_read(&ltt_buf->offset);
1646 dbg->read = atomic_long_read(&ltt_buf->consumed);
1647
1648 printk(KERN_ERR "LTT cur size %lu.\n",
1649 dbg->write + LTT_RESERVE_CRITICAL + data_size
1650 - SUBBUF_TRUNC(dbg->read, relay_buf->chan));
1651 printk(KERN_ERR "cur write : %lu, read : %lu\n",
1652 dbg->write, dbg->read);
1653 }
1654
1655 //ust// static struct ltt_transport ltt_relay_transport = {
1656 //ust// .name = "relay",
1657 //ust// .owner = THIS_MODULE,
1658 //ust// .ops = {
1659 //ust// .create_dirs = ltt_relay_create_dirs,
1660 //ust// .remove_dirs = ltt_relay_remove_dirs,
1661 //ust// .create_channel = ltt_relay_create_channel,
1662 //ust// .finish_channel = ltt_relay_finish_channel,
1663 //ust// .remove_channel = ltt_relay_remove_channel,
1664 //ust// .wakeup_channel = ltt_relay_async_wakeup_chan,
1665 //ust// .commit_slot = ltt_relay_commit_slot,
1666 //ust// .reserve_slot = ltt_relay_reserve_slot,
1667 //ust// .user_blocking = ltt_relay_user_blocking,
1668 //ust// .user_errors = ltt_relay_print_user_errors,
1669 //ust// },
1670 //ust// };
1671
1672 static struct ltt_transport ust_relay_transport = {
1673 .name = "ustrelay",
1674 .owner = THIS_MODULE,
1675 .ops = {
1676 .create_dirs = ltt_relay_create_dirs,
1677 .remove_dirs = ltt_relay_remove_dirs,
1678 .create_channel = ltt_relay_create_channel,
1679 .finish_channel = ltt_relay_finish_channel,
1680 .remove_channel = ltt_relay_remove_channel,
1681 .wakeup_channel = ltt_relay_async_wakeup_chan,
1682 .commit_slot = ltt_relay_commit_slot,
1683 .reserve_slot = ltt_relay_reserve_slot,
1684 .user_blocking = ltt_relay_user_blocking,
1685 .user_errors = ltt_relay_print_user_errors,
1686 },
1687 };
1688
1689 //ust// static int __init ltt_relay_init(void)
1690 //ust// {
1691 //ust// printk(KERN_INFO "LTT : ltt-relay init\n");
1692 //ust//
1693 //ust// ltt_file_operations = ltt_relay_file_operations;
1694 //ust// ltt_file_operations.owner = THIS_MODULE;
1695 //ust// ltt_file_operations.open = ltt_open;
1696 //ust// ltt_file_operations.release = ltt_release;
1697 //ust// ltt_file_operations.poll = ltt_poll;
1698 //ust// ltt_file_operations.splice_read = ltt_relay_file_splice_read,
1699 //ust// ltt_file_operations.ioctl = ltt_ioctl;
1700 //ust//#ifdef CONFIG_COMPAT
1701 //ust// ltt_file_operations.compat_ioctl = ltt_compat_ioctl;
1702 //ust//#endif
1703 //ust//
1704 //ust// ltt_transport_register(&ltt_relay_transport);
1705 //ust//
1706 //ust// return 0;
1707 //ust// }
1708
1709 void init_ustrelay_transport(void)
1710 {
1711 ltt_transport_register(&ust_relay_transport);
1712 }
1713
1714 static void __exit ltt_relay_exit(void)
1715 {
1716 //ust// printk(KERN_INFO "LTT : ltt-relay exit\n");
1717
 1718 ltt_transport_unregister(&ust_relay_transport);
1719 }
1720
1721 //ust// module_init(ltt_relay_init);
1722 //ust// module_exit(ltt_relay_exit);
1723 //ust//
1724 //ust// MODULE_LICENSE("GPL");
1725 //ust// MODULE_AUTHOR("Mathieu Desnoyers");
1726 //ust// MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");