ring buffer context: cpu number becomes an output of reserve
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 26 Mar 2021 17:52:25 +0000 (13:52 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 26 Mar 2021 20:06:36 +0000 (16:06 -0400)
In order to facilitate eventual integration of a ring buffer scheme
based on Restartable Sequences (sys_rseq), change the ownership of the
ring buffer context "cpu" field so it is now populated by the ring
buffer reserve operation. This means a rseq-based reserve could retry
on a new current cpu after a rseq-cmpxchg fails to reserve and then a
migration occurs.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: If0c0689446975085b5e22b14aef6a15f12f8ff9f

include/lttng/ringbuffer-context.h
include/lttng/ust-tracepoint-event.h
liblttng-ust-ctl/ustctl.c
liblttng-ust/lttng-ring-buffer-client.h
libringbuffer/frontend_api.h
libringbuffer/ring_buffer_frontend.c
libringbuffer/ringbuffer-config.h

index e42c3f52e128f3b591e5547e50d9ade32c4287e2..53670d9313b2973694767889c055d50f5be43209 100644 (file)
@@ -51,9 +51,9 @@ struct lttng_ust_lib_ring_buffer_ctx {
                                         * alignment of the largest element
                                         * in the payload
                                         */
-       int cpu;                        /* processor id */
 
        /* output from lib_ring_buffer_reserve() */
+       int reserve_cpu;                /* processor id updated by the reserve */
        struct lttng_ust_lib_ring_buffer *buf;  /*
                                         * buffer corresponding to processor id
                                         * for this channel
@@ -81,25 +81,24 @@ struct lttng_ust_lib_ring_buffer_ctx {
  * @priv: client private data
  * @data_size: size of record data payload
  * @largest_align: largest alignment within data payload types
- * @cpu: processor id
  */
 static inline lttng_ust_notrace
 void lib_ring_buffer_ctx_init(struct lttng_ust_lib_ring_buffer_ctx *ctx,
                              struct lttng_ust_lib_ring_buffer_channel *chan,
                              void *priv, size_t data_size, int largest_align,
-                             int cpu, struct lttng_ust_shm_handle *handle);
+                             struct lttng_ust_shm_handle *handle);
 static inline
 void lib_ring_buffer_ctx_init(struct lttng_ust_lib_ring_buffer_ctx *ctx,
                              struct lttng_ust_lib_ring_buffer_channel *chan,
                              void *priv, size_t data_size, int largest_align,
-                             int cpu, struct lttng_ust_shm_handle *handle)
+                             struct lttng_ust_shm_handle *handle)
 {
        ctx->struct_size = sizeof(struct lttng_ust_lib_ring_buffer_ctx);
        ctx->chan = chan;
        ctx->priv = priv;
        ctx->data_size = data_size;
+       ctx->reserve_cpu = -1;
        ctx->largest_align = largest_align;
-       ctx->cpu = cpu;
        ctx->rflags = 0;
        ctx->handle = handle;
        ctx->ip = 0;
index d12afa0b84ea35f936407ac380eab216fdee3b6d..971a83c887a9825f10c09ee5212898f58e4ac0a1 100644 (file)
@@ -853,7 +853,7 @@ void __event_probe__##_provider##___##_name(_TP_ARGS_DATA_PROTO(_args))           \
                __lttng_ctx.struct_size = sizeof(struct lttng_ust_stack_ctx);     \
                __lttng_ctx.event_recorder = __event_recorder;                \
                lib_ring_buffer_ctx_init(&__ctx, __chan->chan, &__lttng_ctx, __event_len, \
-                                        __event_align, -1, __chan->handle); \
+                                        __event_align, __chan->handle);      \
                __ctx.ip = _TP_IP_PARAM(TP_IP_PARAM);                         \
                __ret = __chan->ops->event_reserve(&__ctx, __event_recorder->id); \
                if (__ret < 0)                                                \
index 89b566bdccaceade94a553f235a6f1d36f7f0b7a..394e14174f6985bd0798e77c0ef1a99ebc2770a0 100644 (file)
@@ -1354,7 +1354,7 @@ int ustctl_write_metadata_to_channel(
                                lttng_chan_buf->ops->priv->packet_avail_size(lttng_chan_buf->chan, lttng_chan_buf->handle),
                                len - pos);
                lib_ring_buffer_ctx_init(&ctx, lttng_chan_buf->chan, NULL, reserve_len,
-                                        sizeof(char), -1, lttng_chan_buf->handle);
+                                        sizeof(char), lttng_chan_buf->handle);
                /*
                 * We don't care about metadata buffer's records lost
                 * count, because we always retry here. Report error if
@@ -1401,7 +1401,7 @@ ssize_t ustctl_write_one_packet_to_channel(
                        lttng_chan_buf->ops->priv->packet_avail_size(lttng_chan_buf->chan, lttng_chan_buf->handle),
                        len);
        lib_ring_buffer_ctx_init(&ctx, lttng_chan_buf->chan, NULL, reserve_len,
-                       sizeof(char), -1, lttng_chan_buf->handle);
+                       sizeof(char), lttng_chan_buf->handle);
        ret = lttng_chan_buf->ops->event_reserve(&ctx, 0);
        if (ret != 0) {
                DBG("LTTng: event reservation failed");
index 8e14acf5fc3caebad3ad2ad54c0ed52001a979c2..4e2b86a2d6c7ccbef220ed50e0fc1d4730dff0fb 100644 (file)
@@ -699,7 +699,7 @@ int lttng_event_reserve(struct lttng_ust_lib_ring_buffer_ctx *ctx,
        struct lttng_ust_stack_ctx *lttng_ctx = ctx->priv;
        struct lttng_ust_event_recorder *event_recorder = lttng_ctx->event_recorder;
        struct lttng_client_ctx client_ctx;
-       int ret, cpu;
+       int ret;
 
        client_ctx.chan_ctx = lttng_ust_rcu_dereference(lttng_chan->priv->ctx);
        client_ctx.event_ctx = lttng_ust_rcu_dereference(event_recorder->priv->ctx);
@@ -709,10 +709,8 @@ int lttng_event_reserve(struct lttng_ust_lib_ring_buffer_ctx *ctx,
        ctx_get_struct_size(client_ctx.event_ctx, &client_ctx.event_context_len,
                        APP_CTX_ENABLED);
 
-       cpu = lib_ring_buffer_get_cpu(&client_config);
-       if (cpu < 0)
+       if (lib_ring_buffer_nesting_inc(&client_config) < 0)
                return -EPERM;
-       ctx->cpu = cpu;
 
        switch (lttng_chan->priv->header_type) {
        case 1: /* compact */
@@ -738,7 +736,7 @@ int lttng_event_reserve(struct lttng_ust_lib_ring_buffer_ctx *ctx,
        lttng_write_event_header(&client_config, ctx, &client_ctx, event_id);
        return 0;
 put:
-       lib_ring_buffer_put_cpu(&client_config);
+       lib_ring_buffer_nesting_dec(&client_config);
        return ret;
 }
 
@@ -746,7 +744,7 @@ static
 void lttng_event_commit(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 {
        lib_ring_buffer_commit(&client_config, ctx);
-       lib_ring_buffer_put_cpu(&client_config);
+       lib_ring_buffer_nesting_dec(&client_config);
 }
 
 static
index fd601ce5ca93589636d6932cc435a3a1c8f1677b..96f79554b3764bb3cb40d140e34b5ebcebcc725e 100644 (file)
 #include "frontend.h"
 
 /**
- * lib_ring_buffer_get_cpu - Precedes ring buffer reserve/commit.
+ * lib_ring_buffer_nesting_inc - Ring buffer recursive use protection.
  *
- * Keeps a ring buffer nesting count as supplementary safety net to
- * ensure tracer client code will never trigger an endless recursion.
- * Returns the processor ID on success, -EPERM on failure (nesting count
- * too high).
+ * The rint buffer buffer nesting count is a safety net to ensure tracer
+ * client code will never trigger an endless recursion.
+ * Returns 0 on success, -EPERM on failure (nesting count too high).
  *
  * asm volatile and "memory" clobber prevent the compiler from moving
  * instructions out of the ring buffer nesting count. This is required to ensure
  * section.
  */
 static inline
-int lib_ring_buffer_get_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
+int lib_ring_buffer_nesting_inc(const struct lttng_ust_lib_ring_buffer_config *config)
 {
-       int cpu, nesting;
+       int nesting;
 
-       cpu = lttng_ust_get_cpu();
        nesting = ++URCU_TLS(lib_ring_buffer_nesting);
        cmm_barrier();
-
        if (caa_unlikely(nesting > 4)) {
                WARN_ON_ONCE(1);
                URCU_TLS(lib_ring_buffer_nesting)--;
                return -EPERM;
-       } else
-               return cpu;
+       }
+       return 0;
 }
 
-/**
- * lib_ring_buffer_put_cpu - Follows ring buffer reserve/commit.
- */
 static inline
-void lib_ring_buffer_put_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
+void lib_ring_buffer_nesting_dec(const struct lttng_ust_lib_ring_buffer_config *config)
 {
        cmm_barrier();
        URCU_TLS(lib_ring_buffer_nesting)--;            /* TLS */
@@ -148,10 +142,12 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        if (caa_unlikely(uatomic_read(&chan->record_disabled)))
                return -EAGAIN;
 
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
-       else
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               ctx->reserve_cpu = lttng_ust_get_cpu();
+               buf = shmp(handle, chan->backend.buf[ctx->reserve_cpu].shmp);
+       } else {
                buf = shmp(handle, chan->backend.buf[0].shmp);
+       }
        if (caa_unlikely(!buf))
                return -EIO;
        if (caa_unlikely(uatomic_read(&buf->record_disabled)))
index 7b276ad935a3b89e4a10b1a41402f8976b80bd70..83da122b80cf8d5e50fa8d99827ddf58b73b45b6 100644 (file)
@@ -2321,7 +2321,7 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
        int ret;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
+               buf = shmp(handle, chan->backend.buf[ctx->reserve_cpu].shmp);
        else
                buf = shmp(handle, chan->backend.buf[0].shmp);
        if (!buf)
index 0d33ace253ead62e70923d2de28a6fdc629817ea..badf7566cf0329756c689a5ea9ce921c9a4fbf2b 100644 (file)
@@ -96,9 +96,7 @@ struct lttng_ust_lib_ring_buffer_client_cb {
  * alloc/sync pairs:
  *
  * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_PER_CPU :
- *   Per-cpu buffers with per-cpu synchronization. Tracing must be performed
- *   with preemption disabled (lib_ring_buffer_get_cpu() and
- *   lib_ring_buffer_put_cpu()).
+ *   Per-cpu buffers with per-cpu synchronization.
  *
  * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_GLOBAL :
  *   Per-cpu buffer with global synchronization. Tracing can be performed with
This page took 0.033316 seconds and 4 git commands to generate.