};
struct urcu_worker {
+ /* Workqueue which can be either used by worker, or stolen. */
struct cds_wfcq_head head;
struct cds_wfcq_tail tail;
+ /* Work belonging to worker. Cannot be stolen. */
+ struct urcu_work *own;
+
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
cds_wfcq_init(&worker->head, &worker->tail);
worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
+ worker->own = NULL;
}
static inline
synchronize_rcu();
}
-/*
- * Try stealing work from siblings when we have nothing to do.
- */
static inline
-bool ___urcu_steal_work(struct urcu_worker *worker,
- struct urcu_worker *sibling)
+bool ___urcu_grab_work(struct urcu_worker *worker,
+ cds_wfcq_head_ptr_t src_head,
+ struct cds_wfcq_tail *src_tail,
+ bool steal)
{
enum cds_wfcq_ret splice_ret;
+ struct cds_wfcq_head tmp_head;
+ struct cds_wfcq_tail tmp_tail;
+ struct cds_wfcq_node *node;
/*
- * Don't bother grabbing the sibling queue lock if it is empty.
+ * Don't bother grabbing the src queue lock if it is empty.
*/
- if (cds_wfcq_empty(&sibling->head, &sibling->tail))
+ if (cds_wfcq_empty(src_head, src_tail))
return false;
+ cds_wfcq_init(&tmp_head, &tmp_tail);
+
+ /* Ensure that we preserve FIFO work order. */
+ assert(!steal || worker->own == NULL);
+
+ /* Splice to temporary queue. */
+ if (steal)
+ cds_wfcq_dequeue_lock(src_head.h, src_tail);
+ splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
+ &tmp_tail,
+ src_head,
+ src_tail);
+ if (steal)
+ cds_wfcq_dequeue_unlock(src_head.h, src_tail);
+ if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
+ return false;
+
+ /*
+ * Keep one work entry for ourself. This ensures forward
+ * progress amongst stealing co-workers. This also ensures that
+ * when a worker grab some work from the global workqueue, it
+ * will have at least one work item to deal with.
+ */
+ if (worker->own == NULL) {
+ if (!steal) {
+ /*
+ * Try to grab own work from worker workqueue to
+ * preserve FIFO order.
+ */
+ node = cds_wfcq_dequeue_blocking(&worker->head,
+ &worker->tail);
+ if (node)
+ goto got_node;
+ }
+ node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
+ assert(node != NULL);
+got_node:
+ worker->own = caa_container_of(node, struct urcu_work, node);
+ }
+
+ /* Splice into worker workqueue. */
splice_ret = cds_wfcq_splice_blocking(&worker->head,
&worker->tail,
- &sibling->head,
- &sibling->tail);
+ &tmp_head,
+ &tmp_tail);
/* Ensure that we preserve FIFO work order. */
- assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
- return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
+ assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+ return true;
+}
+
+/*
+ * Try stealing work from siblings when we have nothing to do.
+ */
+static inline
+bool ___urcu_steal_work(struct urcu_worker *worker,
+ struct urcu_worker *sibling)
+{
+ return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, 1);
}
static inline
struct urcu_worker *worker)
{
enum cds_wfcq_ret wfcq_ret;
+ bool has_work;
- wfcq_ret = __cds_wfcq_splice_blocking(&worker->head,
- &worker->tail,
- &queue->head,
- &queue->tail);
+ has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
/* Don't wait if we have work to do. */
- if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
- || !cds_wfcq_empty(&worker->head,
- &worker->tail))
+ if (has_work || !cds_wfcq_empty(&worker->head, &worker->tail))
goto do_work;
/* Try to steal work from sibling instead of blocking */
if (__urcu_steal_work(queue, worker))
{
struct cds_wfcq_node *node;
+ if (worker->own) {
+ struct urcu_work *work;
+
+ /* Process our own work entry. */
+ work = worker->own;
+ worker->own = NULL;
+ return work;
+ }
/*
* If we are registered for work stealing, we need to dequeue
* safely against siblings.