From: Mathieu Desnoyers Date: Thu, 29 Sep 2011 21:13:48 +0000 (-0400) Subject: urcu call_rcu: Use RCU read-side protection for per-cpu call_rcu data X-Git-Tag: v0.6.5~3 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=618b25958fec4d76310f0d9c59e42128e73a8719;p=urcu.git urcu call_rcu: Use RCU read-side protection for per-cpu call_rcu data A concurrent get_cpu_call_rcu_data(), called by get_call_rcu_data(), could dereference this pointer without holding any mutex. So this situation would happen if we have a concurrent call_rcu() executing while we do the create_all_cpu_call_rcu_data(). I think we would need to put a rcu_dereference() around per_cpu_call_rcu_data read within get_cpu_call_rcu_data() too. per_cpu_call_rcu_data should be done with rcu_set_pointer. Also, a rcu read-side critical section would be required around any usage of per_cpu_call_rcu_data, and the action of tearing down the per-cpu data would require to wait for a quiescent state. So we would basically require that the call_rcu users need to be registered as RCU reader threads. Signed-off-by: Mathieu Desnoyers --- diff --git a/urcu-bp.c b/urcu-bp.c index 2ae3408..4c0ab54 100644 --- a/urcu-bp.c +++ b/urcu-bp.c @@ -39,6 +39,7 @@ #include "urcu/wfqueue.h" #include "urcu/map/urcu-bp.h" #include "urcu/static/urcu-bp.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h index 0a47d96..d09adb1 100644 --- a/urcu-call-rcu-impl.h +++ b/urcu-call-rcu-impl.h @@ -82,7 +82,10 @@ static struct call_rcu_data *default_call_rcu_data; /* * Pointer to array of pointers to per-CPU call_rcu_data structures - * and # CPUs. + * and # CPUs. per_cpu_call_rcu_data is a RCU-protected pointer to an + * array of RCU-protected pointers to call_rcu_data. call_rcu acts as a + * RCU read-side and reads per_cpu_call_rcu_data and the per-cpu pointer + * without mutex. The call_rcu_mutex protects updates. */ static struct call_rcu_data **per_cpu_call_rcu_data; @@ -109,7 +112,7 @@ static void alloc_cpu_call_rcu_data(void) p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data)); if (p != NULL) { memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data)); - per_cpu_call_rcu_data = p; + rcu_set_pointer(&per_cpu_call_rcu_data, p); } else { if (!warned) { fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n"); @@ -330,13 +333,18 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp, * CPU, returning NULL if there is none. We cannot automatically * created it because the platform we are running on might not define * sched_getcpu(). + * + * The call to this function and use of the returned call_rcu_data + * should be protected by RCU read-side lock. */ struct call_rcu_data *get_cpu_call_rcu_data(int cpu) { static int warned = 0; + struct call_rcu_data **pcpu_crdp; - if (per_cpu_call_rcu_data == NULL) + pcpu_crdp = rcu_dereference(per_cpu_call_rcu_data); + if (pcpu_crdp == NULL) return NULL; if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) { fprintf(stderr, "[error] liburcu: get CPU # out of range\n"); @@ -344,7 +352,7 @@ struct call_rcu_data *get_cpu_call_rcu_data(int cpu) } if (cpu < 0 || maxcpus <= cpu) return NULL; - return per_cpu_call_rcu_data[cpu]; + return rcu_dereference(pcpu_crdp[cpu]); } /* @@ -418,7 +426,7 @@ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) return -EEXIST; } - per_cpu_call_rcu_data[cpu] = crdp; + rcu_set_pointer(&per_cpu_call_rcu_data[cpu], crdp); call_rcu_unlock(&call_rcu_mutex); return 0; } @@ -450,6 +458,9 @@ struct call_rcu_data *get_default_call_rcu_data(void) * structure assigned to the CPU on which the thread is running, * followed by the default call_rcu_data structure. If there is not * yet a default call_rcu_data structure, one will be created. + * + * Calls to this function and use of the returned call_rcu_data should + * be protected by RCU read-side lock. */ struct call_rcu_data *get_call_rcu_data(void) { @@ -564,6 +575,8 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp) * need the first invocation of call_rcu() to be fast, make sure * to create a call_rcu thread first. One way to accomplish this is * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data(). + * + * call_rcu must be called by registered RCU read-side threads. */ void call_rcu(struct rcu_head *head, @@ -573,10 +586,13 @@ void call_rcu(struct rcu_head *head, cds_wfq_node_init(&head->next); head->func = func; + /* Holding rcu read-side lock across use of per-cpu crdp */ + rcu_read_lock(); crdp = get_call_rcu_data(); cds_wfq_enqueue(&crdp->cbs, &head->next); uatomic_inc(&crdp->qlen); wake_call_rcu_thread(crdp); + rcu_read_unlock(); } /* @@ -641,17 +657,37 @@ void call_rcu_data_free(struct call_rcu_data *crdp) void free_all_cpu_call_rcu_data(void) { int cpu; - struct call_rcu_data *crdp; + struct call_rcu_data **crdp; + static int warned = 0; if (maxcpus <= 0) return; + + crdp = malloc(sizeof(*crdp) * maxcpus); + if (!crdp) { + if (!warned) { + fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n"); + } + warned = 1; + } + for (cpu = 0; cpu < maxcpus; cpu++) { - crdp = get_cpu_call_rcu_data(cpu); - if (crdp == NULL) + crdp[cpu] = get_cpu_call_rcu_data(cpu); + if (crdp[cpu] == NULL) continue; set_cpu_call_rcu_data(cpu, NULL); - call_rcu_data_free(crdp); } + /* + * Wait for call_rcu sites acting as RCU readers of the + * call_rcu_data to become quiescent. + */ + synchronize_rcu(); + for (cpu = 0; cpu < maxcpus; cpu++) { + if (crdp[cpu] == NULL) + continue; + call_rcu_data_free(crdp[cpu]); + } + free(crdp); } /* @@ -700,7 +736,7 @@ void call_rcu_after_fork_child(void) /* Cleanup call_rcu_data pointers before use */ maxcpus_reset(); free(per_cpu_call_rcu_data); - per_cpu_call_rcu_data = NULL; + rcu_set_pointer(&per_cpu_call_rcu_data, NULL); thread_call_rcu_data = NULL; /* Dispose of all of the rest of the call_rcu_data structures. */ diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h index e76a844..5ea0c23 100644 --- a/urcu-call-rcu.h +++ b/urcu-call-rcu.h @@ -62,16 +62,28 @@ struct rcu_head { /* * Exported functions */ + +/* + * get_cpu_call_rcu_data should be called with RCU read-side lock held. + * Callers should be registered RCU read-side threads. + */ struct call_rcu_data *get_cpu_call_rcu_data(int cpu); pthread_t get_call_rcu_thread(struct call_rcu_data *crdp); struct call_rcu_data *create_call_rcu_data(unsigned long flags, int cpu_affinity); int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp); struct call_rcu_data *get_default_call_rcu_data(void); +/* + * get_call_rcu_data should be called from registered RCU read-side + * threads. + */ struct call_rcu_data *get_call_rcu_data(void); struct call_rcu_data *get_thread_call_rcu_data(void); void set_thread_call_rcu_data(struct call_rcu_data *crdp); int create_all_cpu_call_rcu_data(unsigned long flags); +/* + * call_rcu should be called from registered RCU read-side threads. + */ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head)); void call_rcu_data_free(struct call_rcu_data *crdp); diff --git a/urcu-qsbr.c b/urcu-qsbr.c index 1adaa94..a59a87a 100644 --- a/urcu-qsbr.c +++ b/urcu-qsbr.c @@ -39,6 +39,7 @@ #include "urcu/map/urcu-qsbr.h" #define BUILD_QSBR_LIB #include "urcu/static/urcu-qsbr.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE diff --git a/urcu.c b/urcu.c index 20bbf36..77f6888 100644 --- a/urcu.c +++ b/urcu.c @@ -39,6 +39,7 @@ #include "urcu/wfqueue.h" #include "urcu/map/urcu.h" #include "urcu/static/urcu.h" +#include "urcu-pointer.h" /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #undef _LGPL_SOURCE