From da6a5795e4a2f14ebc02c3a005ab774fa1434f16 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Tue, 27 Feb 2024 21:14:44 -0500 Subject: [PATCH] [external-assets] Remove existence checks from AssetGraph --- .../dagster/dagster/_core/definitions/asset_graph.py | 6 ++---- .../dagster/dagster/_core/definitions/data_time.py | 5 ++++- .../dagster/_core/definitions/internal_asset_graph.py | 9 +++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 8c232448a5f0a..480d8c58efed3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -175,9 +175,7 @@ def get_partitions_in_range( ] def is_partitioned(self, asset_key: AssetKey) -> bool: - # Temporarily performing an existence check here for backcompat. Callsites need to be - # changed to verify this first. - return asset_key in self.all_asset_keys and self.get_partitions_def(asset_key) is not None + return self.get_partitions_def(asset_key) is not None @abstractmethod def get_group_name(self, asset_key: AssetKey) -> Optional[str]: @@ -400,7 +398,7 @@ def get_parents_partitions( valid_parent_partitions: Set[AssetKeyPartitionKey] = set() required_but_nonexistent_parent_partitions: Set[AssetKeyPartitionKey] = set() for parent_asset_key in self.get_parents(asset_key): - if self.is_partitioned(parent_asset_key): + if self.has_asset(parent_asset_key) and self.is_partitioned(parent_asset_key): mapped_partitions_result = self.get_parent_partition_keys_for_child( partition_key, parent_asset_key, diff --git a/python_modules/dagster/dagster/_core/definitions/data_time.py b/python_modules/dagster/dagster/_core/definitions/data_time.py index 070d76ead9606..3530e4214b8f9 100644 --- a/python_modules/dagster/dagster/_core/definitions/data_time.py +++ b/python_modules/dagster/dagster/_core/definitions/data_time.py @@ -176,7 +176,10 @@ def _upstream_records_by_key( upstream_records: Dict[AssetKey, EventLogRecord] = {} for parent_key in self.asset_graph.get_parents(asset_key): - if not self.asset_graph.is_executable(parent_key): + if not ( + self.asset_graph.has_asset(parent_key) + and self.asset_graph.is_executable(parent_key) + ): continue input_event_pointer_tag = get_input_event_pointer_tag(parent_key) diff --git a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py index 6ad3200a851df..916df96f7d56b 100644 --- a/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/internal_asset_graph.py @@ -112,8 +112,7 @@ def materializable_asset_keys(self) -> AbstractSet[AssetKey]: return {key for ad in self._assets_defs if ad.is_materializable for key in ad.keys} def is_materializable(self, asset_key: AssetKey) -> bool: - # Performing an existence check temporarily until we change callsites - return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_materializable + return self.get_assets_def(asset_key).is_materializable @property @cached_method @@ -137,8 +136,7 @@ def executable_asset_keys(self) -> AbstractSet[AssetKey]: return {key for ad in self._assets_defs if ad.is_executable for key in ad.keys} def is_executable(self, asset_key: AssetKey) -> bool: - # Performing an existence check temporarily until we change callsites - return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_executable + return self.get_assets_def(asset_key).is_executable def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]: return { @@ -179,8 +177,7 @@ def all_group_names(self) -> AbstractSet[str]: } def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]: - # Performing an existence check temporarily until we change callsites - return self.get_assets_def(asset_key).partitions_def if self.has_asset(asset_key) else None + return self.get_assets_def(asset_key).partitions_def def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]: return self.get_assets_def(asset_key).partition_mappings