From: Mathieu Desnoyers Date: Thu, 23 Oct 2014 22:28:09 +0000 (-0400) Subject: workqueue: add approximate upper bound to queue length X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=8a2c74fefc366e219bc09c4a4ebd7bacd5cb83e5;p=urcu.git workqueue: add approximate upper bound to queue length Signed-off-by: Mathieu Desnoyers --- diff --git a/tests/benchmark/test_urcu_workqueue.c b/tests/benchmark/test_urcu_workqueue.c index d12d9e8..4c07086 100644 --- a/tests/benchmark/test_urcu_workqueue.c +++ b/tests/benchmark/test_urcu_workqueue.c @@ -60,6 +60,8 @@ static unsigned long duration; static unsigned long dispatch_delay_loops; +static unsigned long max_queue_len; + static inline void loop_sleep(unsigned long loops) { while (loops-- != 0) @@ -153,10 +155,19 @@ static void *thr_dispatcher(void *_count) for (;;) { struct test_work *work = malloc(sizeof(*work)); + enum urcu_enqueue_ret ret; + if (!work) goto fail; - printf_verbose("queue work %p\n", work); - urcu_queue_work(&workqueue, &work->w); +retry: + printf_verbose("attempt queue work %p\n", work); + ret = urcu_queue_work(&workqueue, &work->w); + if (ret == URCU_ENQUEUE_FULL) { + printf_verbose("queue work %p (queue full)\n", work); + (void) poll(NULL, 0, 10); + goto retry; + } + printf_verbose("queue work %p (ok)\n", work); URCU_TLS(nr_enqueues)++; if (caa_unlikely(dispatch_delay_loops)) @@ -187,8 +198,8 @@ static void *thr_worker(void *_count) set_affinity(); rcu_register_thread(); - urcu_worker_init(&worker, URCU_WORKER_STEAL); - //urcu_worker_init(&worker, 0); + urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL); + //urcu_worker_init(&workqueue, &worker, 0); urcu_worker_register(&workqueue, &worker); while (!test_go) @@ -199,7 +210,7 @@ static void *thr_worker(void *_count) for (;;) { enum urcu_accept_ret ret; - ret = urcu_accept_work(&workqueue, &worker); + ret = urcu_accept_work(&worker); if (ret == URCU_ACCEPT_SHUTDOWN) break; for (;;) { @@ -239,6 +250,7 @@ static void show_usage(int argc, char **argv) printf(" [-v] (verbose output)\n"); printf(" [-a cpu#] [-a cpu#]... (affinity)\n"); printf(" [-w] Wait for worker to empty stack\n"); + printf(" [-m len] (Max queue length. 0 means infinite.))\n"); printf("\n"); } @@ -289,6 +301,13 @@ int main(int argc, char **argv) use_affinity = 1; printf_verbose("Adding CPU %d affinity\n", a); break; + case 'm': + if (argc < i + 2) { + show_usage(argc, argv); + return -1; + } + max_queue_len = atol(argv[++i]); + break; case 'c': if (argc < i + 2) { show_usage(argc, argv); @@ -326,7 +345,7 @@ int main(int argc, char **argv) tid_worker = calloc(nr_workers, sizeof(*tid_worker)); count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher)); count_worker = calloc(nr_workers, sizeof(*count_worker)); - urcu_workqueue_init(&workqueue); + urcu_workqueue_init(&workqueue, max_queue_len); next_aff = 0; @@ -380,9 +399,9 @@ int main(int argc, char **argv) printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu " "work_loops %lu nr_workers %3u " - "nr_enqueues %12llu nr_dequeues %12llu\n", + "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu\n", argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops, - nr_workers, tot_enqueues, tot_dequeues); + nr_workers, tot_enqueues, tot_dequeues, max_queue_len); free(count_dispatcher); free(count_worker); free(tid_dispatcher); diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index a2bbd90..13d9278 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -36,6 +36,11 @@ enum urcu_accept_ret { URCU_ACCEPT_SHUTDOWN = 1, }; +enum urcu_enqueue_ret { + URCU_ENQUEUE_OK = 0, + URCU_ENQUEUE_FULL = 1, +}; + /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors * need to be included before this header. All worker that participate @@ -59,6 +64,9 @@ struct urcu_workqueue { struct cds_list_head sibling_head; pthread_mutex_t sibling_lock; /* Protect sibling list updates */ + /* Maximum number of work entries (approximate). 0 means infinite. */ + unsigned long nr_work_max; + unsigned long nr_work; /* Current number of work items */ bool shutdown; /* Shutdown performed */ }; @@ -73,6 +81,7 @@ struct urcu_worker { struct urcu_wait_node wait_node; /* RCU linked list node of siblings for work stealing. */ struct cds_list_head sibling_node; + struct urcu_workqueue *queue; int flags; /* enum urcu_worker_flags */ }; @@ -81,20 +90,32 @@ enum urcu_worker_flags { }; static inline -void urcu_workqueue_init(struct urcu_workqueue *queue) +void urcu_workqueue_init(struct urcu_workqueue *queue, + unsigned long max_queue_len) { __cds_wfcq_init(&queue->head, &queue->tail); urcu_wait_queue_init(&queue->waitqueue); CDS_INIT_LIST_HEAD(&queue->sibling_head); pthread_mutex_init(&queue->sibling_lock, NULL); + queue->nr_work_max = max_queue_len; + queue->nr_work = 0; queue->shutdown = false; } static inline -void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) +enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue, + struct urcu_work *work) { bool was_empty; - + unsigned long nr_work_max; + + nr_work_max = queue->nr_work_max; + if (nr_work_max) { + /* Approximate max queue size. */ + if (uatomic_read(&queue->nr_work) >= nr_work_max) + return URCU_ENQUEUE_FULL; + uatomic_inc(&queue->nr_work); + } cds_wfcq_node_init(&work->node); /* Enqueue work. */ @@ -118,6 +139,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) (void) urcu_dequeue_wake_single(&queue->waitqueue); rcu_read_unlock(); /* Protect stack dequeue */ } + return URCU_ENQUEUE_OK; } static inline @@ -133,13 +155,15 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) } static inline -void urcu_worker_init(struct urcu_worker *worker, int flags) +void urcu_worker_init(struct urcu_workqueue *queue, + struct urcu_worker *worker, int flags) { cds_wfcq_init(&worker->head, &worker->tail); worker->flags = flags; urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING); worker->own = NULL; worker->wait_node.node.next = NULL; + worker->queue = queue; } static inline @@ -358,9 +382,9 @@ end: } static inline -enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue, - struct urcu_worker *worker) +enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker) { + struct urcu_workqueue *queue = worker->queue; enum cds_wfcq_ret wfcq_ret; bool has_work; @@ -430,15 +454,15 @@ do_work: static inline struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) { + struct urcu_workqueue *queue = worker->queue; struct cds_wfcq_node *node; + struct urcu_work *work; if (worker->own) { - struct urcu_work *work; - /* Process our own work entry. */ work = worker->own; worker->own = NULL; - return work; + goto end; } /* * If we are registered for work stealing, we need to dequeue @@ -459,7 +483,11 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) } if (!node) return NULL; - return caa_container_of(node, struct urcu_work, node); + work = caa_container_of(node, struct urcu_work, node); +end: + if (queue->nr_work_max) + uatomic_dec(&queue->nr_work); + return work; } static inline