Support snapshot max-size limitation
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 10 Jul 2013 02:42:13 +0000 (22:42 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 12 Jul 2013 15:18:22 +0000 (11:18 -0400)
This features allows the user to specify a maximum size for the snapshot
collected. The maximum stream size is split equally by the total number
of streams active at the time of the snapshot record across all the
domains. The metadata is not limited and its size is not taken into
account in the maximum stream size computation.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
17 files changed:
include/lttng/lttng-error.h
src/bin/lttng-sessiond/buffer-registry.c
src/bin/lttng-sessiond/buffer-registry.h
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/kernel.h
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/common/consumer.c
src/common/consumer.h
src/common/error.c
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 8d2e1d0022bb101dc528e0b67a50019214fce6a5..4e98f43bea86eca94969a4b1de859ebf01a12f00 100644 (file)
@@ -81,7 +81,7 @@ enum lttng_error_code {
        LTTNG_ERR_KERN_CONSUMER_FAIL     = 48,  /* Kernel consumer start failed */
        LTTNG_ERR_KERN_STREAM_FAIL       = 49,  /* Kernel create stream failed */
        LTTNG_ERR_START_SESSION_ONCE     = 50,  /* Session needs to be started once. */
-       /* 51 */
+       LTTNG_ERR_SNAPSHOT_FAIL          = 51,  /* Snapshot record failed. */
        /* 52 */
        LTTNG_ERR_KERN_LIST_FAIL         = 53,  /* Kernel listing events failed */
        LTTNG_ERR_UST_CALIBRATE_FAIL     = 54,  /* UST calibration failed */
index 93da2f12253270c54856bc9fe303ff392141d260..bab1f309f54560d12f56a8be8675d0532d8772bd 100644 (file)
@@ -377,6 +377,7 @@ void buffer_reg_stream_add(struct buffer_reg_stream *stream,
 
        pthread_mutex_lock(&channel->stream_list_lock);
        cds_list_add_tail(&stream->lnode, &channel->streams);
+       channel->stream_count++;
        pthread_mutex_unlock(&channel->stream_list_lock);
 }
 
@@ -504,6 +505,7 @@ void buffer_reg_channel_destroy(struct buffer_reg_channel *regp,
                /* Wipe stream */
                cds_list_for_each_entry_safe(sreg, stmp, &regp->streams, lnode) {
                        cds_list_del(&sreg->lnode);
+                       regp->stream_count--;
                        buffer_reg_stream_destroy(sreg, domain);
                }
 
index cb976dcdc3db25609bb32838a8abe50266908439..0bf439d78ce692938f605fde915495d2e3ef7754 100644 (file)
@@ -43,6 +43,8 @@ struct buffer_reg_channel {
        uint64_t consumer_key;
        /* Stream registry object of this channel registry. */
        struct cds_list_head streams;
+       /* Total number of stream in the list. */
+       uint64_t stream_count;
        /* Used to ensure mutual exclusion to the stream's list. */
        pthread_mutex_t stream_list_lock;
        /* Node for hash table usage. */
index 903dfd36defb5f8591aa7532f7b0a202d8be863d..c522d51c2dc27e46efa08ccb7f85085e16e4e4d3 100644 (file)
@@ -2441,7 +2441,8 @@ error:
  * Return 0 on success or else a negative value.
  */
 static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, struct ltt_session *session, int wait)
+               struct snapshot_output *output, struct ltt_session *session,
+               int wait, int nb_streams)
 {
        int ret;
 
@@ -2471,8 +2472,12 @@ static int record_kernel_snapshot(struct ltt_kernel_session *ksess,
                goto error_snapshot;
        }
 
-       ret = kernel_snapshot_record(ksess, output, wait);
+       ret = kernel_snapshot_record(ksess, output, wait, nb_streams);
        if (ret < 0) {
+               ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+               if (ret == -EINVAL) {
+                       ret = -LTTNG_ERR_INVALID;
+               }
                goto error_snapshot;
        }
 
@@ -2489,7 +2494,8 @@ error:
  * Return 0 on success or else a negative value.
  */
 static int record_ust_snapshot(struct ltt_ust_session *usess,
-               struct snapshot_output *output, struct ltt_session *session, int wait)
+               struct snapshot_output *output, struct ltt_session *session,
+               int wait, int nb_streams)
 {
        int ret;
 
@@ -2519,8 +2525,12 @@ static int record_ust_snapshot(struct ltt_ust_session *usess,
                goto error_snapshot;
        }
 
-       ret = ust_app_snapshot_record(usess, output, wait);
+       ret = ust_app_snapshot_record(usess, output, wait, nb_streams);
        if (ret < 0) {
+               ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+               if (ret == -EINVAL) {
+                       ret = -LTTNG_ERR_INVALID;
+               }
                goto error_snapshot;
        }
 
@@ -2531,6 +2541,29 @@ error:
        return ret;
 }
 
+/*
+ * Returns the total number of streams for a session or a negative value
+ * on error.
+ */
+static unsigned int get_total_nb_stream(struct ltt_session *session)
+{
+       unsigned int total_streams = 0;
+
+       if (session->kernel_session) {
+               struct ltt_kernel_session *ksess = session->kernel_session;
+
+               total_streams += ksess->stream_count_global;
+       }
+
+       if (session->ust_session) {
+               struct ltt_ust_session *usess = session->ust_session;
+
+               total_streams += ust_app_get_nb_stream(usess);
+       }
+
+       return total_streams;
+}
+
 /*
  * Command LTTNG_SNAPSHOT_RECORD from lib lttng ctl.
  *
@@ -2544,6 +2577,7 @@ int cmd_snapshot_record(struct ltt_session *session,
 {
        int ret = LTTNG_OK;
        struct snapshot_output *tmp_sout = NULL;
+       unsigned int nb_streams;
 
        assert(session);
 
@@ -2585,11 +2619,18 @@ int cmd_snapshot_record(struct ltt_session *session,
                }
        }
 
+       /*
+        * Get the total number of stream of that session which is used by the
+        * maximum size of the snapshot feature.
+        */
+       nb_streams = get_total_nb_stream(session);
+
        if (session->kernel_session) {
                struct ltt_kernel_session *ksess = session->kernel_session;
 
                if (tmp_sout) {
-                       ret = record_kernel_snapshot(ksess, tmp_sout, session, wait);
+                       ret = record_kernel_snapshot(ksess, tmp_sout, session,
+                                       wait, nb_streams);
                        if (ret < 0) {
                                goto error;
                        }
@@ -2600,7 +2641,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
-                               ret = record_kernel_snapshot(ksess, sout, session, wait);
+                               ret = record_kernel_snapshot(ksess, sout,
+                                               session, wait, nb_streams);
                                if (ret < 0) {
                                        rcu_read_unlock();
                                        goto error;
@@ -2614,7 +2656,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                struct ltt_ust_session *usess = session->ust_session;
 
                if (tmp_sout) {
-                       ret = record_ust_snapshot(usess, tmp_sout, session, wait);
+                       ret = record_ust_snapshot(usess, tmp_sout, session,
+                                       wait, nb_streams);
                        if (ret < 0) {
                                goto error;
                        }
@@ -2625,7 +2668,8 @@ int cmd_snapshot_record(struct ltt_session *session,
                        rcu_read_lock();
                        cds_lfht_for_each_entry(session->snapshot.output_ht->ht,
                                        &iter.iter, sout, node.node) {
-                               ret = record_ust_snapshot(usess, sout, session, wait);
+                               ret = record_ust_snapshot(usess, sout, session,
+                                               wait, nb_streams);
                                if (ret < 0) {
                                        rcu_read_unlock();
                                        goto error;
index b1ccbc1082c7728e306bdccfa680652063bc358a..66fecd617612bb44fe6c7d066a32a7a75e65f5b3 100644 (file)
@@ -1210,7 +1210,7 @@ end:
  */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait)
+               const char *session_path, int wait, int max_stream_size)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1225,7 +1225,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        memset(&msg, 0, sizeof(msg));
        msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
        msg.u.snapshot_channel.key = key;
-       msg.u.snapshot_channel.max_size = output->max_size;
+       msg.u.snapshot_channel.max_stream_size = max_stream_size;
        msg.u.snapshot_channel.metadata = metadata;
 
        if (output->consumer->type == CONSUMER_DST_NET) {
index c219420cacb34c08acca28e560020d9f918f3902..44fd708ec8b8779a8d9ac77eec5835fbeb9611f9 100644 (file)
@@ -249,6 +249,6 @@ int consumer_flush_channel(struct consumer_socket *socket, uint64_t key);
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait);
+               const char *session_path, int wait, int max_size_per_stream);
 
 #endif /* _CONSUMER_H */
index 6f2604c607d2c1bc5fe4c24a53a2cf0458a4053d..ddf01044c816f179b668525c5463c60a6f3483c2 100644 (file)
@@ -822,12 +822,13 @@ void kernel_destroy_channel(struct ltt_kernel_channel *kchan)
  * Return 0 on success or else a negative value.
  */
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait)
+               struct snapshot_output *output, int wait, unsigned int nb_streams)
 {
        int ret, saved_metadata_fd;
        struct consumer_socket *socket;
        struct lttng_ht_iter iter;
        struct ltt_kernel_metadata *saved_metadata;
+       uint64_t max_size_per_stream = 0;
 
        assert(ksess);
        assert(ksess->consumer);
@@ -853,6 +854,10 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                goto error_open_stream;
        }
 
+       if (output->max_size > 0 && nb_streams > 0) {
+               max_size_per_stream = output->max_size / nb_streams;
+       }
+
        /* Send metadata to consumer and snapshot everything. */
        cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter,
                        socket, node.node) {
@@ -881,9 +886,20 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
 
                /* For each channel, ask the consumer to snapshot it. */
                cds_list_for_each_entry(chan, &ksess->channel_list.head, list) {
+                       if (max_size_per_stream &&
+                                       chan->channel->attr.subbuf_size > max_size_per_stream) {
+                               ret = LTTNG_ERR_INVALID;
+                               DBG3("Kernel snapshot record maximum stream size %" PRIu64
+                                               " is smaller than subbuffer size of %" PRIu64,
+                                               max_size_per_stream, chan->channel->attr.subbuf_size);
+                               goto error_consumer;
+                       }
+
                        pthread_mutex_lock(socket->lock);
                        ret = consumer_snapshot_channel(socket, chan->fd, output, 0,
-                                       ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait);
+                                       ksess->uid, ksess->gid,
+                                       DEFAULT_KERNEL_TRACE_DIR, wait,
+                                       max_size_per_stream);
                        pthread_mutex_unlock(socket->lock);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
@@ -894,7 +910,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                /* Snapshot metadata, */
                pthread_mutex_lock(socket->lock);
                ret = consumer_snapshot_channel(socket, ksess->metadata->fd, output,
-                               1, ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait);
+                               1, ksess->uid, ksess->gid,
+                               DEFAULT_KERNEL_TRACE_DIR, wait, max_size_per_stream);
                pthread_mutex_unlock(socket->lock);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
index a87405a4dfcd7e0372f4461b90f512a83a987713..78d12aba3ddc0bd68bd0699181f9457eca898075 100644 (file)
@@ -55,7 +55,7 @@ int kernel_validate_version(int tracer_fd);
 void kernel_destroy_session(struct ltt_kernel_session *ksess);
 void kernel_destroy_channel(struct ltt_kernel_channel *kchan);
 int kernel_snapshot_record(struct ltt_kernel_session *ksess,
-               struct snapshot_output *output, int wait);
+               struct snapshot_output *output, int wait, unsigned int nb_streams);
 
 int init_kernel_workarounds(void);
 
index 1d047fd85240582854f5a6c50744b23ce7ac4cb1..bfc8a389bd62d0a32446bf7850e28929c7f1e593 100644 (file)
@@ -675,6 +675,8 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd)
                                if (ret < 0) {
                                        goto error;
                                }
+                               /* Update the stream global counter */
+                               ksess->stream_count_global += ret;
 
                                /*
                                 * Have we already sent fds to the consumer? If yes, it means
index 60b55d25a56e9832a261941ecf34007d6bd752d7..223fb77c0c936e0b81d678e4787a2c5f46e80bdc 100644 (file)
@@ -4871,18 +4871,27 @@ void ust_app_destroy(struct ust_app *app)
  * Return 0 on success or else a negative value.
  */
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait)
+               struct snapshot_output *output, int wait, unsigned int nb_streams)
 {
        int ret = 0;
        struct lttng_ht_iter iter;
        struct ust_app *app;
        char pathname[PATH_MAX];
+       uint64_t max_stream_size = 0;
 
        assert(usess);
        assert(output);
 
        rcu_read_lock();
 
+       /*
+        * Compute the maximum size of a single stream if a max size is asked by
+        * the caller.
+        */
+       if (output->max_size > 0 && nb_streams > 0) {
+               max_stream_size = output->max_size / nb_streams;
+       }
+
        cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
                struct consumer_socket *socket;
                struct lttng_ht_iter chan_iter;
@@ -4915,8 +4924,22 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
 
                cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                ua_chan, node.node) {
+                       /*
+                        * Make sure the maximum stream size is not lower than the
+                        * subbuffer size or else it's an error since we won't be able to
+                        * snapshot anything.
+                        */
+                       if (ua_chan->attr.subbuf_size > max_stream_size) {
+                               ret = -EINVAL;
+                               DBG3("UST app snapshot record maximum stream size %" PRIu64
+                                               " is smaller than subbuffer size of %" PRIu64,
+                                               max_stream_size, ua_chan->attr.subbuf_size);
+                               goto error;
+                       }
+
                        ret = consumer_snapshot_channel(socket, ua_chan->key, output, 0,
-                                       ua_sess->euid, ua_sess->egid, pathname, wait);
+                                       ua_sess->euid, ua_sess->egid, pathname, wait,
+                                       max_stream_size);
                        if (ret < 0) {
                                goto error;
                        }
@@ -4925,7 +4948,8 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                registry = get_session_registry(ua_sess);
                assert(registry);
                ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                               1, ua_sess->euid, ua_sess->egid, pathname, wait);
+                               1, ua_sess->euid, ua_sess->egid, pathname, wait,
+                               max_stream_size);
                if (ret < 0) {
                        goto error;
                }
@@ -4936,3 +4960,59 @@ error:
        rcu_read_unlock();
        return ret;
 }
+
+/*
+ * Return the number of streams for a UST session.
+ */
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
+{
+       unsigned int ret = 0;
+       struct ust_app *app;
+       struct lttng_ht_iter iter;
+
+       assert(usess);
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct buffer_reg_uid *reg;
+
+               cds_list_for_each_entry(reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct buffer_reg_channel *reg_chan;
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
+                                       reg_chan, node.node) {
+                               ret += reg_chan->stream_count;
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               rcu_read_lock();
+               cds_lfht_for_each_entry(ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+                       struct ust_app_channel *ua_chan;
+                       struct ust_app_session *ua_sess;
+                       struct lttng_ht_iter chan_iter;
+
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (!ua_sess) {
+                               /* Session not associated with this app. */
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
+                                       ua_chan, node.node) {
+                               ret += ua_chan->streams.count;
+                       }
+               }
+               rcu_read_unlock();
+               break;
+       }
+       default:
+               assert(0);
+               break;
+       }
+
+       return ret;
+}
index 9116a2192fdbaa2c4ac920c6bce3e7b95e1c2089..11202ecaea5845a4ae8f354c7218a0a8d4a9b68c 100644 (file)
@@ -309,7 +309,8 @@ ssize_t ust_app_push_metadata(struct ust_registry_session *registry,
                struct consumer_socket *socket, int send_zero_data);
 void ust_app_destroy(struct ust_app *app);
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait);
+               struct snapshot_output *output, int wait, unsigned int nb_streams);
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess);
 
 #else /* HAVE_LIBLTTNG_UST_CTL */
 
@@ -509,7 +510,12 @@ void ust_app_destroy(struct ust_app *app)
 }
 static inline
 int ust_app_snapshot_record(struct ltt_ust_session *usess,
-               struct snapshot_output *output, int wait)
+               struct snapshot_output *output, int wait, unsigned int nb_stream)
+{
+       return 0;
+}
+static inline
+unsigned int ust_app_get_nb_stream(struct ltt_ust_session *usess)
 {
        return 0;
 }
index bcede1ef5d8f3857515baeaf709ad9b78294a684..9b544dd112cbc58b4e339a3126b19e83116fd577 100644 (file)
@@ -3457,3 +3457,23 @@ int consumer_send_status_channel(int sock,
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size)
+{
+       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+               /* Offset from the produced position to get the latest buffers. */
+               return produced_pos - max_stream_size;
+       }
+
+       return consumed_pos;
+}
index eeaf1100243e1f232a1acb47e0aacdeb4487293c..dd26e51834d5738a653380b0ab8e872902d565ed 100644 (file)
@@ -573,5 +573,7 @@ int consumer_send_status_channel(int sock,
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
                uint64_t key);
 void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size);
 
 #endif /* LIB_CONSUMER_H */
index 416365f08a77432bfc4e13233c09b6a957ea18cd..5851012529902f176d3d2c39d6fa0ac8de765551 100644 (file)
@@ -110,6 +110,7 @@ static const char *error_string_array[] = {
        [ ERROR_INDEX(LTTNG_ERR_NOMEM)] = "Not enough memory",
        [ ERROR_INDEX(LTTNG_ERR_SNAPSHOT_OUTPUT_EXIST) ] = "Snapshot output already exists",
        [ ERROR_INDEX(LTTNG_ERR_START_SESSION_ONCE) ] = "Session needs to be started once",
+       [ ERROR_INDEX(LTTNG_ERR_SNAPSHOT_FAIL) ] = "Snapshot record failed",
 
        /* Last element */
        [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code"
index 42af65ac3d8c715a8a0851cd44dfdb54f9278072..21ac08f1b4d278c3a4fd35226508e274903673ce 100644 (file)
@@ -110,7 +110,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, uint64_t max_stream_size,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned long consumed_pos, produced_pos;
@@ -203,6 +204,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        }
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -739,7 +749,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
-                                       msg.u.snapshot_channel.relayd_id, ctx);
+                                       msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
+                                       ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
                                ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
index 2ee761749558ebe8e169e38d2c2859bedb9f5506..ea8ad1ce0f72e04704309d415ef162f966457b72 100644 (file)
@@ -396,7 +396,7 @@ struct lttcomm_consumer_msg {
                        uint32_t metadata;              /* This a metadata snapshot. */
                        uint64_t relayd_id;             /* Relayd id if apply. */
                        uint64_t key;
-                       uint64_t max_size;
+                       uint64_t max_stream_size;
                } LTTNG_PACKED snapshot_channel;
        } u;
 } LTTNG_PACKED;
index 1bb07c0c73606139e38a7c8e8c0e932ab7709459..b2434eb5a78948da66555e4ce48951c1b776fede 100644 (file)
@@ -859,7 +859,7 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               struct lttng_consumer_local_data *ctx)
+               uint64_t max_stream_size, struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -930,6 +930,15 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
                        goto error_unlock;
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
@@ -1425,6 +1434,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
                                        ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
This page took 0.036338 seconds and 4 git commands to generate.