static unsigned long dispatch_delay_loops;
+static unsigned long max_queue_len;
+
static inline void loop_sleep(unsigned long loops)
{
while (loops-- != 0)
for (;;) {
struct test_work *work = malloc(sizeof(*work));
+ enum urcu_enqueue_ret ret;
+
if (!work)
goto fail;
- printf_verbose("queue work %p\n", work);
- urcu_queue_work(&workqueue, &work->w);
+retry:
+ printf_verbose("attempt queue work %p\n", work);
+ ret = urcu_queue_work(&workqueue, &work->w);
+ if (ret == URCU_ENQUEUE_FULL) {
+ printf_verbose("queue work %p (queue full)\n", work);
+ (void) poll(NULL, 0, 10);
+ goto retry;
+ }
+ printf_verbose("queue work %p (ok)\n", work);
URCU_TLS(nr_enqueues)++;
if (caa_unlikely(dispatch_delay_loops))
set_affinity();
rcu_register_thread();
- urcu_worker_init(&worker, URCU_WORKER_STEAL);
- //urcu_worker_init(&worker, 0);
+ urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL);
+ //urcu_worker_init(&workqueue, &worker, 0);
urcu_worker_register(&workqueue, &worker);
while (!test_go)
for (;;) {
enum urcu_accept_ret ret;
- ret = urcu_accept_work(&workqueue, &worker);
+ ret = urcu_accept_work(&worker);
if (ret == URCU_ACCEPT_SHUTDOWN)
break;
for (;;) {
printf(" [-v] (verbose output)\n");
printf(" [-a cpu#] [-a cpu#]... (affinity)\n");
printf(" [-w] Wait for worker to empty stack\n");
+ printf(" [-m len] (Max queue length. 0 means infinite.))\n");
printf("\n");
}
use_affinity = 1;
printf_verbose("Adding CPU %d affinity\n", a);
break;
+ case 'm':
+ if (argc < i + 2) {
+ show_usage(argc, argv);
+ return -1;
+ }
+ max_queue_len = atol(argv[++i]);
+ break;
case 'c':
if (argc < i + 2) {
show_usage(argc, argv);
tid_worker = calloc(nr_workers, sizeof(*tid_worker));
count_dispatcher = calloc(nr_dispatchers, sizeof(*count_dispatcher));
count_worker = calloc(nr_workers, sizeof(*count_worker));
- urcu_workqueue_init(&workqueue);
+ urcu_workqueue_init(&workqueue, max_queue_len);
next_aff = 0;
printf("SUMMARY %-25s testdur %4lu nr_dispatchers %3u dispatch_delay_loops %6lu "
"work_loops %lu nr_workers %3u "
- "nr_enqueues %12llu nr_dequeues %12llu\n",
+ "nr_enqueues %12llu nr_dequeues %12llu max_queue_len %lu\n",
argv[0], duration, nr_dispatchers, dispatch_delay_loops, work_loops,
- nr_workers, tot_enqueues, tot_dequeues);
+ nr_workers, tot_enqueues, tot_dequeues, max_queue_len);
free(count_dispatcher);
free(count_worker);
free(tid_dispatcher);
URCU_ACCEPT_SHUTDOWN = 1,
};
+enum urcu_enqueue_ret {
+ URCU_ENQUEUE_OK = 0,
+ URCU_ENQUEUE_FULL = 1,
+};
+
/*
* We use RCU to steal work from siblings. Therefore, one of the RCU
* flavors needs to be included before this header. All workers that participate
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
+ /* Maximum number of work entries (approximate). 0 means infinite. */
+ unsigned long nr_work_max;
+ unsigned long nr_work; /* Current number of work items */
bool shutdown; /* Shutdown performed */
};
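
Note: nr_work is an approximate occupancy count, not a hard bound. For
illustration only, a hypothetical helper (not part of this patch) that
mirrors the capacity test urcu_queue_work() performs below:

	static inline
	bool urcu_workqueue_is_full(struct urcu_workqueue *queue)
	{
		unsigned long max = queue->nr_work_max;

		/* nr_work_max == 0 means infinite: never full. */
		return max && uatomic_read(&queue->nr_work) >= max;
	}
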
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
+ struct urcu_workqueue *queue; /* Workqueue this worker belongs to */
int flags; /* enum urcu_worker_flags */
};
};
static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+ unsigned long max_queue_len)
{
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
pthread_mutex_init(&queue->sibling_lock, NULL);
+ queue->nr_work_max = max_queue_len;
+ queue->nr_work = 0;
queue->shutdown = false;
}
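
A usage sketch under the new signature (the length value is illustrative):
one bounded and one unbounded queue.

	struct urcu_workqueue bounded, unbounded;

	urcu_workqueue_init(&bounded, 4096);	/* rejects work once ~4096 entries are queued */
	urcu_workqueue_init(&unbounded, 0);	/* 0 keeps the previous unbounded behavior */
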
static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+ struct urcu_work *work)
{
bool was_empty;
-
+ unsigned long nr_work_max;
+
+ nr_work_max = queue->nr_work_max;
+ if (nr_work_max) {
+ /*
+ * Approximate bound: the read and the increment are two
+ * separate atomic operations, so concurrent enqueuers can
+ * transiently push the queue slightly above nr_work_max.
+ */
+ if (uatomic_read(&queue->nr_work) >= nr_work_max)
+ return URCU_ENQUEUE_FULL;
+ uatomic_inc(&queue->nr_work);
+ }
cds_wfcq_node_init(&work->node);
/* Enqueue work. */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
rcu_read_unlock(); /* Protect stack dequeue */
}
+ return URCU_ENQUEUE_OK;
}
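
The dispatcher in the test above handles URCU_ENQUEUE_FULL by polling and
retrying; a caller that prefers to shed load can drop the item instead. A
minimal sketch, reusing struct test_work from the test code:

	struct test_work *work = malloc(sizeof(*work));

	if (work && urcu_queue_work(&workqueue, &work->w) == URCU_ENQUEUE_FULL) {
		/* Queue at capacity: drop this item rather than block. */
		free(work);
	}
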
static inline
}
static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+ struct urcu_worker *worker, int flags)
{
cds_wfcq_init(&worker->head, &worker->tail);
worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
worker->own = NULL;
worker->wait_node.node.next = NULL;
+ worker->queue = queue;
}
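
Worker-thread setup under the new three-argument init, mirroring the test
code above (sketch):

	rcu_register_thread();
	urcu_worker_init(&workqueue, &worker, URCU_WORKER_STEAL);
	urcu_worker_register(&workqueue, &worker);
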
static inline
}
static inline
-enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
- struct urcu_worker *worker)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
enum cds_wfcq_ret wfcq_ret;
bool has_work;
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
struct cds_wfcq_node *node;
+ struct urcu_work *work;
if (worker->own) {
- struct urcu_work *work;
-
/* Process our own work entry. */
work = worker->own;
worker->own = NULL;
- return work;
+ goto end;
}
/*
* If we are registered for work stealing, we need to dequeue
}
if (!node)
return NULL;
- return caa_container_of(node, struct urcu_work, node);
+ work = caa_container_of(node, struct urcu_work, node);
+end:
+ if (queue->nr_work_max)
+ uatomic_dec(&queue->nr_work);
+ return work;
}
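
Putting accept and dequeue together, a sketch of a worker loop under the
single-argument urcu_accept_work() (do_work() is a hypothetical per-item
handler):

	for (;;) {
		struct urcu_work *work;

		if (urcu_accept_work(&worker) == URCU_ACCEPT_SHUTDOWN)
			break;
		while ((work = urcu_dequeue_work(&worker)) != NULL)
			do_work(work);	/* hypothetical handler */
	}
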
static inline