Skip to content

Commit

Permalink
[external-assets] Remove existence checks from AssetGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 28, 2024
1 parent e6572f4 commit aa004b0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ def get_partitions_in_range(
]

def is_partitioned(self, asset_key: AssetKey) -> bool:
# Temporarily performing an existence check here for backcompat. Callsites need to be
# changed to verify this first.
return asset_key in self.all_asset_keys and self.get_partitions_def(asset_key) is not None
return self.get_partitions_def(asset_key) is not None

@abstractmethod
def get_group_name(self, asset_key: AssetKey) -> Optional[str]:
Expand Down Expand Up @@ -400,7 +398,7 @@ def get_parents_partitions(
valid_parent_partitions: Set[AssetKeyPartitionKey] = set()
required_but_nonexistent_parent_partitions: Set[AssetKeyPartitionKey] = set()
for parent_asset_key in self.get_parents(asset_key):
if self.is_partitioned(parent_asset_key):
if self.has_asset(parent_asset_key) and self.is_partitioned(parent_asset_key):
mapped_partitions_result = self.get_parent_partition_keys_for_child(
partition_key,
parent_asset_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ def _upstream_records_by_key(
upstream_records: Dict[AssetKey, EventLogRecord] = {}

for parent_key in self.asset_graph.get_parents(asset_key):
if not self.asset_graph.is_executable(parent_key):
if not (
self.asset_graph.has_asset(parent_key)
and self.asset_graph.is_executable(parent_key)
):
continue

input_event_pointer_tag = get_input_event_pointer_tag(parent_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_materializable for key in ad.keys}

def is_materializable(self, asset_key: AssetKey) -> bool:
# Performing an existence check temporarily until we change callsites
return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_materializable
return self.get_assets_def(asset_key).is_materializable

@property
@cached_method
Expand All @@ -137,8 +136,7 @@ def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_executable for key in ad.keys}

def is_executable(self, asset_key: AssetKey) -> bool:
# Performing an existence check temporarily until we change callsites
return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_executable
return self.get_assets_def(asset_key).is_executable

def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
return {
Expand Down Expand Up @@ -179,8 +177,7 @@ def all_group_names(self) -> AbstractSet[str]:
}

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
# Performing an existence check temporarily until we change callsites
return self.get_assets_def(asset_key).partitions_def if self.has_asset(asset_key) else None
return self.get_assets_def(asset_key).partitions_def

def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]:
return self.get_assets_def(asset_key).partition_mappings
Expand Down

0 comments on commit aa004b0

Please sign in to comment.