Session consumed size notification
authorJulien Desfossez <jdesfossez@efficios.com>
Fri, 9 Feb 2018 19:50:28 +0000 (14:50 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 3 Apr 2018 16:12:28 +0000 (12:12 -0400)
Add the support for notifications about the total amount of trace data
consumed for a session. The user can register itself to be notified when
a session has consumed more than a threshold. This sums the data for all
channels in a session.

For the review: part of this code was written by Jérémie, but it was on
top of my development branch with major updates on my early work with
notifications, so I had to squash it because it made no sense to keep
Jérémie's code separate.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
13 files changed:
include/Makefile.am
include/lttng/condition/condition.h
include/lttng/condition/session-consumed-size-internal.h [new file with mode: 0644]
include/lttng/condition/session-consumed-size.h [new file with mode: 0644]
include/lttng/lttng.h
src/bin/lttng-sessiond/notification-thread-events.c
src/bin/lttng-sessiond/notification-thread-internal.h
src/common/Makefile.am
src/common/condition.c
src/common/consumer/consumer-timer.c
src/common/evaluation.c
src/common/session-consumed-size.c [new file with mode: 0644]
src/common/sessiond-comm/sessiond-comm.h

index ab4665d5ba6f0d370dbed46fef9391a2c3e3faff..ddf252899c56d6e608beee5384b8c712c24bd32f 100644 (file)
@@ -88,6 +88,7 @@ lttngactioninclude_HEADERS= \
 lttngconditioninclude_HEADERS= \
        lttng/condition/condition.h \
        lttng/condition/buffer-usage.h \
+       lttng/condition/session-consumed-size.h \
        lttng/condition/evaluation.h
 
 lttngnotificationinclude_HEADERS= \
@@ -106,6 +107,7 @@ noinst_HEADERS = \
        lttng/action/notify-internal.h \
        lttng/condition/condition-internal.h \
        lttng/condition/buffer-usage-internal.h \
+       lttng/condition/session-consumed-size-internal.h \
        lttng/condition/evaluation-internal.h \
        lttng/notification/notification-internal.h \
        lttng/trigger/trigger-internal.h \
index 71762ab017e0e9396530ff9e515d7447193e6e81..ae51e8a7a782e2c8773627dbd169e179af799d81 100644 (file)
@@ -30,6 +30,7 @@ enum lttng_condition_type {
        LTTNG_CONDITION_TYPE_UNKNOWN = -1,
        LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW = 102,
        LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH = 101,
+       LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE = 100,
 };
 
 enum lttng_condition_status {
diff --git a/include/lttng/condition/session-consumed-size-internal.h b/include/lttng/condition/session-consumed-size-internal.h
new file mode 100644 (file)
index 0000000..8d4b948
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H
+#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H
+
+#include <lttng/condition/session-consumed-size.h>
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/evaluation-internal.h>
+#include <lttng/domain.h>
+#include "common/buffer-view.h"
+
+struct lttng_condition_session_consumed_size {
+       struct lttng_condition parent;
+       struct {
+               bool set;
+               uint64_t value;
+       } consumed_threshold_bytes;
+       char *session_name;
+};
+
+struct lttng_condition_session_consumed_size_comm {
+       uint64_t consumed_threshold_bytes;
+       /* Length includes the trailing \0. */
+       uint32_t session_name_len;
+       char session_name[];
+} LTTNG_PACKED;
+
+struct lttng_evaluation_session_consumed_size {
+       struct lttng_evaluation parent;
+       uint64_t session_consumed;
+};
+
+struct lttng_evaluation_session_consumed_size_comm {
+       uint64_t session_consumed;
+} LTTNG_PACKED;
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_session_consumed_size_create(
+               enum lttng_condition_type type, uint64_t consumed);
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_session_consumed_size_create_from_buffer(
+               const struct lttng_buffer_view *view,
+               struct lttng_condition **condition);
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_session_consumed_size_create_from_buffer(
+               const struct lttng_buffer_view *view,
+               struct lttng_evaluation **evaluation);
+
+#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_INTERNAL_H */
diff --git a/include/lttng/condition/session-consumed-size.h b/include/lttng/condition/session-consumed-size.h
new file mode 100644 (file)
index 0000000..4079e8c
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H
+#define LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H
+
+#include <lttng/condition/evaluation.h>
+#include <lttng/condition/condition.h>
+#include <stdint.h>
+#include <lttng/domain.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct lttng_condition;
+struct lttng_evaluation;
+
+/**
+ * Session consumed size conditions allow an action to be taken whenever a
+ * session's produced data size crosses a set threshold.
+ *
+ * These conditions are periodically evaluated against the current session
+ * statistics. The period at which these conditions are evaluated is
+ * governed by the channels' monitor timer.
+ *
+ * Session consumed size conditions have the following properties:
+ *   - the exact name of the session to be monitored,
+ *   - a total consumed size threshold, expressed in bytes.
+ *
+ * Wildcards, regular expressions or other globbing mechanisms are not supported
+ * in session consumed size condition properties.
+ */
+
+/*
+ * Create a newly allocated session consumed size condition.
+ *
+ * A session consumed size condition evaluates to true whenever the sum of all
+ * its channels' consumed data size is higher than a set threshold. The
+ * consumed data sizes are free running counters.
+ *
+ * Returns a new condition on success, NULL on failure. This condition must be
+ * destroyed using lttng_condition_destroy().
+ */
+extern struct lttng_condition *
+lttng_condition_session_consumed_size_create(void);
+
+/*
+ * Get the threshold of a session consumed size condition.
+ *
+ * The session consumed size condition's threshold must have been defined as
+ * an absolute value expressed in bytes in order for this call to succeed.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in
+ * bytes, LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed, or
+ * LTTNG_CONDITION_STATUS_UNSET if a threshold, expressed as an absolute size in
+ * bytes, was not set prior to this call.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_get_threshold(
+               const struct lttng_condition *condition,
+               uint64_t *consumed_threshold_bytes);
+
+/*
+ * Set the threshold of a session consumed size usage condition.
+ *
+ * Setting a threshold overrides any previously set threshold.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID
+ * if invalid parameters are passed.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_set_threshold(
+               struct lttng_condition *condition,
+               uint64_t consumed_threshold_bytes);
+
+/*
+ * Get the session name property of a session consumed size condition.
+ *
+ * The caller does not assume the ownership of the returned session name. The
+ * session name shall only be used for the duration of the condition's
+ * lifetime, or before a different session name is set.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK and a pointer to the condition's session
+ * name on success, LTTNG_CONDITION_STATUS_INVALID if an invalid
+ * parameter is passed, or LTTNG_CONDITION_STATUS_UNSET if a session name
+ * was not set prior to this call.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_get_session_name(
+               const struct lttng_condition *condition,
+               const char **session_name);
+
+/*
+ * Set the session name property of a session consumed size condition.
+ *
+ * The passed session name parameter will be copied to the condition.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success, LTTNG_CONDITION_STATUS_INVALID
+ * if invalid parameters are passed.
+ */
+extern enum lttng_condition_status
+lttng_condition_session_consumed_size_set_session_name(
+               struct lttng_condition *condition,
+               const char *session_name);
+
+/**
+ * lttng_evaluation_session_consumed_size is specialised lttng_evaluations
+ * which allow users to query a number of properties resulting from the
+ * evaluation of a condition which evaluated to true.
+ */
+
+/*
+ * Get the session consumed property of a session consumed size evaluation.
+ *
+ * Returns LTTNG_CONDITION_STATUS_OK on success and a threshold expressed in
+ * bytes, or LTTNG_CONDITION_STATUS_INVALID if an invalid parameter is passed.
+ */
+extern enum lttng_evaluation_status
+lttng_evaluation_session_consumed_size_get_consumed_size(
+               const struct lttng_evaluation *evaluation,
+               uint64_t *session_consumed);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LTTNG_CONDITION_SESSION_CONSUMED_SIZE_H */
index 1dfac7849d73605e2bd62d5313d28562bfe9ecec..d13f9639f2c39814806fd20c936f2606f630ba5e 100644 (file)
@@ -39,6 +39,7 @@
 #include <lttng/action/notify.h>
 #include <lttng/condition/condition.h>
 #include <lttng/condition/buffer-usage.h>
+#include <lttng/condition/session-consumed-size.h>
 #include <lttng/condition/evaluation.h>
 #include <lttng/notification/channel.h>
 #include <lttng/notification/notification.h>
index df801be59d77f1b50561cae48705177277f133cb..6a7477aff44a165d1cb5d763526aea8498fde2f8 100644 (file)
@@ -32,6 +32,7 @@
 #include <lttng/notification/notification-internal.h>
 #include <lttng/condition/condition-internal.h>
 #include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
 #include <lttng/notification/channel-internal.h>
 
 #include <time.h>
@@ -152,15 +153,18 @@ struct channel_state_sample {
        struct cds_lfht_node channel_state_ht_node;
        uint64_t highest_usage;
        uint64_t lowest_usage;
+       uint64_t channel_total_consumed;
 };
 
 static unsigned long hash_channel_key(struct channel_key *key);
-static int evaluate_condition(struct lttng_condition *condition,
+static int evaluate_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
-               struct notification_thread_state *state,
-               struct channel_state_sample *previous_sample,
-               struct channel_state_sample *latest_sample,
-               uint64_t buffer_capacity);
+               const struct notification_thread_state *state,
+               const struct channel_state_sample *previous_sample,
+               const struct channel_state_sample *latest_sample,
+               uint64_t previous_session_consumed_total,
+               uint64_t latest_session_consumed_total,
+               struct channel_info *channel_info);
 static
 int send_evaluation_to_clients(struct lttng_trigger *trigger,
                struct lttng_evaluation *evaluation,
@@ -321,6 +325,25 @@ unsigned long lttng_condition_buffer_usage_hash(
        return hash;
 }
 
+static
+unsigned long lttng_condition_session_consumed_size_hash(
+       struct lttng_condition *_condition)
+{
+       unsigned long hash = 0;
+       struct lttng_condition_session_consumed_size *condition;
+       uint64_t val;
+
+       condition = container_of(_condition,
+                       struct lttng_condition_session_consumed_size, parent);
+
+       if (condition->session_name) {
+               hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+       }
+       val = condition->consumed_threshold_bytes.value;
+       hash ^= hash_key_u64(&val, lttng_ht_seed);
+       return hash;
+}
+
 /*
  * The lttng_condition hashing code is kept in this file (rather than
  * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
@@ -333,6 +356,8 @@ unsigned long lttng_condition_hash(struct lttng_condition *condition)
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
                return lttng_condition_buffer_usage_hash(condition);
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               return lttng_condition_session_consumed_size_hash(condition);
        default:
                ERR("[notification-thread] Unexpected condition type caught");
                abort();
@@ -574,8 +599,10 @@ int evaluate_condition_for_client(struct lttng_trigger *trigger,
                goto end;
        }
 
-       ret = evaluate_condition(condition, &evaluation, state, NULL,
-                       last_sample, channel_info->capacity);
+       ret = evaluate_condition(condition, &evaluation, state,
+                       NULL, last_sample,
+                       0, channel_info->session_info->consumed_data_size,
+                       channel_info);
        if (ret) {
                WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
                goto end;
@@ -844,51 +871,90 @@ end:
 }
 
 static
-bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+bool buffer_usage_condition_applies_to_channel(
+               struct lttng_condition *condition,
                struct channel_info *channel_info)
 {
        enum lttng_condition_status status;
-       struct lttng_condition *condition;
-       const char *trigger_session_name = NULL;
-       const char *trigger_channel_name = NULL;
-       enum lttng_domain_type trigger_domain;
+       enum lttng_domain_type condition_domain;
+       const char *condition_session_name = NULL;
+       const char *condition_channel_name = NULL;
 
-       condition = lttng_trigger_get_condition(trigger);
-       if (!condition) {
+       status = lttng_condition_buffer_usage_get_domain_type(condition,
+                       &condition_domain);
+       assert(status == LTTNG_CONDITION_STATUS_OK);
+       if (channel_info->key.domain != condition_domain) {
                goto fail;
        }
 
-       switch (lttng_condition_get_type(condition)) {
-       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
-       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
-               break;
-       default:
+       status = lttng_condition_buffer_usage_get_session_name(
+                       condition, &condition_session_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+       status = lttng_condition_buffer_usage_get_channel_name(
+                       condition, &condition_channel_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
+
+       if (strcmp(channel_info->session_info->name, condition_session_name)) {
+               goto fail;
+       }
+       if (strcmp(channel_info->name, condition_channel_name)) {
                goto fail;
        }
 
-       status = lttng_condition_buffer_usage_get_domain_type(condition,
-                       &trigger_domain);
-       assert(status == LTTNG_CONDITION_STATUS_OK);
-       if (channel_info->key.domain != trigger_domain) {
+       return true;
+fail:
+       return false;
+}
+
+static
+bool session_consumed_size_condition_applies_to_channel(
+               struct lttng_condition *condition,
+               struct channel_info *channel_info)
+{
+       enum lttng_condition_status status;
+       const char *condition_session_name = NULL;
+
+       status = lttng_condition_session_consumed_size_get_session_name(
+                       condition, &condition_session_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+       if (strcmp(channel_info->session_info->name, condition_session_name)) {
                goto fail;
        }
 
-       status = lttng_condition_buffer_usage_get_session_name(
-                       condition, &trigger_session_name);
-       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+       return true;
+fail:
+       return false;
+}
 
-       status = lttng_condition_buffer_usage_get_channel_name(
-                       condition, &trigger_channel_name);
-       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+               struct channel_info *channel_info)
+{
+       struct lttng_condition *condition;
+       bool trigger_applies;
 
-       if (strcmp(channel_info->session_info->name, trigger_session_name)) {
+       condition = lttng_trigger_get_condition(trigger);
+       if (!condition) {
                goto fail;
        }
-       if (strcmp(channel_info->name, trigger_channel_name)) {
+
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               trigger_applies = buffer_usage_condition_applies_to_channel(
+                               condition, channel_info);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               trigger_applies = session_consumed_size_condition_applies_to_channel(
+                               condition, channel_info);
+               break;
+       default:
                goto fail;
        }
 
-       return true;
+       return trigger_applies;
 fail:
        return false;
 }
@@ -1315,9 +1381,6 @@ int handle_notification_thread_command_register_trigger(
                CDS_INIT_LIST_HEAD(&trigger_list_element->node);
                trigger_list_element->trigger = trigger;
                cds_list_add(&trigger_list_element->node, &trigger_list->list);
-
-               /* A trigger can only apply to one channel. */
-               break;
        }
 
        /*
@@ -2129,20 +2192,17 @@ end:
 }
 
 static
-bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
-               struct channel_state_sample *sample, uint64_t buffer_capacity)
+bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
+               const struct channel_state_sample *sample,
+               uint64_t buffer_capacity)
 {
        bool result = false;
        uint64_t threshold;
        enum lttng_condition_type condition_type;
-       struct lttng_condition_buffer_usage *use_condition = container_of(
+       const struct lttng_condition_buffer_usage *use_condition = container_of(
                        condition, struct lttng_condition_buffer_usage,
                        parent);
 
-       if (!sample) {
-               goto end;
-       }
-
        if (use_condition->threshold_bytes.set) {
                threshold = use_condition->threshold_bytes.value;
        } else {
@@ -2186,32 +2246,73 @@ bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
                        result = true;
                }
        }
-end:
+
        return result;
 }
 
 static
-int evaluate_condition(struct lttng_condition *condition,
+bool evaluate_session_consumed_size_condition(
+               const struct lttng_condition *condition,
+               uint64_t session_consumed_size)
+{
+       uint64_t threshold;
+       const struct lttng_condition_session_consumed_size *size_condition =
+                       container_of(condition,
+                               struct lttng_condition_session_consumed_size,
+                               parent);
+
+       threshold = size_condition->consumed_threshold_bytes.value;
+       DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+                       threshold, session_consumed_size);
+       return session_consumed_size >= threshold;
+}
+
+static
+int evaluate_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
-               struct notification_thread_state *state,
-               struct channel_state_sample *previous_sample,
-               struct channel_state_sample *latest_sample,
-               uint64_t buffer_capacity)
+               const struct notification_thread_state *state,
+               const struct channel_state_sample *previous_sample,
+               const struct channel_state_sample *latest_sample,
+               uint64_t previous_session_consumed_total,
+               uint64_t latest_session_consumed_total,
+               struct channel_info *channel_info)
 {
        int ret = 0;
        enum lttng_condition_type condition_type;
-       bool previous_sample_result;
+       const bool previous_sample_available = !!previous_sample;
+       bool previous_sample_result = false;
        bool latest_sample_result;
 
        condition_type = lttng_condition_get_type(condition);
-       /* No other condition type supported for the moment. */
-       assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
-                       condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
 
-       previous_sample_result = evaluate_buffer_usage_condition(condition,
-                       previous_sample, buffer_capacity);
-       latest_sample_result = evaluate_buffer_usage_condition(condition,
-                       latest_sample, buffer_capacity);
+       switch (condition_type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               if (caa_likely(previous_sample_available)) {
+                       previous_sample_result =
+                               evaluate_buffer_usage_condition(condition,
+                                       previous_sample, channel_info->capacity);
+               }
+               latest_sample_result = evaluate_buffer_usage_condition(
+                               condition, latest_sample,
+                               channel_info->capacity);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               if (caa_likely(previous_sample_available)) {
+                       previous_sample_result =
+                               evaluate_session_consumed_size_condition(
+                                       condition,
+                                       previous_session_consumed_total);
+               }
+               latest_sample_result =
+                               evaluate_session_consumed_size_condition(
+                                       condition,
+                                       latest_session_consumed_total);
+               break;
+       default:
+               /* Unknown condition type; internal error. */
+               abort();
+       }
 
        if (!latest_sample_result ||
                        (previous_sample_result == latest_sample_result)) {
@@ -2224,15 +2325,30 @@ int evaluate_condition(struct lttng_condition *condition,
                goto end;
        }
 
-       if (evaluation && latest_sample_result) {
+       if (!evaluation || !latest_sample_result) {
+               goto end;
+       }
+
+       switch (condition_type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
                *evaluation = lttng_evaluation_buffer_usage_create(
                                condition_type,
                                latest_sample->highest_usage,
-                               buffer_capacity);
-               if (!*evaluation) {
-                       ret = -1;
-                       goto end;
-               }
+                               channel_info->capacity);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               *evaluation = lttng_evaluation_session_consumed_size_create(
+                               condition_type,
+                               latest_session_consumed_total);
+               break;
+       default:
+               abort();
+       }
+
+       if (!*evaluation) {
+               ret = -1;
+               goto end;
        }
 end:
        return ret;
@@ -2369,13 +2485,14 @@ int handle_notification_thread_channel_sample(
 {
        int ret = 0;
        struct lttcomm_consumer_channel_monitor_msg sample_msg;
-       struct channel_state_sample previous_sample, latest_sample;
        struct channel_info *channel_info;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
        struct lttng_channel_trigger_list *trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        bool previous_sample_available = false;
+       struct channel_state_sample previous_sample, latest_sample;
+       uint64_t previous_session_consumed_total, latest_session_consumed_total;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -2394,6 +2511,7 @@ int handle_notification_thread_channel_sample(
        latest_sample.key.domain = domain;
        latest_sample.highest_usage = sample_msg.highest;
        latest_sample.lowest_usage = sample_msg.lowest;
+       latest_sample.channel_total_consumed = sample_msg.total_consumed;
 
        rcu_read_lock();
 
@@ -2419,12 +2537,16 @@ int handle_notification_thread_channel_sample(
        }
        channel_info = caa_container_of(node, struct channel_info,
                        channels_ht_node);
-       DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
+       DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
                        channel_info->name,
                        latest_sample.key.key,
                        channel_info->session_info->name,
                        latest_sample.highest_usage,
-                       latest_sample.lowest_usage);
+                       latest_sample.lowest_usage,
+                       latest_sample.channel_total_consumed);
+
+       previous_session_consumed_total =
+                       channel_info->session_info->consumed_data_size;
 
        /* Retrieve the channel's last sample, if it exists, and update it. */
        cds_lfht_lookup(state->channel_state_ht,
@@ -2440,12 +2562,17 @@ int handle_notification_thread_channel_sample(
                stored_sample = caa_container_of(node,
                                struct channel_state_sample,
                                channel_state_ht_node);
+
                memcpy(&previous_sample, stored_sample,
                                sizeof(previous_sample));
                stored_sample->highest_usage = latest_sample.highest_usage;
                stored_sample->lowest_usage = latest_sample.lowest_usage;
                stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
                previous_sample_available = true;
+
+               latest_session_consumed_total =
+                               previous_session_consumed_total +
+                               (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
        } else {
                /*
                 * This is the channel's first sample, allocate space for and
@@ -2464,8 +2591,15 @@ int handle_notification_thread_channel_sample(
                cds_lfht_add(state->channel_state_ht,
                                hash_channel_key(&stored_sample->key),
                                &stored_sample->channel_state_ht_node);
+
+               latest_session_consumed_total =
+                               previous_session_consumed_total +
+                               latest_sample.channel_total_consumed;
        }
 
+       channel_info->session_info->consumed_data_size =
+                       latest_session_consumed_total;
+
        /* Find triggers associated with this channel. */
        cds_lfht_lookup(state->channel_triggers_ht,
                        hash_channel_key(&latest_sample.key),
@@ -2521,12 +2655,15 @@ int handle_notification_thread_channel_sample(
 
                ret = evaluate_condition(condition, &evaluation, state,
                                previous_sample_available ? &previous_sample : NULL,
-                               &latest_sample, channel_info->capacity);
-               if (ret) {
+                               &latest_sample,
+                               previous_session_consumed_total,
+                               latest_session_consumed_total,
+                               channel_info);
+               if (caa_unlikely(ret)) {
                        goto end_unlock;
                }
 
-               if (!evaluation) {
+               if (caa_likely(!evaluation)) {
                        continue;
                }
 
index 8149e2c2116a1ab0dd4cfc4581809c468872cdfa..b135f128c6a894fb75cfef5e7acf077ad0e6a2e1 100644 (file)
@@ -40,6 +40,7 @@ struct session_info {
        struct cds_lfht *channel_infos_ht;
        /* Node in the notification thread state's sessions_ht. */
        struct cds_lfht_node sessions_ht_node;
+       uint64_t consumed_data_size;
 };
 
 struct channel_info {
index 3995b2f7650909da5f22d91fd8b6c44e0124bf0b..5392bf4d94a2f79f5f563f842ab64ba0435a5492 100644 (file)
@@ -72,6 +72,7 @@ libcommon_la_SOURCES = error.h error.c utils.c utils.h runas.c runas.h \
                        unix.c unix.h \
                        filter.c filter.h context.c context.h \
                        action.c notify.c condition.c buffer-usage.c \
+                       session-consumed-size.c \
                        evaluation.c notification.c trigger.c endpoint.c \
                        dynamic-buffer.h dynamic-buffer.c \
                        buffer-view.h buffer-view.c \
index 98c97a595016c772c54aead040809005f6838be2..60d32e6cc7816c461dd0f9a0bdee6340e3ea1e45 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <lttng/condition/condition-internal.h>
 #include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
 #include <common/macros.h>
 #include <common/error.h>
 #include <common/dynamic-buffer.h>
@@ -136,6 +137,9 @@ ssize_t lttng_condition_create_from_buffer(
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
                create_from_buffer = lttng_condition_buffer_usage_high_create_from_buffer;
                break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               create_from_buffer = lttng_condition_session_consumed_size_create_from_buffer;
+               break;
        default:
                ERR("Attempted to create condition of unknown type (%i)",
                                (int) condition_comm->condition_type);
index e42940ed991fa60a6ec7e0e744ebd1033e203bf9..721f146e20fdff2a5d7cecd0e488008bd618269f 100644 (file)
@@ -633,7 +633,7 @@ int consumer_signal_init(void)
 
 static
 int sample_channel_positions(struct lttng_consumer_channel *channel,
-               uint64_t *_highest_use, uint64_t *_lowest_use,
+               uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
                sample_positions_cb sample, get_consumed_cb get_consumed,
                get_produced_cb get_produced)
 {
@@ -644,6 +644,8 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
        uint64_t high = 0, low = UINT64_MAX;
        struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
 
+       *_total_consumed = 0;
+
        rcu_read_lock();
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
@@ -681,6 +683,15 @@ int sample_channel_positions(struct lttng_consumer_channel *channel,
                usage = produced - consumed;
                high = (usage > high) ? usage : high;
                low = (usage < low) ? usage : low;
+
+               /*
+                * We don't use consumed here for 2 reasons:
+                *  - output_written takes into account the padding written in the
+                *    tracefiles when we stop the session;
+                *  - the consumed position is not the accurate representation of what
+                *    was extracted from a buffer in overwrite mode.
+                */
+               *_total_consumed += stream->output_written;
        next:
                pthread_mutex_unlock(&stream->lock);
        }
@@ -735,7 +746,7 @@ void monitor_timer(struct lttng_consumer_local_data *ctx,
        }
 
        ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
-                       sample, get_consumed, get_produced);
+                       &msg.total_consumed, sample, get_consumed, get_produced);
        if (ret) {
                return;
        }
index 43b4e743f22e807dfd799201387ef40b4df755fc..b2c2df81585e34d943af6605d04d8633b084a277 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <lttng/condition/evaluation-internal.h>
 #include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
 #include <common/macros.h>
 #include <common/error.h>
 #include <stdbool.h>
@@ -86,6 +87,14 @@ ssize_t lttng_evaluation_create_from_buffer(
                }
                evaluation_size += ret;
                break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               ret = lttng_evaluation_session_consumed_size_create_from_buffer(
+                               &evaluation_view, evaluation);
+               if (ret < 0) {
+                       goto end;
+               }
+               evaluation_size += ret;
+               break;
        default:
                ERR("Attempted to create evaluation of unknown type (%i)",
                                (int) evaluation_comm->type);
diff --git a/src/common/session-consumed-size.c b/src/common/session-consumed-size.c
new file mode 100644 (file)
index 0000000..f2ae65a
--- /dev/null
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License, version 2.1 only,
+ * as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <lttng/condition/condition-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
+#include <common/macros.h>
+#include <common/error.h>
+#include <assert.h>
+#include <math.h>
+#include <float.h>
+#include <time.h>
+
+#define IS_CONSUMED_SIZE_CONDITION(condition) ( \
+       lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \
+       )
+
+#define IS_CONSUMED_SIZE_EVALUATION(evaluation) ( \
+       lttng_evaluation_get_type(evaluation) == LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE \
+       )
+
+static
+void lttng_condition_session_consumed_size_destroy(struct lttng_condition *condition)
+{
+       struct lttng_condition_session_consumed_size *consumed_size;
+
+       consumed_size = container_of(condition,
+                       struct lttng_condition_session_consumed_size, parent);
+
+       free(consumed_size->session_name);
+       free(consumed_size);
+}
+
+static
+bool lttng_condition_session_consumed_size_validate(
+               const struct lttng_condition *condition)
+{
+       bool valid = false;
+       struct lttng_condition_session_consumed_size *consumed;
+
+       if (!condition) {
+               goto end;
+       }
+
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       if (!consumed->session_name) {
+               ERR("Invalid buffer condition: a target session name must be set.");
+               goto end;
+       }
+       if (!consumed->consumed_threshold_bytes.set) {
+               ERR("Invalid session condition: a threshold must be set.");
+               goto end;
+       }
+
+       valid = true;
+end:
+       return valid;
+}
+
+static
+ssize_t lttng_condition_session_consumed_size_serialize(
+               const struct lttng_condition *condition, char *buf)
+{
+       struct lttng_condition_session_consumed_size *consumed;
+       ssize_t ret, size;
+       size_t session_name_len;
+
+       if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) {
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Serializing session consumed condition");
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       size = sizeof(struct lttng_condition_session_consumed_size_comm);
+       session_name_len = strlen(consumed->session_name) + 1;
+       if (session_name_len > LTTNG_NAME_MAX) {
+               ret = -1;
+               goto end;
+       }
+       size += session_name_len;
+       if (buf) {
+               struct lttng_condition_session_consumed_size_comm consumed_comm = {
+                       .consumed_threshold_bytes = consumed->consumed_threshold_bytes.value,
+                       .session_name_len = session_name_len,
+               };
+
+               memcpy(buf, &consumed_comm, sizeof(consumed_comm));
+               buf += sizeof(consumed_comm);
+               memcpy(buf, consumed->session_name, session_name_len);
+               buf += session_name_len;
+       }
+       ret = size;
+end:
+       return ret;
+}
+
+static
+bool lttng_condition_session_consumed_size_is_equal(const struct lttng_condition *_a,
+               const struct lttng_condition *_b)
+{
+       bool is_equal = false;
+       struct lttng_condition_session_consumed_size *a, *b;
+
+       a = container_of(_a, struct lttng_condition_session_consumed_size, parent);
+       b = container_of(_b, struct lttng_condition_session_consumed_size, parent);
+
+       if (a->consumed_threshold_bytes.set && b->consumed_threshold_bytes.set) {
+               uint64_t a_value, b_value;
+
+               a_value = a->consumed_threshold_bytes.value;
+               b_value = b->consumed_threshold_bytes.value;
+               if (a_value != b_value) {
+                       goto end;
+               }
+       }
+
+       if ((a->session_name && !b->session_name) ||
+                       (!a->session_name && b->session_name)) {
+               goto end;
+       }
+
+       is_equal = true;
+end:
+       return is_equal;
+}
+
+struct lttng_condition *lttng_condition_session_consumed_size_create(void)
+{
+       struct lttng_condition_session_consumed_size *condition;
+
+       condition = zmalloc(sizeof(struct lttng_condition_session_consumed_size));
+       if (!condition) {
+               return NULL;
+       }
+
+       lttng_condition_init(&condition->parent, LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE);
+       condition->parent.validate = lttng_condition_session_consumed_size_validate;
+       condition->parent.serialize = lttng_condition_session_consumed_size_serialize;
+       condition->parent.equal = lttng_condition_session_consumed_size_is_equal;
+       condition->parent.destroy = lttng_condition_session_consumed_size_destroy;
+       return &condition->parent;
+}
+
+static
+ssize_t init_condition_from_buffer(struct lttng_condition *condition,
+               const struct lttng_buffer_view *src_view)
+{
+       ssize_t ret, condition_size;
+       enum lttng_condition_status status;
+       const struct lttng_condition_session_consumed_size_comm *condition_comm;
+       const char *session_name;
+       struct lttng_buffer_view names_view;
+
+       if (src_view->size < sizeof(*condition_comm)) {
+               ERR("Failed to initialize from malformed condition buffer: buffer too short to contain header");
+               ret = -1;
+               goto end;
+       }
+
+       condition_comm = (const struct lttng_condition_session_consumed_size_comm *) src_view->data;
+       names_view = lttng_buffer_view_from_view(src_view,
+                       sizeof(*condition_comm), -1);
+
+       if (condition_comm->session_name_len > LTTNG_NAME_MAX) {
+               ERR("Failed to initialize from malformed condition buffer: name exceeds LTTNG_MAX_NAME");
+               ret = -1;
+               goto end;
+       }
+
+       if (names_view.size < condition_comm->session_name_len) {
+               ERR("Failed to initialize from malformed condition buffer: buffer too short to contain element names");
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_condition_session_consumed_size_set_threshold(condition,
+                       condition_comm->consumed_threshold_bytes);
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to initialize session consumed condition threshold");
+               ret = -1;
+               goto end;
+       }
+
+       session_name = names_view.data;
+       if (*(session_name + condition_comm->session_name_len - 1) != '\0') {
+               ERR("Malformed session name encountered in condition buffer");
+               ret = -1;
+               goto end;
+       }
+
+       status = lttng_condition_session_consumed_size_set_session_name(condition,
+                       session_name);
+       if (status != LTTNG_CONDITION_STATUS_OK) {
+               ERR("Failed to set buffer consumed session name");
+               ret = -1;
+               goto end;
+       }
+
+       if (!lttng_condition_validate(condition)) {
+               ret = -1;
+               goto end;
+       }
+
+       condition_size = sizeof(*condition_comm) +
+                       (ssize_t) condition_comm->session_name_len;
+       ret = condition_size;
+end:
+       return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_condition_session_consumed_size_create_from_buffer(
+               const struct lttng_buffer_view *view,
+               struct lttng_condition **_condition)
+{
+       ssize_t ret;
+       struct lttng_condition *condition =
+                       lttng_condition_session_consumed_size_create();
+
+       if (!_condition || !condition) {
+               ret = -1;
+               goto error;
+       }
+
+       ret = init_condition_from_buffer(condition, view);
+       if (ret < 0) {
+               goto error;
+       }
+
+       *_condition = condition;
+       return ret;
+error:
+       lttng_condition_destroy(condition);
+       return ret;
+}
+
+static
+struct lttng_evaluation *create_evaluation_from_buffer(
+               enum lttng_condition_type type,
+               const struct lttng_buffer_view *view)
+{
+       const struct lttng_evaluation_session_consumed_size_comm *comm =
+                       (const struct lttng_evaluation_session_consumed_size_comm *) view->data;
+       struct lttng_evaluation *evaluation = NULL;
+
+       if (view->size < sizeof(*comm)) {
+               goto end;
+       }
+
+       evaluation = lttng_evaluation_session_consumed_size_create(type,
+                       comm->session_consumed);
+end:
+       return evaluation;
+}
+
+LTTNG_HIDDEN
+ssize_t lttng_evaluation_session_consumed_size_create_from_buffer(
+               const struct lttng_buffer_view *view,
+               struct lttng_evaluation **_evaluation)
+{
+       ssize_t ret;
+       struct lttng_evaluation *evaluation = NULL;
+
+       if (!_evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       evaluation = create_evaluation_from_buffer(
+                       LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE, view);
+       if (!evaluation) {
+               ret = -1;
+               goto error;
+       }
+
+       *_evaluation = evaluation;
+       ret = sizeof(struct lttng_evaluation_session_consumed_size_comm);
+       return ret;
+error:
+       lttng_evaluation_destroy(evaluation);
+       return ret;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_get_threshold(
+               const struct lttng_condition *condition,
+               uint64_t *consumed_threshold_bytes)
+{
+       struct lttng_condition_session_consumed_size *consumed;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !consumed_threshold_bytes) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       if (!consumed->consumed_threshold_bytes.set) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *consumed_threshold_bytes = consumed->consumed_threshold_bytes.value;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_set_threshold(
+               struct lttng_condition *condition, uint64_t consumed_threshold_bytes)
+{
+       struct lttng_condition_session_consumed_size *consumed;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition)) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       consumed->consumed_threshold_bytes.set = true;
+       consumed->consumed_threshold_bytes.value = consumed_threshold_bytes;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_get_session_name(
+               const struct lttng_condition *condition,
+               const char **session_name)
+{
+       struct lttng_condition_session_consumed_size *consumed;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) || !session_name) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       if (!consumed->session_name) {
+               status = LTTNG_CONDITION_STATUS_UNSET;
+               goto end;
+       }
+       *session_name = consumed->session_name;
+end:
+       return status;
+}
+
+enum lttng_condition_status
+lttng_condition_session_consumed_size_set_session_name(
+               struct lttng_condition *condition, const char *session_name)
+{
+       char *session_name_copy;
+       struct lttng_condition_session_consumed_size *consumed;
+       enum lttng_condition_status status = LTTNG_CONDITION_STATUS_OK;
+
+       if (!condition || !IS_CONSUMED_SIZE_CONDITION(condition) ||
+                       !session_name || strlen(session_name) == 0) {
+               status = LTTNG_CONDITION_STATUS_INVALID;
+               goto end;
+       }
+
+       consumed = container_of(condition, struct lttng_condition_session_consumed_size,
+                       parent);
+       session_name_copy = strdup(session_name);
+       if (!session_name_copy) {
+               status = LTTNG_CONDITION_STATUS_ERROR;
+               goto end;
+       }
+
+       if (consumed->session_name) {
+               free(consumed->session_name);
+       }
+       consumed->session_name = session_name_copy;
+end:
+       return status;
+}
+
+static
+ssize_t lttng_evaluation_session_consumed_size_serialize(
+               struct lttng_evaluation *evaluation, char *buf)
+{
+       ssize_t ret;
+       struct lttng_evaluation_session_consumed_size *consumed;
+
+       consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+                       parent);
+       if (buf) {
+               struct lttng_evaluation_session_consumed_size_comm comm = {
+                       .session_consumed = consumed->session_consumed,
+               };
+
+               memcpy(buf, &comm, sizeof(comm));
+       }
+
+       ret = sizeof(struct lttng_evaluation_session_consumed_size_comm);
+       return ret;
+}
+
+static
+void lttng_evaluation_session_consumed_size_destroy(
+               struct lttng_evaluation *evaluation)
+{
+       struct lttng_evaluation_session_consumed_size *consumed;
+
+       consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+                       parent);
+       free(consumed);
+}
+
+LTTNG_HIDDEN
+struct lttng_evaluation *lttng_evaluation_session_consumed_size_create(
+               enum lttng_condition_type type, uint64_t consumed)
+{
+       struct lttng_evaluation_session_consumed_size *consumed_eval;
+
+       consumed_eval = zmalloc(sizeof(struct lttng_evaluation_session_consumed_size));
+       if (!consumed_eval) {
+               goto end;
+       }
+
+       consumed_eval->parent.type = type;
+       consumed_eval->session_consumed = consumed;
+       consumed_eval->parent.serialize = lttng_evaluation_session_consumed_size_serialize;
+       consumed_eval->parent.destroy = lttng_evaluation_session_consumed_size_destroy;
+end:
+       return &consumed_eval->parent;
+}
+
+enum lttng_evaluation_status
+lttng_evaluation_session_consumed_size_get_consumed_size(
+               const struct lttng_evaluation *evaluation,
+               uint64_t *session_consumed)
+{
+       struct lttng_evaluation_session_consumed_size *consumed;
+       enum lttng_evaluation_status status = LTTNG_EVALUATION_STATUS_OK;
+
+       if (!evaluation || !IS_CONSUMED_SIZE_EVALUATION(evaluation) ||
+                       !session_consumed) {
+               status = LTTNG_EVALUATION_STATUS_INVALID;
+               goto end;
+       }
+
+       consumed = container_of(evaluation, struct lttng_evaluation_session_consumed_size,
+                       parent);
+       *session_consumed = consumed->session_consumed;
+end:
+       return status;
+}
index 48daec06a1af0a8f9ed88d51ebd34e1f4a244d5a..8f08d9904a16a05713edf20ae21b20d7b003af83 100644 (file)
@@ -595,6 +595,10 @@ struct lttcomm_consumer_channel_monitor_msg {
         * Lowest and highest usage (bytes) at the moment the sample was taken.
         */
        uint64_t lowest, highest;
+       /*
+        * Sum of all the consumed positions for a channel.
+        */
+       uint64_t total_consumed;
 } LTTNG_PACKED;
 
 /*
This page took 0.042474 seconds and 4 git commands to generate.