diff --git a/build-doc b/build-doc
deleted file mode 100644
index 9f269077..00000000
--- a/build-doc
+++ /dev/null
@@ -1 +0,0 @@
-Ignore, Created by Makefile, Fri 27 Dec 2024 12:52:35 PM PST
diff --git a/docs/source/conf.py b/docs/source/conf.py
index b25561be..9e1de210 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -198,6 +198,7 @@
     "numpy": ("https://numpy.org/doc/stable/", None),
     "networkx": ("https://networkx.org/documentation/stable/", None),
     "layered_config_tree": ("https://layered-config-tree.readthedocs.io/en/latest/", None),
+    "loguru": ("https://loguru.readthedocs.io/en/stable/", None),
 }
 
 
@@ -213,10 +214,18 @@
     # Generate docs even if an item has no docstring.
     "undoc-members": True,
-    # Don't document things with a leading underscore.
-    "private-members": False,
+    # Document members with a leading underscore.
+    "private-members": True,
+    # Include special members.
+    "special-members": "__init__",
+    # Show class inheritance.
+    "show-inheritance": True,
 }
 
 # Display type hints in the description instead of the signature.
 autodoc_typehints = "description"
+# Mock problematic imports
+autodoc_mock_imports = [
+    "networkx",
+]
 
 
 # -- nitpicky mode --------------------------------------------------------
diff --git a/docs/source/user_guide/cli.rst b/docs/source/user_guide/cli.rst
index a8057fa8..09434266 100644
--- a/docs/source/user_guide/cli.rst
+++ b/docs/source/user_guide/cli.rst
@@ -4,8 +4,7 @@
 Command Line Interface
 ======================
 
-.. automodule:: easylink.cli
-
 .. click:: easylink.cli:easylink
    :prog: easylink
-   :show-nested:
+   :nested: full
+   :commands: run, generate-dag
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 63c04e6f..55e792ff 100644
--- a/setup.py
+++ b/setup.py
@@ -39,10 +39,11 @@
     "pytest-mock",
 ]
 doc_requirements = [
-    "sphinx>=4.0,<8.0.0",
-    "sphinx-rtd-theme>=0.6",
+    "sphinx",
+    "sphinx-rtd-theme",
     "sphinx-autodoc-typehints",
     "sphinx-click",
+    "typing_extensions",
 ]
 lint_requirements = [
     "black==22.3.0",
diff --git a/src/easylink/__init__.py b/src/easylink/__init__.py
index 1e2ad34d..4a0a535e 100644
--- a/src/easylink/__init__.py
+++ b/src/easylink/__init__.py
@@ -3,7 +3,7 @@
 EasyLink
 ========
 
-Research repository for the EasyLink ER ecosystem project.
+Research repository for the EasyLink entity resolution (ER) ecosystem project.
 
 """
 
diff --git a/src/easylink/cli.py b/src/easylink/cli.py
index c5f26222..6d3b626e 100644
--- a/src/easylink/cli.py
+++ b/src/easylink/cli.py
@@ -37,10 +37,10 @@
         ),
     ),
     click.option(
-        "--timestamp/--no-timestamp",
-        default=True,
-        show_default=True,
-        help="Save the results in a timestamped sub-directory of --output-dir.",
+        "--no-timestamp",
+        is_flag=True,
+        default=False,
+        help="Do not save the results in a timestamped sub-directory of ``--output-dir``.",
     ),
 ]
 
@@ -66,9 +66,8 @@ def easylink():
     show_default=True,
     type=click.Path(exists=True, dir_okay=False, resolve_path=True),
     help=(
-        "Path to the specification yaml defining the computing environment to "
-        "run the pipeline on. If no value is passed, the pipeline will be run "
-        "locally."
+        "Path to the computing environment specification yaml. If no value is passed, "
+        "the pipeline will be run locally."
     ),
 )
 @click.option("-v", "--verbose", count=True, help="Increase logging verbosity.", hidden=True)
@@ -83,15 +82,20 @@ def run(
     pipeline_specification: str,
     input_data: str,
     output_dir: str | None,
-    timestamp: bool,
+    no_timestamp: bool,
     computing_environment: str | None,
     verbose: int,
     with_debugger: bool,
 ) -> None:
-    """Run a pipeline from the command line."""
+    """Runs a pipeline from the command line.
+
+    In addition to running the pipeline, this command will also generate the
+    DAG image. If you only want to generate the image without actually running
+    the pipeline, use the ``easylink generate-dag`` command.
+    """
     configure_logging_to_terminal(verbose)
     logger.info("Running pipeline")
-    results_dir = get_results_directory(output_dir, timestamp).as_posix()
+    results_dir = get_results_directory(output_dir, no_timestamp).as_posix()
     logger.info(f"Results directory: {results_dir}")
     # TODO [MIC-4493]: Add configuration validation
@@ -114,11 +118,15 @@ def generate_dag(
     pipeline_specification: str,
     input_data: str,
     output_dir: str | None,
-    timestamp: bool,
+    no_timestamp: bool,
 ) -> None:
-    """Generate an image of the proposed pipeline DAG."""
+    """Generates an image of the proposed pipeline DAG.
+
+    This command only generates the DAG image of the pipeline; it does not actually
+    run it. To run the pipeline, use the ``easylink run`` command.
+    """
     logger.info("Generating DAG")
-    results_dir = get_results_directory(output_dir, timestamp).as_posix()
+    results_dir = get_results_directory(output_dir, no_timestamp).as_posix()
     logger.info(f"Results directory: {results_dir}")
     # TODO [MIC-4493]: Add configuration validation
     runner.main(
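
The CLI change above replaces the paired ``--timestamp/--no-timestamp`` option
(default on) with a single opt-out flag. A minimal sketch of the new flag
semantics, assuming only that click is installed; the ``demo`` command name is
illustrative and not part of the patch:

import click


@click.command()
@click.option(
    "--no-timestamp",
    is_flag=True,
    default=False,
    help="Do not save the results in a timestamped sub-directory of --output-dir.",
)
def demo(no_timestamp: bool) -> None:
    # Omitting the flag keeps the previous default behavior (a timestamped
    # sub-directory); passing --no-timestamp opts out of it.
    click.echo("timestamping disabled" if no_timestamp else "timestamping enabled")


if __name__ == "__main__":
    demo()  # e.g. `python demo.py --no-timestamp` prints "timestamping disabled"
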
diff --git a/src/easylink/configuration.py b/src/easylink/configuration.py
index 19619ed3..60a7e3ed 100644
--- a/src/easylink/configuration.py
+++ b/src/easylink/configuration.py
@@ -1,3 +1,14 @@
+"""
+=============
+Configuration
+=============
+
+The configuration module is responsible for managing an EasyLink run's configuration
+and includes the ``Config`` container class as well as various configuration-related
+utility functions.
+
+"""
+
 from collections import defaultdict
 from pathlib import Path
 from typing import Any
@@ -23,6 +34,8 @@
         },
     }
 }
+"""The default environment configuration settings."""
+
 SPARK_DEFAULTS = {
     "workers": {
         "num_workers": 2,
@@ -32,24 +45,64 @@
     },
     "keep_alive": False,
 }
+"""The default spark configuration settings."""
 
-# Allow some buffer so that slurm doesn't kill spark workers
-SLURM_SPARK_MEM_BUFFER = 500
+SLURM_SPARK_MEM_BUFFER = 500  # MB
 
 
 class Config(LayeredConfigTree):
     """A container for configuration information.
 
-    This class combines the pipeline, input data, and computing environment
-    specifications into a single LayeredConfigTree object. It is also responsible
-    for validating these specifications.
+    A ``Config`` (which inherits from :class:`~layered_config_tree.LayeredConfigTree`)
+    is a container that includes the combination of the user-provided pipeline,
+    input data, and computing environment specifications. It is a nested
+    dictionary-like object that supports prioritized layers of configuration settings
+    as well as dot-notation access to its attributes.
+
+    The ``Config`` is also responsible for various validation checks on the provided
+    specifications. If any of these are invalid, a validation error is raised with
+    as much information as possible.
+
+    Attributes
+    ----------
+    environment
+        The environment configuration, including computing environment,
+        container engine, implementation resources, and slurm- and spark-specific
+        requests.
+    pipeline
+        The pipeline configuration.
+    input_data
+        The input data filepaths.
+    schema
+        The :class:`~easylink.pipeline_schema.PipelineSchema` that successfully
+        validated the requested pipeline.
+
+    Notes
+    -----
+    The requested pipeline is checked against a set of supported
+    :class:`pipeline schemas <easylink.pipeline_schema.PipelineSchema>`. The first
+    schema that successfully validates is assumed to be the correct one; it is
+    attached to the ``Config`` object and its
+    :meth:`~easylink.pipeline_schema.PipelineSchema.configure_pipeline` method is called.
     """
 
     def __init__(
         self,
         config_params: dict[str, Any],
         potential_schemas: list[PipelineSchema] | PipelineSchema = PIPELINE_SCHEMAS,
-    ):
+    ) -> None:
+        """Initializes an instance of the ``Config`` class.
+
+        Parameters
+        ----------
+        config_params
+            A dictionary of all specifications required to run the pipeline. This
+            includes the pipeline, input data, and computing environment specifications,
+            as well as the results directory.
+        potential_schemas
+            A list of potential schemas to validate the pipeline configuration against.
+            This is primarily used for testing purposes. Defaults to the supported schemas.
+        """
         super().__init__(layers=["initial_data", "default", "user_configured"])
         self.update(DEFAULT_ENVIRONMENT, layer="default")
         self.update(config_params, layer="user_configured")
@@ -67,13 +120,13 @@ def __init__(
         self.freeze()
 
     @property
-    def computing_environment(self) -> dict[str, Any]:
-        """The computing environment to run on (generally either 'local' or 'slurm')."""
+    def computing_environment(self) -> str:
+        """The computing environment to run on ('local' or 'slurm')."""
         return self.environment.computing_environment
 
     @property
     def slurm(self) -> dict[str, Any]:
-        """A dictionary of slurm configuration settings."""
+        """A dictionary of slurm-specific configuration settings."""
         if not self.environment.computing_environment == "slurm":
             return {}
         else:
@@ -81,12 +134,12 @@ def __init__(
 
     @property
     def spark(self) -> dict[str, Any]:
-        """A dictionary of spark configuration settings."""
+        """A dictionary of spark-specific configuration settings."""
         return self.environment.spark.to_dict()
 
     @property
     def slurm_resources(self) -> dict[str, str]:
-        """A flat dictionary of the slurm resources."""
+        """A flat dictionary of slurm resource requests."""
        if not self.computing_environment == "slurm":
             return {}
         raw_slurm_resources = {
@@ -103,7 +156,7 @@ def __init__(
 
     @property
     def spark_resources(self) -> dict[str, Any]:
-        """A flat dictionary of the spark resources."""
+        """A flat dictionary of spark resource requests."""
         spark_workers_raw = self.spark["workers"]
         spark_workers = {
             "num_workers": spark_workers_raw.get("num_workers"),
@@ -125,14 +178,26 @@ def __init__(
     #################
 
     def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema:
-        """Validates the requested pipeline against supported schemas.
+        """Returns the first pipeline schema that successfully validates the requested pipeline.
+
+        Parameters
+        ----------
+        potential_schemas
+            Pipeline schemas to validate the pipeline configuration against.
+
+        Returns
+        -------
+            The first pipeline schema that successfully validates the requested pipeline.
+            If no validated pipeline schema is found, ``exit()`` is called with
+            ``errno.EINVAL`` and any validation errors are logged.
 
         Notes
         -----
         This acts as the pipeline configuration file's validation method since
-        we can only find a matching schema if the file is valid.
+        we can only find a matching schema if that file is valid.
 
-        We use the first schema that validates the pipeline configuration.
+        This method returns the first schema that successfully validates and does
+        not attempt to validate additional ones.
         """
         errors = defaultdict(dict)
         # Try each schema until one is validated
@@ -147,6 +212,18 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
         exit_with_validation_error(dict(errors))
 
     def _validate(self) -> None:
+        """Validates the ``Config``.
+
+        Raises
+        ------
+        SystemExit
+            If any errors are found, they are batch-logged into a dictionary and
+            the program exits with a non-zero code.
+
+        Notes
+        -----
+        Pipeline validations are handled in :meth:`~easylink.configuration.Config._get_schema`.
+        """
         # TODO [MIC-4880]: refactor into validation object
         errors = {
             # NOTE: pipeline configuration validation happens in '_get_schema()'
@@ -157,6 +234,12 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
         exit_with_validation_error(errors)
 
     def _validate_input_data(self) -> dict[Any, Any]:
+        """Validates the input data configuration.
+
+        Returns
+        -------
+            A dictionary of input data configuration validation errors.
+        """
         errors = defaultdict(dict)
         input_data_dict = self.input_data.to_dict()
         if not input_data_dict:
@@ -168,6 +251,12 @@ def _get_schema(self, potential_schemas: list[PipelineSchema]) -> PipelineSchema
         return errors
 
     def _validate_environment(self) -> dict[Any, Any]:
+        """Validates the environment configuration.
+
+        Returns
+        -------
+            A dictionary of environment configuration validation errors.
+        """
         errors = defaultdict(dict)
         if not self.environment.container_engine in ["docker", "singularity", "undefined"]:
             errors[ENVIRONMENT_ERRORS_KEY]["container_engine"] = [
@@ -189,11 +278,26 @@ def load_params_from_specification(
     computing_environment: str | None,
     results_dir: str,
 ) -> dict[str, Any]:
-    """Gather together all specification data.
+    """Gathers together all specification data.
 
     This gathers the pipeline, input data, and computing environment specifications
     as well as the results directory into a single dictionary for insertion into
-    the Config object.
+    the ``Config`` object.
+
+    Parameters
+    ----------
+    pipeline_specification
+        The path to the pipeline specification yaml file.
+    input_data
+        The path to the input data yaml file.
+    computing_environment
+        The path to the computing environment yaml file.
+    results_dir
+        The path to the results directory.
+
+    Returns
+    -------
+        A dictionary of all provided specification data.
     """
     return {
         "pipeline": load_yaml(pipeline_specification),
@@ -206,7 +310,7 @@
 def _load_input_data_paths(
     input_data_specification_path: str | Path,
 ) -> dict[str, list[Path]]:
-    """Create dictionary of input data paths from the input data yaml file."""
+    """Creates a dictionary of input data paths from the input data yaml file."""
     input_data_paths = load_yaml(input_data_specification_path)
     if not isinstance(input_data_paths, dict):
         raise TypeError(
@@ -222,7 +326,7 @@
 def _load_computing_environment(
     computing_environment_specification_path: str | None,
 ) -> dict[Any, Any]:
-    """Load the computing environment yaml file and return the contents as a dict."""
+    """Loads the computing environment yaml file and returns the contents as a dict."""
     if not computing_environment_specification_path:
         return {}  # handles empty environment.yaml
     elif not Path(computing_environment_specification_path).is_file():
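
The API documented above composes as follows; a hedged sketch, where the yaml
file names are illustrative and ``computing_environment=None`` falls back to a
local run per ``_load_computing_environment``:

from easylink.configuration import Config, load_params_from_specification

# Illustrative paths; any valid specification files would do.
params = load_params_from_specification(
    pipeline_specification="pipeline.yaml",
    input_data="input_data.yaml",
    computing_environment=None,  # -> {}, i.e. run locally
    results_dir="results",
)
config = Config(params)  # validates against the supported PIPELINE_SCHEMAS
print(config.computing_environment)  # "local" or "slurm"
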
""" return { "pipeline": load_yaml(pipeline_specification), @@ -206,7 +310,7 @@ def load_params_from_specification( def _load_input_data_paths( input_data_specification_path: str | Path, ) -> dict[str, list[Path]]: - """Create dictionary of input data paths from the input data yaml file.""" + """Creates a dictionary of input data paths from the input data yaml file.""" input_data_paths = load_yaml(input_data_specification_path) if not isinstance(input_data_paths, dict): raise TypeError( @@ -222,7 +326,7 @@ def _load_input_data_paths( def _load_computing_environment( computing_environment_specification_path: str | None, ) -> dict[Any, Any]: - """Load the computing environment yaml file and return the contents as a dict.""" + """Loads the computing environment yaml file and returns the contents as a dict.""" if not computing_environment_specification_path: return {} # handles empty environment.yaml elif not Path(computing_environment_specification_path).is_file(): diff --git a/src/easylink/pipeline_graph.py b/src/easylink/pipeline_graph.py index d66e694c..e29fdb5d 100644 --- a/src/easylink/pipeline_graph.py +++ b/src/easylink/pipeline_graph.py @@ -106,7 +106,7 @@ def _get_combined_slots_and_edges( Returns ------- - The set of InputSlots, OutputSlots, and EdgeParams needed to construct the combined implementation + The set of InputSlots, OutputSlots, and EdgeParams needed to construct the combined implementation """ slot_types = ["input_slot", "output_slot"] combined_slots_by_type = combined_input_slots, combined_output_slots = set(), set() @@ -154,7 +154,7 @@ def _get_edges_by_slot( Returns ------- - A tuple of dictionaries keyed by slot, with values for edges corresponding to that slot. + A tuple of dictionaries keyed by slot, with values for edges corresponding to that slot. """ in_edges_by_slot = defaultdict(list) @@ -196,7 +196,7 @@ def _get_duplicate_slots( Returns ------- - A set of (step_name, slot) tuples that have duplicate names or environment variables. + A set of (step_name, slot) tuples that have duplicate names or environment variables. """ name_freq = Counter([slot.name for step_name, slot in slot_tuples]) duplicate_names = [name for name, count in name_freq.items() if count > 1] diff --git a/src/easylink/utilities/data_utils.py b/src/easylink/utilities/data_utils.py index 23de922b..6df3f30b 100644 --- a/src/easylink/utilities/data_utils.py +++ b/src/easylink/utilities/data_utils.py @@ -42,9 +42,9 @@ def copy_configuration_files_to_results_directory( shutil.copy(computing_environment, results_dir) -def get_results_directory(output_dir: str | None, timestamp: bool) -> Path: +def get_results_directory(output_dir: str | None, no_timestamp: bool) -> Path: results_dir = Path("results" if output_dir is None else output_dir).resolve() - if timestamp: + if not no_timestamp: launch_time = _get_timestamp() results_dir = results_dir / launch_time return results_dir diff --git a/src/easylink/utilities/general_utils.py b/src/easylink/utilities/general_utils.py index e4486b38..f5c033f2 100644 --- a/src/easylink/utilities/general_utils.py +++ b/src/easylink/utilities/general_utils.py @@ -87,13 +87,23 @@ def _add_logging_sink( def exit_with_validation_error(error_msg: dict) -> None: - """Exits the program with a validation error. + """Logs error messages and exits the program. + + This function logs the provided validation error messages using a structured + YAML format and terminates the program execution with a non-zero exit code + (indicating an error). 
diff --git a/src/easylink/utilities/general_utils.py b/src/easylink/utilities/general_utils.py
index e4486b38..f5c033f2 100644
--- a/src/easylink/utilities/general_utils.py
+++ b/src/easylink/utilities/general_utils.py
@@ -87,13 +87,23 @@ def _add_logging_sink(
 
 
 def exit_with_validation_error(error_msg: dict) -> None:
-    """Exits the program with a validation error.
+    """Logs error messages and exits the program.
+
+    This function logs the provided validation error messages using a structured
+    YAML format and terminates the program execution with a non-zero exit code
+    (indicating an error).
 
     Parameters
     ----------
     error_msg
         The error message to print to the user.
 
+    Raises
+    ------
+    SystemExit
+        Exits the program with an EINVAL (invalid argument) code due to
+        previously-determined validation errors.
+
     """
 
     logger.error(
@@ -103,7 +113,7 @@
         "\nValidation errors found. Please see above."
         "\n==========================================\n"
     )
-    exit(errno.EINVAL)
+    sys.exit(errno.EINVAL)
 
 
 def is_on_slurm() -> bool:
diff --git a/tests/unit/test_data_utils.py b/tests/unit/test_data_utils.py
index eae9d354..13bed35c 100644
--- a/tests/unit/test_data_utils.py
+++ b/tests/unit/test_data_utils.py
@@ -22,17 +22,17 @@ def test_create_results_directory(test_dir):
 
 
 @pytest.mark.parametrize(
-    "output_dir_provided, timestamp",
+    "output_dir_provided, no_timestamp",
     [
-        (False, False),
         (False, True),
-        (True, False),
+        (False, False),
         (True, True),
+        (True, False),
     ],
 )
-def test_get_results_directory(test_dir, output_dir_provided, timestamp, mocker):
-    """Tests expected behavior. If directory is provided then a "results/" folder
-    is created at the working directory. If timestamp is True, then a timestamped
+def test_get_results_directory(test_dir, output_dir_provided, no_timestamp, mocker):
+    """Tests expected behavior. If no output directory is provided, a "results/" folder
+    is created in the working directory. If no_timestamp is False, then a timestamped
     directory is created within the results directory.
     """
     if output_dir_provided:
@@ -44,12 +44,12 @@ def test_get_results_directory(test_dir, output_dir_provided, timestamp, mocker)
     mocker.patch(
         "easylink.utilities.data_utils._get_timestamp", return_value="2024_01_01_00_00_00"
     )
-    results_dir = get_results_directory(output_dir, timestamp)
+    results_dir = get_results_directory(output_dir, no_timestamp)
 
     expected_results_dir = Path(test_dir)
     if not output_dir_provided:
         expected_results_dir = expected_results_dir / "results"
-    if timestamp:
+    if not no_timestamp:
         expected_results_dir = expected_results_dir / "2024_01_01_00_00_00"
 
     assert expected_results_dir == results_dir
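
One last note on the ``exit`` -> ``sys.exit`` swap in ``general_utils.py``: the
``exit()`` builtin is injected by the ``site`` module and is not guaranteed to
exist (e.g. under ``python -S``), whereas ``sys.exit`` is the supported API and
raises a catchable ``SystemExit``. A small self-contained check of the behavior
the patch relies on (assumes a POSIX ``errno``; note the hunk above does not add
an ``import sys``, so the import presumably already exists in the module):

import errno
import sys


def fail_validation() -> None:
    # Raises SystemExit carrying EINVAL (22 on Linux) as the exit code.
    sys.exit(errno.EINVAL)


try:
    fail_validation()
except SystemExit as e:
    assert e.code == errno.EINVAL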