Skip to content

Commit

Permalink
[external-assets] Allow asset jobs to combine materializations and observations
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 7, 2024
1 parent b1e59a1 commit 1098e02
Show file tree
Hide file tree
Showing 19 changed files with 304 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1329,8 +1329,8 @@ def never_runs_asset(

hanging_job = build_assets_job(
name="hanging_job",
source_assets=[dummy_source_asset],
assets=[first_asset, hanging_asset, never_runs_asset],
other_assets=[dummy_source_asset],
assets_to_execute=[first_asset, hanging_asset, never_runs_asset],
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
"hanging_asset_resource": hanging_asset_resource,
Expand Down Expand Up @@ -1377,7 +1377,7 @@ def downstream_asset(hanging_graph):

hanging_graph_asset_job = build_assets_job(
name="hanging_graph_asset_job",
assets=[hanging_graph_asset, downstream_asset],
assets_to_execute=[hanging_graph_asset, downstream_asset],
resource_defs={
"hanging_asset_resource": hanging_asset_resource,
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1395,7 +1395,7 @@ def asset_two(asset_one):
return asset_one + 1


two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two])
two_assets_job = build_assets_job(name="two_assets_job", assets_to_execute=[asset_one, asset_two])


@asset
Expand All @@ -1406,7 +1406,7 @@ def executable_asset() -> None:
unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")])))

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
name="executable_test_job", assets_to_execute=[executable_asset, unexecutable_asset]
)

static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"])
Expand Down Expand Up @@ -1449,7 +1449,7 @@ def downstream_dynamic_partitioned_asset(

dynamic_partitioned_assets_job = build_assets_job(
"dynamic_partitioned_assets_job",
assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
assets_to_execute=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
)


Expand Down Expand Up @@ -1523,7 +1523,7 @@ def yield_partition_materialization():

partition_materialization_job = build_assets_job(
"partition_materialization_job",
assets=[yield_partition_materialization],
assets_to_execute=[yield_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1537,7 +1537,7 @@ def fail_partition_materialization(context):

fail_partition_materialization_job = build_assets_job(
"fail_partition_materialization_job",
assets=[fail_partition_materialization],
assets_to_execute=[fail_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1556,7 +1556,7 @@ def hanging_partition_asset(context):

hanging_partition_asset_job = build_assets_job(
"hanging_partition_asset_job",
assets=[hanging_partition_asset],
assets_to_execute=[hanging_partition_asset],
executor_def=in_process_executor,
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1574,7 +1574,7 @@ def asset_yields_observation():

observation_job = build_assets_job(
"observation_job",
assets=[asset_yields_observation],
assets_to_execute=[asset_yields_observation],
executor_def=in_process_executor,
)

Expand Down
112 changes: 68 additions & 44 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,11 @@ class AssetLayer(NamedTuple):
@staticmethod
def from_graph_and_assets_node_mapping(
graph_def: GraphDefinition,
assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"],
asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"],
source_assets: Sequence["SourceAsset"],
assets_to_execute_by_node_handle: Mapping[
NodeHandle, Union["AssetsDefinition", "SourceAsset"]
],
asset_checks_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"],
other_assets: Sequence["SourceAsset"],
resolved_asset_deps: "ResolvedAssetDependencies",
) -> "AssetLayer":
"""Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the
Expand All @@ -410,24 +411,37 @@ def from_graph_and_assets_node_mapping(
assets_to_execute_by_node_handle (Mapping[NodeHandle, Union[AssetsDefinition, SourceAsset]]): A mapping from
the NodeHandle of each node in the graph to the AssetsDefinition or SourceAsset that ended up at that node.
"""
from dagster._core.definitions.assets import AssetsDefinition, SourceAsset

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
asset_deps: Dict[AssetKey, AbstractSet[AssetKey]] = {}
io_manager_by_asset: Dict[AssetKey, str] = {
source_asset.key: source_asset.get_io_manager_key() for source_asset in source_assets
source_asset.key: source_asset.get_io_manager_key() for source_asset in other_assets
}
partition_mappings_by_asset_dep: Dict[Tuple[NodeHandle, AssetKey], "PartitionMapping"] = {}

assets_to_materialize_by_node_handle = {
k: v
for k, v in assets_to_execute_by_node_handle.items()
if isinstance(v, AssetsDefinition) and v.is_materializable
}
assets_to_observe_by_node_handle = {
k: v for k, v in assets_to_execute_by_node_handle.items() if v.is_observable
}
# This can be executed for just materialized assets because observed assets do not have node
# dependencies.
(
dep_node_handles_by_asset_key,
dep_node_output_handles_by_asset_key,
) = asset_key_to_dep_node_handles(graph_def, assets_defs_by_outer_node_handle)
) = asset_key_to_dep_node_handles(graph_def, assets_to_materialize_by_node_handle)

node_output_handles_by_asset_check_key: Mapping[AssetCheckKey, NodeOutputHandle] = {}
check_names_by_asset_key_by_node_handle: Dict[NodeHandle, Dict[AssetKey, Set[str]]] = {}

for node_handle, assets_def in assets_defs_by_outer_node_handle.items():
# Also do this for just materializations?
for node_handle, assets_def in assets_to_materialize_by_node_handle.items():
for key in assets_def.keys:
asset_deps[key] = resolved_asset_deps.get_resolved_upstream_asset_keys(
assets_def, key
Expand Down Expand Up @@ -516,7 +530,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
for node_output_handle in node_output_handles:
dep_asset_keys_by_node_output_handle[node_output_handle].add(asset_key)

for node_handle, checks_def in asset_checks_defs_by_node_handle.items():
for node_handle, checks_def in asset_checks_by_node_handle.items():
check_names_by_asset_key_by_node_handle[node_handle] = defaultdict(set)
for output_name, check_spec in checks_def.specs_by_output_name.items():
inner_output_def, inner_node_handle = checks_def.node_def.resolve_output_to_origin(
Expand All @@ -541,30 +555,39 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:

assets_defs_by_key = {
key: assets_def
for assets_def in assets_defs_by_outer_node_handle.values()
for assets_def in assets_to_execute_by_node_handle.values()
if isinstance(assets_def, AssetsDefinition)
for key in assets_def.keys
}

source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}
for node_handle, source_asset in observable_source_assets_by_node_handle.items():
node_def = cast(NodeDefinition, source_asset.node_def)
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)
source_assets_by_key = {
**{source_asset.key: source_asset for source_asset in other_assets},
**{
asset.key: asset
for asset in assets_to_observe_by_node_handle.values()
if isinstance(asset, SourceAsset)
},
}
for node_handle, asset in assets_to_observe_by_node_handle.items():
node_def = cast(NodeDefinition, asset.node_def)
# check.invariant(len(node_def.output_defs) == 1)
for output_def in node_def.output_defs:
output_name = output_def.name
# resolve graph output to the op output it comes from
inner_output_def, inner_node_handle = node_def.resolve_output_to_origin(
output_name, handle=node_handle
)
node_output_handle = NodeOutputHandle(
check.not_none(inner_node_handle), inner_output_def.name
)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
source_asset.key,
partitions_fn=None,
partitions_def=source_asset.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)
asset_info_by_output[node_output_handle] = AssetOutputInfo(
asset.key,
partitions_fn=None,
partitions_def=asset.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
# nodes for assets
Expand All @@ -576,9 +599,9 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
# nodes for asset checks. Required for AssetsDefs that have selected checks
# but not assets
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.check_keys
node_handle: asset
for node_handle, asset in assets_to_execute_by_node_handle.items()
if isinstance(asset, AssetsDefinition) and asset.check_keys
},
}

Expand All @@ -594,7 +617,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
io_manager_keys_by_asset_key=io_manager_by_asset,
dep_asset_keys_by_node_output_handle=dep_asset_keys_by_node_output_handle,
partition_mappings_by_asset_dep=partition_mappings_by_asset_dep,
asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle,
asset_checks_defs_by_node_handle=asset_checks_by_node_handle,
node_output_handles_by_asset_check_key=node_output_handles_by_asset_check_key,
check_names_by_asset_key_by_node_handle=check_names_by_asset_key_by_node_handle,
)
Expand Down Expand Up @@ -883,23 +906,24 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks_defs) > 0:
# Job materializes assets and/or executes checks
final_assets = included_assets
final_asset_checks = included_checks_defs
final_source_assets = [*source_assets, *excluded_assets]
else:
# Job only observes source assets
final_assets = []
final_asset_checks = []
final_source_assets = included_source_assets
included_observables = [asset for asset in included_source_assets if asset.is_observable]
final_assets = [*included_assets, *included_observables]
final_asset_checks = included_checks_defs
final_source_assets = [
*(
source_asset
for source_asset in source_assets
if source_asset not in included_observables
),
*excluded_assets,
]

return build_assets_job(
name=name,
assets=final_assets,
assets_to_execute=final_assets,
asset_checks=final_asset_checks,
config=config,
source_assets=final_source_assets,
other_assets=final_source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
19 changes: 19 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,25 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]:
"""
return self._selected_asset_check_keys

@property
def execution_type(self) -> AssetExecutionType:
    """Return the execution type of this definition.

    The type is looked up via ``asset_execution_type_for_asset`` for the first
    key yielded by iterating ``self.keys``.

    Raises:
        StopIteration: If ``self.keys`` is empty.
    """
    # All assets in an AssetsDefinition currently must have the same execution type,
    # so a single key is enough to determine the type for the whole definition.
    # NOTE(review): a definition with no keys makes next() raise StopIteration here;
    # confirm whether callers can reach this property with an empty key set.
    return self.asset_execution_type_for_asset(next(iter(self.keys)))

@property
def is_observable(self) -> bool:
    """Return True when ``execution_type`` equals ``AssetExecutionType.OBSERVATION``."""
    current_type = self.execution_type
    return current_type == AssetExecutionType.OBSERVATION

@property
def is_materializable(self) -> bool:
    """Return True when ``execution_type`` equals ``AssetExecutionType.MATERIALIZATION``."""
    current_type = self.execution_type
    return current_type == AssetExecutionType.MATERIALIZATION

@property
def is_executable(self) -> bool:
    """Return True unless ``execution_type`` equals ``AssetExecutionType.UNEXECUTABLE``."""
    current_type = self.execution_type
    return current_type != AssetExecutionType.UNEXECUTABLE

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Expand Down
Loading

0 comments on commit 1098e02

Please sign in to comment.