Skip to content

Commit

Permalink
[external-assets] Change build_assets_job interface to use assets_to_…
Browse files Browse the repository at this point in the history
…execute
  • Loading branch information
smackesey committed Feb 20, 2024
1 parent 9e942ff commit 78e4dc5
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 72 deletions.
25 changes: 13 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,23 +894,24 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks_defs) > 0:
# Job materializes assets and/or executes checks
final_assets = included_assets
final_asset_checks = included_checks_defs
final_source_assets = [*source_assets, *excluded_assets]
else:
# Job only observes source assets
final_assets = []
final_asset_checks = []
final_source_assets = included_source_assets
included_observables = [asset for asset in included_source_assets if asset.is_observable]
final_assets = [*included_assets, *included_observables]
final_asset_checks = included_checks_defs
final_source_assets = [
*(
source_asset
for source_asset in source_assets
if source_asset not in included_observables
),
*excluded_assets,
]

return build_assets_job(
name=name,
assets=final_assets,
executable_assets=final_assets,
asset_checks=final_asset_checks,
config=config,
source_assets=final_source_assets,
loadable_assets=final_source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
105 changes: 55 additions & 50 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,60 +68,65 @@ def get_base_asset_jobs(
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[AssetsDefinition]
executable_assets = [a for a in (*assets, *source_assets) if a.is_executable]
unexecutable_assets = [a for a in (*assets, *source_assets) if not a.is_executable]

executable_assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[Union[AssetsDefinition, SourceAsset]]
] = defaultdict(list)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
for asset in executable_assets:
executable_assets_by_partitions_def[asset.partitions_def].append(asset)
# sort to ensure some stability in the ordering
all_partitions_defs = sorted(
[p for p in executable_assets_by_partitions_def.keys() if p], key=repr
)

if len(all_partitions_defs) == 0:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
executable_assets=executable_assets,
loadable_assets=unexecutable_assets,
asset_checks=asset_checks,
source_assets=source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
}
unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, [])
jobs = []

# sort to ensure some stability in the ordering
for i, (partitions_def, assets_with_partitions) in enumerate(
sorted(partitioned_assets_by_partitions_def.items(), key=lambda item: repr(item[0]))
):
for i, partitions_def in enumerate(all_partitions_defs):
# all base jobs contain all unpartitioned assets
executable_assets_for_job = [
*executable_assets_by_partitions_def[partitions_def],
*unpartitioned_executable_assets,
]
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
executable_assets=executable_assets_for_job,
loadable_assets=[
*(
asset
for asset in executable_assets
if asset not in executable_assets_for_job
),
*unexecutable_assets,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
# Only explicitly set partitions_def for observable-only jobs since it can't be
# auto-detected from the passed assets (which is an empty list).
partitions_def=partitions_def if len(assets_with_partitions) == 0 else None,
partitions_def=partitions_def,
)
)
return jobs


def build_assets_job(
name: str,
assets: Sequence[AssetsDefinition],
source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]],
loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
resource_defs: Optional[Mapping[str, object]] = None,
description: Optional[str] = None,
Expand All @@ -144,11 +149,11 @@ def build_assets_job(
Args:
name (str): The name of the job.
assets (List[AssetsDefinition]): A list of assets or
multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset`
decorator.
source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
assets that are not materialized by this job, but that assets in this job depend on.
executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets
to be executed by the job. SourceAssets must be observable.
loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
AssetsDefinitions or SourceAssets that are not exectued by this job, but that are
available to be loaded as inputs by executable assets.
resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in
this job.
description (Optional[str]): A description of the job.
Expand All @@ -174,40 +179,40 @@ def asset2(asset1):
from dagster._core.execution.build_resources import wrap_resources_for_execution

check.str_param(name, "name")
check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
source_assets = check.opt_sequence_param(
source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset))
loadable_assets = check.opt_sequence_param(
loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
asset_checks = check.opt_sequence_param(
asset_checks, "asset_checks", of_type=AssetChecksDefinition
)
check.opt_str_param(description, "description")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

# turn any AssetsDefinitions into SourceAssets
resolved_source_assets: List[SourceAsset] = []
for asset in source_assets or []:
assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)]
source_assets = [asset for asset in executable_assets if isinstance(asset, SourceAsset)]
for asset in loadable_assets or []:
if isinstance(asset, AssetsDefinition):
resolved_source_assets += asset.to_source_assets()
source_assets += asset.to_source_assets()
elif isinstance(asset, SourceAsset):
resolved_source_assets.append(asset)
source_assets.append(asset)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)
deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
)

# attempt to resolve cycles using multi-asset subsetting
if _has_cycles(deps):
assets = _attempt_resolve_cycles(assets, resolved_source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
assets = _attempt_resolve_cycles(assets, source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)

deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
Expand Down Expand Up @@ -245,14 +250,14 @@ def asset2(asset1):
asset_layer = AssetLayer.from_graph_and_assets_node_mapping(
graph_def=graph,
asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle,
source_assets=resolved_source_assets,
source_assets=source_assets,
resolved_asset_deps=resolved_asset_deps,
assets_defs_by_outer_node_handle=assets_defs_by_node_handle,
observable_source_assets_by_node_handle=observable_source_assets_by_node_handle,
)

all_resource_defs = get_all_resource_defs(
assets, asset_checks, resolved_source_assets, wrapped_resource_defs
assets, asset_checks, source_assets, wrapped_resource_defs
)

if _asset_selection_data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,11 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:
assert result_one.output_for_node("an_asset") == "hardcoded-computed-2021-01-02"

shimmed_source_asset = create_external_asset_from_source_asset(source_asset)
defs_with_shim = Definitions(
assets=[create_external_asset_from_source_asset(source_asset), an_asset]
)
defs_with_shim = Definitions(assets=[shimmed_source_asset, an_asset])

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset, shimmed_source_asset)
job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset)

result_two = job_def_with_shim.execute_in_process(
instance=instance,
Expand Down Expand Up @@ -284,9 +282,9 @@ def _generated_asset_def(context: AssetExecutionContext):
defs = Definitions(assets=[_generated_asset_def])
assert defs

assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[
AssetKey("downstream_asset")
] == {AssetKey("upstream_asset")}
assert defs.get_asset_graph().asset_dep_graph["upstream"][downstream_asset.key] == {
upstream_asset.key
}


def test_external_assets_with_dependencies() -> None:
Expand All @@ -296,6 +294,6 @@ def test_external_assets_with_dependencies() -> None:
defs = Definitions(assets=external_assets_from_specs([upstream_asset, downstream_asset]))
assert defs

assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[
AssetKey("downstream_asset")
] == {AssetKey("upstream_asset")}
assert defs.get_asset_graph().asset_dep_graph["upstream"][downstream_asset.key] == {
upstream_asset.key
}

0 comments on commit 78e4dc5

Please sign in to comment.