diff --git a/doc/kernel_tests.rst b/doc/kernel_tests.rst index 9174c350f0..9efbd09da9 100644 --- a/doc/kernel_tests.rst +++ b/doc/kernel_tests.rst @@ -35,13 +35,6 @@ The following tests are available. They can be used as: * the individual classes/methods they are composed of can be used in custom scripts/jupyter notebooks (see ipynb/tests/synthetics_example.ipynb) -.. run-command:: - :capture-stderr: - - # Disable warnings to avoid dependencies to break the reStructuredText output - export PYTHONWARNINGS="ignore" - exekall run lisa lisa_tests --rst-list --inject-empty-target-conf - Running tests ============= @@ -178,17 +171,6 @@ It can be executed using: exekall run lisa lisa_tests.test_example --conf $LISA_CONF -.. exec:: - # Check that links inside 'test_example.py' are not broken. - from lisa._doc.helpers import check_dead_links - from lisa_tests import test_example - check_dead_links(test_example.__file__) - -.. literalinclude:: ../lisa_tests/test_example.py - :language: python - :pyobject: ExampleTestBundle - :linenos: - API === diff --git a/doc/workflows/automated_testing.rst b/doc/workflows/automated_testing.rst index ac46d52938..4bbd275c7f 100644 --- a/doc/workflows/automated_testing.rst +++ b/doc/workflows/automated_testing.rst @@ -38,6 +38,9 @@ specified by :class:`~lisa.target.TargetConf`. When pointed at folders (or packages), ``exekall`` will recursively look for Python files. +.. note:: ``lisa_tests`` package is now distributed separately from the + ``lisa`` package. + A subset of the tests can be selected using ``-s PATTERN``. The pattern is a globbing-style pattern, where ``*`` stands as a wildcard. If the pattern starts with an ``!``, no test matching that pattern will be selected. Use ``--list`` diff --git a/lisa_tests/README b/lisa_tests/README deleted file mode 100644 index 2a7e1a249a..0000000000 --- a/lisa_tests/README +++ /dev/null @@ -1,6 +0,0 @@ -This package is a namespace Package. - -Sub-packages at that level must identify the entity creating the test, such as -the company name. This ensures freedom of conflicts between all the users of -that namespace. - diff --git a/lisa_tests/arm/kernel/cpufreq/sanity.py b/lisa_tests/arm/kernel/cpufreq/sanity.py deleted file mode 100644 index 4de205def0..0000000000 --- a/lisa_tests/arm/kernel/cpufreq/sanity.py +++ /dev/null @@ -1,162 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2018, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import os - -from lisa.tests.base import DmesgTestBundle, ResultBundle, TestBundle -from lisa.wlgen.sysbench import Sysbench -from lisa.target import Target -from lisa.utils import ArtifactPath, groupby, nullcontext - - -class UserspaceSanityItem(TestBundle): - """ - Record the number of sysbench events on a given CPU at a given frequency. - """ - - def __init__(self, res_dir, plat_info, cpu, freq, work): - super().__init__(res_dir, plat_info) - - self.cpu = cpu - self.freq = freq - self.work = work - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath, cpu, freq, switch_governor=True, collector=None) -> 'UserspaceSanityItem': - """ - :meta public: - - Create a :class:`UserspaceSanityItem` from a live :class:`lisa.target.Target`. - - :param cpu: CPU to run on. - :type cpu: int - - :param freq: Frequency to run at. - :type freq: int - - :param switch_governor: Switch the governor to userspace, and undo it at the end. - If that has been done in advance, not doing it for every item saves substantial time. - :type switch_governor: bool - """ - - sysbench = Sysbench(target, res_dir=res_dir) - - cm = target.cpufreq.use_governor('userspace') if switch_governor else nullcontext() - with cm, collector: - target.cpufreq.set_frequency(cpu, freq) - output = sysbench(cpus=[cpu], max_duration_s=1).run() - - work = output.nr_events - return cls(res_dir, target.plat_info, cpu, freq, work) - - -class UserspaceSanity(DmesgTestBundle, TestBundle): - """ - A class for making sure the userspace governor behaves sanely - - :param sanity_items: A list of :class:`UserspaceSanityItem`. - :type sanity_items: list(UserspaceSanityItem) - """ - - DMESG_IGNORED_PATTERNS = [ - *DmesgTestBundle.DMESG_IGNORED_PATTERNS, - - # Since we use the performance governor, we will hit a warning when - # disabling schedutil - DmesgTestBundle.CANNED_DMESG_IGNORED_PATTERNS['EAS-schedutil'] - ] - - def __init__(self, res_dir, plat_info, sanity_items): - super().__init__(res_dir, plat_info) - - self.sanity_items = sanity_items - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, - freq_count_limit=5, collector=None) -> 'UserspaceSanity': - """ - Factory method to create a bundle using a live target - - :param freq_count_limit: The maximum amount of frequencies to test - :type freq_count_limit: int - - This will run Sysbench at different frequencies using the userspace - governor - """ - sanity_items = [] - - plat_info = target.plat_info - with collector, target.cpufreq.use_governor("userspace"): - for domain in plat_info['freq-domains']: - cpu = domain[0] - freqs = plat_info['freqs'][cpu] - - if len(freqs) > freq_count_limit: - freqs = freqs[::len(freqs) // freq_count_limit + - (1 if len(freqs) % 2 else 0)] - - for freq in freqs: - item_res_dir = ArtifactPath.join(res_dir, f'CPU{cpu}@{freq}') - os.makedirs(item_res_dir) - item = UserspaceSanityItem.from_target( - target=target, - cpu=cpu, - freq=freq, - res_dir=item_res_dir, - # We already did that once and for all, so that we - # don't spend too much time endlessly switching back - # and forth between governors - switch_governor=False, - ) - sanity_items.append(item) - - return cls(res_dir, plat_info, sanity_items) - - def test_performance_sanity(self) -> ResultBundle: - """ - Assert that higher CPU frequency leads to more work done - """ - res = ResultBundle.from_bool(True) - - cpu_items = { - cpu: { - # We expect only one item per frequency - item.freq: item - for item in freq_items - } - for cpu, freq_items in groupby(self.sanity_items, key=lambda item: item.cpu) - } - - failed = [] - passed = True - for cpu, freq_items in cpu_items.items(): - sorted_items = sorted(freq_items.values(), key=lambda item: item.freq) - work = [item.work for item in sorted_items] - if work != sorted(work): - passed = False - failed.append(cpu) - - res = ResultBundle.from_bool(passed) - work_metric = { - cpu: {freq: item.work for freq, item in freq_items.items()} - for cpu, freq_items in cpu_items.items() - } - res.add_metric('CPUs work', work_metric) - res.add_metric('Failed CPUs', failed) - - return res - -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/hotplug/__init__.py b/lisa_tests/arm/kernel/hotplug/__init__.py deleted file mode 100644 index 2c8d121f67..0000000000 --- a/lisa_tests/arm/kernel/hotplug/__init__.py +++ /dev/null @@ -1,485 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2021, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import abc -import sys -import random -import operator -import collections -import time -from time import sleep -from threading import Thread -from functools import partial -from itertools import chain - -import pandas as pd -from devlib.module.hotplug import HotplugModule -from devlib.exception import TargetStableError - -from lisa.datautils import df_merge -from lisa.tests.base import DmesgTestBundle, FtraceTestBundle, ResultBundle, TestBundle -from lisa.target import Target -from lisa.trace import requires_events -from lisa.utils import ArtifactPath - - -class CPUHPSequenceError(Exception): - pass - - -class HotplugDmesgTestBundle(DmesgTestBundle): - DMESG_IGNORED_PATTERNS = [ - *DmesgTestBundle.DMESG_IGNORED_PATTERNS, - 'irq|IRQ', - ] - - -class HotplugBase(HotplugDmesgTestBundle, TestBundle): - def __init__(self, res_dir, plat_info, target_alive, hotpluggable_cpus, live_cpus): - super().__init__(res_dir, plat_info) - self.target_alive = target_alive - self.hotpluggable_cpus = hotpluggable_cpus - self.live_cpus = live_cpus - - @classmethod - def _check_cpuhp_seq_consistency(cls, nr_operations, hotpluggable_cpus, - max_cpus_off, sequence): - """ - Check that a hotplug sequence given by :meth:`cpuhp_seq` - is consistent. Parameters are the same as for :meth:`cpuhp_seq`, - with the addition of: - - :param sequence: A hotplug sequence, consisting of a sequence of - 2-tuples (CPU and hot plug way) - :type sequence: Sequence - - """ - if len(sequence) != nr_operations: - raise CPUHPSequenceError(f'{nr_operations} operations requested, but got {len(sequence)}') - - # Assume als CPUs are plugged in at the beginning - state = collections.defaultdict(lambda: 1) - - for step, (cpu, plug_way) in enumerate(sequence): - if cpu not in hotpluggable_cpus: - raise CPUHPSequenceError('CPU {cpu} is plugged {way} but is not part of hotpluggable CPUs: {cpu_list}'.format( - cpu=cpu, - way='in' if plug_way else 'out', - cpu_list=str(hotpluggable_cpus), - )) - - # Forbid plugging OFF offlined CPUs and plugging IN online CPUs - if plug_way == state[cpu]: - raise CPUHPSequenceError('Cannot plug {way} a CPU that is already plugged {way}'.format( - way='in' if plug_way else 'out' - )) - - state[cpu] = plug_way - cpus_off = [cpu for cpu, state in state.items() if state == 0] - if len(cpus_off) > max_cpus_off: - raise CPUHPSequenceError(f'A maximum of {max_cpus_off} CPUs is allowed to be plugged out, but {len(cpus_off)} CPUs were plugged out at step {step}') - - for cpu, state in state.items(): - if state != 1: - raise CPUHPSequenceError(f'CPU {cpu} is plugged out but not plugged in at the end of the sequence') - - @classmethod - @abc.abstractmethod - def cpuhp_seq(cls, nr_operations, hotpluggable_cpus, max_cpus_off, random_gen): - """ - Yield a consistent random sequence of CPU hotplug operations - - :param nr_operations: Number of operations in the sequence - :param max_cpus_off: Max number of CPUs plugged-off - - :param random_gen: A random generator instance - :type random_gen: ``random.Random`` - - "Consistent" means that a CPU will be plugged-in only if it was - plugged-off before (and vice versa). Moreover the state of the CPUs - once the sequence has completed should the same as it was before. - """ - - @classmethod - def _cpuhp_func(cls, target, res_dir, sequence, sleep_min_ms, - sleep_max_ms, random_gen): - """ - Generate a script consisting of a random sequence of hotplugs operations - - Two consecutive hotplugs can be separated by a random sleep in the script. - """ - - def make_sleep(): - if sleep_max_ms: - return random_gen.randint(sleep_min_ms, sleep_max_ms) / 1000 - else: - return 0 - - sequence = [ - dict( - path=HotplugModule._cpu_path(target, cpu), - sleep=make_sleep(), - way=plug_way, - ) - for cpu, plug_way in sequence - ] - - # The main contributor to the execution time are sleeps, so set a - # timeout to 10 times the total sleep time. This should be enough to - # take into account sysfs writes too - timeout = 10 * sum(map(operator.itemgetter('sleep'), sequence)) - - # This function will be executed on the target directly to avoid the - # overhead of executing the calls one by one, which could mask - # concurrency issues in the kernel - @target.remote_func(timeout=timeout, as_root=True) - def do_hotplug(): - for desc in sequence: - with open(desc['path'], 'w') as f: - f.write(str(desc['way'])) - - sleep = desc['sleep'] - if sleep: - time.sleep(sleep) - - return do_hotplug - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, seed=None, - nr_operations=100, sleep_min_ms=10, sleep_max_ms=100, - max_cpus_off=sys.maxsize, collector=None) -> 'HotplugBase': - """ - :param seed: Seed of the RNG used to create the hotplug sequences - :type seed: int - - :param nr_operations: Number of operations in the sequence - :type nr_operations: int - - :param sleep_min_ms: Minimum sleep duration between hotplug operations - :type sleep_min_ms: int - - :param sleep_max_ms: Maximum sleep duration between hotplug operations - (0 would lead to no sleep) - :type sleep_max_ms: int - - :param max_cpus_off: Maximum number of CPUs hotplugged out at any given - moment - :type max_cpus_off: int - """ - - # Instantiate a generator so we can change the seed without any global - # effect - random_gen = random.Random() - random_gen.seed(seed) - - target.hotplug.online_all() - hotpluggable_cpus = target.hotplug.list_hotpluggable_cpus() - - sequence = list(cls.cpuhp_seq( - nr_operations, hotpluggable_cpus, max_cpus_off, random_gen)) - - cls._check_cpuhp_seq_consistency(nr_operations, hotpluggable_cpus, - max_cpus_off, sequence) - - do_hotplug = cls._cpuhp_func( - target, res_dir, sequence, sleep_min_ms, sleep_max_ms, random_gen) - - # We don't want a timeout but we do want to detect if/when the target - # stops responding. So handle the hotplug remote func in a separate - # thread and keep polling the target - thread = Thread(target=do_hotplug, daemon=True) - - with collector: - try: - thread.start() - while thread.is_alive(): - # We might have a thread hanging off in that case, but there is - # not much we can do since the remote func cannot really be - # canceled. Since it was spawned with a timeout, it will - # eventually die. - if not target.check_responsive(): - break - sleep(0.1) - finally: - target_alive = bool(target.check_responsive()) - target.hotplug.online_all() - - live_cpus = target.list_online_cpus() if target_alive else [] - return cls(res_dir, target.plat_info, target_alive, hotpluggable_cpus, live_cpus) - - def test_target_alive(self) -> ResultBundle: - """ - Test that the hotplugs didn't leave the target in an unusable state - """ - return ResultBundle.from_bool(self.target_alive) - - def test_cpus_alive(self) -> ResultBundle: - """ - Test that all CPUs came back online after the hotplug operations - """ - res = ResultBundle.from_bool(self.hotpluggable_cpus == self.live_cpus) - dead_cpus = sorted(set(self.hotpluggable_cpus) - set(self.live_cpus)) - res.add_metric("dead CPUs", dead_cpus) - res.add_metric("number of dead CPUs", len(dead_cpus)) - return res - - -class HotplugTorture(HotplugBase): - - @classmethod - def cpuhp_seq(cls, nr_operations, hotpluggable_cpus, max_cpus_off, random_gen): - """ - FIXME: is that actually still true ? - The actual length of the sequence might differ from the requested one - by 1 because it's easier to implement and it shouldn't be an issue for - most test cases. - """ - - cur_on_cpus = hotpluggable_cpus[:] - cur_off_cpus = [] - i = 0 - while i < nr_operations - len(cur_off_cpus): - if not (1 < len(cur_on_cpus) < max_cpus_off): - # Force plug IN when only 1 CPU is on or too many are off - plug_way = 1 - elif not cur_off_cpus: - # Force plug OFF if all CPUs are on - plug_way = 0 # Plug OFF - else: - plug_way = random_gen.randint(0, 1) - - src = cur_off_cpus if plug_way else cur_on_cpus - dst = cur_on_cpus if plug_way else cur_off_cpus - cpu = random_gen.choice(src) - src.remove(cpu) - dst.append(cpu) - i += 1 - yield cpu, plug_way - - # Re-plug offline cpus to come back to original state - for cpu in cur_off_cpus: - yield cpu, 1 - - -class HotplugRollback(HotplugDmesgTestBundle, FtraceTestBundle, TestBundle): - - @classmethod - def _online(cls, target, cpu, online, verify=True): - try: - if online: - target.hotplug.online(cpu) - else: - target.hotplug.offline(cpu) - except TargetStableError as e: - if verify: - raise e - - @classmethod - def _reset_fail(cls, target, cpu): - target.hotplug.fail(cpu, -1) - - @classmethod - def _state_can_fail(cls, target, cpu, state, up): - """ - There are no way of probing the kernel for a list of hotplug states - that can fail and for which we can test the rollback. We need therefore - to try: - - If we can set the state in the kernel 'fail' interface. - - If the hotplug is reset actually failing (some states can fail only - when going up or down) - """ - try: - target.hotplug.fail(cpu, state) - except TargetStableError: - return False - - try: - cls._online(target, cpu, up) - cls._reset_fail(target, cpu) - cls._online(target, cpu, not up) - #If we can go up/down without a failure, that's because this state - #doesn't have a up/down callback and can't fail. - return False - except TargetStableError: - return True - - @classmethod - def _prepare_hotplug(cls, target, cpu, up): - cls._reset_fail(target, cpu) - cls._online(target, cpu, not up) - - @classmethod - def _get_states(cls, target, cpu, up): - states = target.hotplug.get_states() - cls._prepare_hotplug(target, cpu, not up) - return [ - state - for state in states - if cls._state_can_fail(target, cpu, state, up) - ] - - @classmethod - def _mark_trace(cls, target, collector, start=True, - expected=False, up=False, failing_state=0): - """ - Convert start, expected and up to int for a lighter trace - """ - target.write_value( - collector['ftrace'].marker_file, - "hotplug_rollback: test={} expected={} up={} failing_state={}".format( - int(start), int(expected), int(up), failing_state), - verify=False - ) - - @classmethod - def _test_rollback(cls, target, collector, cpu, failing_state, up): - cls._prepare_hotplug(target, cpu, up=up) - target.hotplug.fail(cpu, failing_state) - cls._mark_trace(target, collector, up=up, - failing_state=failing_state) - cls._online(target, cpu, online=up, verify=False) - cls._mark_trace(target, collector, start=False) - - @classmethod - def _do_from_target(cls, target, res_dir, collector, cpu): - # Get the list of each state that can fail - states_down = cls._get_states(target, cpu, up=False) - states_up = cls._get_states(target, cpu, up=True) - - cls._prepare_hotplug(target, cpu, up=False) - with collector: - # Get the expected list of states for a complete Hotplug - cls._mark_trace(target, collector, expected=True, up=False) - cls._online(target, cpu, online=False) - cls._mark_trace(target, collector, expected=True, up=True) - cls._online(target, cpu, online=True) - cls._mark_trace(target, collector, start=False) - - # Test hotunplug rollback for each possible state failure - for failing_state in states_down: - cls._test_rollback(target, collector, cpu=cpu, - failing_state=failing_state, up=False) - - # Test hotplug rollback for each possible state failure - for failing_state in states_up: - cls._test_rollback(target, collector, cpu=cpu, - failing_state=failing_state, up=True) - - # TODO: trace-cmd is relying on _SC_NPROCESSORS_CONF to know how - # many CPUs are present in the system and what to flush from the - # ftrace buffer to the trace.dat file. The problem is that the Musl - # libc that we use to build trace-cmd in LISA is returning, for - # _SC_NPROCESSORS_CONF, the number of CPUs _online_. We then need, - # until this problem is fixed to set the CPU back online before - # collecting the trace, or some data would be missing. - cls._online(target, cpu, online=True) - - return cls(res_dir, target.plat_info) - - @classmethod - def _from_target(cls, target, *, - res_dir: ArtifactPath = None, collector=None) -> 'HotplugRollback': - cpu = min(target.hotplug.list_hotpluggable_cpus()) - cls._online(target, cpu, online=True) - - try: - return cls._do_from_target(target, res_dir, collector, cpu) - finally: - cls._reset_fail(target, cpu) - cls._online(target, cpu, online=True) - - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - try: - cls._reset_fail(target, 0) - except TargetStableError: - ResultBundle.raise_skip( - "Target can't reset the hotplug fail interface") - - @classmethod - def _get_expected_states(cls, df, up): - df = df[(df['expected']) & (df['up'] == up)] - - return df['idx'].dropna() - - @requires_events('userspace@hotplug_rollback', 'cpuhp_enter') - def test_hotplug_rollback(self) -> ResultBundle: - """ - Test that the hotplug can rollback to its previous state after a - failure. All possible steps, up/down combinations will be tested. For - each combination, also verify that the hotplug is going through all the - steps it is supposed to. - """ - df = df_merge([ - self.trace.df_event('userspace@hotplug_rollback'), - self.trace.df_event('cpuhp_enter') - ]) - - # Keep only the states delimited by _mark_trace() - df['test'].ffill(inplace=True) - df = df[df['test'] == 1] - df.drop(columns='test', inplace=True) - - df['up'].ffill(inplace=True) - df['up'] = df['up'].astype(bool) - - # Read the expected states from full hot(un)plug - df['expected'].ffill(inplace=True) - df['expected'] = df['expected'].astype(bool) - expected_down = self._get_expected_states(df, up=False) - expected_up = self._get_expected_states(df, up=True) - df = df[~df['expected']] - df.drop(columns='expected', inplace=True) - - def _get_expected_rollback(up, failing_state): - return list( - filter( - partial( - operator.gt if up else operator.lt, - failing_state, - ), - chain(expected_up, expected_down) if up else - chain(expected_down, expected_up) - ) - ) - - def _verify_rollback(df): - failing_state = df['failing_state'].iloc[0] - up = df['up'].iloc[0] - expected = _get_expected_rollback(up, failing_state) - - return pd.DataFrame(data={ - 'failing_state': df['failing_state'], - 'up': up, - 'result': df['idx'].tolist() == expected - }) - - df['failing_state'].ffill(inplace=True) - df.dropna(inplace=True) - df = df.groupby( - ['up', 'failing_state'], - observed=True, - group_keys=False, - ).apply(_verify_rollback) - df.drop_duplicates(inplace=True) - - res = ResultBundle.from_bool(df['result'].all()) - res.add_metric('Failed rollback states', - df[~df['result']]['failing_state'].tolist()) - - return res diff --git a/lisa_tests/arm/kernel/scheduler/__init__.py b/lisa_tests/arm/kernel/scheduler/__init__.py deleted file mode 100644 index b5edfb655b..0000000000 --- a/lisa_tests/arm/kernel/scheduler/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/scheduler/eas_behaviour.py b/lisa_tests/arm/kernel/scheduler/eas_behaviour.py deleted file mode 100644 index ba99f779ef..0000000000 --- a/lisa_tests/arm/kernel/scheduler/eas_behaviour.py +++ /dev/null @@ -1,888 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2016, ARM Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import abc -from math import isnan - -import pandas as pd -import holoviews as hv - -from itertools import chain - -from lisa.wlgen.rta import RTAPhase, PeriodicWload, DutyCycleSweepPhase -from lisa.analysis.rta import RTAEventsAnalysis -from lisa.analysis.tasks import TasksAnalysis -from lisa.tests.base import ResultBundle, TestBundle, RTATestBundle, TestConfBase -from lisa.utils import ArtifactPath, memoized -from lisa.datautils import series_integrate, df_deduplicate -from lisa.energy_model import EnergyModel, EnergyModelCapacityError -from lisa.target import Target -from lisa.pelt import PELT_SCALE, pelt_swing -from lisa.datautils import df_refit_index -from lisa.notebook import plot_signal -from lisa.conf import ( - KeyDesc, TopLevelKeyDesc, -) - - -class EASBehaviourTestConf(TestConfBase): - """ - Configuration class for :meth:`lisa_tests.arm.kernel.scheduler.eas_behaviour.EASBehaviour.get_big_duty_cycle`. - - {generated_help} - {yaml_example} - """ - - STRUCTURE = TopLevelKeyDesc('eas-behaviour', 'EAS-behaviour test configuration', ( - KeyDesc('big-task-duty-cycle', 'Duty cycle of the big tasks for the eas-behaviour tests.', [int]), - )) - - -class EASBehaviour(RTATestBundle, TestBundle): - """ - Abstract class for EAS behavioural testing. - - :param nrg_model: The energy model of the platform the synthetic workload - was run on - :type nrg_model: EnergyModel - - This class provides :meth:`test_task_placement` to validate the basic - behaviour of EAS. The implementations of this class have been developed to - verify patches supporting Arm's big.LITTLE in the Linux scheduler. You can - see these test results being published - `here `_. - """ - - @property - def nrg_model(self): - return self.plat_info['nrg-model'] - - @classmethod - def get_pelt_swing(cls, pct): - return pelt_swing( - period=cls.TASK_PERIOD, - duty_cycle=pct / 100, - kind='above', - ) / PELT_SCALE * 100 - - @classmethod - def get_big_duty_cycle(cls, plat_info, big_task_duty_cycle=None): - """ - Returns a duty cycle for :class:`lisa.wlgen.rta.PeriodicWload` that - will guarantee placement on a big CPU. - - The duty cycle will be chosen so that the task will not fit on the - second to biggest CPUs in the system, thereby forcing up-migration - while minimizing the thermal impact. - """ - # big_task_duty_cycle is set when the platform requires a specific - # value for the big task duty cycle. - if big_task_duty_cycle is None: - capa_classes = plat_info['capacity-classes'] - max_class = len(capa_classes) - 1 - - def get_class_util(class_, pct): - cpus = capa_classes[class_] - return cls.unscaled_utilization(plat_info, cpus[0], pct) - - class_ = -2 - - # Resolve to an positive index - class_ %= (max_class + 1) - - capacity_margin_pct = 20 - util = get_class_util(class_, 100) - - if class_ < max_class: - higher_class_capa = get_class_util(class_ + 1, (100 - capacity_margin_pct)) - # If the CPU class and util we picked is too close to the capacity - # of the next bigger CPU, we need to take a smaller util - if (util + cls.get_pelt_swing(util)) >= higher_class_capa: - # Take a 5% margin for rounding errors - util = 0.95 * higher_class_capa - return ( - util - - # And take extra margin to take into account the swing of - # the PELT value around the average - cls.get_pelt_swing(util) - ) - else: - return util - else: - return util - else: - return big_task_duty_cycle - - @classmethod - def get_little_cpu(cls, plat_info): - """ - Return a little CPU ID. - """ - littles = plat_info["capacity-classes"][0] - return littles[0] - - @classmethod - def get_little_duty_cycle(cls, plat_info): - """ - Returns a duty cycle for :class:`lisa.wlgen.rta.PeriodicWload` that - is guaranteed to fit on the little CPUs. - - The duty cycle is chosen to be ~50% of the capacity of the little CPU - and to generate a target frequency half-way between two frequencies of - that same CPU. This intends to avoid picking a value too close from an - OPP which could, for the same duty cycle use an upper OPP or not, depending - on the PELT hazard. - - The returned value is a duty cycle in percentage of the full PELT scale. - """ - cpu = cls.get_little_cpu(plat_info) - freqs = sorted(plat_info['freqs'][cpu]) - capa = plat_info['cpu-capacities']['rtapp'][cpu] - - max_freq = max(freqs) - target_freq = 0.5 * max_freq # 50% duty cycle - schedutil_factor = 1.25 - - # Return the PELT swing in pct for a given duty cycle in pct - def _get_pelt_swing_dc(dc): - return cls.get_pelt_swing(dc) * 100 / PELT_SCALE - - # Duty cycle for a given frequency band - def _get_dc(freq_band): - minf, maxf = freq_band - freq = ((maxf - minf) / 2) + minf - - # freq to dc in pct - dc = freq * 100 / max_freq * (capa / PELT_SCALE) - - # Ensure that the max value of util_avg will more or less make - # schedutil select the midpoint in the freq_band - dc -= _get_pelt_swing_dc(dc) - dc /= schedutil_factor - - # Check that the duty cycle we computed still fits in the selected - # frequency band - real_freq = (dc + _get_pelt_swing_dc(dc)) * schedutil_factor * \ - max_freq / 100 * PELT_SCALE / capa - - if minf < real_freq < maxf: - return dc - else: - raise ValueError(f'Could not find util fitting the frequency band {freq_band}') - - minf, maxf = min( - (freq, next_freq) - for freq, next_freq in zip(freqs, freqs[1:]) - if next_freq > target_freq - ) - - return _get_dc((minf, maxf)) - - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - kconfig = target.plat_info['kernel']['config'] - for option in ( - 'CONFIG_ENERGY_MODEL', - 'CONFIG_CPU_FREQ_GOV_SCHEDUTIL', - ): - if not kconfig.get(option): - ResultBundle.raise_skip(f"The target's kernel needs {option}=y kconfig enabled") - - for domain in target.plat_info['freq-domains']: - if "schedutil" not in target.cpufreq.list_governors(domain[0]): - ResultBundle.raise_skip( - f"Can't set schedutil governor for domain {domain}") - - if 'nrg-model' not in target.plat_info: - ResultBundle.raise_skip("Energy model not available") - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, collector=None, - big_task_duty_cycle: EASBehaviourTestConf.BigTaskDutyCycle = None) -> 'EASBehaviour': - """ - :meta public: - - Factory method to create a bundle using a live target - - This will execute the rt-app workload described in - :meth:`lisa.tests.base.RTATestBundle.get_rtapp_profile` - """ - plat_info = target.plat_info - profile_kwargs = dict(big_task_duty_cycle=big_task_duty_cycle) - - rtapp_profile = cls.get_rtapp_profile(plat_info, **profile_kwargs) - - # EAS doesn't make a lot of sense without schedutil, - # so make sure this is what's being used - with target.disable_idle_states(): - with target.cpufreq.use_governor("schedutil"): - cls.run_rtapp(target, res_dir, rtapp_profile, collector=collector) - - return cls(res_dir, plat_info, rtapp_profile_kwargs=profile_kwargs) - - @RTAEventsAnalysis.df_phases.used_events - def _get_expected_task_utils_df(self): - """ - Get a DataFrame with the *expected* utilization of each task over time. - - :param nrg_model: EnergyModel used to computed the expected utilization - :type nrg_model: EnergyModel - - :returns: A Pandas DataFrame with a column for each task, showing how - the utilization of that task varies over time - - .. note:: The timestamps to match the beginning and end of each rtapp - phase are taken from the trace. - """ - tasks_map = self.rtapp_tasks_map - rtapp_profile = self.rtapp_profile - - def task_util(task, wlgen_task): - task_list = tasks_map[task] - assert len(task_list) == 1 - task = task_list[0] - - df = self.trace.ana.rta.df_phases(task, wlgen_profile=rtapp_profile) - df = df[df['properties'].transform(lambda phase: phase['meta']['from_test'])] - - def get_phase_max_util(phase): - wload = phase['wload'] - # Take into account the duty cycle of the phase - avg = wload.unscaled_duty_cycle_pct( - plat_info=self.plat_info, - ) * PELT_SCALE / 100 - # Also take into account the period and the swing of PELT - # around its "average" - swing = pelt_swing( - period=wload.period, - duty_cycle=wload.duty_cycle_pct / 100, - kind='above', - ) - return avg + swing - - phases_util = { - phase.get('name'): get_phase_max_util(phase) - for phase in wlgen_task.phases - if phase['meta']['from_test'] - } - - expected_util = df['phase'].map(phases_util) - return task, expected_util - - cols = dict( - task_util(task, wlgen_task) - for task, wlgen_task in rtapp_profile.items() - ) - df = pd.DataFrame(cols) - df.ffill(inplace=True) - df.dropna(inplace=True) - - # Ensure the index is refitted so that integrals work as expected - df = df_refit_index(df, window=self.trace.window) - return df - - @TasksAnalysis.df_task_activation.used_events - def _get_task_cpu_df(self): - """ - Get a DataFrame mapping task names to the CPU they ran on - - Use the sched_switch trace event to find which CPU each task ran - on. Does not reflect idleness - tasks not running are shown as running - on the last CPU they woke on. - - :returns: A Pandas DataFrame with a column for each task, showing the - CPU that the task was "on" at each moment in time - """ - def task_cpu(task): - return task.comm, self.trace.ana.tasks.df_task_activation(task=task)['cpu'] - - df = pd.DataFrame(dict( - task_cpu(task_ids[0]) - for task, task_ids in self.rtapp_task_ids_map.items() - )) - df.ffill(inplace=True) - df.dropna(inplace=True) - df = df_deduplicate(df, consecutives=True, keep='first') - - # Ensure the index is refitted so that integrals work as expected - df = df_refit_index(df, window=self.trace.window) - return df - - def _sort_power_df_columns(self, df, nrg_model): - """ - Helper method to re-order the columns of a power DataFrame - - This has no significance for code, but when examining DataFrames by hand - they are easier to understand if the columns are in a logical order. - - :param nrg_model: EnergyModel used to get the CPU from - :type nrg_model: EnergyModel - """ - node_cpus = [node.cpus for node in nrg_model.root.iter_nodes()] - return pd.DataFrame(df, columns=[c for c in node_cpus if c in df]) - - def _plot_expected_util(self, util_df, nrg_model): - """ - Create a plot of the expected per-CPU utilization for the experiment - The plot is then output to the test results directory. - - :param experiment: The :class:Experiment to examine - :param util_df: A Pandas Dataframe with a column per CPU giving their - (expected) utilization at each timestamp. - - :param nrg_model: EnergyModel used to get the CPU from - :type nrg_model: EnergyModel - """ - def plot_cpu(cpu): - name = f'CPU{cpu} util' - series = util_df[cpu].copy(deep=False) - series.index.name = 'Time' - series.name = name - fig = plot_signal(series).options( - 'Curve', - ylabel='Utilization', - ) - - # The "y" dimension has the name of the series that we plotted - fig = fig.redim.range(**{name: (-10, 1034)}) - - times, utils = zip(*series.items()) - fig *= hv.Overlay( - [ - hv.VSpan(start, end).options( - alpha=0.1, - color='grey', - ) - for util, start, end in zip( - utils, - times, - times[1:], - ) - if not util - ] - ) - return fig - - cpus = sorted(nrg_model.cpus) - fig = hv.Layout( - list(map(plot_cpu, cpus)) - ).cols(1).options( - title='Per-CPU expected utilization', - ) - - self._save_debug_plot(fig, name='expected_placement') - return fig - - @_get_expected_task_utils_df.used_events - def _get_expected_power_df(self, nrg_model, capacity_margin_pct): - """ - Estimate *optimal* power usage over time - - Examine a trace and use :meth:get_optimal_placements and - :meth:EnergyModel.estimate_from_cpu_util to get a DataFrame showing the - estimated power usage over time under ideal EAS behaviour. - - :meth:get_optimal_placements returns several optimal placements. They - are usually equivalent, but can be drastically different in some cases. - Currently only one of those placements is used (the first in the list). - - :param nrg_model: EnergyModel used compute the optimal placement - :type nrg_model: EnergyModel - - :param capacity_margin_pct: - - :returns: A Pandas DataFrame with a column each node in the energy model - (keyed with a tuple of the CPUs contained by that node) and a - "power" column with the sum of other columns. Shows the - estimated *optimal* power over time. - """ - task_utils_df = self._get_expected_task_utils_df() - - data = [] - index = [] - - def exp_power(row): - task_utils = row.to_dict() - try: - expected_utils = nrg_model.get_optimal_placements(task_utils, capacity_margin_pct)[0] - except EnergyModelCapacityError: - ResultBundle.raise_skip( - 'The workload will result in overutilized status for all possible task placement, making it unsuitable to test EAS on this platform' - ) - power = nrg_model.estimate_from_cpu_util(expected_utils) - columns = list(power.keys()) - - # Assemble a dataframe to plot the expected utilization - data.append(expected_utils) - index.append(row.name) - - return pd.Series([power[c] for c in columns], index=columns) - - res_df = self._sort_power_df_columns( - task_utils_df.apply(exp_power, axis=1), nrg_model) - - self._plot_expected_util(pd.DataFrame(data, index=index), nrg_model) - - return res_df - - @_get_task_cpu_df.used_events - @_get_expected_task_utils_df.used_events - def _get_estimated_power_df(self, nrg_model): - """ - Considering only the task placement, estimate power usage over time - - Examine a trace and use :meth:EnergyModel.estimate_from_cpu_util to get - a DataFrame showing the estimated power usage over time. This assumes - perfect cpuidle and cpufreq behaviour. Only the CPU on which the tasks - are running is extracted from the trace, all other signals are guessed. - - :param nrg_model: EnergyModel used compute the optimal placement and - CPUs - :type nrg_model: EnergyModel - - :returns: A Pandas DataFrame with a column node in the energy model - (keyed with a tuple of the CPUs contained by that node) Shows - the estimated power over time. - """ - task_cpu_df = self._get_task_cpu_df() - task_utils_df = self._get_expected_task_utils_df() - tasks = self.rtapp_tasks - - # Create a combined DataFrame with the utilization of a task and the CPU - # it was running on at each moment. Looks like: - # utils cpus - # task_wmig0 task_wmig1 task_wmig0 task_wmig1 - # 2.375056 102.4 102.4 NaN NaN - # 2.375105 102.4 102.4 2.0 NaN - - df = pd.concat([task_utils_df, task_cpu_df], - axis=1, keys=['utils', 'cpus']) - df = df.sort_index().ffill().dropna() - - # Now make a DataFrame with the estimated power at each moment. - def est_power(row): - cpu_utils = [0 for cpu in nrg_model.cpus] - for task in tasks: - cpu = row['cpus'][task] - util = row['utils'][task] - if not isnan(cpu): - cpu_utils[int(cpu)] += util - power = nrg_model.estimate_from_cpu_util(cpu_utils) - columns = list(power.keys()) - return pd.Series([power[c] for c in columns], index=columns) - - return self._sort_power_df_columns(df.apply(est_power, axis=1), nrg_model) - - @_get_expected_power_df.used_events - @_get_estimated_power_df.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - # Memoize so that the result is shared with _check_valid_placement() - @memoized - def test_task_placement(self, energy_est_threshold_pct=5, - nrg_model: EnergyModel = None, capacity_margin_pct=20) -> ResultBundle: - """ - Test that task placement was energy-efficient - - :param nrg_model: Allow using an alternate EnergyModel instead of - ``nrg_model``` - :type nrg_model: EnergyModel - - :param energy_est_threshold_pct: Allowed margin for estimated vs - optimal task placement energy cost - :type energy_est_threshold_pct: int - - Compute optimal energy consumption (energy-optimal task placement) - and compare to energy consumption estimated from the trace. - Check that the estimated energy does not exceed the optimal energy by - more than ``energy_est_threshold_pct``` percents. - """ - nrg_model = nrg_model or self.nrg_model - - exp_power = self._get_expected_power_df(nrg_model, capacity_margin_pct) - est_power = self._get_estimated_power_df(nrg_model) - - exp_energy = series_integrate(exp_power.sum(axis=1), method='rect') - est_energy = series_integrate(est_power.sum(axis=1), method='rect') - - msg = f'Estimated {est_energy} bogo-Joules to run workload, expected {exp_energy}' - threshold = exp_energy * (1 + (energy_est_threshold_pct / 100)) - - passed = est_energy < threshold - res = ResultBundle.from_bool(passed) - res.add_metric("estimated energy", est_energy, 'bogo-joules') - res.add_metric("energy threshold", threshold, 'bogo-joules') - - return res - - def _check_valid_placement(self): - """ - Check that a valid placement can be found for the tasks. - - If no placement can be found, :meth:`test_task_placement` will raise - an :class:`ResultBundle`. - """ - self.test_task_placement() - - @RTAEventsAnalysis.df_rtapp_stats.used_events - def test_slack(self, negative_slack_allowed_pct=15) -> ResultBundle: - """ - Assert that the RTApp workload was given enough performance - - :param negative_slack_allowed_pct: Allowed percentage of RT-app task - activations with negative slack. - :type negative_slack_allowed_pct: int - - Use :class:`lisa.analysis.rta.RTAEventsAnalysis` to find instances - where the RT-App workload wasn't able to complete its activations (i.e. - its reported "slack" was negative). Assert that this happened less than - ``negative_slack_allowed_pct`` percent of the time. - """ - self._check_valid_placement() - - passed = True - bad_activations = {} - test_tasks = list(chain.from_iterable(self.rtapp_tasks_map.values())) - for task in test_tasks: - slack = self.trace.ana.rta.df_rtapp_stats(task)["slack"] - - bad_activations_pct = len(slack[slack < 0]) * 100 / len(slack) - if bad_activations_pct > negative_slack_allowed_pct: - passed = False - - bad_activations[task] = bad_activations_pct - - res = ResultBundle.from_bool(passed) - - for task, bad_activations_pct in bad_activations.items(): - res.add_metric( - f"{task} delayed activations", - bad_activations_pct, '%' - ) - return res - - -class EASBehaviourNoEWMA(EASBehaviour): - """ - Abstract class for EAS behavioural testing, with mitigation for the - util_est.ewma influence - - This class provides :meth:`_get_rtapp_profile` which prepend a custom - RTAPhase buffer to the rtapp profile. This buffer is composed of a dozen - of very short activation. It intends to reset util_est.ewma before starting - the test. util_est.ewma is computed for the CFS policy on the utilization - ramp down. It holds the utilization value and prevents convergence to a - value matching the duty cycle set in the rt-app profile. - """ - - _BUFFER_PHASE_DURATION_S = 0 # Bypass add_buffer() default RTAPhase buffer - - @abc.abstractmethod - def _do_get_rtapp_profile(cls, plat_info, **kwargs): - """ - :meta public: - - Abstract method used by children class to provide the rt-app profile - for the test to run. - """ - pass - - @classmethod - def _get_rtapp_profile(cls, plat_info, **kwargs): - """ - :meta public: - - Prepends a :class:`lisa.wlgen.rta.RTAPhase` buffer to the children - class rt-app profile :meth:`_do_get_rtapp_profile`. This buffer intends - to mitigate the util_est.ewma influence. - """ - profile = cls._do_get_rtapp_profile(plat_info, **kwargs) - - return { - task: RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=0.01, - duration=0.1, - period=cls.TASK_PERIOD - ), - prop_meta={'from_test': False} - ) + phase - for task, phase in profile.items() - } - - -class OneSmallTask(EASBehaviourNoEWMA): - """ - A single 'small' task - """ - - task_name = "small" - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, **kwargs): - return { - cls.task_name: RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=cls.get_little_duty_cycle(plat_info), - duration=1, - period=cls.TASK_PERIOD, - ) - ) - } - - -class ThreeSmallTasks(EASBehaviourNoEWMA): - """ - Three 'small' tasks - """ - task_prefix = "small" - - @EASBehaviour.test_task_placement.used_events - def test_task_placement(self, energy_est_threshold_pct=20, nrg_model: EnergyModel = None, - noise_threshold_pct=1, noise_threshold_ms=None, - capacity_margin_pct=20) -> ResultBundle: - """ - Same as :meth:`EASBehaviour.test_task_placement` but with a higher - default threshold - - The energy estimation for this test is probably not very accurate and this - isn't a very realistic workload. It doesn't really matter if we pick an - "ideal" task placement for this workload, we just want to avoid using big - CPUs in a big.LITTLE system. So use a larger energy threshold that - hopefully prevents too much use of big CPUs but otherwise is flexible in - allocation of LITTLEs. - """ - return super().test_task_placement( - energy_est_threshold_pct, nrg_model, - noise_threshold_pct=noise_threshold_pct, - noise_threshold_ms=noise_threshold_ms, - capacity_margin_pct=capacity_margin_pct) - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, **kwargs): - return { - f"{cls.task_prefix}_{i}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=cls.get_little_duty_cycle(plat_info), - duration=1, - period=cls.TASK_PERIOD, - ) - ) - for i in range(3) - } - - -class TwoBigTasks(EASBehaviourNoEWMA): - """ - Two 'big' tasks - """ - - task_prefix = "big" - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, big_task_duty_cycle=None): - duty = cls.get_big_duty_cycle(plat_info, big_task_duty_cycle=big_task_duty_cycle) - - return { - f"{cls.task_prefix}_{i}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=duty, - duration=1, - period=cls.TASK_PERIOD, - ) - ) - for i in range(2) - } - - -class TwoBigThreeSmall(EASBehaviourNoEWMA): - """ - A mix of 'big' and 'small' tasks - """ - - small_prefix = "small" - big_prefix = "big" - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, big_task_duty_cycle=None): - little_duty = cls.get_little_duty_cycle(plat_info) - big_duty = cls.get_big_duty_cycle(plat_info, big_task_duty_cycle=big_task_duty_cycle) - - return { - **{ - f"{cls.small_prefix}_{i}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=little_duty, - duration=1, - period=cls.TASK_PERIOD - ) - ) - for i in range(3) - }, - **{ - f"{cls.big_prefix}_{i}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=big_duty, - duration=1, - period=cls.TASK_PERIOD - ) - ) - for i in range(2) - } - } - - -class EnergyModelWakeMigration(EASBehaviourNoEWMA): - """ - One task per big CPU, alternating between two phases: - - * Low utilization phase (should run on a LITTLE CPU) - * High utilization phase (should run on a big CPU) - """ - task_prefix = "emwm" - - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - if len(target.plat_info["capacity-classes"]) < 2: - ResultBundle.raise_skip( - 'Cannot test migration on single capacity group') - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, big_task_duty_cycle=None): - little = cls.get_little_cpu(plat_info) - end_pct = cls.get_big_duty_cycle(plat_info, big_task_duty_cycle=big_task_duty_cycle) - bigs = plat_info["capacity-classes"][-1] - - return { - f"{cls.task_prefix}_{i}": 2 * ( - RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=20, - scale_for_cpu=little, - duration=2, - period=cls.TASK_PERIOD, - ) - ) + - RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=end_pct, - duration=2, - period=cls.TASK_PERIOD, - ) - ) - ) - for i in range(len(bigs)) - } - - -class RampUp(EASBehaviourNoEWMA): - """ - A single task whose utilization slowly ramps up - """ - task_name = "up" - - @EASBehaviour.test_task_placement.used_events - def test_task_placement(self, energy_est_threshold_pct=15, nrg_model: EnergyModel = None, - noise_threshold_pct=1, noise_threshold_ms=None, - capacity_margin_pct=20) -> ResultBundle: - """ - Same as :meth:`EASBehaviour.test_task_placement` but with a higher - default threshold. - - The main purpose of this test is to ensure that as it grows in load, a - task is migrated from LITTLE to big CPUs on a big.LITTLE system. - This migration naturally happens some time _after_ it could possibly be - done, since there must be some hysteresis to avoid a performance cost. - Therefore allow a larger energy usage threshold - """ - return super().test_task_placement( - energy_est_threshold_pct, nrg_model, - noise_threshold_pct=noise_threshold_pct, - noise_threshold_ms=noise_threshold_ms, - capacity_margin_pct=capacity_margin_pct) - - @classmethod - def _do_get_rtapp_profile(cls, plat_info, big_task_duty_cycle=None): - little = cls.get_little_cpu(plat_info) - start_pct = cls.unscaled_utilization(plat_info, little, 10) - end_pct = cls.get_big_duty_cycle(plat_info, big_task_duty_cycle=big_task_duty_cycle) - - return { - cls.task_name: DutyCycleSweepPhase( - start=start_pct, - stop=end_pct, - step=5, - duration=0.5, - duration_of='step', - period=cls.TASK_PERIOD, - ) - } - - -class RampDown(EASBehaviour): - """ - A single task whose utilization slowly ramps down - """ - task_name = "down" - - @EASBehaviour.test_task_placement.used_events - def test_task_placement(self, energy_est_threshold_pct=18, nrg_model: EnergyModel = None, - noise_threshold_pct=1, noise_threshold_ms=None, - capacity_margin_pct=20) -> ResultBundle: - """ - Same as :meth:`EASBehaviour.test_task_placement` but with a higher - default threshold - - The main purpose of this test is to ensure that as it reduces in load, a - task is migrated from big to LITTLE CPUs on a big.LITTLE system. - This migration naturally happens some time _after_ it could possibly be - done, since there must be some hysteresis to avoid a performance cost. - Therefore allow a larger energy usage threshold - - The number below has been found by trial and error on the platform - generally used for testing EAS (at the time of writing: Juno r0, Juno r2, - Hikey960 and TC2). It would be better to estimate the amount of energy - 'wasted' in the hysteresis (the overutilized band) and compute a threshold - based on that. But implementing this isn't easy because it's very platform - dependent, so until we have a way to do that easily in test classes, let's - stick with the arbitrary threshold. - """ - return super().test_task_placement( - energy_est_threshold_pct, nrg_model, - noise_threshold_pct=noise_threshold_pct, - noise_threshold_ms=noise_threshold_ms, - capacity_margin_pct=capacity_margin_pct) - - @classmethod - def _get_rtapp_profile(cls, plat_info, big_task_duty_cycle=None): - little = cls.get_little_cpu(plat_info) - start_pct = cls.get_big_duty_cycle(plat_info, big_task_duty_cycle=big_task_duty_cycle) - end_pct = cls.unscaled_utilization(plat_info, little, 10) - - return { - cls.task_name: DutyCycleSweepPhase( - start=start_pct, - stop=end_pct, - step=5, - duration=0.5, - duration_of='step', - period=cls.TASK_PERIOD, - ) - } - -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/scheduler/load_tracking.py b/lisa_tests/arm/kernel/scheduler/load_tracking.py deleted file mode 100644 index 716d746370..0000000000 --- a/lisa_tests/arm/kernel/scheduler/load_tracking.py +++ /dev/null @@ -1,850 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2016, ARM Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import abc -import os -import itertools -import contextlib -from statistics import mean -from typing import TypeVar - -from devlib.exception import TargetStableError - -from lisa.tests.base import ( - Result, ResultBundle, AggregatedResultBundle, TestBundleBase, TestBundle, - RTATestBundle -) -from lisa.target import Target -from lisa.utils import ArtifactPath, ExekallTaggable, groupby, kwargs_forwarded_to, memoized, ignore_exceps -from lisa.datautils import df_refit_index, series_dereference, series_mean -from lisa.wlgen.rta import PeriodicWload, RTAPhase -from lisa.trace import MissingTraceEventError -from lisa.analysis.load_tracking import LoadTrackingAnalysis -from lisa.analysis.tasks import TasksAnalysis -from lisa.pelt import PELT_SCALE, simulate_pelt, pelt_settling_time, kernel_util_mean -from lisa.notebook import plot_signal - -UTIL_SCALE = PELT_SCALE - -UTIL_CONVERGENCE_TIME_S = pelt_settling_time(1, init=0, final=1024) -""" -Time in seconds for util_avg to converge (i.e. ignored time) -""" - - -class LoadTrackingHelpers: - """ - Common bunch of helpers for load tracking tests. - """ - - MAX_RTAPP_CALIB_DEVIATION = 3 / 100 - """ - Ignore CPUs that have a RTapp calibration value that deviates too much - from the average calib value in their capacity class. - """ - - @classmethod - def _get_ignored_cpus(cls, plat_info): - """ - :meta public: - - Consider some CPUs as ignored when the load would not be - proportionnal to utilization on them. - - That happens for CPUs that are busy executing other code than the test - workload, like handling interrupts. It is detect that by looking at the - RTapp calibration value and we ignore outliers. - """ - rtapp_calib = plat_info['rtapp']['calib'] - ignored = set() - # For each class of CPUs, get the average rtapp calibration value - # and ignore the ones that are deviating too much from that - for cpu_class in plat_info['capacity-classes']: - calib_mean = mean(rtapp_calib[cpu] for cpu in cpu_class) - calib_max = (1 + cls.MAX_RTAPP_CALIB_DEVIATION) * calib_mean - ignored.update( - cpu - for cpu in cpu_class - # exclude outliers that are too slow (i.e. calib value too small) - if rtapp_calib[cpu] > calib_max - ) - return sorted(ignored) - - @classmethod - def filter_capacity_classes(cls, plat_info): - """ - Filter out capacity-classes key of ``plat_info`` to remove ignored - CPUs provided by: - """ - ignored_cpus = set(cls._get_ignored_cpus(plat_info)) - return [ - sorted(set(cpu_class) - ignored_cpus) - for cpu_class in plat_info['capacity-classes'] - ] - - @classmethod - def correct_expected_pelt(cls, plat_info, cpu, signal_value): - """ - Correct an expected PELT signal from ``rt-app`` based on the calibration - values. - - Since the instruction mix of ``rt-app`` might not be the same as the - benchmark that was used to establish CPU capacities, the duty cycle of - ``rt-app`` will only be accurate on big CPUs. When we know on which CPU - the task actually executed, we can correct the expected value based on - the ratio of calibration values and CPU capacities. - """ - - calib = plat_info['rtapp']['calib'] - rtapp_capacities = plat_info['cpu-capacities']['rtapp'] - orig_capacities = plat_info['cpu-capacities']['orig'] - - # Correct the signal mean to what it should have been if rt-app - # workload was exactly the same as the one used to establish CPU - # capacities - return signal_value * orig_capacities[cpu] / rtapp_capacities[cpu] - - -class InvarianceItemBase(RTATestBundle, LoadTrackingHelpers, TestBundle, ExekallTaggable, abc.ABC): - """ - Basic check for CPU and frequency invariant load and utilization tracking - - **Expected Behaviour:** - - Load tracking signals are scaled so that the workload results in - roughly the same util & load values regardless of compute power of the - CPU used and its frequency. - """ - task_prefix = 'invar' - cpufreq_conf = { - "governor": "userspace" - } - - def __init__(self, res_dir, plat_info, cpu, freq, freq_list): - super().__init__(res_dir, plat_info) - - self.freq = freq - self.freq_list = freq_list - self.cpu = cpu - - @property - def rtapp_profile(self): - return self.get_rtapp_profile(self.plat_info, cpu=self.cpu, freq=self.freq) - - @property - def task_name(self): - """ - The name of the only task this test uses - """ - tasks = self.rtapp_tasks - assert len(tasks) == 1 - return tasks[0] - - @property - def wlgen_task(self): - """ - The :class:`lisa.wlgen.rta.RTATask` description of the only rt-app - task, as specified in the profile. - """ - tasks = list(self.rtapp_profile.values()) - assert len(tasks) == 1 - return tasks[0] - - @property - def cpus(self): - """ - All CPUs used by RTapp workload. - """ - return set(itertools.chain.from_iterable( - phase['cpus'] - for task in self.rtapp_profile.values() - for phase in task.phases - )) - - def get_tags(self): - return {'cpu': f'{self.cpu}@{self.freq}'} - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, collector=None) -> 'InvarianceItemBase': - plat_info = target.plat_info - rtapp_profile = cls.get_rtapp_profile(plat_info) - - # After a bit of experimenting, it turns out that on some platforms - # misprediction of the idle time (which leads to a shallow idle state, - # a wakeup and another idle nap) can mess up the duty cycle of the - # rt-app task we're running. In our case, a 50% duty cycle, 16ms period - # task would always be active for 8ms, but it would sometimes sleep for - # only 5 or 6 ms. - # This is fine to do this here, as we only care about the proper - # behaviour of the signal on running/not-running tasks. - with target.disable_idle_states(): - with target.cpufreq.use_governor(**cls.cpufreq_conf): - cls.run_rtapp( - target=target, - res_dir=res_dir, - profile=rtapp_profile, - collector=collector - ) - - return cls(res_dir, plat_info) - - @classmethod - def _get_rtapp_profile(cls, plat_info, cpu, freq): - """ - :meta public: - - Get a specification for a rt-app workload with the specificied duty - cycle, pinned to the given CPU. - """ - freq_capa = cls._get_freq_capa(cpu, freq, plat_info) - duty_cycle_pct = freq_capa / UTIL_SCALE * 100 - # Use half of the capacity at that OPP, so we are sure that the - # task will fit even at the lowest OPP - duty_cycle_pct //= 2 - - # Catch rt-app calibration induced issues early. - assert duty_cycle_pct > 0 - - return { - f"{cls.task_prefix}{cpu}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=duty_cycle_pct, - duration=2, - period=cls.TASK_PERIOD, - ), - prop_cpus=[cpu], - ) - } - - @classmethod - def _from_target(cls, target: Target, *, cpu: int, freq: int, freq_list=None, res_dir: ArtifactPath = None, collector=None) -> 'InvarianceItemBase': - """ - :meta public: - - :param cpu: CPU to use, or ``None`` to automatically choose an - appropriate set of CPUs. - :type cpu: int or None - - :param freq: Frequency to run at in kHz. It is only relevant in - combination with ``cpu``. - :type freq: int or None - """ - plat_info = target.plat_info - rtapp_profile = cls.get_rtapp_profile(plat_info, cpu=cpu, freq=freq) - logger = cls.get_logger() - - with target.cpufreq.use_governor(**cls.cpufreq_conf): - target.cpufreq.set_frequency(cpu, freq) - logger.debug(f'CPU{cpu} frequency: {target.cpufreq.get_frequency(cpu)}') - cls.run_rtapp( - target=target, - res_dir=res_dir, - profile=rtapp_profile, - collector=collector - ) - - freq_list = freq_list or [freq] - return cls(res_dir, plat_info, cpu, freq, freq_list) - - @staticmethod - def _get_freq_capa(cpu, freq, plat_info): - capacity = plat_info['cpu-capacities']['rtapp'][cpu] - # Scale the capacity linearly according to the frequency - max_freq = max(plat_info['freqs'][cpu]) - capacity *= freq / max_freq - - return capacity - - @abc.abstractmethod - def _get_trace_signal(self, task, cpus, signal_name): - pass - - @LoadTrackingAnalysis.df_task_signal.used_events - @LoadTrackingAnalysis.df_cpus_signal.used_events - @TasksAnalysis.df_task_activation.used_events - def get_simulated_pelt(self, task, signal_name): - """ - Simulate a PELT signal for a given task. - - :param task: task to look for in the trace. - :type task: int or str or tuple(int, str) - - :param signal_name: Name of the PELT signal to simulate. - :type signal_name: str - - :return: A :class:`pandas.DataFrame` with a ``simulated`` column - containing the simulated signal, along with the column of the - signal as found in the trace. - """ - logger = self.logger - trace = self.trace - task = trace.get_task_id(task) - - df_activation = trace.ana.tasks.df_task_activation( - task, - # Util only takes into account times where the task is actually - # executing - preempted_value=0, - ) - - pinned_cpus = sorted(self.cpus) - assert len(pinned_cpus) == 1 - df = self._get_trace_signal(task, pinned_cpus, signal_name) - - df = df.copy(deep=False) - - # Ignore the first activation, as its signals are incorrect - df_activation = df_activation.iloc[2:] - - # Make sure the activation df does not start before the dataframe of - # signal values, otherwise we cannot provide a sensible init value - df_activation = df_activation[df.index[0]:] - - # Get the initial signal value matching the first activation we will care about - init_iloc = df.index.get_indexer([df_activation.index[0]], method='ffill')[0] - init = df[signal_name].iloc[init_iloc] - - try: - # PELT clock in nanoseconds - clock = df['update_time'] * 1e-9 - except KeyError: - if any( - self.plat_info['cpu-capacities']['rtapp'][cpu] != UTIL_SCALE - for phase in self.wlgen_task.phases - for cpu in phase['cpus'] - ): - ResultBundle.raise_skip('PELT time scaling can only be simulated when the PELT clock is available from the trace') - - logger.warning('PELT clock is not available, ftrace timestamp will be used at the expense of accuracy') - clock = None - - try: - cpus = trace.ana.tasks.cpus_of_tasks([task]) - capacity = trace.ana.load_tracking.df_cpus_signal('capacity', cpus) - except MissingTraceEventError: - capacity = None - else: - capacity = capacity[['cpu', 'capacity_curr']] - # We are interested in the current CPU capacity as seen by CFS. - # This takes into account: - # * The frequency - # * The capacity of other sched classes (RT, IRQ etc) - capacity = capacity.rename(columns={'capacity_curr': 'capacity'}) - - # Reshape the capacity dataframe so that we get one column per CPU - capacity = capacity.pivot(columns=['cpu']) - capacity.columns = capacity.columns.droplevel(0) - capacity.ffill(inplace=True) - capacity = df_refit_index( - capacity, - window=(df_activation.index[0], df_activation.index[-1]) - ) - # Make sure we end up with the timestamp at which the capacity - # changes, rather than the timestamps at which the task is enqueued - # or dequeued. - activation_cpu = df_activation['cpu'].reindex(capacity.index, method='ffill') - capacity = series_dereference(activation_cpu, capacity) - - df['simulated'] = simulate_pelt( - df_activation['active'], - index=df.index, - init=init, - clock=clock, - capacity=capacity, - ) - - # Since load is now CPU invariant in recent kernel versions, we don't - # rescale it back. To match the old behavior, that line is - # needed: - # df['simulated'] /= self.plat_info['cpu-capacities']['rtapp'][cpu] / UTIL_SCALE - kernel_version = self.plat_info['kernel']['version'] - if ( - signal_name == 'load' - and kernel_version.parts[:2] < (5, 1) - ): - logger().warning(f'Load signal is assumed to be CPU invariant, which is true for recent mainline kernels, but may be wrong for {kernel_version}') - - df['error'] = df[signal_name] - df['simulated'] - df = df.dropna() - return df - - def _plot_pelt(self, task, signal_name, simulated, test_name): - ana = self.trace.ana( - backend='bokeh', - task=task, - tasks=[task], - ) - - fig = ( - ana.load_tracking.plot_task_signals(signals=[signal_name]) * - plot_signal(simulated, name=f'simulated {signal_name}') * - ana.tasks.plot_tasks_activation( - alpha=0.2, - overlay=True, - which_cpu=False, - # TODO: reeanble that when we get working twinx - # duration=True, - ) - ) - - self._save_debug_plot(fig, name=f'{test_name}_{signal_name}') - return fig - - def _add_cpu_metric(self, res_bundle): - freq_str = f'@{self.freq}' if self.freq is not None else '' - res_bundle.add_metric("cpu", f'{self.cpu}{freq_str}') - return res_bundle - - @memoized - @get_simulated_pelt.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def _test_correctness(self, signal_name, mean_error_margin_pct, max_error_margin_pct): - - task = self.task_name - df = self.get_simulated_pelt(task, signal_name) - - abs_error = df['error'].abs() - mean_error_pct = series_mean(abs_error) / UTIL_SCALE * 100 - max_error_pct = abs_error.max() / UTIL_SCALE * 100 - - mean_ok = mean_error_pct <= mean_error_margin_pct - max_ok = max_error_pct <= max_error_margin_pct - - res = ResultBundle.from_bool(mean_ok and max_ok) - - res.add_metric('actual mean', series_mean(df[signal_name])) - res.add_metric('simulated mean', series_mean(df['simulated'])) - res.add_metric('mean error', mean_error_pct, '%') - - res.add_metric('actual max', df[signal_name].max()) - res.add_metric('simulated max', df['simulated'].max()) - res.add_metric('max error', max_error_pct, '%') - - self._plot_pelt(task, signal_name, df['simulated'], 'correctness') - - res = self._add_cpu_metric(res) - return res - - @memoized - @_test_correctness.used_events - def test_util_correctness(self, mean_error_margin_pct=2, max_error_margin_pct=5) -> ResultBundle: - """ - Check that the utilization signal is as expected. - - :param mean_error_margin_pct: Maximum allowed difference in the mean of - the actual signal and the simulated one, as a percentage of utilization - scale. - :type mean_error_margin_pct: float - - :param max_error_margin_pct: Maximum allowed difference between samples - of the actual signal and the simulated one, as a percentage of - utilization scale. - :type max_error_margin_pct: float - """ - return self._test_correctness( - signal_name='util', - mean_error_margin_pct=mean_error_margin_pct, - max_error_margin_pct=max_error_margin_pct, - ) - - @memoized - @_test_correctness.used_events - def test_load_correctness(self, mean_error_margin_pct=2, max_error_margin_pct=5) -> ResultBundle: - """ - Same as :meth:`test_util_correctness` but checking the load. - """ - return self._test_correctness( - signal_name='load', - mean_error_margin_pct=mean_error_margin_pct, - max_error_margin_pct=max_error_margin_pct, - ) - - -class InvarianceBase(TestBundleBase, LoadTrackingHelpers, abc.ABC): - """ - Basic check for frequency invariant load and utilization tracking - - This test runs the same workload on one CPU of each capacity available in - the system at a cross section of available frequencies. - - This class is mostly a wrapper around :class:`InvarianceItemBase`, - providing a way to build a list of those for a few frequencies, and - providing aggregated versions of the tests. Calling the tests methods on - the items directly is recommended to avoid the unavoidable loss of - information when aggregating the - :class:`~lisa.tests.base.Result` of each item. - - `invariance_items` instance attribute is a list of instances of - :class:`InvarianceItemBase`. - """ - - ITEM_CLS = TypeVar('ITEM_CLS') - - NR_FREQUENCIES = 8 - """ - Maximum number of tested frequencies. - """ - - def __init__(self, res_dir, plat_info, invariance_items): - super().__init__(res_dir, plat_info) - - self.invariance_items = invariance_items - - @classmethod - def _build_invariance_items(cls, target, res_dir, **kwargs): - """ - Yield a :class:`InvarianceItemBase` for a subset of target's - frequencies, for one CPU of each capacity class. - - This is a generator function. - - :Variable keyword arguments: Forwarded to :meth:`InvarianceItemBase.from_target` - - :rtype: Iterator[:class:`InvarianceItemBase`] - """ - plat_info = target.plat_info - - def pick_cpu(filtered_class, cpu_class): - try: - return filtered_class[0] - except IndexError: - raise RuntimeError(f'All CPUs of one capacity class have been ignored: {cpu_class}') - - # pick one CPU per class of capacity - cpus = [ - pick_cpu(filtered_class, cpu_class) - for cpu_class, filtered_class - in zip( - plat_info['capacity-classes'], - cls.filter_capacity_classes(plat_info) - ) - ] - - def select_freqs(cpu): - all_freqs = plat_info['freqs'][cpu] - - def interpolate(start, stop, nr): - step = (stop - start) / (nr - 1) - return [start + i * step for i in range(nr)] - - # Select the higher freq no matter what - selected_freqs = {max(all_freqs)} - - available_freqs = set(all_freqs) - selected_freqs - nr_freqs = cls.NR_FREQUENCIES - len(selected_freqs) - for ideal_freq in interpolate(min(all_freqs), max(all_freqs), nr_freqs): - - if not available_freqs: - break - - # Select the freq closest to ideal - selected_freq = min(available_freqs, key=lambda freq: abs(freq - ideal_freq)) - available_freqs.discard(selected_freq) - selected_freqs.add(selected_freq) - - return all_freqs, sorted(selected_freqs) - - cpu_freqs = { - cpu: select_freqs(cpu) - for cpu in cpus - } - - logger = cls.get_logger() - logger.info('Will run on: {}'.format( - ', '.join( - f'CPU{cpu}@{freq}' - for cpu, (all_freqs, freq_list) in sorted(cpu_freqs.items()) - for freq in freq_list - ) - )) - - with ignore_exceps( - (FileNotFoundError, TargetStableError), - target.revertable_write_value('/sys/kernel/debug/workqueue/high_prio_wq', '0') - ): - for cpu, (all_freqs, freq_list) in sorted(cpu_freqs.items()): - for freq in freq_list: - item_dir = ArtifactPath.join(res_dir, f"{InvarianceItemBase.task_prefix}_{cpu}@{freq}") - os.makedirs(item_dir) - - logger.info(f'Running experiment for CPU {cpu}@{freq}') - yield cls.ITEM_CLS.from_target( - target, - cpu=cpu, - freq=freq, - freq_list=all_freqs, - res_dir=item_dir, - **kwargs, - ) - - def iter_invariance_items(self) -> 'ITEM_CLS': - yield from self.invariance_items - - @classmethod - @kwargs_forwarded_to( - InvarianceItemBase._from_target, - ignore=[ - 'cpu', - 'freq', - 'freq_list', - ] - ) - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, collector=None, **kwargs) -> 'InvarianceBase': - return cls(res_dir, target.plat_info, - list(cls._build_invariance_items(target, res_dir, **kwargs)) - ) - - def get_item(self, cpu, freq): - """ - :returns: The - :class:`~lisa_tests.arm.kernel.scheduler.load_tracking.InvarianceItemBase` - generated when running at a given frequency - """ - for item in self.invariance_items: - if item.cpu == cpu and item.freq == freq: - return item - raise ValueError('No invariance item matching {cpu}@{freq}'.format(cpu, freq)) - - # Combined version of some other tests, applied on all available - # InvarianceItemBase with the result merged. - - @InvarianceItemBase.test_util_correctness.used_events - def test_util_correctness(self, mean_error_margin_pct=2, max_error_margin_pct=5) -> AggregatedResultBundle: - """ - Aggregated version of :meth:`InvarianceItemBase.test_util_correctness` - """ - def item_test(test_item): - return test_item.test_util_correctness( - mean_error_margin_pct=mean_error_margin_pct, - max_error_margin_pct=max_error_margin_pct, - ) - return self._test_all_items(item_test) - - @InvarianceItemBase.test_load_correctness.used_events - def test_load_correctness(self, mean_error_margin_pct=2, max_error_margin_pct=5) -> AggregatedResultBundle: - """ - Aggregated version of :meth:`InvarianceItemBase.test_load_correctness` - """ - def item_test(test_item): - return test_item.test_load_correctness( - mean_error_margin_pct=mean_error_margin_pct, - max_error_margin_pct=max_error_margin_pct, - ) - return self._test_all_items(item_test) - - def _test_all_items(self, item_test): - """ - Apply the `item_test` function on all instances of - :class:`InvarianceItemBase` and aggregate the returned - :class:`~lisa.tests.base.ResultBundle` into one. - - :attr:`~lisa.tests.base.Result.UNDECIDED` is ignored. - """ - item_res_bundles = [ - item_test(item) - for item in self.invariance_items - ] - return AggregatedResultBundle(item_res_bundles, 'cpu') - - -class TaskInvariance(InvarianceBase): - class ITEM_CLS(InvarianceItemBase): - """ - Provide specific :class:`TaskInvariance.ITEM_CLS` methods. - The common methods are implemented in :class:`InvarianceItemBase`. - """ - - def _get_trace_signal(self, task, cpus, signal_name): - return self.trace.ana.load_tracking.df_task_signal(task, signal_name) - - @memoized - @InvarianceItemBase.get_simulated_pelt.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def _test_behaviour(self, signal_name, error_margin_pct): - - task = self.task_name - phase = self.wlgen_task.phases[0] - df = self.get_simulated_pelt(task, signal_name) - - cpus = sorted(phase['cpus']) - assert len(cpus) == 1 - cpu = cpus[0] - - expected_duty_cycle_pct = phase['wload'].unscaled_duty_cycle_pct(self.plat_info) - expected_final_util = expected_duty_cycle_pct / 100 * UTIL_SCALE - settling_time = pelt_settling_time(10, init=0, final=expected_final_util) - settling_time += df.index[0] - - df = df[settling_time:] - - # Instead of taking the mean, take the average between the min and max - # values of the settled signal. This avoids the bias introduced by the - # fact that the util signal stays high while the task sleeps - settled_signal_mean = kernel_util_mean(df[signal_name], plat_info=self.plat_info) - expected_signal_mean = expected_final_util - - signal_mean_error_pct = abs(expected_signal_mean - settled_signal_mean) / UTIL_SCALE * 100 - res = ResultBundle.from_bool(signal_mean_error_pct < error_margin_pct) - - res.add_metric('expected mean', expected_signal_mean) - res.add_metric('settled mean', settled_signal_mean) - res.add_metric('settled mean error', signal_mean_error_pct, '%') - - self._plot_pelt(task, signal_name, df['simulated'], 'behaviour') - - res = self._add_cpu_metric(res) - return res - - @memoized - @_test_behaviour.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_util_behaviour(self, error_margin_pct=5) -> ResultBundle: - """ - Check the utilization mean is linked to the task duty cycle. - - - .. note:: That is not really the case, as the util of a task is not - updated when the task is sleeping, but is fairly close to reality - as long as the task period is small enough. - - :param error_margin_pct: Allowed difference in percentage of - utilization scale. - :type error_margin_pct: float - - """ - return self._test_behaviour('util', error_margin_pct) - - @memoized - @_test_behaviour.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_load_behaviour(self, error_margin_pct=5) -> ResultBundle: - """ - Same as :meth:`TaskInvariance.ITEM_CLS.test_util_behaviour` but checking the load. - """ - return self._test_behaviour('load', error_margin_pct) - - @ITEM_CLS.test_load_behaviour.used_events - def test_util_behaviour(self, error_margin_pct=5) -> AggregatedResultBundle: - """ - Aggregated version of :meth:`TaskInvariance.ITEM_CLS.test_util_behaviour` - """ - def item_test(test_item): - return test_item.test_util_behaviour( - error_margin_pct=error_margin_pct, - ) - return self._test_all_items(item_test) - - @ITEM_CLS.test_load_behaviour.used_events - def test_load_behaviour(self, error_margin_pct=5) -> AggregatedResultBundle: - """ - Aggregated version of :meth:`TaskInvariance.ITEM_CLS.test_load_behaviour` - """ - def item_test(test_item): - return test_item.test_load_behaviour( - error_margin_pct=error_margin_pct, - ) - return self._test_all_items(item_test) - - @ITEM_CLS.test_util_behaviour.used_events - def test_cpu_invariance(self) -> AggregatedResultBundle: - """ - Check that items using the max freq on each CPU is passing util avg test. - - There could be false positives, but they are expected to be relatively - rare. - - .. seealso:: :class:`TaskInvariance.ITEM_CLS.test_util_behaviour` - """ - res_list = [] - for cpu, item_group in groupby(self.invariance_items, key=lambda x: x.cpu): - item_group = list(item_group) - # combine all frequencies of that CPU class, although they should - # all be the same - max_freq = max(itertools.chain.from_iterable( - x.freq_list for x in item_group - )) - max_freq_items = [ - item - for item in item_group - if item.freq == max_freq - ] - for item in max_freq_items: - # Only test util, as it should be more robust - res = item.test_util_behaviour() - res_list.append(res) - - return AggregatedResultBundle(res_list, 'cpu') - - @ITEM_CLS.test_util_behaviour.used_events - def test_freq_invariance(self) -> AggregatedResultBundle: - """ - Check that at least one CPU has items passing for all tested frequencies. - - .. seealso:: :class:`TaskInvariance.ITEM_CLS.test_util_behaviour` - """ - - logger = self.logger - - def make_group_bundle(cpu, item_group): - bundle = AggregatedResultBundle( - [ - # Only test util, as it should be more robust - item.test_util_behaviour() - for item in item_group - ], - # each item's "cpu" metric also contains the frequency - name_metric='cpu', - ) - # At that level, we only report the CPU, since nested bundles cover - # different frequencies - bundle.add_metric('cpu', cpu) - - logger.info(f'Util avg invariance {bundle.result.lower_name} for CPU {cpu}') - return bundle - - group_result_bundles = [ - make_group_bundle(cpu, item_group) - for cpu, item_group in groupby(self.invariance_items, key=lambda x: x.cpu) - ] - - # The combination differs from the AggregatedResultBundle default one: - # we consider as passed as long as at least one of the group has - # passed, instead of forcing all of them to pass. - if any(result_bundle.result is Result.PASSED for result_bundle in group_result_bundles): - overall_result = Result.PASSED - elif all(result_bundle.result is Result.UNDECIDED for result_bundle in group_result_bundles): - overall_result = Result.UNDECIDED - else: - overall_result = Result.FAILED - - return AggregatedResultBundle( - group_result_bundles, - name_metric='cpu', - result=overall_result - ) - - -class RqInvariance(InvarianceBase): - class ITEM_CLS(InvarianceItemBase): - """ - Provide specific :class:`RqInvariance.ITEM_CLS` methods. - The common methods are implemented in :class:`InvarianceItemBase`. - """ - - def _get_trace_signal(self, task, cpus, signal_name): - return self.trace.ana.load_tracking.df_cpus_signal(signal_name, cpus) - # vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/scheduler/misfit.py b/lisa_tests/arm/kernel/scheduler/misfit.py deleted file mode 100644 index 0d6850f574..0000000000 --- a/lisa_tests/arm/kernel/scheduler/misfit.py +++ /dev/null @@ -1,343 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2018, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from math import ceil - -import pandas as pd - -from lisa.utils import memoized -from lisa.datautils import df_squash, df_add_delta -from lisa.trace import requires_events -from lisa.wlgen.rta import RTAPhase, RunWload, SleepWload -from lisa.tests.base import TestBundle, RTATestBundle, Result, ResultBundle, TestMetric -from lisa.analysis.tasks import TasksAnalysis, TaskState -from lisa.analysis.idle import IdleAnalysis -from lisa.analysis.rta import RTAEventsAnalysis - - -class MisfitMigrationBase(RTATestBundle, TestBundle): - """ - Abstract class for Misfit behavioural testing - - This class provides some helpers for features related to Misfit. - """ - - @classmethod - def _has_asym_cpucapacity(cls, target): - """ - :returns: Whether the target has asymmetric CPU capacities - """ - return len(set(target.plat_info["cpu-capacities"]['orig'].values())) > 1 - - @classmethod - def _get_max_lb_interval(cls, plat_info): - """ - Get the value of maximum_load_balance_interval. - - The kernel computes it so: - HZ*num_online_cpus()/10; - (https://elixir.bootlin.com/linux/v4.15/source/kernel/sched/fair.c#L9101) - - Here we don't do any hotplugging so we consider all CPUs to be online. - - :returns: The absolute maximum load-balance interval in seconds - """ - HZ = plat_info['kernel']['config']['CONFIG_HZ'] - return ((HZ * plat_info['cpus-count']) // 10) * (1. / HZ) - - @classmethod - def _get_lb_interval(cls, plat_info): - # Regular interval is 1 ms * nr_cpus, rounded to closest jiffy multiple - jiffy = 1 / plat_info['kernel']['config']['CONFIG_HZ'] - interval = 1e-3 * plat_info["cpus-count"] - - return ceil(interval / jiffy) * jiffy - -class StaggeredFinishes(MisfitMigrationBase): - """ - One 100% task per CPU, with staggered completion times. - - By spawning one task per CPU on an asymmetric system, we expect the tasks - running on the higher-performance CPUs to complete first. At this point, - the misfit logic should kick in and they should pull tasks from - lower-performance CPUs. - - The tasks have staggered completion times to prevent having several of them - completing at the same time, which can cause some unwanted noise (e.g. some - sshd or systemd activity at the end of the task). - - The end result should look something like this on big.LITTLE:: - - a,b,c,d are CPU-hogging tasks - _ signifies idling - - LITTLE_0 | a a a a _ _ _ - LITTLE_1 | b b b b b _ _ - ---------|-------------- - big_0 | c c c c a a a - big_1 | d d d d d b b - - """ - - task_prefix = "msft" - - PIN_DELAY = 0.001 - """ - How long the tasks will be pinned to their "starting" CPU. Doesn't have - to be long (we just have to ensure they spawn there), so arbitrary value - """ - - # Let us handle things ourselves - _BUFFER_PHASE_DURATION_S=0 - - IDLING_DELAY = 1 - """ - A somewhat arbitray delay - long enough to ensure - rq->avg_idle > sysctl_sched_migration_cost - """ - - @property - def src_cpus(self): - return self.plat_info['capacity-classes'][0] - - @property - def dst_cpus(self): - cpu_classes = self.plat_info['capacity-classes'] - - # XXX: Might need to check the tasks can fit on all of those, rather - # than just pick all but the smallest CPUs - dst_cpus = [] - for group in cpu_classes[1:]: - dst_cpus += group - return dst_cpus - - @property - def end_time(self): - return self.trace.end - - @property - def duration(self): - return self.end_time - self.start_time - - @property - @memoized - @RTAEventsAnalysis.df_rtapp_phases_start.used_events - def start_time(self): - """ - The tasks don't wake up at the same exact time, find the task that is - the last to wake up (after the idling phase). - - .. note:: We don't want to redefine - :meth:`~lisa.tests.base.RTATestBundle.trace_window` here because we - still need the first wakeups to be visible. - """ - phase_df = self.trace.ana.rta.df_rtapp_phases_start(wlgen_profile=self.rtapp_profile) - return phase_df[ - phase_df.index.get_level_values('phase') == 'test/pinned' - ]['Time'].max() - - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - if not cls._has_asym_cpucapacity(target): - ResultBundle.raise_skip( - "Target doesn't have asymmetric CPU capacities") - - @classmethod - def _get_rtapp_profile(cls, plat_info): - cpus = list(range(plat_info['cpus-count'])) - - # We're pinning stuff in the first phase, so give it ample time to - # clean the pinned logic out of balance_interval - free_time_s = 1.1 * cls._get_max_lb_interval(plat_info) - - # Ideally we'd like the different tasks not to complete at the same time - # (hence the "staggered" name), but this depends on a lot of factors - # (capacity ratios, available frequencies, thermal conditions...) so the - # best we can do is wing it. - stagger_s = cls._get_lb_interval(plat_info) * 1.5 - - return { - f"{cls.task_prefix}{cpu}": ( - RTAPhase( - prop_name='idling', - prop_wload=SleepWload(cls.IDLING_DELAY), - prop_cpus=[cpu], - ) + - RTAPhase( - prop_name='pinned', - prop_wload=RunWload(cls.PIN_DELAY), - prop_cpus=[cpu], - ) + - RTAPhase( - prop_name='staggered', - prop_wload=RunWload( - # Introduce staggered task completions - free_time_s + cpu * stagger_s - ), - prop_cpus=cpus, - ) - ) - for cpu in cpus - } - - def _trim_state_df(self, state_df): - if state_df.empty: - return state_df - - return df_squash(state_df, self.start_time, - state_df.index[-1] + state_df['delta'].iloc[-1], "delta") - - @requires_events('sched_switch', TasksAnalysis.df_task_states.used_events) - def test_preempt_time(self, allowed_preempt_pct=1) -> ResultBundle: - """ - Test that tasks are not being preempted too much - """ - - sdf = self.trace.df_event('sched_switch') - task_state_dfs = { - task: self.trace.ana.tasks.df_task_states(task) - for task in self.rtapp_tasks - } - - res = ResultBundle.from_bool(True) - for task, state_df in task_state_dfs.items(): - # The sched_switch dataframe where the misfit task - # is replaced by another misfit task - preempt_sdf = sdf[ - (sdf.prev_comm == task) & - (sdf.next_comm.str.startswith(self.task_prefix)) - ] - - state_df = self._trim_state_df(state_df) - state_df = state_df[ - (state_df.index.isin(preempt_sdf.index)) & - # Ensure this is a preemption and not just the task ending - (state_df.curr_state == TaskState.TASK_INTERRUPTIBLE) - ] - - preempt_time = state_df.delta.sum() - preempt_pct = (preempt_time / self.duration) * 100 - - res.add_metric(f"{task} preemption", { - "ratio": TestMetric(preempt_pct, "%"), - "time": TestMetric(preempt_time, "seconds")}) - - if preempt_pct > allowed_preempt_pct: - res.result = Result.FAILED - - return res - - @memoized - @IdleAnalysis.signal_cpu_active.used_events - def _get_active_df(self, cpu): - """ - :returns: A dataframe that describes the idle status (on/off) of 'cpu' - """ - active_df = pd.DataFrame( - self.trace.ana.idle.signal_cpu_active(cpu), columns=['state'] - ) - df_add_delta(active_df, inplace=True, window=self.trace.window) - return active_df - - @_get_active_df.used_events - def _max_idle_time(self, start, end, cpus): - """ - :returns: The maximum idle time of 'cpus' in the [start, end] interval - """ - max_time = 0 - max_cpu = 0 - - for cpu in cpus: - busy_df = self._get_active_df(cpu) - busy_df = df_squash(busy_df, start, end) - busy_df = busy_df[busy_df.state == 0] - - if busy_df.empty: - continue - - local_max = busy_df.delta.max() - if local_max > max_time: - max_time = local_max - max_cpu = cpu - - return max_time, max_cpu - - @_max_idle_time.used_events - def _test_cpus_busy(self, task_state_dfs, cpus, allowed_idle_time_s): - """ - Test that for every window in which the tasks are running, :attr:`cpus` - are not idle for more than :attr:`allowed_idle_time_s` - """ - if allowed_idle_time_s is None: - allowed_idle_time_s = self._get_lb_interval(self.plat_info) - - res = ResultBundle.from_bool(True) - - for task, state_df in task_state_dfs.items(): - # Have a look at every task activation - task_idle_times = [self._max_idle_time(index, index + row.delta, cpus) - for index, row in state_df.iterrows()] - - if not task_idle_times: - continue - - max_time, max_cpu = max(task_idle_times) - res.add_metric(f"{task} max idle", data={ - "time": TestMetric(max_time, "seconds"), "cpu": TestMetric(max_cpu)}) - - if max_time > allowed_idle_time_s: - res.result = Result.FAILED - - return res - - @TasksAnalysis.df_task_states.used_events - @_test_cpus_busy.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_throughput(self, allowed_idle_time_s=None) -> ResultBundle: - """ - Test that big CPUs are not idle when there are misfit tasks to upmigrate - - :param allowed_idle_time_s: How much time should be allowed between a - big CPU going idle and a misfit task ending on that CPU. In theory - a newidle balance should lead to a null delay, but in practice - there's a tiny one, so don't set that to 0 and expect the test to - pass. - - Furthermore, we're not always guaranteed to get a newidle pull, so - allow time for a regular load balance to happen. - - When ``None``, this defaults to (1ms x number_of_cpus) to mimic the - default balance_interval (balance_interval = sd_weight), see - kernel/sched/topology.c:sd_init(). - :type allowed_idle_time_s: int - """ - task_state_dfs = {} - for task in self.rtapp_tasks: - # This test is all about throughput: check that every time a task - # runs on a little it's because bigs are busy - df = self.trace.ana.tasks.df_task_states(task) - # Trim first to keep coherent deltas - df = self._trim_state_df(df) - task_state_dfs[task] = df[ - # Task is active - (df.curr_state == TaskState.TASK_ACTIVE) & - # Task needs to be upmigrated - (df.cpu.isin(self.src_cpus)) - ] - - return self._test_cpus_busy(task_state_dfs, self.dst_cpus, allowed_idle_time_s) diff --git a/lisa_tests/arm/kernel/scheduler/sanity.py b/lisa_tests/arm/kernel/scheduler/sanity.py deleted file mode 100644 index ee2b58793b..0000000000 --- a/lisa_tests/arm/kernel/scheduler/sanity.py +++ /dev/null @@ -1,85 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2018, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import sys - -from lisa.target import Target -from lisa.utils import ArtifactPath, group_by_value -from lisa.tests.base import TestMetric, ResultBundle, TestBundle -from lisa.wlgen.sysbench import Sysbench - - -class CapacitySanity(TestBundle): - """ - A class for making sure capacity values make sense on a given target - - :param capacity_work: A description of the amount of work done on the - target, per capacity value ({capacity : work}) - :type capacity_work: dict - """ - - def __init__(self, res_dir, plat_info, capacity_work): - super().__init__(res_dir, plat_info) - - self.capacity_work = capacity_work - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, collector=None) -> 'CapacitySanity': - """ - :meta public: - - Factory method to create a bundle using a live target - """ - with target.cpufreq.use_governor("performance"): - sysbench = Sysbench(target, res_dir=res_dir) - - def run(cpu): - output = sysbench(cpus=[cpu], max_duration_s=1).run() - return output.nr_events - - cpu_capacities = target.sched.get_capacities() - capacities = group_by_value(cpu_capacities) - - with collector: - capa_work = { - capa: min(map(run, cpus)) - for capa, cpus in capacities.items() - } - - - return cls(res_dir, target.plat_info, capa_work) - - def test_capacity_sanity(self) -> ResultBundle: - """ - Assert that higher CPU capacity means more work done - """ - sorted_capacities = sorted(self.capacity_work.keys()) - work = [self.capacity_work[cap] for cap in sorted_capacities] - - # Check the list of work units is monotonically increasing - work_increasing = (work == sorted(work)) - res = ResultBundle.from_bool(work_increasing) - - capa_score = {} - for capacity, work in self.capacity_work.items(): - capa_score[capacity] = TestMetric(work) - - res.add_metric("Capacity to performance", capa_score) - - return res - -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/scheduler/sched_android.py b/lisa_tests/arm/kernel/scheduler/sched_android.py deleted file mode 100644 index 315e148731..0000000000 --- a/lisa_tests/arm/kernel/scheduler/sched_android.py +++ /dev/null @@ -1,327 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2019, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import os.path -import abc - -from lisa.wlgen.rta import RTAPhase, PeriodicWload -from lisa.tests.base import TestBundleBase, TestBundle, ResultBundle, RTATestBundle, AggregatedResultBundle -from lisa.trace import requires_events -from lisa.target import Target -from lisa.utils import ArtifactPath, kwargs_forwarded_to -from lisa.analysis.frequency import FrequencyAnalysis -from lisa.analysis.tasks import TasksAnalysis - - -class SchedTuneItemBase(RTATestBundle, TestBundle): - """ - Abstract class enabling rtapp execution in a schedtune group - - :param boost: The boost level to set for the cgroup - :type boost: int - - :param prefer_idle: The prefer_idle flag to set for the cgroup - :type prefer_idle: bool - """ - - def __init__(self, res_dir, plat_info, boost, prefer_idle): - super().__init__(res_dir, plat_info) - self.boost = boost - self.prefer_idle = prefer_idle - - @property - def cgroup_configuration(self): - return self.get_cgroup_configuration(self.plat_info, self.boost, self.prefer_idle) - - @classmethod - def get_cgroup_configuration(cls, plat_info, boost, prefer_idle): - attributes = { - 'boost': boost, - 'prefer_idle': int(prefer_idle) - } - return {'name': 'lisa_test', - 'controller': 'schedtune', - 'attributes': attributes} - - @classmethod - # Not annotated, to prevent exekall from picking it up. See - # SchedTuneBase.from_target - def _from_target(cls, target, *, res_dir, boost, prefer_idle, collector=None): - plat_info = target.plat_info - rtapp_profile = cls.get_rtapp_profile(plat_info) - cgroup_config = cls.get_cgroup_configuration(plat_info, boost, prefer_idle) - cls.run_rtapp(target, res_dir, rtapp_profile, collector=collector, cg_cfg=cgroup_config) - - return cls(res_dir, plat_info, boost, prefer_idle) - - -class SchedTuneBase(TestBundleBase): - """ - Abstract class enabling the aggregation of ``SchedTuneItemBase`` - - :param test_bundles: a list of test bundles generated by - multiple ``SchedTuneItemBase`` instances - :type test_bundles: list - """ - - def __init__(self, res_dir, plat_info, test_bundles): - super().__init__(res_dir, plat_info) - - self.test_bundles = test_bundles - - @classmethod - @kwargs_forwarded_to( - SchedTuneItemBase._from_target, - ignore=[ - 'boost', - 'prefer_idle', - ] - ) - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, - collector=None, **kwargs) -> 'SchedTuneBase': - """ - Creates a SchedTuneBase bundle from the target. - """ - return cls(res_dir, target.plat_info, - list(cls._create_test_bundles(target, res_dir, **kwargs)) - ) - - @classmethod - @abc.abstractmethod - def _create_test_bundles(cls, target, res_dir, **kwargs): - """ - Collects and yields a :class:`lisa.tests.base.ResultBundle` per test - item. - """ - - @classmethod - def _create_test_bundle_item(cls, target, res_dir, item_cls, - boost, prefer_idle, **kwargs): - """ - Creates and returns a TestBundle for a given item class, and a given - schedtune configuration - """ - item_dir = ArtifactPath.join(res_dir, f'boost_{boost}_prefer_idle_{int(prefer_idle)}') - os.makedirs(item_dir) - - logger = cls.get_logger() - logger.info(f'Running {item_cls.__name__} with boost={boost}, prefer_idle={prefer_idle}') - return item_cls.from_target(target, - boost=boost, - prefer_idle=prefer_idle, - res_dir=item_dir, - **kwargs, - ) - - -class SchedTuneFreqItem(SchedTuneItemBase): - """ - Runs a tiny RT rtapp task pinned to a big CPU at a given boost level and - checks the frequency selection was performed accordingly. - """ - - @classmethod - def _get_rtapp_profile(cls, plat_info): - cpu = plat_info['capacity-classes'][-1][0] - return { - 'stune': RTAPhase( - prop_wload=PeriodicWload( - # very small task, no impact on freq w/o boost - duty_cycle_pct=1, - duration=10, - period=cls.TASK_PERIOD, - ), - # pin to big CPU, to focus on frequency selection - prop_cpus=[cpu], - # RT tasks have the boost holding feature so the frequency - # should be more stable, and we shouldn't go to max freq in - # Android - prop_policy='SCHED_FIFO' - ) - } - - @FrequencyAnalysis.df_cpu_frequency.used_events - @requires_events(SchedTuneItemBase.trace_window.used_events, "cpu_frequency") - def trace_window(self, trace): - """ - Set the boundaries of the trace window to ``cpu_frequency`` events - before/after the task's start/end time - """ - rta_start, rta_stop = super().trace_window(trace) - - cpu = self.plat_info['capacity-classes'][-1][0] - freq_df = trace.ana.frequency.df_cpu_frequency(cpu) - - # Find the frequency events before and after the task runs - freq_start = freq_df[freq_df.index < rta_start].index[-1] - freq_stop = freq_df[freq_df.index > rta_stop].index[0] - - return (freq_start, freq_stop) - - @FrequencyAnalysis.get_average_cpu_frequency.used_events - def test_stune_frequency(self, freq_margin_pct=10) -> ResultBundle: - """ - Test that frequency selection followed the boost - - :param: freq_margin_pct: Allowed margin between estimated and measured - average frequencies - :type freq_margin_pct: int - - Compute the expected frequency given the boost level and compare to the - real average frequency from the trace. - Check that the difference between expected and measured frequencies is - no larger than ``freq_margin_pct``. - """ - kernel_version = self.plat_info['kernel']['version'] - if kernel_version.parts[:2] < (4, 14): - self.logger.warning(f'This test requires the RT boost hold, but it may be disabled in {kernel_version}') - - cpu = self.plat_info['capacity-classes'][-1][0] - freqs = self.plat_info['freqs'][cpu] - max_freq = max(freqs) - - # Estimate the target frequency, including sugov's margin, and round - # into a real OPP - boost = self.boost - target_freq = min(max_freq, max_freq * boost / 80) - target_freq = list(filter(lambda f: f >= target_freq, freqs))[0] - - # Get the real average frequency - avg_freq = self.trace.ana.frequency.get_average_cpu_frequency(cpu) - - distance = abs(target_freq - avg_freq) * 100 / target_freq - res = ResultBundle.from_bool(distance < freq_margin_pct) - res.add_metric("target freq", target_freq, 'kHz') - res.add_metric("average freq", avg_freq, 'kHz') - res.add_metric("boost", boost, '%') - - return res - - -class SchedTuneFrequencyTest(SchedTuneBase): - """ - Runs multiple ``SchedTuneFreqItem`` tests at various boost levels ranging - from 20% to 100%, then checks all succedeed. - """ - - @classmethod - def _create_test_bundles(cls, target, res_dir, **kwargs): - for boost in range(20, 101, 20): - yield cls._create_test_bundle_item( - target=target, - res_dir=res_dir, - item_cls=SchedTuneFreqItem, - boost=boost, - prefer_idle=False, - **kwargs - ) - - def test_stune_frequency(self, freq_margin_pct=10) -> AggregatedResultBundle: - """ - .. seealso:: :meth:`SchedTuneFreqItem.test_stune_frequency` - """ - item_res_bundles = [ - item.test_stune_frequency(freq_margin_pct) - for item in self.test_bundles - ] - return AggregatedResultBundle(item_res_bundles, 'boost') - - -class SchedTunePlacementItem(SchedTuneItemBase): - """ - Runs a tiny RT-App task marked 'prefer_idle' at a given boost level and - tests if it was placed on big-enough CPUs. - """ - - @classmethod - def _get_rtapp_profile(cls, plat_info): - return { - 'stune': RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=1, - duration=3, - period=cls.TASK_PERIOD, - ) - ) - } - - @TasksAnalysis.df_task_total_residency.used_events - def test_stune_task_placement(self, bad_cpu_margin_pct=10) -> ResultBundle: - """ - Test that the task placement satisfied the boost requirement - - Check that top-app tasks spend no more than ``bad_cpu_margin_pct`` of - their time on CPUs that don't have enough capacity to serve their - boost. - """ - assert len(self.rtapp_tasks) == 1 - task = self.rtapp_tasks[0] - df = self.trace.ana.tasks.df_task_total_residency(task) - - # Find CPUs without enough capacity to meet the boost - boost = self.boost - cpu_caps = self.plat_info['cpu-capacities']['rtapp'] - ko_cpus = list(filter(lambda x: (cpu_caps[x] / 10.24) < boost, cpu_caps)) - - # Count how much time was spend on wrong CPUs - time_ko = 0 - total_time = 0 - for cpu in cpu_caps: - t = df['runtime'][cpu] - if cpu in ko_cpus: - time_ko += t - total_time += t - - pct_ko = time_ko * 100 / total_time - res = ResultBundle.from_bool(pct_ko < bad_cpu_margin_pct) - res.add_metric("time spent on inappropriate CPUs", pct_ko, '%') - res.add_metric("boost", boost, '%') - - return res - - -class SchedTunePlacementTest(SchedTuneBase): - """ - Runs multiple ``SchedTunePlacementItem`` tests with prefer_idle set and - typical top-app boost levels, then checks all succedeed. - """ - - @classmethod - def _create_test_bundles(cls, target, res_dir, **kwargs): - # Typically top-app tasks are boosted by 10%, or 50% during touchboost - for boost in [10, 50]: - yield cls._create_test_bundle_item( - target=target, - res_dir=res_dir, - item_cls=SchedTunePlacementItem, - boost=boost, - prefer_idle=True, - **kwargs - ) - - def test_stune_task_placement(self, margin_pct=10) -> AggregatedResultBundle: - """ - .. seealso:: :meth:`SchedTunePlacementItem.test_stune_task_placement` - """ - item_res_bundles = [ - item.test_stune_task_placement(margin_pct) - for item in self.test_bundles - ] - return AggregatedResultBundle(item_res_bundles, 'boost') - -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/scheduler/util_tracking.py b/lisa_tests/arm/kernel/scheduler/util_tracking.py deleted file mode 100644 index ecd5c0df52..0000000000 --- a/lisa_tests/arm/kernel/scheduler/util_tracking.py +++ /dev/null @@ -1,401 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2019, ARM Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import functools - -import holoviews as hv - -from lisa.tests.base import ResultBundle, TestBundle, RTATestBundle -from lisa.target import Target -from lisa.utils import ArtifactPath, namedtuple -from lisa.wlgen.rta import RTAPhase, PeriodicWload, DutyCycleSweepPhase -from lisa.trace import requires_events -from lisa.analysis.rta import RTAEventsAnalysis -from lisa.analysis.tasks import TaskState, TasksAnalysis -from lisa.analysis.load_tracking import LoadTrackingAnalysis -from lisa.datautils import df_window, df_refit_index, series_mean, df_filter_task_ids - -from lisa_tests.arm.kernel.scheduler.load_tracking import LoadTrackingHelpers - - -class UtilTrackingBase(RTATestBundle, LoadTrackingHelpers, TestBundle): - """ - Base class for shared functionality of utilization tracking tests - """ - - @classmethod - def _from_target(cls, - target: Target, *, - res_dir: ArtifactPath = None, - collector=None, - ) -> 'UtilTrackingBase': - plat_info = target.plat_info - rtapp_profile = cls.get_rtapp_profile(plat_info) - - # After a bit of experimenting, it turns out that on some platforms - # misprediction of the idle time (which leads to a shallow idle state, - # a wakeup and another idle nap) can mess up the duty cycle of the - # rt-app task we're running. In our case, a 50% duty cycle, 16ms period - # task would always be active for 8ms, but it would sometimes sleep for - # only 5 or 6 ms. - # This is fine to do this here, as we only care about the proper - # behaviour of the signal on running/not-running tasks. - with target.disable_idle_states(): - with target.cpufreq.use_governor('performance'): - cls.run_rtapp(target, res_dir, rtapp_profile, collector=collector) - - return cls(res_dir, plat_info) - - -PhaseStats = namedtuple("PhaseStats", - ['start', 'end', 'mean_util', 'mean_enqueued', 'mean_ewma', 'issue'], - module=__name__, -) - - -ActivationSignals = namedtuple("ActivationSignals", [ - 'time', 'util', 'enqueued', 'ewma', 'issue'], - module=__name__, -) - - -class UtilConvergence(UtilTrackingBase): - """ - Basic checks for estimated utilization signals. - - .. attention:: Tests methods of this class assume the kernel has the util - est EWMA fast ramp behavior, which was merged in v5.5, and backported on - Android Common Kernel 4.19 and 5.4. The feature was introduced in - mainline in:: - - commit b8c96361402aa3e74ad48ceef18aed99153d8da8 - Author: Patrick Bellasi - Date: Wed Oct 23 21:56:30 2019 +0100 - - sched/fair/util_est: Implement faster ramp-up EWMA on utilization increases - - **Expected Behaviour:** - - The estimated utilization of a task is properly computed starting form its - `util` value at the end of each activation. - - Two signals composes the estimated utlization of a task: - - * `enqueued` : is expected to match the max between `util` and - `ewma` at the end of the previous activation - - * `ewma` : is expected to track an Exponential Weighted Moving - Average of the `util` signal sampled at the end of each activation. - - Based on these two invariant, this class provides a set of tests to verify - these conditions using different methods and sampling points. - """ - - @classmethod - def _get_rtapp_profile(cls, plat_info): - big_cpu = plat_info["capacity-classes"][-1][0] - - return { - 'test': ( - # Big task - RTAPhase( - prop_name='stable', - prop_wload=PeriodicWload( - duty_cycle_pct=75, - duration=5, - period=200e-3, - ), - prop_cpus=[big_cpu], - ) + - # Ramp Down - DutyCycleSweepPhase( - prop_name='ramp_down', - start=50, - stop=5, - step=20, - duration=1, - duration_of='step', - period=200e-3, - prop_cpus=[big_cpu], - ) + - # Ramp Up - DutyCycleSweepPhase( - prop_name='ramp_up', - start=10, - stop=60, - step=20, - duration=1, - duration_of='step', - period=200e-3, - prop_cpus=[big_cpu] - ) - ) - } - - @property - def fast_ramp(self): - # If someone wants to check the behavior pre-fast-ramp-up, this would - # need to be set to False. - # Note that no-one has been checking this other path in a while, so - # it's quite likely the test would need fixing anyway - return True - - def _plot_signals(self, task, test, failures): - ana = self.trace.ana( - task=task, - backend='bokeh', - ) - fig = ( - ana.load_tracking.plot_task_signals( - signals=['util', 'enqueued', 'ewma'] - ) * - ana.rta.plot_phases() * - hv.Overlay([ - hv.VLine(x).options( - alpha=0.5, - color='red', - ) - for x in failures - ]) - ).options( - title='UtilConvergence debug plot', - ) - - self._save_debug_plot(fig, name=f'util_est_{test}') - return fig - - @requires_events('sched_util_est_se') - @LoadTrackingAnalysis.df_tasks_signal.used_events - @RTAEventsAnalysis.task_phase_windows.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_means(self) -> ResultBundle: - """ - Test signals are properly "dominated". - - The mean of `enqueued` is expected to be always not - smaller than that of `util`, since this last is subject to decays - while the first not. - - The mean of `enqueued` is expected to be always greater or - equal than the mean of `util`, since this `util` is subject - to decays while `enqueued` not. - - On fast-ramp systems, the `ewma` signal is never smaller then - the `enqueued`, thus his mean is expected to be bigger. - - On non fast-ramp systems instead, the `ewma` is expected to be - smaller then `enqueued` in ramp-up phases, or bigger in - ramp-down phases. - - Those conditions are checked on a single execution of a task which has - three main behaviours: - - * STABLE: periodic big task running for a relatively long period to - ensure `util` saturation. - * DOWN: periodic ramp-down task, to slowly decay `util` - * UP: periodic ramp-up task, to slowly increase `util` - - """ - failure_reasons = {} - metrics = {} - - task = self.rtapp_task_ids_map['test'][0] - - ue_df = self.trace.df_event('sched_util_est_se') - ue_df = df_filter_task_ids(ue_df, [task]) - ua_df = self.trace.ana.load_tracking.df_task_signal(task, 'util') - - failures = [] - for phase in self.trace.ana.rta.task_phase_windows(task, wlgen_profile=self.rtapp_profile): - if not phase.properties['meta']['from_test']: - continue - - apply_phase_window = functools.partial(df_refit_index, window=(phase.start, phase.end)) - - ue_phase_df = apply_phase_window(ue_df) - mean_enqueued = series_mean(ue_phase_df['enqueued']) - mean_ewma = series_mean(ue_phase_df['ewma']) - - ua_phase_df = apply_phase_window(ua_df) - mean_util = series_mean(ua_phase_df['util']) - - def make_issue(msg): - return msg.format( - util=f'util={mean_util}', - enq=f'enqueued={mean_enqueued}', - ewma=f'ewma={mean_ewma}', - ) - - issue = None - if mean_enqueued < mean_util: - issue = make_issue('{enq} smaller than {util}') - - # Running on FastRamp kernels: - elif self.fast_ramp: - - # STABLE, DOWN and UP: - if mean_ewma < mean_enqueued: - issue = make_issue('no fast ramp: {ewma} smaller than {enq}') - - # Running on (legacy) non FastRamp kernels: - else: - - # STABLE: ewma ramping up - if phase.id.startswith('test/stable'): - if mean_ewma > mean_enqueued: - issue = make_issue('fast ramp, stable: {ewma} bigger than {enq}') - - # DOWN: ewma ramping down - elif phase.id.startswith('test/ramp_down'): - if mean_ewma < mean_enqueued: - issue = make_issue('fast ramp, down: {ewma} smaller than {enq}') - - # UP: ewma ramping up - elif phase.id.startswith('test/ramp_up'): - if mean_ewma > mean_enqueued: - issue = make_issue('fast ramp, up: {ewma} bigger than {enq}') - - metrics[phase.id] = PhaseStats( - phase.start, phase.end, mean_util, mean_enqueued, mean_ewma, issue - ) - - failures = [ - (phase, stat) - for phase, stat in metrics.items() - if stat.issue - ] - - # Plot signals to support debugging analysis - self._plot_signals(task, 'means', sorted(stat.start for phase, stat in failures)) - - bundle = ResultBundle.from_bool(not failures) - bundle.add_metric("fast ramp", self.fast_ramp) - bundle.add_metric("phases", metrics) - bundle.add_metric("failures", sorted(phase for phase, stat in failures)) - return bundle - - @requires_events('sched_util_est_se') - @TasksAnalysis.df_task_states.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_activations(self) -> ResultBundle: - """ - Test signals are properly "aggregated" at enqueue/dequeue time. - - On fast-ramp systems, `enqueued` is expected to be always - smaller than `ewma`. - - On non fast-ramp systems, the `enqueued` is expected to be - smaller then `ewma` in ramp-down phases, or bigger in ramp-up - phases. - - Those conditions are checked on a single execution of a task which has - three main behaviours: - - * STABLE: periodic big task running for a relatively long period to - ensure `util` saturation. - * DOWN: periodic ramp-down task, to slowly decay `util` - * UP: periodic ramp-up task, to slowly increase `util` - - """ - metrics = {} - task = self.rtapp_task_ids_map['test'][0] - - # Get list of task's activations - df = self.trace.ana.tasks.df_task_states(task) - activations = df[ - (df.curr_state == TaskState.TASK_WAKING) & - (df.next_state == TaskState.TASK_ACTIVE) - ].index - - # Check task signals at each activation - df = self.trace.df_event('sched_util_est_se') - df = df_filter_task_ids(df, [task]) - - - for idx, activation in enumerate(activations): - - # Get the value of signals at their first update after the activation - row = df_window(df, (activation, None), method='post').iloc[0] - # It can happen that the first updated after the activation is - # actually in the next phase, in which case we need to check the - # util values against the right phase - activation = row.name - - # If we are outside a phase, ignore the activation - try: - phase = self.trace.ana.rta.task_phase_at(task, activation, wlgen_profile=self.rtapp_profile) - except KeyError: - continue - - util = row['util'] - enq = row['enqueued'] - ewma = row['ewma'] - def make_issue(msg): - return msg.format( - util=f'util={util}', - enq=f'enqueued={enq}', - ewma=f'ewma={ewma}', - ) - - issue = None - - # UtilEst is not updated when within 1% of previous activation - if 1.01 * enq < util: - issue = make_issue('{enq} smaller than {util}') - - # Running on FastRamp kernels: - elif self.fast_ramp: - - # ewma stable, down and up - if enq > ewma: - issue = make_issue('{enq} bigger than {ewma}') - - # Running on (legacy) non FastRamp kernels: - else: - if not phase.properties['meta']['from_test']: - continue - - # ewma stable - if phase.id.startswith('test/stable'): - if enq < ewma: - issue = make_issue('stable: {enq} smaller than {ewma}') - - # ewma ramping down - elif phase.id.startswith('test/ramp_down'): - if enq > ewma: - issue = make_issue('ramp down: {enq} bigger than {ewma}') - - # ewma ramping up - elif phase.id.startswith('test/ramp_up'): - if enq < ewma: - issue = make_issue('ramp up: {enq} smaller than {ewma}') - - metrics[idx] = ActivationSignals(activation, util, enq, ewma, issue) - - failures = [ - (idx, activation_signals) - for idx, activation_signals in metrics.items() - if activation_signals.issue - ] - - bundle = ResultBundle.from_bool(not failures) - bundle.add_metric("failures", sorted(idx for idx, activation in failures)) - bundle.add_metric("activations", metrics) - - failures_time = [activation.time for idx, activation in failures] - self._plot_signals(task, 'activations', failures_time) - return bundle diff --git a/lisa_tests/arm/kernel/staging/PLACEHOLDER b/lisa_tests/arm/kernel/staging/PLACEHOLDER deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/lisa_tests/arm/kernel/staging/numa_behaviour.py b/lisa_tests/arm/kernel/staging/numa_behaviour.py deleted file mode 100644 index c743b4b83c..0000000000 --- a/lisa_tests/arm/kernel/staging/numa_behaviour.py +++ /dev/null @@ -1,112 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2019, Linaro and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from lisa.wlgen.rta import RTAPhase, PeriodicWload -from lisa.tests.base import ResultBundle, TestBundle, RTATestBundle, TestMetric -from lisa.datautils import df_deduplicate -from lisa.analysis.tasks import TasksAnalysis - -class NUMABehaviour(RTATestBundle, TestBundle): - """ - Abstract class for NUMA related scheduler testing. - """ - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - if target.number_of_nodes < 2: - ResultBundle.raise_skip( - "Target doesn't have at least two NUMA nodes") - - @TasksAnalysis.df_task_states.used_events - def _get_task_cpu_df(self, task_id): - """ - Get a DataFrame for task migrations - - Use the sched_switch trace event to find task migration from one CPU to another. - - :returns: A Pandas DataFrame for the task, showing the - CPU's that the task was migrated to - """ - df = self.trace.ana.tasks.df_task_states(task_id) - cpu_df = df_deduplicate(df, cols=['cpu'], keep='first', consecutives=True) - - return cpu_df - - @_get_task_cpu_df.used_events - def test_task_remains(self) -> ResultBundle: - """ - Test that task remains on the same core - """ - test_passed = True - metrics = {} - - for task_id in self.rtapp_task_ids: - cpu_df = self._get_task_cpu_df(task_id) - core_migrations = len(cpu_df.index) - metrics[task_id] = TestMetric(core_migrations) - - # Ideally, task with 50% utilization - # should stay on the same core - if core_migrations > 1: - test_passed = False - - res = ResultBundle.from_bool(test_passed) - res.add_metric("Migrations", metrics) - - return res - -class NUMASmallTaskPlacement(NUMABehaviour): - """ - A single task with 50% utilization - """ - - task_prefix = "tsk" - - @classmethod - def _get_rtapp_profile(cls, plat_info): - return { - cls.task_prefix: RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=50, - duration=30, - period=cls.TASK_PERIOD - ) - ) - } - -class NUMAMultipleTasksPlacement(NUMABehaviour): - """ - Multiple tasks with 50% utilization - """ - task_prefix = "tsk" - - @classmethod - def _get_rtapp_profile(cls, plat_info): - # Four CPU's is enough to demonstrate task migration problem - cpu_count = min(4, plat_info["cpus-count"]) - - return { - f"{cls.task_prefix}{cpu}": RTAPhase( - prop_wload=PeriodicWload( - duty_cycle_pct=50, - duration=30, - period=cls.TASK_PERIOD - ) - ) - for cpu in range(cpu_count) - } -# vim :set tabstop=4 shiftwidth=4 textwidth=80 expandtab diff --git a/lisa_tests/arm/kernel/staging/schedutil.py b/lisa_tests/arm/kernel/staging/schedutil.py deleted file mode 100644 index 9985fad7ed..0000000000 --- a/lisa_tests/arm/kernel/staging/schedutil.py +++ /dev/null @@ -1,349 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2019, ARM Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from math import ceil -import itertools - -import pandas as pd -import holoviews as hv - -from lisa.wlgen.rta import DutyCycleSweepPhase -from lisa.tests.base import ResultBundle, Result, TestBundle, RTATestBundle -from lisa.target import Target -from lisa.trace import requires_events -from lisa.datautils import df_merge, series_mean -from lisa.utils import ArtifactPath - -from lisa.notebook import plot_signal -from lisa.analysis.frequency import FrequencyAnalysis -from lisa.analysis.load_tracking import LoadTrackingAnalysis -from lisa.analysis.rta import RTAEventsAnalysis -from lisa.analysis.tasks import TasksAnalysis, TaskState - - -class RampBoostTestBase(RTATestBundle, TestBundle): - """ - Test schedutil's ramp boost feature. - """ - - def __init__(self, res_dir, plat_info, cpu, rtapp_profile_kwargs=None): - super().__init__(res_dir, plat_info, rtapp_profile_kwargs=rtapp_profile_kwargs) - self.cpu = cpu - - @requires_events('cpu_idle', 'cpu_frequency', 'sched_wakeup') - def estimate_nrg(self): - return self.plat_info['nrg-model'].estimate_from_trace(self.trace).sum(axis=1) - - def get_avg_slack(self, only_negative=False): - analysis = self.trace.ana.rta - - def get_slack(task): - series = analysis.df_rtapp_stats(task)['slack'] - if only_negative: - series = series[series < 0] - - if series.empty: - return 0 - else: - # average negative slack across all activations - return series.mean() - - return { - task: get_slack(task) - for task in self.trace.ana.rta.rtapp_tasks - } - - @LoadTrackingAnalysis.df_cpus_signal.used_events - @requires_events('schedutil_em') - def df_ramp_boost(self): - """ - Return a dataframe with schedutil-related signals, sampled at the - frequency decisions timestamps for the CPU this bundle was executed on. - - .. note:: The computed columns only take into account the CPU the test - was executing on. It currently does not handle multi-task workloads. - """ - trace = self.trace - cpu = self.cpu - task = self.rtapp_task_ids[0] - - # schedutil_df also has a 'util' column that would conflict - schedutil_df = trace.df_event('schedutil_em')[['cpu', 'cost_margin', 'base_freq']] - schedutil_df = schedutil_df.copy() - schedutil_df['from_schedutil'] = True - - def compute_base_cost(row): - freq = row['base_freq'] - cpu = row['cpu'] - - em = self.plat_info['nrg-model'] - active_states = em.cpu_nodes[cpu].active_states - freqs = sorted(active_states.keys()) - max_freq = max(freqs) - - def cost(freq): - higher_freqs = list(itertools.dropwhile(lambda f: f < freq, freqs)) - freq = freqs[-1] if not higher_freqs else higher_freqs[0] - active_state = active_states[freq] - return active_state.power * max_freq / freq - - max_cost = max( - cost(freq) - for freq in active_states.keys() - ) - - return cost(freq) / max_cost * 100 - - schedutil_df['base_cost'] = schedutil_df.apply(compute_base_cost, axis=1) - - task_active = trace.ana.tasks.df_task_states(task)['curr_state'] - task_active = task_active.apply(lambda state: int(state == TaskState.TASK_ACTIVE)) - task_active = task_active.reindex(schedutil_df.index, method='ffill') - # Assume task active == CPU active, since there is only one task - assert len(self.rtapp_task_ids) == 1 - cpu_active_df = pd.DataFrame({'cpu_active': task_active}) - cpu_active_df['cpu'] = cpu - cpu_active_df.dropna(inplace=True) - - df_list = [ - schedutil_df, - trace.ana.load_tracking.df_cpus_signal('util'), - trace.ana.load_tracking.df_cpus_signal('enqueued'), - cpu_active_df, - ] - - df = df_merge(df_list, filter_columns={'cpu': cpu}) - df['from_schedutil'].fillna(value=False, inplace=True) - df.ffill(inplace=True) - df.dropna(inplace=True) - - # Reconstitute how schedutil sees signals by subsampling the - # "main" dataframe, so we can look at signals coming from other - # dataframes - df = df[df['from_schedutil'] == True] # pylint: disable=singleton-comparison - df.drop(columns=['from_schedutil'], inplace=True) - - # If there are some NaN at the beginning, just drop some data rather - # than using fake data - df.dropna(inplace=True) - - boost_points = ( - # util_est_enqueued is the same as last freq update - (df['enqueued'].diff() == 0) & - - # util_avg is increasing - (df['util'].diff() >= 0) & - - # util_avg > util_est_enqueued - (df['util'] > df['enqueued']) & - - # CPU is not idle - (df['cpu_active']) - ) - df['boost_points'] = boost_points - - df['expected_cost_margin'] = (df['util'] - df['enqueued']).where( - cond=boost_points, - other=0, - ) - - # cost_margin values range from 0 to 1024 - ENERGY_SCALE = 1024 - - for col in ('expected_cost_margin', 'cost_margin'): - df[col] *= 100 / ENERGY_SCALE - - df['allowed_cost'] = df['base_cost'] + df['cost_margin'] - - # We cannot know if the first row is supposed to be boosted or not - # because we lack history, so we just drop it - return df.iloc[1:] - - @FrequencyAnalysis.plot_cpu_frequencies.used_events - @TasksAnalysis.plot_tasks_activation.used_events - @LoadTrackingAnalysis.plot_task_signals.used_events - def _plot_test_boost(self, df): - task, = self.rtapp_tasks - ana = self.trace.ana( - task=task, - ) - - fig = hv.Layout( - [ - ( - plot_signal(df['cost_margin']).options( - 'Curve', - color='red' - ) * - plot_signal(df['boost_points'].astype(int)).options( - 'Curve', - color='black' - ) * - plot_signal(df['expected_cost_margin']).options( - 'Curve', - color='blue' - ) * - plot_signal(df['base_cost']).options( - 'Curve', - color='orange' - ) * - plot_signal(df['allowed_cost']).options( - 'Curve', - color='green' - ) * - ana.tasks.plot_tasks_activation(overlay=True) - ).options( - title='Ramp boost for 5% => 75% util step', - ylabel='Cost (% of max cost)', - ), - - ana.frequency.plot_cpu_frequencies(cpu=self.cpu, average=False), - - ( - ana.load_tracking.plot_task_signals( - signals=['util', 'enqueued'], - colors=['orange', 'red'] - ) * - ana.tasks.plot_tasks_activation(overlay=True) - ), - ] - ).cols(1) - - self._save_debug_plot(fig, name=f'ramp_boost') - return fig - - @RTAEventsAnalysis.plot_slack_histogram.used_events - @RTAEventsAnalysis.plot_perf_index_histogram.used_events - @RTAEventsAnalysis.plot_latency.used_events - @df_ramp_boost.used_events - @_plot_test_boost.used_events - def test_ramp_boost(self, cost_threshold_pct=0.1, bad_samples_threshold_pct=0.1) -> ResultBundle: - """ - Test that the energy boost feature is triggering as expected. - """ - # If there was no cost_margin sample to look at, that means boosting - # was not exhibited by that test so we cannot conclude anything - df = self.df_ramp_boost() - self._plot_test_boost(df) - - if df.empty: - return ResultBundle(Result.UNDECIDED) - - # Make sure the boost is always positive (negative cannot really happen - # since the kernel is using unsigned arithmetic, but still check in - # case there are some dataframe handling issues) - assert not (df['expected_cost_margin'] < 0).any() - assert not (df['cost_margin'] < 0).any() - - # "rect" method is accurate here since the signal is really following - # "post" steps - expected_boost_cost = series_mean(df['expected_cost_margin']) - actual_boost_cost = series_mean(df['cost_margin']) - boost_overhead = series_mean(df['cost_margin'] / df['base_cost'] * 100) - - # Check that the total amount of boost is close to expectations - lower = max(0, expected_boost_cost - cost_threshold_pct) - higher = expected_boost_cost - passed_overhead = lower <= actual_boost_cost <= higher - - # Check the shape of the signal: actual boost must be lower or equal - # than the expected one. - good_shape_nr = (df['cost_margin'] <= df['expected_cost_margin']).sum() - - df_len = len(df) - bad_shape_nr = df_len - good_shape_nr - bad_shape_pct = bad_shape_nr / df_len * 100 - - # Tolerate a few bad samples that added too much boost - passed_shape = bad_shape_pct < bad_samples_threshold_pct - - passed = passed_overhead and passed_shape - res = ResultBundle.from_bool(passed) - res.add_metric('expected boost cost', expected_boost_cost, '%') - res.add_metric('boost cost', actual_boost_cost, '%') - res.add_metric('boost overhead', boost_overhead, '%') - res.add_metric('bad boost samples', bad_shape_pct, '%') - - # Add some slack metrics and plots - analysis = self.trace.ana.rta - for task in self.rtapp_tasks: - analysis.plot_slack_histogram(task) - analysis.plot_perf_index_histogram(task) - analysis.plot_latency(task) - - res.add_metric('avg slack', self.get_avg_slack(), 'us') - res.add_metric('avg negative slack', self.get_avg_slack(only_negative=True), 'us') - - return res - - -class LargeStepUp(RampBoostTestBase): - """ - A single task whose utilization rises extremely quickly - """ - task_name = "step_up" - - def __init__(self, res_dir, plat_info, cpu, nr_steps): - rtapp_profile_kwargs = dict( - cpu=cpu, - nr_steps=nr_steps, - ) - super().__init__( - res_dir, - plat_info, - cpu=cpu, - rtapp_profile_kwargs=rtapp_profile_kwargs, - ) - self.nr_steps = nr_steps - - @classmethod - def _from_target(cls, target: Target, *, res_dir: ArtifactPath = None, collector=None, cpu=None, nr_steps=1) -> 'LargeStepUp': - plat_info = target.plat_info - - # Use a big CPU by default to allow maximum range of utilization - cpu = cpu if cpu is not None else plat_info["capacity-classes"][-1][0] - - rtapp_profile = cls.get_rtapp_profile(plat_info, cpu=cpu, nr_steps=nr_steps) - - # Ensure accurate duty cycle and idle state misprediction on some - # boards. This helps having predictable execution. - with target.disable_idle_states(): - with target.cpufreq.use_governor("schedutil"): - cls.run_rtapp(target, res_dir, rtapp_profile, collector=collector) - - return cls(res_dir, plat_info, cpu, nr_steps) - - @classmethod - def _get_rtapp_profile(cls, plat_info, cpu, nr_steps, min_util=5, max_util=75): - start_pct = cls.unscaled_utilization(plat_info, cpu, min_util) - end_pct = cls.unscaled_utilization(plat_info, cpu, max_util) - - delta_pct = ceil((end_pct - start_pct) / nr_steps) - - return { - cls.task_name: 20 * DutyCycleSweepPhase( - start=start_pct, - stop=end_pct, - step=delta_pct, - duration=0.3, - duration_of='step', - period=cls.TASK_PERIOD, - # Make sure we run on one CPU only, so that we only stress - # frequency scaling and not placement. - prop_cpus=[cpu], - ) - } diff --git a/lisa_tests/arm/kernel/staging/utilclamp.py b/lisa_tests/arm/kernel/staging/utilclamp.py deleted file mode 100644 index c34f4e0376..0000000000 --- a/lisa_tests/arm/kernel/staging/utilclamp.py +++ /dev/null @@ -1,442 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2020, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import functools -from operator import itemgetter - -import numpy as np -import pandas as pd -import holoviews as hv - -from lisa.analysis.frequency import FrequencyAnalysis -from lisa.analysis.load_tracking import LoadTrackingAnalysis -from lisa.datautils import df_refit_index, series_mean -from lisa.pelt import PELT_SCALE -from lisa.tests.base import ResultBundle, TestBundle, RTATestBundle, TestMetric -from lisa.wlgen.rta import RTAPhase, PeriodicWload -from lisa.notebook import plot_signal - - -class UtilClamp(RTATestBundle, TestBundle): - """ - Validate that UtilClamp min values are honoured properly by the kernel. - - The test is split into 8 phases. For each phase, a UtilClamp value is set - for a task, whose duty cycle would generate a lower utilization. Then the - actual capacity, allocated to the task during its activation is checked. - - The 8 phases UtilClamp values are picked to cover the entire SoC's CPU - scale. (usually between 0 and 1024) - - .. code-block:: text - - |<-- band 0 -->|<-- band 1 -->|<-- band 2 -->|<-- ... - capacities: 0 | 128 | 256 512 - | | - --------------------|--------------|------------------------------- - phase 1: uclamp_val | - | - -----------------------------------|------------------------------- - phase 2: uclamp_val - ... - - phase 8: - - """ - - NR_PHASES = 8 - CAPACITY_MARGIN = 0.8 # kernel task placement a 80% capacity margin - - @classmethod - def check_from_target(cls, target): - super().check_from_target(target) - kconfig = target.plat_info['kernel']['config'] - if not kconfig.get('UCLAMP_TASK'): - ResultBundle.raise_skip("The target's kernel needs CONFIG_UCLAMP_TASK=y kconfig enabled") - - @classmethod - def _collect_capacities(cls, plat_info): - """ - Returns, for each CPU a mapping frequency / capacity: - - dict(cpu, dict(freq, capacity)) - - where capacity = max_cpu_capacity * freq / max_cpu_frequency. - """ - - max_capacities = plat_info['cpu-capacities']['rtapp'] - capacity_classes = plat_info['capacity-classes'] - - capacities = { - cpu: { - freq: int(max_capacities[cpu] * freq / max(freqs)) - for freq in freqs - } - for cpu, freqs in plat_info['freqs'].items() - } - - - # Ensure there is no overlap between CPUs by ignoring all capacities - # that are lower than the max capacity of CPUs with lower max cap. For - # example, the capacities of a big CPU that will be considered will - # always be higher than the capacities of any LITTLE. - # - # This avoids choosing any uclamp value that could be placed on one CPU - # or another. - for cpu, max_cap in max_capacities.items(): - for _cpu, _max_cap in max_capacities.items(): - if _max_cap > max_cap: - capacities[_cpu] = { - freq: cap - for freq, cap in capacities[_cpu].items() - if cap >= max_cap - } - - return capacities - - - @classmethod - def _collect_capacity_classes(cls, plat_info): - return sorted(set( - tuple(sorted(freq_capas.values())) - for freq_capas in cls._collect_capacities(plat_info).values() - )) - - @classmethod - def _get_bands(cls, capacity_classes): - - def get_bands(capacities): - bands = list(zip(capacities, capacities[1:])) - - # Pick the bands covering the widest range of util, since they - # are easier to test - bands = sorted( - bands, - key=lambda band: band[1] - band[0], - reverse=True - ) - bands = bands[:cls.NR_PHASES] - bands = sorted(bands, key=itemgetter(0)) - - return bands - - return [ - band - for capacities in capacity_classes - for band in get_bands(capacities) - ] - - @classmethod - def _get_phases(cls, plat_info): - """ - Returns a list of phases. Each phase being described by a tuple: - - (uclamp_val, util) - """ - - capacity_classes = cls._collect_capacity_classes(plat_info) - bands = cls._get_bands(capacity_classes) - - def band_mid(band): - return int((band[1] + band[0]) / 2) - - def make_phase(band): - uclamp = band_mid(band) - # We don't ask for the middle of the band, we ask for the util that - # will map to a frequency in the middle of the band when processed - # by schedutil - uclamp *= cls.CAPACITY_MARGIN - util = uclamp / 2 - - uclamp = int(uclamp) - name = f'uclamp-{uclamp}' - return (name, (uclamp, util)) - - return dict(map(make_phase, bands)) - - @classmethod - def _get_rtapp_profile(cls, plat_info): - periods = [ - RTAPhase( - prop_name=name, - prop_wload=PeriodicWload( - duty_cycle_pct=(util / PELT_SCALE) * 100, # util to pct - duration=5, - period=cls.TASK_PERIOD, - ), - prop_uclamp=(uclamp_val, uclamp_val), - prop_meta={'uclamp_val': uclamp_val}, - ) - for name, (uclamp_val, util) in cls._get_phases(plat_info).items() - ] - - return {'task': functools.reduce(lambda a, b: a + b, periods)} - - def _get_trace_df(self): - task = self.rtapp_task_ids_map['task'][0] - - # There is no CPU selection when we're going back from preemption. - # Setting preempted_value=1 ensures that it won't count as a new - # activation. - df = self.trace.ana.tasks.df_task_activation(task, - preempted_value=1) - df = df_refit_index(df, window=self.trace.window) - df = df[['active', 'cpu']] - df['activation_start'] = df['active'] == 1 - - df_freq = self.trace.ana.frequency.df_cpus_frequency() - df_freq = df_freq[['cpu', 'frequency']] - df_freq = df_freq.pivot(columns='cpu', values='frequency') - df_freq.reset_index(inplace=True) - df_freq.set_index('Time', inplace=True) - - df = df.merge(df_freq, how='outer', left_index=True, right_index=True) - - # Merge with df_freq will bring NaN in the activation column. We do not - # want to ffill() them. - df['activation_start'].fillna(value=False, inplace=True) - - # Ensures that frequency values are propogated through the entire - # DataFrame, as it is possible that no frequency event occur - # during a phase. - df.ffill(inplace=True) - - return df - - def _get_phases_df(self): - task = self.rtapp_task_ids_map['task'][0] - - df = self.trace.ana.rta.df_phases(task, wlgen_profile=self.rtapp_profile) - df = df.copy() - df = df[df['properties'].apply(lambda props: props['meta']['from_test'])] - df.reset_index(inplace=True) - df.rename(columns={'index': 'start'}, inplace=True) - df['end'] = df['start'].shift(-1) - df['uclamp_val'] = df['properties'].apply(lambda row: row['meta']['uclamp_val']) - return df - - def _for_each_phase(self, callback): - df_phases = self._get_phases_df() - df_trace = self._get_trace_df() - - def parse_phase(phase): - start = phase['start'] - end = phase['end'] - df = df_trace - - # During a phase change, rt-app will wakeup and then change - # UtilClamp value will be changed. We then need to wait for the - # second wakeup for the kernel to apply the most recently set - # UtilClamp value. - start = df[(df.index >= start) & - (df['active'] == 1)].first_valid_index() - - end = end if not np.isnan(end) else df.last_valid_index() - - if (start > end): - raise ValueError('Phase ends before it has even started') - - df = df_trace[start:end].copy() - - return callback(df, phase) - - return df_phases.apply(parse_phase, axis=1) - - def _plot_phases(self, test, failures, signals=None): - task, = self.rtapp_task_ids - ana = self.trace.ana( - task=task, - tasks=[task], - ) - figs = [ - ( - ana.tasks.plot_tasks_activation( - overlay=True, - which_cpu=True - ) * - ana.rta.plot_phases(wlgen_profile=self.rtapp_profile) * - hv.Overlay( - [ - hv.VLine(failure).options( - alpha=0.5, - color='red' - ) - for failure in failures - ] - ) - ), - ] - if signals is not None: - figs.append( - hv.Overlay([ - plot_signal(signals[signal]).opts(responsive=True, height=400) - for signal in signals.columns - ]) - ) - - fig = hv.Layout(figs).cols(1) - - self._save_debug_plot(fig, name=f'utilclamp_{test}') - return fig - - @FrequencyAnalysis.df_cpus_frequency.used_events - @LoadTrackingAnalysis.df_tasks_signal.used_events - def test_placement(self) -> ResultBundle: - """ - For each phase, checks if the task placement is compatible with - UtilClamp requirements. This is done by comparing the maximum capacity - of the CPU on which the task has been placed, with the UtilClamp - value. - """ - - metrics = {} - test_failures = [] - cpu_max_capacities = self.plat_info['cpu-capacities']['rtapp'] - - def parse_phase(df, phase): - # Only keep the activations - df = df[df['activation_start']] - - uclamp_val = phase['uclamp_val'] - num_activations = len(df.index) - cpus = set(map(int, df['cpu'].dropna().unique())) - fitting_cpus = { - cpu - for cpu, cap in cpu_max_capacities.items() - if (cap == PELT_SCALE) or cap > uclamp_val - } - - failures = df[(df['cpu'].isin(cpus - fitting_cpus))].index.tolist() - num_failures = len(failures) - test_failures.extend(failures) - - metrics[phase['phase']] = { - 'uclamp-min': TestMetric(uclamp_val), - 'cpu-placements': TestMetric(cpus), - 'expected-cpus': TestMetric(fitting_cpus), - 'bad-activations': TestMetric( - num_failures * 100 / num_activations, "%"), - } - - return cpus.issubset(fitting_cpus) - - res = ResultBundle.from_bool(self._for_each_phase(parse_phase).all()) - res.add_metric('Phases', metrics) - - self._plot_phases('test_placement', test_failures) - - return res - - @FrequencyAnalysis.df_cpus_frequency.used_events - @LoadTrackingAnalysis.df_tasks_signal.used_events - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_freq_selection(self) -> ResultBundle: - """ - For each phase, checks if the task placement and frequency selection - is compatible with UtilClamp requirements. This is done by comparing - the current CPU capacity on which the task has been placed, with the - UtilClamp value. - - The expected capacity is the schedutil projected frequency selection - for the given uclamp value. - """ - - metrics = {} - test_failures = [] - capacity_dfs = [] - # ( - # # schedutil factor that converts util to a frequency for a - # # given CPU: - # # - # # next_freq = max_freq * C * util / max_cap - # # - # # where C = 1.25 - # schedutil_factor, - # - # # list of frequencies available for a given CPU. - # frequencies, - # ) - cpu_frequencies = { - cpu: ( - (max(capacities) * (1 / self.CAPACITY_MARGIN)) / max(capacities.values()), - sorted(capacities) - ) - for cpu, capacities in - self._collect_capacities(self.plat_info).items() - } - cpu_capacities = self._collect_capacities(self.plat_info) - - @functools.lru_cache(maxsize=4096) - def schedutil_map_util_cap(cpu, util): - """ - Returns, for a given util on a given CPU, the capacity that - schedutil would select. - """ - - schedutil_factor, frequencies = cpu_frequencies[cpu] - schedutil_freq = schedutil_factor * util - - # Find the first available freq that meet the schedutil freq - # requirement. - for freq in frequencies: - if freq >= schedutil_freq: - break - - return cpu_capacities[cpu][freq] - - def parse_phase(df, phase): - uclamp_val = phase['uclamp_val'] - num_activations = df['activation_start'].sum() - - df['expected_capacity'] = df.apply(lambda line: schedutil_map_util_cap(line['cpu'], uclamp_val), axis=1) - - # Activations numbering - df['activation'] = df['activation_start'].cumsum() - - # Only keep the activations - df = df[df['activation_start']] - - # Actual capacity at which the task is running - for cpu, freq_to_capa in cpu_capacities.items(): - df[cpu] = df[cpu].map(freq_to_capa) - df['capacity'] = df.apply(lambda line: line[line['cpu']], axis=1) - - failures = df[df['capacity'] != df['expected_capacity']] - num_failures = failures['activation'].nunique() - - test_failures.extend(failures.index.tolist()) - capacity_dfs.append(df[['capacity', 'expected_capacity']]) - - metrics[phase['phase']] = { - 'uclamp-min': TestMetric(uclamp_val), - 'expected-mean-capacity': TestMetric(series_mean(df['expected_capacity'])), - 'bad-activations': TestMetric( - num_failures * 100 / num_activations, "%"), - } - - return failures.empty - - res = ResultBundle.from_bool(self._for_each_phase(parse_phase).all()) - res.add_metric('Phases', metrics) - - self._plot_phases( - 'test_frequency', - test_failures, - signals=pd.concat(capacity_dfs) - ) - - return res diff --git a/lisa_tests/test_example.py b/lisa_tests/test_example.py deleted file mode 100644 index 8067f7dfba..0000000000 --- a/lisa_tests/test_example.py +++ /dev/null @@ -1,273 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# Copyright (C) 2019, Arm Limited and contributors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from lisa.utils import ArtifactPath -from lisa.datautils import df_filter_task_ids -from lisa.trace import requires_events -from lisa.wlgen.rta import RTAPhase, PeriodicWload -from lisa.tests.base import TestBundle, RTATestBundle, ResultBundle -from lisa.target import Target -from lisa.analysis.load_tracking import LoadTrackingAnalysis - -""" -This module provides a LISA synthetic test example, heavily commented to show -how to use the main APIs. -""" - -################################################################################ -# It's a good idea to open the online doc in your browser when reading -# this example: -# https://lisa-linux-integrated-system-analysis.readthedocs.io/en/latest/ -# -# Also, lisa.utils.show_doc() can be called on any class/function to open the -# corresponding documentation in a browser. -################################################################################ - - -class ExampleTestBundle(RTATestBundle, TestBundle): - """ - The test bundle contains the data the test will work on. See - :class:`lisa.tests.base.TestBundle` for design notes. - - This example derives from :class:`lisa.tests.base.RTATestBundle`, so it - gains some ``rt-app``-specific and ftrace capabilities. - """ - - task_prefix = 'exmpl' - "Prefix used for rt-app task names" - - # res_dir and plat_info are "mandatory" parameters of all TestBundle, but - # the other ones are specific to a given use case. - def __init__(self, res_dir, plat_info, shell_output): - # This must be called, don't set res_dir or plat_info yourself - super().__init__(res_dir, plat_info) - - self.shell_output = shell_output - - @classmethod - # Uncomment that return annotation to allow exekall to work - def _from_target(cls, target: Target, *, res_dir: ArtifactPath, collector=None): #-> 'ExampleTestBundle': - """ - :meta public: - - This class method is the main way of creating a :class:`ExampleTestBundle`. - - It takes a first (positional) ``target`` parameter, which is a live - :class:`lisa.target.Target` object. It can be used to manipulate a - remote device such as a development board, to run workloads on it, - manipulate sysfs entries and so on. - - The ``collector`` parameter is a context manager to be used once around - while running the workload. It is created "magically": it's filled - automatically by the :class:`lisa.tests.base.TestBundleMeta` machinery, - based on the mixin base classes. For example, inheriting from - :class:`lisa.tests.base.DmesgTestBundle` and - :class:`lisa.tests.base.FtraceTestBundle` will lead to getting a - :class:`lisa.trace.ComposedCollector` that saves both an ftrace trace - and dmesg log. - - **All other parameters are keyword-only** - This means they must appear after the lone ``*`` in the parameter list. - - ``res_dir`` stands for "result directory" and is a location where the - bundle can store some artifacts collected from the target. The bundle - can rely on that folder being populated by this method. - - The "'ExampleTestBundle'" return annotation tells the test runner that - this class method acts as a factory of :class:`ExampleTestBundle`, so it - will be used to assemble the test case. - - .. seealso:: The class :class:`lisa.platforms.platinfo.PlatformInfo` - provides information about a device that are usually needed in - tests. - - .. seealso: This methods provides an easy way of running an rt-app - workload on the target device - :meth:`lisa.tests.base.RTATestBundle.run_rtapp` - """ - # PlatformInfo - # https://lisa-linux-integrated-system-analysis.readthedocs.io/en/latest/target.html#lisa.platforms.platinfo.PlatformInfo - # - # It's a central piece of LISA: it holds all the information about a - # given device. Use it to access any data it contains rather than - # fetching them yourselves, as the final user will have ways of - # providing values in case auto-detection fails, and logging of all the - # data it contains is provided out of the box. - plat_info = target.plat_info - - # The rt-app profile defines the rt-app workload that will be run - # note: If None is given to run_rtapp(), it will default to calling - # get_rtapp_profile() - rtapp_profile = cls.get_rtapp_profile(plat_info) - - # Here, we wanted to make sure the cpufreq governor is schedutil, since - # that's what we want to test. This is achieved through the used of - # devlib modules: - # https://devlib.readthedocs.io/en/latest/modules.html - with target.cpufreq.use_governor("schedutil"): - # RTATestBundle.run_rtapp() - # https://lisa-linux-integrated-system-analysis.readthedocs.io/en/latest/kernel_tests.html#lisa.tests.base.RTATestBundle.run_rtapp - # - # It allows running the rt-app profile on the target. "collector" - # is the object used to control the recording of the trace, and is - # setup by the test runner. This allows the final user to extend - # the list of ftrace events collected. If no collector is provided, - # a default one will be created by run_rtapp() based on the - # @requires_events() decorators used on method of that - # ExampleTestBundle. Note that it will also freeze all the tasks on - # the target device, so that the scheduler signals are not - # disturbed. Some critical tasks are not frozen though. - cls.run_rtapp(target, res_dir, rtapp_profile, collector=collector) - - # Execute a silly shell command on the target device as well - output = target.execute('echo $((21+21))').split() - - # Logging must be done through the provided logger, so it integrates well in LISA. - cls.get_logger().info('Finished doing stuff') - - # Actually create a ExampleTestBundle by calling the class. - return cls(res_dir, plat_info, output) - - @classmethod - def _get_rtapp_profile(cls, plat_info): - """ - :meta public: - - This class method is in charge of generating an rt-app profile, to - configure the workload that will be run using - :meth:`lisa.tests.base.RTATestBundle.run_rtapp`. - - It can access any information in the given - :class:`lisa.platforms.PlatformInfo` in order to obtain a workload - tailored to the capacity of the CPUs of the target, the available - frequencies and so on. - """ - - # Build a list of the CPU IDs that are available - cpus = list(range(plat_info['cpus-count'])) - - # The profile is a dictionary of task names (keys) to - # lisa.wlgen.rta.RTATask instances - # https://lisa-linux-integrated-system-analysis.readthedocs.io/en/latest/workloads.html - profile = {} - - for cpu in cpus: - util = cls.unscaled_utilization(plat_info, cpu, 50) - - # A PeriodicWload workload has a period, and a duty_cycle (which - # relates directly to task utilisation signal). - # - # LISA will run rt-app calibration if needed in order to know what - # actual quantity of work each CPU can do. It can be provided by - # the user in the platform information. - profile[f"{cls.task_prefix}_{cpu}"] = RTAPhase( - prop_wload=PeriodicWload( - # Fill 50% of ``cpu`` capacity. - duty_cycle_pct=50, - # If omitted, the biggest CPU in the system will be assumed - # and the amount of work will be scaled accordingly - scale_for_cpu=cpu, - duration=1, - period=cls.TASK_PERIOD, - ), - prop_cpus=[cpu], - ) - - return profile - - # ftrace events necessary for that test method to run must be specified here. - # This information will be used in a number of places: - # * To build the ExampleTestBundle.FTRACE_CONF attribute, which is then used by RTATestBundle.run_rtapp() - # * To parse the ftrace trace - # * In the Sphinx documentation. - # * To check that the events are available in the trace. A clear exception - # is raised if an even is missing. - # Note: Other decorators can be used to express optional events or - # alternatives, see lisa.trace module. - @requires_events('sched_switch', 'sched_wakeup') - # This allows referencing the @requires_events() of - # LoadTrackingAnalysis.df_tasks_signal(), so we don't duplicate that - # information here in case it changes in the future. Use that when you - # don't use the events directly in your code. - @LoadTrackingAnalysis.df_tasks_signal.used_events - # This decorator allows checking that there was no background noise (other - # tasks executing) while running the workload. If that was the case, the - # returned result_bundle.result will be set to Result.UNDECIDED, expressing - # that the data don't allow drawing a pass/fail conclusion. - @RTATestBundle.test_noisy_tasks.undecided_filter(noise_threshold_pct=1) - def test_output(self, util_margin=50) -> ResultBundle: - """ - Actual test method that looks at the collected data and draws a - conclusion based on it. - - The return annotation "'ResultBundle'" is used by the test runner to - assemble the test cases, since it's driven by types and what function - can produce them. - - .. seealso:: :class:`lisa.tests.base.ResultBundle` - """ - - # Get the pandas DataFrame of tasks utilisation. - # - # self.trace: This is a lisa.trace.Trace object, with all the events - # specified using @requires_events() on methods of this class. For - # subclasses of RTATestBundle, self.trace is actually a TraceView - # object, restricting the time range to when the rt-app tasks were - # executing. The test methods can therefore work on minimal and - # hopefully clean/relevant data. - # - # self.trace.analyis: A number of analysis objects are available, - # giving df_* methods that return various dataframes, and plot_* - # functions that can do various plots. - # https://lisa-linux-integrated-system-analysis.readthedocs.io/en/latest/trace_analysis.html - df = self.trace.ana.load_tracking.df_tasks_signal('util') - - # "resolve" the task names into (pid, comm) tuples. If there is any - # ambiguity because of the same name is reused in different PIDs, an - # exception will be raised. - # self.rtapp_tasks gives the list of task names as defined in - # get_rtapp_profile(). - task_ids = [self.trace.get_task_id(task) for task in self.rtapp_tasks] - - util_means = {} - - # Example test that checks the tasks' average utilisation is as expected - def check_task_util(task_id): - # Only keep the data about the tasks we care about. - _df = df_filter_task_ids(df, [task_id]) - avg = _df['util'].mean() - util_means[task_id.comm] = avg - # Util is not supposed to be higher than 512 given what we asked for in get_rtapp_profile() - return avg < (512 + util_margin) - - # Will be True if all check_task_util() calls are True - ok = all(check_task_util(task_id) for task_id in task_ids) - - # Create a pass/fail ResultBundle. - res_bundle = ResultBundle.from_bool(ok) - - # Named metrics (with a optional unit) can be attached to the - # ResultBundle, and will be reported to whoever runs the test. Good - # practice for threshold-based tests is to add one metric for the - # computed value, and one for the threshold. - # Extra metrics can be very helpful when doing initial investigations - # on a test failure, so it's better to be more verbose than not. - res_bundle.add_metric('expected util', 512) - for task, util_mean in util_means.items(): - res_bundle.add_metric(f'{task} util', util_mean) - - return res_bundle diff --git a/setup.py b/setup.py index a6584c179c..e2d296e924 100755 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ def _find_packages(toplevel): ))) ] -packages = _find_packages('lisa') + _find_packages('lisa_tests') +packages = _find_packages('lisa') package_data = { package: ['*'] diff --git a/tools/tests.sh b/tools/tests.sh index c125f08efb..3f1323ce69 100755 --- a/tools/tests.sh +++ b/tools/tests.sh @@ -35,9 +35,6 @@ timeout -s INT 1h python3 -m pytest -vv echo "Starting exekall self tests" exekall run "$LISA_HOME/tools/exekall/exekall/tests" -echo "Available LISA tests:" -lisa-test --list - echo "Starting documentation pedantic build ..." lisa-doc-build