#include <urcu/wfstack.h>
#include <urcu/workqueue-fifo.h>
-static volatile int test_go, test_stop_enqueue, test_stop_dequeue;
+static volatile int test_go, test_stop_enqueue;
static unsigned long work_loops;
/*
* returns 0 if test should end.
*/
-static int test_duration_dequeue(void)
-{
- return !test_stop_dequeue;
-}
-
static int test_duration_enqueue(void)
{
return !test_stop_enqueue;
unsigned long long *count = _count;
unsigned int counter = 0;
struct urcu_worker worker;
- int blocking = 1;
printf_verbose("thread_begin %s, tid %lu\n",
"worker", urcu_get_thread_id());
cmm_smp_mb();
for (;;) {
- int batch_work_count = 0;
+ enum urcu_accept_ret ret;
- urcu_accept_work(&workqueue, &worker, blocking);
+ ret = urcu_accept_work(&workqueue, &worker);
+ if (ret == URCU_ACCEPT_SHUTDOWN)
+ break;
for (;;) {
struct urcu_work *work;
struct test_work *t;
break;
t = caa_container_of(work, struct test_work, w);
printf_verbose("dequeue work %p\n", t);
- batch_work_count++;
URCU_TLS(nr_dequeues)++;
if (caa_unlikely(work_loops))
loop_sleep(work_loops);
free(t);
}
- if (!test_duration_dequeue())
- blocking = 0;
- if (caa_unlikely(!test_duration_dequeue()
- && !batch_work_count))
- break;
}
end:
urcu_worker_unregister(&workqueue, &worker);
sleep(1);
}
}
- test_stop_dequeue = 1;
-
- /* Send finish to all workers */
- urcu_workqueue_wakeup_all(&workqueue);
+ urcu_workqueue_shutdown(&workqueue);
for (i = 0; i < nr_dispatchers; i++) {
err = pthread_join(tid_dispatcher[i], &tret);
#include <pthread.h>
#include <assert.h>
+enum urcu_accept_ret {
+ URCU_ACCEPT_WORK = 0,
+ URCU_ACCEPT_SHUTDOWN = 1,
+};
+
/*
* We use RCU to steal work from siblings. Therefore, one of RCU flavors
* need to be included before this header. All worker that participate
/* RCU linked list head of siblings for work stealing. */
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
+
+ bool shutdown; /* Shutdown performed */
};
struct urcu_worker {
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
+ queue->shutdown = false;
}
static inline
}
static inline
-void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
+void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
struct urcu_waiters waiters;
}
static inline
-void urcu_accept_work(struct urcu_workqueue *queue,
- struct urcu_worker *worker,
- int blocking)
+enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
+ struct urcu_worker *worker)
{
enum cds_wfcq_ret wfcq_ret;
/* Try to steal work from sibling instead of blocking */
if (__urcu_steal_work(queue, worker))
goto do_work;
- if (!blocking)
- return;
+ /* No more work to do, check shutdown state */
+ if (CMM_LOAD_SHARED(queue->shutdown))
+ return URCU_ACCEPT_SHUTDOWN;
urcu_wait_set_state(&worker->wait_node,
URCU_WAIT_WAITING);
if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
* they can steal from us.
*/
(void) __urcu_wakeup_siblings(queue, worker);
+ return URCU_ACCEPT_WORK;
}
static inline
return caa_container_of(node, struct urcu_work, node);
}
+static inline
+void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
+{
+ /* Set shutdown */
+ CMM_STORE_SHARED(queue->shutdown, true);
+ /* Wakeup all workers */
+ __urcu_workqueue_wakeup_all(queue);
+}
+
#endif /* _URCU_WORKQUEUE_FIFO_H */