From: Mathieu Desnoyers Date: Thu, 23 Oct 2014 16:19:31 +0000 (-0400) Subject: workqueue: keep one work item from stealing co-workers X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=a649215929e3af5a0dd26d9fc5bb9ed49e677773;p=urcu.git workqueue: keep one work item from stealing co-workers Takes care of lack of progress issue that could theoretically occur due to a steady work-stealing circular loop amongst co-workers. Keeping a work item hidden from co-workers ensures forward progress. Signed-off-by: Mathieu Desnoyers --- diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 1292e04..883f41f 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -63,9 +63,13 @@ struct urcu_workqueue { }; struct urcu_worker { + /* Workqueue which can be either used by worker, or stolen. */ struct cds_wfcq_head head; struct cds_wfcq_tail tail; + /* Work belonging to worker. Cannot be stolen. */ + struct urcu_work *own; + struct urcu_wait_node wait_node; /* RCU linked list node of siblings for work stealing. */ struct cds_list_head sibling_node; @@ -133,6 +137,7 @@ void urcu_worker_init(struct urcu_worker *worker, int flags) cds_wfcq_init(&worker->head, &worker->tail); worker->flags = flags; urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING); + worker->own = NULL; } static inline @@ -192,27 +197,80 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, synchronize_rcu(); } -/* - * Try stealing work from siblings when we have nothing to do. - */ static inline -bool ___urcu_steal_work(struct urcu_worker *worker, - struct urcu_worker *sibling) +bool ___urcu_grab_work(struct urcu_worker *worker, + cds_wfcq_head_ptr_t src_head, + struct cds_wfcq_tail *src_tail, + bool steal) { enum cds_wfcq_ret splice_ret; + struct cds_wfcq_head tmp_head; + struct cds_wfcq_tail tmp_tail; + struct cds_wfcq_node *node; /* - * Don't bother grabbing the sibling queue lock if it is empty. + * Don't bother grabbing the src queue lock if it is empty. */ - if (cds_wfcq_empty(&sibling->head, &sibling->tail)) + if (cds_wfcq_empty(src_head, src_tail)) return false; + cds_wfcq_init(&tmp_head, &tmp_tail); + + /* Ensure that we preserve FIFO work order. */ + assert(!steal || worker->own == NULL); + + /* Splice to temporary queue. */ + if (steal) + cds_wfcq_dequeue_lock(src_head.h, src_tail); + splice_ret = __cds_wfcq_splice_blocking(&tmp_head, + &tmp_tail, + src_head, + src_tail); + if (steal) + cds_wfcq_dequeue_unlock(src_head.h, src_tail); + if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY) + return false; + + /* + * Keep one work entry for ourself. This ensures forward + * progress amongst stealing co-workers. This also ensures that + * when a worker grab some work from the global workqueue, it + * will have at least one work item to deal with. + */ + if (worker->own == NULL) { + if (!steal) { + /* + * Try to grab own work from worker workqueue to + * preserve FIFO order. + */ + node = cds_wfcq_dequeue_blocking(&worker->head, + &worker->tail); + if (node) + goto got_node; + } + node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail); + assert(node != NULL); +got_node: + worker->own = caa_container_of(node, struct urcu_work, node); + } + + /* Splice into worker workqueue. */ splice_ret = cds_wfcq_splice_blocking(&worker->head, &worker->tail, - &sibling->head, - &sibling->tail); + &tmp_head, + &tmp_tail); /* Ensure that we preserve FIFO work order. */ - assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY); - return splice_ret != CDS_WFCQ_RET_SRC_EMPTY; + assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY); + return true; +} + +/* + * Try stealing work from siblings when we have nothing to do. + */ +static inline +bool ___urcu_steal_work(struct urcu_worker *worker, + struct urcu_worker *sibling) +{ + return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, 1); } static inline @@ -302,15 +360,11 @@ enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue, struct urcu_worker *worker) { enum cds_wfcq_ret wfcq_ret; + bool has_work; - wfcq_ret = __cds_wfcq_splice_blocking(&worker->head, - &worker->tail, - &queue->head, - &queue->tail); + has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0); /* Don't wait if we have work to do. */ - if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY - || !cds_wfcq_empty(&worker->head, - &worker->tail)) + if (has_work || !cds_wfcq_empty(&worker->head, &worker->tail)) goto do_work; /* Try to steal work from sibling instead of blocking */ if (__urcu_steal_work(queue, worker)) @@ -375,6 +429,14 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) { struct cds_wfcq_node *node; + if (worker->own) { + struct urcu_work *work; + + /* Process our own work entry. */ + work = worker->own; + worker->own = NULL; + return work; + } /* * If we are registered for work stealing, we need to dequeue * safely against siblings.