1 /*
2 * ltt/ltt-relay.c
3 *
4 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
5 *
6 * LTTng lockless buffer space management (reader/writer).
7 *
8 * Author:
9 * Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
10 *
11 * Inspired from LTT :
12 * Karim Yaghmour (karim@opersys.com)
13 * Tom Zanussi (zanussi@us.ibm.com)
14 * Bob Wisniewski (bob@watson.ibm.com)
15 * And from K42 :
16 * Bob Wisniewski (bob@watson.ibm.com)
17 *
18 * Changelog:
19 * 08/10/08, Cleanup.
20 * 19/10/05, Complete lockless mechanism.
21 * 27/05/05, Modular redesign and rewrite.
22 *
23 * Userspace reader semantic :
24 * while (poll fd != POLLHUP) {
25 * - ioctl RELAY_GET_SUBBUF_SIZE
26 * while (1) {
27 * - ioctl GET_SUBBUF
28 * - splice 1 subbuffer worth of data to a pipe
29 * - splice the data from pipe to disk/network
30 * - ioctl PUT_SUBBUF, check error value
31 * if err val < 0, previous subbuffer was corrupted.
32 * }
33 * }
34 */
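/*
 * Illustrative userspace consumer for the protocol above -- a sketch only,
 * not part of this file. The debugfs path and out_fd (an already open
 * destination file or socket) are assumptions made for the example; the
 * RELAY_* requests are the ioctl commands handled by ltt_ioctl() below.
 *
 *	int fd = open("/mnt/debugfs/ltt/trace/channel_0", O_RDONLY);
 *	int pfd[2];
 *	uint32_t subbuf_size, consumed;
 *
 *	pipe(pfd);
 *	ioctl(fd, RELAY_GET_SUBBUF_SIZE, &subbuf_size);
 *	for (;;) {
 *		struct pollfd p = { .fd = fd, .events = POLLIN };
 *
 *		if (poll(&p, 1, -1) < 0 || (p.revents & POLLHUP))
 *			break;
 *		if (ioctl(fd, RELAY_GET_SUBBUF, &consumed) < 0)
 *			continue;	-- no fully committed sub-buffer yet
 *		splice(fd, NULL, pfd[1], NULL, subbuf_size, SPLICE_F_MOVE);
 *		splice(pfd[0], NULL, out_fd, NULL, subbuf_size, SPLICE_F_MOVE);
 *		if (ioctl(fd, RELAY_PUT_SUBBUF, &consumed) < 0)
 *			;	-- previous sub-buffer was corrupted, skip it
 *	}
 */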
35
36 #include <linux/time.h>
37 #include <linux/ltt-tracer.h>
38 #include <linux/ltt-relay.h>
39 #include <linux/module.h>
40 #include <linux/string.h>
41 #include <linux/slab.h>
42 #include <linux/init.h>
43 #include <linux/rcupdate.h>
44 #include <linux/sched.h>
45 #include <linux/bitops.h>
46 #include <linux/fs.h>
47 #include <linux/smp_lock.h>
48 #include <linux/debugfs.h>
49 #include <linux/stat.h>
50 #include <linux/cpu.h>
51 #include <linux/pipe_fs_i.h>
52 #include <linux/splice.h>
53 #include <asm/atomic.h>
54 #include <asm/local.h>
55
56 #if 0
57 #define printk_dbg(fmt, args...) printk(fmt, args)
58 #else
59 #define printk_dbg(fmt, args...)
60 #endif
61
62 /* LTTng lockless logging buffer info */
63 struct ltt_channel_buf_struct {
64 /* First 32 bytes cache-hot cacheline */
65 local_t offset; /* Current offset in the buffer */
66 local_t *commit_count; /* Commit count per sub-buffer */
67 atomic_long_t consumed; /*
68 * Current offset in the buffer
69 * standard atomic access (shared)
70 */
71 unsigned long last_tsc; /*
72 * Last timestamp written in the buffer.
73 */
74 /* End of first 32 bytes cacheline */
75 atomic_long_t active_readers; /*
76 * Active readers count
77 * standard atomic access (shared)
78 */
79 local_t events_lost;
80 local_t corrupted_subbuffers;
81 spinlock_t full_lock; /*
82 * buffer full condition spinlock, only
83 * for userspace tracing blocking mode
84 * synchronization with reader.
85 */
86 wait_queue_head_t write_wait; /*
87 * Wait queue for blocking user space
88 * writers
89 */
90 atomic_t wakeup_readers; /* Boolean : wakeup readers waiting ? */
91 } ____cacheline_aligned;
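/*
 * Note on the field types : offset, commit_count, events_lost and
 * corrupted_subbuffers are only updated by code running on the CPU that
 * owns the buffer, so the cheaper local_t operations are sufficient.
 * consumed and active_readers are shared with the reader and therefore
 * use standard atomic_long_t accesses, as noted above.
 */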
92
93 /*
94 * Last TSC comparison functions. Check if the current TSC overflows
95 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
96 * atomically.
97 */
98
99 #if (BITS_PER_LONG == 32)
100 static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
101 u64 tsc)
102 {
103 ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
104 }
105
106 static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
107 u64 tsc)
108 {
109 unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);
110
111 if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
112 return 1;
113 else
114 return 0;
115 }
116 #else
117 static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
118 u64 tsc)
119 {
120 ltt_buf->last_tsc = (unsigned long)tsc;
121 }
122
123 static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
124 u64 tsc)
125 {
126 if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
127 return 1;
128 else
129 return 0;
130 }
131 #endif
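/*
 * Writer-side usage of the helpers above -- a sketch of what
 * ltt_relay_try_reserve() and ltt_relay_reserve_slot() below do :
 *
 *	u64 tsc = trace_clock_read64();
 *
 *	if (last_tsc_overflow(ltt_buf, tsc))
 *		rflags = LTT_RFLAG_ID_SIZE_TSC;	-- record a full 64-bit TSC
 *	... reserve the slot, then ...
 *	save_last_tsc(ltt_buf, tsc);
 *
 * The compact event header only carries the low LTT_TSC_BITS bits of the
 * timestamp, so a full TSC must be written whenever the delta since the
 * last recorded timestamp no longer fits in LTT_TSC_BITS bits.
 */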
132
133 static struct file_operations ltt_file_operations;
134
135 /*
136 * A switch is done either during tracing (FORCE_ACTIVE) or as a final flush
137 * after tracing (FORCE_FLUSH, which does not write into the new sub-buffer).
138 */
139 enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
140
141 static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
142 struct ltt_channel_struct *ltt_chan,
143 struct rchan_buf *buf,
144 unsigned int cpu,
145 unsigned int n_subbufs);
146
147 static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
148 unsigned int cpu);
149
150 static void ltt_force_switch(struct rchan_buf *buf,
151 enum force_switch_mode mode);
152
153 /*
154 * Trace callbacks
155 */
156 static void ltt_buffer_begin_callback(struct rchan_buf *buf,
157 u64 tsc, unsigned int subbuf_idx)
158 {
159 struct ltt_channel_struct *channel =
160 (struct ltt_channel_struct *)buf->chan->private_data;
161 struct ltt_subbuffer_header *header =
162 (struct ltt_subbuffer_header *)
163 ltt_relay_offset_address(buf,
164 subbuf_idx * buf->chan->subbuf_size);
165
166 header->cycle_count_begin = tsc;
167 header->lost_size = 0xFFFFFFFF; /* for debugging */
168 header->buf_size = buf->chan->subbuf_size;
169 ltt_write_trace_header(channel->trace, header);
170 }
171
172 /*
173 * offset is assumed to never be 0 here : never deliver a completely empty
174 * subbuffer. The lost size is between 0 and subbuf_size-1.
175 */
176 static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
177 u64 tsc, unsigned int offset, unsigned int subbuf_idx)
178 {
179 struct ltt_channel_struct *channel =
180 (struct ltt_channel_struct *)buf->chan->private_data;
181 struct ltt_channel_buf_struct *ltt_buf =
182 percpu_ptr(channel->buf, buf->cpu);
183 struct ltt_subbuffer_header *header =
184 (struct ltt_subbuffer_header *)
185 ltt_relay_offset_address(buf,
186 subbuf_idx * buf->chan->subbuf_size);
187
188 header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
189 buf->chan);
190 header->cycle_count_end = tsc;
191 header->events_lost = local_read(&ltt_buf->events_lost);
192 header->subbuf_corrupt = local_read(&ltt_buf->corrupted_subbuffers);
193 }
194
195 static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
196 void *subbuf)
197 {
198 struct ltt_channel_struct *channel =
199 (struct ltt_channel_struct *)buf->chan->private_data;
200 struct ltt_channel_buf_struct *ltt_buf =
201 percpu_ptr(channel->buf, buf->cpu);
202
203 atomic_set(&ltt_buf->wakeup_readers, 1);
204 }
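/*
 * Note : ltt_deliver() only raises the wakeup_readers flag; the actual
 * wake_up_interruptible() on the reader wait queue is deferred to
 * ltt_relay_async_wakeup_chan() (the transport's wakeup_channel callback),
 * so no wakeup is performed from tracing context.
 */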
205
206 static struct dentry *ltt_create_buf_file_callback(const char *filename,
207 struct dentry *parent, int mode,
208 struct rchan_buf *buf)
209 {
210 struct ltt_channel_struct *ltt_chan;
211 int err;
212 struct dentry *dentry;
213
214 ltt_chan = buf->chan->private_data;
215 err = ltt_relay_create_buffer(ltt_chan->trace, ltt_chan,
216 buf, buf->cpu,
217 buf->chan->n_subbufs);
218 if (err)
219 return ERR_PTR(err);
220
221 dentry = debugfs_create_file(filename, mode, parent, buf,
222 &ltt_file_operations);
223 if (!dentry)
224 goto error;
225 return dentry;
226 error:
227 ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
228 return NULL;
229 }
230
231 static int ltt_remove_buf_file_callback(struct dentry *dentry)
232 {
233 struct rchan_buf *buf = dentry->d_inode->i_private;
234 struct ltt_channel_struct *ltt_chan = buf->chan->private_data;
235
236 debugfs_remove(dentry);
237 ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
238
239 return 0;
240 }
241
242 /*
243 * Wake writers :
244 *
245 * This must be done after the trace is removed from the RCU list so that there
246 * are no stalled writers.
247 */
248 static void ltt_relay_wake_writers(struct ltt_channel_buf_struct *ltt_buf)
249 {
250
251 if (waitqueue_active(&ltt_buf->write_wait))
252 wake_up_interruptible(&ltt_buf->write_wait);
253 }
254
255 /*
256 * This function should not be called from NMI interrupt context
257 */
258 static notrace void ltt_buf_unfull(struct rchan_buf *buf,
259 unsigned int subbuf_idx,
260 long offset)
261 {
262 struct ltt_channel_struct *ltt_channel =
263 (struct ltt_channel_struct *)buf->chan->private_data;
264 struct ltt_channel_buf_struct *ltt_buf =
265 percpu_ptr(ltt_channel->buf, buf->cpu);
266
267 ltt_relay_wake_writers(ltt_buf);
268 }
269
270 /**
271 * ltt_open - open file op for ltt files
272 * @inode: opened inode
273 * @file: opened file
274 *
275 * Open implementation. Makes sure at most one instance of a buffer is
276 * open at any given moment.
277 */
278 static int ltt_open(struct inode *inode, struct file *file)
279 {
280 struct rchan_buf *buf = inode->i_private;
281 struct ltt_channel_struct *ltt_channel =
282 (struct ltt_channel_struct *)buf->chan->private_data;
283 struct ltt_channel_buf_struct *ltt_buf =
284 percpu_ptr(ltt_channel->buf, buf->cpu);
285
286 if (!atomic_long_add_unless(&ltt_buf->active_readers, 1, 1))
287 return -EBUSY;
288 return ltt_relay_file_operations.open(inode, file);
289 }
290
291 /**
292 * ltt_release - release file op for ltt files
293 * @inode: opened inode
294 * @file: opened file
295 *
296 * Release implementation.
297 */
298 static int ltt_release(struct inode *inode, struct file *file)
299 {
300 struct rchan_buf *buf = inode->i_private;
301 struct ltt_channel_struct *ltt_channel =
302 (struct ltt_channel_struct *)buf->chan->private_data;
303 struct ltt_channel_buf_struct *ltt_buf =
304 percpu_ptr(ltt_channel->buf, buf->cpu);
305 int ret;
306
307 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
308 atomic_long_dec(&ltt_buf->active_readers);
309 ret = ltt_relay_file_operations.release(inode, file);
310 WARN_ON(ret);
311 return ret;
312 }
313
314 /**
315 * ltt_poll - file op for ltt files
316 * @filp: the file
317 * @wait: poll table
318 *
319 * Poll implementation.
320 */
321 static unsigned int ltt_poll(struct file *filp, poll_table *wait)
322 {
323 unsigned int mask = 0;
324 struct inode *inode = filp->f_dentry->d_inode;
325 struct rchan_buf *buf = inode->i_private;
326 struct ltt_channel_struct *ltt_channel =
327 (struct ltt_channel_struct *)buf->chan->private_data;
328 struct ltt_channel_buf_struct *ltt_buf =
329 percpu_ptr(ltt_channel->buf, buf->cpu);
330
331 if (filp->f_mode & FMODE_READ) {
332 poll_wait_set_exclusive(wait);
333 poll_wait(filp, &buf->read_wait, wait);
334
335 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
336 if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
337 buf->chan)
338 - SUBBUF_TRUNC(atomic_long_read(&ltt_buf->consumed),
339 buf->chan)
340 == 0) {
341 if (buf->finalized)
342 return POLLHUP;
343 else
344 return 0;
345 } else {
346 struct rchan *rchan =
347 ltt_channel->trans_channel_data;
348 if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
349 buf->chan)
350 - SUBBUF_TRUNC(atomic_long_read(
351 &ltt_buf->consumed),
352 buf->chan)
353 >= rchan->alloc_size)
354 return POLLPRI | POLLRDBAND;
355 else
356 return POLLIN | POLLRDNORM;
357 }
358 }
359 return mask;
360 }
361
362 static int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
363 {
364 struct ltt_channel_struct *ltt_channel =
(struct ltt_channel_struct *)buf->chan->private_data;
long consumed_old, consumed_idx, commit_count, write_offset;
365 consumed_old = atomic_long_read(&ltt_buf->consumed);
366 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
367 commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
368 /*
369 * Make sure we read the commit count before reading the buffer
370 * data and the write offset. Correct consumed offset ordering
371 * wrt commit count is insured by the use of cmpxchg to update
372 * the consumed offset.
373 */
374 smp_rmb();
375 write_offset = local_read(&ltt_buf->offset);
376 /*
377 * Check that the subbuffer we are trying to consume has been
378 * already fully committed.
379 */
380 if (((commit_count - buf->chan->subbuf_size)
381 & ltt_channel->commit_count_mask)
382 - (BUFFER_TRUNC(consumed_old, buf->chan)
383 >> ltt_channel->n_subbufs_order)
384 != 0) {
385 return -EAGAIN;
386 }
387 /*
388 * Check that we are not about to read the same subbuffer in
389 * which the writer head is.
390 */
391 if ((SUBBUF_TRUNC(write_offset, buf->chan)
392 - SUBBUF_TRUNC(consumed_old, buf->chan))
393 == 0) {
394 return -EAGAIN;
395 }
396
397 *pconsumed_old = consumed_old;
398 return 0;
399 }
400
401 static int ltt_do_put_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
402 {
403 long consumed_new, consumed_old;
404
405 consumed_old = atomic_long_read(&ltt_buf->consumed);
406 consumed_old = consumed_old & (~0xFFFFFFFFL);
407 consumed_old = consumed_old | uconsumed_old;
408 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
409
410 spin_lock(&ltt_buf->full_lock);
411 if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
412 consumed_new)
413 != consumed_old) {
414 /* We have been pushed by the writer : the last
415 * buffer read _is_ corrupted! It can also
416 * happen if this is a buffer we never got. */
417 spin_unlock(&ltt_buf->full_lock);
418 return -EIO;
419 } else {
420 /* tell the client that buffer is now unfull */
421 int index;
422 long data;
423 index = SUBBUF_INDEX(consumed_old, buf->chan);
424 data = BUFFER_OFFSET(consumed_old, buf->chan);
425 ltt_buf_unfull(buf, index, data);
426 spin_unlock(&ltt_buf->full_lock);
427 }
428 return 0;
429 }
430
431 /**
432 * ltt_ioctl - control on the debugfs file
433 *
434 * @inode: the inode
435 * @filp: the file
436 * @cmd: the command
437 * @arg: command arg
438 *
439 * This ioctl implements four commands necessary for a minimal
440 * producer/consumer implementation :
441 * RELAY_GET_SUBBUF
442 * Get the next sub-buffer that can be read. It never blocks.
443 * RELAY_PUT_SUBBUF
444 * Release the currently read sub-buffer. Parameter is the value
445 * returned by the last GET_SUBBUF.
446 * RELAY_GET_N_SUBBUFS
447 * Returns the number of sub-buffers in the per-cpu channel.
448 * RELAY_GET_SUBBUF_SIZE
449 * Returns the size of the sub-buffers.
450 */
451 static int ltt_ioctl(struct inode *inode, struct file *filp,
452 unsigned int cmd, unsigned long arg)
453 {
454 struct rchan_buf *buf = inode->i_private;
455 struct ltt_channel_struct *ltt_channel =
456 (struct ltt_channel_struct *)buf->chan->private_data;
457 struct ltt_channel_buf_struct *ltt_buf =
458 percpu_ptr(ltt_channel->buf, buf->cpu);
459 u32 __user *argp = (u32 __user *)arg;
460
461 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
462 switch (cmd) {
463 case RELAY_GET_SUBBUF:
464 {
465 long consumed_old;
466 int ret = ltt_do_get_subbuf(buf, ltt_buf, &consumed_old);
467 if (ret < 0)
468 return ret;
469 return put_user((u32)consumed_old, argp);
470 }
471 case RELAY_PUT_SUBBUF:
472 {
473 int ret;
474 u32 uconsumed_old;
475 ret = get_user(uconsumed_old, argp);
476 if (ret)
477 return ret; /* will return -EFAULT */
478 return ltt_do_put_subbuf(buf, ltt_buf, uconsumed_old);
479 }
480 case RELAY_GET_N_SUBBUFS:
481 return put_user((u32)buf->chan->n_subbufs, argp);
482 break;
483 case RELAY_GET_SUBBUF_SIZE:
484 return put_user((u32)buf->chan->subbuf_size, argp);
485 break;
486 default:
487 return -ENOIOCTLCMD;
488 }
489 return 0;
490 }
491
492 #ifdef CONFIG_COMPAT
493 static long ltt_compat_ioctl(struct file *file, unsigned int cmd,
494 unsigned long arg)
495 {
496 long ret = -ENOIOCTLCMD;
497
498 lock_kernel();
499 ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
500 unlock_kernel();
501
502 return ret;
503 }
504 #endif
505
506 static void ltt_relay_pipe_buf_release(struct pipe_inode_info *pipe,
507 struct pipe_buffer *pbuf)
508 {
509 }
510
511 static struct pipe_buf_operations ltt_relay_pipe_buf_ops = {
512 .can_merge = 0,
513 .map = generic_pipe_buf_map,
514 .unmap = generic_pipe_buf_unmap,
515 .confirm = generic_pipe_buf_confirm,
516 .release = ltt_relay_pipe_buf_release,
517 .steal = generic_pipe_buf_steal,
518 .get = generic_pipe_buf_get,
519 };
520
521 static void ltt_relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
522 {
523 }
524
525 /*
526 * subbuf_splice_actor - splice up to one subbuf's worth of data
527 */
528 static int subbuf_splice_actor(struct file *in,
529 loff_t *ppos,
530 struct pipe_inode_info *pipe,
531 size_t len,
532 unsigned int flags)
533 {
534 struct rchan_buf *buf = in->private_data;
535 struct ltt_channel_struct *ltt_channel =
536 (struct ltt_channel_struct *)buf->chan->private_data;
537 struct ltt_channel_buf_struct *ltt_buf =
538 percpu_ptr(ltt_channel->buf, buf->cpu);
539 unsigned int poff, subbuf_pages, nr_pages;
540 struct page *pages[PIPE_BUFFERS];
541 struct partial_page partial[PIPE_BUFFERS];
542 struct splice_pipe_desc spd = {
543 .pages = pages,
544 .nr_pages = 0,
545 .partial = partial,
546 .flags = flags,
547 .ops = &ltt_relay_pipe_buf_ops,
548 .spd_release = ltt_relay_page_release,
549 };
550 long consumed_old, consumed_idx, roffset;
551 unsigned long bytes_avail;
552
553 /*
554 * Check that a GET_SUBBUF ioctl has been done before.
555 */
556 WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
557 consumed_old = atomic_long_read(&ltt_buf->consumed);
558 consumed_old += *ppos;
559 consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
560
561 /*
562 * Adjust read len, if longer than what is available
563 */
564 bytes_avail = SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
565 - consumed_old;
566 WARN_ON(bytes_avail > buf->chan->alloc_size);
567 len = min_t(size_t, len, bytes_avail);
568 subbuf_pages = bytes_avail >> PAGE_SHIFT;
569 nr_pages = min_t(unsigned int, subbuf_pages, PIPE_BUFFERS);
570 roffset = consumed_old & PAGE_MASK;
571 poff = consumed_old & ~PAGE_MASK;
572 printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
573 len, (ssize_t)*ppos, local_read(&ltt_buf->offset));
574
575 for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
576 unsigned int this_len;
577 struct buf_page *page;
578
579 if (!len)
580 break;
581 printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
582 len, roffset);
583
584 this_len = PAGE_SIZE - poff;
585 page = ltt_relay_read_get_page(buf, roffset);
586 spd.pages[spd.nr_pages] = page->page;
587 spd.partial[spd.nr_pages].offset = poff;
588 spd.partial[spd.nr_pages].len = this_len;
589
590 poff = 0;
591 roffset += PAGE_SIZE;
592 len -= this_len;
593 }
594
595 if (!spd.nr_pages)
596 return 0;
597
598 return splice_to_pipe(pipe, &spd);
599 }
600
601 static ssize_t ltt_relay_file_splice_read(struct file *in,
602 loff_t *ppos,
603 struct pipe_inode_info *pipe,
604 size_t len,
605 unsigned int flags)
606 {
607 ssize_t spliced;
608 int ret;
609
610 ret = 0;
611 spliced = 0;
612
613 printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n",
614 len, (ssize_t)*ppos);
615 while (len && !spliced) {
616 ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
617 printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
618 if (ret < 0)
619 break;
620 else if (!ret) {
621 if (flags & SPLICE_F_NONBLOCK)
622 ret = -EAGAIN;
623 break;
624 }
625
626 *ppos += ret;
627 if (ret > len)
628 len = 0;
629 else
630 len -= ret;
631 spliced += ret;
632 }
633
634 if (spliced)
635 return spliced;
636
637 return ret;
638 }
639
640 static void ltt_relay_print_subbuffer_errors(
641 struct ltt_channel_struct *ltt_chan,
642 long cons_off, unsigned int cpu)
643 {
644 struct rchan *rchan = ltt_chan->trans_channel_data;
645 struct ltt_channel_buf_struct *ltt_buf =
646 percpu_ptr(ltt_chan->buf, cpu);
647 long cons_idx, commit_count, write_offset;
648
649 cons_idx = SUBBUF_INDEX(cons_off, rchan);
650 commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
651 /*
652 * No need to order commit_count and write_offset reads because we
653 * execute after trace is stopped when there are no readers left.
654 */
655 write_offset = local_read(&ltt_buf->offset);
656 printk(KERN_WARNING
657 "LTT : unread channel %s offset is %ld "
658 "and cons_off : %ld (cpu %u)\n",
659 ltt_chan->channel_name, write_offset, cons_off, cpu);
660 /* Check each sub-buffer for non filled commit count */
661 if (((commit_count - rchan->subbuf_size) & ltt_chan->commit_count_mask)
662 - (BUFFER_TRUNC(cons_off, rchan) >> ltt_chan->n_subbufs_order)
663 != 0)
664 printk(KERN_ALERT
665 "LTT : %s : subbuffer %lu has non filled "
666 "commit count %lu.\n",
667 ltt_chan->channel_name, cons_idx, commit_count);
668 printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
669 ltt_chan->channel_name, commit_count,
670 rchan->subbuf_size);
671 }
672
673 static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
674 struct ltt_channel_struct *ltt_chan, int cpu)
675 {
676 struct rchan *rchan = ltt_chan->trans_channel_data;
677 struct ltt_channel_buf_struct *ltt_buf =
678 percpu_ptr(ltt_chan->buf, cpu);
679 long cons_off;
680
681 for (cons_off = atomic_long_read(&ltt_buf->consumed);
682 (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
683 rchan)
684 - cons_off) > 0;
685 cons_off = SUBBUF_ALIGN(cons_off, rchan))
686 ltt_relay_print_subbuffer_errors(ltt_chan, cons_off, cpu);
687 }
688
689 static void ltt_relay_print_buffer_errors(struct ltt_channel_struct *ltt_chan,
690 unsigned int cpu)
691 {
692 struct ltt_trace_struct *trace = ltt_chan->trace;
693 struct ltt_channel_buf_struct *ltt_buf =
694 percpu_ptr(ltt_chan->buf, cpu);
695
696 if (local_read(&ltt_buf->events_lost))
697 printk(KERN_ALERT
698 "LTT : %s : %ld events lost "
699 "in %s channel (cpu %u).\n",
700 ltt_chan->channel_name,
701 local_read(&ltt_buf->events_lost),
702 ltt_chan->channel_name, cpu);
703 if (local_read(&ltt_buf->corrupted_subbuffers))
704 printk(KERN_ALERT
705 "LTT : %s : %ld corrupted subbuffers "
706 "in %s channel (cpu %u).\n",
707 ltt_chan->channel_name,
708 local_read(&ltt_buf->corrupted_subbuffers),
709 ltt_chan->channel_name, cpu);
710
711 ltt_relay_print_errors(trace, ltt_chan, cpu);
712 }
713
714 static void ltt_relay_remove_dirs(struct ltt_trace_struct *trace)
715 {
716 debugfs_remove(trace->dentry.trace_root);
717 }
718
719 static void ltt_relay_release_channel(struct kref *kref)
720 {
721 struct ltt_channel_struct *ltt_chan = container_of(kref,
722 struct ltt_channel_struct, kref);
723 percpu_free(ltt_chan->buf);
724 }
725
726 /*
727 * Create ltt buffer.
728 */
729 static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
730 struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
731 unsigned int cpu, unsigned int n_subbufs)
732 {
733 struct ltt_channel_buf_struct *ltt_buf =
734 percpu_ptr(ltt_chan->buf, cpu);
735 unsigned int j;
736
737 ltt_buf->commit_count =
738 kzalloc_node(sizeof(*ltt_buf->commit_count) * n_subbufs,
739 GFP_KERNEL, cpu_to_node(cpu));
740 if (!ltt_buf->commit_count)
741 return -ENOMEM;
742 kref_get(&trace->kref);
743 kref_get(&trace->ltt_transport_kref);
744 kref_get(&ltt_chan->kref);
745 local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
746 atomic_long_set(&ltt_buf->consumed, 0);
747 atomic_long_set(&ltt_buf->active_readers, 0);
748 for (j = 0; j < n_subbufs; j++)
749 local_set(&ltt_buf->commit_count[j], 0);
750 init_waitqueue_head(&ltt_buf->write_wait);
751 atomic_set(&ltt_buf->wakeup_readers, 0);
752 spin_lock_init(&ltt_buf->full_lock);
753
754 ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
755 /* local_add made on a local_t variable that may belong to another
756 * CPU : OK because tracing is not started yet (for this cpu). */
757 local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
758
759 local_set(&ltt_buf->events_lost, 0);
760 local_set(&ltt_buf->corrupted_subbuffers, 0);
761
762 return 0;
763 }
764
765 static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
766 unsigned int cpu)
767 {
768 struct ltt_trace_struct *trace = ltt_chan->trace;
769 struct ltt_channel_buf_struct *ltt_buf =
770 percpu_ptr(ltt_chan->buf, cpu);
771
772 kref_put(&ltt_chan->trace->ltt_transport_kref,
773 ltt_release_transport);
774 ltt_relay_print_buffer_errors(ltt_chan, cpu);
775 kfree(ltt_buf->commit_count);
776 ltt_buf->commit_count = NULL;
777 kref_put(&ltt_chan->kref, ltt_relay_release_channel);
778 kref_put(&trace->kref, ltt_release_trace);
779 wake_up_interruptible(&trace->kref_wq);
780 }
781
782 /*
783 * Create channel.
784 */
785 static int ltt_relay_create_channel(const char *trace_name,
786 struct ltt_trace_struct *trace, struct dentry *dir,
787 const char *channel_name, struct ltt_channel_struct *ltt_chan,
788 unsigned int subbuf_size, unsigned int n_subbufs,
789 int overwrite)
790 {
791 char *tmpname;
792 unsigned int tmpname_len;
793 int err = 0;
794
795 tmpname = kmalloc(PATH_MAX, GFP_KERNEL);
796 if (!tmpname)
797 return EPERM;
798 if (overwrite) {
799 strncpy(tmpname, LTT_FLIGHT_PREFIX, PATH_MAX-1);
800 strncat(tmpname, channel_name,
801 PATH_MAX-1-sizeof(LTT_FLIGHT_PREFIX));
802 } else {
803 strncpy(tmpname, channel_name, PATH_MAX-1);
804 }
805 strncat(tmpname, "_", PATH_MAX-1-strlen(tmpname));
806
807 kref_init(&ltt_chan->kref);
808
809 ltt_chan->trace = trace;
810 ltt_chan->buffer_begin = ltt_buffer_begin_callback;
811 ltt_chan->buffer_end = ltt_buffer_end_callback;
812 ltt_chan->overwrite = overwrite;
813 ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
814 ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
815 ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct),
816 GFP_KERNEL, cpu_possible_map);
817 if (!ltt_chan->buf)
818 goto ltt_percpu_alloc_error;
819 ltt_chan->trans_channel_data = ltt_relay_open(tmpname,
820 dir,
821 subbuf_size,
822 n_subbufs,
823 &trace->callbacks,
824 ltt_chan);
825 tmpname_len = strlen(tmpname);
826 if (tmpname_len > 0) {
827 /* Remove final _ for pretty printing */
828 tmpname[tmpname_len-1] = '\0';
829 }
830 if (ltt_chan->trans_channel_data == NULL) {
831 printk(KERN_ERR "LTT : Can't open %s channel for trace %s\n",
832 tmpname, trace_name);
833 goto relay_open_error;
834 }
835
836 err = 0;
837 goto end;
838
839 relay_open_error:
840 percpu_free(ltt_chan->buf);
841 ltt_percpu_alloc_error:
842 err = EPERM;
843 end:
844 kfree(tmpname);
845 return err;
846 }
847
848 static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
849 {
850 new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
851 get_ltt_root());
852 if (new_trace->dentry.trace_root == NULL) {
853 printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
854 new_trace->trace_name);
855 return EEXIST;
856 }
857
858 new_trace->callbacks.create_buf_file = ltt_create_buf_file_callback;
859 new_trace->callbacks.remove_buf_file = ltt_remove_buf_file_callback;
860
861 return 0;
862 }
863
864 /*
865 * LTTng channel flush function.
866 *
867 * Must be called when no tracing is active in the channel, because of
868 * accesses across CPUs.
869 */
870 static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
871 {
872 buf->finalized = 1;
873 ltt_force_switch(buf, FORCE_FLUSH);
874 }
875
876 static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
877 {
878 unsigned int i;
879 struct rchan *rchan = ltt_channel->trans_channel_data;
880
881 for_each_possible_cpu(i) {
882 struct ltt_channel_buf_struct *ltt_buf =
883 percpu_ptr(ltt_channel->buf, i);
884
885 if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
886 atomic_set(&ltt_buf->wakeup_readers, 0);
887 wake_up_interruptible(&rchan->buf[i]->read_wait);
888 }
889 }
890 }
891
892 static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel,
893 unsigned int cpu)
894 {
895 struct rchan *rchan = ltt_channel->trans_channel_data;
896
897 if (rchan->buf[cpu]) {
898 struct ltt_channel_buf_struct *ltt_buf =
899 percpu_ptr(ltt_channel->buf, cpu);
900 ltt_relay_buffer_flush(rchan->buf[cpu]);
901 ltt_relay_wake_writers(ltt_buf);
902 }
903 }
904
905
906 static void ltt_relay_finish_channel(struct ltt_channel_struct *ltt_channel)
907 {
908 unsigned int i;
909
910 for_each_possible_cpu(i)
911 ltt_relay_finish_buffer(ltt_channel, i);
912 }
913
914 static void ltt_relay_remove_channel(struct ltt_channel_struct *channel)
915 {
916 struct rchan *rchan = channel->trans_channel_data;
917
918 ltt_relay_close(rchan);
919 kref_put(&channel->kref, ltt_relay_release_channel);
920 }
921
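/*
 * Offsets computed by a reservation or switch attempt. Rough meaning of the
 * fields, as used by ltt_relay_try_reserve() and ltt_relay_try_switch()
 * below :
 * - old : buffer offset read before the attempt (expected value for the
 *   final cmpxchg on the buffer offset) ;
 * - begin, end : start and end offsets of the reserved slot ;
 * - begin_switch, end_switch_old, end_switch_current : set when the slot
 *   opens a new sub-buffer, closes the previous one, or ends exactly on a
 *   sub-buffer boundary ;
 * - commit_count : commit count returned by the sub-buffer switch helpers ;
 * - reserve_commit_diff : expected minus actual commit count of the
 *   sub-buffer being entered; non-zero means it was not fully committed ;
 * - before_hdr_pad : alignment padding inserted before the event header ;
 * - size : total slot size (header + alignment + payload).
 */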
922 struct ltt_reserve_switch_offsets {
923 long begin, end, old;
924 long begin_switch, end_switch_current, end_switch_old;
925 long commit_count, reserve_commit_diff;
926 size_t before_hdr_pad, size;
927 };
928
929 /*
930 * Returns :
931 * 0 if ok
932 * !0 if execution must be aborted.
933 */
934 static inline int ltt_relay_try_reserve(
935 struct ltt_channel_struct *ltt_channel,
936 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
937 struct rchan_buf *buf,
938 struct ltt_reserve_switch_offsets *offsets, size_t data_size,
939 u64 *tsc, unsigned int *rflags, int largest_align)
940 {
941 offsets->begin = local_read(&ltt_buf->offset);
942 offsets->old = offsets->begin;
943 offsets->begin_switch = 0;
944 offsets->end_switch_current = 0;
945 offsets->end_switch_old = 0;
946
947 *tsc = trace_clock_read64();
948 if (last_tsc_overflow(ltt_buf, *tsc))
949 *rflags = LTT_RFLAG_ID_SIZE_TSC;
950
951 if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
952 offsets->begin_switch = 1; /* For offsets->begin */
953 } else {
954 offsets->size = ltt_get_header_size(ltt_channel,
955 offsets->begin, data_size,
956 &offsets->before_hdr_pad, *rflags);
957 offsets->size += ltt_align(offsets->begin + offsets->size,
958 largest_align)
959 + data_size;
960 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
961 > buf->chan->subbuf_size) {
962 offsets->end_switch_old = 1; /* For offsets->old */
963 offsets->begin_switch = 1; /* For offsets->begin */
964 }
965 }
966 if (offsets->begin_switch) {
967 long subbuf_index;
968
969 if (offsets->end_switch_old)
970 offsets->begin = SUBBUF_ALIGN(offsets->begin,
971 buf->chan);
972 offsets->begin = offsets->begin + ltt_subbuffer_header_size();
973 /* Test new buffer integrity */
974 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
975 offsets->reserve_commit_diff =
976 (BUFFER_TRUNC(offsets->begin, buf->chan)
977 >> ltt_channel->n_subbufs_order)
978 - (local_read(&ltt_buf->commit_count[subbuf_index])
979 & ltt_channel->commit_count_mask);
980 if (offsets->reserve_commit_diff == 0) {
981 /* Next buffer not corrupted. */
982 if (!ltt_channel->overwrite &&
983 (SUBBUF_TRUNC(offsets->begin, buf->chan)
984 - SUBBUF_TRUNC(atomic_long_read(
985 &ltt_buf->consumed),
986 buf->chan))
987 >= rchan->alloc_size) {
988 /*
989 * We do not overwrite non consumed buffers
990 * and we are full : event is lost.
991 */
992 local_inc(&ltt_buf->events_lost);
993 return -1;
994 } else {
995 /*
996 * next buffer not corrupted, we are either in
997 * overwrite mode or the buffer is not full.
998 * It's safe to write in this new subbuffer.
999 */
1000 }
1001 } else {
1002 /*
1003 * Next subbuffer corrupted. Force pushing reader even
1004 * in normal mode. It's safe to write in this new
1005 * subbuffer.
1006 */
1007 }
1008 offsets->size = ltt_get_header_size(ltt_channel,
1009 offsets->begin, data_size,
1010 &offsets->before_hdr_pad, *rflags);
1011 offsets->size += ltt_align(offsets->begin + offsets->size,
1012 largest_align)
1013 + data_size;
1014 if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
1015 > buf->chan->subbuf_size) {
1016 /*
1017 * Event too big for subbuffers, report error, don't
1018 * complete the sub-buffer switch.
1019 */
1020 local_inc(&ltt_buf->events_lost);
1021 return -1;
1022 } else {
1023 /*
1024 * We just made a successful buffer switch and the event
1025 * fits in the new subbuffer. Let's write.
1026 */
1027 }
1028 } else {
1029 /*
1030 * Event fits in the current buffer and we are not on a switch
1031 * boundary. It's safe to write.
1032 */
1033 }
1034 offsets->end = offsets->begin + offsets->size;
1035
1036 if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
1037 /*
1038 * The offset_end will fall at the very beginning of the next
1039 * subbuffer.
1040 */
1041 offsets->end_switch_current = 1; /* For offsets->begin */
1042 }
1043 return 0;
1044 }
1045
1046 /*
1047 * Returns :
1048 * 0 if ok
1049 * !0 if execution must be aborted.
1050 */
1051 static inline int ltt_relay_try_switch(
1052 enum force_switch_mode mode,
1053 struct ltt_channel_struct *ltt_channel,
1054 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1055 struct rchan_buf *buf,
1056 struct ltt_reserve_switch_offsets *offsets,
1057 u64 *tsc)
1058 {
1059 long subbuf_index;
1060
1061 offsets->begin = local_read(&ltt_buf->offset);
1062 offsets->old = offsets->begin;
1063 offsets->begin_switch = 0;
1064 offsets->end_switch_old = 0;
1065
1066 *tsc = trace_clock_read64();
1067
1068 if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
1069 offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
1070 offsets->end_switch_old = 1;
1071 } else {
1072 /* we do not have to switch : buffer is empty */
1073 return -1;
1074 }
1075 if (mode == FORCE_ACTIVE)
1076 offsets->begin += ltt_subbuffer_header_size();
1077 /*
1078 * Always begin_switch in FORCE_ACTIVE mode.
1079 * Test new buffer integrity
1080 */
1081 subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
1082 offsets->reserve_commit_diff =
1083 (BUFFER_TRUNC(offsets->begin, buf->chan)
1084 >> ltt_channel->n_subbufs_order)
1085 - (local_read(&ltt_buf->commit_count[subbuf_index])
1086 & ltt_channel->commit_count_mask);
1087 if (offsets->reserve_commit_diff == 0) {
1088 /* Next buffer not corrupted. */
1089 if (mode == FORCE_ACTIVE
1090 && !ltt_channel->overwrite
1091 && offsets->begin - atomic_long_read(&ltt_buf->consumed)
1092 >= rchan->alloc_size) {
1093 /*
1094 * We do not overwrite non consumed buffers and we are
1095 * full : ignore switch while tracing is active.
1096 */
1097 return -1;
1098 }
1099 } else {
1100 /*
1101 * Next subbuffer corrupted. Force pushing reader even in normal
1102 * mode
1103 */
1104 }
1105 offsets->end = offsets->begin;
1106 return 0;
1107 }
1108
1109 static inline void ltt_reserve_push_reader(
1110 struct ltt_channel_struct *ltt_channel,
1111 struct ltt_channel_buf_struct *ltt_buf,
1112 struct rchan *rchan,
1113 struct rchan_buf *buf,
1114 struct ltt_reserve_switch_offsets *offsets)
1115 {
1116 long consumed_old, consumed_new;
1117
1118 do {
1119 consumed_old = atomic_long_read(&ltt_buf->consumed);
1120 /*
1121 * If buffer is in overwrite mode, push the reader consumed
1122 * count if the write position has reached it and we are not
1123 * at the first iteration (don't push the reader farther than
1124 * the writer). This operation can be done concurrently by many
1125 * writers in the same buffer; the writer at the farthest
1126 * write position (highest sub-buffer index) in the buffer is
1127 * the one which wins this loop.
1128 * If the buffer is not in overwrite mode, pushing the reader
1129 * only happens if a sub-buffer is corrupted.
1130 */
1131 if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
1132 - SUBBUF_TRUNC(consumed_old, buf->chan))
1133 >= rchan->alloc_size)
1134 consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
1135 else {
1136 consumed_new = consumed_old;
1137 break;
1138 }
1139 } while (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
1140 consumed_new) != consumed_old);
1141
1142 if (consumed_old != consumed_new) {
1143 /*
1144 * Reader pushed : we are the winner of the push, we can
1145 * therefore re-equilibrate reserve and commit. The atomic
1146 * increment of the commit count permits other writers to play
1147 * around with this variable before us. We keep track of
1148 * corrupted_subbuffers even in overwrite mode :
1149 * we never want to write over a sub-buffer that is not
1150 * completely committed. Possible causes : the buffer size is
1151 * too low compared to the unordered data input, or a writer
1152 * died between the reserve and the commit.
1153 */
1154 if (offsets->reserve_commit_diff) {
1155 /*
1156 * We have to alter the sub-buffer commit count.
1157 * We do not deliver the previous subbuffer, given it
1158 * was either corrupted or not consumed (overwrite
1159 * mode).
1160 */
1161 local_add(offsets->reserve_commit_diff,
1162 &ltt_buf->commit_count[
1163 SUBBUF_INDEX(offsets->begin,
1164 buf->chan)]);
1165 if (!ltt_channel->overwrite
1166 || offsets->reserve_commit_diff
1167 != rchan->subbuf_size) {
1168 /*
1169 * The reserve commit diff was not subbuf_size :
1170 * it means the subbuffer was partly written to
1171 * and is therefore corrupted. If it is a multiple
1172 * of subbuffer size and we are in flight
1173 * recorder mode, we are skipping over a whole
1174 * subbuffer.
1175 */
1176 local_inc(&ltt_buf->corrupted_subbuffers);
1177 }
1178 }
1179 }
1180 }
1181
1182
1183 /*
1184 * ltt_reserve_switch_old_subbuf: switch old subbuffer
1185 *
1186 * Concurrency safe because we are the last and only thread to alter this
1187 * sub-buffer. As long as it is not delivered and read, no other thread can
1188 * alter the offset, alter the reserve_count or call the
1189 * client_buffer_end_callback on this sub-buffer.
1190 *
1191 * The only remaining threads could be the ones with pending commits. They will
1192 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1193 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1194 * corrupted sub-buffers count and push the readers across these sub-buffers.
1195 *
1196 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
1197 * switches in, finding out it's corrupted. The result will be that the old
1198 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
1199 * will be declared corrupted too because of the commit count adjustment.
1200 *
1201 * Note : offset_old should never be 0 here.
1202 */
1203 static inline void ltt_reserve_switch_old_subbuf(
1204 struct ltt_channel_struct *ltt_channel,
1205 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1206 struct rchan_buf *buf,
1207 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1208 {
1209 long oldidx = SUBBUF_INDEX(offsets->old - 1, rchan);
1210
1211 ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
1212 /* Must write buffer end before incrementing commit count */
1213 smp_wmb();
1214 offsets->commit_count =
1215 local_add_return(rchan->subbuf_size
1216 - (SUBBUF_OFFSET(offsets->old - 1, rchan)
1217 + 1),
1218 &ltt_buf->commit_count[oldidx]);
1219 if ((BUFFER_TRUNC(offsets->old - 1, rchan)
1220 >> ltt_channel->n_subbufs_order)
1221 - ((offsets->commit_count - rchan->subbuf_size)
1222 & ltt_channel->commit_count_mask) == 0)
1223 ltt_deliver(buf, oldidx, NULL);
1224 }
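/*
 * About the delivery check above (also used by the switch and commit paths
 * below) : with buf_size = n_subbufs * subbuf_size,
 * BUFFER_TRUNC(offset - 1, rchan) >> n_subbufs_order equals k * subbuf_size,
 * where k is the number of complete passes over the whole buffer at that
 * offset. A sub-buffer fully committed during pass k holds
 * (k + 1) * subbuf_size in its commit counter, so the difference (modulo
 * commit_count_mask) is zero exactly when every reserved byte of the
 * sub-buffer has been committed and it can be delivered.
 */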
1225
1226 /*
1227 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
1228 *
1229 * This code can be executed unordered : writers may already have written to the
1230 * sub-buffer before this code gets executed, caution. The commit makes sure
1231 * that this code is executed before the deliver of this sub-buffer.
1232 */
1233 static inline void ltt_reserve_switch_new_subbuf(
1234 struct ltt_channel_struct *ltt_channel,
1235 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1236 struct rchan_buf *buf,
1237 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1238 {
1239 long beginidx = SUBBUF_INDEX(offsets->begin, rchan);
1240
1241 ltt_channel->buffer_begin(buf, *tsc, beginidx);
1242 /* Must write buffer begin before incrementing commit count */
1243 smp_wmb();
1244 offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
1245 &ltt_buf->commit_count[beginidx]);
1246 /* Check if the written buffer has to be delivered */
1247 if ((BUFFER_TRUNC(offsets->begin, rchan)
1248 >> ltt_channel->n_subbufs_order)
1249 - ((offsets->commit_count - rchan->subbuf_size)
1250 & ltt_channel->commit_count_mask) == 0)
1251 ltt_deliver(buf, beginidx, NULL);
1252 }
1253
1254
1255 /*
1256 * ltt_reserve_end_switch_current: finish switching current subbuffer
1257 *
1258 * Concurrency safe because we are the last and only thread to alter this
1259 * sub-buffer. As long as it is not delivered and read, no other thread can
1260 * alter the offset, alter the reserve_count or call the
1261 * client_buffer_end_callback on this sub-buffer.
1262 *
1263 * The only remaining threads could be the ones with pending commits. They will
1264 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
1265 * We detect corrupted subbuffers with commit and reserve counts. We keep a
1266 * corrupted sub-buffers count and push the readers across these sub-buffers.
1267 *
1268 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
1269 * switches in, finding out it's corrupted. The result will be that the old
1270 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
1271 * will be declared corrupted too because of the commit count adjustment.
1272 */
1273 static inline void ltt_reserve_end_switch_current(
1274 struct ltt_channel_struct *ltt_channel,
1275 struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
1276 struct rchan_buf *buf,
1277 struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
1278 {
1279 long endidx = SUBBUF_INDEX(offsets->end - 1, rchan);
1280
1281 ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
1282 /* Must write buffer end before incrementing commit count */
1283 smp_wmb();
1284 offsets->commit_count =
1285 local_add_return(rchan->subbuf_size
1286 - (SUBBUF_OFFSET(offsets->end - 1, rchan)
1287 + 1),
1288 &ltt_buf->commit_count[endidx]);
1289 if ((BUFFER_TRUNC(offsets->end - 1, rchan)
1290 >> ltt_channel->n_subbufs_order)
1291 - ((offsets->commit_count - rchan->subbuf_size)
1292 & ltt_channel->commit_count_mask) == 0)
1293 ltt_deliver(buf, endidx, NULL);
1294 }
1295
1296 /**
1297 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
1298 * @trace: the trace structure to log to.
1299 * @ltt_channel: channel structure
1300 * @transport_data: data structure specific to ltt relay
1301 * @data_size: size of the variable length data to log.
1302 * @slot_size: pointer to total size of the slot (out)
1303 * @buf_offset : pointer to reserved buffer offset (out)
1304 * @tsc: pointer to the tsc at the slot reservation (out)
1305 * @cpu: cpuid
1306 *
1307 * Return : -ENOSPC if not enough space, else returns 0.
1308 * It will take care of sub-buffer switching.
1309 */
1310 static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
1311 struct ltt_channel_struct *ltt_channel, void **transport_data,
1312 size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
1313 unsigned int *rflags, int largest_align, int cpu)
1314 {
1315 struct rchan *rchan = ltt_channel->trans_channel_data;
1316 struct rchan_buf *buf = *transport_data =
1317 rchan->buf[cpu];
1318 struct ltt_channel_buf_struct *ltt_buf =
1319 percpu_ptr(ltt_channel->buf, buf->cpu);
1320 struct ltt_reserve_switch_offsets offsets;
1321
1322 offsets.reserve_commit_diff = 0;
1323 offsets.size = 0;
1324
1325 /*
1326 * Perform retryable operations.
1327 */
1328 if (__get_cpu_var(ltt_nesting) > 4) {
1329 local_inc(&ltt_buf->events_lost);
1330 return -EPERM;
1331 }
1332 do {
1333 if (ltt_relay_try_reserve(ltt_channel, ltt_buf,
1334 rchan, buf, &offsets, data_size, tsc, rflags,
1335 largest_align))
1336 return -ENOSPC;
1337 } while (local_cmpxchg(&ltt_buf->offset, offsets.old,
1338 offsets.end) != offsets.old);
1339
1340 /*
1341 * Atomically update last_tsc. This update races against concurrent
1342 * atomic updates, but the race will always cause supplementary full TSC
1343 * events, never the opposite (missing a full TSC event when it would be
1344 * needed).
1345 */
1346 save_last_tsc(ltt_buf, *tsc);
1347
1348 /*
1349 * Push the reader if necessary
1350 */
1351 ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);
1352
1353 /*
1354 * Switch old subbuffer if needed.
1355 */
1356 if (offsets.end_switch_old)
1357 ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
1358 &offsets, tsc);
1359
1360 /*
1361 * Populate new subbuffer.
1362 */
1363 if (offsets.begin_switch)
1364 ltt_reserve_switch_new_subbuf(ltt_channel, ltt_buf, rchan,
1365 buf, &offsets, tsc);
1366
1367 if (offsets.end_switch_current)
1368 ltt_reserve_end_switch_current(ltt_channel, ltt_buf, rchan,
1369 buf, &offsets, tsc);
1370
1371 *slot_size = offsets.size;
1372 *buf_offset = offsets.begin + offsets.before_hdr_pad;
1373 return 0;
1374 }
1375
1376 /*
1377 * Force a sub-buffer switch for a per-cpu buffer. This operation is
1378 * completely reentrant : can be called while tracing is active with
1379 * absolutely no lock held.
1380 *
1381 * Note, however, that as a local_cmpxchg is used for some atomic
1382 * operations, this function must be called from the CPU which owns the buffer
1383 * for a ACTIVE flush.
1384 */
1385 static notrace void ltt_force_switch(struct rchan_buf *buf,
1386 enum force_switch_mode mode)
1387 {
1388 struct ltt_channel_struct *ltt_channel =
1389 (struct ltt_channel_struct *)buf->chan->private_data;
1390 struct ltt_channel_buf_struct *ltt_buf =
1391 percpu_ptr(ltt_channel->buf, buf->cpu);
1392 struct rchan *rchan = ltt_channel->trans_channel_data;
1393 struct ltt_reserve_switch_offsets offsets;
1394 u64 tsc;
1395
1396 offsets.reserve_commit_diff = 0;
1397 offsets.size = 0;
1398
1399 /*
1400 * Perform retryable operations.
1401 */
1402 do {
1403 if (ltt_relay_try_switch(mode, ltt_channel, ltt_buf,
1404 rchan, buf, &offsets, &tsc))
1405 return;
1406 } while (local_cmpxchg(&ltt_buf->offset, offsets.old,
1407 offsets.end) != offsets.old);
1408
1409 /*
1410 * Atomically update last_tsc. This update races against concurrent
1411 * atomic updates, but the race will always cause supplementary full TSC
1412 * events, never the opposite (missing a full TSC event when it would be
1413 * needed).
1414 */
1415 save_last_tsc(ltt_buf, tsc);
1416
1417 /*
1418 * Push the reader if necessary
1419 */
1420 if (mode == FORCE_ACTIVE)
1421 ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan,
1422 buf, &offsets);
1423
1424 /*
1425 * Switch old subbuffer if needed.
1426 */
1427 if (offsets.end_switch_old)
1428 ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
1429 &offsets, &tsc);
1430
1431 /*
1432 * Populate new subbuffer.
1433 */
1434 if (mode == FORCE_ACTIVE)
1435 ltt_reserve_switch_new_subbuf(ltt_channel,
1436 ltt_buf, rchan, buf, &offsets, &tsc);
1437 }
1438
1439 /*
1440 * For flight recording. Must be called after relay_commit.
1441 * This function decrements the subbuffer's lost_size each time the commit count
1442 * reaches back the reserve offset (modulo subbuffer size). It is useful for
1443 * crash dumps.
1444 * We use slot_size - 1 to make sure we deal correctly with the case where we
1445 * fill the subbuffer completely (so the subbuf index stays in the previous
1446 * subbuffer).
1447 */
1448 #ifdef CONFIG_LTT_VMCORE
1449 static inline void ltt_write_commit_counter(struct rchan_buf *buf,
1450 long buf_offset, size_t slot_size)
1451 {
1452 struct ltt_channel_struct *ltt_channel =
1453 (struct ltt_channel_struct *)buf->chan->private_data;
1454 struct ltt_channel_buf_struct *ltt_buf =
1455 percpu_ptr(ltt_channel->buf, buf->cpu);
1456 struct ltt_subbuffer_header *header;
1457 long offset, subbuf_idx, commit_count;
1458 uint32_t lost_old, lost_new;
1459
1460 subbuf_idx = SUBBUF_INDEX(buf_offset - 1, buf->chan);
1461 offset = buf_offset + slot_size;
1462 header = (struct ltt_subbuffer_header *)
1463 ltt_relay_offset_address(buf,
1464 subbuf_idx * buf->chan->subbuf_size);
1465 for (;;) {
1466 lost_old = header->lost_size;
1467 commit_count =
1468 local_read(&ltt_buf->commit_count[subbuf_idx]);
1469 /* SUBBUF_OFFSET includes commit_count_mask */
1470 if (!SUBBUF_OFFSET(offset - commit_count, buf->chan)) {
1471 lost_new = (uint32_t)buf->chan->subbuf_size
1472 - SUBBUF_OFFSET(commit_count, buf->chan);
1473 lost_old = cmpxchg_local(&header->lost_size, lost_old,
1474 lost_new);
1475 if (lost_old <= lost_new)
1476 break;
1477 } else {
1478 break;
1479 }
1480 }
1481 }
1482 #else
1483 static inline void ltt_write_commit_counter(struct rchan_buf *buf,
1484 long buf_offset, size_t slot_size)
1485 {
1486 }
1487 #endif
1488
1489 /*
1490 * Atomic unordered slot commit. Increments the commit count in the
1491 * specified sub-buffer, and delivers it if necessary.
1492 *
1493 * Parameters:
1494 *
1495 * @ltt_channel : channel structure
1496 * @transport_data: transport-specific data
1497 * @buf_offset : offset following the event header.
1498 * @slot_size : size of the reserved slot.
1499 */
1500 static notrace void ltt_relay_commit_slot(
1501 struct ltt_channel_struct *ltt_channel,
1502 void **transport_data, long buf_offset, size_t slot_size)
1503 {
1504 struct rchan_buf *buf = *transport_data;
1505 struct ltt_channel_buf_struct *ltt_buf =
1506 percpu_ptr(ltt_channel->buf, buf->cpu);
1507 struct rchan *rchan = buf->chan;
1508 long offset_end = buf_offset;
1509 long endidx = SUBBUF_INDEX(offset_end - 1, rchan);
1510 long commit_count;
1511
1512 /* Must write slot data before incrementing commit count */
1513 smp_wmb();
1514 commit_count = local_add_return(slot_size,
1515 &ltt_buf->commit_count[endidx]);
1516 /* Check if all commits have been done */
1517 if ((BUFFER_TRUNC(offset_end - 1, rchan)
1518 >> ltt_channel->n_subbufs_order)
1519 - ((commit_count - rchan->subbuf_size)
1520 & ltt_channel->commit_count_mask) == 0)
1521 ltt_deliver(buf, endidx, NULL);
1522 /*
1523 * Update lost_size for each commit. It's needed only for extracting
1524 * ltt buffers from vmcore, after crash.
1525 */
1526 ltt_write_commit_counter(buf, buf_offset, slot_size);
1527 }
1528
1529 /*
1530 * This is called with preemption disabled when user space has requested
1531 * blocking mode. If one of the active traces has free space below a
1532 * specific threshold value, we reenable preemption and block.
1533 */
1534 static int ltt_relay_user_blocking(struct ltt_trace_struct *trace,
1535 unsigned int chan_index, size_t data_size,
1536 struct user_dbg_data *dbg)
1537 {
1538 struct rchan *rchan;
1539 struct ltt_channel_buf_struct *ltt_buf;
1540 struct ltt_channel_struct *channel;
1541 struct rchan_buf *relay_buf;
1542 int cpu;
1543 DECLARE_WAITQUEUE(wait, current);
1544
1545 channel = &trace->channels[chan_index];
1546 rchan = channel->trans_channel_data;
1547 cpu = smp_processor_id();
1548 relay_buf = rchan->buf[cpu];
1549 ltt_buf = percpu_ptr(channel->buf, cpu);
1550
1551 /*
1552 * Check if data is too big for the channel : do not
1553 * block for it.
1554 */
1555 if (LTT_RESERVE_CRITICAL + data_size > relay_buf->chan->subbuf_size)
1556 return 0;
1557
1558 /*
1559 * If free space is too low, we block. We restart from the
1560 * beginning after we resume (cpu id may have changed
1561 * while preemption is active).
1562 */
1563 spin_lock(&ltt_buf->full_lock);
1564 if (!channel->overwrite) {
1565 dbg->write = local_read(&ltt_buf->offset);
1566 dbg->read = atomic_long_read(&ltt_buf->consumed);
1567 dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
1568 - SUBBUF_TRUNC(dbg->read,
1569 relay_buf->chan);
1570 if (dbg->avail_size > rchan->alloc_size) {
1571 __set_current_state(TASK_INTERRUPTIBLE);
1572 add_wait_queue(&ltt_buf->write_wait, &wait);
1573 spin_unlock(&ltt_buf->full_lock);
1574 preempt_enable();
1575 schedule();
1576 __set_current_state(TASK_RUNNING);
1577 remove_wait_queue(&ltt_buf->write_wait, &wait);
1578 if (signal_pending(current))
1579 return -ERESTARTSYS;
1580 preempt_disable();
1581 return 1;
1582 }
1583 }
1584 spin_unlock(&ltt_buf->full_lock);
1585 return 0;
1586 }
1587
1588 static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
1589 unsigned int chan_index, size_t data_size,
1590 struct user_dbg_data *dbg, int cpu)
1591 {
1592 struct rchan *rchan;
1593 struct ltt_channel_buf_struct *ltt_buf;
1594 struct ltt_channel_struct *channel;
1595 struct rchan_buf *relay_buf;
1596
1597 channel = &trace->channels[chan_index];
1598 rchan = channel->trans_channel_data;
1599 relay_buf = rchan->buf[cpu];
1600 ltt_buf = percpu_ptr(channel->buf, cpu);
1601
1602 printk(KERN_ERR "Error in LTT usertrace : "
1603 "buffer full : event lost in blocking "
1604 "mode. Increase LTT_RESERVE_CRITICAL.\n");
1605 printk(KERN_ERR "LTT nesting level is %u.\n",
1606 per_cpu(ltt_nesting, cpu));
1607 printk(KERN_ERR "LTT avail size %lu.\n",
1608 dbg->avail_size);
1609 printk(KERN_ERR "avai write : %lu, read : %lu\n",
1610 dbg->write, dbg->read);
1611
1612 dbg->write = local_read(&ltt_buf->offset);
1613 dbg->read = atomic_long_read(&ltt_buf->consumed);
1614
1615 printk(KERN_ERR "LTT cur size %lu.\n",
1616 dbg->write + LTT_RESERVE_CRITICAL + data_size
1617 - SUBBUF_TRUNC(dbg->read, relay_buf->chan));
1618 printk(KERN_ERR "cur write : %lu, read : %lu\n",
1619 dbg->write, dbg->read);
1620 }
1621
1622 static struct ltt_transport ltt_relay_transport = {
1623 .name = "relay",
1624 .owner = THIS_MODULE,
1625 .ops = {
1626 .create_dirs = ltt_relay_create_dirs,
1627 .remove_dirs = ltt_relay_remove_dirs,
1628 .create_channel = ltt_relay_create_channel,
1629 .finish_channel = ltt_relay_finish_channel,
1630 .remove_channel = ltt_relay_remove_channel,
1631 .wakeup_channel = ltt_relay_async_wakeup_chan,
1632 .commit_slot = ltt_relay_commit_slot,
1633 .reserve_slot = ltt_relay_reserve_slot,
1634 .user_blocking = ltt_relay_user_blocking,
1635 .user_errors = ltt_relay_print_user_errors,
1636 },
1637 };
1638
1639 static int __init ltt_relay_init(void)
1640 {
1641 printk(KERN_INFO "LTT : ltt-relay init\n");
1642
1643 ltt_file_operations = ltt_relay_file_operations;
1644 ltt_file_operations.owner = THIS_MODULE;
1645 ltt_file_operations.open = ltt_open;
1646 ltt_file_operations.release = ltt_release;
1647 ltt_file_operations.poll = ltt_poll;
1648 ltt_file_operations.splice_read = ltt_relay_file_splice_read;
1649 ltt_file_operations.ioctl = ltt_ioctl;
1650 #ifdef CONFIG_COMPAT
1651 ltt_file_operations.compat_ioctl = ltt_compat_ioctl;
1652 #endif
1653
1654 ltt_transport_register(&ltt_relay_transport);
1655
1656 return 0;
1657 }
1658
1659 static void __exit ltt_relay_exit(void)
1660 {
1661 printk(KERN_INFO "LTT : ltt-relay exit\n");
1662
1663 ltt_transport_unregister(&ltt_relay_transport);
1664 }
1665
1666 module_init(ltt_relay_init);
1667 module_exit(ltt_relay_exit);
1668
1669 MODULE_LICENSE("GPL");
1670 MODULE_AUTHOR("Mathieu Desnoyers");
1671 MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");