channel->key = -1;
}
+static
+void consumer_free_stream(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+
+ free(stream);
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
/* Get stream node from hash table */
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
- /* Remove stream node from hash table */
- ret = lttng_ht_del(consumer_data.stream_ht, &iter);
- assert(!ret);
+ /*
+ * Remove stream node from hash table. It can fail if it's been
+ * replaced due to key reuse.
+ */
+ (void) lttng_ht_del(consumer_data.stream_ht, &iter);
rcu_read_unlock();
}
if (!--stream->chan->refcount)
free_chan = stream->chan;
- free(stream);
+
+ call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
consumer_del_channel(free_chan);
}
-static void consumer_del_stream_rcu(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
-
- consumer_del_stream(stream);
-}
-
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
rcu_read_lock();
- lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ /*
+ * We simply remove the old channel from the hash table. It's
+ * ok, since we know for sure the sessiond wants to replace it
+ * with the new version, because the key has been reused.
+ */
+ (void) lttng_ht_add_replace_ulong(consumer_data.stream_ht, &stream->node);
rcu_read_unlock();
consumer_data.stream_count++;
consumer_data.need_update = 1;
end:
pthread_mutex_unlock(&consumer_data.lock);
+
return ret;
}
pthread_mutex_unlock(&consumer_data.lock);
}
+static
+void consumer_free_channel(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
+
+ free(channel);
+}
+
/*
* Remove a channel from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
lttng_ht_lookup(consumer_data.channel_ht,
(void *)((unsigned long) channel->key), &iter);
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
+
+ /*
+ * Remove channel node from hash table. It can fail if it's been
+ * replaced due to key reuse.
+ */
+ (void) lttng_ht_del(consumer_data.channel_ht, &iter);
rcu_read_unlock();
PERROR("close");
}
}
- free(channel);
+
+ call_rcu(&channel->node.head, consumer_free_channel);
end:
pthread_mutex_unlock(&consumer_data.lock);
}
-static void consumer_del_channel_rcu(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_channel *channel=
- caa_container_of(node, struct lttng_consumer_channel, node);
-
- consumer_del_channel(channel);
-}
-
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
rcu_read_lock();
- lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+ /*
+ * We simply remove the old channel from the hash table. It's
+ * ok, since we know for sure the sessiond wants to replace it
+ * with the new version, because the key has been reused.
+ */
+ (void) lttng_ht_add_replace_ulong(consumer_data.channel_ht, &channel->node);
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
+
return 0;
}
*/
void lttng_consumer_cleanup(void)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
*/
cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
node) {
- ret = lttng_ht_del(consumer_data.stream_ht, &iter);
- assert(!ret);
- call_rcu(&node->head, consumer_del_stream_rcu);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+ consumer_del_stream(stream);
}
cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
node) {
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- call_rcu(&node->head, consumer_del_channel_rcu);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
+ consumer_del_channel(channel);
}
rcu_read_unlock();
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
}