From 3044511c7c88bbdcf89d7671a252e7ee69df3897 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Fri, 2 Aug 2024 13:50:28 -0700 Subject: [PATCH] [dagster-dbt][DA] Add get_automation_condition method to DagsterDbtTranslator --- docs/content/integrations/dbt/reference.mdx | 33 +++-------- .../docs_snippets/integrations/dbt/dbt.py | 14 ++--- .../dagster-dbt/dagster_dbt/asset_utils.py | 17 +++--- .../dagster_dbt/cloud/asset_defs.py | 6 +- .../dagster_dbt/dagster_dbt_translator.py | 59 +++++++++++++++++++ .../core/test_asset_decorator.py | 28 +++++++++ 6 files changed, 112 insertions(+), 45 deletions(-) diff --git a/docs/content/integrations/dbt/reference.mdx b/docs/content/integrations/dbt/reference.mdx index bb970fa0bb8e0..46d58f16cdd80 100644 --- a/docs/content/integrations/dbt/reference.mdx +++ b/docs/content/integrations/dbt/reference.mdx @@ -568,42 +568,23 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() ``` -### Customizing auto-materialize policies +### Customizing AutomationConditions -For dbt models, seeds, and snapshots, the default will be `None`. +To override the generated for each dbt node in your dbt project, you can create a custom and implement . The following example defines as the condition for all dbt nodes: -There are two ways to customize the auto-materialize policies generated by Dagster for dbt assets: - -1. Defining [meta config](https://docs.getdbt.com/reference/resource-configs/meta) on your dbt node, or -2. Overriding Dagster's auto-materialize policy generation by implementing a custom . - -To add an `AutoMaterializePolicy` to a dbt node, you can define a `meta` key in your dbt project file, on your dbt node's property file, or on the node's in-file config block. This policy may be one of two types, `eager` or `lazy`. The following example provides an eager `AutoMaterializePolicy` for the following model: - -```yaml -models: - - name: customers - config: - meta: - dagster: - auto_materialize_policy: - type: eager -``` - -Alternatively, to override the Dagster auto-materialize policy generation for all dbt nodes in your dbt project, you can create a custom and implement . The following example defines as the auto-materialize policy for all dbt nodes: - -```python startafter=start_custom_auto_materialize_policy_dagster_dbt_translator endbefore=end_custom_auto_materialize_policy_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 +```python startafter=start_custom_automation_condition_dagster_dbt_translator endbefore=end_custom_automation_condition_dagster_dbt_translator file=/integrations/dbt/dbt.py dedent=4 from pathlib import Path -from dagster import AssetExecutionContext, AutoMaterializePolicy +from dagster import AssetExecutionContext, AutomationCondition from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets from typing import Any, Mapping, Optional my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) class CustomDagsterDbtTranslator(DagsterDbtTranslator): - def get_auto_materialize_policy( + def get_automation_condition( self, dbt_resource_props: Mapping[str, Any] - ) -> Optional[AutoMaterializePolicy]: - return AutoMaterializePolicy.eager() + ) -> Optional[AutomationCondition]: + return AutomationCondition.eager() @dbt_assets( manifest=my_dbt_project.manifest_path, diff --git a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py index 25ceee962cb63..7b3021e7093be 100644 --- a/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py +++ b/examples/docs_snippets/docs_snippets/integrations/dbt/dbt.py @@ -370,20 +370,20 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): # end_custom_tags_dagster_dbt_translator -def scope_custom_auto_materialize_policy_dagster_dbt_translator(): - # start_custom_auto_materialize_policy_dagster_dbt_translator +def scope_custom_automation_condition_dagster_dbt_translator(): + # start_custom_automation_condition_dagster_dbt_translator from pathlib import Path - from dagster import AssetExecutionContext, AutoMaterializePolicy + from dagster import AssetExecutionContext, AutomationCondition from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets from typing import Any, Mapping, Optional my_dbt_project = DbtProject(project_dir=Path("path/to/dbt_project")) class CustomDagsterDbtTranslator(DagsterDbtTranslator): - def get_auto_materialize_policy( + def get_automation_condition( self, dbt_resource_props: Mapping[str, Any] - ) -> Optional[AutoMaterializePolicy]: - return AutoMaterializePolicy.eager() + ) -> Optional[AutomationCondition]: + return AutomationCondition.eager() @dbt_assets( manifest=my_dbt_project.manifest_path, @@ -392,7 +392,7 @@ def get_auto_materialize_policy( def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() - # end_custom_auto_materialize_policy_dagster_dbt_translator + # end_custom_automation_condition_dagster_dbt_translator def scope_disable_asset_check_dagster_dbt_translator(): diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 0c50a8e7ef683..11fc5c39f0705 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -26,6 +26,7 @@ AssetsDefinition, AssetSelection, AutoMaterializePolicy, + AutomationCondition, DagsterInvalidDefinitionError, DagsterInvariantViolationError, DefaultScheduleStatus, @@ -825,7 +826,7 @@ def build_dbt_multi_asset_args( group_name=dagster_dbt_translator.get_group_name(dbt_resource_props), code_version=default_code_version_fn(dbt_resource_props), freshness_policy=dagster_dbt_translator.get_freshness_policy(dbt_resource_props), - auto_materialize_policy=dagster_dbt_translator.get_auto_materialize_policy( + automation_condition=dagster_dbt_translator.get_automation_condition( dbt_resource_props ), ) @@ -937,7 +938,7 @@ def get_asset_deps( Dict[AssetKey, Tuple[str, Out]], Dict[AssetKey, str], Dict[AssetKey, FreshnessPolicy], - Dict[AssetKey, AutoMaterializePolicy], + Dict[AssetKey, AutomationCondition], Dict[str, AssetCheckSpec], Dict[str, List[str]], Dict[str, Dict[str, Any]], @@ -954,7 +955,7 @@ def get_asset_deps( # metadata that we need to store for reference. group_names_by_key: Dict[AssetKey, str] = {} freshness_policies_by_key: Dict[AssetKey, FreshnessPolicy] = {} - auto_materialize_policies_by_key: Dict[AssetKey, AutoMaterializePolicy] = {} + automation_conditions_by_key: Dict[AssetKey, AutomationCondition] = {} check_specs_by_key: Dict[AssetCheckKey, AssetCheckSpec] = {} fqns_by_output_name: Dict[str, List[str]] = {} metadata_by_output_name: Dict[str, Dict[str, Any]] = {} @@ -1002,11 +1003,9 @@ def get_asset_deps( if freshness_policy is not None: freshness_policies_by_key[asset_key] = freshness_policy - auto_materialize_policy = dagster_dbt_translator.get_auto_materialize_policy( - dbt_resource_props - ) - if auto_materialize_policy is not None: - auto_materialize_policies_by_key[asset_key] = auto_materialize_policy + automation_condition = dagster_dbt_translator.get_automation_condition(dbt_resource_props) + if automation_condition is not None: + automation_conditions_by_key[asset_key] = automation_condition test_unique_ids = [] if manifest: @@ -1051,7 +1050,7 @@ def get_asset_deps( asset_outs, group_names_by_key, freshness_policies_by_key, - auto_materialize_policies_by_key, + automation_conditions_by_key, check_specs_by_output_name, fqns_by_output_name, metadata_by_output_name, diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py index cca0a94ec3ff6..0b887c39ce0de 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/cloud/asset_defs.py @@ -361,7 +361,7 @@ def get_auto_materialize_policy(cls, dbt_resource_props): asset_outs, group_names_by_key, freshness_policies_by_key, - auto_materialize_policies_by_key, + automation_conditions_by_key, _, fqns_by_output_name, metadata_by_output_name, @@ -410,8 +410,8 @@ def get_auto_materialize_policy(cls, dbt_resource_props): for asset_key, freshness_policy in freshness_policies_by_key.items() }, auto_materialize_policies_by_output_name={ - asset_outs[asset_key][0]: auto_materialize_policy - for asset_key, auto_materialize_policy in auto_materialize_policies_by_key.items() + asset_outs[asset_key][0]: automation_condition.as_auto_materialize_policy() + for asset_key, automation_condition in automation_conditions_by_key.items() }, ) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py index 0689ba0d9b39b..0496cd40a38df 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py @@ -4,6 +4,7 @@ from dagster import ( AssetKey, AutoMaterializePolicy, + AutomationCondition, FreshnessPolicy, PartitionMapping, _check as check, @@ -428,6 +429,64 @@ def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> """ return default_auto_materialize_policy_fn(dbt_resource_props) + @public + @experimental(emit_runtime_warning=False) + def get_automation_condition( + self, dbt_resource_props: Mapping[str, Any] + ) -> Optional[AutomationCondition]: + """A function that takes a dictionary representing properties of a dbt resource, and + returns the Dagster :py:class:`dagster.AutoMaterializePolicy` for that resource. + + Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents + a model, seed, snapshot or source in a given dbt project. You can learn more about dbt + resources and the properties available in this dictionary here: + https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details + + This method can be overridden to provide a custom AutomationCondition for a dbt resource. + + Args: + dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. + + Returns: + Optional[AutoMaterializePolicy]: A Dagster auto-materialize policy. + + Examples: + Set a custom AutomationCondition for all dbt resources: + + .. code-block:: python + + from typing import Any, Mapping + + from dagster_dbt import DagsterDbtTranslator + + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]: + return AutomationCondition.eager() + + Set a custom AutomationCondition for dbt resources with a specific tag: + + .. code-block:: python + + from typing import Any, Mapping + + from dagster_dbt import DagsterDbtTranslator + + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutomationCondition]: + automation_condition = None + if "my_custom_tag" in dbt_resource_props.get("tags", []): + automation_condition = AutomationCondition.eager() + + return automation_condition + + """ + auto_materialize_policy = self.get_auto_materialize_policy(dbt_resource_props) + return ( + auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None + ) + @dataclass class DbtManifestWrapper: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 6dd0ba6ca1326..0dffea0a042d1 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -6,6 +6,7 @@ from dagster import ( AssetKey, AutoMaterializePolicy, + AutomationCondition, BackfillPolicy, DagsterInvalidDefinitionError, DailyPartitionsDefinition, @@ -740,6 +741,33 @@ def my_dbt_assets(): ... ) +def test_with_automation_condition_replacements(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + expected_automation_condition = AutomationCondition.eager() + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_automation_condition(self, _: Mapping[str, Any]) -> Optional[AutomationCondition]: + return expected_automation_condition + + expected_specs_by_key = { + spec.key: spec + for spec in build_dbt_asset_specs( + manifest=test_jaffle_shop_manifest, + dagster_dbt_translator=CustomDagsterDbtTranslator(), + ) + } + + @dbt_assets( + manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator() + ) + def my_dbt_assets(): ... + + for asset_key, automation_condition in my_dbt_assets.automation_conditions_by_key.items(): + assert automation_condition == expected_automation_condition + assert ( + expected_specs_by_key[asset_key].automation_condition == expected_automation_condition + ) + + def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None: expected_auto_materialize_policy = AutoMaterializePolicy.eager() expected_specs_by_key = {