lttngconditioninclude_HEADERS= \
lttng/condition/condition.h \
lttng/condition/buffer-usage.h \
+ lttng/condition/session-consumed-size.h \
lttng/condition/evaluation.h
lttngnotificationinclude_HEADERS= \
lttng/action/notify-internal.h \
lttng/condition/condition-internal.h \
lttng/condition/buffer-usage-internal.h \
+ lttng/condition/session-consumed-size-internal.h \
lttng/condition/evaluation-internal.h \
lttng/notification/notification-internal.h \
lttng/trigger/trigger-internal.h \
LTTNG_CONDITION_TYPE_UNKNOWN = -1,
LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102,
LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101,
+ LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE = 100,
};
enum lttng_condition_status {
--- /dev/null
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H
+#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H
+
+#include <lttng/condition/session-consumed-size.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/evaluation-internal.h>
+#include <lttng/domain.h>
+#include "common/buffer-view.h"
+
+struct lttng_condition_session_consumed_size {
+ struct lttng_condition parent;
+ struct {
+ bool set;
+ uint64_t value;
+ } consumed_threshold_bytes;
+ char *session_name;
+};
+
+struct lttng_condition_session_consumed_size_comm {
+ uint64_t consumed_threshold_bytes;
+ /* Length includes the trailing \0. */
+ uint32_t session_name_len;
+ char session_name[];
+} LTTNG_PACKED;
+
+struct lttng_evaluation_session_consumed_size {
+ struct lttng_evaluation parent;
+ uint64_t session_consumed;
+};
+
+struct lttng_evaluation_session_consumed_size_comm {
+ uint64_t session_consumed;
+} LTTNG_PACKED;
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_session_consumed_size_create(
+ enum lttng_condition_type type, uint64_t consumed);
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_session_consumed_size_create_from_buffer(
+ const struct lttng_buffer_view *view,
+ struct lttng_condition **condition);
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_session_consumed_size_create_from_buffer(
+ const struct lttng_buffer_view *view,
+ struct lttng_evaluation **evaluation);
+
+#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H */
--- /dev/null
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H
+#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H
+
+#include <lttng/condition/evaluation.h>
+#include <lttng/condition/condition.h>
+#include <stdint.h>
+#include <lttng/domain.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_condition;
+struct lttng_evaluation;
+
+/**
+ * Session consumed size conditions allow an action to be taken whenever a
+ * session's produced data size crosses a set threshold.
+ *
+ * These conditions are periodically evaluated against the current session
+ * statistics. The period at which these conditions are evaluated is
+ * governed by the channels' monitor timer.
+ *
+ * Session consumed size conditions have the following properties:
+ * - the exact name of the session to be monitored,
+ * - a total consumed size threshold, expressed in bytes.
+ *
+ * Wildcards, regular expressions or other globbing mechanisms are not supported
+ * in session consumed size condition properties.
+ */
+
+/*
+ * Create a newly allocated session consumed size condition.
+ *
+ * A session consumed size condition evaluates to true whenever the sum of all
+ * its channels' consumed data size is higher than a set threshold. The
+ * consumed data sizes are free running counters.
+ *
+ * Returns a new condition on success, NULL on failure. This condition must be
+ * destroyed using lttng_condition_destroy().
+ */
+extern struct lttng_condition *
+lttng_condition_session_consumed_size_create(void);
+
+/*
+ * Get the threshold of a session consumed size condition.
+ *
+ * The session consumed size condition's threshold must have been defined as
+ * an absolute value expressed in bytes in order for this call to succeed.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in
+ * bytes, LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed, or
+ * LTTNG_CONDITION_STATUS_UNSET if a threshold, expressed as an absolute size in
+ * bytes, was not set prior to this call.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_get_threshold(
+ const struct lttng_condition *condition,
+ uint64_t *consumed_threshold_bytes);
+
+/*
+ * Set the threshold of a session consumed size usage condition.
+ *
+ * Setting a threshold overrides any previously set threshold.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID
+ * if invalid parameters are passed.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_set_threshold(
+ struct lttng_condition *condition,
+ uint64_t consumed_threshold_bytes);
+
+/*
+ * Get the session name property of a session consumed size condition.
+ *
+ * The caller does not assume the ownership of the returned session name. The
+ * session name shall only be used for the duration of the condition's
+ * lifetime, or before a different session name is set.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK and a pointer to the condition's session
+ * name on success, LTTNG_CONDITION_STATUS_INVALID if an invalid
+ * parameter is passed, or LTTNG_CONDITION_STATUS_UNSET if a session name
+ * was not set prior to this call.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_get_session_name(
+ const struct lttng_condition *condition,
+ const char **session_name);
+
+/*
+ * Set the session name property of a session consumed size condition.
+ *
+ * The passed session name parameter will be copied to the condition.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID
+ * if invalid parameters are passed.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_set_session_name(
+ struct lttng_condition *condition,
+ const char *session_name);
+
+/**
+ * lttng_evaluation_session_consumed_size is specialised lttng_evaluations
+ * which allow users to query a number of properties resulting from the
+ * evaluation of a condition which evaluated to true.
+ */
+
+/*
+ * Get the session consumed property of a session consumed size evaluation.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in
+ * bytes, or LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed.
+ */
+extern enum lttng_evaluation_status
+lttng_evaluation_session_consumed_size_get_consumed_size(
+ const struct lttng_evaluation *evaluation,
+ uint64_t *session_consumed);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H */
#include <lttng/action/notify.h>
#include <lttng/condition/condition.h>
#include <lttng/condition/buffer-usage.h>
+#include <lttng/condition/session-consumed-size.h>
#include <lttng/condition/evaluation.h>
#include <lttng/notification/channel.h>
#include <lttng/notification/notification.h>
#include <lttng/notification/notification-internal.h>
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
#include <lttng/notification/channel-internal.h>
#include <time.h>
struct cds_lfht_node channel_state_ht_node;
uint64_t highest_usage;
uint64_t lowest_usage;
+ uint64_t channel_total_consumed;
};
static unsigned long hash_channel_key(struct channel_key *key);
-static int evaluate_condition(struct lttng_condition *condition,
+static int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity);
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
struct lttng_evaluation *evaluation,
return hash;
}
+static
+unsigned long lttng_condition_session_consumed_size_hash(
+ struct lttng_condition *_condition)
+{
+ unsigned long hash = 0;
+ struct lttng_condition_session_consumed_size *condition;
+ uint64_t val;
+
+ condition = container_of(_condition,
+ struct lttng_condition_session_consumed_size, parent);
+
+ if (condition->session_name) {
+ hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+ }
+ val = condition->consumed_threshold_bytes.value;
+ hash ^= hash_key_u64(&val, lttng_ht_seed);
+ return hash;
+}
+
/*
* The lttng_condition hashing code is kept in this file (rather than
* condition.c) since it makes use of GPLv2 code (hashtable utils), which we
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
return lttng_condition_buffer_usage_hash(condition);
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ return lttng_condition_session_consumed_size_hash(condition);
default:
ERR("[notification-thread] Unexpected condition type caught");
abort();
goto end;
}
- ret = evaluate_condition(condition, &evaluation, state, NULL,
- last_sample, channel_info->capacity);
+ ret = evaluate_condition(condition, &evaluation, state,
+ NULL, last_sample,
+ 0, channel_info->session_info->consumed_data_size,
+ channel_info);
if (ret) {
WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
goto end;
}
static
-bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+bool buffer_usage_condition_applies_to_channel(
+ struct lttng_condition *condition,
struct channel_info *channel_info)
{
enum lttng_condition_status status;
- struct lttng_condition *condition;
- const char *trigger_session_name = NULL;
- const char *trigger_channel_name = NULL;
- enum lttng_domain_type trigger_domain;
+ enum lttng_domain_type condition_domain;
+ const char *condition_session_name = NULL;
+ const char *condition_channel_name = NULL;
- condition = lttng_trigger_get_condition(trigger);
- if (!condition) {
+ status = lttng_condition_buffer_usage_get_domain_type(condition,
+ &condition_domain);
+ assert(status == LTTNG_CONDITION_STATUS_OK);
+ if (channel_info->key.domain != condition_domain) {
goto fail;
}
- switch (lttng_condition_get_type(condition)) {
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
- break;
- default:
+ status = lttng_condition_buffer_usage_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ status = lttng_condition_buffer_usage_get_channel_name(
+ condition, &condition_channel_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
+ goto fail;
+ }
+ if (strcmp(channel_info->name, condition_channel_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_domain_type(condition,
- &trigger_domain);
- assert(status == LTTNG_CONDITION_STATUS_OK);
- if (channel_info->key.domain != trigger_domain) {
+ return true;
+fail:
+ return false;
+}
+
+static
+bool session_consumed_size_condition_applies_to_channel(
+ struct lttng_condition *condition,
+ struct channel_info *channel_info)
+{
+ enum lttng_condition_status status;
+ const char *condition_session_name = NULL;
+
+ status = lttng_condition_session_consumed_size_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_session_name(
- condition, &trigger_session_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+ return true;
+fail:
+ return false;
+}
- status = lttng_condition_buffer_usage_get_channel_name(
- condition, &trigger_channel_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+ struct channel_info *channel_info)
+{
+ struct lttng_condition *condition;
+ bool trigger_applies;
- if (strcmp(channel_info->session_info->name, trigger_session_name)) {
+ condition = lttng_trigger_get_condition(trigger);
+ if (!condition) {
goto fail;
}
- if (strcmp(channel_info->name, trigger_channel_name)) {
+
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ trigger_applies = buffer_usage_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ trigger_applies = session_consumed_size_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ default:
goto fail;
}
- return true;
+ return trigger_applies;
fail:
return false;
}
CDS_INIT_LIST_HEAD(&trigger_list_element->node);
trigger_list_element->trigger = trigger;
cds_list_add(&trigger_list_element->node, &trigger_list->list);
-
- /* A trigger can only apply to one channel. */
- break;
}
/*
}
static
-bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
- struct channel_state_sample *sample, uint64_t buffer_capacity)
+bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
+ const struct channel_state_sample *sample,
+ uint64_t buffer_capacity)
{
bool result = false;
uint64_t threshold;
enum lttng_condition_type condition_type;
- struct lttng_condition_buffer_usage *use_condition = container_of(
+ const struct lttng_condition_buffer_usage *use_condition = container_of(
condition, struct lttng_condition_buffer_usage,
parent);
- if (!sample) {
- goto end;
- }
-
if (use_condition->threshold_bytes.set) {
threshold = use_condition->threshold_bytes.value;
} else {
result = true;
}
}
-end:
+
return result;
}
static
-int evaluate_condition(struct lttng_condition *condition,
+bool evaluate_session_consumed_size_condition(
+ const struct lttng_condition *condition,
+ uint64_t session_consumed_size)
+{
+ uint64_t threshold;
+ const struct lttng_condition_session_consumed_size *size_condition =
+ container_of(condition,
+ struct lttng_condition_session_consumed_size,
+ parent);
+
+ threshold = size_condition->consumed_threshold_bytes.value;
+ DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+ threshold, session_consumed_size);
+ return session_consumed_size >= threshold;
+}
+
+static
+int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity)
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info)
{
int ret = 0;
enum lttng_condition_type condition_type;
- bool previous_sample_result;
+ const bool previous_sample_available = !!previous_sample;
+ bool previous_sample_result = false;
bool latest_sample_result;
condition_type = lttng_condition_get_type(condition);
- /* No other condition type supported for the moment. */
- assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
- condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
- previous_sample_result = evaluate_buffer_usage_condition(condition,
- previous_sample, buffer_capacity);
- latest_sample_result = evaluate_buffer_usage_condition(condition,
- latest_sample, buffer_capacity);
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_buffer_usage_condition(condition,
+ previous_sample, channel_info->capacity);
+ }
+ latest_sample_result = evaluate_buffer_usage_condition(
+ condition, latest_sample,
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ previous_session_consumed_total);
+ }
+ latest_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ latest_session_consumed_total);
+ break;
+ default:
+ /* Unknown condition type; internal error. */
+ abort();
+ }
if (!latest_sample_result ||
(previous_sample_result == latest_sample_result)) {
goto end;
}
- if (evaluation && latest_sample_result) {
+ if (!evaluation || !latest_sample_result) {
+ goto end;
+ }
+
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
*evaluation = lttng_evaluation_buffer_usage_create(
condition_type,
latest_sample->highest_usage,
- buffer_capacity);
- if (!*evaluation) {
- ret = -1;
- goto end;
- }
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ *evaluation = lttng_evaluation_session_consumed_size_create(
+ condition_type,
+ latest_session_consumed_total);
+ break;
+ default:
+ abort();
+ }
+
+ if (!*evaluation) {
+ ret = -1;
+ goto end;
}
end:
return ret;
{
int ret = 0;
struct lttcomm_consumer_channel_monitor_msg sample_msg;
- struct channel_state_sample previous_sample, latest_sample;
struct channel_info *channel_info;
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
struct lttng_channel_trigger_list *trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
bool previous_sample_available = false;
+ struct channel_state_sample previous_sample, latest_sample;
+ uint64_t previous_session_consumed_total, latest_session_consumed_total;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
latest_sample.key.domain = domain;
latest_sample.highest_usage = sample_msg.highest;
latest_sample.lowest_usage = sample_msg.lowest;
+ latest_sample.channel_total_consumed = sample_msg.total_consumed;
rcu_read_lock();
}
channel_info = caa_container_of(node, struct channel_info,
channels_ht_node);
- DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
+ DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
channel_info->name,
latest_sample.key.key,
channel_info->session_info->name,
latest_sample.highest_usage,
- latest_sample.lowest_usage);
+ latest_sample.lowest_usage,
+ latest_sample.channel_total_consumed);
+
+ previous_session_consumed_total =
+ channel_info->session_info->consumed_data_size;
/* Retrieve the channel's last sample, if it exists, and update it. */
cds_lfht_lookup(state->channel_state_ht,
stored_sample = caa_container_of(node,
struct channel_state_sample,
channel_state_ht_node);
+
memcpy(&previous_sample, stored_sample,
sizeof(previous_sample));
stored_sample->highest_usage = latest_sample.highest_usage;
stored_sample->lowest_usage = latest_sample.lowest_usage;
stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
previous_sample_available = true;
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
} else {
/*
* This is the channel's first sample, allocate space for and
cds_lfht_add(state->channel_state_ht,
hash_channel_key(&stored_sample->key),
&stored_sample->channel_state_ht_node);
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ latest_sample.channel_total_consumed;
}
+ channel_info->session_info->consumed_data_size =
+ latest_session_consumed_total;
+
/* Find triggers associated with this channel. */
cds_lfht_lookup(state->channel_triggers_ht,
hash_channel_key(&latest_sample.key),
ret = evaluate_condition(condition, &evaluation, state,
previous_sample_available ? &previous_sample : NULL,
- &latest_sample, channel_info->capacity);
- if (ret) {
+ &latest_sample,
+ previous_session_consumed_total,
+ latest_session_consumed_total,
+ channel_info);
+ if (caa_unlikely(ret)) {
goto end_unlock;
}
- if (!evaluation) {
+ if (caa_likely(!evaluation)) {
continue;
}
struct cds_lfht *channel_infos_ht;
/* Node in the notification thread state's sessions_ht. */
struct cds_lfht_node sessions_ht_node;
+ uint64_t consumed_data_size;
};
struct channel_info {
unix.c unix.h \
filter.c filter.h context.c context.h \
action.c notify.c condition.c buffer-usage.c \
+ session-consumed-size.c \
evaluation.c notification.c trigger.c endpoint.c \
dynamic-buffer.h dynamic-buffer.c \
buffer-view.h buffer-view.c \
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
#include <common/macros.h>
#include <common/error.h>
#include <common/dynamic-buffer.h>
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
create_from_buffer = lttng_condition_buffer_usage_high_create_from_buffer;
break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ create_from_buffer = lttng_condition_session_consumed_size_create_from_buffer;
+ break;
default:
ERR("Attempted to create condition of unknown type (%i)",
(int) condition_comm->condition_type);
static
int sample_channel_positions(struct lttng_consumer_channel *channel,
- uint64_t *_highest_use, uint64_t *_lowest_use,
+ uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
sample_positions_cb sample, get_consumed_cb get_consumed,
get_produced_cb get_produced)
{
uint64_t high = 0, low = UINT64_MAX;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ *_total_consumed = 0;
+
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
usage = produced - consumed;
high = (usage > high) ? usage : high;
low = (usage < low) ? usage : low;
+
+ /*
+ * We don't use consumed here for 2 reasons:
+ * - output_written takes into account the padding written in the
+ * tracefiles when we stop the session;
+ * - the consumed position is not the accurate representation of what
+ * was extracted from a buffer in overwrite mode.
+ */
+ *_total_consumed += stream->output_written;
next:
pthread_mutex_unlock(&stream->lock);
}
}
ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
- sample, get_consumed, get_produced);
+ &msg.total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
#include <lttng/condition/evaluation-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
#include <common/macros.h>
#include <common/error.h>
#include <stdbool.h>
}
evaluation_size += ret;
break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ ret = lttng_evaluation_session_consumed_size_create_from_buffer(
+ &evaluation_view, evaluation);
+ if (ret < 0) {
+ goto end;
+ }
+ evaluation_size += ret;
+ break;
default:
ERR("Attempted to create evaluation of unknown type (%i)",
(int) evaluation_comm->type);
--- /dev/null
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <assert.h>
+#include <math.h>
+#include <float.h>
+#include <time.h>
+
+#define IS_CONSUMED_SIZE_CONDITION(condition) ( \
+ lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \
+ )
+
+#define IS_CONSUMED_SIZE_EVALUATION(evaluation) ( \
+ lttng_evaluation_get_type(evaluation) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \
+ )
+
+static
+void lttng_condition_session_consumed_size_destroy(struct lttng_condition *condition)
+{
+ struct lttng_condition_session_consumed_size *consumed_size;
+
+ consumed_size = container_of(condition,
+ struct lttng_condition_session_consumed_size, parent);
+
+ free(consumed_size->session_name);
+ free(consumed_size);
+}
+
+static
+bool lttng_condition_session_consumed_size_validate(
+ const struct lttng_condition *condition)
+{
+ bool valid = false;
+ struct lttng_condition_session_consumed_size *consumed;
+
+ if (!condition) {
+ goto end;
+ }
+
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ if (!consumed->session_name) {
+ ERR("Invalid buffer condition: a target session name must be set.");
+ goto end;
+ }
+ if (!consumed->consumed_threshold_bytes.set) {
+ ERR("Invalid session condition: a threshold must be set.");
+ goto end;
+ }
+
+ valid = true;
+end:
+ return valid;
+}
+
+static
+ssize_t lttng_condition_session_consumed_size_serialize(
+ const struct lttng_condition *condition, char *buf)
+{
+ struct lttng_condition_session_consumed_size *consumed;
+ ssize_t ret, size;
+ size_t session_name_len;
+
+ if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) {
+ ret = -1;
+ goto end;
+ }
+
+ DBG("Serializing session consumed condition");
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ size = sizeof(struct lttng_condition_session_consumed_size_comm);
+ session_name_len = strlen(consumed->session_name) + 1;
+ if (session_name_len > LTTNG_NAME_MAX) {
+ ret = -1;
+ goto end;
+ }
+ size += session_name_len;
+ if (buf) {
+ struct lttng_condition_session_consumed_size_comm consumed_comm = {
+ .consumed_threshold_bytes = consumed->consumed_threshold_bytes.value,
+ .session_name_len = session_name_len,
+ };
+
+ memcpy(buf, &consumed_comm, sizeof(consumed_comm));
+ buf += sizeof(consumed_comm);
+ memcpy(buf, consumed->session_name, session_name_len);
+ buf += session_name_len;
+ }
+ ret = size;
+end:
+ return ret;
+}
+
+static
+bool lttng_condition_session_consumed_size_is_equal(const struct lttng_condition *_a,
+ const struct lttng_condition *_b)
+{
+ bool is_equal = false;
+ struct lttng_condition_session_consumed_size *a, *b;
+
+ a = container_of(_a, struct lttng_condition_session_consumed_size, parent);
+ b = container_of(_b, struct lttng_condition_session_consumed_size, parent);
+
+ if (a->consumed_threshold_bytes.set && b->consumed_threshold_bytes.set) {
+ uint64_t a_value, b_value;
+
+ a_value = a->consumed_threshold_bytes.value;
+ b_value = b->consumed_threshold_bytes.value;
+ if (a_value != b_value) {
+ goto end;
+ }
+ }
+
+ if ((a->session_name && !b->session_name) ||
+ (!a->session_name && b->session_name)) {
+ goto end;
+ }
+
+ is_equal = true;
+end:
+ return is_equal;
+}
+
+struct lttng_condition *lttng_condition_session_consumed_size_create(void)
+{
+ struct lttng_condition_session_consumed_size *condition;
+
+ condition = zmalloc(sizeof(struct lttng_condition_session_consumed_size));
+ if (!condition) {
+ return NULL;
+ }
+
+ lttng_condition_init(&condition->parent, LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE);
+ condition->parent.validate = lttng_condition_session_consumed_size_validate;
+ condition->parent.serialize = lttng_condition_session_consumed_size_serialize;
+ condition->parent.equal = lttng_condition_session_consumed_size_is_equal;
+ condition->parent.destroy = lttng_condition_session_consumed_size_destroy;
+ return &condition->parent;
+}
+
+static
+ssize_t init_condition_from_buffer(struct lttng_condition *condition,
+ const struct lttng_buffer_view *src_view)
+{
+ ssize_t ret, condition_size;
+ enum lttng_condition_status status;
+ const struct lttng_condition_session_consumed_size_comm *condition_comm;
+ const char *session_name;
+ struct lttng_buffer_view names_view;
+
+ if (src_view->size < sizeof(*condition_comm)) {
+ ERR("Failed to initialize from malformed condition buffer: buffer too short to contain header");
+ ret = -1;
+ goto end;
+ }
+
+ condition_comm = (const struct lttng_condition_session_consumed_size_comm *) src_view->data;
+ names_view = lttng_buffer_view_from_view(src_view,
+ sizeof(*condition_comm), -1);
+
+ if (condition_comm->session_name_len > LTTNG_NAME_MAX) {
+ ERR("Failed to initialize from malformed condition buffer: name exceeds LTTNG_MAX_NAME");
+ ret = -1;
+ goto end;
+ }
+
+ if (names_view.size < condition_comm->session_name_len) {
+ ERR("Failed to initialize from malformed condition buffer: buffer too short to contain element names");
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_condition_session_consumed_size_set_threshold(condition,
+ condition_comm->consumed_threshold_bytes);
+ if (status != LTTNG_CONDITION_STATUS_OK) {
+ ERR("Failed to initialize session consumed condition threshold");
+ ret = -1;
+ goto end;
+ }
+
+ session_name = names_view.data;
+ if (*(session_name + condition_comm->session_name_len - 1) != '\0') {
+ ERR("Malformed session name encountered in condition buffer");
+ ret = -1;
+ goto end;
+ }
+
+ status = lttng_condition_session_consumed_size_set_session_name(condition,
+ session_name);
+ if (status != LTTNG_CONDITION_STATUS_OK) {
+ ERR("Failed to set buffer consumed session name");
+ ret = -1;
+ goto end;
+ }
+
+ if (!lttng_condition_validate(condition)) {
+ ret = -1;
+ goto end;
+ }
+
+ condition_size = sizeof(*condition_comm) +
+ (ssize_t) condition_comm->session_name_len;
+ ret = condition_size;
+end:
+ return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_session_consumed_size_create_from_buffer(
+ const struct lttng_buffer_view *view,
+ struct lttng_condition **_condition)
+{
+ ssize_t ret;
+ struct lttng_condition *condition =
+ lttng_condition_session_consumed_size_create();
+
+ if (!_condition || !condition) {
+ ret = -1;
+ goto error;
+ }
+
+ ret = init_condition_from_buffer(condition, view);
+ if (ret < 0) {
+ goto error;
+ }
+
+ *_condition = condition;
+ return ret;
+error:
+ lttng_condition_destroy(condition);
+ return ret;
+}
+
+static
+struct lttng_evaluation *create_evaluation_from_buffer(
+ enum lttng_condition_type type,
+ const struct lttng_buffer_view *view)
+{
+ const struct lttng_evaluation_session_consumed_size_comm *comm =
+ (const struct lttng_evaluation_session_consumed_size_comm *) view->data;
+ struct lttng_evaluation *evaluation = NULL;
+
+ if (view->size < sizeof(*comm)) {
+ goto end;
+ }
+
+ evaluation = lttng_evaluation_session_consumed_size_create(type,
+ comm->session_consumed);
+end:
+ return evaluation;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_session_consumed_size_create_from_buffer(
+ const struct lttng_buffer_view *view,
+ struct lttng_evaluation **_evaluation)
+{
+ ssize_t ret;
+ struct lttng_evaluation *evaluation = NULL;
+
+ if (!_evaluation) {
+ ret = -1;
+ goto error;
+ }
+
+ evaluation = create_evaluation_from_buffer(
+ LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE, view);
+ if (!evaluation) {
+ ret = -1;
+ goto error;
+ }
+
+ *_evaluation = evaluation;
+ ret = sizeof(struct lttng_evaluation_session_consumed_size_comm);
+ return ret;
+error:
+ lttng_evaluation_destroy(evaluation);
+ return ret;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_get_threshold(
+ const struct lttng_condition *condition,
+ uint64_t *consumed_threshold_bytes)
+{
+ struct lttng_condition_session_consumed_size *consumed;
+ enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+ if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !consumed_threshold_bytes) {
+ status = LTTNG_CONDITION_STATUS_INVALID;
+ goto end;
+ }
+
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ if (!consumed->consumed_threshold_bytes.set) {
+ status = LTTNG_CONDITION_STATUS_UNSET;
+ goto end;
+ }
+ *consumed_threshold_bytes = consumed->consumed_threshold_bytes.value;
+end:
+ return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_set_threshold(
+ struct lttng_condition *condition, uint64_t consumed_threshold_bytes)
+{
+ struct lttng_condition_session_consumed_size *consumed;
+ enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+ if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) {
+ status = LTTNG_CONDITION_STATUS_INVALID;
+ goto end;
+ }
+
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ consumed->consumed_threshold_bytes.set = true;
+ consumed->consumed_threshold_bytes.value = consumed_threshold_bytes;
+end:
+ return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_get_session_name(
+ const struct lttng_condition *condition,
+ const char **session_name)
+{
+ struct lttng_condition_session_consumed_size *consumed;
+ enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+ if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !session_name) {
+ status = LTTNG_CONDITION_STATUS_INVALID;
+ goto end;
+ }
+
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ if (!consumed->session_name) {
+ status = LTTNG_CONDITION_STATUS_UNSET;
+ goto end;
+ }
+ *session_name = consumed->session_name;
+end:
+ return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_set_session_name(
+ struct lttng_condition *condition, const char *session_name)
+{
+ char *session_name_copy;
+ struct lttng_condition_session_consumed_size *consumed;
+ enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+ if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) ||
+ !session_name || strlen(session_name) == 0) {
+ status = LTTNG_CONDITION_STATUS_INVALID;
+ goto end;
+ }
+
+ consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+ parent);
+ session_name_copy = strdup(session_name);
+ if (!session_name_copy) {
+ status = LTTNG_CONDITION_STATUS_ERROR;
+ goto end;
+ }
+
+ if (consumed->session_name) {
+ free(consumed->session_name);
+ }
+ consumed->session_name = session_name_copy;
+end:
+ return status;
+}
+
+static
+ssize_t lttng_evaluation_session_consumed_size_serialize(
+ struct lttng_evaluation *evaluation, char *buf)
+{
+ ssize_t ret;
+ struct lttng_evaluation_session_consumed_size *consumed;
+
+ consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+ parent);
+ if (buf) {
+ struct lttng_evaluation_session_consumed_size_comm comm = {
+ .session_consumed = consumed->session_consumed,
+ };
+
+ memcpy(buf, &comm, sizeof(comm));
+ }
+
+ ret = sizeof(struct lttng_evaluation_session_consumed_size_comm);
+ return ret;
+}
+
+static
+void lttng_evaluation_session_consumed_size_destroy(
+ struct lttng_evaluation *evaluation)
+{
+ struct lttng_evaluation_session_consumed_size *consumed;
+
+ consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+ parent);
+ free(consumed);
+}
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_session_consumed_size_create(
+ enum lttng_condition_type type, uint64_t consumed)
+{
+ struct lttng_evaluation_session_consumed_size *consumed_eval;
+
+ consumed_eval = zmalloc(sizeof(struct lttng_evaluation_session_consumed_size));
+ if (!consumed_eval) {
+ goto end;
+ }
+
+ consumed_eval->parent.type = type;
+ consumed_eval->session_consumed = consumed;
+ consumed_eval->parent.serialize = lttng_evaluation_session_consumed_size_serialize;
+ consumed_eval->parent.destroy = lttng_evaluation_session_consumed_size_destroy;
+end:
+ return &consumed_eval->parent;
+}
+
+enum lttng_evaluation_status
+lttng_evaluation_session_consumed_size_get_consumed_size(
+ const struct lttng_evaluation *evaluation,
+ uint64_t *session_consumed)
+{
+ struct lttng_evaluation_session_consumed_size *consumed;
+ enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK;
+
+ if (!evaluation || !IS_CONSUMED_SIZE_EVALUATION(evaluation) ||
+ !session_consumed) {
+ status = LTTNG_EVALUATION_STATUS_INVALID;
+ goto end;
+ }
+
+ consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+ parent);
+ *session_consumed = consumed->session_consumed;
+end:
+ return status;
+}
* Lowest and highest usage (bytes) at the moment the sample was taken.
*/
uint64_t lowest, highest;
+ /*
+ * Sum of all the consumed positions for a channel.
+ */
+ uint64_t total_consumed;
} LTTNG_PACKED;
/*