From 71a48694aa2c8615801b467223a9759bfb677466 Mon Sep 17 00:00:00 2001 From: Nashwan Azhari Date: Fri, 20 Dec 2024 13:55:19 +0200 Subject: [PATCH] integration: fix all linter warnings in test harness. (#898) Signed-off-by: Nashwan Azhari --- .../tests/test_util/harness/lxd.py | 7 +- tests/integration/tests/test_util/util.py | 160 ++++++++++-------- 2 files changed, 97 insertions(+), 70 deletions(-) diff --git a/tests/integration/tests/test_util/harness/lxd.py b/tests/integration/tests/test_util/harness/lxd.py index 757c17cb6..7b767af05 100644 --- a/tests/integration/tests/test_util/harness/lxd.py +++ b/tests/integration/tests/test_util/harness/lxd.py @@ -6,7 +6,6 @@ import shlex import subprocess from pathlib import Path -from typing import List from test_util import config from test_util.harness import Harness, HarnessError, Instance @@ -92,6 +91,10 @@ def new_instance(self, network_type: str = "IPv4") -> Instance: ) if network_type.lower() == "dualstack": + if self.dualstack_profile is None: + raise ValueError( + "LXD `dualstack_profile` must be set for network_type=dualstack" + ) launch_lxd_command.extend(["-p", self.dualstack_profile]) if network_type.lower() == "ipv6": @@ -154,7 +157,7 @@ def _configure_profile(self, profile_name: str, profile_config: str): except subprocess.CalledProcessError as e: raise HarnessError(f"Failed to configure LXD profile {profile_name}") from e - def _configure_network(self, network_name: str, *network_args: List[str]): + def _configure_network(self, network_name: str, *network_args: str): LOG.debug("Checking for LXD network %s", network_name) try: run(["lxc", "network", "show", network_name]) diff --git a/tests/integration/tests/test_util/util.py b/tests/integration/tests/test_util/util.py index f1d0d31b2..83359407f 100644 --- a/tests/integration/tests/test_util/util.py +++ b/tests/integration/tests/test_util/util.py @@ -45,6 +45,65 @@ def run(command: list, **kwargs) -> subprocess.CompletedProcess: return subprocess.run(command, **kwargs) +class Retriable: + def __init__(self, retry_kwargs: Optional[Mapping[str, Any]]) -> None: + self._condition = None + self._run = partial(run, capture_output=True) + if retry_kwargs is None: + retry_kwargs = {} + self._retry_kwargs = retry_kwargs + + def exec( + self, + command_args: List[str], + **command_kwds, + ): + return retry(**self._retry_kwargs)(self._exec)(command_args, **command_kwds) + + def _exec( + self, + command_args: List[str], + **command_kwds, + ): + """ + Execute a command against a harness or locally with subprocess to be retried. + + :param List[str] command_args: The command to be executed, as a str or list of str + :param Mapping[str,str] command_kwds: Additional keyword arguments to be passed to exec + """ + + try: + resp = self._run(command_args, **command_kwds) + except subprocess.CalledProcessError as e: + LOG.warning(f" rc={e.returncode}") + LOG.warning(f" stdout={e.stdout.decode()}") + LOG.warning(f" stderr={e.stderr.decode()}") + raise + if self._condition: + assert self._condition(resp), "Failed to meet condition" + return resp + + def on(self, instance: harness.Instance) -> "Retriable": + """ + Target the command at some instance. + + :param instance Instance: Instance on a test harness. + """ + self._run = partial(instance.exec, capture_output=True) + return self + + def until( + self, condition: Callable[[subprocess.CompletedProcess], bool] | None = None + ) -> "Retriable": + """ + Test the output of the executed command against an expected response + + :param Callable condition: a callable which returns a truth about the command output + """ + self._condition = condition + return self + + def stubbornly( retries: Optional[int] = None, delay_s: Optional[Union[float, int]] = None, @@ -71,78 +130,32 @@ def stubbornly( def _before_sleep(retry_state: RetryCallState): attempt = retry_state.attempt_number tries = f"/{retries}" if retries is not None else "" - LOG.info( - f"Attempt {attempt}{tries} failed. Error: {retry_state.outcome.exception()}" - ) + errstr = "" + if retry_state.outcome: + errstr = f" Error: {retry_state.outcome.exception()}" + LOG.info(f"Attempt {attempt}{tries} failed.{errstr}") LOG.info(f"Retrying in {delay_s} seconds...") - _waits = wait_fixed(delay_s) if delay_s is not None else wait_fixed(0) - _stops = stop_after_attempt(retries) if retries is not None else stop_never - _exceptions = exceptions or (Exception,) # default to retry on all exceptions + waits = wait_fixed(delay_s) if delay_s is not None else wait_fixed(0) + stops = stop_after_attempt(retries) if retries is not None else stop_never + exceptions = exceptions or (Exception,) # default to retry on all exceptions - _retry_args = dict( - wait=_waits, - stop=_stops, - retry=retry_if_exception_type(_exceptions), + retry_args = dict( + wait=waits, + stop=stops, + retry=retry_if_exception_type(exceptions), before_sleep=_before_sleep, ) # Permit any tenacity retry overrides from these ^defaults - _retry_args.update(retry_kds) - - class Retriable: - def __init__(self) -> None: - self._condition = None - self._run = partial(run, capture_output=True) - - @retry(**_retry_args) - def exec( - self, - command_args: List[str], - **command_kwds, - ): - """ - Execute a command against a harness or locally with subprocess to be retried. - - :param List[str] command_args: The command to be executed, as a str or list of str - :param Mapping[str,str] command_kwds: Additional keyword arguments to be passed to exec - """ - - try: - resp = self._run(command_args, **command_kwds) - except subprocess.CalledProcessError as e: - LOG.warning(f" rc={e.returncode}") - LOG.warning(f" stdout={e.stdout.decode()}") - LOG.warning(f" stderr={e.stderr.decode()}") - raise - if self._condition: - assert self._condition(resp), "Failed to meet condition" - return resp - - def on(self, instance: harness.Instance) -> "Retriable": - """ - Target the command at some instance. - - :param instance Instance: Instance on a test harness. - """ - self._run = partial(instance.exec, capture_output=True) - return self - - def until( - self, condition: Callable[[subprocess.CompletedProcess], bool] = None - ) -> "Retriable": - """ - Test the output of the executed command against an expected response - - :param Callable condition: a callable which returns a truth about the command output - """ - self._condition = condition - return self - - return Retriable() + retry_args.update(retry_kds) + + return Retriable(retry_args) def _as_int(value: Optional[str]) -> Optional[int]: """Convert a string to an integer.""" + if value is None: + return value try: return int(value) except (TypeError, ValueError): @@ -249,10 +262,10 @@ def wait_until_k8s_ready( If the instance name is different from the hostname, the instance name should be passed to the node_names dictionary, e.g. {"instance_id": "node_name"}. """ + instance_id_node_name_map = {} for instance in instances: - if instance.id in node_names: - node_name = node_names[instance.id] - else: + node_name = node_names.get(instance.id) + if node_name is None: node_name = hostname(instance) result = ( @@ -261,8 +274,13 @@ def wait_until_k8s_ready( .until(lambda p: " Ready" in p.stdout.decode()) .exec(["k8s", "kubectl", "get", "node", node_name, "--no-headers"]) ) - LOG.info("Kubelet registered successfully!") - LOG.info("%s", result.stdout.decode()) + LOG.info(f"Kubelet registered successfully on instance '{instance.id}'") + LOG.info("%s", result.stdout.decode()) + instance_id_node_name_map[instance.id] = node_name + LOG.info( + "Successfully checked Kubelet registered on all harness instances: " + f"{instance_id_node_name_map}" + ) def wait_for_dns(instance: harness.Instance): @@ -362,7 +380,7 @@ def get_default_ip(instance: harness.Instance): return p.stdout.decode().split(" ")[8] -def get_global_unicast_ipv6(instance: harness.Instance, interface="eth0") -> str: +def get_global_unicast_ipv6(instance: harness.Instance, interface="eth0") -> str | None: # --- # 2: eth0: mtu 1500 qdisc fq_codel state UP group default qlen 1000 # link/ether 00:16:3e:0f:4d:1e brd ff:ff:ff:ff:ff:ff @@ -488,6 +506,12 @@ def previous_track(snap_version: str) -> str: stable = r.read().decode().strip() maj_min = major_minor(stable) + if not maj_min: + raise ValueError( + "Failed to determine previous snap version track for " + f"current version: {snap_version}" + ) + flavor_track = {"": "classic", "strict": ""}.get(config.FLAVOR, config.FLAVOR) track = f"{maj_min[0]}.{maj_min[1]}" + (flavor_track and f"-{flavor_track}") LOG.info("Previous track for %s is from track: %s", snap_version, track)