Port ring buffer to userspace, part 1
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 27 Jun 2011 19:24:39 +0000 (15:24 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 27 Jun 2011 19:24:39 +0000 (15:24 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
16 files changed:
include/ust/core.h
libprio_heap/lttng_prio_heap.c [deleted file]
libprio_heap/lttng_prio_heap.h [deleted file]
libringbuffer/Makefile.am
libringbuffer/backend.h
libringbuffer/backend_internal.h
libringbuffer/backend_types.h
libringbuffer/frontend.h
libringbuffer/frontend_internal.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_abi.c [new file with mode: 0644]
libringbuffer/ring_buffer_backend.c
libringbuffer/ring_buffer_frontend.c
libringbuffer/ring_buffer_iterator.c [deleted file]
libringbuffer/ring_buffer_vfs.c [deleted file]
libringbuffer/vatomic.h

index eee405dd4c27a46d743eecb625daaed5c144f065..8c1c490eedde18b0615741ee1ec1a9773c0589ab 100644 (file)
@@ -82,7 +82,25 @@ static inline long IS_ERR(const void *ptr)
 
 /* MALLOCATION */
 
-#define zmalloc(s) calloc(1, s)
+#include <stdlib.h>
+
+static inline
+void *zmalloc(size_t len)
+{
+       return calloc(1, len);
+}
+
+static inline
+void *malloc_align(size_t len)
+{
+       return malloc(ALIGN(len, CAA_CACHE_LINE_SIZE));
+}
+
+static inline
+void *zmalloc_align(size_t len)
+{
+       return calloc(1, ALIGN(len, CAA_CACHE_LINE_SIZE));
+}
 
 /* MATH */
 
@@ -110,4 +128,12 @@ static __inline__ int get_count_order(unsigned int count)
         const typeof( ((type *)0)->member ) *__mptr = (ptr);    \
         (type *)( (char *)__mptr - offsetof(type,member) );})
 
+#ifndef inline_memcpy
+#define inline_memcpy memcpy
+#endif
+
+#ifndef __same_type
+#define __same_type(a, b) __builtin_types_compatible_p(typeof(a), typeof(b))
+#endif
+
 #endif /* UST_CORE_H */
diff --git a/libprio_heap/lttng_prio_heap.c b/libprio_heap/lttng_prio_heap.c
deleted file mode 100644 (file)
index 5bbd079..0000000
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * lttng_prio_heap.c
- *
- * Priority heap containing pointers. Based on CLRS, chapter 6.
- *
- * Copyright 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <linux/slab.h>
-#include "lttng_prio_heap.h"
-
-#ifdef DEBUG_HEAP
-void lttng_check_heap(const struct lttng_ptr_heap *heap)
-{
-       size_t i;
-
-       if (!heap->len)
-               return;
-
-       for (i = 1; i < heap->len; i++)
-               WARN_ON_ONCE(!heap->gt(heap->ptrs[i], heap->ptrs[0]));
-}
-#endif
-
-static
-size_t parent(size_t i)
-{
-       return (i -1) >> 1;
-}
-
-static
-size_t left(size_t i)
-{
-       return (i << 1) + 1;
-}
-
-static
-size_t right(size_t i)
-{
-       return (i << 1) + 2;
-}
-
-/*
- * Copy of heap->ptrs pointer is invalid after heap_grow.
- */
-static
-int heap_grow(struct lttng_ptr_heap *heap, size_t new_len)
-{
-       void **new_ptrs;
-
-       if (heap->alloc_len >= new_len)
-               return 0;
-
-       heap->alloc_len = max_t(size_t, new_len, heap->alloc_len << 1);
-       new_ptrs = kmalloc(heap->alloc_len * sizeof(void *), heap->gfpmask);
-       if (!new_ptrs)
-               return -ENOMEM;
-       if (heap->ptrs)
-               memcpy(new_ptrs, heap->ptrs, heap->len * sizeof(void *));
-       kfree(heap->ptrs);
-       heap->ptrs = new_ptrs;
-       return 0;
-}
-
-static
-int heap_set_len(struct lttng_ptr_heap *heap, size_t new_len)
-{
-       int ret;
-
-       ret = heap_grow(heap, new_len);
-       if (ret)
-               return ret;
-       heap->len = new_len;
-       return 0;
-}
-
-int lttng_heap_init(struct lttng_ptr_heap *heap, size_t alloc_len,
-             gfp_t gfpmask, int gt(void *a, void *b))
-{
-       heap->ptrs = NULL;
-       heap->len = 0;
-       heap->alloc_len = 0;
-       heap->gt = gt;
-       /*
-        * Minimum size allocated is 1 entry to ensure memory allocation
-        * never fails within heap_replace_max.
-        */
-       return heap_grow(heap, max_t(size_t, 1, alloc_len));
-}
-
-void lttng_heap_free(struct lttng_ptr_heap *heap)
-{
-       kfree(heap->ptrs);
-}
-
-static void heapify(struct lttng_ptr_heap *heap, size_t i)
-{
-       void **ptrs = heap->ptrs;
-       size_t l, r, largest;
-
-       for (;;) {
-               void *tmp;
-
-               l = left(i);
-               r = right(i);
-               if (l < heap->len && heap->gt(ptrs[l], ptrs[i]))
-                       largest = l;
-               else
-                       largest = i;
-               if (r < heap->len && heap->gt(ptrs[r], ptrs[largest]))
-                       largest = r;
-               if (largest == i)
-                       break;
-               tmp = ptrs[i];
-               ptrs[i] = ptrs[largest];
-               ptrs[largest] = tmp;
-               i = largest;
-       }
-       lttng_check_heap(heap);
-}
-
-void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p)
-{
-       void *res;
-
-       if (!heap->len) {
-               (void) heap_set_len(heap, 1);
-               heap->ptrs[0] = p;
-               lttng_check_heap(heap);
-               return NULL;
-       }
-
-       /* Replace the current max and heapify */
-       res = heap->ptrs[0];
-       heap->ptrs[0] = p;
-       heapify(heap, 0);
-       return res;
-}
-
-int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p)
-{
-       void **ptrs;
-       size_t pos;
-       int ret;
-
-       ret = heap_set_len(heap, heap->len + 1);
-       if (ret)
-               return ret;
-       ptrs = heap->ptrs;
-       pos = heap->len - 1;
-       while (pos > 0 && heap->gt(p, ptrs[parent(pos)])) {
-               /* Move parent down until we find the right spot */
-               ptrs[pos] = ptrs[parent(pos)];
-               pos = parent(pos);
-       }
-       ptrs[pos] = p;
-       lttng_check_heap(heap);
-       return 0;
-}
-
-void *lttng_heap_remove(struct lttng_ptr_heap *heap)
-{
-       switch (heap->len) {
-       case 0:
-               return NULL;
-       case 1:
-               (void) heap_set_len(heap, 0);
-               return heap->ptrs[0];
-       }
-       /* Shrink, replace the current max by previous last entry and heapify */
-       heap_set_len(heap, heap->len - 1);
-       /* len changed. previous last entry is at heap->len */
-       return lttng_heap_replace_max(heap, heap->ptrs[heap->len]);
-}
-
-void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p)
-{
-       size_t pos, len = heap->len;
-
-       for (pos = 0; pos < len; pos++)
-               if (heap->ptrs[pos] == p)
-                       goto found;
-       return NULL;
-found:
-       if (heap->len == 1) {
-               (void) heap_set_len(heap, 0);
-               lttng_check_heap(heap);
-               return heap->ptrs[0];
-       }
-       /* Replace p with previous last entry and heapify. */
-       heap_set_len(heap, heap->len - 1);
-       /* len changed. previous last entry is at heap->len */
-       heap->ptrs[pos] = heap->ptrs[heap->len];
-       heapify(heap, pos);
-       return p;
-}
diff --git a/libprio_heap/lttng_prio_heap.h b/libprio_heap/lttng_prio_heap.h
deleted file mode 100644 (file)
index ea8dbb8..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-#ifndef _LTTNG_PRIO_HEAP_H
-#define _LTTNG_PRIO_HEAP_H
-
-/*
- * lttng_prio_heap.h
- *
- * Priority heap containing pointers. Based on CLRS, chapter 6.
- *
- * Copyright 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <linux/gfp.h>
-
-struct lttng_ptr_heap {
-       size_t len, alloc_len;
-       void **ptrs;
-       int (*gt)(void *a, void *b);
-       gfp_t gfpmask;
-};
-
-#ifdef DEBUG_HEAP
-void lttng_check_heap(const struct lttng_ptr_heap *heap);
-#else
-static inline
-void lttng_check_heap(const struct lttng_ptr_heap *heap)
-{
-}
-#endif
-
-/**
- * lttng_heap_maximum - return the largest element in the heap
- * @heap: the heap to be operated on
- *
- * Returns the largest element in the heap, without performing any modification
- * to the heap structure. Returns NULL if the heap is empty.
- */
-static inline void *lttng_heap_maximum(const struct lttng_ptr_heap *heap)
-{
-       lttng_check_heap(heap);
-       return heap->len ? heap->ptrs[0] : NULL;
-}
-
-/**
- * lttng_heap_init - initialize the heap
- * @heap: the heap to initialize
- * @alloc_len: number of elements initially allocated
- * @gfp: allocation flags
- * @gt: function to compare the elements
- *
- * Returns -ENOMEM if out of memory.
- */
-extern int lttng_heap_init(struct lttng_ptr_heap *heap,
-                    size_t alloc_len, gfp_t gfpmask,
-                    int gt(void *a, void *b));
-
-/**
- * lttng_heap_free - free the heap
- * @heap: the heap to free
- */
-extern void lttng_heap_free(struct lttng_ptr_heap *heap);
-
-/**
- * lttng_heap_insert - insert an element into the heap
- * @heap: the heap to be operated on
- * @p: the element to add
- *
- * Insert an element into the heap.
- *
- * Returns -ENOMEM if out of memory.
- */
-extern int lttng_heap_insert(struct lttng_ptr_heap *heap, void *p);
-
-/**
- * lttng_heap_remove - remove the largest element from the heap
- * @heap: the heap to be operated on
- *
- * Returns the largest element in the heap. It removes this element from the
- * heap. Returns NULL if the heap is empty.
- */
-extern void *lttng_heap_remove(struct lttng_ptr_heap *heap);
-
-/**
- * lttng_heap_cherrypick - remove a given element from the heap
- * @heap: the heap to be operated on
- * @p: the element
- *
- * Remove the given element from the heap. Return the element if present, else
- * return NULL. This algorithm has a complexity of O(n), which is higher than
- * O(log(n)) provided by the rest of this API.
- */
-extern void *lttng_heap_cherrypick(struct lttng_ptr_heap *heap, void *p);
-
-/**
- * lttng_heap_replace_max - replace the the largest element from the heap
- * @heap: the heap to be operated on
- * @p: the pointer to be inserted as topmost element replacement
- *
- * Returns the largest element in the heap. It removes this element from the
- * heap. The heap is rebalanced only once after the insertion. Returns NULL if
- * the heap is empty.
- *
- * This is the equivalent of calling heap_remove() and then heap_insert(), but
- * it only rebalances the heap once. It never allocates memory.
- */
-extern void *lttng_heap_replace_max(struct lttng_ptr_heap *heap, void *p);
-
-#endif /* _LTTNG_PRIO_HEAP_H */
index bc2c7a727db05d5d3e3c65453bfa1a3b6821dace..226605337268dcff8e2d817ca07eb4c5fdf2c707 100644 (file)
@@ -6,11 +6,7 @@ lib_LTLIBRARIES = libringbuffer.la
 libringbuffer_la_SOURCES = \
        ring_buffer_backend.c \
        ring_buffer_frontend.c \
-       ring_buffer_iterator.c \
-       ring_buffer_vfs.c \
-       ring_buffer_splice.c \
-       ring_buffer_mmap.c \
-       ../libprio_heap/lttng_prio_heap.c
+       ring_buffer_abi.c
 
 libringbuffer_la_LDFLAGS = -no-undefined -version-info 0:0:0
 
index b50bae1a1f0ca4323fe30a9f73a9204317199589..61d2f3277f3192765b14b51f8175d40e1b8a8d42 100644 (file)
  * the reader in flight recorder mode.
  */
 
+#include <unistd.h>
+
+#include "ust/core.h"
+
 /* Internal helpers */
 #include "backend_internal.h"
 #include "frontend_internal.h"
 extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb,
                                   size_t offset, void *dest, size_t len);
 
-extern int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
-                                         size_t offset, void __user *dest,
-                                         size_t len);
-
 extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb,
                                     size_t offset, void *dest, size_t len);
 
@@ -76,7 +76,7 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config,
 
        offset &= chanb->buf_size - 1;
        sbidx = offset >> chanb->subbuf_size_order;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK);
        id = bufb->buf_wsb[sbidx].id;
        sb_bindex = subbuffer_id_get_index(config, id);
@@ -124,9 +124,4 @@ unsigned long lib_ring_buffer_get_records_unread(
        return records_unread;
 }
 
-ssize_t lib_ring_buffer_file_splice_read(struct file *in, loff_t *ppos,
-                                        struct pipe_inode_info *pipe,
-                                        size_t len, unsigned int flags);
-loff_t lib_ring_buffer_no_llseek(struct file *file, loff_t offset, int origin);
-
 #endif /* _LINUX_RING_BUFFER_BACKEND_H */
index 2c3de67a6f9e826741ae7ef86b5993241b65d6e4..c5f3362e53ed03c21267e635e5242a6c7299ccec 100644 (file)
@@ -11,6 +11,9 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <unistd.h>
+#include <urcu/compiler.h>
+
 #include "config.h"
 #include "backend_types.h"
 #include "frontend_types.h"
@@ -52,7 +55,7 @@ extern void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb,
  * sampling and subbuffer ID exchange).
  */
 
-#define HALF_ULONG_BITS                (BITS_PER_LONG >> 1)
+#define HALF_ULONG_BITS                (CAA_BITS_PER_LONG >> 1)
 
 #define SB_ID_OFFSET_SHIFT     (HALF_ULONG_BITS + 1)
 #define SB_ID_OFFSET_COUNT     (1UL << SB_ID_OFFSET_SHIFT)
@@ -145,7 +148,7 @@ void subbuffer_id_set_noref_offset(const struct lib_ring_buffer_config *config,
                tmp |= offset << SB_ID_OFFSET_SHIFT;
                tmp |= SB_ID_NOREF_MASK;
                /* Volatile store, read concurrently by readers. */
-               ACCESS_ONCE(*id) = tmp;
+               CMM_ACCESS_ONCE(*id) = tmp;
        }
 }
 
@@ -300,7 +303,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
         * Performing a volatile access to read the sb_pages, because we want to
         * read a coherent version of the pointer and the associated noref flag.
         */
-       id = ACCESS_ONCE(bufb->buf_wsb[idx].id);
+       id = CMM_ACCESS_ONCE(bufb->buf_wsb[idx].id);
        for (;;) {
                /* This check is called on the fast path for each record. */
                if (likely(!subbuffer_id_is_noref(config, id))) {
@@ -314,7 +317,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
                }
                new_id = id;
                subbuffer_id_clear_noref(config, &new_id);
-               new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
+               new_id = uatomic_cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
                if (likely(new_id == id))
                        break;
                id = new_id;
@@ -350,7 +353,7 @@ void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *confi
         * Memory barrier that ensures counter stores are ordered before set
         * noref and offset.
         */
-       smp_mb();
+       cmm_smp_mb();
        subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
 }
 
@@ -369,7 +372,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config,
        if (config->mode == RING_BUFFER_OVERWRITE) {
                /*
                 * Exchange the target writer subbuffer with our own unused
-                * subbuffer. No need to use ACCESS_ONCE() here to read the
+                * subbuffer. No need to use CMM_ACCESS_ONCE() here to read the
                 * old_wpage, because the value read will be confirmed by the
                 * following cmpxchg().
                 */
@@ -387,7 +390,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config,
                             !subbuffer_id_is_noref(config, bufb->buf_rsb.id));
                subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
                                              consumed_count);
-               new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
+               new_id = uatomic_cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
                                 bufb->buf_rsb.id);
                if (unlikely(old_id != new_id))
                        return -EAGAIN;
index 99de9f11dd9f48b17441d7e43db00a63d6f43060..cfbe59c3cdedefb091148f648671bd35972c01c0 100644 (file)
@@ -68,9 +68,7 @@ struct channel_backend {
        unsigned long num_subbuf;       /* Number of sub-buffers for writer */
        u64 start_tsc;                  /* Channel creation TSC value */
        void *priv;                     /* Client-specific information */
-       struct notifier_block cpu_hp_notifier;   /* CPU hotplug notifier */
        const struct lib_ring_buffer_config *config; /* Ring buffer configuration */
-       cpumask_var_t cpumask;          /* Allocated per-cpu buffers cpumask */
        char name[NAME_MAX];            /* Channel name */
 };
 
index 1eb4ae92d6c69cab1f184f572b21c647f5f866ee..9d73da16fee45b0f28937d86f2a459d04c6767a2 100644 (file)
@@ -16,6 +16,9 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <urcu/compiler.h>
+#include <urcu/uatomic.h>
+
 /* Internal helpers */
 #include "frontend_internal.h"
 
@@ -60,7 +63,7 @@ void *channel_destroy(struct channel *chan);
 #define for_each_channel_cpu(cpu, chan)                                        \
        for ((cpu) = -1;                                                \
                ({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask);  \
-                  smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
+                  cmm_smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });)
 
 extern struct lib_ring_buffer *channel_get_ring_buffer(
                                const struct lib_ring_buffer_config *config,
@@ -118,7 +121,7 @@ static inline
 unsigned long lib_ring_buffer_get_consumed(const struct lib_ring_buffer_config *config,
                                           struct lib_ring_buffer *buf)
 {
-       return atomic_long_read(&buf->consumed);
+       return uatomic_read(&buf->consumed);
 }
 
 /*
@@ -129,11 +132,11 @@ static inline
 int lib_ring_buffer_is_finalized(const struct lib_ring_buffer_config *config,
                                 struct lib_ring_buffer *buf)
 {
-       int finalized = ACCESS_ONCE(buf->finalized);
+       int finalized = CMM_ACCESS_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
-       smp_rmb();
+       cmm_smp_rmb();
        return finalized;
 }
 
@@ -146,7 +149,7 @@ int lib_ring_buffer_channel_is_finalized(const struct channel *chan)
 static inline
 int lib_ring_buffer_channel_is_disabled(const struct channel *chan)
 {
-       return atomic_read(&chan->record_disabled);
+       return uatomic_read(&chan->record_disabled);
 }
 
 static inline
index 37e046bc2d40d557cd3830fa46b171c57a0dc4c2..f758a6842257ece1dfaea0bb7a7e93e5d802b7ad 100644 (file)
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <urcu/compiler.h>
+
 #include "config.h"
 #include "backend_types.h"
 #include "frontend_types.h"
-#include "../lib_prio_heap/lttng_prio_heap.h"  /* For per-CPU read-side iterator */
 
 /* Buffer offset macros */
 
@@ -81,7 +82,7 @@ unsigned long subbuf_index(unsigned long offset, struct channel *chan)
  * last_tsc atomically.
  */
 
-#if (BITS_PER_LONG == 32)
+#if (CAA_BITS_PER_LONG == 32)
 static inline
 void save_last_tsc(const struct lib_ring_buffer_config *config,
                   struct lib_ring_buffer *buf, u64 tsc)
@@ -154,7 +155,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
        unsigned long consumed_old, consumed_new;
 
        do {
-               consumed_old = atomic_long_read(&buf->consumed);
+               consumed_old = uatomic_read(&buf->consumed);
                /*
                 * If buffer is in overwrite mode, push the reader consumed
                 * count if the write position has reached it and we are not
@@ -170,7 +171,7 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf,
                        consumed_new = subbuf_align(consumed_old, chan);
                else
                        return;
-       } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+       } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
                                              consumed_new) != consumed_old));
 }
 
@@ -191,7 +192,7 @@ int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config,
 {
        unsigned long consumed_old, consumed_idx, commit_count, write_offset;
 
-       consumed_old = atomic_long_read(&buf->consumed);
+       consumed_old = uatomic_read(&buf->consumed);
        consumed_idx = subbuf_index(consumed_old, chan);
        commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb);
        /*
@@ -354,7 +355,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                         * respect to writers coming into the subbuffer after
                         * wrap around, and also order wrt concurrent readers.
                         */
-                       smp_mb();
+                       cmm_smp_mb();
                        /* End of exclusive subbuffer access */
                        v_set(config, &buf->commit_cold[idx].cc_sb,
                              commit_count);
@@ -365,10 +366,10 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config,
                         * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free.
                         */
                        if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
-                           && atomic_long_read(&buf->active_readers)
+                           && uatomic_read(&buf->active_readers)
                            && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-                               wake_up_interruptible(&buf->read_wait);
-                               wake_up_interruptible(&chan->read_wait);
+                               //wake_up_interruptible(&buf->read_wait);
+                               //wake_up_interruptible(&chan->read_wait);
                        }
 
                }
@@ -419,6 +420,6 @@ extern int lib_ring_buffer_create(struct lib_ring_buffer *buf,
 extern void lib_ring_buffer_free(struct lib_ring_buffer *buf);
 
 /* Keep track of trap nesting inside ring buffer code */
-DECLARE_PER_CPU(unsigned int, lib_ring_buffer_nesting);
+extern __thread unsigned int lib_ring_buffer_nesting;
 
 #endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */
index f89d803cefacb59542b6e1f095e2ac8c0aa93950..c9f98cb79810dd9385e578da5f520c60e8582340 100644 (file)
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <urcu/list.h>
+#include <urcu/uatomic.h>
+#include <urcu/ref.h>
+
+#include "ust/core.h"
+
+#include "usterr_signal_safe.h"
 #include "config.h"
 #include "backend_types.h"
-#include "../lib_prio_heap/lttng_prio_heap.h"  /* For per-CPU read-side iterator */
 
 /*
  * A switch is done during tracing or as a final flush after tracing (so it
  */
 enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH };
 
-/* channel-level read-side iterator */
-struct channel_iter {
-       /* Prio heap of buffers. Lowest timestamps at the top. */
-       struct lttng_ptr_heap heap;     /* Heap of struct lib_ring_buffer ptrs */
-       struct list_head empty_head;    /* Empty buffers linked-list head */
-       int read_open;                  /* Opened for reading ? */
-       u64 last_qs;                    /* Last quiescent state timestamp */
-       u64 last_timestamp;             /* Last timestamp (for WARN_ON) */
-       int last_cpu;                   /* Last timestamp cpu */
-       /*
-        * read() file operation state.
-        */
-       unsigned long len_left;
-};
-
 /* channel: collection of per-cpu ring buffers. */
 struct channel {
-       atomic_t record_disabled;
+       int record_disabled;
        unsigned long commit_count_mask;        /*
                                                 * Commit count mask, removing
                                                 * the MSBs corresponding to
@@ -55,16 +46,9 @@ struct channel {
 
        unsigned long switch_timer_interval;    /* Buffer flush (jiffies) */
        unsigned long read_timer_interval;      /* Reader wakeup (jiffies) */
-       struct notifier_block cpu_hp_notifier;  /* CPU hotplug notifier */
-       struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */
-       struct notifier_block hp_iter_notifier; /* hotplug iterator notifier */
-       int cpu_hp_enable:1;                    /* Enable CPU hotplug notif. */
-       int hp_iter_enable:1;                   /* Enable hp iter notif. */
-       wait_queue_head_t read_wait;            /* reader wait queue */
-       wait_queue_head_t hp_wait;              /* CPU hotplug wait queue */
+       //wait_queue_head_t read_wait;          /* reader wait queue */
        int finalized;                          /* Has channel been finalized */
-       struct channel_iter iter;               /* Channel read-side iterator */
-       struct kref ref;                        /* Reference count */
+       struct urcu_ref ref;                    /* Reference count */
 };
 
 /* Per-subbuffer commit counters used on the hot path */
@@ -78,35 +62,17 @@ struct commit_counters_cold {
        union v_atomic cc_sb;           /* Incremented _once_ at sb switch */
 };
 
-/* Per-buffer read iterator */
-struct lib_ring_buffer_iter {
-       u64 timestamp;                  /* Current record timestamp */
-       size_t header_len;              /* Current record header length */
-       size_t payload_len;             /* Current record payload length */
-
-       struct list_head empty_node;    /* Linked list of empty buffers */
-       unsigned long consumed, read_offset, data_size;
-       enum {
-               ITER_GET_SUBBUF = 0,
-               ITER_TEST_RECORD,
-               ITER_NEXT_RECORD,
-               ITER_PUT_SUBBUF,
-       } state;
-       int allocated:1;
-       int read_open:1;                /* Opened for reading ? */
-};
-
 /* ring buffer state */
 struct lib_ring_buffer {
        /* First 32 bytes cache-hot cacheline */
        union v_atomic offset;          /* Current offset in the buffer */
        struct commit_counters_hot *commit_hot;
                                        /* Commit count per sub-buffer */
-       atomic_long_t consumed;         /*
+       long consumed;                  /*
                                         * Current offset in the buffer
                                         * standard atomic access (shared)
                                         */
-       atomic_t record_disabled;
+       int record_disabled;
        /* End of first 32 bytes cacheline */
        union v_atomic last_tsc;        /*
                                         * Last timestamp written in the buffer.
@@ -116,7 +82,7 @@ struct lib_ring_buffer {
 
        struct commit_counters_cold *commit_cold;
                                        /* Commit count per sub-buffer */
-       atomic_long_t active_readers;   /*
+       long active_readers;            /*
                                         * Active readers count
                                         * standard atomic access (shared)
                                         */
@@ -126,12 +92,10 @@ struct lib_ring_buffer {
        union v_atomic records_lost_big;        /* Events too big */
        union v_atomic records_count;   /* Number of records written */
        union v_atomic records_overrun; /* Number of overwritten records */
-       wait_queue_head_t read_wait;    /* reader buffer-level wait queue */
+       //wait_queue_head_t read_wait;  /* reader buffer-level wait queue */
        int finalized;                  /* buffer has been finalized */
-       struct timer_list switch_timer; /* timer for periodical switch */
-       struct timer_list read_timer;   /* timer for read poll */
-       raw_spinlock_t raw_tick_nohz_spinlock;  /* nohz entry lock/trylock */
-       struct lib_ring_buffer_iter iter;       /* read-side iterator */
+       //struct timer_list switch_timer;       /* timer for periodical switch */
+       //struct timer_list read_timer; /* timer for read poll */
        unsigned long get_subbuf_consumed;      /* Read-side consumed */
        unsigned long prod_snapshot;    /* Producer count snapshot */
        unsigned long cons_snapshot;    /* Consumer count snapshot */
@@ -157,14 +121,14 @@ void *channel_get_private(struct channel *chan)
                int _____ret = unlikely(cond);                          \
                if (_____ret) {                                         \
                        if (__same_type(*(c), struct channel_backend))  \
-                               __chan = container_of((void *) (c),     \
+                               __chan = caa_container_of((void *) (c), \
                                                        struct channel, \
                                                        backend);       \
                        else if (__same_type(*(c), struct channel))     \
                                __chan = (void *) (c);                  \
                        else                                            \
                                BUG_ON(1);                              \
-                       atomic_inc(&__chan->record_disabled);           \
+                       uatomic_inc(&__chan->record_disabled);          \
                        WARN_ON(1);                                     \
                }                                                       \
                _____ret;                                               \
diff --git a/libringbuffer/ring_buffer_abi.c b/libringbuffer/ring_buffer_abi.c
new file mode 100644 (file)
index 0000000..c105fe0
--- /dev/null
@@ -0,0 +1,381 @@
+/*
+ * ring_buffer_vfs.c
+ *
+ * Copyright (C) 2009-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Ring Buffer VFS file operations.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include "backend.h"
+#include "frontend.h"
+#include "vfs.h"
+
+static int put_ulong(unsigned long val, unsigned long arg)
+{
+       return put_user(val, (unsigned long __user *)arg);
+}
+
+#ifdef CONFIG_COMPAT
+static int compat_put_ulong(compat_ulong_t val, unsigned long arg)
+{
+       return put_user(val, (compat_ulong_t __user *)compat_ptr(arg));
+}
+#endif
+
+/**
+ *     lib_ring_buffer_open - ring buffer open file operation
+ *     @inode: opened inode
+ *     @file: opened file
+ *
+ *     Open implementation. Makes sure only one open instance of a buffer is
+ *     done at a given moment.
+ */
+int lib_ring_buffer_open(struct inode *inode, struct file *file)
+{
+       struct lib_ring_buffer *buf = inode->i_private;
+       int ret;
+
+       ret = lib_ring_buffer_open_read(buf);
+       if (ret)
+               return ret;
+
+       file->private_data = buf;
+       ret = nonseekable_open(inode, file);
+       if (ret)
+               goto release_read;
+       return 0;
+
+release_read:
+       lib_ring_buffer_release_read(buf);
+       return ret;
+}
+
+/**
+ *     lib_ring_buffer_release - ring buffer release file operation
+ *     @inode: opened inode
+ *     @file: opened file
+ *
+ *     Release implementation.
+ */
+int lib_ring_buffer_release(struct inode *inode, struct file *file)
+{
+       struct lib_ring_buffer *buf = file->private_data;
+
+       lib_ring_buffer_release_read(buf);
+
+       return 0;
+}
+
+/**
+ *     lib_ring_buffer_poll - ring buffer poll file operation
+ *     @filp: the file
+ *     @wait: poll table
+ *
+ *     Poll implementation.
+ */
+unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait)
+{
+       unsigned int mask = 0;
+       struct lib_ring_buffer *buf = filp->private_data;
+       struct channel *chan = buf->backend.chan;
+       const struct lib_ring_buffer_config *config = chan->backend.config;
+       int finalized, disabled;
+
+       if (filp->f_mode & FMODE_READ) {
+               poll_wait_set_exclusive(wait);
+               poll_wait(filp, &buf->read_wait, wait);
+
+               finalized = lib_ring_buffer_is_finalized(config, buf);
+               disabled = lib_ring_buffer_channel_is_disabled(chan);
+
+               /*
+                * lib_ring_buffer_is_finalized() contains a smp_rmb() ordering
+                * finalized load before offsets loads.
+                */
+               WARN_ON(atomic_long_read(&buf->active_readers) != 1);
+retry:
+               if (disabled)
+                       return POLLERR;
+
+               if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf), chan)
+                 - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf), chan)
+                 == 0) {
+                       if (finalized)
+                               return POLLHUP;
+                       else {
+                               /*
+                                * The memory barriers
+                                * __wait_event()/wake_up_interruptible() take
+                                * care of "raw_spin_is_locked" memory ordering.
+                                */
+                               if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
+                                       goto retry;
+                               else
+                                       return 0;
+                       }
+               } else {
+                       if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf),
+                                        chan)
+                         - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf),
+                                        chan)
+                         >= chan->backend.buf_size)
+                               return POLLPRI | POLLRDBAND;
+                       else
+                               return POLLIN | POLLRDNORM;
+               }
+       }
+       return mask;
+}
+
+/**
+ *     lib_ring_buffer_ioctl - control ring buffer reader synchronization
+ *
+ *     @filp: the file
+ *     @cmd: the command
+ *     @arg: command arg
+ *
+ *     This ioctl implements commands necessary for producer/consumer
+ *     and flight recorder reader interaction :
+ *     RING_BUFFER_GET_NEXT_SUBBUF
+ *             Get the next sub-buffer that can be read. It never blocks.
+ *     RING_BUFFER_PUT_NEXT_SUBBUF
+ *             Release the currently read sub-buffer.
+ *     RING_BUFFER_GET_SUBBUF_SIZE
+ *             returns the size of the current sub-buffer.
+ *     RING_BUFFER_GET_MAX_SUBBUF_SIZE
+ *             returns the maximum size for sub-buffers.
+ *     RING_BUFFER_GET_NUM_SUBBUF
+ *             returns the number of reader-visible sub-buffers in the per cpu
+ *              channel (for mmap).
+ *      RING_BUFFER_GET_MMAP_READ_OFFSET
+ *              returns the offset of the subbuffer belonging to the reader.
+ *              Should only be used for mmap clients.
+ */
+long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
+{
+       struct lib_ring_buffer *buf = filp->private_data;
+       struct channel *chan = buf->backend.chan;
+       const struct lib_ring_buffer_config *config = chan->backend.config;
+
+       if (lib_ring_buffer_channel_is_disabled(chan))
+               return -EIO;
+
+       switch (cmd) {
+       case RING_BUFFER_SNAPSHOT:
+               return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+                                           &buf->prod_snapshot);
+       case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+               return put_ulong(buf->cons_snapshot, arg);
+       case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+               return put_ulong(buf->prod_snapshot, arg);
+       case RING_BUFFER_GET_SUBBUF:
+       {
+               unsigned long uconsume;
+               long ret;
+
+               ret = get_user(uconsume, (unsigned long __user *) arg);
+               if (ret)
+                       return ret; /* will return -EFAULT */
+               ret = lib_ring_buffer_get_subbuf(buf, uconsume);
+               if (!ret) {
+                       /* Set file position to zero at each successful "get" */
+                       filp->f_pos = 0;
+               }
+               return ret;
+       }
+       case RING_BUFFER_PUT_SUBBUF:
+               lib_ring_buffer_put_subbuf(buf);
+               return 0;
+
+       case RING_BUFFER_GET_NEXT_SUBBUF:
+       {
+               long ret;
+
+               ret = lib_ring_buffer_get_next_subbuf(buf);
+               if (!ret) {
+                       /* Set file position to zero at each successful "get" */
+                       filp->f_pos = 0;
+               }
+               return ret;
+       }
+       case RING_BUFFER_PUT_NEXT_SUBBUF:
+               lib_ring_buffer_put_next_subbuf(buf);
+               return 0;
+       case RING_BUFFER_GET_SUBBUF_SIZE:
+               return put_ulong(lib_ring_buffer_get_read_data_size(config, buf),
+                                arg);
+       case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+       {
+               unsigned long size;
+
+               size = lib_ring_buffer_get_read_data_size(config, buf);
+               size = PAGE_ALIGN(size);
+               return put_ulong(size, arg);
+       }
+       case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+               return put_ulong(chan->backend.subbuf_size, arg);
+       case RING_BUFFER_GET_MMAP_LEN:
+       {
+               unsigned long mmap_buf_len;
+
+               if (config->output != RING_BUFFER_MMAP)
+                       return -EINVAL;
+               mmap_buf_len = chan->backend.buf_size;
+               if (chan->backend.extra_reader_sb)
+                       mmap_buf_len += chan->backend.subbuf_size;
+               if (mmap_buf_len > INT_MAX)
+                       return -EFBIG;
+               return put_ulong(mmap_buf_len, arg);
+       }
+       case RING_BUFFER_GET_MMAP_READ_OFFSET:
+       {
+               unsigned long sb_bindex;
+
+               if (config->output != RING_BUFFER_MMAP)
+                       return -EINVAL;
+               sb_bindex = subbuffer_id_get_index(config,
+                                                  buf->backend.buf_rsb.id);
+               return put_ulong(buf->backend.array[sb_bindex]->mmap_offset,
+                                arg);
+       }
+       case RING_BUFFER_FLUSH:
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               return 0;
+       default:
+               return -ENOIOCTLCMD;
+       }
+}
+
+#ifdef CONFIG_COMPAT
+long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
+                                 unsigned long arg)
+{
+       struct lib_ring_buffer *buf = filp->private_data;
+       struct channel *chan = buf->backend.chan;
+       const struct lib_ring_buffer_config *config = chan->backend.config;
+
+       if (lib_ring_buffer_channel_is_disabled(chan))
+               return -EIO;
+
+       switch (cmd) {
+       case RING_BUFFER_SNAPSHOT:
+               return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+                                               &buf->prod_snapshot);
+       case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
+               return compat_put_ulong(buf->cons_snapshot, arg);
+       case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
+               return compat_put_ulong(buf->prod_snapshot, arg);
+       case RING_BUFFER_GET_SUBBUF:
+       {
+               __u32 uconsume;
+               unsigned long consume;
+               long ret;
+
+               ret = get_user(uconsume, (__u32 __user *) arg);
+               if (ret)
+                       return ret; /* will return -EFAULT */
+               consume = buf->cons_snapshot;
+               consume &= ~0xFFFFFFFFL;
+               consume |= uconsume;
+               ret = lib_ring_buffer_get_subbuf(buf, consume);
+               if (!ret) {
+                       /* Set file position to zero at each successful "get" */
+                       filp->f_pos = 0;
+               }
+               return ret;
+       }
+       case RING_BUFFER_PUT_SUBBUF:
+               lib_ring_buffer_put_subbuf(buf);
+               return 0;
+
+       case RING_BUFFER_GET_NEXT_SUBBUF:
+       {
+               long ret;
+
+               ret = lib_ring_buffer_get_next_subbuf(buf);
+               if (!ret) {
+                       /* Set file position to zero at each successful "get" */
+                       filp->f_pos = 0;
+               }
+               return ret;
+       }
+       case RING_BUFFER_PUT_NEXT_SUBBUF:
+               lib_ring_buffer_put_next_subbuf(buf);
+               return 0;
+       case RING_BUFFER_GET_SUBBUF_SIZE:
+       {
+               unsigned long data_size;
+
+               data_size = lib_ring_buffer_get_read_data_size(config, buf);
+               if (data_size > UINT_MAX)
+                       return -EFBIG;
+               return put_ulong(data_size, arg);
+       }
+       case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
+       {
+               unsigned long size;
+
+               size = lib_ring_buffer_get_read_data_size(config, buf);
+               size = PAGE_ALIGN(size);
+               if (size > UINT_MAX)
+                       return -EFBIG;
+               return put_ulong(size, arg);
+       }
+       case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
+               if (chan->backend.subbuf_size > UINT_MAX)
+                       return -EFBIG;
+               return put_ulong(chan->backend.subbuf_size, arg);
+       case RING_BUFFER_GET_MMAP_LEN:
+       {
+               unsigned long mmap_buf_len;
+
+               if (config->output != RING_BUFFER_MMAP)
+                       return -EINVAL;
+               mmap_buf_len = chan->backend.buf_size;
+               if (chan->backend.extra_reader_sb)
+                       mmap_buf_len += chan->backend.subbuf_size;
+               if (mmap_buf_len > UINT_MAX)
+                       return -EFBIG;
+               return put_ulong(mmap_buf_len, arg);
+       }
+       case RING_BUFFER_GET_MMAP_READ_OFFSET:
+       {
+               unsigned long sb_bindex, read_offset;
+
+               if (config->output != RING_BUFFER_MMAP)
+                       return -EINVAL;
+               sb_bindex = subbuffer_id_get_index(config,
+                                                  buf->backend.buf_rsb.id);
+               read_offset = buf->backend.array[sb_bindex]->mmap_offset;
+               if (read_offset > UINT_MAX)
+                       return -EINVAL;
+               return put_ulong(read_offset, arg);
+       }
+       case RING_BUFFER_FLUSH:
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               return 0;
+       default:
+               return -ENOIOCTLCMD;
+       }
+}
+#endif
+
+const struct file_operations lib_ring_buffer_file_operations = {
+       .open = lib_ring_buffer_open,
+       .release = lib_ring_buffer_release,
+       .poll = lib_ring_buffer_poll,
+       .splice_read = lib_ring_buffer_splice_read,
+       .mmap = lib_ring_buffer_mmap,
+       .unlocked_ioctl = lib_ring_buffer_ioctl,
+       .llseek = lib_ring_buffer_no_llseek,
+#ifdef CONFIG_COMPAT
+       .compat_ioctl = lib_ring_buffer_compat_ioctl,
+#endif
+};
+EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library VFS");
index 5ff38f0daa42286578ebc74fc018d337458e6341..861acf746572779eb71039bad1191b9ebb224bf8 100644 (file)
@@ -6,6 +6,10 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <urcu/arch.h>
+
+#include "ust/core.h"
+
 #include "config.h"
 #include "backend.h"
 #include "frontend.h"
@@ -32,7 +36,7 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
        void **virt;
        unsigned long i;
 
-       num_pages = size >> PAGE_SHIFT;
+       num_pages = size >> get_count_order(PAGE_SIZE);
        num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
        subbuf_size = chanb->subbuf_size;
        num_subbuf_alloc = num_subbuf;
@@ -42,22 +46,15 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
                num_subbuf_alloc++;
        }
 
-       pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
-                                  1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+       pages = malloc_align(sizeof(*pages) * num_pages);
        if (unlikely(!pages))
                goto pages_error;
 
-       virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
-                                 1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+       virt = malloc_align(sizeof(*virt) * num_pages);
        if (unlikely(!virt))
                goto virt_error;
 
-       bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
-                                        * num_subbuf_alloc,
-                                 1 << INTERNODE_CACHE_SHIFT),
-                       GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+       bufb->array = malloc_align(sizeof(*bufb->array) * num_subbuf_alloc);
        if (unlikely(!bufb->array))
                goto array_error;
 
@@ -73,22 +70,18 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
        /* Allocate backend pages array elements */
        for (i = 0; i < num_subbuf_alloc; i++) {
                bufb->array[i] =
-                       kzalloc_node(ALIGN(
+                       zmalloc_align(
                                sizeof(struct lib_ring_buffer_backend_pages) +
                                sizeof(struct lib_ring_buffer_backend_page)
-                               * num_pages_per_subbuf,
-                               1 << INTERNODE_CACHE_SHIFT),
-                               GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+                               * num_pages_per_subbuf);
                if (!bufb->array[i])
                        goto free_array;
        }
 
        /* Allocate write-side subbuffer table */
-       bufb->buf_wsb = kzalloc_node(ALIGN(
+       bufb->buf_wsb = zmalloc_align(
                                sizeof(struct lib_ring_buffer_backend_subbuffer)
-                               * num_subbuf,
-                               1 << INTERNODE_CACHE_SHIFT),
-                               GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
+                               * num_subbuf);
        if (unlikely(!bufb->buf_wsb))
                goto free_array;
 
@@ -116,11 +109,6 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config
                }
        }
 
-       /*
-        * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
-        * will not fault.
-        */
-       wrapper_vmalloc_sync_all();
        kfree(virt);
        kfree(pages);
        return 0;
@@ -146,7 +134,7 @@ int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
 {
        const struct lib_ring_buffer_config *config = chanb->config;
 
-       bufb->chan = container_of(chanb, struct channel, backend);
+       bufb->chan = caa_container_of(chanb, struct channel, backend);
        bufb->cpu = cpu;
 
        return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
@@ -209,7 +197,7 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
  */
 void channel_backend_reset(struct channel_backend *chanb)
 {
-       struct channel *chan = container_of(chanb, struct channel, backend);
+       struct channel *chan = caa_container_of(chanb, struct channel, backend);
        const struct lib_ring_buffer_config *config = chanb->config;
 
        /*
@@ -235,7 +223,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
                                              void *hcpu)
 {
        unsigned int cpu = (unsigned long)hcpu;
-       struct channel_backend *chanb = container_of(nb, struct channel_backend,
+       struct channel_backend *chanb = caa_container_of(nb, struct channel_backend,
                                                     cpu_hp_notifier);
        const struct lib_ring_buffer_config *config = chanb->config;
        struct lib_ring_buffer *buf;
@@ -289,7 +277,7 @@ int channel_backend_init(struct channel_backend *chanb,
                         const struct lib_ring_buffer_config *config,
                         void *priv, size_t subbuf_size, size_t num_subbuf)
 {
-       struct channel *chan = container_of(chanb, struct channel, backend);
+       struct channel *chan = caa_container_of(chanb, struct channel, backend);
        unsigned int i;
        int ret;
 
@@ -468,7 +456,7 @@ void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
                src += pagecpy;
                offset += pagecpy;
                sbidx = offset >> chanb->subbuf_size_order;
-               index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+               index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
 
                /*
                 * Underlying layer should never ask for writes across
@@ -512,7 +500,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
 
        orig_len = len;
        offset &= chanb->buf_size - 1;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        if (unlikely(!len))
                return 0;
        for (;;) {
@@ -529,7 +517,7 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
                        break;
                dest += pagecpy;
                offset += pagecpy;
-               index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+               index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
                /*
                 * Underlying layer should never ask for reads across
                 * subbuffers.
@@ -540,60 +528,6 @@ size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_read);
 
-/**
- * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
- * @bufb : buffer backend
- * @offset : offset within the buffer
- * @dest : destination userspace address
- * @len : length to copy to destination
- *
- * Should be protected by get_subbuf/put_subbuf.
- * access_ok() must have been performed on dest addresses prior to call this
- * function.
- * Returns -EFAULT on error, 0 if ok.
- */
-int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
-                                  size_t offset, void __user *dest, size_t len)
-{
-       struct channel_backend *chanb = &bufb->chan->backend;
-       const struct lib_ring_buffer_config *config = chanb->config;
-       size_t index;
-       ssize_t pagecpy, orig_len;
-       struct lib_ring_buffer_backend_pages *rpages;
-       unsigned long sb_bindex, id;
-
-       orig_len = len;
-       offset &= chanb->buf_size - 1;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
-       if (unlikely(!len))
-               return 0;
-       for (;;) {
-               pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
-               id = bufb->buf_rsb.id;
-               sb_bindex = subbuffer_id_get_index(config, id);
-               rpages = bufb->array[sb_bindex];
-               CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
-                            && subbuffer_id_is_noref(config, id));
-               if (__copy_to_user(dest,
-                              rpages->p[index].virt + (offset & ~PAGE_MASK),
-                              pagecpy))
-                       return -EFAULT;
-               len -= pagecpy;
-               if (likely(!len))
-                       break;
-               dest += pagecpy;
-               offset += pagecpy;
-               index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
-               /*
-                * Underlying layer should never ask for reads across
-                * subbuffers.
-                */
-               CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
-       }
-       return 0;
-}
-EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);
-
 /**
  * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
  * @bufb : buffer backend
@@ -616,7 +550,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        orig_offset = offset;
        for (;;) {
                id = bufb->buf_rsb.id;
@@ -636,7 +570,7 @@ int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offse
                        len -= pagecpy;
                }
                offset += strpagelen;
-               index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+               index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
                if (strpagelen < pagelen)
                        break;
                /*
@@ -670,7 +604,7 @@ struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
@@ -701,7 +635,7 @@ void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
        unsigned long sb_bindex, id;
 
        offset &= chanb->buf_size - 1;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        id = bufb->buf_rsb.id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
@@ -732,7 +666,7 @@ void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
 
        offset &= chanb->buf_size - 1;
        sbidx = offset >> chanb->subbuf_size_order;
-       index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
+       index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE);
        id = bufb->buf_wsb[sbidx].id;
        sb_bindex = subbuffer_id_get_index(config, id);
        rpages = bufb->array[sb_bindex];
index a3f14f3d9926855b738d7e6e82c8a950844a12dc..5ceda871a907912d517c506f103b8686a0b4fea5 100644 (file)
@@ -38,6 +38,8 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <urcu/compiler.h>
+
 #include "config.h"
 #include "backend.h"
 #include "frontend.h"
@@ -158,7 +160,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                           struct channel_backend *chanb, int cpu)
 {
        const struct lib_ring_buffer_config *config = chanb->config;
-       struct channel *chan = container_of(chanb, struct channel, backend);
+       struct channel *chan = caa_container_of(chanb, struct channel, backend);
        void *priv = chanb->priv;
        unsigned int num_subbuf;
        size_t subbuf_header_size;
@@ -391,7 +393,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
                                              void *hcpu)
 {
        unsigned int cpu = (unsigned long)hcpu;
-       struct channel *chan = container_of(nb, struct channel,
+       struct channel *chan = caa_container_of(nb, struct channel,
                                            cpu_hp_notifier);
        struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
        const struct lib_ring_buffer_config *config = chan->backend.config;
@@ -447,7 +449,7 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
                                                  unsigned long val,
                                                  void *data)
 {
-       struct channel *chan = container_of(nb, struct channel,
+       struct channel *chan = caa_container_of(nb, struct channel,
                                            tick_nohz_notifier);
        const struct lib_ring_buffer_config *config = chan->backend.config;
        struct lib_ring_buffer *buf;
@@ -687,7 +689,7 @@ EXPORT_SYMBOL_GPL(channel_create);
 static
 void channel_release(struct kref *kref)
 {
-       struct channel *chan = container_of(kref, struct channel, ref);
+       struct channel *chan = caa_container_of(kref, struct channel, ref);
        channel_free(chan);
 }
 
@@ -729,7 +731,7 @@ void *channel_destroy(struct channel *chan)
                         * Perform flush before writing to finalized.
                         */
                        smp_wmb();
-                       ACCESS_ONCE(buf->finalized) = 1;
+                       CMM_ACCESS_ONCE(buf->finalized) = 1;
                        wake_up_interruptible(&buf->read_wait);
                }
        } else {
@@ -743,10 +745,10 @@ void *channel_destroy(struct channel *chan)
                 * Perform flush before writing to finalized.
                 */
                smp_wmb();
-               ACCESS_ONCE(buf->finalized) = 1;
+               CMM_ACCESS_ONCE(buf->finalized) = 1;
                wake_up_interruptible(&buf->read_wait);
        }
-       ACCESS_ONCE(chan->finalized) = 1;
+       CMM_ACCESS_ONCE(chan->finalized) = 1;
        wake_up_interruptible(&chan->hp_wait);
        wake_up_interruptible(&chan->read_wait);
        kref_put(&chan->ref, channel_release);
@@ -820,7 +822,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
        int finalized;
 
 retry:
-       finalized = ACCESS_ONCE(buf->finalized);
+       finalized = CMM_ACCESS_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
@@ -908,7 +910,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
        int finalized;
 
 retry:
-       finalized = ACCESS_ONCE(buf->finalized);
+       finalized = CMM_ACCESS_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
diff --git a/libringbuffer/ring_buffer_iterator.c b/libringbuffer/ring_buffer_iterator.c
deleted file mode 100644 (file)
index ff8e582..0000000
+++ /dev/null
@@ -1,793 +0,0 @@
-/*
- * ring_buffer_iterator.c
- *
- * (C) Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Ring buffer and channel iterators. Get each event of a channel in order. Uses
- * a prio heap for per-cpu buffers, giving a O(log(NR_CPUS)) algorithmic
- * complexity for the "get next event" operation.
- *
- * Author:
- *     Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include "iterator.h"
-
-/*
- * Safety factor taking into account internal kernel interrupt latency.
- * Assuming 250ms worse-case latency.
- */
-#define MAX_SYSTEM_LATENCY     250
-
-/*
- * Maximum delta expected between trace clocks. At most 1 jiffy delta.
- */
-#define MAX_CLOCK_DELTA                (jiffies_to_usecs(1) * 1000)
-
-/**
- * lib_ring_buffer_get_next_record - Get the next record in a buffer.
- * @chan: channel
- * @buf: buffer
- *
- * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
- * buffer is empty and finalized. The buffer must already be opened for reading.
- */
-ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
-                                       struct lib_ring_buffer *buf)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer_iter *iter = &buf->iter;
-       int ret;
-
-restart:
-       switch (iter->state) {
-       case ITER_GET_SUBBUF:
-               ret = lib_ring_buffer_get_next_subbuf(buf);
-               if (ret && !ACCESS_ONCE(buf->finalized)
-                   && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
-                       /*
-                        * Use "pull" scheme for global buffers. The reader
-                        * itself flushes the buffer to "pull" data not visible
-                        * to readers yet. Flush current subbuffer and re-try.
-                        *
-                        * Per-CPU buffers rather use a "push" scheme because
-                        * the IPI needed to flush all CPU's buffers is too
-                        * costly. In the "push" scheme, the reader waits for
-                        * the writer periodic deferrable timer to flush the
-                        * buffers (keeping track of a quiescent state
-                        * timestamp). Therefore, the writer "pushes" data out
-                        * of the buffers rather than letting the reader "pull"
-                        * data from the buffer.
-                        */
-                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
-                       ret = lib_ring_buffer_get_next_subbuf(buf);
-               }
-               if (ret)
-                       return ret;
-               iter->consumed = buf->cons_snapshot;
-               iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
-               iter->read_offset = iter->consumed;
-               /* skip header */
-               iter->read_offset += config->cb.subbuffer_header_size();
-               iter->state = ITER_TEST_RECORD;
-               goto restart;
-       case ITER_TEST_RECORD:
-               if (iter->read_offset - iter->consumed >= iter->data_size) {
-                       iter->state = ITER_PUT_SUBBUF;
-               } else {
-                       CHAN_WARN_ON(chan, !config->cb.record_get);
-                       config->cb.record_get(config, chan, buf,
-                                             iter->read_offset,
-                                             &iter->header_len,
-                                             &iter->payload_len,
-                                             &iter->timestamp);
-                       iter->read_offset += iter->header_len;
-                       subbuffer_consume_record(config, &buf->backend);
-                       iter->state = ITER_NEXT_RECORD;
-                       return iter->payload_len;
-               }
-               goto restart;
-       case ITER_NEXT_RECORD:
-               iter->read_offset += iter->payload_len;
-               iter->state = ITER_TEST_RECORD;
-               goto restart;
-       case ITER_PUT_SUBBUF:
-               lib_ring_buffer_put_next_subbuf(buf);
-               iter->state = ITER_GET_SUBBUF;
-               goto restart;
-       default:
-               CHAN_WARN_ON(chan, 1);  /* Should not happen */
-               return -EPERM;
-       }
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);
-
-static int buf_is_higher(void *a, void *b)
-{
-       struct lib_ring_buffer *bufa = a;
-       struct lib_ring_buffer *bufb = b;
-
-       /* Consider lowest timestamps to be at the top of the heap */
-       return (bufa->iter.timestamp < bufb->iter.timestamp);
-}
-
-static
-void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
-                                          struct channel *chan)
-{
-       struct lttng_ptr_heap *heap = &chan->iter.heap;
-       struct lib_ring_buffer *buf, *tmp;
-       ssize_t len;
-
-       list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
-                                iter.empty_node) {
-               len = lib_ring_buffer_get_next_record(chan, buf);
-
-               /*
-                * Deal with -EAGAIN and -ENODATA.
-                * len >= 0 means record contains data.
-                * -EBUSY should never happen, because we support only one
-                * reader.
-                */
-               switch (len) {
-               case -EAGAIN:
-                       /* Keep node in empty list */
-                       break;
-               case -ENODATA:
-                       /*
-                        * Buffer is finalized. Don't add to list of empty
-                        * buffer, because it has no more data to provide, ever.
-                        */
-                       list_del(&buf->iter.empty_node);
-                       break;
-               case -EBUSY:
-                       CHAN_WARN_ON(chan, 1);
-                       break;
-               default:
-                       /*
-                        * Insert buffer into the heap, remove from empty buffer
-                        * list.
-                        */
-                       CHAN_WARN_ON(chan, len < 0);
-                       list_del(&buf->iter.empty_node);
-                       CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
-               }
-       }
-}
-
-static
-void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
-                                struct channel *chan)
-{
-       u64 timestamp_qs;
-       unsigned long wait_msecs;
-
-       /*
-        * No need to wait if no empty buffers are present.
-        */
-       if (list_empty(&chan->iter.empty_head))
-               return;
-
-       timestamp_qs = config->cb.ring_buffer_clock_read(chan);
-       /*
-        * We need to consider previously empty buffers.
-        * Do a get next buf record on each of them. Add them to
-        * the heap if they have data. If at least one of them
-        * don't have data, we need to wait for
-        * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
-        * buffers have been switched either by the timer or idle entry) and
-        * check them again, adding them if they have data.
-        */
-       lib_ring_buffer_get_empty_buf_records(config, chan);
-
-       /*
-        * No need to wait if no empty buffers are present.
-        */
-       if (list_empty(&chan->iter.empty_head))
-               return;
-
-       /*
-        * We need to wait for the buffer switch timer to run. If the
-        * CPU is idle, idle entry performed the switch.
-        * TODO: we could optimize further by skipping the sleep if all
-        * empty buffers belong to idle or offline cpus.
-        */
-       wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
-       wait_msecs += MAX_SYSTEM_LATENCY;
-       msleep(wait_msecs);
-       lib_ring_buffer_get_empty_buf_records(config, chan);
-       /*
-        * Any buffer still in the empty list here cannot possibly
-        * contain an event with a timestamp prior to "timestamp_qs".
-        * The new quiescent state timestamp is the one we grabbed
-        * before waiting for buffer data.  It is therefore safe to
-        * ignore empty buffers up to last_qs timestamp for fusion
-        * merge.
-        */
-       chan->iter.last_qs = timestamp_qs;
-}
-
-/**
- * channel_get_next_record - Get the next record in a channel.
- * @chan: channel
- * @ret_buf: the buffer in which the event is located (output)
- *
- * Returns the size of new current event, -EAGAIN if all buffers are empty,
- * -ENODATA if all buffers are empty and finalized. The channel must already be
- * opened for reading.
- */
-
-ssize_t channel_get_next_record(struct channel *chan,
-                               struct lib_ring_buffer **ret_buf)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer *buf;
-       struct lttng_ptr_heap *heap;
-       ssize_t len;
-
-       if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
-               *ret_buf = channel_get_ring_buffer(config, chan, 0);
-               return lib_ring_buffer_get_next_record(chan, *ret_buf);
-       }
-
-       heap = &chan->iter.heap;
-
-       /*
-        * get next record for topmost buffer.
-        */
-       buf = lttng_heap_maximum(heap);
-       if (buf) {
-               len = lib_ring_buffer_get_next_record(chan, buf);
-               /*
-                * Deal with -EAGAIN and -ENODATA.
-                * len >= 0 means record contains data.
-                */
-               switch (len) {
-               case -EAGAIN:
-                       buf->iter.timestamp = 0;
-                       list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-                       /* Remove topmost buffer from the heap */
-                       CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
-                       break;
-               case -ENODATA:
-                       /*
-                        * Buffer is finalized. Remove buffer from heap and
-                        * don't add to list of empty buffer, because it has no
-                        * more data to provide, ever.
-                        */
-                       CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
-                       break;
-               case -EBUSY:
-                       CHAN_WARN_ON(chan, 1);
-                       break;
-               default:
-                       /*
-                        * Reinsert buffer into the heap. Note that heap can be
-                        * partially empty, so we need to use
-                        * lttng_heap_replace_max().
-                        */
-                       CHAN_WARN_ON(chan, len < 0);
-                       CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
-                       break;
-               }
-       }
-
-       buf = lttng_heap_maximum(heap);
-       if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
-               /*
-                * Deal with buffers previously showing no data.
-                * Add buffers containing data to the heap, update
-                * last_qs.
-                */
-               lib_ring_buffer_wait_for_qs(config, chan);
-       }
-
-       *ret_buf = buf = lttng_heap_maximum(heap);
-       if (buf) {
-               /*
-                * If this warning triggers, you probably need to check your
-                * system interrupt latency. Typical causes: too many printk()
-                * output going to a serial console with interrupts off.
-                * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
-                * Observed on SMP KVM setups with trace_clock().
-                */
-               if (chan->iter.last_timestamp
-                   > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
-                       printk(KERN_WARNING "ring_buffer: timestamps going "
-                              "backward. Last time %llu ns, cpu %d, "
-                              "current time %llu ns, cpu %d, "
-                              "delta %llu ns.\n",
-                              chan->iter.last_timestamp, chan->iter.last_cpu,
-                              buf->iter.timestamp, buf->backend.cpu,
-                              chan->iter.last_timestamp - buf->iter.timestamp);
-                       CHAN_WARN_ON(chan, 1);
-               }
-               chan->iter.last_timestamp = buf->iter.timestamp;
-               chan->iter.last_cpu = buf->backend.cpu;
-               return buf->iter.payload_len;
-       } else {
-               /* Heap is empty */
-               if (list_empty(&chan->iter.empty_head))
-                       return -ENODATA;        /* All buffers finalized */
-               else
-                       return -EAGAIN;         /* Temporarily empty */
-       }
-}
-EXPORT_SYMBOL_GPL(channel_get_next_record);
-
-static
-void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
-{
-       if (buf->iter.allocated)
-               return;
-
-       buf->iter.allocated = 1;
-       if (chan->iter.read_open && !buf->iter.read_open) {
-               CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
-               buf->iter.read_open = 1;
-       }
-
-       /* Add to list of buffers without any current record */
-       if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-}
-
-#ifdef CONFIG_HOTPLUG_CPU
-static
-int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
-                                          unsigned long action,
-                                          void *hcpu)
-{
-       unsigned int cpu = (unsigned long)hcpu;
-       struct channel *chan = container_of(nb, struct channel,
-                                           hp_iter_notifier);
-       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (!chan->hp_iter_enable)
-               return NOTIFY_DONE;
-
-       CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
-
-       switch (action) {
-       case CPU_DOWN_FAILED:
-       case CPU_DOWN_FAILED_FROZEN:
-       case CPU_ONLINE:
-       case CPU_ONLINE_FROZEN:
-               lib_ring_buffer_iterator_init(chan, buf);
-               return NOTIFY_OK;
-       default:
-               return NOTIFY_DONE;
-       }
-}
-#endif
-
-int channel_iterator_init(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer *buf;
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               int cpu, ret;
-
-               INIT_LIST_HEAD(&chan->iter.empty_head);
-               ret = lttng_heap_init(&chan->iter.heap,
-                               num_possible_cpus(),
-                               GFP_KERNEL, buf_is_higher);
-               if (ret)
-                       return ret;
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-#ifdef CONFIG_HOTPLUG_CPU
-               chan->hp_iter_notifier.notifier_call =
-                       channel_iterator_cpu_hotplug;
-               chan->hp_iter_notifier.priority = 10;
-               register_cpu_notifier(&chan->hp_iter_notifier);
-               get_online_cpus();
-               for_each_online_cpu(cpu) {
-                       buf = per_cpu_ptr(chan->backend.buf, cpu);
-                       lib_ring_buffer_iterator_init(chan, buf);
-               }
-               chan->hp_iter_enable = 1;
-               put_online_cpus();
-#else
-               for_each_possible_cpu(cpu) {
-                       buf = per_cpu_ptr(chan->backend.buf, cpu);
-                       lib_ring_buffer_iterator_init(chan, buf);
-               }
-#endif
-       } else {
-               buf = channel_get_ring_buffer(config, chan, 0);
-               lib_ring_buffer_iterator_init(chan, buf);
-       }
-       return 0;
-}
-
-void channel_iterator_unregister_notifiers(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               chan->hp_iter_enable = 0;
-               unregister_cpu_notifier(&chan->hp_iter_notifier);
-       }
-}
-
-void channel_iterator_free(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               lttng_heap_free(&chan->iter.heap);
-}
-
-int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
-{
-       struct channel *chan = buf->backend.chan;
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
-       return lib_ring_buffer_open_read(buf);
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);
-
-/*
- * Note: Iterators must not be mixed with other types of outputs, because an
- * iterator can leave the buffer in "GET" state, which is not consistent with
- * other types of output (mmap, splice, raw data read).
- */
-void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
-{
-       lib_ring_buffer_release_read(buf);
-}
-EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);
-
-int channel_iterator_open(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer *buf;
-       int ret = 0, cpu;
-
-       CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               get_online_cpus();
-               /* Allow CPU hotplug to keep track of opened reader */
-               chan->iter.read_open = 1;
-               for_each_channel_cpu(cpu, chan) {
-                       buf = channel_get_ring_buffer(config, chan, cpu);
-                       ret = lib_ring_buffer_iterator_open(buf);
-                       if (ret)
-                               goto error;
-                       buf->iter.read_open = 1;
-               }
-               put_online_cpus();
-       } else {
-               buf = channel_get_ring_buffer(config, chan, 0);
-               ret = lib_ring_buffer_iterator_open(buf);
-       }
-       return ret;
-error:
-       /* Error should always happen on CPU 0, hence no close is required. */
-       CHAN_WARN_ON(chan, cpu != 0);
-       put_online_cpus();
-       return ret;
-}
-EXPORT_SYMBOL_GPL(channel_iterator_open);
-
-void channel_iterator_release(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer *buf;
-       int cpu;
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               get_online_cpus();
-               for_each_channel_cpu(cpu, chan) {
-                       buf = channel_get_ring_buffer(config, chan, cpu);
-                       if (buf->iter.read_open) {
-                               lib_ring_buffer_iterator_release(buf);
-                               buf->iter.read_open = 0;
-                       }
-               }
-               chan->iter.read_open = 0;
-               put_online_cpus();
-       } else {
-               buf = channel_get_ring_buffer(config, chan, 0);
-               lib_ring_buffer_iterator_release(buf);
-       }
-}
-EXPORT_SYMBOL_GPL(channel_iterator_release);
-
-void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
-{
-       struct channel *chan = buf->backend.chan;
-
-       if (buf->iter.state != ITER_GET_SUBBUF)
-               lib_ring_buffer_put_next_subbuf(buf);
-       buf->iter.state = ITER_GET_SUBBUF;
-       /* Remove from heap (if present). */
-       if (lttng_heap_cherrypick(&chan->iter.heap, buf))
-               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-       buf->iter.timestamp = 0;
-       buf->iter.header_len = 0;
-       buf->iter.payload_len = 0;
-       buf->iter.consumed = 0;
-       buf->iter.read_offset = 0;
-       buf->iter.data_size = 0;
-       /* Don't reset allocated and read_open */
-}
-
-void channel_iterator_reset(struct channel *chan)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       struct lib_ring_buffer *buf;
-       int cpu;
-
-       /* Empty heap, put into empty_head */
-       while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
-               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
-
-       for_each_channel_cpu(cpu, chan) {
-               buf = channel_get_ring_buffer(config, chan, cpu);
-               lib_ring_buffer_iterator_reset(buf);
-       }
-       /* Don't reset read_open */
-       chan->iter.last_qs = 0;
-       chan->iter.last_timestamp = 0;
-       chan->iter.last_cpu = 0;
-       chan->iter.len_left = 0;
-}
-
-/*
- * Ring buffer payload extraction read() implementation.
- */
-static
-ssize_t channel_ring_buffer_file_read(struct file *filp,
-                                     char __user *user_buf,
-                                     size_t count,
-                                     loff_t *ppos,
-                                     struct channel *chan,
-                                     struct lib_ring_buffer *buf,
-                                     int fusionmerge)
-{
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       size_t read_count = 0, read_offset;
-       ssize_t len;
-
-       might_sleep();
-       if (!access_ok(VERIFY_WRITE, user_buf, count))
-               return -EFAULT;
-
-       /* Finish copy of previous record */
-       if (*ppos != 0) {
-               if (read_count < count) {
-                       len = chan->iter.len_left;
-                       read_offset = *ppos;
-                       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
-                           && fusionmerge)
-                               buf = lttng_heap_maximum(&chan->iter.heap);
-                       CHAN_WARN_ON(chan, !buf);
-                       goto skip_get_next;
-               }
-       }
-
-       while (read_count < count) {
-               size_t copy_len, space_left;
-
-               if (fusionmerge)
-                       len = channel_get_next_record(chan, &buf);
-               else
-                       len = lib_ring_buffer_get_next_record(chan, buf);
-len_test:
-               if (len < 0) {
-                       /*
-                        * Check if buffer is finalized (end of file).
-                        */
-                       if (len == -ENODATA) {
-                               /* A 0 read_count will tell about end of file */
-                               goto nodata;
-                       }
-                       if (filp->f_flags & O_NONBLOCK) {
-                               if (!read_count)
-                                       read_count = -EAGAIN;
-                               goto nodata;
-                       } else {
-                               int error;
-
-                               /*
-                                * No data available at the moment, return what
-                                * we got.
-                                */
-                               if (read_count)
-                                       goto nodata;
-
-                               /*
-                                * Wait for returned len to be >= 0 or -ENODATA.
-                                */
-                               if (fusionmerge)
-                                       error = wait_event_interruptible(
-                                         chan->read_wait,
-                                         ((len = channel_get_next_record(chan,
-                                               &buf)), len != -EAGAIN));
-                               else
-                                       error = wait_event_interruptible(
-                                         buf->read_wait,
-                                         ((len = lib_ring_buffer_get_next_record(
-                                                 chan, buf)), len != -EAGAIN));
-                               CHAN_WARN_ON(chan, len == -EBUSY);
-                               if (error) {
-                                       read_count = error;
-                                       goto nodata;
-                               }
-                               CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
-                               goto len_test;
-                       }
-               }
-               read_offset = buf->iter.read_offset;
-skip_get_next:
-               space_left = count - read_count;
-               if (len <= space_left) {
-                       copy_len = len;
-                       chan->iter.len_left = 0;
-                       *ppos = 0;
-               } else {
-                       copy_len = space_left;
-                       chan->iter.len_left = len - copy_len;
-                       *ppos = read_offset + copy_len;
-               }
-               if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
-                                              &user_buf[read_count],
-                                              copy_len)) {
-                       /*
-                        * Leave the len_left and ppos values at their current
-                        * state, as we currently have a valid event to read.
-                        */
-                       return -EFAULT;
-               }
-               read_count += copy_len;
-       };
-       return read_count;
-
-nodata:
-       *ppos = 0;
-       chan->iter.len_left = 0;
-       return read_count;
-}
-
-/**
- * lib_ring_buffer_file_read - Read buffer record payload.
- * @filp: file structure pointer.
- * @buffer: user buffer to read data into.
- * @count: number of bytes to read.
- * @ppos: file read position.
- *
- * Returns a negative value on error, or the number of bytes read on success.
- * ppos is used to save the position _within the current record_ between calls
- * to read().
- */
-static
-ssize_t lib_ring_buffer_file_read(struct file *filp,
-                                 char __user *user_buf,
-                                 size_t count,
-                                 loff_t *ppos)
-{
-       struct inode *inode = filp->f_dentry->d_inode;
-       struct lib_ring_buffer *buf = inode->i_private;
-       struct channel *chan = buf->backend.chan;
-
-       return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
-                                            chan, buf, 0);
-}
-
-/**
- * channel_file_read - Read channel record payload.
- * @filp: file structure pointer.
- * @buffer: user buffer to read data into.
- * @count: number of bytes to read.
- * @ppos: file read position.
- *
- * Returns a negative value on error, or the number of bytes read on success.
- * ppos is used to save the position _within the current record_ between calls
- * to read().
- */
-static
-ssize_t channel_file_read(struct file *filp,
-                         char __user *user_buf,
-                         size_t count,
-                         loff_t *ppos)
-{
-       struct inode *inode = filp->f_dentry->d_inode;
-       struct channel *chan = inode->i_private;
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               return channel_ring_buffer_file_read(filp, user_buf, count,
-                                                    ppos, chan, NULL, 1);
-       else {
-               struct lib_ring_buffer *buf =
-                       channel_get_ring_buffer(config, chan, 0);
-               return channel_ring_buffer_file_read(filp, user_buf, count,
-                                                    ppos, chan, buf, 0);
-       }
-}
-
-static
-int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
-{
-       struct lib_ring_buffer *buf = inode->i_private;
-       int ret;
-
-       ret = lib_ring_buffer_iterator_open(buf);
-       if (ret)
-               return ret;
-
-       file->private_data = buf;
-       ret = nonseekable_open(inode, file);
-       if (ret)
-               goto release_iter;
-       return 0;
-
-release_iter:
-       lib_ring_buffer_iterator_release(buf);
-       return ret;
-}
-
-static
-int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
-{
-       struct lib_ring_buffer *buf = inode->i_private;
-
-       lib_ring_buffer_iterator_release(buf);
-       return 0;
-}
-
-static
-int channel_file_open(struct inode *inode, struct file *file)
-{
-       struct channel *chan = inode->i_private;
-       int ret;
-
-       ret = channel_iterator_open(chan);
-       if (ret)
-               return ret;
-
-       file->private_data = chan;
-       ret = nonseekable_open(inode, file);
-       if (ret)
-               goto release_iter;
-       return 0;
-
-release_iter:
-       channel_iterator_release(chan);
-       return ret;
-}
-
-static
-int channel_file_release(struct inode *inode, struct file *file)
-{
-       struct channel *chan = inode->i_private;
-
-       channel_iterator_release(chan);
-       return 0;
-}
-
-const struct file_operations channel_payload_file_operations = {
-       .open = channel_file_open,
-       .release = channel_file_release,
-       .read = channel_file_read,
-       .llseek = lib_ring_buffer_no_llseek,
-};
-EXPORT_SYMBOL_GPL(channel_payload_file_operations);
-
-const struct file_operations lib_ring_buffer_payload_file_operations = {
-       .open = lib_ring_buffer_file_open,
-       .release = lib_ring_buffer_file_release,
-       .read = lib_ring_buffer_file_read,
-       .llseek = lib_ring_buffer_no_llseek,
-};
-EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);
diff --git a/libringbuffer/ring_buffer_vfs.c b/libringbuffer/ring_buffer_vfs.c
deleted file mode 100644 (file)
index c105fe0..0000000
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * ring_buffer_vfs.c
- *
- * Copyright (C) 2009-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Ring Buffer VFS file operations.
- *
- * Dual LGPL v2.1/GPL v2 license.
- */
-
-#include "backend.h"
-#include "frontend.h"
-#include "vfs.h"
-
-static int put_ulong(unsigned long val, unsigned long arg)
-{
-       return put_user(val, (unsigned long __user *)arg);
-}
-
-#ifdef CONFIG_COMPAT
-static int compat_put_ulong(compat_ulong_t val, unsigned long arg)
-{
-       return put_user(val, (compat_ulong_t __user *)compat_ptr(arg));
-}
-#endif
-
-/**
- *     lib_ring_buffer_open - ring buffer open file operation
- *     @inode: opened inode
- *     @file: opened file
- *
- *     Open implementation. Makes sure only one open instance of a buffer is
- *     done at a given moment.
- */
-int lib_ring_buffer_open(struct inode *inode, struct file *file)
-{
-       struct lib_ring_buffer *buf = inode->i_private;
-       int ret;
-
-       ret = lib_ring_buffer_open_read(buf);
-       if (ret)
-               return ret;
-
-       file->private_data = buf;
-       ret = nonseekable_open(inode, file);
-       if (ret)
-               goto release_read;
-       return 0;
-
-release_read:
-       lib_ring_buffer_release_read(buf);
-       return ret;
-}
-
-/**
- *     lib_ring_buffer_release - ring buffer release file operation
- *     @inode: opened inode
- *     @file: opened file
- *
- *     Release implementation.
- */
-int lib_ring_buffer_release(struct inode *inode, struct file *file)
-{
-       struct lib_ring_buffer *buf = file->private_data;
-
-       lib_ring_buffer_release_read(buf);
-
-       return 0;
-}
-
-/**
- *     lib_ring_buffer_poll - ring buffer poll file operation
- *     @filp: the file
- *     @wait: poll table
- *
- *     Poll implementation.
- */
-unsigned int lib_ring_buffer_poll(struct file *filp, poll_table *wait)
-{
-       unsigned int mask = 0;
-       struct lib_ring_buffer *buf = filp->private_data;
-       struct channel *chan = buf->backend.chan;
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-       int finalized, disabled;
-
-       if (filp->f_mode & FMODE_READ) {
-               poll_wait_set_exclusive(wait);
-               poll_wait(filp, &buf->read_wait, wait);
-
-               finalized = lib_ring_buffer_is_finalized(config, buf);
-               disabled = lib_ring_buffer_channel_is_disabled(chan);
-
-               /*
-                * lib_ring_buffer_is_finalized() contains a smp_rmb() ordering
-                * finalized load before offsets loads.
-                */
-               WARN_ON(atomic_long_read(&buf->active_readers) != 1);
-retry:
-               if (disabled)
-                       return POLLERR;
-
-               if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf), chan)
-                 - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf), chan)
-                 == 0) {
-                       if (finalized)
-                               return POLLHUP;
-                       else {
-                               /*
-                                * The memory barriers
-                                * __wait_event()/wake_up_interruptible() take
-                                * care of "raw_spin_is_locked" memory ordering.
-                                */
-                               if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock))
-                                       goto retry;
-                               else
-                                       return 0;
-                       }
-               } else {
-                       if (subbuf_trunc(lib_ring_buffer_get_offset(config, buf),
-                                        chan)
-                         - subbuf_trunc(lib_ring_buffer_get_consumed(config, buf),
-                                        chan)
-                         >= chan->backend.buf_size)
-                               return POLLPRI | POLLRDBAND;
-                       else
-                               return POLLIN | POLLRDNORM;
-               }
-       }
-       return mask;
-}
-
-/**
- *     lib_ring_buffer_ioctl - control ring buffer reader synchronization
- *
- *     @filp: the file
- *     @cmd: the command
- *     @arg: command arg
- *
- *     This ioctl implements commands necessary for producer/consumer
- *     and flight recorder reader interaction :
- *     RING_BUFFER_GET_NEXT_SUBBUF
- *             Get the next sub-buffer that can be read. It never blocks.
- *     RING_BUFFER_PUT_NEXT_SUBBUF
- *             Release the currently read sub-buffer.
- *     RING_BUFFER_GET_SUBBUF_SIZE
- *             returns the size of the current sub-buffer.
- *     RING_BUFFER_GET_MAX_SUBBUF_SIZE
- *             returns the maximum size for sub-buffers.
- *     RING_BUFFER_GET_NUM_SUBBUF
- *             returns the number of reader-visible sub-buffers in the per cpu
- *              channel (for mmap).
- *      RING_BUFFER_GET_MMAP_READ_OFFSET
- *              returns the offset of the subbuffer belonging to the reader.
- *              Should only be used for mmap clients.
- */
-long lib_ring_buffer_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
-{
-       struct lib_ring_buffer *buf = filp->private_data;
-       struct channel *chan = buf->backend.chan;
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (lib_ring_buffer_channel_is_disabled(chan))
-               return -EIO;
-
-       switch (cmd) {
-       case RING_BUFFER_SNAPSHOT:
-               return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
-                                           &buf->prod_snapshot);
-       case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
-               return put_ulong(buf->cons_snapshot, arg);
-       case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
-               return put_ulong(buf->prod_snapshot, arg);
-       case RING_BUFFER_GET_SUBBUF:
-       {
-               unsigned long uconsume;
-               long ret;
-
-               ret = get_user(uconsume, (unsigned long __user *) arg);
-               if (ret)
-                       return ret; /* will return -EFAULT */
-               ret = lib_ring_buffer_get_subbuf(buf, uconsume);
-               if (!ret) {
-                       /* Set file position to zero at each successful "get" */
-                       filp->f_pos = 0;
-               }
-               return ret;
-       }
-       case RING_BUFFER_PUT_SUBBUF:
-               lib_ring_buffer_put_subbuf(buf);
-               return 0;
-
-       case RING_BUFFER_GET_NEXT_SUBBUF:
-       {
-               long ret;
-
-               ret = lib_ring_buffer_get_next_subbuf(buf);
-               if (!ret) {
-                       /* Set file position to zero at each successful "get" */
-                       filp->f_pos = 0;
-               }
-               return ret;
-       }
-       case RING_BUFFER_PUT_NEXT_SUBBUF:
-               lib_ring_buffer_put_next_subbuf(buf);
-               return 0;
-       case RING_BUFFER_GET_SUBBUF_SIZE:
-               return put_ulong(lib_ring_buffer_get_read_data_size(config, buf),
-                                arg);
-       case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
-       {
-               unsigned long size;
-
-               size = lib_ring_buffer_get_read_data_size(config, buf);
-               size = PAGE_ALIGN(size);
-               return put_ulong(size, arg);
-       }
-       case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
-               return put_ulong(chan->backend.subbuf_size, arg);
-       case RING_BUFFER_GET_MMAP_LEN:
-       {
-               unsigned long mmap_buf_len;
-
-               if (config->output != RING_BUFFER_MMAP)
-                       return -EINVAL;
-               mmap_buf_len = chan->backend.buf_size;
-               if (chan->backend.extra_reader_sb)
-                       mmap_buf_len += chan->backend.subbuf_size;
-               if (mmap_buf_len > INT_MAX)
-                       return -EFBIG;
-               return put_ulong(mmap_buf_len, arg);
-       }
-       case RING_BUFFER_GET_MMAP_READ_OFFSET:
-       {
-               unsigned long sb_bindex;
-
-               if (config->output != RING_BUFFER_MMAP)
-                       return -EINVAL;
-               sb_bindex = subbuffer_id_get_index(config,
-                                                  buf->backend.buf_rsb.id);
-               return put_ulong(buf->backend.array[sb_bindex]->mmap_offset,
-                                arg);
-       }
-       case RING_BUFFER_FLUSH:
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
-               return 0;
-       default:
-               return -ENOIOCTLCMD;
-       }
-}
-
-#ifdef CONFIG_COMPAT
-long lib_ring_buffer_compat_ioctl(struct file *filp, unsigned int cmd,
-                                 unsigned long arg)
-{
-       struct lib_ring_buffer *buf = filp->private_data;
-       struct channel *chan = buf->backend.chan;
-       const struct lib_ring_buffer_config *config = chan->backend.config;
-
-       if (lib_ring_buffer_channel_is_disabled(chan))
-               return -EIO;
-
-       switch (cmd) {
-       case RING_BUFFER_SNAPSHOT:
-               return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
-                                               &buf->prod_snapshot);
-       case RING_BUFFER_SNAPSHOT_GET_CONSUMED:
-               return compat_put_ulong(buf->cons_snapshot, arg);
-       case RING_BUFFER_SNAPSHOT_GET_PRODUCED:
-               return compat_put_ulong(buf->prod_snapshot, arg);
-       case RING_BUFFER_GET_SUBBUF:
-       {
-               __u32 uconsume;
-               unsigned long consume;
-               long ret;
-
-               ret = get_user(uconsume, (__u32 __user *) arg);
-               if (ret)
-                       return ret; /* will return -EFAULT */
-               consume = buf->cons_snapshot;
-               consume &= ~0xFFFFFFFFL;
-               consume |= uconsume;
-               ret = lib_ring_buffer_get_subbuf(buf, consume);
-               if (!ret) {
-                       /* Set file position to zero at each successful "get" */
-                       filp->f_pos = 0;
-               }
-               return ret;
-       }
-       case RING_BUFFER_PUT_SUBBUF:
-               lib_ring_buffer_put_subbuf(buf);
-               return 0;
-
-       case RING_BUFFER_GET_NEXT_SUBBUF:
-       {
-               long ret;
-
-               ret = lib_ring_buffer_get_next_subbuf(buf);
-               if (!ret) {
-                       /* Set file position to zero at each successful "get" */
-                       filp->f_pos = 0;
-               }
-               return ret;
-       }
-       case RING_BUFFER_PUT_NEXT_SUBBUF:
-               lib_ring_buffer_put_next_subbuf(buf);
-               return 0;
-       case RING_BUFFER_GET_SUBBUF_SIZE:
-       {
-               unsigned long data_size;
-
-               data_size = lib_ring_buffer_get_read_data_size(config, buf);
-               if (data_size > UINT_MAX)
-                       return -EFBIG;
-               return put_ulong(data_size, arg);
-       }
-       case RING_BUFFER_GET_PADDED_SUBBUF_SIZE:
-       {
-               unsigned long size;
-
-               size = lib_ring_buffer_get_read_data_size(config, buf);
-               size = PAGE_ALIGN(size);
-               if (size > UINT_MAX)
-                       return -EFBIG;
-               return put_ulong(size, arg);
-       }
-       case RING_BUFFER_GET_MAX_SUBBUF_SIZE:
-               if (chan->backend.subbuf_size > UINT_MAX)
-                       return -EFBIG;
-               return put_ulong(chan->backend.subbuf_size, arg);
-       case RING_BUFFER_GET_MMAP_LEN:
-       {
-               unsigned long mmap_buf_len;
-
-               if (config->output != RING_BUFFER_MMAP)
-                       return -EINVAL;
-               mmap_buf_len = chan->backend.buf_size;
-               if (chan->backend.extra_reader_sb)
-                       mmap_buf_len += chan->backend.subbuf_size;
-               if (mmap_buf_len > UINT_MAX)
-                       return -EFBIG;
-               return put_ulong(mmap_buf_len, arg);
-       }
-       case RING_BUFFER_GET_MMAP_READ_OFFSET:
-       {
-               unsigned long sb_bindex, read_offset;
-
-               if (config->output != RING_BUFFER_MMAP)
-                       return -EINVAL;
-               sb_bindex = subbuffer_id_get_index(config,
-                                                  buf->backend.buf_rsb.id);
-               read_offset = buf->backend.array[sb_bindex]->mmap_offset;
-               if (read_offset > UINT_MAX)
-                       return -EINVAL;
-               return put_ulong(read_offset, arg);
-       }
-       case RING_BUFFER_FLUSH:
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
-               return 0;
-       default:
-               return -ENOIOCTLCMD;
-       }
-}
-#endif
-
-const struct file_operations lib_ring_buffer_file_operations = {
-       .open = lib_ring_buffer_open,
-       .release = lib_ring_buffer_release,
-       .poll = lib_ring_buffer_poll,
-       .splice_read = lib_ring_buffer_splice_read,
-       .mmap = lib_ring_buffer_mmap,
-       .unlocked_ioctl = lib_ring_buffer_ioctl,
-       .llseek = lib_ring_buffer_no_llseek,
-#ifdef CONFIG_COMPAT
-       .compat_ioctl = lib_ring_buffer_compat_ioctl,
-#endif
-};
-EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations);
-
-MODULE_LICENSE("GPL and additional rights");
-MODULE_AUTHOR("Mathieu Desnoyers");
-MODULE_DESCRIPTION("Ring Buffer Library VFS");
index eea6d049c71d995545054c5775a0b9cda43baf59..831ea0431a00777f724e4dc9587a263d20d31b47 100644 (file)
@@ -9,6 +9,9 @@
  * Dual LGPL v2.1/GPL v2 license.
  */
 
+#include <assert.h>
+#include <urcu/uatomic.h>
+
 /*
  * Same data type (long) accessed differently depending on configuration.
  * v field is for non-atomic access (protected by mutual exclusion).
  * atomic_long_t is used for globally shared buffers.
  */
 union v_atomic {
-       local_t l;
-       atomic_long_t a;
+       long a; /* accessed through uatomic */
        long v;
 };
 
 static inline
 long v_read(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-       if (config->sync == RING_BUFFER_SYNC_PER_CPU)
-               return local_read(&v_a->l);
-       else
-               return atomic_long_read(&v_a->a);
+       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+       return uatomic_read(&v_a->a);
 }
 
 static inline
 void v_set(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
           long v)
 {
-       if (config->sync == RING_BUFFER_SYNC_PER_CPU)
-               local_set(&v_a->l, v);
-       else
-               atomic_long_set(&v_a->a, v);
+       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+       uatomic_set(&v_a->a, v);
 }
 
 static inline
 void v_add(const struct lib_ring_buffer_config *config, long v, union v_atomic *v_a)
 {
-       if (config->sync == RING_BUFFER_SYNC_PER_CPU)
-               local_add(v, &v_a->l);
-       else
-               atomic_long_add(v, &v_a->a);
+       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+       uatomic_add(&v_a->a, v);
 }
 
 static inline
 void v_inc(const struct lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-       if (config->sync == RING_BUFFER_SYNC_PER_CPU)
-               local_inc(&v_a->l);
-       else
-               atomic_long_inc(&v_a->a);
+       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+       uatomic_inc(&v_a->a);
 }
 
 /*
@@ -73,10 +67,8 @@ static inline
 long v_cmpxchg(const struct lib_ring_buffer_config *config, union v_atomic *v_a,
               long old, long _new)
 {
-       if (config->sync == RING_BUFFER_SYNC_PER_CPU)
-               return local_cmpxchg(&v_a->l, old, _new);
-       else
-               return atomic_long_cmpxchg(&v_a->a, old, _new);
+       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
+       return uatomic_cmpxchg(&v_a->a, old, _new);
 }
 
 #endif /* _LINUX_RING_BUFFER_VATOMIC_H */
This page took 0.0571 seconds and 4 git commands to generate.