Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external-assets] Remove existence checks from AssetGraph #20118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ def get_partitions_in_range(
]

def is_partitioned(self, asset_key: AssetKey) -> bool:
# Temporarily performing an existence check here for backcompat. Callsites need to be
# changed to verify this first.
return asset_key in self.all_asset_keys and self.get_partitions_def(asset_key) is not None
return self.get_partitions_def(asset_key) is not None

@abstractmethod
def get_group_name(self, asset_key: AssetKey) -> Optional[str]:
Expand Down Expand Up @@ -400,7 +398,7 @@ def get_parents_partitions(
valid_parent_partitions: Set[AssetKeyPartitionKey] = set()
required_but_nonexistent_parent_partitions: Set[AssetKeyPartitionKey] = set()
for parent_asset_key in self.get_parents(asset_key):
if self.is_partitioned(parent_asset_key):
if self.has_asset(parent_asset_key) and self.is_partitioned(parent_asset_key):
mapped_partitions_result = self.get_parent_partition_keys_for_child(
partition_key,
parent_asset_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ def _upstream_records_by_key(
upstream_records: Dict[AssetKey, EventLogRecord] = {}

for parent_key in self.asset_graph.get_parents(asset_key):
if not self.asset_graph.is_executable(parent_key):
if not (
self.asset_graph.has_asset(parent_key)
and self.asset_graph.is_executable(parent_key)
):
continue

input_event_pointer_tag = get_input_event_pointer_tag(parent_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_materializable for key in ad.keys}

def is_materializable(self, asset_key: AssetKey) -> bool:
# Performing an existence check temporarily until we change callsites
return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_materializable
return self.get_assets_def(asset_key).is_materializable

@property
@cached_method
Expand All @@ -137,8 +136,7 @@ def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {key for ad in self._assets_defs if ad.is_executable for key in ad.keys}

def is_executable(self, asset_key: AssetKey) -> bool:
# Performing an existence check temporarily until we change callsites
return self.has_asset(asset_key) and self.get_assets_def(asset_key).is_executable
return self.get_assets_def(asset_key).is_executable

def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
return {
Expand Down Expand Up @@ -179,8 +177,7 @@ def all_group_names(self) -> AbstractSet[str]:
}

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
# Performing an existence check temporarily until we change callsites
return self.get_assets_def(asset_key).partitions_def if self.has_asset(asset_key) else None
return self.get_assets_def(asset_key).partitions_def

def get_partition_mappings(self, asset_key: AssetKey) -> Mapping[AssetKey, PartitionMapping]:
return self.get_assets_def(asset_key).partition_mappings
Expand Down