Fix: consumerd: consumed size miscomputed during statistics sampling master
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 26 Jun 2024 20:09:57 +0000 (20:09 +0000)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 27 Jun 2024 14:57:26 +0000 (10:57 -0400)
Issue observed
==============

When specifying a size-based automatic rotation, sessions sometimes continuously
rotate after the first size-based rotation is triggered.

Steps to reproduce:

  $ ./lttng create the_test
  $ ./lttng enable-channel -u the_channel
  $ ./lttng enable-rotation --size=32M
  $ ./lttng enable-event -ua --channel the_channel
  $ ./lttng start
  # Produce more than 32MiB of tracing data and stop the application(s), then
  # observe a rotation occuring on every expiration of the channel's monitor timer.

Cause
=====

The consumer daemon samples statistics of the various streams it monitors on
every expiration of their channel's "monitor" timer. One of the statistic it
maintains is the channel's total consumed size.

Once the stats are collected, the consumer sends a channel statistics sample
message to the session daemon. One of the fields of this message is the amount
of data that was consumed for the channel since the last statistics sample was
sent. The session daemon relies on that information to aggregate the consumed
size a session based on the consumed sizes of its various channels.

The consumer maintains a per-channel 'last_consumed_size_sample_sent' which is
used to compute the amount of data consumed since the last statistics sample was
sent to the session daemon. The computation is erroneous since the count sent to
the session daemon is 'the channel's total consumed data amount' minus 'the data
consumed since the last statistics sample'.

Solution
========

The counter maintained on a per-channel basis is now
consumed_size_as_of_last_sample_sent: the total amount of data consumed for that
channel as of the last statistics sample. On every expiration of the monitor
timer, the total amount of data consumed for the channel is determined and the
difference from that counter is sent to the sessiond daemon. The counter is then
updated with the latest computed value.

Known drawbacks
===============

None.

Change-Id: I327fde946826ab1da0b6511a13132f3afba1ac16
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/common/consumer/consumer-timer.cpp
src/common/consumer/consumer.hpp
tests/regression/tools/live/test_per_application_leaks.py

index 2ae7f0f73d1b2fdaffe59ed6b21b3814b78e8c22..70094ef4f98e30c5da46e9c8b087e3bd5dfdd7da 100644 (file)
@@ -687,7 +687,8 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
 
        msg.highest = highest;
        msg.lowest = lowest;
-       msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
+       msg.consumed_since_last_sample =
+               total_consumed - channel->consumed_size_as_of_last_sample_sent;
 
        /*
         * Writes performed here are assumed to be atomic which is only
@@ -712,7 +713,7 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel
                    channel->key,
                    msg.highest,
                    msg.lowest);
-               channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
+               channel->consumed_size_as_of_last_sample_sent = total_consumed;
        }
 }
 
index 851b3e2b8137eaef4d5904b19deb81a3f9921bdf..cad2e2c5611139d814168807e73a0a3abad1fcc4 100644 (file)
@@ -258,7 +258,7 @@ struct lttng_consumer_channel {
        uint64_t lost_packets = 0;
 
        bool streams_sent_to_relayd = false;
-       uint64_t last_consumed_size_sample_sent = false;
+       uint64_t consumed_size_as_of_last_sample_sent = 0;
 };
 
 struct stream_subbuffer {
index 1d12049ed544ccaaca332d7e76d76ce222c8bd59..60267810c417ca0334abf2c0a01658ef80d3e9d5 100755 (executable)
@@ -14,6 +14,7 @@ import os
 import pathlib
 import subprocess
 import sys
+import time
 
 test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
 sys.path.append(str(test_utils_import_path))
@@ -22,7 +23,7 @@ import lttngtest
 
 
 def get_consumerd_pid(tap, parent, match_string):
-    pid = 0
+    pid = None
     try:
         process = subprocess.Popen(
             ["pgrep", "-P", str(parent), "-f", match_string],
@@ -30,13 +31,14 @@ def get_consumerd_pid(tap, parent, match_string):
         )
         process.wait()
         output = str(process.stdout.read(), encoding="UTF-8").splitlines()
-        if len(output) != 1:
+        if len(output) > 1:
             raise Exception(
                 "Unexpected number of output lines (got {}): {}".format(
                     len(output), output
                 )
             )
-        pid = int(output[0])
+        elif len(output) == 1:
+            pid = int(output[0])
     except Exception as e:
         tap.diagnostic(
             "Failed to find child process of '{}' matching '{}': '{}'".format(
@@ -48,19 +50,23 @@ def get_consumerd_pid(tap, parent, match_string):
 
 def count_process_dev_shm_fds(pid):
     count = 0
-    if pid == 0:
+    if pid is None:
         return count
     dir = os.path.join("/proc", str(pid), "fd")
     for root, dirs, files in os.walk(dir):
         for f in files:
             filename = pathlib.Path(os.path.join(root, f))
             try:
+                # The symlink in /proc/PID may exist, but point to an unlinked
+                # file - shm_unlink is called but either the kernel hasn't yet
+                # finished the clean-up or the consumer hasn't called close()
+                # on the FD yet.
                 if filename.is_symlink() and str(filename.resolve()).startswith(
                     "/dev/shm/shm-ust-consumer"
                 ):
                     count += 1
             except FileNotFoundError:
-                # As we're walking /proc/XX/fd/, fds may be added or removed
+                # As /proc/XX/fd/ is being walked, fds may be added or removed
                 continue
     return count
 
@@ -112,7 +118,18 @@ def test_fd_leak(tap, test_env, buffer_sharing_policy, kill_relayd=True):
     session.stop()
     session.destroy()
 
-    count_post_destroy = count_dev_shm_fds(tap, test_env)
+    # As there is not method to know exactly when the final close of the
+    # shm happens (it is timing dependant from an external point of view),
+    # this test iterates waiting for the post-destroy count to reach the
+    # post-start count. In a failure, this will loop infinitely.
+    tap.diagnostic(
+        "Waiting for post-destroy shm count to drop back to post-start level"
+    )
+    while True:
+        count_post_destroy = count_dev_shm_fds(tap, test_env)
+        if count_post_destroy == count_post_start:
+            break
+        time.sleep(0.1)
 
     tap.diagnostic(
         "FD counts post-start: {}, post-destroy: {}".format(
This page took 0.02864 seconds and 4 git commands to generate.