/*
 * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * LTTng lockless buffer space management (reader/writer).
 *
 * Author:
 *	Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
 *
 * Inspired from LTT :
 *	Karim Yaghmour (karim@opersys.com)
 *	Tom Zanussi (zanussi@us.ibm.com)
 *	Bob Wisniewski (bob@watson.ibm.com)
 * And from K42 :
 *	Bob Wisniewski (bob@watson.ibm.com)
 *
 * Changelog:
 *	19/10/05, Complete lockless mechanism.
 *	27/05/05, Modular redesign and rewrite.
 *
 * Userspace reader semantic :
 * while (poll fd != POLLHUP) {
 *	- ioctl RELAY_GET_SUBBUF_SIZE
 *	- splice 1 subbuffer worth of data to a pipe
 *	- splice the data from pipe to disk/network
 *	- ioctl PUT_SUBBUF, check error value
 *	  if err val < 0, previous subbuffer was corrupted.
 * }
 */

#include <linux/time.h>
#include <linux/ltt-tracer.h>
#include <linux/ltt-relay.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/rcupdate.h>
#include <linux/sched.h>
#include <linux/bitops.h>
#include <linux/smp_lock.h>
#include <linux/debugfs.h>
#include <linux/stat.h>
#include <linux/cpu.h>
#include <linux/pipe_fs_i.h>
#include <linux/splice.h>
#include <asm/atomic.h>
#include <asm/local.h>

#if 0
#define printk_dbg(fmt, args...)	printk(fmt, args)
#else
#define printk_dbg(fmt, args...)
#endif

/* LTTng lockless logging buffer info */
struct ltt_channel_buf_struct {
	/* First 32 bytes cache-hot cacheline */
	local_t offset;			/* Current offset in the buffer */
	local_t *commit_count;		/* Commit count per sub-buffer */
	atomic_long_t consumed;		/*
					 * Current offset in the buffer
					 * standard atomic access (shared)
					 */
	unsigned long last_tsc;		/*
					 * Last timestamp written in the buffer.
					 */
	/* End of first 32 bytes cacheline */
	atomic_long_t active_readers;	/*
					 * Active readers count
					 * standard atomic access (shared)
					 */
	local_t events_lost;
	local_t corrupted_subbuffers;
	spinlock_t full_lock;		/*
					 * buffer full condition spinlock, only
					 * for userspace tracing blocking mode
					 * synchronization with reader.
					 */
	wait_queue_head_t write_wait;	/*
					 * Wait queue for blocking user space
					 * writers
					 */
	atomic_t wakeup_readers;	/* Boolean : wakeup readers waiting ? */
} ____cacheline_aligned;

/*
 * Last TSC comparison functions. Check if the current TSC overflows
 * LTT_TSC_BITS bits from the last TSC read. Reads and writes last_tsc
 * atomically.
 */

#if (BITS_PER_LONG == 32)
static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)(tsc >> LTT_TSC_BITS);
}

static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	unsigned long tsc_shifted = (unsigned long)(tsc >> LTT_TSC_BITS);

	if (unlikely((tsc_shifted - ltt_buf->last_tsc)))
		return 1;
	else
		return 0;
}
#else
static inline void save_last_tsc(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	ltt_buf->last_tsc = (unsigned long)tsc;
}

static inline int last_tsc_overflow(struct ltt_channel_buf_struct *ltt_buf,
					u64 tsc)
{
	if (unlikely((tsc - ltt_buf->last_tsc) >> LTT_TSC_BITS))
		return 1;
	else
		return 0;
}
#endif
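
/*
 * Reading of the two helpers above : on 64-bit, last_tsc_overflow() reports
 * an overflow whenever (tsc - last_tsc) >= 2^LTT_TSC_BITS, i.e. whenever the
 * delta since the previously recorded timestamp no longer fits in the
 * compressed TSC field of a compact event header, so the caller sets
 * LTT_RFLAG_ID_SIZE_TSC and a full timestamp is written with the event.
 * On 32-bit, only the high-order bits (tsc >> LTT_TSC_BITS) are kept in
 * last_tsc, so any change in those bits is treated as an overflow.
 */
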
static struct file_operations ltt_file_operations;

/*
 * A switch is done during tracing or as a final flush after tracing (so it
 * won't write in the new sub-buffer).
 */
enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };

static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan,
		struct rchan_buf *buf,
		unsigned int cpu,
		unsigned int n_subbufs);

static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu);

static void ltt_force_switch(struct rchan_buf *buf,
		enum force_switch_mode mode);

static void ltt_buffer_begin_callback(struct rchan_buf *buf,
			u64 tsc, unsigned int subbuf_idx)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->cycle_count_begin = tsc;
	header->lost_size = 0xFFFFFFFF; /* for debugging */
	header->buf_size = buf->chan->subbuf_size;
	ltt_write_trace_header(channel->trace, header);
}

/*
 * offset is assumed to never be 0 here : never deliver a completely empty
 * subbuffer. The lost size is between 0 and subbuf_size-1.
 */
static notrace void ltt_buffer_end_callback(struct rchan_buf *buf,
		u64 tsc, unsigned int offset, unsigned int subbuf_idx)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(channel->buf, buf->cpu);
	struct ltt_subbuffer_header *header =
		(struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);

	header->lost_size = SUBBUF_OFFSET((buf->chan->subbuf_size - offset),
				buf->chan);
	header->cycle_count_end = tsc;
	header->events_lost = local_read(&ltt_buf->events_lost);
	header->subbuf_corrupt = local_read(&ltt_buf->corrupted_subbuffers);
}

static notrace void ltt_deliver(struct rchan_buf *buf, unsigned int subbuf_idx,
		void *subbuf)
{
	struct ltt_channel_struct *channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(channel->buf, buf->cpu);

	atomic_set(&ltt_buf->wakeup_readers, 1);
}

static struct dentry *ltt_create_buf_file_callback(const char *filename,
		struct dentry *parent, int mode,
		struct rchan_buf *buf)
{
	struct ltt_channel_struct *ltt_chan;
	int err;
	struct dentry *dentry;

	ltt_chan = buf->chan->private_data;
	err = ltt_relay_create_buffer(ltt_chan->trace, ltt_chan,
					buf, buf->cpu,
					buf->chan->n_subbufs);
	if (err)
		return ERR_PTR(err);

	dentry = debugfs_create_file(filename, mode, parent, buf,
			&ltt_file_operations);
	if (!dentry)
		goto error;
	return dentry;

error:
	ltt_relay_destroy_buffer(ltt_chan, buf->cpu);
	return NULL;
}

static int ltt_remove_buf_file_callback(struct dentry *dentry)
{
	struct rchan_buf *buf = dentry->d_inode->i_private;
	struct ltt_channel_struct *ltt_chan = buf->chan->private_data;

	debugfs_remove(dentry);
	ltt_relay_destroy_buffer(ltt_chan, buf->cpu);

	return 0;
}

/*
 * This must be done after the trace is removed from the RCU list so that there
 * are no stalled writers.
 */
static void ltt_relay_wake_writers(struct ltt_channel_buf_struct *ltt_buf)
{
	if (waitqueue_active(&ltt_buf->write_wait))
		wake_up_interruptible(&ltt_buf->write_wait);
}

/*
 * This function should not be called from NMI interrupt context
 */
static notrace void ltt_buf_unfull(struct rchan_buf *buf,
		unsigned int subbuf_idx,
		long offset)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	ltt_relay_wake_writers(ltt_buf);
}

/**
 *	ltt_open - open file op for ltt files
 *	@inode: opened inode
 *	@file: opened file
 *
 *	Open implementation. Makes sure only one open instance of a buffer is
 *	done at a given moment.
 */
static int ltt_open(struct inode *inode, struct file *file)
{
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	if (!atomic_long_add_unless(&ltt_buf->active_readers, 1, 1))
		return -EBUSY;
	return ltt_relay_file_operations.open(inode, file);
}

/**
 *	ltt_release - release file op for ltt files
 *	@inode: opened inode
 *	@file: opened file
 *
 *	Release implementation.
 */
static int ltt_release(struct inode *inode, struct file *file)
{
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	int ret;

	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
	atomic_long_dec(&ltt_buf->active_readers);
	ret = ltt_relay_file_operations.release(inode, file);
	return ret;
}

/**
 *	ltt_poll - file op for ltt files
 *	@filp: the file
 *	@wait: poll table
 *
 *	Poll implementation.
 */
static unsigned int ltt_poll(struct file *filp, poll_table *wait)
{
	unsigned int mask = 0;
	struct inode *inode = filp->f_dentry->d_inode;
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);

	if (filp->f_mode & FMODE_READ) {
		poll_wait_set_exclusive(wait);
		poll_wait(filp, &buf->read_wait, wait);

		WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
		if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
				 buf->chan)
		    - SUBBUF_TRUNC(atomic_long_read(&ltt_buf->consumed),
				   buf->chan)
		    == 0) {
			if (buf->finalized)
				return POLLHUP;
			else
				return 0;
		} else {
			struct rchan *rchan =
				ltt_channel->trans_channel_data;
			if (SUBBUF_TRUNC(local_read(&ltt_buf->offset),
					 buf->chan)
			    - SUBBUF_TRUNC(atomic_long_read(
						&ltt_buf->consumed),
					   buf->chan)
			    >= rchan->alloc_size)
				return POLLPRI | POLLRDBAND;
			else
				return POLLIN | POLLRDNORM;
		}
	}
	return mask;
}

static int ltt_do_get_subbuf(struct rchan_buf *buf,
		struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	long consumed_old, consumed_idx, commit_count, write_offset;

	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
	commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
	/*
	 * Make sure we read the commit count before reading the buffer
	 * data and the write offset. Correct consumed offset ordering
	 * wrt commit count is insured by the use of cmpxchg to update
	 * the consumed offset.
	 */
	smp_rmb();
	write_offset = local_read(&ltt_buf->offset);
	/*
	 * Check that the subbuffer we are trying to consume has been
	 * already fully committed.
	 */
	if (((commit_count - buf->chan->subbuf_size)
	     & ltt_channel->commit_count_mask)
	    - (BUFFER_TRUNC(consumed_old, buf->chan)
	       >> ltt_channel->n_subbufs_order)
	    != 0)
		return -EAGAIN;
	/*
	 * Check that we are not about to read the same subbuffer in
	 * which the writer head is.
	 */
	if ((SUBBUF_TRUNC(write_offset, buf->chan)
	     - SUBBUF_TRUNC(consumed_old, buf->chan))
	    == 0)
		return -EAGAIN;

	*pconsumed_old = consumed_old;
	return 0;
}

static int ltt_do_put_subbuf(struct rchan_buf *buf,
		struct ltt_channel_buf_struct *ltt_buf, u32 uconsumed_old)
{
	long consumed_new, consumed_old;

	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_old = consumed_old & (~0xFFFFFFFFL);
	consumed_old = consumed_old | uconsumed_old;
	consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);

	spin_lock(&ltt_buf->full_lock);
	if (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
				consumed_new)
	    != consumed_old) {
		/* We have been pushed by the writer : the last
		 * buffer read _is_ corrupted! It can also
		 * happen if this is a buffer we never got. */
		spin_unlock(&ltt_buf->full_lock);
		return -EIO;
	} else {
		/* tell the client that buffer is now unfull */
		int index;
		long data;

		index = SUBBUF_INDEX(consumed_old, buf->chan);
		data = BUFFER_OFFSET(consumed_old, buf->chan);
		ltt_buf_unfull(buf, index, data);
		spin_unlock(&ltt_buf->full_lock);
	}
	return 0;
}

/**
 *	ltt_ioctl - control on the debugfs file
 *
 *	@inode: the inode
 *	@filp: the file
 *	@cmd: the command
 *	@arg: command arg
 *
 *	This ioctl implements the four commands necessary for a minimal
 *	producer/consumer implementation :
 *	RELAY_GET_SUBBUF
 *		Get the next sub buffer that can be read. It never blocks.
 *	RELAY_PUT_SUBBUF
 *		Release the currently read sub-buffer. Parameter is the last
 *		put subbuffer (returned by GET_SUBBUF).
 *	RELAY_GET_N_SUBBUFS
 *		returns the number of sub buffers in the per cpu channel.
 *	RELAY_GET_SUBBUF_SIZE
 *		returns the size of the sub buffers.
 */
static int ltt_ioctl(struct inode *inode, struct file *filp,
		unsigned int cmd, unsigned long arg)
{
	struct rchan_buf *buf = inode->i_private;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	u32 __user *argp = (u32 __user *)arg;

	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
	switch (cmd) {
	case RELAY_GET_SUBBUF:
	{
		int ret;
		long consumed_old;

		ret = ltt_do_get_subbuf(buf, ltt_buf, &consumed_old);
		if (ret < 0)
			return ret;
		return put_user((u32)consumed_old, argp);
	}
	case RELAY_PUT_SUBBUF:
	{
		int ret;
		u32 uconsumed_old;

		ret = get_user(uconsumed_old, argp);
		if (ret)
			return ret; /* will return -EFAULT */
		return ltt_do_put_subbuf(buf, ltt_buf, uconsumed_old);
	}
	case RELAY_GET_N_SUBBUFS:
		return put_user((u32)buf->chan->n_subbufs, argp);
	case RELAY_GET_SUBBUF_SIZE:
		return put_user((u32)buf->chan->subbuf_size, argp);
	default:
		return -ENOIOCTLCMD;
	}
	return 0;
}
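
/*
 * Illustrative userspace consumer loop for the ioctl/splice interface above.
 * This is a sketch, not shipped with this module : fd, pipefd, outfd and the
 * error handling are assumptions, only the RELAY_* commands and their u32
 * pointer argument come from ltt_ioctl() :
 *
 *	u32 subbuf_size, n_subbufs, consumed;
 *	struct pollfd pfd = { .fd = fd, .events = POLLIN };
 *
 *	ioctl(fd, RELAY_GET_SUBBUF_SIZE, &subbuf_size);
 *	ioctl(fd, RELAY_GET_N_SUBBUFS, &n_subbufs);
 *	while (poll(&pfd, 1, -1) >= 0 && !(pfd.revents & POLLHUP)) {
 *		if (ioctl(fd, RELAY_GET_SUBBUF, &consumed) < 0)
 *			continue;	(no fully committed subbuffer yet)
 *		splice(fd, NULL, pipefd[1], NULL, subbuf_size, SPLICE_F_MOVE);
 *		splice(pipefd[0], NULL, outfd, NULL, subbuf_size,
 *		       SPLICE_F_MOVE);
 *		if (ioctl(fd, RELAY_PUT_SUBBUF, &consumed) < 0)
 *			;	(previous subbuffer was corrupted)
 *	}
 */
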
#ifdef CONFIG_COMPAT
static long ltt_compat_ioctl(struct file *file, unsigned int cmd,
		unsigned long arg)
{
	long ret = -ENOIOCTLCMD;

	lock_kernel();
	ret = ltt_ioctl(file->f_dentry->d_inode, file, cmd, arg);
	unlock_kernel();

	return ret;
}
#endif

static void ltt_relay_pipe_buf_release(struct pipe_inode_info *pipe,
					struct pipe_buffer *pbuf)
{
}

static struct pipe_buf_operations ltt_relay_pipe_buf_ops = {
	.can_merge = 0,
	.map = generic_pipe_buf_map,
	.unmap = generic_pipe_buf_unmap,
	.confirm = generic_pipe_buf_confirm,
	.release = ltt_relay_pipe_buf_release,
	.steal = generic_pipe_buf_steal,
	.get = generic_pipe_buf_get,
};

static void ltt_relay_page_release(struct splice_pipe_desc *spd, unsigned int i)
{
}

/*
 *	subbuf_splice_actor - splice up to one subbuf's worth of data
 */
static int subbuf_splice_actor(struct file *in,
			       loff_t *ppos,
			       struct pipe_inode_info *pipe,
			       size_t len,
			       unsigned int flags)
{
	struct rchan_buf *buf = in->private_data;
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	unsigned int poff, subbuf_pages, nr_pages;
	struct page *pages[PIPE_BUFFERS];
	struct partial_page partial[PIPE_BUFFERS];
	struct splice_pipe_desc spd = {
		.pages = pages,
		.nr_pages = 0,
		.partial = partial,
		.flags = flags,
		.ops = &ltt_relay_pipe_buf_ops,
		.spd_release = ltt_relay_page_release,
	};
	long consumed_old, consumed_idx, roffset;
	unsigned long bytes_avail;

	/*
	 * Check that a GET_SUBBUF ioctl has been done before.
	 */
	WARN_ON(atomic_long_read(&ltt_buf->active_readers) != 1);
	consumed_old = atomic_long_read(&ltt_buf->consumed);
	consumed_old += *ppos;
	consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);

	/*
	 * Adjust read len, if longer than what is available
	 */
	bytes_avail = SUBBUF_TRUNC(local_read(&ltt_buf->offset), buf->chan)
		      - consumed_old;
	WARN_ON(bytes_avail > buf->chan->alloc_size);
	len = min_t(size_t, len, bytes_avail);
	subbuf_pages = bytes_avail >> PAGE_SHIFT;
	nr_pages = min_t(unsigned int, subbuf_pages, PIPE_BUFFERS);
	roffset = consumed_old & PAGE_MASK;
	poff = consumed_old & ~PAGE_MASK;
	printk_dbg(KERN_DEBUG "SPLICE actor len %zu pos %zd write_pos %ld\n",
		   len, (ssize_t)*ppos, local_read(&ltt_buf->offset));

	for (; spd.nr_pages < nr_pages; spd.nr_pages++) {
		unsigned int this_len;
		struct buf_page *page;

		if (!len)
			break;
		printk_dbg(KERN_DEBUG "SPLICE actor loop len %zu roffset %ld\n",
			   len, roffset);

		this_len = PAGE_SIZE - poff;
		page = ltt_relay_read_get_page(buf, roffset);
		spd.pages[spd.nr_pages] = page->page;
		spd.partial[spd.nr_pages].offset = poff;
		spd.partial[spd.nr_pages].len = this_len;

		poff = 0;
		roffset += PAGE_SIZE;
		len -= this_len;
	}

	if (!spd.nr_pages)
		return 0;

	return splice_to_pipe(pipe, &spd);
}

static ssize_t ltt_relay_file_splice_read(struct file *in,
				      loff_t *ppos,
				      struct pipe_inode_info *pipe,
				      size_t len,
				      unsigned int flags)
{
	ssize_t spliced;
	int ret;

	ret = 0;
	spliced = 0;

	printk_dbg(KERN_DEBUG "SPLICE read len %zu pos %zd\n",
		   len, (ssize_t)*ppos);
	while (len && !spliced) {
		ret = subbuf_splice_actor(in, ppos, pipe, len, flags);
		printk_dbg(KERN_DEBUG "SPLICE read loop ret %d\n", ret);
		if (ret < 0)
			break;
		else if (!ret) {
			if (flags & SPLICE_F_NONBLOCK)
				ret = -EAGAIN;
			break;
		}

		*ppos += ret;
		if (ret > len)
			len = 0;
		else
			len -= ret;
		spliced += ret;
	}

	if (spliced)
		return spliced;

	return ret;
}

static void ltt_relay_print_subbuffer_errors(
		struct ltt_channel_struct *ltt_chan,
		long cons_off, unsigned int cpu)
{
	struct rchan *rchan = ltt_chan->trans_channel_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);
	long cons_idx, commit_count, write_offset;

	cons_idx = SUBBUF_INDEX(cons_off, rchan);
	commit_count = local_read(&ltt_buf->commit_count[cons_idx]);
	/*
	 * No need to order commit_count and write_offset reads because we
	 * execute after trace is stopped when there are no readers left.
	 */
	write_offset = local_read(&ltt_buf->offset);
	printk(KERN_WARNING
		"LTT : unread channel %s offset is %ld "
		"and cons_off : %ld (cpu %u)\n",
		ltt_chan->channel_name, write_offset, cons_off, cpu);
	/* Check each sub-buffer for non filled commit count */
	if (((commit_count - rchan->subbuf_size) & ltt_chan->commit_count_mask)
	    - (BUFFER_TRUNC(cons_off, rchan) >> ltt_chan->n_subbufs_order)
	    != 0)
		printk(KERN_ALERT
			"LTT : %s : subbuffer %lu has non filled "
			"commit count %lu.\n",
			ltt_chan->channel_name, cons_idx, commit_count);
	printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %zd\n",
			ltt_chan->channel_name, commit_count,
			rchan->subbuf_size);
}

static void ltt_relay_print_errors(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan, int cpu)
{
	struct rchan *rchan = ltt_chan->trans_channel_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);
	long cons_off;

	for (cons_off = atomic_long_read(&ltt_buf->consumed);
			(SUBBUF_TRUNC(local_read(&ltt_buf->offset),
				      rchan)
			 - cons_off) > 0;
			cons_off = SUBBUF_ALIGN(cons_off, rchan))
		ltt_relay_print_subbuffer_errors(ltt_chan, cons_off, cpu);
}

static void ltt_relay_print_buffer_errors(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu)
{
	struct ltt_trace_struct *trace = ltt_chan->trace;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);

	if (local_read(&ltt_buf->events_lost))
		printk(KERN_ALERT
			"LTT : %s : %ld events lost "
			"in %s channel (cpu %u).\n",
			ltt_chan->channel_name,
			local_read(&ltt_buf->events_lost),
			ltt_chan->channel_name, cpu);
	if (local_read(&ltt_buf->corrupted_subbuffers))
		printk(KERN_ALERT
			"LTT : %s : %ld corrupted subbuffers "
			"in %s channel (cpu %u).\n",
			ltt_chan->channel_name,
			local_read(&ltt_buf->corrupted_subbuffers),
			ltt_chan->channel_name, cpu);

	ltt_relay_print_errors(trace, ltt_chan, cpu);
}

static void ltt_relay_remove_dirs(struct ltt_trace_struct *trace)
{
	debugfs_remove(trace->dentry.trace_root);
}

static void ltt_relay_release_channel(struct kref *kref)
{
	struct ltt_channel_struct *ltt_chan = container_of(kref,
			struct ltt_channel_struct, kref);
	percpu_free(ltt_chan->buf);
}

static int ltt_relay_create_buffer(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_chan, struct rchan_buf *buf,
		unsigned int cpu, unsigned int n_subbufs)
{
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);
	unsigned int j;

	ltt_buf->commit_count =
		kzalloc_node(sizeof(ltt_buf->commit_count) * n_subbufs,
			GFP_KERNEL, cpu_to_node(cpu));
	if (!ltt_buf->commit_count)
		return -ENOMEM;
	kref_get(&trace->kref);
	kref_get(&trace->ltt_transport_kref);
	kref_get(&ltt_chan->kref);
	local_set(&ltt_buf->offset, ltt_subbuffer_header_size());
	atomic_long_set(&ltt_buf->consumed, 0);
	atomic_long_set(&ltt_buf->active_readers, 0);
	for (j = 0; j < n_subbufs; j++)
		local_set(&ltt_buf->commit_count[j], 0);
	init_waitqueue_head(&ltt_buf->write_wait);
	atomic_set(&ltt_buf->wakeup_readers, 0);
	spin_lock_init(&ltt_buf->full_lock);

	ltt_buffer_begin_callback(buf, trace->start_tsc, 0);
	/* atomic_add made on local variable on data that belongs to
	 * various CPUs : ok because tracing not started (for this cpu). */
	local_add(ltt_subbuffer_header_size(), &ltt_buf->commit_count[0]);

	local_set(&ltt_buf->events_lost, 0);
	local_set(&ltt_buf->corrupted_subbuffers, 0);

	return 0;
}

static void ltt_relay_destroy_buffer(struct ltt_channel_struct *ltt_chan,
		unsigned int cpu)
{
	struct ltt_trace_struct *trace = ltt_chan->trace;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_chan->buf, cpu);

	kref_put(&ltt_chan->trace->ltt_transport_kref,
		ltt_release_transport);
	ltt_relay_print_buffer_errors(ltt_chan, cpu);
	kfree(ltt_buf->commit_count);
	ltt_buf->commit_count = NULL;
	kref_put(&ltt_chan->kref, ltt_relay_release_channel);
	kref_put(&trace->kref, ltt_release_trace);
	wake_up_interruptible(&trace->kref_wq);
}

static int ltt_relay_create_channel(const char *trace_name,
		struct ltt_trace_struct *trace, struct dentry *dir,
		const char *channel_name, struct ltt_channel_struct *ltt_chan,
		unsigned int subbuf_size, unsigned int n_subbufs,
		int overwrite)
{
	char *tmpname;
	unsigned int tmpname_len;
	int err = 0;

	tmpname = kmalloc(PATH_MAX, GFP_KERNEL);
	if (!tmpname)
		return -ENOMEM;
	if (overwrite) {
		strncpy(tmpname, LTT_FLIGHT_PREFIX, PATH_MAX-1);
		strncat(tmpname, channel_name,
			PATH_MAX-1-sizeof(LTT_FLIGHT_PREFIX));
	} else {
		strncpy(tmpname, channel_name, PATH_MAX-1);
	}
	strncat(tmpname, "_", PATH_MAX-1-strlen(tmpname));

	kref_init(&ltt_chan->kref);

	ltt_chan->trace = trace;
	ltt_chan->buffer_begin = ltt_buffer_begin_callback;
	ltt_chan->buffer_end = ltt_buffer_end_callback;
	ltt_chan->overwrite = overwrite;
	ltt_chan->n_subbufs_order = get_count_order(n_subbufs);
	ltt_chan->commit_count_mask = (~0UL >> ltt_chan->n_subbufs_order);
	ltt_chan->buf = percpu_alloc_mask(sizeof(struct ltt_channel_buf_struct),
					  GFP_KERNEL, cpu_possible_map);
	if (!ltt_chan->buf)
		goto ltt_percpu_alloc_error;
	ltt_chan->trans_channel_data = ltt_relay_open(tmpname,
			dir,
			subbuf_size,
			n_subbufs,
			&trace->callbacks,
			ltt_chan);
	tmpname_len = strlen(tmpname);
	if (tmpname_len > 0) {
		/* Remove final _ for pretty printing */
		tmpname[tmpname_len-1] = '\0';
	}
	if (ltt_chan->trans_channel_data == NULL) {
		printk(KERN_ERR "LTT : Can't open %s channel for trace %s\n",
			tmpname, trace_name);
		goto relay_open_error;
	}
	goto end;

relay_open_error:
	percpu_free(ltt_chan->buf);
ltt_percpu_alloc_error:
	err = -EPERM;
end:
	kfree(tmpname);
	return err;
}

static int ltt_relay_create_dirs(struct ltt_trace_struct *new_trace)
{
	new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
			get_ltt_root());
	if (new_trace->dentry.trace_root == NULL) {
		printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
				new_trace->trace_name);
		return -EEXIST;
	}

	new_trace->callbacks.create_buf_file = ltt_create_buf_file_callback;
	new_trace->callbacks.remove_buf_file = ltt_remove_buf_file_callback;

	return 0;
}

/*
 * LTTng channel flush function.
 *
 * Must be called when no tracing is active in the channel, because of
 * accesses across CPUs.
 */
static notrace void ltt_relay_buffer_flush(struct rchan_buf *buf)
{
	buf->finalized = 1;
	ltt_force_switch(buf, FORCE_FLUSH);
}

static void ltt_relay_async_wakeup_chan(struct ltt_channel_struct *ltt_channel)
{
	unsigned int i;
	struct rchan *rchan = ltt_channel->trans_channel_data;

	for_each_possible_cpu(i) {
		struct ltt_channel_buf_struct *ltt_buf =
			percpu_ptr(ltt_channel->buf, i);

		if (atomic_read(&ltt_buf->wakeup_readers) == 1) {
			atomic_set(&ltt_buf->wakeup_readers, 0);
			wake_up_interruptible(&rchan->buf[i]->read_wait);
		}
	}
}

static void ltt_relay_finish_buffer(struct ltt_channel_struct *ltt_channel,
		unsigned int cpu)
{
	struct rchan *rchan = ltt_channel->trans_channel_data;

	if (rchan->buf[cpu]) {
		struct ltt_channel_buf_struct *ltt_buf =
			percpu_ptr(ltt_channel->buf, cpu);

		ltt_relay_buffer_flush(rchan->buf[cpu]);
		ltt_relay_wake_writers(ltt_buf);
	}
}

static void ltt_relay_finish_channel(struct ltt_channel_struct *ltt_channel)
{
	unsigned int i;

	for_each_possible_cpu(i)
		ltt_relay_finish_buffer(ltt_channel, i);
}

static void ltt_relay_remove_channel(struct ltt_channel_struct *channel)
{
	struct rchan *rchan = channel->trans_channel_data;

	ltt_relay_close(rchan);
	kref_put(&channel->kref, ltt_relay_release_channel);
}

struct ltt_reserve_switch_offsets {
	long begin, end, old;
	long begin_switch, end_switch_current, end_switch_old;
	long commit_count, reserve_commit_diff;
	size_t before_hdr_pad, size;
};

/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_reserve(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, size_t data_size,
		u64 *tsc, unsigned int *rflags, int largest_align)
{
	offsets->begin = local_read(&ltt_buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_current = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();
	if (last_tsc_overflow(ltt_buf, *tsc))
		*rflags = LTT_RFLAG_ID_SIZE_TSC;

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) == 0) {
		offsets->begin_switch = 1;		/* For offsets->begin */
	} else {
		offsets->size = ltt_get_header_size(ltt_channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			offsets->end_switch_old = 1;	/* For offsets->old */
			offsets->begin_switch = 1;	/* For offsets->begin */
		}
	}
	if (offsets->begin_switch) {
		long subbuf_index;

		if (offsets->end_switch_old)
			offsets->begin = SUBBUF_ALIGN(offsets->begin,
						      buf->chan);
		offsets->begin = offsets->begin + ltt_subbuffer_header_size();
		/* Test new buffer integrity */
		subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
		offsets->reserve_commit_diff =
			(BUFFER_TRUNC(offsets->begin, buf->chan)
			 >> ltt_channel->n_subbufs_order)
			- (local_read(&ltt_buf->commit_count[subbuf_index])
			   & ltt_channel->commit_count_mask);
		if (offsets->reserve_commit_diff == 0) {
			/* Next buffer not corrupted. */
			if (!ltt_channel->overwrite &&
			    (SUBBUF_TRUNC(offsets->begin, buf->chan)
			     - SUBBUF_TRUNC(atomic_long_read(
						&ltt_buf->consumed),
					    buf->chan))
			    >= rchan->alloc_size) {
				/*
				 * We do not overwrite non consumed buffers
				 * and we are full : event is lost.
				 */
				local_inc(&ltt_buf->events_lost);
				return -1;
			} else {
				/*
				 * next buffer not corrupted, we are either in
				 * overwrite mode or the buffer is not full.
				 * It's safe to write in this new subbuffer.
				 */
			}
		} else {
			/*
			 * Next subbuffer corrupted. Force pushing reader even
			 * in normal mode. It's safe to write in this new
			 * subbuffer.
			 */
		}
		offsets->size = ltt_get_header_size(ltt_channel,
					offsets->begin, data_size,
					&offsets->before_hdr_pad, *rflags);
		offsets->size += ltt_align(offsets->begin + offsets->size,
					   largest_align)
				 + data_size;
		if ((SUBBUF_OFFSET(offsets->begin, buf->chan) + offsets->size)
				> buf->chan->subbuf_size) {
			/*
			 * Event too big for subbuffers, report error, don't
			 * complete the sub-buffer switch.
			 */
			local_inc(&ltt_buf->events_lost);
			return -1;
		} else {
			/*
			 * We just made a successful buffer switch and the event
			 * fits in the new subbuffer. Let's write.
			 */
		}
	} else {
		/*
		 * Event fits in the current buffer and we are not on a switch
		 * boundary. It's safe to write.
		 */
	}
	offsets->end = offsets->begin + offsets->size;

	if ((SUBBUF_OFFSET(offsets->end, buf->chan)) == 0) {
		/*
		 * The offset_end will fall at the very beginning of the next
		 * subbuffer.
		 */
		offsets->end_switch_current = 1;	/* For offsets->begin */
	}
	return 0;
}

/*
 * Returns :
 * 0 if ok
 * !0 if execution must be aborted.
 */
static inline int ltt_relay_try_switch(
		enum force_switch_mode mode,
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets,
		u64 *tsc)
{
	long subbuf_index;

	offsets->begin = local_read(&ltt_buf->offset);
	offsets->old = offsets->begin;
	offsets->begin_switch = 0;
	offsets->end_switch_old = 0;

	*tsc = trace_clock_read64();

	if (SUBBUF_OFFSET(offsets->begin, buf->chan) != 0) {
		offsets->begin = SUBBUF_ALIGN(offsets->begin, buf->chan);
		offsets->end_switch_old = 1;
	} else {
		/* we do not have to switch : buffer is empty */
		return -1;
	}
	if (mode == FORCE_ACTIVE)
		offsets->begin += ltt_subbuffer_header_size();
	/*
	 * Always begin_switch in FORCE_ACTIVE mode.
	 * Test new buffer integrity
	 */
	subbuf_index = SUBBUF_INDEX(offsets->begin, buf->chan);
	offsets->reserve_commit_diff =
		(BUFFER_TRUNC(offsets->begin, buf->chan)
		 >> ltt_channel->n_subbufs_order)
		- (local_read(&ltt_buf->commit_count[subbuf_index])
		   & ltt_channel->commit_count_mask);
	if (offsets->reserve_commit_diff == 0) {
		/* Next buffer not corrupted. */
		if (mode == FORCE_ACTIVE
		    && !ltt_channel->overwrite
		    && offsets->begin - atomic_long_read(&ltt_buf->consumed)
		       >= rchan->alloc_size) {
			/*
			 * We do not overwrite non consumed buffers and we are
			 * full : ignore switch while tracing is active.
			 */
			return -1;
		}
	} else {
		/*
		 * Next subbuffer corrupted. Force pushing reader even in normal
		 * mode
		 */
	}
	offsets->end = offsets->begin;
	return 0;
}

static inline void ltt_reserve_push_reader(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf,
		struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets)
{
	long consumed_old, consumed_new;

	do {
		consumed_old = atomic_long_read(&ltt_buf->consumed);
		/*
		 * If buffer is in overwrite mode, push the reader consumed
		 * count if the write position has reached it and we are not
		 * at the first iteration (don't push the reader farther than
		 * the writer). This operation can be done concurrently by many
		 * writers in the same buffer, the writer being at the farthest
		 * write position sub-buffer index in the buffer being the one
		 * which will win this loop.
		 * If the buffer is not in overwrite mode, pushing the reader
		 * only happens if a sub-buffer is corrupted.
		 */
		if ((SUBBUF_TRUNC(offsets->end-1, buf->chan)
		     - SUBBUF_TRUNC(consumed_old, buf->chan))
		    >= rchan->alloc_size)
			consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
		else {
			consumed_new = consumed_old;
			break;
		}
	} while (atomic_long_cmpxchg(&ltt_buf->consumed, consumed_old,
			consumed_new) != consumed_old);

	if (consumed_old != consumed_new) {
		/*
		 * Reader pushed : we are the winner of the push, we can
		 * therefore reequilibrate reserve and commit. Atomic increment
		 * of the commit count permits other writers to play around
		 * with this variable before us. We keep track of
		 * corrupted_subbuffers even in overwrite mode :
		 * we never want to write over a non completely committed
		 * sub-buffer : possible causes : the buffer size is too low
		 * compared to the unordered data input, or there is a writer
		 * that died between the reserve and the commit.
		 */
		if (offsets->reserve_commit_diff) {
			/*
			 * We have to alter the sub-buffer commit count.
			 * We do not deliver the previous subbuffer, given it
			 * was either corrupted or not consumed (overwrite
			 * mode).
			 */
			local_add(offsets->reserve_commit_diff,
				  &ltt_buf->commit_count[
					SUBBUF_INDEX(offsets->begin,
						     buf->chan)]);
			if (!ltt_channel->overwrite
			    || offsets->reserve_commit_diff
			       != rchan->subbuf_size) {
				/*
				 * The reserve commit diff was not subbuf_size :
				 * it means the subbuffer was partly written to
				 * and is therefore corrupted. If it is multiple
				 * of subbuffer size and we are in flight
				 * recorder mode, we are skipping over a whole
				 * subbuffer.
				 */
				local_inc(&ltt_buf->corrupted_subbuffers);
			}
		}
	}
}

/*
 * ltt_reserve_switch_old_subbuf: switch old subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 *
 * Note : offset_old should never be 0 here.
 */
static inline void ltt_reserve_switch_old_subbuf(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long oldidx = SUBBUF_INDEX(offsets->old - 1, rchan);

	ltt_channel->buffer_end(buf, *tsc, offsets->old, oldidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(rchan->subbuf_size
				 - (SUBBUF_OFFSET(offsets->old - 1, rchan)
				    + 1),
				 &ltt_buf->commit_count[oldidx]);
	if ((BUFFER_TRUNC(offsets->old - 1, rchan)
	     >> ltt_channel->n_subbufs_order)
	    - ((offsets->commit_count - rchan->subbuf_size)
	       & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, oldidx, NULL);
}

/*
 * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
 *
 * This code can be executed unordered : writers may already have written to the
 * sub-buffer before this code gets executed, caution. The commit makes sure
 * that this code is executed before the deliver of this sub-buffer.
 */
static inline void ltt_reserve_switch_new_subbuf(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long beginidx = SUBBUF_INDEX(offsets->begin, rchan);

	ltt_channel->buffer_begin(buf, *tsc, beginidx);
	/* Must write buffer end before incrementing commit count */
	smp_wmb();
	offsets->commit_count = local_add_return(ltt_subbuffer_header_size(),
			&ltt_buf->commit_count[beginidx]);
	/* Check if the written buffer has to be delivered */
	if ((BUFFER_TRUNC(offsets->begin, rchan)
	     >> ltt_channel->n_subbufs_order)
	    - ((offsets->commit_count - rchan->subbuf_size)
	       & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, beginidx, NULL);
}

/*
 * ltt_reserve_end_switch_current: finish switching current subbuffer
 *
 * Concurrency safe because we are the last and only thread to alter this
 * sub-buffer. As long as it is not delivered and read, no other thread can
 * alter the offset, alter the reserve_count or call the
 * client_buffer_end_callback on this sub-buffer.
 *
 * The only remaining threads could be the ones with pending commits. They will
 * have to do the deliver themselves. Not concurrency safe in overwrite mode.
 * We detect corrupted subbuffers with commit and reserve counts. We keep a
 * corrupted sub-buffers count and push the readers across these sub-buffers.
 *
 * Not concurrency safe if a writer is stalled in a subbuffer and another writer
 * switches in, finding out it's corrupted. The result will be that the old
 * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
 * will be declared corrupted too because of the commit count adjustment.
 */
static inline void ltt_reserve_end_switch_current(
		struct ltt_channel_struct *ltt_channel,
		struct ltt_channel_buf_struct *ltt_buf, struct rchan *rchan,
		struct rchan_buf *buf,
		struct ltt_reserve_switch_offsets *offsets, u64 *tsc)
{
	long endidx = SUBBUF_INDEX(offsets->end - 1, rchan);

	ltt_channel->buffer_end(buf, *tsc, offsets->end, endidx);
	/* Must write buffer begin before incrementing commit count */
	smp_wmb();
	offsets->commit_count =
		local_add_return(rchan->subbuf_size
				 - (SUBBUF_OFFSET(offsets->end - 1, rchan)
				    + 1),
				 &ltt_buf->commit_count[endidx]);
	if ((BUFFER_TRUNC(offsets->end - 1, rchan)
	     >> ltt_channel->n_subbufs_order)
	    - ((offsets->commit_count - rchan->subbuf_size)
	       & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, NULL);
}

/**
 * ltt_relay_reserve_slot - Atomic slot reservation in a LTTng buffer.
 * @trace: the trace structure to log to.
 * @ltt_channel: channel structure
 * @transport_data: data structure specific to ltt relay
 * @data_size: size of the variable length data to log.
 * @slot_size: pointer to total size of the slot (out)
 * @buf_offset : pointer to reserved buffer offset (out)
 * @tsc: pointer to the tsc at the slot reservation (out)
 *
 * Return : -ENOSPC if not enough space, else returns 0.
 * It will take care of sub-buffer switching.
 */
static notrace int ltt_relay_reserve_slot(struct ltt_trace_struct *trace,
		struct ltt_channel_struct *ltt_channel, void **transport_data,
		size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
		unsigned int *rflags, int largest_align, int cpu)
{
	struct rchan *rchan = ltt_channel->trans_channel_data;
	struct rchan_buf *buf = *transport_data =
			rchan->buf[cpu];
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct ltt_reserve_switch_offsets offsets;

	offsets.reserve_commit_diff = 0;

	/*
	 * Perform retryable operations.
	 */
	if (__get_cpu_var(ltt_nesting) > 4) {
		local_inc(&ltt_buf->events_lost);
		return -EPERM;
	}
	do {
		if (ltt_relay_try_reserve(ltt_channel, ltt_buf,
				rchan, buf, &offsets, data_size, tsc, rflags,
				largest_align))
			return -ENOSPC;
	} while (local_cmpxchg(&ltt_buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(ltt_buf, *tsc);

	/*
	 * Push the reader if necessary
	 */
	ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan, buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
			&offsets, tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (offsets.begin_switch)
		ltt_reserve_switch_new_subbuf(ltt_channel, ltt_buf, rchan,
			buf, &offsets, tsc);

	if (offsets.end_switch_current)
		ltt_reserve_end_switch_current(ltt_channel, ltt_buf, rchan,
			buf, &offsets, tsc);

	*slot_size = offsets.size;
	*buf_offset = offsets.begin + offsets.before_hdr_pad;
	return 0;
}

/*
 * Force a sub-buffer switch for a per-cpu buffer. This operation is
 * completely reentrant : can be called while tracing is active with
 * absolutely no lock held.
 *
 * Note, however, that as a local_cmpxchg is used for some atomic
 * operations, this function must be called from the CPU which owns the buffer
 * for a ACTIVE flush.
 */
static notrace void ltt_force_switch(struct rchan_buf *buf,
		enum force_switch_mode mode)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct rchan *rchan = ltt_channel->trans_channel_data;
	struct ltt_reserve_switch_offsets offsets;
	u64 tsc;

	offsets.reserve_commit_diff = 0;

	/*
	 * Perform retryable operations.
	 */
	do {
		if (ltt_relay_try_switch(mode, ltt_channel, ltt_buf,
				rchan, buf, &offsets, &tsc))
			return;
	} while (local_cmpxchg(&ltt_buf->offset, offsets.old,
			offsets.end) != offsets.old);

	/*
	 * Atomically update last_tsc. This update races against concurrent
	 * atomic updates, but the race will always cause supplementary full TSC
	 * events, never the opposite (missing a full TSC event when it would be
	 * needed).
	 */
	save_last_tsc(ltt_buf, tsc);

	/*
	 * Push the reader if necessary
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_push_reader(ltt_channel, ltt_buf, rchan,
					buf, &offsets);

	/*
	 * Switch old subbuffer if needed.
	 */
	if (offsets.end_switch_old)
		ltt_reserve_switch_old_subbuf(ltt_channel, ltt_buf, rchan, buf,
			&offsets, &tsc);

	/*
	 * Populate new subbuffer.
	 */
	if (mode == FORCE_ACTIVE)
		ltt_reserve_switch_new_subbuf(ltt_channel,
			ltt_buf, rchan, buf, &offsets, &tsc);
}

/*
 * For flight recording. Must be called after relay_commit.
 * This function decrements the subbuffer's lost_size each time the commit count
 * reaches back the reserve offset (modulo subbuffer size). It is useful for
 * crash dump.
 * We use slot_size - 1 to make sure we deal correctly with the case where we
 * fill the subbuffer completely (so the subbuf index stays in the previous
 * subbuffer).
 */
#ifdef CONFIG_LTT_VMCORE
static inline void ltt_write_commit_counter(struct rchan_buf *buf,
		long buf_offset, size_t slot_size)
{
	struct ltt_channel_struct *ltt_channel =
		(struct ltt_channel_struct *)buf->chan->private_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct ltt_subbuffer_header *header;
	long offset, subbuf_idx, commit_count;
	uint32_t lost_old, lost_new;

	subbuf_idx = SUBBUF_INDEX(buf_offset - 1, buf->chan);
	offset = buf_offset + slot_size;
	header = (struct ltt_subbuffer_header *)
			ltt_relay_offset_address(buf,
				subbuf_idx * buf->chan->subbuf_size);
	do {
		lost_old = header->lost_size;
		commit_count =
			local_read(&ltt_buf->commit_count[subbuf_idx]);
		/* SUBBUF_OFFSET includes commit_count_mask */
		if (!SUBBUF_OFFSET(offset - commit_count, buf->chan)) {
			lost_new = (uint32_t)buf->chan->subbuf_size
				   - SUBBUF_OFFSET(commit_count, buf->chan);
			lost_old = cmpxchg_local(&header->lost_size, lost_old,
						lost_new);
			if (lost_old <= lost_new)
				break;
		} else {
			break;
		}
	} while (1);
}
#else
static inline void ltt_write_commit_counter(struct rchan_buf *buf,
		long buf_offset, size_t slot_size)
{
}
#endif

/*
 * Atomic unordered slot commit. Increments the commit count in the
 * specified sub-buffer, and delivers it if necessary.
 *
 * Parameters:
 *
 * @ltt_channel : channel structure
 * @transport_data: transport-specific data
 * @buf_offset : offset following the event header.
 * @slot_size : size of the reserved slot.
 */
static notrace void ltt_relay_commit_slot(
		struct ltt_channel_struct *ltt_channel,
		void **transport_data, long buf_offset, size_t slot_size)
{
	struct rchan_buf *buf = *transport_data;
	struct ltt_channel_buf_struct *ltt_buf =
		percpu_ptr(ltt_channel->buf, buf->cpu);
	struct rchan *rchan = buf->chan;
	long offset_end = buf_offset;
	long endidx = SUBBUF_INDEX(offset_end - 1, rchan);
	long commit_count;

	/* Must write slot data before incrementing commit count */
	smp_wmb();
	commit_count = local_add_return(slot_size,
		&ltt_buf->commit_count[endidx]);
	/* Check if all commits have been done */
	if ((BUFFER_TRUNC(offset_end - 1, rchan)
	     >> ltt_channel->n_subbufs_order)
	    - ((commit_count - rchan->subbuf_size)
	       & ltt_channel->commit_count_mask) == 0)
		ltt_deliver(buf, endidx, NULL);
	/*
	 * Update lost_size for each commit. It's needed only for extracting
	 * ltt buffers from vmcore, after crash.
	 */
	ltt_write_commit_counter(buf, buf_offset, slot_size);
}
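
/*
 * Writer-side usage of the reserve/commit pair above, roughly as the LTT core
 * drives it through the ltt_transport operations. This is a sketch under
 * assumptions : the surrounding variables and the "write the event" step are
 * illustrative, only ltt_relay_reserve_slot()/ltt_relay_commit_slot() and
 * their parameters come from this file :
 *
 *	size_t slot_size;
 *	long buf_offset;
 *	u64 tsc;
 *	unsigned int rflags = 0;
 *	void *transport_data;
 *
 *	if (ltt_relay_reserve_slot(trace, ltt_channel, &transport_data,
 *				   data_size, &slot_size, &buf_offset, &tsc,
 *				   &rflags, largest_align, cpu) < 0)
 *		return;		(no space : the event is dropped and counted
 *				 in events_lost)
 *	write the event (header + payload) into the reserved slot;
 *	ltt_relay_commit_slot(ltt_channel, &transport_data, buf_offset,
 *			      slot_size);
 */
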
/*
 * This is called with preemption disabled when user space has requested
 * blocking mode. If one of the active traces has free space below a
 * specific threshold value, we reenable preemption and block.
 */
static int ltt_relay_user_blocking(struct ltt_trace_struct *trace,
		unsigned int chan_index, size_t data_size,
		struct user_dbg_data *dbg)
{
	struct rchan *rchan;
	struct ltt_channel_buf_struct *ltt_buf;
	struct ltt_channel_struct *channel;
	struct rchan_buf *relay_buf;
	int cpu;
	DECLARE_WAITQUEUE(wait, current);

	channel = &trace->channels[chan_index];
	rchan = channel->trans_channel_data;
	cpu = smp_processor_id();
	relay_buf = rchan->buf[cpu];
	ltt_buf = percpu_ptr(channel->buf, cpu);

	/*
	 * Check if data is too big for the channel : do not
	 * block for it.
	 */
	if (LTT_RESERVE_CRITICAL + data_size > relay_buf->chan->subbuf_size)
		return 0;

	/*
	 * If free space too low, we block. We restart from the
	 * beginning after we resume (cpu id may have changed
	 * while preemption is active).
	 */
	spin_lock(&ltt_buf->full_lock);
	if (!channel->overwrite) {
		dbg->write = local_read(&ltt_buf->offset);
		dbg->read = atomic_long_read(&ltt_buf->consumed);
		dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
				  - SUBBUF_TRUNC(dbg->read,
						 relay_buf->chan);
		if (dbg->avail_size > rchan->alloc_size) {
			__set_current_state(TASK_INTERRUPTIBLE);
			add_wait_queue(&ltt_buf->write_wait, &wait);
			spin_unlock(&ltt_buf->full_lock);
			preempt_enable();
			schedule();
			__set_current_state(TASK_RUNNING);
			remove_wait_queue(&ltt_buf->write_wait, &wait);
			if (signal_pending(current))
				return -ERESTARTSYS;
			preempt_disable();
			return 1;
		}
	}
	spin_unlock(&ltt_buf->full_lock);
	return 0;
}

static void ltt_relay_print_user_errors(struct ltt_trace_struct *trace,
		unsigned int chan_index, size_t data_size,
		struct user_dbg_data *dbg, int cpu)
{
	struct rchan *rchan;
	struct ltt_channel_buf_struct *ltt_buf;
	struct ltt_channel_struct *channel;
	struct rchan_buf *relay_buf;

	channel = &trace->channels[chan_index];
	rchan = channel->trans_channel_data;
	relay_buf = rchan->buf[cpu];
	ltt_buf = percpu_ptr(channel->buf, cpu);

	printk(KERN_ERR "Error in LTT usertrace : "
	"buffer full : event lost in blocking "
	"mode. Increase LTT_RESERVE_CRITICAL.\n");
	printk(KERN_ERR "LTT nesting level is %u.\n",
		per_cpu(ltt_nesting, cpu));
	printk(KERN_ERR "LTT avail size %lu.\n",
		dbg->avail_size);
	printk(KERN_ERR "avail write : %lu, read : %lu\n",
		dbg->write, dbg->read);

	dbg->write = local_read(&ltt_buf->offset);
	dbg->read = atomic_long_read(&ltt_buf->consumed);

	printk(KERN_ERR "LTT cur size %lu.\n",
		dbg->write + LTT_RESERVE_CRITICAL + data_size
		- SUBBUF_TRUNC(dbg->read, relay_buf->chan));
	printk(KERN_ERR "cur write : %lu, read : %lu\n",
		dbg->write, dbg->read);
}

static struct ltt_transport ltt_relay_transport = {
	.name = "relay",
	.owner = THIS_MODULE,
	.ops = {
		.create_dirs = ltt_relay_create_dirs,
		.remove_dirs = ltt_relay_remove_dirs,
		.create_channel = ltt_relay_create_channel,
		.finish_channel = ltt_relay_finish_channel,
		.remove_channel = ltt_relay_remove_channel,
		.wakeup_channel = ltt_relay_async_wakeup_chan,
		.commit_slot = ltt_relay_commit_slot,
		.reserve_slot = ltt_relay_reserve_slot,
		.user_blocking = ltt_relay_user_blocking,
		.user_errors = ltt_relay_print_user_errors,
	},
};

static int __init ltt_relay_init(void)
{
	printk(KERN_INFO "LTT : ltt-relay init\n");

	ltt_file_operations = ltt_relay_file_operations;
	ltt_file_operations.owner = THIS_MODULE;
	ltt_file_operations.open = ltt_open;
	ltt_file_operations.release = ltt_release;
	ltt_file_operations.poll = ltt_poll;
	ltt_file_operations.splice_read = ltt_relay_file_splice_read;
	ltt_file_operations.ioctl = ltt_ioctl;
#ifdef CONFIG_COMPAT
	ltt_file_operations.compat_ioctl = ltt_compat_ioctl;
#endif

	ltt_transport_register(&ltt_relay_transport);

	return 0;
}

static void __exit ltt_relay_exit(void)
{
	printk(KERN_INFO "LTT : ltt-relay exit\n");

	ltt_transport_unregister(&ltt_relay_transport);
}

module_init(ltt_relay_init);
module_exit(ltt_relay_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Mathieu Desnoyers");
MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");