Fix: clear the CTF traces when all the streams are closed
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 12 Dec 2013 21:37:03 +0000 (16:37 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Mon, 16 Dec 2013 20:20:10 +0000 (15:20 -0500)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/bin/lttng-relayd/ctf-trace.h
src/bin/lttng-relayd/live.c
src/bin/lttng-relayd/main.c

index 6e39af02b936273447bf69379f7169e90c4a6971..66f7a15684b03d7ec0063f1702462bdf3cc03261 100644 (file)
@@ -31,6 +31,7 @@ struct ctf_trace {
        uint64_t metadata_received;
        uint64_t metadata_sent;
        struct relay_stream *metadata_stream;
+       struct relay_viewer_stream *viewer_metadata_stream;
 };
 
 void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream);
index c1f0ff050c575aee5d9b9cfc88371e482ac5c6c7..c312e78e976e89bb6949e0aee54cf48e2bdca99c 100644 (file)
@@ -707,11 +707,11 @@ int init_viewer_stream(struct relay_stream *stream, int seek_last)
                        stream->oldest_tracefile_id;
        }
 
-       /*
-        * The deletion of this ctf_trace object is only done in a call RCU of the
-        * relay stream making it valid as long as we have the read side lock.
-        */
        viewer_stream->ctf_trace = stream->ctf_trace;
+       if (viewer_stream->metadata_flag) {
+               viewer_stream->ctf_trace->viewer_metadata_stream =
+                       viewer_stream;
+       }
        uatomic_inc(&viewer_stream->ctf_trace->refcount);
 
        lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle);
@@ -992,10 +992,10 @@ send_reply:
        health_code_update();
 
        /*
-        * Unknown or busy session, just return gracefully, the viewer knows what
+        * Unknown or empty session, just return gracefully, the viewer knows what
         * is happening.
         */
-       if (!send_streams) {
+       if (!send_streams || !nb_streams) {
                ret = 0;
                goto end_unlock;
        }
@@ -1063,6 +1063,72 @@ end:
        return stream;
 }
 
+static
+void deferred_free_viewer_stream(struct rcu_head *head)
+{
+       struct relay_viewer_stream *stream =
+               caa_container_of(head, struct relay_viewer_stream, rcu_node);
+
+       free(stream->path_name);
+       free(stream->channel_name);
+       free(stream);
+}
+
+static
+void delete_viewer_stream(struct relay_viewer_stream *vstream)
+{
+       int delret;
+       struct lttng_ht_iter iter;
+
+       iter.iter.node = &vstream->stream_n.node;
+       delret = lttng_ht_del(viewer_streams_ht, &iter);
+       assert(!delret);
+}
+
+static
+void destroy_viewer_stream(struct relay_viewer_stream *vstream)
+{
+       unsigned long ret_ref;
+       int ret;
+
+       assert(vstream);
+       ret_ref = uatomic_add_return(&vstream->ctf_trace->refcount, -1);
+       assert(ret_ref >= 0);
+
+       if (vstream->read_fd >= 0) {
+               ret = close(vstream->read_fd);
+               if (ret < 0) {
+                       PERROR("close read_fd");
+               }
+       }
+       if (vstream->index_read_fd >= 0) {
+               ret = close(vstream->index_read_fd);
+               if (ret < 0) {
+                       PERROR("close index_read_fd");
+               }
+       }
+
+       /*
+        * If the only stream left in the HT is the metadata stream,
+        * we need to remove it because we won't detect a EOF for this
+        * stream.
+        */
+       if (ret_ref == 1 && vstream->ctf_trace->metadata_stream) {
+               destroy_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
+               vstream->ctf_trace->metadata_stream = NULL;
+               DBG("Freeing ctf_trace %" PRIu64, vstream->ctf_trace->id);
+               /*
+                * The streaming-side is already closed and we can't receive a new
+                * stream concurrently at this point (since the session is being
+                * destroyed), so when we detect the refcount equals 0, we are the
+                * only owners of the ctf_trace and we can free it ourself.
+                */
+               free(vstream->ctf_trace);
+       }
+
+       call_rcu(&vstream->rcu_node, deferred_free_viewer_stream);
+}
+
 /*
  * Send the next index for a stream.
  *
@@ -1143,6 +1209,8 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        } else if (ret == 1) {
                                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               delete_viewer_stream(vstream);
+                               destroy_viewer_stream(vstream);
                                goto send_reply;
                        }
                }
@@ -1172,6 +1240,8 @@ int viewer_get_next_index(struct relay_command *cmd,
                        vstream->total_index_received == vstream->last_sent_index) {
                /* Last index sent and current tracefile closed in write */
                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+               delete_viewer_stream(vstream);
+               destroy_viewer_stream(vstream);
                goto send_reply;
        } else {
                vstream->close_write_flag = 1;
@@ -1196,6 +1266,8 @@ int viewer_get_next_index(struct relay_command *cmd,
                        goto end_unlock;
                } else if (ret == 1) {
                        viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                       delete_viewer_stream(vstream);
+                       destroy_viewer_stream(vstream);
                        goto send_reply;
                }
                goto send_reply;
@@ -1215,6 +1287,8 @@ int viewer_get_next_index(struct relay_command *cmd,
                                goto end_unlock;
                        } else if (ret == 1) {
                                viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+                               delete_viewer_stream(vstream);
+                               destroy_viewer_stream(vstream);
                                goto send_reply;
                        }
                } else {
@@ -1664,66 +1738,38 @@ void deferred_free_connection(struct rcu_head *head)
        free(relay_connection);
 }
 
-static
-void deferred_free_viewer_stream(struct rcu_head *head)
-{
-       struct relay_viewer_stream *stream =
-               caa_container_of(head, struct relay_viewer_stream, rcu_node);
-
-       if (stream->ctf_trace) {
-               uatomic_dec(&stream->ctf_trace->refcount);
-               assert(uatomic_read(&stream->ctf_trace->refcount) >= 0);
-               if (uatomic_read(&stream->ctf_trace->refcount) == 0) {
-                       DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id);
-                       free(stream->ctf_trace);
-               }
-       }
-
-       free(stream->path_name);
-       free(stream->channel_name);
-       free(stream);
-}
-
+/*
+ * Delete all streams for a specific session ID.
+ */
 static
 void viewer_del_streams(uint64_t session_id)
 {
-       int ret;
        struct relay_viewer_stream *stream;
-       struct lttng_ht_node_u64 *node;
        struct lttng_ht_iter iter;
 
        rcu_read_lock();
-       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, node, node) {
+       cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream,
+                       stream_n.node) {
                health_code_update();
 
-               node = lttng_ht_iter_get_node_u64(&iter);
-               if (!node) {
-                       continue;
-               }
-
-               stream = caa_container_of(node, struct relay_viewer_stream, stream_n);
                if (stream->session_id != session_id) {
                        continue;
                }
 
-               if (stream->read_fd >= 0) {
-                       ret = close(stream->read_fd);
-                       if (ret < 0) {
-                               PERROR("close read_fd");
-                       }
-               }
-               if (stream->index_read_fd >= 0) {
-                       ret = close(stream->index_read_fd);
-                       if (ret < 0) {
-                               PERROR("close index_read_fd");
-                       }
-               }
-               if (stream->metadata_flag && stream->ctf_trace) {
+               delete_viewer_stream(stream);
+               assert(stream->ctf_trace);
+
+               if (stream->metadata_flag) {
+                       /*
+                        * The metadata viewer stream is destroyed once the refcount on the
+                        * ctf trace goes to 0 in the destroy stream function thus there is
+                        * no explicit call to that function here.
+                        */
                        stream->ctf_trace->metadata_sent = 0;
+                       stream->ctf_trace->viewer_metadata_stream = NULL;
+               } else {
+                       destroy_viewer_stream(stream);
                }
-               ret = lttng_ht_del(viewer_streams_ht, &iter);
-               assert(!ret);
-               call_rcu(&stream->rcu_node, deferred_free_viewer_stream);
        }
        rcu_read_unlock();
 }
@@ -1743,6 +1789,9 @@ void del_connection(struct lttng_ht *relay_connections_ht,
        assert(iter);
        assert(relay_connection);
 
+       DBG("Cleaning connection of session ID %" PRIu64,
+                       relay_connection->session_id);
+
        ret = lttng_ht_del(relay_connections_ht, iter);
        assert(!ret);
 
index b8e2f72e6706bc194be2d095ddca89f65abb0dca..a6b408a90740943138e43ad69160d34a5395baa6 100644 (file)
@@ -800,8 +800,6 @@ void deferred_free_stream(struct rcu_head *head)
        struct relay_stream *stream =
                caa_container_of(head, struct relay_stream, rcu_node);
 
-       ctf_trace_try_destroy(stream->ctf_trace);
-
        free(stream->path_name);
        free(stream->channel_name);
        free(stream);
@@ -865,6 +863,11 @@ static void destroy_stream(struct relay_stream *stream)
        iter.iter.node = &stream->ctf_trace_node.node;
        delret = lttng_ht_del(stream->ctf_traces_ht, &iter);
        assert(!delret);
+
+       if (stream->ctf_trace) {
+               ctf_trace_try_destroy(stream->ctf_trace);
+       }
+
        call_rcu(&stream->rcu_node, deferred_free_stream);
        DBG("Closed tracefile %d from close stream", stream->fd);
 }
This page took 0.029712 seconds and 4 git commands to generate.