diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index ea27c80eb7e92..300d1c77e7379 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -894,23 +894,24 @@ def build_asset_selection_job( f"{partitions_def}.", ) - if len(included_assets) or len(included_checks_defs) > 0: - # Job materializes assets and/or executes checks - final_assets = included_assets - final_asset_checks = included_checks_defs - final_source_assets = [*source_assets, *excluded_assets] - else: - # Job only observes source assets - final_assets = [] - final_asset_checks = [] - final_source_assets = included_source_assets + included_observables = [asset for asset in included_source_assets if asset.is_observable] + final_assets = [*included_assets, *included_observables] + final_asset_checks = included_checks_defs + final_source_assets = [ + *( + source_asset + for source_asset in source_assets + if source_asset not in included_observables + ), + *excluded_assets, + ] return build_assets_job( name=name, - assets=final_assets, + executable_assets=final_assets, asset_checks=final_asset_checks, config=config, - source_assets=final_source_assets, + loadable_assets=final_source_assets, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index 3696f7c25ad7f..e7c2762da1727 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -68,51 +68,56 @@ def get_base_asset_jobs( resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - assets_by_partitions_def: Dict[ - Optional[PartitionsDefinition], List[AssetsDefinition] + executable_assets = [a for a in (*assets, *source_assets) if a.is_executable] + unexecutable_assets = [a for a in (*assets, *source_assets) if not a.is_executable] + + executable_assets_by_partitions_def: Dict[ + Optional[PartitionsDefinition], List[Union[AssetsDefinition, SourceAsset]] ] = defaultdict(list) - for assets_def in assets: - assets_by_partitions_def[assets_def.partitions_def].append(assets_def) - - # We need to create "empty" jobs for each partitions def that is used by an observable but no - # materializable asset. They are empty because we don't assign the source asset to the `assets`, - # but rather the `source_assets` argument of `build_assets_job`. - for observable in [sa for sa in source_assets if sa.is_observable]: - if observable.partitions_def not in assets_by_partitions_def: - assets_by_partitions_def[observable.partitions_def] = [] - if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}: + for asset in executable_assets: + executable_assets_by_partitions_def[asset.partitions_def].append(asset) + # sort to ensure some stability in the ordering + all_partitions_defs = sorted( + [p for p in executable_assets_by_partitions_def.keys() if p], key=repr + ) + + if len(all_partitions_defs) == 0: return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + executable_assets=executable_assets, + loadable_assets=unexecutable_assets, asset_checks=asset_checks, - source_assets=source_assets, executor_def=executor_def, resource_defs=resource_defs, ) ] else: - unpartitioned_assets = assets_by_partitions_def.get(None, []) - partitioned_assets_by_partitions_def = { - k: v for k, v in assets_by_partitions_def.items() if k is not None - } + unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, []) jobs = [] - # sort to ensure some stability in the ordering - for i, (partitions_def, assets_with_partitions) in enumerate( - sorted(partitioned_assets_by_partitions_def.items(), key=lambda item: repr(item[0])) - ): + for i, partitions_def in enumerate(all_partitions_defs): + # all base jobs contain all unpartitioned assets + executable_assets_for_job = [ + *executable_assets_by_partitions_def[partitions_def], + *unpartitioned_executable_assets, + ] jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + executable_assets=executable_assets_for_job, + loadable_assets=[ + *( + asset + for asset in executable_assets + if asset not in executable_assets_for_job + ), + *unexecutable_assets, + ], asset_checks=asset_checks, resource_defs=resource_defs, executor_def=executor_def, - # Only explicitly set partitions_def for observable-only jobs since it can't be - # auto-detected from the passed assets (which is an empty list). - partitions_def=partitions_def if len(assets_with_partitions) == 0 else None, + partitions_def=partitions_def, ) ) return jobs @@ -120,8 +125,8 @@ def get_base_asset_jobs( def build_assets_job( name: str, - assets: Sequence[AssetsDefinition], - source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, + executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]], + loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, resource_defs: Optional[Mapping[str, object]] = None, description: Optional[str] = None, @@ -144,11 +149,11 @@ def build_assets_job( Args: name (str): The name of the job. - assets (List[AssetsDefinition]): A list of assets or - multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset` - decorator. - source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of - assets that are not materialized by this job, but that assets in this job depend on. + executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets + to be executed by the job. SourceAssets must be observable. + loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of + AssetsDefinitions or SourceAssets that are not exectued by this job, but that are + available to be loaded as inputs by executable assets. resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in this job. description (Optional[str]): A description of the job. @@ -174,9 +179,9 @@ def asset2(asset1): from dagster._core.execution.build_resources import wrap_resources_for_execution check.str_param(name, "name") - check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) - source_assets = check.opt_sequence_param( - source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) + check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + loadable_assets = check.opt_sequence_param( + loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) ) asset_checks = check.opt_sequence_param( asset_checks, "asset_checks", of_type=AssetChecksDefinition @@ -184,30 +189,30 @@ def asset2(asset1): check.opt_str_param(description, "description") check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData) - # figure out what partitions (if any) exist for this job - partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resource_defs = check.opt_mapping_param(resource_defs, "resource_defs") resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) - # turn any AssetsDefinitions into SourceAssets - resolved_source_assets: List[SourceAsset] = [] - for asset in source_assets or []: + assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)] + source_assets = [asset for asset in executable_assets if isinstance(asset, SourceAsset)] + for asset in loadable_assets or []: if isinstance(asset, AssetsDefinition): - resolved_source_assets += asset.to_source_assets() + source_assets += asset.to_source_assets() elif isinstance(asset, SourceAsset): - resolved_source_assets.append(asset) + source_assets.append(asset) + + # figure out what partitions (if any) exist for this job + partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( assets, asset_checks, resolved_asset_deps ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): - assets = _attempt_resolve_cycles(assets, resolved_source_assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) + assets = _attempt_resolve_cycles(assets, source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( assets, asset_checks, resolved_asset_deps @@ -245,14 +250,14 @@ def asset2(asset1): asset_layer = AssetLayer.from_graph_and_assets_node_mapping( graph_def=graph, asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, - source_assets=resolved_source_assets, + source_assets=source_assets, resolved_asset_deps=resolved_asset_deps, assets_defs_by_outer_node_handle=assets_defs_by_node_handle, observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, ) all_resource_defs = get_all_resource_defs( - assets, asset_checks, resolved_source_assets, wrapped_resource_defs + assets, asset_checks, source_assets, wrapped_resource_defs ) if _asset_selection_data: diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index d164d7e349af6..3914dd1442632 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -209,13 +209,11 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str: assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02" shimmed_source_asset = create_external_asset_from_source_asset(source_asset) - defs_with_shim = Definitions( - assets=[create_external_asset_from_source_asset(source_asset), an_asset] - ) + defs_with_shim = Definitions(assets=[shimmed_source_asset, an_asset]) assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition) - job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset) + job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset) result_two = job_def_with_shim.execute_in_process( instance=instance, @@ -284,9 +282,9 @@ def _generated_asset_def(context: AssetExecutionContext): defs = Definitions(assets=[_generated_asset_def]) assert defs - assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ - AssetKey("downstream_asset") - ] == {AssetKey("upstream_asset")} + assert defs.get_asset_graph().asset_dep_graph["upstream"][downstream_asset.key] == { + upstream_asset.key + } def test_external_assets_with_dependencies() -> None: @@ -296,6 +294,6 @@ def test_external_assets_with_dependencies() -> None: defs = Definitions(assets=external_assets_from_specs([upstream_asset, downstream_asset])) assert defs - assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ - AssetKey("downstream_asset") - ] == {AssetKey("upstream_asset")} + assert defs.get_asset_graph().asset_dep_graph["upstream"][downstream_asset.key] == { + upstream_asset.key + }