Provide cleanup interfaces for per-CPU and per-thread call_rcu threads
[urcu.git] / urcu-call-rcu.c
/*
 * urcu-call-rcu.c
 *
 * Userspace RCU library - batch memory reclamation with kernel API
 *
 * Copyright (c) 2010 Paul E. McKenney <paulmck@linux.vnet.ibm.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/time.h>
#include <syscall.h>
#include <unistd.h>

#include "config.h"
#include "urcu/wfqueue.h"
#include "urcu-call-rcu.h"
#include "urcu-pointer.h"
#include "urcu/list.h"

/* Data structure that identifies a call_rcu thread. */

struct call_rcu_data {
	struct cds_wfq_queue cbs;
	unsigned long flags;
	pthread_mutex_t mtx;
	pthread_cond_t cond;
	unsigned long qlen;
	pthread_t tid;
	struct cds_list_head list;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));

/*
 * List of all call_rcu_data structures to keep valgrind happy.
 * Protected by call_rcu_mutex.
 */

CDS_LIST_HEAD(call_rcu_data_list);

/* Link a thread using call_rcu() to its call_rcu thread. */

static __thread struct call_rcu_data *thread_call_rcu_data;

/* Guard call_rcu thread creation. */

static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;

/* If a given thread does not have its own call_rcu thread, this is the default. */

static struct call_rcu_data *default_call_rcu_data;

extern void synchronize_rcu(void);

/*
 * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
 * available, then we can have call_rcu threads assigned to individual
 * CPUs rather than only to specific threads.
 */

#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)

/*
 * Pointer to array of pointers to per-CPU call_rcu_data structures
 * and # CPUs.
 */

static struct call_rcu_data **per_cpu_call_rcu_data;
static long maxcpus;

/* Allocate the array if it has not already been allocated. */

static void alloc_cpu_call_rcu_data(void)
{
	struct call_rcu_data **p;
	static int warned = 0;

	if (maxcpus != 0)
		return;
	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
	if (maxcpus <= 0) {
		return;
	}
	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
	if (p != NULL) {
		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
		per_cpu_call_rcu_data = p;
	} else {
		if (!warned) {
			fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
		}
		warned = 1;
	}
}

#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */

static const struct call_rcu_data **per_cpu_call_rcu_data = NULL;
static const long maxcpus = -1;

static void alloc_cpu_call_rcu_data(void)
{
}

static int sched_getcpu(void)
{
	return -1;
}

#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */

/* Acquire the specified pthread mutex. */

static void call_rcu_lock(pthread_mutex_t *pmp)
{
	if (pthread_mutex_lock(pmp) != 0) {
		perror("pthread_mutex_lock");
		exit(-1);
	}
}

/* Release the specified pthread mutex. */

static void call_rcu_unlock(pthread_mutex_t *pmp)
{
	if (pthread_mutex_unlock(pmp) != 0) {
		perror("pthread_mutex_unlock");
		exit(-1);
	}
}

/* This is the code run by each call_rcu thread. */

static void *call_rcu_thread(void *arg)
{
	unsigned long cbcount;
	struct cds_wfq_node *cbs;
	struct cds_wfq_node **cbs_tail;
	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
	struct rcu_head *rhp;

	thread_call_rcu_data = crdp;
	for (;;) {
		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
				poll(NULL, 0, 1);
			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
			cbs_tail = (struct cds_wfq_node **)
				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
			synchronize_rcu();
			cbcount = 0;
			do {
				while (cbs->next == NULL &&
				       &cbs->next != cbs_tail)
					poll(NULL, 0, 1);
				if (cbs == &crdp->cbs.dummy) {
					cbs = cbs->next;
					continue;
				}
				rhp = (struct rcu_head *)cbs;
				cbs = cbs->next;
				rhp->func(rhp);
				cbcount++;
			} while (cbs != NULL);
			uatomic_sub(&crdp->qlen, cbcount);
		}
		if (crdp->flags & URCU_CALL_RCU_STOP)
			break;
		if (crdp->flags & URCU_CALL_RCU_RT)
			poll(NULL, 0, 10);
		else {
			call_rcu_lock(&crdp->mtx);
			_CMM_STORE_SHARED(crdp->flags,
					  crdp->flags & ~URCU_CALL_RCU_RUNNING);
			if (&crdp->cbs.head ==
			    _CMM_LOAD_SHARED(crdp->cbs.tail) &&
			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
				perror("pthread_cond_wait");
				exit(-1);
			}
			_CMM_STORE_SHARED(crdp->flags,
					  crdp->flags | URCU_CALL_RCU_RUNNING);
			poll(NULL, 0, 10);
			call_rcu_unlock(&crdp->mtx);
		}
	}
	call_rcu_lock(&crdp->mtx);
	crdp->flags |= URCU_CALL_RCU_STOPPED;
	call_rcu_unlock(&crdp->mtx);
	return NULL;
}

/*
 * Create both a call_rcu thread and the corresponding call_rcu_data
 * structure, linking the structure in as specified.  Caller must hold
 * call_rcu_mutex.
 */

static void call_rcu_data_init(struct call_rcu_data **crdpp,
			       unsigned long flags)
{
	struct call_rcu_data *crdp;

	crdp = malloc(sizeof(*crdp));
	if (crdp == NULL) {
		fprintf(stderr, "Out of memory.\n");
		exit(-1);
	}
	memset(crdp, '\0', sizeof(*crdp));
	cds_wfq_init(&crdp->cbs);
	crdp->qlen = 0;
	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
		perror("pthread_mutex_init");
		exit(-1);
	}
	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
		perror("pthread_cond_init");
		exit(-1);
	}
	crdp->flags = flags | URCU_CALL_RCU_RUNNING;
	cds_list_add(&crdp->list, &call_rcu_data_list);
	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
	*crdpp = crdp;
	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
		perror("pthread_create");
		exit(-1);
	}
}

/*
 * Return a pointer to the call_rcu_data structure for the specified
 * CPU, returning NULL if there is none.  We cannot automatically
 * create it because the platform we are running on might not define
 * sched_getcpu().
 */

struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
{
	static int warned = 0;

	if (per_cpu_call_rcu_data == NULL)
		return NULL;
	if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
		fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
		warned = 1;
	}
	if (cpu < 0 || maxcpus <= cpu)
		return NULL;
	return per_cpu_call_rcu_data[cpu];
}

/*
 * Return the tid corresponding to the call_rcu thread whose
 * call_rcu_data structure is specified.
 */

pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
{
	return crdp->tid;
}

/*
 * Create a call_rcu_data structure (with thread) and return a pointer.
 */

static struct call_rcu_data *__create_call_rcu_data(unsigned long flags)
{
	struct call_rcu_data *crdp;

	call_rcu_data_init(&crdp, flags);
	return crdp;
}

struct call_rcu_data *create_call_rcu_data(unsigned long flags)
{
	struct call_rcu_data *crdp;

	call_rcu_lock(&call_rcu_mutex);
	crdp = __create_call_rcu_data(flags);
	call_rcu_unlock(&call_rcu_mutex);
	return crdp;
}

/*
 * Set the specified CPU to use the specified call_rcu_data structure.
 *
 * Use NULL to remove a CPU's call_rcu_data structure, but it is
 * the caller's responsibility to dispose of the removed structure.
 * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure
 * (prior to NULLing it out, of course).
 */

int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
{
	int warned = 0;

	call_rcu_lock(&call_rcu_mutex);
	if (cpu < 0 || maxcpus <= cpu) {
		if (!warned) {
			fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
			warned = 1;
		}
		call_rcu_unlock(&call_rcu_mutex);
		errno = EINVAL;
		return -EINVAL;
	}
	alloc_cpu_call_rcu_data();
	call_rcu_unlock(&call_rcu_mutex);
	if (per_cpu_call_rcu_data == NULL) {
		errno = ENOMEM;
		return -ENOMEM;
	}
	per_cpu_call_rcu_data[cpu] = crdp;
	return 0;
}
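
/*
 * Illustrative sketch (not part of the original source): how an
 * application might give one CPU its own call_rcu thread.  The CPU
 * number 0 and the error handling are arbitrary choices made for the
 * example; only functions defined in this file are used.
 *
 *	struct call_rcu_data *crdp = create_call_rcu_data(0);
 *
 *	if (set_cpu_call_rcu_data(0, crdp) != 0) {
 *		perror("set_cpu_call_rcu_data");
 *		call_rcu_data_free(crdp);
 *	}
 *
 * To undo the assignment later, save the pointer returned by
 * get_cpu_call_rcu_data(0), call set_cpu_call_rcu_data(0, NULL), and
 * then pass the saved pointer to call_rcu_data_free().
 */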

/*
 * Return a pointer to the default call_rcu_data structure, creating
 * one if need be.  Because we never free call_rcu_data structures,
 * we don't need to be in an RCU read-side critical section.
 */

struct call_rcu_data *get_default_call_rcu_data(void)
{
	if (default_call_rcu_data != NULL)
		return rcu_dereference(default_call_rcu_data);
	call_rcu_lock(&call_rcu_mutex);
	if (default_call_rcu_data != NULL) {
		call_rcu_unlock(&call_rcu_mutex);
		return default_call_rcu_data;
	}
	call_rcu_data_init(&default_call_rcu_data, 0);
	call_rcu_unlock(&call_rcu_mutex);
	return default_call_rcu_data;
}
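
/*
 * Illustrative sketch (not part of the original source): a program that
 * wants the default call_rcu thread to exist before its first call to
 * call_rcu() can simply discard the return value during initialization:
 *
 *	(void)get_default_call_rcu_data();
 */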

/*
 * Return the call_rcu_data structure that applies to the currently
 * running thread.  Any call_rcu_data structure assigned specifically
 * to this thread has first priority, followed by any call_rcu_data
 * structure assigned to the CPU on which the thread is running,
 * followed by the default call_rcu_data structure.  If there is not
 * yet a default call_rcu_data structure, one will be created.
 */
struct call_rcu_data *get_call_rcu_data(void)
{
	int curcpu;
	static int warned = 0;

	if (thread_call_rcu_data != NULL)
		return thread_call_rcu_data;
	if (maxcpus <= 0)
		return get_default_call_rcu_data();
	curcpu = sched_getcpu();
	if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
		fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
		warned = 1;
	}
	if (curcpu >= 0 && maxcpus > curcpu &&
	    per_cpu_call_rcu_data != NULL &&
	    per_cpu_call_rcu_data[curcpu] != NULL)
		return per_cpu_call_rcu_data[curcpu];
	return get_default_call_rcu_data();
}

/*
 * Return a pointer to this task's call_rcu_data if there is one.
 */

struct call_rcu_data *get_thread_call_rcu_data(void)
{
	return thread_call_rcu_data;
}

/*
 * Set this task's call_rcu_data structure as specified, regardless
 * of whether or not this task already had one.  (This allows switching
 * to and from real-time call_rcu threads, for example.)
 *
 * Use NULL to remove a thread's call_rcu_data structure, but it is
 * the caller's responsibility to dispose of the removed structure.
 * Use get_thread_call_rcu_data() to obtain a pointer to the old structure
 * (prior to NULLing it out, of course).
 */

void set_thread_call_rcu_data(struct call_rcu_data *crdp)
{
	thread_call_rcu_data = crdp;
}
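
/*
 * Illustrative sketch (not part of the original source): a thread that
 * wants its callbacks handled by a dedicated real-time call_rcu thread
 * might do the following.  URCU_CALL_RCU_RT is the flag tested by
 * call_rcu_thread() above; the variable name is invented for the example.
 *
 *	struct call_rcu_data *my_crdp;
 *
 *	my_crdp = create_call_rcu_data(URCU_CALL_RCU_RT);
 *	set_thread_call_rcu_data(my_crdp);
 *
 *	... use call_rcu() as usual from this thread ...
 *
 *	set_thread_call_rcu_data(NULL);
 *	call_rcu_data_free(my_crdp);
 */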

/*
 * Create a separate call_rcu thread for each CPU.  This does not
 * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
 * function if you want that behavior.
 */

int create_all_cpu_call_rcu_data(unsigned long flags)
{
	int i;
	struct call_rcu_data *crdp;
	int ret;

	call_rcu_lock(&call_rcu_mutex);
	alloc_cpu_call_rcu_data();
	call_rcu_unlock(&call_rcu_mutex);
	if (maxcpus <= 0) {
		errno = EINVAL;
		return -EINVAL;
	}
	if (per_cpu_call_rcu_data == NULL) {
		errno = ENOMEM;
		return -ENOMEM;
	}
	for (i = 0; i < maxcpus; i++) {
		call_rcu_lock(&call_rcu_mutex);
		if (get_cpu_call_rcu_data(i)) {
			call_rcu_unlock(&call_rcu_mutex);
			continue;
		}
		crdp = __create_call_rcu_data(flags);
		if (crdp == NULL) {
			call_rcu_unlock(&call_rcu_mutex);
			errno = ENOMEM;
			return -ENOMEM;
		}
		call_rcu_unlock(&call_rcu_mutex);
		if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
			/* FIXME: Leaks crdp for now. */
			return ret; /* Can happen on race. */
		}
	}
	return 0;
}
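
/*
 * Illustrative sketch (not part of the original source): creating the
 * per-CPU call_rcu threads once at program startup, falling back to the
 * default call_rcu thread when the platform lacks sched_getcpu() or
 * sysconf() support (in which case -EINVAL is returned and errno is set):
 *
 *	if (create_all_cpu_call_rcu_data(0) != 0)
 *		fprintf(stderr, "per-CPU call_rcu threads unavailable: %s\n",
 *			strerror(errno));
 */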

/*
 * Wake up the call_rcu thread corresponding to the specified
 * call_rcu_data structure.
 */
static void wake_call_rcu_thread(struct call_rcu_data *crdp)
{
	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
		call_rcu_lock(&crdp->mtx);
		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
			if (pthread_cond_signal(&crdp->cond) != 0) {
				perror("pthread_cond_signal");
				exit(-1);
			}
		}
		call_rcu_unlock(&crdp->mtx);
	}
}

/*
 * Schedule a function to be invoked after a following grace period.
 * This is the only function that must be called -- the others are
 * only present to allow applications to tune their use of RCU for
 * maximum performance.
 *
 * Note that unless a call_rcu thread has already been created, the
 * first invocation of call_rcu() will create one.  So, if you need
 * the first invocation of call_rcu() to be fast, make sure to create
 * a call_rcu thread first.  One way to accomplish this is
 * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
 */

void call_rcu(struct rcu_head *head,
	      void (*func)(struct rcu_head *head))
{
	struct call_rcu_data *crdp;

	cds_wfq_node_init(&head->next);
	head->func = func;
	crdp = get_call_rcu_data();
	cds_wfq_enqueue(&crdp->cbs, &head->next);
	uatomic_inc(&crdp->qlen);
	wake_call_rcu_thread(crdp);
}
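
/*
 * Illustrative sketch (not part of the original source): typical usage
 * embeds a struct rcu_head in the protected structure and frees that
 * structure from the callback once a grace period has elapsed.  The
 * "struct foo" type and foo_free_cb() name are invented for the example;
 * placing the rcu_head first lets the callback simply cast its argument,
 * just as call_rcu_thread() casts queue nodes back to rcu_head above.
 *
 *	struct foo {
 *		struct rcu_head rcu_head;	(first member, so a cast works)
 *		int data;
 *	};
 *
 *	static void foo_free_cb(struct rcu_head *head)
 *	{
 *		free((struct foo *)head);
 *	}
 *
 *	After removing p from all RCU-visible structures:
 *
 *	call_rcu(&p->rcu_head, foo_free_cb);
 */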

/*
 * Free up the specified call_rcu_data structure, terminating the
 * associated call_rcu thread.  The caller must have previously
 * removed the call_rcu_data structure from per-thread or per-CPU
 * usage.  For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU
 * call_rcu_data structures or set_thread_call_rcu_data(NULL) for
 * per-thread call_rcu_data structures.
 *
 * We silently refuse to free up the default call_rcu_data structure
 * because that is where we put any leftover callbacks.  Note that
 * the possibility of self-spawning callbacks makes it impossible
 * to execute all the callbacks in finite time without putting any
 * newly spawned callbacks somewhere else.  The "somewhere else" of
 * last resort is the default call_rcu_data structure.
 *
 * We also silently refuse to free NULL pointers.  This simplifies
 * the calling code.
 */
void call_rcu_data_free(struct call_rcu_data *crdp)
{
	struct cds_wfq_node *cbs;
	struct cds_wfq_node **cbs_tail;
	struct cds_wfq_node **cbs_endprev;

	if (crdp == NULL || crdp == default_call_rcu_data) {
		return;
	}
	if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) {
		call_rcu_lock(&crdp->mtx);
		crdp->flags |= URCU_CALL_RCU_STOP;
		call_rcu_unlock(&crdp->mtx);
		wake_call_rcu_thread(crdp);
		while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0)
			poll(NULL, 0, 1);
	}
	if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
		while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
			poll(NULL, 0, 1);
		_CMM_STORE_SHARED(crdp->cbs.head, NULL);
		cbs_tail = (struct cds_wfq_node **)
			uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
		/* Splice the pending callbacks onto the default queue. */
		cbs_endprev = (struct cds_wfq_node **)
			uatomic_xchg(&default_call_rcu_data->cbs.tail, cbs_tail);
		*cbs_endprev = cbs;
		uatomic_add(&default_call_rcu_data->qlen,
			    uatomic_read(&crdp->qlen));
	}
	cds_list_del(&crdp->list);
	free(crdp);
}

/*
 * Clean up all the per-CPU call_rcu threads.
 */
void free_all_cpu_call_rcu_data(void)
{
	int cpu;
	struct call_rcu_data *crdp;

	if (maxcpus <= 0)
		return;
	for (cpu = 0; cpu < maxcpus; cpu++) {
		crdp = get_cpu_call_rcu_data(cpu);
		if (crdp == NULL)
			continue;
		set_cpu_call_rcu_data(cpu, NULL);
		call_rcu_data_free(crdp);
	}
}
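
/*
 * Illustrative sketch (not part of the original source): shutdown-time
 * cleanup.  free_all_cpu_call_rcu_data() detaches and frees every per-CPU
 * structure; a per-thread structure must be detached and freed explicitly.
 * Because call_rcu_data_free() ignores NULL, the sequence is safe even if
 * the calling thread never installed a private structure.
 *
 *	struct call_rcu_data *crdp;
 *
 *	free_all_cpu_call_rcu_data();
 *
 *	crdp = get_thread_call_rcu_data();
 *	set_thread_call_rcu_data(NULL);
 *	call_rcu_data_free(crdp);
 */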

/*
 * Clean up call_rcu data structures in the child of a successful fork()
 * that is not followed by exec().
 */
void call_rcu_after_fork_child(void)
{
	struct call_rcu_data *crdp;

	/*
	 * Allocate a new default call_rcu_data structure in order
	 * to get a working call_rcu thread to go with it.
	 */
	default_call_rcu_data = NULL;
	(void)get_default_call_rcu_data();

	/* Dispose of all of the rest of the call_rcu_data structures. */
	while (call_rcu_data_list.next != call_rcu_data_list.prev) {
		crdp = cds_list_entry(call_rcu_data_list.prev,
				      struct call_rcu_data, list);
		if (crdp == default_call_rcu_data)
			crdp = cds_list_entry(crdp->list.prev,
					      struct call_rcu_data, list);
		crdp->flags = URCU_CALL_RCU_STOPPED;
		call_rcu_data_free(crdp);
	}
}
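
/*
 * Illustrative sketch (not part of the original source): one way to
 * arrange for this cleanup is to register it as the child handler with
 * pthread_atfork() during program initialization, so that a child
 * produced by fork() without exec() starts with a fresh default
 * call_rcu thread:
 *
 *	if (pthread_atfork(NULL, NULL, call_rcu_after_fork_child) != 0)
 *		fprintf(stderr, "pthread_atfork failed\n");
 */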