-/*
- * ltt/ltt-relay-lockless.c
- *
- * (C) Copyright 2005-2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * LTTng lockless buffer space management (reader/writer).
- *
- * Author:
- * Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
- *
- * Inspired by LTT :
- * Karim Yaghmour (karim@opersys.com)
- * Tom Zanussi (zanussi@us.ibm.com)
- * Bob Wisniewski (bob@watson.ibm.com)
- * And from K42 :
- * Bob Wisniewski (bob@watson.ibm.com)
- *
- * Changelog:
- * 08/10/08, Cleanup.
- * 19/10/05, Complete lockless mechanism.
- * 27/05/05, Modular redesign and rewrite.
- *
- * Userspace reader semantics :
- * while (poll fd != POLLHUP) {
- * - ioctl RELAY_GET_SUBBUF_SIZE
- * while (1) {
- * - ioctl GET_SUBBUF
- * - splice 1 subbuffer worth of data to a pipe
- * - splice the data from pipe to disk/network
- * - ioctl PUT_SUBBUF, check error value
- * if err val < 0, previous subbuffer was corrupted.
- * }
- * }
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
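-
-/*
- * A fleshed-out sketch of the reader loop above (illustrative only: the
- * ioctl request names are taken from the outline, while the pipe plumbing,
- * buf_fd, out_fd and handle_corruption() are hypothetical helpers):
- *
- *	struct pollfd pfd = { .fd = buf_fd, .events = POLLIN };
- *	int pipefd[2];
- *	unsigned long sb_size, consumed;
- *
- *	pipe(pipefd);
- *	ioctl(buf_fd, RELAY_GET_SUBBUF_SIZE, &sb_size);
- *	while (poll(&pfd, 1, -1) >= 0 && !(pfd.revents & POLLHUP)) {
- *		while (!ioctl(buf_fd, GET_SUBBUF, &consumed)) {
- *			splice(buf_fd, NULL, pipefd[1], NULL, sb_size, 0);
- *			splice(pipefd[0], NULL, out_fd, NULL, sb_size, 0);
- *			if (ioctl(buf_fd, PUT_SUBBUF, consumed) < 0)
- *				handle_corruption();
- *		}
- *	}
- */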
-
-#include <linux/time.h>
-#include <linux/module.h>
-#include <linux/string.h>
-#include <linux/slab.h>
-#include <linux/init.h>
-#include <linux/rcupdate.h>
-#include <linux/timer.h>
-#include <linux/sched.h>
-#include <linux/bitops.h>
-#include <linux/smp_lock.h>
-#include <linux/stat.h>
-#include <linux/cpu.h>
-#include <linux/idle.h>
-#include <linux/delay.h>
-#include <linux/notifier.h>
-#include <asm/atomic.h>
-#include <asm/local.h>
-
-#include "ltt-tracer.h"
-#include "ltt-relay.h"
-#include "ltt-relay-lockless.h"
-
-#if 0
-#define printk_dbg(fmt, args...) printk(fmt, ##args)
-#else
-#define printk_dbg(fmt, args...)
-#endif
-
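-/*
- * Offsets computed by a reserve or switch attempt (descriptive summary
- * inferred from the code below, not a formal contract):
- * - begin/end: boundaries of the reserved slot; end is published with
- *   local_cmpxchg on buf->offset.
- * - old: the buf->offset value the attempt started from.
- * - begin_switch: a new sub-buffer must be populated.
- * - end_switch_old: the previous sub-buffer must be closed (padded).
- * - end_switch_current: the slot ends exactly on a sub-buffer boundary.
- * - before_hdr_pad: padding inserted before the event header.
- * - size: total slot size (header, alignment and payload).
- */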
-struct ltt_reserve_switch_offsets {
- long begin, end, old;
- long begin_switch, end_switch_current, end_switch_old;
- size_t before_hdr_pad, size;
-};
-
-static
-void ltt_force_switch(struct ltt_chanbuf *buf, enum force_switch_mode mode);
-
-static
-void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu);
-
-static const struct file_operations ltt_file_operations;
-
-static
-void ltt_buffer_begin(struct ltt_chanbuf *buf, u64 tsc, unsigned int subbuf_idx)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
- struct ltt_subbuffer_header *header =
- (struct ltt_subbuffer_header *)
- ltt_relay_offset_address(&buf->a,
- subbuf_idx * chan->a.sb_size);
-
- header->cycle_count_begin = tsc;
- header->data_size = 0xFFFFFFFF; /* for debugging */
- ltt_write_trace_header(chan->a.trace, header);
-}
-
-/*
- * offset is assumed to never be 0 here : never deliver a completely empty
- * subbuffer. The lost size is between 0 and subbuf_size-1.
- */
-static
-void ltt_buffer_end(struct ltt_chanbuf *buf, u64 tsc, unsigned int offset,
- unsigned int subbuf_idx)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
- struct ltt_subbuffer_header *header =
- (struct ltt_subbuffer_header *)
- ltt_relay_offset_address(&buf->a,
- subbuf_idx * chan->a.sb_size);
- u32 data_size = SUBBUF_OFFSET(offset - 1, chan) + 1;
-
- header->data_size = data_size;
- header->sb_size = PAGE_ALIGN(data_size);
- header->cycle_count_end = tsc;
- header->events_lost = local_read(&buf->events_lost);
- header->subbuf_corrupt = local_read(&buf->corrupted_subbuffers);
-}
-
-/*
- * Must be called under trace lock or cpu hotplug protection.
- */
-void ltt_chanbuf_free(struct ltt_chanbuf *buf)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
- ltt_relay_print_buffer_errors(chan, buf->a.cpu);
-#ifdef LTT_VMCORE
- kfree(buf->commit_seq);
-#endif
- kfree(buf->commit_count);
-
- ltt_chanbuf_alloc_free(&buf->a);
-}
-
-/*
- * Must be called under trace lock or cpu hotplug protection.
- */
-int ltt_chanbuf_create(struct ltt_chanbuf *buf, struct ltt_chan_alloc *chana,
- int cpu)
-{
- struct ltt_chan *chan = container_of(chana, struct ltt_chan, a);
- struct ltt_trace *trace = chana->trace;
- unsigned int j, n_sb;
- int ret;
-
- /* Test for cpu hotplug */
- if (buf->a.allocated)
- return 0;
-
- ret = ltt_chanbuf_alloc_create(&buf->a, &chan->a, cpu);
- if (ret)
- return ret;
-
- buf->commit_count =
- kzalloc_node(ALIGN(sizeof(*buf->commit_count) * chan->a.n_sb,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(cpu));
- if (!buf->commit_count) {
- ret = -ENOMEM;
- goto free_chanbuf;
- }
-
-#ifdef LTT_VMCORE
- buf->commit_seq =
- kzalloc_node(ALIGN(sizeof(*buf->commit_seq) * chan->a.n_sb,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(cpu));
- if (!buf->commit_seq) {
- ret = -ENOMEM;
- goto free_commit;
- }
-#endif
-
- local_set(&buf->offset, ltt_sb_header_size());
- atomic_long_set(&buf->consumed, 0);
- atomic_long_set(&buf->active_readers, 0);
- n_sb = chan->a.n_sb;
- for (j = 0; j < n_sb; j++) {
- local_set(&buf->commit_count[j].cc, 0);
- local_set(&buf->commit_count[j].cc_sb, 0);
- local_set(&buf->commit_count[j].events, 0);
- }
- init_waitqueue_head(&buf->write_wait);
- init_waitqueue_head(&buf->read_wait);
- spin_lock_init(&buf->full_lock);
-
- RCHAN_SB_CLEAR_NOREF(buf->a.buf_wsb[0].pages);
- ltt_buffer_begin(buf, trace->start_tsc, 0);
- /*
- * local_add made on data that may belong to another CPU: normally
- * unsafe, but fine here because tracing is not started yet (for this
- * cpu).
- */
- local_add(ltt_sb_header_size(), &buf->commit_count[0].cc);
-
- local_set(&buf->events_lost, 0);
- local_set(&buf->corrupted_subbuffers, 0);
- buf->finalized = 0;
-
- ret = ltt_chanbuf_create_file(chan->a.filename, chan->a.parent,
- S_IRUSR, buf);
- if (ret)
- goto free_init;
-
- /*
- * Ensure the buffer is ready before setting it to allocated.
- * Used for cpu hotplug vs async wakeup.
- */
- smp_wmb();
- buf->a.allocated = 1;
-
- return 0;
-
- /* Error handling */
-free_init:
-#ifdef LTT_VMCORE
- kfree(buf->commit_seq);
-free_commit:
-#endif
- kfree(buf->commit_count);
-free_chanbuf:
- ltt_chanbuf_alloc_free(&buf->a);
- return ret;
-}
-
-void ltt_chan_remove_files(struct ltt_chan *chan)
-{
- ltt_ascii_remove(chan);
- ltt_chan_alloc_remove_files(&chan->a);
-}
-EXPORT_SYMBOL_GPL(ltt_chan_remove_files);
-
-void ltt_chan_free(struct kref *kref)
-{
- struct ltt_chan *chan = container_of(kref, struct ltt_chan, a.kref);
-
- ltt_chan_alloc_free(&chan->a);
-}
-EXPORT_SYMBOL_GPL(ltt_chan_free);
-
-/**
- * ltt_chan_create - Create channel.
- */
-int ltt_chan_create(const char *base_filename,
- struct ltt_chan *chan, struct dentry *parent,
- size_t sb_size, size_t n_sb,
- int overwrite, struct ltt_trace *trace)
-{
- int ret;
-
- chan->overwrite = overwrite;
-
- ret = ltt_chan_alloc_init(&chan->a, trace, base_filename, parent,
- sb_size, n_sb, overwrite, overwrite);
- if (ret)
- goto error;
-
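- /*
- * Worked example (illustrative, assuming n_sb_order = log2(n_sb)):
- * with n_sb = 4 sub-buffers (n_sb_order = 2) on a 64-bit machine, the
- * mask keeps the low 62 bits. Per-sub-buffer commit counts are always
- * compared under this mask, so counter wrap-around stays consistent
- * with BUFFER_TRUNC(offset, chan) >> n_sb_order in the consumability
- * and corruption checks used throughout this file.
- */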
- chan->commit_count_mask = (~0UL >> chan->a.n_sb_order);
-
- ret = ltt_ascii_create(chan);
- if (ret)
- goto error_chan_alloc_free;
-
- return ret;
-
-error_chan_alloc_free:
- ltt_chan_alloc_free(&chan->a);
-error:
- return ret;
-}
-EXPORT_SYMBOL_GPL(ltt_chan_create);
-
-int ltt_chanbuf_open_read(struct ltt_chanbuf *buf)
-{
- kref_get(&buf->a.chan->kref);
- if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) {
- kref_put(&buf->a.chan->kref, ltt_chan_free);
- return -EBUSY;
- }
-
- return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_open_read);
-
-void ltt_chanbuf_release_read(struct ltt_chanbuf *buf)
-{
- WARN_ON(atomic_long_read(&buf->active_readers) != 1);
- atomic_long_dec(&buf->active_readers);
- kref_put(&buf->a.chan->kref, ltt_chan_free);
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_release_read);
-
-/*
- * Wake writers :
- *
- * This must be done after the trace is removed from the RCU list so that there
- * are no stalled writers.
- */
-static void ltt_relay_wake_writers(struct ltt_chanbuf *buf)
-{
- if (waitqueue_active(&buf->write_wait))
- wake_up_interruptible(&buf->write_wait);
-}
-
-/*
- * This function should not be called from NMI interrupt context
- */
-static void ltt_buf_unfull(struct ltt_chanbuf *buf)
-{
- ltt_relay_wake_writers(buf);
-}
-
-/*
- * Promote compiler barrier to a smp_mb().
- * For the specific LTTng case, this IPI call should be removed if the
- * architecture does not reorder writes. This should eventually be provided by
- * a separate architecture-specific infrastructure.
- */
-static void remote_mb(void *info)
-{
- smp_mb();
-}
-
-int ltt_chanbuf_get_subbuf(struct ltt_chanbuf *buf, unsigned long *consumed)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
- long consumed_old, consumed_idx, commit_count, write_offset;
- int ret;
-
- consumed_old = atomic_long_read(&buf->consumed);
- consumed_idx = SUBBUF_INDEX(consumed_old, chan);
- commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
- /*
- * Make sure we read the commit count before reading the buffer
- * data and the write offset. Correct consumed offset ordering
- * wrt commit count is ensured by the use of cmpxchg to update
- * the consumed offset.
- * smp_call_function_single can fail if the remote CPU is offline;
- * this is OK because then there is no wmb to execute there.
- * If our thread executes on the same CPU as the one the buffer
- * belongs to, we don't have to synchronize at all. If we are
- * migrated, the scheduler will take care of the memory barriers.
- * Normally, smp_call_function_single() should ensure program order when
- * executing the remote function, which implies that it surrounds the
- * function execution with :
- * smp_mb()
- * send IPI
- * csd_lock_wait
- * recv IPI
- * smp_mb()
- * exec. function
- * smp_mb()
- * csd unlock
- * smp_mb()
- *
- * However, smp_call_function_single() does not seem to clearly execute
- * such barriers. It depends on spinlock semantic to provide the barrier
- * before executing the IPI and, when busy-looping, csd_lock_wait only
- * executes smp_mb() when it has to wait for the other CPU.
- *
- * I don't trust this code. Therefore, let's add the required smp_mb()
- * sequence ourselves, even if duplicated. It has no performance impact
- * anyway.
- *
- * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs
- * read and write vs write. They do not ensure core synchronization. We
- * really have to ensure total order between the 3 barriers running on
- * the 2 CPUs.
- */
-#ifdef LTT_NO_IPI_BARRIER
- /*
- * Local rmb, matching the remote wmb, to read the commit count before
- * the buffer data and the write offset.
- */
- smp_rmb();
-#else
- if (raw_smp_processor_id() != buf->a.cpu) {
- smp_mb(); /* Total order with IPI handler smp_mb() */
- smp_call_function_single(buf->a.cpu, remote_mb, NULL, 1);
- smp_mb(); /* Total order with IPI handler smp_mb() */
- }
-#endif
- write_offset = local_read(&buf->offset);
- /*
- * Check that the subbuffer we are trying to consume has already
- * been fully committed.
- */
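- /*
- * Worked example (illustrative, assuming n_sb_order = log2(n_sb)):
- * with sb_size = 4096 and n_sb = 4 (buf_size = 16384), take
- * consumed_old = 20480, i.e. the second pass over sub-buffer 1. Then
- * BUFFER_TRUNC(consumed_old, chan) >> n_sb_order = 16384 >> 2 = 4096.
- * A fully committed sub-buffer for that pass has cc_sb = 8192, so
- * commit_count - sb_size = 4096 and the difference below is 0: the
- * sub-buffer can be consumed. Any smaller cc_sb yields -EAGAIN.
- */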
- if (((commit_count - chan->a.sb_size)
- & chan->commit_count_mask)
- - (BUFFER_TRUNC(consumed_old, chan)
- >> chan->a.n_sb_order)
- != 0) {
- return -EAGAIN;
- }
- /*
- * Check that we are not about to read the same subbuffer in
- * which the writer head is.
- */
- if ((SUBBUF_TRUNC(write_offset, chan)
- - SUBBUF_TRUNC(consumed_old, chan))
- == 0) {
- return -EAGAIN;
- }
-
- ret = update_read_sb_index(&buf->a, &chan->a, consumed_idx);
- if (ret)
- return ret;
-
- *consumed = consumed_old;
- return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_get_subbuf);
-
-int ltt_chanbuf_put_subbuf(struct ltt_chanbuf *buf, unsigned long consumed)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
- long consumed_new, consumed_old;
-
- WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-
- consumed_old = consumed;
- consumed_new = SUBBUF_ALIGN(consumed_old, chan);
- WARN_ON_ONCE(RCHAN_SB_IS_NOREF(buf->a.buf_rsb.pages));
- RCHAN_SB_SET_NOREF(buf->a.buf_rsb.pages);
-
- spin_lock(&buf->full_lock);
- if (atomic_long_cmpxchg(&buf->consumed, consumed_old, consumed_new)
- != consumed_old) {
- /* We have been pushed by the writer. */
- spin_unlock(&buf->full_lock);
- /*
- * We exchanged the subbuffer pages. No corruption possible
- * even if the writer did push us. No more -EIO possible.
- */
- return 0;
- } else {
- /* tell the client that the buffer is now unfull */
- ltt_buf_unfull(buf);
- spin_unlock(&buf->full_lock);
- }
- return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_chanbuf_put_subbuf);
-
-static void switch_buffer(unsigned long data)
-{
- struct ltt_chanbuf *buf = (struct ltt_chanbuf *)data;
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
- /*
- * Only flush buffers periodically if readers are active.
- */
- if (atomic_long_read(&buf->active_readers))
- ltt_force_switch(buf, FORCE_ACTIVE);
-
- mod_timer_pinned(&buf->switch_timer,
- jiffies + chan->switch_timer_interval);
-}
-
-static void ltt_chanbuf_start_switch_timer(struct ltt_chanbuf *buf)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
- if (!chan->switch_timer_interval)
- return;
-
- init_timer_deferrable(&buf->switch_timer);
- buf->switch_timer.function = switch_buffer;
- buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
- buf->switch_timer.data = (unsigned long)buf;
- add_timer_on(&buf->switch_timer, buf->a.cpu);
-}
-
-/*
- * called with ltt traces lock held.
- */
-void ltt_chan_start_switch_timer(struct ltt_chan *chan)
-{
- int cpu;
-
- if (!chan->switch_timer_interval)
- return;
-
- for_each_online_cpu(cpu) {
- struct ltt_chanbuf *buf;
-
- buf = per_cpu_ptr(chan->a.buf, cpu);
- ltt_chanbuf_start_switch_timer(buf);
- }
-}
-
-static void ltt_chanbuf_stop_switch_timer(struct ltt_chanbuf *buf)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
- if (!chan->switch_timer_interval)
- return;
-
- del_timer_sync(&buf->switch_timer);
-}
-
-/*
- * called with ltt traces lock held.
- */
-void ltt_chan_stop_switch_timer(struct ltt_chan *chan)
-{
- int cpu;
-
- if (!chan->switch_timer_interval)
- return;
-
- for_each_online_cpu(cpu) {
- struct ltt_chanbuf *buf;
-
- buf = per_cpu_ptr(chan->a.buf, cpu);
- ltt_chanbuf_stop_switch_timer(buf);
- }
-}
-
-static void ltt_chanbuf_idle_switch(struct ltt_chanbuf *buf)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
-
- if (chan->switch_timer_interval)
- ltt_force_switch(buf, FORCE_ACTIVE);
-}
-
-/*
- * ltt_chanbuf_switch is called from a remote CPU to ensure that the buffers of
- * a cpu which went down are flushed. Note that if we execute concurrently
- * with trace allocation, a buffer might appear to be unallocated (because
- * allocation detects that the target CPU is offline).
- */
-static void ltt_chanbuf_switch(struct ltt_chanbuf *buf)
-{
- if (buf->a.allocated)
- ltt_force_switch(buf, FORCE_ACTIVE);
-}
-
-/**
- * ltt_chanbuf_hotcpu_callback - CPU hotplug callback
- * @nb: notifier block
- * @action: hotplug action to take
- * @hcpu: CPU number
- *
- * Returns %NOTIFY_OK if the action was handled, %NOTIFY_DONE otherwise.
- */
-static
-int ltt_chanbuf_hotcpu_callback(struct notifier_block *nb,
- unsigned long action,
- void *hcpu)
-{
- unsigned int cpu = (unsigned long)hcpu;
-
- switch (action) {
- case CPU_DOWN_FAILED:
- case CPU_DOWN_FAILED_FROZEN:
- case CPU_ONLINE:
- case CPU_ONLINE_FROZEN:
- /*
- * CPU hotplug lock protects trace lock from this callback.
- */
- ltt_chan_for_each_channel(ltt_chanbuf_start_switch_timer, cpu);
- return NOTIFY_OK;
-
- case CPU_DOWN_PREPARE:
- case CPU_DOWN_PREPARE_FROZEN:
- /*
- * Performs an IPI to delete the timer locally on the target
- * CPU. CPU hotplug lock protects trace lock from this
- * callback.
- */
- ltt_chan_for_each_channel(ltt_chanbuf_stop_switch_timer, cpu);
- return NOTIFY_OK;
-
- case CPU_DEAD:
- case CPU_DEAD_FROZEN:
- /*
- * Performing a buffer switch on a remote CPU. Performed by
- * the CPU responsible for doing the hotunplug after the target
- * CPU stopped running completely. Ensures that all data
- * from that remote CPU is flushed. CPU hotplug lock protects
- * trace lock from this callback.
- */
- ltt_chan_for_each_channel(ltt_chanbuf_switch, cpu);
- return NOTIFY_OK;
-
- default:
- return NOTIFY_DONE;
- }
-}
-
-static int pm_idle_entry_callback(struct notifier_block *self,
- unsigned long val, void *data)
-{
- if (val == IDLE_START) {
- rcu_read_lock_sched_notrace();
- ltt_chan_for_each_channel(ltt_chanbuf_idle_switch,
- smp_processor_id());
- rcu_read_unlock_sched_notrace();
- }
- return 0;
-}
-
-struct notifier_block pm_idle_entry_notifier = {
- .notifier_call = pm_idle_entry_callback,
- .priority = ~0U, /* smallest prio, run after tracing events */
-};
-
-static
-void ltt_relay_print_written(struct ltt_chan *chan, long cons_off,
- unsigned int cpu)
-{
- struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
- long cons_idx, events_count;
-
- cons_idx = SUBBUF_INDEX(cons_off, chan);
- events_count = local_read(&buf->commit_count[cons_idx].events);
-
- if (events_count)
- printk(KERN_INFO
- "LTT: %lu events written in channel %s "
- "(cpu %u, index %lu)\n",
- events_count, chan->a.filename, cpu, cons_idx);
-}
-
-static
-void ltt_relay_print_subbuffer_errors(struct ltt_chanbuf *buf,
- struct ltt_chan *chan, long cons_off,
- unsigned int cpu)
-{
- long cons_idx, commit_count, commit_count_sb, write_offset;
-
- cons_idx = SUBBUF_INDEX(cons_off, chan);
- commit_count = local_read(&buf->commit_count[cons_idx].cc);
- commit_count_sb = local_read(&buf->commit_count[cons_idx].cc_sb);
- /*
- * No need to order commit_count and write_offset reads because we
- * execute after trace is stopped when there are no readers left.
- */
- write_offset = local_read(&buf->offset);
- printk(KERN_WARNING
- "LTT : unread channel %s offset is %ld "
- "and cons_off : %ld (cpu %u)\n",
- chan->a.filename, write_offset, cons_off, cpu);
- /* Check each sub-buffer for a non-filled commit count */
- if (((commit_count - chan->a.sb_size) & chan->commit_count_mask)
- - (BUFFER_TRUNC(cons_off, chan) >> chan->a.n_sb_order)
- != 0)
- printk(KERN_ALERT
- "LTT : %s : subbuffer %lu has non filled "
- "commit count [cc, cc_sb] [%lu,%lu].\n",
- chan->a.filename, cons_idx, commit_count,
- commit_count_sb);
- printk(KERN_ALERT "LTT : %s : commit count : %lu, subbuf size %lu\n",
- chan->a.filename, commit_count, chan->a.sb_size);
-}
-
-static
-void ltt_relay_print_errors(struct ltt_chanbuf *buf, struct ltt_chan *chan,
- struct ltt_trace *trace, int cpu)
-{
- long cons_off;
-
- /*
- * Can be called in the error path of allocation when the channel
- * pointer is not yet set.
- */
- if (!chan)
- return;
- for (cons_off = 0; cons_off < chan->a.buf_size;
- cons_off = SUBBUF_ALIGN(cons_off, chan))
- ltt_relay_print_written(chan, cons_off, cpu);
- for (cons_off = atomic_long_read(&buf->consumed);
- (SUBBUF_TRUNC(local_read(&buf->offset), chan)
- - cons_off) > 0;
- cons_off = SUBBUF_ALIGN(cons_off, chan))
- ltt_relay_print_subbuffer_errors(buf, chan, cons_off, cpu);
-}
-
-static
-void ltt_relay_print_buffer_errors(struct ltt_chan *chan, unsigned int cpu)
-{
- struct ltt_trace *trace = chan->a.trace;
- struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
-
- if (local_read(&buf->events_lost))
- printk(KERN_ALERT
- "LTT : %s : %ld events lost "
- "in %s channel (cpu %u).\n",
- chan->a.filename, local_read(&buf->events_lost),
- chan->a.filename, cpu);
- if (local_read(&buf->corrupted_subbuffers))
- printk(KERN_ALERT
- "LTT : %s : %ld corrupted subbuffers "
- "in %s channel (cpu %u).\n",
- chan->a.filename,
- local_read(&buf->corrupted_subbuffers),
- chan->a.filename, cpu);
-
- ltt_relay_print_errors(buf, chan, trace, cpu);
-}
-
-static void ltt_relay_remove_dirs(struct ltt_trace *trace)
-{
- ltt_ascii_remove_dir(trace);
- debugfs_remove(trace->dentry.trace_root);
-}
-
-static int ltt_relay_create_dirs(struct ltt_trace *new_trace)
-{
- struct dentry *ltt_root_dentry;
- int ret;
-
- ltt_root_dentry = get_ltt_root();
- if (!ltt_root_dentry)
- return -ENOENT;
-
- new_trace->dentry.trace_root = debugfs_create_dir(new_trace->trace_name,
- ltt_root_dentry);
- put_ltt_root();
- if (new_trace->dentry.trace_root == NULL) {
- printk(KERN_ERR "LTT : Trace directory name %s already taken\n",
- new_trace->trace_name);
- return -EEXIST;
- }
- ret = ltt_ascii_create_dir(new_trace);
- if (ret)
- printk(KERN_WARNING "LTT : Unable to create ascii output file "
- "for trace %s\n", new_trace->trace_name);
-
- return 0;
-}
-
-/*
- * LTTng channel flush function.
- *
- * Must be called when no tracing is active in the channel, because of
- * accesses across CPUs.
- */
-static notrace void ltt_relay_buffer_flush(struct ltt_chanbuf *buf)
-{
- buf->finalized = 1;
- ltt_force_switch(buf, FORCE_FLUSH);
-}
-
-static void ltt_relay_async_wakeup_chan(struct ltt_chan *chan)
-{
- unsigned int i;
-
- for_each_possible_cpu(i) {
- struct ltt_chanbuf *buf;
-
- buf = per_cpu_ptr(chan->a.buf, i);
- if (!buf->a.allocated)
- continue;
- /*
- * Ensure the buffer has been allocated before reading its
- * content. Sync cpu hotplug vs async wakeup.
- */
- smp_rmb();
- if (ltt_poll_deliver(buf, chan))
- wake_up_interruptible(&buf->read_wait);
- }
-}
-
-static void ltt_relay_finish_buffer(struct ltt_chan *chan, unsigned int cpu)
-{
- struct ltt_chanbuf *buf = per_cpu_ptr(chan->a.buf, cpu);
-
- if (buf->a.allocated) {
- ltt_relay_buffer_flush(buf);
- ltt_relay_wake_writers(buf);
- }
-}
-
-static void ltt_relay_finish_channel(struct ltt_chan *chan)
-{
- unsigned int i;
-
- for_each_possible_cpu(i)
- ltt_relay_finish_buffer(chan, i);
-}
-
-/*
- * This is called with preemption disabled when user space has requested
- * blocking mode. If one of the active traces has free space below a
- * specific threshold value, we reenable preemption and block.
- */
-static
-int ltt_relay_user_blocking(struct ltt_trace *trace, unsigned int chan_index,
- size_t data_size, struct user_dbg_data *dbg)
-{
- struct ltt_chanbuf *buf;
- struct ltt_chan *chan;
- int cpu;
- DECLARE_WAITQUEUE(wait, current);
-
- chan = &trace->channels[chan_index];
- cpu = smp_processor_id();
- buf = per_cpu_ptr(chan->a.buf, cpu);
-
- /*
- * Check if data is too big for the channel : do not
- * block for it.
- */
- if (LTT_RESERVE_CRITICAL + data_size > chan->a.sb_size)
- return 0;
-
- /*
- * If free space is too low, we block. We restart from the
- * beginning after we resume (the cpu id may have changed
- * while preemption was enabled).
- */
- spin_lock(&buf->full_lock);
- if (!chan->overwrite) {
- dbg->write = local_read(&buf->offset);
- dbg->read = atomic_long_read(&buf->consumed);
- dbg->avail_size = dbg->write + LTT_RESERVE_CRITICAL + data_size
- - SUBBUF_TRUNC(dbg->read, chan);
- if (dbg->avail_size > chan->a.buf_size) {
- __set_current_state(TASK_INTERRUPTIBLE);
- add_wait_queue(&buf->write_wait, &wait);
- spin_unlock(&buf->full_lock);
- preempt_enable();
- schedule();
- __set_current_state(TASK_RUNNING);
- remove_wait_queue(&buf->write_wait, &wait);
- if (signal_pending(current))
- return -ERESTARTSYS;
- preempt_disable();
- return 1;
- }
- }
- spin_unlock(&buf->full_lock);
- return 0;
-}
-
-static
-void ltt_relay_print_user_errors(struct ltt_trace *trace,
- unsigned int chan_index, size_t data_size,
- struct user_dbg_data *dbg, int cpu)
-{
- struct ltt_chanbuf *buf;
- struct ltt_chan *chan;
-
- chan = &trace->channels[chan_index];
- buf = per_cpu_ptr(chan->a.buf, cpu);
-
- printk(KERN_ERR "Error in LTT usertrace : "
- "buffer full : event lost in blocking "
- "mode. Increase LTT_RESERVE_CRITICAL.\n");
- printk(KERN_ERR "LTT nesting level is %u.\n",
- per_cpu(ltt_nesting, cpu));
- printk(KERN_ERR "LTT available size %lu.\n",
- dbg->avail_size);
- printk(KERN_ERR "available write : %lu, read : %lu\n",
- dbg->write, dbg->read);
-
- dbg->write = local_read(&buf->offset);
- dbg->read = atomic_long_read(&buf->consumed);
-
- printk(KERN_ERR "LTT current size %lu.\n",
- dbg->write + LTT_RESERVE_CRITICAL + data_size
- - SUBBUF_TRUNC(dbg->read, chan));
- printk(KERN_ERR "current write : %lu, read : %lu\n",
- dbg->write, dbg->read);
-}
-
-/*
- * ltt_reserve_switch_old_subbuf: switch old subbuffer
- *
- * Concurrency safe because we are the last and only thread to alter this
- * sub-buffer. As long as it is not delivered and read, no other thread can
- * alter the offset, alter the reserve_count or call the
- * client_buffer_end_callback on this sub-buffer.
- *
- * The only remaining threads could be the ones with pending commits. They will
- * have to do the deliver themselves. Not concurrency safe in overwrite mode.
- * We detect corrupted subbuffers with commit and reserve counts. We keep a
- * corrupted sub-buffers count and push the readers across these sub-buffers.
- *
- * Not concurrency safe if a writer is stalled in a subbuffer and another writer
- * switches in, finding out it's corrupted. The result will be that the old
- * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
- * will be declared corrupted too because of the commit count adjustment.
- *
- * Note : offset_old should never be 0 here.
- */
-static
-void ltt_reserve_switch_old_subbuf(struct ltt_chanbuf *buf,
- struct ltt_chan *chan,
- struct ltt_reserve_switch_offsets *offsets,
- u64 *tsc)
-{
- long oldidx = SUBBUF_INDEX(offsets->old - 1, chan);
- long commit_count, padding_size;
-
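- /*
- * Illustrative numbers: with sb_size = 4096 and the last reserved byte
- * at sub-buffer offset 2047 (SUBBUF_OFFSET(old - 1) + 1 == 2048),
- * padding_size = 2048. The padding is accounted to the commit count
- * below so the sub-buffer reads as fully committed and deliverable.
- */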
- padding_size = chan->a.sb_size
- - (SUBBUF_OFFSET(offsets->old - 1, chan) + 1);
- ltt_buffer_end(buf, *tsc, offsets->old, oldidx);
-
- /*
- * Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
- */
- barrier();
- local_add(padding_size, &buf->commit_count[oldidx].cc);
- commit_count = local_read(&buf->commit_count[oldidx].cc);
- ltt_check_deliver(buf, chan, offsets->old - 1, commit_count, oldidx);
- ltt_write_commit_counter(buf, chan, oldidx, offsets->old, commit_count,
- padding_size);
-}
-
-/*
- * ltt_reserve_switch_new_subbuf: Populate new subbuffer.
- *
- * This code can be executed unordered: writers may already have written to the
- * sub-buffer before this code gets executed, so caution is required. The
- * commit makes sure that this code is executed before this sub-buffer is
- * delivered.
- */
-static
-void ltt_reserve_switch_new_subbuf(struct ltt_chanbuf *buf,
- struct ltt_chan *chan,
- struct ltt_reserve_switch_offsets *offsets,
- u64 *tsc)
-{
- long beginidx = SUBBUF_INDEX(offsets->begin, chan);
- long commit_count;
-
- ltt_buffer_begin(buf, *tsc, beginidx);
-
- /*
- * Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
- */
- barrier();
- local_add(ltt_sb_header_size(), &buf->commit_count[beginidx].cc);
- commit_count = local_read(&buf->commit_count[beginidx].cc);
- /* Check if the written buffer has to be delivered */
- ltt_check_deliver(buf, chan, offsets->begin, commit_count, beginidx);
- ltt_write_commit_counter(buf, chan, beginidx, offsets->begin,
- commit_count, ltt_sb_header_size());
-}
-
-/*
- * ltt_reserve_end_switch_current: finish switching current subbuffer
- *
- * Concurrency safe because we are the last and only thread to alter this
- * sub-buffer. As long as it is not delivered and read, no other thread can
- * alter the offset, alter the reserve_count or call the
- * client_buffer_end_callback on this sub-buffer.
- *
- * The only remaining threads could be the ones with pending commits. They will
- * have to do the deliver themselves. Not concurrency safe in overwrite mode.
- * We detect corrupted subbuffers with commit and reserve counts. We keep a
- * corrupted sub-buffers count and push the readers across these sub-buffers.
- *
- * Not concurrency safe if a writer is stalled in a subbuffer and another writer
- * switches in, finding out it's corrupted. The result will be that the old
- * (uncommitted) subbuffer will be declared corrupted, and that the new subbuffer
- * will be declared corrupted too because of the commit count adjustment.
- */
-static
-void ltt_reserve_end_switch_current(struct ltt_chanbuf *buf,
- struct ltt_chan *chan,
- struct ltt_reserve_switch_offsets *offsets,
- u64 *tsc)
-{
- long endidx = SUBBUF_INDEX(offsets->end - 1, chan);
- long commit_count, padding_size;
-
- padding_size = chan->a.sb_size
- - (SUBBUF_OFFSET(offsets->end - 1, chan) + 1);
-
- ltt_buffer_end(buf, *tsc, offsets->end, endidx);
-
- /*
- * Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_wmb() by the IPI
- * sent by get_subbuf() when it does its smp_rmb().
- */
- barrier();
- local_add(padding_size, &buf->commit_count[endidx].cc);
- commit_count = local_read(&buf->commit_count[endidx].cc);
- ltt_check_deliver(buf, chan, offsets->end - 1, commit_count, endidx);
- ltt_write_commit_counter(buf, chan, endidx, offsets->end, commit_count,
- padding_size);
-}
-
-/*
- * Returns :
- * 0 if ok
- * !0 if execution must be aborted.
- */
-static
-int ltt_relay_try_switch_slow(enum force_switch_mode mode,
- struct ltt_chanbuf *buf, struct ltt_chan *chan,
- struct ltt_reserve_switch_offsets *offsets,
- u64 *tsc)
-{
- long sb_index;
- long reserve_commit_diff;
- long off;
-
- offsets->begin = local_read(&buf->offset);
- offsets->old = offsets->begin;
- offsets->begin_switch = 0;
- offsets->end_switch_old = 0;
-
- *tsc = trace_clock_read64();
-
- off = SUBBUF_OFFSET(offsets->begin, chan);
- if ((mode != FORCE_ACTIVE && off > 0) || off > ltt_sb_header_size()) {
- offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
- offsets->end_switch_old = 1;
- } else {
- /* we do not have to switch : buffer is empty */
- return -1;
- }
- if (mode == FORCE_ACTIVE)
- offsets->begin += ltt_sb_header_size();
- /*
- * Always begin_switch in FORCE_ACTIVE mode.
- * Test new buffer integrity
- */
- sb_index = SUBBUF_INDEX(offsets->begin, chan);
- reserve_commit_diff =
- (BUFFER_TRUNC(offsets->begin, chan)
- >> chan->a.n_sb_order)
- - (local_read(&buf->commit_count[sb_index].cc_sb)
- & chan->commit_count_mask);
- if (reserve_commit_diff == 0) {
- /* Next buffer not corrupted. */
- if (mode == FORCE_ACTIVE
- && !chan->overwrite
- && offsets->begin - atomic_long_read(&buf->consumed)
- >= chan->a.buf_size) {
- /*
- * We do not overwrite non consumed buffers and we are
- * full : ignore switch while tracing is active.
- */
- return -1;
- }
- } else {
- /*
- * Next subbuffer corrupted. Force pushing reader even in normal
- * mode
- */
- }
- offsets->end = offsets->begin;
- return 0;
-}
-
-/*
- * Force a sub-buffer switch for a per-cpu buffer. This operation is
- * completely reentrant : can be called while tracing is active with
- * absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for an ACTIVE flush.
- */
-void ltt_force_switch_lockless_slow(struct ltt_chanbuf *buf,
- enum force_switch_mode mode)
-{
- struct ltt_chan *chan = container_of(buf->a.chan, struct ltt_chan, a);
- struct ltt_reserve_switch_offsets offsets;
- u64 tsc;
-
- offsets.size = 0;
-
- /*
- * Perform retryable operations.
- */
- do {
- if (ltt_relay_try_switch_slow(mode, buf, chan, &offsets, &tsc))
- return;
- } while (local_cmpxchg(&buf->offset, offsets.old, offsets.end)
- != offsets.old);
-
- /*
- * Atomically update last_tsc. This update races against concurrent
- * atomic updates, but the race will always cause supplementary full TSC
- * events, never the opposite (missing a full TSC event when it would be
- * needed).
- */
- save_last_tsc(buf, tsc);
-
- /*
- * Push the reader if necessary
- */
- if (mode == FORCE_ACTIVE) {
- ltt_reserve_push_reader(buf, chan, offsets.end - 1);
- ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1,
- chan));
- }
-
- /*
- * Switch old subbuffer if needed.
- */
- if (offsets.end_switch_old) {
- ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
- chan));
- ltt_reserve_switch_old_subbuf(buf, chan, &offsets, &tsc);
- }
-
- /*
- * Populate new subbuffer.
- */
- if (mode == FORCE_ACTIVE)
- ltt_reserve_switch_new_subbuf(buf, chan, &offsets, &tsc);
-}
-EXPORT_SYMBOL_GPL(ltt_force_switch_lockless_slow);
-
-/*
- * Returns :
- * 0 if ok
- * !0 if execution must be aborted.
- */
-static
-int ltt_relay_try_reserve_slow(struct ltt_chanbuf *buf, struct ltt_chan *chan,
- struct ltt_reserve_switch_offsets *offsets,
- size_t data_size, u64 *tsc, unsigned int *rflags,
- int largest_align)
-{
- long reserve_commit_diff;
-
- offsets->begin = local_read(&buf->offset);
- offsets->old = offsets->begin;
- offsets->begin_switch = 0;
- offsets->end_switch_current = 0;
- offsets->end_switch_old = 0;
-
- *tsc = trace_clock_read64();
- if (last_tsc_overflow(buf, *tsc))
- *rflags = LTT_RFLAG_ID_SIZE_TSC;
-
- if (unlikely(SUBBUF_OFFSET(offsets->begin, chan) == 0)) {
- offsets->begin_switch = 1; /* For offsets->begin */
- } else {
- offsets->size = ltt_get_header_size(chan, offsets->begin,
- data_size,
- &offsets->before_hdr_pad,
- *rflags);
- offsets->size += ltt_align(offsets->begin + offsets->size,
- largest_align)
- + data_size;
- if (unlikely((SUBBUF_OFFSET(offsets->begin, chan) +
- offsets->size) > chan->a.sb_size)) {
- offsets->end_switch_old = 1; /* For offsets->old */
- offsets->begin_switch = 1; /* For offsets->begin */
- }
- }
- if (unlikely(offsets->begin_switch)) {
- long sb_index;
-
- /*
- * We are typically not filling the previous buffer completely.
- */
- if (likely(offsets->end_switch_old))
- offsets->begin = SUBBUF_ALIGN(offsets->begin, chan);
- offsets->begin = offsets->begin + ltt_sb_header_size();
- /* Test new buffer integrity */
- sb_index = SUBBUF_INDEX(offsets->begin, chan);
- reserve_commit_diff =
- (BUFFER_TRUNC(offsets->begin, chan)
- >> chan->a.n_sb_order)
- - (local_read(&buf->commit_count[sb_index].cc_sb)
- & chan->commit_count_mask);
- if (likely(reserve_commit_diff == 0)) {
- /* Next buffer not corrupted. */
- if (unlikely(!chan->overwrite &&
- (SUBBUF_TRUNC(offsets->begin, chan)
- - SUBBUF_TRUNC(atomic_long_read(&buf->consumed),
- chan))
- >= chan->a.buf_size)) {
- /*
- * We do not overwrite non consumed buffers
- * and we are full : event is lost.
- */
- local_inc(&buf->events_lost);
- return -1;
- } else {
- /*
- * next buffer not corrupted, we are either in
- * overwrite mode or the buffer is not full.
- * It's safe to write in this new subbuffer.
- */
- }
- } else {
- /*
- * Next subbuffer corrupted. Drop event in normal and
- * overwrite mode. Caused by either a writer OOPS or
- * too many nested writes over a reserve/commit pair.
- */
- local_inc(&buf->events_lost);
- return -1;
- }
- offsets->size = ltt_get_header_size(chan, offsets->begin,
- data_size,
- &offsets->before_hdr_pad,
- *rflags);
- offsets->size += ltt_align(offsets->begin + offsets->size,
- largest_align)
- + data_size;
- if (unlikely((SUBBUF_OFFSET(offsets->begin, chan)
- + offsets->size) > chan->a.sb_size)) {
- /*
- * Event too big for subbuffers, report error, don't
- * complete the sub-buffer switch.
- */
- local_inc(&buf->events_lost);
- return -1;
- } else {
- /*
- * We just made a successful buffer switch and the event
- * fits in the new subbuffer. Let's write.
- */
- }
- } else {
- /*
- * Event fits in the current buffer and we are not on a switch
- * boundary. It's safe to write.
- */
- }
- offsets->end = offsets->begin + offsets->size;
-
- if (unlikely(SUBBUF_OFFSET(offsets->end, chan) == 0)) {
- /*
- * The offset_end will fall at the very beginning of the next
- * subbuffer.
- */
- offsets->end_switch_current = 1; /* For offsets->begin */
- }
- return 0;
-}
-
-/**
- * ltt_reserve_slot_lockless_slow - Atomic slot reservation in a buffer.
- * @chan: channel structure
- * @trace: the trace structure to log to.
- * @data_size: size of the variable length data to log.
- * @largest_align: largest alignment required within the data to log.
- * @cpu: cpuid
- * @ret_buf: pointer to the per-cpu buffer used (out)
- * @slot_size: pointer to total size of the slot (out)
- * @buf_offset: pointer to reserved buffer offset (out)
- * @tsc: pointer to the tsc at the slot reservation (out)
- * @rflags: pointer to reservation flags (in/out)
- *
- * Return : -ENOSPC if not enough space, else returns 0.
- * It will take care of sub-buffer switching.
- */
-int ltt_reserve_slot_lockless_slow(struct ltt_chan *chan,
- struct ltt_trace *trace, size_t data_size,
- int largest_align, int cpu,
- struct ltt_chanbuf **ret_buf,
- size_t *slot_size, long *buf_offset,
- u64 *tsc, unsigned int *rflags)
-{
- struct ltt_chanbuf *buf = *ret_buf = per_cpu_ptr(chan->a.buf, cpu);
- struct ltt_reserve_switch_offsets offsets;
-
- offsets.size = 0;
-
- do {
- if (unlikely(ltt_relay_try_reserve_slow(buf, chan, &offsets,
- data_size, tsc, rflags,
- largest_align)))
- return -ENOSPC;
- } while (unlikely(local_cmpxchg(&buf->offset, offsets.old, offsets.end)
- != offsets.old));
-
- /*
- * Atomically update last_tsc. This update races against concurrent
- * atomic updates, but the race will always cause supplementary full TSC
- * events, never the opposite (missing a full TSC event when it would be
- * needed).
- */
- save_last_tsc(buf, *tsc);
-
- /*
- * Push the reader if necessary
- */
- ltt_reserve_push_reader(buf, chan, offsets.end - 1);
-
- /*
- * Clear noref flag for this subbuffer.
- */
- ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.end - 1, chan));
-
- /*
- * Switch old subbuffer if needed.
- */
- if (unlikely(offsets.end_switch_old)) {
- ltt_clear_noref_flag(&buf->a, SUBBUF_INDEX(offsets.old - 1,
- chan));
- ltt_reserve_switch_old_subbuf(buf, chan, &offsets, tsc);
- }
-
- /*
- * Populate new subbuffer.
- */
- if (unlikely(offsets.begin_switch))
- ltt_reserve_switch_new_subbuf(buf, chan, &offsets, tsc);
-
- if (unlikely(offsets.end_switch_current))
- ltt_reserve_end_switch_current(buf, chan, &offsets, tsc);
-
- *slot_size = offsets.size;
- *buf_offset = offsets.begin + offsets.before_hdr_pad;
- return 0;
-}
-EXPORT_SYMBOL_GPL(ltt_reserve_slot_lockless_slow);
-
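-/*
- * Illustrative writer-side pairing for the reservation above. This is a
- * sketch, not this file's API: the commit helper name (ltt_commit_slot)
- * is assumed to live in a companion header.
- *
- *	ret = ltt_reserve_slot_lockless_slow(chan, trace, data_size,
- *					     largest_align, cpu, &buf,
- *					     &slot_size, &buf_offset,
- *					     &tsc, &rflags);
- *	if (likely(!ret)) {
- *		<write event header and payload at buf_offset>
- *		ltt_commit_slot(buf, chan, buf_offset, data_size, slot_size);
- *	}
- */
-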
-static struct ltt_transport ltt_relay_transport = {
- .name = "relay",
- .owner = THIS_MODULE,
- .ops = {
- .create_dirs = ltt_relay_create_dirs,
- .remove_dirs = ltt_relay_remove_dirs,
- .create_channel = ltt_chan_create,
- .finish_channel = ltt_relay_finish_channel,
- .remove_channel = ltt_chan_free,
- .remove_channel_files = ltt_chan_remove_files,
- .wakeup_channel = ltt_relay_async_wakeup_chan,
- .user_blocking = ltt_relay_user_blocking,
- .user_errors = ltt_relay_print_user_errors,
- .start_switch_timer = ltt_chan_start_switch_timer,
- .stop_switch_timer = ltt_chan_stop_switch_timer,
- },
-};
-
-static struct notifier_block fn_ltt_chanbuf_hotcpu_callback = {
- .notifier_call = ltt_chanbuf_hotcpu_callback,
- .priority = 6,
-};
-
-int __init ltt_relay_init(void)
-{
- printk(KERN_INFO "LTT : ltt-relay init\n");
-
- ltt_transport_register(&ltt_relay_transport);
- register_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
- register_idle_notifier(&pm_idle_entry_notifier);
-
- return 0;
-}
-
-void __exit ltt_relay_exit(void)
-{
- printk(KERN_INFO "LTT : ltt-relay exit\n");
-
- unregister_idle_notifier(&pm_idle_entry_notifier);
- unregister_cpu_notifier(&fn_ltt_chanbuf_hotcpu_callback);
- ltt_transport_unregister(&ltt_relay_transport);
-}
-
-MODULE_LICENSE("GPL and additional rights");
-MODULE_AUTHOR("Mathieu Desnoyers");
-MODULE_DESCRIPTION("Linux Trace Toolkit Next Generation Lockless Relay");