X-Git-Url: http://git.lttng.org./?a=blobdiff_plain;f=ltt-relay-lockless.c;fp=ltt-relay-lockless.c;h=0000000000000000000000000000000000000000;hb=7514523fe6d6be9d94be2e577954cdd82296d20e;hp=db4362d45569801055c3f187720592146dd5e97c;hpb=6cadfbcb9112fbf75db9551bec68a40f650c3020;p=lttng-modules.git diff --git a/ltt-relay-lockless.c b/ltt-relay-lockless.c deleted file mode 100644 index db4362d4..00000000 --- a/ltt-relay-lockless.c +++ /dev/null @@ -1,1366 +0,0 @@ -/* - * ltt/ltt-relay-lockless.c - * - * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca) - * - * LTTng lockless buffer space management (reader/writer). - * - * Author: - * Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca) - * - * Inspired from LTT : - * Karim Yaghmour (karim@opersys.com) - * Tom Zanussi (zanussi@us.ibm.com) - * Bob Wisniewski (bob@watson.ibm.com) - * And from K42 : - * Bob Wisniewski (bob@watson.ibm.com) - * - * Changelog: - * 08/10/08, Cleanup. - * 19/10/05, Complete lockless mechanism. - * 27/05/05, Modular redesign and rewrite. - * - * Userspace reader semantic : - * while (poll fd != POLLHUP) { - * - ioctl RELAY_GET_SUBBUF_SIZE - * while (1) { - * - ioctl GET_SUBBUF - * - splice 1 subbuffer worth of data to a pipe - * - splice the data from pipe to disk/network - * - ioctl PUT_SUBBUF, check error value - * if err val < 0, previous subbuffer was corrupted. - * } - * } - * - * Dual LGPL v2.1/GPL v2 license. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ltt-tracer.h" -#include "ltt-relay.h" -#include "ltt-relay-lockless.h" - -#if 0 -#define printk_dbg(fmt, args...) printk(fmt, args) -#else -#define printk_dbg(fmt, args...) -#endif - -struct ltt_reserve_switch_offsets { - long begin, end, old; - long begin_switch, end_switch_current, end_switch_old; - size_t before_hdr_pad, size; -}; - -static -void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode); - -static -void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu); - -static const struct file_operations ltt_file_operations; - -static -void ltt_buffer_begin(struct ltt_chanbuf *buf, u64 tsc, unsigned int subbuf_idx) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - struct ltt_subbuffer_header *header = - (struct ltt_subbuffer_header *) - ltt_relay_offset_address(&buf->a, - subbuf_idx * chan->a.sb_size); - - header->cycle_count_begin = tsc; - header->data_size = 0xFFFFFFFF; /* for debugging */ - ltt_write_trace_header(chan->a.trace, header); -} - -/* - * offset is assumed to never be 0 here : never deliver a completely empty - * subbuffer. The lost size is between 0 and subbuf_size-1. - */ -static -void ltt_buffer_end(struct ltt_chanbuf *buf, u64 tsc, unsigned int offset, - unsigned int subbuf_idx) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - struct ltt_subbuffer_header *header = - (struct ltt_subbuffer_header *) - ltt_relay_offset_address(&buf->a, - subbuf_idx * chan->a.sb_size); - u32 data_size = SUBBUF_OFFSET(offset - 1, chan) + 1; - - header->data_size = data_size; - header->sb_size = PAGE_ALIGN(data_size); - header->cycle_count_end = tsc; - header->events_lost = local_read(&buf->events_lost); - header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers); -} - -/* - * Must be called under trace lock or cpu hotplug protection. 
- */ -void ltt_chanbuf_free(struct ltt_chanbuf *buf) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - - ltt_relay_print_buffer_errors(chan, buf->a.cpu); -#ifdef LTT_VMCORE - kfree(buf->commit_seq); -#endif - kfree(buf->commit_count); - - ltt_chanbuf_alloc_free(&buf->a); -} - -/* - * Must be called under trace lock or cpu hotplug protection. - */ -int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana, - int cpu) -{ - struct ltt_chan *chan = container_of(chana, struct ltt_chan, a); - struct ltt_trace *trace = chana->trace; - unsigned int j, n_sb; - int ret; - - /* Test for cpu hotplug */ - if (buf->a.allocated) - return 0; - - ret = ltt_chanbuf_alloc_create(&buf->a, &chan->a, cpu); - if (ret) - return ret; - - buf->commit_count = - kzalloc_node(ALIGN(sizeof(*buf->commit_count) * chan->a.n_sb, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(cpu)); - if (!buf->commit_count) { - ret = -ENOMEM; - goto free_chanbuf; - } - -#ifdef LTT_VMCORE - buf->commit_seq = - kzalloc_node(ALIGN(sizeof(*buf->commit_seq) * chan->a.n_sb, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(cpu)); - if (!buf->commit_seq) { - kfree(buf->commit_count); - ret = -ENOMEM; - goto free_commit; - } -#endif - - local_set(&buf->offset, ltt_sb_header_size()); - atomic_long_set(&buf->consumed, 0); - atomic_long_set(&buf->active_readers, 0); - n_sb = chan->a.n_sb; - for (j = 0; j < n_sb; j++) { - local_set(&buf->commit_count[j].cc, 0); - local_set(&buf->commit_count[j].cc_sb, 0); - local_set(&buf->commit_count[j].events, 0); - } - init_waitqueue_head(&buf->write_wait); - init_waitqueue_head(&buf->read_wait); - spin_lock_init(&buf->full_lock); - - RCHAN_SB_CLEAR_NOREF(buf->a.buf_wsb[0].pages); - ltt_buffer_begin(buf, trace->start_tsc, 0); - /* atomic_add made on local variable on data that belongs to - * various CPUs : ok because tracing not started (for this cpu). */ - local_add(ltt_sb_header_size(), &buf->commit_count[0].cc); - - local_set(&buf->events_lost, 0); - local_set(&buf->corrupted_subbuffers, 0); - buf->finalized = 0; - - ret = ltt_chanbuf_create_file(chan->a.filename, chan->a.parent, - S_IRUSR, buf); - if (ret) - goto free_init; - - /* - * Ensure the buffer is ready before setting it to allocated. - * Used for cpu hotplug vs async wakeup. - */ - smp_wmb(); - buf->a.allocated = 1; - - return 0; - - /* Error handling */ -free_init: -#ifdef LTT_VMCORE - kfree(buf->commit_seq); -free_commit: -#endif - kfree(buf->commit_count); -free_chanbuf: - ltt_chanbuf_alloc_free(&buf->a); - return ret; -} - -void ltt_chan_remove_files(struct ltt_chan *chan) -{ - ltt_ascii_remove(chan); - ltt_chan_alloc_remove_files(&chan->a); -} -EXPORT_SYMBOL_GPL(ltt_chan_remove_files); - - -void ltt_chan_free(struct kref *kref) -{ - struct ltt_chan *chan = container_of(kref, struct ltt_chan, a.kref); - - ltt_chan_alloc_free(&chan->a); -} -EXPORT_SYMBOL_GPL(ltt_chan_free); - -/** - * ltt_chan_create - Create channel. 
- */ -int ltt_chan_create(const char *base_filename, - struct ltt_chan *chan, struct dentry *parent, - size_t sb_size, size_t n_sb, - int overwrite, struct ltt_trace *trace) -{ - int ret; - - chan->overwrite = overwrite; - - ret = ltt_chan_alloc_init(&chan->a, trace, base_filename, parent, - sb_size, n_sb, overwrite, overwrite); - if (ret) - goto error; - - chan->commit_count_mask = (~0UL >> chan->a.n_sb_order); - - ret = ltt_ascii_create(chan); - if (ret) - goto error_chan_alloc_free; - - return ret; - -error_chan_alloc_free: - ltt_chan_alloc_free(&chan->a); -error: - return ret; -} -EXPORT_SYMBOL_GPL(ltt_chan_create); - -int ltt_chanbuf_open_read(struct ltt_chanbuf *buf) -{ - kref_get(&buf->a.chan->kref); - if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) { - kref_put(&buf->a.chan->kref, ltt_chan_free); - return -EBUSY; - } - - return 0; -} -EXPORT_SYMBOL_GPL(ltt_chanbuf_open_read); - -void ltt_chanbuf_release_read(struct ltt_chanbuf *buf) -{ - //ltt_relay_destroy_buffer(&buf->a.chan->a, buf->a.cpu); - WARN_ON(atomic_long_read(&buf->active_readers) != 1); - atomic_long_dec(&buf->active_readers); - kref_put(&buf->a.chan->kref, ltt_chan_free); -} -EXPORT_SYMBOL_GPL(ltt_chanbuf_release_read); - -/* - * Wake writers : - * - * This must be done after the trace is removed from the RCU list so that there - * are no stalled writers. - */ -static void ltt_relay_wake_writers(struct ltt_chanbuf *buf) -{ - - if (waitqueue_active(&buf->write_wait)) - wake_up_interruptible(&buf->write_wait); -} - -/* - * This function should not be called from NMI interrupt context - */ -static void ltt_buf_unfull(struct ltt_chanbuf *buf) -{ - ltt_relay_wake_writers(buf); -} - -/* - * Promote compiler barrier to a smp_mb(). - * For the specific LTTng case, this IPI call should be removed if the - * architecture does not reorder writes. This should eventually be provided by - * a separate architecture-specific infrastructure. - */ -static void remote_mb(void *info) -{ - smp_mb(); -} - -int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf, unsigned long *consumed) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - long consumed_old, consumed_idx, commit_count, write_offset; - int ret; - - consumed_old = atomic_long_read(&buf->consumed); - consumed_idx = SUBBUF_INDEX(consumed_old, chan); - commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb); - /* - * Make sure we read the commit count before reading the buffer - * data and the write offset. Correct consumed offset ordering - * wrt commit count is insured by the use of cmpxchg to update - * the consumed offset. - * smp_call_function_single can fail if the remote CPU is offline, - * this is OK because then there is no wmb to execute there. - * If our thread is executing on the same CPU as the on the buffers - * belongs to, we don't have to synchronize it at all. If we are - * migrated, the scheduler will take care of the memory barriers. - * Normally, smp_call_function_single() should ensure program order when - * executing the remote function, which implies that it surrounds the - * function execution with : - * smp_mb() - * send IPI - * csd_lock_wait - * recv IPI - * smp_mb() - * exec. function - * smp_mb() - * csd unlock - * smp_mb() - * - * However, smp_call_function_single() does not seem to clearly execute - * such barriers. It depends on spinlock semantic to provide the barrier - * before executing the IPI and, when busy-looping, csd_lock_wait only - * executes smp_mb() when it has to wait for the other CPU. 
- * - * I don't trust this code. Therefore, let's add the smp_mb() sequence - * required ourself, even if duplicated. It has no performance impact - * anyway. - * - * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs - * read and write vs write. They do not ensure core synchronization. We - * really have to ensure total order between the 3 barriers running on - * the 2 CPUs. - */ -#ifdef LTT_NO_IPI_BARRIER - /* - * Local rmb to match the remote wmb to read the commit count before the - * buffer data and the write offset. - */ - smp_rmb(); -#else - if (raw_smp_processor_id() != buf->a.cpu) { - smp_mb(); /* Total order with IPI handler smp_mb() */ - smp_call_function_single(buf->a.cpu, remote_mb, NULL, 1); - smp_mb(); /* Total order with IPI handler smp_mb() */ - } -#endif - write_offset = local_read(&buf->offset); - /* - * Check that the subbuffer we are trying to consume has been - * already fully committed. - */ - if (((commit_count - chan->a.sb_size) - & chan->commit_count_mask) - - (BUFFER_TRUNC(consumed_old, chan) - >> chan->a.n_sb_order) - != 0) { - return -EAGAIN; - } - /* - * Check that we are not about to read the same subbuffer in - * which the writer head is. - */ - if ((SUBBUF_TRUNC(write_offset, chan) - - SUBBUF_TRUNC(consumed_old, chan)) - == 0) { - return -EAGAIN; - } - - ret = update_read_sb_index(&buf->a, &chan->a, consumed_idx); - if (ret) - return ret; - - *consumed = consumed_old; - return 0; -} -EXPORT_SYMBOL_GPL(ltt_chanbuf_get_subbuf); - -int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf, unsigned long consumed) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - long consumed_new, consumed_old; - - WARN_ON(atomic_long_read(&buf->active_readers) != 1); - - consumed_old = consumed; - consumed_new = SUBBUF_ALIGN(consumed_old, chan); - WARN_ON_ONCE(RCHAN_SB_IS_NOREF(buf->a.buf_rsb.pages)); - RCHAN_SB_SET_NOREF(buf->a.buf_rsb.pages); - - spin_lock(&buf->full_lock); - if (atomic_long_cmpxchg(&buf->consumed, consumed_old, consumed_new) - != consumed_old) { - /* We have been pushed by the writer. */ - spin_unlock(&buf->full_lock); - /* - * We exchanged the subbuffer pages. No corruption possible - * even if the writer did push us. No more -EIO possible. - */ - return 0; - } else { - /* tell the client that buffer is now unfull */ - int index; - long data; - index = SUBBUF_INDEX(consumed_old, chan); - data = BUFFER_OFFSET(consumed_old, chan); - ltt_buf_unfull(buf); - spin_unlock(&buf->full_lock); - } - return 0; -} -EXPORT_SYMBOL_GPL(ltt_chanbuf_put_subbuf); - -static void switch_buffer(unsigned long data) -{ - struct ltt_chanbuf *buf = (struct ltt_chanbuf *)data; - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - - /* - * Only flush buffers periodically if readers are active. - */ - if (atomic_long_read(&buf->active_readers)) - ltt_force_switch(buf, FORCE_ACTIVE); - - mod_timer_pinned(&buf->switch_timer, - jiffies + chan->switch_timer_interval); -} - -static void ltt_chanbuf_start_switch_timer(struct ltt_chanbuf *buf) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - - if (!chan->switch_timer_interval) - return; - - init_timer_deferrable(&buf->switch_timer); - buf->switch_timer.function = switch_buffer; - buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - buf->switch_timer.data = (unsigned long)buf; - add_timer_on(&buf->switch_timer, buf->a.cpu); -} - -/* - * called with ltt traces lock held. 
- */ -void ltt_chan_start_switch_timer(struct ltt_chan *chan) -{ - int cpu; - - if (!chan->switch_timer_interval) - return; - - for_each_online_cpu(cpu) { - struct ltt_chanbuf *buf; - - buf = per_cpu_ptr(chan->a.buf, cpu); - ltt_chanbuf_start_switch_timer(buf); - } -} - -static void ltt_chanbuf_stop_switch_timer(struct ltt_chanbuf *buf) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - - if (!chan->switch_timer_interval) - return; - - del_timer_sync(&buf->switch_timer); -} - -/* - * called with ltt traces lock held. - */ -void ltt_chan_stop_switch_timer(struct ltt_chan *chan) -{ - int cpu; - - if (!chan->switch_timer_interval) - return; - - for_each_online_cpu(cpu) { - struct ltt_chanbuf *buf; - - buf = per_cpu_ptr(chan->a.buf, cpu); - ltt_chanbuf_stop_switch_timer(buf); - } -} - -static void ltt_chanbuf_idle_switch(struct ltt_chanbuf *buf) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - - if (chan->switch_timer_interval) - ltt_force_switch(buf, FORCE_ACTIVE); -} - -/* - * ltt_chanbuf_switch is called from a remote CPU to ensure that the buffers of - * a cpu which went down are flushed. Note that if we execute concurrently - * with trace allocation, a buffer might appear be unallocated (because it - * detects that the target CPU is offline). - */ -static void ltt_chanbuf_switch(struct ltt_chanbuf *buf) -{ - if (buf->a.allocated) - ltt_force_switch(buf, FORCE_ACTIVE); -} - -/** - * ltt_chanbuf_hotcpu_callback - CPU hotplug callback - * @nb: notifier block - * @action: hotplug action to take - * @hcpu: CPU number - * - * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) - */ -static -int ltt_chanbuf_hotcpu_callback(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - - switch (action) { - case CPU_DOWN_FAILED: - case CPU_DOWN_FAILED_FROZEN: - case CPU_ONLINE: - case CPU_ONLINE_FROZEN: - /* - * CPU hotplug lock protects trace lock from this callback. - */ - ltt_chan_for_each_channel(ltt_chanbuf_start_switch_timer, cpu); - return NOTIFY_OK; - - case CPU_DOWN_PREPARE: - case CPU_DOWN_PREPARE_FROZEN: - /* - * Performs an IPI to delete the timer locally on the target - * CPU. CPU hotplug lock protects trace lock from this - * callback. - */ - ltt_chan_for_each_channel(ltt_chanbuf_stop_switch_timer, cpu); - return NOTIFY_OK; - - case CPU_DEAD: - case CPU_DEAD_FROZEN: - /* - * Performing a buffer switch on a remote CPU. Performed by - * the CPU responsible for doing the hotunplug after the target - * CPU stopped running completely. Ensures that all data - * from that remote CPU is flushed. CPU hotplug lock protects - * trace lock from this callback. 
- */ - ltt_chan_for_each_channel(ltt_chanbuf_switch, cpu); - return NOTIFY_OK; - - default: - return NOTIFY_DONE; - } -} - -static int pm_idle_entry_callback(struct notifier_block *self, - unsigned long val, void *data) -{ - if (val == IDLE_START) { - rcu_read_lock_sched_notrace(); - ltt_chan_for_each_channel(ltt_chanbuf_idle_switch, - smp_processor_id()); - rcu_read_unlock_sched_notrace(); - } - return 0; -} - -struct notifier_block pm_idle_entry_notifier = { - .notifier_call = pm_idle_entry_callback, - .priority = ~0U, /* smallest prio, run after tracing events */ -}; - -static -void ltt_relay_print_written(struct ltt_chan *chan, long cons_off, - unsigned int cpu) -{ - struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu); - long cons_idx, events_count; - - cons_idx = SUBBUF_INDEX(cons_off, chan); - events_count = local_read(&buf->commit_count[cons_idx].events); - - if (events_count) - printk(KERN_INFO - "LTT: %lu events written in channel %s " - "(cpu %u, index %lu)\n", - events_count, chan->a.filename, cpu, cons_idx); -} - -static -void ltt_relay_print_subbuffer_errors(struct ltt_chanbuf *buf, - struct ltt_chan *chan, long cons_off, - unsigned int cpu) -{ - long cons_idx, commit_count, commit_count_sb, write_offset; - - cons_idx = SUBBUF_INDEX(cons_off, chan); - commit_count = local_read(&buf->commit_count[cons_idx].cc); - commit_count_sb = local_read(&buf->commit_count[cons_idx].cc_sb); - /* - * No need to order commit_count and write_offset reads because we - * execute after trace is stopped when there are no readers left. - */ - write_offset = local_read(&buf->offset); - printk(KERN_WARNING - "LTT : unread channel %s offset is %ld " - "and cons_off : %ld (cpu %u)\n", - chan->a.filename, write_offset, cons_off, cpu); - /* Check each sub-buffer for non filled commit count */ - if (((commit_count - chan->a.sb_size) & chan->commit_count_mask) - - (BUFFER_TRUNC(cons_off, chan) >> chan->a.n_sb_order) - != 0) - printk(KERN_ALERT - "LTT : %s : subbuffer %lu has non filled " - "commit count [cc, cc_sb] [%lu,%lu].\n", - chan->a.filename, cons_idx, commit_count, - commit_count_sb); - printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %lu\n", - chan->a.filename, commit_count, chan->a.sb_size); -} - -static -void ltt_relay_print_errors(struct ltt_chanbuf *buf, struct ltt_chan *chan, - struct ltt_trace *trace, int cpu) -{ - long cons_off; - - /* - * Can be called in the error path of allocation when - * trans_channel_data is not yet set. 
- */ - if (!chan) - return; - for (cons_off = 0; cons_off < chan->a.buf_size; - cons_off = SUBBUF_ALIGN(cons_off, chan)) - ltt_relay_print_written(chan, cons_off, cpu); - for (cons_off = atomic_long_read(&buf->consumed); - (SUBBUF_TRUNC(local_read(&buf->offset), chan) - - cons_off) > 0; - cons_off = SUBBUF_ALIGN(cons_off, chan)) - ltt_relay_print_subbuffer_errors(buf, chan, cons_off, cpu); -} - -static -void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu) -{ - struct ltt_trace *trace = chan->a.trace; - struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu); - - if (local_read(&buf->events_lost)) - printk(KERN_ALERT - "LTT : %s : %ld events lost " - "in %s channel (cpu %u).\n", - chan->a.filename, local_read(&buf->events_lost), - chan->a.filename, cpu); - if (local_read(&buf->corrupted_subbuffers)) - printk(KERN_ALERT - "LTT : %s : %ld corrupted subbuffers " - "in %s channel (cpu %u).\n", - chan->a.filename, - local_read(&buf->corrupted_subbuffers), - chan->a.filename, cpu); - - ltt_relay_print_errors(buf, chan, trace, cpu); -} - -static void ltt_relay_remove_dirs(struct ltt_trace *trace) -{ - ltt_ascii_remove_dir(trace); - debugfs_remove(trace->dentry.trace_root); -} - -static int ltt_relay_create_dirs(struct ltt_trace *new_trace) -{ - struct dentry *ltt_root_dentry; - int ret; - - ltt_root_dentry = get_ltt_root(); - if (!ltt_root_dentry) - return ENOENT; - - new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name, - ltt_root_dentry); - put_ltt_root(); - if (new_trace->dentry.trace_root == NULL) { - printk(KERN_ERR "LTT : Trace directory name %s already taken\n", - new_trace->trace_name); - return EEXIST; - } - ret = ltt_ascii_create_dir(new_trace); - if (ret) - printk(KERN_WARNING "LTT : Unable to create ascii output file " - "for trace %s\n", new_trace->trace_name); - - return 0; -} - -/* - * LTTng channel flush function. - * - * Must be called when no tracing is active in the channel, because of - * accesses across CPUs. - */ -static notrace void ltt_relay_buffer_flush(struct ltt_chanbuf *buf) -{ - buf->finalized = 1; - ltt_force_switch(buf, FORCE_FLUSH); -} - -static void ltt_relay_async_wakeup_chan(struct ltt_chan *chan) -{ - unsigned int i; - - for_each_possible_cpu(i) { - struct ltt_chanbuf *buf; - - buf = per_cpu_ptr(chan->a.buf, i); - if (!buf->a.allocated) - continue; - /* - * Ensure the buffer has been allocated before reading its - * content. Sync cpu hotplug vs async wakeup. - */ - smp_rmb(); - if (ltt_poll_deliver(buf, chan)) - wake_up_interruptible(&buf->read_wait); - } -} - -static void ltt_relay_finish_buffer(struct ltt_chan *chan, unsigned int cpu) -{ - struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu); - - if (buf->a.allocated) { - ltt_relay_buffer_flush(buf); - ltt_relay_wake_writers(buf); - } -} - - -static void ltt_relay_finish_channel(struct ltt_chan *chan) -{ - unsigned int i; - - for_each_possible_cpu(i) - ltt_relay_finish_buffer(chan, i); -} - -/* - * This is called with preemption disabled when user space has requested - * blocking mode. If one of the active traces has free space below a - * specific threshold value, we reenable preemption and block. 
- */ -static -int ltt_relay_user_blocking(struct ltt_trace *trace, unsigned int chan_index, - size_t data_size, struct user_dbg_data *dbg) -{ - struct ltt_chanbuf *buf; - struct ltt_chan *chan; - int cpu; - DECLARE_WAITQUEUE(wait, current); - - chan = &trace->channels[chan_index]; - cpu = smp_processor_id(); - buf = per_cpu_ptr(chan->a.buf, cpu); - - /* - * Check if data is too big for the channel : do not - * block for it. - */ - if (LTT_RESERVE_CRITICAL + data_size > chan->a.sb_size) - return 0; - - /* - * If free space too low, we block. We restart from the - * beginning after we resume (cpu id may have changed - * while preemption is active). - */ - spin_lock(&buf->full_lock); - if (!chan->overwrite) { - dbg->write = local_read(&buf->offset); - dbg->read = atomic_long_read(&buf->consumed); - dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size - - SUBBUF_TRUNC(dbg->read, chan); - if (dbg->avail_size > chan->a.buf_size) { - __set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&buf->write_wait, &wait); - spin_unlock(&buf->full_lock); - preempt_enable(); - schedule(); - __set_current_state(TASK_RUNNING); - remove_wait_queue(&buf->write_wait, &wait); - if (signal_pending(current)) - return -ERESTARTSYS; - preempt_disable(); - return 1; - } - } - spin_unlock(&buf->full_lock); - return 0; -} - -static -void ltt_relay_print_user_errors(struct ltt_trace *trace, - unsigned int chan_index, size_t data_size, - struct user_dbg_data *dbg, int cpu) -{ - struct ltt_chanbuf *buf; - struct ltt_chan *chan; - - chan = &trace->channels[chan_index]; - buf = per_cpu_ptr(chan->a.buf, cpu); - - printk(KERN_ERR "Error in LTT usertrace : " - "buffer full : event lost in blocking " - "mode. Increase LTT_RESERVE_CRITICAL.\n"); - printk(KERN_ERR "LTT nesting level is %u.\n", - per_cpu(ltt_nesting, cpu)); - printk(KERN_ERR "LTT available size %lu.\n", - dbg->avail_size); - printk(KERN_ERR "available write : %lu, read : %lu\n", - dbg->write, dbg->read); - - dbg->write = local_read(&buf->offset); - dbg->read = atomic_long_read(&buf->consumed); - - printk(KERN_ERR "LTT current size %lu.\n", - dbg->write + LTT_RESERVE_CRITICAL + data_size - - SUBBUF_TRUNC(dbg->read, chan)); - printk(KERN_ERR "current write : %lu, read : %lu\n", - dbg->write, dbg->read); -} - -/* - * ltt_reserve_switch_old_subbuf: switch old subbuffer - * - * Concurrency safe because we are the last and only thread to alter this - * sub-buffer. As long as it is not delivered and read, no other thread can - * alter the offset, alter the reserve_count or call the - * client_buffer_end_callback on this sub-buffer. - * - * The only remaining threads could be the ones with pending commits. They will - * have to do the deliver themselves. Not concurrency safe in overwrite mode. - * We detect corrupted subbuffers with commit and reserve counts. We keep a - * corrupted sub-buffers count and push the readers across these sub-buffers. - * - * Not concurrency safe if a writer is stalled in a subbuffer and another writer - * switches in, finding out it's corrupted. The result will be than the old - * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer - * will be declared corrupted too because of the commit count adjustment. - * - * Note : offset_old should never be 0 here. 
- */ -static -void ltt_reserve_switch_old_subbuf(struct ltt_chanbuf *buf, - struct ltt_chan *chan, - struct ltt_reserve_switch_offsets *offsets, - u64 *tsc) -{ - long oldidx = SUBBUF_INDEX(offsets->old - 1, chan); - long commit_count, padding_size; - - padding_size = chan->a.sb_size - - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1); - ltt_buffer_end(buf, *tsc, offsets->old, oldidx); - - /* - * Must write slot data before incrementing commit count. - * This compiler barrier is upgraded into a smp_wmb() by the IPI - * sent by get_subbuf() when it does its smp_rmb(). - */ - barrier(); - local_add(padding_size, &buf->commit_count[oldidx].cc); - commit_count = local_read(&buf->commit_count[oldidx].cc); - ltt_check_deliver(buf, chan, offsets->old - 1, commit_count, oldidx); - ltt_write_commit_counter(buf, chan, oldidx, offsets->old, commit_count, - padding_size); -} - -/* - * ltt_reserve_switch_new_subbuf: Populate new subbuffer. - * - * This code can be executed unordered : writers may already have written to the - * sub-buffer before this code gets executed, caution. The commit makes sure - * that this code is executed before the deliver of this sub-buffer. - */ -static -void ltt_reserve_switch_new_subbuf(struct ltt_chanbuf *buf, - struct ltt_chan *chan, - struct ltt_reserve_switch_offsets *offsets, - u64 *tsc) -{ - long beginidx = SUBBUF_INDEX(offsets->begin, chan); - long commit_count; - - ltt_buffer_begin(buf, *tsc, beginidx); - - /* - * Must write slot data before incrementing commit count. - * This compiler barrier is upgraded into a smp_wmb() by the IPI - * sent by get_subbuf() when it does its smp_rmb(). - */ - barrier(); - local_add(ltt_sb_header_size(), &buf->commit_count[beginidx].cc); - commit_count = local_read(&buf->commit_count[beginidx].cc); - /* Check if the written buffer has to be delivered */ - ltt_check_deliver(buf, chan, offsets->begin, commit_count, beginidx); - ltt_write_commit_counter(buf, chan, beginidx, offsets->begin, - commit_count, ltt_sb_header_size()); -} - - -/* - * ltt_reserve_end_switch_current: finish switching current subbuffer - * - * Concurrency safe because we are the last and only thread to alter this - * sub-buffer. As long as it is not delivered and read, no other thread can - * alter the offset, alter the reserve_count or call the - * client_buffer_end_callback on this sub-buffer. - * - * The only remaining threads could be the ones with pending commits. They will - * have to do the deliver themselves. Not concurrency safe in overwrite mode. - * We detect corrupted subbuffers with commit and reserve counts. We keep a - * corrupted sub-buffers count and push the readers across these sub-buffers. - * - * Not concurrency safe if a writer is stalled in a subbuffer and another writer - * switches in, finding out it's corrupted. The result will be than the old - * (uncommited) subbuffer will be declared corrupted, and that the new subbuffer - * will be declared corrupted too because of the commit count adjustment. - */ -static -void ltt_reserve_end_switch_current(struct ltt_chanbuf *buf, - struct ltt_chan *chan, - struct ltt_reserve_switch_offsets *offsets, - u64 *tsc) -{ - long endidx = SUBBUF_INDEX(offsets->end - 1, chan); - long commit_count, padding_size; - - padding_size = chan->a.sb_size - - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1); - - ltt_buffer_end(buf, *tsc, offsets->end, endidx); - - /* - * Must write slot data before incrementing commit count. 
- * This compiler barrier is upgraded into a smp_wmb() by the IPI - * sent by get_subbuf() when it does its smp_rmb(). - */ - barrier(); - local_add(padding_size, &buf->commit_count[endidx].cc); - commit_count = local_read(&buf->commit_count[endidx].cc); - ltt_check_deliver(buf, chan, offsets->end - 1, commit_count, endidx); - ltt_write_commit_counter(buf, chan, endidx, offsets->end, commit_count, - padding_size); -} - -/* - * Returns : - * 0 if ok - * !0 if execution must be aborted. - */ -static -int ltt_relay_try_switch_slow(enum force_switch_mode mode, - struct ltt_chanbuf *buf, struct ltt_chan *chan, - struct ltt_reserve_switch_offsets *offsets, - u64 *tsc) -{ - long sb_index; - long reserve_commit_diff; - long off; - - offsets->begin = local_read(&buf->offset); - offsets->old = offsets->begin; - offsets->begin_switch = 0; - offsets->end_switch_old = 0; - - *tsc = trace_clock_read64(); - - off = SUBBUF_OFFSET(offsets->begin, chan); - if ((mode != FORCE_ACTIVE && off > 0) || off > ltt_sb_header_size()) { - offsets->begin = SUBBUF_ALIGN(offsets->begin, chan); - offsets->end_switch_old = 1; - } else { - /* we do not have to switch : buffer is empty */ - return -1; - } - if (mode == FORCE_ACTIVE) - offsets->begin += ltt_sb_header_size(); - /* - * Always begin_switch in FORCE_ACTIVE mode. - * Test new buffer integrity - */ - sb_index = SUBBUF_INDEX(offsets->begin, chan); - reserve_commit_diff = - (BUFFER_TRUNC(offsets->begin, chan) - >> chan->a.n_sb_order) - - (local_read(&buf->commit_count[sb_index].cc_sb) - & chan->commit_count_mask); - if (reserve_commit_diff == 0) { - /* Next buffer not corrupted. */ - if (mode == FORCE_ACTIVE - && !chan->overwrite - && offsets->begin - atomic_long_read(&buf->consumed) - >= chan->a.buf_size) { - /* - * We do not overwrite non consumed buffers and we are - * full : ignore switch while tracing is active. - */ - return -1; - } - } else { - /* - * Next subbuffer corrupted. Force pushing reader even in normal - * mode - */ - } - offsets->end = offsets->begin; - return 0; -} - -/* - * Force a sub-buffer switch for a per-cpu buffer. This operation is - * completely reentrant : can be called while tracing is active with - * absolutely no lock held. - * - * Note, however, that as a local_cmpxchg is used for some atomic - * operations, this function must be called from the CPU which owns the buffer - * for a ACTIVE flush. - */ -void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf, - enum force_switch_mode mode) -{ - struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a); - struct ltt_reserve_switch_offsets offsets; - u64 tsc; - - offsets.size = 0; - - /* - * Perform retryable operations. - */ - do { - if (ltt_relay_try_switch_slow(mode, buf, chan, &offsets, &tsc)) - return; - } while (local_cmpxchg(&buf->offset, offsets.old, offsets.end) - != offsets.old); - - /* - * Atomically update last_tsc. This update races against concurrent - * atomic updates, but the race will always cause supplementary full TSC - * events, never the opposite (missing a full TSC event when it would be - * needed). - */ - save_last_tsc(buf, tsc); - - /* - * Push the reader if necessary - */ - if (mode == FORCE_ACTIVE) { - ltt_reserve_push_reader(buf, chan, offsets.end - 1); - ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, - chan)); - } - - /* - * Switch old subbuffer if needed. 
- */ - if (offsets.end_switch_old) { - ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1, - chan)); - ltt_reserve_switch_old_subbuf(buf, chan, &offsets, &tsc); - } - - /* - * Populate new subbuffer. - */ - if (mode == FORCE_ACTIVE) - ltt_reserve_switch_new_subbuf(buf, chan, &offsets, &tsc); -} -EXPORT_SYMBOL_GPL(ltt_force_switch_lockless_slow); - -/* - * Returns : - * 0 if ok - * !0 if execution must be aborted. - */ -static -int ltt_relay_try_reserve_slow(struct ltt_chanbuf *buf, struct ltt_chan *chan, - struct ltt_reserve_switch_offsets *offsets, - size_t data_size, u64 *tsc, unsigned int *rflags, - int largest_align) -{ - long reserve_commit_diff; - - offsets->begin = local_read(&buf->offset); - offsets->old = offsets->begin; - offsets->begin_switch = 0; - offsets->end_switch_current = 0; - offsets->end_switch_old = 0; - - *tsc = trace_clock_read64(); - if (last_tsc_overflow(buf, *tsc)) - *rflags = LTT_RFLAG_ID_SIZE_TSC; - - if (unlikely(SUBBUF_OFFSET(offsets->begin, chan) == 0)) { - offsets->begin_switch = 1; /* For offsets->begin */ - } else { - offsets->size = ltt_get_header_size(chan, offsets->begin, - data_size, - &offsets->before_hdr_pad, - *rflags); - offsets->size += ltt_align(offsets->begin + offsets->size, - largest_align) - + data_size; - if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) + - offsets->size) > chan->a.sb_size)) { - offsets->end_switch_old = 1; /* For offsets->old */ - offsets->begin_switch = 1; /* For offsets->begin */ - } - } - if (unlikely(offsets->begin_switch)) { - long sb_index; - - /* - * We are typically not filling the previous buffer completely. - */ - if (likely(offsets->end_switch_old)) - offsets->begin = SUBBUF_ALIGN(offsets->begin, chan); - offsets->begin = offsets->begin + ltt_sb_header_size(); - /* Test new buffer integrity */ - sb_index = SUBBUF_INDEX(offsets->begin, chan); - reserve_commit_diff = - (BUFFER_TRUNC(offsets->begin, chan) - >> chan->a.n_sb_order) - - (local_read(&buf->commit_count[sb_index].cc_sb) - & chan->commit_count_mask); - if (likely(reserve_commit_diff == 0)) { - /* Next buffer not corrupted. */ - if (unlikely(!chan->overwrite && - (SUBBUF_TRUNC(offsets->begin, chan) - - SUBBUF_TRUNC(atomic_long_read(&buf->consumed), - chan)) - >= chan->a.buf_size)) { - /* - * We do not overwrite non consumed buffers - * and we are full : event is lost. - */ - local_inc(&buf->events_lost); - return -1; - } else { - /* - * next buffer not corrupted, we are either in - * overwrite mode or the buffer is not full. - * It's safe to write in this new subbuffer. - */ - } - } else { - /* - * Next subbuffer corrupted. Drop event in normal and - * overwrite mode. Caused by either a writer OOPS or - * too many nested writes over a reserve/commit pair. - */ - local_inc(&buf->events_lost); - return -1; - } - offsets->size = ltt_get_header_size(chan, offsets->begin, - data_size, - &offsets->before_hdr_pad, - *rflags); - offsets->size += ltt_align(offsets->begin + offsets->size, - largest_align) - + data_size; - if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) - + offsets->size) > chan->a.sb_size)) { - /* - * Event too big for subbuffers, report error, don't - * complete the sub-buffer switch. - */ - local_inc(&buf->events_lost); - return -1; - } else { - /* - * We just made a successful buffer switch and the event - * fits in the new subbuffer. Let's write. - */ - } - } else { - /* - * Event fits in the current buffer and we are not on a switch - * boundary. It's safe to write. 
- */ - } - offsets->end = offsets->begin + offsets->size; - - if (unlikely((SUBBUF_OFFSET(offsets->end, chan)) == 0)) { - /* - * The offset_end will fall at the very beginning of the next - * subbuffer. - */ - offsets->end_switch_current = 1; /* For offsets->begin */ - } - return 0; -} - -/** - * ltt_relay_reserve_slot_lockless_slow - Atomic slot reservation in a buffer. - * @trace: the trace structure to log to. - * @ltt_channel: channel structure - * @transport_data: data structure specific to ltt relay - * @data_size: size of the variable length data to log. - * @slot_size: pointer to total size of the slot (out) - * @buf_offset : pointer to reserved buffer offset (out) - * @tsc: pointer to the tsc at the slot reservation (out) - * @cpu: cpuid - * - * Return : -ENOSPC if not enough space, else returns 0. - * It will take care of sub-buffer switching. - */ -int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan, - struct ltt_trace *trace, size_t data_size, - int largest_align, int cpu, - struct ltt_chanbuf **ret_buf, - size_t *slot_size, long *buf_offset, - u64 *tsc, unsigned int *rflags) -{ - struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu); - struct ltt_reserve_switch_offsets offsets; - - offsets.size = 0; - - do { - if (unlikely(ltt_relay_try_reserve_slow(buf, chan, &offsets, - data_size, tsc, rflags, - largest_align))) - return -ENOSPC; - } while (unlikely(local_cmpxchg(&buf->offset, offsets.old, offsets.end) - != offsets.old)); - - /* - * Atomically update last_tsc. This update races against concurrent - * atomic updates, but the race will always cause supplementary full TSC - * events, never the opposite (missing a full TSC event when it would be - * needed). - */ - save_last_tsc(buf, *tsc); - - /* - * Push the reader if necessary - */ - ltt_reserve_push_reader(buf, chan, offsets.end - 1); - - /* - * Clear noref flag for this subbuffer. - */ - ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, chan)); - - /* - * Switch old subbuffer if needed. - */ - if (unlikely(offsets.end_switch_old)) { - ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1, - chan)); - ltt_reserve_switch_old_subbuf(buf, chan, &offsets, tsc); - } - - /* - * Populate new subbuffer. 
- */ - if (unlikely(offsets.begin_switch)) - ltt_reserve_switch_new_subbuf(buf, chan, &offsets, tsc); - - if (unlikely(offsets.end_switch_current)) - ltt_reserve_end_switch_current(buf, chan, &offsets, tsc); - - *slot_size = offsets.size; - *buf_offset = offsets.begin + offsets.before_hdr_pad; - return 0; -} -EXPORT_SYMBOL_GPL(ltt_reserve_slot_lockless_slow); - -static struct ltt_transport ltt_relay_transport = { - .name = "relay", - .owner = THIS_MODULE, - .ops = { - .create_dirs = ltt_relay_create_dirs, - .remove_dirs = ltt_relay_remove_dirs, - .create_channel = ltt_chan_create, - .finish_channel = ltt_relay_finish_channel, - .remove_channel = ltt_chan_free, - .remove_channel_files = ltt_chan_remove_files, - .wakeup_channel = ltt_relay_async_wakeup_chan, - .user_blocking = ltt_relay_user_blocking, - .user_errors = ltt_relay_print_user_errors, - .start_switch_timer = ltt_chan_start_switch_timer, - .stop_switch_timer = ltt_chan_stop_switch_timer, - }, -}; - -static struct notifier_block fn_ltt_chanbuf_hotcpu_callback = { - .notifier_call = ltt_chanbuf_hotcpu_callback, - .priority = 6, -}; - -int __init ltt_relay_init(void) -{ - printk(KERN_INFO "LTT : ltt-relay init\n"); - - ltt_transport_register(<t_relay_transport); - register_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback); - register_idle_notifier(&pm_idle_entry_notifier); - - return 0; -} - -void __exit ltt_relay_exit(void) -{ - printk(KERN_INFO "LTT : ltt-relay exit\n"); - - unregister_idle_notifier(&pm_idle_entry_notifier); - unregister_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback); - ltt_transport_unregister(<t_relay_transport); -} - -MODULE_LICENSE("GPL and additional rights"); -MODULE_AUTHOR("Mathieu Desnoyers"); -MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");