*/
void consumer_output_send_destroy_relayd(struct consumer_output *consumer)
{
- struct lttng_ht_iter iter;
- struct consumer_socket *socket;
-
LTTNG_ASSERT(consumer);
/* Destroy any relayd connection */
- if (consumer->type == CONSUMER_DST_NET) {
- const lttng::urcu::read_lock_guard read_lock;
+ if (consumer->type != CONSUMER_DST_NET) {
+ return;
+ }
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- /* Send destroy relayd command. */
- const int ret = consumer_send_destroy_relayd(socket, consumer);
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*consumer->socks->ht)) {
+ /* Send destroy relayd command. */
+ const int ret = consumer_send_destroy_relayd(socket, consumer);
- if (ret < 0) {
- DBG("Unable to send destroy relayd command to consumer");
- /* Continue since we MUST delete everything at this point. */
- }
+ if (ret < 0) {
+ DBG("Unable to send destroy relayd command to consumer");
+ /* Continue since we MUST delete everything at this point. */
}
}
}
*/
void consumer_destroy_output_sockets(struct consumer_output *obj)
{
- struct lttng_ht_iter iter;
- struct consumer_socket *socket;
-
if (!obj->socks) {
return;
}
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (obj->socks->ht, &iter.iter, socket, node.node) {
- consumer_del_socket(socket, obj);
- consumer_destroy_socket(socket);
- }
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*obj->socks->ht)) {
+ consumer_del_socket(socket, obj);
+ consumer_destroy_socket(socket);
}
}
int consumer_copy_sockets(struct consumer_output *dst, struct consumer_output *src)
{
int ret = 0;
- struct lttng_ht_iter iter;
- struct consumer_socket *socket, *copy_sock;
LTTNG_ASSERT(dst);
LTTNG_ASSERT(src);
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (src->socks->ht, &iter.iter, socket, node.node) {
- /* Ignore socket that are already there. */
- copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
- if (copy_sock) {
- continue;
- }
-
- /* Create new socket object. */
- copy_sock = consumer_allocate_socket(socket->fd_ptr);
- if (copy_sock == nullptr) {
- ret = -ENOMEM;
- goto error;
- }
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*src->socks->ht)) {
+ /* Ignore socket that are already there. */
+ auto *copy_sock = consumer_find_socket(*socket->fd_ptr, dst);
+ if (copy_sock) {
+ continue;
+ }
- copy_sock->registered = socket->registered;
- /*
- * This is valid because this lock is shared accross all consumer
- * object being the global lock of the consumer data structure of the
- * session daemon.
- */
- copy_sock->lock = socket->lock;
- consumer_add_socket(copy_sock, dst);
+ /* Create new socket object. */
+ copy_sock = consumer_allocate_socket(socket->fd_ptr);
+ if (copy_sock == nullptr) {
+ ret = -ENOMEM;
+ goto error;
}
+
+ copy_sock->registered = socket->registered;
+ /*
+ * This is valid because this lock is shared accross all consumer
+ * object being the global lock of the consumer data structure of the
+ * session daemon.
+ */
+ copy_sock->lock = socket->lock;
+ consumer_add_socket(copy_sock, dst);
}
error:
{
int ret;
int32_t ret_code = 0; /* Default is that the data is NOT pending */
- struct consumer_socket *socket;
- struct lttng_ht_iter iter;
struct lttcomm_consumer_msg msg;
LTTNG_ASSERT(consumer);
msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
msg.u.data_pending.session_id = session_id;
- {
- /* Send command for each consumer. */
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
-
- /*
- * No need for a recv reply status because the answer to the command is
- * the reply status message.
- */
- ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto error_unlock;
- }
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*consumer->socks->ht)) {
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ if (ret < 0) {
pthread_mutex_unlock(socket->lock);
+ goto error_unlock;
+ }
- if (ret_code == 1) {
- break;
- }
+ pthread_mutex_unlock(socket->lock);
+
+ if (ret_code == 1) {
+ break;
}
}
uint64_t *discarded)
{
int ret;
- struct consumer_socket *socket;
- struct lttng_ht_iter iter;
struct lttcomm_consumer_msg msg;
LTTNG_ASSERT(consumer);
*discarded = 0;
/* Send command for each consumer. */
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_discarded = 0;
-
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
-
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(
- socket, &consumer_discarded, sizeof(consumer_discarded));
- if (ret < 0) {
- ERR("get discarded events");
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*consumer->socks->ht)) {
+ uint64_t consumer_discarded = 0;
+
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_discarded, sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
pthread_mutex_unlock(socket->lock);
- *discarded += consumer_discarded;
+ goto end;
}
+
+ pthread_mutex_unlock(socket->lock);
+ *discarded += consumer_discarded;
}
ret = 0;
uint64_t *lost)
{
int ret;
- struct consumer_socket *socket;
- struct lttng_ht_iter iter;
struct lttcomm_consumer_msg msg;
LTTNG_ASSERT(consumer);
*lost = 0;
/* Send command for each consumer. */
- {
- const lttng::urcu::read_lock_guard read_lock;
-
- cds_lfht_for_each_entry (consumer->socks->ht, &iter.iter, socket, node.node) {
- uint64_t consumer_lost = 0;
- pthread_mutex_lock(socket->lock);
- ret = consumer_socket_send(socket, &msg, sizeof(msg));
- if (ret < 0) {
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ for (auto *socket :
+ lttng::urcu::lfht_iteration_adapter<consumer_socket,
+ decltype(consumer_socket::node),
+ &consumer_socket::node>(*consumer->socks->ht)) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
- /*
- * No need for a recv reply status because the answer to the
- * command is the reply status message.
- */
- ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
- if (ret < 0) {
- ERR("get lost packets");
- pthread_mutex_unlock(socket->lock);
- goto end;
- }
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost, sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
pthread_mutex_unlock(socket->lock);
- *lost += consumer_lost;
+ goto end;
}
+ pthread_mutex_unlock(socket->lock);
+ *lost += consumer_lost;
}
ret = 0;