diff --git a/charms/worker/k8s/src/literals.py b/charms/worker/k8s/src/literals.py index 50b2180f..df1a1ef7 100644 --- a/charms/worker/k8s/src/literals.py +++ b/charms/worker/k8s/src/literals.py @@ -61,9 +61,9 @@ }, # NOTE: Update the dependencies for the k8s-service before releasing. "k8s_service": { - "dependencies": {"k8s-worker": "^1.30, < 1.32"}, + "dependencies": {"k8s-worker": "^1.31, < 1.33"}, "name": "k8s", - "upgrade_supported": "^1.30, < 1.32", - "version": "1.31.3", + "upgrade_supported": "^1.31, < 1.33", + "version": "1.32.0", }, } diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9b84c194..9e269810 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -2,22 +2,17 @@ # See LICENSE file for licensing details. """Fixtures for charm tests.""" -import asyncio import contextlib import json import logging -import re import shlex -from dataclasses import dataclass, field -from itertools import chain from pathlib import Path -from typing import Dict, List, Mapping, Optional, Tuple +from typing import Optional import juju.utils import pytest import pytest_asyncio import yaml -from juju.application import Application from juju.model import Model from juju.tag import untag from kubernetes import config as k8s_config @@ -25,19 +20,26 @@ from pytest_operator.plugin import OpsTest from .cos_substrate import LXDSubstrate -from .helpers import get_unit_cidrs, is_deployed +from .helpers import Bundle, Charm, cloud_type, get_unit_cidrs, is_deployed log = logging.getLogger(__name__) TEST_DATA = Path(__file__).parent / "data" DEFAULT_SNAP_INSTALLATION = TEST_DATA / "default-snap-installation.tar.gz" -DEFAULT_RESOURCES = {"snap-installation": ""} def pytest_addoption(parser: pytest.Parser): """Parse additional pytest options. - --charm-file can be used multiple times, specifies which local charm files are available - --upgrade-from instruct tests to start with a specific channel, and upgrade to these charms + --charm-file + can be used multiple times, specifies which local charm files are available + --upgrade-from + instruct tests to start with a specific channel, and upgrade to these charms + --snap-installation-resource + path to the snap installation resource + --lxd-containers + if cloud is LXD, use containers + --apply-proxy + apply proxy to model-config Args: parser: Pytest parser. @@ -68,7 +70,10 @@ def pytest_configure(config): config: Pytest config. """ config.addinivalue_line("markers", "cos: mark COS integration tests.") - config.addinivalue_line("markers", "bundle_file(name): specify a YAML bundle file for a test.") + config.addinivalue_line( + "markers", + "bundle(file='', apps_local={}, apps_channel={}, apps_resources={}): specify a YAML bundle file for a test.", + ) def pytest_collection_modifyitems(config, items): @@ -87,234 +92,6 @@ def pytest_collection_modifyitems(config, items): item.add_marker(skip_cos) -@dataclass -class Charm: - """Represents source charms. - - Attrs: - ops_test: Instance of the pytest-operator plugin - arch: Cloud Architecture - path: Path to the charm file - metadata: Charm's metadata - app_name: Preferred name of the juju application - """ - - ops_test: OpsTest - arch: str - path: Path - _charmfile: Optional[Path] = None - _URL_RE = re.compile(r"ch:(?P\w+)/(?P\w+)/(?P.+)") - - @staticmethod - def craft_url(charm: str, series: str, arch: str) -> str: - """Craft a charm URL. - - Args: - charm: Charm name - series: Cloud series - arch: Cloud architecture - - Returns: - string: URL to the charm - """ - if m := Charm._URL_RE.match(charm): - charm = m.group("charm") - return f"ch:{arch}/{series}/{charm}" - - @property - def metadata(self) -> dict: - """Charm Metadata.""" - return yaml.safe_load((self.path / "charmcraft.yaml").read_text()) - - @property - def app_name(self) -> str: - """Suggested charm name.""" - return self.metadata["name"] - - async def resolve(self, charm_files: List[str]) -> Path: - """Build or find the charm with ops_test. - - Args: - charm_files: The list charms files to resolve - - Return: - path to charm file - - Raises: - FileNotFoundError: the charm file wasn't found - """ - if self._charmfile is None: - try: - header = f"{self.app_name}_" - charm_name = header + "*.charm" - potentials = chain( - map(Path, charm_files), # Look in pytest arguments - Path().glob(charm_name), # Look in top-level path - self.path.glob(charm_name), # Look in charm-level path - ) - arch_choices = filter(lambda s: self.arch in str(s), potentials) - self._charmfile, *_ = filter(lambda s: s.name.startswith(header), arch_choices) - log.info("For %s found charmfile %s", self.app_name, self._charmfile) - except ValueError: - log.warning("No pre-built charm is available, let's build it") - if self._charmfile is None: - log.info("For %s build charmfile", self.app_name) - self._charmfile = await self.ops_test.build_charm(self.path) - if self._charmfile is None: - raise FileNotFoundError(f"{self.app_name}_*.charm not found") - return self._charmfile.resolve() - - -@dataclass -class Bundle: - """Represents test bundle. - - Attrs: - ops_test: Instance of the pytest-operator plugin - path: Path to the bundle file - content: Loaded content from the path - arch: Cloud Architecture - render: Path to a rendered bundle - applications: Mapping of applications in the bundle. - """ - - ops_test: OpsTest - path: Path - arch: str - _content: Mapping = field(default_factory=dict) - - @classmethod - async def create(cls, ops_test: OpsTest, path: Path) -> "Bundle": - """Create a bundle object. - - Args: - ops_test: Instance of the pytest-operator plugin - path: Path to the bundle file - - Returns: - Bundle: Instance of the Bundle - """ - arch = await cloud_arch(ops_test) - _type, _vms = await cloud_type(ops_test) - bundle = cls(ops_test, path, arch) - if _type == "lxd" and not _vms: - log.info("Drop lxd machine constraints") - bundle.drop_constraints() - if _type == "lxd" and _vms: - log.info("Constrain lxd machines with virt-type: virtual-machine") - bundle.add_constraints({"virt-type": "virtual-machine"}) - return bundle - - @property - def content(self) -> Mapping: - """Yaml content of the bundle loaded into a dict""" - if not self._content: - loaded = yaml.safe_load(self.path.read_bytes()) - series = loaded.get("series", "focal") - for app in loaded["applications"].values(): - app["charm"] = Charm.craft_url(app["charm"], series=series, arch=self.arch) - self._content = loaded - return self._content - - @property - def applications(self) -> Mapping[str, dict]: - """Mapping of all available application in the bundle.""" - return self.content["applications"] - - @property - def render(self) -> Path: - """Path to written bundle config to be deployed.""" - self.add_constraints({"arch": self.arch}) - target = self.ops_test.tmp_path / "bundles" / self.path.name - target.parent.mkdir(exist_ok=True, parents=True) - yaml.safe_dump(self.content, target.open("w")) - return target - - def switch(self, name: str, path: Optional[Path] = None, channel: Optional[str] = None): - """Replace charmhub application with a local charm path or specific channel. - - Args: - name (str): Which application - path (Path): Optional path to local charm - channel (str): Optional channel to use - - Raises: - ValueError: if both path and channel are provided, or neither are provided - """ - app = self.applications.get(name) - if not app: - return # Skip if the application is not in the bundle - if (not path and not channel) or (path and channel): - raise ValueError("channel and path are mutually exclusive") - if path: - app["charm"] = str(path.resolve()) - app["channel"] = None - app["resources"] = DEFAULT_RESOURCES - if channel: - app["charm"] = name - app["channel"] = channel - - def drop_constraints(self): - """Remove constraints on applications. Useful for testing on lxd.""" - for app in self.applications.values(): - app["constraints"] = "" - - def add_constraints(self, constraints: Dict[str, str]): - """Add constraints to applications. - - Args: - constraints: Mapping of constraints to add to applications. - """ - for app in self.applications.values(): - if app.get("num_units", 0) < 1: - log.info("Skipping constraints for subordinate charm: %s", app["charm"]) - continue - val: str = app.get("constraints", "") - existing = dict(kv.split("=", 1) for kv in val.split()) - existing.update(constraints) - app["constraints"] = " ".join(f"{k}={v}" for k, v in existing.items()) - - -async def cloud_arch(ops_test: OpsTest) -> str: - """Return current architecture of the selected controller - - Args: - ops_test (OpsTest): ops_test plugin - - Returns: - string describing current architecture of the underlying cloud - """ - assert ops_test.model, "Model must be present" - controller = await ops_test.model.get_controller() - controller_model = await controller.get_model("controller") - arch = set( - machine.safe_data["hardware-characteristics"]["arch"] - for machine in controller_model.machines.values() - ) - return arch.pop() - - -async def cloud_type(ops_test: OpsTest) -> Tuple[str, bool]: - """Return current cloud type of the selected controller - - Args: - ops_test (OpsTest): ops_test plugin - - Returns: - Tuple: - string describing current type of the underlying cloud - bool describing if VMs are enabled - """ - assert ops_test.model, "Model must be present" - controller = await ops_test.model.get_controller() - cloud = await controller.cloud() - _type = cloud.cloud.type_ - vms = True # Assume VMs are enabled - if _type == "lxd": - vms = not ops_test.request.config.getoption("--lxd-containers") - return _type, vms - - async def cloud_proxied(ops_test: OpsTest): """Setup a cloud proxy settings if necessary @@ -354,7 +131,6 @@ async def cloud_profile(ops_test: OpsTest): @contextlib.asynccontextmanager async def deploy_model( - request: pytest.FixtureRequest, ops_test: OpsTest, model_name: str, bundle: Bundle, @@ -362,7 +138,6 @@ async def deploy_model( """Add a juju model, deploy apps into it, wait for them to be active. Args: - request: handle to pytest requests from calling fixture ops_test: Instance of the pytest-operator plugin model_name: name of the model in which to deploy bundle: Bundle object to deploy or redeploy into the model @@ -371,8 +146,8 @@ async def deploy_model( model object """ config: Optional[dict] = {} - if request.config.option.model_config: - config = ops_test.read_model_config(request.config.option.model_config) + if ops_test.request.config.option.model_config: + config = ops_test.read_model_config(ops_test.request.config.option.model_config) credential_name = ops_test.cloud_name if model_name not in ops_test.models: await ops_test.track_model( @@ -384,7 +159,8 @@ async def deploy_model( with ops_test.model_context(model_name) as the_model: await cloud_profile(ops_test) async with ops_test.fast_forward("60s"): - await the_model.deploy(bundle.render) + bundle_yaml = bundle.render(ops_test.tmp_path) + await the_model.deploy(bundle_yaml) await the_model.wait_for_idle( apps=list(bundle.applications), status="active", @@ -396,85 +172,27 @@ async def deploy_model( log.fatal("Failed to determine model: model_name=%s", model_name) -def bundle_file(request) -> Path: - """Helper to get bundle file. - - Args: - request: pytest request object - - Returns: - path to test's bundle file - """ - _file = "test-bundle.yaml" - bundle_marker = request.node.get_closest_marker("bundle_file") - if bundle_marker: - _file = bundle_marker.args[0] - return Path(__file__).parent / "data" / _file - - @pytest_asyncio.fixture(scope="module") async def kubernetes_cluster(request: pytest.FixtureRequest, ops_test: OpsTest): - """Deploy local kubernetes charms.""" - bundle_path = bundle_file(request) + """Deploy kubernetes charms according to the bundle_marker.""" model = "main" + bundle, markings = await Bundle.create(ops_test) with ops_test.model_context(model) as the_model: - if await is_deployed(the_model, bundle_path): + if await is_deployed(the_model, bundle.path): log.info("Using existing model.") yield ops_test.model return - log.info("Deploying cluster using %s bundle.", bundle_path) - - bundle = await Bundle.create(ops_test, bundle_path) + log.info("Deploying new cluster using %s bundle.", bundle.path) if request.config.option.apply_proxy: await cloud_proxied(ops_test) - charms = [Charm(ops_test, bundle.arch, Path("charms") / p) for p in ("worker/k8s", "worker")] - charm_files_args = request.config.option.charm_files - DEFAULT_RESOURCES["snap-installation"] = request.config.option.snap_installation_resource - charm_files = await asyncio.gather(*[charm.resolve(charm_files_args) for charm in charms]) - switch_to_path = {} - for path, charm in zip(charm_files, charms): - if upgrade_channel := request.config.option.upgrade_from: - bundle.switch(charm.app_name, channel=upgrade_channel) - switch_to_path[charm.app_name] = path - else: - bundle.switch(charm.app_name, path=path) - - async with deploy_model(request, ops_test, model, bundle) as the_model: - await upgrade_model(the_model, switch_to_path) + await bundle.apply_marking(ops_test, markings) + async with deploy_model(ops_test, model, bundle) as the_model: yield the_model -async def upgrade_model(model: Model, switch_to_path: dict[str, Path]): - """Upgrade the model with the provided charms. - - Args: - model: Juju model - switch_to_path: Mapping of app_name to charm - - """ - if not switch_to_path: - return - - async def _refresh(app_name: str): - """Refresh the application. - - Args: - app_name: Name of the application to refresh - """ - app: Application = model.applications[app_name] - await app.refresh(path=switch_to_path[app_name], resources=DEFAULT_RESOURCES) - - await asyncio.gather(*[_refresh(app) for app in switch_to_path]) - await model.wait_for_idle( - apps=list(switch_to_path.keys()), - status="active", - timeout=30 * 60, - ) - - @pytest_asyncio.fixture(name="_grafana_agent", scope="module") async def grafana_agent(kubernetes_cluster: Model): """Deploy Grafana Agent.""" diff --git a/tests/integration/data/test-bundle-ceph.yaml b/tests/integration/data/test-bundle-ceph.yaml index 4f93361e..dc015b6f 100644 --- a/tests/integration/data/test-bundle-ceph.yaml +++ b/tests/integration/data/test-bundle-ceph.yaml @@ -8,7 +8,6 @@ series: jammy applications: k8s: charm: k8s - channel: latest/edge constraints: cores=2 mem=8G root-disk=16G num_units: 1 ceph-csi: diff --git a/tests/integration/data/test-bundle-etcd.yaml b/tests/integration/data/test-bundle-etcd.yaml index 662c984f..42dbdf62 100644 --- a/tests/integration/data/test-bundle-etcd.yaml +++ b/tests/integration/data/test-bundle-etcd.yaml @@ -18,7 +18,6 @@ applications: num_units: 1 k8s: charm: k8s - channel: latest/edge num_units: 1 constraints: cores=2 mem=8G root-disk=16G options: @@ -26,7 +25,6 @@ applications: bootstrap-node-taints: "node-role.kubernetes.io/control-plane=:NoSchedule" k8s-worker: charm: k8s-worker - channel: latest/edge constraints: cores=2 mem=8G root-disk=16G num_units: 1 relations: diff --git a/tests/integration/data/test-bundle.yaml b/tests/integration/data/test-bundle.yaml index b0fe4a0f..ca38a19a 100644 --- a/tests/integration/data/test-bundle.yaml +++ b/tests/integration/data/test-bundle.yaml @@ -8,7 +8,6 @@ series: focal applications: k8s: charm: k8s - channel: latest/edge num_units: 3 constraints: cores=2 mem=8G root-disk=16G expose: true @@ -21,7 +20,6 @@ applications: kubelet-extra-args: "v=3" k8s-worker: charm: k8s-worker - channel: latest/edge num_units: 2 constraints: cores=2 mem=8G root-disk=16G options: diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index dae1b2ba..97819d00 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,19 +4,26 @@ # pylint: disable=too-many-arguments,too-many-positional-arguments +import asyncio import ipaddress import json import logging +import re import shlex +from dataclasses import dataclass, field +from functools import cache, cached_property +from itertools import chain from pathlib import Path -from typing import List +from typing import Dict, List, Mapping, Optional, Sequence, Tuple import yaml from juju import unit from juju.model import Model +from pytest_operator.plugin import OpsTest from tenacity import AsyncRetrying, before_sleep_log, retry, stop_after_attempt, wait_fixed log = logging.getLogger(__name__) +CHARMCRAFT_DIRS = {"k8s": Path("charms/worker/k8s"), "k8s-worker": Path("charms/worker")} async def is_deployed(model: Model, bundle_path: Path) -> bool: @@ -193,3 +200,388 @@ async def get_pod_logs( result = await action.wait() assert result.results["return-code"] == 0, f"Failed to retrieve pod {name} logs." return result.results["stdout"] + + +async def get_leader(app) -> int: + """Find leader unit of an application. + + Args: + app: Juju application + + Returns: + int: index to leader unit + + Raises: + ValueError: No leader found + """ + is_leader = await asyncio.gather(*(u.is_leader_from_status() for u in app.units)) + for idx, flag in enumerate(is_leader): + if flag: + return idx + raise ValueError("No leader found") + + +@dataclass +class Markings: + """Test markings for the bundle. + + Attrs: + apps_local: List of application names needing local files to replace charm urls + apps_channel: Mapping of application names to channels + apps_resources: Mapping of application names to resources + """ + + apps_local: List[str] = field(default_factory=list) + apps_channel: Mapping = field(default_factory=dict) + apps_resources: Mapping = field(default_factory=dict) + + +@dataclass +class CharmUrl: + """Represents a charm URL. + + Attrs: + name: Name of the charm in the store + series: Cloud series + arch: Cloud architecture + """ + + name: str + series: str + arch: str + _URL_RE = re.compile(r"ch:(?P\w+)/(?P\w+)/(?P.+)") + + @classmethod + def craft(cls, name: str, series: str, arch: str) -> "CharmUrl": + """Parse a charm URL. + + Args: + name: Name or URL of the charm + series: Cloud series + arch: Cloud architecture + + Returns: + CharmUrl object + """ + if m := cls._URL_RE.match(name): + name = m.group("charm") + return cls(name, series, arch) + + def __str__(self) -> str: + """Return the charm URL. + + Returns: + string: charm URL + """ + return f"ch:{self.arch}/{self.series}/{self.name}" + + +@dataclass +class Charm: + """Represents source charms in this repository. + + Attrs: + path: Path to the charmcraft file + metadata: Charm's metadata + name: Name of the charm from the metadata + local_path: Path to the built charm file + """ + + path: Path + _charmfile: Optional[Path] = None + + @cached_property + def metadata(self) -> dict: + """Charm Metadata.""" + return yaml.safe_load((self.path / "charmcraft.yaml").read_text()) + + @property + def name(self) -> str: + """Name defined by the charm.""" + return self.metadata["name"] + + @property + def local_path(self) -> Path: + """Local path to the charm. + + Returns: + Path to the built charm file + Raises: + FileNotFoundError: the charm file wasn't found + """ + if self._charmfile is None: + raise FileNotFoundError(f"{self.name}_*.charm not found") + return self._charmfile + + @classmethod + @cache + def find(cls, name: str) -> Optional["Charm"]: + """Find a charm by name. + + Args: + name: Name of the charm + + Returns: + Charm object or None + """ + if charmcraft := CHARMCRAFT_DIRS.get(name): + return cls(charmcraft) + + async def resolve(self, ops_test: OpsTest, arch: str) -> "Charm": + """Build or find the charm with ops_test. + + Args: + ops_test: Instance of the pytest-operator plugin + arch (str): Cloud architecture + + Return: + self (Charm): the resolved charm + + Raises: + FileNotFoundError: the charm file wasn't found + """ + prefix = f"{self.name}_" + if self._charmfile is None: + charm_files = ops_test.request.config.option.charm_files or [] + try: + charm_name = prefix + "*.charm" + potentials = chain( + map(Path, charm_files), # Look in pytest arguments + Path().glob(charm_name), # Look in top-level path + self.path.glob(charm_name), # Look in charm-level path + ) + arch_choices = filter(lambda s: arch in str(s), potentials) + self._charmfile, *_ = filter(lambda s: s.name.startswith(prefix), arch_choices) + log.info("For %s found charmfile %s", self.name, self._charmfile) + except ValueError: + log.warning("No pre-built charm is available, let's build it") + if self._charmfile is None: + log.info("For %s build charmfile", self.name) + self._charmfile = await ops_test.build_charm(self.path) + if self._charmfile is None: + raise FileNotFoundError(f"{prefix}*.charm not found") + return self + + +@dataclass +class Bundle: + """Represents a test bundle. + + Attrs: + path: Path to the bundle file + arch: Cloud Architecture + content: Loaded content from the path + applications: Mapping of applications in the bundle. + """ + + path: Path + arch: str + _content: Mapping = field(default_factory=dict) + + @classmethod + async def create(cls, ops_test) -> Tuple["Bundle", Markings]: + """Craft a bundle for the given ops_test environment. + + Args: + ops_test: Instance of the pytest-operator plugin + + Returns: + Bundle object for the test + Markings from the test + """ + bundle_marker = ops_test.request.node.get_closest_marker("bundle") + assert bundle_marker, "No bundle marker found" + kwargs = {**bundle_marker.kwargs} + + if val := kwargs.pop("file", None): + path = Path(__file__).parent / "data" / val + else: + log.warning("No file specified, using default test-bundle.yaml") + path = Path(__file__).parent / "data" / "test-bundle.yaml" + + arch = await cloud_arch(ops_test) + assert arch, "Architecture must be known before customizing the bundle" + + bundle = cls(path=path, arch=arch) + bundle.add_constraints({"arch": arch}) + + assert not all( + _ in kwargs for _ in ("apps_local", "apps_channel") + ), "Cannot use both apps_local and apps_channel" + + return bundle, Markings(**kwargs) + + @property + def content(self) -> Mapping: + """Yaml content of the bundle loaded into a dict + + Returns: + Mapping: bundle content + """ + if not self._content: + loaded = yaml.safe_load(self.path.read_bytes()) + series = loaded.get("series", "focal") + for app in loaded["applications"].values(): + app["charm"] = CharmUrl(app["charm"], series=series, arch=self.arch) + self._content = loaded + return self._content + + @property + def applications(self) -> Mapping[str, dict]: + """Mapping of all available application in the bundle. + + Returns: + Mapping: application name to application details + """ + return self.content["applications"] + + async def discover_charm_files(self, ops_test: OpsTest) -> Dict[str, Charm]: + """Discover charm files for the applications in the bundle. + + Args: + ops_test: Instance of the pytest-operator plugin + arch: Cloud architecture + + Returns: + Mapping: application name to Charm object + """ + app_to_charm = {} + for app in self.applications.values(): + if charm := Charm.find(app["charm"].name): + await charm.resolve(ops_test, self.arch) + app_to_charm[charm.name] = charm + return app_to_charm + + async def apply_marking(self, ops_test: OpsTest, markings: Markings): + """Customize the bundle for the test. + + Args: + ops_test: Instance of the pytest-operator plugin + """ + _type, _vms = await cloud_type(ops_test) + if _type == "lxd" and not _vms: + log.info("Drop lxd machine constraints") + self.drop_constraints() + if _type == "lxd" and _vms: + log.info("Constrain lxd machines with virt-type: virtual-machine") + self.add_constraints({"virt-type": "virtual-machine"}) + + charms = await self.discover_charm_files(ops_test) + + empty_resource = { + "snap-installation": ops_test.request.config.option.snap_installation_resource + } + for app in markings.apps_local: + assert app in charms, f"App={app} doesn't have a local charm" + rsc = markings.apps_resources.get(app) or empty_resource + self.switch(app, charm=charms[app], channel=None, resources=rsc) + + for app, channel in markings.apps_channel.items(): + rsc = markings.apps_resources.get(app) + self.switch(app, charm=charms[app], channel=channel, resources=rsc) + + def switch( + self, + name: str, + charm: Charm, + channel: Optional[str] = None, + resources: Optional[dict] = None, + ): + """Replace charmhub application with a local path or specific channel. + + Args: + name (str): Which application + charm (Charm): Which charm to use + channel (Optional[str]): If specified use channel, otherwise use local path + resources (dict): Optional resources to add + + Raises: + ValueError: if both path and channel are provided, or neither are provided + """ + app = self.applications.get(name) + if not app: + return # Skip if the application is not in the bundle + if not charm.local_path and not channel: + raise FileNotFoundError(f"Charm={charm.name} for App={app} not found") + if channel: + app["charm"] = charm.name + app["channel"] = channel + else: + app["charm"] = str(charm.local_path.resolve()) + app["channel"] = None + if resources: + app["resources"] = resources + + def drop_constraints(self): + """Remove constraints on applications. Useful for testing on lxd.""" + for app in self.applications.values(): + app["constraints"] = "" + + def add_constraints(self, constraints: Dict[str, str]): + """Add constraints to applications. + + Args: + constraints: Mapping of constraints to add to applications. + """ + for app in self.applications.values(): + if app.get("num_units", 0) < 1: + log.info("Skipping constraints for subordinate charm: %s", app["charm"]) + continue + val: str = app.get("constraints", "") + existing = dict(kv.split("=", 1) for kv in val.split()) + existing.update(constraints) + app["constraints"] = " ".join(f"{k}={v}" for k, v in existing.items()) + + def render(self, tmp_path: Path) -> Path: + """Path to written bundle config to be deployed. + + Args: + tmp_path: temporary path to write the bundle + + Returns: + Path to the written bundle + """ + target = tmp_path / "bundles" / self.path.name + target.parent.mkdir(exist_ok=True, parents=True) + yaml.safe_dump(self.content, target.open("w")) + return target + + +async def cloud_arch(ops_test: OpsTest) -> str: + """Return current architecture of the selected controller + + Args: + ops_test (OpsTest): ops_test plugin + + Returns: + string describing current architecture of the underlying cloud + """ + assert ops_test.model, "Model must be present" + controller = await ops_test.model.get_controller() + controller_model = await controller.get_model("controller") + arch = set( + machine.safe_data["hardware-characteristics"]["arch"] + for machine in controller_model.machines.values() + ) + return arch.pop() + + +async def cloud_type(ops_test: OpsTest) -> Tuple[str, bool]: + """Return current cloud type of the selected controller + + Args: + ops_test (OpsTest): ops_test plugin + + Returns: + Tuple: + string describing current type of the underlying cloud + bool describing if VMs are enabled + """ + assert ops_test.model, "Model must be present" + controller = await ops_test.model.get_controller() + cloud = await controller.cloud() + _type = cloud.cloud.type_ + vms = True # Assume VMs are enabled + if _type == "lxd": + vms = not ops_test.request.config.getoption("--lxd-containers") + return _type, vms diff --git a/tests/integration/test_ceph.py b/tests/integration/test_ceph.py index ae1a4732..02caa7a6 100644 --- a/tests/integration/test_ceph.py +++ b/tests/integration/test_ceph.py @@ -15,7 +15,7 @@ # This pytest mark configures the test environment to use the Canonical Kubernetes # bundle with ceph, for all the test within this module. -pytestmark = [pytest.mark.bundle_file("test-bundle-ceph.yaml")] +pytestmark = [pytest.mark.bundle(file="test-bundle-ceph.yaml", apps_local=["k8s"])] def _get_data_file_path(name) -> str: diff --git a/tests/integration/test_etcd.py b/tests/integration/test_etcd.py index 9708a93d..800af23a 100644 --- a/tests/integration/test_etcd.py +++ b/tests/integration/test_etcd.py @@ -14,9 +14,7 @@ # This pytest mark configures the test environment to use the Canonical Kubernetes # bundle with etcd, for all the test within this module. -pytestmark = [ - pytest.mark.bundle_file("test-bundle-etcd.yaml"), -] +pytestmark = [pytest.mark.bundle(file="test-bundle-etcd.yaml", apps_local=["k8s", "k8s-worker"])] @pytest.mark.abort_on_fail diff --git a/tests/integration/test_k8s.py b/tests/integration/test_k8s.py index 13f73726..81f6de98 100644 --- a/tests/integration/test_k8s.py +++ b/tests/integration/test_k8s.py @@ -15,29 +15,15 @@ from tenacity import retry, stop_after_attempt, wait_fixed from .grafana import Grafana -from .helpers import get_nodes, ready_nodes +from .helpers import get_leader, get_nodes, ready_nodes from .prometheus import Prometheus log = logging.getLogger(__name__) -async def get_leader(app) -> int: - """Find leader unit of an application. - - Args: - app: Juju application - - Returns: - int: index to leader unit - - Raises: - ValueError: No leader found - """ - is_leader = await asyncio.gather(*(u.is_leader_from_status() for u in app.units)) - for idx, flag in enumerate(is_leader): - if flag: - return idx - raise ValueError("No leader found") +pytestmark = [ + pytest.mark.bundle(file="test-bundle.yaml", apps_local=["k8s", "k8s-worker"]), +] @pytest.mark.abort_on_fail diff --git a/tests/integration/test_upgrade.py b/tests/integration/test_upgrade.py new file mode 100644 index 00000000..1d22c9e1 --- /dev/null +++ b/tests/integration/test_upgrade.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 + +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Upgrade Integration tests.""" + +import logging +from typing import Optional + +import juju.application +import juju.model +import juju.unit +import pytest +import yaml +from pytest_operator.plugin import OpsTest + +from .helpers import Bundle, Charm, get_leader + +# This pytest mark configures the test environment to use the Canonical Kubernetes +# deploying charms from the edge channels, then upgrading them to the built charm. +pytestmark = [ + pytest.mark.bundle( + file="test-bundle.yaml", apps_channel={"k8s": "edge", "k8s-worker": "edge"} + ), +] + + +log = logging.getLogger(__name__) + + +@pytest.mark.abort_on_fail +async def test_k8s_upgrade(kubernetes_cluster: juju.model.Model, ops_test: OpsTest): + """Upgrade the model with the provided charms. + + Args: + kubernetes_cluster: The kubernetes model + ops_test: The test harness + request: The request object + """ + local_resources = { + "snap-installation": ops_test.request.config.option.snap_installation_resource + } + bundle, _ = await Bundle.create(ops_test) + charms = await bundle.discover_charm_files(ops_test) + + async def _refresh(app_name: str): + """Refresh the application. + + Args: + app_name: Name of the application to refresh + """ + app: Optional[juju.application.Application] = kubernetes_cluster.applications[app_name] + assert app is not None, f"Application {app_name} not found" + + log.info(f"Refreshing {app_name}") + leader_idx: int = await get_leader(app) + leader: juju.unit.Unit = app.units[leader_idx] + action = await leader.run_action("pre-upgrade-check") + await action.wait() + with_fault = f"Pre-upgrade of {app_name} failed with {yaml.safe_dump(action.results)}" + if app_name == "k8s": + # The k8s charm has a pre-upgrade-check action that works, k8s-worker does not. + assert action.status == "completed", with_fault + assert action.results["return-code"] == 0, with_fault + await app.refresh(path=charms[app_name].local_path, resources=local_resources) + await kubernetes_cluster.wait_for_idle( + apps=list(charms.keys()), + status="active", + timeout=30 * 60, + ) + + for app in charms: + await _refresh(app)