From e8360425c2fd0f8cfef1e678af5adfde7ae0a68e Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Fri, 9 Feb 2018 14:50:28 -0500 Subject: [PATCH] Session consumed size notification MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Add the support for notifications about the total amount of trace data consumed for a session. The user can register itself to be notified when a session has consumed more than a threshold. This sums the data for all channels in a session. For the review: part of this code was written by Jérémie, but it was on top of my development branch with major updates on my early work with notifications, so I had to squash it because it made no sense to keep Jérémie's code separate. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- include/Makefile.am | 2 + include/lttng/condition/condition.h | 1 + .../session-consumed-size-internal.h | 66 +++ .../lttng/condition/session-consumed-size.h | 142 ++++++ include/lttng/lttng.h | 1 + .../notification-thread-events.c | 273 +++++++--- .../notification-thread-internal.h | 1 + src/common/Makefile.am | 1 + src/common/condition.c | 4 + src/common/consumer/consumer-timer.c | 15 +- src/common/evaluation.c | 9 + src/common/session-consumed-size.c | 468 ++++++++++++++++++ src/common/sessiond-comm/sessiond-comm.h | 4 + 13 files changed, 917 insertions(+), 70 deletions(-) create mode 100644 include/lttng/condition/session-consumed-size-internal.h create mode 100644 include/lttng/condition/session-consumed-size.h create mode 100644 src/common/session-consumed-size.c diff --git a/include/Makefile.am b/include/Makefile.am index ab4665d5b..ddf252899 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -88,6 +88,7 @@ lttngactioninclude_HEADERS= \ lttngconditioninclude_HEADERS= \ lttng/condition/condition.h \ lttng/condition/buffer-usage.h \ + lttng/condition/session-consumed-size.h \ lttng/condition/evaluation.h lttngnotificationinclude_HEADERS= \ @@ -106,6 +107,7 @@ noinst_HEADERS = \ lttng/action/notify-internal.h \ lttng/condition/condition-internal.h \ lttng/condition/buffer-usage-internal.h \ + lttng/condition/session-consumed-size-internal.h \ lttng/condition/evaluation-internal.h \ lttng/notification/notification-internal.h \ lttng/trigger/trigger-internal.h \ diff --git a/include/lttng/condition/condition.h b/include/lttng/condition/condition.h index 71762ab01..ae51e8a7a 100644 --- a/include/lttng/condition/condition.h +++ b/include/lttng/condition/condition.h @@ -30,6 +30,7 @@ enum lttng_condition_type { LTTNG_CONDITION_TYPE_UNKNOWN = -1, LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102, LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101, + LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE = 100, }; enum lttng_condition_status { diff --git a/include/lttng/condition/session-consumed-size-internal.h b/include/lttng/condition/session-consumed-size-internal.h new file mode 100644 index 000000000..8d4b94889 --- /dev/null +++ b/include/lttng/condition/session-consumed-size-internal.h @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H +#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H + +#include +#include +#include +#include +#include "common/buffer-view.h" + +struct lttng_condition_session_consumed_size { + struct lttng_condition parent; + struct { + bool set; + uint64_t value; + } consumed_threshold_bytes; + char *session_name; +}; + +struct lttng_condition_session_consumed_size_comm { + uint64_t consumed_threshold_bytes; + /* Length includes the trailing \0. */ + uint32_t session_name_len; + char session_name[]; +} LTTNG_PACKED; + +struct lttng_evaluation_session_consumed_size { + struct lttng_evaluation parent; + uint64_t session_consumed; +}; + +struct lttng_evaluation_session_consumed_size_comm { + uint64_t session_consumed; +} LTTNG_PACKED; + +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_session_consumed_size_create( + enum lttng_condition_type type, uint64_t consumed); + +LTTNG_HIDDEN +ssize_t lttng_condition_session_consumed_size_create_from_buffer( + const struct lttng_buffer_view *view, + struct lttng_condition **condition); + +LTTNG_HIDDEN +ssize_t lttng_evaluation_session_consumed_size_create_from_buffer( + const struct lttng_buffer_view *view, + struct lttng_evaluation **evaluation); + +#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H */ diff --git a/include/lttng/condition/session-consumed-size.h b/include/lttng/condition/session-consumed-size.h new file mode 100644 index 000000000..4079e8c0b --- /dev/null +++ b/include/lttng/condition/session-consumed-size.h @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H +#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct lttng_condition; +struct lttng_evaluation; + +/** + * Session consumed size conditions allow an action to be taken whenever a + * session's produced data size crosses a set threshold. + * + * These conditions are periodically evaluated against the current session + * statistics. The period at which these conditions are evaluated is + * governed by the channels' monitor timer. + * + * Session consumed size conditions have the following properties: + * - the exact name of the session to be monitored, + * - a total consumed size threshold, expressed in bytes. + * + * Wildcards, regular expressions or other globbing mechanisms are not supported + * in session consumed size condition properties. + */ + +/* + * Create a newly allocated session consumed size condition. + * + * A session consumed size condition evaluates to true whenever the sum of all + * its channels' consumed data size is higher than a set threshold. The + * consumed data sizes are free running counters. + * + * Returns a new condition on success, NULL on failure. This condition must be + * destroyed using lttng_condition_destroy(). + */ +extern struct lttng_condition * +lttng_condition_session_consumed_size_create(void); + +/* + * Get the threshold of a session consumed size condition. + * + * The session consumed size condition's threshold must have been defined as + * an absolute value expressed in bytes in order for this call to succeed. + * + * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in + * bytes, LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed, or + * LTTNG_CONDITION_STATUS_UNSET if a threshold, expressed as an absolute size in + * bytes, was not set prior to this call. + */ +extern enum lttng_condition_status +lttng_condition_session_consumed_size_get_threshold( + const struct lttng_condition *condition, + uint64_t *consumed_threshold_bytes); + +/* + * Set the threshold of a session consumed size usage condition. + * + * Setting a threshold overrides any previously set threshold. + * + * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID + * if invalid parameters are passed. + */ +extern enum lttng_condition_status +lttng_condition_session_consumed_size_set_threshold( + struct lttng_condition *condition, + uint64_t consumed_threshold_bytes); + +/* + * Get the session name property of a session consumed size condition. + * + * The caller does not assume the ownership of the returned session name. The + * session name shall only be used for the duration of the condition's + * lifetime, or before a different session name is set. + * + * Returns LTTNG_CONDITION_STATUS_OK and a pointer to the condition's session + * name on success, LTTNG_CONDITION_STATUS_INVALID if an invalid + * parameter is passed, or LTTNG_CONDITION_STATUS_UNSET if a session name + * was not set prior to this call. + */ +extern enum lttng_condition_status +lttng_condition_session_consumed_size_get_session_name( + const struct lttng_condition *condition, + const char **session_name); + +/* + * Set the session name property of a session consumed size condition. + * + * The passed session name parameter will be copied to the condition. + * + * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID + * if invalid parameters are passed. + */ +extern enum lttng_condition_status +lttng_condition_session_consumed_size_set_session_name( + struct lttng_condition *condition, + const char *session_name); + +/** + * lttng_evaluation_session_consumed_size is specialised lttng_evaluations + * which allow users to query a number of properties resulting from the + * evaluation of a condition which evaluated to true. + */ + +/* + * Get the session consumed property of a session consumed size evaluation. + * + * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in + * bytes, or LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed. + */ +extern enum lttng_evaluation_status +lttng_evaluation_session_consumed_size_get_consumed_size( + const struct lttng_evaluation *evaluation, + uint64_t *session_consumed); + +#ifdef __cplusplus +} +#endif + +#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H */ diff --git a/include/lttng/lttng.h b/include/lttng/lttng.h index 1dfac7849..d13f9639f 100644 --- a/include/lttng/lttng.h +++ b/include/lttng/lttng.h @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include diff --git a/src/bin/lttng-sessiond/notification-thread-events.c b/src/bin/lttng-sessiond/notification-thread-events.c index df801be59..6a7477aff 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.c +++ b/src/bin/lttng-sessiond/notification-thread-events.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -152,15 +153,18 @@ struct channel_state_sample { struct cds_lfht_node channel_state_ht_node; uint64_t highest_usage; uint64_t lowest_usage; + uint64_t channel_total_consumed; }; static unsigned long hash_channel_key(struct channel_key *key); -static int evaluate_condition(struct lttng_condition *condition, +static int evaluate_condition(const struct lttng_condition *condition, struct lttng_evaluation **evaluation, - struct notification_thread_state *state, - struct channel_state_sample *previous_sample, - struct channel_state_sample *latest_sample, - uint64_t buffer_capacity); + const struct notification_thread_state *state, + const struct channel_state_sample *previous_sample, + const struct channel_state_sample *latest_sample, + uint64_t previous_session_consumed_total, + uint64_t latest_session_consumed_total, + struct channel_info *channel_info); static int send_evaluation_to_clients(struct lttng_trigger *trigger, struct lttng_evaluation *evaluation, @@ -321,6 +325,25 @@ unsigned long lttng_condition_buffer_usage_hash( return hash; } +static +unsigned long lttng_condition_session_consumed_size_hash( + struct lttng_condition *_condition) +{ + unsigned long hash = 0; + struct lttng_condition_session_consumed_size *condition; + uint64_t val; + + condition = container_of(_condition, + struct lttng_condition_session_consumed_size, parent); + + if (condition->session_name) { + hash ^= hash_key_str(condition->session_name, lttng_ht_seed); + } + val = condition->consumed_threshold_bytes.value; + hash ^= hash_key_u64(&val, lttng_ht_seed); + return hash; +} + /* * The lttng_condition hashing code is kept in this file (rather than * condition.c) since it makes use of GPLv2 code (hashtable utils), which we @@ -333,6 +356,8 @@ unsigned long lttng_condition_hash(struct lttng_condition *condition) case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: return lttng_condition_buffer_usage_hash(condition); + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + return lttng_condition_session_consumed_size_hash(condition); default: ERR("[notification-thread] Unexpected condition type caught"); abort(); @@ -574,8 +599,10 @@ int evaluate_condition_for_client(struct lttng_trigger *trigger, goto end; } - ret = evaluate_condition(condition, &evaluation, state, NULL, - last_sample, channel_info->capacity); + ret = evaluate_condition(condition, &evaluation, state, + NULL, last_sample, + 0, channel_info->session_info->consumed_data_size, + channel_info); if (ret) { WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition"); goto end; @@ -844,51 +871,90 @@ end: } static -bool trigger_applies_to_channel(struct lttng_trigger *trigger, +bool buffer_usage_condition_applies_to_channel( + struct lttng_condition *condition, struct channel_info *channel_info) { enum lttng_condition_status status; - struct lttng_condition *condition; - const char *trigger_session_name = NULL; - const char *trigger_channel_name = NULL; - enum lttng_domain_type trigger_domain; + enum lttng_domain_type condition_domain; + const char *condition_session_name = NULL; + const char *condition_channel_name = NULL; - condition = lttng_trigger_get_condition(trigger); - if (!condition) { + status = lttng_condition_buffer_usage_get_domain_type(condition, + &condition_domain); + assert(status == LTTNG_CONDITION_STATUS_OK); + if (channel_info->key.domain != condition_domain) { goto fail; } - switch (lttng_condition_get_type(condition)) { - case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: - case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: - break; - default: + status = lttng_condition_buffer_usage_get_session_name( + condition, &condition_session_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name); + + status = lttng_condition_buffer_usage_get_channel_name( + condition, &condition_channel_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name); + + if (strcmp(channel_info->session_info->name, condition_session_name)) { + goto fail; + } + if (strcmp(channel_info->name, condition_channel_name)) { goto fail; } - status = lttng_condition_buffer_usage_get_domain_type(condition, - &trigger_domain); - assert(status == LTTNG_CONDITION_STATUS_OK); - if (channel_info->key.domain != trigger_domain) { + return true; +fail: + return false; +} + +static +bool session_consumed_size_condition_applies_to_channel( + struct lttng_condition *condition, + struct channel_info *channel_info) +{ + enum lttng_condition_status status; + const char *condition_session_name = NULL; + + status = lttng_condition_session_consumed_size_get_session_name( + condition, &condition_session_name); + assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name); + + if (strcmp(channel_info->session_info->name, condition_session_name)) { goto fail; } - status = lttng_condition_buffer_usage_get_session_name( - condition, &trigger_session_name); - assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name); + return true; +fail: + return false; +} - status = lttng_condition_buffer_usage_get_channel_name( - condition, &trigger_channel_name); - assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name); +static +bool trigger_applies_to_channel(struct lttng_trigger *trigger, + struct channel_info *channel_info) +{ + struct lttng_condition *condition; + bool trigger_applies; - if (strcmp(channel_info->session_info->name, trigger_session_name)) { + condition = lttng_trigger_get_condition(trigger); + if (!condition) { goto fail; } - if (strcmp(channel_info->name, trigger_channel_name)) { + + switch (lttng_condition_get_type(condition)) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + trigger_applies = buffer_usage_condition_applies_to_channel( + condition, channel_info); + break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + trigger_applies = session_consumed_size_condition_applies_to_channel( + condition, channel_info); + break; + default: goto fail; } - return true; + return trigger_applies; fail: return false; } @@ -1315,9 +1381,6 @@ int handle_notification_thread_command_register_trigger( CDS_INIT_LIST_HEAD(&trigger_list_element->node); trigger_list_element->trigger = trigger; cds_list_add(&trigger_list_element->node, &trigger_list->list); - - /* A trigger can only apply to one channel. */ - break; } /* @@ -2129,20 +2192,17 @@ end: } static -bool evaluate_buffer_usage_condition(struct lttng_condition *condition, - struct channel_state_sample *sample, uint64_t buffer_capacity) +bool evaluate_buffer_usage_condition(const struct lttng_condition *condition, + const struct channel_state_sample *sample, + uint64_t buffer_capacity) { bool result = false; uint64_t threshold; enum lttng_condition_type condition_type; - struct lttng_condition_buffer_usage *use_condition = container_of( + const struct lttng_condition_buffer_usage *use_condition = container_of( condition, struct lttng_condition_buffer_usage, parent); - if (!sample) { - goto end; - } - if (use_condition->threshold_bytes.set) { threshold = use_condition->threshold_bytes.value; } else { @@ -2186,32 +2246,73 @@ bool evaluate_buffer_usage_condition(struct lttng_condition *condition, result = true; } } -end: + return result; } static -int evaluate_condition(struct lttng_condition *condition, +bool evaluate_session_consumed_size_condition( + const struct lttng_condition *condition, + uint64_t session_consumed_size) +{ + uint64_t threshold; + const struct lttng_condition_session_consumed_size *size_condition = + container_of(condition, + struct lttng_condition_session_consumed_size, + parent); + + threshold = size_condition->consumed_threshold_bytes.value; + DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64, + threshold, session_consumed_size); + return session_consumed_size >= threshold; +} + +static +int evaluate_condition(const struct lttng_condition *condition, struct lttng_evaluation **evaluation, - struct notification_thread_state *state, - struct channel_state_sample *previous_sample, - struct channel_state_sample *latest_sample, - uint64_t buffer_capacity) + const struct notification_thread_state *state, + const struct channel_state_sample *previous_sample, + const struct channel_state_sample *latest_sample, + uint64_t previous_session_consumed_total, + uint64_t latest_session_consumed_total, + struct channel_info *channel_info) { int ret = 0; enum lttng_condition_type condition_type; - bool previous_sample_result; + const bool previous_sample_available = !!previous_sample; + bool previous_sample_result = false; bool latest_sample_result; condition_type = lttng_condition_get_type(condition); - /* No other condition type supported for the moment. */ - assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW || - condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH); - previous_sample_result = evaluate_buffer_usage_condition(condition, - previous_sample, buffer_capacity); - latest_sample_result = evaluate_buffer_usage_condition(condition, - latest_sample, buffer_capacity); + switch (condition_type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: + if (caa_likely(previous_sample_available)) { + previous_sample_result = + evaluate_buffer_usage_condition(condition, + previous_sample, channel_info->capacity); + } + latest_sample_result = evaluate_buffer_usage_condition( + condition, latest_sample, + channel_info->capacity); + break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + if (caa_likely(previous_sample_available)) { + previous_sample_result = + evaluate_session_consumed_size_condition( + condition, + previous_session_consumed_total); + } + latest_sample_result = + evaluate_session_consumed_size_condition( + condition, + latest_session_consumed_total); + break; + default: + /* Unknown condition type; internal error. */ + abort(); + } if (!latest_sample_result || (previous_sample_result == latest_sample_result)) { @@ -2224,15 +2325,30 @@ int evaluate_condition(struct lttng_condition *condition, goto end; } - if (evaluation && latest_sample_result) { + if (!evaluation || !latest_sample_result) { + goto end; + } + + switch (condition_type) { + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW: + case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: *evaluation = lttng_evaluation_buffer_usage_create( condition_type, latest_sample->highest_usage, - buffer_capacity); - if (!*evaluation) { - ret = -1; - goto end; - } + channel_info->capacity); + break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + *evaluation = lttng_evaluation_session_consumed_size_create( + condition_type, + latest_session_consumed_total); + break; + default: + abort(); + } + + if (!*evaluation) { + ret = -1; + goto end; } end: return ret; @@ -2369,13 +2485,14 @@ int handle_notification_thread_channel_sample( { int ret = 0; struct lttcomm_consumer_channel_monitor_msg sample_msg; - struct channel_state_sample previous_sample, latest_sample; struct channel_info *channel_info; struct cds_lfht_node *node; struct cds_lfht_iter iter; struct lttng_channel_trigger_list *trigger_list; struct lttng_trigger_list_element *trigger_list_element; bool previous_sample_available = false; + struct channel_state_sample previous_sample, latest_sample; + uint64_t previous_session_consumed_total, latest_session_consumed_total; /* * The monitoring pipe only holds messages smaller than PIPE_BUF, @@ -2394,6 +2511,7 @@ int handle_notification_thread_channel_sample( latest_sample.key.domain = domain; latest_sample.highest_usage = sample_msg.highest; latest_sample.lowest_usage = sample_msg.lowest; + latest_sample.channel_total_consumed = sample_msg.total_consumed; rcu_read_lock(); @@ -2419,12 +2537,16 @@ int handle_notification_thread_channel_sample( } channel_info = caa_container_of(node, struct channel_info, channels_ht_node); - DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")", + DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")", channel_info->name, latest_sample.key.key, channel_info->session_info->name, latest_sample.highest_usage, - latest_sample.lowest_usage); + latest_sample.lowest_usage, + latest_sample.channel_total_consumed); + + previous_session_consumed_total = + channel_info->session_info->consumed_data_size; /* Retrieve the channel's last sample, if it exists, and update it. */ cds_lfht_lookup(state->channel_state_ht, @@ -2440,12 +2562,17 @@ int handle_notification_thread_channel_sample( stored_sample = caa_container_of(node, struct channel_state_sample, channel_state_ht_node); + memcpy(&previous_sample, stored_sample, sizeof(previous_sample)); stored_sample->highest_usage = latest_sample.highest_usage; stored_sample->lowest_usage = latest_sample.lowest_usage; stored_sample->channel_total_consumed = latest_sample.channel_total_consumed; previous_sample_available = true; + + latest_session_consumed_total = + previous_session_consumed_total + + (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed); } else { /* * This is the channel's first sample, allocate space for and @@ -2464,8 +2591,15 @@ int handle_notification_thread_channel_sample( cds_lfht_add(state->channel_state_ht, hash_channel_key(&stored_sample->key), &stored_sample->channel_state_ht_node); + + latest_session_consumed_total = + previous_session_consumed_total + + latest_sample.channel_total_consumed; } + channel_info->session_info->consumed_data_size = + latest_session_consumed_total; + /* Find triggers associated with this channel. */ cds_lfht_lookup(state->channel_triggers_ht, hash_channel_key(&latest_sample.key), @@ -2521,12 +2655,15 @@ int handle_notification_thread_channel_sample( ret = evaluate_condition(condition, &evaluation, state, previous_sample_available ? &previous_sample : NULL, - &latest_sample, channel_info->capacity); - if (ret) { + &latest_sample, + previous_session_consumed_total, + latest_session_consumed_total, + channel_info); + if (caa_unlikely(ret)) { goto end_unlock; } - if (!evaluation) { + if (caa_likely(!evaluation)) { continue; } diff --git a/src/bin/lttng-sessiond/notification-thread-internal.h b/src/bin/lttng-sessiond/notification-thread-internal.h index 8149e2c21..b135f128c 100644 --- a/src/bin/lttng-sessiond/notification-thread-internal.h +++ b/src/bin/lttng-sessiond/notification-thread-internal.h @@ -40,6 +40,7 @@ struct session_info { struct cds_lfht *channel_infos_ht; /* Node in the notification thread state's sessions_ht. */ struct cds_lfht_node sessions_ht_node; + uint64_t consumed_data_size; }; struct channel_info { diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 3995b2f76..5392bf4d9 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -72,6 +72,7 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \ unix.c unix.h \ filter.c filter.h context.c context.h \ action.c notify.c condition.c buffer-usage.c \ + session-consumed-size.c \ evaluation.c notification.c trigger.c endpoint.c \ dynamic-buffer.h dynamic-buffer.c \ buffer-view.h buffer-view.c \ diff --git a/src/common/condition.c b/src/common/condition.c index 98c97a595..60d32e6cc 100644 --- a/src/common/condition.c +++ b/src/common/condition.c @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -136,6 +137,9 @@ ssize_t lttng_condition_create_from_buffer( case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH: create_from_buffer = lttng_condition_buffer_usage_high_create_from_buffer; break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + create_from_buffer = lttng_condition_session_consumed_size_create_from_buffer; + break; default: ERR("Attempted to create condition of unknown type (%i)", (int) condition_comm->condition_type); diff --git a/src/common/consumer/consumer-timer.c b/src/common/consumer/consumer-timer.c index e42940ed9..721f146e2 100644 --- a/src/common/consumer/consumer-timer.c +++ b/src/common/consumer/consumer-timer.c @@ -633,7 +633,7 @@ int consumer_signal_init(void) static int sample_channel_positions(struct lttng_consumer_channel *channel, - uint64_t *_highest_use, uint64_t *_lowest_use, + uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed, sample_positions_cb sample, get_consumed_cb get_consumed, get_produced_cb get_produced) { @@ -644,6 +644,8 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, uint64_t high = 0, low = UINT64_MAX; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; + *_total_consumed = 0; + rcu_read_lock(); cds_lfht_for_each_entry_duplicate(ht->ht, @@ -681,6 +683,15 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, usage = produced - consumed; high = (usage > high) ? usage : high; low = (usage < low) ? usage : low; + + /* + * We don't use consumed here for 2 reasons: + * - output_written takes into account the padding written in the + * tracefiles when we stop the session; + * - the consumed position is not the accurate representation of what + * was extracted from a buffer in overwrite mode. + */ + *_total_consumed += stream->output_written; next: pthread_mutex_unlock(&stream->lock); } @@ -735,7 +746,7 @@ void monitor_timer(struct lttng_consumer_local_data *ctx, } ret = sample_channel_positions(channel, &msg.highest, &msg.lowest, - sample, get_consumed, get_produced); + &msg.total_consumed, sample, get_consumed, get_produced); if (ret) { return; } diff --git a/src/common/evaluation.c b/src/common/evaluation.c index 43b4e743f..b2c2df815 100644 --- a/src/common/evaluation.c +++ b/src/common/evaluation.c @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -86,6 +87,14 @@ ssize_t lttng_evaluation_create_from_buffer( } evaluation_size += ret; break; + case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE: + ret = lttng_evaluation_session_consumed_size_create_from_buffer( + &evaluation_view, evaluation); + if (ret < 0) { + goto end; + } + evaluation_size += ret; + break; default: ERR("Attempted to create evaluation of unknown type (%i)", (int) evaluation_comm->type); diff --git a/src/common/session-consumed-size.c b/src/common/session-consumed-size.c new file mode 100644 index 000000000..f2ae65a9e --- /dev/null +++ b/src/common/session-consumed-size.c @@ -0,0 +1,468 @@ +/* + * Copyright (C) 2017 - Jérémie Galarneau + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License, version 2.1 only, + * as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define IS_CONSUMED_SIZE_CONDITION(condition) ( \ + lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \ + ) + +#define IS_CONSUMED_SIZE_EVALUATION(evaluation) ( \ + lttng_evaluation_get_type(evaluation) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \ + ) + +static +void lttng_condition_session_consumed_size_destroy(struct lttng_condition *condition) +{ + struct lttng_condition_session_consumed_size *consumed_size; + + consumed_size = container_of(condition, + struct lttng_condition_session_consumed_size, parent); + + free(consumed_size->session_name); + free(consumed_size); +} + +static +bool lttng_condition_session_consumed_size_validate( + const struct lttng_condition *condition) +{ + bool valid = false; + struct lttng_condition_session_consumed_size *consumed; + + if (!condition) { + goto end; + } + + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + if (!consumed->session_name) { + ERR("Invalid buffer condition: a target session name must be set."); + goto end; + } + if (!consumed->consumed_threshold_bytes.set) { + ERR("Invalid session condition: a threshold must be set."); + goto end; + } + + valid = true; +end: + return valid; +} + +static +ssize_t lttng_condition_session_consumed_size_serialize( + const struct lttng_condition *condition, char *buf) +{ + struct lttng_condition_session_consumed_size *consumed; + ssize_t ret, size; + size_t session_name_len; + + if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) { + ret = -1; + goto end; + } + + DBG("Serializing session consumed condition"); + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + size = sizeof(struct lttng_condition_session_consumed_size_comm); + session_name_len = strlen(consumed->session_name) + 1; + if (session_name_len > LTTNG_NAME_MAX) { + ret = -1; + goto end; + } + size += session_name_len; + if (buf) { + struct lttng_condition_session_consumed_size_comm consumed_comm = { + .consumed_threshold_bytes = consumed->consumed_threshold_bytes.value, + .session_name_len = session_name_len, + }; + + memcpy(buf, &consumed_comm, sizeof(consumed_comm)); + buf += sizeof(consumed_comm); + memcpy(buf, consumed->session_name, session_name_len); + buf += session_name_len; + } + ret = size; +end: + return ret; +} + +static +bool lttng_condition_session_consumed_size_is_equal(const struct lttng_condition *_a, + const struct lttng_condition *_b) +{ + bool is_equal = false; + struct lttng_condition_session_consumed_size *a, *b; + + a = container_of(_a, struct lttng_condition_session_consumed_size, parent); + b = container_of(_b, struct lttng_condition_session_consumed_size, parent); + + if (a->consumed_threshold_bytes.set && b->consumed_threshold_bytes.set) { + uint64_t a_value, b_value; + + a_value = a->consumed_threshold_bytes.value; + b_value = b->consumed_threshold_bytes.value; + if (a_value != b_value) { + goto end; + } + } + + if ((a->session_name && !b->session_name) || + (!a->session_name && b->session_name)) { + goto end; + } + + is_equal = true; +end: + return is_equal; +} + +struct lttng_condition *lttng_condition_session_consumed_size_create(void) +{ + struct lttng_condition_session_consumed_size *condition; + + condition = zmalloc(sizeof(struct lttng_condition_session_consumed_size)); + if (!condition) { + return NULL; + } + + lttng_condition_init(&condition->parent, LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE); + condition->parent.validate = lttng_condition_session_consumed_size_validate; + condition->parent.serialize = lttng_condition_session_consumed_size_serialize; + condition->parent.equal = lttng_condition_session_consumed_size_is_equal; + condition->parent.destroy = lttng_condition_session_consumed_size_destroy; + return &condition->parent; +} + +static +ssize_t init_condition_from_buffer(struct lttng_condition *condition, + const struct lttng_buffer_view *src_view) +{ + ssize_t ret, condition_size; + enum lttng_condition_status status; + const struct lttng_condition_session_consumed_size_comm *condition_comm; + const char *session_name; + struct lttng_buffer_view names_view; + + if (src_view->size < sizeof(*condition_comm)) { + ERR("Failed to initialize from malformed condition buffer: buffer too short to contain header"); + ret = -1; + goto end; + } + + condition_comm = (const struct lttng_condition_session_consumed_size_comm *) src_view->data; + names_view = lttng_buffer_view_from_view(src_view, + sizeof(*condition_comm), -1); + + if (condition_comm->session_name_len > LTTNG_NAME_MAX) { + ERR("Failed to initialize from malformed condition buffer: name exceeds LTTNG_MAX_NAME"); + ret = -1; + goto end; + } + + if (names_view.size < condition_comm->session_name_len) { + ERR("Failed to initialize from malformed condition buffer: buffer too short to contain element names"); + ret = -1; + goto end; + } + + status = lttng_condition_session_consumed_size_set_threshold(condition, + condition_comm->consumed_threshold_bytes); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to initialize session consumed condition threshold"); + ret = -1; + goto end; + } + + session_name = names_view.data; + if (*(session_name + condition_comm->session_name_len - 1) != '\0') { + ERR("Malformed session name encountered in condition buffer"); + ret = -1; + goto end; + } + + status = lttng_condition_session_consumed_size_set_session_name(condition, + session_name); + if (status != LTTNG_CONDITION_STATUS_OK) { + ERR("Failed to set buffer consumed session name"); + ret = -1; + goto end; + } + + if (!lttng_condition_validate(condition)) { + ret = -1; + goto end; + } + + condition_size = sizeof(*condition_comm) + + (ssize_t) condition_comm->session_name_len; + ret = condition_size; +end: + return ret; +} + +LTTNG_HIDDEN +ssize_t lttng_condition_session_consumed_size_create_from_buffer( + const struct lttng_buffer_view *view, + struct lttng_condition **_condition) +{ + ssize_t ret; + struct lttng_condition *condition = + lttng_condition_session_consumed_size_create(); + + if (!_condition || !condition) { + ret = -1; + goto error; + } + + ret = init_condition_from_buffer(condition, view); + if (ret < 0) { + goto error; + } + + *_condition = condition; + return ret; +error: + lttng_condition_destroy(condition); + return ret; +} + +static +struct lttng_evaluation *create_evaluation_from_buffer( + enum lttng_condition_type type, + const struct lttng_buffer_view *view) +{ + const struct lttng_evaluation_session_consumed_size_comm *comm = + (const struct lttng_evaluation_session_consumed_size_comm *) view->data; + struct lttng_evaluation *evaluation = NULL; + + if (view->size < sizeof(*comm)) { + goto end; + } + + evaluation = lttng_evaluation_session_consumed_size_create(type, + comm->session_consumed); +end: + return evaluation; +} + +LTTNG_HIDDEN +ssize_t lttng_evaluation_session_consumed_size_create_from_buffer( + const struct lttng_buffer_view *view, + struct lttng_evaluation **_evaluation) +{ + ssize_t ret; + struct lttng_evaluation *evaluation = NULL; + + if (!_evaluation) { + ret = -1; + goto error; + } + + evaluation = create_evaluation_from_buffer( + LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE, view); + if (!evaluation) { + ret = -1; + goto error; + } + + *_evaluation = evaluation; + ret = sizeof(struct lttng_evaluation_session_consumed_size_comm); + return ret; +error: + lttng_evaluation_destroy(evaluation); + return ret; +} + +enum lttng_condition_status +lttng_condition_session_consumed_size_get_threshold( + const struct lttng_condition *condition, + uint64_t *consumed_threshold_bytes) +{ + struct lttng_condition_session_consumed_size *consumed; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !consumed_threshold_bytes) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + if (!consumed->consumed_threshold_bytes.set) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *consumed_threshold_bytes = consumed->consumed_threshold_bytes.value; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_session_consumed_size_set_threshold( + struct lttng_condition *condition, uint64_t consumed_threshold_bytes) +{ + struct lttng_condition_session_consumed_size *consumed; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + consumed->consumed_threshold_bytes.set = true; + consumed->consumed_threshold_bytes.value = consumed_threshold_bytes; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_session_consumed_size_get_session_name( + const struct lttng_condition *condition, + const char **session_name) +{ + struct lttng_condition_session_consumed_size *consumed; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !session_name) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + if (!consumed->session_name) { + status = LTTNG_CONDITION_STATUS_UNSET; + goto end; + } + *session_name = consumed->session_name; +end: + return status; +} + +enum lttng_condition_status +lttng_condition_session_consumed_size_set_session_name( + struct lttng_condition *condition, const char *session_name) +{ + char *session_name_copy; + struct lttng_condition_session_consumed_size *consumed; + enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK; + + if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || + !session_name || strlen(session_name) == 0) { + status = LTTNG_CONDITION_STATUS_INVALID; + goto end; + } + + consumed = container_of(condition, struct lttng_condition_session_consumed_size, + parent); + session_name_copy = strdup(session_name); + if (!session_name_copy) { + status = LTTNG_CONDITION_STATUS_ERROR; + goto end; + } + + if (consumed->session_name) { + free(consumed->session_name); + } + consumed->session_name = session_name_copy; +end: + return status; +} + +static +ssize_t lttng_evaluation_session_consumed_size_serialize( + struct lttng_evaluation *evaluation, char *buf) +{ + ssize_t ret; + struct lttng_evaluation_session_consumed_size *consumed; + + consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size, + parent); + if (buf) { + struct lttng_evaluation_session_consumed_size_comm comm = { + .session_consumed = consumed->session_consumed, + }; + + memcpy(buf, &comm, sizeof(comm)); + } + + ret = sizeof(struct lttng_evaluation_session_consumed_size_comm); + return ret; +} + +static +void lttng_evaluation_session_consumed_size_destroy( + struct lttng_evaluation *evaluation) +{ + struct lttng_evaluation_session_consumed_size *consumed; + + consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size, + parent); + free(consumed); +} + +LTTNG_HIDDEN +struct lttng_evaluation *lttng_evaluation_session_consumed_size_create( + enum lttng_condition_type type, uint64_t consumed) +{ + struct lttng_evaluation_session_consumed_size *consumed_eval; + + consumed_eval = zmalloc(sizeof(struct lttng_evaluation_session_consumed_size)); + if (!consumed_eval) { + goto end; + } + + consumed_eval->parent.type = type; + consumed_eval->session_consumed = consumed; + consumed_eval->parent.serialize = lttng_evaluation_session_consumed_size_serialize; + consumed_eval->parent.destroy = lttng_evaluation_session_consumed_size_destroy; +end: + return &consumed_eval->parent; +} + +enum lttng_evaluation_status +lttng_evaluation_session_consumed_size_get_consumed_size( + const struct lttng_evaluation *evaluation, + uint64_t *session_consumed) +{ + struct lttng_evaluation_session_consumed_size *consumed; + enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK; + + if (!evaluation || !IS_CONSUMED_SIZE_EVALUATION(evaluation) || + !session_consumed) { + status = LTTNG_EVALUATION_STATUS_INVALID; + goto end; + } + + consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size, + parent); + *session_consumed = consumed->session_consumed; +end: + return status; +} diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 48daec06a..8f08d9904 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -595,6 +595,10 @@ struct lttcomm_consumer_channel_monitor_msg { * Lowest and highest usage (bytes) at the moment the sample was taken. */ uint64_t lowest, highest; + /* + * Sum of all the consumed positions for a channel. + */ + uint64_t total_consumed; } LTTNG_PACKED; /* -- 2.34.1