diff --git a/src/easylink/implementation_metadata.yaml b/src/easylink/implementation_metadata.yaml index 94019b77..613d621b 100644 --- a/src/easylink/implementation_metadata.yaml +++ b/src/easylink/implementation_metadata.yaml @@ -29,6 +29,24 @@ step_4_python_pandas: INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" outputs: step_4_main_output: result.parquet +step_5_python_pandas: + steps: + - step_5 + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py + env: + INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" + outputs: + step_5_main_output: result.parquet +step_6_python_pandas: + steps: + - step_6 + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif + script_cmd: python /dummy_step.py + env: + INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" + outputs: + step_6_main_output: result.parquet step_4a_python_pandas: steps: - step_4a @@ -39,6 +57,15 @@ step_4a_python_pandas: outputs: step_4a_main_output: result.parquet step_4b_python_pandas: + steps: + - step_4b + image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/r-image.sif + script_cmd: Rscript /dummy_step.R + env: + INPUT_ENV_VARS: "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS,DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS" + outputs: + step_4b_main_output: result.parquet +step_4b_r: steps: - step_4b image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif @@ -139,4 +166,4 @@ step_3_and_step_4_combined_python_pandas: image_path: /mnt/team/simulation_science/priv/engineering/er_ecosystem/images/python_pandas.sif script_cmd: python /dummy_step.py outputs: - step_4_main_output: result.parquet \ No newline at end of file + step_4_main_output: result.parquet diff --git a/src/easylink/pipeline_schema_constants/development.py b/src/easylink/pipeline_schema_constants/development.py index 1bd2b915..7a64dc83 100644 --- a/src/easylink/pipeline_schema_constants/development.py +++ b/src/easylink/pipeline_schema_constants/development.py @@ -6,6 +6,7 @@ OutputSlotMapping, ) from easylink.step import ( + ChoiceStep, HierarchicalStep, InputStep, LoopStep, @@ -62,87 +63,177 @@ ) ], ), - HierarchicalStep( - step_name="step_4", + ChoiceStep( + step_name="choice_section", input_slots=[ InputSlot( - name="step_4_main_input", + name="choice_section_main_input", env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", validator=validate_input_file_dummy, ), InputSlot( - name="step_4_secondary_input", + name="choice_section_secondary_input", env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", validator=validate_input_file_dummy, ), ], - output_slots=[OutputSlot("step_4_main_output")], - nodes=[ - Step( - step_name="step_4a", - input_slots=[ - InputSlot( - name="step_4a_main_input", - env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", - validator=validate_input_file_dummy, + output_slots=[OutputSlot("choice_section_main_output")], + choices={ + "simple": { + "nodes": [ + HierarchicalStep( + step_name="step_4", + input_slots=[ + InputSlot( + name="step_4_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4_main_output")], + nodes=[ + Step( + step_name="step_4a", + input_slots=[ + InputSlot( + name="step_4a_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4a_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4a_main_output")], + ), + Step( + step_name="step_4b", + input_slots=[ + InputSlot( + name="step_4b_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4b_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4b_main_output")], + ), + ], + edges=[ + EdgeParams( + source_node="step_4a", + target_node="step_4b", + output_slot="step_4a_main_output", + input_slot="step_4b_main_input", + ), + ], + input_slot_mappings=[ + InputSlotMapping( + parent_slot="step_4_main_input", + child_node="step_4a", + child_slot="step_4a_main_input", + ), + InputSlotMapping( + parent_slot="step_4_secondary_input", + child_node="step_4a", + child_slot="step_4a_secondary_input", + ), + InputSlotMapping( + parent_slot="step_4_secondary_input", + child_node="step_4b", + child_slot="step_4b_secondary_input", + ), + ], + output_slot_mappings=[ + OutputSlotMapping( + parent_slot="step_4_main_output", + child_node="step_4b", + child_slot="step_4b_main_output", + ), + ], ), - InputSlot( - name="step_4a_secondary_input", - env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", - validator=validate_input_file_dummy, + ], + "edges": [], + "input_slot_mappings": [ + InputSlotMapping( + parent_slot="choice_section_main_input", + child_node="step_4", + child_slot="step_4_main_input", + ), + InputSlotMapping( + parent_slot="choice_section_secondary_input", + child_node="step_4", + child_slot="step_4_secondary_input", ), ], - output_slots=[OutputSlot("step_4a_main_output")], - ), - Step( - step_name="step_4b", - input_slots=[ - InputSlot( - name="step_4b_main_input", - env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", - validator=validate_input_file_dummy, + "output_slot_mappings": [ + OutputSlotMapping( + parent_slot="choice_section_main_output", + child_node="step_4", + child_slot="step_4_main_output", ), - InputSlot( - name="step_4b_secondary_input", - env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", - validator=validate_input_file_dummy, + ], + }, + "complex": { + "nodes": [ + Step( + step_name="step_5", + input_slots=[ + InputSlot( + name="step_5_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_5_main_output")], + ), + Step( + step_name="step_6", + input_slots=[ + InputSlot( + name="step_6_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_6_main_output")], ), ], - output_slots=[OutputSlot("step_4b_main_output")], - ), - ], - edges=[ - EdgeParams( - source_node="step_4a", - target_node="step_4b", - output_slot="step_4a_main_output", - input_slot="step_4b_main_input", - ), - ], - input_slot_mappings=[ - InputSlotMapping( - parent_slot="step_4_main_input", - child_node="step_4a", - child_slot="step_4a_main_input", - ), - InputSlotMapping( - parent_slot="step_4_secondary_input", - child_node="step_4a", - child_slot="step_4a_secondary_input", - ), - InputSlotMapping( - parent_slot="step_4_secondary_input", - child_node="step_4b", - child_slot="step_4b_secondary_input", - ), - ], - output_slot_mappings=[ - OutputSlotMapping( - parent_slot="step_4_main_output", - child_node="step_4b", - child_slot="step_4b_main_output", - ), - ], + "edges": [ + EdgeParams( + source_node="step_5", + target_node="step_6", + output_slot="step_5_main_output", + input_slot="step_6_main_input", + ), + ], + "input_slot_mappings": [ + InputSlotMapping( + parent_slot="choice_section_main_input", + child_node="step_5", + child_slot="step_5_main_input", + ), + ], + "output_slot_mappings": [ + OutputSlotMapping( + parent_slot="choice_section_main_output", + child_node="step_6", + child_slot="step_6_main_output", + ), + ], + }, + }, ), OutputStep( input_slots=[ @@ -159,9 +250,9 @@ ), EdgeParams( source_node="input_data", - target_node="step_4", + target_node="choice_section", output_slot="all", - input_slot="step_4_secondary_input", + input_slot="choice_section_secondary_input", ), EdgeParams( source_node="step_1", @@ -177,14 +268,14 @@ ), EdgeParams( source_node="step_3", - target_node="step_4", + target_node="choice_section", output_slot="step_3_main_output", - input_slot="step_4_main_input", + input_slot="choice_section_main_input", ), EdgeParams( - source_node="step_4", + source_node="choice_section", target_node="results", - output_slot="step_4_main_output", + output_slot="choice_section_main_output", input_slot="result", ), ] diff --git a/src/easylink/step.py b/src/easylink/step.py index 21f86795..f0cc7a1b 100644 --- a/src/easylink/step.py +++ b/src/easylink/step.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Iterable +from typing import Any from layered_config_tree import LayeredConfigTree @@ -243,6 +244,7 @@ def __init__( output_slot_mappings: Iterable[OutputSlotMapping] = (), ) -> None: self.step_name = step_name + # TODO: try removing name as an arg and just set self.name = self.step_name self.name = name if name else step_name self.input_slots = {slot.name: slot for slot in input_slots} self.output_slots = {slot.name: slot for slot in output_slots} @@ -483,7 +485,7 @@ def get_implementation_graph(self) -> ImplementationGraph: class InputStep(IOStep): def __init__( self, - output_slots: Iterable[InputSlot] = (OutputSlot("all"),), + output_slots: Iterable[OutputSlot] = (OutputSlot("all"),), ) -> None: super().__init__(step_name="input_data", output_slots=output_slots) @@ -610,7 +612,10 @@ def get_state_config(self, step_config: LayeredConfigTree) -> None: return step_config def set_configuration_state( - self, parent_config, combined_implementations, input_data_config + self, + parent_config: LayeredConfigTree, + combined_implementations: LayeredConfigTree, + input_data_config: LayeredConfigTree, ): num_repeats = len(self.get_state_config(parent_config[self.name])) self.step_graph = self._update_step_graph(num_repeats) @@ -705,7 +710,7 @@ def config_key(self): def node_prefix(self): return "parallel_split" - def _update_step_graph(self, num_repeats) -> StepGraph: + def _update_step_graph(self, num_repeats: int) -> StepGraph: """Makes N copies of the template step that are independent and contain the same edges as the current step""" graph = StepGraph() @@ -717,7 +722,7 @@ def _update_step_graph(self, num_repeats) -> StepGraph: graph.add_node_from_step(updated_step) return graph - def _update_slot_mappings(self, num_repeats) -> dict[str, list[SlotMapping]]: + def _update_slot_mappings(self, num_repeats: int) -> dict[str, list[SlotMapping]]: """Gets the appropriate slot mappings based on the number of parallel copies and the existing input and output slots.""" input_mappings = [ InputSlotMapping(slot, f"{self.name}_{self.node_prefix}_{n+1}", slot) @@ -730,3 +735,114 @@ def _update_slot_mappings(self, num_repeats) -> dict[str, list[SlotMapping]]: for slot in self.output_slots ] return {"input": input_mappings, "output": output_mappings} + + +class ChoiceStep(Step): + """A ChoiceStep allows a user to select a single path from a set of possible paths.""" + + def __init__( + self, + step_name: str, + input_slots: Iterable[InputSlot], + output_slots: Iterable[OutputSlot], + choices: dict[ + str, dict[str, list[Step | EdgeParams | InputSlotMapping | OutputSlotMapping]] + ], + ) -> None: + super().__init__( + step_name, + input_slots=input_slots, + output_slots=output_slots, + ) + self.choices = choices + + def validate_step( + self, + step_config: LayeredConfigTree, + combined_implementations: LayeredConfigTree, + input_data_config: LayeredConfigTree, + ) -> dict[str, list[str]]: + """Validates the ChoiceStep. + + Notes + ----- + We update the step graph and slot mappings here in validation as opposed to + in set_configuration_state (as is done in TemplatedSteps) because ChoiceStep + validation happens prior to set_configuration_state and actually requires + the step graph and mappings. + + We do not attempt to validate the subgraph here if the 'type' key is unable + to be validated. + """ + + chosen_type = step_config.get("type") + # Handle problems with the 'type' key + if not chosen_type: + return {f"step {self.name}": ["The step requires a 'type' key."]} + if chosen_type not in self.choices: + return { + f"step {self.name}": [ + f"'{step_config['type']}' is not a supported 'type'. Valid choices are: {list(self.choices)}." + ] + } + # Handle type-subgraph inconsistencies + subgraph = self.choices[chosen_type] + chosen_step_config = LayeredConfigTree( + {key: value for key, value in step_config.items() if key != "type"} + ) + allowable_steps = [node.name for node in subgraph["nodes"]] + if set(allowable_steps) != set(chosen_step_config): + return { + f"step {self.name}": [ + f"Invalid configuration for '{chosen_type}' type. Valid steps are {allowable_steps}." + ] + } + + # Handle the actual chosen step_config + self.step_graph = self._update_step_graph(subgraph) + self.slot_mappings = self._update_slot_mappings(subgraph) + # NOTE: A ChoiceStep is by definition non-leaf step + return self.validate_nonleaf( + chosen_step_config, combined_implementations, input_data_config + ) + + def set_configuration_state( + self, + parent_config: LayeredConfigTree, + combined_implementations: LayeredConfigTree, + input_data_config: LayeredConfigTree, + ): + """Sets the configuration state for a ChoiceStep. + + Notes + ----- + We update the step graph and slot mappings validate_step as opposed to here + (as is done in TemplatedSteps) because ChoiceStep validation happens prior + this but actually requires the step graph and mappings. + """ + + chosen_parent_config = LayeredConfigTree( + {key: value for key, value in parent_config[self.name].items() if key != "type"} + ) + # ChoiceSteps by definition cannot be in a LeafConfigurationState. + self._configuration_state = NonLeafConfigurationState( + self, chosen_parent_config, combined_implementations, input_data_config + ) + + @staticmethod + def _update_step_graph(subgraph: dict[str, Any]): + nodes = subgraph["nodes"] + edges = subgraph["edges"] + + graph = StepGraph() + for node in nodes: + graph.add_node_from_step(node) + for edge in edges: + graph.add_edge_from_params(edge) + return graph + + @staticmethod + def _update_slot_mappings(subgraph: dict[str, Any]) -> dict[str, list[SlotMapping]]: + input_mappings = subgraph["input_slot_mappings"] + output_mappings = subgraph["output_slot_mappings"] + return {"input": input_mappings, "output": output_mappings} diff --git a/tests/e2e/test_easylink_run.py b/tests/e2e/test_easylink_run.py index 6f987dca..43ab5e15 100644 --- a/tests/e2e/test_easylink_run.py +++ b/tests/e2e/test_easylink_run.py @@ -38,9 +38,11 @@ ], ) def test_easylink_run(pipeline_specification, input_data, computing_environment, capsys): - """e2e tests for 'easylink run' command + """Tests the 'easylink run' command - NOTE: We use various print statements in this test because they show up in the + Notes + ----- + We use various print statements in this test because they show up in the Jenkins logs. """ # Create a temporary directory to store results. We cannot use pytest's tmp_path fixture diff --git a/tests/e2e/test_step_types.py b/tests/e2e/test_step_types.py index 5a4142d9..9172e177 100644 --- a/tests/e2e/test_step_types.py +++ b/tests/e2e/test_step_types.py @@ -39,10 +39,11 @@ ], ) def test_step_types(pipeline_specification, implementations, capsys): - """e2e Tests for various permutations of complex step types. The goal is to test - that EasyLink generates the correct output implementations depending on the configuration. - That is, if we have a substeps key in the config, we get an implementation for each, but if we don't, - we get just one. + """Tests against various permutations of complex step types. + + The goal is to test that EasyLink generates the correct output implementations + depending on the configuration; i.e. if we have a 'substeps' key in the config, + we get an implementation for each (or else we get a single implementation). """ # Create a temporary directory to store results. We cannot use pytest's tmp_path fixture # because other nodes do not have access to it. Also, do not use a context manager diff --git a/tests/integration/test_snakemake.py b/tests/integration/test_snakemake.py index bfec61fb..8c492c29 100644 --- a/tests/integration/test_snakemake.py +++ b/tests/integration/test_snakemake.py @@ -13,6 +13,7 @@ def test_missing_results(mocker, caplog): """Test that the pipeline fails when a step is missing output files.""" nodes, edges = TESTING_SCHEMA_PARAMS["integration"] + mocker.patch("easylink.pipeline_schema.ALLOWED_SCHEMA_PARAMS", TESTING_SCHEMA_PARAMS) mocker.patch( "easylink.configuration.Config._get_schema", return_value=PipelineSchema("integration", nodes=nodes, edges=edges), diff --git a/tests/integration/test_snakemake_slurm.py b/tests/integration/test_snakemake_slurm.py index 9804572a..9351c9da 100644 --- a/tests/integration/test_snakemake_slurm.py +++ b/tests/integration/test_snakemake_slurm.py @@ -21,6 +21,7 @@ def test_slurm(mocker: MockerFixture, caplog: pytest.LogCaptureFixture) -> None: """Test that the pipeline runs on SLURM with appropriate resources.""" nodes, edges = TESTING_SCHEMA_PARAMS["integration"] + mocker.patch("easylink.pipeline_schema.ALLOWED_SCHEMA_PARAMS", TESTING_SCHEMA_PARAMS) mocker.patch( "easylink.configuration.Config._get_schema", return_value=PipelineSchema("integration", nodes=nodes, edges=edges), diff --git a/tests/integration/test_snakemake_spark.py b/tests/integration/test_snakemake_spark.py index 59b658f5..ad0fa572 100644 --- a/tests/integration/test_snakemake_spark.py +++ b/tests/integration/test_snakemake_spark.py @@ -21,6 +21,7 @@ def test_spark_slurm(mocker, caplog): """Test that the pipeline runs spark on SLURM with appropriate resources.""" nodes, edges = TESTING_SCHEMA_PARAMS["integration"] + mocker.patch("easylink.pipeline_schema.ALLOWED_SCHEMA_PARAMS", TESTING_SCHEMA_PARAMS) mocker.patch( "easylink.configuration.Config._get_schema", return_value=PipelineSchema("integration", nodes=nodes, edges=edges), diff --git a/tests/specifications/common/pipeline.yaml b/tests/specifications/common/pipeline.yaml index 12e4e68b..f1d29c18 100644 --- a/tests/specifications/common/pipeline.yaml +++ b/tests/specifications/common/pipeline.yaml @@ -8,6 +8,8 @@ steps: step_3: implementation: name: step_3_python_pandas - step_4: - implementation: - name: step_4_python_pandas + choice_section: + type: simple + step_4: + implementation: + name: step_4_python_pandas diff --git a/tests/specifications/e2e/pipeline.yaml b/tests/specifications/e2e/pipeline.yaml index 9f7181ff..7432a661 100644 --- a/tests/specifications/e2e/pipeline.yaml +++ b/tests/specifications/e2e/pipeline.yaml @@ -12,8 +12,10 @@ steps: name: step_3_python_pandas configuration: DUMMY_CONTAINER_INCREMENT: 702 - step_4: - implementation: - name: step_4_r - configuration: - DUMMY_CONTAINER_INCREMENT: 912 + choice_section: + type: simple + step_4: + implementation: + name: step_4_r + configuration: + DUMMY_CONTAINER_INCREMENT: 912 diff --git a/tests/specifications/e2e/pipeline_expanded.yaml b/tests/specifications/e2e/pipeline_expanded.yaml index d6057057..b3dae989 100644 --- a/tests/specifications/e2e/pipeline_expanded.yaml +++ b/tests/specifications/e2e/pipeline_expanded.yaml @@ -19,11 +19,13 @@ steps: name: step_3_python_pandas - implementation: name: step_3_python_pandas - step_4: - substeps: - step_4a: - implementation: - name: step_4a_python_pandas - step_4b: - implementation: - name: step_4b_python_pandas \ No newline at end of file + choice_section: + type: simple + step_4: + substeps: + step_4a: + implementation: + name: step_4a_python_pandas + step_4b: + implementation: + name: step_4b_python_pandas diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 797cd0ed..d8b78ae4 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -51,9 +51,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -85,9 +88,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -119,9 +125,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -139,9 +148,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -161,9 +173,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -183,9 +198,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -203,9 +221,12 @@ "step_3": { "iterate": [], }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -223,18 +244,19 @@ "step_3": { "iterate": {"implementation": "step_3_python_pandas"}, }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, - "missing_substep_keys": { - "step_4": { - "substeps": { - "step_4a": { - "implementation": {}, - }, + "missing_substeps": { + "step_1": { + "implementation": { + "name": "step_1_python_pandas", }, }, "step_2": { @@ -247,9 +269,15 @@ "name": "step_3_python_pandas", }, }, - "step_1": { - "implementation": { - "name": "step_1_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "substeps": { + "step_4a": { + "implementation": {}, # missing name key + }, + # missing step_4b + }, }, }, }, @@ -274,9 +302,12 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "implementation": { - "name": "step_1_python_pandas", + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, }, }, }, @@ -294,9 +325,92 @@ "step_3": { "combined_implementation_key": "foo", }, - "step_4": { + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, + }, + }, + }, + "missing_type_key": { + "step_1": { + "implementation": { + "name": "step_1_python_pandas", + }, + }, + "step_2": { + "implementation": { + "name": "step_2_python_pandas", + }, + }, + "step_3": { + "implementation": { + "name": "step_3_python_pandas", + }, + }, + "choice_section": { + # missing 'type' key + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, + }, + }, + }, + "bad_type_key": { + "step_1": { + "implementation": { + "name": "step_1_python_pandas", + }, + }, + "step_2": { + "implementation": { + "name": "step_2_python_pandas", + }, + }, + "step_3": { + "implementation": { + "name": "step_3_python_pandas", + }, + }, + "choice_section": { + "type": "foo", # Not a supported 'type' + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, + }, + }, + }, + "type_config_mismatch": { + "step_1": { + "implementation": { + "name": "step_1_python_pandas", + }, + }, + "step_2": { + "implementation": { + "name": "step_2_python_pandas", + }, + }, + "step_3": { "implementation": { - "name": "step_4_python_pandas", + "name": "step_3_python_pandas", + }, + }, + "choice_section": { + "type": "simple", + "step_5": { + "implementation": { + "name": "step_5_python_pandas", + }, + }, + "step_6": { + "implementation": { + "name": "step_6_python_pandas", + }, }, }, }, @@ -323,8 +437,11 @@ "step_3": { "combined_implementation_key": "step_3_4", }, - "step_4": { - "combined_implementation_key": "step_3_4", + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, }, }, "combined_implementations": { @@ -357,8 +474,11 @@ }, ] }, - "step_4": { - "combined_implementation_key": "step_3_4", + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, }, }, "combined_implementations": { @@ -391,8 +511,11 @@ }, ] }, - "step_4": { - "combined_implementation_key": "step_3_4", + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, }, }, "combined_implementations": { @@ -427,10 +550,13 @@ "name": "step_3_python_pandas", } }, - "step_4": { - "implementation": { - "name": "step_4_python_pandas", - } + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, + }, }, }, "combined_implementations": { @@ -452,8 +578,11 @@ "step_3": { "combined_implementation_key": "step_3_4", }, - "step_4": { - "combined_implementation_key": "step_3_4", + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, }, }, "combined_implementations": { @@ -479,8 +608,11 @@ "name": "step_3_python_pandas", }, }, - "step_4": { - "combined_implementation_key": "step_3_4", + "choice_section": { + "type": "simple", + "step_4": { + "combined_implementation_key": "step_3_4", + }, }, }, "combined_implementations": { diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 69519a72..d90eb2aa 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -28,7 +28,10 @@ def test_load_params_from_specification( "step_1": {"implementation": {"name": "step_1_python_pandas"}}, "step_2": {"implementation": {"name": "step_2_python_pandas"}}, "step_3": {"implementation": {"name": "step_3_python_pandas"}}, - "step_4": {"implementation": {"name": "step_4_python_pandas"}}, + "choice_section": { + "type": "simple", + "step_4": {"implementation": {"name": "step_4_python_pandas"}}, + }, } }, "input_data": { diff --git a/tests/unit/test_pipeline_schema.py b/tests/unit/test_pipeline_schema.py index ad8b0fd8..a6d83e6b 100644 --- a/tests/unit/test_pipeline_schema.py +++ b/tests/unit/test_pipeline_schema.py @@ -20,7 +20,7 @@ def test_schema_instantiation() -> None: "step_1", "step_2", "step_3", - "step_4", + "choice_section", "results", ] step_types = [node["step"] for node in sorted_graph] diff --git a/tests/unit/test_step.py b/tests/unit/test_step.py index a0f82ceb..a11c5eb9 100644 --- a/tests/unit/test_step.py +++ b/tests/unit/test_step.py @@ -1,3 +1,14 @@ +"""Unit tests for the various Step classes. + +Notes +----- +These unit tests often instantiate a Step object using parameters that would not +actually pass validation in a real-world scenario (i.e. they do not conform to the pipeline +schema). This is intentional because it's easier to flex complexity and edge cases here +rather than try to get full coverage in the e2e tests. It also allows us to test for +future pipeline schema expansion/flexibility in a relative simple manner now. +""" + from typing import Any import pytest @@ -12,16 +23,38 @@ OutputSlotMapping, ) from easylink.pipeline_schema_constants.development import NODES -from easylink.step import HierarchicalStep, IOStep, LoopStep, ParallelStep, Step +from easylink.step import ( + ChoiceStep, + HierarchicalStep, + IOStep, + LoopStep, + ParallelStep, + Step, +) from easylink.utilities.validation_utils import validate_input_file_dummy STEP_KEYS = {step.name: step for step in NODES} +@pytest.fixture +def basic_step_params() -> dict[str, Any]: + return { + "step_name": "step_1", + "input_slots": [ + InputSlot( + "step_1_main_input", + "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validate_input_file_dummy, + ) + ], + "output_slots": [OutputSlot("step_1_main_output")], + } + + def test_implementation_node_name( - implemented_step_params: dict[str, Any], default_config: Config + basic_step_params: dict[str, Any], default_config: Config ) -> None: - step = Step(**implemented_step_params) + step = Step(**basic_step_params) step.set_configuration_state(default_config["pipeline"]["steps"], {}, {}) node_name = step.implementation_node_name assert node_name == "step_1_python_pandas" @@ -40,7 +73,7 @@ def io_step_params() -> dict[str, Any]: } -def test_io_step(io_step_params: dict[str, Any]) -> None: +def test_io_step_slots(io_step_params: dict[str, Any]) -> None: step = IOStep(**io_step_params) assert step.name == step.step_name == "io" assert step.input_slots == { @@ -59,23 +92,8 @@ def test_io_get_implementation_graph( assert list(subgraph.edges) == [] -@pytest.fixture -def implemented_step_params() -> dict[str, Any]: - return { - "step_name": "step_1", - "input_slots": [ - InputSlot( - "step_1_main_input", - "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", - validate_input_file_dummy, - ) - ], - "output_slots": [OutputSlot("step_1_main_output")], - } - - -def test_implemented_step(implemented_step_params: dict[str, Any]) -> None: - step = Step(**implemented_step_params) +def test_basic_step_slots(basic_step_params: dict[str, Any]) -> None: + step = Step(**basic_step_params) assert step.name == step.step_name == "step_1" assert step.input_slots == { "step_1_main_input": InputSlot( @@ -87,10 +105,10 @@ def test_implemented_step(implemented_step_params: dict[str, Any]) -> None: assert step.output_slots == {"step_1_main_output": OutputSlot("step_1_main_output")} -def test_implemented_step_get_implementation_graph( - implemented_step_params: dict[str, Any], default_config: Config +def test_basic_step_get_implementation_graph( + basic_step_params: dict[str, Any], default_config: Config ) -> None: - step = Step(**implemented_step_params) + step = Step(**basic_step_params) step.set_configuration_state(default_config["pipeline"]["steps"], {}, {}) subgraph = step.get_implementation_graph() assert list(subgraph.nodes) == ["step_1_python_pandas"] @@ -145,7 +163,7 @@ def hierarchical_step_params() -> dict[str, Any]: } -def test_hierarchical_step(hierarchical_step_params: dict[str, Any]) -> None: +def test_hierarchical_step_slots(hierarchical_step_params: dict[str, Any]) -> None: step = HierarchicalStep(**hierarchical_step_params) assert step.name == "step_4" assert step.input_slots == { @@ -313,7 +331,7 @@ def loop_step_params() -> dict[str, Any]: } -def test_loop_step(loop_step_params: dict[str, Any]) -> None: +def test_loop_step_slots(loop_step_params: dict[str, Any]) -> None: step = LoopStep(**loop_step_params) assert step.name == step.step_name == "step_3" assert isinstance(step, LoopStep) @@ -487,7 +505,7 @@ def parallel_step_params() -> dict[str, Any]: } -def test_parallel_step(parallel_step_params: dict[str, Any]) -> None: +def test_parallel_step_slots(parallel_step_params: dict[str, Any]) -> None: step = ParallelStep(**parallel_step_params) assert step.name == "step_1" assert step.input_slots == { @@ -613,3 +631,357 @@ def test_parallel_step_get_implementation_graph( assert len(subgraph.edges) == len(expected_edges) for edge in expected_edges: assert edge in subgraph.edges(data=True) + + +@pytest.fixture +def choice_step_params() -> dict[str, Any]: + return { + "step_name": "choice_section", + "input_slots": [ + InputSlot( + name="choice_section_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="choice_section_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + "output_slots": [OutputSlot("choice_section_main_output")], + "choices": { + "simple": { + "nodes": [ + HierarchicalStep( + step_name="step_4", + input_slots=[ + InputSlot( + name="step_4_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4_main_output")], + nodes=[ + Step( + step_name="step_4a", + input_slots=[ + InputSlot( + name="step_4a_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4a_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4a_main_output")], + ), + Step( + step_name="step_4b", + input_slots=[ + InputSlot( + name="step_4b_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + InputSlot( + name="step_4b_secondary_input", + env_var="DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_4b_main_output")], + ), + ], + edges=[ + EdgeParams( + source_node="step_4a", + target_node="step_4b", + output_slot="step_4a_main_output", + input_slot="step_4b_main_input", + ), + ], + input_slot_mappings=[ + InputSlotMapping( + parent_slot="step_4_main_input", + child_node="step_4a", + child_slot="step_4a_main_input", + ), + InputSlotMapping( + parent_slot="step_4_secondary_input", + child_node="step_4a", + child_slot="step_4a_secondary_input", + ), + InputSlotMapping( + parent_slot="step_4_secondary_input", + child_node="step_4b", + child_slot="step_4b_secondary_input", + ), + ], + output_slot_mappings=[ + OutputSlotMapping( + parent_slot="step_4_main_output", + child_node="step_4b", + child_slot="step_4b_main_output", + ), + ], + ), + ], + "edges": [], + "input_slot_mappings": [ + InputSlotMapping( + parent_slot="choice_section_main_input", + child_node="step_4", + child_slot="step_4_main_input", + ), + InputSlotMapping( + parent_slot="choice_section_secondary_input", + child_node="step_4", + child_slot="step_4_secondary_input", + ), + ], + "output_slot_mappings": [ + OutputSlotMapping( + parent_slot="choice_section_main_output", + child_node="step_4", + child_slot="step_4_main_output", + ), + ], + }, + "complex": { + "nodes": [ + Step( + step_name="step_5", + input_slots=[ + InputSlot( + name="step_5_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_5_main_output")], + ), + # Add a more complicated (unsupported) loop step to ensure flexibility + LoopStep( + template_step=Step( + step_name="step_6", + input_slots=[ + InputSlot( + name="step_6_main_input", + env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validator=validate_input_file_dummy, + ), + ], + output_slots=[OutputSlot("step_6_main_output")], + ), + self_edges=[ + EdgeParams( + source_node="step_6", + target_node="step_6", + output_slot="step_6_main_output", + input_slot="step_6_main_input", + ) + ], + ), + ], + "edges": [ + EdgeParams( + source_node="step_5", + target_node="step_6", + output_slot="step_5_main_output", + input_slot="step_6_main_input", + ) + ], + "input_slot_mappings": [ + InputSlotMapping( + parent_slot="choice_section_main_input", + child_node="step_5", + child_slot="step_5_main_input", + ), + ], + "output_slot_mappings": [ + OutputSlotMapping( + parent_slot="choice_section_main_output", + child_node="step_6", + child_slot="step_6_main_output", + ), + ], + }, + }, + } + + +def test_choice_step_slots(choice_step_params: dict[str, Any]) -> None: + step = ChoiceStep(**choice_step_params) + assert step.name == "choice_section" + assert step.input_slots == { + "choice_section_main_input": InputSlot( + "choice_section_main_input", + "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validate_input_file_dummy, + ), + "choice_section_secondary_input": InputSlot( + "choice_section_secondary_input", + "DUMMY_CONTAINER_SECONDARY_INPUT_FILE_PATHS", + validate_input_file_dummy, + ), + } + assert step.output_slots == { + "choice_section_main_output": OutputSlot("choice_section_main_output") + } + + +def test_simple_choice_step_get_implementation_graph( + choice_step_params: dict[str, Any] +) -> None: + step = ChoiceStep(**choice_step_params) + + # Test get_implementation_graph for single step (no substeps) + pipeline_dict = { + "choice_section": { + "type": "simple", + "step_4": { + "implementation": { + "name": "step_4_python_pandas", + }, + }, + }, + } + pipeline_params = LayeredConfigTree(pipeline_dict) + # Need to validate in order to set the step graph an mappings prior to calling `set_configuration_state` + step.validate_step(pipeline_dict["choice_section"], {}, {}) + step.set_configuration_state(pipeline_params, {}, {}) + subgraph = step.get_implementation_graph() + assert list(subgraph.nodes) == ["step_4_python_pandas"] + assert list(subgraph.edges) == [] + + # Test get_implementation_graph for a step with substeps + pipeline_dict = { + "choice_section": { + "type": "simple", + "step_4": { + "substeps": { + "step_4a": { + "implementation": { + "name": "step_4a_python_pandas", + }, + }, + "step_4b": { + "implementation": { + "name": "step_4b_r", + }, + }, + }, + }, + }, + } + pipeline_params = LayeredConfigTree(pipeline_dict) + step.set_configuration_state(pipeline_params, {}, {}) + subgraph = step.get_implementation_graph() + assert list(subgraph.nodes) == [ + "step_4a_python_pandas", + "step_4b_r", + ] + expected_edges = [ + ( + "step_4a_python_pandas", + "step_4b_r", + { + "input_slot": InputSlot( + "step_4b_main_input", + "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validate_input_file_dummy, + ), + "output_slot": OutputSlot("step_4a_main_output"), + "filepaths": None, + }, + ), + ] + assert len(subgraph.edges) == len(expected_edges) + for edge in expected_edges: + assert edge in subgraph.edges(data=True) + + +def test_complex_choice_step_get_implementation_graph( + choice_step_params: dict[str, Any] +) -> None: + step = ChoiceStep(**choice_step_params) + + pipeline_dict = { + "choice_section": { + "type": "complex", + "step_5": { + "implementation": { + "name": "step_5_python_pandas", + }, + }, + "step_6": { + "iterate": [ + LayeredConfigTree( + { + "implementation": { + "name": "step_6_python_pandas", + } + } + ), + LayeredConfigTree( + { + "implementation": { + "name": "step_6_python_pandas", + } + } + ), + ], + }, + }, + } + pipeline_params = LayeredConfigTree(pipeline_dict) + # Need to validate in order to set the step graph an mappings prior to calling `set_configuration_state` + step.validate_step(pipeline_dict["choice_section"], {}, {}) + step.set_configuration_state(pipeline_params, {}, {}) + subgraph = step.get_implementation_graph() + assert list(subgraph.nodes) == [ + "step_5_python_pandas", + "step_6_loop_1_step_6_python_pandas", + "step_6_loop_2_step_6_python_pandas", + ] + expected_edges = [ + ( + "step_5_python_pandas", + "step_6_loop_1_step_6_python_pandas", + { + "input_slot": InputSlot( + "step_6_main_input", + "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validate_input_file_dummy, + ), + "output_slot": OutputSlot("step_5_main_output"), + "filepaths": None, + }, + ), + ( + "step_6_loop_1_step_6_python_pandas", + "step_6_loop_2_step_6_python_pandas", + { + "input_slot": InputSlot( + "step_6_main_input", + "DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS", + validate_input_file_dummy, + ), + "output_slot": OutputSlot("step_6_main_output"), + "filepaths": None, + }, + ), + ] + assert len(subgraph.edges) == len(expected_edges) + for edge in expected_edges: + assert edge in subgraph.edges(data=True) diff --git a/tests/unit/test_validations.py b/tests/unit/test_validations.py index 47add7d3..cd0cf930 100644 --- a/tests/unit/test_validations.py +++ b/tests/unit/test_validations.py @@ -70,7 +70,6 @@ def test_batch_validation(): @pytest.mark.parametrize( "pipeline, expected_msg", [ - # missing 'implementation' key ( "missing_implementations", { @@ -83,7 +82,6 @@ def test_batch_validation(): }, }, ), - # missing implementation 'name' key ( "missing_implementation_name", { @@ -96,15 +94,14 @@ def test_batch_validation(): }, }, ), - # missing a step ( "missing_step", { PIPELINE_ERRORS_KEY: { "development": { + "step choice_section": ["The step is not configured."], "step step_1": ["The step is not configured."], "step step_3": ["The step is not configured."], - "step step_4": ["The step is not configured."], }, }, }, @@ -132,7 +129,7 @@ def test_batch_validation(): }, ), ( - "missing_substep_keys", + "missing_substeps", { PIPELINE_ERRORS_KEY: { "development": { @@ -172,6 +169,40 @@ def test_batch_validation(): } }, ), + ( + "missing_type_key", + { + PIPELINE_ERRORS_KEY: { + "development": { + "step choice_section": ["The step requires a 'type' key."] + } + } + }, + ), + ( + "bad_type_key", + { + PIPELINE_ERRORS_KEY: { + "development": { + "step choice_section": [ + "'foo' is not a supported 'type'. Valid choices are: \['simple', 'complex'\]." + ] + } + } + }, + ), + ( + "type_config_mismatch", + { + PIPELINE_ERRORS_KEY: { + "development": { + "step choice_section": [ + "Invalid configuration for 'simple' type. Valid steps are \['step_4'\]." + ] + } + } + }, + ), ], ) def test_pipeline_validation(pipeline, default_config_params, expected_msg, caplog):