This reverts commit 7618919ae496bda84a2efa4f2ad0abe569892a9e.
Rationale:
I thought about it some more and discussed it with various people; there
are a few reasons to go for a scheme where the RCU read-side lock is
taken by the caller, and where call_rcu is passed as a parameter to the
data structure init function:
A) The advantage, as Paul E. McKenney pointed out, is that a single .so
is enough to support all RCU flavors. Very convenient for external data
structure containers.
B) It clearly documents where RCU read-side locks are needed, so users
keep control of, and an in-depth understanding of, their read-side locks.
C) When multiple API functions require the RCU read-side lock to be
held (sometimes even the same lock) across a sequence of API calls,
we have no choice but to let the caller hold the read-side lock
(see the usage sketch below).
D) Because RCU read-side locks can be nested, any "improvement" we
could get by releasing the read-side lock in retry loops would
vanish whenever we are called from within a nested critical
section.
E) If a library uses synchronize_rcu, this should be clearly documented,
and even frowned upon, because it imposes significant constraints on
the caller's design and a significant performance hit. There are
usually ways to reach the same result through use of call_rcu, which
should really be used throughout these libraries.
F) It clearly documents when a data structure needs to use call_rcu
internally.
G) Some very early benchmark results show that there is indeed not
much performance to be gained by inlining call_rcu, even with a
version that caches the "call_rcu structure" lookup
(per-cpu/per-thread/global). So passing it as a parameter to
the data structure init function should be fine, even in cases
where it is called very often.
H) For use-cases where applications would like to use more than one
RCU flavor concurrently (which is now supported), leaving management
of the RCU read-side critical section to the reader allows the
application to hold more than one RCU read-side lock across API
calls. It also lets the application pass its own call_rcu function,
one that could handle more than one RCU flavor (see the wrapper
sketch below).
So, for all these reasons, I am reverting back to the API we had in our
last release (0.6.4).
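
To make the resulting usage concrete, here is a minimal caller-side
sketch (not part of the patch; struct myelem, free_elem and example are
made-up names, and the default urcu flavor is assumed):

#include <stdlib.h>
#include <urcu.h>               /* default urcu flavor: rcu_read_lock(), call_rcu() */
#include <urcu/rculfqueue.h>
#include <urcu/compiler.h>      /* caa_container_of() */

/* "myelem" is a made-up element type embedding the queue node. */
struct myelem {
	struct cds_lfq_node_rcu qnode;
	struct rcu_head rcu_head;
	int data;
};

static void free_elem(struct rcu_head *head)
{
	free(caa_container_of(head, struct myelem, rcu_head));
}

static void example(void)
{
	struct cds_lfq_queue_rcu q;
	struct cds_lfq_node_rcu *node;
	struct myelem *e;

	rcu_register_thread();
	/* The queue remembers which call_rcu to use for its dummy nodes. */
	cds_lfq_init_rcu(&q, call_rcu);

	e = malloc(sizeof(*e));
	if (!e)
		goto out;
	cds_lfq_node_init_rcu(&e->qnode);
	e->data = 42;

	/*
	 * The caller delimits the read-side critical section; it can span
	 * several API calls and may itself be nested in a wider one.
	 */
	rcu_read_lock();
	cds_lfq_enqueue_rcu(&q, &e->qnode);
	node = cds_lfq_dequeue_rcu(&q);
	rcu_read_unlock();

	if (node) {
		e = caa_container_of(node, struct myelem, qnode);
		/* Free only after a grace period has elapsed. */
		call_rcu(&e->rcu_head, free_elem);
	}
out:
	(void) cds_lfq_destroy_rcu(&q);
	rcu_unregister_thread();
}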
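
And a sketch for point H: since the queue only stores the function
pointer it is given, the application can pass its own wrapper
(my_call_rcu is a made-up name):

/*
 * "my_call_rcu" has the same signature as call_rcu, so it can be passed
 * to cds_lfq_init_rcu(); it is free to dispatch to whichever RCU flavor
 * (or bookkeeping scheme) the application wants.
 */
static void my_call_rcu(struct rcu_head *head,
		void (*func)(struct rcu_head *head))
{
	/* e.g. application-specific accounting could go here */
	call_rcu(head, func);
}

/* ... then: cds_lfq_init_rcu(&q, my_call_rcu); */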
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
_cds_lfq_node_init_rcu(node);
}
-void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)))
{
- _cds_lfq_init_rcu(q);
+ _cds_lfq_init_rcu(q, queue_call_rcu);
}
int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
if (!node)
goto fail;
cds_lfq_node_init_rcu(node);
+ rcu_read_lock();
cds_lfq_enqueue_rcu(&q, node);
+ rcu_read_unlock();
nr_successful_enqueues++;
if (unlikely(wdelay))
for (;;) {
struct cds_lfq_node_rcu *node;
+ rcu_read_lock();
node = cds_lfq_dequeue_rcu(&q);
+ rcu_read_unlock();
+
if (node) {
defer_rcu(free, node);
nr_successful_dequeues++;
struct cds_lfq_node_rcu *node;
do {
+ rcu_read_lock();
node = cds_lfq_dequeue_rcu(q);
+ rcu_read_unlock();
if (node) {
free(node); /* no more concurrent access */
(*nr_dequeues)++;
tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
- cds_lfq_init_rcu(&q);
+ cds_lfq_init_rcu(&q, call_rcu);
next_aff = 0;
for (;;) {
struct cds_lfs_node_rcu *node;
+ rcu_read_lock();
node = cds_lfs_pop_rcu(&s);
+ rcu_read_unlock();
if (node) {
defer_rcu(free, node);
nr_successful_dequeues++;
struct cds_lfq_queue_rcu {
struct cds_lfq_node_rcu *head, *tail;
+ void (*queue_call_rcu)(struct rcu_head *head,
+ void (*func)(struct rcu_head *head));
};
#ifdef _LGPL_SOURCE
#else /* !_LGPL_SOURCE */
extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
-extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
+extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)));
/*
* The queue should be emptied before calling destroy.
*
extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q);
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*/
extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
struct cds_lfq_node_rcu *node);
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfq_node_rcu structure.
struct cds_lfs_node_rcu *node);
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfs_node_rcu structure.
assert(node->dummy);
dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
- call_rcu(&dummy->head, free_dummy_cb);
+ dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
}
static inline
}
static inline
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void queue_call_rcu(struct rcu_head *head,
+ void (*func)(struct rcu_head *head)))
{
q->tail = make_dummy(q, NULL);
q->head = q->tail;
+ q->queue_call_rcu = queue_call_rcu;
}
/*
}
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*/
static inline
void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
for (;;) {
struct cds_lfq_node_rcu *tail, *next;
- rcu_read_lock();
tail = rcu_dereference(q->tail);
next = uatomic_cmpxchg(&tail->next, NULL, node);
if (next == NULL) {
* enqueue might beat us to it, that's fine).
*/
(void) uatomic_cmpxchg(&q->tail, tail, node);
- rcu_read_unlock();
return;
} else {
/*
* Help moving tail further and retry.
*/
(void) uatomic_cmpxchg(&q->tail, tail, next);
- rcu_read_unlock();
continue;
}
}
}
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfq_node_rcu structure.
for (;;) {
struct cds_lfq_node_rcu *head, *next;
- rcu_read_lock();
head = rcu_dereference(q->head);
next = rcu_dereference(head->next);
- if (head->dummy && next == NULL) {
- rcu_read_unlock();
+ if (head->dummy && next == NULL)
return NULL; /* empty */
- }
/*
* We never, ever allow dequeue to get to a state where
* the queue is empty (we need at least one node in the
enqueue_dummy(q);
next = rcu_dereference(head->next);
}
- if (uatomic_cmpxchg(&q->head, head, next) != head) {
- rcu_read_unlock();
+ if (uatomic_cmpxchg(&q->head, head, next) != head)
continue; /* Concurrently pushed. */
- }
if (head->dummy) {
/* Free dummy after grace period. */
rcu_free_dummy(head);
- rcu_read_unlock();
continue; /* try again */
}
- rcu_read_unlock();
return head;
}
}
}
/*
- * Acts as a RCU reader.
+ * Should be called within an RCU read-side critical section.
*
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfs_node_rcu structure.
for (;;) {
struct cds_lfs_node_rcu *head;
- rcu_read_lock();
head = rcu_dereference(s->head);
if (head) {
struct cds_lfs_node_rcu *next = rcu_dereference(head->next);
if (uatomic_cmpxchg(&s->head, head, next) == head) {
- rcu_read_unlock();
return head;
} else {
/* Concurrent modification. Retry. */
- rcu_read_unlock();
continue;
}
} else {
/* Empty stack */
- rcu_read_unlock();
return NULL;
}
}