diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..41161ee --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +# Dockerfile to use with the controller environment where we run the environment provider as a Job +# instead of running it as an imported service in the suite runner. +FROM python:3.9-bookworm AS build + +COPY . /src +WORKDIR /src +RUN pip install --no-cache-dir build && python3 -m build + +FROM python:3.9-slim-bookworm + +COPY --from=build /src/dist/*.whl /tmp +# hadolint ignore=DL3013 + +RUN pip install --no-cache-dir /tmp/*.whl && groupadd -r etos && useradd -r -m -s /bin/false -g etos etos + +USER etos + +LABEL org.opencontainers.image.source=https://github.com/eiffel-community/etos-environment-provider +LABEL org.opencontainers.image.authors=etos-maintainers@googlegroups.com +LABEL org.opencontainers.image.licenses=Apache-2.0 + +CMD ["python", "-u", "-m", "environment_provider.environment_provider"] diff --git a/pyproject.toml b/pyproject.toml index 5c89182..bf0ac5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "jsontas~=1.3", "packageurl-python~=0.11", "etcd3gw~=2.3", - "etos_lib==4.3.6", + "etos-lib==4.4.1", "opentelemetry-api~=1.21", "opentelemetry-exporter-otlp~=1.21", "opentelemetry-sdk~=1.21", @@ -52,4 +52,4 @@ norecursedirs = ["dist", "build", ".tox"] testpaths = ["tests"] [tool.setuptools.packages] -find = { where = ["src"], exclude = ["tests"] } \ No newline at end of file +find = { where = ["src"], exclude = ["tests"] } diff --git a/requirements.txt b/requirements.txt index 4c87091..378adee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,7 @@ cryptography>=42.0.4,<43.0.0 jsontas~=1.3 packageurl-python~=0.11 etcd3gw~=2.3 -etos_lib==4.3.6 +etos-lib==4.4.1 opentelemetry-api~=1.21 opentelemetry-exporter-otlp~=1.21 opentelemetry-sdk~=1.21 diff --git a/src/environment_provider/environment.py b/src/environment_provider/environment.py index d16c198..ab7f21f 100644 --- a/src/environment_provider/environment.py +++ b/src/environment_provider/environment.py @@ -16,7 +16,9 @@ """Backend for the environment requests.""" import json import time +import sys import traceback +import logging import re from typing import Optional, Union @@ -26,6 +28,7 @@ from environment_provider.lib.database import ETCDPath from environment_provider.lib.registry import ProviderRegistry +from environment_provider.lib.releaser import EnvironmentReleaser from execution_space_provider import ExecutionSpaceProvider from execution_space_provider.execution_space import ExecutionSpace from iut_provider import IutProvider @@ -71,16 +74,12 @@ def release_environment( """ etos.config.set("SUITE_ID", sub_suite.get("suite_id")) iut = sub_suite.get("iut") - iut_ruleset = provider_registry.get_iut_provider_by_id(iut.get("provider_id")).get("iut") + iut_ruleset = provider_registry.get_iut_provider().get("iut") executor = sub_suite.get("executor") - executor_ruleset = provider_registry.get_execution_space_provider_by_id( - executor.get("provider_id") - ).get("execution_space") + executor_ruleset = provider_registry.get_execution_space_provider().get("execution_space") log_area = sub_suite.get("log_area") - log_area_ruleset = provider_registry.get_log_area_provider_by_id( - log_area.get("provider_id") - ).get("log") + log_area_ruleset = provider_registry.get_log_area_provider().get("log") failure = None @@ -151,3 +150,29 @@ def release_full_environment(etos: ETOS, jsontas: JsonTas, suite_id: str) -> tup traceback.format_exception(failure, value=failure, tb=failure.__traceback__) ) return True, "" + + +def run(environment_id: str): + """Run is an entrypoint for releasing environments.""" + logformat = "[%(asctime)s] %(levelname)s:%(message)s" + logging.basicConfig( + level=logging.INFO, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" + ) + try: + releaser = EnvironmentReleaser() + releaser.run(environment_id) + result = {"conclusion": "Successful", "description": "Successfully released an environment"} + with open("/dev/termination-log", "w", encoding="utf-8") as termination_log: + json.dump(result, termination_log) + except: + try: + result = {"conclusion": "Failed", "description": traceback.format_exc()} + with open("/dev/termination-log", "w", encoding="utf-8") as termination_log: + json.dump(result, termination_log) + except PermissionError: + pass + raise + + +if __name__ == "__main__": + run(sys.argv[1]) diff --git a/src/environment_provider/environment_provider.py b/src/environment_provider/environment_provider.py index 4e38c3b..f622d23 100644 --- a/src/environment_provider/environment_provider.py +++ b/src/environment_provider/environment_provider.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Environment Provider module.""" +import sys import json import logging import os @@ -22,14 +23,21 @@ import uuid from datetime import datetime from tempfile import NamedTemporaryFile -from typing import Any, Union +from typing import Any, Optional from etos_lib.etos import ETOS from etos_lib.lib.events import EiffelEnvironmentDefinedEvent from etos_lib.logging.logger import FORMAT_CONFIG from etos_lib.opentelemetry.semconv import Attributes as SemConvAttributes +from etos_lib.kubernetes import Kubernetes, Environment, Provider +from etos_lib.kubernetes.schemas.common import OwnerReference +from etos_lib.kubernetes.schemas import Environment as EnvironmentSchema, EnvironmentSpec, Metadata +from etos_lib.kubernetes.schemas import Test +from etos_lib.kubernetes.schemas import Provider as ProviderSchema +from etos_lib.kubernetes.schemas import EnvironmentRequest as EnvironmentRequestSchema from jsontas.jsontas import JsonTas -import opentelemetry +from packageurl import PackageURL +from opentelemetry import trace from opentelemetry.trace import SpanKind from execution_space_provider.execution_space import ExecutionSpace @@ -42,6 +50,7 @@ from .lib.json_dumps import JsonDumps from .lib.log_area import LogArea from .lib.registry import ProviderRegistry +from .lib.database import ETCDPath from .lib.test_suite import TestSuite from .lib.uuid_generate import UuidGenerate from .splitter.split import Splitter @@ -57,32 +66,39 @@ class EnvironmentProviderNotConfigured(Exception): """Environment provider was not configured prior to request.""" +class EnvironmentProviderError(Exception): + """Environment provider got an error.""" + + class EnvironmentProvider: # pylint:disable=too-many-instance-attributes """Environment provider.""" logger = logging.getLogger("EnvironmentProvider") - environment_provider_config = None iut_provider = None log_area_provider = None execution_space_provider = None + testrun = None - def __init__(self, suite_id: str, suite_runner_ids: list[str]) -> None: + def __init__(self, suite_runner_ids: Optional[list[str]] = None) -> None: """Initialize ETOS, dataset, provider registry and splitter. - :param suite_id: Suite ID to get an environment for :param suite_runner_ids: IDs from the suite runner to correlate sub suites. """ - FORMAT_CONFIG.identifier = suite_id - self.logger.info("Initializing EnvironmentProvider.") - self.tracer = opentelemetry.trace.get_tracer(__name__) + self.etos = ETOS( + "ETOS Environment Provider", os.getenv("HOSTNAME", "Unknown"), "Environment Provider" + ) + self.kubernetes = Kubernetes() + self.environment_provider_config = Config(self.etos, self.kubernetes, suite_runner_ids) - self.etos = ETOS("ETOS Environment Provider", os.getenv("HOSTNAME"), "Environment Provider") + FORMAT_CONFIG.identifier = self.environment_provider_config.requests[0].spec.identifier + self.suite_id = self.environment_provider_config.requests[0].spec.identifier + + self.logger.info("Initializing EnvironmentProvider.") + self.tracer = trace.get_tracer(__name__) - self.suite_id = suite_id self.suite_runner_ids = suite_runner_ids self.reset() - self.splitter = Splitter(self.etos, {}) def reset(self) -> None: """Create a new dataset and provider registry.""" @@ -94,24 +110,18 @@ def reset(self) -> None: self.dataset.add("encrypt", Encrypt) self.registry = ProviderRegistry(self.etos, self.jsontas, self.suite_id) - def new_dataset(self, dataset: dict) -> None: + def new_dataset(self, request: EnvironmentRequestSchema) -> None: """Load a new dataset. :param dataset: Dataset to use for this configuration. """ self.reset() - self.dataset.add("environment", os.environ) - self.dataset.add("config", self.etos.config) - self.dataset.add("identity", self.environment_provider_config.identity) + self.dataset.add("identity", PackageURL.from_string(request.spec.identity)) - self.dataset.add("artifact_id", self.environment_provider_config.artifact_id) + self.dataset.add("artifact_id", request.spec.artifact) self.dataset.add("context", self.environment_provider_config.context) - self.dataset.add("uuid", str(uuid.uuid4())) - - self.dataset.add("artifact_created", self.environment_provider_config.artifact_created) - self.dataset.add("artifact_published", self.environment_provider_config.artifact_published) - self.dataset.add("tercc", self.environment_provider_config.tercc) + dataset = request.spec.dataset or {} self.dataset.add("dataset", dataset) self.dataset.merge(dataset) @@ -119,11 +129,8 @@ def new_dataset(self, dataset: dict) -> None: self.log_area_provider = self.registry.log_area_provider() self.execution_space_provider = self.registry.execution_space_provider() - def configure(self, suite_id: str) -> None: - """Configure environment provider. - - :param suite_id: Suite ID for this task. - """ + def configure(self, request: EnvironmentRequestSchema) -> None: + """Configure environment provider.""" self.logger.info("Configure environment provider.") if not self.registry.wait_for_configuration(): # TODO: Add link ref to docs that describe how the config is done. @@ -133,7 +140,7 @@ def configure(self, suite_id: str) -> None: "environment." ) self.logger.info("Registry is configured.") - self.etos.config.set("SUITE_ID", suite_id) + self.etos.config.set("SUITE_ID", request.spec.identifier) self.etos.config.set("EVENT_DATA_TIMEOUT", int(os.getenv("ETOS_EVENT_DATA_TIMEOUT", "10"))) self.etos.config.set( @@ -155,25 +162,6 @@ def configure(self, suite_id: str) -> None: self.etos.publisher.wait_start() self.logger.info("Connected") - self.environment_provider_config = Config(self.etos, suite_id) - if not self.environment_provider_config.generated: - missing = [ - name - for name, value in [ - ("tercc", self.environment_provider_config.tercc), - ( - "artifact_created", - self.environment_provider_config.artifact_created, - ), - ( - "activity_triggered", - self.environment_provider_config.activity_triggered, - ), - ] - if value is None - ] - raise NoEventDataFound(f"Missing: {', '.join(missing)}") - def cleanup(self) -> None: """Clean up by checkin in all checked out providers.""" self.logger.info("Cleanup by checking in all checked out providers.") @@ -196,61 +184,6 @@ def get_constraint(recipe: dict, key: str) -> Any: return constraint.get("value") return None - def create_test_suite_dict(self) -> dict: - """Create a test suite dictionary based on test runners. - - I.e. If there is only one test_runner the dictionary would be:: - - { - "test_suite_name": { - "MyTestrunner": { - "docker": "MyTestrunner", - "priority": 1, - "unsplit_recipes": [...] - } - } - } - - Or two:: - - { - "test_suite_name": { - "MyTestrunner": { - "docker": "MyTestrunner", - "priority": 1, - "unsplit_recipes": [...] - }, - "MyOtherTestrunner": { - "docker": "MyOtherTestrunner", - "priority": 1, - "unsplit_recipes": [...] - } - } - } - - etc. - - :return: A test suite dictionary based on test runners. - :rtype: dict - """ - self.logger.info("Create new test suite dictionary.") - test_suites = {} - for test_suite in self.environment_provider_config.test_suite: - test_runners = test_suites.setdefault(test_suite.get("name"), {}) - - for recipe in test_suite.get("recipes", []): - test_runner = self.get_constraint(recipe, "TEST_RUNNER") - test_runners.setdefault( - test_runner, - { - "docker": test_runner, - "priority": test_suite.get("priority"), - "unsplit_recipes": [], - }, - ) - test_runners[test_runner]["unsplit_recipes"].append(recipe) - return test_suites - def set_total_test_count_and_test_runners(self, test_runners: dict) -> None: """Set total test count and test runners to be used by the splitter algorithm. @@ -262,19 +195,6 @@ def set_total_test_count_and_test_runners(self, test_runners: dict) -> None: self.etos.config.set("TOTAL_TEST_COUNT", total_test_count) self.etos.config.set("NUMBER_OF_TESTRUNNERS", len(test_runners.keys())) - def verify_json(self, json_data: Union[str, dict]) -> None: - """Verify that JSON data can be serialized properly. - - :param json_data: JSON data to test. - """ - try: - if isinstance(json_data, dict): - json_data = json.dumps(json_data) - json.loads(json_data) - except (json.decoder.JSONDecodeError, TypeError): - self.logger.error(json_data) - raise - def send_environment_events(self, url: str, sub_suite: dict) -> None: """Send environment defined events for the created sub suites. @@ -295,25 +215,114 @@ def send_environment_events(self, url: str, sub_suite: dict) -> None: suite = self.registry.testrun.join(f"suite/{sub_suite['test_suite_started_id']}") suite.join(f"/subsuite/{event_id}/suite").write(json.dumps(sub_suite)) - def upload_sub_suite(self, sub_suite: dict) -> str: + def upload_sub_suite(self, sub_suite: dict) -> tuple[str, dict]: """Upload sub suite to log area. :param sub_suite: Sub suite to upload to log area. :return: URI to file uploaded. """ + sub_suite = sub_suite.copy() + sub_suite["recipes"] = self.recipes_from_tests(sub_suite["recipes"]) try: with NamedTemporaryFile(mode="w", delete=False) as sub_suite_file: json.dump(sub_suite, sub_suite_file) log_area = LogArea(self.etos, sub_suite) - return log_area.upload( - sub_suite_file.name, - f"{sub_suite['name']}.json", - sub_suite["test_suite_started_id"], - sub_suite["sub_suite_id"], + return ( + log_area.upload( + sub_suite_file.name, + f"{sub_suite['name']}.json", + sub_suite["test_suite_started_id"], + sub_suite["sub_suite_id"], + ), + sub_suite, ) finally: os.remove(sub_suite_file.name) + def recipes_from_tests(self, tests: list[Test]) -> list[dict]: + """Load Eiffel TERCC recipes from test. + + :param tests: The tests defined in a Test model. + :return: A list of Eiffel TERCC recipes. + """ + recipes: list[dict] = [] + for test in tests: + recipes.append( + { + "id": test.id, + "testCase": test.testCase.model_dump(), + "constraints": [ + { + "key": "ENVIRONMENT", + "value": test.execution.environment, + }, + { + "key": "COMMAND", + "value": test.execution.command, + }, + { + "key": "EXECUTE", + "value": test.execution.execute, + }, + { + "key": "CHECKOUT", + "value": test.execution.checkout, + }, + { + "key": "PARAMETERS", + "value": test.execution.parameters, + }, + { + "key": "TEST_RUNNER", + "value": test.execution.testRunner, + }, + ], + } + ) + return recipes + + def create_environment_resource( + self, request: EnvironmentRequestSchema, sub_suite: dict + ) -> tuple[str, dict]: + """Create an environment resource in Kubernetes. + + :param sub_suite: Sub suite to add to Environment resource. + :return: URI to ETOS API for ETR to fetch resource. + """ + # In a valid sub suite all of these keys must exist + # making this a safe assumption + environment_id = sub_suite["executor"]["instructions"]["environment"]["ENVIRONMENT_ID"] + labels = request.metadata.labels or {} + labels["etos.eiffel-community.github.io/suite-id"] = sub_suite["test_suite_started_id"] + labels["etos.eiffel-community.github.io/sub-suite-id"] = sub_suite["sub_suite_id"] + owners = request.metadata.ownerReferences + owners.append( + OwnerReference( + kind="EnvironmentRequest", + name=request.metadata.name, + uid=request.metadata.uid, + apiVersion="etos.eiffel-community.github.io/v1alpha1", + controller=False, + blockOwnerDeletion=True, + ) + ) + environment = EnvironmentSchema( + metadata=Metadata( + name=environment_id, + namespace=request.metadata.namespace, + labels=labels, + ownerReferences=owners, + ), + spec=EnvironmentSpec(**sub_suite.copy()), + ) + environment_client = Environment(self.kubernetes) + if not environment_client.create(environment): + raise RuntimeError("Failed to create the environment for an etos testrun") + return ( + f"{os.getenv('ETOS_API')}/v1alpha/testrun/{environment_id}", + environment.spec.model_dump(), + ) + def checkout_an_execution_space(self) -> ExecutionSpace: """Check out a single execution space. @@ -330,12 +339,10 @@ def checkout_a_log_area(self) -> LogArea: def checkout_timeout(self) -> int: """Get timeout for checkout.""" - timeout = ( - self.etos.config.get("WAIT_FOR_IUT_TIMEOUT") - + self.etos.config.get("WAIT_FOR_EXECUTION_SPACE_TIMEOUT") - + self.etos.config.get("WAIT_FOR_LOG_AREA_TIMEOUT") - + 10 - ) + iut_timeout = self.etos.config.get("WAIT_FOR_IUT_TIMEOUT") + exec_timeout = self.etos.config.get("WAIT_FOR_EXECUTION_SPACE_TIMEOUT") + log_timeout = self.etos.config.get("WAIT_FOR_LOG_AREA_TIMEOUT") + timeout = iut_timeout + exec_timeout + log_timeout + 10 minutes, seconds = divmod(timeout, 60) hours, minutes = divmod(minutes, 60) @@ -351,19 +358,26 @@ def checkout_timeout(self) -> int: ) return endtime - def checkout( - self, test_suite_name: str, test_runners: dict, dataset: dict, main_suite_id: str - ) -> dict: + def checkout(self, request: EnvironmentRequestSchema) -> None: """Checkout an environment for a test suite. - :param test_suite_name: Name of the test suite. - :param test_runners: The test runners and corresponding unassigned tests. - :param dataset: The dataset for this particular checkout. - :param main_suite_id: The ID of the main suite that initiated this checkout. - :return: The test suite and environment json for this checkout. + A request can have multiple environments due to IUT availability or the amount of + unique test runners in the request. """ - self.logger.info("Checkout environment for %r", test_suite_name, extra={"user_log": True}) - self.new_dataset(dataset) + # pylint:disable=too-many-statements + self.logger.info("Checkout environment for %r", request.spec.name, extra={"user_log": True}) + self.new_dataset(request) + splitter = Splitter(self.etos, request.spec.splitter) + + # TODO: This is a hack to make the controller environment work without too many changes + # to the original code, since we want to run them at the same time. + test_runners = {} + for test in request.spec.splitter.tests: + test_runners.setdefault( + test.execution.testRunner, + {"docker": test.execution.testRunner, "priority": 1, "unsplit_recipes": []}, + ) + test_runners[test.execution.testRunner]["unsplit_recipes"].append(test) self.set_total_test_count_and_test_runners(test_runners) @@ -372,6 +386,7 @@ def checkout( self.etos.config.get("TOTAL_TEST_COUNT"), extra={"user_log": True}, ) + self.logger.info( "Total testrunners: %r", self.etos.config.get("NUMBER_OF_TESTRUNNERS"), @@ -393,8 +408,8 @@ def checkout( ) test_suite = TestSuite( - test_suite_name, - main_suite_id, + request.spec.name or "NoSuite", + request.spec.id, self.environment_provider_config, ) finished = [] @@ -405,7 +420,9 @@ def checkout( with self.tracer.start_as_current_span("request_iuts", kind=SpanKind.CLIENT) as span: # Check out and assign IUTs to test runners. iuts = self.iut_provider.wait_for_and_checkout_iuts( - minimum_amount=1, + minimum_amount=request.spec.minimumAmount, + # maximum_amount=request.spec.maximumAmount, + # TODO: Total test count changes, must check maximum_amount=self.dataset.get( "maximum_amount", os.getenv( @@ -414,10 +431,10 @@ def checkout( ), ), ) - self.splitter.assign_iuts(test_runners, iuts) + splitter.assign_iuts(test_runners, iuts) span.set_attribute(SemConvAttributes.IUT_DESCRIPTION, str(iuts)) - for test_runner in test_runners.keys(): + for test_runner in test_runners: # pylint:disable=consider-using-dict-items self.dataset.add("test_runner", test_runner) # No IUTs assigned to test runner @@ -444,14 +461,19 @@ def checkout( suite["log_area"] = self.checkout_a_log_area() # Split the tests into sub suites - self.splitter.split(test_runners[test_runner]) + splitter.split(test_runners[test_runner]) # Add sub suites to test suite structure and send environment events to the ESR. for iut, suite in test_runners[test_runner].get("iuts", {}).items(): sub_suite = test_suite.add( - test_runner, iut, suite, test_runners[test_runner]["priority"] + request, test_runner, iut, suite, test_runners[test_runner]["priority"] ) - self.send_environment_events(self.upload_sub_suite(sub_suite), sub_suite) + if self.environment_provider_config.etos_controller: + self.send_environment_events( + *self.create_environment_resource(request, sub_suite) + ) + else: + self.send_environment_events(*self.upload_sub_suite(sub_suite)) self.logger.info( "Environment for %r checked out and is ready for use", @@ -474,18 +496,11 @@ def checkout( else: raise TimeoutError("Could not check out an environment before timeout.") - test_suite_json = test_suite.to_json() - # Test that the test suite JSON is serializable so that the - # exception is caught here and not by the webserver. - # This makes sure that we can cleanup if anything breaks. - self.verify_json(test_suite_json) - self.logger.info( "All environments for test suite %r have been checked out", - test_suite_name, + request.spec.name, extra={"user_log": True}, ) - return test_suite_json def wait_for_main_suite(self, test_suite_id: str) -> dict: """Wait for main test suite started to be available in ER. @@ -500,55 +515,93 @@ def wait_for_main_suite(self, test_suite_id: str) -> dict: time.sleep(5) return main_suite - def _run(self) -> dict: - """Run the environment provider task. - - :return: Test suite JSON with assigned IUTs, execution spaces and log areas. - """ - suites = [] + def _run(self, request: EnvironmentRequestSchema) -> None: + """Run the environment provider task.""" error = None - test_suites = self.create_test_suite_dict() - - datasets = self.registry.dataset() - if isinstance(datasets, list): - assert len(datasets) == len( - test_suites - ), "If multiple datasets are provided it must correspond with number of test suites" - else: - datasets = [datasets] * len(test_suites) - - for test_suite_name, test_runners in test_suites.items(): - triggered = None - try: - main_suite = self.wait_for_main_suite(self.suite_runner_ids.pop(0)) + triggered = None + try: + main_suite_id = None + # When running as ETOS controller, we expect the suite runner to receive the + # Environment resource instead of relying on Eiffel. + if not self.environment_provider_config.etos_controller: + main_suite = self.wait_for_main_suite(request.spec.id) if main_suite is None: raise TimeoutError("Timed out while waiting for test suite started from ESR") main_suite_id = main_suite["meta"]["id"] + else: + # If running as ETOS controller, we will need to get the request ID for + # the suite runner to use when sending main suites. The main suite ID + # is sent to the Test Runner, so that the test runner can send its sub + # suite started events in a way that the suite runner can pick them up. + # TODO: This main suite id should be removed as in the future we cannot + # guarantee that it is allowed as a CONTEXT link or even an eiffel event + # id. It is currently required for ESR. + main_suite_id = request.spec.id + + links = {"CONTEXT": main_suite_id} if main_suite_id is not None else None + triggered = self.etos.events.send_activity_triggered( + f"Checkout environment for {request.spec.name}", + links, + executionType="AUTOMATED", + ) - triggered = self.etos.events.send_activity_triggered( - f"Checkout environment for {test_suite_name}", - {"CONTEXT": main_suite_id}, - executionType="AUTOMATED", - ) - - self.etos.config.set("environment_provider_context", triggered) - self.etos.events.send_activity_started(triggered) - - suites.append( - self.checkout(test_suite_name, test_runners, datasets.pop(0), main_suite_id) - ) - except Exception as exception: # pylint:disable=broad-except - error = exception - raise - finally: - if error is None: - outcome = {"conclusion": "SUCCESSFUL"} - else: - outcome = {"conclusion": "UNSUCCESSFUL", "description": str(error)} - if triggered is not None: - self.etos.events.send_activity_finished(triggered, outcome) - return {"suites": suites, "error": None} + self.etos.config.set("environment_provider_context", triggered) + self.etos.events.send_activity_started(triggered) + self.checkout(request) + except Exception as exception: # pylint:disable=broad-except + error = exception + raise + finally: + if error is None: + outcome = {"conclusion": "SUCCESSFUL"} + else: + outcome = {"conclusion": "UNSUCCESSFUL", "description": str(error)} + if triggered is not None: + self.etos.events.send_activity_finished(triggered, outcome) + + def _configure_provider(self, provider_db: ETCDPath, provider_spec: dict, name: str): + """Configure a single provider for a testrun.""" + self.logger.info("Saving provider with name %r in %r", name, provider_db) + provider_model = ProviderSchema.model_validate(provider_spec) + if provider_model.spec.jsontas: + ruleset = json.dumps({name: provider_model.to_jsontas()}) + else: + ruleset = json.dumps({name: provider_model.to_external()}) + provider_db.write(ruleset) + + def _configure_iut(self, provider_spec: dict): + """Configure iut provider for a testrun.""" + db = self.registry.testrun.join("provider/iut") # type: ignore + self._configure_provider(db, provider_spec, "iut") + + def _configure_log_area(self, provider_spec: dict): + """Configure log area provider for a testrun.""" + db = self.registry.testrun.join("provider/log-area") # type: ignore + self._configure_provider(db, provider_spec, "log") + + def _configure_execution_space(self, provider_spec: dict): + """Configure execution space provider for a testrun.""" + db = self.registry.testrun.join("provider/execution-space") # type: ignore + self._configure_provider(db, provider_spec, "execution_space") + + def _configure_dataset(self, datasets: list[dict]): + """Configure dataset for a testrun.""" + db = self.registry.testrun.join("provider/dataset") # type: ignore + db.write(json.dumps(datasets)) + + def configure_environment_provider(self, request: EnvironmentRequestSchema): + """Configure the environment provider if run as a part of the ETOS kubernetes controller.""" + self.logger.info("Running in an ETOS cluster - Configuring testrun") + provider_client = Provider(self.kubernetes) + + iut = provider_client.get(request.spec.providers.iut.id).to_dict() # type: ignore + self._configure_iut(iut) # type: ignore + log_area = provider_client.get(request.spec.providers.logArea.id).to_dict() # type: ignore + self._configure_log_area(log_area) # type: ignore + provider_id = request.spec.providers.executionSpace.id # type: ignore + execution_space = provider_client.get(provider_id).to_dict() # type: ignore + self._configure_execution_space(execution_space) # type: ignore def run(self) -> dict: """Run the environment provider task. @@ -559,8 +612,12 @@ def run(self) -> dict: :rtype: dict """ try: - self.configure(self.suite_id) - return self._run() + for request in self.environment_provider_config.requests: + if self.environment_provider_config.etos_controller: + self.configure_environment_provider(request) + self.configure(request) + self._run(request) + return {"error": None} except Exception as exception: # pylint:disable=broad-except self.cleanup() traceback.print_exc() @@ -572,3 +629,34 @@ def run(self) -> dict: if self.etos.publisher is not None and not self.etos.debug.disable_sending_events: self.etos.publisher.wait_for_unpublished_events() self.etos.publisher.stop() + + +def get_environment(): + """Entrypoint for getting an environment.""" + logformat = "[%(asctime)s] %(levelname)s:%(message)s" + logging.basicConfig( + level=logging.INFO, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" + ) + logging.getLogger("gql").setLevel(logging.WARNING) + try: + status = EnvironmentProvider().run() + if status.get("error") is not None: + raise EnvironmentProviderError(status.get("error")) + result = { + "conclusion": "Successful", + "description": "Successfully provisioned an environment", + } + with open("/dev/termination-log", "w", encoding="utf-8") as termination_log: + json.dump(result, termination_log) + except: + try: + result = {"conclusion": "Failed", "description": traceback.format_exc()} + with open("/dev/termination-log", "w", encoding="utf-8") as termination_log: + json.dump(result, termination_log) + except PermissionError: + pass + raise + + +if __name__ == "__main__": + get_environment() diff --git a/src/environment_provider/lib/config.py b/src/environment_provider/lib/config.py index 26d0b8a..406ac58 100644 --- a/src/environment_provider/lib/config.py +++ b/src/environment_provider/lib/config.py @@ -15,37 +15,53 @@ # limitations under the License. """ETOS Environment Provider configuration module.""" import logging -import os import time -from typing import Iterator, Union +import json +import os +from typing import Optional from etos_lib import ETOS -from packageurl import PackageURL - -from .graphql import request_activity_triggered, request_artifact_published, request_tercc - - -class Config: # pylint:disable=too-many-instance-attributes +from etos_lib.kubernetes.schemas.testrun import Suite +from etos_lib.kubernetes.schemas.environment_request import ( + EnvironmentRequest as EnvironmentRequestSchema, + EnvironmentRequestSpec, + EnvironmentProviders, + Splitter, +) +from etos_lib.kubernetes import Kubernetes, EnvironmentRequest +from etos_lib.kubernetes.schemas.common import Metadata +from jsontas.jsontas import JsonTas +from environment_provider.lib.registry import ProviderRegistry + +from .graphql import request_activity_triggered, request_artifact_created + + +class Config: """Environment provider configuration.""" logger = logging.getLogger("Config") - __test_suite = None - generated = False - artifact_created = None - artifact_published = None - activity_triggered = None - tercc = None - - def __init__(self, etos: ETOS, tercc_id: str) -> None: + __request = None + __activity_triggered = None + + def __init__(self, etos: ETOS, kubernetes: Kubernetes, ids: Optional[list[str]] = None) -> None: """Initialize with ETOS library and automatically load the config. :param etos: ETOS library instance. - :param tercc_id: ID of test execution recipe. + :param kubernetes: Kubernetes client. + :param ids: Suite runner IDs to correlate environment requests when not running in the + ETOS controller environment. Is set to None if executed by the ETOS controller. """ + self.kubernetes = kubernetes self.etos = etos + self.ids = ids self.load_config() - self.tercc_id = tercc_id - self.__generate() + + @property + def etos_controller(self) -> bool: + """Whether or not the environment provider is running as a part of the ETOS controller.""" + request = EnvironmentRequest(self.kubernetes) + request_name = os.getenv("REQUEST") + return request_name is not None and request.exists(request_name) def load_config(self) -> None: """Load config from environment variables.""" @@ -67,139 +83,144 @@ def load_config(self) -> None: value = float(value) self.etos.config.set(key.replace("ENVIRONMENT_PROVIDER_", ""), value) - def __search_for_node_typename( - self, response: dict, *nodes: list[str], key: str = "node" - ) -> Iterator[tuple[str, dict]]: - """Search for a graphql node by __typename. - - :param response: Response to search through. - :param nodes: Nodes to search for. - :param key: Name of the node key. - :return: Iterator - """ - for _, node in self.etos.utils.search(response, key): - if isinstance(node, dict) and node.get("__typename") in nodes: - yield node.get("__typename"), node - - def __get_node(self, response: dict, node: str, key: str) -> tuple[str, dict]: - """Get a single node from graphql response. - - :param response: Response to search through. - :param node: Node to search for. - :param key: Name of the node key. - :return: Tuple of node name(str) and node data(dict) - """ - try: - node_name, node = next(self.__search_for_node_typename(response, node, key=key)) - node = node.copy() - try: - node.pop("reverse") - except KeyError: - pass - return node_name, node - except StopIteration: - return "", {} - - def _validate_event_data(self) -> bool: - """Validate that the event data required for environment provider is set. - - :return: Whether event data is set or not. - """ - try: - assert self.tercc is not None - assert self.artifact_created is not None - assert self.activity_triggered is not None - return True - except AssertionError: - return False - - def __generate(self) -> None: - """Generate the event data required for the environment provider.""" - if self.generated is False: - self.logger.info("Generate event data from event storage.") - timeout = time.time() + self.etos.config.get("EVENT_DATA_TIMEOUT") - while not self._validate_event_data(): - self.logger.info("Waiting for event data.") - if time.time() > timeout: - self.logger.error("Timeout reached. Exiting.") - return None - - try: - response = request_tercc(self.etos, self.tercc_id) - node = response["testExecutionRecipeCollectionCreated"]["edges"][0]["node"] - node = node.copy() - node.pop("links") - self.tercc = node - _, self.artifact_created = self.__get_node(response, "ArtifactCreated", "links") - - response = request_activity_triggered(self.etos, self.tercc_id) - self.activity_triggered = response["activityTriggered"]["edges"][0]["node"] - - response = request_artifact_published(self.etos, self.artifact_id) - # ArtifactPublished is not required and can be None. - if response: - self.artifact_published = response["artifactPublished"]["edges"][0]["node"] - except: # noqa, pylint:disable=bare-except - pass - time.sleep(1) - self.generated = True + def __wait_for_activity(self) -> Optional[dict]: + """Wait for activity triggered event.""" + self.logger.info( + "Waiting for an activity triggered event - Timeout: %ds", + self.etos.config.get("EVENT_DATA_TIMEOUT"), + ) + timeout = time.time() + self.etos.config.get("EVENT_DATA_TIMEOUT") # type: ignore + while time.time() <= timeout: + time.sleep(1) + # The reason we select the first index in the list here is because of how the + # current way of running the environment provider works. Whereas the new controller + # based way of running will create a request per test suite, the current way + # will start the environment provider once for all test suites. We will create + # requests per test suite in this config, but they will hold mostly the same + # information, such as the identifier being the same on all requests. + testrun_id = self.requests[0].spec.identifier + self.logger.info("Testrun ID is %s", testrun_id) + response = request_activity_triggered(self.etos, testrun_id) + self.logger.info("Response from GraphQL query: %s", response) + if response is None: + self.logger.info("No response from event repository yet, retrying") + continue + edges = response.get("activityTriggered", {}).get("edges", []) + self.logger.info("Activity triggered edges found: %s", edges) + if len(edges) == 0: + self.logger.info("No activity triggered found yet, retrying") + continue + return edges[0]["node"] + self.logger.info( + "Activity triggered event not found after %ds", + self.etos.config.get("EVENT_DATA_TIMEOUT"), + ) return None + # TODO: The requests method shall not return a list in the future, this is just to + # keep the changes backwards compatible. + @property + def requests(self) -> list[EnvironmentRequestSchema]: + """Request returns the environment request, either from Eiffel TERCC or environment.""" + if self.__request is None: + if self.etos_controller: + request_client = EnvironmentRequest(self.kubernetes) + request_name = os.getenv("REQUEST") + assert request_name is not None, "Environment variable REQUEST must be set!" + self.__request = [ + EnvironmentRequestSchema.model_validate( + request_client.get(request_name).to_dict() # type: ignore + ) + ] + else: + # Whenever the environment provider is run as a part of the suite runner, + # this variable is set. + tercc = json.loads(os.getenv("TERCC", "{}")) + self.__request = self.__request_from_tercc(tercc) + return self.__request + @property def context(self) -> str: """Get activity triggered ID. :return: Activity Triggered ID """ + if self.__activity_triggered is None: + self.__activity_triggered = self.__wait_for_activity() + assert ( + self.__activity_triggered is not None + ), "ActivityTriggered must exist for the environment provider" try: - return self.activity_triggered["meta"]["id"] - except KeyError: - return "" - - @property - def artifact_id(self) -> str: - """Get artifact ID. - - :return: Artifact ID - """ - try: - return self.artifact_created["meta"]["id"] - except KeyError: - return "" - - @property - def identity(self) -> Union[PackageURL, str]: - """Get artifact identity. - - :return: Artifact identity. - """ - try: - return PackageURL.from_string(self.artifact_created["data"]["identity"]) + return self.__activity_triggered["meta"]["id"] except KeyError: return "" - @property - def test_suite(self) -> list[dict]: + def __request_from_tercc(self, tercc: dict) -> list[EnvironmentRequestSchema]: + """Create an environment request schema from a TERCC.""" + assert ( + self.ids is not None + ), "Suite runner IDs must be provided when running outside of controller" + requests = [] + artifact_id = tercc["links"][0]["target"] + response = request_artifact_created(self.etos, artifact_id) + assert response is not None, "ArtifactCreated must exist for the environment provider" + artifact = response["artifactCreated"]["edges"][0]["node"] + + test_suites = self.__test_suite(tercc) + + registry = ProviderRegistry(self.etos, JsonTas(), tercc["meta"]["id"]) + + datasets = registry.dataset() + if isinstance(datasets, list): + assert len(datasets) == len(test_suites), ( + "If multiple datasets are provided, the number of datasets must correspond " + "with number of test suites" + ) + else: + datasets = [datasets] * len(test_suites) + + for suite in test_suites: + requests.append( + EnvironmentRequestSchema( + metadata=Metadata(), + spec=EnvironmentRequestSpec( + id=self.ids.pop(0), + name=suite.get("name"), + identifier=tercc["meta"]["id"], + artifact=artifact_id, + identity=artifact["data"]["identity"], + minimumAmount=1, + maximumAmount=10, # TODO: Ignored in environment_provider.py + image="N/A", + imagePullPolicy="N/A", + splitter=Splitter(tests=Suite.tests_from_recipes(suite.get("recipes", []))), + dataset=datasets.pop(0), + providers=EnvironmentProviders(), + ), + ) + ) + return requests + + def __test_suite(self, tercc: dict) -> list[dict]: """Download and return test batches. :return: Batches. """ - if self.__test_suite is None: - try: - batch = self.tercc.get("data", {}).get("batches") - batch_uri = self.tercc.get("data", {}).get("batchesUri") - if batch is not None and batch_uri is not None: - raise ValueError("Only one of 'batches' or 'batchesUri' shall be set") - if batch is not None: - self.__test_suite = batch - elif batch_uri is not None: - response = self.etos.http.get( - batch_uri, - timeout=self.etos.config.get("TEST_SUITE_TIMEOUT"), - headers={"Accept": "application/json"}, - ) - response.raise_for_status() - self.__test_suite = response.json() - except AttributeError: - pass - return self.__test_suite if self.__test_suite else [] + try: + batch = tercc.get("data", {}).get("batches") + batch_uri = tercc.get("data", {}).get("batchesUri") + if batch is not None and batch_uri is not None: + raise ValueError("Only one of 'batches' or 'batchesUri' shall be set") + if batch is not None: + return batch + if batch_uri is not None: + response = self.etos.http.get( + batch_uri, + timeout=self.etos.config.get("TEST_SUITE_TIMEOUT"), + headers={"Accept": "application/json"}, + ) + response.raise_for_status() + return response.json() + return [] + except AttributeError: + return [] diff --git a/src/environment_provider/lib/graphql.py b/src/environment_provider/lib/graphql.py index 2fe7a67..c9e8d5a 100644 --- a/src/environment_provider/lib/graphql.py +++ b/src/environment_provider/lib/graphql.py @@ -78,6 +78,32 @@ def request_tercc(etos: ETOS, suite_id: str) -> Optional[dict]: return None +def request_artifact_created(etos: ETOS, artifact_id: str) -> Optional[dict]: + """Request an artifact created event from graphql. + + :param etos: ETOS library instance. + :param artifact_id: ID of artifact to request. + :return: Response from graphql or None + """ + query = """ +{ + artifactCreated(search: "{'meta.id': '%s'}") { + edges { + node { + data { + identity + } + } + } + } +} + """ + for response in request(etos, query % artifact_id): + if response: + return response + return None + + def request_activity_triggered(etos: ETOS, suite_id: str) -> Optional[dict]: """Request an activiy triggered event from graphql. diff --git a/src/environment_provider/lib/log_area.py b/src/environment_provider/lib/log_area.py index d7670ff..cb3bf64 100644 --- a/src/environment_provider/lib/log_area.py +++ b/src/environment_provider/lib/log_area.py @@ -28,7 +28,8 @@ from requests.exceptions import ConnectionError as RequestsConnectionError from requests.exceptions import HTTPError -# pylint:disable=too-many-arguments,too-many-positional-arguments +# pylint:disable=too-many-arguments +# pylint:disable=too-many-positional-arguments class LogArea: # pylint:disable=too-few-public-methods diff --git a/src/environment_provider/lib/registry.py b/src/environment_provider/lib/registry.py index 14c7226..a275ced 100644 --- a/src/environment_provider/lib/registry.py +++ b/src/environment_provider/lib/registry.py @@ -86,44 +86,45 @@ def validate(self, provider: dict, schema: str) -> dict: jsonschema.validate(instance=provider, schema=schema) return provider - def get_log_area_provider_by_id(self, provider_id: str) -> Optional[dict]: - """Get log area provider by name from the ETOS Database. + def get_log_area_provider(self) -> Optional[dict]: + """Get log area provider for a testrun from the ETOS Database. - Must have been registered with the /register endpoint. - - :param provider_id: ID of log area provider. :return: Provider JSON or None. """ - self.logger.info("Getting log area provider %r", provider_id) - provider = self.providers.join(f"log-area/{provider_id}").read() + if self.testrun is None: + self.logger.error( + "Could not retrieve log area provider from database, testrun is not set." + ) + return None + provider = self.testrun.join("provider/log-area").read() if provider: return json.loads(provider, object_pairs_hook=OrderedDict) return None - def get_iut_provider_by_id(self, provider_id: str) -> Optional[dict]: - """Get IUT provider by name from the ETOS Database. - - Must have been registered with the /register endpoint. + def get_iut_provider(self) -> Optional[dict]: + """Get IUT provider for testrun from the ETOS Database. - :param provider_id: ID of IUT provider. :return: Provider JSON or None. """ - self.logger.info("Getting iut provider %r", provider_id) - provider = self.providers.join(f"iut/{provider_id}").read() + if self.testrun is None: + self.logger.error("Could not retrieve IUT provider from database, testrun is not set.") + return None + provider = self.testrun.join("provider/iut").read() if provider: return json.loads(provider, object_pairs_hook=OrderedDict) return None - def get_execution_space_provider_by_id(self, provider_id: str) -> Optional[dict]: + def get_execution_space_provider(self) -> Optional[dict]: """Get execution space provider by name from the ETOS Database. - Must have been registered with the /register endpoint. - - :param provider_id: ID of execution space provider. :return: Provider JSON or None. """ - self.logger.info("Getting execution space provider %r", provider_id) - provider = self.providers.join(f"execution-space/{provider_id}").read() + if self.testrun is None: + self.logger.error( + "Could not retrieve execution space provider from database, testrun is not set." + ) + return None + provider = self.testrun.join("provider/execution-space").read() if provider: return json.loads(provider, object_pairs_hook=OrderedDict) return None diff --git a/src/environment_provider/lib/releaser.py b/src/environment_provider/lib/releaser.py new file mode 100644 index 0000000..a2ea10f --- /dev/null +++ b/src/environment_provider/lib/releaser.py @@ -0,0 +1,275 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Releaser of environments.""" +import logging +from jsontas.jsontas import JsonTas +from opentelemetry import trace +from pydantic import ValidationError +from etos_lib.kubernetes.schemas import Environment as EnvironmentSchema +from etos_lib.kubernetes.schemas import Provider as ProviderSchema +from etos_lib.kubernetes import Kubernetes, Environment, Provider +from etos_lib import ETOS +from execution_space_provider import ExecutionSpaceProvider +from execution_space_provider.exceptions import ExecutionSpaceCheckinFailed +from execution_space_provider.execution_space import ExecutionSpace +from iut_provider import IutProvider +from iut_provider.exceptions import IutCheckinFailed +from iut_provider.iut import Iut as IutSpec +from log_area_provider import LogAreaProvider +from log_area_provider.exceptions import LogAreaCheckinFailed +from log_area_provider.log_area import LogArea as LogAreaSpec + +TRACER = trace.get_tracer(__name__) + + +class ReleaseError(Exception): + """Error when releasing environment.""" + + +class Releaser: + """Releaser is a tool for releasing environments that have been checked out by ETOS.""" + + logger = logging.getLogger(__name__) + + def __init__(self, etos: ETOS, environment: EnvironmentSchema): + """Set up releaser.""" + self.environment = environment + self.etos = etos + self.jsontas = JsonTas() + self.kubernetes = Kubernetes() + self.provider_client = Provider(self.kubernetes) + + def provider(self, provider_id: str) -> dict: + """Get a provider by ID from Kubernetes or a database.""" + provider = self.provider_client.get(provider_id) + assert provider is not None, f"Could not find a provider with ID {provider_id!r}" + provider_model = ProviderSchema.model_validate(provider.to_dict()) + if provider_model.spec.jsontas: + return provider_model.to_jsontas() + return provider_model.to_external() + + def run(self) -> None: + """Run a release task for ETOS.""" + raise NotImplementedError() + + +class Iut(Releaser): + """Iut releases IUTs checked out by ETOS.""" + + logger = logging.getLogger(__name__) + + def get_provider(self) -> IutProvider: + """Get provider returns an IUT provider using the provider model.""" + ruleset = self.environment.spec.iut + assert ruleset is not None, f"There is no IUT field in environment {self.environment!r}" + self.logger.info("Releasing IUT with ruleset: %r", ruleset) + provider_id = ruleset.get("provider_id", "") + self.logger.info("Provider to use for release: %r", provider_id) + provider_model = self.provider(provider_id) + self.logger.info("Model to use for release: %r", provider_model) + return IutProvider(self.etos, self.jsontas, provider_model) # type: ignore + + def release(self): + """Release an IUT.""" + ruleset = self.environment.spec.iut + provider_id = ruleset.get("provider_id", "") + try: + provider = self.get_provider() + except AssertionError: + self.logger.exception("Missing IUT provider") + raise + except ValidationError: + self.logger.exception( + "The schema of IUT provider with id %r could not be validated", + provider_id, + ) + return + + self.logger.info("Initializing release of IUT %r", ruleset) + try: + provider.checkin(IutSpec(**ruleset)) + self.logger.info("Successfully released IUT") + except IutCheckinFailed: + self.logger.error("Failed to release IUT %r with provider %r", ruleset, provider_id) + raise + + def run(self): + """Run releases IUTs that ETOS has checked out for an environment.""" + with TRACER.start_as_current_span(name="stop_iuts", kind=trace.SpanKind.CLIENT) as span: + try: + self.release() + except Exception as exception: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + raise + + +class Executor(Releaser): + """Executor releases execution spaces checked out by ETOS.""" + + logger = logging.getLogger(__name__) + + def get_provider(self) -> ExecutionSpaceProvider: + """Get provider returns an execution space provider using the provider model.""" + ruleset = self.environment.spec.executor + assert ( + ruleset is not None + ), f"There is no executor field in environment {self.environment!r}" + self.logger.info("Releasing executor with ruleset: %r", ruleset) + provider_id = ruleset.get("provider_id", "") + self.logger.info("Provider to use for release: %r", provider_id) + provider_model = self.provider(provider_id) + return ExecutionSpaceProvider(self.etos, self.jsontas, provider_model) # type: ignore + + def release(self): + """Release an executor.""" + ruleset = self.environment.spec.executor + provider_id = ruleset.get("provider_id", "") + try: + provider = self.get_provider() + except AssertionError: + self.logger.exception("Missing executor provider") + raise + except ValidationError: + self.logger.exception( + "The schema of execution space provider with id %r could not be validated", + provider_id, + ) + return + + self.logger.info("Initializing release of executor %r", ruleset) + try: + provider.checkin(ExecutionSpace(**ruleset)) + self.logger.info("Successfully released executor") + except ExecutionSpaceCheckinFailed: + self.logger.error( + "Failed to release executor %r with provider %r", ruleset, provider_id + ) + raise + + def run(self): + """Run releases executors that ETOS has checked out for an environment.""" + with TRACER.start_as_current_span( + name="stop_execution_space", kind=trace.SpanKind.CLIENT + ) as span: + try: + self.release() + except Exception as exception: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + raise + + +class LogArea(Releaser): + """Logarea releases log areas checked out by ETOS.""" + + logger = logging.getLogger(__name__) + + def get_provider(self) -> LogAreaProvider: + """Get provider returns an log area provider using the provider model.""" + ruleset = self.environment.spec.log_area + assert ( + ruleset is not None + ), f"There is no log area field in environment {self.environment!r}" + self.logger.info("Releasing log area with ruleset: %r", ruleset) + provider_id = ruleset.get("provider_id", "") + self.logger.info("Provider to use for release: %r", provider_id) + provider_model = self.provider(provider_id) + return LogAreaProvider(self.etos, self.jsontas, provider_model) # type: ignore + + def release(self): + """Release an executor.""" + ruleset = self.environment.spec.log_area + provider_id = ruleset.get("provider_id", "") + try: + provider = self.get_provider() + except AssertionError: + self.logger.exception("Missing log area provider") + raise + except ValidationError: + self.logger.exception( + "The schema of log area provider with id %r could not be validated", + provider_id, + ) + return + + self.logger.info("Initializing release of log area %r", ruleset) + try: + provider.checkin(LogAreaSpec(**ruleset)) + self.logger.info("Successfully released log area") + except LogAreaCheckinFailed: + self.logger.error( + "Failed to release log area %r with provider %r", ruleset, provider_id + ) + raise + + def run(self): + """Run releases log areas that ETOS has checked out for an environment.""" + with TRACER.start_as_current_span(name="stop_log_area", kind=trace.SpanKind.CLIENT) as span: + try: + self.release() + except Exception as exception: + span.record_exception(exception) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + self.logger.exception("Release failed") + raise + + +class EnvironmentReleaser: + """Release environments checked out by ETOS.""" + + logger = logging.getLogger(__name__) + + def environment(self, environment_id: str) -> EnvironmentSchema: + """Environment gets an environment from kubernetes with environment_id as name.""" + client = Environment(Kubernetes()) + environment = client.get(environment_id).to_dict() # type: ignore + return EnvironmentSchema.model_validate(environment) + + def run(self, environment_id: str): + """Run the releaser. It will check which type of environment and release it.""" + self.logger.info("Running the environment releaser") + etos = ETOS("", "", "") + + self.logger.info("Releasing environment based of an environment ID: %r", environment_id) + try: + environment = self.environment(environment_id) + except AttributeError: + self.logger.exception( + "Could not find Environment with id %r in Kubernetes. " + "Trying to release something that's already released?", + environment_id, + ) + return + except ValidationError: + self.logger.exception( + "The schema of Environment with id %r could not be validated", + environment_id, + ) + return + etos.config.set("SUITE_ID", environment.spec.suite_id) + tasks = [Iut(etos, environment), LogArea(etos, environment), Executor(etos, environment)] + + exceptions = [] + for task in tasks: + self.logger.info("Running release task on %r", type(task).__name__) + try: + task.run() + except Exception as exception: # pylint:disable=broad-exception-caught + self.logger.error("Task %r failed", type(task).__name__) + exceptions.append(exception) + if exceptions: + raise ReleaseError("Some or all release tasks failed") diff --git a/src/environment_provider/lib/test_suite.py b/src/environment_provider/lib/test_suite.py index 36694df..e49f94a 100644 --- a/src/environment_provider/lib/test_suite.py +++ b/src/environment_provider/lib/test_suite.py @@ -14,14 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. """Test suite module.""" +from etos_lib.kubernetes.schemas.environment_request import EnvironmentRequest from iut_provider.iut import Iut from .config import Config # pylint:disable=line-too-long +# pylint:disable=too-many-arguments +# pylint:disable=too-many-positional-arguments -class TestSuite: +class TestSuite: # pylint:disable=too-few-public-methods """Test suite representation. The resulting test suite might look something like this:: @@ -142,7 +145,9 @@ def __init__( self.suite_runner_id = suite_runner_id self.environment_provider_config = environment_provider_config - def add(self, test_runner: str, iut: Iut, suite: dict, priority: int) -> dict: + def add( + self, request: EnvironmentRequest, test_runner: str, iut: Iut, suite: dict, priority: int + ) -> dict: """Add a new sub suite to suite. :param test_runner: The test runner to use for sub suite. @@ -153,21 +158,17 @@ def add(self, test_runner: str, iut: Iut, suite: dict, priority: int) -> dict: """ sub_suite = { "name": f"{self.test_suite_name}_SubSuite_{len(self._suite['sub_suites'])}", - "suite_id": self.environment_provider_config.tercc_id, + "suite_id": request.spec.identifier, "sub_suite_id": suite.get("sub_suite_id"), "test_suite_started_id": self.suite_runner_id, "priority": priority, "recipes": suite.get("recipes", []), "test_runner": test_runner, "iut": iut.as_dict, - "artifact": self.environment_provider_config.artifact_id, + "artifact": request.spec.artifact, "context": self.environment_provider_config.context, "executor": suite.get("executor").as_dict, "log_area": suite.get("log_area").as_dict, } self._suite["sub_suites"].append(sub_suite) return sub_suite - - def to_json(self) -> dict: - """Return test suite as a JSON dictionary.""" - return self._suite diff --git a/src/environment_provider/splitter/split.py b/src/environment_provider/splitter/split.py index 13f2a30..e07e105 100644 --- a/src/environment_provider/splitter/split.py +++ b/src/environment_provider/splitter/split.py @@ -19,6 +19,7 @@ from typing import Any, Iterable, Iterator from etos_lib import ETOS +from etos_lib.kubernetes.schemas.environment_request import Splitter as SplitterSchema from iut_provider.iut import Iut @@ -26,14 +27,15 @@ class Splitter: """Environment provider test suite splitter.""" - def __init__(self, etos: ETOS, ruleset: dict) -> None: + def __init__(self, etos: ETOS, config: SplitterSchema) -> None: """Initialize with etos library and splitter ruleset. :param etos: ETOS library instance. :param ruleset: JSONTas ruleset for handling splitter algorithms. """ self.etos = etos - self.ruleset = ruleset + self.config = config + self.unsplit_tests = self.config.tests @staticmethod def _iterator(iterable: Iterable) -> Iterator[Any]: diff --git a/src/execution_space_provider/utilities/external_provider.py b/src/execution_space_provider/utilities/external_provider.py index ea4dc71..f4a8ebc 100644 --- a/src/execution_space_provider/utilities/external_provider.py +++ b/src/execution_space_provider/utilities/external_provider.py @@ -147,6 +147,12 @@ def checkin(self, execution_space: ExecutionSpace) -> None: if response.status_code == requests.codes["no_content"]: return response = response.json() + if isinstance(response, str): + exc = ExecutionSpaceCheckinFailed( + f"Unable to check in {execution_spaces} ({response})" + ) + self._record_exception(exc) + raise exc if response.get("error") is not None: exc = ExecutionSpaceCheckinFailed( f"Unable to check in {execution_spaces} " f"({response.get('error')})" @@ -189,7 +195,7 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: etos_rabbitmq_password = etos_rabbitmq.get("password", "") if os.getenv("ETOS_ENCRYPTION_KEY") is not None: rabbitmq_password = encrypt( - rabbitmq_password.encode(), os.getenv("ETOS_ENCRYPTION_KEY") + rabbitmq_password.encode(), os.getenv("ETOS_ENCRYPTION_KEY", "") ) etos_rabbitmq_password = encrypt( etos_rabbitmq_password.encode(), os.getenv("ETOS_ENCRYPTION_KEY", "") @@ -223,16 +229,16 @@ def start(self, minimum_amount: int, maximum_amount: int) -> str: ), }, "artifact_id": self.dataset.get("artifact_id"), - "artifact_created": self.dataset.get("artifact_created"), - "artifact_published": self.dataset.get("artifact_published"), - "tercc": self.dataset.get("tercc"), + "artifact_created": self.dataset.get("artifact_created") or {}, + "artifact_published": self.dataset.get("artifact_published") or {}, + "tercc": self.dataset.get("tercc") or {}, "dataset": self.dataset.get("dataset"), "context": self.dataset.get("context"), } host = self.ruleset.get("start", {}).get("host") headers = {"X-ETOS-ID": self.identifier} TraceContextTextMapPropagator().inject(headers) - span = opentelemetry.trace.get_current_span() + span = opentelemetry.trace.get_current_span() # type:ignore span.set_attribute(SpanAttributes.HTTP_HOST, host) span.set_attribute("http.request.body", json.dumps(data)) for header, value in headers.items(): diff --git a/src/iut_provider/utilities/prepare.py b/src/iut_provider/utilities/prepare.py index be4c99d..c728324 100644 --- a/src/iut_provider/utilities/prepare.py +++ b/src/iut_provider/utilities/prepare.py @@ -41,10 +41,7 @@ def __init__(self, jsontas: JsonTas, prepare_ruleset: dict) -> None: self.prepare_ruleset = prepare_ruleset self.jsontas = jsontas self.dataset = self.jsontas.dataset - # Pop the config as it contains values that are not pickleable. - # Pickle is used by deepcopy inside of 'dataset.copy' which is required - # during the preparation step. - self.config = self.dataset._Dataset__dataset.pop("config") + self.suite_id = self.dataset.get("config", {}).get("SUITE_ID") def execute_preparation_steps(self, iut: Iut, preparation_steps: dict) -> tuple[bool, Iut]: """Execute the preparation steps for the environment provider on an IUT. @@ -52,11 +49,10 @@ def execute_preparation_steps(self, iut: Iut, preparation_steps: dict) -> tuple[ :param iut: IUT to prepare for execution. :param preparation_steps: Steps to execute to prepare an IUT. """ - FORMAT_CONFIG.identifier = self.config.get("SUITE_ID") + FORMAT_CONFIG.identifier = self.suite_id try: with self.lock: dataset = self.dataset.copy() - dataset.add("config", self.config) jsontas = JsonTas(dataset=dataset) steps = {} dataset.add("iut", iut) @@ -85,33 +81,29 @@ def prepare(self, iuts: list[Iut]) -> tuple[list[Iut], list[Iut]]: """ iuts = deepcopy(iuts) failed_iuts = [] - try: - if not self.prepare_ruleset: - self.logger.info("No defined preparation rule.") - return iuts, [] - thread_pool = ThreadPool() + if not self.prepare_ruleset: + self.logger.info("No defined preparation rule.") + return iuts, [] + thread_pool = ThreadPool() - stages = self.prepare_ruleset.get("stages", {}) - steps = stages.get("environment_provider", {}).get("steps", {}) - results = [] - for iut in reversed(iuts): - self.logger.info("Preparing IUT %r", iut) - results.append( - thread_pool.apply_async( - self.execute_preparation_steps, - args=(iut, deepcopy(steps)), - ) + stages = self.prepare_ruleset.get("stages", {}) + steps = stages.get("environment_provider", {}).get("steps", {}) + results = [] + for iut in reversed(iuts): + self.logger.info("Preparing IUT %r", iut) + results.append( + thread_pool.apply_async( + self.execute_preparation_steps, + args=(iut, deepcopy(steps)), ) - for result in results: - success, iut = result.get() - if not success: - self.logger.error("Unable to prepare %r.", iut) - iuts.remove(iut) - failed_iuts.append(iut) - else: - iut.update(**deepcopy(stages)) - self.dataset.add("iuts", deepcopy(iuts)) - return iuts, failed_iuts - finally: - # Re-add the config that was popped in __init__. - self.dataset.add("config", self.config) + ) + for result in results: + success, iut = result.get() + if not success: + self.logger.error("Unable to prepare %r.", iut) + iuts.remove(iut) + failed_iuts.append(iut) + else: + iut.update(**deepcopy(stages)) + self.dataset.add("iuts", deepcopy(iuts)) + return iuts, failed_iuts diff --git a/tests/library/fake_server.py b/tests/library/fake_server.py index f4a4ff5..4306ffe 100644 --- a/tests/library/fake_server.py +++ b/tests/library/fake_server.py @@ -121,7 +121,7 @@ def __enter__(self): self.port = self.__free_port() self.mock_server = HTTPServer(("localhost", self.port), self.handler) self.thread = Thread(target=self.mock_server.serve_forever) - self.thread.setDaemon(True) + self.thread.daemon = True self.thread.start() self.mock_server.RequestHandlerClass.parent = self self.mock_server.RequestHandlerClass.response_code = self.status_name diff --git a/tests/library/graphql_handler.py b/tests/library/graphql_handler.py index e9543d0..decbe25 100644 --- a/tests/library/graphql_handler.py +++ b/tests/library/graphql_handler.py @@ -107,6 +107,28 @@ def artifact_published(self): """ return {"data": {"artifactPublished": {"edges": [{"node": {"data": {"locations": []}}}]}}} + def artifact_created(self): + """Create a fake artifact created event. + + :return: A graphql response with an artifact created event. + :rtype dict: + """ + artifact_id = self.tercc["links"][0]["target"] + return { + "data": { + "artifactCreated": { + "edges": [ + { + "node": { + "data": {"identity": "pkg:test/environment-provider"}, + "meta": {"id": artifact_id}, + } + } + ] + } + } + } + def test_suite_started(self): """Create a fake test suite started to simulate ESR. @@ -126,10 +148,10 @@ def do_POST(self): request_data = self.rfile.read(int(self.headers["Content-Length"])) query_name = self.get_gql_query(request_data) - if query_name == "testExecutionRecipeCollectionCreated": - response = self.test_execution_recipe_collection_created() - elif query_name == "activityTriggered": + if query_name == "activityTriggered": response = self.activity_triggered() + elif query_name == "artifactCreated": + response = self.artifact_created() elif query_name == "artifactPublished": response = self.artifact_published() elif query_name == "testSuiteStarted": diff --git a/tests/splitter/test_splitter.py b/tests/splitter/test_splitter.py index 67594d1..ba43f7c 100644 --- a/tests/splitter/test_splitter.py +++ b/tests/splitter/test_splitter.py @@ -18,8 +18,10 @@ import unittest from etos_lib import ETOS +from etos_lib.kubernetes.schemas.environment_request import Splitter as SplitterSchema from environment_provider.splitter.split import Splitter +from iut_provider.iut import Iut class TestSplitter(unittest.TestCase): @@ -37,7 +39,7 @@ def test_assign_iuts(self) -> None: 1. Assign IUTs to the provided test runners. 2. Verify that no test runner get 0 assigned IUTs. """ - iuts = ["iut1", "iut2"] + iuts = [Iut(name="iut1"), Iut(name="iut2")] test_runners = { "runner1": {"iuts": {}, "unsplit_recipes": [1]}, "runner2": {"iuts": {}, "unsplit_recipes": [2, 3, 4, 5]}, @@ -48,7 +50,7 @@ def test_assign_iuts(self) -> None: etos.config.set("NUMBER_OF_IUTS", len(iuts)) self.logger.info("STEP: Assign IUTs to the provided test runners.") - _ = Splitter(etos, {}).assign_iuts(test_runners, iuts) + _ = Splitter(etos, SplitterSchema(tests=[])).assign_iuts(test_runners, iuts) self.logger.info("STEP: Verify that no test runner get 0 assigned IUTs.") for test_runner in test_runners.values(): diff --git a/tests/test_environment_provider.py b/tests/test_environment_provider.py index c63cba8..e524526 100644 --- a/tests/test_environment_provider.py +++ b/tests/test_environment_provider.py @@ -19,6 +19,7 @@ import logging import os import unittest +from mock import patch from etos_lib.lib.config import Config from etos_lib.lib.debug import Debug @@ -123,7 +124,8 @@ def tearDown(self): Debug()._Debug__events_published.clear() Debug()._Debug__events_received.clear() - def test_get_environment_sub_suites(self): + @patch("environment_provider.environment_provider.Kubernetes") + def test_get_environment_sub_suites(self, _): """Test environment provider with 2 different sub suites. Approval criteria: @@ -158,9 +160,10 @@ def test_get_environment_sub_suites(self): ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host os.environ["ETOS_API"] = server.host + os.environ["TERCC"] = json.dumps(tercc) self.logger.info("STEP: Run the environment provider.") - environment_provider = EnvironmentProvider(suite_id, suite_runner_ids) + environment_provider = EnvironmentProvider(suite_runner_ids) result = environment_provider.run() print(result) self.assertIsNone(result.get("error")) @@ -172,7 +175,8 @@ def test_get_environment_sub_suites(self): environments.append(event) self.assertEqual(len(environments), 2) - def test_get_environment(self): + @patch("environment_provider.environment_provider.Kubernetes") + def test_get_environment(self, _): """Test environment provider with single sub suites. Approval criteria: @@ -207,9 +211,10 @@ def test_get_environment(self): ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host os.environ["ETOS_API"] = server.host + os.environ["TERCC"] = json.dumps(tercc) self.logger.info("STEP: Run the environment provider.") - environment_provider = EnvironmentProvider(suite_id, suite_runner_ids) + environment_provider = EnvironmentProvider(suite_runner_ids) result = environment_provider.run() print(result) self.assertIsNone(result.get("error")) @@ -221,7 +226,8 @@ def test_get_environment(self): environments.append(event) self.assertEqual(len(environments), 1) - def test_get_environment_permutation(self): + @patch("environment_provider.environment_provider.Kubernetes") + def test_get_environment_permutation(self, _): """Test environment provider with 2 different environments for 2 permutations. Approval criteria: @@ -259,9 +265,10 @@ def test_get_environment_permutation(self): ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host os.environ["ETOS_API"] = server.host + os.environ["TERCC"] = json.dumps(tercc) self.logger.info("STEP: Run the environment provider.") - environment_provider = EnvironmentProvider(suite_id, suite_runner_ids) + environment_provider = EnvironmentProvider(suite_runner_ids) result = environment_provider.run() self.assertIsNone(result.get("error")) @@ -272,7 +279,8 @@ def test_get_environment_permutation(self): environments.append(event) self.assertEqual(len(environments), 2) - def test_get_environment_sub_suites_sequential(self): + @patch("environment_provider.environment_provider.Kubernetes") + def test_get_environment_sub_suites_sequential(self, _): """Test environment provider with 2 different sub suites sequentially. Approval criteria: @@ -309,9 +317,10 @@ def test_get_environment_sub_suites_sequential(self): ) os.environ["ETOS_GRAPHQL_SERVER"] = server.host os.environ["ETOS_API"] = server.host + os.environ["TERCC"] = json.dumps(tercc) self.logger.info("STEP: Run the environment provider.") - environment_provider = EnvironmentProvider(suite_id, suite_runner_ids) + environment_provider = EnvironmentProvider(suite_runner_ids) result = environment_provider.run() print(result) self.assertIsNone(result.get("error")) diff --git a/tox.ini b/tox.ini index 9eff780..7b74224 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ setenv = ETOS_GRAPHQL_SERVER=http://localhost/no ETOS_API=http://localhost/nah ETOS_ENVIRONMENT_PROVIDER=http://localhost/nuhuh + ETOS_NAMESPACE=something deps = -r{toxinidir}/test-requirements.txt commands =