Skip to content

Commit

Permalink
[external-assets] Allow asset jobs to combine materializations and observations
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 12, 2024
1 parent c2222ce commit 0e3c817
Show file tree
Hide file tree
Showing 25 changed files with 367 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1335,8 +1335,8 @@ def never_runs_asset(

hanging_job = build_assets_job(
name="hanging_job",
source_assets=[dummy_source_asset],
assets=[first_asset, hanging_asset, never_runs_asset],
other_assets=[dummy_source_asset],
assets_to_execute=[first_asset, hanging_asset, never_runs_asset],
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
"hanging_asset_resource": hanging_asset_resource,
Expand Down Expand Up @@ -1383,7 +1383,7 @@ def downstream_asset(hanging_graph):

hanging_graph_asset_job = build_assets_job(
name="hanging_graph_asset_job",
assets=[hanging_graph_asset, downstream_asset],
assets_to_execute=[hanging_graph_asset, downstream_asset],
resource_defs={
"hanging_asset_resource": hanging_asset_resource,
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1401,7 +1401,7 @@ def asset_two(asset_one):
return asset_one + 1


two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two])
two_assets_job = build_assets_job(name="two_assets_job", assets_to_execute=[asset_one, asset_two])


@asset
Expand All @@ -1412,7 +1412,7 @@ def executable_asset() -> None:
unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")])))

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
name="executable_test_job", assets_to_execute=[executable_asset, unexecutable_asset]
)

static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"])
Expand Down Expand Up @@ -1455,7 +1455,7 @@ def downstream_dynamic_partitioned_asset(

dynamic_partitioned_assets_job = build_assets_job(
"dynamic_partitioned_assets_job",
assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
assets_to_execute=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
)


Expand Down Expand Up @@ -1529,7 +1529,7 @@ def yield_partition_materialization():

partition_materialization_job = build_assets_job(
"partition_materialization_job",
assets=[yield_partition_materialization],
assets_to_execute=[yield_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1543,7 +1543,7 @@ def fail_partition_materialization(context):

fail_partition_materialization_job = build_assets_job(
"fail_partition_materialization_job",
assets=[fail_partition_materialization],
assets_to_execute=[fail_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1562,7 +1562,7 @@ def hanging_partition_asset(context):

hanging_partition_asset_job = build_assets_job(
"hanging_partition_asset_job",
assets=[hanging_partition_asset],
assets_to_execute=[hanging_partition_asset],
executor_def=in_process_executor,
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1580,7 +1580,7 @@ def asset_yields_observation():

observation_job = build_assets_job(
"observation_job",
assets=[asset_yields_observation],
assets_to_execute=[asset_yields_observation],
executor_def=in_process_executor,
)

Expand Down
102 changes: 69 additions & 33 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ def asset_key_to_dep_node_handles(
dep_nodes_by_asset_key: Dict[AssetKey, List[NodeHandle]] = {}
dep_node_outputs_by_asset_key: Dict[AssetKey, List[NodeOutputHandle]] = {}

for node_handle, assets_defs in assets_defs_by_node_handle.items():
for node_handle, assets_def in assets_defs_by_node_handle.items():
dep_node_output_handles_by_node: Dict[
NodeOutputHandle, Sequence[NodeOutputHandle]
] = {} # memoized map of node output handles to all node output handle dependencies that are from ops
for output_name, asset_key in assets_defs.keys_by_output_name.items():
for output_name, asset_key in assets_def.keys_by_output_name.items():
dep_nodes_by_asset_key[
asset_key
] = [] # first element in list is node that outputs asset
Expand Down Expand Up @@ -395,10 +395,11 @@ class AssetLayer(NamedTuple):
@staticmethod
def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"],
source_assets: Sequence["SourceAsset"],
assets_to_execute_by_node_handle: Mapping[
NodeHandle, Union["AssetsDefinition", "SourceAsset"]
],
asset_checks_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
other_assets: Sequence["SourceAsset"],
resolved_asset_deps: "ResolvedAssetDependencies",
) -> "AssetLayer":
"""Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the
Expand All @@ -410,24 +411,41 @@ def from_graph_and_assets_node_mapping(
assets_to_execute_by_node_handle (Mapping[NodeHandle, Union[AssetsDefinition, SourceAsset]]): A mapping from
each NodeHandle in the graph to the AssetsDefinition or SourceAsset whose node ended up at that handle.
"""
from dagster._core.definitions.assets import AssetsDefinition, SourceAsset

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
asset_deps: Dict[AssetKey, AbstractSet[AssetKey]] = {}
io_manager_by_asset: Dict[AssetKey, str] = {
source_asset.key: source_asset.get_io_manager_key() for source_asset in source_assets
source_asset.key: source_asset.get_io_manager_key() for source_asset in other_assets
}
partition_mappings_by_asset_dep: Dict[Tuple[NodeHandle, AssetKey], "PartitionMapping"] = {}

assets_to_observe_by_node_handle = {
k: v for k, v in assets_to_execute_by_node_handle.items() if v.is_observable
}

# This can be executed for just materialized assets because observed assets do not have node
# dependencies.
(
dep_node_handles_by_asset_key,
dep_node_output_handles_by_asset_key,
) = asset_key_to_dep_node_handles(graph_def, assets_defs_by_outer_node_handle)
) = asset_key_to_dep_node_handles(
graph_def,
{
k: v
for k, v in assets_to_execute_by_node_handle.items()
if isinstance(v, AssetsDefinition)
},
)

node_output_handles_by_asset_check_key: Mapping[AssetCheckKey, NodeOutputHandle] = {}
check_names_by_asset_key_by_node_handle: Dict[NodeHandle, Dict[AssetKey, Set[str]]] = {}

for node_handle, assets_def in assets_defs_by_outer_node_handle.items():
for node_handle, assets_def in assets_to_execute_by_node_handle.items():
if isinstance(assets_def, SourceAsset): # source assets are processed later
continue
for key in assets_def.keys:
asset_deps[key] = resolved_asset_deps.get_resolved_upstream_asset_keys(
assets_def, key
Expand Down Expand Up @@ -516,7 +534,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
for node_output_handle in node_output_handles:
dep_asset_keys_by_node_output_handle[node_output_handle].add(asset_key)

for node_handle, checks_def in asset_checks_defs_by_node_handle.items():
for node_handle, checks_def in asset_checks_by_node_handle.items():
check_names_by_asset_key_by_node_handle[node_handle] = defaultdict(set)
for output_name, check_spec in checks_def.specs_by_output_name.items():
inner_output_def, inner_node_handle = checks_def.node_def.resolve_output_to_origin(
Expand All @@ -541,27 +559,39 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:

assets_defs_by_key = {
key: assets_def
for assets_def in assets_defs_by_outer_node_handle.values()
for assets_def in assets_to_execute_by_node_handle.values()
if isinstance(assets_def, AssetsDefinition)
for key in assets_def.keys
}

source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}
for node_handle, source_asset in observable_source_assets_by_node_handle.items():
node_def = cast(NodeDefinition, source_asset.node_def)
source_assets_by_key = {
**{source_asset.key: source_asset for source_asset in other_assets},
**{
asset.key: asset
for asset in assets_to_observe_by_node_handle.values()
if isinstance(asset, SourceAsset)
},
}
for node_handle, asset in assets_to_observe_by_node_handle.items():
if isinstance(asset, AssetsDefinition):
continue
node_def = cast(NodeDefinition, asset.node_def)
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
output_def = node_def.output_defs[0]
output_name = output_def.name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)
io_manager_by_asset[asset.key] = asset.get_io_manager_key()

asset_info_by_output[node_output_handle] = AssetOutputInfo(
source_asset.key,
asset.key,
partitions_fn=None,
partitions_def=source_asset.partitions_def,
partitions_def=asset.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)
Expand All @@ -573,12 +603,17 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
},
**{
node_handle: assets_def
for node_handle, assets_def in assets_to_observe_by_node_handle.items()
if isinstance(assets_def, AssetsDefinition)
},
# nodes for asset checks. Required for AssetsDefs that have selected checks
# but not assets
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.check_keys
node_handle: asset
for node_handle, asset in assets_to_execute_by_node_handle.items()
if isinstance(asset, AssetsDefinition) and asset.check_keys
},
}

Expand All @@ -594,7 +629,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
io_manager_keys_by_asset_key=io_manager_by_asset,
dep_asset_keys_by_node_output_handle=dep_asset_keys_by_node_output_handle,
partition_mappings_by_asset_dep=partition_mappings_by_asset_dep,
asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle,
asset_checks_defs_by_node_handle=asset_checks_by_node_handle,
node_output_handles_by_asset_check_key=node_output_handles_by_asset_check_key,
check_names_by_asset_key_by_node_handle=check_names_by_asset_key_by_node_handle,
)
Expand Down Expand Up @@ -883,23 +918,24 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks_defs) > 0:
# Job materializes assets and/or executes checks
final_assets = included_assets
final_asset_checks = included_checks_defs
final_source_assets = [*source_assets, *excluded_assets]
else:
# Job only observes source assets
final_assets = []
final_asset_checks = []
final_source_assets = included_source_assets
included_observables = [asset for asset in included_source_assets if asset.is_observable]
final_assets = [*included_assets, *included_observables]
final_asset_checks = included_checks_defs
final_source_assets = [
*(
source_asset
for source_asset in source_assets
if source_asset not in included_observables
),
*excluded_assets,
]

return build_assets_job(
name=name,
assets=final_assets,
assets_to_execute=final_assets,
asset_checks=final_asset_checks,
config=config,
source_assets=final_source_assets,
other_assets=final_source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
24 changes: 24 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,30 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:
"""
return self._selected_asset_check_keys

@property
def execution_type(self) -> AssetExecutionType:
    """Return the execution type of this definition.

    The type is looked up for the first key in ``self.keys``; when there are
    no keys at all, ``AssetExecutionType.MATERIALIZATION`` is returned.
    """
    first_key = next(iter(self.keys), None)

    if first_key is not None:
        # All assets in an AssetsDefinition currently must have the same
        # execution type, so inspecting a single key is sufficient.
        return self.asset_execution_type_for_asset(first_key)

    # No keys: this occurs when a multi-asset has been subsetted to have only
    # checks -- for now supported only on materializable assets.
    return AssetExecutionType.MATERIALIZATION

@property
def is_observable(self) -> bool:
    """Whether ``execution_type`` equals ``AssetExecutionType.OBSERVATION``."""
    current_type = self.execution_type
    return current_type == AssetExecutionType.OBSERVATION

@property
def is_materializable(self) -> bool:
    """Whether ``execution_type`` equals ``AssetExecutionType.MATERIALIZATION``."""
    current_type = self.execution_type
    return current_type == AssetExecutionType.MATERIALIZATION

@property
def is_executable(self) -> bool:
    """Whether ``execution_type`` is anything other than ``AssetExecutionType.UNEXECUTABLE``."""
    current_type = self.execution_type
    return current_type != AssetExecutionType.UNEXECUTABLE

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Expand Down
Loading

0 comments on commit 0e3c817

Please sign in to comment.