From: Mathieu Desnoyers Date: Wed, 11 Feb 2009 21:52:54 +0000 (-0500) Subject: Add missing memory barriers to ensure progress and remove an unnecessary ACCESS_ONCE X-Git-Tag: v0.1~297 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=9598a4814c854780e9ca9bb2cfff8d77442c3db6;p=urcu.git Add missing memory barriers to ensure progress and remove an unnecessary ACCESS_ONCE [...] So we could have the following reader+writer sequence : Interleaved writer Sequence 2 and reader seq. 1. Reader: R.1 - read urcu_active_readers S.2 - read counter Writer: B - read other threads urcu_active_readers (there are none) A.1 - flip counter A.2 - read counter Reader: RS.2- write urcu_active_readers, depends on read counter and read urcu_active_readers Here, the reader would have updated its counter as belonging to the old q.s. period, but the writer will later wait for the new period. But given the writer will eventually do a second flip+wait, the reader in the other q.s. window will be caught by the second flip. Therefore, we could be tempted to think that those mb() could be unnecessary, which would lead to a scheme where urcu_active_readers and urcu_gp_ctr are done in a completely random order one vs the other. Let's see what it gives : synchronize_rcu() force_mb_all_threads() /* * Orders pointer publication and * (urcu_active_readers/urcu_gp_ctr accesses) */ switch_qparity() switch_next_urcu_qparity() [just does counter flip 0->1] wait_for_quiescent_state() wait for all threads in parity 0 switch_qparity() switch_next_urcu_qparity() [Just does counter flip 1->0] wait_for_quiescent_state() Wait for all threads in parity 1 force_mb_all_threads() /* * Orders * (urcu_active_readers/urcu_gp_ctr accesses) * and old data removal. */ *but* ! There is a reason why we don't want to do this. If switch_next_urcu_qparity() [Just does counter flip 1->0] happens before the end of the previous Wait for all threads in parity 0 We enter in a situation where all newly coming readers will see the parity bit as 0, although we are still waiting for that parity to end. We end up in a state when the writer can be blocked forever (no possible progress) if there are steadily readers subscribed for the data. Basically, to put it differently, we could simply remove the bit flipping from the writer and wait for *all* readers to exit their critical section (even the ones simply interested in the new pointer). But this shares the same problem the version above has, which is that we end up in a situation where the writer won't progress if there are always readers in a critical section. The same applies to switch_next_urcu_qparity() [Just does counter flip 0->1] wait for all threads in parity 0 If we don't put a mb() between those two (as I mistakenly did), we can end up waiting for readers in parity 0 while the parity bit wasn't flipped yet. oops. Same potential no-progress situation. The ordering of memory reads in the reader for urcu_active_readers/urcu_gp_ctr accesses does not seem to matter because the data contains information about which q.s. period parity it is in. In whichever order those variables are read seems to all work fine. In the end, it's to insure that the writer will always progress that we have to enforce smp_mb() between *all* switch_next_urcu_qparity and wait for threads. Mine and yours. The code in rcu_old_gp_ongoing (in my git tree) uses ACCESS_ONCE around urcu_active_readers and urcu_gp_ctr reads. I think the ACCESS_ONCE around urcu_gp_crt is useless because urcu_gp_ctr is only being modified by ourself and we hold a mutex. However, making sure that urcu_active_readers is only read once is clearly required. Signed-off-by: Mathieu Desnoyers --- diff --git a/urcu.c b/urcu.c index ac1a4d6..9542b26 100644 --- a/urcu.c +++ b/urcu.c @@ -125,41 +125,70 @@ void wait_for_quiescent_state(void) while (rcu_old_gp_ongoing(index->urcu_active_readers)) barrier(); } - /* - * Locally : read *index->urcu_active_readers before freeing old - * pointer. - * Remote (reader threads) : Order urcu_qparity update and other - * thread's quiescent state counter read. - */ - force_mb_all_threads(); } -static void switch_qparity(void) +void synchronize_rcu(void) { - /* All threads should read qparity before accessing data structure. */ - /* Write ptr before changing the qparity */ + /* All threads should read qparity before accessing data structure + * where new ptr points to. */ + /* Write new ptr before changing the qparity */ force_mb_all_threads(); debug_yield_write(); - switch_next_urcu_qparity(); + + internal_urcu_lock(); + debug_yield_write(); + + switch_next_urcu_qparity(); /* 0 -> 1 */ debug_yield_write(); /* - * Wait for previous parity to be empty of readers. + * Must commit qparity update to memory before waiting for parity + * 0 quiescent state. Failure to do so could result in the writer + * waiting forever while new readers are always accessing data (no + * progress). */ - wait_for_quiescent_state(); -} + mb(); -void synchronize_rcu(void) -{ - debug_yield_write(); - internal_urcu_lock(); + /* + * Wait for previous parity to be empty of readers. + */ + wait_for_quiescent_state(); /* Wait readers in parity 0 */ debug_yield_write(); - switch_qparity(); + + /* + * Must finish waiting for quiescent state for parity 0 before + * committing qparity update to memory. Failure to do so could result in + * the writer waiting forever while new readers are always accessing + * data (no progress). + */ + mb(); + + switch_next_urcu_qparity(); /* 1 -> 0 */ debug_yield_write(); - switch_qparity(); + + /* + * Must commit qparity update to memory before waiting for parity + * 1 quiescent state. Failure to do so could result in the writer + * waiting forever while new readers are always accessing data (no + * progress). + */ + mb(); + + /* + * Wait for previous parity to be empty of readers. + */ + wait_for_quiescent_state(); /* Wait readers in parity 1 */ debug_yield_write(); + internal_urcu_unlock(); debug_yield_write(); + + /* All threads should finish using the data referred to by old ptr + * before decrementing their urcu_active_readers count */ + /* Finish waiting for reader threads before letting the old ptr being + * freed. */ + force_mb_all_threads(); + debug_yield_write(); } void urcu_add_reader(pthread_t id) diff --git a/urcu.h b/urcu.h index 03764ab..c4a7992 100644 --- a/urcu.h +++ b/urcu.h @@ -207,10 +207,14 @@ static inline int rcu_old_gp_ongoing(long *value) if (value == NULL) return 0; debug_yield_write(); + /* + * Make sure both tests below are done on the same version of *value + * to insure consistency. + */ v = ACCESS_ONCE(*value); debug_yield_write(); return (v & RCU_GP_CTR_NEST_MASK) && - ((v ^ ACCESS_ONCE(urcu_gp_ctr)) & RCU_GP_CTR_BIT); + ((v ^ urcu_gp_ctr) & RCU_GP_CTR_BIT); } static inline void rcu_read_lock(void)