workqueue: use RCU to protect waitqueue stack dequeue against ABA
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 22 Oct 2014 00:29:26 +0000 (20:29 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 22 Oct 2014 00:29:26 +0000 (20:29 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu/workqueue-fifo.h

index bc19967a6a062eebc5d9c2d33598caab3f2a8f64..0769acced6667b6d56ffd20b65fd0d160602cdc6 100644 (file)
@@ -100,8 +100,11 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
         * worker threads when threads are busy enough to still be
         * running when work is enqueued.
         */
-       if (was_empty)
+       if (was_empty) {
+               rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
+               rcu_read_unlock();      /* Protect stack dequeue */
+       }
 }
 
 static inline
@@ -109,7 +112,10 @@ void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 {
        struct urcu_waiters waiters;
 
+       rcu_read_lock();        /* Protect stack dequeue */
        urcu_move_waiters(&waiters, &queue->waitqueue);
+       rcu_read_unlock();      /* Protect stack dequeue */
+
        (void) urcu_wake_all_waiters(&waiters);
 }
 
@@ -142,14 +148,17 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
                pthread_mutex_lock(&queue->sibling_lock);
                cds_list_del_rcu(&worker->sibling_node);
                pthread_mutex_unlock(&queue->sibling_lock);
-
-               /*
-                * Wait for grace period before freeing or reusing
-                * "worker" because used by RCU linked list.
-                */
-               synchronize_rcu();
        }
 
+       /*
+        * Wait for grace period before freeing or reusing
+        * "worker" because used by RCU linked list.
+        * Also prevents ABA for waitqueue stack dequeue: matches RCU
+        * read-side critical sections around dequeue and move all
+        * operations on waitqueue).
+        */
+       synchronize_rcu();
+
        /*
         * Put any local work we still have back into the workqueue.
         */
@@ -163,7 +172,9 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
                 * Wakeup worker thread if we have put work back into
                 * workqueue that was previously empty.
                 */
+               rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
+               rcu_read_unlock();      /* Protect stack dequeue */
        }
 }
 
@@ -305,6 +316,8 @@ void urcu_accept_work(struct urcu_workqueue *queue,
                 * the queue.
                 */
                cds_wfs_node_init(&worker->wait_node.node);
+               /* Protect stack dequeue against ABA */
+               synchronize_rcu();
                was_empty = !urcu_wait_add(&queue->waitqueue,
                                &worker->wait_node);
                /*
@@ -317,8 +330,11 @@ void urcu_accept_work(struct urcu_workqueue *queue,
                 * a wake up.
                 */
                if (was_empty && !cds_wfcq_empty(&queue->head,
-                                               &queue->tail))
+                                               &queue->tail)) {
+                       rcu_read_lock();        /* Protect stack dequeue */
                        (void) urcu_dequeue_wake_single(&queue->waitqueue);
+                       rcu_read_unlock();      /* Protect stack dequeue */
+               }
        } else {
                /*
                 * Non-NULL next pointer. We are therefore in
This page took 0.0269 seconds and 4 git commands to generate.