--- /dev/null
+#ifndef HEALTH_CONSUMERD_H
+#define HEALTH_CONSUMERD_H
+
+/*
+ * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2013 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <lttng/health-internal.h>
+
+enum health_type {
+ HEALTH_CONSUMERD_TYPE_CHANNEL = 0,
+ HEALTH_CONSUMERD_TYPE_METADATA = 1,
+ HEALTH_CONSUMERD_TYPE_DATA = 2,
+ HEALTH_CONSUMERD_TYPE_SESSIOND = 3,
+ HEALTH_CONSUMERD_TYPE_METADATA_TIMER = 4,
+
+ NR_HEALTH_CONSUMERD_TYPES,
+};
+
+/* Consumerd health monitoring */
+struct health_app *health_consumerd;
+
+#endif /* HEALTH_CONSUMERD_H */
#include <common/sessiond-comm/sessiond-comm.h>
#include "lttng-consumerd.h"
+#include "health-consumerd.h"
/* TODO : support UST (all direct kernel-ctl accesses). */
/* the liblttngconsumerd context */
static struct lttng_consumer_local_data *ctx;
+/* Consumerd health monitoring */
+struct health_app *health_consumerd;
+
/*
* Signal handler for the daemon
*/
set_ulimit();
}
+ health_consumerd = health_app_create(NR_HEALTH_CONSUMERD_TYPES);
+ if (!health_consumerd) {
+ goto error;
+ }
+
/* create the consumer instance with and assign the callbacks */
ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer,
NULL, lttng_consumer_on_recv_stream, NULL);
end:
lttng_consumer_destroy(ctx);
lttng_consumer_cleanup();
+ if (health_consumerd) {
+ health_app_destroy(health_consumerd);
+ }
return ret;
}
#include "consumer.h"
#include "consumer-stream.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
*/
void *consumer_thread_metadata_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
while (1) {
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
}
}
+ /* All is OK */
+ err = 0;
error:
end:
DBG("Metadata poll thread exiting");
end_poll:
destroy_stream_ht(metadata_ht);
end_ht:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i;
+ int num_rdy, num_hup, high_prio, ret, i, err = -1;
struct pollfd *pollfd = NULL;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
/* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
}
}
}
+ /* All is OK */
+ err = 0;
end:
DBG("polling thread exiting");
free(pollfd);
destroy_data_stream_ht(data_ht);
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_channel_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_channel *chan = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
while (1) {
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
}
}
+ /* All is OK */
+ err = 0;
end:
lttng_poll_clean(&events);
end_poll:
destroy_channel_ht(channel_ht);
end_ht:
DBG("Channel poll thread exiting");
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock = -1, client_socket, ret;
+ int sock = -1, client_socket, ret, err = -1;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
}
if (consumer_quit) {
DBG("consumer_thread_receive_fds received quit from signal");
+ err = 0; /* All is OK */
goto end;
}
DBG("received command on sock");
}
+ /* All is OK */
+ err = 0;
+
end:
DBG("Consumer thread sessiond poll exiting");
}
}
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}