Skip to content

Commit

Permalink
[external-assets] Add AssetGraph subsetting, make build_asset_job use…
Browse files Browse the repository at this point in the history
… AssetGraph
  • Loading branch information
smackesey committed Mar 11, 2024
1 parent bff9f3d commit c25b92d
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 45 deletions.
40 changes: 40 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from functools import cached_property
from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_spec import (
Expand Down Expand Up @@ -260,3 +261,42 @@ def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefin
@property
def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]:
return list(dict.fromkeys(self._asset_checks_defs_by_key.values()))

def get_subset(
self,
executable_asset_keys: AbstractSet[AssetKey],
asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
) -> "AssetGraph":
"""Returns a new asset graph where only the provided asset keys are executable. All parent
keys of a selected executabel asset will be included as unexecutable external assets (unless
they are themselves present in the executable selection).
"""
from dagster._core.definitions.external_asset import (
create_unexecutable_external_assets_from_assets_def,
)

invalid_executable_keys = executable_asset_keys - self.executable_asset_keys
check.invariant(
not invalid_executable_keys,
"Provided executable asset keys must be a subset of existing executable asset keys."
f" These keys are not executable: {invalid_executable_keys}",
)

loadable_asset_keys = {
parent_key for key in executable_asset_keys for parent_key in self.get(key).parent_keys
} - executable_asset_keys
executable_assets_defs = list({self.get(key).assets_def for key in executable_asset_keys})
loadable_assets_defs = [
unexecutable_ad
for ad in {self.get(key).assets_def for key in loadable_asset_keys}
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]

# ignore check keys that don't correspond to an AssetChecksDefinition
asset_checks_defs = [
self._asset_checks_defs_by_key[key]
for key in asset_check_keys or []
if key in self._asset_checks_defs_by_key
]

return self.from_assets([*executable_assets_defs, *loadable_assets_defs], asset_checks_defs)
31 changes: 22 additions & 9 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,12 @@ def build_asset_selection_job(
hooks: Optional[AbstractSet[HookDefinition]] = None,
op_retry_policy: Optional[RetryPolicy] = None,
) -> "JobDefinition":
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.assets_job import build_assets_job
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
create_unexecutable_external_assets_from_assets_def,
)

if asset_selection is None and asset_check_selection is None:
# no selections, include everything
Expand Down Expand Up @@ -905,20 +910,28 @@ def build_asset_selection_job(
)

all_included_assets = [*included_assets, *included_source_assets]
executable_assets = [asset for asset in all_included_assets if asset.is_executable]
loadable_assets = [
*(asset for asset in all_included_assets if not asset.is_executable),
*(asset for asset in source_assets if asset not in included_source_assets),
*excluded_assets,
executable_assets_defs = [asset for asset in all_included_assets if asset.is_executable]
unexecutable_assets_defs = [
unexecutable_ad
for ad in (
*(asset for asset in all_included_assets if not asset.is_executable),
*(
create_external_asset_from_source_asset(sa)
for sa in source_assets
if sa not in included_source_assets
),
*excluded_assets,
)
for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad)
]
final_asset_checks = included_checks_defs
asset_graph = AssetGraph.from_assets(
[*executable_assets_defs, *unexecutable_assets_defs], included_checks_defs
)

return build_assets_job(
name=name,
executable_assets=executable_assets,
asset_checks=final_asset_checks,
asset_graph=asset_graph,
config=config,
loadable_assets=loadable_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
43 changes: 7 additions & 36 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ def get_base_asset_jobs(
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
if len(asset_graph.all_partitions_defs) == 0:
executable_asset_keys = asset_graph.executable_asset_keys
loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
asset_checks=asset_graph.asset_checks_defs,
asset_graph=asset_graph,
executor_def=executor_def,
resource_defs=resource_defs,
)
Expand All @@ -85,13 +81,10 @@ def get_base_asset_jobs(
*asset_graph.asset_keys_for_partitions_def(partitions_def=partitions_def),
*asset_graph.unpartitioned_asset_keys,
}
loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys),
loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys),
asset_checks=asset_graph.asset_checks_defs,
asset_graph.get_subset(executable_asset_keys),
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand All @@ -102,9 +95,7 @@ def get_base_asset_jobs(

def build_assets_job(
name: str,
executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]],
loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
asset_graph: AssetGraph,
resource_defs: Optional[Mapping[str, object]] = None,
description: Optional[str] = None,
config: Optional[
Expand All @@ -126,11 +117,7 @@ def build_assets_job(
Args:
name (str): The name of the job.
executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets
to be executed by the job. SourceAssets must be observable.
loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
AssetsDefinitions or SourceAssets that are not exectued by this job, but that are
available to be loaded as inputs by executable assets.
asset_graph (AssetGraph): The asset graph that contains the assets and checks to be executed.
resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in
this job.
description (Optional[str]): A description of the job.
Expand All @@ -156,34 +143,18 @@ def asset2(asset1):
from dagster._core.execution.build_resources import wrap_resources_for_execution

check.str_param(name, "name")
check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset))
for asset in executable_assets:
if not asset.is_executable:
keys = [asset.key] if isinstance(asset, SourceAsset) else asset.keys
check.failed(f"Passed unexecutable keys to executable_assets: {keys}")

loadable_assets = check.opt_sequence_param(
loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
asset_checks = check.opt_sequence_param(
asset_checks, "asset_checks", of_type=AssetChecksDefinition
)
check.opt_str_param(description, "description")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)

resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)]
assets = asset_graph.assets_defs_for_keys(asset_graph.executable_asset_keys)
resolved_source_assets = [
asset for asset in executable_assets if isinstance(asset, SourceAsset)
ad.to_source_asset() for ad in asset_graph.assets_defs_for_keys(asset_graph.all_asset_keys)
]
for asset in loadable_assets or []:
if isinstance(asset, AssetsDefinition):
resolved_source_assets += asset.to_source_assets()
elif isinstance(asset, SourceAsset):
resolved_source_assets.append(asset)
asset_checks = asset_graph.asset_checks_defs

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,12 @@ def _shim_assets_def(context: AssetExecutionContext):
return return_value

return _shim_assets_def


def create_unexecutable_external_assets_from_assets_def(
assets_def: AssetsDefinition,
) -> Sequence[AssetsDefinition]:
if not assets_def.is_executable:
return [assets_def]
else:
return [create_external_asset_from_source_asset(sa) for sa in assets_def.to_source_assets()]

0 comments on commit c25b92d

Please sign in to comment.