urcu-defer: ensure callbacks will never be enqueued forever
authorMathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Wed, 23 Sep 2009 22:29:21 +0000 (18:29 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
Wed, 23 Sep 2009 22:29:21 +0000 (18:29 -0400)
Even if there are no further callbacks enqueued, ensure that after a 100ms
delay, the callback queue will be dealt with.

Required proper ordering of queue vs futex.

e.g.

 The idea is to perform the "check for empty queue" between the
&defer_thread_futex decrement and the test in wait_defer. It skips the
futex call and proceed if the list is non-empty.

As I am drilling into the problem, it looks very much like an attempt to
implement efficient wait queues in userspace based on sys_futex().

/*
 * Wake-up any waiting defer thread. Called from many concurrent
 * threads.
 */
static void wake_up_defer(void)
{
        if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
                atomic_set(&defer_thread_futex, 0);
                futex(&defer_thread_futex, FUTEX_WAKE, 0,
                      NULL, NULL, 0);
        }
}

/*
 * Defer thread waiting. Single thread.
 */
static void wait_defer(void)
{
        atomic_dec(&defer_thread_futex);
        smp_mb();       /* Write futex before read queue */
        if (rcu_defer_num_callbacks()) {
                smp_mb();       /* Read queue before write futex */
                /* Callbacks are queued, don't wait. */
                atomic_set(&defer_thread_futex, 0);
        } else {
                smp_rmb();      /* Read queue before read futex */
                if (atomic_read(&defer_thread_futex) == -1)
                        futex(&defer_thread_futex, FUTEX_WAIT, -1,
                              NULL, NULL, 0);
        }
}

- call_rcu():
  * queue callbacks to perform
  * smp_mb()
  * wake_up_defer()

- defer thread:
  * for (;;)
    * wait_defer()
    * sleep 100ms (wait for more callbacks to be enqueued)
    * dequeue callbacks, execute them

The goal here is that if call_rcu() enqueues a callback (even if it
races with defer thread going to sleep), there should not be a
potentially infinite delay before it gets executed. Therefore, being
blocked in sys_futex while there is a callback to execute, without any
hope to be woken up unless another callback is queued, would not meet
that design requirement. I think that checking the number of queued
callbacks within wait_defer() as I propose here should address this
situation.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
urcu-defer.c

index 3efe5ba2a5a47ccaff0a2f956efa957fb71bde01..13f94fa0f05b05833152265da9147bdb456e88b6 100644 (file)
@@ -71,29 +71,6 @@ static int num_deferers, alloc_deferers;
 
 static pthread_t tid_defer;
 
-/*
- * Wake-up any waiting defer thread. Called from many concurrent threads.
- */
-static void wake_up_defer(void)
-{
-       if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
-               atomic_set(&defer_thread_futex, 0);
-               futex(&defer_thread_futex, FUTEX_WAKE, 0,
-                     NULL, NULL, 0);
-       }
-}
-
-/*
- * Defer thread waiting. Single thread.
- */
-static void wait_defer(void)
-{
-       atomic_dec(&defer_thread_futex);
-       if (atomic_read(&defer_thread_futex) == -1)
-               futex(&defer_thread_futex, FUTEX_WAIT, -1,
-                     NULL, NULL, 0);
-}
-
 static void internal_urcu_lock(pthread_mutex_t *mutex)
 {
        int ret;
@@ -128,6 +105,51 @@ static void internal_urcu_unlock(pthread_mutex_t *mutex)
        }
 }
 
+/*
+ * Wake-up any waiting defer thread. Called from many concurrent threads.
+ */
+static void wake_up_defer(void)
+{
+       if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
+               atomic_set(&defer_thread_futex, 0);
+               futex(&defer_thread_futex, FUTEX_WAKE, 0,
+                     NULL, NULL, 0);
+       }
+}
+
+static unsigned long rcu_defer_num_callbacks(void)
+{
+       unsigned long num_items = 0, head;
+       struct deferer_registry *index;
+
+       internal_urcu_lock(&urcu_defer_mutex);
+       for (index = registry; index < registry + num_deferers; index++) {
+               head = LOAD_SHARED(index->defer_queue->head);
+               num_items += head - index->defer_queue->tail;
+       }
+       internal_urcu_unlock(&urcu_defer_mutex);
+       return num_items;
+}
+
+/*
+ * Defer thread waiting. Single thread.
+ */
+static void wait_defer(void)
+{
+       atomic_dec(&defer_thread_futex);
+       smp_mb();       /* Write futex before read queue */
+       if (rcu_defer_num_callbacks()) {
+               smp_mb();       /* Read queue before write futex */
+               /* Callbacks are queued, don't wait. */
+               atomic_set(&defer_thread_futex, 0);
+       } else {
+               smp_rmb();      /* Read queue before read futex */
+               if (atomic_read(&defer_thread_futex) == -1)
+                       futex(&defer_thread_futex, FUTEX_WAIT, -1,
+                             NULL, NULL, 0);
+       }
+}
+
 /*
  * Must be called after Q.S. is reached.
  */
@@ -280,6 +302,7 @@ void _rcu_defer_queue(void (*fct)(void *p), void *p)
        smp_wmb();      /* Publish new pointer before head */
                        /* Write q[] before head. */
        STORE_SHARED(defer_queue.head, head);
+       smp_mb();       /* Write queue head before read futex */
        /*
         * Wake-up any waiting defer thread.
         */
This page took 0.026591 seconds and 4 git commands to generate.