*
* This function MUST be called with the consumer_data lock acquired.
*/
-void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret;
struct lttng_ht_iter iter;
/* Destroy the relayd if refcount is 0 */
if (uatomic_read(&relayd->refcount) == 0) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
*/
-
-int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd)
+static int add_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret = 0;
struct lttng_ht_node_ulong *node;
return;
}
+ rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
free(stream);
}
+ rcu_read_unlock();
lttng_ht_destroy(ht);
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
+ destroy_relayd(relayd);
}
}
rcu_read_unlock();
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
+ rcu_read_lock();
/* The node should be init at this point */
lttng_ht_add_unique_ulong(metadata_ht,
&stream->waitfd_node);
+ rcu_read_unlock();
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
/* From here, the event is a metadata wait fd */
+ rcu_read_lock();
lttng_ht_lookup(metadata_ht, (void *)((unsigned long) pollfd),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
/* FD not found, continue loop */
+ rcu_read_unlock();
continue;
}
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
} else if (len > 0) {
stream->data_read = 1;
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN) {
+ rcu_read_unlock();
goto end;
}
}
/* Removing it from hash table, poll set and free memory */
lttng_ht_del(metadata_ht, &iter);
+
lttng_poll_del(&events, stream->wait_fd);
consumer_del_metadata_stream(stream);
}
+ rcu_read_unlock();
}
}
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
- consumer_add_relayd(relayd);
+ add_relayd(relayd);
/* All good! */
ret = 0;
int consumer_add_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
-int consumer_add_relayd(struct consumer_relayd_sock_pair *relayd);
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
int net_seq_idx);
struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
-void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,