Skip to content

Commit

Permalink
[external-assets] Add AssetGraph subsetting, make build_asset_job use…
Browse files Browse the repository at this point in the history
… AssetGraph
  • Loading branch information
smackesey committed Mar 12, 2024
1 parent 9832ead commit 71257fb
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 209 deletions.
55 changes: 54 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_layer import subset_assets_defs
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
AssetSpec,
Expand Down Expand Up @@ -215,7 +217,7 @@ def from_assets(

# Build the set of AssetNodes. Each node holds key rather than object references to parent
# and child nodes.
dep_graph = generate_asset_dep_graph(assets_defs, [])
dep_graph = generate_asset_dep_graph(assets_defs)
asset_nodes_by_key = {
key: AssetNode(
key=key,
Expand Down Expand Up @@ -294,3 +296,54 @@ def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
if isinstance(acd, AssetChecksDefinition)
}
)

@cached_property
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
return {
*(key for ad in self.assets_defs for key in ad.check_keys),
*(key for acd in self.asset_checks_defs for key in acd.keys),
}

def get_subset(
self,
executable_asset_keys: AbstractSet[AssetKey],
asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
) -> "AssetGraph":
"""Returns a new asset graph where only the provided asset keys are executable. All parent
keys of a selected executabel asset will be included as unexecutable external assets (unless
they are themselves present in the executable selection).
"""
from dagster._core.definitions.external_asset import (
create_unexecutable_external_assets_from_assets_def,
)

invalid_executable_keys = executable_asset_keys - self.executable_asset_keys
check.invariant(
not invalid_executable_keys,
"Provided executable asset keys must be a subset of existing executable asset keys."
f" Invalid provided keys: {invalid_executable_keys}",
)

# loadable_asset_keys = {
# parent_key for key in executable_asset_keys for parent_key in self.get(key).parent_keys
# } - executable_asset_keys
# full_executable_assets_defs = {self.get(key).assets_def for key in executable_asset_keys}
executable_assets_defs, raw_loadable_assets_defs = subset_assets_defs(
self.assets_defs, executable_asset_keys, asset_check_keys
)
loadable_assets_defs = [
unexecutable_ad
for ad in raw_loadable_assets_defs
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]

# ignore check keys that don't correspond to an AssetChecksDefinition
asset_checks_defs = list(
{
acd
for key, acd in self._asset_check_compute_defs_by_key.items()
if key in (asset_check_keys or []) and isinstance(acd, AssetChecksDefinition)
}
)

return self.from_assets([*executable_assets_defs, *loadable_assets_defs], asset_checks_defs)
73 changes: 23 additions & 50 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .resource_definition import ResourceDefinition

if TYPE_CHECKING:
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets import AssetsDefinition, SourceAsset
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -407,8 +408,7 @@ def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"],
source_assets: Sequence["SourceAsset"],
asset_graph: "AssetGraph",
) -> "AssetLayer":
"""Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the
corresponding AssetsDefinition objects.
Expand All @@ -419,19 +419,10 @@ def from_graph_and_assets_node_mapping(
assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from
a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up.
"""
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
unexecutable_assets_defs = asset_graph.assets_defs_for_keys(
asset_graph.unexecutable_asset_keys
)

# Eliminate source assets here
observable_assets_defs_by_node_handle = {
k: create_external_asset_from_source_asset(sa)
for k, sa in observable_source_assets_by_node_handle.items()
}
unexecutable_assets_defs = [
create_external_asset_from_source_asset(sa) for sa in source_assets
]

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
Expand Down Expand Up @@ -571,32 +562,11 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
key: assets_def
for assets_def in (
*assets_defs_by_outer_node_handle.values(),
*observable_assets_defs_by_node_handle.values(),
*unexecutable_assets_defs,
)
for key in assets_def.keys
}

for node_handle, assets_def in observable_assets_defs_by_node_handle.items():
node_def = assets_def.node_def
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
assets_def.key,
partitions_fn=None,
partitions_def=assets_def.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
# nodes for assets
**{
Expand Down Expand Up @@ -848,7 +818,6 @@ def my_graph():
def build_asset_selection_job(
name: str,
assets: Iterable["AssetsDefinition"],
source_assets: Iterable["SourceAsset"],
asset_checks: Iterable["AssetChecksDefinition"],
executor_def: Optional[ExecutorDefinition] = None,
config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]] = None,
Expand All @@ -863,22 +832,24 @@ def build_asset_selection_job(
hooks: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
) -> "JobDefinition":
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import build_assets_job
from dagster._core.definitions.external_asset import (
create_unexecutable_external_assets_from_assets_def,
)

if asset_selection is None and asset_check_selection is None:
# no selections, include everything
included_assets = list(assets)
excluded_assets = []
included_source_assets = list(source_assets)
included_checks_defs = list(asset_checks)
else:
# Filter to assets that match either selected assets or include a selected check.
# E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check
# defined with check_specs
(included_assets, excluded_assets) = _subset_assets_defs(
(included_assets, excluded_assets) = subset_assets_defs(
assets, asset_selection or set(), asset_check_selection
)
included_source_assets = _subset_source_assets(source_assets, asset_selection or set())

if asset_check_selection is None:
# If assets were selected and checks are None, then include all checks on the selected assets.
Expand Down Expand Up @@ -906,21 +877,23 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

all_included_assets = [*included_assets, *included_source_assets]
executable_assets = [asset for asset in all_included_assets if asset.is_executable]
loadable_assets = [
*(asset for asset in all_included_assets if not asset.is_executable),
*(asset for asset in source_assets if asset not in included_source_assets),
*excluded_assets,
executable_assets_defs = [asset for asset in included_assets if asset.is_executable]
unexecutable_assets_defs = [
unexecutable_ad
for ad in (
*(asset for asset in included_assets if not asset.is_executable),
*excluded_assets,
)
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]
final_asset_checks = included_checks_defs
asset_graph = AssetGraph.from_assets(
[*executable_assets_defs, *unexecutable_assets_defs], included_checks_defs
)

return build_assets_job(
name=name,
executable_assets=executable_assets,
asset_checks=final_asset_checks,
asset_graph=asset_graph,
config=config,
loadable_assets=loadable_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand All @@ -933,7 +906,7 @@ def build_asset_selection_job(
)


def _subset_assets_defs(
def subset_assets_defs(
assets: Iterable["AssetsDefinition"],
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
Expand All @@ -958,7 +931,7 @@ def _subset_assets_defs(
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle for handle in asset.check_keys if handle.asset_key in selected_subset
key for key in asset.check_keys if key.asset_key in selected_subset
}

# all assets in this def are selected
Expand Down
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,6 @@ def with_attributes(
Union[AutoMaterializePolicy, Mapping[AssetKey, AutoMaterializePolicy]]
] = None,
backfill_policy: Optional[BackfillPolicy] = None,
is_subset: bool = False,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
) -> "AssetsDefinition":
Expand Down Expand Up @@ -1277,7 +1276,7 @@ def with_attributes(
**self._descriptions_by_key,
**replaced_descriptions_by_key,
},
is_subset=is_subset,
is_subset=self.is_subset,
check_specs_by_output_name=check_specs_by_output_name
if check_specs_by_output_name
else self.check_specs_by_output_name,
Expand Down
Loading

0 comments on commit 71257fb

Please sign in to comment.