Fix: high cpu usage in synchronize_rcu with long RCU read-side C.S.
author	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
	Sat, 1 Mar 2014 16:33:25 +0000 (11:33 -0500)
committer	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
	Sat, 1 Mar 2014 17:28:38 +0000 (12:28 -0500)
We noticed that the following kind of scenario:
- an application using urcu-mb, urcu-membarrier, urcu-signal, or urcu-bp,
- long RCU read-side critical sections, caused by e.g. long network I/O
  system calls,
- other short-lived RCU read-side critical sections running in other
  threads,
- very frequent invocation of call_rcu to enqueue callbacks,
leads to abnormally high CPU usage within synchronize_rcu() in the
call_rcu worker threads.

Inspection of the code gives us the answer: in urcu.c, when we need to
wait on a futex (wait_gp()), we expect to be able to end the grace
period within the next loop iteration, having been notified by a
rcu_read_unlock(). However, this is not always the case: we can very
well be awakened by a rcu_read_unlock() executed on a thread running
short-lived RCU read-side critical sections, while the long-running RCU
read-side C.S. is still active. We end up busy-waiting for a very long
time, because the counter stays != RCU_QS_ACTIVE_ATTEMPTS until a
32-bit overflow happens (or, more likely, until we complete the grace
period). We need to change the wait_loops == RCU_QS_ACTIVE_ATTEMPTS
check into an inequality so that wait_gp() is used for every attempt
beyond RCU_QS_ACTIVE_ATTEMPTS loops.
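
The corrected waiting pattern, as a minimal standalone sketch
(simplified names; all_readers_quiescent(), wait_gp() and cpu_relax()
are hypothetical placeholders standing in for the real liburcu
primitives, not the actual code):

#define RCU_QS_ACTIVE_ATTEMPTS 100

/* Placeholder stubs so the sketch compiles on its own. */
static int all_readers_quiescent(void) { return 1; } /* reader scan result */
static void wait_gp(void) { }      /* futex wait; may wake spuriously */
static void cpu_relax(void) { }    /* pause/yield hint while spinning */

static void wait_for_readers_sketch(void)
{
	unsigned int wait_loops = 0;

	for (;;) {
		/* Saturating increment: stop counting at the threshold. */
		if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
			wait_loops++;

		if (all_readers_quiescent())
			break;

		/*
		 * ">=" rather than "==": once we have spun for
		 * RCU_QS_ACTIVE_ATTEMPTS iterations, every further
		 * attempt goes back to sleeping on the futex, even
		 * after a spurious wakeup caused by an unrelated
		 * short-lived reader.
		 */
		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
			wait_gp();
		else
			cpu_relax();
	}
}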

urcu-bp.c also has this issue. Moreover, it uses usleep() rather than
poll() when dealing with long-running RCU read-side critical sections.
Turn the 1000us (1ms) usleep() into a 10ms poll(). One advantage of
using poll() rather than usleep() is that it does not interact with
SIGALRM.
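
For reference, the poll()-as-sleep idiom the fix relies on, as a
standalone snippet (illustrative only): with no file descriptors to
watch, poll() simply blocks for the requested timeout in milliseconds,
and unlike some historical usleep() implementations it is not built on
top of SIGALRM.

#include <poll.h>

#define RCU_SLEEP_DELAY_MS 10

static void rcu_sleep_sketch(void)
{
	/* nfds == 0: nothing is polled, only the 10ms timeout applies. */
	(void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
}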

urcu-qsbr.c already checks for wait_loops >= RCU_QS_ACTIVE_ATTEMPTS, so
it is not affected by this issue.

Looking into these loops, however, shows that an overflow of the loop
counter, although unlikely, would bring us back to a situation of high
CPU usage (a negative value well below RCU_QS_ACTIVE_ATTEMPTS).
Therefore, change the counter behavior so it stops incrementing once it
reaches RCU_QS_ACTIVE_ATTEMPTS, eliminating the overflow.
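
A tiny self-contained demonstration of the saturating counter
(illustration only, not liburcu code): incrementing only below the
threshold means the counter can never wrap around, whereas an
unconditionally incremented signed int would eventually overflow
(undefined behavior in C, typically wrapping to a large negative
value) and fail the ">=" check again.

#include <stdio.h>

#define RCU_QS_ACTIVE_ATTEMPTS 100

int main(void)
{
	unsigned int wait_loops = 0;
	unsigned long i;

	for (i = 0; i < 1000000; i++) {
		if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
			wait_loops++; /* clamps at the threshold */
	}
	printf("wait_loops = %u\n", wait_loops); /* prints 100 */
	return 0;
}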

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu-bp.c
urcu-qsbr.c
urcu.c

diff --git a/urcu-bp.c b/urcu-bp.c
index 59cc4bc8c76b7833d3e7573b4bb7f53802a28f8f..1386dfc8fdfb82a533aface955c79c6f0b318daf 100644
--- a/urcu-bp.c
+++ b/urcu-bp.c
@@ -79,8 +79,8 @@ void *mremap_wrapper(void *old_address, size_t old_size,
 }
 #endif
 
-/* Sleep delay in us */
-#define RCU_SLEEP_DELAY                1000
+/* Sleep delay in ms */
+#define RCU_SLEEP_DELAY_MS     10
 #define INIT_NR_THREADS                8
 #define ARENA_INIT_ALLOC               \
        sizeof(struct registry_chunk)   \
@@ -174,7 +174,7 @@ static void mutex_unlock(pthread_mutex_t *mutex)
 void update_counter_and_wait(void)
 {
        CDS_LIST_HEAD(qsreaders);
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
 
        /* Switch parity: 0 -> 1, 1 -> 0 */
@@ -198,7 +198,9 @@ void update_counter_and_wait(void)
         * Wait for each thread rcu_reader.ctr count to become 0.
         */
        for (;;) {
-               wait_loops++;
+               if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
+                       wait_loops++;
+
                cds_list_for_each_entry_safe(index, tmp, &registry, node) {
                        if (!rcu_old_gp_ongoing(&index->ctr))
                                cds_list_move(&index->node, &qsreaders);
@@ -207,8 +209,8 @@ void update_counter_and_wait(void)
                if (cds_list_empty(&registry)) {
                        break;
                } else {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
-                               usleep(RCU_SLEEP_DELAY);
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
+                               (void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
                        else
                                caa_cpu_relax();
                }
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index ec483d925c0bdbfc85e8a0dfeb1690849a940705..76aaabb4eac85fbb11aefc487bab627aa241a384 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -119,7 +119,7 @@ static void wait_gp(void)
 static void update_counter_and_wait(void)
 {
        CDS_LIST_HEAD(qsreaders);
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
 
 #if (CAA_BITS_PER_LONG < 64)
@@ -150,7 +150,6 @@ static void update_counter_and_wait(void)
         * Wait for each thread rcu_reader_qs_gp count to become 0.
         */
        for (;;) {
-               wait_loops++;
                if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_set(&gp_futex, -1);
                        /*
@@ -163,6 +162,8 @@ static void update_counter_and_wait(void)
                        }
                        /* Write futex before read reader_gp */
                        cmm_smp_mb();
+               } else {
+                       wait_loops++;
                }
                cds_list_for_each_entry_safe(index, tmp, &registry, node) {
                        if (!rcu_gp_ongoing(&index->ctr))
diff --git a/urcu.c b/urcu.c
index 7c03d900c7767f80c340d15cffcfeebc15f585c6..33e35e1d36a722cb2de2f74e8f3a15c8327f3687 100644
--- a/urcu.c
+++ b/urcu.c
@@ -52,9 +52,9 @@
 /*
  * If a reader is really non-cooperative and refuses to commit its
  * rcu_active_readers count to memory (there is no barrier in the reader
- * per-se), kick it after a few loops waiting for it.
+ * per-se), kick it after 10 loops waiting for it.
  */
-#define KICK_READER_LOOPS 10000
+#define KICK_READER_LOOPS      10
 
 /*
  * Active attempts to check for reader Q.S. before calling futex().
@@ -218,8 +218,11 @@ static void wait_gp(void)
 void update_counter_and_wait(void)
 {
        CDS_LIST_HEAD(qsreaders);
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
+#ifdef HAS_INCOHERENT_CACHES
+       unsigned int wait_gp_loops = 0;
+#endif /* HAS_INCOHERENT_CACHES */
 
        /* Switch parity: 0 -> 1, 1 -> 0 */
        CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR_PHASE);
@@ -244,11 +247,12 @@ void update_counter_and_wait(void)
         * Wait for each thread URCU_TLS(rcu_reader).ctr count to become 0.
         */
        for (;;) {
-               wait_loops++;
-               if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+               if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_dec(&gp_futex);
                        /* Write futex before read reader_gp */
                        smp_mb_master(RCU_MB_GROUP);
+               } else {
+                       wait_loops++;
                }
 
                cds_list_for_each_entry_safe(index, tmp, &registry, node) {
@@ -258,14 +262,14 @@ void update_counter_and_wait(void)
 
 #ifndef HAS_INCOHERENT_CACHES
                if (cds_list_empty(&registry)) {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                smp_mb_master(RCU_MB_GROUP);
                                uatomic_set(&gp_futex, 0);
                        }
                        break;
                } else {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
                                wait_gp();
                        else
                                caa_cpu_relax();
@@ -277,22 +281,21 @@ void update_counter_and_wait(void)
                 * for too long.
                 */
                if (cds_list_empty(&registry)) {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                smp_mb_master(RCU_MB_GROUP);
                                uatomic_set(&gp_futex, 0);
                        }
                        break;
                } else {
-                       switch (wait_loops) {
-                       case RCU_QS_ACTIVE_ATTEMPTS:
-                               wait_gp();
-                               break; /* only escape switch */
-                       case KICK_READER_LOOPS:
+                       if (wait_gp_loops == KICK_READER_LOOPS) {
                                smp_mb_master(RCU_MB_GROUP);
-                               wait_loops = 0;
-                               break; /* only escape switch */
-                       default:
+                               wait_gp_loops = 0;
+                       }
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
+                               wait_gp();
+                               wait_gp_loops++;
+                       } else {
                                caa_cpu_relax();
                        }
                }