#include <common/ust-consumer/ust-consumer.h>
#include "consumer.h"
+#include "consumer-stream.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
}
/*
- * Remove a stream from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Completly destroy stream from every visiable data structure and the given
+ * hash table if one.
+ *
+ * One this call returns, the stream object is not longer usable nor visible.
*/
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
- int ret;
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *free_chan = NULL;
- struct consumer_relayd_sock_pair *relayd;
-
- assert(stream);
-
- DBG("Consumer del stream %d", stream->wait_fd);
-
- if (ht == NULL) {
- /* Means the stream was allocated but not successfully added */
- goto free_stream_rcu;
- }
-
- pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&stream->lock);
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (stream->mmap_base != NULL) {
- ret = munmap(stream->mmap_base, stream->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
-
- if (stream->wait_fd >= 0) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_del_stream(stream);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
-
- rcu_read_lock();
- iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_channel_id.node;
- ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_session_id.node;
- ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
-
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- /* Check and cleanup relayd */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_dec(&relayd->refcount);
- assert(uatomic_read(&relayd->refcount) >= 0);
-
- /* Closing streams requires to lock the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_close_stream(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
- }
-
- /* Both conditions are met, we destroy the relayd. */
- if (uatomic_read(&relayd->refcount) == 0 &&
- uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
- }
- }
- rcu_read_unlock();
-
- if (!uatomic_sub_return(&stream->chan->refcount, 1)
- && !uatomic_read(&stream->chan->nb_init_stream_left)) {
- free_chan = stream->chan;
- }
-
-end:
- consumer_data.need_update = 1;
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&consumer_data.lock);
-
- if (free_chan) {
- consumer_del_channel(free_chan);
- }
-
-free_stream_rcu:
- call_rcu(&stream->node.head, free_stream_rcu);
+ consumer_stream_destroy(stream, ht);
}
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,