From: Mathieu Desnoyers
Date: Sat, 26 Sep 2009 13:31:12 +0000 (-0400)
Subject: Implement sched_yield UP support
X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=ae62b5e8fe0986d0e1f62f59e9958da78f27946b;p=userspace-rcu.git

Implement sched_yield UP support

Signed-off-by: Mathieu Desnoyers
---

diff --git a/urcu-qsbr-static.h b/urcu-qsbr-static.h
index 87305cb..c39ab81 100644
--- a/urcu-qsbr-static.h
+++ b/urcu-qsbr-static.h
@@ -33,8 +33,7 @@
 #include
 #include
 #include
-#include
-#include
+#include
 #include
 #include
@@ -89,10 +88,6 @@
 	(_________p1); \
 })
 
-#define futex(...) syscall(__NR_futex, __VA_ARGS__)
-#define FUTEX_WAIT 0
-#define FUTEX_WAKE 1
-
 /*
  * This code section can only be included in LGPL 2.1 compatible source code.
  * See below for the function call wrappers which can be used in code meant to
@@ -109,7 +104,7 @@
 #define KICK_READER_LOOPS 10000
 
 /*
- * Active attempts to check for reader Q.S. before calling futex().
+ * Active attempts to check for reader Q.S. before calling sched_yield().
  */
 #define RCU_QS_ACTIVE_ATTEMPTS 100
 
@@ -173,7 +168,8 @@ static inline void reader_barrier()
 }
 
 #define RCU_GP_ONLINE (1UL << 0)
-#define RCU_GP_CTR (1UL << 1)
+#define RCU_GP_ONGOING (1UL << 1)
+#define RCU_GP_CTR (1UL << 2)
 
 /*
  * Global quiescent period counter with low-order bits unused.
@@ -184,20 +180,6 @@ extern unsigned long urcu_gp_ctr;
 
 extern unsigned long __thread rcu_reader_qs_gp;
 
-extern int gp_futex;
-
-/*
- * Wake-up waiting synchronize_rcu(). Called from many concurrent threads.
- */
-static inline void wake_up_gp(void)
-{
-	if (unlikely(atomic_read(&gp_futex) == -1)) {
-		atomic_set(&gp_futex, 0);
-		futex(&gp_futex, FUTEX_WAKE, 1,
-			NULL, NULL, 0);
-	}
-}
-
 #if (BITS_PER_LONG < 64)
 static inline int rcu_gp_ongoing(unsigned long *value)
 {
@@ -231,10 +213,15 @@ static inline void _rcu_read_unlock(void)
 
 static inline void _rcu_quiescent_state(void)
 {
-	smp_mb();
-	_STORE_SHARED(rcu_reader_qs_gp, _LOAD_SHARED(urcu_gp_ctr));
-	smp_mb(); /* write rcu_reader_qs_gp before read futex */
-	wake_up_gp();
+	long gp_ctr;
+
+	smp_mb();
+	gp_ctr = LOAD_SHARED(urcu_gp_ctr);
+	if (unlikely(gp_ctr & RCU_GP_ONGOING)) {
+		sched_yield();
+		gp_ctr = LOAD_SHARED(urcu_gp_ctr);
+	}
+	_STORE_SHARED(rcu_reader_qs_gp, gp_ctr);
 	smp_mb();
 }
 
@@ -242,13 +229,18 @@ static inline void _rcu_thread_offline(void)
 {
 	smp_mb();
 	STORE_SHARED(rcu_reader_qs_gp, 0);
-	smp_mb(); /* write rcu_reader_qs_gp before read futex */
-	wake_up_gp();
 }
 
 static inline void _rcu_thread_online(void)
 {
-	_STORE_SHARED(rcu_reader_qs_gp, LOAD_SHARED(urcu_gp_ctr));
+	long gp_ctr;
+
+	gp_ctr = LOAD_SHARED(urcu_gp_ctr);
+	if (unlikely(gp_ctr & RCU_GP_ONGOING)) {
+		sched_yield();
+		gp_ctr = LOAD_SHARED(urcu_gp_ctr);
+	}
+	_STORE_SHARED(rcu_reader_qs_gp, gp_ctr);
 	smp_mb();
 }
 
diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index dac6649..b42d7c4 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -39,8 +39,6 @@
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-int gp_futex;
-
 /*
  * Global grace period counter.
  */
@@ -104,24 +102,6 @@ static void internal_urcu_unlock(void)
 /*
  * synchronize_rcu() waiting. Single thread.
  */
-static void wait_gp(struct reader_registry *index)
-{
-	atomic_dec(&gp_futex);
-	smp_mb(); /* Write futex before read reader_gp */
-	if (!rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
-		/* Read reader_gp before write futex */
-		smp_mb();
-		/* Callbacks are queued, don't wait. */
-		atomic_set(&gp_futex, 0);
-	} else {
-		/* Read reader_gp before read futex */
-		smp_rmb();
-		if (atomic_read(&gp_futex) == -1)
-			futex(&gp_futex, FUTEX_WAIT, -1,
-				NULL, NULL, 0);
-	}
-}
-
 static void wait_for_quiescent_state(void)
 {
 	struct reader_registry *index;
@@ -136,7 +116,7 @@ static void wait_for_quiescent_state(void)
 		while (rcu_gp_ongoing(index->rcu_reader_qs_gp)) {
 			if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
-				wait_gp(index);
+				sched_yield(); /* ideally sched_yield_to() */
 			} else {
 #ifndef HAS_INCOHERENT_CACHES
 				cpu_relax();
@@ -184,6 +164,8 @@ void synchronize_rcu(void)
 
 	internal_urcu_lock();
 
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
+
 	switch_next_urcu_qparity(); /* 0 -> 1 */
 
 	/*
@@ -222,6 +204,8 @@ void synchronize_rcu(void)
 	 */
 	wait_for_quiescent_state(); /* Wait readers in parity 1 */
 
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
+
 	internal_urcu_unlock();
 
 	/*
@@ -249,8 +233,10 @@ void synchronize_rcu(void)
 	STORE_SHARED(rcu_reader_qs_gp, 0);
 
 	internal_urcu_lock();
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
 	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr + RCU_GP_CTR);
 	wait_for_quiescent_state();
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
 	internal_urcu_unlock();
 
 	if (was_online)
diff --git a/urcu-static.h b/urcu-static.h
index 3caa0f9..7bde5ba 100644
--- a/urcu-static.h
+++ b/urcu-static.h
@@ -31,8 +31,7 @@
 #include
 #include
-#include
-#include
+#include
 #include
 #include
@@ -96,10 +95,6 @@
 	(_________p1); \
 })
 
-#define futex(...) syscall(__NR_futex, __VA_ARGS__)
-#define FUTEX_WAIT 0
-#define FUTEX_WAKE 1
-
 /*
  * This code section can only be included in LGPL 2.1 compatible source code.
  * See below for the function call wrappers which can be used in code meant to
@@ -124,7 +119,7 @@
 #define KICK_READER_LOOPS 10000
 
 /*
- * Active attempts to check for reader Q.S. before calling futex().
+ * Active attempts to check for reader Q.S. before calling sched_yield().
  */
 #define RCU_QS_ACTIVE_ATTEMPTS 100
 
@@ -210,6 +205,7 @@ static inline void reader_barrier()
 /* Use the amount of bits equal to half of the architecture long size */
 #define RCU_GP_CTR_BIT (1UL << (sizeof(long) << 2))
 #define RCU_GP_CTR_NEST_MASK (RCU_GP_CTR_BIT - 1)
+#define RCU_GP_ONGOING (RCU_GP_CTR_BIT << 1)
 
 /*
  * Global quiescent period counter with low-order bits unused.
@@ -220,20 +216,6 @@ extern long urcu_gp_ctr;
 
 extern long __thread urcu_active_readers;
 
-extern int gp_futex;
-
-/*
- * Wake-up waiting synchronize_rcu(). Called from many concurrent threads.
- */
-static inline void wake_up_gp(void)
-{
-	if (unlikely(atomic_read(&gp_futex) == -1)) {
-		atomic_set(&gp_futex, 0);
-		futex(&gp_futex, FUTEX_WAKE, 1,
-			NULL, NULL, 0);
-	}
-}
-
 static inline int rcu_old_gp_ongoing(long *value)
 {
 	long v;
@@ -251,12 +233,17 @@ static inline int rcu_old_gp_ongoing(long *value)
 
 static inline void _rcu_read_lock(void)
 {
-	long tmp;
+	long tmp, gp_ctr;
 
 	tmp = urcu_active_readers;
 	/* urcu_gp_ctr = RCU_GP_COUNT | (~RCU_GP_CTR_BIT or RCU_GP_CTR_BIT) */
 	if (likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {
-		_STORE_SHARED(urcu_active_readers, _LOAD_SHARED(urcu_gp_ctr));
+		gp_ctr = _LOAD_SHARED(urcu_gp_ctr);
+		if (unlikely(gp_ctr & RCU_GP_ONGOING)) {
+			sched_yield();
+			gp_ctr = _LOAD_SHARED(urcu_gp_ctr);
+		}
+		_STORE_SHARED(urcu_active_readers, gp_ctr);
 		/*
 		 * Set active readers count for outermost nesting level before
 		 * accessing the pointer. See force_mb_all_threads().
 		 */
@@ -269,24 +256,15 @@ static inline void _rcu_read_lock(void)
 
 static inline void _rcu_read_unlock(void)
 {
-	long tmp;
-
-	tmp = urcu_active_readers;
 	/*
 	 * Finish using rcu before decrementing the pointer.
 	 * See force_mb_all_threads().
+	 * Formally only needed for outermost nesting level, but leave barrier
+	 * in place for nested unlocks to remove a branch from the common case
+	 * (no nesting).
 	 */
-	if (likely((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT)) {
-		reader_barrier();
-		_STORE_SHARED(urcu_active_readers,
-			urcu_active_readers - RCU_GP_COUNT);
-		/* write urcu_active_readers before read futex */
-		reader_barrier();
-		wake_up_gp();
-	} else {
-		_STORE_SHARED(urcu_active_readers,
-			urcu_active_readers - RCU_GP_COUNT);
-	}
+	reader_barrier();
+	_STORE_SHARED(urcu_active_readers, urcu_active_readers - RCU_GP_COUNT);
 }
 
 /**
diff --git a/urcu.c b/urcu.c
index 07661a3..d960497 100644
--- a/urcu.c
+++ b/urcu.c
@@ -49,8 +49,6 @@ void urcu_init(void)
 
 static pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-int gp_futex;
-
 /*
  * Global grace period counter.
  * Contains the current RCU_GP_CTR_BIT.
@@ -130,16 +128,19 @@ static void switch_next_urcu_qparity(void)
 }
 
 #ifdef URCU_MB
+#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
 	smp_mb();
 }
+#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
 	smp_mb();
 }
 #else /* #ifdef URCU_MB */
+#ifdef HAS_INCOHERENT_CACHES
 static void force_mb_single_thread(struct reader_registry *index)
 {
 	assert(registry);
@@ -162,6 +163,7 @@ static void force_mb_single_thread(struct reader_registry *index)
 	}
 	smp_mb(); /* read ->need_mb before ending the barrier */
 }
+#endif /* #ifdef HAS_INCOHERENT_CACHES */
 
 static void force_mb_all_threads(void)
 {
@@ -206,27 +208,6 @@ static void force_mb_all_threads(void)
 }
 #endif /* #else #ifdef URCU_MB */
 
-/*
- * synchronize_rcu() waiting. Single thread.
- */
-static void wait_gp(struct reader_registry *index)
-{
-	atomic_dec(&gp_futex);
-	force_mb_single_thread(index); /* Write futex before read reader_gp */
-	if (!rcu_old_gp_ongoing(index->urcu_active_readers)) {
-		/* Read reader_gp before write futex */
-		force_mb_single_thread(index);
-		/* Callbacks are queued, don't wait. */
-		atomic_set(&gp_futex, 0);
-	} else {
-		/* Read reader_gp before read futex */
-		force_mb_single_thread(index);
-		if (atomic_read(&gp_futex) == -1)
-			futex(&gp_futex, FUTEX_WAIT, -1,
-				NULL, NULL, 0);
-	}
-}
-
 void wait_for_quiescent_state(void)
 {
 	struct reader_registry *index;
@@ -241,7 +222,7 @@ void wait_for_quiescent_state(void)
 #ifndef HAS_INCOHERENT_CACHES
 		while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
 			if (wait_loops++ == RCU_QS_ACTIVE_ATTEMPTS) {
-				wait_gp(index);
+				sched_yield(); /* ideally sched_yield_to() */
 			} else {
 				cpu_relax();
 			}
@@ -254,7 +235,7 @@ void wait_for_quiescent_state(void)
 		while (rcu_old_gp_ongoing(index->urcu_active_readers)) {
 			switch (wait_loops++) {
 			case RCU_QS_ACTIVE_ATTEMPTS:
-				wait_gp(index);
+				sched_yield(); /* ideally sched_yield_to() */
 				break;
 			case KICK_READER_LOOPS:
 				force_mb_single_thread(index);
@@ -278,6 +259,8 @@ void synchronize_rcu(void)
 	/* Write new ptr before changing the qparity */
 	force_mb_all_threads();
 
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
+
 	switch_next_urcu_qparity(); /* 0 -> 1 */
 
 	/*
@@ -337,6 +320,8 @@ void synchronize_rcu(void)
 	 */
 	wait_for_quiescent_state(); /* Wait readers in parity 1 */
 
+	STORE_SHARED(urcu_gp_ctr, urcu_gp_ctr ^ RCU_GP_ONGOING);
+
 	/* Finish waiting for reader threads before letting the old ptr being
 	 * freed. Must be done within internal_urcu_lock because it iterates on
 	 * reader threads. */
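
The hand-off this commit implements can be summarized as follows: instead of synchronize_rcu() sleeping on gp_futex and readers waking it, the grace-period counter now carries an RCU_GP_ONGOING flag. The updater sets the flag before waiting for readers and clears it once the grace period ends, and a reader that observes the flag when entering an outermost read-side critical section or reporting a quiescent state calls sched_yield(), so that on a uniprocessor the waiting updater actually gets to run. The sketch below is a stripped-down illustration of that scheme only, not code from the patch: gp_ctr, reader_qs, GP_ONGOING, GP_CTR, ACTIVE_ATTEMPTS and wait_for_reader() are simplified stand-ins for urcu_gp_ctr, rcu_reader_qs_gp, RCU_GP_ONGOING, RCU_GP_CTR, RCU_QS_ACTIVE_ATTEMPTS and wait_for_quiescent_state(), and the memory barriers, per-thread reader registry and internal_urcu_lock() of the real implementation are deliberately omitted.

/*
 * Simplified sketch of the sched_yield()-based grace-period hand-off.
 * Single reader, no memory barriers, no registry: illustration only.
 */
#include <sched.h>

#define GP_ONGOING	(1UL << 1)	/* updater is waiting for readers */
#define GP_CTR		(1UL << 2)	/* grace period counter increment */
#define ACTIVE_ATTEMPTS	100

static volatile unsigned long gp_ctr = GP_CTR;	/* global grace period counter */
static volatile unsigned long reader_qs;	/* reader's last observed counter, 0 = offline */

/* Reader side: report a quiescent state; yield if an updater flagged a grace period. */
static void quiescent_state(void)
{
	unsigned long v = gp_ctr;

	if (v & GP_ONGOING) {	/* updater is waiting: give it the CPU (UP case) */
		sched_yield();
		v = gp_ctr;
	}
	reader_qs = v;
}

/* Updater side: flag the grace period, spin briefly, then yield until the reader catches up. */
static void wait_for_reader(void)
{
	int wait_loops = 0;

	gp_ctr = (gp_ctr | GP_ONGOING) + GP_CTR;	/* new period; readers will yield */
	while (reader_qs != 0 && reader_qs != gp_ctr) {
		if (wait_loops++ >= ACTIVE_ATTEMPTS)
			sched_yield();	/* ideally sched_yield_to() the reader */
	}
	gp_ctr &= ~GP_ONGOING;	/* grace period over */
}

In the patch itself the reader-side check appears in _rcu_read_lock(), _rcu_quiescent_state() and _rcu_thread_online(), and the updater toggles RCU_GP_ONGOING around the whole wait (both qparity phases) while holding internal_urcu_lock().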