Extract the lost packets and discarded events counters
authorJulien Desfossez <jdesfossez@efficios.com>
Mon, 29 Jun 2015 19:58:01 +0000 (15:58 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 8 Mar 2016 03:09:02 +0000 (22:09 -0500)
The "lttng list" command now shows the number of discarded events
(discard mode) or lost packets (overwrite mode).

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
22 files changed:
src/bin/lttng-sessiond/buffer-registry.c
src/bin/lttng-sessiond/buffer-registry.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/trace-ust.h
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/bin/lttng/commands/list.c
src/common/config/config-session-abi.h
src/common/config/session-config.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/kernel-ctl/kernel-ctl.c
src/common/kernel-ctl/kernel-ctl.h
src/common/kernel-ctl/kernel-ioctl.h
src/common/mi-lttng-3.0.xsd
src/common/mi-lttng.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
src/common/ust-consumer/ust-consumer.h

index 7fbe3565b85d509d30db5561fedf743d4becde00..7002b33c7b2969bca7ce97f04b2ffd64ecd394a6 100644 (file)
@@ -327,6 +327,44 @@ end:
        return reg;
 }
 
+/*
+ * Find the consumer channel key from a UST session per-uid channel key.
+ *
+ * Return the matching key or -1 if not found.
+ */
+int buffer_reg_uid_consumer_channel_key(
+               struct cds_list_head *buffer_reg_uid_list,
+               uint64_t usess_id, uint64_t chan_key,
+               uint64_t *consumer_chan_key)
+{
+       struct lttng_ht_iter iter;
+       struct buffer_reg_uid *uid_reg = NULL;
+       struct buffer_reg_session *session_reg = NULL;
+       struct buffer_reg_channel *reg_chan;
+       int ret = -1;
+
+       rcu_read_lock();
+       /*
+        * For the per-uid registry, we have to iterate since we don't have the
+        * uid and bitness key.
+        */
+       cds_list_for_each_entry(uid_reg, buffer_reg_uid_list, lnode) {
+               session_reg = uid_reg->registry;
+               cds_lfht_for_each_entry(session_reg->channels->ht,
+                               &iter.iter, reg_chan, node.node) {
+                       if (reg_chan->key == chan_key) {
+                               *consumer_chan_key = reg_chan->consumer_key;
+                               ret = 0;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Allocate and initialize a buffer registry channel with the given key. Set
  * regp with the object pointer.
index 7a817ec70464786c44c6d12e5e663516c63cfce1..db1ce0cbbbc821d338aa7edb3a81e2093e2e2084 100644 (file)
@@ -150,4 +150,9 @@ void buffer_reg_stream_destroy(struct buffer_reg_stream *regp,
 /* Global registry. */
 void buffer_reg_destroy_registries(void);
 
+int buffer_reg_uid_consumer_channel_key(
+               struct cds_list_head *buffer_reg_uid_list,
+               uint64_t usess_id, uint64_t chan_key,
+               uint64_t *consumer_chan_key);
+
 #endif /* LTTNG_BUFFER_REGISTRY_H */
index 87c6eb36dd2c6264848b021f772dddfe85abd0d5..c054db917a40c4f86bd217255224a060875f116e 100644 (file)
@@ -139,13 +139,97 @@ error:
        return ret;
 }
 
+/*
+ * Get run-time attributes if the session has been started (discarded events,
+ * lost packets).
+ */
+static int get_kernel_runtime_stats(struct ltt_session *session,
+               struct ltt_kernel_channel *kchan, uint64_t *discarded_events,
+               uint64_t *lost_packets)
+{
+       int ret;
+
+       if (!session->has_been_started) {
+               ret = 0;
+               *discarded_events = 0;
+               *lost_packets = 0;
+               goto end;
+       }
+
+       ret = consumer_get_discarded_events(session->id, kchan->fd,
+                       session->kernel_session->consumer,
+                       discarded_events);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = consumer_get_lost_packets(session->id, kchan->fd,
+                       session->kernel_session->consumer,
+                       lost_packets);
+       if (ret < 0) {
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Get run-time attributes if the session has been started (discarded events,
+ * lost packets).
+ */
+static int get_ust_runtime_stats(struct ltt_session *session,
+               struct ltt_ust_channel *uchan, uint64_t *discarded_events,
+               uint64_t *lost_packets)
+{
+       int ret;
+       struct ltt_ust_session *usess;
+
+       usess = session->ust_session;
+
+       if (!usess || !session->has_been_started) {
+               *discarded_events = 0;
+               *lost_packets = 0;
+               ret = 0;
+               goto end;
+       }
+
+       if (usess->buffer_type == LTTNG_BUFFER_PER_UID) {
+               ret = ust_app_uid_get_channel_runtime_stats(usess->id,
+                               &usess->buffer_reg_uid_list,
+                               usess->consumer, uchan->id,
+                               uchan->attr.overwrite,
+                               discarded_events,
+                               lost_packets);
+       } else if (usess->buffer_type == LTTNG_BUFFER_PER_PID) {
+               ret = ust_app_pid_get_channel_runtime_stats(usess,
+                               uchan, usess->consumer,
+                               uchan->attr.overwrite,
+                               discarded_events,
+                               lost_packets);
+               if (ret < 0) {
+                       goto end;
+               }
+               *discarded_events += uchan->per_pid_closed_app_discarded;
+               *lost_packets += uchan->per_pid_closed_app_lost;
+       } else {
+               ERR("Unsupported buffer type");
+               ret = -1;
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Fill lttng_channel array of all channels.
  */
 static void list_lttng_channels(enum lttng_domain_type domain,
-               struct ltt_session *session, struct lttng_channel *channels)
+               struct ltt_session *session, struct lttng_channel *channels,
+               struct lttcomm_channel_extended *chan_exts)
 {
-       int i = 0;
+       int i = 0, ret;
        struct ltt_kernel_channel *kchan;
 
        DBG("Listing channels for session %s", session->name);
@@ -156,9 +240,19 @@ static void list_lttng_channels(enum lttng_domain_type domain,
                if (session->kernel_session != NULL) {
                        cds_list_for_each_entry(kchan,
                                        &session->kernel_session->channel_list.head, list) {
+                               uint64_t discarded_events, lost_packets;
+
+                               ret = get_kernel_runtime_stats(session, kchan,
+                                               &discarded_events, &lost_packets);
+                               if (ret < 0) {
+                                       goto end;
+                               }
                                /* Copy lttng_channel struct to array */
                                memcpy(&channels[i], kchan->channel, sizeof(struct lttng_channel));
                                channels[i].enabled = kchan->enabled;
+                               chan_exts[i].discarded_events =
+                                               discarded_events;
+                               chan_exts[i].lost_packets = lost_packets;
                                i++;
                        }
                }
@@ -171,6 +265,8 @@ static void list_lttng_channels(enum lttng_domain_type domain,
                rcu_read_lock();
                cds_lfht_for_each_entry(session->ust_session->domain_global.channels->ht,
                                &iter.iter, uchan, node.node) {
+                       uint64_t discarded_events, lost_packets;
+
                        strncpy(channels[i].name, uchan->name, LTTNG_SYMBOL_NAME_LEN);
                        channels[i].attr.overwrite = uchan->attr.overwrite;
                        channels[i].attr.subbuf_size = uchan->attr.subbuf_size;
@@ -188,6 +284,14 @@ static void list_lttng_channels(enum lttng_domain_type domain,
                                channels[i].attr.output = LTTNG_EVENT_MMAP;
                                break;
                        }
+
+                       ret = get_ust_runtime_stats(session, uchan,
+                                       &discarded_events, &lost_packets);
+                       if (ret < 0) {
+                               break;
+                       }
+                       chan_exts[i].discarded_events = discarded_events;
+                       chan_exts[i].lost_packets = lost_packets;
                        i++;
                }
                rcu_read_unlock();
@@ -196,6 +300,9 @@ static void list_lttng_channels(enum lttng_domain_type domain,
        default:
                break;
        }
+
+end:
+       return;
 }
 
 static void increment_extended_len(const char *filter_expression,
index 650d002cdda75ab6cbcb03b1676ed92505552818..fcda59363dfb23019aa793dd244c400c044296de 100644 (file)
@@ -1369,3 +1369,117 @@ error:
        health_code_update();
        return ret;
 }
+
+/*
+ * Ask the consumer the number of discarded events for a channel.
+ */
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+               struct consumer_output *consumer, uint64_t *discarded)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct lttng_ht_iter iter;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+
+       DBG3("Consumer discarded events id %" PRIu64, session_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
+       msg.u.discarded_events.session_id = session_id;
+       msg.u.discarded_events.channel_key = channel_key;
+
+       *discarded = 0;
+
+       /* Send command for each consumer */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                       node.node) {
+               uint64_t consumer_discarded = 0;
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
+               if (ret < 0) {
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
+
+               /*
+                * No need for a recv reply status because the answer to the
+                * command is the reply status message.
+                */
+               ret = consumer_socket_recv(socket, &consumer_discarded,
+                               sizeof(consumer_discarded));
+               if (ret < 0) {
+                       ERR("get discarded events");
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
+               pthread_mutex_unlock(socket->lock);
+               *discarded += consumer_discarded;
+       }
+       ret = 0;
+       DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64,
+                       *discarded, session_id);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Ask the consumer the number of lost packets for a channel.
+ */
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+               struct consumer_output *consumer, uint64_t *lost)
+{
+       int ret;
+       struct consumer_socket *socket;
+       struct lttng_ht_iter iter;
+       struct lttcomm_consumer_msg msg;
+
+       assert(consumer);
+
+       DBG3("Consumer lost packets id %" PRIu64, session_id);
+
+       memset(&msg, 0, sizeof(msg));
+       msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
+       msg.u.lost_packets.session_id = session_id;
+       msg.u.lost_packets.channel_key = channel_key;
+
+       *lost = 0;
+
+       /* Send command for each consumer */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+                       node.node) {
+               uint64_t consumer_lost = 0;
+               pthread_mutex_lock(socket->lock);
+               ret = consumer_socket_send(socket, &msg, sizeof(msg));
+               if (ret < 0) {
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
+
+               /*
+                * No need for a recv reply status because the answer to the
+                * command is the reply status message.
+                */
+               ret = consumer_socket_recv(socket, &consumer_lost,
+                               sizeof(consumer_lost));
+               if (ret < 0) {
+                       ERR("get lost packets");
+                       pthread_mutex_unlock(socket->lock);
+                       goto end;
+               }
+               pthread_mutex_unlock(socket->lock);
+               *lost += consumer_lost;
+       }
+       ret = 0;
+       DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64,
+                       *lost, session_id);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
index 47e59c7c28de86a8e550df650bd705b5dc9de9f3..c84d85140a03dc647695edc6ea190b7281757146 100644 (file)
@@ -281,6 +281,10 @@ int consumer_push_metadata(struct consumer_socket *socket,
                uint64_t metadata_key, char *metadata_str, size_t len,
                size_t target_offset);
 int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+               struct consumer_output *consumer, uint64_t *discarded);
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+               struct consumer_output *consumer, uint64_t *lost);
 
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
index 5a48885d8f0a386d398d2afe4336a364b3d03452..fc9eef4b7b927820e15429e76afe098d5892ceb0 100644 (file)
@@ -80,6 +80,8 @@ struct ltt_ust_channel {
        struct lttng_ht_node_str node;
        uint64_t tracefile_size;
        uint64_t tracefile_count;
+       uint64_t per_pid_closed_app_discarded;
+       uint64_t per_pid_closed_app_lost;
 };
 
 /* UST domain global (LTTNG_DOMAIN_UST) */
index 160a15640ff4efd5709af571c9ab3b64b68c0334..e3995bdf612d0e82a2fea38e0c4008fd4795b8f2 100644 (file)
@@ -40,6 +40,7 @@
 #include "ust-consumer.h"
 #include "ust-ctl.h"
 #include "utils.h"
+#include "session.h"
 
 static
 int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
@@ -371,6 +372,57 @@ void delete_ust_app_channel_rcu(struct rcu_head *head)
        free(ua_chan);
 }
 
+/*
+ * Extract the lost packet or discarded events counter when the channel is
+ * being deleted and store the value in the parent channel so we can
+ * access it from lttng list and at stop/destroy.
+ */
+static
+void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan)
+{
+       uint64_t discarded = 0, lost = 0;
+       struct ltt_session *session;
+       struct ltt_ust_channel *uchan;
+
+       if (ua_chan->attr.type != LTTNG_UST_CHAN_PER_CPU) {
+               return;
+       }
+
+       rcu_read_lock();
+       session = session_find_by_id(ua_chan->session->tracing_id);
+       if (!session) {
+               ERR("Missing LTT session to get discarded events");
+               goto end;
+       }
+       if (!session->ust_session) {
+               ERR("Missing UST session to get discarded events");
+               goto end;
+       }
+
+       if (ua_chan->attr.overwrite) {
+               consumer_get_lost_packets(ua_chan->session->tracing_id,
+                               ua_chan->key, session->ust_session->consumer,
+                               &lost);
+       } else {
+               consumer_get_discarded_events(ua_chan->session->tracing_id,
+                               ua_chan->key, session->ust_session->consumer,
+                               &discarded);
+       }
+       uchan = trace_ust_find_channel_by_name(
+                       session->ust_session->domain_global.channels,
+                       ua_chan->name);
+       if (!uchan) {
+               ERR("Missing UST channel to store discarded counters");
+               goto end;
+       }
+
+       uchan->per_pid_closed_app_discarded += discarded;
+       uchan->per_pid_closed_app_lost += lost;
+
+end:
+       rcu_read_unlock();
+}
+
 /*
  * Delete ust app channel safely. RCU read lock must be held before calling
  * this function.
@@ -418,6 +470,7 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan,
                if (registry) {
                        ust_registry_channel_del_free(registry, ua_chan->key);
                }
+               save_per_pid_lost_discarded_counters(ua_chan);
        }
 
        if (ua_chan->obj != NULL) {
@@ -1874,7 +1927,8 @@ static void shadow_copy_session(struct ust_app_session *ua_sess,
 
                DBG2("Channel %s not found on shadow session copy, creating it",
                                uchan->name);
-               ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
+               ua_chan = alloc_ust_app_channel(uchan->name, ua_sess,
+                               &uchan->attr);
                if (ua_chan == NULL) {
                        /* malloc failed FIXME: Might want to do handle ENOMEM .. */
                        continue;
@@ -5847,3 +5901,80 @@ uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *use
 
        return tot_size;
 }
+
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+               struct cds_list_head *buffer_reg_uid_list,
+               struct consumer_output *consumer, uint64_t uchan_id,
+               int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+       int ret;
+       uint64_t consumer_chan_key;
+
+       ret = buffer_reg_uid_consumer_channel_key(
+                       buffer_reg_uid_list, ust_session_id,
+                       uchan_id, &consumer_chan_key);
+       if (ret < 0) {
+               goto end;
+       }
+
+       if (overwrite) {
+               ret = consumer_get_lost_packets(ust_session_id,
+                               consumer_chan_key, consumer, lost);
+       } else {
+               ret = consumer_get_discarded_events(ust_session_id,
+                               consumer_chan_key, consumer, discarded);
+       }
+
+end:
+       return ret;
+}
+
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+               struct ltt_ust_channel *uchan,
+               struct consumer_output *consumer, int overwrite,
+               uint64_t *discarded, uint64_t *lost)
+{
+       int ret = 0;
+       struct lttng_ht_iter iter;
+       struct lttng_ht_node_str *ua_chan_node;
+       struct ust_app *app;
+       struct ust_app_session *ua_sess;
+       struct ust_app_channel *ua_chan;
+
+       rcu_read_lock();
+       /*
+        * Iterate over every registered applications, return when we
+        * found one in the right session and channel.
+        */
+       cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+               struct lttng_ht_iter uiter;
+
+               ua_sess = lookup_session_by_app(usess, app);
+               if (ua_sess == NULL) {
+                       continue;
+               }
+
+               /* Get channel */
+               lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
+               ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
+               /* If the session is found for the app, the channel must be there */
+               assert(ua_chan_node);
+
+               ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node);
+
+               if (overwrite) {
+                       ret = consumer_get_lost_packets(usess->id, ua_chan->key,
+                                       consumer, lost);
+                       goto end;
+               } else {
+                       ret = consumer_get_discarded_events(usess->id,
+                                       ua_chan->key, consumer, discarded);
+                       goto end;
+               }
+               goto end;
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
index 4398fe2414a68707f5fc9352b1b5eab0cfb0214c..d02f353c856a985b7a97737a230f1b0f07946492 100644 (file)
@@ -344,6 +344,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
 uint64_t ust_app_get_size_one_more_packet_per_stream(
                struct ltt_ust_session *usess, uint64_t cur_nr_packets);
 struct ust_app *ust_app_find_by_sock(int sock);
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+               struct cds_list_head *buffer_reg_uid_list,
+               struct consumer_output *consumer, uint64_t uchan_id,
+               int overwrite, uint64_t *discarded, uint64_t *lost);
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+               struct ltt_ust_channel *uchan,
+               struct consumer_output *consumer,
+               int overwrite, uint64_t *discarded, uint64_t *lost);
 
 static inline
 int ust_app_supported(void)
@@ -558,6 +566,23 @@ uint64_t ust_app_get_size_one_more_packet_per_stream(
                struct ltt_ust_session *usess, uint64_t cur_nr_packets) {
        return 0;
 }
+static inline
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+               struct cds_list_head *buffer_reg_uid_list,
+               struct consumer_output *consumer, int overwrite,
+               uint64_t uchan_id, uint64_t *discarded, uint64_t *lost)
+{
+       return 0;
+}
+
+static inline
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+               struct ltt_ust_channel *uchan,
+               struct consumer_output *consumer,
+               int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+       return 0;
+}
 
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
index f59e4bc0b3576dcf43ab66a7ef1a465dee2e25df..8e69e09efdc6bb85be8206a05d2d703a606f7ab0 100644 (file)
@@ -1190,6 +1190,23 @@ error:
  */
 static void print_channel(struct lttng_channel *channel)
 {
+       int ret;
+       uint64_t discarded_events, lost_packets;
+
+       ret = lttng_channel_get_discarded_event_count(channel,
+                       &discarded_events);
+       if (ret) {
+               ERR("Failed to retrieve discarded event count of channel");
+               return;
+       }
+
+       ret = lttng_channel_get_lost_packet_count(channel,
+                       &lost_packets);
+       if (ret) {
+               ERR("Failed to retrieve lost packet count of channel");
+               return;
+       }
+
        MSG("- %s:%s\n", channel->name, enabled_string(channel->enabled));
 
        MSG("%sAttributes:", indent4);
@@ -1200,6 +1217,8 @@ static void print_channel(struct lttng_channel *channel)
        MSG("%sread timer interval: %u", indent6, channel->attr.read_timer_interval);
        MSG("%strace file count: %" PRIu64, indent6, channel->attr.tracefile_count);
        MSG("%strace file size (bytes): %" PRIu64, indent6, channel->attr.tracefile_size);
+       MSG("%sdiscarded events: %" PRIu64, indent6, discarded_events);
+       MSG("%slost packets: %" PRIu64, indent6, lost_packets);
        switch (channel->attr.output) {
                case LTTNG_EVENT_SPLICE:
                        MSG("%soutput: splice()", indent6);
index f7cee34eb44a0d9886fd112d731101f65f6968fc..df77c63a5bf6d1884071dcb12b78242aae86bb2e 100644 (file)
@@ -46,6 +46,8 @@ extern const char * const config_element_output_type;
 extern const char * const config_element_tracefile_size;
 extern const char * const config_element_tracefile_count;
 extern const char * const config_element_live_timer_interval;
+extern const char * const config_element_discarded_events;
+extern const char * const config_element_lost_packets;
 extern const char * const config_element_type;
 extern const char * const config_element_buffer_type;
 extern const char * const config_element_session;
index dec891349a5384a928abf153447452b51131498c..d89249879aece2dc99ea2b138dfcc3784154f097 100644 (file)
@@ -95,6 +95,8 @@ const char * const config_element_output_type = "output_type";
 const char * const config_element_tracefile_size = "tracefile_size";
 const char * const config_element_tracefile_count = "tracefile_count";
 const char * const config_element_live_timer_interval = "live_timer_interval";
+const char * const config_element_discarded_events = "discarded_events";
+const char * const config_element_lost_packets = "lost_packets";
 const char * const config_element_type = "type";
 const char * const config_element_buffer_type = "buffer_type";
 const char * const config_element_session = "session";
index 5625e59ddd5cb8a2092d21d1145149df63ed66d6..7911a5b4e8d4d0701d40e88d3ebdd6841c4061af 100644 (file)
@@ -571,6 +571,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_fd = -1;
+       stream->last_sequence_number = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
index c7ef3fb38317a795b5d9f7bc394a708619108966..3af7db9136f3b011eef851397a2747ac86878ce2 100644 (file)
@@ -58,6 +58,8 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
        LTTNG_CONSUMER_SNAPSHOT_METADATA,
        LTTNG_CONSUMER_STREAMS_SENT,
+       LTTNG_CONSUMER_DISCARDED_EVENTS,
+       LTTNG_CONSUMER_LOST_PACKETS,
 };
 
 /* State of each fd in consumer */
@@ -211,6 +213,10 @@ struct lttng_consumer_channel {
        int nr_stream_fds;
        char root_shm_path[PATH_MAX];
        char shm_path[PATH_MAX];
+       /* Total number of discarded events for that channel. */
+       uint64_t discarded_events;
+       /* Total number of missed packets due to overwriting (overwrite). */
+       uint64_t lost_packets;
 };
 
 /*
@@ -351,6 +357,13 @@ struct lttng_consumer_stream {
         * to the channel.
         */
        uint64_t ust_metadata_pushed;
+       /*
+        * Copy of the last discarded event value to detect the overflow of
+        * the counter.
+        */
+       uint64_t last_discarded_events;
+       /* Copy of the sequence number of the last packet extracted. */
+       uint64_t last_sequence_number;
        /*
         * FD of the index file for this stream.
         */
index fac1f0b8a2d58b988d9a30cb6789b8c6bd0c3a5c..2ea5fa114223f101362e4d780d106b25b3812238 100644 (file)
@@ -940,6 +940,66 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                goto end_nosignal;
        }
+       case LTTNG_CONSUMER_DISCARDED_EVENTS:
+       {
+               uint64_t ret;
+               struct lttng_consumer_channel *channel;
+               uint64_t id = msg.u.discarded_events.session_id;
+               uint64_t key = msg.u.discarded_events.channel_key;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("Kernel consumer discarded events channel %"
+                                       PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               DBG("Kernel consumer discarded events command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
+               ret = channel->discarded_events;
+
+               health_code_update();
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send discarded events");
+                       goto error_fatal;
+               }
+
+               break;
+       }
+       case LTTNG_CONSUMER_LOST_PACKETS:
+       {
+               uint64_t ret;
+               struct lttng_consumer_channel *channel;
+               uint64_t id = msg.u.lost_packets.session_id;
+               uint64_t key = msg.u.lost_packets.channel_key;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("Kernel consumer lost packets channel %"
+                                       PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               DBG("Kernel consumer lost packets command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
+               ret = channel->lost_packets;
+
+               health_code_update();
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send lost packets");
+                       goto error_fatal;
+               }
+
+               break;
+       }
        default:
                goto end_nosignal;
        }
@@ -1051,6 +1111,61 @@ end:
        return ret;
 }
 
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t seq, discarded;
+
+       ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
+       if (ret < 0) {
+               PERROR("kernctl_get_sequence_number");
+               goto end;
+       }
+
+       /*
+        * Start the sequence when we extract the first packet in case we don't
+        * start at 0 (for example if a consumer is not connected to the
+        * session immediately after the beginning).
+        */
+       if (stream->last_sequence_number == -1ULL) {
+               stream->last_sequence_number = seq;
+       } else if (seq > stream->last_sequence_number) {
+               stream->chan->lost_packets += seq -
+                               stream->last_sequence_number - 1;
+       } else {
+               /* seq <= last_sequence_number */
+               ERR("Sequence number inconsistent : prev = %" PRIu64
+                               ", current = %" PRIu64,
+                               stream->last_sequence_number, seq);
+               ret = -1;
+               goto end;
+       }
+       stream->last_sequence_number = seq;
+
+       ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
+       if (ret < 0) {
+               PERROR("kernctl_get_events_discarded");
+               goto end;
+       }
+       if (discarded < stream->last_discarded_events) {
+               /*
+                * Overflow has occured. We assume only one wrap-around
+                * has occured.
+                */
+               stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
+                       stream->last_discarded_events + discarded;
+       } else {
+               stream->chan->discarded_events += discarded -
+                       stream->last_discarded_events;
+       }
+       stream->last_discarded_events = discarded;
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
@@ -1115,6 +1230,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        }
                        goto end;
                }
+               ret = update_stream_stats(stream);
+               if (ret < 0) {
+                       goto end;
+               }
        } else {
                write_index = 0;
        }
index 8574c1f050b88f387e78e8b06ac80d6c97df5be6..82d766e1c1e70b53f0b0e3ec354bd7ab93da621b 100644 (file)
@@ -532,3 +532,9 @@ int kernctl_get_current_timestamp(int fd, uint64_t *ts)
 {
        return ioctl(fd, LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP, ts);
 }
+
+/* Returns the packet sequence number of the current sub-buffer. */
+int kernctl_get_sequence_number(int fd, uint64_t *seq)
+{
+       return ioctl(fd, LTTNG_RING_BUFFER_GET_SEQ_NUM, seq);
+}
index b71b28530521273e9f937d1256b98be26a5e0a3f..ab8154ca6c97e49a71540f9969b87ab973828e11 100644 (file)
@@ -99,5 +99,6 @@ int kernctl_get_content_size(int fd, uint64_t *content_size);
 int kernctl_get_packet_size(int fd, uint64_t *packet_size);
 int kernctl_get_stream_id(int fd, uint64_t *stream_id);
 int kernctl_get_current_timestamp(int fd, uint64_t *ts);
+int kernctl_get_sequence_number(int fd, uint64_t *seq);
 
 #endif /* _LTTNG_KERNEL_CTL_H */
index e469b5fe5bbcc49ed719a0067a4ff96db683038c..d988a83ea8e6081c98d85a3f482300689a644641 100644 (file)
@@ -64,6 +64,8 @@
 #define LTTNG_RING_BUFFER_GET_STREAM_ID           _IOR(0xF6, 0x25, uint64_t)
 /* returns the current timestamp */
 #define LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP   _IOR(0xF6, 0x26, uint64_t)
+/* returns the packet sequence number */
+#define LTTNG_RING_BUFFER_GET_SEQ_NUM             _IOR(0xF6, 0x27, uint64_t)
 
 /* Old ABI (without support for 32/64 bits compat) */
 /* LTTng file descriptor ioctl */
index 528ff60718d65307e33bc06cdd98a99b2ac86673..5c58f6896bc50e56af45c6157010f38a7bb149aa 100644 (file)
@@ -360,6 +360,8 @@ THE SOFTWARE.
                        <xs:element name="tracefile_size" type="tns:uint64_type" default="0" minOccurs="0" /> <!-- bytes -->
                        <xs:element name="tracefile_count" type="tns:uint64_type" default="0" minOccurs="0" />
                        <xs:element name="live_timer_interval" type="tns:uint32_type" default="0" minOccurs="0" /> <!-- usec -->
+                       <xs:element name="discarded_events" type="tns:uint64_type" default="0" minOccurs="0" />
+                       <xs:element name="lost_packets" type="tns:uint64_type" default="0" minOccurs="0" />
                </xs:all>
        </xs:complexType>
 
index 72b0791527eddf25ba89fb1a7ed4b27197fc9101..5530acee9a1ce11e91aa2cef3e863ca385901a98 100644 (file)
@@ -21,6 +21,7 @@
 #include <common/config/session-config.h>
 #include <common/defaults.h>
 #include <lttng/snapshot-internal.h>
+#include <lttng/channel.h>
 #include "mi-lttng.h"
 
 #include <assert.h>
@@ -866,9 +867,22 @@ int mi_lttng_channel_attr(struct mi_writer *writer,
                struct lttng_channel_attr *attr)
 {
        int ret = 0;
+       struct lttng_channel *chan = caa_container_of(attr,
+                       struct lttng_channel, attr);
+       uint64_t discarded_events, lost_packets;
 
        assert(attr);
 
+       ret = lttng_channel_get_discarded_event_count(chan, &discarded_events);
+       if (ret) {
+               goto end;
+       }
+
+       ret = lttng_channel_get_lost_packet_count(chan, &lost_packets);
+       if (ret) {
+               goto end;
+       }
+
        /* Opening Attributes */
        ret = mi_lttng_writer_open_element(writer, config_element_attributes);
        if (ret) {
@@ -947,6 +961,22 @@ int mi_lttng_channel_attr(struct mi_writer *writer,
                goto end;
        }
 
+       /* Discarded events */
+       ret = mi_lttng_writer_write_element_unsigned_int(writer,
+               config_element_discarded_events,
+               discarded_events);
+       if (ret) {
+               goto end;
+       }
+
+       /* Lost packets */
+       ret = mi_lttng_writer_write_element_unsigned_int(writer,
+               config_element_lost_packets,
+               lost_packets);
+       if (ret) {
+               goto end;
+       }
+
        /* Closing attributes */
        ret = mi_lttng_writer_close_element(writer);
        if (ret) {
index d436094ff36d0c6a71671d380b34c13e1c106c56..a60fc4506a9c63b42cc36300a79e08b125e69802 100644 (file)
@@ -509,6 +509,14 @@ struct lttcomm_consumer_msg {
                        uint64_t channel_key;
                        uint64_t net_seq_idx;
                } LTTNG_PACKED sent_streams;
+               struct {
+                       uint64_t session_id;
+                       uint64_t channel_key;
+               } LTTNG_PACKED discarded_events;
+               struct {
+                       uint64_t session_id;
+                       uint64_t channel_key;
+               } LTTNG_PACKED lost_packets;
        } u;
 } LTTNG_PACKED;
 
index 44bcfdc374d9a0bd9b2ce869059fa912794a19c1..a17c48f2f9ba2639fd39754550e8c1eacddacb86 100644 (file)
@@ -1666,6 +1666,105 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
                break;
        }
+       case LTTNG_CONSUMER_DISCARDED_EVENTS:
+       {
+               uint64_t ret;
+               struct lttng_ht_iter iter;
+               struct lttng_ht *ht;
+               struct lttng_consumer_stream *stream;
+               uint64_t id = msg.u.discarded_events.session_id;
+               uint64_t key = msg.u.discarded_events.channel_key;
+
+               DBG("UST consumer discarded events command for session id %"
+                               PRIu64, id);
+               rcu_read_lock();
+               pthread_mutex_lock(&consumer_data.lock);
+
+               ht = consumer_data.stream_list_ht;
+
+               /*
+                * We only need a reference to the channel, but they are not
+                * directly indexed, so we just use the first matching stream
+                * to extract the information we need, we default to 0 if not
+                * found (no events are dropped if the channel is not yet in
+                * use).
+                */
+               ret = 0;
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
+                       if (stream->chan->key == key) {
+                               ret = stream->chan->discarded_events;
+                               break;
+                       }
+               }
+               pthread_mutex_unlock(&consumer_data.lock);
+               rcu_read_unlock();
+
+               DBG("UST consumer discarded events command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
+               health_code_update();
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send discarded events");
+                       goto error_fatal;
+               }
+
+               break;
+       }
+       case LTTNG_CONSUMER_LOST_PACKETS:
+       {
+               uint64_t ret;
+               struct lttng_ht_iter iter;
+               struct lttng_ht *ht;
+               struct lttng_consumer_stream *stream;
+               uint64_t id = msg.u.lost_packets.session_id;
+               uint64_t key = msg.u.lost_packets.channel_key;
+
+               DBG("UST consumer lost packets command for session id %"
+                               PRIu64, id);
+               rcu_read_lock();
+               pthread_mutex_lock(&consumer_data.lock);
+
+               ht = consumer_data.stream_list_ht;
+
+               /*
+                * We only need a reference to the channel, but they are not
+                * directly indexed, so we just use the first matching stream
+                * to extract the information we need, we default to 0 if not
+                * found (no packets lost if the channel is not yet in use).
+                */
+               ret = 0;
+               cds_lfht_for_each_entry_duplicate(ht->ht,
+                               ht->hash_fct(&id, lttng_ht_seed),
+                               ht->match_fct, &id,
+                               &iter.iter, stream, node_session_id.node) {
+                       if (stream->chan->key == key) {
+                               ret = stream->chan->lost_packets;
+                               break;
+                       }
+               }
+               pthread_mutex_unlock(&consumer_data.lock);
+               rcu_read_unlock();
+
+               DBG("UST consumer lost packets command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
+               health_code_update();
+
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+               if (ret < 0) {
+                       PERROR("send lost packets");
+                       goto error_fatal;
+               }
+
+               break;
+       }
        default:
                break;
        }
@@ -1808,6 +1907,16 @@ int lttng_ustconsumer_get_current_timestamp(
        return ustctl_get_current_timestamp(stream->ustream, ts);
 }
 
+int lttng_ustconsumer_get_sequence_number(
+               struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+       assert(stream);
+       assert(stream->ustream);
+       assert(seq);
+
+       return ustctl_get_sequence_number(stream->ustream, seq);
+}
+
 /*
  * Called when the stream signal the consumer that it has hang up.
  */
@@ -2104,6 +2213,61 @@ end:
        return ret;
 }
 
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t seq, discarded;
+
+       ret = ustctl_get_sequence_number(stream->ustream, &seq);
+       if (ret < 0) {
+               PERROR("ustctl_get_sequence_number");
+               goto end;
+       }
+       /*
+        * Start the sequence when we extract the first packet in case we don't
+        * start at 0 (for example if a consumer is not connected to the
+        * session immediately after the beginning).
+        */
+       if (stream->last_sequence_number == -1ULL) {
+               stream->last_sequence_number = seq;
+       } else if (seq > stream->last_sequence_number) {
+               stream->chan->lost_packets += seq -
+                               stream->last_sequence_number - 1;
+       } else {
+               /* seq <= last_sequence_number */
+               ERR("Sequence number inconsistent : prev = %" PRIu64
+                               ", current = %" PRIu64,
+                               stream->last_sequence_number, seq);
+               ret = -1;
+               goto end;
+       }
+       stream->last_sequence_number = seq;
+
+       ret = ustctl_get_events_discarded(stream->ustream, &discarded);
+       if (ret < 0) {
+               PERROR("kernctl_get_events_discarded");
+               goto end;
+       }
+       if (discarded < stream->last_discarded_events) {
+               /*
+                * Overflow has occured. We assume only one wrap-around
+                * has occured.
+                */
+               stream->chan->discarded_events +=
+                               (1ULL << (CAA_BITS_PER_LONG - 1)) -
+                               stream->last_discarded_events + discarded;
+       } else {
+               stream->chan->discarded_events += discarded -
+                               stream->last_discarded_events;
+       }
+       stream->last_discarded_events = discarded;
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Read subbuffer from the given stream.
  *
@@ -2187,6 +2351,13 @@ retry:
                if (ret < 0) {
                        goto end;
                }
+
+               /* Update the stream's sequence and discarded events count. */
+               ret = update_stream_stats(stream);
+               if (ret < 0) {
+                       PERROR("kernctl_get_events_discarded");
+                       goto end;
+               }
        } else {
                write_index = 0;
        }
index 01a50773c263ab562c9955105f7f5495c8fc06f7..b19fe16a5349408a115b1ea65d28ba12b2d40228 100644 (file)
@@ -67,6 +67,8 @@ void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
                int producer);
 int lttng_ustconsumer_get_current_timestamp(
                struct lttng_consumer_stream *stream, uint64_t *ts);
+int lttng_ustconsumer_get_sequence_number(
+               struct lttng_consumer_stream *stream, uint64_t *seq);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -211,6 +213,11 @@ int lttng_ustconsumer_get_current_timestamp(
 {
        return -ENOSYS;
 }
+int lttng_ustconsumer_get_sequence_number(
+               struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+       return -ENOSYS;
+}
 static inline
 int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
                uint64_t *stream_id)
This page took 0.043702 seconds and 4 git commands to generate.