From: Paul E. McKenney Date: Wed, 9 Mar 2011 02:51:54 +0000 (-0500) Subject: Provide cleanup interfaces for per-CPU and per-thread call_rcu threads X-Git-Tag: v0.6.0~50 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=7106ddf86905188b9e92ee7000165c1af22f793e;p=urcu.git Provide cleanup interfaces for per-CPU and per-thread call_rcu threads Signed-off-by: Paul E. McKenney Signed-off-by: Mathieu Desnoyers --- diff --git a/tests/rcutorture.h b/tests/rcutorture.h index b42b8ab..66fdd7f 100644 --- a/tests/rcutorture.h +++ b/tests/rcutorture.h @@ -119,6 +119,7 @@ int goflag __attribute__((__aligned__(CAA_CACHE_LINE_SIZE))) = GOFLAG_INIT; void *rcu_read_perf_test(void *arg) { + struct call_rcu_data *crdp; int i; int me = (long)arg; long long n_reads_local = 0; @@ -141,6 +142,9 @@ void *rcu_read_perf_test(void *arg) } __get_thread_var(n_reads_pt) += n_reads_local; put_thread_offline(); + crdp = get_thread_call_rcu_data(); + set_thread_call_rcu_data(NULL); + call_rcu_data_free(crdp); rcu_unregister_thread(); return (NULL); @@ -204,6 +208,10 @@ void perftestrun(int nthreads, int nreaders, int nupdaters) (double)n_reads), ((duration * 1000*1000*1000.*(double)nupdaters) / (double)n_updates)); + if (get_cpu_call_rcu_data(0)) { + fprintf(stderr, "Deallocating per-CPU call_rcu threads.\n"); + free_all_cpu_call_rcu_data(); + } exit(0); } @@ -436,6 +444,10 @@ void stresstest(int nreaders) printf(" %lld", sum); } printf("\n"); + if (get_cpu_call_rcu_data(0)) { + fprintf(stderr, "Deallocating per-CPU call_rcu threads.\n"); + free_all_cpu_call_rcu_data(); + } exit(0); } diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c index aa1324e..bb56dbb 100644 --- a/urcu-call-rcu.c +++ b/urcu-call-rcu.c @@ -183,6 +183,8 @@ static void *call_rcu_thread(void *arg) } while (cbs != NULL); uatomic_sub(&crdp->qlen, cbcount); } + if (crdp->flags & URCU_CALL_RCU_STOP) + break; if (crdp->flags & URCU_CALL_RCU_RT) poll(NULL, 0, 10); else { @@ -201,7 +203,10 @@ static void *call_rcu_thread(void *arg) call_rcu_unlock(&crdp->mtx); } } - return NULL; /* NOTREACHED */ + call_rcu_lock(&crdp->mtx); + crdp->flags |= URCU_CALL_RCU_STOPPED; + call_rcu_unlock(&crdp->mtx); + return NULL; } /* @@ -297,6 +302,11 @@ struct call_rcu_data *create_call_rcu_data(unsigned long flags) /* * Set the specified CPU to use the specified call_rcu_data structure. + * + * Use NULL to remove a CPU's call_rcu_data structure, but it is + * the caller's responsibility to dispose of the removed structure. + * Use get_cpu_call_rcu_data() to obtain a pointer to the old structure + * (prior to NULLing it out, of course). */ int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp) @@ -385,6 +395,11 @@ struct call_rcu_data *get_thread_call_rcu_data(void) * Set this task's call_rcu_data structure as specified, regardless * of whether or not this task already had one. (This allows switching * to and from real-time call_rcu threads, for example.) + * + * Use NULL to remove a thread's call_rcu_data structure, but it is + * the caller's responsibility to dispose of the removed structure. + * Use get_thread_call_rcu_data() to obtain a pointer to the old structure + * (prior to NULLing it out, of course). */ void set_thread_call_rcu_data(struct call_rcu_data *crdp) @@ -436,6 +451,24 @@ int create_all_cpu_call_rcu_data(unsigned long flags) return 0; } +/* + * Wake up the call_rcu thread corresponding to the specified + * call_rcu_data structure. + */ +static void wake_call_rcu_thread(struct call_rcu_data *crdp) +{ + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) { + call_rcu_lock(&crdp->mtx); + if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) { + if (pthread_cond_signal(&crdp->cond) != 0) { + perror("pthread_cond_signal"); + exit(-1); + } + } + call_rcu_unlock(&crdp->mtx); + } +} + /* * Schedule a function to be invoked after a following grace period. * This is the only function that must be called -- the others are @@ -459,14 +492,102 @@ void call_rcu(struct rcu_head *head, crdp = get_call_rcu_data(); cds_wfq_enqueue(&crdp->cbs, &head->next); uatomic_inc(&crdp->qlen); - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) { + wake_call_rcu_thread(crdp); +} + +/* + * Free up the specified call_rcu_data structure, terminating the + * associated call_rcu thread. The caller must have previously + * removed the call_rcu_data structure from per-thread or per-CPU + * usage. For example, set_cpu_call_rcu_data(cpu, NULL) for per-CPU + * call_rcu_data structures or set_thread_call_rcu_data(NULL) for + * per-thread call_rcu_data structures. + * + * We silently refuse to free up the default call_rcu_data structure + * because that is where we put any leftover callbacks. Note that + * the possibility of self-spawning callbacks makes it impossible + * to execute all the callbacks in finite time without putting any + * newly spawned callbacks somewhere else. The "somewhere else" of + * last resort is the default call_rcu_data structure. + * + * We also silently refuse to free NULL pointers. This simplifies + * the calling code. + */ +void call_rcu_data_free(struct call_rcu_data *crdp) +{ + struct cds_wfq_node *cbs; + struct cds_wfq_node **cbs_tail; + struct cds_wfq_node **cbs_endprev; + + if (crdp == NULL || crdp == default_call_rcu_data) { + return; + } + if ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) { call_rcu_lock(&crdp->mtx); - if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) { - if (pthread_cond_signal(&crdp->cond) != 0) { - perror("pthread_cond_signal"); - exit(-1); - } - } + crdp->flags |= URCU_CALL_RCU_STOP; call_rcu_unlock(&crdp->mtx); + wake_call_rcu_thread(crdp); + while ((crdp->flags & URCU_CALL_RCU_STOPPED) == 0) + poll(NULL, 0, 1); + } + if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { + while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) + poll(NULL, 0, 1); + _CMM_STORE_SHARED(crdp->cbs.head, NULL); + cbs_tail = (struct cds_wfq_node **) + uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); + cbs_endprev = (struct cds_wfq_node **) + uatomic_xchg(&default_call_rcu_data, cbs_tail); + *cbs_endprev = cbs; + uatomic_add(&default_call_rcu_data->qlen, + uatomic_read(&crdp->qlen)); + cds_list_del(&crdp->list); + free(crdp); + } +} + +/* + * Clean up all the per-CPU call_rcu threads. + */ +void free_all_cpu_call_rcu_data(void) +{ + int cpu; + struct call_rcu_data *crdp; + + if (maxcpus <= 0) + return; + for (cpu = 0; cpu < maxcpus; cpu++) { + crdp = get_cpu_call_rcu_data(cpu); + if (crdp == NULL) + continue; + set_cpu_call_rcu_data(cpu, NULL); + call_rcu_data_free(crdp); + } +} + +/* + * Clean up call_rcu data structures in the child of a successful fork() + * that is not followed by exec(). + */ +void call_rcu_after_fork_child(void) +{ + struct call_rcu_data *crdp; + + /* + * Allocate a new default call_rcu_data structure in order + * to get a working call_rcu thread to go with it. + */ + default_call_rcu_data = NULL; + (void)get_default_call_rcu_data(); + + /* Dispose of all of the rest of the call_rcu_data structures. */ + while (call_rcu_data_list.next != call_rcu_data_list.prev) { + crdp = cds_list_entry(call_rcu_data_list.prev, + struct call_rcu_data, list); + if (crdp == default_call_rcu_data) + crdp = cds_list_entry(crdp->list.prev, + struct call_rcu_data, list); + crdp->flags = URCU_CALL_RCU_STOPPED; + call_rcu_data_free(crdp); } } diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h index f965911..a841b88 100644 --- a/urcu-call-rcu.h +++ b/urcu-call-rcu.h @@ -46,6 +46,8 @@ struct call_rcu_data; #define URCU_CALL_RCU_RT 0x1 #define URCU_CALL_RCU_RUNNING 0x2 +#define URCU_CALL_RCU_STOP 0x4 +#define URCU_CALL_RCU_STOPPED 0x8 /* * The rcu_head data structure is placed in the structure to be freed @@ -71,6 +73,9 @@ void set_thread_call_rcu_data(struct call_rcu_data *crdp); int create_all_cpu_call_rcu_data(unsigned long flags); void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head)); +void call_rcu_data_free(struct call_rcu_data *crdp); +void free_all_cpu_call_rcu_data(void); +void call_rcu_after_fork_child(void); #ifdef __cplusplus }