1 // SPDX-FileCopyrightText: 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
2 // SPDX-FileCopyrightText: 2011-2012 Lai Jiangshan <laijs@cn.fujitsu.com>
4 // SPDX-License-Identifier: LGPL-2.1-or-later
6 #ifndef _URCU_WFCQUEUE_STATIC_H
7 #define _URCU_WFCQUEUE_STATIC_H
10 * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue
12 * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfcqueue.h for
13 * linking dynamically with the userspace rcu library.
19 #include <urcu/assert.h>
20 #include <urcu/compiler.h>
21 #include <urcu/uatomic.h>
28 * Concurrent queue with wait-free enqueue/blocking dequeue.
30 * This queue has been designed and implemented collaboratively by
31 * Mathieu Desnoyers and Lai Jiangshan. Inspired from
32 * half-wait-free/half-blocking queue implementation done by Paul E.
35 * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
37 * Synchronization table:
39 * External synchronization techniques described in the API below is
40 * required between pairs marked with "X". No external synchronization
41 * required between pairs marked with "-".
44 * [1] cds_wfcq_enqueue
45 * [2] __cds_wfcq_splice (destination queue)
46 * [3] __cds_wfcq_dequeue
47 * [4] __cds_wfcq_splice (source queue)
48 * [5] __cds_wfcq_first
51 * [1] [2] [3] [4] [5] [6]
59 * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
61 * For convenience, cds_wfcq_dequeue_blocking() and
62 * cds_wfcq_splice_blocking() hold the dequeue lock.
64 * Besides locking, mutual exclusion of dequeue, splice and iteration
65 * can be ensured by performing all of those operations from a single
66 * thread, without requiring any lock.
69 #define WFCQ_ADAPT_ATTEMPTS 10 /* Retry if being set */
70 #define WFCQ_WAIT 10 /* Wait 10 ms if being set */
73 * cds_wfcq_node_init: initialize wait-free queue node.
75 static inline void _cds_wfcq_node_init(struct cds_wfcq_node
*node
)
81 * cds_wfcq_init: initialize wait-free queue (with lock). Pair with
84 static inline void _cds_wfcq_init(struct cds_wfcq_head
*head
,
85 struct cds_wfcq_tail
*tail
)
89 /* Set queue head and tail */
90 _cds_wfcq_node_init(&head
->node
);
91 tail
->p
= &head
->node
;
92 ret
= pthread_mutex_init(&head
->lock
, NULL
);
93 urcu_posix_assert(!ret
);
97 * cds_wfcq_destroy: destroy wait-free queue (with lock). Pair with
100 static inline void _cds_wfcq_destroy(struct cds_wfcq_head
*head
,
101 struct cds_wfcq_tail
*tail
__attribute__((unused
)))
103 int ret
= pthread_mutex_destroy(&head
->lock
);
104 urcu_posix_assert(!ret
);
108 * __cds_wfcq_init: initialize wait-free queue (without lock). Don't
109 * pair with any destroy function.
111 static inline void ___cds_wfcq_init(struct __cds_wfcq_head
*head
,
112 struct cds_wfcq_tail
*tail
)
114 /* Set queue head and tail */
115 _cds_wfcq_node_init(&head
->node
);
116 tail
->p
= &head
->node
;
120 * cds_wfcq_empty: return whether wait-free queue is empty.
122 * No memory barrier is issued. No mutual exclusion is required.
124 * We perform the test on head->node.next to check if the queue is
125 * possibly empty, but we confirm this by checking if the tail pointer
126 * points to the head node because the tail pointer is the linearisation
127 * point of the enqueuers. Just checking the head next pointer could
128 * make a queue appear empty if an enqueuer is preempted for a long time
129 * between xchg() and setting the previous node's next pointer.
131 static inline bool _cds_wfcq_empty(cds_wfcq_head_ptr_t u_head
,
132 struct cds_wfcq_tail
*tail
)
134 struct __cds_wfcq_head
*head
= u_head
._h
;
136 * Queue is empty if no node is pointed by head->node.next nor
137 * tail->p. Even though the tail->p check is sufficient to find
138 * out of the queue is empty, we first check head->node.next as a
139 * common case to ensure that dequeuers do not frequently access
140 * enqueuer's tail->p cache line.
142 return CMM_LOAD_SHARED(head
->node
.next
) == NULL
143 && CMM_LOAD_SHARED(tail
->p
) == &head
->node
;
146 static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head
*head
,
147 struct cds_wfcq_tail
*tail
__attribute__((unused
)))
151 ret
= pthread_mutex_lock(&head
->lock
);
152 urcu_posix_assert(!ret
);
155 static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head
*head
,
156 struct cds_wfcq_tail
*tail
__attribute__((unused
)))
160 ret
= pthread_mutex_unlock(&head
->lock
);
161 urcu_posix_assert(!ret
);
164 static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head
,
165 struct cds_wfcq_tail
*tail
,
166 struct cds_wfcq_node
*new_head
,
167 struct cds_wfcq_node
*new_tail
)
169 struct __cds_wfcq_head
*head
= u_head
._h
;
170 struct cds_wfcq_node
*old_tail
;
173 * Implicit memory barrier before uatomic_xchg() orders earlier
174 * stores to data structure containing node and setting
175 * node->next to NULL before publication.
177 old_tail
= uatomic_xchg(&tail
->p
, new_tail
);
180 * Implicit memory barrier after uatomic_xchg() orders store to
181 * q->tail before store to old_tail->next.
183 * At this point, dequeuers see a NULL tail->p->next, which
184 * indicates that the queue is being appended to. The following
185 * store will append "node" to the queue from a dequeuer
188 CMM_STORE_SHARED(old_tail
->next
, new_head
);
190 * Return false if queue was empty prior to adding the node,
193 return old_tail
!= &head
->node
;
197 * cds_wfcq_enqueue: enqueue a node into a wait-free queue.
199 * Issues a full memory barrier before enqueue. No mutual exclusion is
202 * Returns false if the queue was empty prior to adding the node.
203 * Returns true otherwise.
205 static inline bool _cds_wfcq_enqueue(cds_wfcq_head_ptr_t head
,
206 struct cds_wfcq_tail
*tail
,
207 struct cds_wfcq_node
*new_tail
)
209 return ___cds_wfcq_append(head
, tail
, new_tail
, new_tail
);
213 * CDS_WFCQ_WAIT_SLEEP:
215 * By default, this sleeps for the given @msec milliseconds.
216 * This is a macro which LGPL users may #define themselves before
217 * including wfcqueue.h to override the default behavior (e.g.
218 * to log a warning or perform other background work).
220 #ifndef CDS_WFCQ_WAIT_SLEEP
221 #define CDS_WFCQ_WAIT_SLEEP(msec) ___cds_wfcq_wait_sleep(msec)
static inline void ___cds_wfcq_wait_sleep(int msec)
{
	/* poll() with no file descriptors is a portable millisecond sleep. */
	(void) poll(NULL, 0, msec);
}
230 * ___cds_wfcq_busy_wait: adaptative busy-wait.
232 * Returns 1 if nonblocking and needs to block, 0 otherwise.
235 ___cds_wfcq_busy_wait(int *attempt
, int blocking
)
239 if (++(*attempt
) >= WFCQ_ADAPT_ATTEMPTS
) {
240 CDS_WFCQ_WAIT_SLEEP(WFCQ_WAIT
); /* Wait for 10ms */
249 * Waiting for enqueuer to complete enqueue and return the next node.
251 static inline struct cds_wfcq_node
*
252 ___cds_wfcq_node_sync_next(struct cds_wfcq_node
*node
, int blocking
)
254 struct cds_wfcq_node
*next
;
258 * Adaptative busy-looping waiting for enqueuer to complete enqueue.
260 while ((next
= CMM_LOAD_SHARED(node
->next
)) == NULL
) {
261 if (___cds_wfcq_busy_wait(&attempt
, blocking
))
262 return CDS_WFCQ_WOULDBLOCK
;
268 static inline struct cds_wfcq_node
*
269 ___cds_wfcq_first(cds_wfcq_head_ptr_t u_head
,
270 struct cds_wfcq_tail
*tail
,
273 struct __cds_wfcq_head
*head
= u_head
._h
;
274 struct cds_wfcq_node
*node
;
276 if (_cds_wfcq_empty(__cds_wfcq_head_cast(head
), tail
))
278 node
= ___cds_wfcq_node_sync_next(&head
->node
, blocking
);
279 /* Load head->node.next before loading node's content */
280 cmm_smp_read_barrier_depends();
285 * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing.
287 * Content written into the node before enqueue is guaranteed to be
288 * consistent, but no other memory ordering is ensured.
289 * Dequeue/splice/iteration mutual exclusion should be ensured by the
292 * Used by for-like iteration macros in urcu/wfcqueue.h:
293 * __cds_wfcq_for_each_blocking()
294 * __cds_wfcq_for_each_blocking_safe()
296 * Returns NULL if queue is empty, first node otherwise.
298 static inline struct cds_wfcq_node
*
299 ___cds_wfcq_first_blocking(cds_wfcq_head_ptr_t head
,
300 struct cds_wfcq_tail
*tail
)
302 return ___cds_wfcq_first(head
, tail
, 1);
307 * __cds_wfcq_first_nonblocking: get first node of a queue, without dequeuing.
309 * Same as __cds_wfcq_first_blocking, but returns CDS_WFCQ_WOULDBLOCK if
312 static inline struct cds_wfcq_node
*
313 ___cds_wfcq_first_nonblocking(cds_wfcq_head_ptr_t head
,
314 struct cds_wfcq_tail
*tail
)
316 return ___cds_wfcq_first(head
, tail
, 0);
319 static inline struct cds_wfcq_node
*
320 ___cds_wfcq_next(cds_wfcq_head_ptr_t head
__attribute__((unused
)),
321 struct cds_wfcq_tail
*tail
,
322 struct cds_wfcq_node
*node
,
325 struct cds_wfcq_node
*next
;
328 * Even though the following tail->p check is sufficient to find
329 * out if we reached the end of the queue, we first check
330 * node->next as a common case to ensure that iteration on nodes
331 * do not frequently access enqueuer's tail->p cache line.
333 if ((next
= CMM_LOAD_SHARED(node
->next
)) == NULL
) {
334 /* Load node->next before tail->p */
336 if (CMM_LOAD_SHARED(tail
->p
) == node
)
338 next
= ___cds_wfcq_node_sync_next(node
, blocking
);
340 /* Load node->next before loading next's content */
341 cmm_smp_read_barrier_depends();
346 * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing.
348 * Content written into the node before enqueue is guaranteed to be
349 * consistent, but no other memory ordering is ensured.
350 * Dequeue/splice/iteration mutual exclusion should be ensured by the
353 * Used by for-like iteration macros in urcu/wfcqueue.h:
354 * __cds_wfcq_for_each_blocking()
355 * __cds_wfcq_for_each_blocking_safe()
357 * Returns NULL if reached end of queue, non-NULL next queue node
360 static inline struct cds_wfcq_node
*
361 ___cds_wfcq_next_blocking(cds_wfcq_head_ptr_t head
,
362 struct cds_wfcq_tail
*tail
,
363 struct cds_wfcq_node
*node
)
365 return ___cds_wfcq_next(head
, tail
, node
, 1);
369 * __cds_wfcq_next_nonblocking: get next node of a queue, without dequeuing.
371 * Same as __cds_wfcq_next_blocking, but returns CDS_WFCQ_WOULDBLOCK if
374 static inline struct cds_wfcq_node
*
375 ___cds_wfcq_next_nonblocking(cds_wfcq_head_ptr_t head
,
376 struct cds_wfcq_tail
*tail
,
377 struct cds_wfcq_node
*node
)
379 return ___cds_wfcq_next(head
, tail
, node
, 0);
382 static inline struct cds_wfcq_node
*
383 ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head
,
384 struct cds_wfcq_tail
*tail
,
388 struct __cds_wfcq_head
*head
= u_head
._h
;
389 struct cds_wfcq_node
*node
, *next
;
394 if (_cds_wfcq_empty(__cds_wfcq_head_cast(head
), tail
)) {
398 node
= ___cds_wfcq_node_sync_next(&head
->node
, blocking
);
399 if (!blocking
&& node
== CDS_WFCQ_WOULDBLOCK
) {
400 return CDS_WFCQ_WOULDBLOCK
;
403 if ((next
= CMM_LOAD_SHARED(node
->next
)) == NULL
) {
405 * @node is probably the only node in the queue.
406 * Try to move the tail to &q->head.
407 * q->head.next is set to NULL here, and stays
408 * NULL if the cmpxchg succeeds. Should the
409 * cmpxchg fail due to a concurrent enqueue, the
410 * q->head.next will be set to the next node.
411 * The implicit memory barrier before
412 * uatomic_cmpxchg() orders load node->next
413 * before loading q->tail.
414 * The implicit memory barrier before uatomic_cmpxchg
415 * orders load q->head.next before loading node's
418 _cds_wfcq_node_init(&head
->node
);
419 if (uatomic_cmpxchg(&tail
->p
, node
, &head
->node
) == node
) {
421 *state
|= CDS_WFCQ_STATE_LAST
;
424 next
= ___cds_wfcq_node_sync_next(node
, blocking
);
426 * In nonblocking mode, if we would need to block to
427 * get node's next, set the head next node pointer
428 * (currently NULL) back to its original value.
430 if (!blocking
&& next
== CDS_WFCQ_WOULDBLOCK
) {
431 head
->node
.next
= node
;
432 return CDS_WFCQ_WOULDBLOCK
;
437 * Move queue head forward.
439 head
->node
.next
= next
;
441 /* Load q->head.next before loading node's content */
442 cmm_smp_read_barrier_depends();
447 * __cds_wfcq_dequeue_with_state_blocking: dequeue node from queue, with state.
449 * Content written into the node before enqueue is guaranteed to be
450 * consistent, but no other memory ordering is ensured.
451 * It is valid to reuse and free a dequeued node immediately.
452 * Dequeue/splice/iteration mutual exclusion should be ensured by the
455 static inline struct cds_wfcq_node
*
456 ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_ptr_t head
,
457 struct cds_wfcq_tail
*tail
, int *state
)
459 return ___cds_wfcq_dequeue_with_state(head
, tail
, state
, 1);
463 * ___cds_wfcq_dequeue_blocking: dequeue node from queue.
465 * Same as __cds_wfcq_dequeue_with_state_blocking, but without saving
468 static inline struct cds_wfcq_node
*
469 ___cds_wfcq_dequeue_blocking(cds_wfcq_head_ptr_t head
,
470 struct cds_wfcq_tail
*tail
)
472 return ___cds_wfcq_dequeue_with_state_blocking(head
, tail
, NULL
);
476 * __cds_wfcq_dequeue_with_state_nonblocking: dequeue node, with state.
478 * Same as __cds_wfcq_dequeue_blocking, but returns CDS_WFCQ_WOULDBLOCK
479 * if it needs to block.
481 static inline struct cds_wfcq_node
*
482 ___cds_wfcq_dequeue_with_state_nonblocking(cds_wfcq_head_ptr_t head
,
483 struct cds_wfcq_tail
*tail
, int *state
)
485 return ___cds_wfcq_dequeue_with_state(head
, tail
, state
, 0);
489 * ___cds_wfcq_dequeue_nonblocking: dequeue node from queue.
491 * Same as __cds_wfcq_dequeue_with_state_nonblocking, but without saving
494 static inline struct cds_wfcq_node
*
495 ___cds_wfcq_dequeue_nonblocking(cds_wfcq_head_ptr_t head
,
496 struct cds_wfcq_tail
*tail
)
498 return ___cds_wfcq_dequeue_with_state_nonblocking(head
, tail
, NULL
);
502 * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q.
504 * Dequeue all nodes from src_q.
505 * dest_q must be already initialized.
506 * Mutual exclusion for src_q should be ensured by the caller as
507 * specified in the "Synchronisation table".
508 * Returns enum cds_wfcq_ret which indicates the state of the src or
511 static inline enum cds_wfcq_ret
513 cds_wfcq_head_ptr_t u_dest_q_head
,
514 struct cds_wfcq_tail
*dest_q_tail
,
515 cds_wfcq_head_ptr_t u_src_q_head
,
516 struct cds_wfcq_tail
*src_q_tail
,
519 struct __cds_wfcq_head
*dest_q_head
= u_dest_q_head
._h
;
520 struct __cds_wfcq_head
*src_q_head
= u_src_q_head
._h
;
521 struct cds_wfcq_node
*head
, *tail
;
525 * Initial emptiness check to speed up cases where queue is
526 * empty: only require loads to check if queue is empty.
528 if (_cds_wfcq_empty(__cds_wfcq_head_cast(src_q_head
), src_q_tail
))
529 return CDS_WFCQ_RET_SRC_EMPTY
;
533 * Open-coded _cds_wfcq_empty() by testing result of
534 * uatomic_xchg, as well as tail pointer vs head node
537 head
= uatomic_xchg(&src_q_head
->node
.next
, NULL
);
539 break; /* non-empty */
540 if (CMM_LOAD_SHARED(src_q_tail
->p
) == &src_q_head
->node
)
541 return CDS_WFCQ_RET_SRC_EMPTY
;
542 if (___cds_wfcq_busy_wait(&attempt
, blocking
))
543 return CDS_WFCQ_RET_WOULDBLOCK
;
547 * Memory barrier implied before uatomic_xchg() orders store to
548 * src_q->head before store to src_q->tail. This is required by
549 * concurrent enqueue on src_q, which exchanges the tail before
550 * updating the previous tail's next pointer.
552 tail
= uatomic_xchg(&src_q_tail
->p
, &src_q_head
->node
);
555 * Append the spliced content of src_q into dest_q. Does not
556 * require mutual exclusion on dest_q (wait-free).
558 if (___cds_wfcq_append(__cds_wfcq_head_cast(dest_q_head
), dest_q_tail
,
560 return CDS_WFCQ_RET_DEST_NON_EMPTY
;
562 return CDS_WFCQ_RET_DEST_EMPTY
;
566 * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
568 * Dequeue all nodes from src_q.
569 * dest_q must be already initialized.
570 * Mutual exclusion for src_q should be ensured by the caller as
571 * specified in the "Synchronisation table".
572 * Returns enum cds_wfcq_ret which indicates the state of the src or
573 * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
575 static inline enum cds_wfcq_ret
576 ___cds_wfcq_splice_blocking(
577 cds_wfcq_head_ptr_t dest_q_head
,
578 struct cds_wfcq_tail
*dest_q_tail
,
579 cds_wfcq_head_ptr_t src_q_head
,
580 struct cds_wfcq_tail
*src_q_tail
)
582 return ___cds_wfcq_splice(dest_q_head
, dest_q_tail
,
583 src_q_head
, src_q_tail
, 1);
587 * __cds_wfcq_splice_nonblocking: enqueue all src_q nodes at the end of dest_q.
589 * Same as __cds_wfcq_splice_blocking, but returns
590 * CDS_WFCQ_RET_WOULDBLOCK if it needs to block.
592 static inline enum cds_wfcq_ret
593 ___cds_wfcq_splice_nonblocking(
594 cds_wfcq_head_ptr_t dest_q_head
,
595 struct cds_wfcq_tail
*dest_q_tail
,
596 cds_wfcq_head_ptr_t src_q_head
,
597 struct cds_wfcq_tail
*src_q_tail
)
599 return ___cds_wfcq_splice(dest_q_head
, dest_q_tail
,
600 src_q_head
, src_q_tail
, 0);
604 * cds_wfcq_dequeue_with_state_blocking: dequeue a node from a wait-free queue.
606 * Content written into the node before enqueue is guaranteed to be
607 * consistent, but no other memory ordering is ensured.
608 * Mutual exclusion with cds_wfcq_splice_blocking and dequeue lock is
610 * It is valid to reuse and free a dequeued node immediately.
static inline struct cds_wfcq_node *
_cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail, int *state)
{
	struct cds_wfcq_node *retval;

	/* Hold the dequeue lock across the unlocked dequeue primitive. */
	_cds_wfcq_dequeue_lock(head, tail);
	retval = ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_cast(head),
			tail, state);
	_cds_wfcq_dequeue_unlock(head, tail);
	return retval;
}
626 * cds_wfcq_dequeue_blocking: dequeue node from queue.
628 * Same as cds_wfcq_dequeue_with_state_blocking, but without saving state.
static inline struct cds_wfcq_node *
_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	/* NULL state: caller does not care about CDS_WFCQ_STATE_LAST. */
	return _cds_wfcq_dequeue_with_state_blocking(head, tail, NULL);
}
638 * cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
640 * Dequeue all nodes from src_q.
641 * dest_q must be already initialized.
642 * Content written into the node before enqueue is guaranteed to be
643 * consistent, but no other memory ordering is ensured.
644 * Mutual exclusion with cds_wfcq_dequeue_blocking and dequeue lock is
646 * Returns enum cds_wfcq_ret which indicates the state of the src or
647 * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
649 static inline enum cds_wfcq_ret
650 _cds_wfcq_splice_blocking(
651 struct cds_wfcq_head
*dest_q_head
,
652 struct cds_wfcq_tail
*dest_q_tail
,
653 struct cds_wfcq_head
*src_q_head
,
654 struct cds_wfcq_tail
*src_q_tail
)
656 enum cds_wfcq_ret ret
;
658 _cds_wfcq_dequeue_lock(src_q_head
, src_q_tail
);
659 ret
= ___cds_wfcq_splice_blocking(cds_wfcq_head_cast(dest_q_head
), dest_q_tail
,
660 cds_wfcq_head_cast(src_q_head
), src_q_tail
);
661 _cds_wfcq_dequeue_unlock(src_q_head
, src_q_tail
);
669 #endif /* _URCU_WFCQUEUE_STATIC_H */