Skip to content

Commit

Permalink
[dbt] utility class for managing artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Feb 20, 2024
1 parent aa1d0b0 commit 3c00ba0
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
127 changes: 127 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/dbt_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import os
import shutil
from functools import cached_property
from pathlib import Path
from typing import List, Optional, Union

from .core.resources_v2 import DbtCliResource
from .errors import DagsterDbtManifestNotPreparedError


def _should_compile() -> bool:
return (
# if launched via `dagster dev` cli
bool(os.getenv("DAGSTER_IS_DEV_CLI"))
or
# or if explicitly opted in
bool(os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"))
)


class DbtArtifacts:
def __init__(
self,
project_dir: Union[Path, str],
*,
target_folder: str = "target",
prepare_command: List[str] = ["parse", "--quiet"],
package_data_dir: Optional[Union[Path, str]] = None,
):
"""A utility class to help manage dbt artifacts in different deployment contexts.
This class provides a setup to solve for these goals:
* During development, reload the manifest during start-up to pick up any changes.
* When deploying, use a compiled manifest to reduce start-up time.
* Handle the scenario when the dbt project is copied in to a directory for packaging.
Args:
project_dir (Union[str, Path]):
The directory of the dbt project.
target_folder (Union[str, Path]):
The folder in the project project directory to output artifacts.
Default: "target"
prepare_command: The dbt cli command to run to prepare the manifest.json
Default: ["parse", "--quiet"]
"""
self.project_dir = project_dir if isinstance(project_dir, Path) else Path(project_dir)
self._target_folder = (
target_folder if isinstance(target_folder, Path) else Path(target_folder)
)
self._prepare_command = prepare_command

if package_data_dir is None:
self._package_data_dir = None
elif isinstance(package_data_dir, Path):
self._package_data_dir = package_data_dir
else:
self._package_data_dir = Path(package_data_dir)

@cached_property # to prevent repeat compilation
def manifest_path(self) -> Path:
"""The path to the manifest.json, compiling the manifest first if in development or
ensuring it already exists if not.
"""
if _should_compile():
return self._prepare_manifest()

p = self._base_dir / self._target_folder / "manifest.json"
if not p.exists():
raise DagsterDbtManifestNotPreparedError(
f"Did not find prepared manifest.json at expected path {p}.\n"
"If this is an environment that is expected to prepare the manifest on load, "
"set the environment variable DAGSTER_DBT_PARSE_PROJECT_ON_LOAD."
)
return p

def get_cli_resource(self, **kwargs) -> DbtCliResource:
if _should_compile():
return DbtCliResource(project_dir=os.fspath(self.project_dir))

return DbtCliResource(
project_dir=os.fspath(self._base_dir),
**kwargs,
)

@property
def _base_dir(self) -> Path:
return self._package_data_dir if self._package_data_dir else self.project_dir

def _prepare_manifest(self) -> Path:
return (
DbtCliResource(project_dir=os.fspath(self.project_dir))
.cli(
self._prepare_command,
target_path=self._target_folder,
)
.wait()
.target_path.joinpath("manifest.json")
)

def _handle_package_data(self) -> None:
if self._package_data_dir is None:
return

if self._package_data_dir.exists():
shutil.rmtree(self._package_data_dir)

# Determine if the package data dir is within the project dir, and ignore
# that path if so.
rel_path = Path(os.path.relpath(self._package_data_dir, self.project_dir))
rel_ignore = ""
if len(rel_path.parts) > 0 and rel_path.parts[0] != "..":
rel_ignore = rel_path.parts[0]

shutil.copytree(
src=self.project_dir,
dst=self._package_data_dir,
ignore=shutil.ignore_patterns(
"*.git*",
"*partial_parse.msgpack",
rel_ignore,
),
)

def prepare(self) -> None:
self._prepare_manifest()
self._handle_package_data()
4 changes: 4 additions & 0 deletions python_modules/libraries/dagster-dbt/dagster_dbt/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ def __init__(self, path: str):

class DagsterDbtCloudJobInvariantViolationError(DagsterDbtError, DagsterInvariantViolationError):
"""Represents an error when a dbt Cloud job is not supported by the ``dagster-dbt`` library."""


class DagsterDbtManifestNotPreparedError(DagsterDbtError):
"""Error when we expect manifest.json to be compiled already but it is absent."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest
from dagster._core.test_utils import environ
from dagster._utils.test import copy_directory
from dagster_dbt.dbt_artifacts import DbtArtifacts
from dagster_dbt.errors import DagsterDbtManifestNotPreparedError

from ..dbt_projects import test_jaffle_shop_path


def test_expected_but_missing() -> None:
with copy_directory(test_jaffle_shop_path) as project_dir:
dbt_artifacts = _ = DbtArtifacts(project_dir=project_dir)

with pytest.raises(DagsterDbtManifestNotPreparedError):
_ = dbt_artifacts.manifest_path

dbt_artifacts._prepare_manifest()
assert dbt_artifacts.manifest_path.exists()


def test_dagster_dev() -> None:
with environ({"DAGSTER_IS_DEV_CLI": "1"}), copy_directory(test_jaffle_shop_path) as project_dir:
dbt_artifacts = _ = DbtArtifacts(project_dir=project_dir)
assert dbt_artifacts.manifest_path.exists()


def test_opt_in_env_Var() -> None:
with environ({"DAGSTER_DBT_PARSE_PROJECT_ON_LOAD": "1"}), copy_directory(
test_jaffle_shop_path
) as project_dir:
dbt_artifacts = _ = DbtArtifacts(project_dir=project_dir)
assert dbt_artifacts.manifest_path.exists()

0 comments on commit 3c00ba0

Please sign in to comment.