rcuja: implement ja_node_clear_nth
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 22 Aug 2012 16:58:17 +0000 (12:58 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 14 May 2013 14:22:23 +0000 (16:22 +0200)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rcuja/rcuja.c

index 75113954ccdb9dee09193a41321002b623bd6cfd..92e35ea91d4fcb9ada8f185971514d30495aaf56 100644 (file)
@@ -260,6 +260,12 @@ struct cds_ja_inode {
        } u;
 };
 
+enum ja_recompact {
+       JA_RECOMPACT,
+       JA_RECOMPACT_ADD,
+       JA_RECOMPACT_DEL,
+};
+
 static
 struct cds_ja_inode_flag *ja_node_flag(struct cds_ja_inode *node,
                unsigned long type)
@@ -313,7 +319,7 @@ uint8_t ja_linear_node_get_nr_child(const struct cds_ja_type *type,
                struct cds_ja_inode *node)
 {
        assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
-       return CMM_LOAD_SHARED(node->u.data[0]);
+       return rcu_dereference(node->u.data[0]);
 }
 
 /*
@@ -348,10 +354,9 @@ struct cds_ja_inode_flag *ja_linear_node_get_nth(const struct cds_ja_type *type,
        if (i >= nr_child)
                return NULL;
        pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
-       if (caa_unlikely(child_node_flag_ptr))
-               *child_node_flag_ptr = &pointers[i];
        ptr = rcu_dereference(pointers[i]);
-       assert(ja_node_ptr(ptr) != NULL);
+       if (caa_unlikely(child_node_flag_ptr) && ptr)
+               *child_node_flag_ptr = &pointers[i];
        return ptr;
 }
 
@@ -419,6 +424,14 @@ struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
        return rcu_dereference(*child_node_flag);
 }
 
+static
+struct cds_ja_inode_flag *ja_pigeon_node_get_ith_pos(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               uint8_t i)
+{
+       return ja_pigeon_node_get_nth(type, node, NULL, i);
+}
+
 /*
  * ja_node_get_nth: get nth item from a node.
  * node_flag is already rcu_dereference'd.
@@ -453,18 +466,6 @@ struct cds_ja_inode_flag * ja_node_get_nth(struct cds_ja_inode_flag *node_flag,
        }
 }
 
-/*
- * TODO: use ja_get_nr_child to monitor limits triggering shrink
- * recompaction.
- * Also use ja_get_nr_child to make the difference between resize and
- * pool change of compaction bit(s).
- */
-static
-unsigned int ja_get_nr_child(struct cds_ja_shadow_node *shadow_node)
-{
-       return shadow_node->nr_child;
-}
-
 static
 int ja_linear_node_set_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
@@ -475,7 +476,7 @@ int ja_linear_node_set_nth(const struct cds_ja_type *type,
        uint8_t nr_child;
        uint8_t *values, *nr_child_ptr;
        struct cds_ja_inode_flag **pointers;
-       unsigned int i;
+       unsigned int i, unused = 0;
 
        assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
 
@@ -485,20 +486,35 @@ int ja_linear_node_set_nth(const struct cds_ja_type *type,
        assert(nr_child <= type->max_linear_child);
 
        values = &node->u.data[1];
+       pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
+       /* Check if node value is already populated */
        for (i = 0; i < nr_child; i++) {
-               if (values[i] == n)
-                       return -EEXIST;
+               if (values[i] == n) {
+                       if (pointers[i])
+                               return -EEXIST;
+                       else
+                               break;
+               } else {
+                       if (!pointers[i])
+                               unused++;
+               }
        }
-       if (nr_child >= type->max_linear_child) {
-               /* No space left in this node type */
-               return -ENOSPC;
+       if (i == nr_child && nr_child >= type->max_linear_child) {
+               if (unused)
+                       return -ERANGE; /* recompact node */
+               else
+                       return -ENOSPC; /* No space left in this node type */
+       }
+
+       assert(pointers[i] == NULL);
+       rcu_assign_pointer(pointers[i], child_node_flag);
+       /* If we expanded the nr_child, increment it */
+       if (i == nr_child) {
+               CMM_STORE_SHARED(values[nr_child], n);
+               /* write pointer and value before nr_child */
+               cmm_smp_wmb();
+               CMM_STORE_SHARED(*nr_child_ptr, nr_child + 1);
        }
-       pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
-       assert(pointers[nr_child] == NULL);
-       rcu_assign_pointer(pointers[nr_child], child_node_flag);
-       CMM_STORE_SHARED(values[nr_child], n);
-       cmm_smp_wmb();  /* write value and pointer before nr_child */
-       CMM_STORE_SHARED(*nr_child_ptr, nr_child + 1);
        shadow_node->nr_child++;
        dbg_printf("linear set nth: %u child, shadow: %u child, for node %p shadow %p\n",
                (unsigned int) CMM_LOAD_SHARED(*nr_child_ptr),
@@ -545,7 +561,6 @@ int ja_pigeon_node_set_nth(const struct cds_ja_type *type,
 /*
  * _ja_node_set_nth: set nth item within a node. Return an error
  * (negative error value) if it is already there.
- * TODO: exclusive access on node.
  */
 static
 int _ja_node_set_nth(const struct cds_ja_type *type,
@@ -574,14 +589,129 @@ int _ja_node_set_nth(const struct cds_ja_type *type,
        return 0;
 }
 
+static
+int ja_linear_node_clear_nth(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               uint8_t n)
+{
+       uint8_t nr_child;
+       uint8_t *values, *nr_child_ptr;
+       struct cds_ja_inode_flag **pointers;
+       unsigned int i;
+
+       assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
+
+       nr_child_ptr = &node->u.data[0];
+       dbg_printf("linear clear nth: nr_child_ptr %p\n", nr_child_ptr);
+       nr_child = *nr_child_ptr;
+       assert(nr_child <= type->max_linear_child);
+
+       values = &node->u.data[1];
+       pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
+       for (i = 0; i < nr_child; i++) {
+               if (values[i] == n) {
+                       if (pointers[i])
+                               break;
+                       else
+                               return -ENOENT;
+               }
+       }
+       if (i >= nr_child)
+               return -ENOENT;
+       if (shadow_node->fallback_removal_count) {
+               shadow_node->fallback_removal_count--;
+       } else {
+               if (shadow_node->nr_child <= type->min_child) {
+                       /* We need to try recompacting the node */
+                       return -EFBIG;
+               }
+       }
+       assert(pointers[i] != NULL);
+       rcu_assign_pointer(pointers[i], NULL);
+       /*
+        * Value and nr_child are never changed (would cause ABA issue).
+        * Instead, we leave the pointer to NULL and recompact the node
+        * once in a while. It is allowed to set a NULL pointer to a new
+        * value without recompaction though.
+        * Only update the shadow node accounting.
+        */
+       shadow_node->nr_child--;
+       dbg_printf("linear set nth: %u child, shadow: %u child, for node %p shadow %p\n",
+               (unsigned int) CMM_LOAD_SHARED(*nr_child_ptr),
+               (unsigned int) shadow_node->nr_child,
+               node, shadow_node);
+
+       return 0;
+}
+
+static
+int ja_pool_node_clear_nth(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               uint8_t n)
+{
+       struct cds_ja_inode *linear;
+
+       assert(type->type_class == RCU_JA_POOL);
+       linear = (struct cds_ja_inode *)
+               &node->u.data[((unsigned long) n >> (CHAR_BIT - type->nr_pool_order)) << type->pool_size_order];
+       return ja_linear_node_clear_nth(type, linear, shadow_node, n);
+}
+
+static
+int ja_pigeon_node_clear_nth(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               uint8_t n)
+{
+       struct cds_ja_inode_flag **ptr;
+
+       assert(type->type_class == RCU_JA_PIGEON);
+       ptr = &((struct cds_ja_inode_flag **) node->u.data)[n];
+       if (!*ptr)
+               return -ENOENT;
+       rcu_assign_pointer(*ptr, NULL);
+       shadow_node->nr_child--;
+       return 0;
+}
+
+/*
+ * _ja_node_clear_nth: clear nth item within a node. Return an error
+ * (negative error value) if it is not found (-ENOENT).
+ */
+static
+int _ja_node_clear_nth(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               uint8_t n)
+{
+       switch (type->type_class) {
+       case RCU_JA_LINEAR:
+               return ja_linear_node_clear_nth(type, node, shadow_node, n);
+       case RCU_JA_POOL:
+               return ja_pool_node_clear_nth(type, node, shadow_node, n);
+       case RCU_JA_PIGEON:
+               return ja_pigeon_node_clear_nth(type, node, shadow_node, n);
+       case RCU_JA_NULL:
+               return -ENOENT;
+       default:
+               assert(0);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
 /*
  * ja_node_recompact_add: recompact a node, adding a new child.
  * TODO: for pool type, take selection bit(s) into account.
- * Return 0 on success, -ENOENT if need to retry, or other negative
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
  * error value otherwise.
  */
 static
-int ja_node_recompact_add(struct cds_ja *ja,
+int ja_node_recompact(enum ja_recompact mode,
+               struct cds_ja *ja,
                unsigned int old_type_index,
                const struct cds_ja_type *old_type,
                struct cds_ja_inode *old_node,
@@ -597,32 +727,56 @@ int ja_node_recompact_add(struct cds_ja *ja,
        int ret;
        int fallback = 0;
 
-       if (!shadow_node || old_type_index == NODE_INDEX_NULL) {
-               new_type_index = 0;
-       } else {
-               new_type_index = old_type_index + 1;
+       switch (mode) {
+       case JA_RECOMPACT:
+               new_type_index = old_type_index;
+               break;
+       case JA_RECOMPACT_ADD:
+               if (!shadow_node || old_type_index == NODE_INDEX_NULL) {
+                       new_type_index = 0;
+               } else {
+                       new_type_index = old_type_index + 1;
+               }
+               break;
+       case JA_RECOMPACT_DEL:
+               if (old_type_index == 0) {
+                       new_type_index = NODE_INDEX_NULL;
+               } else {
+                       new_type_index = old_type_index - 1;
+               }
+               break;
+       default:
+               assert(0);
        }
 
 retry:         /* for fallback */
        dbg_printf("Recompact from type %d to type %d\n",
                        old_type_index, new_type_index);
        new_type = &ja_types[new_type_index];
-       new_node = alloc_cds_ja_node(new_type);
-       if (!new_node)
-               return -ENOMEM;
-       new_node_flag = ja_node_flag(new_node, new_type_index);
-
-       dbg_printf("Recompact inherit lock from %p\n", shadow_node);
-       new_shadow_node = rcuja_shadow_set(ja->ht, new_node, shadow_node);
-       if (!new_shadow_node) {
-               free(new_node);
-               return -ENOMEM;
+       if (new_type_index != NODE_INDEX_NULL) {
+               new_node = alloc_cds_ja_node(new_type);
+               if (!new_node)
+                       return -ENOMEM;
+               new_node_flag = ja_node_flag(new_node, new_type_index);
+               dbg_printf("Recompact inherit lock from %p\n", shadow_node);
+               new_shadow_node = rcuja_shadow_set(ja->ht, new_node, shadow_node);
+               if (!new_shadow_node) {
+                       free(new_node);
+                       return -ENOMEM;
+               }
+               if (fallback)
+                       new_shadow_node->fallback_removal_count =
+                                               JA_FALLBACK_REMOVAL_COUNT;
+       } else {
+               new_node = NULL;
+               new_node_flag = NULL;
        }
-       if (fallback)
-               new_shadow_node->fallback_removal_count =
-                                       JA_FALLBACK_REMOVAL_COUNT;
 
-       assert(old_type->type_class != RCU_JA_PIGEON);
+       assert(mode != JA_RECOMPACT_ADD || old_type->type_class != RCU_JA_PIGEON);
+
+       if (new_type_index == NODE_INDEX_NULL)
+               goto skip_copy;
+
        switch (old_type->type_class) {
        case RCU_JA_LINEAR:
        {
@@ -637,6 +791,8 @@ retry:              /* for fallback */
                        ja_linear_node_get_ith_pos(old_type, old_node, i, &v, &iter);
                        if (!iter)
                                continue;
+                       if (mode == JA_RECOMPACT_DEL && v == n)
+                               continue;
                        ret = _ja_node_set_nth(new_type, new_node,
                                        new_shadow_node,
                                        v, iter);
@@ -667,6 +823,8 @@ retry:              /* for fallback */
                                                j, &v, &iter);
                                if (!iter)
                                        continue;
+                               if (mode == JA_RECOMPACT_DEL && v == n)
+                                       continue;
                                ret = _ja_node_set_nth(new_type, new_node,
                                                new_shadow_node,
                                                v, iter);
@@ -680,25 +838,62 @@ retry:            /* for fallback */
                break;
        }
        case RCU_JA_NULL:
-               /* Nothing to copy */
+               assert(mode == JA_RECOMPACT_ADD);
                break;
        case RCU_JA_PIGEON:
+       {
+               uint8_t nr_child;
+               unsigned int i;
+
+               assert(mode == JA_RECOMPACT_DEL);
+               nr_child = shadow_node->nr_child;
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+
+                       iter = ja_pigeon_node_get_ith_pos(old_type, old_node, i);
+                       if (!iter)
+                               continue;
+                       if (mode == JA_RECOMPACT_DEL && i == n)
+                               continue;
+                       ret = _ja_node_set_nth(new_type, new_node,
+                                       new_shadow_node,
+                                       i, iter);
+                       if (new_type->type_class == RCU_JA_POOL && ret) {
+                               goto fallback_toosmall;
+                       }
+                       assert(!ret);
+               }
+               break;
+       }
        default:
                assert(0);
                ret = -EINVAL;
                goto end;
        }
+skip_copy:
 
-       /* add node */
-       ret = _ja_node_set_nth(new_type, new_node,
-                       new_shadow_node,
-                       n, child_node_flag);
-       assert(!ret);
-       /* Return pointer to new recompacted new through old_node_flag */
+       if (JA_RECOMPACT_ADD) {
+               /* add node */
+               ret = _ja_node_set_nth(new_type, new_node,
+                               new_shadow_node,
+                               n, child_node_flag);
+               assert(!ret);
+       }
+       /* Return pointer to new recompacted node through old_node_flag */
        *old_node_flag = new_node_flag;
        if (old_node) {
+               int flags;
+
+               flags = RCUJA_SHADOW_CLEAR_FREE_NODE;
+               /*
+                * It is OK to free the lock associated with a node
+                * going to NULL, since we are holding the parent lock.
+                * This synchronizes removal with re-add of that node.
+                */
+               if (new_type_index == NODE_INDEX_NULL)
+                       flags = RCUJA_SHADOW_CLEAR_FREE_LOCK;
                ret = rcuja_shadow_clear(ja->ht, old_node, shadow_node,
-                               RCUJA_SHADOW_CLEAR_FREE_NODE);
+                               flags);
                assert(!ret);
        }
 
@@ -712,7 +907,7 @@ fallback_toosmall:
                        RCUJA_SHADOW_CLEAR_FREE_NODE);
        assert(!ret);
 
-       /* Last type: pigeon */
+       /* Choose fallback type: pigeon */
        new_type_index = (1UL << JA_TYPE_BITS) - 1;
        dbg_printf("Fallback to type %d\n", new_type_index);
        uatomic_inc(&ja->nr_fallback);
@@ -721,7 +916,7 @@ fallback_toosmall:
 }
 
 /*
- * Return 0 on success, -ENOENT if need to retry, or other negative
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
  * error value otherwise.
  */
 static
@@ -743,10 +938,46 @@ int ja_node_set_nth(struct cds_ja *ja,
        type = &ja_types[type_index];
        ret = _ja_node_set_nth(type, node, shadow_node,
                        n, child_node_flag);
-       if (ret == -ENOSPC) {
+       switch (ret) {
+       case -ENOSPC:
                /* Not enough space in node, need to recompact. */
-               ret = ja_node_recompact_add(ja, type_index, type, node,
+               ret = ja_node_recompact(JA_RECOMPACT_ADD, ja, type_index, type, node,
+                               shadow_node, node_flag, n, child_node_flag);
+               break;
+       case -ERANGE:
+               /* Node needs to be recompacted. */
+               ret = ja_node_recompact(JA_RECOMPACT, ja, type_index, type, node,
                                shadow_node, node_flag, n, child_node_flag);
+               break;
+       }
+       return ret;
+}
+
+/*
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
+ * error value otherwise.
+ */
+static
+int ja_node_clear_nth(struct cds_ja *ja,
+               struct cds_ja_inode_flag **node_flag, uint8_t n,
+               struct cds_ja_shadow_node *shadow_node)
+{
+       int ret;
+       unsigned int type_index;
+       const struct cds_ja_type *type;
+       struct cds_ja_inode *node;
+
+       dbg_printf("ja_node_clear_nth for n=%u, node %p, shadow %p\n",
+               (unsigned int) n, ja_node_ptr(*node_flag), shadow_node);
+
+       node = ja_node_ptr(*node_flag);
+       type_index = ja_node_type(*node_flag);
+       type = &ja_types[type_index];
+       ret = _ja_node_clear_nth(type, node, shadow_node, n);
+       if (ret == -EFBIG) {
+               /* Should to try recompaction. */
+               ret = ja_node_recompact(JA_RECOMPACT_DEL, ja, type_index, type, node,
+                               shadow_node, node_flag, n, NULL);
        }
        return ret;
 }
@@ -819,14 +1050,14 @@ int ja_attach_node(struct cds_ja *ja,
        assert(node);
        shadow_node = rcuja_shadow_lookup_lock(ja->ht, node);
        if (!shadow_node) {
-               ret = -ENOENT;
+               ret = -EAGAIN;
                goto end;
        }
        if (parent_node) {
                parent_shadow_node = rcuja_shadow_lookup_lock(ja->ht,
                                                parent_node);
                if (!parent_shadow_node) {
-                       ret = -ENOENT;
+                       ret = -EAGAIN;
                        goto unlock_shadow;
                }
        }
@@ -928,7 +1159,7 @@ int ja_chain_node(struct cds_ja *ja,
        shadow_node = rcuja_shadow_lookup_lock(ja->ht,
                (struct cds_ja_inode *) head);
        if (!shadow_node)
-               return -ENOENT;
+               return -EAGAIN;
        cds_hlist_add_head_rcu(&node->list, head);
        rcuja_shadow_unlock(shadow_node);
        return 0;
@@ -967,7 +1198,7 @@ retry:
                        ret = ja_attach_node(ja, node_flag_ptr,
                                        parent_node_flag, parent2_node_flag,
                                        key, i, new_node);
-                       if (ret == -ENOENT || ret == -EEXIST)
+                       if (ret == -EAGAIN || ret == -EEXIST)
                                goto retry;
                        else
                                goto end;
@@ -996,7 +1227,7 @@ retry:
                        (struct cds_hlist_head *) ja_node_ptr(node_flag),
                        new_node);
        }
-       if (ret == -ENOENT)
+       if (ret == -EAGAIN)
                goto retry;
 end:
        return ret;
This page took 0.031714 seconds and 4 git commands to generate.