* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
+#define _LGPL_SOURCE
/* Use the urcu symbols to select the appropriate rcu flavor at link time */
#include "urcu.h"
+
+#undef _LGPL_SOURCE
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#include "urcu/rculfqueue.h"
#include "urcu/rculfqueue-static.h"
_cds_lfq_node_init_rcu(node);
}
-void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void (*release)(struct urcu_ref *ref))
{
- _cds_lfq_init_rcu(q);
+ _cds_lfq_init_rcu(q, release);
}
void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node)
}
struct cds_lfq_node_rcu *
-cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *))
+cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
- return _cds_lfq_dequeue_rcu(q, release);
+ return _cds_lfq_dequeue_rcu(q);
}
*/
/* Use the urcu symbols to select the appropriate rcu flavor at link time */
+#define _LGPL_SOURCE
#include "urcu.h"
+
+#undef _LGPL_SOURCE
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#include "urcu/rculfstack.h"
#include "urcu/rculfstack-static.h"
if (!node)
goto fail;
cds_lfq_node_init_rcu(node);
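+ /*
+ * Enqueue dereferences q->tail and its next pointer, so hold the
+ * read-side lock to keep concurrently dequeued nodes from being
+ * reclaimed while we chain onto them.
+ */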
+ rcu_read_lock();
cds_lfq_enqueue_rcu(&q, node);
+ rcu_read_unlock();
nr_successful_enqueues++;
if (unlikely(wdelay))
}
-static void rcu_release_node(struct urcu_ref *ref)
+static void rcu_free_node(struct rcu_head *head)
{
- struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref);
- defer_rcu(free, node);
- //synchronize_rcu();
- //free(node);
+ struct cds_lfq_node_rcu *node =
+ caa_container_of(head, struct cds_lfq_node_rcu, rcu_head);
+ free(node);
+}
+
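+/*
+ * urcu_ref release callback: runs when the node refcount drops to
+ * zero, and uses call_rcu to defer the actual free until a grace
+ * period has elapsed, so late RCU readers may still dereference the
+ * node safely.
+ */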
+static void ref_release_node(struct urcu_ref *ref)
+{
+ struct cds_lfq_node_rcu *node =
+ caa_container_of(ref, struct cds_lfq_node_rcu, ref);
+ call_rcu(&node->rcu_head, rcu_free_node);
}
void *thr_dequeuer(void *_count)
cmm_smp_mb();
for (;;) {
- struct cds_lfq_node_rcu *node = cds_lfq_dequeue_rcu(&q,
- rcu_release_node);
+ struct cds_lfq_node_rcu *node;
+
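+ /*
+ * The read-side lock only needs to cover the dequeue itself: the
+ * returned node still holds a reference taken at enqueue time, so
+ * it stays valid until urcu_ref_put() below drops it.
+ */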
+ rcu_read_lock();
+ node = cds_lfq_dequeue_rcu(&q);
+ rcu_read_unlock();
if (node) {
- urcu_ref_put(&node->ref, rcu_release_node);
+ urcu_ref_put(&node->ref, ref_release_node);
nr_successful_dequeues++;
}
struct cds_lfq_node_rcu *node;
do {
- node = cds_lfq_dequeue_rcu(q, release_node);
+ rcu_read_lock();
+ node = cds_lfq_dequeue_rcu(q);
+ rcu_read_unlock();
if (node) {
urcu_ref_put(&node->ref, release_node);
(*nr_dequeues)++;
tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
- cds_lfq_init_rcu(&q);
+ cds_lfq_init_rcu(&q, ref_release_node);
next_aff = 0;
if (!node)
goto fail;
cds_lfs_node_init_rcu(node);
+ /* No rcu read-side critical section is needed for push */
cds_lfs_push_rcu(&s, node);
nr_successful_enqueues++;
cmm_smp_mb();
for (;;) {
- struct cds_lfs_node_rcu *node = cds_lfs_pop_rcu(&s);
+ struct cds_lfs_node_rcu *node;
+
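+ /*
+ * Pop removes the node from the stack, so after rcu_read_unlock()
+ * this thread is its only owner; defer_rcu below delays the free
+ * until concurrent readers are done with it.
+ */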
+ rcu_read_lock();
+ node = cds_lfs_pop_rcu(&s);
+ rcu_read_unlock();
if (node) {
defer_rcu(free, node);
nr_successful_dequeues++;
}
-
nr_dequeues++;
if (unlikely(!test_duration_dequeue()))
break;
urcu_ref_init(&node->ref);
}
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void (*release)(struct urcu_ref *ref))
{
_cds_lfq_node_init_rcu(&q->init);
/* Make sure the initial node is never freed. */
urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF);
q->head = q->tail = &q->init;
+ q->release = release;
}
-void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node)
+/*
+ * Should be called under rcu read lock critical section.
+ */
+void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
+ struct cds_lfq_node_rcu *node)
{
urcu_ref_get(&node->ref);
+ node->queue = q;
/*
* uatomic_cmpxchg() implicit memory barrier orders earlier stores to
for (;;) {
struct cds_lfq_node_rcu *tail, *next;
- rcu_read_lock();
tail = rcu_dereference(q->tail);
/*
* Typically expect tail->next to be NULL.
* us to it, that's fine).
*/
(void) uatomic_cmpxchg(&q->tail, tail, node);
- rcu_read_unlock();
return;
} else {
/*
* further and retry.
*/
(void) uatomic_cmpxchg(&q->tail, tail, next);
- rcu_read_unlock();
continue;
}
}
}
/*
- * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
- * which calls the release primitive when the reference count drops to zero. A
- * grace period must be waited after execution of the release callback before
- * performing the actual memory reclamation or modifying the cds_lfq_node_rcu
- * structure.
+ * Should be called under rcu read lock critical section.
+ *
+ * The entry returned by dequeue must be taken care of by doing a
+ * urcu_ref_put, which calls the release primitive when the reference
+ * count drops to zero. The release primitive is in turn responsible
+ * for waiting for a grace period (e.g. with call_rcu) before the node
+ * memory is actually reclaimed.
+ *
* In other words, the entry lfq node returned by dequeue must not be
* modified/re-used/freed until the reference count reaches zero and a grace
- * period has elapsed (after the refcount reached 0).
+ * period has elapsed.
*/
struct cds_lfq_node_rcu *
-_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *))
+_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
for (;;) {
struct cds_lfq_node_rcu *head, *next;
- rcu_read_lock();
head = rcu_dereference(q->head);
next = rcu_dereference(head->next);
if (next) {
if (uatomic_cmpxchg(&q->head, head, next) == head) {
- rcu_read_unlock();
- urcu_ref_put(&head->ref, release);
+ urcu_ref_put(&head->ref, q->release);
return next;
} else {
/* Concurrently pushed, retry */
- rcu_read_unlock();
continue;
}
} else {
/* Empty */
- rcu_read_unlock();
return NULL;
}
}
* which point their reference count will be decremented.
*/
+struct cds_lfq_queue_rcu;
+
struct cds_lfq_node_rcu {
struct cds_lfq_node_rcu *next;
struct urcu_ref ref;
+ struct cds_lfq_queue_rcu *queue;
+ struct rcu_head rcu_head;
};
struct cds_lfq_queue_rcu {
struct cds_lfq_node_rcu *head, *tail;
struct cds_lfq_node_rcu init; /* Dummy initialization node */
+ void (*release)(struct urcu_ref *ref);
};
#ifdef _LGPL_SOURCE
#else /* !_LGPL_SOURCE */
extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
-extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
-extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node);
+extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+ void (*release)(struct urcu_ref *ref));
+
+/*
+ * Should be called under rcu read lock critical section.
+ */
+extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
+ struct cds_lfq_node_rcu *node);
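+
+/*
+ * Usage sketch (illustrative, mirroring the test code): the read-side
+ * lock must be held across the enqueue:
+ *
+ *	cds_lfq_node_init_rcu(node);
+ *	rcu_read_lock();
+ *	cds_lfq_enqueue_rcu(&q, node);
+ *	rcu_read_unlock();
+ */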
/*
- * The entry returned by dequeue must be taken care of by doing a urcu_ref_put,
- * which calls the release primitive when the reference count drops to zero. A
- * grace period must be waited after execution of the release callback before
- * performing the actual memory reclamation or modifying the cds_lfq_node_rcu
- * structure.
+ * Should be called under rcu read lock critical section.
+ *
+ * The entry returned by dequeue must be taken care of by doing a
+ * urcu_ref_put, which calls the release primitive when the reference
+ * count drops to zero. The release primitive is in turn responsible
+ * for waiting for a grace period (e.g. with call_rcu) before the node
+ * memory is actually reclaimed.
+ *
* In other words, the entry lfq node returned by dequeue must not be
* modified/re-used/freed until the reference count reaches zero and a grace
* period has elapsed (after the refcount reached 0).
*/
extern struct cds_lfq_node_rcu *
-cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *));
+cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q);
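+
+/*
+ * Usage sketch (illustrative, mirroring the test code): dequeue under
+ * the read-side lock, then drop the reference; the release callback
+ * registered at cds_lfq_init_rcu() time should defer the actual free
+ * with call_rcu:
+ *
+ *	rcu_read_lock();
+ *	node = cds_lfq_dequeue_rcu(&q);
+ *	rcu_read_unlock();
+ *	if (node)
+ *		urcu_ref_put(&node->ref, release);
+ */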
#endif /* !_LGPL_SOURCE */
}
/*
+ * Should be called under rcu read lock critical section.
+ *
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfs_node_rcu structure.
* Returns NULL if stack is empty.
for (;;) {
struct cds_lfs_node_rcu *head;
- rcu_read_lock();
head = rcu_dereference(s->head);
if (head) {
struct cds_lfs_node_rcu *next = rcu_dereference(head->next);
if (uatomic_cmpxchg(&s->head, head, next) == head) {
- rcu_read_unlock();
return head;
} else {
/* Concurrent modification. Retry. */
- rcu_read_unlock();
continue;
}
} else {
/* Empty stack */
- rcu_read_unlock();
return NULL;
}
}
extern void cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *node);
/*
+ * Should be called under rcu read lock critical section.
+ *
* The caller must wait for a grace period to pass before freeing the returned
* node or modifying the cds_lfs_node_rcu structure.
* Returns NULL if stack is empty.