From: David Goulet Date: Thu, 4 Jul 2013 19:35:21 +0000 (-0400) Subject: Fix: add globally visible flag in stream X-Git-Tag: v2.3.0-rc1~68 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=4891ece8d4eeb2645efdf1467680b037c63ab425;p=lttng-tools.git Fix: add globally visible flag in stream This flag is added so we can destroy it correctly with the consumer-stream API. Signed-off-by: David Goulet --- diff --git a/src/common/consumer-stream.c b/src/common/consumer-stream.c index 027d75150..03bac8687 100644 --- a/src/common/consumer-stream.c +++ b/src/common/consumer-stream.c @@ -218,22 +218,13 @@ void consumer_stream_destroy_buffers(struct lttng_consumer_stream *stream) } /* - * Destroy a stream in no monitor mode. - * - * We need a separate function because this can be called inside a destroy - * channel path which have the consumer data lock acquired. Also, in no monitor - * mode, the channel refcount is NOT incremented per stream since the ownership - * of those streams are INSIDE the channel making the lazy destroy channel not - * possible for a non monitor stream. - * - * Furthermore, there is no need to delete the stream from the global hash - * table so we avoid useless calls. + * Destroy and close a already created stream. */ -static void destroy_no_monitor(struct lttng_consumer_stream *stream) +static void destroy_close_stream(struct lttng_consumer_stream *stream) { assert(stream); - DBG("Consumer stream destroy unmonitored key: %" PRIu64, stream->key); + DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); /* Destroy tracer buffers of the stream. */ consumer_stream_destroy_buffers(stream); @@ -242,21 +233,24 @@ static void destroy_no_monitor(struct lttng_consumer_stream *stream) } /* - * Destroy a stream in monitor mode. + * Decrement the stream's channel refcount and if down to 0, return the channel + * pointer so it can be destroyed by the caller or NULL if not. */ -static void destroy_monitor(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +static struct lttng_consumer_channel *unref_channel( + struct lttng_consumer_stream *stream) { + struct lttng_consumer_channel *free_chan = NULL; + assert(stream); + assert(stream->chan); - DBG("Consumer stream destroy monitored key: %" PRIu64, stream->key); + /* Update refcount of channel and see if we need to destroy it. */ + if (!uatomic_sub_return(&stream->chan->refcount, 1) + && !uatomic_read(&stream->chan->nb_init_stream_left)) { + free_chan = stream->chan; + } - /* Remove every reference of the stream in the consumer. */ - consumer_stream_delete(stream, ht); - /* Destroy tracer buffers of the stream. */ - consumer_stream_destroy_buffers(stream); - /* Close down everything including the relayd if one. */ - consumer_stream_close(stream); + return free_chan; } /* @@ -273,35 +267,43 @@ void consumer_stream_destroy(struct lttng_consumer_stream *stream, assert(stream); /* Stream is in monitor mode. */ - if (stream->chan->monitor) { + if (stream->monitor) { struct lttng_consumer_channel *free_chan = NULL; - pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->lock); - - destroy_monitor(stream, ht); - - /* Update refcount of channel and see if we need to destroy it. */ - if (!uatomic_sub_return(&stream->chan->refcount, 1) - && !uatomic_read(&stream->chan->nb_init_stream_left)) { - free_chan = stream->chan; + /* + * This means that the stream was successfully removed from the streams + * list of the channel and sent to the right thread managing this + * stream thus being globally visible. + */ + if (stream->globally_visible) { + pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->lock); + /* Remove every reference of the stream in the consumer. */ + consumer_stream_delete(stream, ht); + + destroy_close_stream(stream); + + /* Update channel's refcount of the stream. */ + free_chan = unref_channel(stream); + + /* Indicates that the consumer data state MUST be updated after this. */ + consumer_data.need_update = 1; + + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&consumer_data.lock); + } else { + /* + * If the stream is not visible globally, this needs to be done + * outside of the consumer data lock section. + */ + free_chan = unref_channel(stream); } - /* Indicates that the consumer data state MUST be updated after this. */ - consumer_data.need_update = 1; - - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&consumer_data.lock); - if (free_chan) { consumer_del_channel(free_chan); } } else { - /* - * No monitor mode the stream's ownership is in its channel thus we - * don't have to handle the channel refcount nor the lazy deletion. - */ - destroy_no_monitor(stream); + destroy_close_stream(stream); } /* Free stream within a RCU call. */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 30a95b48f..3aafb5193 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -465,7 +465,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t session_id, int cpu, int *alloc_ret, - enum consumer_channel_type type) + enum consumer_channel_type type, + unsigned int monitor) { int ret; struct lttng_consumer_stream *stream; @@ -487,6 +488,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->gid = gid; stream->net_seq_idx = relayd_id; stream->session_id = session_id; + stream->monitor = monitor; pthread_mutex_init(&stream->lock, NULL); /* If channel is the metadata, flag this stream as metadata. */ diff --git a/src/common/consumer.h b/src/common/consumer.h index 4cf6e910d..a64bcad3b 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -259,6 +259,18 @@ struct lttng_consumer_stream { /* On-disk circular buffer */ uint64_t tracefile_size_current; uint64_t tracefile_count_current; + /* + * Monitor or not the streams of this channel meaning this indicates if the + * streams should be sent to the data/metadata thread or added to the no + * monitor list of the channel. + */ + unsigned int monitor; + /* + * Indicate if the stream is globally visible meaning that it has been + * added to the multiple hash tables. If *not* set, NO lock should be + * acquired in the destroy path. + */ + unsigned int globally_visible; }; /* @@ -478,7 +490,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t session_id, int cpu, int *alloc_ret, - enum consumer_channel_type type); + enum consumer_channel_type type, + unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, const char *pathname, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 77132c9ae..bdf0eff7c 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -559,7 +559,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index ff4ba25b9..3330ff522 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -152,7 +152,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, channel->session_id, cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -559,6 +560,12 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, /* Remove node from the channel stream list. */ cds_list_del(&stream->send_node); + + /* + * From this point on, the stream's ownership has been moved away from + * the channel and becomes globally visible. + */ + stream->globally_visible = 1; } error: