Skip to content

Commit

Permalink
Sbachmei/mic 5561/improve docstrings cli configuration (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevebachmeier authored Jan 2, 2025
1 parent 1261e53 commit 6bef110
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 54 deletions.
1 change: 0 additions & 1 deletion build-doc

This file was deleted.

11 changes: 10 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
"numpy": ("https://numpy.org/doc/stable/", None),
"networkx": ("https://networkx.org/documentation/stable/", None),
"layered_config_tree": ("https://layered-config-tree.readthedocs.io/en/latest/", None),
"loguru": ("https://loguru.readthedocs.io/en/stable/", None),
}


Expand All @@ -213,10 +214,18 @@
# Generate docs even if an item has no docstring.
"undoc-members": True,
# Don't document things with a leading underscore.
"private-members": False,
"private-members": True,
# Include special members.
"special-members": "__init__",
# Show class inheritance.
"show-inheritance": True,
}
# Display type hints in the description instead of the signature.
autodoc_typehints = "description"
# Mock problematic imports
autodoc_mock_imports = [
"networkx",
]


# -- nitpicky mode --------------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions docs/source/user_guide/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
Command Line Interface
======================

.. automodule:: easylink.cli

.. click:: easylink.cli:easylink
:prog: easylink
:show-nested:
:nested: full
:commands: run, generate-dag
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
"pytest-mock",
]
doc_requirements = [
"sphinx>=4.0,<8.0.0",
"sphinx-rtd-theme>=0.6",
"sphinx",
"sphinx-rtd-theme",
"sphinx-autodoc-typehints",
"sphinx-click",
"typing_extensions",
]
lint_requirements = [
"black==22.3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/easylink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
EasyLink
========
Research repository for the EasyLink ER ecosystem project.
Research repository for the EasyLink entity resolution (ER) ecosystem project.
"""

Expand Down
34 changes: 21 additions & 13 deletions src/easylink/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
),
),
click.option(
"--timestamp/--no-timestamp",
default=True,
show_default=True,
help="Save the results in a timestamped sub-directory of --output-dir.",
"--no-timestamp",
is_flag=True,
default=False,
help="Do not save the results in a timestamped sub-directory of ``--output-dir``.",
),
]

Expand All @@ -66,9 +66,8 @@ def easylink():
show_default=True,
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
help=(
"Path to the specification yaml defining the computing environment to "
"run the pipeline on. If no value is passed, the pipeline will be run "
"locally."
"Path to the computing environment specification yaml. If no value is passed, "
"the pipeline will be run locally."
),
)
@click.option("-v", "--verbose", count=True, help="Increase logging verbosity.", hidden=True)
Expand All @@ -83,15 +82,20 @@ def run(
pipeline_specification: str,
input_data: str,
output_dir: str | None,
timestamp: bool,
no_timestamp: bool,
computing_environment: str | None,
verbose: int,
with_debugger: bool,
) -> None:
"""Run a pipeline from the command line."""
"""Runs a pipeline from the command line.
In addition to running the pipeline, this command will also generate the
DAG image. If you only want to generate the image without actually running
the pipeline, use the ``easylink generate-dag`` command.
"""
configure_logging_to_terminal(verbose)
logger.info("Running pipeline")
results_dir = get_results_directory(output_dir, timestamp).as_posix()
results_dir = get_results_directory(output_dir, no_timestamp).as_posix()
logger.info(f"Results directory: {results_dir}")
# TODO [MIC-4493]: Add configuration validation

Expand All @@ -114,11 +118,15 @@ def generate_dag(
pipeline_specification: str,
input_data: str,
output_dir: str | None,
timestamp: bool,
no_timestamp: bool,
) -> None:
"""Generate an image of the proposed pipeline DAG."""
"""Generates an image of the proposed pipeline DAG.
This command only generates the DAG image of the pipeline; it does not actually
run it. To run the pipeline, use the ``easylink run`` command.
"""
logger.info("Generating DAG")
results_dir = get_results_directory(output_dir, timestamp).as_posix()
results_dir = get_results_directory(output_dir, no_timestamp).as_posix()
logger.info(f"Results directory: {results_dir}")
# TODO [MIC-4493]: Add configuration validation
runner.main(
Expand Down
142 changes: 123 additions & 19 deletions src/easylink/configuration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""
=============
Configuration
=============
The configuration module is responsible for managing an easylink run's configuration
and includes the ``Config`` container class as well as various configuration-related
utility functions.
"""

from collections import defaultdict
from pathlib import Path
from typing import Any
Expand All @@ -23,6 +34,8 @@
},
}
}
"""The default environment configuration settings."""

SPARK_DEFAULTS = {
"workers": {
"num_workers": 2,
Expand All @@ -32,24 +45,64 @@
},
"keep_alive": False,
}
"""The default spark configuration settings."""

# Allow some buffer so that slurm doesn't kill spark workers
SLURM_SPARK_MEM_BUFFER = 500
SLURM_SPARK_MEM_BUFFER = 500 # MB


class Config(LayeredConfigTree):
"""A container for configuration information.
This class combines the pipeline, input data, and computing environment
specifications into a single LayeredConfigTree object. It is also responsible
for validating these specifications.
A ``Config`` (which inherits from :class:`~layered_config_tree.LayeredConfigTree`)
is a container that includes the combination of the user-provided pipeline,
input data, and computing environment specifications. It is a nested
dictionary-like object that supports prioritized layers of configuration settings
as well as dot-notation access to its attributes.
The ``Config`` is also reponsible for various validation checks on the provided
specifications. If any of these are invalid, a validation error is raised with
as much information as can possibly be provided.
Attributes
----------
environment
The environment configuration, including computing environment,
container engine, implementation resources, and slurm- and spark-specific
requests.
pipeline
The pipeline configuration.
input_data
The input data filepaths.
schema
The :class:`~easylink.pipeline_schema.PipelineSchema` that successfully
validated the requested pipeline.
Notes
-----
The requested pipeline is checked against a set of supported
:class:`pipeline schemas <easylink.pipeline_schema.PipelineSchema>`. The first
schema that successfully validates is assumed to be the correct one and is attached
to the ``Config`` object and its :meth:`~easylink.pipeline_schema.PipelineSchema.configure_pipeline`
method is called.
"""

def __init__(
self,
config_params: dict[str, Any],
potential_schemas: list[PipelineSchema] | PipelineSchema = PIPELINE_SCHEMAS,
):
) -> None:
"""Initializes an instance of the ``Config`` class.
Parameters
----------
config_params
A dictionary of all specifications required to run the pipeline. This
includes the pipeline, input data, and computing environment specifications,
as well as the results directory.
potential_schemas
A list of potential schemas to validate the pipeline configuration against.
This is primarily used for testing purposes. Defaults to the supported schemas.
"""
super().__init__(layers=["initial_data", "default", "user_configured"])
self.update(DEFAULT_ENVIRONMENT, layer="default")
self.update(config_params, layer="user_configured")
Expand All @@ -67,26 +120,26 @@ def __init__(
self.freeze()

@property
def computing_environment(self) -> dict[str, Any]:
"""The computing environment to run on (generally either 'local' or 'slurm')."""
def computing_environment(self) -> str:
"""The computing environment to run on ('local' or 'slurm')."""
return self.environment.computing_environment

@property
def slurm(self) -> dict[str, Any]:
"""A dictionary of slurm configuration settings."""
"""A dictionary of slurm-specific configuration settings."""
if not self.environment.computing_environment == "slurm":
return {}
else:
return self.environment.slurm.to_dict()

@property
def spark(self) -> dict[str, Any]:
"""A dictionary of spark configuration settings."""
"""A dictionary of spark-specific configuration settings."""
return self.environment.spark.to_dict()

@property
def slurm_resources(self) -> dict[str, str]:
"""A flat dictionary of the slurm resources."""
"""A flat dictionary of slurm resource requests."""
if not self.computing_environment == "slurm":
return {}
raw_slurm_resources = {
Expand All @@ -103,7 +156,7 @@ def slurm_resources(self) -> dict[str, str]:

@property
def spark_resources(self) -> dict[str, Any]:
"""A flat dictionary of the spark resources."""
"""A flat dictionary of spark resource requests."""
spark_workers_raw = self.spark["workers"]
spark_workers = {
"num_workers": spark_workers_raw.get("num_workers"),
Expand All @@ -125,14 +178,26 @@ def spark_resources(self) -> dict[str, Any]:
#################

def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema:
"""Validates the requested pipeline against supported schemas.
"""Returns the first pipeline schema that successfully validates the requested pipeline.
Parameters
----------
potential_schemas
Pipeline schemas to validate the pipeline configuration against.
Returns
-------
The first pipeline schema that successfully validates the requested pipeline.
If no validated pipeline schema is found, `exit()` is called with `errno.EINVAL`
and any validation errors are logged.
Notes
-----
This acts as the pipeline configuration file's validation method since
we can only find a matching schema if the file is valid.
we can only find a matching schema if that file is valid.
We use the first schema that validates the pipeline configuration.
This method returns the first schema that successfully validates and does
not attempt to validate additional ones.
"""
errors = defaultdict(dict)
# Try each schema until one is validated
Expand All @@ -147,6 +212,18 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
exit_with_validation_error(dict(errors))

def _validate(self) -> None:
"""Validates the ``Config``.
Raises
------
SystemExit
If any errors are found, they are batch-logged into a dictionary and
the program exits with a non-zero code.
Notes
-----
Pipeline validations are handled in :meth:`~easylink.configuration.Config._get_schema`.
"""
# TODO [MIC-4880]: refactor into validation object
errors = {
# NOTE: pipeline configuration validation happens in '_get_schema()'
Expand All @@ -157,6 +234,12 @@ def _validate(self) -> None:
exit_with_validation_error(errors)

def _validate_input_data(self) -> dict[Any, Any]:
"""Validates the input data configuration.
Returns
-------
A dictionary of input data configuration validation errors.
"""
errors = defaultdict(dict)
input_data_dict = self.input_data.to_dict()
if not input_data_dict:
Expand All @@ -168,6 +251,12 @@ def _validate_input_data(self) -> dict[Any, Any]:
return errors

def _validate_environment(self) -> dict[Any, Any]:
"""Validates the environment configuration.
Returns
-------
A dictionary of environment configuration validation errors.
"""
errors = defaultdict(dict)
if not self.environment.container_engine in ["docker", "singularity", "undefined"]:
errors[ENVIRONMENT_ERRORS_KEY]["container_engine"] = [
Expand All @@ -189,11 +278,26 @@ def load_params_from_specification(
computing_environment: str | None,
results_dir: str,
) -> dict[str, Any]:
"""Gather together all specification data.
"""Gathers together all specification data.
This gathers the pipeline, input data, and computing environment specifications
as well as the results directory into a single dictionary for insertion into
the Config object.
the ``Config`` object.
Parameters
----------
pipeline_specification
The path to the pipeline specification yaml file.
input_data
The path to the input data yaml file.
computing_environment
The path to the computing environment yaml file.
results_dir
The path to the results directory.
Returns
-------
A dictionary of all provided specification data.
"""
return {
"pipeline": load_yaml(pipeline_specification),
Expand All @@ -206,7 +310,7 @@ def load_params_from_specification(
def _load_input_data_paths(
input_data_specification_path: str | Path,
) -> dict[str, list[Path]]:
"""Create dictionary of input data paths from the input data yaml file."""
"""Creates a dictionary of input data paths from the input data yaml file."""
input_data_paths = load_yaml(input_data_specification_path)
if not isinstance(input_data_paths, dict):
raise TypeError(
Expand All @@ -222,7 +326,7 @@ def _load_input_data_paths(
def _load_computing_environment(
computing_environment_specification_path: str | None,
) -> dict[Any, Any]:
"""Load the computing environment yaml file and return the contents as a dict."""
"""Loads the computing environment yaml file and returns the contents as a dict."""
if not computing_environment_specification_path:
return {} # handles empty environment.yaml
elif not Path(computing_environment_specification_path).is_file():
Expand Down
Loading

0 comments on commit 6bef110

Please sign in to comment.