Fix: grab more than one packet for snapshots
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Thu, 15 Jan 2015 22:24:27 +0000 (17:24 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 22 Jan 2015 05:00:38 +0000 (00:00 -0500)
There are a few issues with snapshot size: when taking a snapshot
without specifying any "max size" (should be unlimited), only a single
packet from each stream is saved. We expect all stream available content
to be saved. There is a similar issue when a max size is specified.

Also, trying to make all streams save as much data has unexpected
corner-cases: for instance, if we have this configuration:
- kernel channels: 2 subbuffers of 1MB x 8 CPUs
- per-PID UST channels: 16 subbuffers of 4kB x 8 CPUs x 100 apps

would require the user to have a very large max size, since it would try
to fit (8 + (100 * 8)) * 1MB = 808MB of sub-buffers, else it would fail.
This issue here is using the largest subbuffer size as the criterion
applied to all channels.

We fix those issues by simplifying the algorithm used to calculate how
much data to grab. Rather than calculating the size to grab from each
stream, we calculate a number of packets to grab. It fails if we cannot
grab at least one packet from each stream in the session. Then checks if
it can grab 2 packets from each stream, and so on, until there is no
more space available (based on max size). This is not a perfect
solution, but has the merit of being simple to understand, and has no
(or few) unexpected corner-cases.

Fixes #860

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Conflicts:
src/bin/lttng-sessiond/kernel.h

Conflicts:
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/kernel.h
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/bin/lttng/commands/snapshot.c

19 files changed:
src/bin/lttng-sessiond/buffer-registry.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/kernel.h
src/bin/lttng-sessiond/snapshot.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/common/align.h [new file with mode: 0644]
src/common/bug.h [new file with mode: 0644]
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c
src/lib/lttng-ctl/filter/align.h [deleted file]
src/lib/lttng-ctl/filter/bug.h [deleted file]
src/lib/lttng-ctl/filter/filter-visitor-generate-bytecode.c

index c2099b6883c7ff70174431635b33080eb3e4d4b2..656fc9175b7d1bd6415ed3a2fd63ad9cc8def4f4 100644 (file)
@@ -51,6 +51,8 @@ struct buffer_reg_channel {
        struct lttng_ht_node_u64 node;
        /* Size of subbuffers in this channel. */
        size_t subbuf_size;
+       /* Number of subbuffers per stream. */
+       size_t num_subbuf;
        union {
                /* Original object data that MUST be copied over. */
                struct lttng_ust_object_data *ust;
index 26e8d152c2d02c9558f200f78a8af3da2c614033..be14b6bd333cce161ebe6b3c0fad8a807cd1d18c 100644 (file)
@@ -2829,7 +2829,7 @@ error:
  */
 static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
                struct snapshot_output *output, struct ltt_session *session,
-               int wait, int nb_streams)
+               int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
 
@@ -2860,7 +2860,7 @@ static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
                goto error_snapshot;
        }
 
-       ret = kernel_snapshot_record(ksess, output, wait, nb_streams);
+       ret = kernel_snapshot_record(ksess, output, wait, nb_packets_per_stream);
        if (ret != LTTNG_OK) {
                goto error_snapshot;
        }
@@ -2883,7 +2883,7 @@ end:
  */
 static int record_ust_snapshot(struct ltt_ust_session *usess,
                struct snapshot_output *output, struct ltt_session *session,
-               int wait, int nb_streams)
+               int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
 
@@ -2914,7 +2914,7 @@ static int record_ust_snapshot(struct ltt_ust_session *usess,
                goto error_snapshot;
        }
 
-       ret = ust_app_snapshot_record(usess, output, wait, nb_streams);
+       ret = ust_app_snapshot_record(usess, output, wait, nb_packets_per_stream);
        if (ret < 0) {
                switch (-ret) {
                case EINVAL:
@@ -2939,27 +2939,90 @@ error:
        return ret;
 }
 
-/*
- * Returns the total number of streams for a session or a negative value
- * on error.
- */
-static unsigned int get_total_nb_stream(struct ltt_session *session)
+static
+uint64_t get_session_size_one_more_packet_per_stream(struct ltt_session *session,
+       uint64_t cur_nr_packets)
 {
-       unsigned int total_streams = 0;
+       uint64_t tot_size = 0;
 
        if (session->kernel_session) {
+               struct ltt_kernel_channel *chan;
                struct ltt_kernel_session *ksess = session->kernel_session;
 
-               total_streams += ksess->stream_count_global;
+               cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
+                       if (cur_nr_packets >= chan->channel->attr.num_subbuf) {
+                               /*
+                                * Don't take channel into account if we
+                                * already grab all its packets.
+                                */
+                               continue;
+                       }
+                       tot_size += chan->channel->attr.subbuf_size
+                               * chan->stream_count;
+               }
        }
 
        if (session->ust_session) {
                struct ltt_ust_session *usess = session->ust_session;
 
-               total_streams += ust_app_get_nb_stream(usess);
+               tot_size += ust_app_get_size_one_more_packet_per_stream(usess,
+                               cur_nr_packets);
        }
 
-       return total_streams;
+       return tot_size;
+}
+
+/*
+ * Calculate the number of packets we can grab from each stream that
+ * fits within the overall snapshot max size.
+ *
+ * Returns -1 on error, 0 means infinite number of packets, else > 0 is
+ * the number of packets per stream.
+ *
+ * TODO: this approach is not perfect: we consider the worse case
+ * (packet filling the sub-buffers) as an upper bound, but we could do
+ * better if we do this calculation while we actually grab the packet
+ * content: we would know how much padding we don't actually store into
+ * the file.
+ *
+ * This algorithm is currently bounded by the number of packets per
+ * stream.
+ *
+ * Since we call this algorithm before actually grabbing the data, it's
+ * an approximation: for instance, applications could appear/disappear
+ * in between this call and actually grabbing data.
+ */
+static
+int64_t get_session_nb_packets_per_stream(struct ltt_session *session, uint64_t max_size)
+{
+       int64_t size_left;
+       uint64_t cur_nb_packets = 0;
+
+       if (!max_size) {
+               return 0;       /* Infinite */
+       }
+
+       size_left = max_size;
+       for (;;) {
+               uint64_t one_more_packet_tot_size;
+
+               one_more_packet_tot_size = get_session_size_one_more_packet_per_stream(session,
+                                       cur_nb_packets);
+               if (!one_more_packet_tot_size) {
+                       /* We are already grabbing all packets. */
+                       break;
+               }
+               size_left -= one_more_packet_tot_size;
+               if (size_left < 0) {
+                       break;
+               }
+               cur_nb_packets++;
+       }
+       if (!cur_nb_packets) {
+               /* Not enough room to grab one packet of each stream, error. */
+               return -1;
+       }
+       return cur_nb_packets;
 }
 
 /*
@@ -2976,7 +3039,7 @@ int cmd_snapshot_record(struct ltt_session *session,
        int ret = LTTNG_OK;
        unsigned int use_tmp_output = 0;
        struct snapshot_output tmp_output;
-       unsigned int nb_streams, snapshot_success = 0;
+       unsigned int snapshot_success = 0;
 
        assert(session);
 
@@ -3015,18 +3078,20 @@ int cmd_snapshot_record(struct ltt_session *session,
                use_tmp_output = 1;
        }
 
-       /*
-        * Get the total number of stream of that session which is used by the
-        * maximum size of the snapshot feature.
-        */
-       nb_streams = get_total_nb_stream(session);
-
        if (session->kernel_session) {
                struct ltt_kernel_session *ksess = session->kernel_session;
 
                if (use_tmp_output) {
+                       int64_t nb_packets_per_stream;
+
+                       nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                       tmp_output.max_size);
+                       if (nb_packets_per_stream < 0) {
+                               ret = LTTNG_ERR_INVALID;
+                               goto error;
+                       }
                        ret = record_kernel_snapshot(ksess, &tmp_output, session,
-                                       wait, nb_streams);
+                                       wait, nb_packets_per_stream);
                        if (ret != LTTNG_OK) {
                                goto error;
                        }
@@ -3038,6 +3103,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
+                               int64_t nb_packets_per_stream;
+
                                /*
                                 * Make a local copy of the output and assign the possible
                                 * temporary value given by the caller.
@@ -3045,11 +3112,17 @@ int cmd_snapshot_record(struct ltt_session *session,
                                memset(&tmp_output, 0, sizeof(tmp_output));
                                memcpy(&tmp_output, sout, sizeof(tmp_output));
 
-                               /* Use temporary max size. */
                                if (output->max_size != (uint64_t) -1ULL) {
                                        tmp_output.max_size = output->max_size;
                                }
 
+                               nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                               tmp_output.max_size);
+                               if (nb_packets_per_stream < 0) {
+                                       ret = LTTNG_ERR_INVALID;
+                                       goto error;
+                               }
+
                                /* Use temporary name. */
                                if (*output->name != '\0') {
                                        strncpy(tmp_output.name, output->name,
@@ -3059,7 +3132,7 @@ int cmd_snapshot_record(struct ltt_session *session,
                                tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
 
                                ret = record_kernel_snapshot(ksess, &tmp_output,
-                                               session, wait, nb_streams);
+                                               session, wait, nb_packets_per_stream);
                                if (ret != LTTNG_OK) {
                                        rcu_read_unlock();
                                        goto error;
@@ -3074,8 +3147,16 @@ int cmd_snapshot_record(struct ltt_session *session,
                struct ltt_ust_session *usess = session->ust_session;
 
                if (use_tmp_output) {
+                       int64_t nb_packets_per_stream;
+
+                       nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                       tmp_output.max_size);
+                       if (nb_packets_per_stream < 0) {
+                               ret = LTTNG_ERR_INVALID;
+                               goto error;
+                       }
                        ret = record_ust_snapshot(usess, &tmp_output, session,
-                                       wait, nb_streams);
+                                       wait, nb_packets_per_stream);
                        if (ret != LTTNG_OK) {
                                goto error;
                        }
@@ -3087,6 +3168,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
+                               int64_t nb_packets_per_stream;
+
                                /*
                                 * Make a local copy of the output and assign the possible
                                 * temporary value given by the caller.
@@ -3094,11 +3177,18 @@ int cmd_snapshot_record(struct ltt_session *session,
                                memset(&tmp_output, 0, sizeof(tmp_output));
                                memcpy(&tmp_output, sout, sizeof(tmp_output));
 
-                               /* Use temporary max size. */
                                if (output->max_size != (uint64_t) -1ULL) {
                                        tmp_output.max_size = output->max_size;
                                }
 
+                               nb_packets_per_stream = get_session_nb_packets_per_stream(session,
+                                               tmp_output.max_size);
+                               if (nb_packets_per_stream < 0) {
+                                       ret = LTTNG_ERR_INVALID;
+                                       rcu_read_unlock();
+                                       goto error;
+                               }
+
                                /* Use temporary name. */
                                if (*output->name != '\0') {
                                        strncpy(tmp_output.name, output->name,
@@ -3108,7 +3198,7 @@ int cmd_snapshot_record(struct ltt_session *session,
                                tmp_output.nb_snapshot = session->snapshot.nb_snapshot;
 
                                ret = record_ust_snapshot(usess, &tmp_output, session,
-                                               wait, nb_streams);
+                                               wait, nb_packets_per_stream);
                                if (ret != LTTNG_OK) {
                                        rcu_read_unlock();
                                        goto error;
index ce3e5da9adda44d3481c4bb07785a30f76ac14f5..166d6c0440202d9798a6bbad8b656fb35bb9432b 100644 (file)
@@ -1276,7 +1276,7 @@ end:
  */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, int max_stream_size)
+               const char *session_path, int wait, uint64_t nb_packets_per_stream)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1290,7 +1290,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        memset(&msg, 0, sizeof(msg));
        msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
        msg.u.snapshot_channel.key = key;
-       msg.u.snapshot_channel.max_stream_size = max_stream_size;
+       msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
        msg.u.snapshot_channel.metadata = metadata;
 
        if (output->consumer->type == CONSUMER_DST_NET) {
index 3601ed9147c170ae3a5bb8a7180f4e3f8e44279b..a9266d5b939798882377bc2caeb0e67251464fa2 100644 (file)
@@ -279,6 +279,6 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, int max_size_per_stream);
+               const char *session_path, int wait, uint64_t nb_packets_per_stream);
 
 #endif /* _CONSUMER_H */
index 7a1b7a98b3ce0d5451ad9d901a64142772d4d9d7..bac01a0359ab85d93990c0fcb6ec1676c7cd896b 100644 (file)
@@ -831,7 +831,8 @@ void kernel_destroy_channel(struct ltt_kernel_channel *kchan)
  * Return 0 on success or else return a LTTNG_ERR code.
  */
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait, unsigned int nb_streams)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int err, ret, saved_metadata_fd;
        struct consumer_socket *socket;
@@ -863,10 +864,6 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                goto error_open_stream;
        }
 
-       if (output->max_size > 0 && nb_streams > 0) {
-               max_size_per_stream = output->max_size / nb_streams;
-       }
-
        /* Send metadata to consumer and snapshot everything. */
        cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter,
                        socket, node.node) {
@@ -893,22 +890,11 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
 
                /* For each channel, ask the consumer to snapshot it. */
                cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
-                       if (max_size_per_stream &&
-                                       chan->channel->attr.subbuf_size > max_size_per_stream) {
-                               ret = LTTNG_ERR_INVALID;
-                               DBG3("Kernel snapshot record maximum stream size %" PRIu64
-                                               " is smaller than subbuffer size of %" PRIu64,
-                                               max_size_per_stream, chan->channel->attr.subbuf_size);
-                               (void) kernel_consumer_destroy_metadata(socket,
-                                               ksess->metadata);
-                               goto error_consumer;
-                       }
-
                        pthread_mutex_lock(socket->lock);
                        ret = consumer_snapshot_channel(socket, chan->fd, output, 0,
                                        ksess->uid, ksess->gid,
                                        DEFAULT_KERNEL_TRACE_DIR, wait,
-                                       max_size_per_stream);
+                                       nb_packets_per_stream);
                        pthread_mutex_unlock(socket->lock);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
@@ -922,7 +908,7 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                pthread_mutex_lock(socket->lock);
                ret = consumer_snapshot_channel(socket, ksess->metadata->fd, output,
                                1, ksess->uid, ksess->gid,
-                               DEFAULT_KERNEL_TRACE_DIR, wait, max_size_per_stream);
+                               DEFAULT_KERNEL_TRACE_DIR, wait, 0);
                pthread_mutex_unlock(socket->lock);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
index 78d12aba3ddc0bd68bd0699181f9457eca898075..7d69c7d0992d84799389e9eacfda9ba261334be3 100644 (file)
@@ -55,8 +55,8 @@ int kernel_validate_version(int tracer_fd);
 void kernel_destroy_session(struct ltt_kernel_session *ksess);
 void kernel_destroy_channel(struct ltt_kernel_channel *kchan);
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait, unsigned int nb_streams);
-
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream);
 int init_kernel_workarounds(void);
 
 #endif /* _LTT_KERNEL_CTL_H */
index 8faf9a76a28f99f497c1b4542ed0acdfd5d5603f..e5660dac7b02f13bb59c49cabfdd93ba3952ccdf 100644 (file)
@@ -46,10 +46,11 @@ static int output_init(uint64_t max_size, const char *name,
 {
        int ret = 0, i;
 
-       assert(output);
-
        memset(output, 0, sizeof(struct snapshot_output));
 
+       /*
+        * max_size of -1ULL means unset. Set to default (unlimited).
+        */
        if (max_size == (uint64_t) -1ULL) {
                max_size = 0;
        }
index 4bf819015e5dd239961fbe664277ef6c63feee54..c9a5f52618089fdbf48d5ab7364141ecf99a699b 100644 (file)
@@ -2332,6 +2332,7 @@ static int create_buffer_reg_channel(struct buffer_reg_session *reg_sess,
        assert(reg_chan);
        reg_chan->consumer_key = ua_chan->key;
        reg_chan->subbuf_size = ua_chan->attr.subbuf_size;
+       reg_chan->num_subbuf = ua_chan->attr.num_subbuf;
 
        /* Create and add a channel registry to session. */
        ret = ust_registry_channel_add(reg_sess->reg.ust,
@@ -4979,7 +4980,8 @@ void ust_app_destroy(struct ust_app *app)
  * Return 0 on success or else a negative value.
  */
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, unsigned int nb_streams)
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream)
 {
        int ret = 0;
        unsigned int snapshot_done = 0;
@@ -4993,14 +4995,6 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
 
        rcu_read_lock();
 
-       /*
-        * Compute the maximum size of a single stream if a max size is asked by
-        * the caller.
-        */
-       if (output->max_size > 0 && nb_streams > 0) {
-               max_stream_size = output->max_size / nb_streams;
-       }
-
        switch (usess->buffer_type) {
        case LTTNG_BUFFER_PER_UID:
        {
@@ -5030,30 +5024,16 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        /* Add the UST default trace dir to path. */
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-
-                               /*
-                                * Make sure the maximum stream size is not lower than the
-                                * subbuffer size or else it's an error since we won't be able to
-                                * snapshot anything.
-                                */
-                               if (max_stream_size &&
-                                               reg_chan->subbuf_size > max_stream_size) {
-                                       ret = -EINVAL;
-                                       DBG3("UST app snapshot record maximum stream size %" PRIu64
-                                                       " is smaller than subbuffer size of %zu",
-                                                       max_stream_size, reg_chan->subbuf_size);
-                                       goto error;
-                               }
-                               ret = consumer_snapshot_channel(socket, reg_chan->consumer_key, output, 0,
-                                               usess->uid, usess->gid, pathname, wait,
-                                               max_stream_size);
+                               ret = consumer_snapshot_channel(socket, reg_chan->consumer_key,
+                                               output, 0, usess->uid, usess->gid, pathname, wait,
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
                        }
-                       ret = consumer_snapshot_channel(socket, reg->registry->reg.ust->metadata_key, output,
-                                       1, usess->uid, usess->gid, pathname, wait,
-                                       max_stream_size);
+                       ret = consumer_snapshot_channel(socket,
+                                       reg->registry->reg.ust->metadata_key, output, 1,
+                                       usess->uid, usess->gid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5095,23 +5075,9 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               /*
-                                * Make sure the maximum stream size is not lower than the
-                                * subbuffer size or else it's an error since we won't be able to
-                                * snapshot anything.
-                                */
-                               if (max_stream_size &&
-                                               ua_chan->attr.subbuf_size > max_stream_size) {
-                                       ret = -EINVAL;
-                                       DBG3("UST app snapshot record maximum stream size %" PRIu64
-                                                       " is smaller than subbuffer size of %" PRIu64,
-                                                       max_stream_size, ua_chan->attr.subbuf_size);
-                                       goto error;
-                               }
-
-                               ret = consumer_snapshot_channel(socket, ua_chan->key, output, 0,
-                                               ua_sess->euid, ua_sess->egid, pathname, wait,
-                                               max_stream_size);
+                               ret = consumer_snapshot_channel(socket, ua_chan->key, output,
+                                               0, ua_sess->euid, ua_sess->egid, pathname, wait,
+                                               nb_packets_per_stream);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -5120,8 +5086,7 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        registry = get_session_registry(ua_sess);
                        assert(registry);
                        ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                                       1, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                       max_stream_size);
+                                       1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
                        if (ret < 0) {
                                goto error;
                        }
@@ -5149,11 +5114,12 @@ error:
 }
 
 /*
- * Return the number of streams for a UST session.
+ * Return the size taken by one more packet per stream.
  */
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets)
 {
-       unsigned int ret = 0;
+       uint64_t tot_size = 0;
        struct ust_app *app;
        struct lttng_ht_iter iter;
 
@@ -5170,7 +5136,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                        rcu_read_lock();
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret += reg_chan->stream_count;
+                               if (cur_nr_packets >= reg_chan->num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * already grab all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += reg_chan->subbuf_size * reg_chan->stream_count;
                        }
                        rcu_read_unlock();
                }
@@ -5192,7 +5165,14 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret += ua_chan->streams.count;
+                               if (cur_nr_packets >= ua_chan->attr.num_subbuf) {
+                                       /*
+                                        * Don't take channel into account if we
+                                        * already grab all its packets.
+                                        */
+                                       continue;
+                               }
+                               tot_size += ua_chan->attr.subbuf_size * ua_chan->streams.count;
                        }
                }
                rcu_read_unlock();
@@ -5203,5 +5183,5 @@ unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
                break;
        }
 
-       return ret;
+       return tot_size;
 }
index 2fd3b56d936f764e46c31c3ba0e19ba53480dbb2..e943b4d21bce1bba3bc52f003874959af24595b4 100644 (file)
@@ -325,8 +325,10 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                struct consumer_socket *socket, int send_zero_data);
 void ust_app_destroy(struct ust_app *app);
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait, unsigned int nb_streams);
-unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess);
+               struct snapshot_output *output, int wait,
+               uint64_t nb_packets_per_stream);
+uint64_t ust_app_get_size_one_more_packet_per_stream(struct ltt_ust_session *usess,
+               uint64_t cur_nr_packets);
 struct ust_app *ust_app_find_by_sock(int sock);
 
 static inline
diff --git a/src/common/align.h b/src/common/align.h
new file mode 100644 (file)
index 0000000..928c5b6
--- /dev/null
@@ -0,0 +1,64 @@
+#ifndef _LTTNG_ALIGN_H
+#define _LTTNG_ALIGN_H
+
+/*
+ * align.h
+ *
+ * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ */
+
+#include "bug.h"
+#include <unistd.h>
+#include <limits.h>
+
+#ifndef PAGE_SIZE      /* Cygwin limits.h defines its own PAGE_SIZE */
+#define PAGE_SIZE              sysconf(_SC_PAGE_SIZE)
+#endif
+
+#define PAGE_MASK              (~(PAGE_SIZE - 1))
+#define __ALIGN_MASK(v, mask)  (((v) + (mask)) & ~(mask))
+#define ALIGN(v, align)                __ALIGN_MASK(v, (__typeof__(v)) (align) - 1)
+#define PAGE_ALIGN(addr)       ALIGN(addr, PAGE_SIZE)
+
+/**
+ * offset_align - Calculate the offset needed to align an object on its natural
+ *                alignment towards higher addresses.
+ * @align_drift:  object offset from an "alignment"-aligned address.
+ * @alignment:    natural object alignment. Must be non-zero, power of 2.
+ *
+ * Returns the offset that must be added to align towards higher
+ * addresses.
+ */
+#define offset_align(align_drift, alignment)                                  \
+       ({                                                                     \
+               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
+                                  || ((alignment) & ((alignment) - 1)));      \
+               (((alignment) - (align_drift)) & ((alignment) - 1));           \
+       })
+
+/**
+ * offset_align_floor - Calculate the offset needed to align an object
+ *                      on its natural alignment towards lower addresses.
+ * @align_drift:  object offset from an "alignment"-aligned address.
+ * @alignment:    natural object alignment. Must be non-zero, power of 2.
+ *
+ * Returns the offset that must be substracted to align towards lower addresses.
+ */
+#define offset_align_floor(align_drift, alignment)                            \
+       ({                                                                     \
+               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
+                                  || ((alignment) & ((alignment) - 1)));      \
+               (((align_drift) - (alignment)) & ((alignment) - 1));           \
+       })
+
+#endif /* _LTTNG_ALIGN_H */
diff --git a/src/common/bug.h b/src/common/bug.h
new file mode 100644 (file)
index 0000000..9368c08
--- /dev/null
@@ -0,0 +1,54 @@
+#ifndef _LTTNG_BUG_H
+#define _LTTNG_BUG_H
+
+/*
+ * lttng/bug.h
+ *
+ * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ */
+
+#include <urcu/compiler.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define LTTNG_BUG_ON(condition)                                                \
+       do {                                                            \
+               if (caa_unlikely(condition)) {                          \
+                       fprintf(stderr,                                 \
+                               "LTTng BUG in file %s, line %d.\n",     \
+                               __FILE__, __LINE__);                    \
+                       exit(EXIT_FAILURE);                             \
+               }                                                       \
+       } while (0)
+
+#define LTTNG_BUILD_BUG_ON(condition)                                  \
+       ((void) sizeof(char[-!!(condition)]))
+
+/**
+ * LTTNG_BUILD_RUNTIME_BUG_ON - check condition at build (if constant) or runtime
+ * @condition: the condition which should be false.
+ *
+ * If the condition is a constant and true, the compiler will generate a build
+ * error. If the condition is not constant, a BUG will be triggered at runtime
+ * if the condition is ever true. If the condition is constant and false, no
+ * code is emitted.
+ */
+#define LTTNG_BUILD_RUNTIME_BUG_ON(condition)                  \
+       do {                                                    \
+               if (__builtin_constant_p(condition))            \
+                       LTTNG_BUILD_BUG_ON(condition);          \
+               else                                            \
+                       LTTNG_BUG_ON(condition);                \
+       } while (0)
+
+#endif
index 700149fe429d25104429f31b4c400c54246c30e8..25db6604a698fb2821af398dc05cf92ad2d59cec 100644 (file)
@@ -46,6 +46,7 @@
 #include "consumer.h"
 #include "consumer-stream.h"
 #include "consumer-testpoint.h"
+#include "align.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -3650,22 +3651,19 @@ int consumer_send_status_channel(int sock,
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
 
-/*
- * Using a maximum stream size with the produced and consumed position of a
- * stream, computes the new consumed position to be as close as possible to the
- * maximum possible stream size.
- *
- * If maximum stream size is lower than the possible buffer size (produced -
- * consumed), the consumed_pos given is returned untouched else the new value
- * is returned.
- */
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size)
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size)
 {
-       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
-               /* Offset from the produced position to get the latest buffers. */
-               return produced_pos - max_stream_size;
-       }
+       unsigned long start_pos;
 
-       return consumed_pos;
+       if (!nb_packets_per_stream) {
+               return consumed_pos;    /* Grab everything */
+       }
+       start_pos = produced_pos - offset_align_floor(produced_pos, max_sb_size);
+       start_pos -= max_sb_size * nb_packets_per_stream;
+       if ((long) (start_pos - consumed_pos) < 0) {
+               return consumed_pos;    /* Grab everything */
+       }
+       return start_pos;
 }
index 1e378f04ea631ea4387a1940c407599d19a63966..790cb6b135d9b9983a9826e4713112ad586da247 100644 (file)
@@ -666,8 +666,9 @@ int consumer_send_status_channel(int sock,
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
                uint64_t key);
 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
-unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
-               unsigned long produced_pos, uint64_t max_stream_size);
+unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t nb_packets_per_stream,
+               uint64_t max_sb_size);
 int consumer_add_data_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
 int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
index 79b25d35d2970d827f41637b51987e2e1580dc48..bca9ad2b9b867ab59acd5842eb1421eddced7f66 100644 (file)
@@ -113,7 +113,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, uint64_t max_stream_size,
+               uint64_t relayd_id, uint64_t nb_packets_per_stream,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
@@ -219,14 +219,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        }
                }
 
-               /*
-                * The original value is sent back if max stream size is larger than
-                * the possible size of the snapshot. Also, we asume that the session
-                * daemon should never send a maximum stream size that is lower than
-                * subbuffer size.
-                */
-               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
-                               produced_pos, max_stream_size);
+               consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+                               produced_pos, nb_packets_per_stream,
+                               stream->max_sb_size);
 
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
@@ -884,7 +879,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
-                                       msg.u.snapshot_channel.max_stream_size,
+                                       msg.u.snapshot_channel.nb_packets_per_stream,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
index 1c495cf864b4fd39611086510b01d2e163bf1590..6ca6de2ca119a0888ef7869d364b5bd77994a3b3 100644 (file)
@@ -434,7 +434,7 @@ struct lttcomm_consumer_msg {
                        uint32_t metadata;              /* This a metadata snapshot. */
                        uint64_t relayd_id;             /* Relayd id if apply. */
                        uint64_t key;
-                       uint64_t max_stream_size;
+                       uint64_t nb_packets_per_stream;
                } LTTNG_PACKED snapshot_channel;
                struct {
                        uint64_t channel_key;
index ea30bd433ec4b0fc2265f491658c3890afabf73a..8aaa020bfe105cd41ab708809d7e447d3d7f0068 100644 (file)
@@ -858,7 +858,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
+               uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -940,12 +940,13 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
 
                /*
                 * The original value is sent back if max stream size is larger than
-                * the possible size of the snapshot. Also, we asume that the session
+                * the possible size of the snapshot. Also, we assume that the session
                 * daemon should never send a maximum stream size that is lower than
                 * subbuffer size.
                 */
-               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
-                               produced_pos, max_stream_size);
+               consumed_pos = consumer_get_consume_start_pos(consumed_pos,
+                               produced_pos, nb_packets_per_stream,
+                               stream->max_sb_size);
 
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
@@ -1488,7 +1489,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
-                                       msg.u.snapshot_channel.max_stream_size,
+                                       msg.u.snapshot_channel.nb_packets_per_stream,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
diff --git a/src/lib/lttng-ctl/filter/align.h b/src/lib/lttng-ctl/filter/align.h
deleted file mode 100644 (file)
index fe32673..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-#ifndef _LTTNG_ALIGN_H
-#define _LTTNG_ALIGN_H
-
-/*
- * align.h
- *
- * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include "bug.h"
-#include <unistd.h>
-#include <limits.h>
-
-#ifndef PAGE_SIZE      /* Cygwin limits.h defines its own PAGE_SIZE */
-#define PAGE_SIZE              sysconf(_SC_PAGE_SIZE)
-#endif
-
-#define PAGE_MASK              (~(PAGE_SIZE - 1))
-#define __ALIGN_MASK(v, mask)  (((v) + (mask)) & ~(mask))
-#define ALIGN(v, align)                __ALIGN_MASK(v, (__typeof__(v)) (align) - 1)
-#define PAGE_ALIGN(addr)       ALIGN(addr, PAGE_SIZE)
-
-/**
- * offset_align - Calculate the offset needed to align an object on its natural
- *                alignment towards higher addresses.
- * @align_drift:  object offset from an "alignment"-aligned address.
- * @alignment:    natural object alignment. Must be non-zero, power of 2.
- *
- * Returns the offset that must be added to align towards higher
- * addresses.
- */
-#define offset_align(align_drift, alignment)                                  \
-       ({                                                                     \
-               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
-                                  || ((alignment) & ((alignment) - 1)));      \
-               (((alignment) - (align_drift)) & ((alignment) - 1));           \
-       })
-
-/**
- * offset_align_floor - Calculate the offset needed to align an object
- *                      on its natural alignment towards lower addresses.
- * @align_drift:  object offset from an "alignment"-aligned address.
- * @alignment:    natural object alignment. Must be non-zero, power of 2.
- *
- * Returns the offset that must be substracted to align towards lower addresses.
- */
-#define offset_align_floor(align_drift, alignment)                            \
-       ({                                                                     \
-               LTTNG_BUILD_RUNTIME_BUG_ON((alignment) == 0                    \
-                                  || ((alignment) & ((alignment) - 1)));      \
-               (((align_drift) - (alignment)) & ((alignment) - 1);            \
-       })
-
-#endif /* _LTTNG_ALIGN_H */
diff --git a/src/lib/lttng-ctl/filter/bug.h b/src/lib/lttng-ctl/filter/bug.h
deleted file mode 100644 (file)
index 9368c08..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef _LTTNG_BUG_H
-#define _LTTNG_BUG_H
-
-/*
- * lttng/bug.h
- *
- * (C) Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- */
-
-#include <urcu/compiler.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#define LTTNG_BUG_ON(condition)                                                \
-       do {                                                            \
-               if (caa_unlikely(condition)) {                          \
-                       fprintf(stderr,                                 \
-                               "LTTng BUG in file %s, line %d.\n",     \
-                               __FILE__, __LINE__);                    \
-                       exit(EXIT_FAILURE);                             \
-               }                                                       \
-       } while (0)
-
-#define LTTNG_BUILD_BUG_ON(condition)                                  \
-       ((void) sizeof(char[-!!(condition)]))
-
-/**
- * LTTNG_BUILD_RUNTIME_BUG_ON - check condition at build (if constant) or runtime
- * @condition: the condition which should be false.
- *
- * If the condition is a constant and true, the compiler will generate a build
- * error. If the condition is not constant, a BUG will be triggered at runtime
- * if the condition is ever true. If the condition is constant and false, no
- * code is emitted.
- */
-#define LTTNG_BUILD_RUNTIME_BUG_ON(condition)                  \
-       do {                                                    \
-               if (__builtin_constant_p(condition))            \
-                       LTTNG_BUILD_BUG_ON(condition);          \
-               else                                            \
-                       LTTNG_BUG_ON(condition);                \
-       } while (0)
-
-#endif
index 8c6dc96ff236520de23ea0efa7ee7b37f1c20784..46733ba2df418a446a5fd3160ed22acf199b837f 100644 (file)
@@ -22,7 +22,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <errno.h>
-#include "align.h"
+#include "common/align.h"
 #include "filter-bytecode.h"
 #include "filter-ir.h"
 #include "filter-ast.h"
This page took 0.048394 seconds and 4 git commands to generate.