From: Julien Desfossez Date: Mon, 29 Jun 2015 19:58:01 +0000 (-0400) Subject: Extract the lost packets and discarded events counters X-Git-Tag: v2.8.0-rc1~115 X-Git-Url: https://git.lttng.org./?a=commitdiff_plain;h=fb83fe64f250bec7416f18891a8264450c61ead3;p=lttng-tools.git Extract the lost packets and discarded events counters The "lttng list" command now shows the number of discarded events (discard mode) or lost packets (overwrite mode). Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-sessiond/buffer-registry.c b/src/bin/lttng-sessiond/buffer-registry.c index 7fbe3565b..7002b33c7 100644 --- a/src/bin/lttng-sessiond/buffer-registry.c +++ b/src/bin/lttng-sessiond/buffer-registry.c @@ -327,6 +327,44 @@ end: return reg; } +/* + * Find the consumer channel key from a UST session per-uid channel key. + * + * Return the matching key or -1 if not found. + */ +int buffer_reg_uid_consumer_channel_key( + struct cds_list_head *buffer_reg_uid_list, + uint64_t usess_id, uint64_t chan_key, + uint64_t *consumer_chan_key) +{ + struct lttng_ht_iter iter; + struct buffer_reg_uid *uid_reg = NULL; + struct buffer_reg_session *session_reg = NULL; + struct buffer_reg_channel *reg_chan; + int ret = -1; + + rcu_read_lock(); + /* + * For the per-uid registry, we have to iterate since we don't have the + * uid and bitness key. + */ + cds_list_for_each_entry(uid_reg, buffer_reg_uid_list, lnode) { + session_reg = uid_reg->registry; + cds_lfht_for_each_entry(session_reg->channels->ht, + &iter.iter, reg_chan, node.node) { + if (reg_chan->key == chan_key) { + *consumer_chan_key = reg_chan->consumer_key; + ret = 0; + goto end; + } + } + } + +end: + rcu_read_unlock(); + return ret; +} + /* * Allocate and initialize a buffer registry channel with the given key. Set * regp with the object pointer. diff --git a/src/bin/lttng-sessiond/buffer-registry.h b/src/bin/lttng-sessiond/buffer-registry.h index 7a817ec70..db1ce0cbb 100644 --- a/src/bin/lttng-sessiond/buffer-registry.h +++ b/src/bin/lttng-sessiond/buffer-registry.h @@ -150,4 +150,9 @@ void buffer_reg_stream_destroy(struct buffer_reg_stream *regp, /* Global registry. */ void buffer_reg_destroy_registries(void); +int buffer_reg_uid_consumer_channel_key( + struct cds_list_head *buffer_reg_uid_list, + uint64_t usess_id, uint64_t chan_key, + uint64_t *consumer_chan_key); + #endif /* LTTNG_BUFFER_REGISTRY_H */ diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index 87c6eb36d..c054db917 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -139,13 +139,97 @@ error: return ret; } +/* + * Get run-time attributes if the session has been started (discarded events, + * lost packets). + */ +static int get_kernel_runtime_stats(struct ltt_session *session, + struct ltt_kernel_channel *kchan, uint64_t *discarded_events, + uint64_t *lost_packets) +{ + int ret; + + if (!session->has_been_started) { + ret = 0; + *discarded_events = 0; + *lost_packets = 0; + goto end; + } + + ret = consumer_get_discarded_events(session->id, kchan->fd, + session->kernel_session->consumer, + discarded_events); + if (ret < 0) { + goto end; + } + + ret = consumer_get_lost_packets(session->id, kchan->fd, + session->kernel_session->consumer, + lost_packets); + if (ret < 0) { + goto end; + } + +end: + return ret; +} + +/* + * Get run-time attributes if the session has been started (discarded events, + * lost packets). + */ +static int get_ust_runtime_stats(struct ltt_session *session, + struct ltt_ust_channel *uchan, uint64_t *discarded_events, + uint64_t *lost_packets) +{ + int ret; + struct ltt_ust_session *usess; + + usess = session->ust_session; + + if (!usess || !session->has_been_started) { + *discarded_events = 0; + *lost_packets = 0; + ret = 0; + goto end; + } + + if (usess->buffer_type == LTTNG_BUFFER_PER_UID) { + ret = ust_app_uid_get_channel_runtime_stats(usess->id, + &usess->buffer_reg_uid_list, + usess->consumer, uchan->id, + uchan->attr.overwrite, + discarded_events, + lost_packets); + } else if (usess->buffer_type == LTTNG_BUFFER_PER_PID) { + ret = ust_app_pid_get_channel_runtime_stats(usess, + uchan, usess->consumer, + uchan->attr.overwrite, + discarded_events, + lost_packets); + if (ret < 0) { + goto end; + } + *discarded_events += uchan->per_pid_closed_app_discarded; + *lost_packets += uchan->per_pid_closed_app_lost; + } else { + ERR("Unsupported buffer type"); + ret = -1; + goto end; + } + +end: + return ret; +} + /* * Fill lttng_channel array of all channels. */ static void list_lttng_channels(enum lttng_domain_type domain, - struct ltt_session *session, struct lttng_channel *channels) + struct ltt_session *session, struct lttng_channel *channels, + struct lttcomm_channel_extended *chan_exts) { - int i = 0; + int i = 0, ret; struct ltt_kernel_channel *kchan; DBG("Listing channels for session %s", session->name); @@ -156,9 +240,19 @@ static void list_lttng_channels(enum lttng_domain_type domain, if (session->kernel_session != NULL) { cds_list_for_each_entry(kchan, &session->kernel_session->channel_list.head, list) { + uint64_t discarded_events, lost_packets; + + ret = get_kernel_runtime_stats(session, kchan, + &discarded_events, &lost_packets); + if (ret < 0) { + goto end; + } /* Copy lttng_channel struct to array */ memcpy(&channels[i], kchan->channel, sizeof(struct lttng_channel)); channels[i].enabled = kchan->enabled; + chan_exts[i].discarded_events = + discarded_events; + chan_exts[i].lost_packets = lost_packets; i++; } } @@ -171,6 +265,8 @@ static void list_lttng_channels(enum lttng_domain_type domain, rcu_read_lock(); cds_lfht_for_each_entry(session->ust_session->domain_global.channels->ht, &iter.iter, uchan, node.node) { + uint64_t discarded_events, lost_packets; + strncpy(channels[i].name, uchan->name, LTTNG_SYMBOL_NAME_LEN); channels[i].attr.overwrite = uchan->attr.overwrite; channels[i].attr.subbuf_size = uchan->attr.subbuf_size; @@ -188,6 +284,14 @@ static void list_lttng_channels(enum lttng_domain_type domain, channels[i].attr.output = LTTNG_EVENT_MMAP; break; } + + ret = get_ust_runtime_stats(session, uchan, + &discarded_events, &lost_packets); + if (ret < 0) { + break; + } + chan_exts[i].discarded_events = discarded_events; + chan_exts[i].lost_packets = lost_packets; i++; } rcu_read_unlock(); @@ -196,6 +300,9 @@ static void list_lttng_channels(enum lttng_domain_type domain, default: break; } + +end: + return; } static void increment_extended_len(const char *filter_expression, diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 650d002cd..fcda59363 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1369,3 +1369,117 @@ error: health_code_update(); return ret; } + +/* + * Ask the consumer the number of discarded events for a channel. + */ +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer discarded events id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS; + msg.u.discarded_events.session_id = session_id; + msg.u.discarded_events.channel_key = channel_key; + + *discarded = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_discarded = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, + sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; + } + ret = 0; + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, + *discarded, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer the number of lost packets for a channel. + */ +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer lost packets id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS; + msg.u.lost_packets.session_id = session_id; + msg.u.lost_packets.channel_key = channel_key; + + *lost = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, + sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; + } + ret = 0; + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, + *lost, session_id); + +end: + rcu_read_unlock(); + return ret; +} diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 47e59c7c2..c84d85140 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -281,6 +281,10 @@ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, size_t target_offset); int consumer_flush_channel(struct consumer_socket *socket, uint64_t key); +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded); +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost); /* Snapshot command. */ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, diff --git a/src/bin/lttng-sessiond/trace-ust.h b/src/bin/lttng-sessiond/trace-ust.h index 5a48885d8..fc9eef4b7 100644 --- a/src/bin/lttng-sessiond/trace-ust.h +++ b/src/bin/lttng-sessiond/trace-ust.h @@ -80,6 +80,8 @@ struct ltt_ust_channel { struct lttng_ht_node_str node; uint64_t tracefile_size; uint64_t tracefile_count; + uint64_t per_pid_closed_app_discarded; + uint64_t per_pid_closed_app_lost; }; /* UST domain global (LTTNG_DOMAIN_UST) */ diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 160a15640..e3995bdf6 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -40,6 +40,7 @@ #include "ust-consumer.h" #include "ust-ctl.h" #include "utils.h" +#include "session.h" static int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess); @@ -371,6 +372,57 @@ void delete_ust_app_channel_rcu(struct rcu_head *head) free(ua_chan); } +/* + * Extract the lost packet or discarded events counter when the channel is + * being deleted and store the value in the parent channel so we can + * access it from lttng list and at stop/destroy. + */ +static +void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan) +{ + uint64_t discarded = 0, lost = 0; + struct ltt_session *session; + struct ltt_ust_channel *uchan; + + if (ua_chan->attr.type != LTTNG_UST_CHAN_PER_CPU) { + return; + } + + rcu_read_lock(); + session = session_find_by_id(ua_chan->session->tracing_id); + if (!session) { + ERR("Missing LTT session to get discarded events"); + goto end; + } + if (!session->ust_session) { + ERR("Missing UST session to get discarded events"); + goto end; + } + + if (ua_chan->attr.overwrite) { + consumer_get_lost_packets(ua_chan->session->tracing_id, + ua_chan->key, session->ust_session->consumer, + &lost); + } else { + consumer_get_discarded_events(ua_chan->session->tracing_id, + ua_chan->key, session->ust_session->consumer, + &discarded); + } + uchan = trace_ust_find_channel_by_name( + session->ust_session->domain_global.channels, + ua_chan->name); + if (!uchan) { + ERR("Missing UST channel to store discarded counters"); + goto end; + } + + uchan->per_pid_closed_app_discarded += discarded; + uchan->per_pid_closed_app_lost += lost; + +end: + rcu_read_unlock(); +} + /* * Delete ust app channel safely. RCU read lock must be held before calling * this function. @@ -418,6 +470,7 @@ void delete_ust_app_channel(int sock, struct ust_app_channel *ua_chan, if (registry) { ust_registry_channel_del_free(registry, ua_chan->key); } + save_per_pid_lost_discarded_counters(ua_chan); } if (ua_chan->obj != NULL) { @@ -1874,7 +1927,8 @@ static void shadow_copy_session(struct ust_app_session *ua_sess, DBG2("Channel %s not found on shadow session copy, creating it", uchan->name); - ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr); + ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, + &uchan->attr); if (ua_chan == NULL) { /* malloc failed FIXME: Might want to do handle ENOMEM .. */ continue; @@ -5847,3 +5901,80 @@ uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *use return tot_size; } + +int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id, + struct cds_list_head *buffer_reg_uid_list, + struct consumer_output *consumer, uint64_t uchan_id, + int overwrite, uint64_t *discarded, uint64_t *lost) +{ + int ret; + uint64_t consumer_chan_key; + + ret = buffer_reg_uid_consumer_channel_key( + buffer_reg_uid_list, ust_session_id, + uchan_id, &consumer_chan_key); + if (ret < 0) { + goto end; + } + + if (overwrite) { + ret = consumer_get_lost_packets(ust_session_id, + consumer_chan_key, consumer, lost); + } else { + ret = consumer_get_discarded_events(ust_session_id, + consumer_chan_key, consumer, discarded); + } + +end: + return ret; +} + +int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess, + struct ltt_ust_channel *uchan, + struct consumer_output *consumer, int overwrite, + uint64_t *discarded, uint64_t *lost) +{ + int ret = 0; + struct lttng_ht_iter iter; + struct lttng_ht_node_str *ua_chan_node; + struct ust_app *app; + struct ust_app_session *ua_sess; + struct ust_app_channel *ua_chan; + + rcu_read_lock(); + /* + * Iterate over every registered applications, return when we + * found one in the right session and channel. + */ + cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) { + struct lttng_ht_iter uiter; + + ua_sess = lookup_session_by_app(usess, app); + if (ua_sess == NULL) { + continue; + } + + /* Get channel */ + lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter); + ua_chan_node = lttng_ht_iter_get_node_str(&uiter); + /* If the session is found for the app, the channel must be there */ + assert(ua_chan_node); + + ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node); + + if (overwrite) { + ret = consumer_get_lost_packets(usess->id, ua_chan->key, + consumer, lost); + goto end; + } else { + ret = consumer_get_discarded_events(usess->id, + ua_chan->key, consumer, discarded); + goto end; + } + goto end; + } + +end: + rcu_read_unlock(); + return ret; +} diff --git a/src/bin/lttng-sessiond/ust-app.h b/src/bin/lttng-sessiond/ust-app.h index 4398fe241..d02f353c8 100644 --- a/src/bin/lttng-sessiond/ust-app.h +++ b/src/bin/lttng-sessiond/ust-app.h @@ -344,6 +344,14 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, uint64_t ust_app_get_size_one_more_packet_per_stream( struct ltt_ust_session *usess, uint64_t cur_nr_packets); struct ust_app *ust_app_find_by_sock(int sock); +int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id, + struct cds_list_head *buffer_reg_uid_list, + struct consumer_output *consumer, uint64_t uchan_id, + int overwrite, uint64_t *discarded, uint64_t *lost); +int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess, + struct ltt_ust_channel *uchan, + struct consumer_output *consumer, + int overwrite, uint64_t *discarded, uint64_t *lost); static inline int ust_app_supported(void) @@ -558,6 +566,23 @@ uint64_t ust_app_get_size_one_more_packet_per_stream( struct ltt_ust_session *usess, uint64_t cur_nr_packets) { return 0; } +static inline +int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id, + struct cds_list_head *buffer_reg_uid_list, + struct consumer_output *consumer, int overwrite, + uint64_t uchan_id, uint64_t *discarded, uint64_t *lost) +{ + return 0; +} + +static inline +int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess, + struct ltt_ust_channel *uchan, + struct consumer_output *consumer, + int overwrite, uint64_t *discarded, uint64_t *lost) +{ + return 0; +} #endif /* HAVE_LIBLTTNG_UST_CTL */ diff --git a/src/bin/lttng/commands/list.c b/src/bin/lttng/commands/list.c index f59e4bc0b..8e69e09ef 100644 --- a/src/bin/lttng/commands/list.c +++ b/src/bin/lttng/commands/list.c @@ -1190,6 +1190,23 @@ error: */ static void print_channel(struct lttng_channel *channel) { + int ret; + uint64_t discarded_events, lost_packets; + + ret = lttng_channel_get_discarded_event_count(channel, + &discarded_events); + if (ret) { + ERR("Failed to retrieve discarded event count of channel"); + return; + } + + ret = lttng_channel_get_lost_packet_count(channel, + &lost_packets); + if (ret) { + ERR("Failed to retrieve lost packet count of channel"); + return; + } + MSG("- %s:%s\n", channel->name, enabled_string(channel->enabled)); MSG("%sAttributes:", indent4); @@ -1200,6 +1217,8 @@ static void print_channel(struct lttng_channel *channel) MSG("%sread timer interval: %u", indent6, channel->attr.read_timer_interval); MSG("%strace file count: %" PRIu64, indent6, channel->attr.tracefile_count); MSG("%strace file size (bytes): %" PRIu64, indent6, channel->attr.tracefile_size); + MSG("%sdiscarded events: %" PRIu64, indent6, discarded_events); + MSG("%slost packets: %" PRIu64, indent6, lost_packets); switch (channel->attr.output) { case LTTNG_EVENT_SPLICE: MSG("%soutput: splice()", indent6); diff --git a/src/common/config/config-session-abi.h b/src/common/config/config-session-abi.h index f7cee34eb..df77c63a5 100644 --- a/src/common/config/config-session-abi.h +++ b/src/common/config/config-session-abi.h @@ -46,6 +46,8 @@ extern const char * const config_element_output_type; extern const char * const config_element_tracefile_size; extern const char * const config_element_tracefile_count; extern const char * const config_element_live_timer_interval; +extern const char * const config_element_discarded_events; +extern const char * const config_element_lost_packets; extern const char * const config_element_type; extern const char * const config_element_buffer_type; extern const char * const config_element_session; diff --git a/src/common/config/session-config.c b/src/common/config/session-config.c index dec891349..d89249879 100644 --- a/src/common/config/session-config.c +++ b/src/common/config/session-config.c @@ -95,6 +95,8 @@ const char * const config_element_output_type = "output_type"; const char * const config_element_tracefile_size = "tracefile_size"; const char * const config_element_tracefile_count = "tracefile_count"; const char * const config_element_live_timer_interval = "live_timer_interval"; +const char * const config_element_discarded_events = "discarded_events"; +const char * const config_element_lost_packets = "lost_packets"; const char * const config_element_type = "type"; const char * const config_element_buffer_type = "buffer_type"; const char * const config_element_session = "session"; diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 5625e59dd..7911a5b4e 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -571,6 +571,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_fd = -1; + stream->last_sequence_number = -1ULL; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index c7ef3fb38..3af7db913 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -58,6 +58,8 @@ enum lttng_consumer_command { LTTNG_CONSUMER_SNAPSHOT_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_METADATA, LTTNG_CONSUMER_STREAMS_SENT, + LTTNG_CONSUMER_DISCARDED_EVENTS, + LTTNG_CONSUMER_LOST_PACKETS, }; /* State of each fd in consumer */ @@ -211,6 +213,10 @@ struct lttng_consumer_channel { int nr_stream_fds; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; + /* Total number of discarded events for that channel. */ + uint64_t discarded_events; + /* Total number of missed packets due to overwriting (overwrite). */ + uint64_t lost_packets; }; /* @@ -351,6 +357,13 @@ struct lttng_consumer_stream { * to the channel. */ uint64_t ust_metadata_pushed; + /* + * Copy of the last discarded event value to detect the overflow of + * the counter. + */ + uint64_t last_discarded_events; + /* Copy of the sequence number of the last packet extracted. */ + uint64_t last_sequence_number; /* * FD of the index file for this stream. */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index fac1f0b8a..2ea5fa114 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -940,6 +940,66 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + case LTTNG_CONSUMER_DISCARDED_EVENTS: + { + uint64_t ret; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.discarded_events.session_id; + uint64_t key = msg.u.discarded_events.channel_key; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer discarded events channel %" + PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + DBG("Kernel consumer discarded events command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + ret = channel->discarded_events; + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send discarded events"); + goto error_fatal; + } + + break; + } + case LTTNG_CONSUMER_LOST_PACKETS: + { + uint64_t ret; + struct lttng_consumer_channel *channel; + uint64_t id = msg.u.lost_packets.session_id; + uint64_t key = msg.u.lost_packets.channel_key; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer lost packets channel %" + PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + DBG("Kernel consumer lost packets command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + ret = channel->lost_packets; + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send lost packets"); + goto error_fatal; + } + + break; + } default: goto end_nosignal; } @@ -1051,6 +1111,61 @@ end: return ret; } +static +int update_stream_stats(struct lttng_consumer_stream *stream) +{ + int ret; + uint64_t seq, discarded; + + ret = kernctl_get_sequence_number(stream->wait_fd, &seq); + if (ret < 0) { + PERROR("kernctl_get_sequence_number"); + goto end; + } + + /* + * Start the sequence when we extract the first packet in case we don't + * start at 0 (for example if a consumer is not connected to the + * session immediately after the beginning). + */ + if (stream->last_sequence_number == -1ULL) { + stream->last_sequence_number = seq; + } else if (seq > stream->last_sequence_number) { + stream->chan->lost_packets += seq - + stream->last_sequence_number - 1; + } else { + /* seq <= last_sequence_number */ + ERR("Sequence number inconsistent : prev = %" PRIu64 + ", current = %" PRIu64, + stream->last_sequence_number, seq); + ret = -1; + goto end; + } + stream->last_sequence_number = seq; + + ret = kernctl_get_events_discarded(stream->wait_fd, &discarded); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto end; + } + if (discarded < stream->last_discarded_events) { + /* + * Overflow has occured. We assume only one wrap-around + * has occured. + */ + stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) - + stream->last_discarded_events + discarded; + } else { + stream->chan->discarded_events += discarded - + stream->last_discarded_events; + } + stream->last_discarded_events = discarded; + ret = 0; + +end: + return ret; +} + /* * Consume data on a file descriptor and write it on a trace file. */ @@ -1115,6 +1230,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } goto end; } + ret = update_stream_stats(stream); + if (ret < 0) { + goto end; + } } else { write_index = 0; } diff --git a/src/common/kernel-ctl/kernel-ctl.c b/src/common/kernel-ctl/kernel-ctl.c index 8574c1f05..82d766e1c 100644 --- a/src/common/kernel-ctl/kernel-ctl.c +++ b/src/common/kernel-ctl/kernel-ctl.c @@ -532,3 +532,9 @@ int kernctl_get_current_timestamp(int fd, uint64_t *ts) { return ioctl(fd, LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP, ts); } + +/* Returns the packet sequence number of the current sub-buffer. */ +int kernctl_get_sequence_number(int fd, uint64_t *seq) +{ + return ioctl(fd, LTTNG_RING_BUFFER_GET_SEQ_NUM, seq); +} diff --git a/src/common/kernel-ctl/kernel-ctl.h b/src/common/kernel-ctl/kernel-ctl.h index b71b28530..ab8154ca6 100644 --- a/src/common/kernel-ctl/kernel-ctl.h +++ b/src/common/kernel-ctl/kernel-ctl.h @@ -99,5 +99,6 @@ int kernctl_get_content_size(int fd, uint64_t *content_size); int kernctl_get_packet_size(int fd, uint64_t *packet_size); int kernctl_get_stream_id(int fd, uint64_t *stream_id); int kernctl_get_current_timestamp(int fd, uint64_t *ts); +int kernctl_get_sequence_number(int fd, uint64_t *seq); #endif /* _LTTNG_KERNEL_CTL_H */ diff --git a/src/common/kernel-ctl/kernel-ioctl.h b/src/common/kernel-ctl/kernel-ioctl.h index e469b5fe5..d988a83ea 100644 --- a/src/common/kernel-ctl/kernel-ioctl.h +++ b/src/common/kernel-ctl/kernel-ioctl.h @@ -64,6 +64,8 @@ #define LTTNG_RING_BUFFER_GET_STREAM_ID _IOR(0xF6, 0x25, uint64_t) /* returns the current timestamp */ #define LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP _IOR(0xF6, 0x26, uint64_t) +/* returns the packet sequence number */ +#define LTTNG_RING_BUFFER_GET_SEQ_NUM _IOR(0xF6, 0x27, uint64_t) /* Old ABI (without support for 32/64 bits compat) */ /* LTTng file descriptor ioctl */ diff --git a/src/common/mi-lttng-3.0.xsd b/src/common/mi-lttng-3.0.xsd index 528ff6071..5c58f6896 100644 --- a/src/common/mi-lttng-3.0.xsd +++ b/src/common/mi-lttng-3.0.xsd @@ -360,6 +360,8 @@ THE SOFTWARE. + + diff --git a/src/common/mi-lttng.c b/src/common/mi-lttng.c index 72b079152..5530acee9 100644 --- a/src/common/mi-lttng.c +++ b/src/common/mi-lttng.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "mi-lttng.h" #include @@ -866,9 +867,22 @@ int mi_lttng_channel_attr(struct mi_writer *writer, struct lttng_channel_attr *attr) { int ret = 0; + struct lttng_channel *chan = caa_container_of(attr, + struct lttng_channel, attr); + uint64_t discarded_events, lost_packets; assert(attr); + ret = lttng_channel_get_discarded_event_count(chan, &discarded_events); + if (ret) { + goto end; + } + + ret = lttng_channel_get_lost_packet_count(chan, &lost_packets); + if (ret) { + goto end; + } + /* Opening Attributes */ ret = mi_lttng_writer_open_element(writer, config_element_attributes); if (ret) { @@ -947,6 +961,22 @@ int mi_lttng_channel_attr(struct mi_writer *writer, goto end; } + /* Discarded events */ + ret = mi_lttng_writer_write_element_unsigned_int(writer, + config_element_discarded_events, + discarded_events); + if (ret) { + goto end; + } + + /* Lost packets */ + ret = mi_lttng_writer_write_element_unsigned_int(writer, + config_element_lost_packets, + lost_packets); + if (ret) { + goto end; + } + /* Closing attributes */ ret = mi_lttng_writer_close_element(writer); if (ret) { diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index d436094ff..a60fc4506 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -509,6 +509,14 @@ struct lttcomm_consumer_msg { uint64_t channel_key; uint64_t net_seq_idx; } LTTNG_PACKED sent_streams; + struct { + uint64_t session_id; + uint64_t channel_key; + } LTTNG_PACKED discarded_events; + struct { + uint64_t session_id; + uint64_t channel_key; + } LTTNG_PACKED lost_packets; } u; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 44bcfdc37..a17c48f2f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1666,6 +1666,105 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); break; } + case LTTNG_CONSUMER_DISCARDED_EVENTS: + { + uint64_t ret; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + uint64_t id = msg.u.discarded_events.session_id; + uint64_t key = msg.u.discarded_events.channel_key; + + DBG("UST consumer discarded events command for session id %" + PRIu64, id); + rcu_read_lock(); + pthread_mutex_lock(&consumer_data.lock); + + ht = consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no events are dropped if the channel is not yet in + * use). + */ + ret = 0; + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { + if (stream->chan->key == key) { + ret = stream->chan->discarded_events; + break; + } + } + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + + DBG("UST consumer discarded events command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send discarded events"); + goto error_fatal; + } + + break; + } + case LTTNG_CONSUMER_LOST_PACKETS: + { + uint64_t ret; + struct lttng_ht_iter iter; + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + uint64_t id = msg.u.lost_packets.session_id; + uint64_t key = msg.u.lost_packets.channel_key; + + DBG("UST consumer lost packets command for session id %" + PRIu64, id); + rcu_read_lock(); + pthread_mutex_lock(&consumer_data.lock); + + ht = consumer_data.stream_list_ht; + + /* + * We only need a reference to the channel, but they are not + * directly indexed, so we just use the first matching stream + * to extract the information we need, we default to 0 if not + * found (no packets lost if the channel is not yet in use). + */ + ret = 0; + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { + if (stream->chan->key == key) { + ret = stream->chan->lost_packets; + break; + } + } + pthread_mutex_unlock(&consumer_data.lock); + rcu_read_unlock(); + + DBG("UST consumer lost packets command for session id %" + PRIu64 ", channel key %" PRIu64, id, key); + + health_code_update(); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send lost packets"); + goto error_fatal; + } + + break; + } default: break; } @@ -1808,6 +1907,16 @@ int lttng_ustconsumer_get_current_timestamp( return ustctl_get_current_timestamp(stream->ustream, ts); } +int lttng_ustconsumer_get_sequence_number( + struct lttng_consumer_stream *stream, uint64_t *seq) +{ + assert(stream); + assert(stream->ustream); + assert(seq); + + return ustctl_get_sequence_number(stream->ustream, seq); +} + /* * Called when the stream signal the consumer that it has hang up. */ @@ -2104,6 +2213,61 @@ end: return ret; } +static +int update_stream_stats(struct lttng_consumer_stream *stream) +{ + int ret; + uint64_t seq, discarded; + + ret = ustctl_get_sequence_number(stream->ustream, &seq); + if (ret < 0) { + PERROR("ustctl_get_sequence_number"); + goto end; + } + /* + * Start the sequence when we extract the first packet in case we don't + * start at 0 (for example if a consumer is not connected to the + * session immediately after the beginning). + */ + if (stream->last_sequence_number == -1ULL) { + stream->last_sequence_number = seq; + } else if (seq > stream->last_sequence_number) { + stream->chan->lost_packets += seq - + stream->last_sequence_number - 1; + } else { + /* seq <= last_sequence_number */ + ERR("Sequence number inconsistent : prev = %" PRIu64 + ", current = %" PRIu64, + stream->last_sequence_number, seq); + ret = -1; + goto end; + } + stream->last_sequence_number = seq; + + ret = ustctl_get_events_discarded(stream->ustream, &discarded); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto end; + } + if (discarded < stream->last_discarded_events) { + /* + * Overflow has occured. We assume only one wrap-around + * has occured. + */ + stream->chan->discarded_events += + (1ULL << (CAA_BITS_PER_LONG - 1)) - + stream->last_discarded_events + discarded; + } else { + stream->chan->discarded_events += discarded - + stream->last_discarded_events; + } + stream->last_discarded_events = discarded; + ret = 0; + +end: + return ret; +} + /* * Read subbuffer from the given stream. * @@ -2187,6 +2351,13 @@ retry: if (ret < 0) { goto end; } + + /* Update the stream's sequence and discarded events count. */ + ret = update_stream_stats(stream); + if (ret < 0) { + PERROR("kernctl_get_events_discarded"); + goto end; + } } else { write_index = 0; } diff --git a/src/common/ust-consumer/ust-consumer.h b/src/common/ust-consumer/ust-consumer.h index 01a50773c..b19fe16a5 100644 --- a/src/common/ust-consumer/ust-consumer.h +++ b/src/common/ust-consumer/ust-consumer.h @@ -67,6 +67,8 @@ void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream, int producer); int lttng_ustconsumer_get_current_timestamp( struct lttng_consumer_stream *stream, uint64_t *ts); +int lttng_ustconsumer_get_sequence_number( + struct lttng_consumer_stream *stream, uint64_t *seq); #else /* HAVE_LIBLTTNG_UST_CTL */ @@ -211,6 +213,11 @@ int lttng_ustconsumer_get_current_timestamp( { return -ENOSYS; } +int lttng_ustconsumer_get_sequence_number( + struct lttng_consumer_stream *stream, uint64_t *seq) +{ + return -ENOSYS; +} static inline int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream, uint64_t *stream_id)