diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 800ade27b64e2..f5707d60a0687 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -1,6 +1,7 @@ from functools import cached_property from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union +import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_spec import ( @@ -260,3 +261,42 @@ def assets_defs_for_keys(self, keys: Iterable[AssetKey]) -> Sequence[AssetsDefin @property def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: return list(dict.fromkeys(self._asset_checks_defs_by_key.values())) + + def get_subset( + self, + executable_asset_keys: AbstractSet[AssetKey], + asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, + ) -> "AssetGraph": + """Returns a new asset graph where only the provided asset keys are executable. All parent + keys of a selected executabel asset will be included as unexecutable external assets (unless + they are themselves present in the executable selection). + """ + from dagster._core.definitions.external_asset import ( + create_unexecutable_external_assets_from_assets_def, + ) + + invalid_executable_keys = executable_asset_keys - self.executable_asset_keys + check.invariant( + not invalid_executable_keys, + "Provided executable asset keys must be a subset of existing executable asset keys." + f" These keys are not executable: {invalid_executable_keys}", + ) + + loadable_asset_keys = { + parent_key for key in executable_asset_keys for parent_key in self.get(key).parent_keys + } - executable_asset_keys + executable_assets_defs = list({self.get(key).assets_def for key in executable_asset_keys}) + loadable_assets_defs = [ + unexecutable_ad + for ad in {self.get(key).assets_def for key in loadable_asset_keys} + for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad) + ] + + # ignore check keys that don't correspond to an AssetChecksDefinition + asset_checks_defs = [ + self._asset_checks_defs_by_key[key] + for key in asset_check_keys or [] + if key in self._asset_checks_defs_by_key + ] + + return self.from_assets([*executable_assets_defs, *loadable_assets_defs], asset_checks_defs) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f1b29903aac3d..78c8bf9de58d6 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -863,7 +863,12 @@ def build_asset_selection_job( hooks: Optional[AbstractSet[HookDefinition]] = None, op_retry_policy: Optional[RetryPolicy] = None, ) -> "JobDefinition": + from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import build_assets_job + from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + create_unexecutable_external_assets_from_assets_def, + ) if asset_selection is None and asset_check_selection is None: # no selections, include everything @@ -907,20 +912,28 @@ def build_asset_selection_job( ) all_included_assets = [*included_assets, *included_source_assets] - executable_assets = [asset for asset in all_included_assets if asset.is_executable] - loadable_assets = [ - *(asset for asset in all_included_assets if not asset.is_executable), - *(asset for asset in source_assets if asset not in included_source_assets), - *excluded_assets, + executable_assets_defs = [asset for asset in all_included_assets if asset.is_executable] + unexecutable_assets_defs = [ + unexecutable_ad + for ad in ( + *(asset for asset in all_included_assets if not asset.is_executable), + *( + create_external_asset_from_source_asset(sa) + for sa in source_assets + if sa not in included_source_assets + ), + *excluded_assets, + ) + for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad) ] - final_asset_checks = included_checks_defs + asset_graph = AssetGraph.from_assets( + [*executable_assets_defs, *unexecutable_assets_defs], included_checks_defs + ) return build_assets_job( name=name, - executable_assets=executable_assets, - asset_checks=final_asset_checks, + asset_graph=asset_graph, config=config, - loadable_assets=loadable_assets, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index e12c98dc1040d..dcea669951b8d 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -66,14 +66,10 @@ def get_base_asset_jobs( executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: if len(asset_graph.all_partitions_defs) == 0: - executable_asset_keys = asset_graph.executable_asset_keys - loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), - loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), - asset_checks=asset_graph.asset_checks_defs, + asset_graph=asset_graph, executor_def=executor_def, resource_defs=resource_defs, ) @@ -85,13 +81,10 @@ def get_base_asset_jobs( *asset_graph.asset_keys_for_partitions_def(partitions_def=partitions_def), *asset_graph.unpartitioned_asset_keys, } - loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), - loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), - asset_checks=asset_graph.asset_checks_defs, + asset_graph.get_subset(executable_asset_keys), resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, @@ -102,9 +95,7 @@ def get_base_asset_jobs( def build_assets_job( name: str, - executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]], - loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, - asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, + asset_graph: AssetGraph, resource_defs: Optional[Mapping[str, object]] = None, description: Optional[str] = None, config: Optional[ @@ -126,11 +117,7 @@ def build_assets_job( Args: name (str): The name of the job. - executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets - to be executed by the job. SourceAssets must be observable. - loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of - AssetsDefinitions or SourceAssets that are not exectued by this job, but that are - available to be loaded as inputs by executable assets. + asset_graph (AssetGraph): The asset graph that contains the assets and checks to be executed. resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in this job. description (Optional[str]): A description of the job. @@ -156,18 +143,6 @@ def asset2(asset1): from dagster._core.execution.build_resources import wrap_resources_for_execution check.str_param(name, "name") - check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset)) - for asset in executable_assets: - if not asset.is_executable: - keys = [asset.key] if isinstance(asset, SourceAsset) else asset.keys - check.failed(f"Passed unexecutable keys to executable_assets: {keys}") - - loadable_assets = check.opt_sequence_param( - loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) - ) - asset_checks = check.opt_sequence_param( - asset_checks, "asset_checks", of_type=AssetChecksDefinition - ) check.opt_str_param(description, "description") check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData) @@ -175,15 +150,11 @@ def asset2(asset1): resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) - assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)] + assets = asset_graph.assets_defs_for_keys(asset_graph.executable_asset_keys) resolved_source_assets = [ - asset for asset in executable_assets if isinstance(asset, SourceAsset) + ad.to_source_asset() for ad in asset_graph.assets_defs_for_keys(asset_graph.all_asset_keys) ] - for asset in loadable_assets or []: - if isinstance(asset, AssetsDefinition): - resolved_source_assets += asset.to_source_assets() - elif isinstance(asset, SourceAsset): - resolved_source_assets.append(asset) + asset_checks = asset_graph.asset_checks_defs # figure out what partitions (if any) exist for this job partitions_def = partitions_def or build_job_partitions_from_assets(assets) diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 37a0d2ff4df06..2469e2a63c551 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -181,3 +181,12 @@ def _shim_assets_def(context: AssetExecutionContext): return return_value return _shim_assets_def + + +def create_unexecutable_external_assets_from_assets_def( + assets_def: AssetsDefinition, +) -> Sequence[AssetsDefinition]: + if not assets_def.is_executable: + return [assets_def] + else: + return [create_external_asset_from_source_asset(sa) for sa in assets_def.to_source_assets()]