Skip to content

Commit

Permalink
fixes for bad merge conflict resolutions
Browse files Browse the repository at this point in the history
  • Loading branch information
stevebachmeier committed Jan 8, 2025
1 parent 8cc6d82 commit 6332f60
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 65 deletions.
24 changes: 12 additions & 12 deletions src/easylink/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def spark_resources(self) -> dict[str, Any]:
#################

def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema:
"""Returns the first ``PipelineSchema`` that successfully validates the requested pipeline.
"""Returns the first :class:`~easylink.pipeline_schema.PipelineSchema` that validates the requested pipeline.
Parameters
----------
Expand All @@ -183,17 +183,17 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
Returns
-------
The first pipeline schema that successfully validates the requested pipeline.
If no validated pipeline schema is found, `exit()` is called with `errno.EINVAL`
and any validation errors are logged.
The first ``PipelineSchema`` that validates the requested pipeline.
If no validated ``PipelineSchema`` is found, `exit()` is called with
`errno.EINVAL` and any validation errors are logged.
Notes
-----
This acts as the pipeline configuration file's validation method since
we can only find a matching ``PipelineSchema`` if that file is valid.
This method returns the first ``PipelineSchema`` that successfully validates
and does not attempt to validate additional ones.
This method returns the *first* ``PipelineSchema`` that validates and does
not attempt to check additional ones.
"""
errors = defaultdict(dict)
# Try each schema until one is validated
Expand Down Expand Up @@ -283,11 +283,11 @@ def load_params_from_specification(
Parameters
----------
pipeline_specification
The path to the pipeline specification yaml file.
The path to the pipeline specification file.
input_data
The path to the input data yaml file.
The path to the input data file.
computing_environment
The path to the computing environment yaml file.
The path to the computing environment file.
results_dir
The path to the results directory.
Expand All @@ -306,7 +306,7 @@ def load_params_from_specification(
def _load_input_data_paths(
input_data_specification_path: str | Path,
) -> dict[str, list[Path]]:
"""Creates a dictionary of input data paths from the input data yaml file."""
"""Creates a dictionary of input data paths from the input data specification file."""
input_data_paths = load_yaml(input_data_specification_path)
if not isinstance(input_data_paths, dict):
raise TypeError(
Expand All @@ -322,13 +322,13 @@ def _load_input_data_paths(
def _load_computing_environment(
computing_environment_specification_path: str | None,
) -> dict[Any, Any]:
"""Loads the computing environment yaml file and returns the contents as a dict."""
"""Loads the computing environment specification file and returns the contents as a dict."""
if not computing_environment_specification_path:
return {} # handles empty environment.yaml
elif not Path(computing_environment_specification_path).is_file():
raise FileNotFoundError(
"Computing environment is expected to be a path to an existing"
f" yaml file. Input was: '{computing_environment_specification_path}'"
f" specification file. Input was: '{computing_environment_specification_path}'"
)
else:
return load_yaml(computing_environment_specification_path)
76 changes: 43 additions & 33 deletions src/easylink/graph_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

@dataclass(frozen=True)
class InputSlot:
"""An abstraction representing a single input slot to a specific node.
"""A single input slot to a specific node.
``InputSlots`` represent distinct semantic categories of input files, between
which a node must be able to differentiate. In order to pass data between nodes,
Expand All @@ -39,20 +39,24 @@ class InputSlot:
name: str
"""The name of the ``InputSlot``."""
env_var: str | None
"""The environment variable that this ``InputSlot`` will use to pass a list
of data filepaths to an ``Implementation``."""
"""The environment variable that is used to pass a list of data filepaths to
an ``Implementation``."""
validator: Callable[[str], None]
"""A callable that validates the input data being passed into the pipeline via
this ``InputSlot``. If the data is invalid, the callable should raise an exception
with a descriptive error message which will then be reported to the user."""
"""A function that validates the input data being passed into the pipeline via
this ``InputSlot``. If the data is invalid, the function should raise an exception
with a descriptive error message which will then be reported to the user.
Note that the function *must* be defined in the :mod:`easylink.utilities.validation_utils`
module!"""


@dataclass(frozen=True)
class OutputSlot:
"""An abstraction representing a single output slot from a specific node.
"""A single output slot from a specific node.
In order to pass data between nodes, an ``OutputSlot`` of one node can be connected
to an :class:`InputSlot` of another node via an :class:`EdgeParams` instance.
``OutputSlots`` represent distinct semantic categories of output files, between
which a node must be able to differentiate. In order to pass data between nodes,
an ``OutputSlot`` of one node can be connected to an :class:`InputSlot` of another
node via an :class:`EdgeParams` instance.
Notes
-----
Expand All @@ -70,9 +74,9 @@ class OutputSlot:

@dataclass(frozen=True)
class EdgeParams:
"""A representation of an edge between two nodes in a graph.
"""The details of an edge between two nodes in a graph.
EdgeParams connect the :class:`OutputSlot` of a source node to the :class:`InputSlot`
``EdgeParams`` connect the :class:`OutputSlot` of a source node to the :class:`InputSlot`
of a target node.
Notes
Expand All @@ -81,15 +85,15 @@ class EdgeParams:
"""

source_node: str
"""The name of the source node/``Step``."""
"""The name of the source node."""
target_node: str
"""The name of the target node/``Step``."""
"""The name of the target node."""
output_slot: str
"""The name of the ``OutputSlot`` of the source node/``Step``."""
"""The name of the source node's ``OutputSlot``."""
input_slot: str
"""The name of the ``InputSlot`` of the target node/``Step``."""
"""The name of the target node's ``InputSlot``."""
filepaths: tuple[str] | None = None
"""The filepaths that are passed from the source node/``Step`` to the target node/``Step``."""
"""The filepaths that are passed from the source node to the target node."""

@classmethod
def from_graph_edge(
Expand All @@ -103,12 +107,13 @@ def from_graph_edge(
Parameters
----------
source
The name of the source node/``Step``.
The name of the source node.
sink
The name of the target node/``Step``.
The name of the target node.
edge_attrs
The attributes of the edge connecting the source and target nodes/``Steps``.
'output_slot' and 'input_slot' are required keys and 'filepaths' is optional.
The attributes of the edge connecting the source and target nodes.
'output_slot' and 'input_slot' are required keys while 'filepaths' is
optional.
"""
return cls(
source,
Expand All @@ -120,10 +125,11 @@ def from_graph_edge(


class StepGraph(nx.MultiDiGraph):
"""A directed acyclic graph (DAG) of :class:`Steps<easylink.step.Step>` and the data dependencies between them.
"""A directed acyclic graph (DAG) of :class:`Steps<easylink.step.Step>`.
``StepGraphs`` are DAGs with ``Steps`` for nodes and the file dependencies between
them for edges. Multiple edges between nodes are permitted.
``StepGraphs`` are DAGs with ``Step`` names for nodes and their corresponding
``Step`` instances as attributes on those nodes. The file dependencies between
nodes are the graph edges; multiple edges between nodes are permitted.
Notes
-----
Expand All @@ -135,13 +141,13 @@ class StepGraph(nx.MultiDiGraph):

@property
def step_nodes(self) -> list[str]:
"""The topologically sorted list of node/``Step`` names."""
"""The topologically sorted list of ``Step`` names."""
ordered_nodes = list(nx.topological_sort(self))
return [node for node in ordered_nodes if node != "input_data" and node != "results"]

@property
def steps(self) -> list[Step]:
"""The list of all ``Steps`` in the graph."""
"""The topologically sorted list of all ``Steps`` in the graph."""
return [self.nodes[node]["step"] for node in self.step_nodes]

def add_node_from_step(self, step: Step) -> None:
Expand Down Expand Up @@ -196,13 +202,13 @@ class ImplementationGraph(nx.MultiDiGraph):

@property
def implementation_nodes(self) -> list[str]:
"""The topologically sorted list of node/``Implementation`` names."""
"""The topologically sorted list of ``Implementation`` names."""
ordered_nodes = list(nx.topological_sort(self))
return [node for node in ordered_nodes if node != "input_data" and node != "results"]

@property
def implementations(self) -> list[Implementation]:
"""The list of all ``Implementations`` in the graph."""
"""The topologically sorted list of all ``Implementations`` in the graph."""
return [self.nodes[node]["implementation"] for node in self.implementation_nodes]

def add_node_from_implementation(self, node_name, implementation: Implementation) -> None:
Expand Down Expand Up @@ -240,29 +246,33 @@ def add_edge_from_params(self, edge_params: EdgeParams) -> None:

@dataclass(frozen=True)
class SlotMapping(ABC):
"""A mapping between a slot on a parent :class:`~easylink.step.Step` and a slot on one of its child ``Steps``.
"""A mapping between a slot on a parent node and a slot on one of its child nodes.
``SlotMapping`` is an interface intended to be used by concrete :class:`InputSlotMapping`
and :class:`OutputSlotMapping` classes. It represents a mapping between
parent and child nodes/``Steps`` at different levels of a potentially-nested
parent and child nodes at different levels of a potentially-nested
:class:`~easylink.pipeline_schema.PipelineSchema`.
Notes
-----
Nodes can be either :class:`Steps<easylink.step.Step>` or :class:`Implementations<easylink.implementation.Implementation>`.
"""

parent_slot: str
"""The name of the parent slot."""
child_node: str
"""The name of the child node/``Step``."""
"""The name of the child node."""
child_slot: str
"""The name of the child slot."""

@abstractmethod
def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge to connect the parent and child nodes/``Steps``."""
"""Remaps an edge to connect the parent and child nodes."""
pass


class InputSlotMapping(SlotMapping):
"""A mapping between :class:`InputSlots<InputSlot>` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""
"""A mapping between :class:`InputSlots<InputSlot>` of a parent node and a child node."""

def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge's ``InputSlot``.
Expand Down Expand Up @@ -293,7 +303,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams:


class OutputSlotMapping(SlotMapping):
"""A mapping between :class:`OutputSlots<OutputSlot>` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""
"""A mapping between :class:`OutputSlots<OutputSlot>` of a parent node and a child node."""

def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge's :class:`OutputSlot`.
Expand Down
38 changes: 19 additions & 19 deletions src/easylink/implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@
class Implementation:
"""A representation of an actual container that will be executed for a :class:`~easylink.step.Step`.
``Implementations`` exist at a lower level than Steps. This class contains
``Implementations`` exist at a lower level than ``Steps``. This class contains
information about what container to use, what environment variables to set
inside the container, and some metadata about the container.
Parameters
----------
schema_steps
The requested Step names for which this ``Implementation`` is expected to
be responsible.
The user-requested ``Step`` names that this ``Implementation`` is
expected to implement.
implementation_config
The configuration for this ``Implementation``.
The configuration details required to run the relevant container.
input_slots
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this ``Implementation``.
All required :class:`InputSlots<easylink.graph_components.InputSlot>`.
output_slots
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this ``Implementation``.
All required :class:`OutputSlots<easylink.graph_components.OutputSlot>`.
"""

def __init__(
Expand All @@ -57,10 +57,10 @@ def __init__(
"""A mapping of environment variables to set."""
self.metadata_steps = self._metadata["steps"]
"""The names of the specific ``Steps`` that this ``Implementation``
is responsible for implementing."""
has been designed to implement."""
self.schema_steps = schema_steps
"""The *user-requested* ``Step`` names for which this ``Implementation``
is responsible to implement."""
"""The names of the specific ``Steps`` that the user has requested to be
implemented by this particular ``Implementation``."""
self.requires_spark = self._metadata.get("requires_spark", False)
"""Whether this ``Implementation`` requires a Spark environment."""

Expand Down Expand Up @@ -90,7 +90,7 @@ def validate(self) -> list[str]:
##################

def _load_metadata(self) -> dict[str, str]:
"""Loads the metadata for this ``Implementation`` instance."""
"""Loads the relevant implementation metadata."""
metadata = load_yaml(paths.IMPLEMENTATION_METADATA)
return metadata[self.name]

Expand All @@ -104,36 +104,36 @@ def _validate_expected_steps(self, logs: list[str]) -> list[str]:
return logs

def _validate_container_exists(self, logs: list[str]) -> list[str]:
"""Validates that the container for this ``Implementation`` exists."""
"""Validates that the container to run exists."""
err_str = f"Container '{self.singularity_image_path}' does not exist."
if not Path(self.singularity_image_path).exists():
logs.append(err_str)
return logs

def _get_env_vars(self, implementation_config: LayeredConfigTree) -> dict[str, str]:
"""Gets the environment variables relevant to this ``Implementation``."""
"""Gets the relevant environment variables."""
env_vars = self._metadata.get("env", {})
env_vars.update(implementation_config.get("configuration", {}))
return env_vars

@property
def singularity_image_path(self) -> str:
"""The path to the Singularity image for this ``Implementation``."""
"""The path to the required Singularity image."""
return self._metadata["image_path"]

@property
def script_cmd(self) -> str:
"""The command to run inside of the container for this ``Implementation``."""
"""The command to run inside of the container."""
return self._metadata["script_cmd"]

@property
def outputs(self) -> dict[str, list[str]]:
"""The outputs expected from this ``Implementation``."""
"""The expected output metadata."""
return self._metadata["outputs"]


class NullImplementation:
"""A partial :class:`Implementation` interface that represents that no container needs to run.
"""A partial :class:`Implementation` interface when no container is needed to run.
The primary use case for this class is when adding an
:class:`~easylink.step.IOStep` - which does not have a corresponding
Expand All @@ -146,9 +146,9 @@ class NullImplementation:
name
The name of this ``NullImplementation``.
input_slots
The ``InputSlots`` for this ``NullImplementation``.
All required ``InputSlots``.
output_slots
The ``OutputSlots`` for this ``NullImplementation``.
All required ``OutputSlots``.
"""

def __init__(
Expand All @@ -171,7 +171,7 @@ def __init__(


class PartialImplementation:
"""A representation of one part of a combined implementation that spans multiple :class:`Steps<easylink.step.Step>`.
"""One part of a combined implementation that spans multiple :class:`Steps<easylink.step.Step>`.
A ``PartialImplementation`` is what is initially added to the
:class:`~easylink.graph_components.ImplementationGraph` when a so-called
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test__load_computing_environment(test_dir, environment_file, expected):
def test_load_missing_computing_environment_fails():
with pytest.raises(
FileNotFoundError,
match="Computing environment is expected to be a path to an existing yaml file. .*",
match="Computing environment is expected to be a path to an existing specification file. .*",
):
_load_computing_environment(Path("some/bogus/path.yaml"))

Expand Down

0 comments on commit 6332f60

Please sign in to comment.