Fix: data pending race
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 19 Jul 2013 17:28:13 +0000 (13:28 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 19 Jul 2013 17:28:13 +0000 (13:28 -0400)
There is a data pending race involving late population of the streams in
the stream hash table, and applying flush on streams that are not yet
globally visible.

This is caused by the fact that streams are added to the hash table only
when received by the data-handling consumer thread.

This results in data_pending() incorrectly returning that there is no
data pending in some cases.

This has been discovered by adding 1s delay in read subbuffer function
for testing.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 6cf85d96a75b0afa69994886d9a94d91a7faaa29..f5bc5c58aa8506f27b8dffc37a02172223222e54 100644 (file)
@@ -457,6 +457,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        consumer_stream_destroy(stream, ht);
 }
 
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, metadata_ht);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -539,9 +552,9 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int add_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = data_ht;
        int ret = 0;
 
        assert(stream);
@@ -596,6 +609,11 @@ static int add_stream(struct lttng_consumer_stream *stream,
        return ret;
 }
 
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+       consumer_del_stream(stream, data_ht);
+}
+
 /*
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
@@ -1982,9 +2000,9 @@ free_stream_rcu:
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = metadata_ht;
        int ret = 0;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
@@ -2206,14 +2224,6 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = add_metadata_stream(stream, metadata_ht);
-                                       if (ret) {
-                                               ERR("Unable to add metadata stream");
-                                               /* Stream was not setup properly. Continuing. */
-                                               consumer_del_metadata_stream(stream, NULL);
-                                               continue;
-                                       }
-
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
@@ -2427,17 +2437,6 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = add_stream(new_stream, data_ht);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
-                               consumer_del_stream(new_stream, NULL);
-                       }
-
                        /* Continue to update the local streams and handle prio ones */
                        continue;
                }
index bedc8885dbc97e6c67faa5f5d5a41917eabf7531..5021a103569ea28d18bcd9d071f2781704da078b 100644 (file)
@@ -620,5 +620,9 @@ void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
 unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
                unsigned long produced_pos, uint64_t max_stream_size);
+int consumer_add_data_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 
 #endif /* LIB_CONSUMER_H */
index 4f3e2871daeaa1c66faadaf64a754dccf0711b94..a94749635246a04f195cf7c0ff577ec9abae3dd1 100644 (file)
@@ -653,21 +653,40 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
+               /* Vitible to other threads */
+               new_stream->globally_visible = 1;
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_stream_free(new_stream);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
-               /* Successfully sent to the right thread. */
-               new_stream->globally_visible = 1;
 
                DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64,
                                new_stream->name, fd, new_stream->relayd_stream_id);
index 5bcca4e17a3d94622f423d8a30e410e05473d741..336466a470bd453da46248ecb4e699341e942f94 100644 (file)
@@ -194,18 +194,41 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
 
        /* Get the right pipe where the stream will be sent. */
        if (stream->metadata_flag) {
+               ret = consumer_add_metadata_stream(stream);
+               if (ret) {
+                       ERR("Consumer add metadata stream %" PRIu64 " failed.",
+                                       stream->key);
+                       goto error;
+               }
                stream_pipe = ctx->consumer_metadata_pipe;
        } else {
+               ret = consumer_add_data_stream(stream);
+               if (ret) {
+                       ERR("Consumer add stream %" PRIu64 " failed.",
+                                       stream->key);
+                       goto error;
+               }
                stream_pipe = ctx->consumer_data_pipe;
        }
 
+       /*
+        * From this point on, the stream's ownership has been moved away from
+        * the channel and becomes globally visible.
+        */
+       stream->globally_visible = 1;
+
        ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
                ERR("Consumer write %s stream to pipe %d",
                                stream->metadata_flag ? "metadata" : "data",
                                lttng_pipe_get_writefd(stream_pipe));
+               if (stream->metadata_flag) {
+                       consumer_del_stream_for_metadata(stream);
+               } else {
+                       consumer_del_stream_for_data(stream);
+               }
        }
-
+error:
        return ret;
 }
 
@@ -533,17 +556,14 @@ static int send_streams_to_thread(struct lttng_consumer_channel *channel,
                         * If we are unable to send the stream to the thread, there is
                         * a big problem so just stop everything.
                         */
+                       /* Remove node from the channel stream list. */
+                       cds_list_del(&stream->send_node);
                        goto error;
                }
 
                /* Remove node from the channel stream list. */
                cds_list_del(&stream->send_node);
 
-               /*
-                * From this point on, the stream's ownership has been moved away from
-                * the channel and becomes globally visible.
-                */
-               stream->globally_visible = 1;
        }
 
 error:
This page took 0.032017 seconds and 4 git commands to generate.