Fix: high cpu usage in synchronize_rcu with long RCU read-side C.S.
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 1 Mar 2014 16:33:25 +0000 (11:33 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 1 Mar 2014 17:49:28 +0000 (12:49 -0500)
We noticed that with this kind of scenario:
- application using urcu-mb, urcu-membarrier, urcu-signal, or urcu-bp,
- long RCU read-side critical sections, caused by e.g. long network I/O
  system calls,
- other short lived RCU critical sections running in other threads,
- very frequent invocation of call_rcu to enqueue callbacks,
lead to abnormally high CPU usage within synchronize_rcu() in the
call_rcu worker threads.

Inspection of the code gives us the answer: in urcu.c, we expect that if
we need to wait on a futex (wait_gp()), we expect to be able to end the
grace period within the next loop, having been notified by a
rcu_read_unlock(). However, this is not always the case: we can very
well be awakened by a rcu_read_unlock() executed on a thread running
short-lived RCU read-side critical sections, while the long-running RCU
read-side C.S. is still active. We end up in a situation where we
busy-wait for a very long time, because the counter is !=
RCU_QS_ACTIVE_ATTEMPTS until a 32-bit overflow happens (or more likely,
until we complete the grace period). We need to change the wait_loops ==
RCU_QS_ACTIVE_ATTEMPTS check into an inequality to use wait_gp() for
every attempts beyond RCU_QS_ACTIVE_ATTEMPTS loops.

urcu-bp.c also has this issue. Moreover, it uses usleep() rather than
poll() when dealing with long-running RCU read-side critical sections.
Turn the usleep 1000us (1ms) into a poll of 10ms. One of the advantage
of using poll() rather than usleep() is that it does not interact with
SIGALRM.

urcu-qsbr.c already checks for wait_loops >= RCU_QS_ACTIVE_ATTEMPTS, so
it is not affected by this issue.

Looking into these loops, however, shows that overflow of the loop
counter, although unlikely, would bring us back to a situation of high
cpu usage (a negative value well below RCU_QS_ACTIVE_ATTEMPTS).
Therefore, change the counter behavior so it stops incrementing when it
reaches RCU_QS_ACTIVE_ATTEMPTS, to eliminate overflow.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu-bp.c
urcu-qsbr.c
urcu.c

index 836ef92fe1f8ba92335ab9b8a3047e51f520d190..7c573d6ec1f7624328018dfc19313a5d6e964a3f 100644 (file)
--- a/urcu-bp.c
+++ b/urcu-bp.c
@@ -79,8 +79,8 @@ void *mremap_wrapper(void *old_address, size_t old_size,
 }
 #endif
 
-/* Sleep delay in us */
-#define RCU_SLEEP_DELAY                1000
+/* Sleep delay in ms */
+#define RCU_SLEEP_DELAY_MS     10
 #define INIT_NR_THREADS                8
 #define ARENA_INIT_ALLOC               \
        sizeof(struct registry_chunk)   \
@@ -169,7 +169,7 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                        struct cds_list_head *cur_snap_readers,
                        struct cds_list_head *qsreaders)
 {
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
 
        /*
@@ -178,7 +178,9 @@ static void wait_for_readers(struct cds_list_head *input_readers,
         * rcu_gp.ctr value.
         */
        for (;;) {
-               wait_loops++;
+               if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)
+                       wait_loops++;
+
                cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
                        switch (rcu_reader_state(&index->ctr)) {
                        case RCU_READER_ACTIVE_CURRENT:
@@ -205,8 +207,8 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                if (cds_list_empty(input_readers)) {
                        break;
                } else {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
-                               usleep(RCU_SLEEP_DELAY);
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
+                               (void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);
                        else
                                caa_cpu_relax();
                }
index 2cd6c126d8dd8d78ebd7877a88f324ea64cab0c7..d02572513139eb5986529328c0066deb319cafc9 100644 (file)
@@ -121,7 +121,7 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                        struct cds_list_head *cur_snap_readers,
                        struct cds_list_head *qsreaders)
 {
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
 
        /*
@@ -130,7 +130,6 @@ static void wait_for_readers(struct cds_list_head *input_readers,
         * current rcu_gp.ctr value.
         */
        for (;;) {
-               wait_loops++;
                if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_set(&rcu_gp.futex, -1);
                        /*
@@ -143,6 +142,8 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                        }
                        /* Write futex before read reader_gp */
                        cmm_smp_mb();
+               } else {
+                       wait_loops++;
                }
                cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
                        switch (rcu_reader_state(&index->ctr)) {
diff --git a/urcu.c b/urcu.c
index 0bd15471d29f40129bef0cefae55c3ca3d3dc1b3..087a0c67ad73fddf212fec3bd4ff63854ab8b8d4 100644 (file)
--- a/urcu.c
+++ b/urcu.c
@@ -53,9 +53,9 @@
 /*
  * If a reader is really non-cooperative and refuses to commit its
  * rcu_active_readers count to memory (there is no barrier in the reader
- * per-se), kick it after a few loops waiting for it.
+ * per-se), kick it after 10 loops waiting for it.
  */
-#define KICK_READER_LOOPS 10000
+#define KICK_READER_LOOPS      10
 
 /*
  * Active attempts to check for reader Q.S. before calling futex().
@@ -230,8 +230,11 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                        struct cds_list_head *cur_snap_readers,
                        struct cds_list_head *qsreaders)
 {
-       int wait_loops = 0;
+       unsigned int wait_loops = 0;
        struct rcu_reader *index, *tmp;
+#ifdef HAS_INCOHERENT_CACHES
+       unsigned int wait_gp_loops = 0;
+#endif /* HAS_INCOHERENT_CACHES */
 
        /*
         * Wait for each thread URCU_TLS(rcu_reader).ctr to either
@@ -239,11 +242,12 @@ static void wait_for_readers(struct cds_list_head *input_readers,
         * rcu_gp.ctr value.
         */
        for (;;) {
-               wait_loops++;
-               if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+               if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                        uatomic_dec(&rcu_gp.futex);
                        /* Write futex before read reader_gp */
                        smp_mb_master(RCU_MB_GROUP);
+               } else {
+                       wait_loops++;
                }
 
                cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
@@ -271,14 +275,14 @@ static void wait_for_readers(struct cds_list_head *input_readers,
 
 #ifndef HAS_INCOHERENT_CACHES
                if (cds_list_empty(input_readers)) {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                smp_mb_master(RCU_MB_GROUP);
                                uatomic_set(&rcu_gp.futex, 0);
                        }
                        break;
                } else {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)
                                wait_gp();
                        else
                                caa_cpu_relax();
@@ -290,22 +294,21 @@ static void wait_for_readers(struct cds_list_head *input_readers,
                 * for too long.
                 */
                if (cds_list_empty(input_readers)) {
-                       if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
                                /* Read reader_gp before write futex */
                                smp_mb_master(RCU_MB_GROUP);
                                uatomic_set(&rcu_gp.futex, 0);
                        }
                        break;
                } else {
-                       switch (wait_loops) {
-                       case RCU_QS_ACTIVE_ATTEMPTS:
-                               wait_gp();
-                               break; /* only escape switch */
-                       case KICK_READER_LOOPS:
+                       if (wait_gp_loops == KICK_READER_LOOPS) {
                                smp_mb_master(RCU_MB_GROUP);
-                               wait_loops = 0;
-                               break; /* only escape switch */
-                       default:
+                               wait_gp_loops = 0;
+                       }
+                       if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
+                               wait_gp();
+                               wait_gp_loops++;
+                       } else {
                                caa_cpu_relax();
                        }
                }
This page took 0.029102 seconds and 4 git commands to generate.