Fix: relayd refcount updates for stream
authorDavid Goulet <dgoulet@efficios.com>
Thu, 4 Jul 2013 20:25:27 +0000 (16:25 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 5 Jul 2013 18:26:02 +0000 (14:26 -0400)
Increment refcount only when the stream was successfully sent to the
relayd and set the new stream's flag "sent_to_relayd" which is used
before the refcount update when closing the relayd. A stream that was
unable to be sent, the close relayd code path does not decrement the
refcount anymore.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer-stream.c
src/common/consumer.c
src/common/consumer.h
src/common/ust-consumer/ust-consumer.c

index 03bac86870031eb2b63661f178f5cee0601748c6..723ec829f80095ff17bd3e5996540d01d754072a 100644 (file)
@@ -58,8 +58,10 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
        assert(stream);
        assert(relayd);
 
-       uatomic_dec(&relayd->refcount);
-       assert(uatomic_read(&relayd->refcount) >= 0);
+       if (stream->sent_to_relayd) {
+               uatomic_dec(&relayd->refcount);
+               assert(uatomic_read(&relayd->refcount) >= 0);
+       }
 
        /* Closing streams requires to lock the control socket. */
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
@@ -82,6 +84,7 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
                consumer_destroy_relayd(relayd);
        }
        stream->net_seq_idx = (uint64_t) -1ULL;
+       stream->sent_to_relayd = 0;
 }
 
 /*
index 3aafb519380b334099d584b83ca17d9b201ed6e1..910f386d932f5a71924bd637b4a5a467abcd6e2f 100644 (file)
@@ -540,7 +540,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        assert(ht);
@@ -566,12 +565,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
         */
        lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
-       /* Check and cleanup relayd */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -709,6 +702,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                        goto end;
                }
                uatomic_inc(&relayd->refcount);
+               stream->sent_to_relayd = 1;
        } else {
                ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
                                stream->key, stream->net_seq_idx);
@@ -1969,7 +1963,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
 
@@ -1996,12 +1989,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
-       /* Find relayd and, if one is found, increment refcount. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
index a64bcad3bba01975a97609793379baab651dee97..23f2c9d840fdcde302ca0a9b4024bd9533d499da 100644 (file)
@@ -216,6 +216,14 @@ struct lttng_consumer_stream {
        gid_t gid;
        /* Network sequence number. Indicating on which relayd socket it goes. */
        uint64_t net_seq_idx;
+       /*
+        * Indicate if this stream was successfully sent to a relayd. This is set
+        * after the refcount of the relayd is incremented and is checked when the
+        * stream is closed before decrementing the refcount in order to avoid an
+        * unbalanced state.
+        */
+       unsigned int sent_to_relayd;
+
        /* Identify if the stream is the metadata */
        unsigned int metadata_flag;
        /* Used when the stream is set for network streaming */
index 4a7c6db1a69a5ac146f20e9a8b80978ba1f1b1fb..3133835be78091700eaa64e16e114626ca8327f2 100644 (file)
@@ -209,42 +209,6 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-/*
- * Search for a relayd object related to the stream. If found, send the stream
- * to the relayd.
- *
- * On success, returns 0 else a negative value.
- */
-static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
-{
-       int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               /* Add stream on the relayd */
-               ret = relayd_add_stream(&relayd->control_sock, stream->name,
-                               stream->chan->pathname, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto error;
-               }
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto error;
-       }
-
-error:
-       return ret;
-}
-
 /*
  * Create streams for the given channel using liblttng-ust-ctl.
  *
@@ -411,7 +375,7 @@ static int send_sessiond_channel(int sock,
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
                /* Try to send the stream to the relayd if one is available. */
-               ret = send_stream_to_relayd(stream);
+               ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
                if (ret < 0) {
                        /*
                         * Flag that the relayd was the problem here probably due to a
@@ -737,7 +701,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
        }
 
        /* Send metadata stream to relayd if needed. */
-       ret = send_stream_to_relayd(metadata->metadata_stream);
+       ret = consumer_send_relayd_stream(metadata->metadata_stream,
+                       metadata->pathname);
        if (ret < 0) {
                ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
                goto error;
This page took 0.031297 seconds and 4 git commands to generate.