m4_define([_DEFAULT_CHANNEL_SWITCH_TIMER], [0])
m4_define([_DEFAULT_CHANNEL_LIVE_TIMER], [0])
m4_define([_DEFAULT_CHANNEL_READ_TIMER], [200000])
+m4_define([_DEFAULT_CHANNEL_MONITOR_TIMER], [1000000])
_AC_DEFINE_AND_SUBST([DEFAULT_AGENT_TCP_PORT], [5345])
_AC_DEFINE_AND_SUBST([DEFAULT_APP_SOCKET_RW_TIMEOUT], [5])
_AC_DEFINE_AND_SUBST([DEFAULT_CHANNEL_SUBBUF_SIZE], [_DEFAULT_CHANNEL_SUBBUF_SIZE])
_AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM])
_AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SUBBUF_SIZE], [262144])
_AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER])
+_AC_DEFINE_AND_SUBST([DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER])
_AC_DEFINE_AND_SUBST([DEFAULT_LTTNG_LIVE_TIMER], [1000000])
_AC_DEFINE_AND_SUBST([DEFAULT_METADATA_CACHE_SIZE], [4096])
_AC_DEFINE_AND_SUBST([DEFAULT_METADATA_READ_TIMER], [0])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SUBBUF_SIZE], [_DEFAULT_CHANNEL_SUBBUF_SIZE])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER])
+_AC_DEFINE_AND_SUBST([DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_LIVE_TIMER], [_DEFAULT_CHANNEL_LIVE_TIMER])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_READ_TIMER], [0])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SUBBUF_NUM], [_DEFAULT_CHANNEL_SUBBUF_NUM])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SUBBUF_SIZE], [131072])
_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER], [_DEFAULT_CHANNEL_SWITCH_TIMER])
+_AC_DEFINE_AND_SUBST([DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER], [_DEFAULT_CHANNEL_MONITOR_TIMER])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_AGENT_BIND_ADDRESS], [localhost])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_CONTROL_BIND_ADDRESS], [0.0.0.0])
_AC_DEFINE_QUOTED_AND_SUBST([DEFAULT_NETWORK_DATA_BIND_ADDRESS], [0.0.0.0])
struct lttng_channel_extended {
uint64_t discarded_events;
uint64_t lost_packets;
+ uint64_t monitor_timer_interval;
} LTTNG_PACKED;
#endif /* LTTNG_CHANNEL_INTERNAL_H */
PERROR("pthread_join sessiond_thread");
retval = -1;
}
+
+ ret = consumer_timer_thread_get_channel_monitor_pipe();
+ if (ret >= 0) {
+ ret = close(ret);
+ if (ret) {
+ PERROR("close channel monitor pipe");
+ }
+ }
exit_sessiond_thread:
ret = pthread_join(data_thread, &status);
{
struct lttng_channel *chan;
const char *channel_name = DEFAULT_CHANNEL_NAME;
+ struct lttng_channel_extended *extended_attr = NULL;
chan = zmalloc(sizeof(struct lttng_channel));
if (chan == NULL) {
goto error_alloc;
}
+ extended_attr = zmalloc(sizeof(struct lttng_channel_extended));
+ if (!extended_attr) {
+ PERROR("zmalloc channel extended init");
+ goto error;
+ }
+
+ chan->attr.extended.ptr = extended_attr;
+
/* Same for all domains. */
chan->attr.overwrite = DEFAULT_CHANNEL_OVERWRITE;
chan->attr.tracefile_size = DEFAULT_CHANNEL_TRACEFILE_SIZE;
chan->attr.switch_timer_interval = DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER;
chan->attr.read_timer_interval = DEFAULT_KERNEL_CHANNEL_READ_TIMER;
chan->attr.live_timer_interval = DEFAULT_KERNEL_CHANNEL_LIVE_TIMER;
+ extended_attr->monitor_timer_interval =
+ DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER;
break;
case LTTNG_DOMAIN_JUL:
channel_name = DEFAULT_JUL_CHANNEL_NAME;
DEFAULT_UST_UID_CHANNEL_READ_TIMER;
chan->attr.live_timer_interval =
DEFAULT_UST_UID_CHANNEL_LIVE_TIMER;
+ extended_attr->monitor_timer_interval =
+ DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER;
break;
case LTTNG_BUFFER_PER_PID:
default:
chan->attr.read_timer_interval =
DEFAULT_UST_PID_CHANNEL_READ_TIMER;
chan->attr.live_timer_interval =
- DEFAULT_UST_UID_CHANNEL_LIVE_TIMER;
+ DEFAULT_UST_PID_CHANNEL_LIVE_TIMER;
+ extended_attr->monitor_timer_interval =
+ DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER;
break;
}
break;
return chan;
error:
+ free(extended_attr);
free(chan);
error_alloc:
return NULL;
}
+void channel_attr_destroy(struct lttng_channel *channel)
+{
+ if (!channel) {
+ return;
+ }
+ free(channel->attr.extended.ptr);
+ free(channel);
+}
+
/*
* Disable kernel channel of the kernel session.
*/
ret = LTTNG_OK;
error:
- free(defattr);
+ channel_attr_destroy(defattr);
return ret;
}
}
}
- free(defattr);
+ channel_attr_destroy(defattr);
return LTTNG_OK;
error_free_chan:
*/
trace_ust_destroy_channel(uchan);
error:
- free(defattr);
+ channel_attr_destroy(defattr);
return ret;
}
struct lttng_channel *channel_new_default_attr(int domain,
enum lttng_buffer_type type);
+void channel_attr_destroy(struct lttng_channel *channel);
int channel_ust_create(struct ltt_ust_session *usess,
struct lttng_channel *attr, enum lttng_buffer_type type);
}
ret = consumer_recv_status_reply(sock);
-
error:
return ret;
}
unsigned int switch_timer_interval,
unsigned int read_timer_interval,
unsigned int live_timer_interval,
+ unsigned int monitor_timer_interval,
int output,
int type,
uint64_t session_id,
msg->u.ask_channel.switch_timer_interval = switch_timer_interval;
msg->u.ask_channel.read_timer_interval = read_timer_interval;
msg->u.ask_channel.live_timer_interval = live_timer_interval;
+ msg->u.ask_channel.monitor_timer_interval = monitor_timer_interval;
msg->u.ask_channel.output = output;
msg->u.ask_channel.type = type;
msg->u.ask_channel.session_id = session_id;
uint64_t tracefile_size,
uint64_t tracefile_count,
unsigned int monitor,
- unsigned int live_timer_interval)
+ unsigned int live_timer_interval,
+ unsigned int monitor_timer_interval)
{
assert(msg);
msg->u.channel.tracefile_count = tracefile_count;
msg->u.channel.monitor = monitor;
msg->u.channel.live_timer_interval = live_timer_interval;
+ msg->u.channel.monitor_timer_interval = monitor_timer_interval;
strncpy(msg->u.channel.pathname, pathname,
sizeof(msg->u.channel.pathname));
return ret;
}
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+ int pipe)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+
+ /* Code flow error. Safety net. */
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE;
+
+ DBG3("Sending set_channel_monitor_pipe command to consumer");
+ ret = consumer_send_msg(consumer_sock, &msg);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG3("Sending channel monitoring pipe %d to consumer on socket %d",
+ pipe, *consumer_sock->fd_ptr);
+ ret = consumer_send_fds(consumer_sock, &pipe, 1);
+ if (ret < 0) {
+ goto error;
+ }
+
+ DBG2("Channel monitoring pipe successfully sent");
+error:
+ return ret;
+}
+
/*
* Set consumer subdirectory using the session name and a generated datetime if
* needed. This is appended to the current subdirectory.
int err_sock;
/* These two sockets uses the cmd_unix_sock_path. */
int cmd_sock;
+ /*
+ * Write-end of the channel monitoring pipe to be passed to the
+ * consumer.
+ */
+ int channel_monitor_pipe;
/*
* The metadata socket object is handled differently and only created
* locally in this object thus it's the only reference available in the
struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer,
enum lttng_stream_type type, uint64_t session_id,
char *session_name, char *hostname, int session_live_timer);
+int consumer_send_channel_monitor_pipe(struct consumer_socket *consumer_sock,
+ int pipe);
int consumer_send_destroy_relayd(struct consumer_socket *sock,
struct consumer_output *consumer);
int consumer_recv_status_reply(struct consumer_socket *sock);
unsigned int switch_timer_interval,
unsigned int read_timer_interval,
unsigned int live_timer_interval,
+ unsigned int monitor_timer_interval,
int output,
int type,
uint64_t session_id,
uint64_t tracefile_size,
uint64_t tracefile_count,
unsigned int monitor,
- unsigned int live_timer_interval);
+ unsigned int live_timer_interval,
+ unsigned int monitor_timer_interval);
int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer);
int consumer_close_metadata(struct consumer_socket *socket,
#include "consumer.h"
#include "health-sessiond.h"
#include "kernel-consumer.h"
+#include "notification-thread-commands.h"
+#include "session.h"
+#include "lttng-sessiond.h"
static char *create_channel_path(struct consumer_output *consumer,
uid_t uid, gid_t gid)
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
int kernel_consumer_add_channel(struct consumer_socket *sock,
- struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
+ struct ltt_kernel_channel *channel,
+ struct ltt_kernel_session *ksession,
unsigned int monitor)
{
int ret;
char *pathname;
struct lttcomm_consumer_msg lkm;
struct consumer_output *consumer;
+ enum lttng_error_code status;
+ struct ltt_session *session;
+ struct lttng_channel_extended *channel_attr_extended;
/* Safety net */
assert(channel);
- assert(session);
- assert(session->consumer);
+ assert(ksession);
+ assert(ksession->consumer);
- consumer = session->consumer;
+ consumer = ksession->consumer;
+ channel_attr_extended = (struct lttng_channel_extended *)
+ channel->channel->attr.extended.ptr;
DBG("Kernel consumer adding channel %s to kernel consumer",
channel->channel->name);
if (monitor) {
- pathname = create_channel_path(consumer, session->uid, session->gid);
+ pathname = create_channel_path(consumer, ksession->uid,
+ ksession->gid);
} else {
/* Empty path. */
pathname = strdup("");
consumer_init_channel_comm_msg(&lkm,
LTTNG_CONSUMER_ADD_CHANNEL,
channel->fd,
- session->id,
+ ksession->id,
pathname,
- session->uid,
- session->gid,
+ ksession->uid,
+ ksession->gid,
consumer->net_seq_index,
channel->channel->name,
channel->stream_count,
channel->channel->attr.tracefile_size,
channel->channel->attr.tracefile_count,
monitor,
- channel->channel->attr.live_timer_interval);
+ channel->channel->attr.live_timer_interval,
+ channel_attr_extended->monitor_timer_interval);
health_code_update();
}
health_code_update();
+ rcu_read_lock();
+ session = session_find_by_id(ksession->id);
+ assert(session);
+ status = notification_thread_command_add_channel(
+ notification_thread_handle, session->name,
+ ksession->uid, ksession->gid,
+ channel->channel->name, channel->fd,
+ LTTNG_DOMAIN_KERNEL,
+ channel->channel->attr.subbuf_size * channel->channel->attr.num_subbuf);
+ rcu_read_unlock();
+ if (status != LTTNG_OK) {
+ ret = -1;
+ goto error;
+ }
error:
free(pathname);
return ret;
DEFAULT_KERNEL_CHANNEL_OUTPUT,
CONSUMER_CHANNEL_TYPE_METADATA,
0, 0,
- monitor, 0);
+ monitor, 0, 0);
health_code_update();
#include "session.h"
#include "ust-app.h"
#include "version.h"
+#include "notification-thread.h"
extern const char default_home_dir[],
default_tracing_group[],
/* Set in main.c at boot time of the daemon */
extern int kernel_tracer_fd;
+extern struct notification_thread_handle *notification_thread_handle;
+
/*
* This contains extra data needed for processing a command received by the
* session daemon from the lttng client.
/* Is this daemon root or not. */
extern int is_root;
+extern const char *tracing_group_name;
+
int sessiond_check_thread_quit_pipe(int fd, uint32_t events);
int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size);
void sessiond_notify_ready(void);
#include "consumer.h"
#include "trace-kernel.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
/*
* Find the channel name for the given kernel session.
struct lttng_channel *chan)
{
struct ltt_kernel_channel *lkc;
+ struct lttng_channel_extended *extended;
assert(chan);
lkc->channel = zmalloc(sizeof(struct lttng_channel));
if (lkc->channel == NULL) {
PERROR("lttng_channel zmalloc");
- free(lkc);
+ goto error;
+ }
+
+ extended = zmalloc(sizeof(struct lttng_channel_extended));
+ if (!extended) {
+ PERROR("lttng_channel_channel zmalloc");
goto error;
}
memcpy(lkc->channel, chan, sizeof(struct lttng_channel));
+ memcpy(extended, chan->attr.extended.ptr, sizeof(struct lttng_channel_extended));
+ lkc->channel->attr.extended.ptr = extended;
+ extended = NULL;
/*
* If we receive an empty string for channel name, it means the
return lkc;
error:
+ if (lkc) {
+ free(lkc->channel);
+ }
+ free(extended);
+ free(lkc);
return NULL;
}
struct ltt_kernel_event *event, *etmp;
struct ltt_kernel_context *ctx, *ctmp;
int ret;
+ enum lttng_error_code status;
assert(channel);
/* Remove from channel list */
cds_list_del(&channel->list);
+ status = notification_thread_command_remove_channel(
+ notification_thread_handle,
+ channel->fd, LTTNG_DOMAIN_KERNEL);
+ assert(status == LTTNG_OK);
+ free(channel->channel->attr.extended.ptr);
free(channel->channel);
free(channel);
}
luc->attr.switch_timer_interval = chan->attr.switch_timer_interval;
luc->attr.read_timer_interval = chan->attr.read_timer_interval;
luc->attr.output = (enum lttng_ust_output) chan->attr.output;
+ luc->monitor_timer_interval =
+ ((struct lttng_channel_extended *) chan->attr.extended.ptr)->monitor_timer_interval;
/* Translate to UST output enum */
switch (luc->attr.output) {
uint64_t tracefile_count;
uint64_t per_pid_closed_app_discarded;
uint64_t per_pid_closed_app_lost;
+ uint64_t monitor_timer_interval;
};
/* UST domain global (LTTNG_DOMAIN_UST) */
#include "ust-ctl.h"
#include "utils.h"
#include "session.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
static
int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
/* Wipe and free registry from session registry. */
registry = get_session_registry(ua_chan->session);
if (registry) {
- ust_registry_channel_del_free(registry, ua_chan->key);
+ ust_registry_channel_del_free(registry, ua_chan->key,
+ true);
}
save_per_pid_lost_discarded_counters(ua_chan);
}
ua_chan->attr.overwrite = uchan->attr.overwrite;
ua_chan->attr.switch_timer_interval = uchan->attr.switch_timer_interval;
ua_chan->attr.read_timer_interval = uchan->attr.read_timer_interval;
+ ua_chan->monitor_timer_interval = uchan->monitor_timer_interval;
ua_chan->attr.output = uchan->attr.output;
/*
* Note that the attribute channel type is not set since the channel on the
int ret;
struct buffer_reg_uid *reg_uid;
struct buffer_reg_channel *reg_chan;
+ bool created = false;
assert(app);
assert(usess);
* it's not visible anymore in the session registry.
*/
ust_registry_channel_del_free(reg_uid->registry->reg.ust,
- ua_chan->tracing_channel_id);
+ ua_chan->tracing_channel_id, false);
buffer_reg_channel_remove(reg_uid->registry, reg_chan);
buffer_reg_channel_destroy(reg_chan, LTTNG_DOMAIN_UST);
goto error;
ua_chan->name);
goto error;
}
-
+ created = true;
}
/* Send buffers to the application. */
goto error;
}
+ if (created) {
+ enum lttng_error_code cmd_ret;
+ struct ltt_session *session;
+ uint64_t chan_reg_key;
+ struct ust_registry_channel *chan_reg;
+
+ rcu_read_lock();
+ chan_reg_key = ua_chan->tracing_channel_id;
+
+ pthread_mutex_lock(®_uid->registry->reg.ust->lock);
+ chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust,
+ chan_reg_key);
+ assert(chan_reg);
+ chan_reg->consumer_key = ua_chan->key;
+ chan_reg = NULL;
+ pthread_mutex_unlock(®_uid->registry->reg.ust->lock);
+
+ session = session_find_by_id(ua_sess->tracing_id);
+ assert(session);
+
+ cmd_ret = notification_thread_command_add_channel(
+ notification_thread_handle, session->name,
+ ua_sess->euid, ua_sess->egid,
+ ua_chan->name,
+ ua_chan->key,
+ LTTNG_DOMAIN_UST,
+ ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+ rcu_read_unlock();
+ if (cmd_ret != LTTNG_OK) {
+ ret = - (int) cmd_ret;
+ ERR("Failed to add channel to notification thread");
+ goto error;
+ }
+ }
+
error:
return ret;
}
{
int ret;
struct ust_registry_session *registry;
+ enum lttng_error_code cmd_ret;
+ struct ltt_session *session;
+ uint64_t chan_reg_key;
+ struct ust_registry_channel *chan_reg;
assert(app);
assert(usess);
goto error;
}
+ session = session_find_by_id(ua_sess->tracing_id);
+ assert(session);
+
+ chan_reg_key = ua_chan->key;
+ pthread_mutex_lock(®istry->lock);
+ chan_reg = ust_registry_channel_find(registry, chan_reg_key);
+ assert(chan_reg);
+ chan_reg->consumer_key = ua_chan->key;
+ pthread_mutex_unlock(®istry->lock);
+
+ cmd_ret = notification_thread_command_add_channel(
+ notification_thread_handle, session->name,
+ ua_sess->euid, ua_sess->egid,
+ ua_chan->name,
+ ua_chan->key,
+ LTTNG_DOMAIN_UST,
+ ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+ if (cmd_ret != LTTNG_OK) {
+ ret = - (int) cmd_ret;
+ ERR("Failed to add channel to notification thread");
+ goto error;
+ }
+
error:
rcu_read_unlock();
return ret;
/* Only add the channel if successful on the tracer side. */
lttng_ht_add_unique_str(ua_sess->channels, &ua_chan->node);
-
end:
if (ua_chanp) {
*ua_chanp = ua_chan;
struct lttng_ht *events;
uint64_t tracefile_size;
uint64_t tracefile_count;
+ uint64_t monitor_timer_interval;
/*
* Node indexed by channel name in the channels' hash table of a session.
*/
#include "ust-consumer.h"
#include "buffer-registry.h"
#include "session.h"
+#include "lttng-sessiond.h"
/*
* Return allocated full pathname of the session using the consumer trace path
}
/*
- * Send a single channel to the consumer using command ADD_CHANNEL.
+ * Send a single channel to the consumer using command ASK_CHANNEL_CREATION.
*
* Consumer socket lock MUST be acquired before calling this.
*/
ua_chan->attr.switch_timer_interval,
ua_chan->attr.read_timer_interval,
ua_sess->live_timer_interval,
+ ua_chan->monitor_timer_interval,
output,
(int) ua_chan->attr.type,
ua_sess->tracing_id,
/*
* Ask consumer to create a channel for a given session.
*
+ * Session list and rcu read side locks must be held by the caller.
+ *
* Returns 0 on success else a negative value.
*/
int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
struct consumer_socket *socket, struct ust_registry_session *registry)
{
int ret;
+ struct ltt_session *session;
assert(ua_sess);
assert(ua_chan);
goto error;
}
+ session = session_find_by_id(ua_sess->tracing_id);
+ assert(session);
+
pthread_mutex_lock(socket->lock);
ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
+ ERR("ask_channel_creation consumer command failed");
goto error;
}
#include "ust-registry.h"
#include "ust-app.h"
#include "utils.h"
+#include "lttng-sessiond.h"
+#include "notification-thread-commands.h"
/*
* Hash table match function for event in the registry.
* free the registry pointer since it might not have been allocated before so
* it's the caller responsability.
*/
-static void destroy_channel(struct ust_registry_channel *chan)
+static void destroy_channel(struct ust_registry_channel *chan, bool notif)
{
struct lttng_ht_iter iter;
struct ust_registry_event *event;
+ enum lttng_error_code cmd_ret;
assert(chan);
+ if (notif) {
+ cmd_ret = notification_thread_command_remove_channel(
+ notification_thread_handle, chan->consumer_key,
+ LTTNG_DOMAIN_UST);
+ if (cmd_ret != LTTNG_OK) {
+ ERR("Failed to remove channel from notification thread");
+ }
+ }
+
rcu_read_lock();
/* Destroy all event associated with this registry. */
cds_lfht_for_each_entry(chan->ht->ht, &iter.iter, event, node.node) {
return 0;
error:
- destroy_channel(chan);
+ destroy_channel(chan, false);
error_alloc:
return ret;
}
* Remove channel using key from registry and free memory.
*/
void ust_registry_channel_del_free(struct ust_registry_session *session,
- uint64_t key)
+ uint64_t key, bool notif)
{
struct lttng_ht_iter iter;
struct ust_registry_channel *chan;
ret = lttng_ht_del(session->channels, &iter);
assert(!ret);
rcu_read_unlock();
- destroy_channel(chan);
+ destroy_channel(chan, notif);
end:
return;
/* Delete the node from the ht and free it. */
ret = lttng_ht_del(reg->channels, &iter);
assert(!ret);
- destroy_channel(chan);
+ destroy_channel(chan, true);
}
rcu_read_unlock();
ht_cleanup_push(reg->channels);
struct ust_registry_channel {
uint64_t key;
+ uint64_t consumer_key;
/* Id set when replying to a register channel. */
uint32_t chan_id;
enum ustctl_channel_header header_type;
int ust_registry_channel_add(struct ust_registry_session *session,
uint64_t key);
void ust_registry_channel_del_free(struct ust_registry_session *session,
- uint64_t key);
+ uint64_t key, bool notif);
int ust_registry_session_init(struct ust_registry_session **sessionp,
struct ust_app *app,
}
static inline
void ust_registry_channel_del_free(struct ust_registry_session *session,
- uint64_t key)
+ uint64_t key, bool notif)
{}
static inline
int ust_registry_session_init(struct ust_registry_session **sessionp,
#include <inttypes.h>
#include <signal.h>
+#include <lttng/ust-ctl.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/compat/endian.h>
#include <common/consumer/consumer-testpoint.h>
#include <common/ust-consumer/ust-consumer.h>
+typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
+ unsigned long *produced);
+
static struct timer_signal_data timer_signal = {
.tid = 0,
.setup_done = 0,
if (ret) {
PERROR("sigaddset live");
}
+ ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
+ if (ret) {
+ PERROR("sigaddset monitor");
+ }
}
+static int channel_monitor_pipe = -1;
+
/*
* Execute action on a timer switch.
*
* deadlocks.
*/
static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si, void *uc)
+ int sig, siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
* Execute action on a live timer
*/
static void live_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si, void *uc)
+ int sig, siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
}
/*
- * Set the timer for periodical metadata flush.
+ * Start a timer channel timer which will fire at a given interval
+ * (timer_interval_us)and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
*/
-void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
- unsigned int switch_timer_interval)
+static
+int consumer_channel_timer_start(timer_t *timer_id,
+ struct lttng_consumer_channel *channel,
+ unsigned int timer_interval_us, int signal)
{
- int ret;
+ int ret = 0, delete_ret;
struct sigevent sev;
struct itimerspec its;
assert(channel);
assert(channel->key);
- if (switch_timer_interval == 0) {
- return;
+ if (timer_interval_us == 0) {
+ /* No creation needed; not an error. */
+ ret = 1;
+ goto end;
}
sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
+ sev.sigev_signo = signal;
sev.sigev_value.sival_ptr = channel;
- ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
+ ret = timer_create(CLOCKID, &sev, timer_id);
if (ret == -1) {
PERROR("timer_create");
+ goto end;
}
- channel->switch_timer_enabled = 1;
- its.it_value.tv_sec = switch_timer_interval / 1000000;
- its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
+ its.it_value.tv_sec = timer_interval_us / 1000000;
+ its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
its.it_interval.tv_sec = its.it_value.tv_sec;
its.it_interval.tv_nsec = its.it_value.tv_nsec;
- ret = timer_settime(channel->switch_timer, 0, &its, NULL);
+ ret = timer_settime(*timer_id, 0, &its, NULL);
if (ret == -1) {
PERROR("timer_settime");
+ goto error_destroy_timer;
+ }
+end:
+ return ret;
+error_destroy_timer:
+ delete_ret = timer_delete(*timer_id);
+ if (delete_ret == -1) {
+ PERROR("timer_delete");
+ }
+ goto end;
+}
+
+static
+int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+{
+ int ret = 0;
+
+ ret = timer_delete(*timer_id);
+ if (ret == -1) {
+ PERROR("timer_delete");
+ goto end;
}
+
+ consumer_timer_signal_thread_qs(signal);
+ *timer_id = 0;
+end:
+ return ret;
+}
+
+/*
+ * Set the channel's switch timer.
+ */
+void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
+ unsigned int switch_timer_interval_us)
+{
+ int ret;
+
+ assert(channel);
+ assert(channel->key);
+
+ ret = consumer_channel_timer_start(&channel->switch_timer, channel,
+ switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+
+ channel->switch_timer_enabled = !!(ret == 0);
}
/*
- * Stop and delete timer.
+ * Stop and delete the channel's switch timer.
*/
void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
{
assert(channel);
- ret = timer_delete(channel->switch_timer);
+ ret = consumer_channel_timer_stop(&channel->switch_timer,
+ LTTNG_CONSUMER_SIG_SWITCH);
if (ret == -1) {
- PERROR("timer_delete");
+ ERR("Failed to stop switch timer");
}
- consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
-
- channel->switch_timer = 0;
channel->switch_timer_enabled = 0;
}
/*
- * Set the timer for the live mode.
+ * Set the channel's live timer.
*/
void consumer_timer_live_start(struct lttng_consumer_channel *channel,
- int live_timer_interval)
+ unsigned int live_timer_interval_us)
{
int ret;
- struct sigevent sev;
- struct itimerspec its;
assert(channel);
assert(channel->key);
- if (live_timer_interval <= 0) {
- return;
- }
+ ret = consumer_channel_timer_start(&channel->live_timer, channel,
+ live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
- sev.sigev_notify = SIGEV_SIGNAL;
- sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
- sev.sigev_value.sival_ptr = channel;
- ret = timer_create(CLOCKID, &sev, &channel->live_timer);
- if (ret == -1) {
- PERROR("timer_create");
- }
- channel->live_timer_enabled = 1;
+ channel->live_timer_enabled = !!(ret == 0);
+}
- its.it_value.tv_sec = live_timer_interval / 1000000;
- its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
- its.it_interval.tv_sec = its.it_value.tv_sec;
- its.it_interval.tv_nsec = its.it_value.tv_nsec;
+/*
+ * Stop and delete the channel's live timer.
+ */
+void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ assert(channel);
- ret = timer_settime(channel->live_timer, 0, &its, NULL);
+ ret = consumer_channel_timer_stop(&channel->live_timer,
+ LTTNG_CONSUMER_SIG_LIVE);
if (ret == -1) {
- PERROR("timer_settime");
+ ERR("Failed to stop live timer");
}
+
+ channel->live_timer_enabled = 0;
}
/*
- * Stop and delete timer.
+ * Set the channel's monitoring timer.
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
*/
-void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+ unsigned int monitor_timer_interval_us)
{
int ret;
assert(channel);
+ assert(channel->key);
+ assert(!channel->monitor_timer_enabled);
- ret = timer_delete(channel->live_timer);
+ ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
+ monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+ channel->monitor_timer_enabled = !!(ret == 0);
+ return ret;
+}
+
+/*
+ * Stop and delete the channel's monitoring timer.
+ */
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
+{
+ int ret;
+
+ assert(channel);
+ assert(channel->monitor_timer_enabled);
+
+ ret = consumer_channel_timer_stop(&channel->monitor_timer,
+ LTTNG_CONSUMER_SIG_MONITOR);
if (ret == -1) {
- PERROR("timer_delete");
+ ERR("Failed to stop live timer");
+ goto end;
}
- consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
-
- channel->live_timer = 0;
- channel->live_timer_enabled = 0;
+ channel->monitor_timer_enabled = 0;
+end:
+ return ret;
}
/*
return 0;
}
+static
+int sample_channel_positions(struct lttng_consumer_channel *channel,
+ uint64_t *_highest_use, uint64_t *_lowest_use,
+ sample_positions_cb sample, get_consumed_cb get_consumed,
+ get_produced_cb get_produced)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool empty_channel = true;
+ uint64_t high = 0, low = UINT64_MAX;
+ struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+
+ rcu_read_lock();
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key,
+ &iter.iter, stream, node_channel_id.node) {
+ unsigned long produced, consumed, usage;
+
+ empty_channel = false;
+
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
+
+ ret = sample(stream);
+ if (ret) {
+ ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_consumed(stream, &consumed);
+ if (ret) {
+ ERR("Failed to get buffer consumed position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+ ret = get_produced(stream, &produced);
+ if (ret) {
+ ERR("Failed to get buffer produced position in monitor timer");
+ pthread_mutex_unlock(&stream->lock);
+ goto end;
+ }
+
+ usage = produced - consumed;
+ high = (usage > high) ? usage : high;
+ low = (usage < low) ? usage : low;
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ *_highest_use = high;
+ *_lowest_use = low;
+end:
+ rcu_read_unlock();
+ if (empty_channel) {
+ ret = -1;
+ }
+ return ret;
+}
+
+/*
+ * Execute action on a monitor timer.
+ */
+static
+void monitor_timer(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_channel *channel)
+{
+ int ret;
+ int channel_monitor_pipe =
+ consumer_timer_thread_get_channel_monitor_pipe();
+ struct lttcomm_consumer_channel_monitor_msg msg = {
+ .key = channel->key,
+ };
+ sample_positions_cb sample;
+ get_consumed_cb get_consumed;
+ get_produced_cb get_produced;
+
+ assert(channel);
+ pthread_mutex_lock(&consumer_data.lock);
+
+ if (channel_monitor_pipe < 0) {
+ goto end;
+ }
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ sample = lttng_kconsumer_sample_snapshot_positions;
+ get_consumed = lttng_kconsumer_get_consumed_snapshot;
+ get_produced = lttng_kconsumer_get_produced_snapshot;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ sample = lttng_ustconsumer_sample_snapshot_positions;
+ get_consumed = lttng_ustconsumer_get_consumed_snapshot;
+ get_produced = lttng_ustconsumer_get_produced_snapshot;
+ break;
+ default:
+ abort();
+ }
+
+ ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
+ sample, get_consumed, get_produced);
+ if (ret) {
+ goto end;
+ }
+
+ /*
+ * Writes performed here are assumed to be atomic which is only
+ * guaranteed for sizes < than PIPE_BUF.
+ */
+ assert(sizeof(msg) <= PIPE_BUF);
+
+ do {
+ ret = write(channel_monitor_pipe, &msg, sizeof(msg));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ if (errno == EAGAIN) {
+ /* Not an error, the sample is merely dropped. */
+ DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
+ channel->key);
+ } else {
+ PERROR("write to the channel monitor pipe");
+ }
+ } else {
+ DBG("Sent channel monitoring sample for channel key %" PRIu64
+ ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
+ channel->key, msg.highest, msg.lowest);
+ }
+end:
+ pthread_mutex_unlock(&consumer_data.lock);
+}
+
+int consumer_timer_thread_get_channel_monitor_pipe(void)
+{
+ return uatomic_read(&channel_monitor_pipe);
+}
+
+int consumer_timer_thread_set_channel_monitor_pipe(int fd)
+{
+ int ret;
+
+ ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+ if (ret != -1) {
+ ret = -1;
+ goto end;
+ }
+ ret = 0;
+end:
+ return ret;
+}
+
/*
* This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
- * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
+ * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
+ * LTTNG_CONSUMER_SIG_MONITOR.
*/
void *consumer_timer_thread(void *data)
{
health_poll_entry();
signr = sigwaitinfo(&mask, &info);
health_poll_exit();
+
+ /*
+ * NOTE: cascading conditions are used instead of a switch case
+ * since the use of SIGRTMIN in the definition of the signals'
+ * values prevents the reduction to an integer constant.
+ */
if (signr == -1) {
if (errno != EINTR) {
PERROR("sigwaitinfo");
}
continue;
} else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
- metadata_switch_timer(ctx, info.si_signo, &info, NULL);
+ metadata_switch_timer(ctx, info.si_signo, &info);
} else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
cmm_smp_mb();
CMM_STORE_SHARED(timer_signal.qs_done, 1);
cmm_smp_mb();
DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
- live_timer(ctx, info.si_signo, &info, NULL);
+ live_timer(ctx, info.si_signo, &info);
+ } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
+ struct lttng_consumer_channel *channel;
+
+ channel = info.si_value.sival_ptr;
+ monitor_timer(ctx, channel);
} else {
ERR("Unexpected signal %d\n", info.si_signo);
}
#define LTTNG_CONSUMER_SIG_SWITCH SIGRTMIN + 10
#define LTTNG_CONSUMER_SIG_TEARDOWN SIGRTMIN + 11
#define LTTNG_CONSUMER_SIG_LIVE SIGRTMIN + 12
+#define LTTNG_CONSUMER_SIG_MONITOR SIGRTMIN + 13
#define CLOCKID CLOCK_MONOTONIC
};
void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
- unsigned int switch_timer_interval);
+ unsigned int switch_timer_interval_us);
void consumer_timer_switch_stop(struct lttng_consumer_channel *channel);
void consumer_timer_live_start(struct lttng_consumer_channel *channel,
- int live_timer_interval);
+ unsigned int live_timer_interval_us);
void consumer_timer_live_stop(struct lttng_consumer_channel *channel);
+int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
+ unsigned int monitor_timer_interval_us);
+int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel);
void *consumer_timer_thread(void *data);
int consumer_signal_init(void);
int consumer_flush_kernel_index(struct lttng_consumer_stream *stream);
int consumer_flush_ust_index(struct lttng_consumer_stream *stream);
+int consumer_timer_thread_get_channel_monitor_pipe(void);
+int consumer_timer_thread_set_channel_monitor_pipe(int fd);
+
#endif /* CONSUMER_TIMER_H */
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
goto error_metadata_pipe;
}
+ ctx->channel_monitor_pipe = -1;
+
return ctx;
error_metadata_pipe:
/* See lttng-kernel.h enum lttng_kernel_output for channel output */
#define DEFAULT_KERNEL_CHANNEL_OUTPUT LTTNG_EVENT_SPLICE
#define DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_SWITCH_TIMER
+#define DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_MONITOR_TIMER
#define DEFAULT_KERNEL_CHANNEL_READ_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_READ_TIMER
#define DEFAULT_KERNEL_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_KERNEL_CHANNEL_LIVE_TIMER
#define DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_SWITCH_TIMER
#define DEFAULT_UST_PID_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_LIVE_TIMER
#define DEFAULT_UST_UID_CHANNEL_LIVE_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_LIVE_TIMER
+#define DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_MONITOR_TIMER
+#define DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_MONITOR_TIMER
#define DEFAULT_UST_PID_CHANNEL_READ_TIMER CONFIG_DEFAULT_UST_PID_CHANNEL_READ_TIMER
#define DEFAULT_UST_UID_CHANNEL_READ_TIMER CONFIG_DEFAULT_UST_UID_CHANNEL_READ_TIMER
return ret;
}
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+
+ return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
/*
* Get the produced position
*
} else {
ret = consumer_add_channel(new_channel, ctx);
}
- if (CONSUMER_CHANNEL_TYPE_DATA) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ int monitor_start_ret;
+
+ DBG("Consumer starting monitor timer");
consumer_timer_live_start(new_channel,
msg.u.channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ new_channel,
+ msg.u.channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_nosignal;
+ }
+
}
health_code_update();
/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Copyright (C) 2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
#include <common/consumer/consumer.h>
int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream);
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream);
int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos);
int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
#include <lttng/lttng.h>
#include <lttng/snapshot-internal.h>
#include <lttng/save-internal.h>
+#include <lttng/channel-internal.h>
+#include <lttng/trigger/trigger-internal.h>
#include <common/compat/socket.h>
#include <common/uri.h>
#include <common/defaults.h>
LTTNG_SET_SESSION_SHM_PATH = 40,
LTTNG_REGENERATE_METADATA = 41,
LTTNG_REGENERATE_STATEDUMP = 42,
+ LTTNG_REGISTER_TRIGGER = 43,
+ LTTNG_UNREGISTER_TRIGGER = 44,
};
enum lttcomm_relayd_command {
LTTCOMM_CONSUMERD_RELAYD_FAIL, /* Error on remote relayd */
LTTCOMM_CONSUMERD_CHANNEL_FAIL, /* Channel creation failed. */
LTTCOMM_CONSUMERD_CHAN_NOT_FOUND, /* Channel not found. */
+ LTTCOMM_CONSUMERD_ALREADY_SET, /* Resource already set. */
/* MUST be last element */
LTTCOMM_NR, /* Last element */
/* Create channel */
struct {
struct lttng_channel chan LTTNG_PACKED;
+ /* struct lttng_channel_extended is already packed. */
+ struct lttng_channel_extended extended;
} LTTNG_PACKED channel;
/* Context */
struct {
struct {
uint32_t pid;
} LTTNG_PACKED pid_tracker;
+ struct {
+ uint32_t length;
+ } LTTNG_PACKED trigger;
} u;
} LTTNG_PACKED;
uint32_t nb_exclusions;
} LTTNG_PACKED;
-/*
- * Channel extended info.
- */
-struct lttcomm_channel_extended {
- uint64_t discarded_events;
- uint64_t lost_packets;
-} LTTNG_PACKED;
-
/*
* Data structure for the response from sessiond to the lttng client.
*/
uint32_t monitor;
/* timer to check the streams usage in live mode (usec). */
unsigned int live_timer_interval;
+ /* timer to sample a channel's positions (usec). */
+ unsigned int monitor_timer_interval;
} LTTNG_PACKED channel; /* Only used by Kernel. */
struct {
uint64_t stream_key;
int32_t overwrite; /* 1: overwrite, 0: discard */
uint32_t switch_timer_interval; /* usec */
uint32_t read_timer_interval; /* usec */
- unsigned int live_timer_interval; /* usec */
+ unsigned int live_timer_interval; /* usec */
+ uint32_t monitor_timer_interval; /* usec */
int32_t output; /* splice, mmap */
int32_t type; /* metadata or per_cpu */
uint64_t session_id; /* Tracing session id */
} u;
} LTTNG_PACKED;
+/*
+ * Channel monitoring message returned to the session daemon on every
+ * monitor timer expiration.
+ */
+struct lttcomm_consumer_channel_monitor_msg {
+ /* Key of the sampled channel. */
+ uint64_t key;
+ /*
+ * Lowest and highest usage (bytes) at the moment the sample was taken.
+ */
+ uint64_t lowest, highest;
+} LTTNG_PACKED;
+
/*
* Status message returned to the sessiond after a received command.
*/
consumer_timer_switch_start(channel, attr.switch_timer_interval);
attr.switch_timer_interval = 0;
} else {
+ int monitor_start_ret;
+
consumer_timer_live_start(channel,
msg.u.ask_channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ channel,
+ msg.u.ask_channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_channel_error;
+ }
}
health_code_update();
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
}
+ if (channel->monitor_timer_enabled == 1) {
+ consumer_timer_monitor_stop(channel);
+ }
goto end_channel_error;
}
}
/*
- * Take a snapshot for a specific fd
+ * Take a snapshot for a specific stream.
*
* Returns 0 on success, < 0 on error
*/
return ustctl_snapshot(stream->ustream);
}
+/*
+ * Sample consumed and produced positions for a specific stream.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_ustconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_snapshot_sample_positions(stream->ustream);
+}
+
/*
* Get the produced position
*
#ifdef HAVE_LIBLTTNG_UST_CTL
int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream);
+int lttng_ustconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream);
int lttng_ustconsumer_get_produced_snapshot(
struct lttng_consumer_stream *stream, unsigned long *pos);
return -ENOSYS;
}
+static inline
+int lttng_ustconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ return -ENOSYS;
+}
+
static inline
int lttng_ustconsumer_get_produced_snapshot(
struct lttng_consumer_stream *stream, unsigned long *pos)
return -ENOSYS;
}
+static inline
+int lttng_ustconsumer_get_consumed_snapshot(
+ struct lttng_consumer_stream *stream, unsigned long *pos)
+{
+ return -ENOSYS;
+}
+
static inline
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
int sock, struct pollfd *consumer_sockpoll)