diff --git a/src/easylink/configuration.py b/src/easylink/configuration.py
index c95e14ad..d52d742d 100644
--- a/src/easylink/configuration.py
+++ b/src/easylink/configuration.py
@@ -174,7 +174,7 @@ def spark_resources(self) -> dict[str, Any]:
     #################

     def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema:
-        """Returns the first ``PipelineSchema`` that successfully validates the requested pipeline.
+        """Returns the first :class:`~easylink.pipeline_schema.PipelineSchema` that validates the requested pipeline.

         Parameters
         ----------
@@ -183,17 +183,17 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema

         Returns
         -------
-            The first pipeline schema that successfully validates the requested pipeline.
-            If no validated pipeline schema is found, `exit()` is called with `errno.EINVAL`
-            and any validation errors are logged.
+            The first ``PipelineSchema`` that validates the requested pipeline.
+            If no validated ``PipelineSchema`` is found, `exit()` is called with
+            `errno.EINVAL` and any validation errors are logged.

         Notes
         -----
         This acts as the pipeline configuration file's validation method since
         we can only find a matching ``PipelineSchema`` if that file is valid.

-        This method returns the first ``PipelineSchema`` that successfully validates
-        and does not attempt to validate additional ones.
+        This method returns the *first* ``PipelineSchema`` that validates and does
+        not attempt to check additional ones.
         """
         errors = defaultdict(dict)
         # Try each schema until one is validated
@@ -283,11 +283,11 @@ def load_params_from_specification(
     Parameters
     ----------
     pipeline_specification
-        The path to the pipeline specification yaml file.
+        The path to the pipeline specification file.
     input_data
-        The path to the input data yaml file.
+        The path to the input data file.
     computing_environment
-        The path to the computing environment yaml file.
+        The path to the computing environment file.
     results_dir
         The path to the results directory.

@@ -306,7 +306,7 @@ def load_params_from_specification(
 def _load_input_data_paths(
     input_data_specification_path: str | Path,
 ) -> dict[str, list[Path]]:
-    """Creates a dictionary of input data paths from the input data yaml file."""
+    """Creates a dictionary of input data paths from the input data specification file."""
     input_data_paths = load_yaml(input_data_specification_path)
     if not isinstance(input_data_paths, dict):
         raise TypeError(
@@ -322,13 +322,13 @@ def _load_input_data_paths(
 def _load_computing_environment(
     computing_environment_specification_path: str | None,
 ) -> dict[Any, Any]:
-    """Loads the computing environment yaml file and returns the contents as a dict."""
+    """Loads the computing environment specification file and returns the contents as a dict."""
     if not computing_environment_specification_path:
         return {}  # handles empty environment.yaml
     elif not Path(computing_environment_specification_path).is_file():
         raise FileNotFoundError(
             "Computing environment is expected to be a path to an existing"
-            f" yaml file. Input was: '{computing_environment_specification_path}'"
+            f" specification file. Input was: '{computing_environment_specification_path}'"
         )
     else:
         return load_yaml(computing_environment_specification_path)
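
For orientation, a minimal sketch of calling the entry point documented above, assuming
its signature matches the parameters listed in its docstring; all file paths here are
hypothetical placeholders, not files shipped with the repository::

    from pathlib import Path

    from easylink.configuration import load_params_from_specification

    # Hypothetical specification files; each must be an existing file on disk
    # (the computing environment loader raises FileNotFoundError otherwise).
    params = load_params_from_specification(
        pipeline_specification="pipeline.yaml",
        input_data="input_data.yaml",
        computing_environment="environment.yaml",
        results_dir=Path("results"),
    )
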
diff --git a/src/easylink/graph_components.py b/src/easylink/graph_components.py
index 8aa0c019..7093856c 100644
--- a/src/easylink/graph_components.py
+++ b/src/easylink/graph_components.py
@@ -24,7 +24,7 @@

 @dataclass(frozen=True)
 class InputSlot:
-    """An abstraction representing a single input slot to a specific node.
+    """A single input slot to a specific node.

     ``InputSlots`` represent distinct semantic categories of input files, between
     which a node must be able to differentiate. In order to pass data between nodes,
@@ -39,20 +39,24 @@ class InputSlot:
     name: str
     """The name of the ``InputSlot``."""
     env_var: str | None
-    """The environment variable that this ``InputSlot`` will use to pass a list
-    of data filepaths to an ``Implementation``."""
+    """The environment variable that is used to pass a list of data filepaths to
+    an ``Implementation``."""
     validator: Callable[[str], None]
-    """A callable that validates the input data being passed into the pipeline via
-    this ``InputSlot``. If the data is invalid, the callable should raise an exception
-    with a descriptive error message which will then be reported to the user."""
+    """A function that validates the input data being passed into the pipeline via
+    this ``InputSlot``. If the data is invalid, the function should raise an exception
+    with a descriptive error message which will then be reported to the user.
+    Note that the function **must** be defined in the :mod:`easylink.utilities.validation_utils`
+    module!"""


 @dataclass(frozen=True)
 class OutputSlot:
-    """An abstraction representing a single output slot from a specific node.
+    """A single output slot from a specific node.

-    In order to pass data between nodes, an ``OutputSlot`` of one node can be connected
-    to an :class:`InputSlot` of another node via an :class:`EdgeParams` instance.
+    ``OutputSlots`` represent distinct semantic categories of output files, between
+    which a node must be able to differentiate. In order to pass data between nodes,
+    an ``OutputSlot`` of one node can be connected to an :class:`InputSlot` of another
+    node via an :class:`EdgeParams` instance.

     Notes
     -----
@@ -70,9 +74,9 @@ class OutputSlot:
 @dataclass(frozen=True)
 class EdgeParams:
-    """A representation of an edge between two nodes in a graph.
+    """The details of an edge between two nodes in a graph.

-    EdgeParams connect the :class:`OutputSlot` of a source node to the :class:`InputSlot`
+    ``EdgeParams`` connect the :class:`OutputSlot` of a source node to the :class:`InputSlot`
     of a target node.

     Notes
     -----
@@ -81,15 +85,15 @@ class EdgeParams:
     source_node: str
-    """The name of the source node/``Step``."""
+    """The name of the source node."""
     target_node: str
-    """The name of the target node/``Step``."""
+    """The name of the target node."""
     output_slot: str
-    """The name of the ``OutputSlot`` of the source node/``Step``."""
+    """The name of the source node's ``OutputSlot``."""
     input_slot: str
-    """The name of the ``InputSlot`` of the target node/``Step``."""
+    """The name of the target node's ``InputSlot``."""
     filepaths: tuple[str] | None = None
-    """The filepaths that are passed from the source node/``Step`` to the target node/``Step``."""
+    """The filepaths that are passed from the source node to the target node."""

     @classmethod
     def from_graph_edge(
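
A rough sketch of how these dataclasses fit together; the slot, node, and environment
variable names are made up for illustration, and a real validator must live in
:mod:`easylink.utilities.validation_utils` as the ``InputSlot`` docstring above notes::

    from easylink.graph_components import EdgeParams, InputSlot, OutputSlot

    def validate_blocks_file(filepath: str) -> None:
        # Illustrative stand-in only; per the InputSlot docstring, real validators
        # are defined in easylink.utilities.validation_utils.
        if not filepath.endswith(".parquet"):
            raise ValueError(f"Expected a parquet file, got '{filepath}'")

    blocks_in = InputSlot(
        name="blocks",
        env_var="BLOCKS_FILE_PATHS",  # hypothetical environment variable name
        validator=validate_blocks_file,
    )
    blocks_out = OutputSlot(name="blocks")

    # Connect one node's OutputSlot to another node's InputSlot by name.
    edge = EdgeParams(
        source_node="blocking",
        target_node="linking",
        output_slot=blocks_out.name,
        input_slot=blocks_in.name,
    )
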
@@ -103,12 +107,13 @@ def from_graph_edge(
         Parameters
         ----------
         source
-            The name of the source node/``Step``.
+            The name of the source node.
         sink
-            The name of the target node/``Step``.
+            The name of the target node.
         edge_attrs
-            The attributes of the edge connecting the source and target nodes/``Steps``.
-            'output_slot' and 'input_slot' are required keys and 'filepaths' is optional.
+            The attributes of the edge connecting the source and target nodes.
+            'output_slot' and 'input_slot' are required keys while 'filepaths' is
+            optional.
         """
         return cls(
             source,
@@ -120,10 +125,11 @@


 class StepGraph(nx.MultiDiGraph):
-    """A directed acyclic graph (DAG) of :class:`Steps` and the data dependencies between them.
+    """A directed acyclic graph (DAG) of :class:`Steps`.

-    ``StepGraphs`` are DAGs with ``Steps`` for nodes and the file dependencies between
-    them for edges. Multiple edges between nodes are permitted.
+    ``StepGraphs`` are DAGs with ``Step`` names for nodes and their corresponding
+    ``Step`` instances as attributes on those nodes. The file dependencies between
+    nodes are the graph edges; multiple edges between nodes are permitted.

     Notes
     -----
@@ -135,13 +141,13 @@ class StepGraph(nx.MultiDiGraph):

     @property
     def step_nodes(self) -> list[str]:
-        """The topologically sorted list of node/``Step`` names."""
+        """The topologically sorted list of ``Step`` names."""
         ordered_nodes = list(nx.topological_sort(self))
         return [node for node in ordered_nodes if node != "input_data" and node != "results"]

     @property
     def steps(self) -> list[Step]:
-        """The list of all ``Steps`` in the graph."""
+        """The topologically sorted list of all ``Steps`` in the graph."""
         return [self.nodes[node]["step"] for node in self.step_nodes]

     def add_node_from_step(self, step: Step) -> None:
@@ -196,13 +202,13 @@ class ImplementationGraph(nx.MultiDiGraph):

     @property
     def implementation_nodes(self) -> list[str]:
-        """The topologically sorted list of node/``Implementation`` names."""
+        """The topologically sorted list of ``Implementation`` names."""
         ordered_nodes = list(nx.topological_sort(self))
         return [node for node in ordered_nodes if node != "input_data" and node != "results"]

     @property
     def implementations(self) -> list[Implementation]:
-        """The list of all ``Implementations`` in the graph."""
+        """The topologically sorted list of all ``Implementations`` in the graph."""
         return [self.nodes[node]["implementation"] for node in self.implementation_nodes]

     def add_node_from_implementation(self, node_name, implementation: Implementation) -> None:
@@ -240,29 +246,33 @@ def add_edge_from_params(self, edge_params: EdgeParams) -> None:

 @dataclass(frozen=True)
 class SlotMapping(ABC):
-    """A mapping between a slot on a parent :class:`~easylink.step.Step` and a slot on one of its child ``Steps``.
+    """A mapping between a slot on a parent node and a slot on one of its child nodes.

     ``SlotMapping`` is an interface intended to be used by concrete :class:`InputSlotMapping`
     and :class:`OutputSlotMapping` classes. It represents a mapping between
-    parent and child nodes/``Steps`` at different levels of a potentially-nested
+    parent and child nodes at different levels of a potentially-nested
     :class:`~easylink.pipeline_schema.PipelineSchema`.
+
+    Notes
+    -----
+    Nodes can be either :class:`Steps` or :class:`Implementations`.
     """

     parent_slot: str
     """The name of the parent slot."""
     child_node: str
-    """The name of the child node/``Step``."""
+    """The name of the child node."""
     child_slot: str
     """The name of the child slot."""

     @abstractmethod
     def remap_edge(self, edge: EdgeParams) -> EdgeParams:
-        """Remaps an edge to connect the parent and child nodes/``Steps``."""
+        """Remaps an edge to connect the parent and child nodes."""
         pass


 class InputSlotMapping(SlotMapping):
-    """A mapping between :class:`InputSlots` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""
+    """A mapping between :class:`InputSlots` of a parent node and a child node."""

     def remap_edge(self, edge: EdgeParams) -> EdgeParams:
         """Remaps an edge's ``InputSlot``.
@@ -293,7 +303,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams:


 class OutputSlotMapping(SlotMapping):
-    """A mapping between :class:`InputSlots` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""
+    """A mapping between :class:`OutputSlots` of a parent node and a child node."""

     def remap_edge(self, edge: EdgeParams) -> EdgeParams:
         """Remaps an edge's :class:`OutputSlot`.
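
The slot mappings above are easiest to see with a small example. This sketch assumes,
based on the field names and the ``remap_edge`` docstrings, that an ``InputSlotMapping``
rewrites the target side of an edge and leaves the source side untouched; the node and
slot names are hypothetical::

    from easylink.graph_components import EdgeParams, InputSlotMapping

    # A parent node "step_1" delegates its "main_input" slot to a child "step_1a".
    mapping = InputSlotMapping(
        parent_slot="main_input",
        child_node="step_1a",
        child_slot="step_1a_main_input",
    )

    edge = EdgeParams(
        source_node="input_data",
        target_node="step_1",
        output_slot="all",
        input_slot="main_input",
    )

    remapped = mapping.remap_edge(edge)
    # Under the assumption above, `remapped` now targets "step_1a" via
    # "step_1a_main_input", while source_node and output_slot are unchanged.
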
""" parent_slot: str """The name of the parent slot.""" child_node: str - """The name of the child node/``Step``.""" + """The name of the child node.""" child_slot: str """The name of the child slot.""" @abstractmethod def remap_edge(self, edge: EdgeParams) -> EdgeParams: - """Remaps an edge to connect the parent and child nodes/``Steps``.""" + """Remaps an edge to connect the parent and child nodes.""" pass class InputSlotMapping(SlotMapping): - """A mapping between :class:`InputSlots` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``.""" + """A mapping between :class:`InputSlots` of a parent node and a child node.""" def remap_edge(self, edge: EdgeParams) -> EdgeParams: """Remaps an edge's ``InputSlot``. @@ -293,7 +303,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams: class OutputSlotMapping(SlotMapping): - """A mapping between :class:`InputSlots` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``.""" + """A mapping between :class:`InputSlots` of a parent node and a child node.""" def remap_edge(self, edge: EdgeParams) -> EdgeParams: """Remaps an edge's :class:`OutputSlot`. diff --git a/src/easylink/implementation.py b/src/easylink/implementation.py index a50ad653..d05e8b5d 100644 --- a/src/easylink/implementation.py +++ b/src/easylink/implementation.py @@ -22,21 +22,21 @@ class Implementation: """A representation of an actual container that will be executed for a :class:`~easylink.step.Step`. - ``Implementations`` exist at a lower level than Steps. This class contains + ``Implementations`` exist at a lower level than ``Steps``. This class contains information about what container to use, what environment variables to set inside the container, and some metadata about the container. Parameters ---------- schema_steps - The requested Step names for which this ``Implementation`` is expected to - be responsible. + The user-requested ``Step`` names for which this ``Implementation`` is + expected to implement. implementation_config - The configuration for this ``Implementation``. + The configuration details required to run the relevant container. input_slots - The :class:`InputSlots` for this ``Implementation``. + All required :class:`InputSlots`. output_slots - The :class:`OutputSlots` for this ``Implementation``. + All required :class:`OutputSlots`. 
""" def __init__( @@ -57,10 +57,10 @@ def __init__( """A mapping of environment variables to set.""" self.metadata_steps = self._metadata["steps"] """The names of the specific ``Steps`` for which this ``Implementation`` - is responsible to implement.""" + has been designed to implement.""" self.schema_steps = schema_steps - """The *user-requested* ``Step`` names for which this ``Implementation`` - is responsible to implement.""" + """The names of the specific ``Steps`` that the user has requested to be + implemented by this particular ``Implementation``.""" self.requires_spark = self._metadata.get("requires_spark", False) """Whether this ``Implementation`` requires a Spark environment.""" @@ -90,7 +90,7 @@ def validate(self) -> list[str]: ################## def _load_metadata(self) -> dict[str, str]: - """Loads the metadata for this ``Implementation`` instance.""" + """Loads the relevant implementation metadata.""" metadata = load_yaml(paths.IMPLEMENTATION_METADATA) return metadata[self.name] @@ -104,36 +104,36 @@ def _validate_expected_steps(self, logs: list[str]) -> list[str]: return logs def _validate_container_exists(self, logs: list[str]) -> list[str]: - """Validates that the container for this ``Implementation`` exists.""" + """Validates that the container to run exists.""" err_str = f"Container '{self.singularity_image_path}' does not exist." if not Path(self.singularity_image_path).exists(): logs.append(err_str) return logs def _get_env_vars(self, implementation_config: LayeredConfigTree) -> dict[str, str]: - """Gets the environment variables relevant to this ``Implementation``.""" + """Gets the relevant environment variables.""" env_vars = self._metadata.get("env", {}) env_vars.update(implementation_config.get("configuration", {})) return env_vars @property def singularity_image_path(self) -> str: - """The path to the Singularity image for this ``Implementation``.""" + """The path to the required Singularity image.""" return self._metadata["image_path"] @property def script_cmd(self) -> str: - """The command to run inside of the container for this ``Implementation``.""" + """The command to run inside of the container.""" return self._metadata["script_cmd"] @property def outputs(self) -> dict[str, list[str]]: - """The outputs expected from this ``Implementation``.""" + """The expected output metadata.""" return self._metadata["outputs"] class NullImplementation: - """A partial :class:`Implementation` interface that represents that no container needs to run. + """A partial :class:`Implementation` interface when no container is needed to run. The primary use case for this class is when adding an :class:`~easylink.step.IOStep` - which does not have a corresponding @@ -146,9 +146,9 @@ class NullImplementation: name The name of this ``NullImplementation``. input_slots - The ``InputSlots`` for this ``NullImplementation``. + All required ``InputSlots``. output_slots - The ``OutputSlots`` for this ``NullImplementation``. + All required ``OutputSlots``. """ def __init__( @@ -171,7 +171,7 @@ def __init__( class PartialImplementation: - """A representation of one part of a combined implementation that spans multiple :class:`Steps`. + """One part of a combined implementation that spans multiple :class:`Steps`. 
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index d90eb2aa..2e01d55d 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -78,7 +78,7 @@ def test__load_computing_environment(test_dir, environment_file, expected):
 def test_load_missing_computing_environment_fails():
     with pytest.raises(
         FileNotFoundError,
-        match="Computing environment is expected to be a path to an existing yaml file. .*",
+        match="Computing environment is expected to be a path to an existing specification file. .*",
     ):
         _load_computing_environment(Path("some/bogus/path.yaml"))