From: Mathieu Desnoyers Date: Fri, 1 Jul 2011 23:51:48 +0000 (-0400) Subject: rculfqueue: provide locklessness by allowing multiple dummy nodes X-Git-Tag: v0.6.5~61 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=fbdb32f60f6c185c76dc5c8b9171e2f7a69dabe0;p=userspace-rcu.git rculfqueue: provide locklessness by allowing multiple dummy nodes Signed-off-by: Mathieu Desnoyers --- diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index b9ed63b..598fa50 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -34,10 +34,11 @@ struct cds_lfq_queue_rcu; struct cds_lfq_node_rcu { struct cds_lfq_node_rcu *next; + int dummy; }; struct cds_lfq_queue_rcu { - struct cds_lfq_node_rcu *head, *tail, *dummy; + struct cds_lfq_node_rcu *head, *tail; void (*queue_call_rcu)(struct rcu_head *head, void (*func)(struct rcu_head *head)); }; diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index f3df7c0..fea6110 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -68,6 +68,7 @@ struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q, dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy)); assert(dummy); dummy->parent.next = next; + dummy->parent.dummy = 1; dummy->q = q; return &dummy->parent; } @@ -85,6 +86,7 @@ void rcu_free_dummy(struct cds_lfq_node_rcu *node) { struct cds_lfq_node_rcu_dummy *dummy; + assert(node->dummy); dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); } @@ -94,6 +96,7 @@ void free_dummy(struct cds_lfq_node_rcu *node) { struct cds_lfq_node_rcu_dummy *dummy; + assert(node->dummy); dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); free(dummy); } @@ -102,6 +105,7 @@ static inline void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) { node->next = NULL; + node->dummy = 0; } static inline @@ -110,7 +114,6 @@ void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, void (*func)(struct rcu_head *head))) { q->tail = make_dummy(q, NULL); - q->dummy = q->tail; q->head = q->tail; q->queue_call_rcu = queue_call_rcu; } @@ -126,7 +129,7 @@ int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) struct cds_lfq_node_rcu *head; head = rcu_dereference(q->head); - if (!(head == q->dummy && head->next == NULL)) + if (!(head->dummy && head->next == NULL)) return -EPERM; /* not empty */ free_dummy(head); return 0; @@ -169,17 +172,12 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, } static inline -void reenqueue_dummy(struct cds_lfq_queue_rcu *q) +void enqueue_dummy(struct cds_lfq_queue_rcu *q) { struct cds_lfq_node_rcu *node; /* We need to reallocate to protect from ABA. */ node = make_dummy(q, NULL); - if (uatomic_cmpxchg(&q->dummy, NULL, node) != NULL) { - /* other dequeue populated its new dummy */ - free_dummy(node); - return; - } _cds_lfq_enqueue_rcu(q, node); } @@ -194,36 +192,28 @@ static inline struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { - struct cds_lfq_node_rcu *head, *next, *dummy; + struct cds_lfq_node_rcu *head, *next; head = rcu_dereference(q->head); next = rcu_dereference(head->next); - dummy = rcu_dereference(q->dummy); - if (head == dummy && next == NULL) + if (head->dummy && next == NULL) return NULL; /* empty */ /* * We never, ever allow dequeue to get to a state where * the queue is empty (we need at least one node in the * queue). This is ensured by checking if the head next - * is NULL. This means a concurrent dummy node - * re-enqueue is in progress. We help it reach - * completion, and retry. + * is NULL, which means we need to enqueue a dummy node + * before we can hope dequeuing anything. */ if (!next) { - /* - * Dummy node re-enqueue is in progress. Try to - * help. - */ - reenqueue_dummy(q); - continue; /* try again */ + enqueue_dummy(q); + next = rcu_dereference(head->next); } if (uatomic_cmpxchg(&q->head, head, next) != head) continue; /* Concurrently pushed. */ - if (head == dummy) { - /* Free old and requeue new dummy. */ - rcu_set_pointer(&q->dummy, NULL); - rcu_free_dummy(dummy); - reenqueue_dummy(q); + if (head->dummy) { + /* Free dummy after grace period. */ + rcu_free_dummy(head); continue; /* try again */ } return head;