Skip to content

Commit

Permalink
clean up implementation.py
Browse files Browse the repository at this point in the history
  • Loading branch information
stevebachmeier committed Jan 7, 2025
1 parent 83cc005 commit 0db68b9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 68 deletions.
8 changes: 4 additions & 4 deletions src/easylink/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
class Config(LayeredConfigTree):
"""A container for configuration information.
A ``Config`` is a container that includes the combination of the user-provided
A ``Config`` is a container that includes the combination of the user-provided
pipeline, input data, and computing environment specifications. It is a nested
dictionary-like object that supports prioritized layers of configuration settings
as well as dot-notation access to its attributes.
Expand Down Expand Up @@ -87,8 +87,8 @@ class Config(LayeredConfigTree):
Notes
-----
The requested pipeline is checked against a set of supported
``PipelineSchemas``. The first schema that successfully validates is assumed
to be the correct one and is attached to the ``Config`` object and its
``PipelineSchemas``. The first schema that successfully validates is assumed
to be the correct one and is attached to the ``Config`` object and its
:meth:`~easylink.pipeline_schema.PipelineSchema.configure_pipeline`
method is called.
"""
Expand Down Expand Up @@ -191,7 +191,7 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
This acts as the pipeline configuration file's validation method since
we can only find a matching ``PipelineSchema`` if that file is valid.
This method returns the first ``PipelineSchema`` that successfully validates
This method returns the first ``PipelineSchema`` that successfully validates
and does not attempt to validate additional ones.
"""
errors = defaultdict(dict)
Expand Down
117 changes: 53 additions & 64 deletions src/easylink/implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@
class Implementation:
"""A representation of an actual container that will be executed for a :class:`~easylink.step.Step`.
Implementations exist at a lower level than :class:`Steps<easylink.step.Step>`.
This class contains information about what container to use, what environment
variables to set inside the container, and some metadata about the container.
``Implementations`` exist at a lower level than ``Steps``. This class contains
information about what container to use, what environment variables to set
inside the container, and some metadata about the container.
Parameters
----------
schema_steps
The requested :class:`~easylink.pipeline_schema.PipelineSchema`
:class:`~easylink.step.Step` names for which this Implementation is
expected to be responsible.
The requested ``Step`` names for which this ``Implementation`` is expected to
be responsible.
implementation_config
The configuration for this Implementation.
The configuration for this ``Implementation``.
input_slots
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this Implementation.
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this ``Implementation``.
output_slots
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this Implementation.
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this ``Implementation``.
"""

def __init__(
Expand All @@ -47,31 +46,28 @@ def __init__(
output_slots: Iterable["OutputSlot"] = (),
):
self.name = implementation_config.name
"""The name of this Implementation."""
"""The name of this ``Implementation``."""
self.input_slots = {slot.name: slot for slot in input_slots}
"""A mapping of :class:`InputSlots<easylink.graph_components.InputSlot>`
names to their instances."""
"""A mapping of ``InputSlot`` names to their instances."""
self.output_slots = {slot.name: slot for slot in output_slots}
"""A mapping of :class:`OutputSlots<easylink.graph_components.OutputSlot>`
names to their instances."""
"""A mapping of ``OutputSlot`` names to their instances."""
self._metadata = self._load_metadata()
self.environment_variables = self._get_env_vars(implementation_config)
"""A mapping of environment variables to set."""
self.metadata_steps = self._metadata["steps"]
"""The names of the specific :class:`Steps<easylink.step.Step>` for which
this Implementation is responsible."""
"""The names of the specific ``Steps`` for which this ``Implementation``
is responsible."""
self.schema_steps = schema_steps
"""The requested :class:`~easylink.pipeline_schema.PipelineSchema`
:class:`~easylink.step.Step` names for which this Implementation is
requested to be responsible in the pipeline."""
"""The requested ``Step`` names for which this ``Implementation`` is requested
to be responsible in the pipeline."""
self.requires_spark = self._metadata.get("requires_spark", False)
"""Whether this Implementation requires a Spark environment."""
"""Whether this ``Implementation`` requires a Spark environment."""

def __repr__(self) -> str:
return f"Implementation.{self.name}"

def validate(self) -> list[str]:
"""Validates individual Implementation instances.
"""Validates individual ``Implementation`` instances.
Returns
-------
Expand All @@ -93,12 +89,12 @@ def validate(self) -> list[str]:
##################

def _load_metadata(self) -> dict[str, str]:
"""Loads the metadata for this Implementation instance."""
"""Loads the metadata for this ``Implementation`` instance."""
metadata = load_yaml(paths.IMPLEMENTATION_METADATA)
return metadata[self.name]

def _validate_expected_steps(self, logs: list[str]) -> list[str]:
"""Validates that the Implementation is responsible for the correct steps."""
"""Validates that the ``Implementation`` is responsible for the correct steps."""
if not set(self.schema_steps) == set(self.metadata_steps):
logs.append(
f"Pipeline configuration nodes {self.schema_steps} do not match "
Expand All @@ -107,51 +103,51 @@ def _validate_expected_steps(self, logs: list[str]) -> list[str]:
return logs

def _validate_container_exists(self, logs: list[str]) -> list[str]:
"""Validates that the container for this Implementation exists."""
"""Validates that the container for this ``Implementation`` exists."""
err_str = f"Container '{self.singularity_image_path}' does not exist."
if not Path(self.singularity_image_path).exists():
logs.append(err_str)
return logs

def _get_env_vars(self, implementation_config: LayeredConfigTree) -> dict[str, str]:
"""Gets the environment variables relevant to this Implementation."""
"""Gets the environment variables relevant to this ``Implementation``."""
env_vars = self._metadata.get("env", {})
env_vars.update(implementation_config.get("configuration", {}))
return env_vars

@property
def singularity_image_path(self) -> str:
"""The path to the Singularity image for this Implementation."""
"""The path to the Singularity image for this ``Implementation``."""
return self._metadata["image_path"]

@property
def script_cmd(self) -> str:
"""The command to run inside of the container for this Implementation."""
"""The command to run inside of the container for this ``Implementation``."""
return self._metadata["script_cmd"]

@property
def outputs(self) -> dict[str, list[str]]:
"""The outputs expected from this Implementation."""
"""The outputs expected from this ``Implementation``."""
return self._metadata["outputs"]


class NullImplementation:
"""An object with a partial :class:`Implementation` interface that represents that no container needs to run.
The primary use case for this class is when adding an :class:`~easylink.step.IOStep` -
which does not have a corresponding :class:`Implementation` - to an
:class:`~easylink.graph_components.ImplementationGraph` since adding any new
node requires an object with :class:`~easylink.graph_components.InputSlot`
The primary use case for this class is when adding an
:class:`~easylink.step.IOStep` - which does not have a corresponding
``Implementation`` - to an :class:`~easylink.graph_components.ImplementationGraph`
since adding any new node requires an object with :class:`~easylink.graph_components.InputSlot`
and :class:`~easylink.graph_components.OutputSlot` names.
Parameters
----------
name
The name of this NullImplementation.
The name of this ``NullImplementation``.
input_slots
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this NullImplementation.
The ``InputSlots`` for this ``NullImplementation``.
output_slots
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this NullImplementation.
The ``OutputSlots`` for this ``NullImplementation``.
"""

def __init__(
Expand All @@ -161,49 +157,45 @@ def __init__(
output_slots: Iterable["OutputSlot"] = (),
):
self.name = name
"""The name of this NullImplementation."""
"""The name of this ``NullImplementation``."""
self.input_slots = {slot.name: slot for slot in input_slots}
"""A mapping of :class:`InputSlots<easylink.graph_components.InputSlot>`
names to their instances."""
"""A mapping of ``InputSlot`` names to their instances."""
self.output_slots = {slot.name: slot for slot in output_slots}
"""A mapping of :class:`OutputSlots<easylink.graph_components.OutputSlot>`
names to their instances."""
"""A mapping of ``OutputSlot`` names to their instances."""
self.schema_steps = [self.name]
"""The requested :class:`~easylink.pipeline_schema.PipelineSchema`
:class:`~easylink.step.Step` names for which this NullImplementation is
expected to be responsible."""
"""The requested :class:`~easylink.step.Step` names for which this ``NullImplementation``
is expected to be responsible."""
self.combined_name = None
"""The name of the combined implementation that this NullImplementation
is part of. This is definitionally None for a NullImplementation."""
"""The name of the combined implementation that this ``NullImplementation``
is part of. This is definitionally ``None`` for a ``NullImplementation``."""


class PartialImplementation:
"""A representation of one part of a combined implementation that spans multiple :class:`Steps<easylink.step.Step>`.
A PartialImplementation is what is initially added to the :class:`~easylink.graph_components.ImplementationGraph`
when a so-called "combined implementation" is used (i.e. an :class:`Implementation`
that spans multiple :class:`Steps<easylink.step.Step>`).
We initially add a node for _each_ :class:`~easylink.step.Step`, which has as
its ``implementation`` attribute a PartialImplementation. Such a graph is not
A ``PartialImplementation`` is what is initially added to the
:class:`~easylink.graph_components.ImplementationGraph` when a so-called
"combined implementation" is used (i.e. an :class:`Implementation` that spans
multiple ``Steps``). We initially add a node for each ``Step``, which has as
its ``implementation`` attribute a ``PartialImplementation``. Such a graph is not
yet fit to run. When we make our second pass through, after the flat (non-hierarchical)
:class:`~easylink.pipeline_graph.PipelineGraph` has been created, we find the
set of PartialImplementation nodes corresponding to each combined implementation
and replace them with a single node with a true :class:`Implementation` representing
set of ``PartialImplementation`` nodes corresponding to each combined implementation
and replace them with a single node with a true ``Implementation`` representing
the combined implementation.
Parameters
----------
combined_name
The name of the combined implementation that this PartialImplementation
The name of the combined implementation that this ``PartialImplementation``
is part of.
schema_step
The requested :class:`~easylink.pipeline_schema.PipelineSchema`
:class:`~easylink.step.Step` name for which this PartialImplementation is
The requested ``Step`` name for which this ``PartialImplementation`` is
expected to be responsible.
input_slots
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this PartialImplementation.
The :class:`InputSlots<easylink.graph_components.InputSlot>` for this ``PartialImplementation``.
output_slots
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this PartialImplementation.
The :class:`OutputSlots<easylink.graph_components.OutputSlot>` for this ``PartialImplementation``.
"""

Expand All @@ -215,15 +207,12 @@ def __init__(
output_slots: Iterable["OutputSlot"] = (),
):
self.combined_name = combined_name
"""The name of the combined implementation that this PartialImplementation
"""The name of the combined implementation that this ``PartialImplementation``
is part of."""
self.schema_step = schema_step
"""The requested :class:`~easylink.pipeline_schema.PipelineSchema`
:class:`~easylink.step.Step` name for which this PartialImplementation is
"""The requested ``Step`` name for which this ``PartialImplementation`` is
expected to be responsible."""
self.input_slots = {slot.name: slot for slot in input_slots}
"""A mapping of :class:`InputSlots<easylink.graph_components.InputSlot>`
names to their instances."""
"""A mapping of ``InputSlot`` names to their instances."""
self.output_slots = {slot.name: slot for slot in output_slots}
"""A mapping of :class:`OutputSlots<easylink.graph_components.OutputSlot>`
names to their instances."""
"""A mapping of ``OutputSlot`` names to their instances."""

0 comments on commit 0db68b9

Please sign in to comment.