rculfhash: support replacement operation
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 21 Sep 2011 03:05:08 +0000 (23:05 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 21 Sep 2011 03:05:08 +0000 (23:05 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rculfhash.c
tests/test_urcu_hash.c
urcu/rculfhash.h

index 074a6f4ea8c4c40f7b2f7374d3ecdc8387e34bff..30c5c8a2e75390e72623e45af1f0889bbbcccd84 100644 (file)
 
 /*
  * The removed flag needs to be updated atomically with the pointer.
+ * It indicates that no node must attach to the node scheduled for
+ * removal. The gc flag also needs to be updated atomically with the
+ * pointer. It indicates that node garbage collection must be performed.
+ * "removed" and "gc" flags are separate for the benefit of replacement
+ * operation.
  * The dummy flag does not require to be updated atomically with the
  * pointer, but it is added as a pointer low bit flag to save space.
  */
 #define REMOVED_FLAG           (1UL << 0)
-#define DUMMY_FLAG             (1UL << 1)
-#define FLAGS_MASK             ((1UL << 2) - 1)
+#define GC_FLAG                        (1UL << 1)
+#define DUMMY_FLAG             (1UL << 2)
+#define FLAGS_MASK             ((1UL << 3) - 1)
 
 /* Value of the end pointer. Should not interact with flags. */
 #define END_VALUE              NULL
@@ -261,11 +267,22 @@ struct partition_resize_work {
                    unsigned long start, unsigned long len);
 };
 
+enum add_mode {
+       ADD_DEFAULT = 0,
+       ADD_UNIQUE = 1,
+       ADD_REPLACE = 2,
+};
+
 static
 struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                                unsigned long size,
                                struct cds_lfht_node *node,
-                               int unique, int dummy);
+                               enum add_mode mode, int dummy);
+
+static
+int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
+               struct cds_lfht_node *node,
+               int dummy_removal, int do_gc);
 
 /*
  * Algorithm to reverse bits in a word by lookup table, extended to
@@ -661,6 +678,18 @@ struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
        return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
 }
 
+static
+int is_gc(struct cds_lfht_node *node)
+{
+       return ((unsigned long) node) & GC_FLAG;
+}
+
+static
+struct cds_lfht_node *flag_gc(struct cds_lfht_node *node)
+{
+       return (struct cds_lfht_node *) (((unsigned long) node) | GC_FLAG);
+}
+
 static
 int is_dummy(struct cds_lfht_node *node)
 {
@@ -716,8 +745,10 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
        struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
 
        assert(!is_dummy(dummy));
+       assert(!is_gc(dummy));
        assert(!is_removed(dummy));
        assert(!is_dummy(node));
+       assert(!is_gc(node));
        assert(!is_removed(node));
        for (;;) {
                iter_prev = dummy;
@@ -737,16 +768,18 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
                        if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
                                return;
                        next = rcu_dereference(clear_flag(iter)->p.next);
-                       if (likely(is_removed(next)))
+                       if (likely(is_gc(next)))
                                break;
                        iter_prev = clear_flag(iter);
                        iter = next;
                }
-               assert(!is_removed(iter));
+               assert(!is_gc(iter));
                if (is_dummy(iter))
                        new_next = flag_dummy(clear_flag(next));
                else
                        new_next = clear_flag(next);
+               if (is_removed(iter))
+                       new_next = flag_removed(new_next);
                (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next);
        }
        return;
@@ -756,14 +789,15 @@ static
 struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                                unsigned long size,
                                struct cds_lfht_node *node,
-                               int unique, int dummy)
+                               enum add_mode mode, int dummy)
 {
        struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next,
-                       *dummy_node;
+                       *dummy_node, *return_node, *replace_pinned = NULL;
        struct _cds_lfht_node *lookup;
        unsigned long hash, index, order;
 
        assert(!is_dummy(node));
+       assert(!is_gc(node));
        assert(!is_removed(node));
        if (!size) {
                assert(dummy);
@@ -772,8 +806,10 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
        }
        hash = bit_reverse_ulong(node->p.reverse_hash);
        for (;;) {
-               uint32_t chain_len = 0;
+               uint32_t chain_len;
 
+       retry:
+               chain_len = 0;
                /*
                 * iter_prev points to the non-removed node prior to the
                 * insert location.
@@ -791,14 +827,37 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                        if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
                                goto insert;
                        next = rcu_dereference(clear_flag(iter)->p.next);
-                       if (unlikely(is_removed(next)))
+                       if (unlikely(is_gc(next)))
                                goto gc_node;
-                       if (unique
+                       if (unlikely(replace_pinned)) {
+                               /*
+                                * We're in the retry of a node
+                                * replacement. Only get exact iter
+                                * pointer match. We own it, so it
+                                * _needs_ to be there at some point.
+                                */
+                               if (clear_flag(iter) == replace_pinned)
+                                       goto replace;
+                       }
+                       /*
+                        * Next is removed but not gc'd. We need to
+                        * busy-loop, because a concurrent replacement
+                        * is keeping it temporarily pinned there but we
+                        * cannot attach to it. The easiest solution is
+                        * to retry.
+                        */
+                       if (unlikely(is_removed(next)))
+                               goto retry;
+                       if ((mode == ADD_UNIQUE || mode == ADD_REPLACE)
                            && !is_dummy(next)
                            && !ht->compare_fct(node->key, node->key_len,
                                                clear_flag(iter)->key,
-                                               clear_flag(iter)->key_len))
-                               return clear_flag(iter);
+                                               clear_flag(iter)->key_len)) {
+                               if (mode == ADD_UNIQUE)
+                                       return clear_flag(iter);
+                               else /* mode == ADD_REPLACE */
+                                       goto replace;
+                       }
                        /* Only account for identical reverse hash once */
                        if (iter_prev->p.reverse_hash != clear_flag(iter)->p.reverse_hash
                            && !is_dummy(next))
@@ -806,11 +865,15 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                        iter_prev = clear_flag(iter);
                        iter = next;
                }
+
        insert:
                assert(node != clear_flag(iter));
                assert(!is_removed(iter_prev));
                assert(!is_removed(iter));
+               assert(!is_gc(iter_prev));
+               assert(!is_gc(iter));
                assert(iter_prev != node);
+               assert(!replace_pinned);
                if (!dummy)
                        node->p.next = clear_flag(iter);
                else
@@ -820,12 +883,76 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                else
                        new_node = node;
                if (uatomic_cmpxchg(&iter_prev->p.next, iter,
-                                   new_node) != iter)
+                                   new_node) != iter) {
                        continue;       /* retry */
-               else
+               } else {
+                       if (mode == ADD_REPLACE)
+                               return_node = NULL;
+                       else    /* ADD_DEFAULT and ADD_UNIQUE */
+                               return_node = node;
                        goto gc_end;
+               }
+
+       replace:
+               assert(node != clear_flag(iter));
+               assert(!is_removed(iter_prev));
+               assert(!is_removed(iter));
+               assert(!is_gc(iter_prev));
+               assert(!is_gc(iter));
+               assert(iter_prev != node);
+               assert(!dummy);
+               node->p.next = clear_flag(next);
+               if (is_dummy(iter))
+                       new_node = flag_dummy(node);
+               else
+                       new_node = node;
+               /*
+                * Try to delete to-be-replaced node. Don't gc yet. Not
+                * performing gc here is important, because this lets
+                * concurrent lookups see the old node until we
+                * atomically swap the new node into its place.
+                *
+                * This algorithm is _not_ strictly lock-free between
+                * _cds_lfht_del and the uatomic_cmpxchg of the
+                * replacement operation, so a replacement should _not_
+                * crash here (which means: don't do replacements if you
+                * need strict lock-free guarantees).
+                */
+               if (!replace_pinned) {
+                       if (_cds_lfht_del(ht, size, clear_flag(iter), 0, 0))
+                               continue;       /* concurrently removed. retry. */
+               }
+               /*
+                * After _cds_lfht_del succeeds, we have pinned the
+                * to-be-removed node in place by setting its removed
+                * flag, but not its gc flag. If we fail to cmpxchg our
+                * new node with this node, we need to retry everything
+                * from the initial lookup, and only stop when we reach
+                * the node we pinned into place.
+                */
+               return_node = uatomic_cmpxchg(&iter_prev->p.next,
+                                             iter, new_node);
+               if (return_node != iter) {
+                       /*
+                        * If cmpxchg fails, we need to do path
+                        * compression, but end it by placing our own
+                        * node into place.
+                        */
+                       replace_pinned = clear_flag(iter);
+                       continue;       /* retry */
+               } else {
+                       /*
+                        * cmpxchg succeeded. gc unnecessary, because we
+                        * unlinked the return_node ourself with the
+                        * cmpxchg.
+                        */
+                       return_node = clear_flag(return_node);
+                       goto end;
+               }
+
        gc_node:
                assert(!is_removed(iter));
+               assert(!is_gc(iter));
                if (is_dummy(iter))
                        new_next = flag_dummy(clear_flag(next));
                else
@@ -840,13 +967,14 @@ gc_end:
        lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))];
        dummy_node = (struct cds_lfht_node *) lookup;
        _cds_lfht_gc_bucket(dummy_node, node);
-       return node;
+end:
+       return return_node;
 }
 
 static
 int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
                struct cds_lfht_node *node,
-               int dummy_removal)
+               int dummy_removal, int do_gc)
 {
        struct cds_lfht_node *dummy, *next, *old;
        struct _cds_lfht_node *lookup;
@@ -855,9 +983,12 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
 
        /* logically delete the node */
        assert(!is_dummy(node));
+       assert(!is_gc(node));
        assert(!is_removed(node));
        old = rcu_dereference(node->p.next);
        do {
+               struct cds_lfht_node *new_next;
+
                next = old;
                if (unlikely(is_removed(next)))
                        goto end;
@@ -865,13 +996,18 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
                        assert(is_dummy(next));
                else
                        assert(!is_dummy(next));
-               old = uatomic_cmpxchg(&node->p.next, next,
-                                     flag_removed(next));
+               new_next = flag_removed(next);
+               if (do_gc)
+                       new_next = flag_gc(new_next);
+               old = uatomic_cmpxchg(&node->p.next, next, new_next);
        } while (old != next);
 
        /* We performed the (logical) deletion. */
        flagged = 1;
 
+       if (!do_gc)
+               goto end;
+
        /*
         * Ensure that the node is not visible to readers anymore: lookup for
         * the node, and remove it (along with any other logically removed node)
@@ -975,7 +1111,7 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
                new_node->p.reverse_hash =
                        bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j);
                (void) _cds_lfht_add(ht, !i ? 0 : (1UL << (i - 1)),
-                               new_node, 0, 1);
+                               new_node, ADD_DEFAULT, 1);
                if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                        break;
        }
@@ -1078,7 +1214,7 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i,
                fini_node->p.reverse_hash =
                        bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j);
                (void) _cds_lfht_del(ht, !i ? 0 : (1UL << (i - 1)),
-                               fini_node, 1);
+                               fini_node, 1, 1);
                if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                        break;
        }
@@ -1225,7 +1361,11 @@ struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key
                        break;
                }
                next = rcu_dereference(node->p.next);
-               if (likely(!is_removed(next))
+               /*
+                * We consider return nodes marked removed but not gc as
+                * hits for lookup vs replacement consistency.
+                */
+               if (likely(!is_gc(next))
                    && !is_dummy(next)
                    && likely(!ht->compare_fct(node->key, node->key_len, key, key_len))) {
                                break;
@@ -1260,7 +1400,11 @@ struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht,
                        break;
                }
                next = rcu_dereference(node->p.next);
-               if (likely(!is_removed(next))
+               /*
+                * We consider return nodes marked removed but not gc as
+                * hits for lookup vs replacement consistency.
+                */
+               if (likely(!is_gc(next))
                    && !is_dummy(next)
                    && likely(!ht->compare_fct(node->key, node->key_len, key, key_len))) {
                                break;
@@ -1279,12 +1423,12 @@ void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node)
        node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
 
        size = rcu_dereference(ht->t.size);
-       (void) _cds_lfht_add(ht, size, node, 0, 0);
+       (void) _cds_lfht_add(ht, size, node, ADD_DEFAULT, 0);
        ht_count_add(ht, size);
 }
 
 struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
-                                       struct cds_lfht_node *node)
+                               struct cds_lfht_node *node)
 {
        unsigned long hash, size;
        struct cds_lfht_node *ret;
@@ -1293,19 +1437,35 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
        node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
 
        size = rcu_dereference(ht->t.size);
-       ret = _cds_lfht_add(ht, size, node, 1, 0);
+       ret = _cds_lfht_add(ht, size, node, ADD_UNIQUE, 0);
        if (ret == node)
                ht_count_add(ht, size);
        return ret;
 }
 
+struct cds_lfht_node *cds_lfht_replace(struct cds_lfht *ht,
+                               struct cds_lfht_node *node)
+{
+       unsigned long hash, size;
+       struct cds_lfht_node *ret;
+
+       hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed);
+       node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
+
+       size = rcu_dereference(ht->t.size);
+       ret = _cds_lfht_add(ht, size, node, ADD_REPLACE, 0);
+       if (ret == NULL)
+               ht_count_add(ht, size);
+       return ret;
+}
+
 int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
 {
        unsigned long size;
        int ret;
 
        size = rcu_dereference(ht->t.size);
-       ret = _cds_lfht_del(ht, size, node, 0);
+       ret = _cds_lfht_del(ht, size, node, 0, 1);
        if (!ret)
                ht_count_del(ht, size);
        return ret;
@@ -1326,6 +1486,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht)
                if (!is_dummy(node))
                        return -EPERM;
                assert(!is_removed(node));
+               assert(!is_gc(node));
        } while (!is_end(node));
        /*
         * size accessed without rcu_dereference because hash table is
@@ -1386,7 +1547,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht,
        node = (struct cds_lfht_node *) lookup;
        do {
                next = rcu_dereference(node->p.next);
-               if (is_removed(next)) {
+               if (is_removed(next) || is_gc(next)) {
                        assert(!is_dummy(next));
                        (*removed)++;
                } else if (!is_dummy(next))
index c19a4876df4443add7031311ccf161504abb08f6..89063b98626042262341fdcc27e5b6ee28c0277c 100644 (file)
@@ -116,7 +116,7 @@ static unsigned long rduration;
 static unsigned long init_hash_size = DEFAULT_HASH_SIZE;
 static unsigned long init_populate;
 static int opt_auto_resize;
-static int add_only, add_unique;
+static int add_only, add_unique, add_replace;
 
 static unsigned long init_pool_offset, lookup_pool_offset, write_pool_offset;
 static unsigned long init_pool_size = DEFAULT_RAND_POOL,
@@ -467,16 +467,26 @@ void *thr_writer(void *_count)
                        cds_lfht_node_init(node,
                                (void *)(((unsigned long) rand_r(&rand_lookup) % write_pool_size) + write_pool_offset),
                                sizeof(void *));
-                       if (add_unique)
+                       if (add_unique) {
                                ret_node = cds_lfht_add_unique(test_ht, node);
-                       else
-                               cds_lfht_add(test_ht, node);
+                       } else {
+                               if (add_replace)
+                                       ret_node = cds_lfht_replace(test_ht, node);
+                               else
+                                       cds_lfht_add(test_ht, node);
+                       }
                        rcu_read_unlock();
                        if (add_unique && ret_node != node) {
                                free(node);
                                nr_addexist++;
-                       } else
-                               nr_add++;
+                       } else {
+                               if (add_replace && ret_node) {
+                                       call_rcu(&ret_node->head, free_node_cb);
+                                       nr_addexist++;
+                               } else {
+                                       nr_add++;
+                               }
+                       }
                } else {
                        /* May delete */
                        rcu_read_lock();
@@ -536,9 +546,9 @@ static int populate_hash(void)
        if (!init_populate)
                return 0;
 
-       if (add_unique && init_populate * 10 > init_pool_size) {
+       if ((add_unique || add_replace) && init_populate * 10 > init_pool_size) {
                printf("WARNING: required to populate %lu nodes (-k), but random "
-"pool is quite small (%lu values) and we are in add_unique (-u) mode. Try with a "
+"pool is quite small (%lu values) and we are in add_unique (-u) or add_replace (-s) mode. Try with a "
 "larger random pool (-p option). This may take a while...\n", init_populate, init_pool_size);
        }
 
@@ -547,15 +557,27 @@ static int populate_hash(void)
                cds_lfht_node_init(node,
                        (void *)(((unsigned long) rand_r(&rand_lookup) % init_pool_size) + init_pool_offset),
                        sizeof(void *));
-               if (add_unique)
+               rcu_read_lock();
+               if (add_unique) {
                        ret_node = cds_lfht_add_unique(test_ht, node);
-               else
-                       cds_lfht_add(test_ht, node);
+               } else {
+                       if (add_replace)
+                               ret_node = cds_lfht_replace(test_ht, node);
+                       else
+                               cds_lfht_add(test_ht, node);
+               }
+               rcu_read_unlock();
                if (add_unique && ret_node != node) {
                        free(node);
                        nr_addexist++;
-               } else
-                       nr_add++;
+               } else {
+                       if (add_replace && ret_node) {
+                               call_rcu(&ret_node->head, free_node_cb);
+                               nr_addexist++;
+                       } else {
+                               nr_add++;
+                       }
+               }
                nr_writes++;
        }
        return 0;
@@ -563,27 +585,29 @@ static int populate_hash(void)
 
 void show_usage(int argc, char **argv)
 {
-       printf("Usage : %s nr_readers nr_writers duration (s)", argv[0]);
+       printf("Usage : %s nr_readers nr_writers duration (s)\n", argv[0]);
 #ifdef DEBUG_YIELD
-       printf(" [-r] [-w] (yield reader and/or writer)");
+       printf("        [-r] [-w] (yield reader and/or writer)\n");
 #endif
-       printf(" [-d delay] (writer period (us))");
-       printf(" [-c duration] (reader C.S. duration (in loops))");
-       printf(" [-v] (verbose output)");
-       printf(" [-a cpu#] [-a cpu#]... (affinity)");
-       printf(" [-h size] (initial hash table size)");
-       printf(" [-u] Uniquify add.");
-       printf(" [-i] Add only (no removal).");
-       printf(" [-k nr_nodes] Number of nodes to insert initially.");
-       printf(" [-A] Automatically resize hash table.");
-       printf(" [-R offset] Lookup pool offset.");
-       printf(" [-S offset] Write pool offset.");
-       printf(" [-T offset] Init pool offset.");
-       printf(" [-M size] Lookup pool size.");
-       printf(" [-N size] Write pool size.");
-       printf(" [-O size] Init pool size.");
-       printf(" [-V] Validate lookups of init values (use with filled init pool, same lookup range, with different write range).");
-       printf("\n");
+       printf("        [-d delay] (writer period (us))\n");
+       printf("        [-c duration] (reader C.S. duration (in loops))\n");
+       printf("        [-v] (verbose output)\n");
+       printf("        [-a cpu#] [-a cpu#]... (affinity)\n");
+       printf("        [-h size] (initial hash table size)\n");
+       printf("        [not -u nor -s] Add entries (supports redundant keys).\n");
+       printf("        [-u] Uniquify add (no redundant keys).\n");
+       printf("        [-s] Replace (swap) entries.\n");
+       printf("        [-i] Add only (no removal).\n");
+       printf("        [-k nr_nodes] Number of nodes to insert initially.\n");
+       printf("        [-A] Automatically resize hash table.\n");
+       printf("        [-R offset] Lookup pool offset.\n");
+       printf("        [-S offset] Write pool offset.\n");
+       printf("        [-T offset] Init pool offset.\n");
+       printf("        [-M size] Lookup pool size.\n");
+       printf("        [-N size] Write pool size.\n");
+       printf("        [-O size] Init pool size.\n");
+       printf("        [-V] Validate lookups of init values (use with filled init pool, same lookup range, with different write range).\n");
+       printf("\n\n");
 }
 
 int main(int argc, char **argv)
@@ -670,8 +694,19 @@ int main(int argc, char **argv)
                        init_hash_size = atol(argv[++i]);
                        break;
                case 'u':
+                       if (add_replace) {
+                               printf("Please specify at most one of -s or -u.\n");
+                               exit(-1);
+                       }
                        add_unique = 1;
                        break;
+               case 's':
+                       if (add_unique) {
+                               printf("Please specify at most one of -s or -u.\n");
+                               exit(-1);
+                       }
+                       add_replace = 1;
+                       break;
                case 'i':
                        add_only = 1;
                        break;
@@ -733,7 +768,7 @@ int main(int argc, char **argv)
        printf_verbose("Reader duration : %lu loops.\n", rduration);
        printf_verbose("Mode:%s%s.\n",
                add_only ? " add only" : " add/remove",
-               add_unique ? " uniquify" : "");
+               add_unique ? " uniquify" : ( add_replace ? " replace" : " insert"));
        printf_verbose("Initial hash table size: %lu buckets.\n", init_hash_size);
        printf_verbose("Init pool size offset %lu size %lu.\n",
                init_pool_offset, init_pool_size);
@@ -748,13 +783,21 @@ int main(int argc, char **argv)
        tid_writer = malloc(sizeof(*tid_writer) * nr_writers);
        count_reader = malloc(sizeof(*count_reader) * nr_readers);
        count_writer = malloc(sizeof(*count_writer) * nr_writers);
+
+       err = create_all_cpu_call_rcu_data(0);
+        assert(!err);
+
+       /*
+        * Hash creation and population needs to be seen as a RCU reader
+        * thread from the point of view of resize.
+        */
+       rcu_register_thread();
        test_ht = cds_lfht_new(test_hash, test_compare, 0x42UL,
                        init_hash_size,
                        opt_auto_resize ? CDS_LFHT_AUTO_RESIZE : 0, NULL);
-       ret = populate_hash();
+       ret = populate_hash();
        assert(!ret);
-        err = create_all_cpu_call_rcu_data(0);
-        assert(!err);
+       rcu_unregister_thread();
 
        next_aff = 0;
 
index c1c9ddcf2a28d662668bf0ac465e658417730828..359edb574661400d53c3a2a94c79bf185fa67eb7 100644 (file)
@@ -34,13 +34,13 @@ extern "C" {
 
 /*
  * struct cds_lfht_node and struct _cds_lfht_node should be aligned on
- * 4-bytes boundaries because the two lower bits are used as flags.
+ * 8-bytes boundaries because the two lower bits are used as flags.
  */
 
 struct _cds_lfht_node {
-       struct cds_lfht_node *next;     /* ptr | DUMMY_FLAG | REMOVED_FLAG */
+       struct cds_lfht_node *next;     /* ptr | DUMMY_FLAG | GC_FLAG | REMOVED_FLAG */
        unsigned long reverse_hash;
-};
+} __attribute__((aligned(8)));
 
 struct cds_lfht_node {
        /* cache-hot for iteration */
@@ -180,6 +180,7 @@ struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_node *n
  * cds_lfht_add - add a node to the hash table.
  *
  * Call with rcu_read_lock held.
+ * This function supports adding redundant keys into the table.
  */
 void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node);
 
@@ -191,15 +192,50 @@ void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node);
  * cds_lfht_add_unique fails, the node passed as parameter should be
  * freed by the caller.
  * Call with rcu_read_lock held.
+ *
+ * The semantic of this function is that if only this function is used
+ * to add keys into the table, no duplicated keys should ever be
+ * observable in the table. The same guarantee apply for combination of
+ * add_unique and replace (see below).
  */
 struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, struct cds_lfht_node *node);
 
+/*
+ * cds_lfht_replace - replace a node within hash table.
+ *
+ * Return the node replaced upon success. If no node matching the key
+ * was present, return NULL, which also means the operation succeeded.
+ * This replacement operation should never fail.
+ * Call with rcu_read_lock held.
+ * After successful replacement, a grace period must be waited for before
+ * freeing the memory reserved for the returned node.
+ *
+ * The semantic of replacement vs lookups is the following: if lookups
+ * are performed between a key insertion and its removal, we guarantee
+ * that the lookups will always find the key if it is replaced
+ * concurrently with the lookups. Providing this guarantee require us to
+ * pin the node to remove in place (disallowing any insertion after this
+ * node temporarily) before we can proceed to its exchange with the new
+ * node atomically. This renders the "replace" operation not strictly
+ * lock-free, because a thread crashing in the middle of the replace
+ * operation could stop progress for other updaters.
+ *
+ * Providing this semantic allows us to ensure that replacement-only
+ * schemes will never generate duplicated keys. It also allows us to
+ * guarantee that a combination of replacement and add_unique updates
+ * will never generate duplicated keys.
+ */
+struct cds_lfht_node *cds_lfht_replace(struct cds_lfht *ht, struct cds_lfht_node *node);
+
 /*
  * cds_lfht_del - remove node from hash table.
  *
+ * Return 0 if the node is successfully removed.
  * Node can be looked up with cds_lfht_lookup. RCU read-side lock must
  * be held between lookup and removal.
  * Call with rcu_read_lock held.
+ * After successful removal, a grace period must be waited for before
+ * freeing the memory reserved for node.
  */
 int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node);
 
This page took 0.039291 seconds and 4 git commands to generate.