dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy));
assert(dummy);
dummy->parent.next = next;
+ dummy->parent.dummy = 1;
dummy->q = q;
return &dummy->parent;
}
{
struct cds_lfq_node_rcu_dummy *dummy;
+ assert(node->dummy);
dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
}
{
struct cds_lfq_node_rcu_dummy *dummy;
+ assert(node->dummy);
dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
free(dummy);
}
void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
{
node->next = NULL;
+ node->dummy = 0;
}
static inline
void (*func)(struct rcu_head *head)))
{
q->tail = make_dummy(q, NULL);
- q->dummy = q->tail;
q->head = q->tail;
q->queue_call_rcu = queue_call_rcu;
}
struct cds_lfq_node_rcu *head;
head = rcu_dereference(q->head);
- if (!(head == q->dummy && head->next == NULL))
+ if (!(head->dummy && head->next == NULL))
return -EPERM; /* not empty */
free_dummy(head);
return 0;
}
static inline
-void reenqueue_dummy(struct cds_lfq_queue_rcu *q)
+void enqueue_dummy(struct cds_lfq_queue_rcu *q)
{
struct cds_lfq_node_rcu *node;
/* We need to reallocate to protect from ABA. */
node = make_dummy(q, NULL);
- if (uatomic_cmpxchg(&q->dummy, NULL, node) != NULL) {
- /* other dequeue populated its new dummy */
- free_dummy(node);
- return;
- }
_cds_lfq_enqueue_rcu(q, node);
}
struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
for (;;) {
- struct cds_lfq_node_rcu *head, *next, *dummy;
+ struct cds_lfq_node_rcu *head, *next;
head = rcu_dereference(q->head);
next = rcu_dereference(head->next);
- dummy = rcu_dereference(q->dummy);
- if (head == dummy && next == NULL)
+ if (head->dummy && next == NULL)
return NULL; /* empty */
/*
* We never, ever allow dequeue to get to a state where
* the queue is empty (we need at least one node in the
* queue). This is ensured by checking if the head next
- * is NULL. This means a concurrent dummy node
- * re-enqueue is in progress. We help it reach
- * completion, and retry.
+ * is NULL, which means we need to enqueue a dummy node
+ * before we can hope dequeuing anything.
*/
if (!next) {
- /*
- * Dummy node re-enqueue is in progress. Try to
- * help.
- */
- reenqueue_dummy(q);
- continue; /* try again */
+ enqueue_dummy(q);
+ next = rcu_dereference(head->next);
}
if (uatomic_cmpxchg(&q->head, head, next) != head)
continue; /* Concurrently pushed. */
- if (head == dummy) {
- /* Free old and requeue new dummy. */
- rcu_set_pointer(&q->dummy, NULL);
- rcu_free_dummy(dummy);
- reenqueue_dummy(q);
+ if (head->dummy) {
+ /* Free dummy after grace period. */
+ rcu_free_dummy(head);
continue; /* try again */
}
return head;