From: Jérémie Galarneau Date: Wed, 26 Jun 2024 20:09:57 +0000 (+0000) Subject: Fix: consumerd: consumed size miscomputed during statistics sampling X-Git-Url: http://git.lttng.org./?a=commitdiff_plain;h=refs%2Fheads%2Fmaster;hp=dcffe9462d11f9de5b441b801da5b2b7ae42c79a;p=lttng-tools.git Fix: consumerd: consumed size miscomputed during statistics sampling Issue observed ============== When specifying a size-based automatic rotation, sessions sometimes continuously rotate after the first size-based rotation is triggered. Steps to reproduce: $ ./lttng create the_test $ ./lttng enable-channel -u the_channel $ ./lttng enable-rotation --size=32M $ ./lttng enable-event -ua --channel the_channel $ ./lttng start # Produce more than 32MiB of tracing data and stop the application(s), then # observe a rotation occurring on every expiration of the channel's monitor timer. Cause ===== The consumer daemon samples statistics of the various streams it monitors on every expiration of their channel's "monitor" timer. One of the statistics it maintains is the channel's total consumed size. Once the stats are collected, the consumer sends a channel statistics sample message to the session daemon. One of the fields of this message is the amount of data that was consumed for the channel since the last statistics sample was sent. The session daemon relies on that information to aggregate the consumed size of a session based on the consumed sizes of its various channels. The consumer maintains a per-channel 'last_consumed_size_sample_sent' which is used to compute the amount of data consumed since the last statistics sample was sent to the session daemon. The computation is erroneous since the count sent to the session daemon is 'the channel's total consumed data amount' minus 'the data consumed since the last statistics sample'. 
Solution ======== The counter maintained on a per-channel basis is now consumed_size_as_of_last_sample_sent: the total amount of data consumed for that channel as of the last statistics sample. On every expiration of the monitor timer, the total amount of data consumed for the channel is determined and the difference from that counter is sent to the sessiond daemon. The counter is then updated with the latest computed value. Known drawbacks =============== None. Change-Id: I327fde946826ab1da0b6511a13132f3afba1ac16 Signed-off-by: Jérémie Galarneau --- diff --git a/src/common/consumer/consumer-timer.cpp b/src/common/consumer/consumer-timer.cpp index 2ae7f0f73..70094ef4f 100644 --- a/src/common/consumer/consumer-timer.cpp +++ b/src/common/consumer/consumer-timer.cpp @@ -687,7 +687,8 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel msg.highest = highest; msg.lowest = lowest; - msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent; + msg.consumed_since_last_sample = + total_consumed - channel->consumed_size_as_of_last_sample_sent; /* * Writes performed here are assumed to be atomic which is only @@ -712,7 +713,7 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel channel->key, msg.highest, msg.lowest); - channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample; + channel->consumed_size_as_of_last_sample_sent = total_consumed; } } diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index 851b3e2b8..cad2e2c56 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -258,7 +258,7 @@ struct lttng_consumer_channel { uint64_t lost_packets = 0; bool streams_sent_to_relayd = false; - uint64_t last_consumed_size_sample_sent = false; + uint64_t consumed_size_as_of_last_sample_sent = 0; }; struct stream_subbuffer { diff --git a/tests/regression/tools/live/test_per_application_leaks.py 
b/tests/regression/tools/live/test_per_application_leaks.py index 1d12049ed..60267810c 100755 --- a/tests/regression/tools/live/test_per_application_leaks.py +++ b/tests/regression/tools/live/test_per_application_leaks.py @@ -14,6 +14,7 @@ import os import pathlib import subprocess import sys +import time test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils" sys.path.append(str(test_utils_import_path)) @@ -22,7 +23,7 @@ import lttngtest def get_consumerd_pid(tap, parent, match_string): - pid = 0 + pid = None try: process = subprocess.Popen( ["pgrep", "-P", str(parent), "-f", match_string], @@ -30,13 +31,14 @@ def get_consumerd_pid(tap, parent, match_string): ) process.wait() output = str(process.stdout.read(), encoding="UTF-8").splitlines() - if len(output) != 1: + if len(output) > 1: raise Exception( "Unexpected number of output lines (got {}): {}".format( len(output), output ) ) - pid = int(output[0]) + elif len(output) == 1: + pid = int(output[0]) except Exception as e: tap.diagnostic( "Failed to find child process of '{}' matching '{}': '{}'".format( @@ -48,19 +50,23 @@ def get_consumerd_pid(tap, parent, match_string): def count_process_dev_shm_fds(pid): count = 0 - if pid == 0: + if pid is None: return count dir = os.path.join("/proc", str(pid), "fd") for root, dirs, files in os.walk(dir): for f in files: filename = pathlib.Path(os.path.join(root, f)) try: + # The symlink in /proc/PID may exist, but point to an unlinked + # file - shm_unlink is called but either the kernel hasn't yet + # finished the clean-up or the consumer hasn't called close() + # on the FD yet. 
if filename.is_symlink() and str(filename.resolve()).startswith( "/dev/shm/shm-ust-consumer" ): count += 1 except FileNotFoundError: - # As we're walking /proc/XX/fd/, fds may be added or removed + # As /proc/XX/fd/ is being walked, fds may be added or removed continue return count @@ -112,7 +118,18 @@ def test_fd_leak(tap, test_env, buffer_sharing_policy, kill_relayd=True): session.stop() session.destroy() - count_post_destroy = count_dev_shm_fds(tap, test_env) + # As there is no method to know exactly when the final close of the + # shm happens (it is timing dependent from an external point of view), + # this test iterates waiting for the post-destroy count to reach the + # post-start count. In case of failure, this will loop infinitely. + tap.diagnostic( + # "Waiting for post-destroy shm count to drop back to post-start level" + # ) + while True: + count_post_destroy = count_dev_shm_fds(tap, test_env) + if count_post_destroy == count_post_start: + break + time.sleep(0.1) tap.diagnostic( "FD counts post-start: {}, post-destroy: {}".format(