From: Mathieu Desnoyers Date: Fri, 6 Feb 2009 00:06:44 +0000 (-0500) Subject: init version X-Git-Tag: v0.1~334 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=27b012e271a82b9a0d94543688904f207cd154ea;p=userspace-rcu.git init version --- 27b012e271a82b9a0d94543688904f207cd154ea diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1f53f00 --- /dev/null +++ b/Makefile @@ -0,0 +1,6 @@ + +test_urcu: urcu.o test_urcu.c + gcc -g -o test_urcu urcu.o test_urcu.c + +urcu.o: urcu.c urcu.h + gcc -g -o urcu.o urcu.c diff --git a/urcu.c b/urcu.c new file mode 100644 index 0000000..b8c80ab --- /dev/null +++ b/urcu.c @@ -0,0 +1,250 @@ +#include +#include +#include +#include + +#include "urcu.h" + +pthread_mutex_t urcu_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* Global quiescent period parity */ +int urcu_qparity; + +int __thread urcu_active_readers[2]; + +/* Thread IDs of registered readers */ +#define INIT_NUM_THREADS 4 + +struct reader_data { + pthread_t tid; + int **urcu_active_readers; +}; + +static struct reader_data *reader_data; +static int num_readers, alloc_readers; +static int sig_done; + +/* + * called with urcu_mutex held. + */ +static int switch_next_urcu_qparity(void) +{ + int old_parity = urcu_qparity; + urcu_qparity = 1 - old_parity; + return old_parity; +} + +static void force_mb_all_threads(void) +{ + pthread_t *index; + /* + * Ask for each threads to execute a mb() so we can consider the + * compiler barriers around rcu read lock as real memory barriers. + */ + if (!reader_data) + return; + sigtask = TASK_FORCE_MB; + sig_done = 0; + mb(); /* write sig_done and sigtask before sending the signals */ + for (index = reader_data; index < reader_data + num_readers; index++) + pthread_kill(*index, SIGURCU); + /* + * Wait for sighandler (and thus mb()) to execute on every thread. + * BUSY-LOOP. + */ + while (sig_done < num_readers) + barrier(); + mb(); /* read sig_done before writing sigtask */ + sigtask = TASK_NONE; +} + +void wait_for_quiescent_state(int parity) +{ + + if (!reader_data) + return; + /* Wait for each thread urcu_active_readers count to become 0. + */ + for (index = readers_data; index < reader_data + num_readers; index++) { + /* + * BUSY-LOOP. + */ + while (*index->urcu_active_readers != 0) + barrier(); + } + /* + * Locally : read *index->urcu_active_readers before freeing old + * pointer. + * Remote (reader threads) : Order urcu_qparity update and other + * thread's quiescent state counter read. + */ + force_mb_all_threads(); +} + +/* + * Return old pointer, OK to free, no more reference exist. + */ +void *urcu_publish_content(void **ptr, void *new) +{ + int ret, prev_parity; + void *oldptr; + + ret = pthread_mutex_lock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex lock", __func__); + exit(-1); + } + + /* + * We can publish the new pointer before we change the current qparity. + * Readers seeing the new pointer while being in the previous qparity + * window will make us wait until the end of the quiescent state before + * we release the unrelated memory area. However, given we hold the + * urcu_mutex, we are making sure that no further garbage collection can + * occur until we release the mutex, therefore we guarantee that this + * given reader will have completed its execution using the new pointer + * when the next quiescent state window will be over. + */ + oldptr = *ptr; + *ptr = new; + wmb(); /* Write ptr before changing the qparity */ + /* All threads should read qparity before ptr */ + force_rmb_all_threads(); + prev_parity = switch_next_urcu_qparity(); + + /* + * Wait for previous parity to be empty of readers. + */ + wait_for_quiescent_state(prev_parity); + /* + * Deleting old data is ok ! + */ + + ret = pthread_mutex_unlock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex lock", __func__); + exit(-1); + } + return oldptr; +} + +void urcu_add_reader(pthread_t id) +{ + if (!reader_data) { + alloc_readers = INIT_NUM_THREADS; + num_readers = 1; + reader_data = + malloc(sizeof(struct reader_data) * alloc_readers); + return; + } + if (alloc_readers < num_readers + 1) { + pthread_t *oldarray; + oldarray = reader_data; + reader_data = malloc(sizeof(struct reader_data) + * (alloc_readers << 1)); + memcpy(reader_data, oldarray, + sizeof(struct reader_data) * alloc_readers); + alloc_readers <<= 1; + free(oldarray); + } + reader_data[num_readers].tid = id; + /* reference to the TLS of _this_ reader thread. */ + reader_data[num_readers].urcu_active_readers = urcu_active_readers; + num_readers++; +} + +/* + * Never shrink (implementation limitation). + * This is O(nb threads). Eventually use a hash table. + */ +void urcu_remove_reader(pthread_t id) +{ + struct reader_data *index; + + assert(reader_data != NULL); + for (index = reader_data; index < reader_data + num_readers; index++) { + if (index->tid == id) { + memcpy(index, &reader_data[num_readers - 1], + sizeof(struct reader_data)); + reader_data[num_readers - 1].tid = 0; + reader_data[num_readers - 1].urcu_active_readers = NULL; + num_readers--; + return; + } + } + /* Hrm not found, forgot to register ? */ + assert(0); +} + +void urcu_register_thread(void) +{ + pthread_t self = pthread_self(); + + ret = pthread_mutex_lock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex lock", __func__); + exit(-1); + } + + urcu_add_reader(self); + + + ret = pthread_mutex_unlock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex unlock", __func__); + exit(-1); + } +} + +void urcu_register_thread(void) +{ + pthread_t self = pthread_self(); + + ret = pthread_mutex_lock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex lock", __func__); + exit(-1); + } + + urcu_remove_reader(self); + + ret = pthread_mutex_unlock(&urcu_mutex); + if (ret) { + perror("Error in %s pthread mutex unlock", __func__); + exit(-1); + } + +} + +void handler(int signo, siginfo_t *siginfo, void *context) +{ + mb(); + atomic_inc(&sig_done); +} + +void __attribute__((constructor)) urcu_init(void) +{ + struct sigaction act; + int ret; + + act.sa_sigaction = sigurcu_handler; + ret = sigaction(SIGURCU, &act, NULL); + if (!ret) { + perror("Error in %s sigaction", __func__); + exit(-1); + } +} + +void __attribute__((destructor)) urcu_exit(void) +{ + struct sigaction act; + int ret; + + ret = sigaction(SIGURCU, NULL, &act); + if (!ret) { + perror("Error in %s sigaction", __func__); + exit(-1); + } + assert(act.sa_sigaction == sigurcu_handler); + free(reader_data); +} diff --git a/urcu.h b/urcu.h new file mode 100644 index 0000000..d417f34 --- /dev/null +++ b/urcu.h @@ -0,0 +1,69 @@ +#ifndef _URCU_H +#define _URCU_H + +/* The "volatile" is due to gcc bugs */ +#define barrier() __asm__ __volatile__("": : :"memory") + +/* x86 32/64 specific */ +#define mb() asm volatile("mfence":::"memory") +#define rmb() asm volatile("lfence":::"memory") +#define wmb() asm volatile("sfence" ::: "memory") + + + +/* x86 32 */ +static inline void atomic_inc(int *v) +{ + asm volatile("lock; incl %0" + : "+m" (v->counter)); +} + +/* Nop everywhere except on alpha. */ +#define smp_read_barrier_depends() + +#define SIGURCU SIGUSR1 + +/* Global quiescent period parity */ +extern int urcu_qparity; + +extern int __thread urcu_active_readers[2]; + +static inline int get_urcu_qparity(void) +{ + return urcu_qparity; +} + +/* + * returns urcu_parity. + */ +static inline int rcu_read_lock(void) +{ + int urcu_parity = get_urcu_qparity(); + urcu_active_readers[urcu_parity]++; + /* + * Increment active readers count before accessing the pointer. + * See force_mb_all_threads(). + */ + barrier(); + return urcu_parity; +} + +static inline void rcu_read_unlock(int urcu_parity) +{ + barrier(); + /* + * Finish using rcu before decrementing the pointer. + * See force_mb_all_threads(). + */ + urcu_active_readers[urcu_parity]--; +} + +extern void *urcu_publish_content(void **ptr, void *new); + +/* + * Reader thread registration. + */ +extern void urcu_register_thread(void); +extern void urcu_register_thread(void); + +#endif /* _URCU_H */