Fix: sample discarded events count before reserve
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Mar 2022 21:42:15 +0000 (16:42 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 11 Mar 2022 16:39:54 +0000 (11:39 -0500)
Sampling the discarded events count in the buffer_end callback is done
out of order, and may therefore include increments performed by following
events (in following packets) if the thread doing the end-of-packet
event write is preempted for a long time.

Sampling the event discarded counts before reserving space for the last
event in a packet, and keeping this as part of the private ring buffer
context, should fix this race.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: Ib59b634bbaefd2444751547d20a891c9dd93cd73

src/common/ringbuffer-clients/metadata-template.h
src/common/ringbuffer-clients/template.h
src/common/ringbuffer/frontend.h
src/common/ringbuffer/frontend_api.h
src/common/ringbuffer/frontend_internal.h
src/common/ringbuffer/frontend_types.h
src/common/ringbuffer/ring_buffer_frontend.c
src/common/ringbuffer/ringbuffer-config.h

index 1f0e9fe43eeed742947e86eb3b90466d5c594595..56d955161af74ced9d8158ab04fb9b69cfc16844 100644 (file)
@@ -127,7 +127,8 @@ static void client_buffer_begin(struct lttng_ust_ring_buffer *buf,
 static void client_buffer_end(struct lttng_ust_ring_buffer *buf,
                uint64_t tsc  __attribute__((unused)),
                unsigned int subbuf_idx, unsigned long data_size,
-               struct lttng_ust_shm_handle *handle)
+               struct lttng_ust_shm_handle *handle,
+               const struct lttng_ust_ring_buffer_ctx *ctx)
 {
        struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
        struct metadata_packet_header *header =
@@ -146,9 +147,9 @@ static void client_buffer_end(struct lttng_ust_ring_buffer *buf,
         * We do not care about the records lost count, because the metadata
         * channel waits and retry.
         */
-       (void) lib_ring_buffer_get_records_lost_full(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+       (void) lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
        WARN_ON_ONCE(records_lost != 0);
 }
 
index b0a5fdd7a0910d00d7f875397d6db3d1f9adb540..824c344d5e7f4980f308774f77daee5f7cac7308 100644 (file)
@@ -399,7 +399,8 @@ static void client_buffer_begin(struct lttng_ust_ring_buffer *buf, uint64_t tsc,
  */
 static void client_buffer_end(struct lttng_ust_ring_buffer *buf, uint64_t tsc,
                              unsigned int subbuf_idx, unsigned long data_size,
-                             struct lttng_ust_shm_handle *handle)
+                             struct lttng_ust_shm_handle *handle,
+                             const struct lttng_ust_ring_buffer_ctx *ctx)
 {
        struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
        struct packet_header *header =
@@ -418,9 +419,9 @@ static void client_buffer_end(struct lttng_ust_ring_buffer *buf, uint64_t tsc,
        header->ctx.packet_size =
                (uint64_t) LTTNG_UST_PAGE_ALIGN(data_size) * CHAR_BIT;  /* in bits */
 
-       records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
-       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+       records_lost += lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+       records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
        header->ctx.events_discarded = records_lost;
 }
 
index ed0cc7d434493acf20323f53a69f3b544e1b4679..9850d71058b7b64681e1b5c0713dd1c187856c88 100644 (file)
@@ -265,26 +265,26 @@ unsigned long lib_ring_buffer_get_records_overrun(
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_full(
-                               const struct lttng_ust_ring_buffer_config *config,
-                               struct lttng_ust_ring_buffer *buf)
+                               const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_ust_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_full);
+       return ctx->priv->records_lost_full;
 }
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_wrap(
-                               const struct lttng_ust_ring_buffer_config *config,
-                               struct lttng_ust_ring_buffer *buf)
+                               const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_ust_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_wrap);
+       return ctx->priv->records_lost_wrap;
 }
 
 static inline
 unsigned long lib_ring_buffer_get_records_lost_big(
-                               const struct lttng_ust_ring_buffer_config *config,
-                               struct lttng_ust_ring_buffer *buf)
+                               const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+                               const struct lttng_ust_ring_buffer_ctx *ctx)
 {
-       return v_read(config, &buf->records_lost_big);
+       return ctx->priv->records_lost_big;
 }
 
 static inline
index b306986ecf3d74e05b41f66d58e9a239853ee41e..a697ed2e0082c5a8adc0fe352cac543cdeb265c9 100644 (file)
@@ -288,7 +288,7 @@ void lib_ring_buffer_commit(const struct lttng_ust_ring_buffer_config *config,
        commit_count = v_read(config, &cc_hot->cc);
 
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
-                                     commit_count, endidx, handle, ctx_private->tsc);
+                                     commit_count, endidx, handle, ctx);
        /*
         * Update used size at each commit. It's needed only for extracting
         * ring_buffer buffers from vmcore, after crash.
index 9655a19f3c957274b5e7297dade667028a779190..1dc816a3c221670a692f82bbea7bf555d062166e 100644 (file)
@@ -165,7 +165,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
                                   unsigned long commit_count,
                                   unsigned long idx,
                                   struct lttng_ust_shm_handle *handle,
-                                  uint64_t tsc)
+                                  const struct lttng_ust_ring_buffer_ctx *ctx)
        __attribute__((visibility("hidden")));
 
 /* Buffer write helpers */
@@ -301,7 +301,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_ring_buffer_config *co
                                   unsigned long commit_count,
                                   unsigned long idx,
                                   struct lttng_ust_shm_handle *handle,
-                                  uint64_t tsc)
+                                  const struct lttng_ust_ring_buffer_ctx *ctx)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -310,7 +310,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_ring_buffer_config *co
        if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
                     - (old_commit_count & chan->commit_count_mask) == 0))
                lib_ring_buffer_check_deliver_slow(config, buf, chan, offset,
-                       commit_count, idx, handle, tsc);
+                       commit_count, idx, handle, ctx);
 }
 
 /*
index d1f70c3ab59e5e2665ee5fdae54fc7c17fda121e..1b0e1a080e6bb15d79459d516c6b1eb56a81bb7e 100644 (file)
@@ -253,12 +253,19 @@ struct lttng_ust_ring_buffer_ctx_private {
                                                 */
        uint64_t tsc;                           /* time-stamp counter value */
        unsigned int rflags;                    /* reservation flags */
-
        struct lttng_ust_ring_buffer *buf;      /*
                                                 * buffer corresponding to processor id
                                                 * for this channel
                                                 */
        struct lttng_ust_ring_buffer_backend_pages *backend_pages;
+
+       /*
+        * Records lost counts are only loaded into these fields before
+        * reserving the last bytes from the ring buffer.
+        */
+       unsigned long records_lost_full;
+       unsigned long records_lost_wrap;
+       unsigned long records_lost_big;
 };
 
 static inline
index 4d06bf63140c6218c8abf7b278cd3f7b48026194..91c69b7697d9e7f1bd1b06a518a66049a8ce210c 100644 (file)
@@ -1763,7 +1763,7 @@ static
 void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
                                      struct lttng_ust_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     uint64_t tsc,
+                                     const struct lttng_ust_ring_buffer_ctx *ctx,
                                      struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1771,7 +1771,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, oldidx, handle);
+       config->cb.buffer_begin(buf, ctx->priv->tsc, oldidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1786,7 +1786,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
-                                     commit_count, oldidx, handle, tsc);
+                                     commit_count, oldidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + config->cb.subbuffer_header_size(),
                        commit_count, handle, cc_hot);
@@ -1804,7 +1804,7 @@ static
 void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t tsc,
+                                   const struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1829,7 +1829,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv->tsc;
 
        /*
         * Order all writes to buffer and store to ts_end before the commit
@@ -1842,7 +1842,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
        v_add(config, padding_size, &cc_hot->cc);
        commit_count = v_read(config, &cc_hot->cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
-                                     commit_count, oldidx, handle, tsc);
+                                     commit_count, oldidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + padding_size, commit_count, handle,
                        cc_hot);
@@ -1859,7 +1859,7 @@ static
 void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
                                      struct lttng_ust_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     uint64_t tsc,
+                                     const struct lttng_ust_ring_buffer_ctx *ctx,
                                      struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1867,7 +1867,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, beginidx, handle);
+       config->cb.buffer_begin(buf, ctx->priv->tsc, beginidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1881,7 +1881,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
-                                     commit_count, beginidx, handle, tsc);
+                                     commit_count, beginidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->begin + config->cb.subbuffer_header_size(),
                        commit_count, handle, cc_hot);
@@ -1899,7 +1899,7 @@ static
 void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t tsc,
+                                   const struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1921,7 +1921,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv->tsc;
 }
 
 /*
@@ -1934,7 +1934,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                                    struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t *tsc,
+                                   struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1945,7 +1945,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->switch_old_start = 0;
        off = subbuf_offset(offsets->begin, chan);
 
-       *tsc = config->cb.ring_buffer_clock_read(chan);
+       ctx->priv->tsc = config->cb.ring_buffer_clock_read(chan);
 
        /*
         * Ensure we flush the header of an empty subbuffer when doing the
@@ -2032,6 +2032,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->begin = subbuf_align(offsets->begin, chan);
        /* Note: old points to the next subbuf at offset 0 */
        offsets->end = offsets->begin;
+       /*
+        * Populate the records lost counters prior to performing a
+        * sub-buffer switch.
+        */
+       ctx->priv->records_lost_full = v_read(config, &buf->records_lost_full);
+       ctx->priv->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+       ctx->priv->records_lost_big = v_read(config, &buf->records_lost_big);
        return 0;
 }
 
@@ -2050,10 +2057,12 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
 {
        struct lttng_ust_ring_buffer_channel *chan;
        const struct lttng_ust_ring_buffer_config *config;
+       struct lttng_ust_ring_buffer_ctx_private ctx_priv;
+       struct lttng_ust_ring_buffer_ctx ctx;
        struct switch_offsets offsets;
        unsigned long oldidx;
-       uint64_t tsc;
 
+       ctx.priv = &ctx_priv;
        chan = shmp(handle, buf->backend.chan);
        if (!chan)
                return;
@@ -2066,7 +2075,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
         */
        do {
                if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
-                                                   &tsc, handle))
+                                                   &ctx, handle))
                        return; /* Switch not needed */
        } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
                 != offsets.old);
@@ -2077,7 +2086,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
         * records, never the opposite (missing a full TSC record when it would
         * be needed).
         */
-       save_last_tsc(config, buf, tsc);
+       save_last_tsc(config, buf, ctx.priv->tsc);
 
        /*
         * Push the reader if necessary
@@ -2091,14 +2100,14 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
         * May need to populate header start on SWITCH_FLUSH.
         */
        if (offsets.switch_old_start) {
-               lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
+               lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx, handle);
                offsets.old += config->cb.subbuffer_header_size();
        }
 
        /*
         * Switch old subbuffer.
         */
-       lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
+       lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx, handle);
 }
 
 static
@@ -2308,6 +2317,15 @@ retry:
                 */
                offsets->switch_new_end = 1;    /* For offsets->begin */
        }
+       /*
+        * Populate the records lost counters when the space reservation
+        * may cause a sub-buffer switch.
+        */
+       if (offsets->switch_new_end || offsets->switch_old_end) {
+               ctx_private->records_lost_full = v_read(config, &buf->records_lost_full);
+               ctx_private->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+               ctx_private->records_lost_big = v_read(config, &buf->records_lost_big);
+       }
        return 0;
 }
 
@@ -2376,17 +2394,17 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_ring_buffer_ctx *ctx,
                lib_ring_buffer_clear_noref(config, &buf->backend,
                                            subbuf_index(offsets.old - 1, chan),
                                            handle);
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx, handle);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (caa_unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx, handle);
 
        if (caa_unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx, handle);
 
        ctx_private->slot_size = offsets.size;
        ctx_private->pre_offset = offsets.begin;
@@ -2447,7 +2465,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
                                   unsigned long commit_count,
                                   unsigned long idx,
                                   struct lttng_ust_shm_handle *handle,
-                                  uint64_t tsc __attribute__((unused)))
+                                  const struct lttng_ust_ring_buffer_ctx *ctx)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -2515,7 +2533,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
                                                                buf,
                                                                idx,
                                                                handle),
-                                     handle);
+                                     handle, ctx);
 
                /*
                 * Increment the packet counter while we have exclusive
index 94f283ff930dd471b1eb5e92ae1dc5afcbb2e4b4..61386174c4f7e27ad019650ca16bf432e7b14bf6 100644 (file)
@@ -51,7 +51,8 @@ struct lttng_ust_ring_buffer_client_cb {
                              struct lttng_ust_shm_handle *handle);
        void (*buffer_end) (struct lttng_ust_ring_buffer *buf, uint64_t tsc,
                            unsigned int subbuf_idx, unsigned long data_size,
-                           struct lttng_ust_shm_handle *handle);
+                           struct lttng_ust_shm_handle *handle,
+                           const struct lttng_ust_ring_buffer_ctx *ctx);
 
        /* Optional callbacks (can be set to NULL) */
 
This page took 0.035122 seconds and 4 git commands to generate.