1 // SPDX-FileCopyrightText: 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
2 // SPDX-FileCopyrightText: 2009 Paul E. McKenney, IBM Corporation.
4 // SPDX-License-Identifier: LGPL-2.1-or-later
/*
 * Userspace RCU QSBR library
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */
#define URCU_NO_COMPAT_IDENTIFIERS
#define _LGPL_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>

#include <urcu/annotate.h>
#include <urcu/assert.h>
#include <urcu/wfcqueue.h>
#include <urcu/map/urcu-qsbr.h>
#define BUILD_QSBR_LIB
#include <urcu/static/urcu-qsbr.h>
#include <urcu/pointer.h>
#include <urcu/tls-compat.h>

#include "urcu-die.h"
#include "urcu-wait.h"
#include "urcu-utils.h"

/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include <urcu/urcu-qsbr.h>
#define _LGPL_SOURCE
42 void __attribute__((destructor
)) urcu_qsbr_exit(void);
43 static void urcu_call_rcu_exit(void);
46 * rcu_gp_lock ensures mutual exclusion between threads calling
49 static pthread_mutex_t rcu_gp_lock
= PTHREAD_MUTEX_INITIALIZER
;
51 * rcu_registry_lock ensures mutual exclusion between threads
52 * registering and unregistering themselves to/from the registry, and
53 * with threads reading that registry from synchronize_rcu(). However,
54 * this lock is not held all the way through the completion of awaiting
55 * for the grace period. It is sporadically released between iterations
57 * rcu_registry_lock may nest inside rcu_gp_lock.
59 static pthread_mutex_t rcu_registry_lock
= PTHREAD_MUTEX_INITIALIZER
;
60 struct urcu_gp urcu_qsbr_gp
= { .ctr
= URCU_QSBR_GP_ONLINE
};
63 * Active attempts to check for reader Q.S. before calling futex().
65 #define RCU_QS_ACTIVE_ATTEMPTS 100
68 * Written to only by each individual reader. Read by both the reader and the
71 DEFINE_URCU_TLS(struct urcu_qsbr_reader
, urcu_qsbr_reader
);
73 static CDS_LIST_HEAD(registry
);
76 * Queue keeping threads awaiting to wait for a grace period. Contains
77 * struct gp_waiters_thread objects.
79 static DEFINE_URCU_WAIT_QUEUE(gp_waiters
);
/*
 * Acquire @mutex, aborting the process on unexpected errors.
 * Under DISTRUST_SIGNALS_EXTREME, busy-loop on trylock so that a signal
 * delivered while the lock is contended cannot deadlock us.
 */
static void mutex_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret)
		urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR)
			urcu_die(ret);
		/* Lock busy or call interrupted: back off briefly and retry. */
		(void) poll(NULL, 0, 10);
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
/* Release @mutex, aborting the process on error (never fails silently). */
static void mutex_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret)
		urcu_die(ret);
}
108 * synchronize_rcu() waiting. Single thread.
110 static void wait_gp(void)
112 /* Read reader_gp before read futex */
114 while (uatomic_read(&urcu_qsbr_gp
.futex
) == -1) {
115 if (!futex_noasync(&urcu_qsbr_gp
.futex
, FUTEX_WAIT
, -1, NULL
, NULL
, 0)) {
117 * Prior queued wakeups queued by unrelated code
118 * using the same address can cause futex wait to
119 * return 0 even through the futex value is still
120 * -1 (spurious wakeups). Check the value again
121 * in user-space to validate whether it really
128 /* Value already changed. */
131 /* Retry if interrupted by signal. */
132 break; /* Get out of switch. Check again. */
134 /* Unexpected error. */
141 * Always called with rcu_registry lock held. Releases this lock between
142 * iterations and grabs it again. Holds the lock when it returns.
144 static void wait_for_readers(struct cds_list_head
*input_readers
,
145 struct cds_list_head
*cur_snap_readers
,
146 struct cds_list_head
*qsreaders
,
147 cmm_annotate_t
*group
)
149 unsigned int wait_loops
= 0;
150 struct urcu_qsbr_reader
*index
, *tmp
;
153 * Wait for each thread URCU_TLS(urcu_qsbr_reader).ctr to either
154 * indicate quiescence (offline), or for them to observe the
155 * current urcu_qsbr_gp.ctr value.
158 if (wait_loops
< RCU_QS_ACTIVE_ATTEMPTS
)
160 if (wait_loops
>= RCU_QS_ACTIVE_ATTEMPTS
) {
161 uatomic_set(&urcu_qsbr_gp
.futex
, -1);
163 * Write futex before write waiting (the other side
164 * reads them in the opposite order).
167 cds_list_for_each_entry(index
, input_readers
, node
) {
168 _CMM_STORE_SHARED(index
->waiting
, 1);
170 /* Write futex before read reader_gp */
173 cds_list_for_each_entry_safe(index
, tmp
, input_readers
, node
) {
174 switch (urcu_qsbr_reader_state(&index
->ctr
, group
)) {
175 case URCU_READER_ACTIVE_CURRENT
:
176 if (cur_snap_readers
) {
177 cds_list_move(&index
->node
,
182 case URCU_READER_INACTIVE
:
183 cds_list_move(&index
->node
, qsreaders
);
185 case URCU_READER_ACTIVE_OLD
:
187 * Old snapshot. Leaving node in
188 * input_readers will make us busy-loop
189 * until the snapshot becomes current or
190 * the reader becomes inactive.
196 if (cds_list_empty(input_readers
)) {
197 if (wait_loops
>= RCU_QS_ACTIVE_ATTEMPTS
) {
198 /* Read reader_gp before write futex */
199 uatomic_store(&urcu_qsbr_gp
.futex
, 0, CMM_RELEASE
);
203 /* Temporarily unlock the registry lock. */
204 mutex_unlock(&rcu_registry_lock
);
205 if (wait_loops
>= RCU_QS_ACTIVE_ATTEMPTS
) {
208 #ifndef HAS_INCOHERENT_CACHES
210 #else /* #ifndef HAS_INCOHERENT_CACHES */
212 #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
214 /* Re-lock the registry lock before the next loop. */
215 mutex_lock(&rcu_registry_lock
);
221 * Using a two-subphases algorithm for architectures with smaller than 64-bit
222 * long-size to ensure we do not encounter an overflow bug.
225 #if (CAA_BITS_PER_LONG < 64)
226 void urcu_qsbr_synchronize_rcu(void)
228 cmm_annotate_define(acquire_group
);
229 cmm_annotate_define(release_group
);
230 CDS_LIST_HEAD(cur_snap_readers
);
231 CDS_LIST_HEAD(qsreaders
);
232 unsigned long was_online
;
233 DEFINE_URCU_WAIT_NODE(wait
, URCU_WAIT_WAITING
);
234 struct urcu_waiters waiters
;
236 was_online
= urcu_qsbr_read_ongoing();
238 /* All threads should read qparity before accessing data structure
239 * where new ptr points to. In the "then" case, rcu_thread_offline
240 * includes a memory barrier.
242 * Mark the writer thread offline to make sure we don't wait for
243 * our own quiescent state. This allows using synchronize_rcu()
244 * in threads registered as readers.
247 urcu_qsbr_thread_offline();
250 cmm_annotate_group_mb_release(&release_group
);
253 * Add ourself to gp_waiters queue of threads awaiting to wait
254 * for a grace period. Proceed to perform the grace period only
255 * if we are the first thread added into the queue.
257 if (urcu_wait_add(&gp_waiters
, &wait
) != 0) {
258 /* Not first in queue: will be awakened by another thread. */
259 urcu_adaptative_busy_wait(&wait
);
262 /* We won't need to wake ourself up */
263 urcu_wait_set_state(&wait
, URCU_WAIT_RUNNING
);
265 mutex_lock(&rcu_gp_lock
);
268 * Move all waiters into our local queue.
270 urcu_move_waiters(&waiters
, &gp_waiters
);
272 mutex_lock(&rcu_registry_lock
);
274 if (cds_list_empty(®istry
))
278 * Wait for readers to observe original parity or be quiescent.
279 * wait_for_readers() can release and grab again rcu_registry_lock
282 wait_for_readers(®istry
, &cur_snap_readers
, &qsreaders
, &acquire_group
);
285 * Must finish waiting for quiescent state for original parity
286 * before committing next urcu_qsbr_gp.ctr update to memory. Failure
287 * to do so could result in the writer waiting forever while new
288 * readers are always accessing data (no progress). Enforce
289 * compiler-order of load URCU_TLS(urcu_qsbr_reader).ctr before store
290 * to urcu_qsbr_gp.ctr.
295 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
296 * model easier to understand. It does not have a big performance impact
297 * anyway, given this is the write-side.
301 /* Switch parity: 0 -> 1, 1 -> 0 */
302 cmm_annotate_group_mem_release(&release_group
, &urcu_qsbr_gp
.ctr
);
303 uatomic_store(&urcu_qsbr_gp
.ctr
, urcu_qsbr_gp
.ctr
^ URCU_QSBR_GP_CTR
, CMM_RELAXED
);
306 * Must commit urcu_qsbr_gp.ctr update to memory before waiting for
307 * quiescent state. Failure to do so could result in the writer
308 * waiting forever while new readers are always accessing data
309 * (no progress). Enforce compiler-order of store to urcu_qsbr_gp.ctr
310 * before load URCU_TLS(urcu_qsbr_reader).ctr.
315 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
316 * model easier to understand. It does not have a big performance impact
317 * anyway, given this is the write-side.
322 * Wait for readers to observe new parity or be quiescent.
323 * wait_for_readers() can release and grab again rcu_registry_lock
326 wait_for_readers(&cur_snap_readers
, NULL
, &qsreaders
, &acquire_group
);
329 * Put quiescent reader list back into registry.
331 cds_list_splice(&qsreaders
, ®istry
);
333 mutex_unlock(&rcu_registry_lock
);
334 mutex_unlock(&rcu_gp_lock
);
335 urcu_wake_all_waiters(&waiters
);
338 * Finish waiting for reader threads before letting the old ptr being
341 cmm_annotate_group_mb_acquire(&acquire_group
);
344 urcu_qsbr_thread_online();
348 #else /* !(CAA_BITS_PER_LONG < 64) */
349 void urcu_qsbr_synchronize_rcu(void)
351 cmm_annotate_define(acquire_group
);
352 cmm_annotate_define(release_group
);
353 CDS_LIST_HEAD(qsreaders
);
354 unsigned long was_online
;
355 DEFINE_URCU_WAIT_NODE(wait
, URCU_WAIT_WAITING
);
356 struct urcu_waiters waiters
;
358 was_online
= urcu_qsbr_read_ongoing();
361 * Mark the writer thread offline to make sure we don't wait for
362 * our own quiescent state. This allows using synchronize_rcu()
363 * in threads registered as readers.
366 urcu_qsbr_thread_offline();
369 cmm_annotate_group_mb_release(&release_group
);
372 * Add ourself to gp_waiters queue of threads awaiting to wait
373 * for a grace period. Proceed to perform the grace period only
374 * if we are the first thread added into the queue.
376 if (urcu_wait_add(&gp_waiters
, &wait
) != 0) {
377 /* Not first in queue: will be awakened by another thread. */
378 urcu_adaptative_busy_wait(&wait
);
381 /* We won't need to wake ourself up */
382 urcu_wait_set_state(&wait
, URCU_WAIT_RUNNING
);
384 mutex_lock(&rcu_gp_lock
);
387 * Move all waiters into our local queue.
389 urcu_move_waiters(&waiters
, &gp_waiters
);
391 mutex_lock(&rcu_registry_lock
);
393 if (cds_list_empty(®istry
))
396 /* Increment current G.P. */
397 cmm_annotate_group_mem_release(&release_group
, &urcu_qsbr_gp
.ctr
);
398 uatomic_store(&urcu_qsbr_gp
.ctr
, urcu_qsbr_gp
.ctr
+ URCU_QSBR_GP_CTR
, CMM_RELAXED
);
401 * Must commit urcu_qsbr_gp.ctr update to memory before waiting for
402 * quiescent state. Failure to do so could result in the writer
403 * waiting forever while new readers are always accessing data
404 * (no progress). Enforce compiler-order of store to urcu_qsbr_gp.ctr
405 * before load URCU_TLS(urcu_qsbr_reader).ctr.
410 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
411 * model easier to understand. It does not have a big performance impact
412 * anyway, given this is the write-side.
417 * Wait for readers to observe new count of be quiescent.
418 * wait_for_readers() can release and grab again rcu_registry_lock
421 wait_for_readers(®istry
, NULL
, &qsreaders
, &acquire_group
);
424 * Put quiescent reader list back into registry.
426 cds_list_splice(&qsreaders
, ®istry
);
428 mutex_unlock(&rcu_registry_lock
);
429 mutex_unlock(&rcu_gp_lock
);
430 urcu_wake_all_waiters(&waiters
);
433 urcu_qsbr_thread_online();
437 cmm_annotate_group_mb_acquire(&acquire_group
);
439 #endif /* !(CAA_BITS_PER_LONG < 64) */
/*
 * library wrappers to be used by non-LGPL compatible source code.
 */
/* Exported wrapper around the inline _urcu_qsbr_read_lock(). */
void urcu_qsbr_read_lock(void)
{
	_urcu_qsbr_read_lock();
}
/* Exported wrapper around the inline _urcu_qsbr_read_unlock(). */
void urcu_qsbr_read_unlock(void)
{
	_urcu_qsbr_read_unlock();
}
/* Exported wrapper: non-zero iff the calling thread is in a read-side
 * critical section (i.e. online). */
int urcu_qsbr_read_ongoing(void)
{
	return _urcu_qsbr_read_ongoing();
}
/* Compat symbol declaration (legacy rcu_* namespace). */
void rcu_read_ongoing_qsbr();
/* Exported wrapper: report a quiescent state for the calling thread. */
void urcu_qsbr_quiescent_state(void)
{
	_urcu_qsbr_quiescent_state();
}
/* Compat symbol declaration (legacy rcu_* namespace). */
void rcu_quiescent_state_qsbr();
/* Exported wrapper: mark the calling thread offline (extended quiescent
 * state); grace periods no longer wait on it. */
void urcu_qsbr_thread_offline(void)
{
	_urcu_qsbr_thread_offline();
}
/* Compat symbol declaration (legacy rcu_* namespace). */
void rcu_thread_offline_qsbr();
/* Exported wrapper: bring the calling thread back online as a reader. */
void urcu_qsbr_thread_online(void)
{
	_urcu_qsbr_thread_online();
}
478 void urcu_qsbr_register_thread(void)
480 URCU_TLS(urcu_qsbr_reader
).tid
= pthread_self();
481 urcu_posix_assert(URCU_TLS(urcu_qsbr_reader
).ctr
== 0);
483 mutex_lock(&rcu_registry_lock
);
484 urcu_posix_assert(!URCU_TLS(urcu_qsbr_reader
).registered
);
485 URCU_TLS(urcu_qsbr_reader
).registered
= 1;
486 cds_list_add(&URCU_TLS(urcu_qsbr_reader
).node
, ®istry
);
487 mutex_unlock(&rcu_registry_lock
);
488 _urcu_qsbr_thread_online();
491 void urcu_qsbr_unregister_thread(void)
494 * We have to make the thread offline otherwise we end up dealocking
495 * with a waiting writer.
497 _urcu_qsbr_thread_offline();
498 urcu_posix_assert(URCU_TLS(urcu_qsbr_reader
).registered
);
499 URCU_TLS(urcu_qsbr_reader
).registered
= 0;
500 mutex_lock(&rcu_registry_lock
);
501 cds_list_del(&URCU_TLS(urcu_qsbr_reader
).node
);
502 mutex_unlock(&rcu_registry_lock
);
/* Process-exit destructor: shut down the call_rcu machinery. */
void urcu_qsbr_exit(void)
{
	/*
	 * Assertion disabled because call_rcu threads are now rcu
	 * readers, and left running at exit.
	 * urcu_posix_assert(cds_list_empty(&registry));
	 */
	urcu_call_rcu_exit();
}
/* Instantiate the flavor descriptor used by call_rcu/defer/poll impls below. */
DEFINE_RCU_FLAVOR(rcu_flavor);
517 #include "urcu-call-rcu-impl.h"
518 #include "urcu-defer-impl.h"
519 #include "urcu-poll-impl.h"