return reg;
}
+/*
+ * Find the consumer channel key from a UST session per-uid channel key.
+ *
+ * Return the matching key or -1 if not found.
+ */
+int buffer_reg_uid_consumer_channel_key(
+ struct cds_list_head *buffer_reg_uid_list,
+ uint64_t usess_id, uint64_t chan_key,
+ uint64_t *consumer_chan_key)
+{
+ struct lttng_ht_iter iter;
+ struct buffer_reg_uid *uid_reg = NULL;
+ struct buffer_reg_session *session_reg = NULL;
+ struct buffer_reg_channel *reg_chan;
+ int ret = -1;
+
+ rcu_read_lock();
+ /*
+ * For the per-uid registry, we have to iterate since we don't have the
+ * uid and bitness key.
+ */
+ cds_list_for_each_entry(uid_reg, buffer_reg_uid_list, lnode) {
+ session_reg = uid_reg->registry;
+ cds_lfht_for_each_entry(session_reg->channels->ht,
+ &iter.iter, reg_chan, node.node) {
+ if (reg_chan->key == chan_key) {
+ *consumer_chan_key = reg_chan->consumer_key;
+ ret = 0;
+ goto end;
+ }
+ }
+ }
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Allocate and initialize a buffer registry channel with the given key. Set
* regp with the object pointer.
/* Global registry. */
void buffer_reg_destroy_registries(void);
+int buffer_reg_uid_consumer_channel_key(
+ struct cds_list_head *buffer_reg_uid_list,
+ uint64_t usess_id, uint64_t chan_key,
+ uint64_t *consumer_chan_key);
+
#endif /* LTTNG_BUFFER_REGISTRY_H */
return ret;
}
+/*
+ * Get run-time attributes if the session has been started (discarded events,
+ * lost packets).
+ */
+static int get_kernel_runtime_stats(struct ltt_session *session,
+ struct ltt_kernel_channel *kchan, uint64_t *discarded_events,
+ uint64_t *lost_packets)
+{
+ int ret;
+
+ if (!session->has_been_started) {
+ ret = 0;
+ *discarded_events = 0;
+ *lost_packets = 0;
+ goto end;
+ }
+
+ ret = consumer_get_discarded_events(session->id, kchan->fd,
+ session->kernel_session->consumer,
+ discarded_events);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = consumer_get_lost_packets(session->id, kchan->fd,
+ session->kernel_session->consumer,
+ lost_packets);
+ if (ret < 0) {
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Get run-time attributes if the session has been started (discarded events,
+ * lost packets).
+ */
+static int get_ust_runtime_stats(struct ltt_session *session,
+ struct ltt_ust_channel *uchan, uint64_t *discarded_events,
+ uint64_t *lost_packets)
+{
+ int ret;
+ struct ltt_ust_session *usess;
+
+ usess = session->ust_session;
+
+ if (!usess || !session->has_been_started) {
+ *discarded_events = 0;
+ *lost_packets = 0;
+ ret = 0;
+ goto end;
+ }
+
+ if (usess->buffer_type == LTTNG_BUFFER_PER_UID) {
+ ret = ust_app_uid_get_channel_runtime_stats(usess->id,
+ &usess->buffer_reg_uid_list,
+ usess->consumer, uchan->id,
+ uchan->attr.overwrite,
+ discarded_events,
+ lost_packets);
+ } else if (usess->buffer_type == LTTNG_BUFFER_PER_PID) {
+ ret = ust_app_pid_get_channel_runtime_stats(usess,
+ uchan, usess->consumer,
+ uchan->attr.overwrite,
+ discarded_events,
+ lost_packets);
+ if (ret < 0) {
+ goto end;
+ }
+ *discarded_events += uchan->per_pid_closed_app_discarded;
+ *lost_packets += uchan->per_pid_closed_app_lost;
+ } else {
+ ERR("Unsupported buffer type");
+ ret = -1;
+ goto end;
+ }
+
+end:
+ return ret;
+}
+
/*
* Fill lttng_channel array of all channels.
*/
static void list_lttng_channels(enum lttng_domain_type domain,
- struct ltt_session *session, struct lttng_channel *channels)
+ struct ltt_session *session, struct lttng_channel *channels,
+ struct lttcomm_channel_extended *chan_exts)
{
- int i = 0;
+ int i = 0, ret;
struct ltt_kernel_channel *kchan;
DBG("Listing channels for session %s", session->name);
if (session->kernel_session != NULL) {
cds_list_for_each_entry(kchan,
&session->kernel_session->channel_list.head, list) {
+ uint64_t discarded_events, lost_packets;
+
+ ret = get_kernel_runtime_stats(session, kchan,
+ &discarded_events, &lost_packets);
+ if (ret < 0) {
+ goto end;
+ }
/* Copy lttng_channel struct to array */
memcpy(&channels[i], kchan->channel, sizeof(struct lttng_channel));
channels[i].enabled = kchan->enabled;
+ chan_exts[i].discarded_events =
+ discarded_events;
+ chan_exts[i].lost_packets = lost_packets;
i++;
}
}
rcu_read_lock();
cds_lfht_for_each_entry(session->ust_session->domain_global.channels->ht,
&iter.iter, uchan, node.node) {
+ uint64_t discarded_events, lost_packets;
+
strncpy(channels[i].name, uchan->name, LTTNG_SYMBOL_NAME_LEN);
channels[i].attr.overwrite = uchan->attr.overwrite;
channels[i].attr.subbuf_size = uchan->attr.subbuf_size;
channels[i].attr.output = LTTNG_EVENT_MMAP;
break;
}
+
+ ret = get_ust_runtime_stats(session, uchan,
+ &discarded_events, &lost_packets);
+ if (ret < 0) {
+ break;
+ }
+ chan_exts[i].discarded_events = discarded_events;
+ chan_exts[i].lost_packets = lost_packets;
i++;
}
rcu_read_unlock();
default:
break;
}
+
+end:
+ return;
}
static void increment_extended_len(const char *filter_expression,
health_code_update();
return ret;
}
+
+/*
+ * Ask the consumer the number of discarded events for a channel.
+ */
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *discarded)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer discarded events id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
+ msg.u.discarded_events.session_id = session_id;
+ msg.u.discarded_events.channel_key = channel_key;
+
+ *discarded = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_discarded = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_discarded,
+ sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *discarded += consumer_discarded;
+ }
+ ret = 0;
+ DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64,
+ *discarded, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Ask the consumer the number of lost packets for a channel.
+ */
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *lost)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer lost packets id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
+ msg.u.lost_packets.session_id = session_id;
+ msg.u.lost_packets.channel_key = channel_key;
+
+ *lost = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost,
+ sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *lost += consumer_lost;
+ }
+ ret = 0;
+ DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64,
+ *lost, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
uint64_t metadata_key, char *metadata_str, size_t len,
size_t target_offset);
int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *discarded);
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *lost);
/* Snapshot command. */
int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
struct lttng_ht_node_str node;
uint64_t tracefile_size;
uint64_t tracefile_count;
+ uint64_t per_pid_closed_app_discarded;
+ uint64_t per_pid_closed_app_lost;
};
/* UST domain global (LTTNG_DOMAIN_UST) */
#include "ust-consumer.h"
#include "ust-ctl.h"
#include "utils.h"
+#include "session.h"
static
int ust_app_flush_app_session(struct ust_app *app, struct ust_app_session *ua_sess);
free(ua_chan);
}
+/*
+ * Extract the lost packet or discarded events counter when the channel is
+ * being deleted and store the value in the parent channel so we can
+ * access it from lttng list and at stop/destroy.
+ */
+static
+void save_per_pid_lost_discarded_counters(struct ust_app_channel *ua_chan)
+{
+ uint64_t discarded = 0, lost = 0;
+ struct ltt_session *session;
+ struct ltt_ust_channel *uchan;
+
+ if (ua_chan->attr.type != LTTNG_UST_CHAN_PER_CPU) {
+ return;
+ }
+
+ rcu_read_lock();
+ session = session_find_by_id(ua_chan->session->tracing_id);
+ if (!session) {
+ ERR("Missing LTT session to get discarded events");
+ goto end;
+ }
+ if (!session->ust_session) {
+ ERR("Missing UST session to get discarded events");
+ goto end;
+ }
+
+ if (ua_chan->attr.overwrite) {
+ consumer_get_lost_packets(ua_chan->session->tracing_id,
+ ua_chan->key, session->ust_session->consumer,
+ &lost);
+ } else {
+ consumer_get_discarded_events(ua_chan->session->tracing_id,
+ ua_chan->key, session->ust_session->consumer,
+ &discarded);
+ }
+ uchan = trace_ust_find_channel_by_name(
+ session->ust_session->domain_global.channels,
+ ua_chan->name);
+ if (!uchan) {
+ ERR("Missing UST channel to store discarded counters");
+ goto end;
+ }
+
+ uchan->per_pid_closed_app_discarded += discarded;
+ uchan->per_pid_closed_app_lost += lost;
+
+end:
+ rcu_read_unlock();
+}
+
/*
* Delete ust app channel safely. RCU read lock must be held before calling
* this function.
if (registry) {
ust_registry_channel_del_free(registry, ua_chan->key);
}
+ save_per_pid_lost_discarded_counters(ua_chan);
}
if (ua_chan->obj != NULL) {
DBG2("Channel %s not found on shadow session copy, creating it",
uchan->name);
- ua_chan = alloc_ust_app_channel(uchan->name, ua_sess, &uchan->attr);
+ ua_chan = alloc_ust_app_channel(uchan->name, ua_sess,
+ &uchan->attr);
if (ua_chan == NULL) {
/* malloc failed FIXME: Might want to do handle ENOMEM .. */
continue;
return tot_size;
}
+
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+ struct cds_list_head *buffer_reg_uid_list,
+ struct consumer_output *consumer, uint64_t uchan_id,
+ int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+ int ret;
+ uint64_t consumer_chan_key;
+
+ ret = buffer_reg_uid_consumer_channel_key(
+ buffer_reg_uid_list, ust_session_id,
+ uchan_id, &consumer_chan_key);
+ if (ret < 0) {
+ goto end;
+ }
+
+ if (overwrite) {
+ ret = consumer_get_lost_packets(ust_session_id,
+ consumer_chan_key, consumer, lost);
+ } else {
+ ret = consumer_get_discarded_events(ust_session_id,
+ consumer_chan_key, consumer, discarded);
+ }
+
+end:
+ return ret;
+}
+
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+ struct ltt_ust_channel *uchan,
+ struct consumer_output *consumer, int overwrite,
+ uint64_t *discarded, uint64_t *lost)
+{
+ int ret = 0;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_str *ua_chan_node;
+ struct ust_app *app;
+ struct ust_app_session *ua_sess;
+ struct ust_app_channel *ua_chan;
+
+ rcu_read_lock();
+ /*
+ * Iterate over every registered applications, return when we
+ * found one in the right session and channel.
+ */
+ cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+ struct lttng_ht_iter uiter;
+
+ ua_sess = lookup_session_by_app(usess, app);
+ if (ua_sess == NULL) {
+ continue;
+ }
+
+ /* Get channel */
+ lttng_ht_lookup(ua_sess->channels, (void *)uchan->name, &uiter);
+ ua_chan_node = lttng_ht_iter_get_node_str(&uiter);
+ /* If the session is found for the app, the channel must be there */
+ assert(ua_chan_node);
+
+ ua_chan = caa_container_of(ua_chan_node, struct ust_app_channel, node);
+
+ if (overwrite) {
+ ret = consumer_get_lost_packets(usess->id, ua_chan->key,
+ consumer, lost);
+ goto end;
+ } else {
+ ret = consumer_get_discarded_events(usess->id,
+ ua_chan->key, consumer, discarded);
+ goto end;
+ }
+ goto end;
+ }
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
uint64_t ust_app_get_size_one_more_packet_per_stream(
struct ltt_ust_session *usess, uint64_t cur_nr_packets);
struct ust_app *ust_app_find_by_sock(int sock);
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+ struct cds_list_head *buffer_reg_uid_list,
+ struct consumer_output *consumer, uint64_t uchan_id,
+ int overwrite, uint64_t *discarded, uint64_t *lost);
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+ struct ltt_ust_channel *uchan,
+ struct consumer_output *consumer,
+ int overwrite, uint64_t *discarded, uint64_t *lost);
static inline
int ust_app_supported(void)
struct ltt_ust_session *usess, uint64_t cur_nr_packets) {
return 0;
}
+static inline
+int ust_app_uid_get_channel_runtime_stats(uint64_t ust_session_id,
+ struct cds_list_head *buffer_reg_uid_list,
+ struct consumer_output *consumer, int overwrite,
+ uint64_t uchan_id, uint64_t *discarded, uint64_t *lost)
+{
+ return 0;
+}
+
+static inline
+int ust_app_pid_get_channel_runtime_stats(struct ltt_ust_session *usess,
+ struct ltt_ust_channel *uchan,
+ struct consumer_output *consumer,
+ int overwrite, uint64_t *discarded, uint64_t *lost)
+{
+ return 0;
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
*/
static void print_channel(struct lttng_channel *channel)
{
+ int ret;
+ uint64_t discarded_events, lost_packets;
+
+ ret = lttng_channel_get_discarded_event_count(channel,
+ &discarded_events);
+ if (ret) {
+ ERR("Failed to retrieve discarded event count of channel");
+ return;
+ }
+
+ ret = lttng_channel_get_lost_packet_count(channel,
+ &lost_packets);
+ if (ret) {
+ ERR("Failed to retrieve lost packet count of channel");
+ return;
+ }
+
MSG("- %s:%s\n", channel->name, enabled_string(channel->enabled));
MSG("%sAttributes:", indent4);
MSG("%sread timer interval: %u", indent6, channel->attr.read_timer_interval);
MSG("%strace file count: %" PRIu64, indent6, channel->attr.tracefile_count);
MSG("%strace file size (bytes): %" PRIu64, indent6, channel->attr.tracefile_size);
+ MSG("%sdiscarded events: %" PRIu64, indent6, discarded_events);
+ MSG("%slost packets: %" PRIu64, indent6, lost_packets);
switch (channel->attr.output) {
case LTTNG_EVENT_SPLICE:
MSG("%soutput: splice()", indent6);
extern const char * const config_element_tracefile_size;
extern const char * const config_element_tracefile_count;
extern const char * const config_element_live_timer_interval;
+extern const char * const config_element_discarded_events;
+extern const char * const config_element_lost_packets;
extern const char * const config_element_type;
extern const char * const config_element_buffer_type;
extern const char * const config_element_session;
const char * const config_element_tracefile_size = "tracefile_size";
const char * const config_element_tracefile_count = "tracefile_count";
const char * const config_element_live_timer_interval = "live_timer_interval";
+const char * const config_element_discarded_events = "discarded_events";
+const char * const config_element_lost_packets = "lost_packets";
const char * const config_element_type = "type";
const char * const config_element_buffer_type = "buffer_type";
const char * const config_element_session = "session";
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
stream->index_fd = -1;
+ stream->last_sequence_number = -1ULL;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
LTTNG_CONSUMER_SNAPSHOT_CHANNEL,
LTTNG_CONSUMER_SNAPSHOT_METADATA,
LTTNG_CONSUMER_STREAMS_SENT,
+ LTTNG_CONSUMER_DISCARDED_EVENTS,
+ LTTNG_CONSUMER_LOST_PACKETS,
};
/* State of each fd in consumer */
int nr_stream_fds;
char root_shm_path[PATH_MAX];
char shm_path[PATH_MAX];
+ /* Total number of discarded events for that channel. */
+ uint64_t discarded_events;
+ /* Total number of missed packets due to overwriting (overwrite). */
+ uint64_t lost_packets;
};
/*
* to the channel.
*/
uint64_t ust_metadata_pushed;
+ /*
+ * Copy of the last discarded event value to detect the overflow of
+ * the counter.
+ */
+ uint64_t last_discarded_events;
+ /* Copy of the sequence number of the last packet extracted. */
+ uint64_t last_sequence_number;
/*
* FD of the index file for this stream.
*/
goto end_nosignal;
}
+ case LTTNG_CONSUMER_DISCARDED_EVENTS:
+ {
+ uint64_t ret;
+ struct lttng_consumer_channel *channel;
+ uint64_t id = msg.u.discarded_events.session_id;
+ uint64_t key = msg.u.discarded_events.channel_key;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Kernel consumer discarded events channel %"
+ PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ DBG("Kernel consumer discarded events command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ ret = channel->discarded_events;
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send discarded events");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_LOST_PACKETS:
+ {
+ uint64_t ret;
+ struct lttng_consumer_channel *channel;
+ uint64_t id = msg.u.lost_packets.session_id;
+ uint64_t key = msg.u.lost_packets.channel_key;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ ERR("Kernel consumer lost packets channel %"
+ PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ DBG("Kernel consumer lost packets command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ ret = channel->lost_packets;
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send lost packets");
+ goto error_fatal;
+ }
+
+ break;
+ }
default:
goto end_nosignal;
}
return ret;
}
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ uint64_t seq, discarded;
+
+ ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
+ if (ret < 0) {
+ PERROR("kernctl_get_sequence_number");
+ goto end;
+ }
+
+ /*
+ * Start the sequence when we extract the first packet in case we don't
+ * start at 0 (for example if a consumer is not connected to the
+ * session immediately after the beginning).
+ */
+ if (stream->last_sequence_number == -1ULL) {
+ stream->last_sequence_number = seq;
+ } else if (seq > stream->last_sequence_number) {
+ stream->chan->lost_packets += seq -
+ stream->last_sequence_number - 1;
+ } else {
+ /* seq <= last_sequence_number */
+ ERR("Sequence number inconsistent : prev = %" PRIu64
+ ", current = %" PRIu64,
+ stream->last_sequence_number, seq);
+ ret = -1;
+ goto end;
+ }
+ stream->last_sequence_number = seq;
+
+ ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
+ if (discarded < stream->last_discarded_events) {
+ /*
+ * Overflow has occured. We assume only one wrap-around
+ * has occured.
+ */
+ stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
+ stream->last_discarded_events + discarded;
+ } else {
+ stream->chan->discarded_events += discarded -
+ stream->last_discarded_events;
+ }
+ stream->last_discarded_events = discarded;
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Consume data on a file descriptor and write it on a trace file.
*/
}
goto end;
}
+ ret = update_stream_stats(stream);
+ if (ret < 0) {
+ goto end;
+ }
} else {
write_index = 0;
}
{
return ioctl(fd, LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP, ts);
}
+
+/* Returns the packet sequence number of the current sub-buffer. */
+int kernctl_get_sequence_number(int fd, uint64_t *seq)
+{
+ return ioctl(fd, LTTNG_RING_BUFFER_GET_SEQ_NUM, seq);
+}
int kernctl_get_packet_size(int fd, uint64_t *packet_size);
int kernctl_get_stream_id(int fd, uint64_t *stream_id);
int kernctl_get_current_timestamp(int fd, uint64_t *ts);
+int kernctl_get_sequence_number(int fd, uint64_t *seq);
#endif /* _LTTNG_KERNEL_CTL_H */
#define LTTNG_RING_BUFFER_GET_STREAM_ID _IOR(0xF6, 0x25, uint64_t)
/* returns the current timestamp */
#define LTTNG_RING_BUFFER_GET_CURRENT_TIMESTAMP _IOR(0xF6, 0x26, uint64_t)
+/* returns the packet sequence number */
+#define LTTNG_RING_BUFFER_GET_SEQ_NUM _IOR(0xF6, 0x27, uint64_t)
/* Old ABI (without support for 32/64 bits compat) */
/* LTTng file descriptor ioctl */
<xs:element name="tracefile_size" type="tns:uint64_type" default="0" minOccurs="0" /> <!-- bytes -->
<xs:element name="tracefile_count" type="tns:uint64_type" default="0" minOccurs="0" />
<xs:element name="live_timer_interval" type="tns:uint32_type" default="0" minOccurs="0" /> <!-- usec -->
+ <xs:element name="discarded_events" type="tns:uint64_type" default="0" minOccurs="0" />
+ <xs:element name="lost_packets" type="tns:uint64_type" default="0" minOccurs="0" />
</xs:all>
</xs:complexType>
#include <common/config/session-config.h>
#include <common/defaults.h>
#include <lttng/snapshot-internal.h>
+#include <lttng/channel.h>
#include "mi-lttng.h"
#include <assert.h>
struct lttng_channel_attr *attr)
{
int ret = 0;
+ struct lttng_channel *chan = caa_container_of(attr,
+ struct lttng_channel, attr);
+ uint64_t discarded_events, lost_packets;
assert(attr);
+ ret = lttng_channel_get_discarded_event_count(chan, &discarded_events);
+ if (ret) {
+ goto end;
+ }
+
+ ret = lttng_channel_get_lost_packet_count(chan, &lost_packets);
+ if (ret) {
+ goto end;
+ }
+
/* Opening Attributes */
ret = mi_lttng_writer_open_element(writer, config_element_attributes);
if (ret) {
goto end;
}
+ /* Discarded events */
+ ret = mi_lttng_writer_write_element_unsigned_int(writer,
+ config_element_discarded_events,
+ discarded_events);
+ if (ret) {
+ goto end;
+ }
+
+ /* Lost packets */
+ ret = mi_lttng_writer_write_element_unsigned_int(writer,
+ config_element_lost_packets,
+ lost_packets);
+ if (ret) {
+ goto end;
+ }
+
/* Closing attributes */
ret = mi_lttng_writer_close_element(writer);
if (ret) {
uint64_t channel_key;
uint64_t net_seq_idx;
} LTTNG_PACKED sent_streams;
+ struct {
+ uint64_t session_id;
+ uint64_t channel_key;
+ } LTTNG_PACKED discarded_events;
+ struct {
+ uint64_t session_id;
+ uint64_t channel_key;
+ } LTTNG_PACKED lost_packets;
} u;
} LTTNG_PACKED;
health_code_update();
break;
}
+ case LTTNG_CONSUMER_DISCARDED_EVENTS:
+ {
+ uint64_t ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ uint64_t id = msg.u.discarded_events.session_id;
+ uint64_t key = msg.u.discarded_events.channel_key;
+
+ DBG("UST consumer discarded events command for session id %"
+ PRIu64, id);
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ ht = consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no events are dropped if the channel is not yet in
+ * use).
+ */
+ ret = 0;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
+ if (stream->chan->key == key) {
+ ret = stream->chan->discarded_events;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+
+ DBG("UST consumer discarded events command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send discarded events");
+ goto error_fatal;
+ }
+
+ break;
+ }
+ case LTTNG_CONSUMER_LOST_PACKETS:
+ {
+ uint64_t ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ uint64_t id = msg.u.lost_packets.session_id;
+ uint64_t key = msg.u.lost_packets.channel_key;
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64, id);
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ ht = consumer_data.stream_list_ht;
+
+ /*
+ * We only need a reference to the channel, but they are not
+ * directly indexed, so we just use the first matching stream
+ * to extract the information we need, we default to 0 if not
+ * found (no packets lost if the channel is not yet in use).
+ */
+ ret = 0;
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&id, lttng_ht_seed),
+ ht->match_fct, &id,
+ &iter.iter, stream, node_session_id.node) {
+ if (stream->chan->key == key) {
+ ret = stream->chan->lost_packets;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+
+ DBG("UST consumer lost packets command for session id %"
+ PRIu64 ", channel key %" PRIu64, id, key);
+
+ health_code_update();
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ if (ret < 0) {
+ PERROR("send lost packets");
+ goto error_fatal;
+ }
+
+ break;
+ }
default:
break;
}
return ustctl_get_current_timestamp(stream->ustream, ts);
}
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+ assert(stream);
+ assert(stream->ustream);
+ assert(seq);
+
+ return ustctl_get_sequence_number(stream->ustream, seq);
+}
+
/*
* Called when the stream signal the consumer that it has hang up.
*/
return ret;
}
+static
+int update_stream_stats(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ uint64_t seq, discarded;
+
+ ret = ustctl_get_sequence_number(stream->ustream, &seq);
+ if (ret < 0) {
+ PERROR("ustctl_get_sequence_number");
+ goto end;
+ }
+ /*
+ * Start the sequence when we extract the first packet in case we don't
+ * start at 0 (for example if a consumer is not connected to the
+ * session immediately after the beginning).
+ */
+ if (stream->last_sequence_number == -1ULL) {
+ stream->last_sequence_number = seq;
+ } else if (seq > stream->last_sequence_number) {
+ stream->chan->lost_packets += seq -
+ stream->last_sequence_number - 1;
+ } else {
+ /* seq <= last_sequence_number */
+ ERR("Sequence number inconsistent : prev = %" PRIu64
+ ", current = %" PRIu64,
+ stream->last_sequence_number, seq);
+ ret = -1;
+ goto end;
+ }
+ stream->last_sequence_number = seq;
+
+ ret = ustctl_get_events_discarded(stream->ustream, &discarded);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
+ if (discarded < stream->last_discarded_events) {
+ /*
+ * Overflow has occured. We assume only one wrap-around
+ * has occured.
+ */
+ stream->chan->discarded_events +=
+ (1ULL << (CAA_BITS_PER_LONG - 1)) -
+ stream->last_discarded_events + discarded;
+ } else {
+ stream->chan->discarded_events += discarded -
+ stream->last_discarded_events;
+ }
+ stream->last_discarded_events = discarded;
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* Read subbuffer from the given stream.
*
if (ret < 0) {
goto end;
}
+
+ /* Update the stream's sequence and discarded events count. */
+ ret = update_stream_stats(stream);
+ if (ret < 0) {
+ PERROR("kernctl_get_events_discarded");
+ goto end;
+ }
} else {
write_index = 0;
}
int producer);
int lttng_ustconsumer_get_current_timestamp(
struct lttng_consumer_stream *stream, uint64_t *ts);
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq);
#else /* HAVE_LIBLTTNG_UST_CTL */
{
return -ENOSYS;
}
+int lttng_ustconsumer_get_sequence_number(
+ struct lttng_consumer_stream *stream, uint64_t *seq)
+{
+ return -ENOSYS;
+}
static inline
int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
uint64_t *stream_id)