/* MALLOCATION */
-#define zmalloc(s) calloc(1, s)
+#include <stdlib.h>
+
+static inline
+void *zmalloc(size_t len)
+{
+ return calloc(1, len);
+}
+
+static inline
+void *malloc_align(size_t len)
+{
+ return malloc(ALIGN(len, CAA_CACHE_LINE_SIZE));
+}
+
+static inline
+void *zmalloc_align(size_t len)
+{
+ return calloc(1, ALIGN(len, CAA_CACHE_LINE_SIZE));
+}
/* MATH */
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
+#ifndef inline_memcpy
+#define inline_memcpy memcpy
+#endif
+
+#ifndef __same_type
+#define __same_type(a, b) __builtin_types_compatible_p(typeof(a), typeof(b))
+#endif
+
#endif /* UST_CORE_H */
+++ /dev/null
-/*
- * lttng_prio_heap.c
- *
- * Priority heap containing pointers. Based on CLRS, chapter 6.
- *
- * Copyright 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <linux/slab.h>
-#include "lttng_prio_heap.h"
-
-#ifdef DEBUG_HEAP
-void lttng_check_heap(const struct lttng_ptr_heap *heap)
-{
- size_t i;
-
- if (!heap->len)
- return;
-
- for (i = 1; i < heap->len; i++)
- WARN_ON_ONCE(!heap->gt(heap->ptrs[i], heap->ptrs[0]));
-}
-#endif
-
-static
-size_t parent(size_t i)
-{
- return (i -1) >> 1;
-}
-
-static
-size_t left(size_t i)
-{
- return (i << 1) + 1;
-}
-
-static
-size_t right(size_t i)
-{
- return (i << 1) + 2;
-}
-
-/*
- * Copy of heap->ptrs pointer is invalid after heap_grow.
- */
-static
-int heap_grow(struct lttng_ptr_heap *heap, size_t new_len)
-{
- void **new_ptrs;
-
- if (heap->alloc_len >= new_len)
- return 0;
-
- heap->alloc_len = max_t(size_t, new_len, heap->alloc_len << 1);
- new_ptrs = kmalloc(heap->alloc_len * sizeof(void *), heap->gfpmask);
- if (!new_ptrs)
- return -ENOMEM;
- if (heap->ptrs)
- memcpy(new_ptrs, heap->ptrs, heap->len * sizeof(void *));
- kfree(heap->ptrs);
- heap->ptrs = new_ptrs;
- return 0;
-}
-
-static
-int heap_set_len(struct lttng_ptr_heap *heap, size_t new_len)
-{
- int ret;
-
- ret = heap_grow(heap, new_len);
- if (ret)
- return ret;
- heap->len = new_len;
- return 0;
-}
-
-int lttng_heap_init(struct lttng_ptr_heap *heap, size_t alloc_len,
- gfp_t gfpmask, int gt(void *a, void *b))
-{
- heap->ptrs = NULL;
- heap->len = 0;
- heap->alloc_len = 0;
- heap->gt = gt;
- /*
- * Minimum size allocated is 1 entry to ensure memory allocation
- * never fails within heap_replace_max.
- */
- return heap_grow(heap, max_t(size_t, 1, alloc_len));
-}
-
-void lttng_heap_free(struct lttng_ptr_heap *heap)
-{
- kfree(heap->ptrs);
-}
-
-static void heapify(struct lttng_ptr_heap *heap, size_t i)
-{
- void **ptrs = heap->ptrs;
- size_t l, r, largest;
-
- for (;;) {
- void *tmp;
-
- l = left(i);
- r = right(i);
- if (l < heap->len && heap->gt(ptrs[l], ptrs[i]))
- largest = l;
- else
- largest = i;
- if (r < heap->len && heap->gt(ptrs[r], ptrs[largest]))
- largest = r;
- if (largest == i)
- break;
- tmp = ptrs[i];
- ptrs[i] = ptrs[largest];
- ptrs[largest] = tmp;
- i = largest;
- }
- lttng_check_heap(heap);
-}
-
-void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p)
-{
- void *res;
-
- if (!heap->len) {
- (void) heap_set_len(heap, 1);
- heap->ptrs[0] = p;
- lttng_check_heap(heap);
- return NULL;
- }
-
- /* Replace the current max and heapify */
- res = heap->ptrs[0];
- heap->ptrs[0] = p;
- heapify(heap, 0);
- return res;
-}
-
-int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p)
-{
- void **ptrs;
- size_t pos;
- int ret;
-
- ret = heap_set_len(heap, heap->len + 1);
- if (ret)
- return ret;
- ptrs = heap->ptrs;
- pos = heap->len - 1;
- while (pos > 0 && heap->gt(p, ptrs[parent(pos)])) {
- /* Move parent down until we find the right spot */
- ptrs[pos] = ptrs[parent(pos)];
- pos = parent(pos);
- }
- ptrs[pos] = p;
- lttng_check_heap(heap);
- return 0;
-}
-
-void *lttng_heap_remove(struct lttng_ptr_heap *heap)
-{
- switch (heap->len) {
- case 0:
- return NULL;
- case 1:
- (void) heap_set_len(heap, 0);
- return heap->ptrs[0];
- }
- /* Shrink, replace the current max by previous last entry and heapify */
- heap_set_len(heap, heap->len - 1);
- /* len changed. previous last entry is at heap->len */
- return lttng_heap_replace_max(heap, heap->ptrs[heap->len]);
-}
-
-void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p)
-{
- size_t pos, len = heap->len;
-
- for (pos = 0; pos < len; pos++)
- if (heap->ptrs[pos] == p)
- goto found;
- return NULL;
-found:
- if (heap->len == 1) {
- (void) heap_set_len(heap, 0);
- lttng_check_heap(heap);
- return heap->ptrs[0];
- }
- /* Replace p with previous last entry and heapify. */
- heap_set_len(heap, heap->len - 1);
- /* len changed. previous last entry is at heap->len */
- heap->ptrs[pos] = heap->ptrs[heap->len];
- heapify(heap, pos);
- return p;
-}
+++ /dev/null
-#ifndef _LTTNG_PRIO_HEAP_H
-#define _LTTNG_PRIO_HEAP_H
-
-/*
- * lttng_prio_heap.h
- *
- * Priority heap containing pointers. Based on CLRS, chapter 6.
- *
- * Copyright 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <linux/gfp.h>
-
-struct lttng_ptr_heap {
- size_t len, alloc_len;
- void **ptrs;
- int (*gt)(void *a, void *b);
- gfp_t gfpmask;
-};
-
-#ifdef DEBUG_HEAP
-void lttng_check_heap(const struct lttng_ptr_heap *heap);
-#else
-static inline
-void lttng_check_heap(const struct lttng_ptr_heap *heap)
-{
-}
-#endif
-
-/**
- * lttng_heap_maximum - return the largest element in the heap
- * @heap: the heap to be operated on
- *
- * Returns the largest element in the heap, without performing any modification
- * to the heap structure. Returns NULL if the heap is empty.
- */
-static inline void *lttng_heap_maximum(const struct lttng_ptr_heap *heap)
-{
- lttng_check_heap(heap);
- return heap->len ? heap->ptrs[0] : NULL;
-}
-
-/**
- * lttng_heap_init - initialize the heap
- * @heap: the heap to initialize
- * @alloc_len: number of elements initially allocated
- * @gfp: allocation flags
- * @gt: function to compare the elements
- *
- * Returns -ENOMEM if out of memory.
- */
-extern int lttng_heap_init(struct lttng_ptr_heap *heap,
- size_t alloc_len, gfp_t gfpmask,
- int gt(void *a, void *b));
-
-/**
- * lttng_heap_free - free the heap
- * @heap: the heap to free
- */
-extern void lttng_heap_free(struct lttng_ptr_heap *heap);
-
-/**
- * lttng_heap_insert - insert an element into the heap
- * @heap: the heap to be operated on
- * @p: the element to add
- *
- * Insert an element into the heap.
- *
- * Returns -ENOMEM if out of memory.
- */
-extern int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p);
-
-/**
- * lttng_heap_remove - remove the largest element from the heap
- * @heap: the heap to be operated on
- *
- * Returns the largest element in the heap. It removes this element from the
- * heap. Returns NULL if the heap is empty.
- */
-extern void *lttng_heap_remove(struct lttng_ptr_heap *heap);
-
-/**
- * lttng_heap_cherrypick - remove a given element from the heap
- * @heap: the heap to be operated on
- * @p: the element
- *
- * Remove the given element from the heap. Return the element if present, else
- * return NULL. This algorithm has a complexity of O(n), which is higher than
- * O(log(n)) provided by the rest of this API.
- */
-extern void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p);
-
-/**
- * lttng_heap_replace_max - replace the the largest element from the heap
- * @heap: the heap to be operated on
- * @p: the pointer to be inserted as topmost element replacement
- *
- * Returns the largest element in the heap. It removes this element from the
- * heap. The heap is rebalanced only once after the insertion. Returns NULL if
- * the heap is empty.
- *
- * This is the equivalent of calling heap_remove() and then heap_insert(), but
- * it only rebalances the heap once. It never allocates memory.
- */
-extern void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p);
-
-#endif /* _LTTNG_PRIO_HEAP_H */
libringbuffer_la_SOURCES = \
ring_buffer_backend.c \
ring_buffer_frontend.c \
- ring_buffer_iterator.c \
- ring_buffer_vfs.c \
- ring_buffer_splice.c \
- ring_buffer_mmap.c \
- ../libprio_heap/lttng_prio_heap.c
+ ring_buffer_abi.c
libringbuffer_la_LDFLAGS = -no-undefined -version-info 0:0:0
* the reader in flight recorder mode.
*/
+#include <unistd.h>
+
+#include "ust/core.h"
+
/* Internal helpers */
#include "backend_internal.h"
#include "frontend_internal.h"
extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
size_t offset, void *dest, size_t len);
-extern int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
- size_t offset, void __user *dest,
- size_t len);
-
extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
size_t offset, void *dest, size_t len);
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
id = bufb->buf_wsb[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
return records_unread;
}
-ssize_t lib_ring_buffer_file_splice_read(struct file *in, loff_t *ppos,
- struct pipe_inode_info *pipe,
- size_t len, unsigned int flags);
-loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin);
-
#endif /* _LINUX_RING_BUFFER_BACKEND_H */
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <unistd.h>
+#include <urcu/compiler.h>
+
#include "config.h"
#include "backend_types.h"
#include "frontend_types.h"
* sampling and subbuffer ID exchange).
*/
-#define HALF_ULONG_BITS (BITS_PER_LONG >> 1)
+#define HALF_ULONG_BITS (CAA_BITS_PER_LONG >> 1)
#define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1)
#define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT)
tmp |= offset << SB_ID_OFFSET_SHIFT;
tmp |= SB_ID_NOREF_MASK;
/* Volatile store, read concurrently by readers. */
- ACCESS_ONCE(*id) = tmp;
+ CMM_ACCESS_ONCE(*id) = tmp;
}
}
* Performing a volatile access to read the sb_pages, because we want to
* read a coherent version of the pointer and the associated noref flag.
*/
- id = ACCESS_ONCE(bufb->buf_wsb[idx].id);
+ id = CMM_ACCESS_ONCE(bufb->buf_wsb[idx].id);
for (;;) {
/* This check is called on the fast path for each record. */
if (likely(!subbuffer_id_is_noref(config, id))) {
}
new_id = id;
subbuffer_id_clear_noref(config, &new_id);
- new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
+ new_id = uatomic_cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
if (likely(new_id == id))
break;
id = new_id;
* Memory barrier that ensures counter stores are ordered before set
* noref and offset.
*/
- smp_mb();
+ cmm_smp_mb();
subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
}
if (config->mode == RING_BUFFER_OVERWRITE) {
/*
* Exchange the target writer subbuffer with our own unused
- * subbuffer. No need to use ACCESS_ONCE() here to read the
+ * subbuffer. No need to use CMM_ACCESS_ONCE() here to read the
* old_wpage, because the value read will be confirmed by the
* following cmpxchg().
*/
!subbuffer_id_is_noref(config, bufb->buf_rsb.id));
subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
consumed_count);
- new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
+ new_id = uatomic_cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
bufb->buf_rsb.id);
if (unlikely(old_id != new_id))
return -EAGAIN;
unsigned long num_subbuf; /* Number of sub-buffers for writer */
u64 start_tsc; /* Channel creation TSC value */
void *priv; /* Client-specific information */
- struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
- cpumask_var_t cpumask; /* Allocated per-cpu buffers cpumask */
char name[NAME_MAX]; /* Channel name */
};
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <urcu/compiler.h>
+#include <urcu/uatomic.h>
+
/* Internal helpers */
#include "frontend_internal.h"
#define for_each_channel_cpu(cpu, chan) \
for ((cpu) = -1; \
({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask); \
- smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
+ cmm_smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
extern struct lib_ring_buffer *channel_get_ring_buffer(
const struct lib_ring_buffer_config *config,
unsigned long lib_ring_buffer_get_consumed(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
- return atomic_long_read(&buf->consumed);
+ return uatomic_read(&buf->consumed);
}
/*
int lib_ring_buffer_is_finalized(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf)
{
- int finalized = ACCESS_ONCE(buf->finalized);
+ int finalized = CMM_ACCESS_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
- smp_rmb();
+ cmm_smp_rmb();
return finalized;
}
static inline
int lib_ring_buffer_channel_is_disabled(const struct channel *chan)
{
- return atomic_read(&chan->record_disabled);
+ return uatomic_read(&chan->record_disabled);
}
static inline
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <urcu/compiler.h>
+
#include "config.h"
#include "backend_types.h"
#include "frontend_types.h"
-#include "../lib_prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
/* Buffer offset macros */
* last_tsc atomically.
*/
-#if (BITS_PER_LONG == 32)
+#if (CAA_BITS_PER_LONG == 32)
static inline
void save_last_tsc(const struct lib_ring_buffer_config *config,
struct lib_ring_buffer *buf, u64 tsc)
unsigned long consumed_old, consumed_new;
do {
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
/*
* If buffer is in overwrite mode, push the reader consumed
* count if the write position has reached it and we are not
consumed_new = subbuf_align(consumed_old, chan);
else
return;
- } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+ } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
consumed_new) != consumed_old));
}
{
unsigned long consumed_old, consumed_idx, commit_count, write_offset;
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
consumed_idx = subbuf_index(consumed_old, chan);
commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
/*
* respect to writers coming into the subbuffer after
* wrap around, and also order wrt concurrent readers.
*/
- smp_mb();
+ cmm_smp_mb();
/* End of exclusive subbuffer access */
v_set(config, &buf->commit_cold[idx].cc_sb,
commit_count);
* RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
*/
if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
- && atomic_long_read(&buf->active_readers)
+ && uatomic_read(&buf->active_readers)
&& lib_ring_buffer_poll_deliver(config, buf, chan)) {
- wake_up_interruptible(&buf->read_wait);
- wake_up_interruptible(&chan->read_wait);
+ //wake_up_interruptible(&buf->read_wait);
+ //wake_up_interruptible(&chan->read_wait);
}
}
extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
/* Keep track of trap nesting inside ring buffer code */
-DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
+extern __thread unsigned int lib_ring_buffer_nesting;
#endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <urcu/list.h>
+#include <urcu/uatomic.h>
+#include <urcu/ref.h>
+
+#include "ust/core.h"
+
+#include "usterr_signal_safe.h"
#include "config.h"
#include "backend_types.h"
-#include "../lib_prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
/*
* A switch is done during tracing or as a final flush after tracing (so it
*/
enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH };
-/* channel-level read-side iterator */
-struct channel_iter {
- /* Prio heap of buffers. Lowest timestamps at the top. */
- struct lttng_ptr_heap heap; /* Heap of struct lib_ring_buffer ptrs */
- struct list_head empty_head; /* Empty buffers linked-list head */
- int read_open; /* Opened for reading ? */
- u64 last_qs; /* Last quiescent state timestamp */
- u64 last_timestamp; /* Last timestamp (for WARN_ON) */
- int last_cpu; /* Last timestamp cpu */
- /*
- * read() file operation state.
- */
- unsigned long len_left;
-};
-
/* channel: collection of per-cpu ring buffers. */
struct channel {
- atomic_t record_disabled;
+ int record_disabled;
unsigned long commit_count_mask; /*
* Commit count mask, removing
* the MSBs corresponding to
unsigned long switch_timer_interval; /* Buffer flush (jiffies) */
unsigned long read_timer_interval; /* Reader wakeup (jiffies) */
- struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
- struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */
- struct notifier_block hp_iter_notifier; /* hotplug iterator notifier */
- int cpu_hp_enable:1; /* Enable CPU hotplug notif. */
- int hp_iter_enable:1; /* Enable hp iter notif. */
- wait_queue_head_t read_wait; /* reader wait queue */
- wait_queue_head_t hp_wait; /* CPU hotplug wait queue */
+ //wait_queue_head_t read_wait; /* reader wait queue */
int finalized; /* Has channel been finalized */
- struct channel_iter iter; /* Channel read-side iterator */
- struct kref ref; /* Reference count */
+ struct urcu_ref ref; /* Reference count */
};
/* Per-subbuffer commit counters used on the hot path */
union v_atomic cc_sb; /* Incremented _once_ at sb switch */
};
-/* Per-buffer read iterator */
-struct lib_ring_buffer_iter {
- u64 timestamp; /* Current record timestamp */
- size_t header_len; /* Current record header length */
- size_t payload_len; /* Current record payload length */
-
- struct list_head empty_node; /* Linked list of empty buffers */
- unsigned long consumed, read_offset, data_size;
- enum {
- ITER_GET_SUBBUF = 0,
- ITER_TEST_RECORD,
- ITER_NEXT_RECORD,
- ITER_PUT_SUBBUF,
- } state;
- int allocated:1;
- int read_open:1; /* Opened for reading ? */
-};
-
/* ring buffer state */
struct lib_ring_buffer {
/* First 32 bytes cache-hot cacheline */
union v_atomic offset; /* Current offset in the buffer */
struct commit_counters_hot *commit_hot;
/* Commit count per sub-buffer */
- atomic_long_t consumed; /*
+ long consumed; /*
* Current offset in the buffer
* standard atomic access (shared)
*/
- atomic_t record_disabled;
+ int record_disabled;
/* End of first 32 bytes cacheline */
union v_atomic last_tsc; /*
* Last timestamp written in the buffer.
struct commit_counters_cold *commit_cold;
/* Commit count per sub-buffer */
- atomic_long_t active_readers; /*
+ long active_readers; /*
* Active readers count
* standard atomic access (shared)
*/
union v_atomic records_lost_big; /* Events too big */
union v_atomic records_count; /* Number of records written */
union v_atomic records_overrun; /* Number of overwritten records */
- wait_queue_head_t read_wait; /* reader buffer-level wait queue */
+ //wait_queue_head_t read_wait; /* reader buffer-level wait queue */
int finalized; /* buffer has been finalized */
- struct timer_list switch_timer; /* timer for periodical switch */
- struct timer_list read_timer; /* timer for read poll */
- raw_spinlock_t raw_tick_nohz_spinlock; /* nohz entry lock/trylock */
- struct lib_ring_buffer_iter iter; /* read-side iterator */
+ //struct timer_list switch_timer; /* timer for periodical switch */
+ //struct timer_list read_timer; /* timer for read poll */
unsigned long get_subbuf_consumed; /* Read-side consumed */
unsigned long prod_snapshot; /* Producer count snapshot */
unsigned long cons_snapshot; /* Consumer count snapshot */
int _____ret = unlikely(cond); \
if (_____ret) { \
if (__same_type(*(c), struct channel_backend)) \
- __chan = container_of((void *) (c), \
+ __chan = caa_container_of((void *) (c), \
struct channel, \
backend); \
else if (__same_type(*(c), struct channel)) \
__chan = (void *) (c); \
else \
BUG_ON(1); \
- atomic_inc(&__chan->record_disabled); \
+ uatomic_inc(&__chan->record_disabled); \
WARN_ON(1); \
} \
_____ret; \
--- /dev/null
+/*
+ * ring_buffer_vfs.c
+ *
+ * Copyright (C) 2009-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Ring Buffer VFS file operations.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "backend.h"
+#include "frontend.h"
+#include "vfs.h"
+
+static int put_ulong(unsigned long val, unsigned long arg)
+{
+ return put_user(val, (unsigned long __user *)arg);
+}
+
+#ifdef CONFIG_COMPAT
+static int compat_put_ulong(compat_ulong_t val, unsigned long arg)
+{
+ return put_user(val, (compat_ulong_t __user *)compat_ptr(arg));
+}
+#endif
+
+/**
+ * lib_ring_buffer_open - ring buffer open file operation
+ * @inode: opened inode
+ * @file: opened file
+ *
+ * Open implementation. Makes sure only one open instance of a buffer is
+ * done at a given moment.
+ */
+int lib_ring_buffer_open(struct inode *inode, struct file *file)
+{
+ struct lib_ring_buffer *buf = inode->i_private;
+ int ret;
+
+ ret = lib_ring_buffer_open_read(buf);
+ if (ret)
+ return ret;
+
+ file->private_data = buf;
+ ret = nonseekable_open(inode, file);
+ if (ret)
+ goto release_read;
+ return 0;
+
+release_read:
+ lib_ring_buffer_release_read(buf);
+ return ret;
+}
+
+/**
+ * lib_ring_buffer_release - ring buffer release file operation
+ * @inode: opened inode
+ * @file: opened file
+ *
+ * Release implementation.
+ */
+int lib_ring_buffer_release(struct inode *inode, struct file *file)
+{
+ struct lib_ring_buffer *buf = file->private_data;
+
+ lib_ring_buffer_release_read(buf);
+
+ return 0;
+}
+
+/**
+ * lib_ring_buffer_poll - ring buffer poll file operation
+ * @filp: the file
+ * @wait: poll table
+ *
+ * Poll implementation.
+ */
+unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait)
+{
+ unsigned int mask = 0;
+ struct lib_ring_buffer *buf = filp->private_data;
+ struct channel *chan = buf->backend.chan;
+ const struct lib_ring_buffer_config *config = chan->backend.config;
+ int finalized, disabled;
+
+ if (filp->f_mode & FMODE_READ) {
+ poll_wait_set_exclusive(wait);
+ poll_wait(filp, &buf->read_wait, wait);
+
+ finalized = lib_ring_buffer_is_finalized(config, buf);
+ disabled = lib_ring_buffer_channel_is_disabled(chan);
+
+ /*
+ * lib_ring_buffer_is_finalized() contains a smp_rmb() ordering
+ * finalized load before offsets loads.
+ */
+ WARN_ON(atomic_long_read(&buf->active_readers) != 1);
+retry:
+ if (disabled)
+ return POLLERR;
+
+ if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf), chan)
+ - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf), chan)
+ == 0) {
+ if (finalized)
+ return POLLHUP;
+ else {
+ /*
+ * The memory barriers
+ * __wait_event()/wake_up_interruptible() take
+ * care of "raw_spin_is_locked" memory ordering.
+ */
+ if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
+ goto retry;
+ else
+ return 0;
+ }
+ } else {
+ if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf),
+ chan)
+ - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf),
+ chan)
+ >= chan->backend.buf_size)
+ return POLLPRI | POLLRDBAND;
+ else
+ return POLLIN | POLLRDNORM;
+ }
+ }
+ return mask;
+}
+
+/**
+ * lib_ring_buffer_ioctl - control ring buffer reader synchronization
+ *
+ * @filp: the file
+ * @cmd: the command
+ * @arg: command arg
+ *
+ * This ioctl implements commands necessary for producer/consumer
+ * and flight recorder reader interaction :
+ * RING_BUFFER_GET_NEXT_SUBBUF
+ * Get the next sub-buffer that can be read. It never blocks.
+ * RING_BUFFER_PUT_NEXT_SUBBUF
+ * Release the currently read sub-buffer.
+ * RING_BUFFER_GET_SUBBUF_SIZE
+ * returns the size of the current sub-buffer.
+ * RING_BUFFER_GET_MAX_SUBBUF_SIZE
+ * returns the maximum size for sub-buffers.
+ * RING_BUFFER_GET_NUM_SUBBUF
+ * returns the number of reader-visible sub-buffers in the per cpu
+ * channel (for mmap).
+ * RING_BUFFER_GET_MMAP_READ_OFFSET
+ * returns the offset of the subbuffer belonging to the reader.
+ * Should only be used for mmap clients.
+ */
+long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
+{
+ struct lib_ring_buffer *buf = filp->private_data;
+ struct channel *chan = buf->backend.chan;
+ const struct lib_ring_buffer_config *config = chan->backend.config;
+
+ if (lib_ring_buffer_channel_is_disabled(chan))
+ return -EIO;
+
+ switch (cmd) {
+ case RING_BUFFER_SNAPSHOT:
+ return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+ &buf->prod_snapshot);
+ case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+ return put_ulong(buf->cons_snapshot, arg);
+ case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+ return put_ulong(buf->prod_snapshot, arg);
+ case RING_BUFFER_GET_SUBBUF:
+ {
+ unsigned long uconsume;
+ long ret;
+
+ ret = get_user(uconsume, (unsigned long __user *) arg);
+ if (ret)
+ return ret; /* will return -EFAULT */
+ ret = lib_ring_buffer_get_subbuf(buf, uconsume);
+ if (!ret) {
+ /* Set file position to zero at each successful "get" */
+ filp->f_pos = 0;
+ }
+ return ret;
+ }
+ case RING_BUFFER_PUT_SUBBUF:
+ lib_ring_buffer_put_subbuf(buf);
+ return 0;
+
+ case RING_BUFFER_GET_NEXT_SUBBUF:
+ {
+ long ret;
+
+ ret = lib_ring_buffer_get_next_subbuf(buf);
+ if (!ret) {
+ /* Set file position to zero at each successful "get" */
+ filp->f_pos = 0;
+ }
+ return ret;
+ }
+ case RING_BUFFER_PUT_NEXT_SUBBUF:
+ lib_ring_buffer_put_next_subbuf(buf);
+ return 0;
+ case RING_BUFFER_GET_SUBBUF_SIZE:
+ return put_ulong(lib_ring_buffer_get_read_data_size(config, buf),
+ arg);
+ case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+ {
+ unsigned long size;
+
+ size = lib_ring_buffer_get_read_data_size(config, buf);
+ size = PAGE_ALIGN(size);
+ return put_ulong(size, arg);
+ }
+ case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+ return put_ulong(chan->backend.subbuf_size, arg);
+ case RING_BUFFER_GET_MMAP_LEN:
+ {
+ unsigned long mmap_buf_len;
+
+ if (config->output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ mmap_buf_len = chan->backend.buf_size;
+ if (chan->backend.extra_reader_sb)
+ mmap_buf_len += chan->backend.subbuf_size;
+ if (mmap_buf_len > INT_MAX)
+ return -EFBIG;
+ return put_ulong(mmap_buf_len, arg);
+ }
+ case RING_BUFFER_GET_MMAP_READ_OFFSET:
+ {
+ unsigned long sb_bindex;
+
+ if (config->output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ sb_bindex = subbuffer_id_get_index(config,
+ buf->backend.buf_rsb.id);
+ return put_ulong(buf->backend.array[sb_bindex]->mmap_offset,
+ arg);
+ }
+ case RING_BUFFER_FLUSH:
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ return 0;
+ default:
+ return -ENOIOCTLCMD;
+ }
+}
+
+#ifdef CONFIG_COMPAT
+long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
+ unsigned long arg)
+{
+ struct lib_ring_buffer *buf = filp->private_data;
+ struct channel *chan = buf->backend.chan;
+ const struct lib_ring_buffer_config *config = chan->backend.config;
+
+ if (lib_ring_buffer_channel_is_disabled(chan))
+ return -EIO;
+
+ switch (cmd) {
+ case RING_BUFFER_SNAPSHOT:
+ return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+ &buf->prod_snapshot);
+ case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+ return compat_put_ulong(buf->cons_snapshot, arg);
+ case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+ return compat_put_ulong(buf->prod_snapshot, arg);
+ case RING_BUFFER_GET_SUBBUF:
+ {
+ __u32 uconsume;
+ unsigned long consume;
+ long ret;
+
+ ret = get_user(uconsume, (__u32 __user *) arg);
+ if (ret)
+ return ret; /* will return -EFAULT */
+ consume = buf->cons_snapshot;
+ consume &= ~0xFFFFFFFFL;
+ consume |= uconsume;
+ ret = lib_ring_buffer_get_subbuf(buf, consume);
+ if (!ret) {
+ /* Set file position to zero at each successful "get" */
+ filp->f_pos = 0;
+ }
+ return ret;
+ }
+ case RING_BUFFER_PUT_SUBBUF:
+ lib_ring_buffer_put_subbuf(buf);
+ return 0;
+
+ case RING_BUFFER_GET_NEXT_SUBBUF:
+ {
+ long ret;
+
+ ret = lib_ring_buffer_get_next_subbuf(buf);
+ if (!ret) {
+ /* Set file position to zero at each successful "get" */
+ filp->f_pos = 0;
+ }
+ return ret;
+ }
+ case RING_BUFFER_PUT_NEXT_SUBBUF:
+ lib_ring_buffer_put_next_subbuf(buf);
+ return 0;
+ case RING_BUFFER_GET_SUBBUF_SIZE:
+ {
+ unsigned long data_size;
+
+ data_size = lib_ring_buffer_get_read_data_size(config, buf);
+ if (data_size > UINT_MAX)
+ return -EFBIG;
+ return put_ulong(data_size, arg);
+ }
+ case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+ {
+ unsigned long size;
+
+ size = lib_ring_buffer_get_read_data_size(config, buf);
+ size = PAGE_ALIGN(size);
+ if (size > UINT_MAX)
+ return -EFBIG;
+ return put_ulong(size, arg);
+ }
+ case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+ if (chan->backend.subbuf_size > UINT_MAX)
+ return -EFBIG;
+ return put_ulong(chan->backend.subbuf_size, arg);
+ case RING_BUFFER_GET_MMAP_LEN:
+ {
+ unsigned long mmap_buf_len;
+
+ if (config->output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ mmap_buf_len = chan->backend.buf_size;
+ if (chan->backend.extra_reader_sb)
+ mmap_buf_len += chan->backend.subbuf_size;
+ if (mmap_buf_len > UINT_MAX)
+ return -EFBIG;
+ return put_ulong(mmap_buf_len, arg);
+ }
+ case RING_BUFFER_GET_MMAP_READ_OFFSET:
+ {
+ unsigned long sb_bindex, read_offset;
+
+ if (config->output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ sb_bindex = subbuffer_id_get_index(config,
+ buf->backend.buf_rsb.id);
+ read_offset = buf->backend.array[sb_bindex]->mmap_offset;
+ if (read_offset > UINT_MAX)
+ return -EINVAL;
+ return put_ulong(read_offset, arg);
+ }
+ case RING_BUFFER_FLUSH:
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ return 0;
+ default:
+ return -ENOIOCTLCMD;
+ }
+}
+#endif
+
+const struct file_operations lib_ring_buffer_file_operations = {
+ .open = lib_ring_buffer_open,
+ .release = lib_ring_buffer_release,
+ .poll = lib_ring_buffer_poll,
+ .splice_read = lib_ring_buffer_splice_read,
+ .mmap = lib_ring_buffer_mmap,
+ .unlocked_ioctl = lib_ring_buffer_ioctl,
+ .llseek = lib_ring_buffer_no_llseek,
+#ifdef CONFIG_COMPAT
+ .compat_ioctl = lib_ring_buffer_compat_ioctl,
+#endif
+};
+EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library VFS");
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <urcu/arch.h>
+
+#include "ust/core.h"
+
#include "config.h"
#include "backend.h"
#include "frontend.h"
void **virt;
unsigned long i;
- num_pages = size >> PAGE_SHIFT;
+ num_pages = size >> get_count_order(PAGE_SIZE);
num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
subbuf_size = chanb->subbuf_size;
num_subbuf_alloc = num_subbuf;
num_subbuf_alloc++;
}
- pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ pages = malloc_align(sizeof(*pages) * num_pages);
if (unlikely(!pages))
goto pages_error;
- virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ virt = malloc_align(sizeof(*virt) * num_pages);
if (unlikely(!virt))
goto virt_error;
- bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
- * num_subbuf_alloc,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ bufb->array = malloc_align(sizeof(*bufb->array) * num_subbuf_alloc);
if (unlikely(!bufb->array))
goto array_error;
/* Allocate backend pages array elements */
for (i = 0; i < num_subbuf_alloc; i++) {
bufb->array[i] =
- kzalloc_node(ALIGN(
+ zmalloc_align(
sizeof(struct lib_ring_buffer_backend_pages) +
sizeof(struct lib_ring_buffer_backend_page)
- * num_pages_per_subbuf,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ * num_pages_per_subbuf);
if (!bufb->array[i])
goto free_array;
}
/* Allocate write-side subbuffer table */
- bufb->buf_wsb = kzalloc_node(ALIGN(
+ bufb->buf_wsb = zmalloc_align(
sizeof(struct lib_ring_buffer_backend_subbuffer)
- * num_subbuf,
- 1 << INTERNODE_CACHE_SHIFT),
- GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+ * num_subbuf);
if (unlikely(!bufb->buf_wsb))
goto free_array;
}
}
- /*
- * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
- * will not fault.
- */
- wrapper_vmalloc_sync_all();
kfree(virt);
kfree(pages);
return 0;
{
const struct lib_ring_buffer_config *config = chanb->config;
- bufb->chan = container_of(chanb, struct channel, backend);
+ bufb->chan = caa_container_of(chanb, struct channel, backend);
bufb->cpu = cpu;
return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
*/
void channel_backend_reset(struct channel_backend *chanb)
{
- struct channel *chan = container_of(chanb, struct channel, backend);
+ struct channel *chan = caa_container_of(chanb, struct channel, backend);
const struct lib_ring_buffer_config *config = chanb->config;
/*
void *hcpu)
{
unsigned int cpu = (unsigned long)hcpu;
- struct channel_backend *chanb = container_of(nb, struct channel_backend,
+ struct channel_backend *chanb = caa_container_of(nb, struct channel_backend,
cpu_hp_notifier);
const struct lib_ring_buffer_config *config = chanb->config;
struct lib_ring_buffer *buf;
const struct lib_ring_buffer_config *config,
void *priv, size_t subbuf_size, size_t num_subbuf)
{
- struct channel *chan = container_of(chanb, struct channel, backend);
+ struct channel *chan = caa_container_of(chanb, struct channel, backend);
unsigned int i;
int ret;
src += pagecpy;
offset += pagecpy;
sbidx = offset >> chanb->subbuf_size_order;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
/*
* Underlying layer should never ask for writes across
orig_len = len;
offset &= chanb->buf_size - 1;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
if (unlikely(!len))
return 0;
for (;;) {
break;
dest += pagecpy;
offset += pagecpy;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
/*
* Underlying layer should never ask for reads across
* subbuffers.
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
-/**
- * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
- * @bufb : buffer backend
- * @offset : offset within the buffer
- * @dest : destination userspace address
- * @len : length to copy to destination
- *
- * Should be protected by get_subbuf/put_subbuf.
- * access_ok() must have been performed on dest addresses prior to call this
- * function.
- * Returns -EFAULT on error, 0 if ok.
- */
-int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
- size_t offset, void __user *dest, size_t len)
-{
- struct channel_backend *chanb = &bufb->chan->backend;
- const struct lib_ring_buffer_config *config = chanb->config;
- size_t index;
- ssize_t pagecpy, orig_len;
- struct lib_ring_buffer_backend_pages *rpages;
- unsigned long sb_bindex, id;
-
- orig_len = len;
- offset &= chanb->buf_size - 1;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
- if (unlikely(!len))
- return 0;
- for (;;) {
- pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
- id = bufb->buf_rsb.id;
- sb_bindex = subbuffer_id_get_index(config, id);
- rpages = bufb->array[sb_bindex];
- CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
- && subbuffer_id_is_noref(config, id));
- if (__copy_to_user(dest,
- rpages->p[index].virt + (offset & ~PAGE_MASK),
- pagecpy))
- return -EFAULT;
- len -= pagecpy;
- if (likely(!len))
- break;
- dest += pagecpy;
- offset += pagecpy;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
- /*
- * Underlying layer should never ask for reads across
- * subbuffers.
- */
- CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
- }
- return 0;
-}
-EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
-
/**
* lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
* @bufb : buffer backend
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
orig_offset = offset;
for (;;) {
id = bufb->buf_rsb.id;
len -= pagecpy;
}
offset += strpagelen;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
if (strpagelen < pagelen)
break;
/*
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
unsigned long sb_bindex, id;
offset &= chanb->buf_size - 1;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
id = bufb->buf_rsb.id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
offset &= chanb->buf_size - 1;
sbidx = offset >> chanb->subbuf_size_order;
- index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+ index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
id = bufb->buf_wsb[sbidx].id;
sb_bindex = subbuffer_id_get_index(config, id);
rpages = bufb->array[sb_bindex];
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <urcu/compiler.h>
+
#include "config.h"
#include "backend.h"
#include "frontend.h"
struct channel_backend *chanb, int cpu)
{
const struct lib_ring_buffer_config *config = chanb->config;
- struct channel *chan = container_of(chanb, struct channel, backend);
+ struct channel *chan = caa_container_of(chanb, struct channel, backend);
void *priv = chanb->priv;
unsigned int num_subbuf;
size_t subbuf_header_size;
void *hcpu)
{
unsigned int cpu = (unsigned long)hcpu;
- struct channel *chan = container_of(nb, struct channel,
+ struct channel *chan = caa_container_of(nb, struct channel,
cpu_hp_notifier);
struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
const struct lib_ring_buffer_config *config = chan->backend.config;
unsigned long val,
void *data)
{
- struct channel *chan = container_of(nb, struct channel,
+ struct channel *chan = caa_container_of(nb, struct channel,
tick_nohz_notifier);
const struct lib_ring_buffer_config *config = chan->backend.config;
struct lib_ring_buffer *buf;
static
void channel_release(struct kref *kref)
{
- struct channel *chan = container_of(kref, struct channel, ref);
+ struct channel *chan = caa_container_of(kref, struct channel, ref);
channel_free(chan);
}
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ CMM_ACCESS_ONCE(buf->finalized) = 1;
wake_up_interruptible(&buf->read_wait);
}
} else {
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ CMM_ACCESS_ONCE(buf->finalized) = 1;
wake_up_interruptible(&buf->read_wait);
}
- ACCESS_ONCE(chan->finalized) = 1;
+ CMM_ACCESS_ONCE(chan->finalized) = 1;
wake_up_interruptible(&chan->hp_wait);
wake_up_interruptible(&chan->read_wait);
kref_put(&chan->ref, channel_release);
int finalized;
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = CMM_ACCESS_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
int finalized;
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = CMM_ACCESS_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
+++ /dev/null
-/*
- * ring_buffer_iterator.c
- *
- * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Ring buffer and channel iterators. Get each event of a channel in order. Uses
- * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
- * complexity for the "get next event" operation.
- *
- * Author:
- * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include "iterator.h"
-
-/*
- * Safety factor taking into account internal kernel interrupt latency.
- * Assuming 250ms worse-case latency.
- */
-#define MAX_SYSTEM_LATENCY 250
-
-/*
- * Maximum delta expected between trace clocks. At most 1 jiffy delta.
- */
-#define MAX_CLOCK_DELTA (jiffies_to_usecs(1) * 1000)
-
-/**
- * lib_ring_buffer_get_next_record - Get the next record in a buffer.
- * @chan: channel
- * @buf: buffer
- *
- * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
- * buffer is empty and finalized. The buffer must already be opened for reading.
- */
-ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
- struct lib_ring_buffer *buf)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer_iter *iter = &buf->iter;
- int ret;
-
-restart:
- switch (iter->state) {
- case ITER_GET_SUBBUF:
- ret = lib_ring_buffer_get_next_subbuf(buf);
- if (ret && !ACCESS_ONCE(buf->finalized)
- && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
- /*
- * Use "pull" scheme for global buffers. The reader
- * itself flushes the buffer to "pull" data not visible
- * to readers yet. Flush current subbuffer and re-try.
- *
- * Per-CPU buffers rather use a "push" scheme because
- * the IPI needed to flush all CPU's buffers is too
- * costly. In the "push" scheme, the reader waits for
- * the writer periodic deferrable timer to flush the
- * buffers (keeping track of a quiescent state
- * timestamp). Therefore, the writer "pushes" data out
- * of the buffers rather than letting the reader "pull"
- * data from the buffer.
- */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
- ret = lib_ring_buffer_get_next_subbuf(buf);
- }
- if (ret)
- return ret;
- iter->consumed = buf->cons_snapshot;
- iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
- iter->read_offset = iter->consumed;
- /* skip header */
- iter->read_offset += config->cb.subbuffer_header_size();
- iter->state = ITER_TEST_RECORD;
- goto restart;
- case ITER_TEST_RECORD:
- if (iter->read_offset - iter->consumed >= iter->data_size) {
- iter->state = ITER_PUT_SUBBUF;
- } else {
- CHAN_WARN_ON(chan, !config->cb.record_get);
- config->cb.record_get(config, chan, buf,
- iter->read_offset,
- &iter->header_len,
- &iter->payload_len,
- &iter->timestamp);
- iter->read_offset += iter->header_len;
- subbuffer_consume_record(config, &buf->backend);
- iter->state = ITER_NEXT_RECORD;
- return iter->payload_len;
- }
- goto restart;
- case ITER_NEXT_RECORD:
- iter->read_offset += iter->payload_len;
- iter->state = ITER_TEST_RECORD;
- goto restart;
- case ITER_PUT_SUBBUF:
- lib_ring_buffer_put_next_subbuf(buf);
- iter->state = ITER_GET_SUBBUF;
- goto restart;
- default:
- CHAN_WARN_ON(chan, 1); /* Should not happen */
- return -EPERM;
- }
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
-
-static int buf_is_higher(void *a, void *b)
-{
- struct lib_ring_buffer *bufa = a;
- struct lib_ring_buffer *bufb = b;
-
- /* Consider lowest timestamps to be at the top of the heap */
- return (bufa->iter.timestamp < bufb->iter.timestamp);
-}
-
-static
-void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
- struct channel *chan)
-{
- struct lttng_ptr_heap *heap = &chan->iter.heap;
- struct lib_ring_buffer *buf, *tmp;
- ssize_t len;
-
- list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
- iter.empty_node) {
- len = lib_ring_buffer_get_next_record(chan, buf);
-
- /*
- * Deal with -EAGAIN and -ENODATA.
- * len >= 0 means record contains data.
- * -EBUSY should never happen, because we support only one
- * reader.
- */
- switch (len) {
- case -EAGAIN:
- /* Keep node in empty list */
- break;
- case -ENODATA:
- /*
- * Buffer is finalized. Don't add to list of empty
- * buffer, because it has no more data to provide, ever.
- */
- list_del(&buf->iter.empty_node);
- break;
- case -EBUSY:
- CHAN_WARN_ON(chan, 1);
- break;
- default:
- /*
- * Insert buffer into the heap, remove from empty buffer
- * list.
- */
- CHAN_WARN_ON(chan, len < 0);
- list_del(&buf->iter.empty_node);
- CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
- }
- }
-}
-
-static
-void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
- struct channel *chan)
-{
- u64 timestamp_qs;
- unsigned long wait_msecs;
-
- /*
- * No need to wait if no empty buffers are present.
- */
- if (list_empty(&chan->iter.empty_head))
- return;
-
- timestamp_qs = config->cb.ring_buffer_clock_read(chan);
- /*
- * We need to consider previously empty buffers.
- * Do a get next buf record on each of them. Add them to
- * the heap if they have data. If at least one of them
- * don't have data, we need to wait for
- * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
- * buffers have been switched either by the timer or idle entry) and
- * check them again, adding them if they have data.
- */
- lib_ring_buffer_get_empty_buf_records(config, chan);
-
- /*
- * No need to wait if no empty buffers are present.
- */
- if (list_empty(&chan->iter.empty_head))
- return;
-
- /*
- * We need to wait for the buffer switch timer to run. If the
- * CPU is idle, idle entry performed the switch.
- * TODO: we could optimize further by skipping the sleep if all
- * empty buffers belong to idle or offline cpus.
- */
- wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
- wait_msecs += MAX_SYSTEM_LATENCY;
- msleep(wait_msecs);
- lib_ring_buffer_get_empty_buf_records(config, chan);
- /*
- * Any buffer still in the empty list here cannot possibly
- * contain an event with a timestamp prior to "timestamp_qs".
- * The new quiescent state timestamp is the one we grabbed
- * before waiting for buffer data. It is therefore safe to
- * ignore empty buffers up to last_qs timestamp for fusion
- * merge.
- */
- chan->iter.last_qs = timestamp_qs;
-}
-
-/**
- * channel_get_next_record - Get the next record in a channel.
- * @chan: channel
- * @ret_buf: the buffer in which the event is located (output)
- *
- * Returns the size of new current event, -EAGAIN if all buffers are empty,
- * -ENODATA if all buffers are empty and finalized. The channel must already be
- * opened for reading.
- */
-
-ssize_t channel_get_next_record(struct channel *chan,
- struct lib_ring_buffer **ret_buf)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer *buf;
- struct lttng_ptr_heap *heap;
- ssize_t len;
-
- if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
- *ret_buf = channel_get_ring_buffer(config, chan, 0);
- return lib_ring_buffer_get_next_record(chan, *ret_buf);
- }
-
- heap = &chan->iter.heap;
-
- /*
- * get next record for topmost buffer.
- */
- buf = lttng_heap_maximum(heap);
- if (buf) {
- len = lib_ring_buffer_get_next_record(chan, buf);
- /*
- * Deal with -EAGAIN and -ENODATA.
- * len >= 0 means record contains data.
- */
- switch (len) {
- case -EAGAIN:
- buf->iter.timestamp = 0;
- list_add(&buf->iter.empty_node, &chan->iter.empty_head);
- /* Remove topmost buffer from the heap */
- CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
- break;
- case -ENODATA:
- /*
- * Buffer is finalized. Remove buffer from heap and
- * don't add to list of empty buffer, because it has no
- * more data to provide, ever.
- */
- CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
- break;
- case -EBUSY:
- CHAN_WARN_ON(chan, 1);
- break;
- default:
- /*
- * Reinsert buffer into the heap. Note that heap can be
- * partially empty, so we need to use
- * lttng_heap_replace_max().
- */
- CHAN_WARN_ON(chan, len < 0);
- CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
- break;
- }
- }
-
- buf = lttng_heap_maximum(heap);
- if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
- /*
- * Deal with buffers previously showing no data.
- * Add buffers containing data to the heap, update
- * last_qs.
- */
- lib_ring_buffer_wait_for_qs(config, chan);
- }
-
- *ret_buf = buf = lttng_heap_maximum(heap);
- if (buf) {
- /*
- * If this warning triggers, you probably need to check your
- * system interrupt latency. Typical causes: too many printk()
- * output going to a serial console with interrupts off.
- * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
- * Observed on SMP KVM setups with trace_clock().
- */
- if (chan->iter.last_timestamp
- > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
- printk(KERN_WARNING "ring_buffer: timestamps going "
- "backward. Last time %llu ns, cpu %d, "
- "current time %llu ns, cpu %d, "
- "delta %llu ns.\n",
- chan->iter.last_timestamp, chan->iter.last_cpu,
- buf->iter.timestamp, buf->backend.cpu,
- chan->iter.last_timestamp - buf->iter.timestamp);
- CHAN_WARN_ON(chan, 1);
- }
- chan->iter.last_timestamp = buf->iter.timestamp;
- chan->iter.last_cpu = buf->backend.cpu;
- return buf->iter.payload_len;
- } else {
- /* Heap is empty */
- if (list_empty(&chan->iter.empty_head))
- return -ENODATA; /* All buffers finalized */
- else
- return -EAGAIN; /* Temporarily empty */
- }
-}
-EXPORT_SYMBOL_GPL(channel_get_next_record);
-
-static
-void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
-{
- if (buf->iter.allocated)
- return;
-
- buf->iter.allocated = 1;
- if (chan->iter.read_open && !buf->iter.read_open) {
- CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
- buf->iter.read_open = 1;
- }
-
- /* Add to list of buffers without any current record */
- if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-}
-
-#ifdef CONFIG_HOTPLUG_CPU
-static
-int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
- unsigned long action,
- void *hcpu)
-{
- unsigned int cpu = (unsigned long)hcpu;
- struct channel *chan = container_of(nb, struct channel,
- hp_iter_notifier);
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (!chan->hp_iter_enable)
- return NOTIFY_DONE;
-
- CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
-
- switch (action) {
- case CPU_DOWN_FAILED:
- case CPU_DOWN_FAILED_FROZEN:
- case CPU_ONLINE:
- case CPU_ONLINE_FROZEN:
- lib_ring_buffer_iterator_init(chan, buf);
- return NOTIFY_OK;
- default:
- return NOTIFY_DONE;
- }
-}
-#endif
-
-int channel_iterator_init(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer *buf;
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- int cpu, ret;
-
- INIT_LIST_HEAD(&chan->iter.empty_head);
- ret = lttng_heap_init(&chan->iter.heap,
- num_possible_cpus(),
- GFP_KERNEL, buf_is_higher);
- if (ret)
- return ret;
- /*
- * In case of non-hotplug cpu, if the ring-buffer is allocated
- * in early initcall, it will not be notified of secondary cpus.
- * In that off case, we need to allocate for all possible cpus.
- */
-#ifdef CONFIG_HOTPLUG_CPU
- chan->hp_iter_notifier.notifier_call =
- channel_iterator_cpu_hotplug;
- chan->hp_iter_notifier.priority = 10;
- register_cpu_notifier(&chan->hp_iter_notifier);
- get_online_cpus();
- for_each_online_cpu(cpu) {
- buf = per_cpu_ptr(chan->backend.buf, cpu);
- lib_ring_buffer_iterator_init(chan, buf);
- }
- chan->hp_iter_enable = 1;
- put_online_cpus();
-#else
- for_each_possible_cpu(cpu) {
- buf = per_cpu_ptr(chan->backend.buf, cpu);
- lib_ring_buffer_iterator_init(chan, buf);
- }
-#endif
- } else {
- buf = channel_get_ring_buffer(config, chan, 0);
- lib_ring_buffer_iterator_init(chan, buf);
- }
- return 0;
-}
-
-void channel_iterator_unregister_notifiers(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- chan->hp_iter_enable = 0;
- unregister_cpu_notifier(&chan->hp_iter_notifier);
- }
-}
-
-void channel_iterator_free(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_heap_free(&chan->iter.heap);
-}
-
-int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
-{
- struct channel *chan = buf->backend.chan;
- const struct lib_ring_buffer_config *config = chan->backend.config;
- CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
- return lib_ring_buffer_open_read(buf);
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
-
-/*
- * Note: Iterators must not be mixed with other types of outputs, because an
- * iterator can leave the buffer in "GET" state, which is not consistent with
- * other types of output (mmap, splice, raw data read).
- */
-void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
-{
- lib_ring_buffer_release_read(buf);
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
-
-int channel_iterator_open(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer *buf;
- int ret = 0, cpu;
-
- CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- get_online_cpus();
- /* Allow CPU hotplug to keep track of opened reader */
- chan->iter.read_open = 1;
- for_each_channel_cpu(cpu, chan) {
- buf = channel_get_ring_buffer(config, chan, cpu);
- ret = lib_ring_buffer_iterator_open(buf);
- if (ret)
- goto error;
- buf->iter.read_open = 1;
- }
- put_online_cpus();
- } else {
- buf = channel_get_ring_buffer(config, chan, 0);
- ret = lib_ring_buffer_iterator_open(buf);
- }
- return ret;
-error:
- /* Error should always happen on CPU 0, hence no close is required. */
- CHAN_WARN_ON(chan, cpu != 0);
- put_online_cpus();
- return ret;
-}
-EXPORT_SYMBOL_GPL(channel_iterator_open);
-
-void channel_iterator_release(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer *buf;
- int cpu;
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
- get_online_cpus();
- for_each_channel_cpu(cpu, chan) {
- buf = channel_get_ring_buffer(config, chan, cpu);
- if (buf->iter.read_open) {
- lib_ring_buffer_iterator_release(buf);
- buf->iter.read_open = 0;
- }
- }
- chan->iter.read_open = 0;
- put_online_cpus();
- } else {
- buf = channel_get_ring_buffer(config, chan, 0);
- lib_ring_buffer_iterator_release(buf);
- }
-}
-EXPORT_SYMBOL_GPL(channel_iterator_release);
-
-void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
-{
- struct channel *chan = buf->backend.chan;
-
- if (buf->iter.state != ITER_GET_SUBBUF)
- lib_ring_buffer_put_next_subbuf(buf);
- buf->iter.state = ITER_GET_SUBBUF;
- /* Remove from heap (if present). */
- if (lttng_heap_cherrypick(&chan->iter.heap, buf))
- list_add(&buf->iter.empty_node, &chan->iter.empty_head);
- buf->iter.timestamp = 0;
- buf->iter.header_len = 0;
- buf->iter.payload_len = 0;
- buf->iter.consumed = 0;
- buf->iter.read_offset = 0;
- buf->iter.data_size = 0;
- /* Don't reset allocated and read_open */
-}
-
-void channel_iterator_reset(struct channel *chan)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- struct lib_ring_buffer *buf;
- int cpu;
-
- /* Empty heap, put into empty_head */
- while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
- list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-
- for_each_channel_cpu(cpu, chan) {
- buf = channel_get_ring_buffer(config, chan, cpu);
- lib_ring_buffer_iterator_reset(buf);
- }
- /* Don't reset read_open */
- chan->iter.last_qs = 0;
- chan->iter.last_timestamp = 0;
- chan->iter.last_cpu = 0;
- chan->iter.len_left = 0;
-}
-
-/*
- * Ring buffer payload extraction read() implementation.
- */
-static
-ssize_t channel_ring_buffer_file_read(struct file *filp,
- char __user *user_buf,
- size_t count,
- loff_t *ppos,
- struct channel *chan,
- struct lib_ring_buffer *buf,
- int fusionmerge)
-{
- const struct lib_ring_buffer_config *config = chan->backend.config;
- size_t read_count = 0, read_offset;
- ssize_t len;
-
- might_sleep();
- if (!access_ok(VERIFY_WRITE, user_buf, count))
- return -EFAULT;
-
- /* Finish copy of previous record */
- if (*ppos != 0) {
- if (read_count < count) {
- len = chan->iter.len_left;
- read_offset = *ppos;
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
- && fusionmerge)
- buf = lttng_heap_maximum(&chan->iter.heap);
- CHAN_WARN_ON(chan, !buf);
- goto skip_get_next;
- }
- }
-
- while (read_count < count) {
- size_t copy_len, space_left;
-
- if (fusionmerge)
- len = channel_get_next_record(chan, &buf);
- else
- len = lib_ring_buffer_get_next_record(chan, buf);
-len_test:
- if (len < 0) {
- /*
- * Check if buffer is finalized (end of file).
- */
- if (len == -ENODATA) {
- /* A 0 read_count will tell about end of file */
- goto nodata;
- }
- if (filp->f_flags & O_NONBLOCK) {
- if (!read_count)
- read_count = -EAGAIN;
- goto nodata;
- } else {
- int error;
-
- /*
- * No data available at the moment, return what
- * we got.
- */
- if (read_count)
- goto nodata;
-
- /*
- * Wait for returned len to be >= 0 or -ENODATA.
- */
- if (fusionmerge)
- error = wait_event_interruptible(
- chan->read_wait,
- ((len = channel_get_next_record(chan,
- &buf)), len != -EAGAIN));
- else
- error = wait_event_interruptible(
- buf->read_wait,
- ((len = lib_ring_buffer_get_next_record(
- chan, buf)), len != -EAGAIN));
- CHAN_WARN_ON(chan, len == -EBUSY);
- if (error) {
- read_count = error;
- goto nodata;
- }
- CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
- goto len_test;
- }
- }
- read_offset = buf->iter.read_offset;
-skip_get_next:
- space_left = count - read_count;
- if (len <= space_left) {
- copy_len = len;
- chan->iter.len_left = 0;
- *ppos = 0;
- } else {
- copy_len = space_left;
- chan->iter.len_left = len - copy_len;
- *ppos = read_offset + copy_len;
- }
- if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
- &user_buf[read_count],
- copy_len)) {
- /*
- * Leave the len_left and ppos values at their current
- * state, as we currently have a valid event to read.
- */
- return -EFAULT;
- }
- read_count += copy_len;
- };
- return read_count;
-
-nodata:
- *ppos = 0;
- chan->iter.len_left = 0;
- return read_count;
-}
-
-/**
- * lib_ring_buffer_file_read - Read buffer record payload.
- * @filp: file structure pointer.
- * @buffer: user buffer to read data into.
- * @count: number of bytes to read.
- * @ppos: file read position.
- *
- * Returns a negative value on error, or the number of bytes read on success.
- * ppos is used to save the position _within the current record_ between calls
- * to read().
- */
-static
-ssize_t lib_ring_buffer_file_read(struct file *filp,
- char __user *user_buf,
- size_t count,
- loff_t *ppos)
-{
- struct inode *inode = filp->f_dentry->d_inode;
- struct lib_ring_buffer *buf = inode->i_private;
- struct channel *chan = buf->backend.chan;
-
- return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
- chan, buf, 0);
-}
-
-/**
- * channel_file_read - Read channel record payload.
- * @filp: file structure pointer.
- * @buffer: user buffer to read data into.
- * @count: number of bytes to read.
- * @ppos: file read position.
- *
- * Returns a negative value on error, or the number of bytes read on success.
- * ppos is used to save the position _within the current record_ between calls
- * to read().
- */
-static
-ssize_t channel_file_read(struct file *filp,
- char __user *user_buf,
- size_t count,
- loff_t *ppos)
-{
- struct inode *inode = filp->f_dentry->d_inode;
- struct channel *chan = inode->i_private;
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- return channel_ring_buffer_file_read(filp, user_buf, count,
- ppos, chan, NULL, 1);
- else {
- struct lib_ring_buffer *buf =
- channel_get_ring_buffer(config, chan, 0);
- return channel_ring_buffer_file_read(filp, user_buf, count,
- ppos, chan, buf, 0);
- }
-}
-
-static
-int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
-{
- struct lib_ring_buffer *buf = inode->i_private;
- int ret;
-
- ret = lib_ring_buffer_iterator_open(buf);
- if (ret)
- return ret;
-
- file->private_data = buf;
- ret = nonseekable_open(inode, file);
- if (ret)
- goto release_iter;
- return 0;
-
-release_iter:
- lib_ring_buffer_iterator_release(buf);
- return ret;
-}
-
-static
-int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
-{
- struct lib_ring_buffer *buf = inode->i_private;
-
- lib_ring_buffer_iterator_release(buf);
- return 0;
-}
-
-static
-int channel_file_open(struct inode *inode, struct file *file)
-{
- struct channel *chan = inode->i_private;
- int ret;
-
- ret = channel_iterator_open(chan);
- if (ret)
- return ret;
-
- file->private_data = chan;
- ret = nonseekable_open(inode, file);
- if (ret)
- goto release_iter;
- return 0;
-
-release_iter:
- channel_iterator_release(chan);
- return ret;
-}
-
-static
-int channel_file_release(struct inode *inode, struct file *file)
-{
- struct channel *chan = inode->i_private;
-
- channel_iterator_release(chan);
- return 0;
-}
-
-const struct file_operations channel_payload_file_operations = {
- .open = channel_file_open,
- .release = channel_file_release,
- .read = channel_file_read,
- .llseek = lib_ring_buffer_no_llseek,
-};
-EXPORT_SYMBOL_GPL(channel_payload_file_operations);
-
-const struct file_operations lib_ring_buffer_payload_file_operations = {
- .open = lib_ring_buffer_file_open,
- .release = lib_ring_buffer_file_release,
- .read = lib_ring_buffer_file_read,
- .llseek = lib_ring_buffer_no_llseek,
-};
-EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
+++ /dev/null
-/*
- * ring_buffer_vfs.c
- *
- * Copyright (C) 2009-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Ring Buffer VFS file operations.
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include "backend.h"
-#include "frontend.h"
-#include "vfs.h"
-
-static int put_ulong(unsigned long val, unsigned long arg)
-{
- return put_user(val, (unsigned long __user *)arg);
-}
-
-#ifdef CONFIG_COMPAT
-static int compat_put_ulong(compat_ulong_t val, unsigned long arg)
-{
- return put_user(val, (compat_ulong_t __user *)compat_ptr(arg));
-}
-#endif
-
-/**
- * lib_ring_buffer_open - ring buffer open file operation
- * @inode: opened inode
- * @file: opened file
- *
- * Open implementation. Makes sure only one open instance of a buffer is
- * done at a given moment.
- */
-int lib_ring_buffer_open(struct inode *inode, struct file *file)
-{
- struct lib_ring_buffer *buf = inode->i_private;
- int ret;
-
- ret = lib_ring_buffer_open_read(buf);
- if (ret)
- return ret;
-
- file->private_data = buf;
- ret = nonseekable_open(inode, file);
- if (ret)
- goto release_read;
- return 0;
-
-release_read:
- lib_ring_buffer_release_read(buf);
- return ret;
-}
-
-/**
- * lib_ring_buffer_release - ring buffer release file operation
- * @inode: opened inode
- * @file: opened file
- *
- * Release implementation.
- */
-int lib_ring_buffer_release(struct inode *inode, struct file *file)
-{
- struct lib_ring_buffer *buf = file->private_data;
-
- lib_ring_buffer_release_read(buf);
-
- return 0;
-}
-
-/**
- * lib_ring_buffer_poll - ring buffer poll file operation
- * @filp: the file
- * @wait: poll table
- *
- * Poll implementation.
- */
-unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait)
-{
- unsigned int mask = 0;
- struct lib_ring_buffer *buf = filp->private_data;
- struct channel *chan = buf->backend.chan;
- const struct lib_ring_buffer_config *config = chan->backend.config;
- int finalized, disabled;
-
- if (filp->f_mode & FMODE_READ) {
- poll_wait_set_exclusive(wait);
- poll_wait(filp, &buf->read_wait, wait);
-
- finalized = lib_ring_buffer_is_finalized(config, buf);
- disabled = lib_ring_buffer_channel_is_disabled(chan);
-
- /*
- * lib_ring_buffer_is_finalized() contains a smp_rmb() ordering
- * finalized load before offsets loads.
- */
- WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-retry:
- if (disabled)
- return POLLERR;
-
- if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf), chan)
- - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf), chan)
- == 0) {
- if (finalized)
- return POLLHUP;
- else {
- /*
- * The memory barriers
- * __wait_event()/wake_up_interruptible() take
- * care of "raw_spin_is_locked" memory ordering.
- */
- if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
- goto retry;
- else
- return 0;
- }
- } else {
- if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf),
- chan)
- - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf),
- chan)
- >= chan->backend.buf_size)
- return POLLPRI | POLLRDBAND;
- else
- return POLLIN | POLLRDNORM;
- }
- }
- return mask;
-}
-
-/**
- * lib_ring_buffer_ioctl - control ring buffer reader synchronization
- *
- * @filp: the file
- * @cmd: the command
- * @arg: command arg
- *
- * This ioctl implements commands necessary for producer/consumer
- * and flight recorder reader interaction :
- * RING_BUFFER_GET_NEXT_SUBBUF
- * Get the next sub-buffer that can be read. It never blocks.
- * RING_BUFFER_PUT_NEXT_SUBBUF
- * Release the currently read sub-buffer.
- * RING_BUFFER_GET_SUBBUF_SIZE
- * returns the size of the current sub-buffer.
- * RING_BUFFER_GET_MAX_SUBBUF_SIZE
- * returns the maximum size for sub-buffers.
- * RING_BUFFER_GET_NUM_SUBBUF
- * returns the number of reader-visible sub-buffers in the per cpu
- * channel (for mmap).
- * RING_BUFFER_GET_MMAP_READ_OFFSET
- * returns the offset of the subbuffer belonging to the reader.
- * Should only be used for mmap clients.
- */
-long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
-{
- struct lib_ring_buffer *buf = filp->private_data;
- struct channel *chan = buf->backend.chan;
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (lib_ring_buffer_channel_is_disabled(chan))
- return -EIO;
-
- switch (cmd) {
- case RING_BUFFER_SNAPSHOT:
- return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
- &buf->prod_snapshot);
- case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
- return put_ulong(buf->cons_snapshot, arg);
- case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
- return put_ulong(buf->prod_snapshot, arg);
- case RING_BUFFER_GET_SUBBUF:
- {
- unsigned long uconsume;
- long ret;
-
- ret = get_user(uconsume, (unsigned long __user *) arg);
- if (ret)
- return ret; /* will return -EFAULT */
- ret = lib_ring_buffer_get_subbuf(buf, uconsume);
- if (!ret) {
- /* Set file position to zero at each successful "get" */
- filp->f_pos = 0;
- }
- return ret;
- }
- case RING_BUFFER_PUT_SUBBUF:
- lib_ring_buffer_put_subbuf(buf);
- return 0;
-
- case RING_BUFFER_GET_NEXT_SUBBUF:
- {
- long ret;
-
- ret = lib_ring_buffer_get_next_subbuf(buf);
- if (!ret) {
- /* Set file position to zero at each successful "get" */
- filp->f_pos = 0;
- }
- return ret;
- }
- case RING_BUFFER_PUT_NEXT_SUBBUF:
- lib_ring_buffer_put_next_subbuf(buf);
- return 0;
- case RING_BUFFER_GET_SUBBUF_SIZE:
- return put_ulong(lib_ring_buffer_get_read_data_size(config, buf),
- arg);
- case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
- {
- unsigned long size;
-
- size = lib_ring_buffer_get_read_data_size(config, buf);
- size = PAGE_ALIGN(size);
- return put_ulong(size, arg);
- }
- case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
- return put_ulong(chan->backend.subbuf_size, arg);
- case RING_BUFFER_GET_MMAP_LEN:
- {
- unsigned long mmap_buf_len;
-
- if (config->output != RING_BUFFER_MMAP)
- return -EINVAL;
- mmap_buf_len = chan->backend.buf_size;
- if (chan->backend.extra_reader_sb)
- mmap_buf_len += chan->backend.subbuf_size;
- if (mmap_buf_len > INT_MAX)
- return -EFBIG;
- return put_ulong(mmap_buf_len, arg);
- }
- case RING_BUFFER_GET_MMAP_READ_OFFSET:
- {
- unsigned long sb_bindex;
-
- if (config->output != RING_BUFFER_MMAP)
- return -EINVAL;
- sb_bindex = subbuffer_id_get_index(config,
- buf->backend.buf_rsb.id);
- return put_ulong(buf->backend.array[sb_bindex]->mmap_offset,
- arg);
- }
- case RING_BUFFER_FLUSH:
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
- return 0;
- default:
- return -ENOIOCTLCMD;
- }
-}
-
-#ifdef CONFIG_COMPAT
-long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
- unsigned long arg)
-{
- struct lib_ring_buffer *buf = filp->private_data;
- struct channel *chan = buf->backend.chan;
- const struct lib_ring_buffer_config *config = chan->backend.config;
-
- if (lib_ring_buffer_channel_is_disabled(chan))
- return -EIO;
-
- switch (cmd) {
- case RING_BUFFER_SNAPSHOT:
- return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
- &buf->prod_snapshot);
- case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
- return compat_put_ulong(buf->cons_snapshot, arg);
- case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
- return compat_put_ulong(buf->prod_snapshot, arg);
- case RING_BUFFER_GET_SUBBUF:
- {
- __u32 uconsume;
- unsigned long consume;
- long ret;
-
- ret = get_user(uconsume, (__u32 __user *) arg);
- if (ret)
- return ret; /* will return -EFAULT */
- consume = buf->cons_snapshot;
- consume &= ~0xFFFFFFFFL;
- consume |= uconsume;
- ret = lib_ring_buffer_get_subbuf(buf, consume);
- if (!ret) {
- /* Set file position to zero at each successful "get" */
- filp->f_pos = 0;
- }
- return ret;
- }
- case RING_BUFFER_PUT_SUBBUF:
- lib_ring_buffer_put_subbuf(buf);
- return 0;
-
- case RING_BUFFER_GET_NEXT_SUBBUF:
- {
- long ret;
-
- ret = lib_ring_buffer_get_next_subbuf(buf);
- if (!ret) {
- /* Set file position to zero at each successful "get" */
- filp->f_pos = 0;
- }
- return ret;
- }
- case RING_BUFFER_PUT_NEXT_SUBBUF:
- lib_ring_buffer_put_next_subbuf(buf);
- return 0;
- case RING_BUFFER_GET_SUBBUF_SIZE:
- {
- unsigned long data_size;
-
- data_size = lib_ring_buffer_get_read_data_size(config, buf);
- if (data_size > UINT_MAX)
- return -EFBIG;
- return put_ulong(data_size, arg);
- }
- case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
- {
- unsigned long size;
-
- size = lib_ring_buffer_get_read_data_size(config, buf);
- size = PAGE_ALIGN(size);
- if (size > UINT_MAX)
- return -EFBIG;
- return put_ulong(size, arg);
- }
- case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
- if (chan->backend.subbuf_size > UINT_MAX)
- return -EFBIG;
- return put_ulong(chan->backend.subbuf_size, arg);
- case RING_BUFFER_GET_MMAP_LEN:
- {
- unsigned long mmap_buf_len;
-
- if (config->output != RING_BUFFER_MMAP)
- return -EINVAL;
- mmap_buf_len = chan->backend.buf_size;
- if (chan->backend.extra_reader_sb)
- mmap_buf_len += chan->backend.subbuf_size;
- if (mmap_buf_len > UINT_MAX)
- return -EFBIG;
- return put_ulong(mmap_buf_len, arg);
- }
- case RING_BUFFER_GET_MMAP_READ_OFFSET:
- {
- unsigned long sb_bindex, read_offset;
-
- if (config->output != RING_BUFFER_MMAP)
- return -EINVAL;
- sb_bindex = subbuffer_id_get_index(config,
- buf->backend.buf_rsb.id);
- read_offset = buf->backend.array[sb_bindex]->mmap_offset;
- if (read_offset > UINT_MAX)
- return -EINVAL;
- return put_ulong(read_offset, arg);
- }
- case RING_BUFFER_FLUSH:
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
- return 0;
- default:
- return -ENOIOCTLCMD;
- }
-}
-#endif
-
-const struct file_operations lib_ring_buffer_file_operations = {
- .open = lib_ring_buffer_open,
- .release = lib_ring_buffer_release,
- .poll = lib_ring_buffer_poll,
- .splice_read = lib_ring_buffer_splice_read,
- .mmap = lib_ring_buffer_mmap,
- .unlocked_ioctl = lib_ring_buffer_ioctl,
- .llseek = lib_ring_buffer_no_llseek,
-#ifdef CONFIG_COMPAT
- .compat_ioctl = lib_ring_buffer_compat_ioctl,
-#endif
-};
-EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations);
-
-MODULE_LICENSE("GPL and additional rights");
-MODULE_AUTHOR("Mathieu Desnoyers");
-MODULE_DESCRIPTION("Ring Buffer Library VFS");
* Dual LGPL v2.1/GPL v2 license.
*/
+#include <assert.h>
+#include <urcu/uatomic.h>
+
/*
* Same data type (long) accessed differently depending on configuration.
* v field is for non-atomic access (protected by mutual exclusion).
* atomic_long_t is used for globally shared buffers.
*/
union v_atomic {
- local_t l;
- atomic_long_t a;
+ long a; /* accessed through uatomic */
long v;
};
static inline
long v_read(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
{
- if (config->sync == RING_BUFFER_SYNC_PER_CPU)
- return local_read(&v_a->l);
- else
- return atomic_long_read(&v_a->a);
+ assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+ return uatomic_read(&v_a->a);
}
static inline
void v_set(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
long v)
{
- if (config->sync == RING_BUFFER_SYNC_PER_CPU)
- local_set(&v_a->l, v);
- else
- atomic_long_set(&v_a->a, v);
+ assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+ uatomic_set(&v_a->a, v);
}
static inline
void v_add(const struct lib_ring_buffer_config *config, long v, union v_atomic *v_a)
{
- if (config->sync == RING_BUFFER_SYNC_PER_CPU)
- local_add(v, &v_a->l);
- else
- atomic_long_add(v, &v_a->a);
+ assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+ uatomic_add(&v_a->a, v);
}
static inline
void v_inc(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
{
- if (config->sync == RING_BUFFER_SYNC_PER_CPU)
- local_inc(&v_a->l);
- else
- atomic_long_inc(&v_a->a);
+ assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+ uatomic_inc(&v_a->a);
}
/*
long v_cmpxchg(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
long old, long _new)
{
- if (config->sync == RING_BUFFER_SYNC_PER_CPU)
- return local_cmpxchg(&v_a->l, old, _new);
- else
- return atomic_long_cmpxchg(&v_a->a, old, _new);
+ assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+ return uatomic_cmpxchg(&v_a->a, old, _new);
}
#endif /* _LINUX_RING_BUFFER_VATOMIC_H */