LTTNG_ERR_KERN_CONSUMER_FAIL = 48, /* Kernel consumer start failed */
LTTNG_ERR_KERN_STREAM_FAIL = 49, /* Kernel create stream failed */
LTTNG_ERR_START_SESSION_ONCE = 50, /* Session needs to be started once. */
- /* 51 */
+ LTTNG_ERR_SNAPSHOT_FAIL = 51, /* Snapshot record failed. */
/* 52 */
LTTNG_ERR_KERN_LIST_FAIL = 53, /* Kernel listing events failed */
LTTNG_ERR_UST_CALIBRATE_FAIL = 54, /* UST calibration failed */
pthread_mutex_lock(&channel->stream_list_lock);
cds_list_add_tail(&stream->lnode, &channel->streams);
+ channel->stream_count++;
pthread_mutex_unlock(&channel->stream_list_lock);
}
/* Wipe stream */
cds_list_for_each_entry_safe(sreg, stmp, ®p->streams, lnode) {
cds_list_del(&sreg->lnode);
+ regp->stream_count--;
buffer_reg_stream_destroy(sreg, domain);
}
uint64_t consumer_key;
/* Stream registry object of this channel registry. */
struct cds_list_head streams;
+ /* Total number of stream in the list. */
+ uint64_t stream_count;
/* Used to ensure mutual exclusion to the stream's list. */
pthread_mutex_t stream_list_lock;
/* Node for hash table usage. */
* Return 0 on success or else a negative value.
*/
static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
- struct snapshot_output *output, struct ltt_session *session, int wait)
+ struct snapshot_output *output, struct ltt_session *session,
+ int wait, int nb_streams)
{
int ret;
goto error_snapshot;
}
- ret = kernel_snapshot_record(ksess, output, wait);
+ ret = kernel_snapshot_record(ksess, output, wait, nb_streams);
if (ret < 0) {
+ ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+ if (ret == -EINVAL) {
+ ret = -LTTNG_ERR_INVALID;
+ }
goto error_snapshot;
}
* Return 0 on success or else a negative value.
*/
static int record_ust_snapshot(struct ltt_ust_session *usess,
- struct snapshot_output *output, struct ltt_session *session, int wait)
+ struct snapshot_output *output, struct ltt_session *session,
+ int wait, int nb_streams)
{
int ret;
goto error_snapshot;
}
- ret = ust_app_snapshot_record(usess, output, wait);
+ ret = ust_app_snapshot_record(usess, output, wait, nb_streams);
if (ret < 0) {
+ ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+ if (ret == -EINVAL) {
+ ret = -LTTNG_ERR_INVALID;
+ }
goto error_snapshot;
}
return ret;
}
+/*
+ * Returns the total number of streams for a session or a negative value
+ * on error.
+ */
+static unsigned int get_total_nb_stream(struct ltt_session *session)
+{
+ unsigned int total_streams = 0;
+
+ if (session->kernel_session) {
+ struct ltt_kernel_session *ksess = session->kernel_session;
+
+ total_streams += ksess->stream_count_global;
+ }
+
+ if (session->ust_session) {
+ struct ltt_ust_session *usess = session->ust_session;
+
+ total_streams += ust_app_get_nb_stream(usess);
+ }
+
+ return total_streams;
+}
+
/*
* Command LTTNG_SNAPSHOT_RECORD from lib lttng ctl.
*
{
int ret = LTTNG_OK;
struct snapshot_output *tmp_sout = NULL;
+ unsigned int nb_streams;
assert(session);
}
}
+ /*
+ * Get the total number of stream of that session which is used by the
+ * maximum size of the snapshot feature.
+ */
+ nb_streams = get_total_nb_stream(session);
+
if (session->kernel_session) {
struct ltt_kernel_session *ksess = session->kernel_session;
if (tmp_sout) {
- ret = record_kernel_snapshot(ksess, tmp_sout, session, wait);
+ ret = record_kernel_snapshot(ksess, tmp_sout, session,
+ wait, nb_streams);
if (ret < 0) {
goto error;
}
rcu_read_lock();
cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
&iter.iter, sout, node.node) {
- ret = record_kernel_snapshot(ksess, sout, session, wait);
+ ret = record_kernel_snapshot(ksess, sout,
+ session, wait, nb_streams);
if (ret < 0) {
rcu_read_unlock();
goto error;
struct ltt_ust_session *usess = session->ust_session;
if (tmp_sout) {
- ret = record_ust_snapshot(usess, tmp_sout, session, wait);
+ ret = record_ust_snapshot(usess, tmp_sout, session,
+ wait, nb_streams);
if (ret < 0) {
goto error;
}
rcu_read_lock();
cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
&iter.iter, sout, node.node) {
- ret = record_ust_snapshot(usess, sout, session, wait);
+ ret = record_ust_snapshot(usess, sout, session,
+ wait, nb_streams);
if (ret < 0) {
rcu_read_unlock();
goto error;
*/
int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
- const char *session_path, int wait)
+ const char *session_path, int wait, int max_stream_size)
{
int ret;
struct lttcomm_consumer_msg msg;
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
msg.u.snapshot_channel.key = key;
- msg.u.snapshot_channel.max_size = output->max_size;
+ msg.u.snapshot_channel.max_stream_size = max_stream_size;
msg.u.snapshot_channel.metadata = metadata;
if (output->consumer->type == CONSUMER_DST_NET) {
/* Snapshot command. */
int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
- const char *session_path, int wait);
+ const char *session_path, int wait, int max_size_per_stream);
#endif /* _CONSUMER_H */
* Return 0 on success or else a negative value.
*/
int kernel_snapshot_record(struct ltt_kernel_session *ksess,
- struct snapshot_output *output, int wait)
+ struct snapshot_output *output, int wait, unsigned int nb_streams)
{
int ret, saved_metadata_fd;
struct consumer_socket *socket;
struct lttng_ht_iter iter;
struct ltt_kernel_metadata *saved_metadata;
+ uint64_t max_size_per_stream = 0;
assert(ksess);
assert(ksess->consumer);
goto error_open_stream;
}
+ if (output->max_size > 0 && nb_streams > 0) {
+ max_size_per_stream = output->max_size / nb_streams;
+ }
+
/* Send metadata to consumer and snapshot everything. */
cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter,
socket, node.node) {
/* For each channel, ask the consumer to snapshot it. */
cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
+ if (max_size_per_stream &&
+ chan->channel->attr.subbuf_size > max_size_per_stream) {
+ ret = LTTNG_ERR_INVALID;
+ DBG3("Kernel snapshot record maximum stream size %" PRIu64
+ " is smaller than subbuffer size of %" PRIu64,
+ max_size_per_stream, chan->channel->attr.subbuf_size);
+ goto error_consumer;
+ }
+
pthread_mutex_lock(socket->lock);
ret = consumer_snapshot_channel(socket, chan->fd, output, 0,
- ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait);
+ ksess->uid, ksess->gid,
+ DEFAULT_KERNEL_TRACE_DIR, wait,
+ max_size_per_stream);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
/* Snapshot metadata, */
pthread_mutex_lock(socket->lock);
ret = consumer_snapshot_channel(socket, ksess->metadata->fd, output,
- 1, ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait);
+ 1, ksess->uid, ksess->gid,
+ DEFAULT_KERNEL_TRACE_DIR, wait, max_size_per_stream);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
void kernel_destroy_session(struct ltt_kernel_session *ksess);
void kernel_destroy_channel(struct ltt_kernel_channel *kchan);
int kernel_snapshot_record(struct ltt_kernel_session *ksess,
- struct snapshot_output *output, int wait);
+ struct snapshot_output *output, int wait, unsigned int nb_streams);
int init_kernel_workarounds(void);
if (ret < 0) {
goto error;
}
+ /* Update the stream global counter */
+ ksess->stream_count_global += ret;
/*
* Have we already sent fds to the consumer? If yes, it means
* Return 0 on success or else a negative value.
*/
int ust_app_snapshot_record(struct ltt_ust_session *usess,
- struct snapshot_output *output, int wait)
+ struct snapshot_output *output, int wait, unsigned int nb_streams)
{
int ret = 0;
struct lttng_ht_iter iter;
struct ust_app *app;
char pathname[PATH_MAX];
+ uint64_t max_stream_size = 0;
assert(usess);
assert(output);
rcu_read_lock();
+ /*
+ * Compute the maximum size of a single stream if a max size is asked by
+ * the caller.
+ */
+ if (output->max_size > 0 && nb_streams > 0) {
+ max_stream_size = output->max_size / nb_streams;
+ }
+
cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
struct consumer_socket *socket;
struct lttng_ht_iter chan_iter;
cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
ua_chan, node.node) {
+ /*
+ * Make sure the maximum stream size is not lower than the
+ * subbuffer size or else it's an error since we won't be able to
+ * snapshot anything.
+ */
+ if (ua_chan->attr.subbuf_size > max_stream_size) {
+ ret = -EINVAL;
+ DBG3("UST app snapshot record maximum stream size %" PRIu64
+ " is smaller than subbuffer size of %" PRIu64,
+ max_stream_size, ua_chan->attr.subbuf_size);
+ goto error;
+ }
+
ret = consumer_snapshot_channel(socket, ua_chan->key, output, 0,
- ua_sess->euid, ua_sess->egid, pathname, wait);
+ ua_sess->euid, ua_sess->egid, pathname, wait,
+ max_stream_size);
if (ret < 0) {
goto error;
}
registry = get_session_registry(ua_sess);
assert(registry);
ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
- 1, ua_sess->euid, ua_sess->egid, pathname, wait);
+ 1, ua_sess->euid, ua_sess->egid, pathname, wait,
+ max_stream_size);
if (ret < 0) {
goto error;
}
rcu_read_unlock();
return ret;
}
+
+/*
+ * Return the number of streams for a UST session.
+ */
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+{
+ unsigned int ret = 0;
+ struct ust_app *app;
+ struct lttng_ht_iter iter;
+
+ assert(usess);
+
+ switch (usess->buffer_type) {
+ case LTTNG_BUFFER_PER_UID:
+ {
+ struct buffer_reg_uid *reg;
+
+ cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+ struct buffer_reg_channel *reg_chan;
+
+ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+ reg_chan, node.node) {
+ ret += reg_chan->stream_count;
+ }
+ }
+ break;
+ }
+ case LTTNG_BUFFER_PER_PID:
+ {
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+ struct ust_app_channel *ua_chan;
+ struct ust_app_session *ua_sess;
+ struct lttng_ht_iter chan_iter;
+
+ ua_sess = lookup_session_by_app(usess, app);
+ if (!ua_sess) {
+ /* Session not associated with this app. */
+ continue;
+ }
+
+ cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
+ ua_chan, node.node) {
+ ret += ua_chan->streams.count;
+ }
+ }
+ rcu_read_unlock();
+ break;
+ }
+ default:
+ assert(0);
+ break;
+ }
+
+ return ret;
+}
struct consumer_socket *socket, int send_zero_data);
void ust_app_destroy(struct ust_app *app);
int ust_app_snapshot_record(struct ltt_ust_session *usess,
- struct snapshot_output *output, int wait);
+ struct snapshot_output *output, int wait, unsigned int nb_streams);
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess);
#else /* HAVE_LIBLTTNG_UST_CTL */
}
static inline
int ust_app_snapshot_record(struct ltt_ust_session *usess,
- struct snapshot_output *output, int wait)
+ struct snapshot_output *output, int wait, unsigned int nb_stream)
+{
+ return 0;
+}
+static inline
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
{
return 0;
}
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size)
+{
+ if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+ /* Offset from the produced position to get the latest buffers. */
+ return produced_pos - max_stream_size;
+ }
+
+ return consumed_pos;
+}
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
uint64_t key);
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size);
#endif /* LIB_CONSUMER_H */
[ ERROR_INDEX(LTTNG_ERR_NOMEM)] = "Not enough memory",
[ ERROR_INDEX(LTTNG_ERR_SNAPSHOT_OUTPUT_EXIST) ] = "Snapshot output already exists",
[ ERROR_INDEX(LTTNG_ERR_START_SESSION_ONCE) ] = "Session needs to be started once",
+ [ ERROR_INDEX(LTTNG_ERR_SNAPSHOT_FAIL) ] = "Snapshot record failed",
/* Last element */
[ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
* Returns 0 on success, < 0 on error
*/
int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
- uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+ uint64_t relayd_id, uint64_t max_stream_size,
+ struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned long consumed_pos, produced_pos;
}
}
+ /*
+ * The original value is sent back if max stream size is larger than
+ * the possible size of the snapshot. Also, we asume that the session
+ * daemon should never send a maximum stream size that is lower than
+ * subbuffer size.
+ */
+ consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+ produced_pos, max_stream_size);
+
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
} else {
ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
msg.u.snapshot_channel.pathname,
- msg.u.snapshot_channel.relayd_id, ctx);
+ msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.max_stream_size,
+ ctx);
if (ret < 0) {
ERR("Snapshot channel failed");
ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
uint32_t metadata; /* This a metadata snapshot. */
uint64_t relayd_id; /* Relayd id if apply. */
uint64_t key;
- uint64_t max_size;
+ uint64_t max_stream_size;
} LTTNG_PACKED snapshot_channel;
} u;
} LTTNG_PACKED;
* Returns 0 on success, < 0 on error
*/
static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
- struct lttng_consumer_local_data *ctx)
+ uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
{
int ret;
unsigned use_relayd = 0;
goto error_unlock;
}
+ /*
+ * The original value is sent back if max stream size is larger than
+ * the possible size of the snapshot. Also, we asume that the session
+ * daemon should never send a maximum stream size that is lower than
+ * subbuffer size.
+ */
+ consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+ produced_pos, max_stream_size);
+
while (consumed_pos < produced_pos) {
ssize_t read_len;
unsigned long len, padded_len;
ret = snapshot_channel(msg.u.snapshot_channel.key,
msg.u.snapshot_channel.pathname,
msg.u.snapshot_channel.relayd_id,
+ msg.u.snapshot_channel.max_stream_size,
ctx);
if (ret < 0) {
ERR("Snapshot channel failed");