Skip to content

Commit

Permalink
better sort nodes in bad combination error messages
Browse files — browse the repository at this point in the history
  • Loading branch information
stevebachmeier committed Dec 11, 2024
1 parent 165721f commit 8333c43
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
20 changes: 12 additions & 8 deletions src/easylink/pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def merge_combined_implementations(self, config):
metadata_steps = implementation_metadata[combined_implementation_config["name"]][
"steps"
]
self.validate_implementation_topology(nodes_to_merge, metadata_steps)
self.validate_combined_implementation_topology(nodes_to_merge, metadata_steps)

(
combined_input_slots,
Expand Down Expand Up @@ -309,27 +309,31 @@ def spark_is_required(self) -> bool:
"""Check if the pipeline requires spark resources."""
return any([implementation.requires_spark for implementation in self.implementations])

def validate_implementation_topology(
def validate_combined_implementation_topology(
self, nodes: list[str], metadata_steps: list[str]
) -> None:
"""Validates implementation topology against intended implementation.
"""Validates the combined implementation topology against intended implementation.
Check that the subgraph induced by the nodes implemented by this implementation
is topologically consistent with the list of steps intended to be implemented.
"""
# HACK: We cannot just call self.subgraph(nodes) because networkx will
# try and instantiate another PipelineGraph which requires a Config object
# to be passed to the constructor.
subgraph = ImplementationGraph(self).subgraph(nodes)
# NOTE: The subgraph is not necessarily able to be topologically sorted
# in a reproducible way. We instead rely on node name sorting for
# error messages.

# Relabel nodes by schema step
mapping = {
node: data["implementation"].schema_step
for node, data in subgraph.nodes(data=True)
}
if not set(mapping.values()) == set(metadata_steps):
# NOTE: It's possible that we've combined nodes in such a way that removed
# an edge from the graph and so nx is unable to reliably sort the subgraph.
full_pipeline_sorted_nodes = list(nx.topological_sort(self))
sorted_nodes = [node for node in full_pipeline_sorted_nodes if node in mapping]
raise ValueError(
f"Pipeline configuration nodes {sorted(mapping.values())} do not match metadata steps {metadata_steps}."
f"Pipeline configuration nodes {[mapping[node] for node in sorted_nodes]} do not match metadata steps {metadata_steps}."
)
subgraph = nx.relabel_nodes(subgraph, mapping)
# Check for topological inconsistency, i.e. if there is a path from a later node to an earlier node.
Expand All @@ -339,7 +343,7 @@ def validate_implementation_topology(
subgraph, metadata_steps[successor], metadata_steps[predecessor]
):
raise ValueError(
f"Pipeline configuration nodes {sorted(subgraph.nodes())} are not topologically consistent with "
f"Pipeline configuration nodes {list(nx.topological_sort(subgraph))} are not topologically consistent with "
f"the intended implementations for {list(metadata_steps)}:\n"
f"There is a path from successor {metadata_steps[successor]} to predecessor {metadata_steps[predecessor]}."
)
4 changes: 2 additions & 2 deletions tests/unit/test_pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ def test_merge_combined_implementations_parallel(default_config_params, test_dir
),
(
"combined_bad_topology",
"Pipeline configuration nodes ['step_1a', 'step_1b'] are not topologically consistent with the intended implementations for ['step_1a', 'step_1b']:\nThere is a path from successor step_1b to predecessor step_1a.",
"Pipeline configuration nodes ['step_1b', 'step_1a'] are not topologically consistent with the intended implementations for ['step_1a', 'step_1b']:\nThere is a path from successor step_1b to predecessor step_1a.",
True,
),
(
Expand All @@ -609,7 +609,7 @@ def test_merge_combined_implementations_parallel(default_config_params, test_dir
),
],
)
def test_bad_configuration_raises(
def test_bad_combined_configuration_raises(
problem_key, error_msg, use_custom_schema, default_config_params
) -> None:
config_params = default_config_params
Expand Down

0 comments on commit 8333c43

Please sign in to comment.