From: Mathieu Desnoyers Date: Fri, 28 Sep 2012 19:39:42 +0000 (-0400) Subject: Fix: consumer should await for initial streams X-Git-Tag: v2.1.0-rc5~43 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=c30aaa51f34105a7f20b9ceb39866001843db6e6;p=lttng-tools.git Fix: consumer should await for initial streams lttng-sessiond need to let the consumer know how many streams are sent initially, so that for very short traces (short-lived apps, short kernel trace), the consumerd don't run into the scenario where it deletes the channel when there are still pending streams to receive for this channel. Fixes #355 Signed-off-by: Mathieu Desnoyers Signed-off-by: David Goulet --- diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index b69df16fd..d33f85f1b 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -486,7 +486,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int channel_key, uint64_t max_sb_size, uint64_t mmap_len, - const char *name) + const char *name, + unsigned int nb_init_streams) { assert(msg); @@ -500,6 +501,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.channel_key = channel_key; msg->u.channel.max_sb_size = max_sb_size; msg->u.channel.mmap_len = mmap_len; + msg->u.channel.nb_init_streams = nb_init_streams; } /* diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 5e8ad9b9e..ec4ef3f31 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -195,6 +195,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int channel_key, uint64_t max_sb_size, uint64_t mmap_len, - const char *name); + const char *name, + unsigned int nb_init_streams); #endif /* _CONSUMER_H */ diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 825121382..33cbbed3e 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -48,7 +48,8 @@ int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel) channel->fd, channel->channel->attr.subbuf_size, 0, /* Kernel */ - channel->channel->name); + channel->channel->name, + channel->stream_count); ret = consumer_send_channel(sock, &lkm); if (ret < 0) { @@ -116,7 +117,8 @@ int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session) session->metadata->fd, session->metadata->conf->attr.subbuf_size, 0, /* for kernel */ - "metadata"); + "metadata", + 1); ret = consumer_send_channel(sock, &lkm); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 3202cd4e0..fc8728dd2 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -2239,7 +2239,8 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) /* Order is important */ cds_list_add_tail(&ustream->list, &ua_chan->streams.head); ret = snprintf(ustream->name, sizeof(ustream->name), "%s_%u", - ua_chan->name, ua_chan->streams.count++); + ua_chan->name, ua_chan->streams.count); + ua_chan->streams.count++; if (ret < 0) { PERROR("asprintf UST create stream"); /* diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index aabe49403..44913cb8f 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -52,7 +52,8 @@ static int send_channel(int sock, struct ust_app_channel *uchan) uchan->obj->shm_fd, uchan->attr.subbuf_size, uchan->obj->memory_map_size, - uchan->name); + uchan->name, + uchan->streams.count); ret = consumer_send_channel(sock, &msg); if (ret < 0) { @@ -208,7 +209,8 @@ static int send_metadata(int sock, struct ust_app_session *usess, usess->metadata->obj->shm_fd, usess->metadata->attr.subbuf_size, usess->metadata->obj->memory_map_size, - "metadata"); + "metadata", + 1); ret = consumer_send_channel(sock, &msg); if (ret < 0) { diff --git a/src/common/consumer.c b/src/common/consumer.c index a2980e77d..f4eaf705f 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -319,18 +319,20 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) } rcu_read_unlock(); - if (!--stream->chan->refcount) { + uatomic_dec(&stream->chan->refcount); + if (!uatomic_read(&stream->chan->refcount) + && !uatomic_read(&stream->chan->nb_init_streams)) { free_chan = stream->chan; } - call_rcu(&stream->node.head, consumer_free_stream); end: consumer_data.need_update = 1; pthread_mutex_unlock(&consumer_data.lock); - if (free_chan) + if (free_chan) { consumer_del_channel(free_chan); + } } struct lttng_consumer_stream *consumer_allocate_stream( @@ -394,13 +396,24 @@ struct lttng_consumer_stream *consumer_allocate_stream( assert(0); goto end; } - DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)", - stream->path_name, stream->key, - stream->shm_fd, - stream->wait_fd, - (unsigned long long) stream->mmap_len, - stream->out_fd, + + /* + * When nb_init_streams reaches 0, we don't need to trigger any action in + * terms of destroying the associated channel, because the action that + * causes the count to become 0 also causes a stream to be added. The + * channel deletion will thus be triggered by the following removal of this + * stream. + */ + if (uatomic_read(&stream->chan->nb_init_streams) > 0) { + uatomic_dec(&stream->chan->nb_init_streams); + } + + DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu," + " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key, + stream->shm_fd, stream->wait_fd, + (unsigned long long) stream->mmap_len, stream->out_fd, stream->net_seq_idx); + end: return stream; } @@ -671,7 +684,8 @@ struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, int shm_fd, int wait_fd, uint64_t mmap_len, - uint64_t max_sb_size) + uint64_t max_sb_size, + unsigned int nb_init_streams) { struct lttng_consumer_channel *channel; int ret; @@ -687,6 +701,7 @@ struct lttng_consumer_channel *consumer_allocate_channel( channel->mmap_len = mmap_len; channel->max_sb_size = max_sb_size; channel->refcount = 0; + channel->nb_init_streams = nb_init_streams; lttng_ht_node_init_ulong(&channel->node, channel->key); switch (consumer_data.type) { @@ -1520,7 +1535,6 @@ static void destroy_stream_ht(struct lttng_ht *ht) static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream) { int ret; - struct lttng_consumer_channel *free_chan = NULL; struct consumer_relayd_sock_pair *relayd; assert(stream); @@ -1602,12 +1616,10 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream) /* Atomically decrement channel refcount since other threads can use it. */ uatomic_dec(&stream->chan->refcount); - if (!uatomic_read(&stream->chan->refcount)) { - free_chan = stream->chan; - } - - if (free_chan) { - consumer_del_channel(free_chan); + if (!uatomic_read(&stream->chan->refcount) + && !uatomic_read(&stream->chan->nb_init_streams)) { + /* Go for channel deletion! */ + consumer_del_channel(stream->chan); } free(stream); diff --git a/src/common/consumer.h b/src/common/consumer.h index dba776577..0f82a1086 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -77,6 +77,12 @@ struct lttng_consumer_channel { int key; uint64_t max_sb_size; /* the subbuffer size for this channel */ int refcount; /* Number of streams referencing this channel */ + /* + * The number of streams to receive initially. Used to guarantee that we do + * not destroy a channel before receiving all its associated streams. + */ + unsigned int nb_init_streams; + /* For UST */ int shm_fd; int wait_fd; @@ -342,7 +348,8 @@ extern struct lttng_consumer_channel *consumer_allocate_channel( int channel_key, int shm_fd, int wait_fd, uint64_t mmap_len, - uint64_t max_sb_size); + uint64_t max_sb_size, + unsigned int nb_init_streams); int consumer_add_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f910f033d..5a219fc0b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 62205f4c9..6d796efe4 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -255,6 +255,8 @@ struct lttcomm_consumer_msg { uint64_t max_sb_size; /* the subbuffer size for this channel */ /* shm_fd and wait_fd are sent as ancillary data */ uint64_t mmap_len; + /* nb_init_streams is the number of streams open initially. */ + unsigned int nb_init_streams; char name[LTTNG_SYMBOL_NAME_LEN]; } channel; struct { diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 8ab2b819d..ad4b014c6 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -150,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, fds[0], -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal;