Fix: Stream allocation and insertion consistency
authorDavid Goulet <dgoulet@efficios.com>
Wed, 3 Oct 2012 15:22:50 +0000 (11:22 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Tue, 9 Oct 2012 15:20:49 +0000 (11:20 -0400)
This commit restores the consistency between the allocate and insertion
operations on streams. The allocate stream function was changing the
channel state by updating the refcount, cpucount and number of init
stream. We now moved these operations into the add_stream function so
that the initialization is done there and the del_stream handles the
cleanup.

So basically, any side effect done on a channel using a stream is now
done in the add/del functions. The same was done for the metadata which
is a special case that does not need to set the need update flag.

Furthermore, the consumer_del_stream now can destroy a stream even if
that stream was not successfully added to its hash table. The kernel and
UST consumers now use it on error between allocation and the add_stream
function.

This refactoring fixes memory leaks, bad refcount values and file
descriptor leaks. Also, the metadata destroy stream function was also
fixed to use the waitfd_node which is also fixed in the
consumer_del_metadata_stream that was deleting the wrong node pointer.

The waitfd_node fixes were merged with this commit in order to make the
whole patch works and not failed on make check.

Acked-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index 161bf7e324ba38abcc6976f91f219c9f2257d4af..242b05b3d6bb9c65939ea72a900a84fa74d382b7 100644 (file)
@@ -161,6 +161,17 @@ void consumer_free_stream(struct rcu_head *head)
        free(stream);
 }
 
+static
+void consumer_free_metadata_stream(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_stream *stream =
+               caa_container_of(node, struct lttng_consumer_stream, waitfd_node);
+
+       free(stream);
+}
+
 /*
  * RCU protected relayd socket pair free.
  */
@@ -230,7 +241,8 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
  * Remove a stream from the global list protected by a mutex. This
  * function is also responsible for freeing its data structures.
  */
-void consumer_del_stream(struct lttng_consumer_stream *stream)
+void consumer_del_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -239,6 +251,11 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (ht == NULL) {
+               /* Means the stream was allocated but not successfully added */
+               goto free_stream;
+       }
+
        pthread_mutex_lock(&consumer_data.lock);
 
        switch (consumer_data.type) {
@@ -262,7 +279,7 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
 
        rcu_read_lock();
        iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(consumer_data.stream_ht, &iter);
+       ret = lttng_ht_del(ht, &iter);
        assert(!ret);
 
        rcu_read_unlock();
@@ -329,7 +346,6 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                free_chan = stream->chan;
        }
 
-       call_rcu(&stream->node.head, consumer_free_stream);
 end:
        consumer_data.need_update = 1;
        pthread_mutex_unlock(&consumer_data.lock);
@@ -337,6 +353,9 @@ end:
        if (free_chan) {
                consumer_del_channel(free_chan);
        }
+
+free_stream:
+       call_rcu(&stream->node.head, consumer_free_stream);
 }
 
 struct lttng_consumer_stream *consumer_allocate_stream(
@@ -353,7 +372,6 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                int *alloc_ret)
 {
        struct lttng_consumer_stream *stream;
-       int ret;
 
        stream = zmalloc(sizeof(*stream));
        if (stream == NULL) {
@@ -372,7 +390,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(
                ERR("Unable to find channel for stream %d", stream_key);
                goto error;
        }
-       stream->chan->refcount++;
+
        stream->key = stream_key;
        stream->shm_fd = shm_fd;
        stream->wait_fd = wait_fd;
@@ -388,37 +406,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        stream->metadata_flag = metadata_flag;
        strncpy(stream->path_name, path_name, sizeof(stream->path_name));
        stream->path_name[sizeof(stream->path_name) - 1] = '\0';
-       lttng_ht_node_init_ulong(&stream->node, stream->key);
        lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               stream->cpu = stream->chan->cpucount++;
-               ret = lttng_ustconsumer_allocate_stream(stream);
-               if (ret) {
-                       *alloc_ret = -EINVAL;
-                       goto error;
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               *alloc_ret = -EINVAL;
-               goto error;
-       }
-
-       /*
-        * When nb_init_streams reaches 0, we don't need to trigger any action in
-        * terms of destroying the associated channel, because the action that
-        * causes the count to become 0 also causes a stream to be added. The
-        * channel deletion will thus be triggered by the following removal of this
-        * stream.
-        */
-       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
-               uatomic_dec(&stream->chan->nb_init_streams);
-       }
+       lttng_ht_node_init_ulong(&stream->node, stream->key);
 
        DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
                        " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
@@ -439,22 +428,35 @@ end:
 int consumer_add_stream(struct lttng_consumer_stream *stream)
 {
        int ret = 0;
-       struct lttng_ht_node_ulong *node;
-       struct lttng_ht_iter iter;
        struct consumer_relayd_sock_pair *relayd;
 
-       pthread_mutex_lock(&consumer_data.lock);
-       /* Steal stream identifier, for UST */
-       consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
+       assert(stream);
 
+       DBG3("Adding consumer stream %d", stream->key);
+
+       pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
-       lttng_ht_lookup(consumer_data.stream_ht,
-                       (void *)((unsigned long) stream->key), &iter);
-       node = lttng_ht_iter_get_node_ulong(&iter);
-       if (node != NULL) {
-               rcu_read_unlock();
-               /* Stream already exist. Ignore the insertion */
-               goto end;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               stream->cpu = stream->chan->cpucount++;
+               ret = lttng_ustconsumer_add_stream(stream);
+               if (ret) {
+                       ret = -EINVAL;
+                       goto error;
+               }
+
+               /* Steal stream identifier only for UST */
+               consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               ret = -ENOSYS;
+               goto error;
        }
 
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
@@ -464,13 +466,27 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        if (relayd != NULL) {
                uatomic_inc(&relayd->refcount);
        }
-       rcu_read_unlock();
 
-       /* Update consumer data */
+       /* Update channel refcount once added without error(s). */
+       uatomic_inc(&stream->chan->refcount);
+
+       /*
+        * When nb_init_streams reaches 0, we don't need to trigger any action in
+        * terms of destroying the associated channel, because the action that
+        * causes the count to become 0 also causes a stream to be added. The
+        * channel deletion will thus be triggered by the following removal of this
+        * stream.
+        */
+       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+               uatomic_dec(&stream->chan->nb_init_streams);
+       }
+
+       /* Update consumer data once the node is inserted. */
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
-end:
+error:
+       rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -896,7 +912,7 @@ void lttng_consumer_cleanup(void)
                        node) {
                struct lttng_consumer_stream *stream =
                        caa_container_of(node, struct lttng_consumer_stream, node);
-               consumer_del_stream(stream);
+               consumer_del_stream(stream, consumer_data.stream_ht);
        }
 
        cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
@@ -1519,6 +1535,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
 /*
  * Iterate over all streams of the hashtable and free them properly.
+ *
+ * XXX: Should not be only for metadata stream or else use an other name.
  */
 static void destroy_stream_ht(struct lttng_ht *ht)
 {
@@ -1531,11 +1549,11 @@ static void destroy_stream_ht(struct lttng_ht *ht)
        }
 
        rcu_read_lock();
-       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+       cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) {
                ret = lttng_ht_del(ht, &iter);
                assert(!ret);
 
-               call_rcu(&stream->node.head, consumer_free_stream);
+               call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
        }
        rcu_read_unlock();
 
@@ -1545,9 +1563,12 @@ static void destroy_stream_ht(struct lttng_ht *ht)
 /*
  * Clean up a metadata stream and free its memory.
  */
-static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
 {
        int ret;
+       struct lttng_ht_iter iter;
+       struct lttng_consumer_channel *free_chan = NULL;
        struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
@@ -1557,6 +1578,19 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
         */
        assert(stream->metadata_flag);
 
+       DBG3("Consumer delete metadata stream %d", stream->wait_fd);
+
+       if (ht == NULL) {
+               /* Means the stream was allocated but not successfully added */
+               goto free_stream;
+       }
+
+       rcu_read_lock();
+       iter.iter.node = &stream->waitfd_node.node;
+       ret = lttng_ht_del(ht, &iter);
+       assert(!ret);
+       rcu_read_unlock();
+
        pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1574,8 +1608,8 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
        default:
                ERR("Unknown consumer_data type");
                assert(0);
+               goto end;
        }
-       pthread_mutex_unlock(&consumer_data.lock);
 
        if (stream->out_fd >= 0) {
                ret = close(stream->out_fd);
@@ -1632,27 +1666,90 @@ static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
        if (!uatomic_read(&stream->chan->refcount)
                        && !uatomic_read(&stream->chan->nb_init_streams)) {
                /* Go for channel deletion! */
-               consumer_del_channel(stream->chan);
+               free_chan = stream->chan;
        }
 
-       call_rcu(&stream->node.head, consumer_free_stream);
+end:
+       pthread_mutex_unlock(&consumer_data.lock);
+
+       if (free_chan) {
+               consumer_del_channel(free_chan);
+       }
+
+free_stream:
+       call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
 }
 
 /*
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static void consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
+static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht)
 {
+       int ret = 0;
        struct consumer_relayd_sock_pair *relayd;
 
-       /* Find relayd and, if one is found, increment refcount. */
+       assert(stream);
+       assert(ht);
+
+       DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
+
+       pthread_mutex_lock(&consumer_data.lock);
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               ret = lttng_ustconsumer_add_stream(stream);
+               if (ret) {
+                       ret = -EINVAL;
+                       goto error;
+               }
+
+               /* Steal stream identifier only for UST */
+               consumer_steal_stream_key(stream->wait_fd, ht);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               assert(0);
+               ret = -ENOSYS;
+               goto error;
+       }
+
+       /*
+        * From here, refcounts are updated so be _careful_ when returning an error
+        * after this point.
+        */
+
        rcu_read_lock();
+       /* Find relayd and, if one is found, increment refcount. */
        relayd = consumer_find_relayd(stream->net_seq_idx);
        if (relayd != NULL) {
                uatomic_inc(&relayd->refcount);
        }
+
+       /* Update channel refcount once added without error(s). */
+       uatomic_inc(&stream->chan->refcount);
+
+       /*
+        * When nb_init_streams reaches 0, we don't need to trigger any action in
+        * terms of destroying the associated channel, because the action that
+        * causes the count to become 0 also causes a stream to be added. The
+        * channel deletion will thus be triggered by the following removal of this
+        * stream.
+        */
+       if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+               uatomic_dec(&stream->chan->nb_init_streams);
+       }
+
+       lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
        rcu_read_unlock();
+
+error:
+       pthread_mutex_unlock(&consumer_data.lock);
+       return ret;
 }
 
 /*
@@ -1663,7 +1760,7 @@ void *lttng_consumer_thread_poll_metadata(void *data)
 {
        int ret, i, pollfd;
        uint32_t revents, nb_fd;
-       struct lttng_consumer_stream *stream;
+       struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_ulong *node;
        struct lttng_ht *metadata_ht = NULL;
@@ -1711,16 +1808,22 @@ restart:
                DBG("Metadata event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
+                               ERR("Poll EINTR catched");
                                goto restart;
                        }
                        goto error;
                }
 
+               /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Check the metadata pipe for incoming metadata. */
+                       /* Just don't waste time if no returned events for the fd */
+                       if (!revents) {
+                               continue;
+                       }
+
                        if (pollfd == ctx->consumer_metadata_pipe[0]) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -1749,59 +1852,35 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       rcu_read_lock();
-                                       /* The node should be init at this point */
-                                       lttng_ht_add_unique_ulong(metadata_ht,
-                                                       &stream->waitfd_node);
-                                       rcu_read_unlock();
+                                       ret = consumer_add_metadata_stream(stream, metadata_ht);
+                                       if (ret) {
+                                               ERR("Unable to add metadata stream");
+                                               /* Stream was not setup properly. Continuing. */
+                                               consumer_del_metadata_stream(stream, NULL);
+                                               continue;
+                                       }
 
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
-
-                                       consumer_add_metadata_stream(stream);
                                }
 
-                               /* Metadata pipe handled. Continue handling the others */
+                               /* Handle other stream */
                                continue;
                        }
 
-                       /* From here, the event is a metadata wait fd */
-
                        rcu_read_lock();
                        lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
                                        &iter);
                        node = lttng_ht_iter_get_node_ulong(&iter);
-                       if (node == NULL) {
-                               /* FD not found, continue loop */
-                               rcu_read_unlock();
-                               continue;
-                       }
+                       assert(node);
 
                        stream = caa_container_of(node, struct lttng_consumer_stream,
                                        waitfd_node);
 
-                       /* Get the data out of the metadata file descriptor */
-                       if (revents & (LPOLLIN | LPOLLPRI)) {
-                               DBG("Metadata available on fd %d", pollfd);
-                               assert(stream->wait_fd == pollfd);
-
-                               len = ctx->on_buffer_ready(stream, ctx);
-                               /* It's ok to have an unavailable sub-buffer */
-                               if (len < 0 && len != -EAGAIN) {
-                                       rcu_read_unlock();
-                                       goto end;
-                               } else if (len > 0) {
-                                       stream->data_read = 1;
-                               }
-                       }
-
-                       /*
-                        * Remove the stream from the hash table since there is no data
-                        * left on the fd because we previously did a read on the buffer.
-                        */
+                       /* Check for error event */
                        if (revents & (LPOLLERR | LPOLLHUP)) {
-                               DBG("Metadata fd %d is hup|err|nval.", pollfd);
+                               DBG("Metadata fd %d is hup|err.", pollfd);
                                if (!stream->hangup_flush_done
                                                && (consumer_data.type == LTTNG_CONSUMER32_UST
                                                        || consumer_data.type == LTTNG_CONSUMER64_UST)) {
@@ -1817,12 +1896,28 @@ restart:
                                        }
                                }
 
-                               /* Removing it from hash table, poll set and free memory */
-                               lttng_ht_del(metadata_ht, &iter);
-
                                lttng_poll_del(&events, stream->wait_fd);
-                               consumer_del_metadata_stream(stream);
+                               /*
+                                * This call update the channel states, closes file descriptors
+                                * and securely free the stream.
+                                */
+                               consumer_del_metadata_stream(stream, metadata_ht);
+                       } else if (revents & (LPOLLIN | LPOLLPRI)) {
+                               /* Get the data out of the metadata file descriptor */
+                               DBG("Metadata available on fd %d", pollfd);
+                               assert(stream->wait_fd == pollfd);
+
+                               len = ctx->on_buffer_ready(stream, ctx);
+                               /* It's ok to have an unavailable sub-buffer */
+                               if (len < 0 && len != -EAGAIN) {
+                                       rcu_read_unlock();
+                                       goto end;
+                               } else if (len > 0) {
+                                       stream->data_read = 1;
+                               }
                        }
+
+                       /* Release RCU lock for the stream looked up */
                        rcu_read_unlock();
                }
        }
@@ -2015,19 +2110,22 @@ void *lttng_consumer_thread_poll_fds(void *data)
                        if ((pollfd[i].revents & POLLHUP)) {
                                DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i]);
+                                       consumer_del_stream(local_stream[i],
+                                                       consumer_data.stream_ht);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i]);
+                                       consumer_del_stream(local_stream[i],
+                                                       consumer_data.stream_ht);
                                        num_hup++;
                                }
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
                                if (!local_stream[i]->data_read) {
-                                       consumer_del_stream(local_stream[i]);
+                                       consumer_del_stream(local_stream[i],
+                                                       consumer_data.stream_ht);
                                        num_hup++;
                                }
                        }
index 9a93c427915bbd6de3aef1bdbc9440320244092a..d0cd8fd869e2b647581e02e163f75725365124f8 100644 (file)
@@ -341,7 +341,10 @@ extern struct lttng_consumer_stream *consumer_allocate_stream(
                int metadata_flag,
                int *alloc_ret);
 extern int consumer_add_stream(struct lttng_consumer_stream *stream);
-extern void consumer_del_stream(struct lttng_consumer_stream *stream);
+extern void consumer_del_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht);
+extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
+               struct lttng_ht *ht);
 extern void consumer_change_stream_state(int stream_key,
                enum lttng_consumer_stream_state state);
 extern void consumer_del_channel(struct lttng_consumer_channel *channel);
index 4d61cc506f32e57896b96bf72ee50b91cc3a50b5..13cbe2149de6852ed08e6532f6025ff3e382a103 100644 (file)
@@ -206,18 +206,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
-                       free(new_stream);
+                       consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                }
@@ -230,9 +232,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        } while (ret < 0 && errno == EINTR);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
+                               consumer_del_stream(new_stream, NULL);
                        }
                } else {
-                       consumer_add_stream(new_stream);
+                       ret = consumer_add_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %d failed. Continuing",
+                                               new_stream->key);
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
+                       }
                }
 
                DBG("Kernel consumer_add_stream (%d)", fd);
index 76238a087976ff5ca341040ecbafecfcb3d0b0a0..11706877a7f5b3f147034abcf106a0bb5e77c5d2 100644 (file)
@@ -234,12 +234,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                        &new_stream->relayd_stream_id);
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                } else if (msg.u.stream.net_index != -1) {
                        ERR("Network sequence index %d unknown. Not adding stream.",
                                        msg.u.stream.net_index);
-                       free(new_stream);
+                       consumer_del_stream(new_stream, NULL);
                        goto end_nosignal;
                }
 
@@ -247,6 +248,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
+                               consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                }
@@ -259,9 +261,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        } while (ret < 0 && errno == EINTR);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
+                               consumer_del_metadata_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
                } else {
-                       consumer_add_stream(new_stream);
+                       ret = consumer_add_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %d failed. Continuing",
+                                               new_stream->key);
+                               /*
+                                * At this point, if the add_stream fails, it is not in the
+                                * hash table thus passing the NULL value here.
+                                */
+                               consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
+                       }
                }
 
                DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
@@ -373,7 +387,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
        ustctl_unmap_channel(chan->handle);
 }
 
-int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
 {
        struct lttng_ust_object_data obj;
        int ret;
@@ -385,13 +399,14 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        ret = ustctl_add_stream(stream->chan->handle, &obj);
        if (ret) {
                ERR("UST ctl add_stream failed with ret %d", ret);
-               return ret;
+               goto error;
        }
 
        stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu);
        if (!stream->buf) {
                ERR("UST ctl open_stream_read failed");
-               return -EBUSY;
+               ret = -EBUSY;
+               goto error;
        }
 
        /* ustctl_open_stream_read has closed the shm fd. */
@@ -401,10 +416,16 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
        stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf);
        if (!stream->mmap_base) {
                ERR("UST ctl get_mmap_base failed");
-               return -EINVAL;
+               ret = -EINVAL;
+               goto mmap_error;
        }
 
        return 0;
+
+mmap_error:
+       ustctl_close_stream_read(stream->chan->handle, stream->buf);
+error:
+       return ret;
 }
 
 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
index 3f76f234feaa32348e539c3401061e5a530d26d0..6b507ed99f9a9fa1f2640f0d7a06f25315fd46ee 100644 (file)
@@ -49,7 +49,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
 extern int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan);
 extern void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan);
-extern int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream);
+extern int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream);
 extern void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream);
 
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
@@ -117,7 +117,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
 }
 
 static inline
-int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream)
+int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream)
 {
        return -ENOSYS;
 }
This page took 0.036888 seconds and 4 git commands to generate.