diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 0fbe21c90d760..5c7529fcad86b 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -1,8 +1,10 @@ from functools import cached_property from typing import AbstractSet, Iterable, Mapping, Optional, Sequence, Union +import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.asset_layer import subset_assets_defs from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET, AssetSpec, @@ -215,7 +217,7 @@ def from_assets( # Build the set of AssetNodes. Each node holds key rather than object references to parent # and child nodes. - dep_graph = generate_asset_dep_graph(assets_defs, []) + dep_graph = generate_asset_dep_graph(assets_defs) asset_nodes_by_key = { key: AssetNode( key=key, @@ -294,3 +296,54 @@ def asset_checks_defs(self) -> Sequence[AssetChecksDefinition]: if isinstance(acd, AssetChecksDefinition) } ) + + @cached_property + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: + return { + *(key for ad in self.assets_defs for key in ad.check_keys), + *(key for acd in self.asset_checks_defs for key in acd.keys), + } + + def get_subset( + self, + executable_asset_keys: AbstractSet[AssetKey], + asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, + ) -> "AssetGraph": + """Returns a new asset graph where only the provided asset keys are executable. All parent + keys of a selected executable asset will be included as unexecutable external assets (unless + they are themselves present in the executable selection). + """ + from dagster._core.definitions.external_asset import ( + create_unexecutable_external_assets_from_assets_def, + ) + + invalid_executable_keys = executable_asset_keys - self.executable_asset_keys + check.invariant( + not invalid_executable_keys, + "Provided executable asset keys must be a subset of existing executable asset keys." 
+ f" Invalid provided keys: {invalid_executable_keys}", + ) + + # loadable_asset_keys = { + # parent_key for key in executable_asset_keys for parent_key in self.get(key).parent_keys + # } - executable_asset_keys + # full_executable_assets_defs = {self.get(key).assets_def for key in executable_asset_keys} + executable_assets_defs, raw_loadable_assets_defs = subset_assets_defs( + self.assets_defs, executable_asset_keys, asset_check_keys + ) + loadable_assets_defs = [ + unexecutable_ad + for ad in raw_loadable_assets_defs + for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad) + ] + + # ignore check keys that don't correspond to an AssetChecksDefinition + asset_checks_defs = list( + { + acd + for key, acd in self._asset_check_compute_defs_by_key.items() + if key in (asset_check_keys or []) and isinstance(acd, AssetChecksDefinition) + } + ) + + return self.from_assets([*executable_assets_defs, *loadable_assets_defs], asset_checks_defs) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f1b29903aac3d..a71a889004480 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -44,6 +44,7 @@ from .resource_definition import ResourceDefinition if TYPE_CHECKING: + from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets import AssetsDefinition, SourceAsset from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey from dagster._core.definitions.job_definition import JobDefinition @@ -407,8 +408,7 @@ def from_graph_and_assets_node_mapping( graph_def: GraphDefinition, assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"], asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"], - observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"], - source_assets: Sequence["SourceAsset"], + asset_graph: "AssetGraph", ) -> "AssetLayer": """Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the corresponding AssetsDefinition objects. @@ -419,19 +419,10 @@ def from_graph_and_assets_node_mapping( assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up. 
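Before continuing with the asset_layer.py changes, a rough usage sketch of the AssetGraph.get_subset method added above in asset_graph.py. This is not part of the patch: the upstream/downstream asset names are invented, and the from_assets call shape is assumed from the call sites in this diff.

    from dagster import AssetKey, asset
    from dagster._core.definitions.asset_graph import AssetGraph

    @asset
    def upstream():
        return 1

    @asset
    def downstream(upstream):
        return upstream + 1

    graph = AssetGraph.from_assets([upstream, downstream])
    subset = graph.get_subset({AssetKey("downstream")})

    # "downstream" stays executable; its parent "upstream" is carried along as an
    # unexecutable external asset so it can still be loaded as an input.
    assert AssetKey("downstream") in subset.executable_asset_keys
    assert AssetKey("upstream") in subset.unexecutable_asset_keys

Passing a key that is not executable in the original graph should trip the check.invariant guard above. Under the hood, get_subset defers to the now-public subset_assets_defs helper from asset_layer.py, which produces the included/excluded split used here.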
""" - from dagster._core.definitions.external_asset import ( - create_external_asset_from_source_asset, + unexecutable_assets_defs = asset_graph.assets_defs_for_keys( + asset_graph.unexecutable_asset_keys ) - # Eliminate source assets here - observable_assets_defs_by_node_handle = { - k: create_external_asset_from_source_asset(sa) - for k, sa in observable_source_assets_by_node_handle.items() - } - unexecutable_assets_defs = [ - create_external_asset_from_source_asset(sa) for sa in source_assets - ] - asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {} asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {} check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {} @@ -571,32 +562,11 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: key: assets_def for assets_def in ( *assets_defs_by_outer_node_handle.values(), - *observable_assets_defs_by_node_handle.values(), *unexecutable_assets_defs, ) for key in assets_def.keys } - for node_handle, assets_def in observable_assets_defs_by_node_handle.items(): - node_def = assets_def.node_def - check.invariant(len(node_def.output_defs) == 1) - output_name = node_def.output_defs[0].name - # resolve graph output to the op output it comes from - inner_output_def, inner_node_handle = node_def.resolve_output_to_origin( - output_name, handle=node_handle - ) - node_output_handle = NodeOutputHandle( - check.not_none(inner_node_handle), inner_output_def.name - ) - - asset_info_by_output[node_output_handle] = AssetOutputInfo( - assets_def.key, - partitions_fn=None, - partitions_def=assets_def.partitions_def, - is_required=True, - code_version=inner_output_def.code_version, - ) - assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = { # nodes for assets **{ @@ -848,7 +818,6 @@ def my_graph(): def build_asset_selection_job( name: str, assets: Iterable["AssetsDefinition"], - source_assets: Iterable["SourceAsset"], asset_checks: Iterable["AssetChecksDefinition"], executor_def: Optional[ExecutorDefinition] = None, config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]] = None, @@ -863,22 +832,24 @@ def build_asset_selection_job( hooks: Optional[AbstractSet[HookDefinition]] = None, op_retry_policy: Optional[RetryPolicy] = None, ) -> "JobDefinition": + from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.assets_job import build_assets_job + from dagster._core.definitions.external_asset import ( + create_unexecutable_external_assets_from_assets_def, + ) if asset_selection is None and asset_check_selection is None: # no selections, include everything included_assets = list(assets) excluded_assets = [] - included_source_assets = list(source_assets) included_checks_defs = list(asset_checks) else: # Filter to assets that match either selected assets or include a selected check. # E.g. a multi asset can be included even if it's not in asset_selection, if it has a selected check # defined with check_specs - (included_assets, excluded_assets) = _subset_assets_defs( + (included_assets, excluded_assets) = subset_assets_defs( assets, asset_selection or set(), asset_check_selection ) - included_source_assets = _subset_source_assets(source_assets, asset_selection or set()) if asset_check_selection is None: # If assets were selected and checks are None, then include all checks on the selected assets. 
@@ -906,21 +877,23 @@ def build_asset_selection_job( f"{partitions_def}.", ) - all_included_assets = [*included_assets, *included_source_assets] - executable_assets = [asset for asset in all_included_assets if asset.is_executable] - loadable_assets = [ - *(asset for asset in all_included_assets if not asset.is_executable), - *(asset for asset in source_assets if asset not in included_source_assets), - *excluded_assets, + executable_assets_defs = [asset for asset in included_assets if asset.is_executable] + unexecutable_assets_defs = [ + unexecutable_ad + for ad in ( + *(asset for asset in included_assets if not asset.is_executable), + *excluded_assets, + ) + for unexecutable_ad in create_unexecutable_external_assets_from_assets_def(ad) ] - final_asset_checks = included_checks_defs + asset_graph = AssetGraph.from_assets( + [*executable_assets_defs, *unexecutable_assets_defs], included_checks_defs + ) return build_assets_job( name=name, - executable_assets=executable_assets, - asset_checks=final_asset_checks, + asset_graph=asset_graph, config=config, - loadable_assets=loadable_assets, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, @@ -933,7 +906,7 @@ def build_asset_selection_job( ) -def _subset_assets_defs( +def subset_assets_defs( assets: Iterable["AssetsDefinition"], selected_asset_keys: AbstractSet[AssetKey], selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], @@ -958,7 +931,7 @@ def _subset_assets_defs( # if no checks were selected, filter to checks that target selected assets else: selected_check_subset = { - handle for handle in asset.check_keys if handle.asset_key in selected_subset + key for key in asset.check_keys if key.asset_key in selected_subset } # all assets in this def are selected diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 08066bf2c452c..ae6f9fcee65b4 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1112,7 +1112,6 @@ def with_attributes( Union[AutoMaterializePolicy, Mapping[AssetKey, AutoMaterializePolicy]] ] = None, backfill_policy: Optional[BackfillPolicy] = None, - is_subset: bool = False, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, ) -> "AssetsDefinition": @@ -1277,7 +1276,7 @@ def with_attributes( **self._descriptions_by_key, **replaced_descriptions_by_key, }, - is_subset=is_subset, + is_subset=self.is_subset, check_specs_by_output_name=check_specs_by_output_name if check_specs_by_output_name else self.check_specs_by_output_name, diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index e12c98dc1040d..5008deb598ef6 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -5,6 +5,7 @@ Any, Dict, Iterable, + List, Mapping, Optional, Sequence, @@ -66,14 +67,10 @@ def get_base_asset_jobs( executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: if len(asset_graph.all_partitions_defs) == 0: - executable_asset_keys = asset_graph.executable_asset_keys - loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - 
executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), - loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), - asset_checks=asset_graph.asset_checks_defs, + asset_graph=asset_graph, executor_def=executor_def, resource_defs=resource_defs, ) @@ -85,13 +82,10 @@ def get_base_asset_jobs( *asset_graph.asset_keys_for_partitions_def(partitions_def=partitions_def), *asset_graph.unpartitioned_asset_keys, } - loadable_asset_keys = asset_graph.all_asset_keys - executable_asset_keys jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - executable_assets=asset_graph.assets_defs_for_keys(executable_asset_keys), - loadable_assets=asset_graph.assets_defs_for_keys(loadable_asset_keys), - asset_checks=asset_graph.asset_checks_defs, + asset_graph.get_subset(executable_asset_keys), resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, @@ -102,9 +96,7 @@ def get_base_asset_jobs( def build_assets_job( name: str, - executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]], - loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, - asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, + asset_graph: AssetGraph, resource_defs: Optional[Mapping[str, object]] = None, description: Optional[str] = None, config: Optional[ @@ -126,11 +118,7 @@ def build_assets_job( Args: name (str): The name of the job. - executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets - to be executed by the job. SourceAssets must be observable. - loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of - AssetsDefinitions or SourceAssets that are not exectued by this job, but that are - available to be loaded as inputs by executable assets. + asset_graph (AssetGraph): The asset graph that contains the assets and checks to be executed. resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in this job. description (Optional[str]): A description of the job. 
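Since build_assets_job is an internal API, the following is only a hedged sketch of the new calling convention, where everything is derived from a single AssetGraph rather than separate executable/loadable/check sequences. Not part of the diff; the asset and job names are invented.

    from dagster import asset
    from dagster._core.definitions.asset_graph import AssetGraph
    from dagster._core.definitions.assets_job import build_assets_job

    @asset
    def my_asset():
        return 1

    job_def = build_assets_job(
        name="my_asset_job",
        asset_graph=AssetGraph.from_assets([my_asset]),
    )
    assert job_def.execute_in_process().success

The partitioned branch of get_base_asset_jobs builds each per-partitions_def job from asset_graph.get_subset(executable_asset_keys), so unselected assets still show up as loadable, unexecutable parents.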
@@ -155,70 +143,38 @@ def asset2(asset1): """ from dagster._core.execution.build_resources import wrap_resources_for_execution - check.str_param(name, "name") - check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset)) - for asset in executable_assets: - if not asset.is_executable: - keys = [asset.key] if isinstance(asset, SourceAsset) else asset.keys - check.failed(f"Passed unexecutable keys to executable_assets: {keys}") - - loadable_assets = check.opt_sequence_param( - loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) - ) - asset_checks = check.opt_sequence_param( - asset_checks, "asset_checks", of_type=AssetChecksDefinition - ) - check.opt_str_param(description, "description") - check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData) - resource_defs = check.opt_mapping_param(resource_defs, "resource_defs") resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) - assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)] - resolved_source_assets = [ - asset for asset in executable_assets if isinstance(asset, SourceAsset) - ] - for asset in loadable_assets or []: - if isinstance(asset, AssetsDefinition): - resolved_source_assets += asset.to_source_assets() - elif isinstance(asset, SourceAsset): - resolved_source_assets.append(asset) - # figure out what partitions (if any) exist for this job - partitions_def = partitions_def or build_job_partitions_from_assets(assets) + partitions_def = partitions_def or build_job_partitions_from_assets( + asset_graph.assets_defs_for_keys(asset_graph.executable_asset_keys) + ) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks + asset_graph, ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): - assets = _attempt_resolve_cycles(assets, resolved_source_assets) - + asset_graph = _attempt_resolve_node_cycles(asset_graph) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, - asset_checks, + asset_graph, ) - if len(assets) > 0 or len(asset_checks) > 0: - node_defs = [ - *(asset.node_def for asset in assets), - *(asset_check.node_def for asset_check in asset_checks), - ] - observable_source_assets_by_node_handle = {} - else: - node_defs = [] - observable_source_assets_by_node_handle: Mapping[NodeHandle, SourceAsset] = {} - for asset in resolved_source_assets: - if ( - isinstance(asset, SourceAsset) - and asset.is_observable - and asset.node_def is not None - ): - node_defs.append(asset.node_def) - node_handle = NodeHandle(asset.node_def.name, parent=None) - observable_source_assets_by_node_handle[node_handle] = asset + node_defs = [ + *( + asset.node_def + for asset in asset_graph.assets_defs_for_keys( + [ + *asset_graph.executable_asset_keys, + *asset_graph.asset_check_keys, + ] + ) + ), + *(asset_check.node_def for asset_check in asset_graph.asset_checks_defs), + ] graph = GraphDefinition( name=name, @@ -233,14 +189,11 @@ def asset2(asset1): asset_layer = AssetLayer.from_graph_and_assets_node_mapping( graph_def=graph, asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, - source_assets=resolved_source_assets, assets_defs_by_outer_node_handle=assets_defs_by_node_handle, - observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, + asset_graph=asset_graph, ) - all_resource_defs 
= get_all_resource_defs( - assets, asset_checks, resolved_source_assets, wrapped_resource_defs - ) + all_resource_defs = get_all_resource_defs(asset_graph, wrapped_resource_defs) if _asset_selection_data: original_job = _asset_selection_data.parent_job_def @@ -337,15 +290,15 @@ def _get_blocking_asset_check_output_handles_by_asset_key( def build_node_deps( - assets_defs: Iterable[AssetsDefinition], - asset_checks_defs: Sequence[AssetChecksDefinition], + asset_graph: AssetGraph, ) -> Tuple[ DependencyMapping[NodeInvocation], Mapping[NodeHandle, AssetsDefinition], Mapping[NodeHandle, AssetChecksDefinition], ]: # sort so that nodes get a consistent name - assets_defs = sorted(assets_defs, key=lambda ad: (sorted((ak for ak in ad.keys)))) + assets_defs = sorted(asset_graph.assets_defs, key=lambda ad: (sorted((ak for ak in ad.keys)))) + asset_checks_defs = asset_graph.asset_checks_defs # if the same graph/op is used in multiple assets_definitions, their invocations must have # different names. we keep track of definitions that share a name and add a suffix to their @@ -353,7 +306,7 @@ def build_node_deps( collisions: Dict[str, int] = {} assets_defs_by_node_handle: Dict[NodeHandle, AssetsDefinition] = {} node_alias_and_output_by_asset_key: Dict[AssetKey, Tuple[str, str]] = {} - for assets_def in assets_defs: + for assets_def in (ad for ad in assets_defs if ad.is_executable): node_name = assets_def.node_def.name if collisions.get(node_name): collisions[node_name] += 1 @@ -466,10 +419,7 @@ def _has_cycles( return True -def _attempt_resolve_cycles( - assets_defs: Iterable["AssetsDefinition"], - source_assets: Iterable["SourceAsset"], -) -> Sequence["AssetsDefinition"]: +def _attempt_resolve_node_cycles(asset_graph: AssetGraph) -> AssetGraph: """DFS starting at root nodes to color the asset dependency graph. Each time you leave your current AssetsDefinition, the color increments. @@ -483,76 +433,56 @@ def _attempt_resolve_cycles( This ensures that no asset that shares a node with another asset will be downstream of that asset via a different node (i.e. there will be no cycles). 
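To make the coloring scheme described in this docstring concrete, a standalone toy version using plain dicts and no Dagster APIs. The graph a -> c -> b, with a and b living in one subsettable multi-asset node, is invented for illustration.

    from collections import defaultdict
    from typing import Dict, Set

    children: Dict[str, Set[str]] = {"a": {"c"}, "c": {"b"}, "b": set()}
    node_of = {"a": "multi_ab", "b": "multi_ab", "c": "solo_c"}
    keys_in_node = defaultdict(set)
    for key, node in node_of.items():
        keys_in_node[node].add(key)

    colors: Dict[str, int] = {}

    def dfs(key: str, color: int) -> None:
        colors[key] = color
        for child in children[key]:
            # stay on the same color while the edge stays inside the current node
            new_color = color if child in keys_in_node[node_of[key]] else color + 1
            if colors.get(child, -1) < new_color:
                dfs(child, new_color)

    roots = [k for k in children if all(k not in cs for cs in children.values())]
    for root in roots:
        dfs(root, 0)

    assert colors == {"a": 0, "c": 1, "b": 2}

Because a and b end up with different colors, the multi-asset is split into an {a} subset and a {b} subset, which breaks the node-level cycle multi_ab -> solo_c -> multi_ab.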
""" - from dagster._core.selector.subset_selector import generate_asset_dep_graph - - # get asset dependencies - asset_deps = generate_asset_dep_graph(assets_defs, source_assets) - - # index AssetsDefinitions by their asset names - assets_defs_by_asset_key: Dict[AssetKey, AssetsDefinition] = {} - for assets_def in assets_defs: - for asset_key in assets_def.keys: - assets_defs_by_asset_key[asset_key] = assets_def - # color for each asset - colors = {} + colors: Dict[AssetKey, int] = {} # recursively color an asset and all of its downstream assets - def _dfs(key, cur_color): + def _dfs(key: AssetKey, cur_color: int): + node = asset_graph.get(key) colors[key] = cur_color - if key in assets_defs_by_asset_key: - cur_node_asset_keys = assets_defs_by_asset_key[key].keys - else: - # in a SourceAsset, treat all downstream as if they're in the same node - cur_node_asset_keys = asset_deps["downstream"][key] + # in an external asset, treat all downstream as if they're in the same node + cur_node_asset_keys = node.assets_def.keys if node.is_materializable else node.child_keys - for downstream_key in asset_deps["downstream"][key]: + for child_key in node.child_keys: # if the downstream asset is in the current node,keep the same color - if downstream_key in cur_node_asset_keys: - new_color = cur_color - else: - new_color = cur_color + 1 + new_color = cur_color if child_key in cur_node_asset_keys else cur_color + 1 # if current color of the downstream asset is less than the new color, re-do dfs - if colors.get(downstream_key, -1) < new_color: - _dfs(downstream_key, new_color) - - # validate that there are no cycles in the overall asset graph - toposorted = list(toposort(asset_deps["upstream"])) + if colors.get(child_key, -1) < new_color: + _dfs(child_key, new_color) - # dfs for each root node - for root_name in toposorted[0]: - _dfs(root_name, 0) + # dfs for each root node; will throw an error if there are key-level cycles + root_keys = asset_graph.toposorted_asset_keys_by_level[0] + for key in root_keys: + _dfs(key, 0) color_mapping_by_assets_defs: Dict[AssetsDefinition, Any] = defaultdict( lambda: defaultdict(set) ) for key, color in colors.items(): - # ignore source assets - if key not in assets_defs_by_asset_key: - continue - color_mapping_by_assets_defs[assets_defs_by_asset_key[key]][color].add(key) + node = asset_graph.get(key) + color_mapping_by_assets_defs[node.assets_def][color].add(key) - ret = [] + subsetted_assets_defs: List[AssetsDefinition] = [] for assets_def, color_mapping in color_mapping_by_assets_defs.items(): - if len(color_mapping) == 1 or not assets_def.can_subset: - ret.append(assets_def) + if assets_def.is_external or len(color_mapping) == 1 or not assets_def.can_subset: + subsetted_assets_defs.append(assets_def) else: for asset_keys in color_mapping.values(): - ret.append(assets_def.subset_for(asset_keys, selected_asset_check_keys=None)) + subsetted_assets_defs.append( + assets_def.subset_for(asset_keys, selected_asset_check_keys=None) + ) - return ret + return AssetGraph.from_assets(subsetted_assets_defs, asset_graph.asset_checks_defs) def _ensure_resources_dont_conflict( - assets: Iterable[AssetsDefinition], - source_assets: Sequence[SourceAsset], + asset_graph: AssetGraph, resource_defs: Mapping[str, ResourceDefinition], ) -> None: """Ensures that resources between assets, source assets, and provided resource dictionary do not conflict.""" resource_defs_from_assets = {} - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in 
all_assets: + for asset in asset_graph.assets_defs: for resource_key, resource_def in asset.resource_defs.items(): if resource_key not in resource_defs_from_assets: resource_defs_from_assets[resource_key] = resource_def @@ -578,44 +508,39 @@ def _ensure_resources_dont_conflict( def check_resources_satisfy_requirements( - assets: Iterable[AssetsDefinition], - source_assets: Sequence[SourceAsset], - asset_checks: Iterable[AssetChecksDefinition], + asset_graph: AssetGraph, resource_defs: Mapping[str, ResourceDefinition], ) -> None: """Ensures that between the provided resources on an asset and the resource_defs mapping, that all resource requirements are satisfied. Note that resources provided on assets cannot satisfy resource requirements provided on other assets. """ - _ensure_resources_dont_conflict(assets, source_assets, resource_defs) + _ensure_resources_dont_conflict(asset_graph, resource_defs) - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in all_assets: + for assets_def in asset_graph.assets_defs: ensure_requirements_satisfied( - merge_dicts(resource_defs, asset.resource_defs), list(asset.get_resource_requirements()) + merge_dicts(resource_defs, assets_def.resource_defs), + list(assets_def.get_resource_requirements()), ) - for asset_check in asset_checks: + for asset_checks_def in asset_graph.asset_checks_defs: ensure_requirements_satisfied( - merge_dicts(resource_defs, asset_check.resource_defs), - list(asset_check.get_resource_requirements()), + merge_dicts(resource_defs, asset_checks_def.resource_defs), + list(asset_checks_def.get_resource_requirements()), ) def get_all_resource_defs( - assets: Iterable[AssetsDefinition], - asset_checks: Iterable[AssetChecksDefinition], - source_assets: Sequence[SourceAsset], + asset_graph: AssetGraph, resource_defs: Mapping[str, ResourceDefinition], ) -> Mapping[str, ResourceDefinition]: # Ensures that no resource keys conflict, and each asset has its resource requirements satisfied. 
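As an aside on the requirement checking referenced here, a hedged sketch of how the refactored check_resources_satisfy_requirements surfaces a missing resource when given an AssetGraph. Not part of the diff; the "api" resource key and the asset name are invented.

    import pytest
    from dagster import DagsterInvalidDefinitionError, asset
    from dagster._core.definitions.asset_graph import AssetGraph
    from dagster._core.definitions.assets_job import check_resources_satisfy_requirements

    @asset(required_resource_keys={"api"})
    def needs_api(context):
        return context.resources.api

    # No "api" resource is bound on the asset or supplied in resource_defs, so the
    # requirement check is expected to raise.
    with pytest.raises(DagsterInvalidDefinitionError):
        check_resources_satisfy_requirements(
            AssetGraph.from_assets([needs_api]), resource_defs={}
        )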
- check_resources_satisfy_requirements(assets, source_assets, asset_checks, resource_defs) + check_resources_satisfy_requirements(asset_graph, resource_defs) all_resource_defs = dict(resource_defs) - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in all_assets: - all_resource_defs = merge_dicts(all_resource_defs, asset.resource_defs) + for assets_def in asset_graph.assets_defs: + all_resource_defs = merge_dicts(all_resource_defs, assets_def.resource_defs) - for asset_check in asset_checks: - all_resource_defs = merge_dicts(all_resource_defs, asset_check.resource_defs) + for asset_checks_def in asset_graph.asset_checks_defs: + all_resource_defs = merge_dicts(all_resource_defs, asset_checks_def.resource_defs) return all_resource_defs diff --git a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py index 883da2bfe012a..b96c98d5bdd1c 100644 --- a/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/base_asset_graph.py @@ -197,6 +197,10 @@ def external_asset_keys(self) -> AbstractSet[AssetKey]: def executable_asset_keys(self) -> AbstractSet[AssetKey]: return {node.key for node in self.asset_nodes if node.is_executable} + @cached_property + def unexecutable_asset_keys(self) -> AbstractSet[AssetKey]: + return {node.key for node in self.asset_nodes if not node.is_executable} + @cached_property def toposorted_asset_keys(self) -> Sequence[AssetKey]: """Return topologically sorted asset keys in graph. Keys with the same topological level are diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 37a0d2ff4df06..2469e2a63c551 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -181,3 +181,12 @@ def _shim_assets_def(context: AssetExecutionContext): return return_value return _shim_assets_def + + +def create_unexecutable_external_assets_from_assets_def( + assets_def: AssetsDefinition, +) -> Sequence[AssetsDefinition]: + if not assets_def.is_executable: + return [assets_def] + else: + return [create_external_asset_from_source_asset(sa) for sa in assets_def.to_source_assets()] diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py index 030dd5e804328..705bc41172bee 100644 --- a/python_modules/dagster/dagster/_core/definitions/job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py @@ -815,7 +815,6 @@ def _get_job_def_for_asset_selection( new_job = build_asset_selection_job( name=self.name, assets=self.asset_layer.assets_defs, - source_assets=[], executor_def=self.executor_def, resource_defs=self.resource_defs, description=self.description, diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index c654c25cb9544..dd246b664d154 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -223,7 +223,6 @@ def resolve( assets=assets, asset_checks=asset_graph.asset_checks_defs, config=self.config, - source_assets=[], 
description=self.description, tags=self.tags, metadata=self.metadata, diff --git a/python_modules/dagster/dagster/_core/selector/subset_selector.py b/python_modules/dagster/dagster/_core/selector/subset_selector.py index e270b361c35c9..ba59ef2e47dcc 100644 --- a/python_modules/dagster/dagster/_core/selector/subset_selector.py +++ b/python_modules/dagster/dagster/_core/selector/subset_selector.py @@ -32,7 +32,6 @@ from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.job_definition import JobDefinition - from dagster._core.definitions.source_asset import SourceAsset MAX_NUM = sys.maxsize @@ -116,7 +115,7 @@ def __new__( def generate_asset_dep_graph( - assets_defs: Iterable["AssetsDefinition"], source_assets: Iterable["SourceAsset"] + assets_defs: Iterable["AssetsDefinition"], ) -> DependencyGraph[AssetKey]: upstream: Dict[AssetKey, Set[AssetKey]] = {} downstream: Dict[AssetKey, Set[AssetKey]] = {} @@ -483,7 +482,7 @@ def parse_asset_selection( if len(asset_selection) == 1 and asset_selection[0] == "*": return {key for ad in assets_defs for key in ad.keys} - graph = generate_asset_dep_graph(assets_defs, []) + graph = generate_asset_dep_graph(assets_defs) assets_set: Set[AssetKey] = set() # loop over clauses diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index d6018b63e2820..719186707e60e 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -53,7 +53,7 @@ ) from dagster._core.instance import DagsterInstance from dagster._core.storage.mem_io_manager import InMemoryIOManager -from dagster._core.test_utils import instance_for_test +from dagster._core.test_utils import create_test_asset_job, instance_for_test from dagster._core.types.dagster_type import Nothing @@ -1274,14 +1274,10 @@ def three(): out_2, out_3 = add_one(reused_output) return {"asset_one": out_1, "asset_two": out_2, "asset_three": out_3} - asset_job = define_asset_job("yay").resolve( - asset_graph=AssetGraph.from_assets( - [AssetsDefinition.from_graph(three, can_subset=True)], - ) - ) + job_def = create_test_asset_job([AssetsDefinition.from_graph(three, can_subset=True)]) with instance_for_test() as instance: - result = asset_job.execute_in_process(asset_selection=asset_selection, instance=instance) + result = job_def.execute_in_process(asset_selection=asset_selection, instance=instance) assert result.success assert ( get_num_events(instance, result.run_id, DagsterEventType.ASSET_MATERIALIZATION) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py index 190832ec08efb..1918fbd0e4739 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py @@ -119,7 +119,6 @@ def f(): result = build_asset_selection_job( "materialize_job", assets=all_assets, - source_assets=[], asset_selection=AssetSelection.keys(*(AssetKey(c) for c in to_materialize)).resolve( all_assets ),
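Finally, a minimal sketch of the new create_unexecutable_external_assets_from_assets_def helper added in external_asset.py, which passes unexecutable defs through untouched and downgrades executable ones to unexecutable external assets with the same keys. Not part of the diff; the my_table asset is invented.

    from dagster import AssetKey, asset
    from dagster._core.definitions.external_asset import (
        create_unexecutable_external_assets_from_assets_def,
    )

    @asset
    def my_table():
        return 1

    # An executable asset comes back as a single unexecutable external asset
    # keyed the same way, suitable for use as a loadable parent in a job.
    [external_def] = create_unexecutable_external_assets_from_assets_def(my_table)
    assert external_def.key == AssetKey("my_table")
    assert not external_def.is_executable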