From: Mathieu Desnoyers Date: Fri, 19 Jul 2013 17:28:13 +0000 (-0400) Subject: Fix: data pending race X-Git-Tag: v2.2.3~5 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=155e9bb8c15b27fbbbb74d1eb69fbf61aaa55af7;p=lttng-tools.git Fix: data pending race There is a data pending race involving late population of the streams in the stream hash table, and applying flush on streams that are not yet globally visible. This is caused by the fact that streams are added to the hash table only when received by the data-handling consumer thread. This results in data_pending() incorrectly returning that there is no data pending in some cases. This has been discovered by adding 1s delay in read subbuffer function for testing. Signed-off-by: Mathieu Desnoyers --- diff --git a/src/common/consumer.c b/src/common/consumer.c index ddfca408b..4a1e6d23e 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -564,6 +564,19 @@ free_stream_rcu: call_rcu(&stream->node.head, free_stream_rcu); } +/* + * XXX naming of del vs destroy is all mixed up. + */ +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, metadata_ht); +} + struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, uint64_t stream_key, enum lttng_consumer_stream_state state, @@ -642,9 +655,9 @@ end: /* * Add a stream to the global list protected by a mutex. */ -static int add_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_data_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = data_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; @@ -706,6 +719,11 @@ static int add_stream(struct lttng_consumer_stream *stream, return ret; } +void consumer_del_data_stream(struct lttng_consumer_stream *stream) +{ + consumer_del_stream(stream, data_ht); +} + /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -2009,9 +2027,9 @@ free_stream_rcu: * Action done with the metadata stream when adding it to the consumer internal * data structures to handle it. */ -static int add_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht) +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) { + struct lttng_ht *ht = metadata_ht; int ret = 0; struct consumer_relayd_sock_pair *relayd; struct lttng_ht_iter iter; @@ -2240,14 +2258,6 @@ restart: DBG("Adding metadata stream %d to poll set", stream->wait_fd); - ret = add_metadata_stream(stream, metadata_ht); - if (ret) { - ERR("Unable to add metadata stream"); - /* Stream was not setup properly. Continuing. */ - consumer_del_metadata_stream(stream, NULL); - continue; - } - /* Add metadata stream to the global poll events list */ lttng_poll_add(&events, stream->wait_fd, LPOLLIN | LPOLLPRI); @@ -2454,17 +2464,6 @@ void *consumer_thread_data_poll(void *data) continue; } - ret = add_stream(new_stream, data_ht); - if (ret) { - ERR("Consumer add stream %" PRIu64 " failed. Continuing", - new_stream->key); - /* - * At this point, if the add_stream fails, it is not in the - * hash table thus passing the NULL value here. - */ - consumer_del_stream(new_stream, NULL); - } - /* Continue to update the local streams and handle prio ones */ continue; } diff --git a/src/common/consumer.h b/src/common/consumer.h index a0b2a7efe..bd7304735 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -565,5 +565,9 @@ int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel); void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key); +int consumer_add_data_stream(struct lttng_consumer_stream *stream); +void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); +int consumer_add_metadata_stream(struct lttng_consumer_stream *stream); +void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 5712144f7..1a44ce152 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -319,8 +319,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; + } stream_pipe = ctx->consumer_data_pipe; } @@ -329,7 +343,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_del_stream(new_stream, NULL); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index b5dfa5e74..68d467633 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -192,8 +192,20 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, /* Get the right pipe where the stream will be sent. */ if (stream->metadata_flag) { + ret = consumer_add_metadata_stream(stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed.", + stream->key); + goto error; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed.", + stream->key); + goto error; + } stream_pipe = ctx->consumer_data_pipe; } @@ -202,8 +214,13 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, ERR("Consumer write %s stream to pipe %d", stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); + if (stream->metadata_flag) { + consumer_del_stream_for_metadata(stream); + } else { + consumer_del_stream_for_data(stream); + } } - +error: return ret; } @@ -542,6 +559,8 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel, * If we are unable to send the stream to the thread, there is * a big problem so just stop everything. */ + /* Remove node from the channel stream list. */ + cds_list_del(&stream->send_node); goto error; }