From: Paolo Bonzini Date: Mon, 1 Mar 2010 19:57:10 +0000 (-0500) Subject: lock-free queue with RCU-based garbage collection X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=5ad7ff663ad3de34aae0e46ba3ee962f99f888c1;p=urcu.git lock-free queue with RCU-based garbage collection A lock-free queue example using RCU to avoid the need for double-word compare-and-swap and, at the same time, to implement efficient garbage collection. I tested it lightly, and I don't think it's fully ready for inclusion. It does need the previous two patches to avoid deadlocks, so I did test it somewhat. :-) See individual files for detailed comments on how it works. Cc: Paul E. McKenney --- diff --git a/tests/Makefile.am b/tests/Makefile.am index 17b89db..2ebaae5 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -12,7 +12,12 @@ noinst_PROGRAMS = test_urcu test_urcu_dynamic_link test_urcu_timing \ test_urcu_mb_gc test_qsbr_gc test_qsbr_lgc test_urcu_signal_lgc \ test_urcu_mb_lgc test_qsbr_dynamic_link test_urcu_defer \ test_uatomic test_urcu_assign test_urcu_assign_dynamic_link \ - test_urcu_bp test_urcu_bp_dynamic_link + test_urcu_bp test_urcu_bp_dynamic_link \ + test_urcu_lfq test_urcu_signal_lfq test_urcu_dynamic_link_lfq \ + test_urcu_signal_dynamic_link_lfq test_urcu_mb_lfq \ + test_urcu_yield_lfq test_urcu_signal_yield_lfq test_urcu_defer_lfq \ + test_qsbr_lfq test_qsbr_dynamic_link_lfq + noinst_HEADERS = rcutorture.h if COMPAT_ARCH @@ -149,4 +154,33 @@ test_urcu_bp_SOURCES = test_urcu_bp.c $(URCU_BP) test_urcu_bp_dynamic_link_SOURCES = test_urcu_bp.c $(URCU_BP) test_urcu_bp_dynamic_link_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS) +test_urcu_lfq_SOURCES = test_urcu_lfq.c $(URCU) + +test_urcu_defer_lfq_SOURCES = test_urcu_defer_lfq.c $(URCU_DEFER) + +test_urcu_signal_lfq_SOURCES = test_urcu_lfq.c $(URCU_SIGNAL) +test_urcu_signal_lfq_CFLAGS = -DRCU_SIGNAL $(AM_CFLAGS) + +test_urcu_dynamic_link_lfq_SOURCES = test_urcu_lfq.c $(URCU) +test_urcu_dynamic_link_lfq_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS) + +test_urcu_signal_dynamic_link_lfq_SOURCES = test_urcu_lfq.c $(URCU) +test_urcu_signal_dynamic_link_lfq_CFLAGS = -DRCU_SIGNAL -DDYNAMIC_LINK_TEST $(AM_CFLAGS) + +test_urcu_mb_lfq_SOURCES = test_urcu_lfq.c $(URCU_MB) +test_urcu_mb_lfq_CFLAGS = -DRCU_MB $(AM_CFLAGS) + +test_urcu_yield_lfq_SOURCES = test_urcu_lfq.c $(URCU) +test_urcu_yield_lfq_CFLAGS = -DDEBUG_YIELD $(AM_CFLAGS) + +test_urcu_signal_yield_lfq_SOURCES = test_urcu_lfq.c $(URCU_SIGNAL) +test_urcu_signal_yield_lfq_CFLAGS = -DRCU_SIGNAL -DDEBUG_YIELD $(AM_CFLAGS) + +test_qsbr_lfq_SOURCES = test_qsbr_lfq.c $(URCU_QSBR) + +test_qsbr_dynamic_link_lfq_SOURCES = test_qsbr_lfq.c $(URCU_QSBR) +test_qsbr_dynamic_link_lfq_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS) + + + urcutorture.c: api.h diff --git a/tests/test_qsbr_lfq.c b/tests/test_qsbr_lfq.c new file mode 100644 index 0000000..2e3f130 --- /dev/null +++ b/tests/test_qsbr_lfq.c @@ -0,0 +1,587 @@ +/* + * test_urcu.c + * + * Userspace RCU library - example RCU-based lock-free queue + * + * Copyright February 2010 - Paolo Bonzini + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include "../config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/* hardcoded number of CPUs */ +#define NR_CPUS 16384 + +#if defined(_syscall0) +_syscall0(pid_t, gettid) +#elif defined(__NR_gettid) +static inline pid_t gettid(void) +{ + return syscall(__NR_gettid); +} +#else +#warning "use pid as tid" +static inline pid_t gettid(void) +{ + return getpid(); +} +#endif + +#ifndef DYNAMIC_LINK_TEST +#define _LGPL_SOURCE +#endif +#include "urcu-qsbr.h" + + +static volatile int test_go, test_stop; + +static unsigned long rduration; + +static unsigned long duration; + +/* read-side C.S. duration, in loops */ +static unsigned long wdelay; + +static inline void loop_sleep(unsigned long l) +{ + while(l-- != 0) + cpu_relax(); +} + +static int verbose_mode; + +#define printf_verbose(fmt, args...) \ + do { \ + if (verbose_mode) \ + printf(fmt, args); \ + } while (0) + +static unsigned int cpu_affinities[NR_CPUS]; +static unsigned int next_aff = 0; +static int use_affinity = 0; + +pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER; + +#ifndef HAVE_CPU_SET_T +typedef unsigned long cpu_set_t; +# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0) +# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0) +#endif + +static void set_affinity(void) +{ + cpu_set_t mask; + int cpu; + int ret; + + if (!use_affinity) + return; + +#if HAVE_SCHED_SETAFFINITY + ret = pthread_mutex_lock(&affinity_mutex); + if (ret) { + perror("Error in pthread mutex lock"); + exit(-1); + } + cpu = cpu_affinities[next_aff++]; + ret = pthread_mutex_unlock(&affinity_mutex); + if (ret) { + perror("Error in pthread mutex unlock"); + exit(-1); + } + + CPU_ZERO(&mask); + CPU_SET(cpu, &mask); +#if SCHED_SETAFFINITY_ARGS == 2 + sched_setaffinity(0, &mask); +#else + sched_setaffinity(0, sizeof(mask), &mask); +#endif +#endif /* HAVE_SCHED_SETAFFINITY */ +} + +/* + * returns 0 if test should end. + */ +static int test_duration_dequeue(void) +{ + return !test_stop; +} + +static int test_duration_enqueue(void) +{ + return !test_stop; +} + +static unsigned long long __thread nr_dequeues; +static unsigned long long __thread nr_enqueues; + +static unsigned long nr_successful_dequeues; +static unsigned int nr_enqueuers; +static unsigned int nr_dequeuers; + + +#define ARRAY_POISON 0xDEADBEEF +#define PAGE_SIZE 4096 +#define PAGE_MASK (PAGE_SIZE - 1) +#define NODES_PER_PAGE (PAGE_SIZE - 16) / sizeof (struct node) + +/* Lock-free queue, using the RCU to avoid the ABA problem and (more + interestingly) to efficiently handle freeing memory. + + We have to protect both the enqueuer and dequeuer's compare-and- + exchange operation from running across a free and a subsequent + reallocation of the same memory. So, we protect the free with + synchronize_rcu; this is enough because all the allocations take + place before the compare-and-exchange ops. + + Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard + implementation of a lock-free-queue. The first node in the queue is + always dummy: dequeuing returns the data from HEAD->NEXT, advances + HEAD to HEAD->NEXT (which will now serve as dummy node), and frees the + old HEAD. Since RCU avoids the ABA problem, it doesn't use double-word + compare-and-exchange operations. Node allocation and deallocation are + a "black box" and synchronize_rcu is hidden within node deallocation. + + So, the tricky part is finding a good allocation strategy for nodes. + The allocator for nodes is shared by multiple threads, and since + malloc/free are not lock-free a layer above them is obviously + necessary: otherwise the whole exercise is useless. In addition, + to avoid penalizing dequeues, the allocator should avoid frequent + synchronization (because synchronize_rcu is expensive). + + The scheme that is used here uses a page as the allocation + unit for nodes. A page is freed when no more nodes are in use. + Nodes from a page are never reused. + + The nodes are allocated from Q->CURRENT. Since whoever finds a full + page has to busy wait, we use a trick to limit the duration of busy + waiting. A free page Q->FREE is always kept ready, so that any thread + that allocates the last node in a page, or finds a full page can try + to update Q->CURRENT. Whoever loses the race has to busy wait, OTOH + whoever wins the race has to allocate the new Q->FREE. In other words, + if the following sequence happens + + Thread 1 Thread 2 other threads + ----------------------------------------------------------------------- + Get last node from page + q->current = q->free; + fill up q->current + q->current = q->free fails + + then thread 1 does not have anymore the duty of allocating q->current; + thread 2 will do that. If a thread finds a full current page and + Q->CURRENT == Q->FREE, this means that another thread is going to + allocate Q->FREE soon, and it busy waits. After the allocation + finishes, everything proceeds normally: some thread will take care + of setting Q->CURRENT and allocating a new Q->FREE. + + One common scheme for allocation is to use a free list (implemented + as a lock-free stack), but this free list is potentially unbounded. + Instead, with the above scheme the number of live pages at any time + is equal to the number of enqueuing threads. */ + +struct node { + void *data; + void *next; +}; + +struct node_page { + int in; + int out; + char padding[16 - sizeof(int) * 2]; + struct node nodes[NODES_PER_PAGE]; +}; + + +struct queue { + struct node_page *current, *free; + struct node *head, *tail; +}; + +static struct node_page *new_node_page() +{ + struct node_page *p = valloc (PAGE_SIZE); + rcu_quiescent_state(); + p->in = p->out = 0; + return p; +} + +static void free_node_page(struct node_page *p) +{ + /* Help making sure that accessing a dangling pointer is + adequately punished. */ + p->in = ARRAY_POISON; + free (p); +} + +static struct node *new_node(struct queue *q) +{ + struct node *n; + struct node_page *p; + int i; + + do { + p = q->current; + i = p->in; + if (i >= NODES_PER_PAGE - 1 && + q->free != p && + uatomic_cmpxchg(&q->current, p, q->free) == p) + q->free = new_node_page(); + + } while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i+1) != i); + + assert (i >= 0 && i < NODES_PER_PAGE); + n = &p->nodes[i]; + n->next = NULL; + return n; +} + +void free_node(struct node *n) +{ + struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK); + + if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) { + rcu_quiescent_state(); + synchronize_rcu(); + free_node_page(p); + } +} + +void init_queue(struct queue *q) +{ + q->current = new_node_page(); + q->free = new_node_page(); + q->head = q->tail = new_node(q); +} + +void enqueue(struct queue *q, void *value) +{ + struct node *n = new_node(q); + n->data = value; + rcu_read_lock(); + for (;;) { + struct node *tail = rcu_dereference(q->tail); + struct node *next = rcu_dereference(tail->next); + if (tail != q->tail) { + /* A change occurred while reading the values. */ + continue; + } + + if (next) { + /* Help moving tail further. */ + uatomic_cmpxchg(&q->tail, tail, next); + continue; + } + + if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) { + /* Move tail (another operation might beat us to it, + that's fine). */ + uatomic_cmpxchg(&q->tail, tail, n); + rcu_read_unlock(); + return; + } + } +} + +void *dequeue(struct queue *q, bool *not_empty) +{ + bool dummy; + if (!not_empty) + not_empty = &dummy; + + rcu_read_lock(); + *not_empty = false; + for (;;) { + void *data; + struct node *head = rcu_dereference(q->head); + struct node *tail = rcu_dereference(q->tail); + struct node *next = rcu_dereference(head->next); + + if (head != q->head) { + /* A change occurred while reading the values. */ + continue; + } + + if (head == tail) { + /* If all three are consistent, the queue is empty. */ + if (!next) + return NULL; + + /* Help moving tail further. */ + uatomic_cmpxchg(&q->tail, tail, next); + continue; + } + + data = next->data; + if (uatomic_cmpxchg(&q->head, head, next) == head) { + /* Next remains as a dummy node, head is freed. */ + rcu_read_unlock(); + *not_empty = true; + free_node (head); + return data; + } + } +} + + +static struct queue q; + +void *thr_enqueuer(void *_count) +{ + unsigned long long *count = _count; + + printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n", + "enqueuer", pthread_self(), (unsigned long)gettid()); + + set_affinity(); + + rcu_register_thread(); + + while (!test_go) + { + } + smp_mb(); + + for (;;) { + enqueue (&q, NULL); + + if (unlikely(wdelay)) + loop_sleep(wdelay); + nr_enqueues++; + if (unlikely(!test_duration_enqueue())) + break; + } + + rcu_unregister_thread(); + + *count = nr_enqueues; + printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n", + "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues); + return ((void*)1); + +} + +void *thr_dequeuer(void *_count) +{ + unsigned long long *count = _count; + + printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n", + "dequeuer", pthread_self(), (unsigned long)gettid()); + + set_affinity(); + + rcu_register_thread(); + + while (!test_go) + { + } + smp_mb(); + + for (;;) { + bool not_empty; + dequeue (&q, ¬_empty); + if (not_empty) + uatomic_inc (&nr_successful_dequeues); + + nr_dequeues++; + if (unlikely(!test_duration_dequeue())) + break; + if (unlikely(rduration)) + loop_sleep(rduration); + } + + rcu_unregister_thread(); + + printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n", + "dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues); + *count = nr_dequeues; + return ((void*)2); +} + +void test_end(struct queue *q) +{ + bool not_empty; + do + dequeue (q, ¬_empty); + while (!not_empty); + if (q->current != q->free) + free_node_page(q->free); + free_node_page(q->current); +} + +void show_usage(int argc, char **argv) +{ + printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv[0]); + printf(" [-d delay] (enqueuer period (us))"); + printf(" [-c duration] (dequeuer C.S. duration (in loops))"); + printf(" [-v] (verbose output)"); + printf(" [-a cpu#] [-a cpu#]... (affinity)"); + printf("\n"); +} + +int main(int argc, char **argv) +{ + int err; + pthread_t *tid_enqueuer, *tid_dequeuer; + void *tret; + unsigned long long *count_enqueuer, *count_dequeuer; + unsigned long long tot_enqueues = 0, tot_dequeues = 0; + int i, a; + + if (argc < 4) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[1], "%u", &nr_dequeuers); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[2], "%u", &nr_enqueuers); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[3], "%lu", &duration); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + for (i = 4; i < argc; i++) { + if (argv[i][0] != '-') + continue; + switch (argv[i][1]) { + case 'a': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + a = atoi(argv[++i]); + cpu_affinities[next_aff++] = a; + use_affinity = 1; + printf_verbose("Adding CPU %d affinity\n", a); + break; + case 'c': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + rduration = atol(argv[++i]); + break; + case 'd': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + wdelay = atol(argv[++i]); + break; + case 'v': + verbose_mode = 1; + break; + } + } + + printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n", + duration, nr_enqueuers, nr_dequeuers); + printf_verbose("Writer delay : %lu loops.\n", rduration); + printf_verbose("Reader duration : %lu loops.\n", wdelay); + printf_verbose("thread %-6s, thread id : %lx, tid %lu\n", + "main", pthread_self(), (unsigned long)gettid()); + + tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers); + tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); + count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers); + count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers); + init_queue (&q); + + next_aff = 0; + + for (i = 0; i < nr_enqueuers; i++) { + err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer, + &count_enqueuer[i]); + if (err != 0) + exit(1); + } + for (i = 0; i < nr_dequeuers; i++) { + err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer, + &count_dequeuer[i]); + if (err != 0) + exit(1); + } + + smp_mb(); + + test_go = 1; + + for (i = 0; i < duration; i++) { + sleep(1); + if (verbose_mode) + write (1, ".", 1); + } + + test_stop = 1; + + for (i = 0; i < nr_enqueuers; i++) { + err = pthread_join(tid_enqueuer[i], &tret); + if (err != 0) + exit(1); + tot_enqueues += count_enqueuer[i]; + } + for (i = 0; i < nr_dequeuers; i++) { + err = pthread_join(tid_dequeuer[i], &tret); + if (err != 0) + exit(1); + tot_dequeues += count_dequeuer[i]; + } + + printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues, + tot_dequeues); + printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu " + "nr_dequeuers %3u " + "rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n", + argv[0], duration, nr_enqueuers, wdelay, + nr_dequeuers, rduration, tot_enqueues, tot_dequeues, + nr_successful_dequeues, tot_enqueues + tot_dequeues); + + test_end(&q); + free(tid_enqueuer); + free(tid_dequeuer); + free(count_enqueuer); + free(count_dequeuer); + return 0; +} diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c new file mode 100644 index 0000000..90587b7 --- /dev/null +++ b/tests/test_urcu_lfq.c @@ -0,0 +1,585 @@ +/* + * test_urcu.c + * + * Userspace RCU library - example RCU-based lock-free queue + * + * Copyright February 2010 - Paolo Bonzini + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include "../config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/* hardcoded number of CPUs */ +#define NR_CPUS 16384 + +#if defined(_syscall0) +_syscall0(pid_t, gettid) +#elif defined(__NR_gettid) +static inline pid_t gettid(void) +{ + return syscall(__NR_gettid); +} +#else +#warning "use pid as tid" +static inline pid_t gettid(void) +{ + return getpid(); +} +#endif + +#ifndef DYNAMIC_LINK_TEST +#define _LGPL_SOURCE +#endif +#include + + +static volatile int test_go, test_stop; + +static unsigned long rduration; + +static unsigned long duration; + +/* read-side C.S. duration, in loops */ +static unsigned long wdelay; + +static inline void loop_sleep(unsigned long l) +{ + while(l-- != 0) + cpu_relax(); +} + +static int verbose_mode; + +#define printf_verbose(fmt, args...) \ + do { \ + if (verbose_mode) \ + printf(fmt, args); \ + } while (0) + +static unsigned int cpu_affinities[NR_CPUS]; +static unsigned int next_aff = 0; +static int use_affinity = 0; + +pthread_mutex_t affinity_mutex = PTHREAD_MUTEX_INITIALIZER; + +#ifndef HAVE_CPU_SET_T +typedef unsigned long cpu_set_t; +# define CPU_ZERO(cpuset) do { *(cpuset) = 0; } while(0) +# define CPU_SET(cpu, cpuset) do { *(cpuset) |= (1UL << (cpu)); } while(0) +#endif + +static void set_affinity(void) +{ + cpu_set_t mask; + int cpu; + int ret; + + if (!use_affinity) + return; + +#if HAVE_SCHED_SETAFFINITY + ret = pthread_mutex_lock(&affinity_mutex); + if (ret) { + perror("Error in pthread mutex lock"); + exit(-1); + } + cpu = cpu_affinities[next_aff++]; + ret = pthread_mutex_unlock(&affinity_mutex); + if (ret) { + perror("Error in pthread mutex unlock"); + exit(-1); + } + + CPU_ZERO(&mask); + CPU_SET(cpu, &mask); +#if SCHED_SETAFFINITY_ARGS == 2 + sched_setaffinity(0, &mask); +#else + sched_setaffinity(0, sizeof(mask), &mask); +#endif +#endif /* HAVE_SCHED_SETAFFINITY */ +} + +/* + * returns 0 if test should end. + */ +static int test_duration_dequeue(void) +{ + return !test_stop; +} + +static int test_duration_enqueue(void) +{ + return !test_stop; +} + +static unsigned long long __thread nr_dequeues; +static unsigned long long __thread nr_enqueues; + +static unsigned long nr_successful_dequeues; +static unsigned int nr_enqueuers; +static unsigned int nr_dequeuers; + + +#define ARRAY_POISON 0xDEADBEEF +#define PAGE_SIZE 4096 +#define PAGE_MASK (PAGE_SIZE - 1) +#define NODES_PER_PAGE (PAGE_SIZE - 16) / sizeof (struct node) + +/* Lock-free queue, using the RCU to avoid the ABA problem and (more + interestingly) to efficiently handle freeing memory. + + We have to protect both the enqueuer and dequeuer's compare-and- + exchange operation from running across a free and a subsequent + reallocation of the same memory. So, we protect the free with + synchronize_rcu; this is enough because all the allocations take + place before the compare-and-exchange ops. + + Besides adding rcu_read_{,un}lock, the enqueue/dequeue are a standard + implementation of a lock-free-queue. The first node in the queue is + always dummy: dequeuing returns the data from HEAD->NEXT, advances + HEAD to HEAD->NEXT (which will now serve as dummy node), and frees the + old HEAD. Since RCU avoids the ABA problem, it doesn't use double-word + compare-and-exchange operations. Node allocation and deallocation are + a "black box" and synchronize_rcu is hidden within node deallocation. + + So, the tricky part is finding a good allocation strategy for nodes. + The allocator for nodes is shared by multiple threads, and since + malloc/free are not lock-free a layer above them is obviously + necessary: otherwise the whole exercise is useless. In addition, + to avoid penalizing dequeues, the allocator should avoid frequent + synchronization (because synchronize_rcu is expensive). + + The scheme that is used here uses a page as the allocation + unit for nodes. A page is freed when no more nodes are in use. + Nodes from a page are never reused. + + The nodes are allocated from Q->CURRENT. Since whoever finds a full + page has to busy wait, we use a trick to limit the duration of busy + waiting. A free page Q->FREE is always kept ready, so that any thread + that allocates the last node in a page, or finds a full page can try + to update Q->CURRENT. Whoever loses the race has to busy wait, OTOH + whoever wins the race has to allocate the new Q->FREE. In other words, + if the following sequence happens + + Thread 1 Thread 2 other threads + ----------------------------------------------------------------------- + Get last node from page + q->current = q->free; + fill up q->current + q->current = q->free fails + + then thread 1 does not have anymore the duty of allocating q->current; + thread 2 will do that. If a thread finds a full current page and + Q->CURRENT == Q->FREE, this means that another thread is going to + allocate Q->FREE soon, and it busy waits. After the allocation + finishes, everything proceeds normally: some thread will take care + of setting Q->CURRENT and allocating a new Q->FREE. + + One common scheme for allocation is to use a free list (implemented + as a lock-free stack), but this free list is potentially unbounded. + Instead, with the above scheme the number of live pages at any time + is equal to the number of enqueuing threads. */ + +struct node { + void *data; + void *next; +}; + +struct node_page { + int in; + int out; + char padding[16 - sizeof(int) * 2]; + struct node nodes[NODES_PER_PAGE]; +}; + + +struct queue { + struct node_page *current, *free; + struct node *head, *tail; +}; + +static struct node_page *new_node_page() +{ + struct node_page *p = valloc (PAGE_SIZE); + p->in = p->out = 0; + return p; +} + +static void free_node_page(struct node_page *p) +{ + /* Help making sure that accessing a dangling pointer is + adequately punished. */ + p->in = ARRAY_POISON; + free (p); +} + +static struct node *new_node(struct queue *q) +{ + struct node *n; + struct node_page *p; + int i; + + do { + p = q->current; + i = p->in; + if (i >= NODES_PER_PAGE - 1 && + q->free != p && + uatomic_cmpxchg(&q->current, p, q->free) == p) + q->free = new_node_page(); + + } while (i == NODES_PER_PAGE || uatomic_cmpxchg(&p->in, i, i+1) != i); + + assert (i >= 0 && i < NODES_PER_PAGE); + n = &p->nodes[i]; + n->next = NULL; + return n; +} + +void free_node(struct node *n) +{ + struct node_page *p = (struct node_page *) ((intptr_t) n & ~PAGE_MASK); + + if (uatomic_add_return(&p->out, 1) == NODES_PER_PAGE) { + synchronize_rcu(); + free_node_page(p); + } +} + +void init_queue(struct queue *q) +{ + q->current = new_node_page(); + q->free = new_node_page(); + q->head = q->tail = new_node(q); +} + +void enqueue(struct queue *q, void *value) +{ + struct node *n = new_node(q); + n->data = value; + rcu_read_lock(); + for (;;) { + struct node *tail = rcu_dereference(q->tail); + struct node *next = rcu_dereference(tail->next); + if (tail != q->tail) { + /* A change occurred while reading the values. */ + continue; + } + + if (next) { + /* Help moving tail further. */ + uatomic_cmpxchg(&q->tail, tail, next); + continue; + } + + if (uatomic_cmpxchg(&tail->next, NULL, n) == NULL) { + /* Move tail (another operation might beat us to it, + that's fine). */ + uatomic_cmpxchg(&q->tail, tail, n); + rcu_read_unlock(); + return; + } + } +} + +void *dequeue(struct queue *q, bool *not_empty) +{ + bool dummy; + if (!not_empty) + not_empty = &dummy; + + rcu_read_lock(); + *not_empty = false; + for (;;) { + void *data; + struct node *head = rcu_dereference(q->head); + struct node *tail = rcu_dereference(q->tail); + struct node *next = rcu_dereference(head->next); + + if (head != q->head) { + /* A change occurred while reading the values. */ + continue; + } + + if (head == tail) { + /* If all three are consistent, the queue is empty. */ + if (!next) + return NULL; + + /* Help moving tail further. */ + uatomic_cmpxchg(&q->tail, tail, next); + continue; + } + + data = next->data; + if (uatomic_cmpxchg(&q->head, head, next) == head) { + /* Next remains as a dummy node, head is freed. */ + rcu_read_unlock(); + *not_empty = true; + free_node (head); + return data; + } + } +} + + +static struct queue q; + +void *thr_enqueuer(void *_count) +{ + unsigned long long *count = _count; + + printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n", + "enqueuer", pthread_self(), (unsigned long)gettid()); + + set_affinity(); + + rcu_register_thread(); + + while (!test_go) + { + } + smp_mb(); + + for (;;) { + enqueue (&q, NULL); + + if (unlikely(wdelay)) + loop_sleep(wdelay); + nr_enqueues++; + if (unlikely(!test_duration_enqueue())) + break; + } + + rcu_unregister_thread(); + + *count = nr_enqueues; + printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n", + "enqueuer", pthread_self(), (unsigned long)gettid(), nr_enqueues); + return ((void*)1); + +} + +void *thr_dequeuer(void *_count) +{ + unsigned long long *count = _count; + + printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n", + "dequeuer", pthread_self(), (unsigned long)gettid()); + + set_affinity(); + + rcu_register_thread(); + + while (!test_go) + { + } + smp_mb(); + + for (;;) { + bool not_empty; + dequeue (&q, ¬_empty); + if (not_empty) + uatomic_inc (&nr_successful_dequeues); + + nr_dequeues++; + if (unlikely(!test_duration_dequeue())) + break; + if (unlikely(rduration)) + loop_sleep(rduration); + } + + rcu_unregister_thread(); + + printf_verbose("thread_end %s, thread id : %lx, tid %lu - count %d\n", + "dequeuer", pthread_self(), (unsigned long)gettid(), nr_dequeues); + *count = nr_dequeues; + return ((void*)2); +} + +void test_end(struct queue *q) +{ + bool not_empty; + do + dequeue (q, ¬_empty); + while (!not_empty); + if (q->current != q->free) + free_node_page(q->free); + free_node_page(q->current); +} + +void show_usage(int argc, char **argv) +{ + printf("Usage : %s nr_dequeuers nr_enqueuers duration (s)", argv[0]); + printf(" [-d delay] (enqueuer period (in loops))"); + printf(" [-c duration] (dequeuer period (in loops))"); + printf(" [-v] (verbose output)"); + printf(" [-a cpu#] [-a cpu#]... (affinity)"); + printf("\n"); +} + +int main(int argc, char **argv) +{ + int err; + pthread_t *tid_enqueuer, *tid_dequeuer; + void *tret; + unsigned long long *count_enqueuer, *count_dequeuer; + unsigned long long tot_enqueues = 0, tot_dequeues = 0; + int i, a; + + if (argc < 4) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[1], "%u", &nr_dequeuers); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[2], "%u", &nr_enqueuers); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + err = sscanf(argv[3], "%lu", &duration); + if (err != 1) { + show_usage(argc, argv); + return -1; + } + + for (i = 4; i < argc; i++) { + if (argv[i][0] != '-') + continue; + switch (argv[i][1]) { + case 'a': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + a = atoi(argv[++i]); + cpu_affinities[next_aff++] = a; + use_affinity = 1; + printf_verbose("Adding CPU %d affinity\n", a); + break; + case 'c': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + rduration = atol(argv[++i]); + break; + case 'd': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + wdelay = atol(argv[++i]); + break; + case 'v': + verbose_mode = 1; + break; + } + } + + printf_verbose("running test for %lu seconds, %u enqueuers, %u dequeuers.\n", + duration, nr_enqueuers, nr_dequeuers); + printf_verbose("Writer delay : %lu loops.\n", rduration); + printf_verbose("Reader duration : %lu loops.\n", wdelay); + printf_verbose("thread %-6s, thread id : %lx, tid %lu\n", + "main", pthread_self(), (unsigned long)gettid()); + + tid_enqueuer = malloc(sizeof(*tid_enqueuer) * nr_enqueuers); + tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); + count_enqueuer = malloc(sizeof(*count_enqueuer) * nr_enqueuers); + count_dequeuer = malloc(sizeof(*count_dequeuer) * nr_dequeuers); + init_queue (&q); + + next_aff = 0; + + for (i = 0; i < nr_enqueuers; i++) { + err = pthread_create(&tid_enqueuer[i], NULL, thr_enqueuer, + &count_enqueuer[i]); + if (err != 0) + exit(1); + } + for (i = 0; i < nr_dequeuers; i++) { + err = pthread_create(&tid_dequeuer[i], NULL, thr_dequeuer, + &count_dequeuer[i]); + if (err != 0) + exit(1); + } + + smp_mb(); + + test_go = 1; + + for (i = 0; i < duration; i++) { + sleep(1); + if (verbose_mode) + write (1, ".", 1); + } + + test_stop = 1; + + for (i = 0; i < nr_enqueuers; i++) { + err = pthread_join(tid_enqueuer[i], &tret); + if (err != 0) + exit(1); + tot_enqueues += count_enqueuer[i]; + } + for (i = 0; i < nr_dequeuers; i++) { + err = pthread_join(tid_dequeuer[i], &tret); + if (err != 0) + exit(1); + tot_dequeues += count_dequeuer[i]; + } + + printf_verbose("total number of enqueues : %llu, dequeues %llu\n", tot_enqueues, + tot_dequeues); + printf("SUMMARY %-25s testdur %4lu nr_enqueuers %3u wdelay %6lu " + "nr_dequeuers %3u " + "rdur %6lu nr_enqueues %12llu nr_dequeues %12llu successful %12lu nr_ops %12llu\n", + argv[0], duration, nr_enqueuers, wdelay, + nr_dequeuers, rduration, tot_enqueues, tot_dequeues, + nr_successful_dequeues, tot_enqueues + tot_dequeues); + + test_end(&q); + free(tid_enqueuer); + free(tid_dequeuer); + free(count_enqueuer); + free(count_dequeuer); + return 0; +}