/*
 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * LTTng lockless buffer space management (reader/writer).
 *
 * Author:
 *	Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * Inspired from LTT :
 *	Karim Yaghmour (karim@opersys.com)
 *	Tom Zanussi (zanussi@us.ibm.com)
 *	Bob Wisniewski (bob@watson.ibm.com)
 * And from K42 :
 *	Bob Wisniewski (bob@watson.ibm.com)
 *
 * Changelog:
 *	19/10/05, Complete lockless mechanism.
 *	27/05/05, Modular redesign and rewrite.
 *
 * Userspace reader semantic :
 * while (poll fd != POLLHUP) {
 *	- ioctl RELAY_GET_SUBBUF_SIZE
 *	- ioctl GET_SUBBUF
 *	- splice 1 subbuffer worth of data to a pipe
 *	- splice the data from pipe to disk/network
 *	- ioctl PUT_SUBBUF, check error value
 *	  if err val < 0, previous subbuffer was corrupted.
 * }
 */
#include <linux/time.h>
#include <linux/ltt-tracer.h>
#include <linux/ltt-relay.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/rcupdate.h>
#include <linux/sched.h>
#include <linux/bitops.h>
#include <linux/smp_lock.h>
#include <linux/debugfs.h>
#include <linux/stat.h>
#include <linux/cpu.h>
#include <linux/pipe_fs_i.h>
#include <linux/splice.h>
#include <asm/atomic.h>
#include <asm/local.h>
#if 0	/* set to 1 to enable debugging printouts */
#define printk_dbg(fmt, args...) printk(fmt, args)
#else
#define printk_dbg(fmt, args...)
#endif
/* LTTng lockless logging buffer info */
struct ltt_channel_buf_struct {
	/* First 32 bytes cache-hot cacheline */
	local_t offset;			/* Current offset in the buffer */
	local_t *commit_count;		/* Commit count per sub-buffer */
	atomic_long_t consumed;		/*
					 * Current offset in the buffer
					 * standard atomic access (shared)
					 */
	unsigned long last_tsc;		/*
					 * Last timestamp written in the buffer.
					 */
	/* End of first 32 bytes cacheline */
	atomic_long_t active_readers;	/*
					 * Active readers count
					 * standard atomic access (shared)
					 */
	local_t events_lost;
	local_t corrupted_subbuffers;
	spinlock_t full_lock;		/*
					 * buffer full condition spinlock, only
					 * for userspace tracing blocking mode
					 * synchronization with reader.
					 */
	wait_queue_head_t write_wait;	/*
					 * Wait queue for blocking user space
					 * writers
					 */
	atomic_t wakeup_readers;	/* Boolean : wakeup readers waiting ? */
} ____cacheline_aligned;
/*
 * Last TSC comparison functions. Check if the current TSC overflows
 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
 * atomically.
 */

#if (BITS_PER_LONG == 32)
static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
}

static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);

	if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
		return 1;
	else
		return 0;
}
#else
static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)tsc;
}

static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
		return 1;
	else
		return 0;
}
#endif
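
/*
 * Illustration (added; not part of the original code) : both variants above
 * guarantee that when two successive events are more than 2^LTT_TSC_BITS
 * cycles apart, last_tsc_overflow() returns non-zero and the writer logs a
 * full 64-bit TSC with the event; the compact event header alone only
 * carries the low LTT_TSC_BITS bits of the timestamp.
 */
#if 0	/* illustration only, never compiled */
static void last_tsc_overflow_example(struct ltt_channel_buf_struct *ltt_buf)
{
	u64 t1 = 0;
	u64 t2 = 1ULL << LTT_TSC_BITS;	/* more than 2^LTT_TSC_BITS later */

	save_last_tsc(ltt_buf, t1);
	/* A full TSC event is needed for the second event. */
	WARN_ON(!last_tsc_overflow(ltt_buf, t2));
}
#endif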
//ust// static struct file_operations ltt_file_operations;

/*
 * A switch is done during tracing or as a final flush after tracing (so it
 * won't write in the new sub-buffer).
 */
enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan,
		struct rchan_buf *buf,
		unsigned int cpu,
		unsigned int n_subbufs);

static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu);

static void ltt_force_switch(struct rchan_buf *buf,
		enum force_switch_mode mode);
static void ltt_buffer_begin_callback(struct rchan_buf *buf,
			u64 tsc, unsigned int subbuf_idx)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->cycle_count_begin = tsc;
	header->lost_size = 0xFFFFFFFF; /* for debugging */
	header->buf_size = buf->chan->subbuf_size;
	ltt_write_trace_header(channel->trace, header);
}
/*
 * offset is assumed to never be 0 here : never deliver a completely empty
 * subbuffer. The lost size is between 0 and subbuf_size-1.
 */
static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
		u64 tsc, unsigned int offset, unsigned int subbuf_idx)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(channel->buf, buf->cpu);
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
				buf->chan);
	header->cycle_count_end = tsc;
	header->events_lost = local_read(&ltt_buf->events_lost);
	header->subbuf_corrupt = local_read(&ltt_buf->corrupted_subbuffers);
}
static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
		void *subbuf)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(channel->buf, buf->cpu);

	atomic_set(&ltt_buf->wakeup_readers, 1);
}
static struct dentry *ltt_create_buf_file_callback(const char *filename,
		struct dentry *parent, int mode,
		struct rchan_buf *buf)
{
	struct ltt_channel_struct *ltt_chan;
	int err;
//ust//	struct dentry *dentry;

	ltt_chan = buf->chan->private_data;
	err = ltt_relay_create_buffer(ltt_chan->trace, ltt_chan,
					buf, buf->cpu,
					buf->chan->n_subbufs);
	if (err)
		return ERR_PTR(err);

//ust//	dentry = debugfs_create_file(filename, mode, parent, buf,
//ust//			&ltt_file_operations);
//ust//	if (IS_ERR(dentry))
//ust//		goto error;
//ust//	return dentry;
	return NULL;
error:
	ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
	return NULL;
}
static int ltt_remove_buf_file_callback(struct dentry *dentry)
{
	struct rchan_buf *buf = dentry->d_inode->i_private;
	struct ltt_channel_struct *ltt_chan = buf->chan->private_data;

//ust//	debugfs_remove(dentry);
	ltt_relay_destroy_buffer(ltt_chan, buf->cpu);

	return 0;
}
/*
 * This must be done after the trace is removed from the RCU list so that there
 * are no stalled writers.
 */
static void ltt_relay_wake_writers(struct ltt_channel_buf_struct *ltt_buf)
{
	if (waitqueue_active(&ltt_buf->write_wait))
		wake_up_interruptible(&ltt_buf->write_wait);
}
/*
 * This function should not be called from NMI interrupt context
 */
static notrace void ltt_buf_unfull(struct rchan_buf *buf,
		unsigned int subbuf_idx,
		long offset)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	ltt_relay_wake_writers(ltt_buf);
}
/**
 *	ltt_open - open file op for ltt files
 *	@inode: opened inode
 *	@file: opened file
 *
 *	Open implementation. Makes sure only one open instance of a buffer is
 *	done at a given moment.
 */
static int ltt_open(struct inode *inode, struct file *file)
{
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	if (!atomic_long_add_unless(&ltt_buf->active_readers, 1, 1))
		return -EBUSY;
	return ltt_relay_file_operations.open(inode, file);
}
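
/*
 * Note (added) : atomic_long_add_unless(&active_readers, 1, 1) above only
 * increments the reader count when it is not already 1, so at most one
 * reader can have a given per-cpu buffer open at any time; a concurrent
 * open fails with -EBUSY.
 */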
/**
 *	ltt_release - release file op for ltt files
 *	@inode: opened inode
 *	@file: opened file
 *
 *	Release implementation.
 */
static int ltt_release(struct inode *inode, struct file *file)
{
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	int ret;

	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
	atomic_long_dec(&ltt_buf->active_readers);
	ret = ltt_relay_file_operations.release(inode, file);
	return ret;
}
/**
 *	ltt_poll - file op for ltt files
 *	@filp: the file
 *	@wait: poll table
 *
 *	Poll implementation.
 */
static unsigned int ltt_poll(struct file *filp, poll_table *wait)
{
	unsigned int mask = 0;
	struct inode *inode = filp->f_dentry->d_inode;
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	if (filp->f_mode & FMODE_READ) {
		poll_wait_set_exclusive(wait);
		poll_wait(filp, &buf->read_wait, wait);

		WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
		if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
				 buf->chan)
		  - SUBBUF_TRUNC(atomic_long_read(&ltt_buf->consumed),
				 buf->chan)
		  == 0) {
			if (buf->finalized)
				return POLLHUP;
			else
				return 0;
		} else {
			struct rchan *rchan =
				ltt_channel->trans_channel_data;
			if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
					 buf->chan)
			  - SUBBUF_TRUNC(atomic_long_read(
						&ltt_buf->consumed),
					 buf->chan)
			  >= rchan->alloc_size)
				return POLLPRI | POLLRDBAND;
			else
				return POLLIN | POLLRDNORM;
		}
	}
	return mask;
}
static int ltt_do_get_subbuf(struct rchan_buf *buf,
		struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	long consumed_old, consumed_idx, commit_count, write_offset;

	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
	commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
	/*
	 * Make sure we read the commit count before reading the buffer
	 * data and the write offset. Correct consumed offset ordering
	 * wrt commit count is ensured by the use of cmpxchg to update
	 * the consumed offset.
	 */
	smp_rmb();
	write_offset = local_read(&ltt_buf->offset);
	/*
	 * Check that the subbuffer we are trying to consume has been
	 * already fully committed.
	 */
	if (((commit_count - buf->chan->subbuf_size)
	     & ltt_channel->commit_count_mask)
	    - (BUFFER_TRUNC(consumed_old, buf->chan)
	       >> ltt_channel->n_subbufs_order)
	    != 0)
		return -EAGAIN;
	/*
	 * Check that we are not about to read the same subbuffer in
	 * which the writer head is.
	 */
	if ((SUBBUF_TRUNC(write_offset, buf->chan)
	   - SUBBUF_TRUNC(consumed_old, buf->chan))
	   == 0)
		return -EAGAIN;

	*pconsumed_old = consumed_old;
	return 0;
}
static int ltt_do_put_subbuf(struct rchan_buf *buf,
		struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
{
	long consumed_new, consumed_old;

	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_old = consumed_old & (~0xFFFFFFFFL);
	consumed_old = consumed_old | uconsumed_old;
	consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);

	spin_lock(&ltt_buf->full_lock);
	if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
				consumed_new)
	    != consumed_old) {
		/* We have been pushed by the writer : the last
		 * buffer read _is_ corrupted! It can also
		 * happen if this is a buffer we never got. */
		spin_unlock(&ltt_buf->full_lock);
		return -EIO;
	} else {
		/* tell the client that buffer is now unfull */
		int index;
		long data;

		index = SUBBUF_INDEX(consumed_old, buf->chan);
		data = BUFFER_OFFSET(consumed_old, buf->chan);
		ltt_buf_unfull(buf, index, data);
		spin_unlock(&ltt_buf->full_lock);
	}
	return 0;
}
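
/*
 * Note (added) : userspace only carries the low 32 bits of the consumed
 * counter through the ioctl interface, so ltt_do_put_subbuf() re-forms the
 * full long value by keeping the upper bits of the current counter before
 * aligning it to the next sub-buffer ; the cmpxchg fails when a writer
 * pushed the reader in the meantime, which is reported as -EIO.
 */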
/**
 *	ltt_ioctl - control on the debugfs file
 *
 *	@inode: the inode
 *	@filp: the file
 *	@cmd: the command
 *	@arg: command arg
 *
 *	This ioctl implements four commands necessary for a minimal
 *	producer/consumer implementation :
 *	RELAY_GET_SUBBUF
 *		Get the next sub buffer that can be read. It never blocks.
 *	RELAY_PUT_SUBBUF
 *		Release the currently read sub-buffer. Parameter is the last
 *		put subbuffer (returned by GET_SUBBUF).
 *	RELAY_GET_N_SUBBUFS
 *		returns the number of sub buffers in the per cpu channel.
 *	RELAY_GET_SUBBUF_SIZE
 *		returns the size of the sub buffers.
 */
//ust// static int ltt_ioctl(struct inode *inode, struct file *filp,
//ust//		unsigned int cmd, unsigned long arg)
//ust// {
//ust//	struct rchan_buf *buf = inode->i_private;
//ust//	struct ltt_channel_struct *ltt_channel =
//ust//		(struct ltt_channel_struct *)buf->chan->private_data;
//ust//	struct ltt_channel_buf_struct *ltt_buf =
//ust//		percpu_ptr(ltt_channel->buf, buf->cpu);
//ust//	u32 __user *argp = (u32 __user *)arg;
//ust//
//ust//	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
//ust//	switch (cmd) {
//ust//	case RELAY_GET_SUBBUF:
//ust//	{
//ust//		long consumed_old;
//ust//		int ret;
//ust//		ret = ltt_do_get_subbuf(buf, ltt_buf, &consumed_old);
//ust//		if (ret < 0)
//ust//			return ret;
//ust//		return put_user((u32)consumed_old, argp);
//ust//	}
//ust//	case RELAY_PUT_SUBBUF:
//ust//	{
//ust//		int ret;
//ust//		u32 uconsumed_old;
//ust//		ret = get_user(uconsumed_old, argp);
//ust//		if (ret)
//ust//			return ret; /* will return -EFAULT */
//ust//		return ltt_do_put_subbuf(buf, ltt_buf, uconsumed_old);
//ust//	}
//ust//	case RELAY_GET_N_SUBBUFS:
//ust//		return put_user((u32)buf->chan->n_subbufs, argp);
//ust//	case RELAY_GET_SUBBUF_SIZE:
//ust//		return put_user((u32)buf->chan->subbuf_size, argp);
//ust//	default:
//ust//		return -ENOIOCTLCMD;
//ust//	}
//ust// }
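
/*
 * Hypothetical consumer sketch (added; not part of the original code) :
 * what the "userspace reader semantic" from the file header could look like
 * against the per-cpu buffer file, assuming the RELAY_* ioctls above were
 * enabled. "fd" is the opened buffer file, "pfd" an open pipe(2) and
 * "out_fd" the destination ; error handling is elided.
 */
#if 0	/* userspace illustration only, never compiled here */
static void example_consume_one_subbuf(int fd, int pfd[2], int out_fd)
{
	uint32_t consumed_old, subbuf_size;

	ioctl(fd, RELAY_GET_SUBBUF_SIZE, &subbuf_size);
	if (ioctl(fd, RELAY_GET_SUBBUF, &consumed_old) < 0)
		return;		/* no fully committed subbuffer yet */
	/* Move one subbuffer worth of data : buffer -> pipe -> disk. */
	splice(fd, NULL, pfd[1], NULL, subbuf_size, SPLICE_F_MOVE);
	splice(pfd[0], NULL, out_fd, NULL, subbuf_size, SPLICE_F_MOVE);
	if (ioctl(fd, RELAY_PUT_SUBBUF, &consumed_old) < 0)
		/* err val < 0 : the subbuffer just read was corrupted */
		;
}
#endif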
//ust// #ifdef CONFIG_COMPAT
//ust// static long ltt_compat_ioctl(struct file *file, unsigned int cmd,
//ust//		unsigned long arg)
//ust// {
//ust//	long ret = -ENOIOCTLCMD;
//ust//
//ust//	lock_kernel();
//ust//	ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
//ust//	unlock_kernel();
//ust//
//ust//	return ret;
//ust// }
//ust// #endif
//ust// static void ltt_relay_pipe_buf_release(struct pipe_inode_info *pipe,
//ust//		struct pipe_buffer *pbuf)
//ust// {
//ust// }
//ust//
//ust// static struct pipe_buf_operations ltt_relay_pipe_buf_ops = {
//ust//	.can_merge = 0,
//ust//	.map = generic_pipe_buf_map,
//ust//	.unmap = generic_pipe_buf_unmap,
//ust//	.confirm = generic_pipe_buf_confirm,
//ust//	.release = ltt_relay_pipe_buf_release,
//ust//	.steal = generic_pipe_buf_steal,
//ust//	.get = generic_pipe_buf_get,
//ust// };
//ust//
//ust// static void ltt_relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
//ust// {
//ust// }
/*
 *	subbuf_splice_actor - splice up to one subbuf's worth of data
 */
static int subbuf_splice_actor(struct file *in,
			       loff_t *ppos,
			       struct pipe_inode_info *pipe,
			       size_t len,
			       unsigned int flags)
{
	struct rchan_buf *buf = in->private_data;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	unsigned int poff, subbuf_pages, nr_pages;
	struct page *pages[PIPE_BUFFERS];
	struct partial_page partial[PIPE_BUFFERS];
	struct splice_pipe_desc spd = {
		.pages = pages,
		.nr_pages = 0,
		.partial = partial,
		.flags = flags,
		.ops = &ltt_relay_pipe_buf_ops,
		.spd_release = ltt_relay_page_release,
	};
	long consumed_old, consumed_idx, roffset;
	unsigned long bytes_avail;

	/*
	 * Check that a GET_SUBBUF ioctl has been done before.
	 */
	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_old += *ppos;
	consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);

	/*
	 * Adjust read len, if longer than what is available
	 */
	bytes_avail = SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
		    - consumed_old;
	WARN_ON(bytes_avail > buf->chan->alloc_size);
	len = min_t(size_t, len, bytes_avail);
	subbuf_pages = bytes_avail >> PAGE_SHIFT;
	nr_pages = min_t(unsigned int, subbuf_pages, PIPE_BUFFERS);
	roffset = consumed_old & PAGE_MASK;
	poff = consumed_old & ~PAGE_MASK;
	printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
		   len, (ssize_t)*ppos, local_read(&ltt_buf->offset));

	for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
		unsigned int this_len;
		struct buf_page *page;

		if (!len)
			break;
		printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
			   len, roffset);

		this_len = PAGE_SIZE - poff;
		page = ltt_relay_read_get_page(buf, roffset);
		spd.pages[spd.nr_pages] = page->page;
		spd.partial[spd.nr_pages].offset = poff;
		spd.partial[spd.nr_pages].len = this_len;

		poff = 0;
		roffset += PAGE_SIZE;
		len -= this_len;
	}

	if (!spd.nr_pages)
		return 0;

	return splice_to_pipe(pipe, &spd);
}
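
/*
 * Note (added) : splice_to_pipe() accepts at most PIPE_BUFFERS pages per
 * call, so a single pass of the actor may move less than the requested
 * length; ltt_relay_file_splice_read() below advances *ppos by the number
 * of bytes actually spliced and reports it to the caller, which can then
 * issue further splice() calls for the remainder.
 */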
static ssize_t ltt_relay_file_splice_read(struct file *in,
					  loff_t *ppos,
					  struct pipe_inode_info *pipe,
					  size_t len,
					  unsigned int flags)
{
	ssize_t spliced;
	int ret;

	ret = 0;
	spliced = 0;

	printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n",
		   len, (ssize_t)*ppos);
	while (len && !spliced) {
		ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
		printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
		if (ret < 0)
			break;
		else if (!ret) {
			if (flags & SPLICE_F_NONBLOCK)
				ret = -EAGAIN;
			break;
		}

		*ppos += ret;
		if (ret > len)
			len = 0;
		else
			len -= ret;
		spliced += ret;
	}

	if (spliced)
		return spliced;

	return ret;
}
static void ltt_relay_print_subbuffer_errors(
		struct ltt_channel_struct *ltt_chan,
		long cons_off, unsigned int cpu)
{
	struct rchan *rchan = ltt_chan->trans_channel_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);
	long cons_idx, commit_count, write_offset;

	cons_idx = SUBBUF_INDEX(cons_off, rchan);
	commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
	/*
	 * No need to order commit_count and write_offset reads because we
	 * execute after trace is stopped when there are no readers left.
	 */
	write_offset = local_read(&ltt_buf->offset);
	printk(KERN_WARNING
	       "LTT : unread channel %s offset is %ld "
	       "and cons_off : %ld (cpu %u)\n",
	       ltt_chan->channel_name, write_offset, cons_off, cpu);
	/* Check each sub-buffer for non filled commit count */
	if (((commit_count - rchan->subbuf_size) & ltt_chan->commit_count_mask)
	    - (BUFFER_TRUNC(cons_off, rchan) >> ltt_chan->n_subbufs_order)
	    != 0)
		printk(KERN_ALERT
		       "LTT : %s : subbuffer %lu has non filled "
		       "commit count %lu.\n",
		       ltt_chan->channel_name, cons_idx, commit_count);
	printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
	       ltt_chan->channel_name, commit_count,
	       rchan->subbuf_size);
}
static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan, int cpu)
{
	struct rchan *rchan = ltt_chan->trans_channel_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);
	long cons_off;

	for (cons_off = atomic_long_read(&ltt_buf->consumed);
			(SUBBUF_TRUNC(local_read(&ltt_buf->offset),
				      rchan)
			 - cons_off) > 0;
			cons_off = SUBBUF_ALIGN(cons_off, rchan))
		ltt_relay_print_subbuffer_errors(ltt_chan, cons_off, cpu);
}
static void ltt_relay_print_buffer_errors(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu)
{
	struct ltt_trace_struct *trace = ltt_chan->trace;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);

	if (local_read(&ltt_buf->events_lost))
		printk(KERN_ALERT
		       "LTT : %s : %ld events lost "
		       "in %s channel (cpu %u).\n",
		       ltt_chan->channel_name,
		       local_read(&ltt_buf->events_lost),
		       ltt_chan->channel_name, cpu);
	if (local_read(&ltt_buf->corrupted_subbuffers))
		printk(KERN_ALERT
		       "LTT : %s : %ld corrupted subbuffers "
		       "in %s channel (cpu %u).\n",
		       ltt_chan->channel_name,
		       local_read(&ltt_buf->corrupted_subbuffers),
		       ltt_chan->channel_name, cpu);

	ltt_relay_print_errors(trace, ltt_chan, cpu);
}
//ust// static void ltt_relay_remove_dirs(struct ltt_trace_struct *trace)
//ust// {
//ust//	debugfs_remove(trace->dentry.trace_root);
//ust// }
static void ltt_relay_release_channel(struct kref *kref)
{
	struct ltt_channel_struct *ltt_chan = container_of(kref,
			struct ltt_channel_struct, kref);
	percpu_free(ltt_chan->buf);
}
//ust// static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
//ust//		struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
//ust//		unsigned int cpu, unsigned int n_subbufs)
//ust// {
//ust//	struct ltt_channel_buf_struct *ltt_buf =
//ust//		percpu_ptr(ltt_chan->buf, cpu);
//ust//	unsigned int j;
//ust//
//ust//	ltt_buf->commit_count =
//ust//		kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
//ust//			GFP_KERNEL, cpu_to_node(cpu));
//ust//	if (!ltt_buf->commit_count)
//ust//		return -ENOMEM;
//ust//	kref_get(&trace->kref);
//ust//	kref_get(&trace->ltt_transport_kref);
//ust//	kref_get(&ltt_chan->kref);
//ust//	local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
//ust//	atomic_long_set(&ltt_buf->consumed, 0);
//ust//	atomic_long_set(&ltt_buf->active_readers, 0);
//ust//	for (j = 0; j < n_subbufs; j++)
//ust//		local_set(&ltt_buf->commit_count[j], 0);
//ust//	init_waitqueue_head(&ltt_buf->write_wait);
//ust//	atomic_set(&ltt_buf->wakeup_readers, 0);
//ust//	spin_lock_init(&ltt_buf->full_lock);
//ust//
//ust//	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
//ust//	/* atomic_add made on local variable on data that belongs to
//ust//	 * various CPUs : ok because tracing not started (for this cpu). */
//ust//	local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);
//ust//
//ust//	local_set(&ltt_buf->events_lost, 0);
//ust//	local_set(&ltt_buf->corrupted_subbuffers, 0);
//ust//
//ust//	return 0;
//ust// }
static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
		unsigned int cpu, unsigned int n_subbufs)
{
	struct ltt_channel_buf_struct *ltt_buf = ltt_chan->buf;
	unsigned int j;

	ltt_buf->commit_count =
		malloc(sizeof(ltt_buf->commit_count) * n_subbufs);
	if (!ltt_buf->commit_count)
		return -ENOMEM;
	kref_get(&trace->kref);
	kref_get(&trace->ltt_transport_kref);
	kref_get(&ltt_chan->kref);
	ltt_buf->offset = ltt_subbuffer_header_size();
	atomic_long_set(&ltt_buf->consumed, 0);
	atomic_long_set(&ltt_buf->active_readers, 0);
	for (j = 0; j < n_subbufs; j++)
		local_set(&ltt_buf->commit_count[j], 0);
//ust//	init_waitqueue_head(&ltt_buf->write_wait);
	atomic_set(&ltt_buf->wakeup_readers, 0);
	spin_lock_init(&ltt_buf->full_lock);

	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);

	ltt_buf->commit_count[0] += ltt_subbuffer_header_size();

	ltt_buf->events_lost = 0;
	ltt_buf->corrupted_subbuffers = 0;

	return 0;
}
static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu)
{
	struct ltt_trace_struct *trace = ltt_chan->trace;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);

	kref_put(&ltt_chan->trace->ltt_transport_kref,
		ltt_release_transport);
	ltt_relay_print_buffer_errors(ltt_chan, cpu);
	kfree(ltt_buf->commit_count);
	ltt_buf->commit_count = NULL;
	kref_put(&ltt_chan->kref, ltt_relay_release_channel);
	kref_put(&trace->kref, ltt_release_trace);
	wake_up_interruptible(&trace->kref_wq);
}
/*
 * Create channel.
 */
static int ltt_relay_create_channel(const char *trace_name,
		struct ltt_trace_struct *trace, struct dentry *dir,
		const char *channel_name, struct ltt_channel_struct *ltt_chan,
		unsigned int subbuf_size, unsigned int n_subbufs,
		int overwrite)
{
	char *tmpname;
	unsigned int tmpname_len;
	int err = 0;

	tmpname = kmalloc(PATH_MAX, GFP_KERNEL);
	if (!tmpname)
		return EPERM;
	if (overwrite) {
		strncpy(tmpname, LTT_FLIGHT_PREFIX, PATH_MAX-1);
		strncat(tmpname, channel_name,
			PATH_MAX-1-sizeof(LTT_FLIGHT_PREFIX));
	} else {
		strncpy(tmpname, channel_name, PATH_MAX-1);
	}
	strncat(tmpname, "_", PATH_MAX-1-strlen(tmpname));

	kref_init(&ltt_chan->kref);

	ltt_chan->trace = trace;
	ltt_chan->buffer_begin = ltt_buffer_begin_callback;
	ltt_chan->buffer_end = ltt_buffer_end_callback;
	ltt_chan->overwrite = overwrite;
	ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
	ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
	ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct),
					  GFP_KERNEL, cpu_possible_map);
	if (!ltt_chan->buf)
		goto ltt_percpu_alloc_error;
	ltt_chan->trans_channel_data = ltt_relay_open(tmpname,
			dir,
			subbuf_size,
			n_subbufs,
			trace);
	tmpname_len = strlen(tmpname);
	if (tmpname_len > 0) {
		/* Remove final _ for pretty printing */
		tmpname[tmpname_len-1] = '\0';
	}
	if (ltt_chan->trans_channel_data == NULL) {
		printk(KERN_ERR "LTT : Can't open %s channel for trace %s\n",
			tmpname, trace_name);
		goto relay_open_error;
	}

	err = 0;
	goto end;

relay_open_error:
	percpu_free(ltt_chan->buf);
ltt_percpu_alloc_error:
	err = EPERM;
end:
	kfree(tmpname);
	return err;
}
//ust// static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
//ust// {
//ust//	new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
//ust//			get_ltt_root());
//ust//	if (new_trace->dentry.trace_root == NULL) {
//ust//		printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
//ust//			new_trace->trace_name);
//ust//		return EEXIST;
//ust//	}
//ust//
//ust//	new_trace->callbacks.create_buf_file = ltt_create_buf_file_callback;
//ust//	new_trace->callbacks.remove_buf_file = ltt_remove_buf_file_callback;
//ust//
//ust//	return 0;
//ust// }
/*
 * LTTng channel flush function.
 *
 * Must be called when no tracing is active in the channel, because of
 * accesses across CPUs.
 */
static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
{
	buf->finalized = 1;
	ltt_force_switch(buf, FORCE_FLUSH);
}
static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
{
	unsigned int i;
	struct rchan *rchan = ltt_channel->trans_channel_data;

	for_each_possible_cpu(i) {
		struct ltt_channel_buf_struct *ltt_buf =
			percpu_ptr(ltt_channel->buf, i);

		if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
			atomic_set(&ltt_buf->wakeup_readers, 0);
			wake_up_interruptible(&rchan->buf[i]->read_wait);
		}
	}
}
static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel,
		unsigned int cpu)
{
	struct rchan *rchan = ltt_channel->trans_channel_data;

	if (rchan->buf[cpu]) {
		struct ltt_channel_buf_struct *ltt_buf =
			percpu_ptr(ltt_channel->buf, cpu);
		ltt_relay_buffer_flush(rchan->buf[cpu]);
		ltt_relay_wake_writers(ltt_buf);
	}
}
static void ltt_relay_finish_channel(struct ltt_channel_struct *ltt_channel)
{
	unsigned int i;

	for_each_possible_cpu(i)
		ltt_relay_finish_buffer(ltt_channel, i);
}
static void ltt_relay_remove_channel(struct ltt_channel_struct *channel)
{
	struct rchan *rchan = channel->trans_channel_data;

	ltt_relay_close(rchan);
	kref_put(&channel->kref, ltt_relay_release_channel);
}
struct ltt_reserve_switch_offsets {
	long begin, end, old;
	long begin_switch, end_switch_current, end_switch_old;
	long commit_count, reserve_commit_diff;
	size_t before_hdr_pad, size;
};
/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_reserve(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, size_t data_size,
		u64 *tsc, unsigned int *rflags, int largest_align)
{
	offsets->begin = local_read(&ltt_buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_current = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();
	if (last_tsc_overflow(ltt_buf, *tsc))
		*rflags = LTT_RFLAG_ID_SIZE_TSC;

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
		offsets->begin_switch = 1;		/* For offsets->begin */
	} else {
		offsets->size = ltt_get_header_size(ltt_channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			offsets->end_switch_old = 1;	/* For offsets->old */
			offsets->begin_switch = 1;	/* For offsets->begin */
		}
	}
	if (offsets->begin_switch) {
		long subbuf_index;

		if (offsets->end_switch_old)
			offsets->begin = SUBBUF_ALIGN(offsets->begin,
						      buf->chan);
		offsets->begin = offsets->begin + ltt_subbuffer_header_size();
		/* Test new buffer integrity */
		subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
		offsets->reserve_commit_diff =
			(BUFFER_TRUNC(offsets->begin, buf->chan)
			 >> ltt_channel->n_subbufs_order)
			- (local_read(&ltt_buf->commit_count[subbuf_index])
				& ltt_channel->commit_count_mask);
		if (offsets->reserve_commit_diff == 0) {
			/* Next buffer not corrupted. */
			if (!ltt_channel->overwrite &&
				(SUBBUF_TRUNC(offsets->begin, buf->chan)
				 - SUBBUF_TRUNC(atomic_long_read(
							&ltt_buf->consumed),
						buf->chan))
				>= rchan->alloc_size) {
				/*
				 * We do not overwrite non consumed buffers
				 * and we are full : event is lost.
				 */
				local_inc(&ltt_buf->events_lost);
				return -1;
			}
			/*
			 * next buffer not corrupted, we are either in
			 * overwrite mode or the buffer is not full.
			 * It's safe to write in this new subbuffer.
			 */
		} else {
			/*
			 * Next subbuffer corrupted. Force pushing reader even
			 * in normal mode. It's safe to write in this new
			 * subbuffer.
			 */
		}
		offsets->size = ltt_get_header_size(ltt_channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			/*
			 * Event too big for subbuffers, report error, don't
			 * complete the sub-buffer switch.
			 */
			local_inc(&ltt_buf->events_lost);
			return -1;
		}
		/*
		 * We just made a successful buffer switch and the event
		 * fits in the new subbuffer. Let's write.
		 */
	}
	/*
	 * Event fits in the current buffer and we are not on a switch
	 * boundary. It's safe to write.
	 */
	offsets->end = offsets->begin + offsets->size;

	if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
		/*
		 * The offset_end will fall at the very beginning of the next
		 * subbuffer.
		 */
		offsets->end_switch_current = 1;	/* For offsets->begin */
	}
	return 0;
}
/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_switch(
		enum force_switch_mode mode,
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets,
		u64 *tsc)
{
	long subbuf_index;

	offsets->begin = local_read(&ltt_buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
		offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
		offsets->end_switch_old = 1;
	} else {
		/* we do not have to switch : buffer is empty */
		return -1;
	}
	if (mode == FORCE_ACTIVE)
		offsets->begin += ltt_subbuffer_header_size();
	/*
	 * Always begin_switch in FORCE_ACTIVE mode.
	 * Test new buffer integrity
	 */
	subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
	offsets->reserve_commit_diff =
		(BUFFER_TRUNC(offsets->begin, buf->chan)
		 >> ltt_channel->n_subbufs_order)
		- (local_read(&ltt_buf->commit_count[subbuf_index])
			& ltt_channel->commit_count_mask);
	if (offsets->reserve_commit_diff == 0) {
		/* Next buffer not corrupted. */
		if (mode == FORCE_ACTIVE
		    && !ltt_channel->overwrite
		    && offsets->begin - atomic_long_read(&ltt_buf->consumed)
		       >= rchan->alloc_size) {
			/*
			 * We do not overwrite non consumed buffers and we are
			 * full : ignore switch while tracing is active.
			 */
			return -1;
		}
	} else {
		/*
		 * Next subbuffer corrupted. Force pushing reader even in normal
		 * mode.
		 */
	}
	offsets->end = offsets->begin;
	return 0;
}
static inline void ltt_reserve_push_reader(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf,
		struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets)
{
	long consumed_old, consumed_new;

	do {
		consumed_old = atomic_long_read(&ltt_buf->consumed);
		/*
		 * If buffer is in overwrite mode, push the reader consumed
		 * count if the write position has reached it and we are not
		 * at the first iteration (don't push the reader farther than
		 * the writer). This operation can be done concurrently by many
		 * writers in the same buffer, the writer being at the farthest
		 * write position sub-buffer index in the buffer being the one
		 * which will win this loop.
		 * If the buffer is not in overwrite mode, pushing the reader
		 * only happens if a sub-buffer is corrupted.
		 */
		if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
		   - SUBBUF_TRUNC(consumed_old, buf->chan))
		   >= rchan->alloc_size)
			consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
		else {
			consumed_new = consumed_old;
			break;
		}
	} while (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
			consumed_new) != consumed_old);

	if (consumed_old != consumed_new) {
		/*
		 * Reader pushed : we are the winner of the push, we can
		 * therefore reequilibrate reserve and commit. Atomic increment
		 * of the commit count permits other writers to play around
		 * with this variable before us. We keep track of
		 * corrupted_subbuffers even in overwrite mode :
		 * we never want to write over a non completely committed
		 * sub-buffer : possible causes : the buffer size is too low
		 * compared to the unordered data input, or there is a writer
		 * that died between the reserve and the commit.
		 */
		if (offsets->reserve_commit_diff) {
			/*
			 * We have to alter the sub-buffer commit count.
			 * We do not deliver the previous subbuffer, given it
			 * was either corrupted or not consumed (overwrite
			 * mode).
			 */
			local_add(offsets->reserve_commit_diff,
				  &ltt_buf->commit_count[
					SUBBUF_INDEX(offsets->begin,
						     buf->chan)]);
			if (!ltt_channel->overwrite
			    || offsets->reserve_commit_diff
			       != rchan->subbuf_size) {
				/*
				 * The reserve commit diff was not subbuf_size :
				 * it means the subbuffer was partly written to
				 * and is therefore corrupted. If it is multiple
				 * of subbuffer size and we are in flight
				 * recorder mode, we are skipping over a whole
				 * subbuffer.
				 */
				local_inc(&ltt_buf->corrupted_subbuffers);
			}
		}
	}
}
/*
 * ltt_reserve_switch_old_subbuf: switch old subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 *
 * Note : offset_old should never be 0 here.
 */
static inline void ltt_reserve_switch_old_subbuf(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long oldidx = SUBBUF_INDEX(offsets->old - 1, rchan);

	ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(rchan->subbuf_size
				 - (SUBBUF_OFFSET(offsets->old - 1, rchan)
				 + 1),
				 &ltt_buf->commit_count[oldidx]);
	if ((BUFFER_TRUNC(offsets->old - 1, rchan)
			>> ltt_channel->n_subbufs_order)
			- ((offsets->commit_count - rchan->subbuf_size)
			   & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, oldidx, NULL);
}
/*
 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
 *
 * This code can be executed unordered : writers may already have written to the
 * sub-buffer before this code gets executed, caution. The commit makes sure
 * that this code is executed before the deliver of this sub-buffer.
 */
static inline void ltt_reserve_switch_new_subbuf(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long beginidx = SUBBUF_INDEX(offsets->begin, rchan);

	ltt_channel->buffer_begin(buf, *tsc, beginidx);
	/* Must write buffer begin before incrementing commit count */
	smp_wmb();
	offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
			&ltt_buf->commit_count[beginidx]);
	/* Check if the written buffer has to be delivered */
	if ((BUFFER_TRUNC(offsets->begin, rchan)
			>> ltt_channel->n_subbufs_order)
			- ((offsets->commit_count - rchan->subbuf_size)
			   & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, beginidx, NULL);
}
/*
 * ltt_reserve_end_switch_current: finish switching current subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 */
static inline void ltt_reserve_end_switch_current(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long endidx = SUBBUF_INDEX(offsets->end - 1, rchan);

	ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(rchan->subbuf_size
				 - (SUBBUF_OFFSET(offsets->end - 1, rchan)
				 + 1),
				 &ltt_buf->commit_count[endidx]);
	if ((BUFFER_TRUNC(offsets->end - 1, rchan)
			>> ltt_channel->n_subbufs_order)
			- ((offsets->commit_count - rchan->subbuf_size)
			   & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, NULL);
}
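
/*
 * Worked example (added for clarity) : the delivery test used above and in
 * ltt_relay_commit_slot(). Each sub-buffer's commit count grows by
 * subbuf_size for every completed pass over that sub-buffer, so after N
 * complete buffer wraps a fully committed sub-buffer holds
 * (N + 1) * subbuf_size. BUFFER_TRUNC(offset, rchan) >> n_subbufs_order
 * equals N * subbuf_size for the offset being committed, hence
 * (commit_count - subbuf_size) & commit_count_mask matches it exactly when
 * every reserved byte of the sub-buffer has also been committed, and only
 * then is ltt_deliver() called.
 */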
/**
 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
 * @trace: the trace structure to log to.
 * @ltt_channel: channel structure
 * @transport_data: data structure specific to ltt relay
 * @data_size: size of the variable length data to log.
 * @slot_size: pointer to total size of the slot (out)
 * @buf_offset : pointer to reserved buffer offset (out)
 * @tsc: pointer to the tsc at the slot reservation (out)
 *
 * Return : -ENOSPC if not enough space, else returns 0.
 * It will take care of sub-buffer switching.
 */
static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_channel, void **transport_data,
		size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
		unsigned int *rflags, int largest_align, int cpu)
{
	struct rchan *rchan = ltt_channel->trans_channel_data;
	struct rchan_buf *buf = *transport_data =
			rchan->buf[cpu];
	struct ltt_channel_buf_struct *ltt_buf =
			percpu_ptr(ltt_channel->buf, buf->cpu);
	struct ltt_reserve_switch_offsets offsets;

	offsets.reserve_commit_diff = 0;
	offsets.size = 0;

	/*
	 * Perform retryable operations.
	 */
	if (__get_cpu_var(ltt_nesting) > 4) {
		local_inc(&ltt_buf->events_lost);
		return -EPERM;
	}
	do {
		if (ltt_relay_try_reserve(ltt_channel, ltt_buf,
				rchan, buf, &offsets, data_size, tsc, rflags,
				largest_align))
			return -ENOSPC;
	} while (local_cmpxchg(&ltt_buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(ltt_buf, *tsc);

	/*
	 * Push the reader if necessary
	 */
	ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
			&offsets, tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (offsets.begin_switch)
		ltt_reserve_switch_new_subbuf(ltt_channel, ltt_buf, rchan,
			buf, &offsets, tsc);

	if (offsets.end_switch_current)
		ltt_reserve_end_switch_current(ltt_channel, ltt_buf, rchan,
			buf, &offsets, tsc);

	*slot_size = offsets.size;
	*buf_offset = offsets.begin + offsets.before_hdr_pad;
	return 0;
}
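
/*
 * Hypothetical writer-side sketch (added; not part of the original code) :
 * how a probe would typically pair the two transport operations
 * reserve_slot/commit_slot, assuming "trace" and "ltt_channel" were already
 * looked up and write_event_payload() stands in for the event serialization
 * code. It mirrors ltt_relay_reserve_slot() above and
 * ltt_relay_commit_slot() defined further below.
 */
#if 0	/* illustration only, never compiled */
static void example_log_event(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_channel,
		size_t data_size, int cpu)
{
	void *transport_data;
	size_t slot_size;
	long buf_offset;
	u64 tsc;
	unsigned int rflags = 0;

	if (ltt_relay_reserve_slot(trace, ltt_channel, &transport_data,
			data_size, &slot_size, &buf_offset, &tsc,
			&rflags, sizeof(long), cpu))
		return;		/* -ENOSPC : buffer full, event lost */
	/* write_event_payload(transport_data, buf_offset, data_size); */
	ltt_relay_commit_slot(ltt_channel, &transport_data, buf_offset,
			slot_size);
}
#endif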
/*
 * Force a sub-buffer switch for a per-cpu buffer. This operation is
 * completely reentrant : can be called while tracing is active with
 * absolutely no lock held.
 *
 * Note, however, that as a local_cmpxchg is used for some atomic
 * operations, this function must be called from the CPU which owns the buffer
 * for a ACTIVE flush.
 */
static notrace void ltt_force_switch(struct rchan_buf *buf,
		enum force_switch_mode mode)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct rchan *rchan = ltt_channel->trans_channel_data;
	struct ltt_reserve_switch_offsets offsets;
	u64 tsc;

	offsets.reserve_commit_diff = 0;
	offsets.size = 0;

	/*
	 * Perform retryable operations.
	 */
	do {
		if (ltt_relay_try_switch(mode, ltt_channel, ltt_buf,
				rchan, buf, &offsets, &tsc))
			return;
	} while (local_cmpxchg(&ltt_buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(ltt_buf, tsc);

	/*
	 * Push the reader if necessary
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan,
					buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
			&offsets, &tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_switch_new_subbuf(ltt_channel,
			ltt_buf, rchan, buf, &offsets, &tsc);
}
/*
 * Update lost_size for each commit. It's needed only
 * for flight recording. Must be called after relay_commit.
 * This function decrements the subbuffer's lost_size each time the commit count
 * reaches back the reserve offset (modulo subbuffer size). It is useful for
 * crash dump.
 * We use slot_size - 1 to make sure we deal correctly with the case where we
 * fill the subbuffer completely (so the subbuf index stays in the previous
 * subbuffer).
 */
#ifdef CONFIG_LTT_VMCORE
static inline void ltt_write_commit_counter(struct rchan_buf *buf,
		long buf_offset, size_t slot_size)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct ltt_subbuffer_header *header;
	long offset, subbuf_idx, commit_count;
	uint32_t lost_old, lost_new;

	subbuf_idx = SUBBUF_INDEX(buf_offset - 1, buf->chan);
	offset = buf_offset + slot_size;
	header = (struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);
	for (;;) {
		lost_old = header->lost_size;
		commit_count =
			local_read(&ltt_buf->commit_count[subbuf_idx]);
		/* SUBBUF_OFFSET includes commit_count_mask */
		if (!SUBBUF_OFFSET(offset - commit_count, buf->chan)) {
			lost_new = (uint32_t)buf->chan->subbuf_size
				   - SUBBUF_OFFSET(commit_count, buf->chan);
			lost_old = cmpxchg_local(&header->lost_size, lost_old,
						 lost_new);
			if (lost_old <= lost_new)
				break;
		} else {
			break;
		}
	}
}
#else
static inline void ltt_write_commit_counter(struct rchan_buf *buf,
		long buf_offset, size_t slot_size)
{
}
#endif
/*
 * Atomic unordered slot commit. Increments the commit count in the
 * specified sub-buffer, and delivers it if necessary.
 *
 * Parameters:
 *
 * @ltt_channel : channel structure
 * @transport_data: transport-specific data
 * @buf_offset : offset following the event header.
 * @slot_size : size of the reserved slot.
 */
static notrace void ltt_relay_commit_slot(
		struct ltt_channel_struct *ltt_channel,
		void **transport_data, long buf_offset, size_t slot_size)
{
	struct rchan_buf *buf = *transport_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct rchan *rchan = buf->chan;
	long offset_end = buf_offset;
	long endidx = SUBBUF_INDEX(offset_end - 1, rchan);
	long commit_count;

	/* Must write slot data before incrementing commit count */
	smp_wmb();
	commit_count = local_add_return(slot_size,
		&ltt_buf->commit_count[endidx]);
	/* Check if all commits have been done */
	if ((BUFFER_TRUNC(offset_end - 1, rchan)
			>> ltt_channel->n_subbufs_order)
			- ((commit_count - rchan->subbuf_size)
			   & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, NULL);
	/*
	 * Update lost_size for each commit. It's needed only for extracting
	 * ltt buffers from vmcore, after crash.
	 */
	ltt_write_commit_counter(buf, buf_offset, slot_size);
}
/*
 * This is called with preemption disabled when user space has requested
 * blocking mode. If one of the active traces has free space below a
 * specific threshold value, we reenable preemption and block.
 */
static int ltt_relay_user_blocking(struct ltt_trace_struct *trace,
		unsigned int chan_index, size_t data_size,
		struct user_dbg_data *dbg)
{
	struct rchan *rchan;
	struct ltt_channel_buf_struct *ltt_buf;
	struct ltt_channel_struct *channel;
	struct rchan_buf *relay_buf;
	int cpu;
	DECLARE_WAITQUEUE(wait, current);

	channel = &trace->channels[chan_index];
	rchan = channel->trans_channel_data;
	cpu = smp_processor_id();
	relay_buf = rchan->buf[cpu];
	ltt_buf = percpu_ptr(channel->buf, cpu);

	/*
	 * Check if data is too big for the channel : do not
	 * block for it.
	 */
	if (LTT_RESERVE_CRITICAL + data_size > relay_buf->chan->subbuf_size)
		return 0;

	/*
	 * If free space too low, we block. We restart from the
	 * beginning after we resume (cpu id may have changed
	 * while preemption is active).
	 */
	spin_lock(&ltt_buf->full_lock);
	if (!channel->overwrite) {
		dbg->write = local_read(&ltt_buf->offset);
		dbg->read = atomic_long_read(&ltt_buf->consumed);
		dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
				  - SUBBUF_TRUNC(dbg->read,
						 relay_buf->chan);
		if (dbg->avail_size > rchan->alloc_size) {
			__set_current_state(TASK_INTERRUPTIBLE);
			add_wait_queue(&ltt_buf->write_wait, &wait);
			spin_unlock(&ltt_buf->full_lock);
			preempt_enable();
			schedule();
			__set_current_state(TASK_RUNNING);
			remove_wait_queue(&ltt_buf->write_wait, &wait);
			if (signal_pending(current))
				return -ERESTARTSYS;
			preempt_disable();
			return 1;
		}
	}
	spin_unlock(&ltt_buf->full_lock);
	return 0;
}
static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
		unsigned int chan_index, size_t data_size,
		struct user_dbg_data *dbg, int cpu)
{
	struct rchan *rchan;
	struct ltt_channel_buf_struct *ltt_buf;
	struct ltt_channel_struct *channel;
	struct rchan_buf *relay_buf;

	channel = &trace->channels[chan_index];
	rchan = channel->trans_channel_data;
	relay_buf = rchan->buf[cpu];
	ltt_buf = percpu_ptr(channel->buf, cpu);

	printk(KERN_ERR "Error in LTT usertrace : "
	       "buffer full : event lost in blocking "
	       "mode. Increase LTT_RESERVE_CRITICAL.\n");
	printk(KERN_ERR "LTT nesting level is %u.\n",
	       per_cpu(ltt_nesting, cpu));
	printk(KERN_ERR "LTT avail size %lu.\n",
	       dbg->avail_size);
	printk(KERN_ERR "avail write : %lu, read : %lu\n",
	       dbg->write, dbg->read);

	dbg->write = local_read(&ltt_buf->offset);
	dbg->read = atomic_long_read(&ltt_buf->consumed);

	printk(KERN_ERR "LTT cur size %lu.\n",
	       dbg->write + LTT_RESERVE_CRITICAL + data_size
	       - SUBBUF_TRUNC(dbg->read, relay_buf->chan));
	printk(KERN_ERR "cur write : %lu, read : %lu\n",
	       dbg->write, dbg->read);
}
//ust// static struct ltt_transport ltt_relay_transport = {
//ust//	.name = "relay",
//ust//	.owner = THIS_MODULE,
//ust//	.ops = {
//ust//		.create_dirs = ltt_relay_create_dirs,
//ust//		.remove_dirs = ltt_relay_remove_dirs,
//ust//		.create_channel = ltt_relay_create_channel,
//ust//		.finish_channel = ltt_relay_finish_channel,
//ust//		.remove_channel = ltt_relay_remove_channel,
//ust//		.wakeup_channel = ltt_relay_async_wakeup_chan,
//ust//		.commit_slot = ltt_relay_commit_slot,
//ust//		.reserve_slot = ltt_relay_reserve_slot,
//ust//		.user_blocking = ltt_relay_user_blocking,
//ust//		.user_errors = ltt_relay_print_user_errors,
//ust//	},
//ust// };
static struct ltt_transport ust_relay_transport = {
	.name = "ustrelay",
	.owner = THIS_MODULE,
	.ops = {
		.create_dirs = ltt_relay_create_dirs,
		.remove_dirs = ltt_relay_remove_dirs,
		.create_channel = ltt_relay_create_channel,
		.finish_channel = ltt_relay_finish_channel,
		.remove_channel = ltt_relay_remove_channel,
		.wakeup_channel = ltt_relay_async_wakeup_chan,
		.commit_slot = ltt_relay_commit_slot,
		.reserve_slot = ltt_relay_reserve_slot,
		.user_blocking = ltt_relay_user_blocking,
		.user_errors = ltt_relay_print_user_errors,
	},
};
//ust// static int __init ltt_relay_init(void)
//ust// {
//ust//	printk(KERN_INFO "LTT : ltt-relay init\n");
//ust//
//ust//	ltt_file_operations = ltt_relay_file_operations;
//ust//	ltt_file_operations.owner = THIS_MODULE;
//ust//	ltt_file_operations.open = ltt_open;
//ust//	ltt_file_operations.release = ltt_release;
//ust//	ltt_file_operations.poll = ltt_poll;
//ust//	ltt_file_operations.splice_read = ltt_relay_file_splice_read;
//ust//	ltt_file_operations.ioctl = ltt_ioctl;
//ust//#ifdef CONFIG_COMPAT
//ust//	ltt_file_operations.compat_ioctl = ltt_compat_ioctl;
//ust//#endif
//ust//
//ust//	ltt_transport_register(&ltt_relay_transport);
//ust//
//ust//	return 0;
//ust// }
void init_ustrelay_transport(void)
{
	ltt_transport_register(&ust_relay_transport);
}
static void __exit ltt_relay_exit(void)
{
//ust//	printk(KERN_INFO "LTT : ltt-relay exit\n");

	ltt_transport_unregister(&ust_relay_transport);
}
//ust// module_init(ltt_relay_init);
//ust// module_exit(ltt_relay_exit);
//ust//
//ust// MODULE_LICENSE("GPL");
//ust// MODULE_AUTHOR("Mathieu Desnoyers");
//ust// MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");