Skip to content

Commit

Permalink
[dagster-dbt][DA] Add get_automation_condition method to DagsterDbtTr…
Browse files Browse the repository at this point in the history
…anslator
  • Loading branch information
OwenKephart committed Aug 2, 2024
1 parent 34e898b commit 3044511
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 45 deletions.
33 changes: 7 additions & 26 deletions docs/content/integrations/dbt/reference.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -568,42 +568,23 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```

### Customizing auto-materialize policies
### Customizing AutomationConditions

For dbt models, seeds, and snapshots, the default <PyObject object="AutoMaterializePolicy"/> will be `None`.
To override the <PyObject object="AutomationCondition" /> generated for each dbt node in your dbt project, you can create a custom <PyObject module="dagster_dbt" object="DagsterDbtTranslator" /> and implement <PyObject module="dagster_dbt" object="DagsterDbtTranslator" method="get_automation_condition"/>. The following example defines <PyObject object="AutomationCondition" method="eager" /> as the condition for all dbt nodes:

There are two ways to customize the auto-materialize policies generated by Dagster for dbt assets:

1. Defining [meta config](https://docs.getdbt.com/reference/resource-configs/meta) on your dbt node, or
2. Overriding Dagster's auto-materialize policy generation by implementing a custom <PyObject module="dagster_dbt" object="DagsterDbtTranslator" />.

To add an `AutoMaterializePolicy` to a dbt node, you can define a `meta` key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This policy may be one of two types, `eager` or `lazy`. The following example provides an eager `AutoMaterializePolicy` for the following model:

```yaml
models:
- name: customers
config:
meta:
dagster:
auto_materialize_policy:
type: eager
```

Alternatively, to override the Dagster auto-materialize policy generation for all dbt nodes in your dbt project, you can create a custom <PyObject module="dagster_dbt" object="DagsterDbtTranslator" /> and implement <PyObject module="dagster_dbt" object="DagsterDbtTranslator" method="get_auto_materialize_policy"/>. The following example defines <PyObject object="AutoMaterializePolicy" method="eager" /> as the auto-materialize policy for all dbt nodes:

```python startafter=start_custom_auto_materialize_policy_dagster_dbt_translator endbefore=end_custom_auto_materialize_policy_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4
```python startafter=start_custom_automation_condition_dagster_dbt_translator endbefore=end_custom_automation_condition_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4
from pathlib import Path
from dagster import AssetExecutionContext, AutoMaterializePolicy
from dagster import AssetExecutionContext, AutomationCondition
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets
from typing import Any, Mapping, Optional
my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project"))
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_auto_materialize_policy(
def get_automation_condition(
self, dbt_resource_props: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
return AutoMaterializePolicy.eager()
) -> Optional[AutomationCondition]:
return AutomationCondition.eager()
@dbt_assets(
manifest=my_dbt_project.manifest_path,
Expand Down
14 changes: 7 additions & 7 deletions examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,20 +370,20 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
# end_custom_tags_dagster_dbt_translator


def scope_custom_auto_materialize_policy_dagster_dbt_translator():
# start_custom_auto_materialize_policy_dagster_dbt_translator
def scope_custom_automation_condition_dagster_dbt_translator():
# start_custom_automation_condition_dagster_dbt_translator
from pathlib import Path
from dagster import AssetExecutionContext, AutoMaterializePolicy
from dagster import AssetExecutionContext, AutomationCondition
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets
from typing import Any, Mapping, Optional

my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project"))

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_auto_materialize_policy(
def get_automation_condition(
self, dbt_resource_props: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
return AutoMaterializePolicy.eager()
) -> Optional[AutomationCondition]:
return AutomationCondition.eager()

@dbt_assets(
manifest=my_dbt_project.manifest_path,
Expand All @@ -392,7 +392,7 @@ def get_auto_materialize_policy(
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()

# end_custom_auto_materialize_policy_dagster_dbt_translator
# end_custom_automation_condition_dagster_dbt_translator


def scope_disable_asset_check_dagster_dbt_translator():
Expand Down
17 changes: 8 additions & 9 deletions python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
AssetsDefinition,
AssetSelection,
AutoMaterializePolicy,
AutomationCondition,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
DefaultScheduleStatus,
Expand Down Expand Up @@ -825,7 +826,7 @@ def build_dbt_multi_asset_args(
group_name=dagster_dbt_translator.get_group_name(dbt_resource_props),
code_version=default_code_version_fn(dbt_resource_props),
freshness_policy=dagster_dbt_translator.get_freshness_policy(dbt_resource_props),
auto_materialize_policy=dagster_dbt_translator.get_auto_materialize_policy(
automation_condition=dagster_dbt_translator.get_automation_condition(
dbt_resource_props
),
)
Expand Down Expand Up @@ -937,7 +938,7 @@ def get_asset_deps(
Dict[AssetKey, Tuple[str, Out]],
Dict[AssetKey, str],
Dict[AssetKey, FreshnessPolicy],
Dict[AssetKey, AutoMaterializePolicy],
Dict[AssetKey, AutomationCondition],
Dict[str, AssetCheckSpec],
Dict[str, List[str]],
Dict[str, Dict[str, Any]],
Expand All @@ -954,7 +955,7 @@ def get_asset_deps(
# metadata that we need to store for reference.
group_names_by_key: Dict[AssetKey, str] = {}
freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {}
auto_materialize_policies_by_key: Dict[AssetKey, AutoMaterializePolicy] = {}
automation_conditions_by_key: Dict[AssetKey, AutomationCondition] = {}
check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {}
fqns_by_output_name: Dict[str, List[str]] = {}
metadata_by_output_name: Dict[str, Dict[str, Any]] = {}
Expand Down Expand Up @@ -1002,11 +1003,9 @@ def get_asset_deps(
if freshness_policy is not None:
freshness_policies_by_key[asset_key] = freshness_policy

auto_materialize_policy = dagster_dbt_translator.get_auto_materialize_policy(
dbt_resource_props
)
if auto_materialize_policy is not None:
auto_materialize_policies_by_key[asset_key] = auto_materialize_policy
automation_condition = dagster_dbt_translator.get_automation_condition(dbt_resource_props)
if automation_condition is not None:
automation_conditions_by_key[asset_key] = automation_condition

test_unique_ids = []
if manifest:
Expand Down Expand Up @@ -1051,7 +1050,7 @@ def get_asset_deps(
asset_outs,
group_names_by_key,
freshness_policies_by_key,
auto_materialize_policies_by_key,
automation_conditions_by_key,
check_specs_by_output_name,
fqns_by_output_name,
metadata_by_output_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def get_auto_materialize_policy(cls, dbt_resource_props):
asset_outs,
group_names_by_key,
freshness_policies_by_key,
auto_materialize_policies_by_key,
automation_conditions_by_key,
_,
fqns_by_output_name,
metadata_by_output_name,
Expand Down Expand Up @@ -410,8 +410,8 @@ def get_auto_materialize_policy(cls, dbt_resource_props):
for asset_key, freshness_policy in freshness_policies_by_key.items()
},
auto_materialize_policies_by_output_name={
asset_outs[asset_key][0]: auto_materialize_policy
for asset_key, auto_materialize_policy in auto_materialize_policies_by_key.items()
asset_outs[asset_key][0]: automation_condition.as_auto_materialize_policy()
for asset_key, automation_condition in automation_conditions_by_key.items()
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dagster import (
AssetKey,
AutoMaterializePolicy,
AutomationCondition,
FreshnessPolicy,
PartitionMapping,
_check as check,
Expand Down Expand Up @@ -428,6 +429,64 @@ def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) ->
"""
return default_auto_materialize_policy_fn(dbt_resource_props)

@public
@experimental(emit_runtime_warning=False)
def get_automation_condition(
self, dbt_resource_props: Mapping[str, Any]
) -> Optional[AutomationCondition]:
"""A function that takes a dictionary representing properties of a dbt resource, and
returns the Dagster :py:class:`dagster.AutoMaterializePolicy` for that resource.
Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents
a model, seed, snapshot or source in a given dbt project. You can learn more about dbt
resources and the properties available in this dictionary here:
https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details
This method can be overridden to provide a custom AutomationCondition for a dbt resource.
Args:
dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
Returns:
Optional[AutoMaterializePolicy]: A Dagster auto-materialize policy.
Examples:
Set a custom AutomationCondition for all dbt resources:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
return AutomationCondition.eager()
Set a custom AutomationCondition for dbt resources with a specific tag:
.. code-block:: python
from typing import Any, Mapping
from dagster_dbt import DagsterDbtTranslator
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]:
automation_condition = None
if "my_custom_tag" in dbt_resource_props.get("tags", []):
automation_condition = AutomationCondition.eager()
return automation_condition
"""
auto_materialize_policy = self.get_auto_materialize_policy(dbt_resource_props)
return (
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)


@dataclass
class DbtManifestWrapper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster import (
AssetKey,
AutoMaterializePolicy,
AutomationCondition,
BackfillPolicy,
DagsterInvalidDefinitionError,
DailyPartitionsDefinition,
Expand Down Expand Up @@ -740,6 +741,33 @@ def my_dbt_assets(): ...
)


def test_with_automation_condition_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
expected_automation_condition = AutomationCondition.eager()

class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_automation_condition(self, _: Mapping[str, Any]) -> Optional[AutomationCondition]:
return expected_automation_condition

expected_specs_by_key = {
spec.key: spec
for spec in build_dbt_asset_specs(
manifest=test_jaffle_shop_manifest,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
}

@dbt_assets(
manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator()
)
def my_dbt_assets(): ...

for asset_key, automation_condition in my_dbt_assets.automation_conditions_by_key.items():
assert automation_condition == expected_automation_condition
assert (
expected_specs_by_key[asset_key].automation_condition == expected_automation_condition
)


def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None:
expected_auto_materialize_policy = AutoMaterializePolicy.eager()
expected_specs_by_key = {
Expand Down

0 comments on commit 3044511

Please sign in to comment.