Skip to content

Commit

Permalink
[external-assets] Add AssetGraph subsetting, make build_asset_job use…
Browse files Browse the repository at this point in the history
… AssetGraph
  • Loading branch information
smackesey committed Mar 12, 2024
1 parent 5d77fcd commit ebc8a9a
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
)
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.instance_for_test import instance_for_test
from dagster._core.test_utils import create_test_asset_job
from docs_snippets.concepts.assets.observable_source_assets import (
foo_source_asset,
observation_job,
Expand All @@ -13,11 +14,7 @@


def test_observable_source_asset():
@repository
def repo():
return [foo_source_asset]

job_def = build_assets_job("test_job", [], [foo_source_asset])
job_def = create_test_asset_job([foo_source_asset])
result = job_def.execute_in_process()
assert result.success

Expand Down
44 changes: 43 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_layer import subset_assets_defs
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
AssetSpec,
Expand Down Expand Up @@ -215,7 +217,7 @@ def from_assets(

# Build the set of AssetNodes. Each node holds key rather than object references to parent
# and child nodes.
dep_graph = generate_asset_dep_graph(assets_defs, [])
dep_graph = generate_asset_dep_graph(assets_defs)
asset_nodes_by_key = {
key: AssetNode(
key=key,
Expand Down Expand Up @@ -301,3 +303,43 @@ def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
*(key for ad in self.assets_defs for key in ad.check_keys),
*(key for acd in self.asset_checks_defs for key in acd.keys),
}

def get_subset(
self,
executable_asset_keys: AbstractSet[AssetKey],
asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
) -> "AssetGraph":
"""Returns a new asset graph where only the provided asset keys are executable. All parent
keys of a selected executabel asset will be included as unexecutable external assets (unless
they are themselves present in the executable selection).
"""
from dagster._core.definitions.external_asset import (
create_unexecutable_external_assets_from_assets_def,
)

invalid_executable_keys = executable_asset_keys - self.executable_asset_keys
check.invariant(
not invalid_executable_keys,
"Provided executable asset keys must be a subset of existing executable asset keys."
f" Invalid provided keys: {invalid_executable_keys}",
)

executable_assets_defs, raw_loadable_assets_defs = subset_assets_defs(
self.assets_defs, executable_asset_keys, asset_check_keys
)
loadable_assets_defs = [
unexecutable_ad
for ad in raw_loadable_assets_defs
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]

# ignore check keys that don't correspond to an AssetChecksDefinition
asset_checks_defs = list(
{
acd
for key, acd in self._asset_check_compute_defs_by_key.items()
if key in (asset_check_keys or []) and isinstance(acd, AssetChecksDefinition)
}
)

return self.from_assets([*executable_assets_defs, *loadable_assets_defs], asset_checks_defs)
73 changes: 23 additions & 50 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .resource_definition import ResourceDefinition

if TYPE_CHECKING:
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets import AssetsDefinition, SourceAsset
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -407,8 +408,7 @@ def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"],
source_assets: Sequence["SourceAsset"],
asset_graph: "AssetGraph",
) -> "AssetLayer":
"""Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the
corresponding AssetsDefinition objects.
Expand All @@ -419,19 +419,10 @@ def from_graph_and_assets_node_mapping(
assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from
a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up.
"""
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
unexecutable_assets_defs = asset_graph.assets_defs_for_keys(
asset_graph.unexecutable_asset_keys
)

# Eliminate source assets here
observable_assets_defs_by_node_handle = {
k: create_external_asset_from_source_asset(sa)
for k, sa in observable_source_assets_by_node_handle.items()
}
unexecutable_assets_defs = [
create_external_asset_from_source_asset(sa) for sa in source_assets
]

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
Expand Down Expand Up @@ -571,32 +562,11 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
key: assets_def
for assets_def in (
*assets_defs_by_outer_node_handle.values(),
*observable_assets_defs_by_node_handle.values(),
*unexecutable_assets_defs,
)
for key in assets_def.keys
}

for node_handle, assets_def in observable_assets_defs_by_node_handle.items():
node_def = assets_def.node_def
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
assets_def.key,
partitions_fn=None,
partitions_def=assets_def.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
# nodes for assets
**{
Expand Down Expand Up @@ -848,7 +818,6 @@ def my_graph():
def build_asset_selection_job(
name: str,
assets: Iterable["AssetsDefinition"],
source_assets: Iterable["SourceAsset"],
asset_checks: Iterable["AssetChecksDefinition"],
executor_def: Optional[ExecutorDefinition] = None,
config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]] = None,
Expand All @@ -863,22 +832,24 @@ def build_asset_selection_job(
hooks: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
) -> "JobDefinition":
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import build_assets_job
from dagster._core.definitions.external_asset import (
create_unexecutable_external_assets_from_assets_def,
)

if asset_selection is None and asset_check_selection is None:
# no selections, include everything
included_assets = list(assets)
excluded_assets = []
included_source_assets = list(source_assets)
included_checks_defs = list(asset_checks)
else:
# Filter to assets that match either selected assets or include a selected check.
# E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check
# defined with check_specs
(included_assets, excluded_assets) = _subset_assets_defs(
(included_assets, excluded_assets) = subset_assets_defs(
assets, asset_selection or set(), asset_check_selection
)
included_source_assets = _subset_source_assets(source_assets, asset_selection or set())

if asset_check_selection is None:
# If assets were selected and checks are None, then include all checks on the selected assets.
Expand Down Expand Up @@ -906,21 +877,23 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

all_included_assets = [*included_assets, *included_source_assets]
executable_assets = [asset for asset in all_included_assets if asset.is_executable]
loadable_assets = [
*(asset for asset in all_included_assets if not asset.is_executable),
*(asset for asset in source_assets if asset not in included_source_assets),
*excluded_assets,
executable_assets_defs = [asset for asset in included_assets if asset.is_executable]
unexecutable_assets_defs = [
unexecutable_ad
for ad in (
*(asset for asset in included_assets if not asset.is_executable),
*excluded_assets,
)
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]
final_asset_checks = included_checks_defs
asset_graph = AssetGraph.from_assets(
[*executable_assets_defs, *unexecutable_assets_defs], included_checks_defs
)

return build_assets_job(
name=name,
executable_assets=executable_assets,
asset_checks=final_asset_checks,
asset_graph=asset_graph,
config=config,
loadable_assets=loadable_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand All @@ -933,7 +906,7 @@ def build_asset_selection_job(
)


def _subset_assets_defs(
def subset_assets_defs(
assets: Iterable["AssetsDefinition"],
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
Expand All @@ -958,7 +931,7 @@ def _subset_assets_defs(
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle for handle in asset.check_keys if handle.asset_key in selected_subset
key for key in asset.check_keys if key.asset_key in selected_subset
}

# all assets in this def are selected
Expand Down
3 changes: 1 addition & 2 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,6 @@ def with_attributes(
Union[AutoMaterializePolicy, Mapping[AssetKey, AutoMaterializePolicy]]
] = None,
backfill_policy: Optional[BackfillPolicy] = None,
is_subset: bool = False,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
) -> "AssetsDefinition":
Expand Down Expand Up @@ -1277,7 +1276,7 @@ def with_attributes(
**self._descriptions_by_key,
**replaced_descriptions_by_key,
},
is_subset=is_subset,
is_subset=self.is_subset,
check_specs_by_output_name=check_specs_by_output_name
if check_specs_by_output_name
else self.check_specs_by_output_name,
Expand Down
Loading

0 comments on commit ebc8a9a

Please sign in to comment.