]> git.lttng.org Git - userspace-rcu.git/commitdiff
workqueue: add approximate upper bound to queue length
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 22:28:09 +0000 (18:28 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 23 Oct 2014 22:28:09 +0000 (18:28 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
tests/benchmark/test_urcu_workqueue.c
urcu/workqueue-fifo.h

index d12d9e8659f2f4d4e27f93626835288bf511c534..4c070861530c08ca2028cd0059abad5144c12d2f 100644 (file)
@@ -60,6 +60,8 @@ static unsigned long duration;
 
 static unsigned long dispatch_delay_loops;
 
+static unsigned long max_queue_len;
+
 static inline void loop_sleep(unsigned long loops)
 {
        while (loops-- != 0)
@@ -153,10 +155,19 @@ static void *thr_dispatcher(void *_count)
 
        for (;;) {
                struct test_work *work = malloc(sizeof(*work));
+               enum urcu_enqueue_ret ret;
+
                if (!work)
                        goto fail;
-               printf_verbose("queue work %p\n", work);
-               urcu_queue_work(&workqueue, &work->w);
+retry:
+               printf_verbose("attempt queue work %p\n", work);
+               ret = urcu_queue_work(&workqueue, &work->w);
+               if (ret == URCU_ENQUEUE_FULL) {
+                       printf_verbose("queue work %p (queue full)\n", work);
+                       (void) poll(NULL, 0, 10);
+                       goto retry;
+               }
+               printf_verbose("queue work %p (ok)\n", work);
                URCU_TLS(nr_enqueues)++;
 
                if (caa_unlikely(dispatch_delay_loops))
@@ -187,8 +198,8 @@ static void *thr_worker(void *_count)
        set_affinity();
 
        rcu_register_thread();
-       urcu_worker_init(&worker, URCU_WORKER_STEAL);
-       //urcu_worker_init(&worker, 0);
+       urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL);
+       //urcu_worker_init(&workqueue, &worker, 0);
        urcu_worker_register(&workqueue, &worker);
 
        while (!test_go)
@@ -199,7 +210,7 @@ static void *thr_worker(void *_count)
        for (;;) {
                enum urcu_accept_ret ret;
 
-               ret = urcu_accept_work(&workqueue, &worker);
+               ret = urcu_accept_work(&worker);
                if (ret == URCU_ACCEPT_SHUTDOWN)
                        break;
                for (;;) {
@@ -239,6 +250,7 @@ static void show_usage(int argc, char **argv)
        printf("        [-v] (verbose output)\n");
        printf("        [-a cpu#] [-a cpu#]... (affinity)\n");
        printf("        [-w] Wait for worker to empty stack\n");
+       printf("        [-m len] (Max queue length. 0 means infinite.))\n");
        printf("\n");
 }
 
@@ -289,6 +301,13 @@ int main(int argc, char **argv)
                        use_affinity = 1;
                        printf_verbose("Adding CPU %d affinity\n", a);
                        break;
+               case 'm':
+                       if (argc < i + 2) {
+                               show_usage(argc, argv);
+                               return -1;
+                       }
+                       max_queue_len = atol(argv[++i]);
+                       break;
                case 'c':
                        if (argc < i + 2) {
                                show_usage(argc, argv);
@@ -326,7 +345,7 @@ int main(int argc, char **argv)
        tid_worker = calloc(nr_workers, sizeof(*tid_worker));
        count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
        count_worker = calloc(nr_workers, sizeof(*count_worker));
-       urcu_workqueue_init(&workqueue);
+       urcu_workqueue_init(&workqueue, max_queue_len);
 
        next_aff = 0;
 
@@ -380,9 +399,9 @@ int main(int argc, char **argv)
 
        printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
                "work_loops %lu nr_workers %3u "
-               "nr_enqueues %12llu nr_dequeues %12llu\n",
+               "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu\n",
                argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
-               nr_workers, tot_enqueues, tot_dequeues);
+               nr_workers, tot_enqueues, tot_dequeues, max_queue_len);
        free(count_dispatcher);
        free(count_worker);
        free(tid_dispatcher);
index a2bbd909fdb1ae3923c8743d155752dceb016802..13d9278a5beca9baa46ea9ef3cc7f3b1981091e0 100644 (file)
@@ -36,6 +36,11 @@ enum urcu_accept_ret {
        URCU_ACCEPT_SHUTDOWN    = 1,
 };
 
+enum urcu_enqueue_ret {
+       URCU_ENQUEUE_OK         = 0,
+       URCU_ENQUEUE_FULL       = 1,
+};
+
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
  * need to be included before this header. All worker that participate
@@ -59,6 +64,9 @@ struct urcu_workqueue {
        struct cds_list_head sibling_head;
        pthread_mutex_t sibling_lock;   /* Protect sibling list updates */
 
+       /* Maximum number of work entries (approximate). 0 means infinite. */
+       unsigned long nr_work_max;
+       unsigned long nr_work;          /* Current number of work items */
        bool shutdown;                  /* Shutdown performed */
 };
 
@@ -73,6 +81,7 @@ struct urcu_worker {
        struct urcu_wait_node wait_node;
        /* RCU linked list node of siblings for work stealing. */
        struct cds_list_head sibling_node;
+       struct urcu_workqueue *queue;
        int flags;      /* enum urcu_worker_flags */
 };
 
@@ -81,20 +90,32 @@ enum urcu_worker_flags {
 };
 
 static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+               unsigned long max_queue_len)
 {
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
        CDS_INIT_LIST_HEAD(&queue->sibling_head);
        pthread_mutex_init(&queue->sibling_lock, NULL);
+       queue->nr_work_max = max_queue_len;
+       queue->nr_work = 0;
        queue->shutdown = false;
 }
 
 static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+               struct urcu_work *work)
 {
        bool was_empty;
-
+       unsigned long nr_work_max;
+
+       nr_work_max = queue->nr_work_max;
+       if (nr_work_max) {
+               /* Approximate max queue size. */
+               if (uatomic_read(&queue->nr_work) >= nr_work_max)
+                       return URCU_ENQUEUE_FULL;
+               uatomic_inc(&queue->nr_work);
+       }
        cds_wfcq_node_init(&work->node);
 
        /* Enqueue work. */
@@ -118,6 +139,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
                rcu_read_unlock();      /* Protect stack dequeue */
        }
+       return URCU_ENQUEUE_OK;
 }
 
 static inline
@@ -133,13 +155,15 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 }
 
 static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+               struct urcu_worker *worker, int flags)
 {
        cds_wfcq_init(&worker->head, &worker->tail);
        worker->flags = flags;
        urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
        worker->own = NULL;
        worker->wait_node.node.next = NULL;
+       worker->queue = queue;
 }
 
 static inline
@@ -358,9 +382,9 @@ end:
 }
 
 static inline
-enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
-               struct urcu_worker *worker)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
 {
+       struct urcu_workqueue *queue = worker->queue;
        enum cds_wfcq_ret wfcq_ret;
        bool has_work;
 
@@ -430,15 +454,15 @@ do_work:
 static inline
 struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
 {
+       struct urcu_workqueue *queue = worker->queue;
        struct cds_wfcq_node *node;
+       struct urcu_work *work;
 
        if (worker->own) {
-               struct urcu_work *work;
-
                /* Process our own work entry. */
                work = worker->own;
                worker->own = NULL;
-               return work;
+               goto end;
        }
        /*
         * If we are registered for work stealing, we need to dequeue
@@ -459,7 +483,11 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
        }
        if (!node)
                return NULL;
-       return caa_container_of(node, struct urcu_work, node);
+       work = caa_container_of(node, struct urcu_work, node);
+end:
+       if (queue->nr_work_max)
+               uatomic_dec(&queue->nr_work);
+       return work;
 }
 
 static inline
This page took 0.033431 seconds and 4 git commands to generate.