}
rcu_read_unlock();
- if (!--stream->chan->refcount) {
+ uatomic_dec(&stream->chan->refcount);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
-
call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan)
+ if (free_chan) {
consumer_del_channel(free_chan);
+ }
}
struct lttng_consumer_stream *consumer_allocate_stream(
assert(0);
goto end;
}
- DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
- stream->path_name, stream->key,
- stream->shm_fd,
- stream->wait_fd,
- (unsigned long long) stream->mmap_len,
- stream->out_fd,
+
+ /*
+ * When nb_init_streams reaches 0, we don't need to trigger any action in
+ * terms of destroying the associated channel, because the action that
+ * causes the count to become 0 also causes a stream to be added. The
+ * channel deletion will thus be triggered by the following removal of this
+ * stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+ uatomic_dec(&stream->chan->nb_init_streams);
+ }
+
+ DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+ " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+ stream->shm_fd, stream->wait_fd,
+ (unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx);
+
end:
return stream;
}
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size)
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams)
{
struct lttng_consumer_channel *channel;
int ret;
channel->mmap_len = mmap_len;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
+ channel->nb_init_streams = nb_init_streams;
lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
{
int ret;
- struct lttng_consumer_channel *free_chan = NULL;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)) {
- free_chan = stream->chan;
- }
-
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
+ /* Go for channel deletion! */
+ consumer_del_channel(stream->chan);
}
free(stream);