Skip to content

Commit

Permalink
[external-assets] Tweaks to AssetGraph interface
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 28, 2024
1 parent aa004b0 commit f0065f1
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import itertools
import logging
import os
import time
Expand Down Expand Up @@ -230,7 +229,7 @@ def get_asset_condition_evaluations(
num_checked_assets = 0
num_auto_materialize_asset_keys = len(self.auto_materialize_asset_keys)

for asset_key in itertools.chain(*self.asset_graph.toposort_asset_keys()):
for asset_key in self.asset_graph.toposorted_asset_keys:
# an asset may have already been visited if it was part of a non-subsettable multi-asset
if asset_key not in self.auto_materialize_asset_keys:
continue
Expand Down
22 changes: 14 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ def executable_asset_keys(self) -> AbstractSet[AssetKey]:
def is_executable(self, asset_key: AssetKey) -> bool:
...

@property
@cached_method
def toposorted_asset_keys(self) -> Sequence[AssetKey]:
"""Return topologically sorted asset keys in graph. Keys with the same topological level are
sorted alphabetically to provide stability.
"""
return [
key
for keys_in_level in toposort.toposort(self.asset_dep_graph["upstream"])
for key in sorted(keys_in_level)
]

@abstractmethod
def asset_keys_for_group(self, group_name: str) -> AbstractSet[AssetKey]:
...
Expand Down Expand Up @@ -519,12 +531,6 @@ def get_required_asset_and_check_keys(
) -> AbstractSet[AssetKeyOrCheckKey]:
...

@cached_method
def toposort_asset_keys(self) -> Sequence[AbstractSet[AssetKey]]:
return [
{key for key in level} for level in toposort.toposort(self.asset_dep_graph["upstream"])
]

@cached_method
def get_downstream_freshness_policies(
self, *, asset_key: AssetKey
Expand Down Expand Up @@ -751,10 +757,10 @@ def __init__(
self._asset_graph = asset_graph
self._include_required_multi_assets = include_required_multi_assets

toposorted_asset_keys = asset_graph.toposort_asset_keys()
asset_keys_by_level = toposort.toposort(asset_graph.asset_dep_graph["upstream"])
self._toposort_level_by_asset_key = {
asset_key: i
for i, asset_keys in enumerate(toposorted_asset_keys)
for i, asset_keys in enumerate(asset_keys_by_level)
for asset_key in asset_keys
}
self._heap = [self._queue_item(asset_partition) for asset_partition in items]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ def get_asset_checks_def(self, asset_check_key: AssetCheckKey) -> AssetChecksDef
def has_asset_check(self, asset_check_key: AssetCheckKey) -> bool:
return asset_check_key in self._asset_checks_defs_by_key

def includes_materializable_and_external_assets(
self, asset_keys: AbstractSet[AssetKey]
) -> bool:
"""Returns true if the given asset keys contains at least one materializable asset and
at least one external asset.
"""
selected_external_assets = self.external_asset_keys & asset_keys
selected_materializable_assets = self.materializable_asset_keys & asset_keys
return len(selected_external_assets) > 0 and len(selected_materializable_assets) > 0

@property
@cached_method
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,13 @@ def resolve(
"""Resolve this UnresolvedAssetJobDefinition into a JobDefinition."""
assets = asset_graph.assets_defs
selected_asset_keys = self.selection.resolve(asset_graph)
if asset_graph.includes_materializable_and_external_assets(selected_asset_keys):

if (
len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0
and len(selected_asset_keys & asset_graph.external_asset_keys) > 0
):
raise DagsterInvalidDefinitionError(
f"Asset selection for job '{self.name}' specified both regular assets and source "
f"Asset selection for job '{self.name}' specified both regular assets and external "
"assets. This is not currently supported. Selections must be all regular "
"assets or all source assets.",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,7 @@ def get_targeted_asset_keys_topological_order(
Orders keys in the same topological level alphabetically.
"""
toposorted_keys = asset_graph.toposort_asset_keys()

targeted_toposorted_keys = []
for level_keys in toposorted_keys:
for key in sorted(level_keys):
if key in self.target_subset.asset_keys:
targeted_toposorted_keys.append(key)

return targeted_toposorted_keys
return [k for k in asset_graph.toposorted_asset_keys if k in self.target_subset.asset_keys]

def get_backfill_status_per_asset_key(
self, asset_graph: AssetGraph
Expand Down

0 comments on commit f0065f1

Please sign in to comment.