#define MAX_TABLE_ORDER 64
#endif
+/*
+ * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink.
+ * For instance, on an 8-CPU system, a grow/shrink operation is parallelized
+ * only when it covers at least 8 * 4096 = 32768 dummy nodes.
+ */
+#define MIN_PARTITION_PER_THREAD 4096
+
#ifndef min
#define min(a, b) ((a) < (b) ? (a) : (b))
#endif
void (*cds_lfht_rcu_read_unlock)(void);
void (*cds_lfht_rcu_thread_offline)(void);
void (*cds_lfht_rcu_thread_online)(void);
+ void (*cds_lfht_rcu_register_thread)(void);
+ void (*cds_lfht_rcu_unregister_thread)(void);
+ pthread_attr_t *resize_attr; /* Resize threads attributes */
unsigned long count; /* global approximate item count */
struct ht_items_count *percpu_count; /* per-cpu item count */
};
struct cds_lfht *ht;
};
+struct partition_resize_work {
+ struct rcu_head head;
+ struct cds_lfht *ht; /* hash table being resized */
+ unsigned long i; /* index of the table level to resize */
+ unsigned long start, len; /* range of dummy nodes for this partition */
+ /* per-partition worker function (population or removal) */
+ void (*fct)(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len);
+};
+
static
struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
unsigned long size,
return -ENOENT;
}
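+/*
+ * Resize worker thread entry point: register this short-lived thread
+ * as an RCU reader, process the [start, start + len) range of dummy
+ * nodes assigned to it, then unregister before exiting.
+ */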
+static
+void *partition_resize_thread(void *arg)
+{
+ struct partition_resize_work *work = arg;
+
+ work->ht->cds_lfht_rcu_register_thread();
+ work->fct(work->ht, work->i, work->start, work->len);
+ work->ht->cds_lfht_rcu_unregister_thread();
+ return NULL;
+}
+
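+/*
+ * Split a range of len dummy nodes into one partition per CPU and run
+ * fct on each partition from its own worker thread, joining all
+ * workers before returning.
+ */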
+static
+void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
+ unsigned long len,
+ void (*fct)(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len))
+{
+ unsigned long partition_len;
+ struct partition_resize_work *work;
+ int cpu, ret;
+ pthread_t *thread_id;
+
+ /* Note: nr_cpus_mask + 1 is always a power of 2 */
+ partition_len = len >> get_count_order_ulong(nr_cpus_mask + 1);
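+ /*
+ * Example: on a 4-CPU system (nr_cpus_mask == 3), a range of
+ * len = 32768 dummy nodes yields partition_len = 32768 >> 2 = 8192,
+ * so the 4 worker threads start at indexes 0, 8192, 16384 and 24576.
+ */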
+ work = calloc(nr_cpus_mask + 1, sizeof(*work));
+ thread_id = calloc(nr_cpus_mask + 1, sizeof(*thread_id));
+ assert(work);
+ assert(thread_id);
+ for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) {
+ work[cpu].ht = ht;
+ work[cpu].i = i;
+ work[cpu].len = partition_len;
+ work[cpu].start = cpu * partition_len;
+ work[cpu].fct = fct;
+ ret = pthread_create(&thread_id[cpu], ht->resize_attr,
+ partition_resize_thread, &work[cpu]);
+ assert(!ret);
+ }
+ for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) {
+ ret = pthread_join(thread_id[cpu], NULL);
+ assert(!ret);
+ }
+ free(work);
+ free(thread_id);
+}
+
/*
* Holding RCU read lock to protect _cds_lfht_add against memory
* reclaim that could be performed by other call_rcu worker threads (ABA
* problem).
*
- * TODO: when we reach a certain length, we can split this population phase over
+ * When we reach a certain length, we can split this population phase over
* many worker threads, based on the number of CPUs available in the system.
* This should therefore take care of not having the expand lagging behind too
* many concurrent insertion threads by using the scheduler's ability to
* schedule dummy node population fairly with insertions.
*/
static
-void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len)
+void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len)
{
unsigned long j;
- ht->cds_lfht_rcu_thread_online();
ht->cds_lfht_rcu_read_lock();
- for (j = 0; j < len; j++) {
+ for (j = start; j < start + len; j++) {
struct cds_lfht_node *new_node =
(struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
break;
}
ht->cds_lfht_rcu_read_unlock();
- ht->cds_lfht_rcu_thread_offline();
+}
+
+static
+void init_table_populate(struct cds_lfht *ht, unsigned long i,
+ unsigned long len)
+{
+ assert(nr_cpus_mask != -1);
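+ /*
+ * Populate the range single-threaded when the number of CPUs is
+ * unknown, or when the range is too short to amortize worker
+ * thread creation and join over (nr_cpus_mask + 1) partitions.
+ */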
+ if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) {
+ ht->cds_lfht_rcu_thread_online();
+ init_table_populate_partition(ht, i, 0, len);
+ ht->cds_lfht_rcu_thread_offline();
+ return;
+ }
+ partition_resize_helper(ht, i, len, init_table_populate_partition);
}
static
ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level)
+ (len * sizeof(struct _cds_lfht_node)));
+ assert(ht->t.tbl[i]);
/*
* Set all dummy nodes reverse hash values for a level and
* Logical removal and garbage collection can therefore be done in batch or on a
* node-per-node basis, as long as the guarantee above holds.
*
- * TODO: when we reach a certain length, we can split this removal over many
- * worker threads, based on the number of CPUs available in the system. This
- * should take care of not letting resize process lag behind too many concurrent
+ * When we reach a certain length, we can split this removal over many worker
+ * threads, based on the number of CPUs available in the system. This should
+ * take care of not letting the resize process lag behind too many concurrent
* updater threads actively inserting into the hash table.
*/
static
-void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
+void remove_table_partition(struct cds_lfht *ht, unsigned long i,
+ unsigned long start, unsigned long len)
{
unsigned long j;
- ht->cds_lfht_rcu_thread_online();
ht->cds_lfht_rcu_read_lock();
- for (j = 0; j < len; j++) {
+ for (j = start; j < start + len; j++) {
struct cds_lfht_node *fini_node =
(struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
break;
}
ht->cds_lfht_rcu_read_unlock();
- ht->cds_lfht_rcu_thread_offline();
+}
+
+static
+void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
+{
+ assert(nr_cpus_mask != -1);
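+ /* Same single-thread fallback rationale as init_table_populate(). */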
+ if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) {
+ ht->cds_lfht_rcu_thread_online();
+ remove_table_partition(ht, i, 0, len);
+ ht->cds_lfht_rcu_thread_offline();
+ return;
+ }
+ partition_resize_helper(ht, i, len, remove_table_partition);
}
static
void (*cds_lfht_rcu_read_lock)(void),
void (*cds_lfht_rcu_read_unlock)(void),
void (*cds_lfht_rcu_thread_offline)(void),
- void (*cds_lfht_rcu_thread_online)(void))
+ void (*cds_lfht_rcu_thread_online)(void),
+ void (*cds_lfht_rcu_register_thread)(void),
+ void (*cds_lfht_rcu_unregister_thread)(void),
+ pthread_attr_t *attr)
{
struct cds_lfht *ht;
unsigned long order;
if (init_size && (init_size & (init_size - 1)))
return NULL;
ht = calloc(1, sizeof(struct cds_lfht));
+ assert(ht);
ht->hash_fct = hash_fct;
ht->compare_fct = compare_fct;
ht->hash_seed = hash_seed;
ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
+ ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread;
+ ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread;
+ ht->resize_attr = attr;
ht->percpu_count = alloc_per_cpu_items_count();
/* this mutex should not nest in read-side C.S. */
pthread_mutex_init(&ht->resize_mutex, NULL);
* Should only be called when no more concurrent readers nor writers can
* possibly access the table.
*/
-int cds_lfht_destroy(struct cds_lfht *ht)
+int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
{
int ret;
if (ret)
return ret;
free_per_cpu_items_count(ht->percpu_count);
+ if (attr)
+ *attr = ht->resize_attr;
poison_free(ht);
return ret;
}
void (*cds_lfht_rcu_read_lock)(void),
void (*cds_lfht_rcu_read_unlock)(void),
void (*cds_lfht_rcu_thread_offline)(void),
- void (*cds_lfht_rcu_thread_online)(void));
+ void (*cds_lfht_rcu_thread_online)(void),
+ void (*cds_lfht_rcu_register_thread)(void),
+ void (*cds_lfht_rcu_unregister_thread)(void),
+ pthread_attr_t *attr);
/*
* cds_lfht_new - allocate a hash table.
* @flags: hash table creation flags (can be combined with bitwise or: '|').
* 0: no flags.
* CDS_LFHT_AUTO_RESIZE: automatically resize hash table.
+ * @attr: optional resize worker thread attributes. NULL for default.
*
* Return NULL on error.
* Note: the RCU flavor must be already included before the hash table header.
+ *
+ * The programmer is responsible for ensuring that the resize operations have
+ * a priority equal to that of the hash table updater threads. This should be
+ * done by specifying the appropriate priority in the pthread "attr" argument,
+ * and, for CDS_LFHT_AUTO_RESIZE, by ensuring that call_rcu worker threads
+ * also have this priority level. Having a lower priority for call_rcu and
+ * resize threads does not pose any correctness issue, but the resize
+ * operations could be starved by updates, thus leading to long hash table
+ * bucket chains. (See the example sketch following cds_lfht_new() below.)
*/
static inline
struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
cds_lfht_compare_fct compare_fct,
unsigned long hash_seed,
unsigned long init_size,
- int flags)
+ int flags,
+ pthread_attr_t *attr)
{
return _cds_lfht_new(hash_fct, compare_fct, hash_seed,
init_size, flags,
call_rcu, synchronize_rcu, rcu_read_lock,
rcu_read_unlock, rcu_thread_offline,
- rcu_thread_online);
+ rcu_thread_online, rcu_register_thread,
+ rcu_unregister_thread, attr);
}
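+
+/*
+ * Example (sketch): creating a hash table whose resize worker threads
+ * run at an explicit real-time priority. "my_hash" and "my_compare"
+ * are placeholders for the caller's own functions, and SCHED_FIFO
+ * typically requires appropriate privileges:
+ *
+ *   pthread_attr_t *attr = malloc(sizeof(*attr));
+ *   struct sched_param param = { .sched_priority = 1 };
+ *   struct cds_lfht *ht;
+ *
+ *   pthread_attr_init(attr);
+ *   pthread_attr_setschedpolicy(attr, SCHED_FIFO);
+ *   pthread_attr_setschedparam(attr, &param);
+ *   pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED);
+ *   ht = cds_lfht_new(my_hash, my_compare, 0x42UL,
+ *                     4096, CDS_LFHT_AUTO_RESIZE, attr);
+ */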
/*
* cds_lfht_destroy - destroy a hash table.
+ * @ht: the hash table to destroy.
+ * @attr: (output) resize worker thread attributes, as received by cds_lfht_new.
+ * The caller will typically want to free this pointer if dynamically
+ * allocated.
*
* Return 0 on success, negative error value on error.
*/
-int cds_lfht_destroy(struct cds_lfht *ht);
+int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr);
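+
+/*
+ * Example (sketch): tearing down the table created above and releasing
+ * the attributes handed back by the hash table:
+ *
+ *   pthread_attr_t *attr;
+ *   int ret;
+ *
+ *   ret = cds_lfht_destroy(ht, &attr);
+ *   assert(!ret);
+ *   if (attr) {
+ *           pthread_attr_destroy(attr);
+ *           free(attr);
+ *   }
+ */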
/*
* cds_lfht_count_nodes - count the number of nodes in the hash table.