Commit | Line | Data |
---|---|---|
8ad4ce58 MD |
1 | #ifndef _URCU_WFCQUEUE_STATIC_H |
2 | #define _URCU_WFCQUEUE_STATIC_H | |
3 | ||
4 | /* | |
5 | * wfcqueue-static.h | |
6 | * | |
7 | * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue | |
8 | * | |
9 | * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See wfcqueue.h for linking | |
10 | * dynamically with the userspace rcu library. | |
11 | * | |
12 | * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com> | |
13 | * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com> | |
14 | * | |
15 | * This library is free software; you can redistribute it and/or | |
16 | * modify it under the terms of the GNU Lesser General Public | |
17 | * License as published by the Free Software Foundation; either | |
18 | * version 2.1 of the License, or (at your option) any later version. | |
19 | * | |
20 | * This library is distributed in the hope that it will be useful, | |
21 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
22 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
23 | * Lesser General Public License for more details. | |
24 | * | |
25 | * You should have received a copy of the GNU Lesser General Public | |
26 | * License along with this library; if not, write to the Free Software | |
27 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
28 | */ | |
29 | ||
30 | #include <pthread.h> | |
31 | #include <assert.h> | |
32 | #include <poll.h> | |
33 | #include <stdbool.h> | |
34 | #include <urcu/compiler.h> | |
35 | #include <urcu/uatomic.h> | |
36 | ||
37 | #ifdef __cplusplus | |
38 | extern "C" { | |
39 | #endif | |
40 | ||
41 | /* | |
42 | * Concurrent queue with wait-free enqueue/blocking dequeue. | |
43 | * | |
44 | * Inspired from half-wait-free/half-blocking queue implementation done by | |
45 | * Paul E. McKenney. | |
46 | * | |
47 | * Mutual exclusion of __cds_wfcq_* API | |
48 | * | |
49 | * Unless otherwise stated, the caller must ensure mutual exclusion of | |
50 | * queue update operations "dequeue" and "splice" (for source queue). | |
f94061a3 MD |
51 | * Queue read operations "first" and "next", which are used by |
52 | * "for_each" iterations, need to be protected against concurrent | |
53 | * "dequeue" and "splice" (for source queue) by the caller. | |
8ad4ce58 MD |
54 | * "enqueue", "splice" (for destination queue), and "empty" are the only |
55 | * operations that can be used without any mutual exclusion. | |
56 | * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock(). | |
57 | * | |
58 | * For convenience, cds_wfcq_dequeue_blocking() and | |
59 | * cds_wfcq_splice_blocking() hold the dequeue lock. | |
60 | */ | |
61 | ||
62 | #define WFCQ_ADAPT_ATTEMPTS 10 /* Retry if being set */ | |
63 | #define WFCQ_WAIT 10 /* Wait 10 ms if being set */ | |
64 | ||
65 | /* | |
66 | * cds_wfcq_node_init: initialize wait-free queue node. | |
67 | */ | |
68 | static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node) | |
69 | { | |
70 | node->next = NULL; | |
71 | } | |
72 | ||
73 | /* | |
74 | * cds_wfcq_init: initialize wait-free queue. | |
75 | */ | |
76 | static inline void _cds_wfcq_init(struct cds_wfcq_head *head, | |
77 | struct cds_wfcq_tail *tail) | |
78 | { | |
79 | int ret; | |
80 | ||
81 | /* Set queue head and tail */ | |
82 | _cds_wfcq_node_init(&head->node); | |
83 | tail->p = &head->node; | |
84 | ret = pthread_mutex_init(&head->lock, NULL); | |
85 | assert(!ret); | |
86 | } | |
87 | ||
/*
 * cds_wfcq_empty: return whether wait-free queue is empty.
 *
 * No memory barrier is issued. No mutual exclusion is required.
 */
static inline bool _cds_wfcq_empty(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	/*
	 * Queue is empty if no node is pointed to by head->node.next nor
	 * tail->p. Even though the tail->p check is sufficient to find
	 * out if the queue is empty, we first check head->node.next as a
	 * common case to ensure that dequeuers do not frequently access
	 * the enqueuer's tail->p cache line.
	 */
	return CMM_LOAD_SHARED(head->node.next) == NULL
		&& CMM_LOAD_SHARED(tail->p) == &head->node;
}
106 | ||
107 | static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head, | |
108 | struct cds_wfcq_tail *tail) | |
109 | { | |
110 | int ret; | |
111 | ||
112 | ret = pthread_mutex_lock(&head->lock); | |
113 | assert(!ret); | |
114 | } | |
115 | ||
116 | static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head *head, | |
117 | struct cds_wfcq_tail *tail) | |
118 | { | |
119 | int ret; | |
120 | ||
121 | ret = pthread_mutex_unlock(&head->lock); | |
122 | assert(!ret); | |
123 | } | |
124 | ||
/*
 * ___cds_wfcq_append: append the chain of nodes [new_head .. new_tail]
 * to the queue. Wait-free: used by enqueue and by splice (for the
 * destination queue). No mutual exclusion is required.
 */
static inline void ___cds_wfcq_append(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *new_head,
		struct cds_wfcq_node *new_tail)
{
	struct cds_wfcq_node *old_tail;

	/*
	 * Implicit memory barrier before uatomic_xchg() orders earlier
	 * stores to data structure containing node and setting
	 * node->next to NULL before publication.
	 */
	old_tail = uatomic_xchg(&tail->p, new_tail);

	/*
	 * Implicit memory barrier after uatomic_xchg() orders store to
	 * q->tail before store to old_tail->next.
	 *
	 * At this point, dequeuers see a NULL tail->p->next, which
	 * indicates that the queue is being appended to. The following
	 * store will append "node" to the queue from a dequeuer
	 * perspective.
	 */
	CMM_STORE_SHARED(old_tail->next, new_head);
}
150 | ||
/*
 * cds_wfcq_enqueue: enqueue a node into a wait-free queue.
 *
 * Issues a full memory barrier before enqueue. No mutual exclusion is
 * required.
 */
static inline void _cds_wfcq_enqueue(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *new_tail)
{
	/* A single node is a chain whose head and tail coincide. */
	struct cds_wfcq_node *chain = new_tail;

	___cds_wfcq_append(head, tail, chain, chain);
}
163 | ||
/*
 * Waiting for enqueuer to complete enqueue and return the next node.
 *
 * An enqueuer that has swapped tail->p but not yet stored the
 * predecessor's next pointer leaves node->next transiently NULL; spin
 * (then sleep) until the link becomes visible.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_node_sync_next(struct cds_wfcq_node *node)
{
	struct cds_wfcq_node *next;
	int attempt = 0;

	/*
	 * Adaptive busy-looping waiting for enqueuer to complete enqueue:
	 * spin WFCQ_ADAPT_ATTEMPTS times, then back off with a 10ms sleep.
	 */
	while ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
		if (++attempt >= WFCQ_ADAPT_ATTEMPTS) {
			poll(NULL, 0, WFCQ_WAIT);	/* Wait for 10ms */
			attempt = 0;
		} else {
			caa_cpu_relax();
		}
	}

	return next;
}
187 | ||
/*
 * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Should be called with cds_wfcq_dequeue_lock() held.
 *
 * Used by for-like iteration macros in urcu/wfcqueue.h:
 * __cds_wfcq_for_each_blocking()
 * __cds_wfcq_for_each_blocking_safe()
 */
static inline struct cds_wfcq_node *
___cds_wfcq_first_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	struct cds_wfcq_node *node;

	if (_cds_wfcq_empty(head, tail))
		return NULL;
	/* May block waiting for a concurrent enqueuer to publish the link. */
	node = ___cds_wfcq_node_sync_next(&head->node);
	/* Load head->node.next before loading node's content */
	cmm_smp_read_barrier_depends();
	return node;
}
212 | ||
/*
 * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Should be called with cds_wfcq_dequeue_lock() held.
 *
 * Used by for-like iteration macros in urcu/wfcqueue.h:
 * __cds_wfcq_for_each_blocking()
 * __cds_wfcq_for_each_blocking_safe()
 */
static inline struct cds_wfcq_node *
___cds_wfcq_next_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *node)
{
	struct cds_wfcq_node *next;

	/*
	 * Even though the following tail->p check is sufficient to find
	 * out if we reached the end of the queue, we first check
	 * node->next as a common case to ensure that iteration on nodes
	 * do not frequently access enqueuer's tail->p cache line.
	 */
	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
		/* Load node->next before tail->p */
		cmm_smp_rmb();
		if (CMM_LOAD_SHARED(tail->p) == node)
			return NULL;
		/* A concurrent enqueue is in progress; wait for the link. */
		next = ___cds_wfcq_node_sync_next(node);
	}
	/* Load node->next before loading next's content */
	cmm_smp_read_barrier_depends();
	return next;
}
248 | ||
/*
 * __cds_wfcq_dequeue_blocking: dequeue a node from the queue.
 *
 * No need to go on a waitqueue here, as there is no possible state in which the
 * list could cause dequeue to busy-loop needlessly while waiting for another
 * thread to be scheduled. The queue appears empty until tail->next is set by
 * enqueue.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * It is valid to reuse and free a dequeued node immediately.
 * Should be called with cds_wfcq_dequeue_lock() held.
 *
 * Returns NULL if the queue is empty.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	struct cds_wfcq_node *node, *next;

	if (_cds_wfcq_empty(head, tail))
		return NULL;

	/* May block waiting for a concurrent enqueuer to publish the link. */
	node = ___cds_wfcq_node_sync_next(&head->node);

	if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
		/*
		 * @node is probably the only node in the queue.
		 * Try to move the tail to &q->head.
		 * q->head.next is set to NULL here, and stays
		 * NULL if the cmpxchg succeeds. Should the
		 * cmpxchg fail due to a concurrent enqueue, the
		 * q->head.next will be set to the next node.
		 * The implicit memory barrier before
		 * uatomic_cmpxchg() orders load node->next
		 * before loading q->tail.
		 * The implicit memory barrier before uatomic_cmpxchg
		 * orders load q->head.next before loading node's
		 * content.
		 */
		_cds_wfcq_node_init(&head->node);
		if (uatomic_cmpxchg(&tail->p, node, &head->node) == node)
			return node;
		/* cmpxchg lost against an enqueuer; wait for its link. */
		next = ___cds_wfcq_node_sync_next(node);
	}

	/*
	 * Move queue head forward.
	 */
	head->node.next = next;

	/* Load q->head.next before loading node's content */
	cmm_smp_read_barrier_depends();
	return node;
}
303 | ||
/*
 * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
 *
 * Dequeue all nodes from src_q.
 * dest_q must be already initialized.
 * Should be called with cds_wfcq_dequeue_lock() held on src_q.
 * Appending into dest_q is wait-free (no dest_q lock needed).
 */
static inline void
___cds_wfcq_splice_blocking(
		struct cds_wfcq_head *dest_q_head,
		struct cds_wfcq_tail *dest_q_tail,
		struct cds_wfcq_head *src_q_head,
		struct cds_wfcq_tail *src_q_tail)
{
	struct cds_wfcq_node *head, *tail;

	if (_cds_wfcq_empty(src_q_head, src_q_tail))
		return;

	/* Detach the first real node; may block on an in-flight enqueue. */
	head = ___cds_wfcq_node_sync_next(&src_q_head->node);
	_cds_wfcq_node_init(&src_q_head->node);

	/*
	 * Memory barrier implied before uatomic_xchg() orders store to
	 * src_q->head before store to src_q->tail. This is required by
	 * concurrent enqueue on src_q, which exchanges the tail before
	 * updating the previous tail's next pointer.
	 */
	tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node);

	/*
	 * Append the spliced content of src_q into dest_q. Does not
	 * require mutual exclusion on dest_q (wait-free).
	 */
	___cds_wfcq_append(dest_q_head, dest_q_tail, head, tail);
}
340 | ||
/*
 * cds_wfcq_dequeue_blocking: dequeue a node from a wait-free queue.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Mutual exclusion with (and only with) cds_wfcq_splice_blocking is
 * ensured.
 * It is valid to reuse and free a dequeued node immediately.
 */
static inline struct cds_wfcq_node *
_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	struct cds_wfcq_node *node;

	/* Serialize against other dequeuers and source-queue splices. */
	_cds_wfcq_dequeue_lock(head, tail);
	node = ___cds_wfcq_dequeue_blocking(head, tail);
	_cds_wfcq_dequeue_unlock(head, tail);
	return node;
}
361 | ||
/*
 * cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
 *
 * Dequeue all nodes from src_q.
 * dest_q must be already initialized.
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Mutual exclusion with (and only with) cds_wfcq_dequeue_blocking is
 * ensured.
 */
static inline void
_cds_wfcq_splice_blocking(
		struct cds_wfcq_head *dest_q_head,
		struct cds_wfcq_tail *dest_q_tail,
		struct cds_wfcq_head *src_q_head,
		struct cds_wfcq_tail *src_q_tail)
{
	/* Only the source queue needs dequeue-side serialization. */
	_cds_wfcq_dequeue_lock(src_q_head, src_q_tail);
	___cds_wfcq_splice_blocking(dest_q_head, dest_q_tail,
			src_q_head, src_q_tail);
	_cds_wfcq_dequeue_unlock(src_q_head, src_q_tail);
}
384 | ||
385 | #ifdef __cplusplus | |
386 | } | |
387 | #endif | |
388 | ||
389 | #endif /* _URCU_WFCQUEUE_STATIC_H */ |