diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dbt_artifacts.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dbt_artifacts.py
new file mode 100644
index 0000000000000..b1997e2fbb1b2
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dbt_artifacts.py
@@ -0,0 +1,137 @@
+import os
+import shutil
+from functools import cached_property
+from pathlib import Path
+from typing import List, Optional, Union
+
+from .core.resources_v2 import DbtCliResource
+from .errors import DagsterDbtManifestNotPreparedError
+
+
+def _should_compile() -> bool:
+    """Return whether the manifest should be prepared when it is loaded."""
+    return (
+        # if launched via `dagster dev` cli
+        bool(os.getenv("DAGSTER_IS_DEV_CLI"))
+        or
+        # or if explicitly opted in
+        bool(os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"))
+    )
+
+
+class DbtArtifacts:
+    def __init__(
+        self,
+        project_dir: Union[Path, str],
+        *,
+        target_folder: Union[Path, str] = "target",
+        prepare_command: Optional[List[str]] = None,
+        package_data_dir: Optional[Union[Path, str]] = None,
+    ):
+        """A utility class to help manage dbt artifacts in different deployment contexts.
+
+        This class provides a setup to solve for these goals:
+        * During development, reload the manifest during start-up to pick up any changes.
+        * When deploying, use a compiled manifest to reduce start-up time.
+        * Handle the scenario when the dbt project is copied in to a directory for packaging.
+
+        Args:
+            project_dir (Union[str, Path]):
+                The directory of the dbt project.
+            target_folder (Union[str, Path]):
+                The folder in the project directory to output artifacts.
+                Default: "target"
+            prepare_command (Optional[List[str]]):
+                The dbt cli command to run to prepare the manifest.json.
+                Default: ["parse", "--quiet"]
+            package_data_dir (Optional[Union[str, Path]]):
+                The directory that `prepare` copies the dbt project in to. When set and the
+                manifest is not prepared on load, the manifest and the cli resource are
+                resolved from this directory instead of project_dir.
+                Default: None
+        """
+        self.project_dir = Path(project_dir)
+        self._target_folder = Path(target_folder)
+        # Copy the command so later mutation of the caller's list cannot leak in here.
+        self._prepare_command = (
+            ["parse", "--quiet"] if prepare_command is None else list(prepare_command)
+        )
+        self._package_data_dir = None if package_data_dir is None else Path(package_data_dir)
+
+    @cached_property  # to prevent repeat compilation
+    def manifest_path(self) -> Path:
+        """The path to the manifest.json, compiling the manifest first if in development or
+        ensuring it already exists if not.
+        """
+        if _should_compile():
+            return self._prepare_manifest()
+
+        p = self._base_dir / self._target_folder / "manifest.json"
+        if not p.exists():
+            raise DagsterDbtManifestNotPreparedError(
+                f"Did not find prepared manifest.json at expected path {p}.\n"
+                "If this is an environment that is expected to prepare the manifest on load, "
+                "set the environment variable DAGSTER_DBT_PARSE_PROJECT_ON_LOAD."
+            )
+        return p
+
+    def get_cli_resource(self, **kwargs) -> DbtCliResource:
+        """Create a DbtCliResource for the directory the manifest is resolved from.
+
+        All keyword arguments are forwarded to the DbtCliResource constructor.
+        """
+        project_dir = self.project_dir if _should_compile() else self._base_dir
+        return DbtCliResource(project_dir=os.fspath(project_dir), **kwargs)
+
+    @property
+    def _base_dir(self) -> Path:
+        return self.project_dir if self._package_data_dir is None else self._package_data_dir
+
+    def _prepare_manifest(self) -> Path:
+        """Run the prepare command in the project directory and return the manifest path."""
+        return (
+            DbtCliResource(project_dir=os.fspath(self.project_dir))
+            .cli(
+                self._prepare_command,
+                target_path=self._target_folder,
+            )
+            .wait()
+            .target_path.joinpath("manifest.json")
+        )
+
+    def _handle_package_data(self) -> None:
+        """Replace the package data directory with a fresh copy of the dbt project."""
+        if self._package_data_dir is None:
+            return
+
+        # The destination is deleted before the copy, so it must never be the project
+        # directory itself or one of its ancestors.
+        source = self.project_dir.resolve()
+        destination = self._package_data_dir.resolve()
+        if destination == source or destination in source.parents:
+            raise ValueError(
+                f"package_data_dir {self._package_data_dir} must not contain the dbt project "
+                f"directory {self.project_dir}."
+            )
+
+        if self._package_data_dir.exists():
+            shutil.rmtree(self._package_data_dir)
+
+        ignored = ["*.git*", "*partial_parse.msgpack"]
+
+        # Determine if the package data dir is within the project dir, and ignore
+        # that path if so.
+        rel_path = Path(os.path.relpath(self._package_data_dir, self.project_dir))
+        if rel_path.parts and rel_path.parts[0] != "..":
+            ignored.append(rel_path.parts[0])
+
+        shutil.copytree(
+            src=self.project_dir,
+            dst=self._package_data_dir,
+            ignore=shutil.ignore_patterns(*ignored),
+        )
+
+    def prepare(self) -> None:
+        """Prepare the manifest, then refresh the package data directory if one is set."""
+        self._prepare_manifest()
+        self._handle_package_data()
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/errors.py b/python_modules/libraries/dagster-dbt/dagster_dbt/errors.py
index 8d348c2f6d834..a2a78165438c2 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt/errors.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt/errors.py
@@ -96,3 +96,7 @@ def __init__(self, path: str):
 
 class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError):
     """Represents an error when a dbt Cloud job is not supported by the ``dagster-dbt`` library."""
+
+
+class DagsterDbtManifestNotPreparedError(DagsterDbtError):
+    """Error when we expect manifest.json to be compiled already but it is absent."""
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_artifacts.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_artifacts.py
new file mode 100644
index 0000000000000..0b8fc95a60761
--- /dev/null
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_artifacts.py
@@ -0,0 +1,46 @@
+import pytest
+from dagster._core.test_utils import environ
+from dagster._utils.test import copy_directory
+from dagster_dbt.dbt_artifacts import DbtArtifacts
+from dagster_dbt.errors import DagsterDbtManifestNotPreparedError
+
+from ..dbt_projects import test_jaffle_shop_path
+
+
+def test_expected_but_missing() -> None:
+    with copy_directory(test_jaffle_shop_path) as project_dir:
+        dbt_artifacts = DbtArtifacts(project_dir=project_dir)
+
+        with pytest.raises(DagsterDbtManifestNotPreparedError):
+            _ = dbt_artifacts.manifest_path
+
+        dbt_artifacts._prepare_manifest()
+        assert dbt_artifacts.manifest_path.exists()
+
+
+def test_dagster_dev() -> None:
+    with environ({"DAGSTER_IS_DEV_CLI": "1"}), copy_directory(test_jaffle_shop_path) as project_dir:
+        dbt_artifacts = DbtArtifacts(project_dir=project_dir)
+        assert dbt_artifacts.manifest_path.exists()
+
+
+def test_opt_in_env_var() -> None:
+    with environ({"DAGSTER_DBT_PARSE_PROJECT_ON_LOAD": "1"}), copy_directory(
+        test_jaffle_shop_path
+    ) as project_dir:
+        dbt_artifacts = DbtArtifacts(project_dir=project_dir)
+        assert dbt_artifacts.manifest_path.exists()
+
+
+def test_cli_resource_kwargs_forwarded_in_dev() -> None:
+    with environ({"DAGSTER_IS_DEV_CLI": "1"}), copy_directory(test_jaffle_shop_path) as project_dir:
+        cli_resource = DbtArtifacts(project_dir=project_dir).get_cli_resource(target="dev")
+        assert cli_resource.target == "dev"
+
+
+def test_package_data_dir_must_not_contain_project() -> None:
+    with copy_directory(test_jaffle_shop_path) as project_dir:
+        dbt_artifacts = DbtArtifacts(project_dir=project_dir, package_data_dir=project_dir)
+
+        with pytest.raises(ValueError):
+            dbt_artifacts._handle_package_data()