From: David Goulet Date: Wed, 11 Jan 2012 17:03:07 +0000 (-0500) Subject: Add lttng hash table support to liblttng-consumer X-Git-Tag: v2.0-pre17~29 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=e4421fecbda445e77b4604d2332014960bfbf695;p=lttng-tools.git Add lttng hash table support to liblttng-consumer Remove linked list usage from liblttng-consumer and replace them by lockless RCU hash tables. Note that there is still a mutex lock protecting those hash tables and no RCU lock mechanism used. For now, it's OK and a very small performance hit. Signed-off-by: David Goulet --- diff --git a/include/lttng/lttng-consumer.h b/include/lttng/lttng-consumer.h index 81fd83e0f..bba72ee69 100644 --- a/include/lttng/lttng-consumer.h +++ b/include/lttng/lttng-consumer.h @@ -23,8 +23,9 @@ #include #include #include -#include + #include +#include /* * When the receiving thread dies, we need to have a way to make the polling @@ -58,14 +59,6 @@ enum lttng_consumer_stream_state { LTTNG_CONSUMER_DELETE_STREAM, }; -struct lttng_consumer_channel_list { - struct cds_list_head head; -}; - -struct lttng_consumer_stream_list { - struct cds_list_head head; -}; - enum lttng_consumer_type { LTTNG_CONSUMER_UNKNOWN = 0, LTTNG_CONSUMER_KERNEL, @@ -74,7 +67,7 @@ enum lttng_consumer_type { }; struct lttng_consumer_channel { - struct cds_list_head list; + struct lttng_ht_node_ulong node; int key; uint64_t max_sb_size; /* the subbuffer size for this channel */ int refcount; /* Number of streams referencing this channel */ @@ -98,7 +91,7 @@ struct lttng_ust_lib_ring_buffer; * uniquely a stream. */ struct lttng_consumer_stream { - struct cds_list_head list; + struct lttng_ht_node_ulong node; struct lttng_consumer_channel *chan; /* associated channel */ /* * key is the key used by the session daemon to refer to the @@ -187,26 +180,25 @@ struct lttng_consumer_local_data { * Library-level data. One instance per process. */ struct lttng_consumer_global_data { + /* - * consumer_data.lock protects consumer_data.fd_list, - * consumer_data.stream_count, and consumer_data.need_update. It - * ensures the count matches the number of items in the fd_list. - * It ensures the list updates *always* trigger an fd_array - * update (therefore need to make list update vs - * consumer_data.need_update flag update atomic, and also flag - * read, fd array and flag clear atomic). + * At this time, this lock is used to ensure coherence between the count + * and number of element in the hash table. It's also a protection for + * concurrent read/write between threads. Although hash table used are + * lockless data structure, appropriate RCU lock mechanism are not yet + * implemented in the consumer. */ pthread_mutex_t lock; + /* - * Number of streams in the list below. Protected by - * consumer_data.lock. + * Number of streams in the hash table. Protected by consumer_data.lock. */ int stream_count; /* - * Lists of streams and channels. Protected by consumer_data.lock. + * Hash tables of streams and channels. Protected by consumer_data.lock. */ - struct lttng_consumer_stream_list stream_list; - struct lttng_consumer_channel_list channel_list; + struct lttng_ht *stream_ht; + struct lttng_ht *channel_ht; /* * Flag specifying if the local array of FDs needs update in the * poll function. Protected by consumer_data.lock. @@ -215,6 +207,11 @@ struct lttng_consumer_global_data { enum lttng_consumer_type type; }; +/* + * Init consumer data structures. + */ +extern void lttng_consumer_init(void); + /* * Set the error socket for communication with a session daemon. */ diff --git a/liblttng-consumer/Makefile.am b/liblttng-consumer/Makefile.am index 80f5843da..aadba52b5 100644 --- a/liblttng-consumer/Makefile.am +++ b/liblttng-consumer/Makefile.am @@ -6,8 +6,8 @@ liblttng_consumer_la_SOURCES = lttng-consumer.c liblttng_consumer_la_LIBADD = \ $(top_builddir)/liblttng-sessiond-comm/liblttng-sessiond-comm.la \ - $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la - + $(top_builddir)/liblttng-kconsumer/liblttng-kconsumer.la \ + $(top_builddir)/liblttng-ht/liblttng-ht.la if HAVE_LIBLTTNG_UST_CTL liblttng_consumer_la_LIBADD += \ diff --git a/liblttng-consumer/lttng-consumer.c b/liblttng-consumer/lttng-consumer.c index 0811e68ca..f4af47404 100644 --- a/liblttng-consumer/lttng-consumer.c +++ b/liblttng-consumer/lttng-consumer.c @@ -37,8 +37,6 @@ #include struct lttng_consumer_global_data consumer_data = { - .stream_list.head = CDS_LIST_HEAD_INIT(consumer_data.stream_list.head), - .channel_list.head = CDS_LIST_HEAD_INIT(consumer_data.channel_list.head), .stream_count = 0, .need_update = 1, .type = LTTNG_CONSUMER_UNKNOWN, @@ -61,18 +59,22 @@ volatile int consumer_quit = 0; */ static struct lttng_consumer_stream *consumer_find_stream(int key) { - struct lttng_consumer_stream *iter; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_stream *stream = NULL; /* Negative keys are lookup failures */ if (key < 0) return NULL; - cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { - if (iter->key == key) { - DBG("Found stream key %d", key); - return iter; - } + + lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + stream = caa_container_of(node, struct lttng_consumer_stream, node); } - return NULL; + + return stream; } static void consumer_steal_stream_key(int key) @@ -86,18 +88,22 @@ static void consumer_steal_stream_key(int key) static struct lttng_consumer_channel *consumer_find_channel(int key) { - struct lttng_consumer_channel *iter; + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct lttng_consumer_channel *channel = NULL; /* Negative keys are lookup failures */ if (key < 0) return NULL; - cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) { - if (iter->key == key) { - DBG("Found channel key %d", key); - return iter; - } + + lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + channel = caa_container_of(node, struct lttng_consumer_channel, node); } - return NULL; + + return channel; } static void consumer_steal_channel_key(int key) @@ -116,6 +122,7 @@ static void consumer_steal_channel_key(int key) void consumer_del_stream(struct lttng_consumer_stream *stream) { int ret; + struct lttng_ht_iter iter; struct lttng_consumer_channel *free_chan = NULL; pthread_mutex_lock(&consumer_data.lock); @@ -139,7 +146,13 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) goto end; } - cds_list_del(&stream->list); + /* Get stream node from hash table */ + lttng_ht_lookup(consumer_data.stream_ht, + (void *)((unsigned long) stream->key), &iter); + /* Remove stream node from hash table */ + ret = lttng_ht_del(consumer_data.stream_ht, &iter); + assert(!ret); + if (consumer_data.stream_count <= 0) { goto end; } @@ -205,6 +218,7 @@ struct lttng_consumer_stream *consumer_allocate_stream( stream->gid = gid; strncpy(stream->path_name, path_name, PATH_MAX - 1); stream->path_name[PATH_MAX - 1] = '\0'; + lttng_ht_node_init_ulong(&stream->node, stream->key); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -243,7 +257,7 @@ int consumer_add_stream(struct lttng_consumer_stream *stream) pthread_mutex_lock(&consumer_data.lock); /* Steal stream identifier, for UST */ consumer_steal_stream_key(stream->key); - cds_list_add(&stream->list, &consumer_data.stream_list.head); + lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node); consumer_data.stream_count++; consumer_data.need_update = 1; @@ -290,6 +304,7 @@ void consumer_change_stream_state(int stream_key, void consumer_del_channel(struct lttng_consumer_channel *channel) { int ret; + struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); @@ -306,7 +321,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - cds_list_del(&channel->list); + lttng_ht_lookup(consumer_data.channel_ht, + (void *)((unsigned long) channel->key), &iter); + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); + if (channel->mmap_base != NULL) { ret = munmap(channel->mmap_base, channel->mmap_len); if (ret != 0) { @@ -346,6 +365,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( channel->max_sb_size = max_sb_size; channel->refcount = 0; channel->nr_streams = 0; + lttng_ht_node_init_ulong(&channel->node, channel->key); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -383,7 +403,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel) pthread_mutex_lock(&consumer_data.lock); /* Steal channel identifier, for UST */ consumer_steal_channel_key(channel->key); - cds_list_add(&channel->list, &consumer_data.channel_list.head); + lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node); pthread_mutex_unlock(&consumer_data.lock); return 0; } @@ -399,18 +419,20 @@ int consumer_update_poll_array( struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream) { - struct lttng_consumer_stream *iter; int i = 0; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; DBG("Updating poll fd array"); - cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) { - if (iter->state != LTTNG_CONSUMER_ACTIVE_STREAM) { + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, + node.node) { + if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) { continue; } - DBG("Active FD %d", iter->wait_fd); - (*pollfd)[i].fd = iter->wait_fd; + DBG("Active FD %d", stream->wait_fd); + (*pollfd)[i].fd = stream->wait_fd; (*pollfd)[i].events = POLLIN | POLLPRI; - local_stream[i] = iter; + local_stream[i] = stream; i++; } @@ -486,21 +508,28 @@ int lttng_consumer_send_error( */ void lttng_consumer_cleanup(void) { - struct lttng_consumer_stream *iter, *tmp; - struct lttng_consumer_channel *citer, *ctmp; + int ret; + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + struct lttng_consumer_channel *channel; /* * close all outfd. Called when there are no more threads * running (after joining on the threads), no need to protect * list iteration with mutex. */ - cds_list_for_each_entry_safe(iter, tmp, - &consumer_data.stream_list.head, list) { - consumer_del_stream(iter); + cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream, + node.node) { + ret = lttng_ht_del(consumer_data.stream_ht, &iter); + assert(!ret); + consumer_del_stream(stream); } - cds_list_for_each_entry_safe(citer, ctmp, - &consumer_data.channel_list.head, list) { - consumer_del_channel(citer); + + cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel, + node.node) { + ret = lttng_ht_del(consumer_data.channel_ht, &iter); + assert(!ret); + consumer_del_channel(channel); } } @@ -759,7 +788,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } /* - * This thread polls the fds in the ltt_fd_list to consume the data and write + * This thread polls the fds in the set to consume the data and write * it to tracefile if necessary. */ void *lttng_consumer_thread_poll_fds(void *data) @@ -781,7 +810,7 @@ void *lttng_consumer_thread_poll_fds(void *data) num_hup = 0; /* - * the ltt_fd_list has been updated, we need to update our + * the fds set has been updated, we need to update our * local array as well */ pthread_mutex_lock(&consumer_data.lock); @@ -1073,3 +1102,13 @@ int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream) return -ENOSYS; } } + +/* + * Allocate and set consumer data hash tables. + */ +void lttng_consumer_init(void) +{ + consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); +} + diff --git a/lttng-consumerd/lttng-consumerd.c b/lttng-consumerd/lttng-consumerd.c index cf515a9df..e8e511702 100644 --- a/lttng-consumerd/lttng-consumerd.c +++ b/lttng-consumerd/lttng-consumerd.c @@ -267,6 +267,10 @@ int main(int argc, char **argv) goto error; } } + + /* Init */ + lttng_consumer_init(); + /* create the consumer instance with and assign the callbacks */ ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer, NULL, lttng_consumer_on_recv_stream, NULL);