split-ordered hash table
author Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 5 Jul 2011 23:35:08 +0000 (19:35 -0400)
committer Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 5 Jul 2011 23:35:08 +0000 (19:35 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Makefile.am
rculfhash.c
tests/Makefile.am
tests/test_urcu_hash.c
urcu/rculfhash.h
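
This commit replaces the previous chained hash table with a split-ordered hash table in the style of Shalev and Shavit's split-ordered lists: every node, including one dummy node per bucket, is linked into a single lock-free list ordered by the bit-reversed hash, and the bucket array only points into that list. Growing the table therefore never moves an existing node; it only adds new dummy nodes. A minimal sketch of the bucket selection used throughout the patch, with an illustrative helper name that is not part of the commit:

	/* Illustrative sketch only: the table size is always a power of
	 * two, so the bucket index is the low-order bits of the hash.
	 * The nodes of bucket i that belong to bucket i + size once the
	 * table doubles already form one contiguous run of the chain in
	 * reverse-hash order, which is what makes in-place splitting work. */
	static unsigned long bucket_index(unsigned long hash, unsigned long size)
	{
		return hash & (size - 1);
	}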

diff --git a/Makefile.am b/Makefile.am
index ef1105813cbb01994a64afedb7d4288af06ce166..23260496660f21d857006ec1740ddddebd1daf70 100644
@@ -15,7 +15,7 @@ nobase_dist_include_HEADERS = urcu/compiler.h urcu/hlist.h urcu/list.h \
                urcu/wfqueue.h urcu/rculfstack.h urcu/rculfqueue.h \
                urcu/ref.h urcu/map/*.h urcu/static/*.h urcu/cds.h \
                urcu/urcu_ref.h urcu/urcu-futex.h urcu/uatomic_arch.h \
-               urcu/urcu-ht.h
+               urcu/rculfhash.h
 nobase_nodist_include_HEADERS = urcu/arch.h urcu/uatomic.h urcu/config.h
 
 EXTRA_DIST = $(top_srcdir)/urcu/arch/*.h $(top_srcdir)/urcu/uatomic/*.h \
diff --git a/rculfhash.c b/rculfhash.c
index ec799dcc87a6d38bb37fd5e8320d5bfad63607a8..9c957cf4026fe1106f6e0f5db3a63b58585e8269 100644
@@ -1,6 +1,23 @@
-
 /*
- * TODO: keys are currently assumed <= sizeof(void *). Key target never freed.
+ * rculfhash.c
+ *
+ * Userspace RCU library - Lock-Free Expandable RCU Hash Table
+ *
+ * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 #define _LGPL_SOURCE
 #include <errno.h>
 #include <assert.h>
 #include <stdio.h>
+#include <stdint.h>
 
 #include <urcu.h>
-#include <urcu-defer.h>
+#include <urcu-call-rcu.h>
 #include <urcu/arch.h>
 #include <urcu/uatomic.h>
 #include <urcu/jhash.h>
 #include <urcu/compiler.h>
+#include <urcu/rculfhash.h>
 #include <stdio.h>
 #include <pthread.h>
-#include <urcu/rculfhash.h>
-
-/*
- * Maximum number of hash table buckets: 256M on 64-bit.
- * Should take about 512MB max if we assume 1 node per 4 buckets.
- */
-#define MAX_HT_BUCKETS ((256 << 10) / sizeof(void *))
 
-/* node flags */
-#define        NODE_STOLEN     (1 << 0)
+#define BUCKET_SIZE_RESIZE_THRESHOLD   5
 
-struct rcu_ht_node;
-
-struct rcu_ht_node {
-       struct rcu_ht_node *next;
-       void *key;
-       void *data;
-       unsigned int flags;
-};
+#ifndef max
+#define max(a, b)      ((a) > (b) ? (a) : (b))
+#endif
 
 struct rcu_table {
-       unsigned long size;
+       unsigned long size;     /* always a power of 2 */
+       struct rcu_head head;
        struct rcu_ht_node *tbl[0];
 };
 
 struct rcu_ht {
        struct rcu_table *t;            /* shared */
        ht_hash_fct hash_fct;
-       void (*free_fct)(void *data);   /* fct to free data */
-       uint32_t keylen;
-       uint32_t hashseed;
+       void *hashseed;
        pthread_mutex_t resize_mutex;   /* resize mutex: add/del mutex */
-       int resize_ongoing;             /* fast-path resize check */
+       unsigned long target_size;
+       void (*ht_call_rcu)(struct rcu_head *head,
+                     void (*func)(struct rcu_head *head));
 };
 
-struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
-                     unsigned long init_size, uint32_t keylen,
-                     uint32_t hashseed)
-{
+struct rcu_resize_work {
+       struct rcu_head head;
        struct rcu_ht *ht;
+};
 
-       ht = calloc(1, sizeof(struct rcu_ht));
-       ht->hash_fct = hash_fct;
-       ht->free_fct = free_fct;
-       ht->keylen = keylen;
-       ht->hashseed = hashseed;
-       /* this mutex should not nest in read-side C.S. */
-       pthread_mutex_init(&ht->resize_mutex, NULL);
-       ht->resize_ongoing = 0; /* shared */
-       ht->t = calloc(1, sizeof(struct rcu_table)
-                      + (init_size * sizeof(struct rcu_ht_node *)));
-       ht->t->size = init_size;
-       return ht;
+static
+void ht_resize_lazy(struct rcu_ht *ht, int growth);
+
+static
+void check_resize(struct rcu_ht *ht, unsigned long chain_len)
+{
+       if (chain_len >= BUCKET_SIZE_RESIZE_THRESHOLD)
+               ht_resize_lazy(ht, chain_len / BUCKET_SIZE_RESIZE_THRESHOLD);
 }
 
-void *ht_lookup(struct rcu_ht *ht, void *key)
+/*
+ * Algorithm to reverse bits in a word by lookup table, extended to
+ * 64-bit words.
+ * ref.
+ * http://graphics.stanford.edu/~seander/bithacks.html#BitReverseTable
+ */
+
+static const uint8_t BitReverseTable256[256] = 
 {
-       struct rcu_table *t;
-       unsigned long hash;
-       struct rcu_ht_node *node;
-       void *ret;
+#define R2(n) (n),   (n) + 2*64,     (n) + 1*64,     (n) + 3*64
+#define R4(n) R2(n), R2((n) + 2*16), R2((n) + 1*16), R2((n) + 3*16)
+#define R6(n) R4(n), R4((n) + 2*4 ), R4((n) + 1*4 ), R4((n) + 3*4 )
+       R6(0), R6(2), R6(1), R6(3)
+};
+#undef R2
+#undef R4
+#undef R6
 
-       rcu_read_lock();
-       t = rcu_dereference(ht->t);
-       smp_read_barrier_depends();     /* read t before size and table */
-       hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
-       smp_read_barrier_depends();     /* read size before links */
-       node = rcu_dereference(t->tbl[hash]);
-       for (;;) {
-               if (likely(!node)) {
-                       ret = NULL;
-                       break;
-               }
-               if (node->key == key) {
-                       ret = node->data;
-                       break;
-               }
-               node = rcu_dereference(node->next);
-       }
-       rcu_read_unlock();
+static
+uint8_t bit_reverse_u8(uint8_t v)
+{
+       return BitReverseTable256[v];
+}
 
-       return ret;
+static __attribute__((unused))
+uint32_t bit_reverse_u32(uint32_t v)
+{
+       return ((uint32_t) bit_reverse_u8(v) << 24) | 
+               ((uint32_t) bit_reverse_u8(v >> 8) << 16) | 
+               ((uint32_t) bit_reverse_u8(v >> 16) << 8) | 
+               ((uint32_t) bit_reverse_u8(v >> 24));
 }
 
-/*
- * Will re-try until either:
- * - The key is already there (-EEXIST)
- * - We successfully add the key at the head of a table bucket.
- */
-int ht_add(struct rcu_ht *ht, void *key, void *data)
+static __attribute__((unused))
+uint64_t bit_reverse_u64(uint64_t v)
 {
-       struct rcu_ht_node *node, *old_head, *new_head;
-       struct rcu_table *t;
-       unsigned long hash;
-       int ret = 0;
-
-       new_head = calloc(1, sizeof(struct rcu_ht_node));
-       new_head->key = key;
-       new_head->data = data;
-       new_head->flags = 0;
-       /* here comes the fun and tricky part.
-        * Add at the beginning with a cmpxchg.
-        * Hold a read lock between the moment the first element is read
-        * and the nodes traversal (to find duplicates). This ensures
-        * the head pointer has not been reclaimed when cmpxchg is done.
-        * Always adding at the head ensures that we would have to
-        * re-try if a new item has been added concurrently. So we ensure that
-        * we never add duplicates. */
-retry:
-       rcu_read_lock();
-
-       if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
-               rcu_read_unlock();
-               /*
-                * Wait for resize to complete before continuing.
-                */
-               ret = pthread_mutex_lock(&ht->resize_mutex);
-               assert(!ret);
-               ret = pthread_mutex_unlock(&ht->resize_mutex);
-               assert(!ret);
-               goto retry;
+       return ((uint64_t) bit_reverse_u8(v) << 56) | 
+               ((uint64_t) bit_reverse_u8(v >> 8)  << 48) | 
+               ((uint64_t) bit_reverse_u8(v >> 16) << 40) |
+               ((uint64_t) bit_reverse_u8(v >> 24) << 32) |
+               ((uint64_t) bit_reverse_u8(v >> 32) << 24) | 
+               ((uint64_t) bit_reverse_u8(v >> 40) << 16) | 
+               ((uint64_t) bit_reverse_u8(v >> 48) << 8) |
+               ((uint64_t) bit_reverse_u8(v >> 56));
+}
+
+static
+unsigned long bit_reverse_ulong(unsigned long v)
+{
+#if (CAA_BITS_PER_LONG == 32)
+       return bit_reverse_u32(v);
+#else
+       return bit_reverse_u64(v);
+#endif
+}
+
+static
+struct rcu_ht_node *clear_flag(struct rcu_ht_node *node)
+{
+       return (struct rcu_ht_node *) (((unsigned long) node) & ~0x1);
+}
+
+static
+int is_removed(struct rcu_ht_node *node)
+{
+       return ((unsigned long) node) & 0x1;
+}
+
+static
+struct rcu_ht_node *flag_removed(struct rcu_ht_node *node)
+{
+       return (struct rcu_ht_node *) (((unsigned long) node) | 0x1);
+}
+
+static
+void _uatomic_max(unsigned long *ptr, unsigned long v)
+{
+       unsigned long old1, old2;
+
+       old1 = uatomic_read(ptr);
+       do {
+               old2 = old1;
+               if (old2 >= v)
+                       break;
+       } while ((old1 = uatomic_cmpxchg(ptr, old2, v)) != old2);
+}
+
+static
+int _ht_add(struct rcu_ht *ht, struct rcu_table *t, struct rcu_ht_node *node)
+{
+       struct rcu_ht_node *iter_prev = NULL, *iter = NULL;
+
+       for (;;) {
+               unsigned long chain_len = 0;
+
+               iter_prev = rcu_dereference(t->tbl[node->hash & (t->size - 1)]);
+               assert(iter_prev);
+               assert(iter_prev->reverse_hash <= node->reverse_hash);
+               for (;;) {
+                       iter = clear_flag(rcu_dereference(iter_prev->next));
+                       if (unlikely(!iter))
+                               break;
+                       if (iter->reverse_hash < node->reverse_hash)
+                               break;
+                       iter_prev = iter;
+                       check_resize(ht, ++chain_len);
+               }
+               /* add in iter_prev->next */
+               if (is_removed(iter))
+                       continue;
+               node->next = iter;
+               if (uatomic_cmpxchg(&iter_prev->next, iter, node) != iter)
+                       continue;
        }
+}
 
-       t = rcu_dereference(ht->t);
-       /* no read barrier needed, because no concurrency with resize */
-       hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
+static
+int _ht_remove(struct rcu_ht *ht, struct rcu_table *t, struct rcu_ht_node *node)
+{
+       struct rcu_ht_node *iter_prev, *iter, *next, *old;
+       unsigned long chain_len;
+       int found, ret = 0;
+       int flagged = 0;
 
-       old_head = node = rcu_dereference(t->tbl[hash]);
+retry:
+       chain_len = 0;
+       found = 0;
+       iter_prev = rcu_dereference(t->tbl[node->hash & (t->size - 1)]);
+       assert(iter_prev);
+       assert(iter_prev->reverse_hash <= node->reverse_hash);
        for (;;) {
-               if (likely(!node)) {
+               iter = clear_flag(rcu_dereference(iter_prev->next));
+               if (unlikely(!iter))
+                       break;
+               if (iter->reverse_hash < node->reverse_hash)
+                       break;
+               if (iter == node) {
+                       found = 1;
                        break;
                }
-               if (node->key == key) {
-                       ret = -EEXIST;
+               iter_prev = iter;
+       } 
+       if (!found) {
+               ret = -ENOENT;
+               goto end;
+       }
+       next = rcu_dereference(iter->next);
+       if (!flagged) {
+               if (is_removed(next)) {
+                       ret = -ENOENT;
                        goto end;
                }
-               node = rcu_dereference(node->next);
+               /* set deletion flag */
+               if ((old = uatomic_cmpxchg(&iter->next, next, flag_removed(next))) != next) {
+                       if (old == flag_removed(next)) {
+                               ret = -ENOENT;
+                               goto end;
+                       } else {
+                               goto retry;
+                       }
+               }
+               flagged = 1;
        }
-       new_head->next = old_head;
-       if (rcu_cmpxchg_pointer(&t->tbl[hash], old_head, new_head) != old_head)
-               goto restart;
+       /*
+        * Remove the element from the list. Retry if there has been a
+        * concurrent add (there cannot be a concurrent delete, because
+        * we won the deletion flag cmpxchg).
+        */
+       if (uatomic_cmpxchg(&iter_prev->next, iter, clear_flag(next)) != iter)
+               goto retry;
 end:
-       rcu_read_unlock();
        return ret;
+}
 
-       /* restart loop, release and re-take the read lock to be kind to GP */
-restart:
-       rcu_read_unlock();
-       goto retry;
+static
+void init_table(struct rcu_ht *ht, struct rcu_table *t,
+               unsigned long first, unsigned long len)
+{
+       unsigned long i, end;
+
+       end = first + len;
+       for (i = first; i < end; i++) {
+               /* Update table size when power of two */
+               if (i != 0 && !(i & (i - 1)))
+                       t->size = i;
+               t->tbl[i] = calloc(1, sizeof(struct rcu_ht_node));
+               t->tbl[i]->dummy = 1;
+               t->tbl[i]->hash = i;
+               t->tbl[i]->reverse_hash = bit_reverse_ulong(i);
+               _ht_add(ht, t, t->tbl[i]);
+       }
+       t->size = end;
 }
 
-/*
- * Restart until we successfully remove the entry, or no entry is left
- * ((void *)(unsigned long)-ENOENT).
- * Deal with concurrent stealers by doing an extra verification pass to check
- * that no element in the list are still pointing to the element stolen.
- * This could happen if two concurrent steal for consecutive objects are
- * executed. A pointer to an object being stolen could be saved by the
- * concurrent stealer for the previous object.
- * Also, given that in this precise scenario, another stealer can also want to
- * delete the doubly-referenced object; use a "stolen" flag to let only one
- * stealer delete the object.
- */
-void *ht_steal(struct rcu_ht *ht, void *key)
+struct rcu_ht *ht_new(ht_hash_fct hash_fct,
+                     void *hashseed,
+                     unsigned long init_size,
+                     void (*ht_call_rcu)(struct rcu_head *head,
+                               void (*func)(struct rcu_head *head)))
+{
+       struct rcu_ht *ht;
+
+       ht = calloc(1, sizeof(struct rcu_ht));
+       ht->hash_fct = hash_fct;
+       ht->hashseed = hashseed;
+       /* this mutex should not nest in read-side C.S. */
+       pthread_mutex_init(&ht->resize_mutex, NULL);
+       ht->t = calloc(1, sizeof(struct rcu_table)
+                      + (max(init_size, 1) * sizeof(struct rcu_ht_node *)));
+       ht->t->size = 0;
+       init_table(ht, ht->t, 0, max(init_size, 1));
+       ht->target_size = ht->t->size;
+       ht->ht_call_rcu = ht_call_rcu;
+       return ht;
+}
+
+struct rcu_ht_node *ht_lookup(struct rcu_ht *ht, void *key)
 {
-       struct rcu_ht_node **prev, *node, *del_node = NULL;
        struct rcu_table *t;
-       unsigned long hash;
-       void *data;
-       int ret;
+       struct rcu_ht_node *node;
+       unsigned long hash, reverse_hash;
 
-retry:
-       rcu_read_lock();
-
-       if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
-               rcu_read_unlock();
-               /*
-                * Wait for resize to complete before continuing.
-                */
-               ret = pthread_mutex_lock(&ht->resize_mutex);
-               assert(!ret);
-               ret = pthread_mutex_unlock(&ht->resize_mutex);
-               assert(!ret);
-               goto retry;
-       }
+       hash = ht->hash_fct(ht->hashseed, key);
+       reverse_hash = bit_reverse_ulong(hash);
 
        t = rcu_dereference(ht->t);
-       /* no read barrier needed, because no concurrency with resize */
-       hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
-
-       prev = &t->tbl[hash];
-       node = rcu_dereference(*prev);
+       cmm_smp_read_barrier_depends(); /* read t before size and table */
+       node = rcu_dereference(t->tbl[hash & (t->size - 1)]);
        for (;;) {
-               if (likely(!node)) {
-                       if (del_node) {
-                               goto end;
-                       } else {
-                               goto error;
-                       }
+               if (unlikely(!node))
+                       break;
+               if (node->reverse_hash > reverse_hash) {
+                       node = NULL;
+                       break;
                }
                if (node->key == key) {
+                       if (is_removed(rcu_dereference(node->next)))
+                               node = NULL;
                        break;
                }
-               prev = &node->next;
-               node = rcu_dereference(*prev);
+               node = clear_flag(rcu_dereference(node->next));
        }
+       return node;
+}
 
-       if (!del_node) {
-               /*
-                * Another concurrent thread stole it ? If so, let it deal with
-                * this. Assume NODE_STOLEN is the only flag. If this changes,
-                * read flags before cmpxchg.
-                */
-               if (cmpxchg(&node->flags, 0, NODE_STOLEN) != 0)
-                       goto error;
-       }
-
-       /* Found it ! pointer to object is in "prev" */
-       if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
-               del_node = node;
-       goto restart;
-
-end:
-       /*
-        * From that point, we own node. Note that there can still be concurrent
-        * RCU readers using it. We can free it outside of read lock after a GP.
-        */
-       rcu_read_unlock();
-
-       data = del_node->data;
-       call_rcu(free, del_node);
-       return data;
+int ht_add(struct rcu_ht *ht, struct rcu_ht_node *node)
+{
+       struct rcu_table *t;
 
-error:
-       data = (void *)(unsigned long)-ENOENT;
-       rcu_read_unlock();
-       return data;
+       node->hash = ht->hash_fct(ht->hashseed, node->key);
+       node->reverse_hash = bit_reverse_ulong((unsigned long) node->hash);
 
-       /* restart loop, release and re-take the read lock to be kind to GP */
-restart:
-       rcu_read_unlock();
-       goto retry;
+       t = rcu_dereference(ht->t);
+       cmm_smp_read_barrier_depends(); /* read t before size and table */
+       return _ht_add(ht, t, node);
 }
 
-int ht_delete(struct rcu_ht *ht, void *key)
+int ht_remove(struct rcu_ht *ht, struct rcu_ht_node *node)
 {
-       void *data;
-
-       data = ht_steal(ht, key);
-       if (data && data != (void *)(unsigned long)-ENOENT) {
-               if (ht->free_fct)
-                       call_rcu(ht->free_fct, data);
-               return 0;
-       } else {
-               return -ENOENT;
-       }
+       struct rcu_table *t;
+
+       t = rcu_dereference(ht->t);
+       cmm_smp_read_barrier_depends(); /* read t before size and table */
+       return _ht_remove(ht, t, node);
 }
 
-/* Delete all old elements. Allow concurrent writer accesses. */
-int ht_delete_all(struct rcu_ht *ht)
+static
+int ht_delete_dummy(struct rcu_ht *ht)
 {
-       unsigned long i;
-       struct rcu_ht_node **prev, *node, *inext;
        struct rcu_table *t;
-       int cnt = 0;
-       int ret;
-
-       /*
-        * Mutual exclusion with resize operations, but leave add/steal execute
-        * concurrently. This is OK because we operate only on the heads.
-        */
-       ret = pthread_mutex_lock(&ht->resize_mutex);
-       assert(!ret);
+       struct rcu_ht_node *node;
+       unsigned long i;
 
-       t = rcu_dereference(ht->t);
-       /* no read barrier needed, because no concurrency with resize */
+       t = ht->t;
+       /* Check that the table is empty */
+       node = t->tbl[0];
+       do {
+               if (!node->dummy)
+                       return -EPERM;
+               node = node->next;
+       } while (node);
+       /* Internal sanity check: all nodes left should be dummy */
        for (i = 0; i < t->size; i++) {
-               rcu_read_lock();
-               prev = &t->tbl[i];
-               /*
-                * Cut the head. After that, we own the first element.
-                */
-               node = rcu_xchg_pointer(prev, NULL);
-               if (!node) {
-                       rcu_read_unlock();
-                       continue;
-               }
-               /*
-                * We manage a list shared with concurrent writers and readers.
-                * Note that a concurrent add may or may not be deleted by us,
-                * depending if it arrives before or after the head is cut.
-                * "node" points to our first node. Remove first elements
-                * iteratively.
-                */
-               for (;;) {
-                       inext = NULL;
-                       prev = &node->next;
-                       if (prev)
-                               inext = rcu_xchg_pointer(prev, NULL);
-                       /*
-                        * "node" is the first element of the list we have cut.
-                        * We therefore own it, no concurrent writer may delete
-                        * it. There can only be concurrent lookups. Concurrent
-                        * add can only be done on a bucket head, but we've cut
-                        * it already. inext is also owned by us, because we
-                        * have exchanged it for "NULL". It will therefore be
-                        * safe to use it after a G.P.
-                        */
-                       rcu_read_unlock();
-                       if (node->data)
-                               call_rcu(ht->free_fct, node->data);
-                       call_rcu(free, node);
-                       cnt++;
-                       if (likely(!inext))
-                               break;
-                       rcu_read_lock();
-                       node = inext;
-               }
+               assert(t->tbl[i]->dummy);
+               free(t->tbl[i]);
        }
-
-       ret = pthread_mutex_unlock(&ht->resize_mutex);
-       assert(!ret);
-       return cnt;
+       return 0;
 }
 
 /*
@@ -350,136 +371,83 @@ int ht_destroy(struct rcu_ht *ht)
 {
        int ret;
 
-       ret = ht_delete_all(ht);
+       ret = ht_delete_dummy(ht);
+       if (ret)
+               return ret;
        free(ht->t);
        free(ht);
        return ret;
 }
 
-static void ht_resize_grow(struct rcu_ht *ht)
+static
+void ht_free_table_cb(struct rcu_head *head)
+{
+       struct rcu_table *t =
+               caa_container_of(head, struct rcu_table, head);
+       free(t);
+}
+
+/* called with resize mutex held */
+static
+void _do_ht_resize(struct rcu_ht *ht)
 {
-       unsigned long i, new_size, old_size;
+       unsigned long new_size, old_size;
        struct rcu_table *new_t, *old_t;
-       struct rcu_ht_node *node, *new_node, *tmp;
-       unsigned long hash;
 
        old_t = ht->t;
        old_size = old_t->size;
 
-       if (old_size == MAX_HT_BUCKETS)
+       new_size = CMM_LOAD_SHARED(ht->target_size);
+       if (old_size == new_size)
                return;
-
-       new_size = old_size << 1;
-       new_t = calloc(1, sizeof(struct rcu_table)
-                      + (new_size * sizeof(struct rcu_ht_node *)));
-       new_t->size = new_size;
-
-       for (i = 0; i < old_size; i++) {
-               /*
-                * Re-hash each entry, insert in new table.
-                * It's important that a reader looking for a key _will_ find it
-                * if it's in the table.
-                * Copy each node. (just the node, not ->data)
-                */
-               node = old_t->tbl[i];
-               while (node) {
-                       hash = ht->hash_fct(node->key, ht->keylen, ht->hashseed)
-                                           % new_size;
-                       new_node = malloc(sizeof(struct rcu_ht_node));
-                       new_node->key = node->key;
-                       new_node->data = node->data;
-                       new_node->flags = node->flags;
-                       new_node->next = new_t->tbl[hash]; /* link to first */
-                       new_t->tbl[hash] = new_node;       /* add to head */
-                       node = node->next;
-               }
-       }
-
-       /* Changing table and size atomically wrt lookups */
-       rcu_assign_pointer(ht->t, new_t);
-
-       /* Ensure all concurrent lookups use new size and table */
-       synchronize_rcu();
-
-       for (i = 0; i < old_size; i++) {
-               node = old_t->tbl[i];
-               while (node) {
-                       tmp = node->next;
-                       free(node);
-                       node = tmp;
-               }
+       new_t = realloc(old_t, sizeof(struct rcu_table)
+                       + (new_size * sizeof(struct rcu_ht_node *)));
+       if (new_size > old_size)
+               init_table(ht, new_t, old_size, new_size - old_size);
+       cmm_smp_wmb();  /* update content before updating reallocated size */
+       CMM_STORE_SHARED(new_t->size, new_size);
+       if (new_t != old_t) {
+               /* Changing table and size atomically wrt lookups */
+               rcu_assign_pointer(ht->t, new_t);
+               ht->ht_call_rcu(&old_t->head, ht_free_table_cb);
        }
-       free(old_t);
 }
 
-static void ht_resize_shrink(struct rcu_ht *ht)
+static
+void resize_target_update(struct rcu_ht *ht, int growth_order)
 {
-       unsigned long i, new_size;
-       struct rcu_table *new_t, *old_t;
-       struct rcu_ht_node **prev, *node;
-
-       old_t = ht->t;
-       if (old_t->size == 1)
-               return;
-
-       new_size = old_t->size >> 1;
-
-       for (i = 0; i < new_size; i++) {
-               /* Link end with first entry of i + new_size */
-               prev = &old_t->tbl[i];
-               node = *prev;
-               while (node) {
-                       prev = &node->next;
-                       node = *prev;
-               }
-               *prev = old_t->tbl[i + new_size];
-       }
-       smp_wmb();      /* write links before changing size */
-       STORE_SHARED(old_t->size, new_size);
-
-       /* Ensure all concurrent lookups use new size */
-       synchronize_rcu();
-
-       new_t = realloc(old_t, sizeof(struct rcu_table)
-                         + (new_size * sizeof(struct rcu_ht_node *)));
-       /* shrinking, pointers should not move */
-       assert(new_t == old_t);
+       _uatomic_max(&ht->target_size,
+               CMM_LOAD_SHARED(ht->target_size) << growth_order);
 }
 
-/*
- * growth: >0: *2, <0: /2
- */
 void ht_resize(struct rcu_ht *ht, int growth)
 {
-       int ret;
+       resize_target_update(ht, growth);
+       pthread_mutex_lock(&ht->resize_mutex);
+       _do_ht_resize(ht);
+       pthread_mutex_unlock(&ht->resize_mutex);
+}
 
-       ret = pthread_mutex_lock(&ht->resize_mutex);
-       assert(!ret);
-       STORE_SHARED(ht->resize_ongoing, 1);
-       synchronize_rcu();
-       /* All add/remove are waiting on the mutex. */
-       if (growth > 0)
-               ht_resize_grow(ht);
-       else if (growth < 0)
-               ht_resize_shrink(ht);
-       smp_mb();
-       STORE_SHARED(ht->resize_ongoing, 0);
-       ret = pthread_mutex_unlock(&ht->resize_mutex);
-       assert(!ret);
+static
+void do_resize_cb(struct rcu_head *head)
+{
+       struct rcu_resize_work *work =
+               caa_container_of(head, struct rcu_resize_work, head);
+       struct rcu_ht *ht = work->ht;
+
+       pthread_mutex_lock(&ht->resize_mutex);
+       _do_ht_resize(ht);
+       pthread_mutex_unlock(&ht->resize_mutex);
+       free(work);
 }
 
-/*
- * Expects keys <= than pointer size to be encoded in the pointer itself.
- */
-uint32_t ht_jhash(void *key, uint32_t length, uint32_t initval)
+static
+void ht_resize_lazy(struct rcu_ht *ht, int growth)
 {
-       uint32_t ret;
-       void *vkey;
-
-       if (length <= sizeof(void *))
-               vkey = &key;
-       else
-               vkey = key;
-       ret = jhash(vkey, length, initval);
-       return ret;
+       struct rcu_resize_work *work;
+
+       work = malloc(sizeof(*work));
+       work->ht = ht;
+       resize_target_update(ht, growth);
+       ht->ht_call_rcu(&work->head, do_resize_cb);
 }
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 73dce45962382239fc26108b3a967ae289bb5823..3917cd824ca2179aec232a82876440332be53ccc 100644
@@ -176,7 +176,8 @@ test_urcu_wfs_dynlink_CFLAGS = -DDYNAMIC_LINK_TEST $(AM_CFLAGS)
 test_urcu_wfs_dynlink_LDADD = $(URCU_CDS_LIB)
 
 test_urcu_hash_SOURCES = test_urcu_hash.c $(COMPAT)
-test_ht_LDADD = $(URCU_CDS_LIB)
+test_urcu_hash_CFLAGS = -DRCU_MEMBARRIER $(AM_CFLAGS)
+test_urcu_hash_LDADD = $(URCU) $(URCU_CDS_LIB)
 
 urcutorture.c: api.h
 
diff --git a/tests/test_urcu_hash.c b/tests/test_urcu_hash.c
index cdc78d54c420f0b9f4128aadd429c2dd5266a261..2750dd3c6527a35bcb75c28b3fa9fe5a08d18aa4 100644
 #include <unistd.h>
 #include <stdio.h>
 #include <assert.h>
-#include <sys/syscall.h>
 #include <sched.h>
-#include <urcu-ht.h>
-#include <urcu-defer.h>
 #include <errno.h>
 
-#include "../arch.h"
+#ifdef __linux__
+#include <syscall.h>
+#endif
 
 #define HASH_SIZE      32
 #define RAND_POOL      1000
@@ -67,7 +66,9 @@ static inline pid_t gettid(void)
 #else
 #define debug_yield_read()
 #endif
-#include "../urcu.h"
+#include <urcu.h>
+#include <urcu/rculfhash.h>
+#include <urcu-call-rcu.h>
 
 static unsigned int __thread rand_lookup;
 static unsigned long __thread nr_add;
@@ -96,7 +97,7 @@ static unsigned long rduration;
 static inline void loop_sleep(unsigned long l)
 {
        while(l-- != 0)
-               cpu_relax();
+               caa_cpu_relax();
 }
 
 static int verbose_mode;
@@ -180,12 +181,28 @@ void rcu_copy_mutex_unlock(void)
        }
 }
 
-#define ARRAY_POISON 0xDEADBEEF
+static
+unsigned long test_hash(void *_hashseed, void *_key)
+{
+       unsigned long seed = (unsigned long) _hashseed;
+       unsigned long key = (unsigned long) _key;
+       unsigned long v;
+
+       v = key ^ seed;
+       v ^= v << 8;
+       v ^= v << 16;
+       v ^= v << 27;
+       v ^= v >> 8;
+       v ^= v >> 16;
+       v ^= v >> 27;
+
+       return v;
+}
 
 void *thr_reader(void *_count)
 {
        unsigned long long *count = _count;
-       struct test_data *local_ptr;
+       struct rcu_ht_node *node;
 
        printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
                        "reader", pthread_self(), (unsigned long)gettid());
@@ -197,13 +214,13 @@ void *thr_reader(void *_count)
        while (!test_go)
        {
        }
-       smp_mb();
+       cmm_smp_mb();
 
        for (;;) {
                rcu_read_lock();
-               local_ptr = ht_lookup(test_ht,
+               node = ht_lookup(test_ht,
                        (void *)(unsigned long)(rand_r(&rand_lookup) % RAND_POOL));
-               if (local_ptr == NULL)
+               if (node == NULL)
                        lookup_fail++;
                else
                        lookup_ok++;
@@ -227,10 +244,18 @@ void *thr_reader(void *_count)
 
 }
 
+static
+void free_node_cb(struct rcu_head *head)
+{
+       struct rcu_ht_node *node =
+               caa_container_of(head, struct rcu_ht_node, head);
+       free(node);
+}
+
 void *thr_writer(void *_count)
 {
+       struct rcu_ht_node *node;
        unsigned long long *count = _count;
-       struct test_data *data;
        int ret;
 
        printf_verbose("thread_begin %s, thread id : %lx, tid %lu\n",
@@ -239,46 +264,55 @@ void *thr_writer(void *_count)
        set_affinity();
 
        rcu_register_thread();
-       rcu_defer_register_thread();
 
        while (!test_go)
        {
        }
-       smp_mb();
+       cmm_smp_mb();
 
        for (;;) {
                if (rand_r(&rand_lookup) & 1) {
-                       data = malloc(sizeof(struct test_data));
-                       //rcu_copy_mutex_lock();
-                       ret = ht_add(test_ht,
-                           (void *)(unsigned long)(rand_r(&rand_lookup) % RAND_POOL),
-                           data);
+                       node = malloc(sizeof(struct rcu_ht_node));
+                       rcu_read_lock();
+                       ht_node_init(node,
+                               (void *)(unsigned long)(rand_r(&rand_lookup) % RAND_POOL),
+                               (void *) 0x42);
+                       ret = ht_add(test_ht, node);
+                       rcu_read_unlock();
                        if (ret == -EEXIST) {
-                               free(data);
+                               free(node);
                                nr_addexist++;
                        } else {
                                nr_add++;
                        }
-                       //rcu_copy_mutex_unlock();
                } else {
                        /* May delete */
-                       //rcu_copy_mutex_lock();
-                       ret = ht_delete(test_ht,
-                          (void *)(unsigned long)(rand_r(&rand_lookup) % RAND_POOL));
-                       if (ret == -ENOENT)
-                               nr_delnoent++;
+                       rcu_read_lock();
+                       node = ht_lookup(test_ht,
+                               (void *)(unsigned long)(rand_r(&rand_lookup) % RAND_POOL));
+                       if (node)
+                               ret = ht_remove(test_ht, node);
                        else
+                               ret = -ENOENT;
+                       rcu_read_unlock();
+                       if (ret == 0) {
+                               call_rcu(&node->head, free_node_cb);
                                nr_del++;
-                       //rcu_copy_mutex_unlock();
+                       } else
+                               nr_delnoent++;
                }
+#if 0
                //if (nr_writes % 100000 == 0) {
                if (nr_writes % 1000 == 0) {
+                       rcu_read_lock();
                        if (rand_r(&rand_lookup) & 1) {
                                ht_resize(test_ht, 1);
                        } else {
                                ht_resize(test_ht, -1);
                        }
+                       rcu_read_unlock();
                }
+#endif //0
                nr_writes++;
                if (unlikely(!test_duration_write()))
                        break;
@@ -286,7 +320,6 @@ void *thr_writer(void *_count)
                        loop_sleep(wdelay);
        }
 
-       rcu_defer_unregister_thread();
        rcu_unregister_thread();
 
        printf_verbose("thread_end %s, thread id : %lx, tid %lu\n",
@@ -396,8 +429,8 @@ int main(int argc, char **argv)
        tid_writer = malloc(sizeof(*tid_writer) * nr_writers);
        count_reader = malloc(sizeof(*count_reader) * nr_readers);
        count_writer = malloc(sizeof(*count_writer) * nr_writers);
-       test_ht = ht_new(ht_jhash, free, HASH_SIZE, sizeof(unsigned long),
-                        43223455);
+       test_ht = ht_new(test_hash, (void *) 0x42,
+                        HASH_SIZE, call_rcu);
        next_aff = 0;
 
        for (i = 0; i < nr_readers; i++) {
@@ -413,7 +446,7 @@ int main(int argc, char **argv)
                        exit(1);
        }
 
-       smp_mb();
+       cmm_smp_mb();
 
        test_go = 1;
 
@@ -434,9 +467,7 @@ int main(int argc, char **argv)
                tot_writes += count_writer[i];
        }
        rcu_register_thread();
-       rcu_defer_register_thread();
        ret = ht_destroy(test_ht);
-       rcu_defer_unregister_thread();
        rcu_unregister_thread();
        
        printf_verbose("final delete: %d items\n", ret);
diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h
index d989221ae889c6726672dcb8e1f95023fa7c1586..fab7d82d77ddbcf7ce31759588e1f50b1929614d 100644
@@ -2,37 +2,55 @@
 #define _URCU_RCULFHASH_H
 
 #include <stdint.h>
+#include <urcu-call-rcu.h>
+
+struct rcu_ht_node {
+       struct rcu_ht_node *next;
+       void *key;
+       unsigned long hash;
+       unsigned long reverse_hash;
+       unsigned int dummy;
+       void *value;
+       struct rcu_head head;
+};
+
+struct rcu_ht;
 
 /*
  * Caution !
- * Ensure writer threads are registered as urcu readers and with with
- * urcu-defer.
- * Ensure reader threads are registered as urcu readers.
+ * Ensure reader and writer threads are registered as urcu readers.
  */
 
-typedef uint32_t (*ht_hash_fct)(void *key, uint32_t length, uint32_t initval);
+typedef unsigned long (*ht_hash_fct)(void *hashseed, void *key);
+
+static inline
+void ht_node_init(struct rcu_ht_node *node, void *key, void *value)
+{
+       node->key = key;
+       node->value = value;
+       node->dummy = 0;
+}
 
 /*
  * init_size must be power of two.
  */
-struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
-                     unsigned long init_size, uint32_t keylen,
-                     uint32_t hashseed);
-
-int ht_delete_all(struct rcu_ht *ht);
+struct rcu_ht *ht_new(ht_hash_fct hash_fct,
+                     void *hashseed,
+                     unsigned long init_size,
+                     void (*ht_call_rcu)(struct rcu_head *head,
+                               void (*func)(struct rcu_head *head)));
 
 int ht_destroy(struct rcu_ht *ht);
 
-void *ht_lookup(struct rcu_ht *ht, void *key);
-
-int ht_add(struct rcu_ht *ht, void *key, void *data);
+/* Call with rcu_read_lock held. */
+struct rcu_ht_node *ht_lookup(struct rcu_ht *ht, void *key);
 
-int ht_delete(struct rcu_ht *ht, void *key);
+/* Call with rcu_read_lock held. */
+int ht_add(struct rcu_ht *ht, struct rcu_ht_node *node);
 
-void *ht_steal(struct rcu_ht *ht, void *key);
+/* Call with rcu_read_lock held. */
+int ht_remove(struct rcu_ht *ht, struct rcu_ht_node *node);
 
 void ht_resize(struct rcu_ht *ht, int growth);
 
-uint32_t ht_jhash(void *key, uint32_t length, uint32_t initval);
-
 #endif /* _URCU_RCULFHASH_H */
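
The new API is meant to be driven the way the updated test program drives it: the caller provides the hash function, an opaque seed, and a flavor-specific call_rcu at ht_new() time, performs add/lookup/remove from threads registered as RCU readers and under rcu_read_lock(), and defers freeing of removed nodes itself. A hedged usage sketch, with an illustrative hash function and key values, mirroring tests/test_urcu_hash.c:

	#include <errno.h>
	#include <stdlib.h>
	#include <urcu.h>
	#include <urcu-call-rcu.h>
	#include <urcu/rculfhash.h>

	static void free_node_cb(struct rcu_head *head)
	{
		free(caa_container_of(head, struct rcu_ht_node, head));
	}

	/* ... in a thread registered with rcu_register_thread();
	 * my_hash_fct is an illustrative ht_hash_fct, not part of the commit ... */
	struct rcu_ht *ht = ht_new(my_hash_fct, (void *) 42, 32, call_rcu);
	struct rcu_ht_node *node = malloc(sizeof(*node));
	int ret;

	ht_node_init(node, (void *) 0x2a, (void *) 0x1);
	rcu_read_lock();
	ret = ht_add(ht, node);		/* the test treats -EEXIST as "already present" */
	rcu_read_unlock();
	if (ret == -EEXIST)
		free(node);

	rcu_read_lock();
	node = ht_lookup(ht, (void *) 0x2a);
	ret = node ? ht_remove(ht, node) : -ENOENT;
	rcu_read_unlock();
	if (!ret)
		call_rcu(&node->head, free_node_cb);	/* free only after a grace period */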