[external-assets] Change build_assets_job interface to use `executable_assets` and `loadable_assets` (#19765)

## Summary & Motivation

`build_assets_job` (an internal API) is the codepath through which all
asset job construction flows. Currently, it accepts two lists of assets:

- `assets: Sequence[AssetsDefinition]`
- `source_assets: Sequence[Union[AssetsDefinition, SourceAsset]]`

This is quite confusing. In particular, `source_assets` is poorly named.
Not only does it contain `AssetsDefinition` (which gets internally
converted to `SourceAsset` inside the function) but it also contains
observable source assets that actually have nodes that will be executed.

This PR changes `build_assets_job` to accept:

- `executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]]`
- `loadable_assets: Sequence[Union[AssetsDefinition, SourceAsset]]`

This makes the purpose of the two asset sequences clearer. It can be
considered an incremental step in the source asset -> external asset
refactor.

The PR leaves the internal logic of `build_assets_job` unchanged (though
it will be changed upstack), passing separate collections of
`AssetsDefinition` and `SourceAsset` further into the system per the
status quo.
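
The split between the two arguments can be sketched as follows. This is a minimal illustration that mirrors the list comprehensions in `build_asset_selection_job` from this diff, not the Dagster implementation itself: `FakeAsset` and `split_assets` are hypothetical stand-ins, and only the `is_executable` attribute is taken from the PR.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class FakeAsset:
    """Hypothetical stand-in for AssetsDefinition / SourceAsset."""

    key: str
    is_executable: bool


def split_assets(
    included: Sequence[FakeAsset], excluded: Sequence[FakeAsset]
) -> Tuple[List[FakeAsset], List[FakeAsset]]:
    """Return (executable_assets, loadable_assets) for a job.

    Selected assets that can execute go into the first list. Everything
    else (selected but unexecutable, or not selected at all) is only
    available to be loaded as an input.
    """
    executable = [a for a in included if a.is_executable]
    loadable = [
        *(a for a in included if not a.is_executable),
        *excluded,
    ]
    return executable, loadable


upstream = FakeAsset("raw_table", is_executable=False)
observed = FakeAsset("observed_source", is_executable=True)
materialized = FakeAsset("clean_table", is_executable=True)
unselected = FakeAsset("unselected", is_executable=True)

executable_assets, loadable_assets = split_assets(
    included=[upstream, observed, materialized], excluded=[unselected]
)
```

Under these assumptions an observable source asset lands in `executable_assets` next to materializable assets, which is the case the old `source_assets` name obscured.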

## How I Tested These Changes

Existing test suite.

A few GQL snapshots are updated due to minor changes in the asset base
jobs in the GQL repo. Specifically, the unexecutable external asset is
no longer included in any of the base jobs, whereas previously it was
included in all of them (as all unpartitioned assets defs were). This is
not the result of a change to the `build_assets_job` logic but rather of
a change to the base job construction routine (which is one of the two
`build_assets_job` callsites).
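
A rough sketch of the base job grouping, assuming the shape of the new `get_base_asset_jobs` in this diff, shows why the unexecutable asset drops out of the jobs' executed nodes. `FakeAsset`, `plan_base_jobs`, and the string-valued `partitions_def` are hypothetical simplifications; the `__ASSET_JOB` prefix matches the job names in the snapshot.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FakeAsset:
    """Hypothetical stand-in; a string stands in for a PartitionsDefinition."""

    key: str
    is_executable: bool
    partitions_def: Optional[str] = None


def plan_base_jobs(assets: List[FakeAsset]) -> Dict[str, Dict[str, List[str]]]:
    """Map each base job name to the keys it executes and the keys it can load."""
    executable = [a for a in assets if a.is_executable]
    unexecutable = [a for a in assets if not a.is_executable]

    # Only executable assets are grouped by partitions definition.
    by_partitions_def: Dict[Optional[str], List[FakeAsset]] = defaultdict(list)
    for asset in executable:
        by_partitions_def[asset.partitions_def].append(asset)
    # Sort for a stable job ordering.
    partitions_defs = sorted(p for p in by_partitions_def if p is not None)

    if not partitions_defs:
        return {
            "__ASSET_JOB": {
                "executable": [a.key for a in executable],
                "loadable": [a.key for a in unexecutable],
            }
        }

    unpartitioned = by_partitions_def.get(None, [])
    plan: Dict[str, Dict[str, List[str]]] = {}
    for i, partitions_def in enumerate(partitions_defs):
        # Every base job contains all unpartitioned executable assets.
        for_job = [*by_partitions_def[partitions_def], *unpartitioned]
        others = [a for a in executable if a not in for_job]
        plan[f"__ASSET_JOB_{i}"] = {
            "executable": [a.key for a in for_job],
            "loadable": [a.key for a in (*others, *unexecutable)],
        }
    return plan


plan = plan_base_jobs(
    [
        FakeAsset("daily_a", True, "daily"),
        FakeAsset("hourly_b", True, "hourly"),
        FakeAsset("plain_c", True),
        FakeAsset("unexecutable_asset", False),
    ]
)
```

In this sketch `unexecutable_asset` appears only under `loadable` for each job, so it contributes no node to any base job, which is consistent with the removed `UsedSolid` snapshot entry.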
smackesey authored and PedramNavid committed Mar 28, 2024
1 parent 935da3f commit a28495c
Showing 10 changed files with 2,765 additions and 3,316 deletions.

Large diffs are not rendered by default.

@@ -4826,94 +4826,6 @@
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
'name': 'unexecutable_asset',
}),
'invocations': list([
dict({
'pipeline': dict({
'name': '__ASSET_JOB_0',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_1',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_2',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_3',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_4',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_5',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_6',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_7',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_8',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
dict({
'pipeline': dict({
'name': '__ASSET_JOB_9',
}),
'solidHandle': dict({
'handleID': 'unexecutable_asset',
}),
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
22 changes: 10 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
@@ -894,23 +894,21 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks_defs) > 0:
# Job materializes assets and/or executes checks
final_assets = included_assets
final_asset_checks = included_checks_defs
final_source_assets = [*source_assets, *excluded_assets]
else:
# Job only observes source assets
final_assets = []
final_asset_checks = []
final_source_assets = included_source_assets
all_included_assets = [*included_assets, *included_source_assets]
executable_assets = [asset for asset in all_included_assets if asset.is_executable]
loadable_assets = [
*(asset for asset in all_included_assets if not asset.is_executable),
*(asset for asset in source_assets if asset not in included_source_assets),
*excluded_assets,
]
final_asset_checks = included_checks_defs

return build_assets_job(
name=name,
assets=final_assets,
executable_assets=executable_assets,
asset_checks=final_asset_checks,
config=config,
source_assets=final_source_assets,
loadable_assets=loadable_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
6 changes: 1 addition & 5 deletions python_modules/dagster/dagster/_core/definitions/assets.py
@@ -985,11 +985,7 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:
def execution_type(self) -> AssetExecutionType:
value = self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if value is None:
return (
AssetExecutionType.UNEXECUTABLE
if len(self.keys) == 0
else AssetExecutionType.MATERIALIZATION
)
return AssetExecutionType.MATERIALIZATION
elif isinstance(value, str):
return AssetExecutionType[value]
else:
100 changes: 56 additions & 44 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
@@ -68,60 +68,65 @@ def get_base_asset_jobs(
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[AssetsDefinition]
executable_assets = [a for a in (*assets, *source_assets) if a.is_executable]
unexecutable_assets = [a for a in (*assets, *source_assets) if not a.is_executable]

executable_assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[Union[AssetsDefinition, SourceAsset]]
] = defaultdict(list)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
for asset in executable_assets:
executable_assets_by_partitions_def[asset.partitions_def].append(asset)
# sort to ensure some stability in the ordering
all_partitions_defs = sorted(
[p for p in executable_assets_by_partitions_def.keys() if p], key=repr
)

if len(all_partitions_defs) == 0:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
executable_assets=executable_assets,
loadable_assets=unexecutable_assets,
asset_checks=asset_checks,
source_assets=source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
}
unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, [])
jobs = []

# sort to ensure some stability in the ordering
for i, (partitions_def, assets_with_partitions) in enumerate(
sorted(partitioned_assets_by_partitions_def.items(), key=lambda item: repr(item[0]))
):
for i, partitions_def in enumerate(all_partitions_defs):
# all base jobs contain all unpartitioned assets
executable_assets_for_job = [
*executable_assets_by_partitions_def[partitions_def],
*unpartitioned_executable_assets,
]
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
executable_assets=executable_assets_for_job,
loadable_assets=[
*(
asset
for asset in executable_assets
if asset not in executable_assets_for_job
),
*unexecutable_assets,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
# Only explicitly set partitions_def for observable-only jobs since it can't be
# auto-detected from the passed assets (which is an empty list).
partitions_def=partitions_def if len(assets_with_partitions) == 0 else None,
partitions_def=partitions_def,
)
)
return jobs


def build_assets_job(
name: str,
assets: Sequence[AssetsDefinition],
source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]],
loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
resource_defs: Optional[Mapping[str, object]] = None,
description: Optional[str] = None,
@@ -144,11 +149,11 @@ def build_assets_job(
Args:
name (str): The name of the job.
assets (List[AssetsDefinition]): A list of assets or
multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset`
decorator.
source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
assets that are not materialized by this job, but that assets in this job depend on.
executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets
to be executed by the job. SourceAssets must be observable.
loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of
AssetsDefinitions or SourceAssets that are not executed by this job, but that are
available to be loaded as inputs by executable assets.
resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in
this job.
description (Optional[str]): A description of the job.
@@ -174,31 +179,38 @@ def asset2(asset1):
from dagster._core.execution.build_resources import wrap_resources_for_execution

check.str_param(name, "name")
check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
source_assets = check.opt_sequence_param(
source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset))
for asset in executable_assets:
if not asset.is_executable:
keys = [asset.key] if isinstance(asset, SourceAsset) else asset.keys
check.failed(f"Passed unexecutable keys to executable_assets: {keys}")

loadable_assets = check.opt_sequence_param(
loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
asset_checks = check.opt_sequence_param(
asset_checks, "asset_checks", of_type=AssetChecksDefinition
)
check.opt_str_param(description, "description")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

# turn any AssetsDefinitions into SourceAssets
resolved_source_assets: List[SourceAsset] = []
for asset in source_assets or []:
assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)]
resolved_source_assets = [
asset for asset in executable_assets if isinstance(asset, SourceAsset)
]
for asset in loadable_assets or []:
if isinstance(asset, AssetsDefinition):
resolved_source_assets += asset.to_source_assets()
elif isinstance(asset, SourceAsset):
resolved_source_assets.append(asset)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
@@ -222,7 +234,7 @@ def asset2(asset1):
else:
node_defs = []
observable_source_assets_by_node_handle: Mapping[NodeHandle, SourceAsset] = {}
for asset in source_assets:
for asset in resolved_source_assets:
if (
isinstance(asset, SourceAsset)
and asset.is_observable
@@ -1402,6 +1402,7 @@ def external_repository_data_from_def(
resource_datas = repository_def.get_top_level_resources()
asset_graph = external_asset_nodes_from_defs(
jobs,
assets_defs_by_key=repository_def.assets_defs_by_key,
source_assets_by_key=repository_def.source_assets_by_key,
)

@@ -1555,6 +1556,7 @@ def external_asset_checks_from_defs(

def external_asset_nodes_from_defs(
job_defs: Sequence[JobDefinition],
assets_defs_by_key: Mapping[AssetKey, AssetsDefinition],
source_assets_by_key: Mapping[AssetKey, SourceAsset],
) -> Sequence[ExternalAssetNode]:
node_defs_by_asset_key: Dict[
@@ -1781,6 +1783,35 @@ def external_asset_nodes_from_defs(
)
)

# Ensure any external assets that have no nodes in any job are included in the asset graph
for asset in assets_defs_by_key.values():
for key in [
key
for key in asset.keys
if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions
]:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
dependencies=list(deps[key].values()),
depended_by=list(dep_by[key].values()),
execution_type=asset.execution_type,
job_names=[],
op_description=asset.descriptions_by_key.get(key),
metadata=asset.metadata_by_key.get(key),
group_name=asset.group_names_by_key.get(key),
is_source=True,
is_observable=asset.is_observable,
auto_observe_interval_minutes=asset.auto_observe_interval_minutes,
partitions_def_data=(
external_partitions_definition_from_def(asset.partitions_def)
if asset.partitions_def
else None
),
freshness_policy=asset.freshness_policies_by_key.get(key),
)
)

defined = set()
for node in asset_nodes:
if node.asset_key in defined:
@@ -52,7 +52,7 @@ def repo():
return assets + (asset_checks or [])

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key={}
repo.get_all_jobs(), repo.assets_defs_by_key, source_assets_by_key={}
)
return ExternalAssetGraph.from_repository_handles_and_external_asset_nodes(
[(MagicMock(), asset_node) for asset_node in external_asset_nodes],
@@ -707,7 +707,9 @@ def external_asset_graph_from_assets_by_repo_name(
repo = Definitions(assets=assets).get_repository_def()

external_asset_nodes = external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(),
repo.assets_defs_by_key,
source_assets_by_key=repo.source_assets_by_key,
)
repo_handle = MagicMock(repository_name=repo_name)
from_repository_handles_and_external_asset_nodes.extend(
@@ -51,7 +51,7 @@ def _get_external_asset_nodes_from_definitions(
) -> Sequence[ExternalAssetNode]:
repo = defs.get_repository_def()
return external_asset_nodes_from_defs(
repo.get_all_jobs(), source_assets_by_key=repo.source_assets_by_key
repo.get_all_jobs(), repo.assets_defs_by_key, source_assets_by_key=repo.source_assets_by_key
)

