From: Mathieu Desnoyers Date: Mon, 15 Jul 2013 23:45:24 +0000 (-0400) Subject: consumer: introduce channel lock X-Git-Tag: v2.2.2~17 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=72c3879d3f2b0b9f6299da88ad6c481a18fd2175;p=lttng-tools.git consumer: introduce channel lock Reviewed-by: Julien Desfossez Signed-off-by: Mathieu Desnoyers --- diff --git a/src/common/consumer-metadata-cache.c b/src/common/consumer-metadata-cache.c index 0c20e7fc7..c6e6c6b16 100644 --- a/src/common/consumer-metadata-cache.c +++ b/src/common/consumer-metadata-cache.c @@ -204,6 +204,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, cache = channel->metadata_cache; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); if (cache->rb_pushed >= offset) { @@ -224,6 +225,7 @@ int consumer_metadata_cache_flushed(struct lttng_consumer_channel *channel, } pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; diff --git a/src/common/consumer.c b/src/common/consumer.c index 560ec6cd0..b24f95003 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -292,6 +292,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) DBG("Consumer delete channel key %" PRIu64, channel->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); switch (consumer_data.type) { case LTTNG_CONSUMER_KERNEL: @@ -321,6 +322,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) call_rcu(&channel->node.head, free_channel_rcu); end: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); } @@ -651,6 +653,7 @@ static int add_stream(struct lttng_consumer_stream *stream, DBG3("Adding consumer stream %" PRIu64, stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); rcu_read_lock(); @@ -694,6 +697,7 @@ static int add_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; @@ -879,6 +883,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->output = output; channel->tracefile_size = tracefile_size; channel->tracefile_count = tracefile_count; + pthread_mutex_init(&channel->lock, NULL); strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -911,6 +916,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, struct lttng_ht_iter iter; pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); rcu_read_lock(); lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter); @@ -927,6 +933,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && @@ -1886,6 +1893,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); switch (consumer_data.type) { @@ -1980,6 +1988,7 @@ end: stream->chan->metadata_stream = NULL; pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -2008,6 +2017,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); /* @@ -2059,6 +2069,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream, rcu_read_unlock(); pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); return ret; } diff --git a/src/common/consumer.h b/src/common/consumer.h index a5a758be5..8751f234e 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -157,6 +157,15 @@ struct lttng_consumer_channel { /* On-disk circular buffer */ uint64_t tracefile_size; uint64_t tracefile_count; + /* + * Channel lock. + * + * This is nested INSIDE the consumer data lock. + * This is nested OUTSIDE the metadata cache lock. + * This is nested OUTSIDE stream lock. + * This is nested OUTSIDE consumer_relayd_sock_pair lock. + */ + pthread_mutex_t lock; }; /* @@ -228,6 +237,7 @@ struct lttng_consumer_stream { * * This is nested INSIDE the consumer_data lock. * This is nested INSIDE the metadata cache lock. + * This is nested INSIDE the channel lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ pthread_mutex_t lock; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 106808722..739cebaef 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -653,6 +653,7 @@ static int close_metadata(uint64_t chan_key) } pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&channel->lock); if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; @@ -673,6 +674,7 @@ static int close_metadata(uint64_t chan_key) } error_unlock: + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); error: return ret; @@ -776,7 +778,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * and ultimately try to get rid of this global consumer data lock. */ pthread_mutex_lock(&consumer_data.lock); - + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->metadata_cache->lock); ret = consumer_metadata_cache_write(channel, offset, len, metadata_str); if (ret < 0) { @@ -788,10 +790,12 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * waiting for the metadata cache to be flushed. */ pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); goto end_free; } pthread_mutex_unlock(&channel->metadata_cache->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); while (consumer_metadata_cache_flushed(channel, offset + len)) {