Fix: do not generate packet at destroy after stop
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 01:23:24 +0000 (21:23 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 18 May 2016 19:37:11 +0000 (15:37 -0400)
In the following scenario:
- create, enable events (kernel),
- start
- ...
- stop (await for data_pending to complete)
- destroy
- rm the trace directory

We would expect that the "rm" operation would not conflict with the
consumer daemon trying to output data into the trace files, since the
"stop" operation ensured that there was no data_pending.

However, the "destroy" operation currently generates an extra packet
after the data_pending check. This causes the consumer daemon to try to
perform trace file rotation concurrently with the trace directory
removal in the scenario above, which triggers errors. The main reason
why this empty packet is generated by "destroy" is to deal with trace
start/stop scenario which would otherwise generate a completely empty
stream.

Therefore, introduce the concept of a "quiescent stream". It is
initialized at false on stream creation (first packet is empty). When
tracing is started, it is set to false (for cases of start/stop/start).
When tracing is stopped, if the stream is not quiescent, perform a
"final" flush (which will generate an empty packet if the current packet
was empty), and set quiescent to true.  On "destroy" stream: if the
stream is not quiescent, perform a "final" flush, and set the quiescent
state to true.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
lib/ringbuffer/frontend.h
lib/ringbuffer/frontend_types.h
lib/ringbuffer/ring_buffer_frontend.c
lttng-events.c

index 4473e86f37592fdce2c286373ee0859342cbafaa..cf75dfe825e6a5fe7cc06772a408501169c4d049 100644 (file)
@@ -113,6 +113,9 @@ extern int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
                                      unsigned long consumed);
 extern void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf);
 
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan);
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan);
+
 /*
  * lib_ring_buffer_get_next_subbuf/lib_ring_buffer_put_next_subbuf are helpers
  * to read sub-buffers sequentially.
index 9fc3a30a90319ffa89a20c3b243160fb830372a8..453ec38c75d8387979e830ed37e8aa5d2151f29d 100644 (file)
@@ -152,7 +152,8 @@ struct lib_ring_buffer {
        unsigned long cons_snapshot;    /* Consumer count snapshot */
        unsigned int get_subbuf:1,      /* Sub-buffer being held by reader */
                switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */
-               read_timer_enabled:1;   /* Protected by ring_buffer_nohz_lock */
+               read_timer_enabled:1,   /* Protected by ring_buffer_nohz_lock */
+               quiescent:1;
 };
 
 static inline
index dbe52c157c71be0ec560bbc6b97dc77d1b1e27ea..8b947be88e2e3df479a948a6730854bdc848fd00 100644 (file)
@@ -92,6 +92,9 @@ EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting);
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
                                  struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode);
 
 /*
  * Must be called under cpu hotplug protection.
@@ -586,6 +589,63 @@ static void channel_unregister_notifiers(struct channel *chan)
        channel_backend_unregister_notifiers(&chan->backend);
 }
 
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+       if (!buf->quiescent) {
+               buf->quiescent = true;
+               _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+       }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+       buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_set_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_set_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+       int cpu;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+                                                             cpu);
+
+                       lib_ring_buffer_clear_quiescent(buf);
+               }
+               put_online_cpus();
+       } else {
+               struct lib_ring_buffer *buf = chan->backend.buf;
+
+               lib_ring_buffer_clear_quiescent(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
 static void channel_free(struct channel *chan)
 {
        if (chan->backend.release_priv_ops) {
@@ -746,7 +806,7 @@ void *channel_destroy(struct channel *chan)
                                                           chan->backend.priv,
                                                           cpu);
                        if (buf->backend.allocated)
-                               lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                               lib_ring_buffer_set_quiescent(buf);
                        /*
                         * Perform flush before writing to finalized.
                         */
@@ -760,7 +820,7 @@ void *channel_destroy(struct channel *chan)
                if (config->cb.buffer_finalize)
                        config->cb.buffer_finalize(buf, chan->backend.priv, -1);
                if (buf->backend.allocated)
-                       lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+                       lib_ring_buffer_set_quiescent(buf);
                /*
                 * Perform flush before writing to finalized.
                 */
@@ -1550,7 +1610,8 @@ static void remote_switch(void *info)
        lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
 }
 
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+               enum switch_mode mode)
 {
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
@@ -1560,7 +1621,7 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
         * With global synchronization we don't need to use the IPI scheme.
         */
        if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
                return;
        }
 
@@ -1579,10 +1640,15 @@ void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
                                 remote_switch, buf, 1);
        if (ret) {
                /* Remote CPU is offline, do it ourself. */
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               lib_ring_buffer_switch_slow(buf, mode);
        }
        put_online_cpus();
 }
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+       _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
 
 /*
index f9a19190ffc03be47ab8f8196a49f180c1b8dbc0..1aff9ec75c579dbf6a1a16e0929d2c01c8c404bc 100644 (file)
@@ -52,6 +52,8 @@
 #include "lttng-abi-old.h"
 #include "lttng-endian.h"
 #include "wrapper/vzalloc.h"
+#include "wrapper/ringbuffer/backend.h"
+#include "wrapper/ringbuffer/frontend.h"
 
 #define METADATA_CACHE_DEFAULT_SIZE 4096
 
@@ -237,6 +239,10 @@ int lttng_session_enable(struct lttng_session *session)
        /* We need to sync enablers with session before activation. */
        lttng_session_sync_enablers(session);
 
+       /* Clear each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_clear_quiescent_channel(chan->chan);
+
        ACCESS_ONCE(session->active) = 1;
        ACCESS_ONCE(session->been_active) = 1;
        ret = _lttng_session_metadata_statedump(session);
@@ -255,6 +261,7 @@ end:
 int lttng_session_disable(struct lttng_session *session)
 {
        int ret = 0;
+       struct lttng_channel *chan;
 
        mutex_lock(&sessions_mutex);
        if (!session->active) {
@@ -266,6 +273,10 @@ int lttng_session_disable(struct lttng_session *session)
        /* Set transient enabler state to "disabled" */
        session->tstate = 0;
        lttng_session_sync_enablers(session);
+
+       /* Set each stream's quiescent state. */
+       list_for_each_entry(chan, &session->chan, list)
+               lib_ring_buffer_set_quiescent_channel(chan->chan);
 end:
        mutex_unlock(&sessions_mutex);
        return ret;
This page took 0.030611 seconds and 4 git commands to generate.