diff --git a/src/easylink/pipeline_graph.py b/src/easylink/pipeline_graph.py index 8f73bd29..d66e694c 100644 --- a/src/easylink/pipeline_graph.py +++ b/src/easylink/pipeline_graph.py @@ -62,7 +62,7 @@ def merge_combined_implementations(self, config): metadata_steps = implementation_metadata[combined_implementation_config["name"]][ "steps" ] - self.validate_implementation_topology(nodes_to_merge, metadata_steps) + self.validate_combined_implementation_topology(nodes_to_merge, metadata_steps) ( combined_input_slots, @@ -309,18 +309,18 @@ def spark_is_required(self) -> bool: """Check if the pipeline requires spark resources.""" return any([implementation.requires_spark for implementation in self.implementations]) - def validate_implementation_topology( + def validate_combined_implementation_topology( self, nodes: list[str], metadata_steps: list[str] ) -> None: - """Validates implementation topology against intended implementation. + """Validates the combined implementation topology against intended implementation. Check that the subgraph induced by the nodes implemented by this implementation is topologically consistent with the list of steps intended to be implemented. """ + # HACK: We cannot just call self.subgraph(nodes) because networkx will + # try and instantiate another PipelineGraph which requires a Config object + # to be passed to the constructor. subgraph = ImplementationGraph(self).subgraph(nodes) - # NOTE: The subgraph is not necessarily able to be topologically sorted - # in a reproducible way. We instead rely on node name sorting for - # error messages. # Relabel nodes by schema step mapping = { @@ -328,8 +328,12 @@ def validate_implementation_topology( for node, data in subgraph.nodes(data=True) } if not set(mapping.values()) == set(metadata_steps): + # NOTE: It's possible that we've combined nodes in such a way that removed + # an edge from the graph and so nx is unable to reliably sort the subgraph. + full_pipeline_sorted_nodes = list(nx.topological_sort(self)) + sorted_nodes = [node for node in full_pipeline_sorted_nodes if node in mapping] raise ValueError( - f"Pipeline configuration nodes {sorted(mapping.values())} do not match metadata steps {metadata_steps}." + f"Pipeline configuration nodes {[mapping[node] for node in sorted_nodes]} do not match metadata steps {metadata_steps}." ) subgraph = nx.relabel_nodes(subgraph, mapping) # Check for topological inconsistency, i.e. if there is a path from a later node to an earlier node. @@ -339,7 +343,7 @@ def validate_implementation_topology( subgraph, metadata_steps[successor], metadata_steps[predecessor] ): raise ValueError( - f"Pipeline configuration nodes {sorted(subgraph.nodes())} are not topologically consistent with " + f"Pipeline configuration nodes {list(nx.topological_sort(subgraph))} are not topologically consistent with " f"the intended implementations for {list(metadata_steps)}:\n" f"There is a path from successor {metadata_steps[successor]} to predecessor {metadata_steps[predecessor]}." ) diff --git a/tests/unit/test_pipeline_graph.py b/tests/unit/test_pipeline_graph.py index 91536d89..3889343c 100644 --- a/tests/unit/test_pipeline_graph.py +++ b/tests/unit/test_pipeline_graph.py @@ -599,7 +599,7 @@ def test_merge_combined_implementations_parallel(default_config_params, test_dir ), ( "combined_bad_topology", - "Pipeline configuration nodes ['step_1a', 'step_1b'] are not topologically consistent with the intended implementations for ['step_1a', 'step_1b']:\nThere is a path from successor step_1b to predecessor step_1a.", + "Pipeline configuration nodes ['step_1b', 'step_1a'] are not topologically consistent with the intended implementations for ['step_1a', 'step_1b']:\nThere is a path from successor step_1b to predecessor step_1a.", True, ), ( @@ -609,7 +609,7 @@ def test_merge_combined_implementations_parallel(default_config_params, test_dir ), ], ) -def test_bad_configuration_raises( +def test_bad_combined_configuration_raises( problem_key, error_msg, use_custom_schema, default_config_params ) -> None: config_params = default_config_params