From 8ca46fd0ef16b54c64415f392418b32b551a21f7 Mon Sep 17 00:00:00 2001 From: Owen Kephart Date: Mon, 18 Dec 2023 14:48:07 -0800 Subject: [PATCH] Add asset_condition argument to AssetsDefinition etal. --- .../dagster/_core/definitions/asset_checks.py | 2 +- .../_core/definitions/asset_condition.py | 14 ++- .../_core/definitions/asset_daemon_context.py | 18 ++-- .../_core/definitions/asset_daemon_cursor.py | 9 +- .../dagster/_core/definitions/asset_graph.py | 26 +++-- .../dagster/_core/definitions/assets.py | 98 ++++++++++++------- .../definitions/decorators/asset_decorator.py | 45 ++++++--- .../_core/definitions/external_asset_graph.py | 16 +-- .../_core/host_representation/external.py | 4 +- .../host_representation/external_data.py | 12 ++- .../dagster/dagster/_daemon/asset_daemon.py | 2 +- .../asset_defs_tests/test_assets.py | 8 +- .../test_assets_from_modules.py | 3 +- .../asset_defs_tests/test_decorators.py | 16 +-- .../asset_daemon_scenario.py | 4 +- .../auto_materialize_tests/base_scenario.py | 25 +++-- .../multi_code_location_scenarios.py | 5 +- .../test_load_from_instance.py | 11 ++- .../cloud/test_asset_defs.py | 4 +- .../dagster_dbt_tests/test_asset_decorator.py | 12 +-- .../dagster_dbt_tests/test_asset_defs.py | 8 +- 21 files changed, 198 insertions(+), 144 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index f37a47fad83d6..b9d366c6dd126 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -248,7 +248,7 @@ def blocking_asset(**kwargs): resource_defs=asset_def.resource_defs, metadata=asset_def.metadata_by_key.get(asset_def.key), freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), - auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), + asset_condition=asset_def.asset_conditions_by_key.get(asset_def.key), backfill_policy=asset_def.backfill_policy, config=None, # gets config from asset_def.op ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition.py index 2b2185c67b567..d2d8fd56605c0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition.py @@ -20,12 +20,10 @@ from dagster._core.definitions.metadata import MetadataMapping, MetadataValue from dagster._serdes.serdes import whitelist_for_serdes -from .asset_condition_evaluation_context import ( - AssetConditionEvaluationContext, -) from .asset_subset import AssetSubset if TYPE_CHECKING: + from .asset_condition_evaluation_context import AssetConditionEvaluationContext from .auto_materialize_rule import AutoMaterializeRule @@ -143,7 +141,7 @@ class AssetCondition(ABC): @abstractmethod def evaluate( - self, context: AssetConditionEvaluationContext + self, context: "AssetConditionEvaluationContext" ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: raise NotImplementedError() @@ -211,7 +209,7 @@ def description(self) -> str: return self.rule.description def evaluate( - self, context: AssetConditionEvaluationContext + self, context: "AssetConditionEvaluationContext" ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: context.root_context.daemon_context._verbose_log_fn( # noqa f"Evaluating rule: {self.rule.to_snapshot()}" @@ -239,7 +237,7 @@ def description(self) -> str: return "All of" def evaluate( - self, context: AssetConditionEvaluationContext + self, context: "AssetConditionEvaluationContext" ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_evaluations: List[AssetConditionEvaluation] = [] child_extras: List[AssetConditionCursorExtras] = [] @@ -269,7 +267,7 @@ def description(self) -> str: return "Any of" def evaluate( - self, context: AssetConditionEvaluationContext + self, context: "AssetConditionEvaluationContext" ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_evaluations: List[AssetConditionEvaluation] = [] child_extras: List[AssetConditionCursorExtras] = [] @@ -309,7 +307,7 @@ def child(self) -> AssetCondition: return self.children[0] def evaluate( - self, context: AssetConditionEvaluationContext + self, context: "AssetConditionEvaluationContext" ) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]: child_context = context.for_child( condition=self.child, candidate_subset=context.candidate_subset diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py index 41e7a08dc3ee5..be9942fbdcd20 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_context.py @@ -34,7 +34,7 @@ from ... import PartitionKeyRange from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG -from .asset_condition import AssetConditionEvaluation +from .asset_condition import AssetCondition, AssetConditionEvaluation from .asset_condition_evaluation_context import ( AssetConditionEvaluationContext, ) @@ -50,12 +50,12 @@ from dagster._utils.caching_instance_queryer import CachingInstanceQueryer # expensive import -def get_implicit_auto_materialize_policy( +def get_implicit_asset_condition( asset_key: AssetKey, asset_graph: AssetGraph -) -> Optional[AutoMaterializePolicy]: +) -> Optional[AssetCondition]: """For backcompat with pre-auto materialize policy graphs, assume a default scope of 1 day.""" - auto_materialize_policy = asset_graph.get_auto_materialize_policy(asset_key) - if auto_materialize_policy is None: + asset_condition = asset_graph.get_asset_condition(asset_key) + if asset_condition is None: time_partitions_def = get_time_partitions_def(asset_graph.get_partitions_def(asset_key)) if time_partitions_def is None: max_materializations_per_minute = None @@ -76,8 +76,8 @@ def get_implicit_auto_materialize_policy( return AutoMaterializePolicy( rules=rules, max_materializations_per_minute=max_materializations_per_minute, - ) - return auto_materialize_policy + ).to_asset_condition() + return asset_condition class AssetDaemonContext: @@ -192,9 +192,7 @@ def evaluate_asset( """ # convert the legacy AutoMaterializePolicy to an Evaluator - asset_condition = check.not_none( - self.asset_graph.auto_materialize_policies_by_key.get(asset_key) - ).to_asset_condition() + asset_condition = check.not_none(self.asset_graph.get_asset_condition(asset_key)) asset_cursor = self.cursor.get_asset_cursor(asset_key) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py index 01a4025a54152..d66e76147cb38 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_daemon_cursor.py @@ -10,7 +10,6 @@ TypeVar, ) -from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.asset_subset import AssetSubset from dagster._core.definitions.auto_materialize_rule_evaluation import ( BackcompatAutoMaterializeAssetEvaluationSerializer, @@ -26,10 +25,9 @@ whitelist_for_serdes, ) -from .asset_graph import AssetGraph - if TYPE_CHECKING: from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetConditionSnapshot + from .asset_graph import AssetGraph T = TypeVar("T") @@ -155,7 +153,7 @@ def __hash__(self) -> int: def deserialize_asset_daemon_cursor( cursor_str: Optional[str], - asset_graph: Optional[AssetGraph] = None, + asset_graph: Optional["AssetGraph"] = None, default_evaluation_id: int = 0, ) -> AssetDaemonCursor: """Deserializes an AssetDaemonCursor from a string. Provides a backcompat layer for the old @@ -202,12 +200,13 @@ def get_backcompat_asset_condition_cursor( def backcompat_deserialize_asset_daemon_cursor_str( - cursor_str: str, asset_graph: Optional[AssetGraph], default_evaluation_id: int + cursor_str: str, asset_graph: Optional["AssetGraph"], default_evaluation_id: int ) -> AssetDaemonCursor: """This serves as a backcompat layer for deserializing the old cursor format. Some information is impossible to fully recover, this will recover enough to continue operating as normal. """ from .asset_condition import AssetConditionEvaluationWithRunIds + from .asset_graph_subset import AssetGraphSubset data = json.loads(cursor_str) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index fbda6375a9dda..c359f600ed233 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -22,7 +22,7 @@ import toposort import dagster._check as check -from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.errors import DagsterInvalidInvocationError from dagster._core.instance import DynamicPartitionsStore from dagster._core.selector.subset_selector import ( @@ -79,7 +79,7 @@ def __init__( partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]], - auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]], + asset_conditions_by_key: Mapping[AssetKey, Optional[AssetCondition]], backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]], code_versions_by_key: Mapping[AssetKey, Optional[str]], is_observable_by_key: Mapping[AssetKey, bool], @@ -94,7 +94,7 @@ def __init__( self._partition_mappings_by_key = partition_mappings_by_key self._group_names_by_key = group_names_by_key self._freshness_policies_by_key = freshness_policies_by_key - self._auto_materialize_policies_by_key = auto_materialize_policies_by_key + self._asset_conditions_by_key = asset_conditions_by_key self._backfill_policies_by_key = backfill_policies_by_key self._code_versions_by_key = code_versions_by_key self._is_observable_by_key = is_observable_by_key @@ -141,10 +141,8 @@ def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolic return self._freshness_policies_by_key @property - def auto_materialize_policies_by_key( - self, - ) -> Mapping[AssetKey, Optional[AutoMaterializePolicy]]: - return self._auto_materialize_policies_by_key + def asset_condtitions_by_key(self) -> Mapping[AssetKey, Optional[AssetCondition]]: + return self._asset_conditions_by_key @property def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]: @@ -166,7 +164,7 @@ def from_assets( ] = {} group_names_by_key: Dict[AssetKey, Optional[str]] = {} freshness_policies_by_key: Dict[AssetKey, Optional[FreshnessPolicy]] = {} - auto_materialize_policies_by_key: Dict[AssetKey, Optional[AutoMaterializePolicy]] = {} + asset_conditions_by_key: Dict[AssetKey, Optional[AssetCondition]] = {} backfill_policies_by_key: Dict[AssetKey, Optional[BackfillPolicy]] = {} code_versions_by_key: Dict[AssetKey, Optional[str]] = {} is_observable_by_key: Dict[AssetKey, bool] = {} @@ -192,7 +190,7 @@ def from_assets( partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys}) group_names_by_key.update(asset.group_names_by_key) freshness_policies_by_key.update(asset.freshness_policies_by_key) - auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key) + asset_conditions_by_key.update(asset.asset_conditions_by_key) backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) code_versions_by_key.update(asset.code_versions_by_key) @@ -209,7 +207,7 @@ def from_assets( partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policies_by_key=backfill_policies_by_key, assets=assets_defs, asset_checks=asset_checks or [], @@ -531,8 +529,8 @@ def toposort_asset_keys(self) -> Sequence[AbstractSet[AssetKey]]: {key for key in level} for level in toposort.toposort(self._asset_dep_graph["upstream"]) ] - def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]: - return self.auto_materialize_policies_by_key.get(asset_key) + def get_asset_condition(self, asset_key: AssetKey) -> Optional[AssetCondition]: + return self.asset_condtitions_by_key.get(asset_key) def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]: return self.backfill_policies_by_key.get(asset_key) @@ -707,7 +705,7 @@ def __init__( partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]], - auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]], + asset_conditions_by_key: Mapping[AssetKey, Optional[AssetCondition]], backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]], assets: Sequence[AssetsDefinition], source_assets: Sequence[SourceAsset], @@ -726,7 +724,7 @@ def __init__( partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policies_by_key=backfill_policies_by_key, code_versions_by_key=code_versions_by_key, is_observable_by_key=is_observable_by_key, diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 270f62a3a632c..ba9bfa6e8d3dc 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -19,6 +19,7 @@ import dagster._check as check from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec +from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy @@ -88,7 +89,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit): _can_subset: bool _metadata_by_key: Mapping[AssetKey, ArbitraryMetadataMapping] _freshness_policies_by_key: Mapping[AssetKey, FreshnessPolicy] - _auto_materialize_policies_by_key: Mapping[AssetKey, AutoMaterializePolicy] + _asset_conditions_by_key: Mapping[AssetKey, AssetCondition] _backfill_policy: Optional[BackfillPolicy] _code_versions_by_key: Mapping[AssetKey, Optional[str]] _descriptions_by_key: Mapping[AssetKey, str] @@ -111,6 +112,7 @@ def __init__( metadata_by_key: Optional[Mapping[AssetKey, ArbitraryMetadataMapping]] = None, freshness_policies_by_key: Optional[Mapping[AssetKey, FreshnessPolicy]] = None, auto_materialize_policies_by_key: Optional[Mapping[AssetKey, AutoMaterializePolicy]] = None, + asset_conditions_by_key: Optional[Mapping[AssetKey, AssetCondition]] = None, backfill_policy: Optional[BackfillPolicy] = None, descriptions_by_key: Optional[Mapping[AssetKey, str]] = None, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, @@ -277,11 +279,16 @@ def __init__( value_type=FreshnessPolicy, ) - self._auto_materialize_policies_by_key = check.opt_mapping_param( - auto_materialize_policies_by_key, - "auto_materialize_policies_by_key", + asset_conditions_by_key = ( + {key: amp.to_asset_condition() for key, amp in auto_materialize_policies_by_key.items()} + if auto_materialize_policies_by_key + else asset_conditions_by_key + ) + self._asset_conditions_by_key = check.opt_mapping_param( + asset_conditions_by_key, + "asset_conditions_by_key", key_type=AssetKey, - value_type=AutoMaterializePolicy, + value_type=AssetCondition, ) self._backfill_policy = check.opt_inst_param( @@ -331,7 +338,7 @@ def dagster_internal_init( group_names_by_key: Optional[Mapping[AssetKey, str]], metadata_by_key: Optional[Mapping[AssetKey, ArbitraryMetadataMapping]], freshness_policies_by_key: Optional[Mapping[AssetKey, FreshnessPolicy]], - auto_materialize_policies_by_key: Optional[Mapping[AssetKey, AutoMaterializePolicy]], + asset_conditions_by_key: Optional[Mapping[AssetKey, AssetCondition]], backfill_policy: Optional[BackfillPolicy], descriptions_by_key: Optional[Mapping[AssetKey, str]], check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]], @@ -351,7 +358,7 @@ def dagster_internal_init( group_names_by_key=group_names_by_key, metadata_by_key=metadata_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policy=backfill_policy, descriptions_by_key=descriptions_by_key, check_specs_by_output_name=check_specs_by_output_name, @@ -391,6 +398,7 @@ def from_graph( auto_materialize_policies_by_output_name: Optional[ Mapping[str, Optional[AutoMaterializePolicy]] ] = None, + asset_conditions_by_output_name: Optional[Mapping[str, Optional[AssetCondition]]] = None, backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, check_specs: Optional[Sequence[AssetCheckSpec]] = None, @@ -462,7 +470,13 @@ def from_graph( descriptions_by_output_name=descriptions_by_output_name, metadata_by_output_name=metadata_by_output_name, freshness_policies_by_output_name=freshness_policies_by_output_name, - auto_materialize_policies_by_output_name=auto_materialize_policies_by_output_name, + asset_conditions_by_output_name={ + name: amp.to_asset_condition() + for name, amp in auto_materialize_policies_by_output_name.items() + if amp is not None + } + if auto_materialize_policies_by_output_name is not None + else asset_conditions_by_output_name, backfill_policy=backfill_policy, can_subset=can_subset, check_specs=check_specs, @@ -487,6 +501,7 @@ def from_op( auto_materialize_policies_by_output_name: Optional[ Mapping[str, Optional[AutoMaterializePolicy]] ] = None, + asset_conditions_by_output_name: Optional[Mapping[str, Optional[AssetCondition]]] = None, backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, ) -> "AssetsDefinition": @@ -551,7 +566,13 @@ def from_op( descriptions_by_output_name=descriptions_by_output_name, metadata_by_output_name=metadata_by_output_name, freshness_policies_by_output_name=freshness_policies_by_output_name, - auto_materialize_policies_by_output_name=auto_materialize_policies_by_output_name, + asset_conditions_by_output_name={ + name: amp.to_asset_condition() + for name, amp in auto_materialize_policies_by_output_name.items() + if amp is not None + } + if auto_materialize_policies_by_output_name is not None + else asset_conditions_by_output_name, backfill_policy=backfill_policy, can_subset=can_subset, ) @@ -572,9 +593,7 @@ def _from_node( descriptions_by_output_name: Optional[Mapping[str, str]] = None, metadata_by_output_name: Optional[Mapping[str, Optional[ArbitraryMetadataMapping]]] = None, freshness_policies_by_output_name: Optional[Mapping[str, Optional[FreshnessPolicy]]] = None, - auto_materialize_policies_by_output_name: Optional[ - Mapping[str, Optional[AutoMaterializePolicy]] - ] = None, + asset_conditions_by_output_name: Optional[Mapping[str, Optional[AssetCondition]]] = None, backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, check_specs: Optional[Sequence[AssetCheckSpec]] = None, @@ -689,13 +708,13 @@ def _from_node( if freshness_policies_by_output_name else None ), - auto_materialize_policies_by_key=( + asset_conditions_by_key=( { - keys_by_output_name_with_prefix[output_name]: auto_materialize_policy - for output_name, auto_materialize_policy in auto_materialize_policies_by_output_name.items() - if auto_materialize_policy is not None + keys_by_output_name_with_prefix[output_name]: asset_condition + for output_name, asset_condition in asset_conditions_by_output_name.items() + if asset_condition is not None } - if auto_materialize_policies_by_output_name + if asset_conditions_by_output_name else None ), backfill_policy=check.opt_inst_param( @@ -855,8 +874,8 @@ def freshness_policies_by_key(self) -> Mapping[AssetKey, FreshnessPolicy]: return self._freshness_policies_by_key @property - def auto_materialize_policies_by_key(self) -> Mapping[AssetKey, AutoMaterializePolicy]: - return self._auto_materialize_policies_by_key + def asset_conditions_by_key(self) -> Mapping[AssetKey, AssetCondition]: + return self._asset_conditions_by_key @property def backfill_policy(self) -> Optional[BackfillPolicy]: @@ -978,6 +997,7 @@ def with_attributes( auto_materialize_policy: Optional[ Union[AutoMaterializePolicy, Mapping[AssetKey, AutoMaterializePolicy]] ] = None, + asset_condition: Optional[Union[AssetCondition, Mapping[AssetKey, AssetCondition]]] = None, backfill_policy: Optional[BackfillPolicy] = None, is_subset: bool = False, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, @@ -1051,31 +1071,43 @@ def with_attributes( output_asset_key_replacements.get(key, key) ] = replaced_freshness_policy + # convert legacy auto_materialize_policy argument to asset_condition if auto_materialize_policy: - auto_materialize_policy_conflicts = ( - self.auto_materialize_policies_by_key.keys() + asset_condition = ( + auto_materialize_policy.to_asset_condition() if isinstance(auto_materialize_policy, AutoMaterializePolicy) - else (auto_materialize_policy.keys() & self.auto_materialize_policies_by_key.keys()) + else { + key: amp.to_asset_condition() + for key, amp in auto_materialize_policy.items() + if amp is not None + } + ) + + if asset_condition: + asset_condition_conflicts = ( + self.asset_conditions_by_key.keys() + if isinstance(asset_condition, AssetCondition) + else (asset_condition.keys() & self.asset_conditions_by_key.keys()) ) - if auto_materialize_policy_conflicts: + if asset_condition_conflicts: raise DagsterInvalidDefinitionError( - "AutoMaterializePolicy already exists on assets" - f" {', '.join(key.to_string() for key in auto_materialize_policy_conflicts)}" + "AssetCondition already exists on assets" + f" {', '.join(key.to_string() for key in asset_condition_conflicts)}" ) replaced_auto_materialize_policies_by_key = {} for key in self.keys: - if isinstance(auto_materialize_policy, AutoMaterializePolicy): - replaced_auto_materialize_policy = auto_materialize_policy - elif auto_materialize_policy: - replaced_auto_materialize_policy = auto_materialize_policy.get(key) + if isinstance(asset_condition, AssetCondition): + replaced_asset_condition = asset_condition + elif asset_condition: + replaced_asset_condition = asset_condition.get(key) else: - replaced_auto_materialize_policy = self.auto_materialize_policies_by_key.get(key) + replaced_asset_condition = self.asset_conditions_by_key.get(key) - if replaced_auto_materialize_policy: + if replaced_asset_condition: replaced_auto_materialize_policies_by_key[ output_asset_key_replacements.get(key, key) - ] = replaced_auto_materialize_policy + ] = asset_condition replaced_descriptions_by_key = { output_asset_key_replacements.get(key, key): description @@ -1373,7 +1405,7 @@ def get_attributes_dict(self) -> Dict[str, Any]: group_names_by_key=self._group_names_by_key, metadata_by_key=self._metadata_by_key, freshness_policies_by_key=self._freshness_policies_by_key, - auto_materialize_policies_by_key=self._auto_materialize_policies_by_key, + asset_conditions_by_key=self._asset_conditions_by_key, backfill_policy=self._backfill_policy, descriptions_by_key=self._descriptions_by_key, check_specs_by_output_name=self._check_specs_by_output_name, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index dd37721605125..630133b1c0c47 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -22,6 +22,7 @@ from dagster._builtins import Nothing from dagster._config import UserConfigSchema from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations +from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.config import ConfigMapping @@ -83,6 +84,7 @@ def asset( output_required: bool = ..., freshness_policy: Optional[FreshnessPolicy] = ..., auto_materialize_policy: Optional[AutoMaterializePolicy] = ..., + asset_condition: Optional[AssetCondition] = ..., backfill_policy: Optional[BackfillPolicy] = ..., retry_policy: Optional[RetryPolicy] = ..., code_version: Optional[str] = ..., @@ -96,6 +98,7 @@ def asset( @experimental_param(param="resource_defs") @experimental_param(param="io_manager_def") @experimental_param(param="auto_materialize_policy") +@experimental_param(param="asset_condition") @experimental_param(param="backfill_policy") @deprecated_param( param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead." @@ -122,6 +125,7 @@ def asset( output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + asset_condition: Optional[AssetCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, @@ -191,6 +195,8 @@ def asset( with respect to its root data. auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize this asset according to its FreshnessPolicy and when upstream dependencies change. + asset_condition (AssetCondition): (Experimental) Configure Dagster to automatically materialize + this asset whenever this condition evaluates to true. backfill_policy (BackfillPolicy): (Experimental) Configure Dagster to backfill this asset according to its BackfillPolicy. retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. @@ -235,7 +241,9 @@ def create_asset(): group_name=group_name, output_required=output_required, freshness_policy=freshness_policy, - auto_materialize_policy=auto_materialize_policy, + asset_condition=auto_materialize_policy.to_asset_condition() + if auto_materialize_policy + else asset_condition, backfill_policy=backfill_policy, retry_policy=retry_policy, code_version=code_version, @@ -247,6 +255,11 @@ def create_asset(): return create_asset()(compute_fn) def inner(fn: Callable[..., Any]) -> AssetsDefinition: + check.invariant( + not (auto_materialize_policy and asset_condition), + "Both auto_materialize_policy and asset_condition were provided to `@asset` decorator. Please" + " provide one or the other. ", + ) check.invariant( not (io_manager_key and io_manager_def), "Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please" @@ -307,7 +320,7 @@ def __init__( group_name: Optional[str] = None, output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, - auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + asset_condition: Optional[AssetCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, @@ -335,7 +348,7 @@ def __init__( self.output_required = output_required self.freshness_policy = freshness_policy self.retry_policy = retry_policy - self.auto_materialize_policy = auto_materialize_policy + self.asset_condition = asset_condition self.backfill_policy = backfill_policy self.code_version = code_version self.check_specs = check_specs @@ -468,10 +481,8 @@ def __call__(self, fn: Callable) -> AssetsDefinition: freshness_policies_by_key=( {out_asset_key: self.freshness_policy} if self.freshness_policy else None ), - auto_materialize_policies_by_key=( - {out_asset_key: self.auto_materialize_policy} - if self.auto_materialize_policy - else None + asset_conditions_by_key=( + {out_asset_key: self.asset_condition} if self.asset_condition else None ), backfill_policy=self.backfill_policy, asset_deps=None, # no asset deps in single-asset decorator @@ -843,8 +854,8 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: for asset_key, props in props_by_asset_key.items() if props.freshness_policy is not None } - auto_materialize_policies_by_key = { - asset_key: props.auto_materialize_policy + asset_conditions_by_key = { + asset_key: props.auto_materialize_policy.to_asset_condition() for asset_key, props in props_by_asset_key.items() if props.auto_materialize_policy is not None } @@ -865,7 +876,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: resource_defs=resource_defs, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policy=backfill_policy, selected_asset_keys=None, # no subselection in decorator # descriptions by key is more accurately understood as _overriding_ the descriptions @@ -1015,6 +1026,7 @@ def graph_asset( metadata: Optional[MetadataUserInput] = ..., freshness_policy: Optional[FreshnessPolicy] = ..., auto_materialize_policy: Optional[AutoMaterializePolicy] = ..., + asset_condition: Optional[AssetCondition] = ..., backfill_policy: Optional[BackfillPolicy] = ..., resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, @@ -1036,6 +1048,7 @@ def graph_asset( metadata: Optional[MetadataUserInput] = None, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + asset_condition: Optional[AssetCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, @@ -1082,6 +1095,7 @@ def graph_asset( intended to be updated with respect to its root data. auto_materialize_policy (Optional[AutoMaterializePolicy]): The AutoMaterializePolicy to use for this asset. + asset_condition (Optional[AssetCondition]): The AssetCondition to use for this asset. backfill_policy (Optional[BackfillPolicy]): The BackfillPolicy to use for this asset. key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name. @@ -1113,6 +1127,7 @@ def slack_files_table(): metadata=metadata, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, + asset_condition=asset_condition, backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, @@ -1130,7 +1145,9 @@ def slack_files_table(): partitions_def=partitions_def, metadata=metadata, freshness_policy=freshness_policy, - auto_materialize_policy=auto_materialize_policy, + asset_condition=auto_materialize_policy.to_asset_condition() + if auto_materialize_policy + else asset_condition, backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, @@ -1150,7 +1167,7 @@ def graph_asset_no_defaults( partitions_def: Optional[PartitionsDefinition], metadata: Optional[MetadataUserInput], freshness_policy: Optional[FreshnessPolicy], - auto_materialize_policy: Optional[AutoMaterializePolicy], + asset_condition: Optional[AssetCondition], backfill_policy: Optional[BackfillPolicy], resource_defs: Optional[Mapping[str, ResourceDefinition]], check_specs: Optional[Sequence[AssetCheckSpec]], @@ -1203,9 +1220,7 @@ def graph_asset_no_defaults( freshness_policies_by_output_name=( {"result": freshness_policy} if freshness_policy else None ), - auto_materialize_policies_by_output_name=( - {"result": auto_materialize_policy} if auto_materialize_policy else None - ), + asset_conditions_by_output_name={"result": asset_condition} if asset_condition else None, backfill_policy=backfill_policy, descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index ff09eef6b3f2f..9eb6247945f24 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -13,8 +13,8 @@ ) import dagster._check as check +from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX -from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository from dagster._core.host_representation.handle import RepositoryHandle from dagster._core.selector.subset_selector import DependencyGraph @@ -43,7 +43,7 @@ def __init__( partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]], - auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]], + asset_conditions_by_key: Mapping[AssetKey, Optional[AssetCondition]], backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]], repo_handles_by_key: Mapping[AssetKey, RepositoryHandle], job_names_by_key: Mapping[AssetKey, Sequence[str]], @@ -61,7 +61,7 @@ def __init__( partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policies_by_key=backfill_policies_by_key, code_versions_by_key=code_versions_by_key, is_observable_by_key=is_observable_by_key, @@ -130,7 +130,7 @@ def from_repository_handles_and_external_asset_nodes( ) group_names_by_key = {} freshness_policies_by_key = {} - auto_materialize_policies_by_key = {} + asset_conditions_by_key = {} backfill_policies_by_key = {} keys_by_atomic_execution_unit_id: Dict[str, Set[AssetKeyOrCheckKey]] = defaultdict(set) repo_handles_by_key = { @@ -184,7 +184,11 @@ def from_repository_handles_and_external_asset_nodes( ) group_names_by_key[node.asset_key] = node.group_name freshness_policies_by_key[node.asset_key] = node.freshness_policy - auto_materialize_policies_by_key[node.asset_key] = node.auto_materialize_policy + asset_conditions_by_key[node.asset_key] = ( + node.auto_materialize_policy.to_asset_condition() + if node.auto_materialize_policy + else node.asset_condition + ) backfill_policies_by_key[node.asset_key] = node.backfill_policy if node.atomic_execution_unit_id is not None: @@ -216,7 +220,7 @@ def from_repository_handles_and_external_asset_nodes( partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, - auto_materialize_policies_by_key=auto_materialize_policies_by_key, + asset_conditions_by_key=asset_conditions_by_key, backfill_policies_by_key=backfill_policies_by_key, repo_handles_by_key=repo_handles_by_key, job_names_by_key=job_names_by_key, diff --git a/python_modules/dagster/dagster/_core/host_representation/external.py b/python_modules/dagster/dagster/_core/host_representation/external.py index a92cba30c9f30..d755fb7859b52 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external.py +++ b/python_modules/dagster/dagster/_core/host_representation/external.py @@ -209,8 +209,8 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]: default_sensor_asset_keys = set() - for asset_key, policy in asset_graph.auto_materialize_policies_by_key.items(): - if not policy: + for asset_key, asset_condition in asset_graph.asset_condtitions_by_key.items(): + if not asset_condition: continue if asset_key not in covered_asset_keys: diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index a31a0e9aad826..7946ff822d4c6 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -48,6 +48,7 @@ ) from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.asset_condition import AssetCondition from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, @@ -1172,6 +1173,7 @@ class ExternalAssetNode( ("atomic_execution_unit_id", Optional[str]), ("required_top_level_resources", Optional[Sequence[str]]), ("auto_materialize_policy", Optional[AutoMaterializePolicy]), + ("asset_condition", Optional[AssetCondition]), ("backfill_policy", Optional[BackfillPolicy]), ("auto_observe_interval_minutes", Optional[float]), ], @@ -1206,6 +1208,7 @@ def __new__( atomic_execution_unit_id: Optional[str] = None, required_top_level_resources: Optional[Sequence[str]] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, + asset_condition: Optional[AssetCondition] = None, backfill_policy: Optional[BackfillPolicy] = None, auto_observe_interval_minutes: Optional[float] = None, ): @@ -1263,6 +1266,9 @@ def __new__( "auto_materialize_policy", AutoMaterializePolicy, ), + asset_condition=check.opt_inst_param( + asset_condition, "asset_condition", AssetCondition + ), backfill_policy=check.opt_inst_param( backfill_policy, "backfill_policy", BackfillPolicy ), @@ -1512,7 +1518,7 @@ def external_asset_nodes_from_defs( asset_info_by_asset_key: Dict[AssetKey, AssetOutputInfo] = dict() freshness_policy_by_asset_key: Dict[AssetKey, FreshnessPolicy] = dict() metadata_by_asset_key: Dict[AssetKey, MetadataUserInput] = dict() - auto_materialize_policy_by_asset_key: Dict[AssetKey, AutoMaterializePolicy] = dict() + asset_conditions_by_asset_key: Dict[AssetKey, AssetCondition] = dict() backfill_policy_by_asset_key: Dict[AssetKey, Optional[BackfillPolicy]] = dict() deps: Dict[AssetKey, Dict[AssetKey, ExternalAssetDependency]] = defaultdict(dict) @@ -1567,7 +1573,7 @@ def external_asset_nodes_from_defs( for assets_def in asset_layer.assets_defs_by_key.values(): metadata_by_asset_key.update(assets_def.metadata_by_key) freshness_policy_by_asset_key.update(assets_def.freshness_policies_by_key) - auto_materialize_policy_by_asset_key.update(assets_def.auto_materialize_policies_by_key) + asset_conditions_by_asset_key.update(assets_def.asset_conditions_by_key) backfill_policy_by_asset_key.update( {key: assets_def.backfill_policy for key in assets_def.keys} ) @@ -1699,7 +1705,7 @@ def external_asset_nodes_from_defs( # such assets are part of the default group group_name=group_name_by_asset_key.get(asset_key, DEFAULT_GROUP_NAME), freshness_policy=freshness_policy_by_asset_key.get(asset_key), - auto_materialize_policy=auto_materialize_policy_by_asset_key.get(asset_key), + asset_condition=asset_conditions_by_asset_key.get(asset_key), backfill_policy=backfill_policy_by_asset_key.get(asset_key), atomic_execution_unit_id=atomic_execution_unit_ids_by_key.get(asset_key), required_top_level_resources=required_top_level_resources, diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 9cb3a94c3e4c7..2805b19a69787 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -251,7 +251,7 @@ def _run_iteration_impl( auto_materialize_asset_keys = { target_key for target_key in asset_graph.materializable_asset_keys - if asset_graph.get_auto_materialize_policy(target_key) is not None + if asset_graph.get_asset_condition(target_key) is not None } num_target_assets = len(auto_materialize_asset_keys) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 40356bbf7a645..7562857f8e0bd 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -224,7 +224,7 @@ def bar(): ) -def test_graph_backed_retain_freshness_policy_and_auto_materialize_policy(): +def test_graph_backed_retain_freshness_policy_and_auto_materialize_policy() -> None: fpa = FreshnessPolicy(maximum_lag_minutes=24.5) fpb = FreshnessPolicy( maximum_lag_minutes=30.5, cron_schedule="0 0 * * *", cron_schedule_timezone="US/Eastern" @@ -262,9 +262,9 @@ def my_graph(): assert replaced.freshness_policies_by_key[AssetKey("bb")] == fpb assert replaced.freshness_policies_by_key.get(AssetKey("cc")) is None - assert replaced.auto_materialize_policies_by_key[AssetKey("aa")] == ampa - assert replaced.auto_materialize_policies_by_key[AssetKey("bb")] == ampb - assert replaced.auto_materialize_policies_by_key.get(AssetKey("cc")) is None + assert replaced.asset_conditions_by_key[AssetKey("aa")] == ampa.to_asset_condition() + assert replaced.asset_conditions_by_key[AssetKey("bb")] == ampb.to_asset_condition() + assert replaced.asset_conditions_by_key.get(AssetKey("cc")) is None def test_retain_metadata_graph(): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py index f418f85a61e05..cf04ebd990c82 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_from_modules.py @@ -46,7 +46,8 @@ def check_auto_materialize_policy(assets, auto_materialize_policy): asset_keys = a.keys for asset_key in asset_keys: assert ( - a.auto_materialize_policies_by_key.get(asset_key) == auto_materialize_policy + a.asset_conditions_by_key.get(asset_key) + == auto_materialize_policy.to_asset_condition() ), asset_key diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index 131f72699ff11..b2a3c307dce6a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -922,8 +922,8 @@ def my_asset(x): maximum_lag_minutes=5 ) assert ( - my_asset.auto_materialize_policies_by_key[AssetKey("my_asset")] - == AutoMaterializePolicy.lazy() + my_asset.asset_conditions_by_key[AssetKey("my_asset")] + == AutoMaterializePolicy.lazy().to_asset_condition() ) assert my_asset.resource_defs["foo"] == foo_resource @@ -1098,10 +1098,10 @@ def two_assets(x, y): ) assert ( - two_assets.auto_materialize_policies_by_key[AssetKey("first_asset")] - == AutoMaterializePolicy.eager() + two_assets.asset_conditions_by_key[AssetKey("first_asset")] + == AutoMaterializePolicy.eager().to_asset_condition() ) - assert two_assets.auto_materialize_policies_by_key.get(AssetKey("second_asset")) is None + assert two_assets.asset_conditions_by_key.get(AssetKey("second_asset")) is None assert two_assets.resource_defs["foo"] == foo_resource @@ -1254,9 +1254,9 @@ def test_multi_asset_with_auto_materialize_policy(): def my_asset(): ... - assert my_asset.auto_materialize_policies_by_key == { - AssetKey("o2"): AutoMaterializePolicy.eager(), - AssetKey("o3"): AutoMaterializePolicy.lazy(), + assert my_asset.asset_conditions_by_key == { + AssetKey("o2"): AutoMaterializePolicy.eager().to_asset_condition(), + AssetKey("o3"): AutoMaterializePolicy.lazy().to_asset_condition(), } diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py index 2609444c48d55..1f05d70beceac 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_daemon_scenario.py @@ -338,8 +338,8 @@ def _evaluate_tick_fast( asset_graph=self.asset_graph, auto_materialize_asset_keys={ key - for key, policy in self.asset_graph.auto_materialize_policies_by_key.items() - if policy is not None + for key, condition in self.asset_graph.asset_condtitions_by_key.items() + if condition is not None }, instance=self.instance, materialize_run_tags={}, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index 019dd84860336..fce12d1b2ed74 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -46,7 +46,7 @@ ) from dagster._core.definitions.asset_daemon_context import ( AssetDaemonContext, - get_implicit_auto_materialize_policy, + get_implicit_asset_condition, ) from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor from dagster._core.definitions.asset_graph import AssetGraph @@ -183,7 +183,7 @@ def __new__( # and add them to the assets. assets_with_implicit_policies = assets if assets and all( - (isinstance(a, AssetsDefinition) and not a.auto_materialize_policies_by_key) + (isinstance(a, AssetsDefinition) and not a.asset_conditions_by_key) or isinstance(a, SourceAsset) for a in assets ): @@ -703,7 +703,9 @@ def with_auto_materialize_policy( """ ret = [] for assets_def in assets_defs: - ret.append(assets_def.with_attributes(auto_materialize_policy=auto_materialize_policy)) + ret.append( + assets_def.with_attributes(asset_condition=auto_materialize_policy.to_asset_condition()) + ) return ret @@ -717,22 +719,17 @@ def with_implicit_auto_materialize_policies( """ ret = [] for assets_def in assets_defs: - if ( - isinstance(assets_def, AssetsDefinition) - and not assets_def.auto_materialize_policies_by_key - ): + if isinstance(assets_def, AssetsDefinition) and not assets_def.asset_conditions_by_key: targeted_keys = ( assets_def.keys & targeted_assets if targeted_assets else assets_def.keys ) - auto_materialize_policies_by_key = {} + asset_conditions_by_key = {} for key in targeted_keys: - policy = get_implicit_auto_materialize_policy(key, asset_graph) - if policy: - auto_materialize_policies_by_key[key] = policy + condition = get_implicit_asset_condition(key, asset_graph) + if condition: + asset_conditions_by_key[key] = condition - ret.append( - assets_def.with_attributes(auto_materialize_policy=auto_materialize_policies_by_key) - ) + ret.append(assets_def.with_attributes(asset_condition=asset_conditions_by_key)) else: ret.append(assets_def) return ret diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/multi_code_location_scenarios.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/multi_code_location_scenarios.py index 9b59fe0510110..e155cddd5e1a5 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/multi_code_location_scenarios.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenarios/multi_code_location_scenarios.py @@ -29,8 +29,9 @@ def with_auto_materialize_policy( ret = [] for assets_def in assets_defs: new_assets_def = copy.copy(assets_def) - new_assets_def._auto_materialize_policies_by_key = { # noqa: SLF001 - asset_key: auto_materialize_policy for asset_key in new_assets_def.keys + new_assets_def._asset_conditions_by_key = { # noqa: SLF001 + asset_key: auto_materialize_policy.to_asset_condition() + for asset_key in new_assets_def.keys } ret.append(new_assets_def) return ret diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_instance.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_instance.py index e69dcac9369ef..c084d224a933e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_instance.py @@ -241,10 +241,15 @@ def downstream_asset(dagster_tags): expected_auto_materialize_policy = ( AutoMaterializePolicy.lazy() if connection_to_auto_materialize_policy_fn else None ) - auto_materialize_policies_by_key = ab_assets[0].auto_materialize_policies_by_key + asset_conditions_by_key = ab_assets[0].asset_conditions_by_key assert all( - auto_materialize_policies_by_key[key] == expected_auto_materialize_policy - for key in auto_materialize_policies_by_key + asset_conditions_by_key[key] + == ( + expected_auto_materialize_policy.to_asset_condition() + if expected_auto_materialize_policy + else None + ) + for key in asset_conditions_by_key ) responses.add( diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/test_asset_defs.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/test_asset_defs.py index 34729255bba50..cc39c95cf1e62 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/test_asset_defs.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/cloud/test_asset_defs.py @@ -529,8 +529,8 @@ def test_custom_auto_materialize_policy(dbt_cloud, dbt_cloud_service): dbt_assets_definition_cacheable_data ) - assert dbt_cloud_assets[0].auto_materialize_policies_by_key == { - key: AutoMaterializePolicy.eager() for key in dbt_cloud_assets[0].keys + assert dbt_cloud_assets[0].asset_conditions_by_key == { + key: AutoMaterializePolicy.eager().to_asset_condition() for key in dbt_cloud_assets[0].keys } diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py index 0944afde1cbb8..e9b19b17d17ba 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py @@ -479,8 +479,8 @@ def get_auto_materialize_policy( def my_dbt_assets(): ... - for auto_materialize_policy in my_dbt_assets.auto_materialize_policies_by_key.values(): - assert auto_materialize_policy == expected_auto_materialize_policy + for asset_condition in my_dbt_assets.asset_conditions_by_key.values(): + assert asset_condition == expected_auto_materialize_policy.to_asset_condition() def test_dbt_meta_auto_materialize_policy() -> None: @@ -488,11 +488,11 @@ def test_dbt_meta_auto_materialize_policy() -> None: def my_dbt_assets(): ... - auto_materialize_policies = my_dbt_assets.auto_materialize_policies_by_key.values() - assert auto_materialize_policies + asset_conditions = my_dbt_assets.asset_conditions_by_key.values() + assert asset_conditions - for auto_materialize_policy in auto_materialize_policies: - assert auto_materialize_policy == AutoMaterializePolicy.eager() + for asset_condition in asset_conditions: + assert asset_condition == AutoMaterializePolicy.eager().to_asset_condition() def test_dbt_meta_freshness_policy() -> None: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_defs.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_defs.py index 74ec803b1134f..90acfea39a709 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_defs.py @@ -342,8 +342,8 @@ def test_custom_auto_materialize_policy(): node_info_to_auto_materialize_policy_fn=lambda _: AutoMaterializePolicy.lazy(), ) - assert dbt_assets[0].auto_materialize_policies_by_key == { - key: AutoMaterializePolicy.lazy() for key in dbt_assets[0].keys + assert dbt_assets[0].asset_conditions_by_key == { + key: AutoMaterializePolicy.lazy().to_asset_condition() for key in dbt_assets[0].keys } @@ -624,8 +624,8 @@ def get_auto_materialize_policy(cls, dbt_resource_props): for freshness_policy in dbt_assets[0].freshness_policies_by_key.values(): assert freshness_policy == FreshnessPolicy(maximum_lag_minutes=1) - for auto_materialize_policy in dbt_assets[0].auto_materialize_policies_by_key.values(): - assert auto_materialize_policy == AutoMaterializePolicy.lazy() + for asset_condition in dbt_assets[0].asset_conditions_by_key.values(): + assert asset_condition == AutoMaterializePolicy.lazy().to_asset_condition() result = materialize_to_memory( dbt_assets,