Fix: do not generate packet at destroy after stop
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 01:23:24 +0000 (21:23 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 17:14:38 +0000 (13:14 -0400)
In the following scenario:
- create, enable events (kernel),
- start
- ...
- stop (await for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check. This causes the consumer daemon to try to
perform trace file rotation concurrently with the trace directory
removal in the scenario above, which triggers errors. The main reason
why this empty packet is generated by "destroy" is to deal with trace
start/stop scenario which would otherwise generate a completely empty
stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized at false on stream creation (first packet is empty). When
tracing is started, it is set to false (for cases of start/stop/start).
When tracing is stopped, if the stream is not quiescent, perform a
"final" flush (which will generate an empty packet if the current packet
was empty), and set quiescent to true.  On "destroy" stream: if the
stream is not quiescent, perform a "final" flush, and set the quiescent
state to true.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
lib/ringbuffer/frontend.h
lib/ringbuffer/frontend_types.h
lib/ringbuffer/ring_buffer_frontend.c
lttng-events.c

index 4955b3dd9accf320b76c836838e6ba0406c85ea4..6ff154528dd6e2b0d2c39e388a64193c0da7b135 100644 (file)
@@ -113,6 +113,9 @@ extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
                                      unsigned long consumed);
 extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
 
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan);
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan);
+
 /*
  * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
  * to read sub-buffers sequentially.
index ef63cb5005ab61604bf5da32c69171a6a120890a..346813e9c3ea30ad2eba8e4dc54608b7b974bd45 100644 (file)
@@ -152,7 +152,8 @@ struct lib_ring_buffer {
        unsigned long cons_snapshot;    /* Consumer count snapshot */
        unsigned int get_subbuf:1,      /* Sub-buffer being held by reader */
                switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */
-               read_timer_enabled:1;   /* Protected by ring_buffer_nohz_lock */
+               read_timer_enabled:1,   /* Protected by ring_buffer_nohz_lock */
+               quiescent:1;
 };
 
 static inline
index c92e9271fe978cc0abb940420d0e3cff268518f7..c48ac620668f97f3ea7f3ff1222c16f62a40add9 100644 (file)
@@ -92,6 +92,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
                                  struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode);
 
 /*
  * Must be called under cpu hotplug protection.
@@ -586,6 +589,63 @@ static void channel_unregister_notifiers(struct channel *chan)
        channel_backend_unregister_notifiers(&chan->backend);
 }
 
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+       if (!buf->quiescent) {
+               buf->quiescent = true;
+               _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+       }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+       buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_set_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_set_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_clear_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_clear_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
 static void channel_free(struct channel *chan)
 {
        if (chan->backend.release_priv_ops) {
@@ -746,7 +806,7 @@ void *channel_destroy(struct channel *chan)
                                                           chan->backend.priv,
                                                           cpu);
                        if (buf->backend.allocated)
-                               lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                               lib_ring_buffer_set_quiescent(buf);
                        /*
                         * Perform flush before writing to finalized.
                         */
@@ -760,7 +820,7 @@ void *channel_destroy(struct channel *chan)
                if (config->cb.buffer_finalize)
                        config->cb.buffer_finalize(buf, chan->backend.priv, -1);
                if (buf->backend.allocated)
-                       lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                       lib_ring_buffer_set_quiescent(buf);
                /*
                 * Perform flush before writing to finalized.
                 */
@@ -1550,7 +1610,8 @@ static void remote_switch(void *info)
        lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
 }
 
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode)
 {
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
@@ -1560,7 +1621,7 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
         * With global synchronization we don't need to use the IPI scheme.
         */
        if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
                return;
        }
 
@@ -1579,10 +1640,15 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
                                 remote_switch, buf, 1);
        if (ret) {
                /* Remote CPU is offline, do it ourself. */
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
        }
        put_online_cpus();
 }
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+       _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
 
 /*
index 5307b722b237991935d75a93f61d04caac36e06b..f2a9a8a58498f1f1a833e8ec1b97e76093391982 100644 (file)
@@ -52,6 +52,8 @@
 #include <lttng-abi-old.h>
 #include <lttng-endian.h>
 #include <wrapper/vzalloc.h>
+#include <wrapper/ringbuffer/backend.h>
+#include <wrapper/ringbuffer/frontend.h>
 
 #define METADATA_CACHE_DEFAULT_SIZE 4096
 
@@ -241,6 +243,10 @@ int lttng_session_enable(struct lttng_session *session)
        /* We need to sync enablers with session before activation. */
        lttng_session_sync_enablers(session);
 
+       /* Clear each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_clear_quiescent_channel(chan->chan);
+
        ACCESS_ONCE(session->active) = 1;
        ACCESS_ONCE(session->been_active) = 1;
        ret = _lttng_session_metadata_statedump(session);
@@ -259,6 +265,7 @@ end:
 int lttng_session_disable(struct lttng_session *session)
 {
        int ret = 0;
+       struct lttng_channel *chan;
 
        mutex_lock(&sessions_mutex);
        if (!session->active) {
@@ -270,6 +277,10 @@ int lttng_session_disable(struct lttng_session *session)
        /* Set transient enabler state to "disabled" */
        session->tstate = 0;
        lttng_session_sync_enablers(session);
+
+       /* Set each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_set_quiescent_channel(chan->chan);
 end:
        mutex_unlock(&sessions_mutex);
        return ret;
This page took 0.033549 seconds and 4 git commands to generate.