From: Mathieu Desnoyers Date: Tue, 20 Sep 2011 20:46:22 +0000 (-0400) Subject: rculfhash: parallelize resize X-Git-Tag: v0.7.0~43^2~141 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=b7d619b0a4d1613664285e3986b930a05c131c70;p=urcu.git rculfhash: parallelize resize Signed-off-by: Mathieu Desnoyers --- diff --git a/rculfhash.c b/rculfhash.c index ce1cbb7..42501a1 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -177,6 +177,11 @@ #define MAX_TABLE_ORDER 64 #endif +/* + * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink. + */ +#define MIN_PARTITION_PER_THREAD 4096 + #ifndef min #define min(a, b) ((a) < (b) ? (a) : (b)) #endif @@ -235,6 +240,9 @@ struct cds_lfht { void (*cds_lfht_rcu_read_unlock)(void); void (*cds_lfht_rcu_thread_offline)(void); void (*cds_lfht_rcu_thread_online)(void); + void (*cds_lfht_rcu_register_thread)(void); + void (*cds_lfht_rcu_unregister_thread)(void); + pthread_attr_t *resize_attr; /* Resize threads attributes */ unsigned long count; /* global approximate item count */ struct ht_items_count *percpu_count; /* per-cpu item count */ }; @@ -244,6 +252,14 @@ struct rcu_resize_work { struct cds_lfht *ht; }; +struct partition_resize_work { + struct rcu_head head; + struct cds_lfht *ht; + unsigned long i, start, len; + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len); +}; + static struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, unsigned long size, @@ -879,25 +895,70 @@ end: return -ENOENT; } +static +void *partition_resize_thread(void *arg) +{ + struct partition_resize_work *work = arg; + + work->ht->cds_lfht_rcu_register_thread(); + work->fct(work->ht, work->i, work->start, work->len); + work->ht->cds_lfht_rcu_unregister_thread(); + return NULL; +} + +static +void partition_resize_helper(struct cds_lfht *ht, unsigned long i, + unsigned long len, + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len)) +{ + unsigned long partition_len; + struct partition_resize_work *work; + int cpu, ret; + pthread_t *thread_id; + + /* Note: nr_cpus_mask + 1 is always power of 2 */ + partition_len = len >> get_count_order_ulong(nr_cpus_mask + 1); + work = calloc(nr_cpus_mask + 1, sizeof(*work)); + thread_id = calloc(nr_cpus_mask + 1, sizeof(*thread_id)); + assert(work); + for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) { + work[cpu].ht = ht; + work[cpu].i = i; + work[cpu].len = partition_len; + work[cpu].start = cpu * partition_len; + work[cpu].fct = fct; + ret = pthread_create(&thread_id[cpu], ht->resize_attr, + partition_resize_thread, &work[cpu]); + assert(!ret); + } + for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) { + ret = pthread_join(thread_id[cpu], NULL); + assert(!ret); + } + free(work); + free(thread_id); +} + /* * Holding RCU read lock to protect _cds_lfht_add against memory * reclaim that could be performed by other call_rcu worker threads (ABA * problem). * - * TODO: when we reach a certain length, we can split this population phase over + * When we reach a certain length, we can split this population phase over * many worker threads, based on the number of CPUs available in the system. * This should therefore take care of not having the expand lagging behind too * many concurrent insertion threads by using the scheduler's ability to * schedule dummy node population fairly with insertions. */ static -void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len) +void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *new_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; @@ -911,7 +972,20 @@ void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); +} + +static +void init_table_populate(struct cds_lfht *ht, unsigned long i, + unsigned long len) +{ + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + init_table_populate_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, init_table_populate_partition); } static @@ -935,6 +1009,7 @@ void init_table(struct cds_lfht *ht, ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level) + (len * sizeof(struct _cds_lfht_node))); + assert(ht->t.tbl[i]); /* * Set all dummy nodes reverse hash values for a level and @@ -974,19 +1049,19 @@ void init_table(struct cds_lfht *ht, * Logical removal and garbage collection can therefore be done in batch or on a * node-per-node basis, as long as the guarantee above holds. * - * TODO: when we reach a certain length, we can split this removal over many - * worker threads, based on the number of CPUs available in the system. This - * should take care of not letting resize process lag behind too many concurrent + * When we reach a certain length, we can split this removal over many worker + * threads, based on the number of CPUs available in the system. This should + * take care of not letting resize process lag behind too many concurrent * updater threads actively inserting into the hash table. */ static -void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +void remove_table_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *fini_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; @@ -1000,7 +1075,20 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); +} + +static +void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +{ + + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + remove_table_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, remove_table_partition); } static @@ -1061,7 +1149,10 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, void (*cds_lfht_rcu_read_lock)(void), void (*cds_lfht_rcu_read_unlock)(void), void (*cds_lfht_rcu_thread_offline)(void), - void (*cds_lfht_rcu_thread_online)(void)) + void (*cds_lfht_rcu_thread_online)(void), + void (*cds_lfht_rcu_register_thread)(void), + void (*cds_lfht_rcu_unregister_thread)(void), + pthread_attr_t *attr) { struct cds_lfht *ht; unsigned long order; @@ -1070,6 +1161,7 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, if (init_size && (init_size & (init_size - 1))) return NULL; ht = calloc(1, sizeof(struct cds_lfht)); + assert(ht); ht->hash_fct = hash_fct; ht->compare_fct = compare_fct; ht->hash_seed = hash_seed; @@ -1079,6 +1171,9 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock; ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline; ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online; + ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread; + ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread; + ht->resize_attr = attr; ht->percpu_count = alloc_per_cpu_items_count(); /* this mutex should not nest in read-side C.S. */ pthread_mutex_init(&ht->resize_mutex, NULL); @@ -1249,7 +1344,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht) * Should only be called when no more concurrent readers nor writers can * possibly access the table. */ -int cds_lfht_destroy(struct cds_lfht *ht) +int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { int ret; @@ -1261,6 +1356,8 @@ int cds_lfht_destroy(struct cds_lfht *ht) if (ret) return ret; free_per_cpu_items_count(ht->percpu_count); + if (attr) + *attr = ht->resize_attr; poison_free(ht); return ret; } diff --git a/tests/test_urcu_hash.c b/tests/test_urcu_hash.c index 273283e..238b8e5 100644 --- a/tests/test_urcu_hash.c +++ b/tests/test_urcu_hash.c @@ -750,7 +750,7 @@ int main(int argc, char **argv) count_writer = malloc(sizeof(*count_writer) * nr_writers); test_ht = cds_lfht_new(test_hash, test_compare, 0x42UL, init_hash_size, - opt_auto_resize ? CDS_LFHT_AUTO_RESIZE : 0); + opt_auto_resize ? CDS_LFHT_AUTO_RESIZE : 0, NULL); ret = populate_hash(); assert(!ret); err = create_all_cpu_call_rcu_data(0); @@ -804,7 +804,7 @@ int main(int argc, char **argv) if (count || removed) printf("WARNING: nodes left in the hash table upon destroy: " "%lu nodes + %lu logically removed.\n", count, removed); - ret = cds_lfht_destroy(test_ht); + ret = cds_lfht_destroy(test_ht, NULL); if (ret) printf_verbose("final delete aborted\n"); diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h index bc9f232..4f7cc52 100644 --- a/urcu/rculfhash.h +++ b/urcu/rculfhash.h @@ -95,7 +95,10 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, void (*cds_lfht_rcu_read_lock)(void), void (*cds_lfht_rcu_read_unlock)(void), void (*cds_lfht_rcu_thread_offline)(void), - void (*cds_lfht_rcu_thread_online)(void)); + void (*cds_lfht_rcu_thread_online)(void), + void (*cds_lfht_rcu_register_thread)(void), + void (*cds_lfht_rcu_unregister_thread)(void), + pthread_attr_t *attr); /* * cds_lfht_new - allocate a hash table. @@ -106,30 +109,45 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, * @flags: hash table creation flags (can be combined with bitwise or: '|'). * 0: no flags. * CDS_LFHT_AUTO_RESIZE: automatically resize hash table. + * @attr: optional resize worker thread attributes. NULL for default. * * Return NULL on error. * Note: the RCU flavor must be already included before the hash table header. + * + * The programmer is responsible for ensuring that resize operation has a + * priority equal to hash table updater threads. It should be performed by + * specifying the appropriate priority in the pthread "attr" argument, and, + * for CDS_LFHT_AUTO_RESIZE, by ensuring that call_rcu worker threads also have + * this priority level. Having lower priority for call_rcu and resize threads + * does not pose any correctness issue, but the resize operations could be + * starved by updates, thus leading to long hash table bucket chains. */ static inline struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, cds_lfht_compare_fct compare_fct, unsigned long hash_seed, unsigned long init_size, - int flags) + int flags, + pthread_attr_t *attr) { return _cds_lfht_new(hash_fct, compare_fct, hash_seed, init_size, flags, call_rcu, synchronize_rcu, rcu_read_lock, rcu_read_unlock, rcu_thread_offline, - rcu_thread_online); + rcu_thread_online, rcu_register_thread, + rcu_unregister_thread, attr); } /* * cds_lfht_destroy - destroy a hash table. + * @ht: the hash table to destroy. + * @attr: (output) resize worker thread attributes, as received by cds_lfht_new. + * The caller will typically want to free this pointer if dynamically + * allocated. * * Return 0 on success, negative error value on error. */ -int cds_lfht_destroy(struct cds_lfht *ht); +int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr); /* * cds_lfht_count_nodes - count the number of nodes in the hash table.