Skip to content

Commit

Permalink
[external-assets] Remove SourceAsset from AssetLayer
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Mar 12, 2024
1 parent 730b904 commit 42e33eb
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 54 deletions.
64 changes: 33 additions & 31 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ class AssetLayer(NamedTuple):
check_key_by_node_output_handle: Mapping[NodeOutputHandle, AssetCheckKey]
asset_deps: Mapping[AssetKey, AbstractSet[AssetKey]]
dependency_node_handles_by_asset_key: Mapping[AssetKey, Set[NodeHandle]]
source_assets_by_key: Mapping[AssetKey, "SourceAsset"]
io_manager_keys_by_asset_key: Mapping[AssetKey, str]
# Used to store the asset key dependencies of op node handles within graph backed assets
# See AssetLayer.downstream_dep_assets for more information
Expand Down Expand Up @@ -420,12 +419,25 @@ def from_graph_and_assets_node_mapping(
assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from
each NodeHandle, pointing to the node in the graph where an AssetsDefinition ended up,
to that AssetsDefinition.
"""
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
)

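# Convert every SourceAsset into an external asset up front (via
# create_external_asset_from_source_asset) so the code below works with the converted
# definitions instead of SourceAsset objects.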
observable_assets_defs_by_node_handle = {
k: create_external_asset_from_source_asset(sa)
for k, sa in observable_source_assets_by_node_handle.items()
}
unexecutable_assets_defs = [
create_external_asset_from_source_asset(sa) for sa in source_assets
]

asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {}
asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {}
check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {}
asset_deps: Dict[AssetKey, AbstractSet[AssetKey]] = {}
io_manager_by_asset: Dict[AssetKey, str] = {
source_asset.key: source_asset.get_io_manager_key() for source_asset in source_assets
ad.key: ad.get_io_manager_key_for_asset_key(ad.key) for ad in unexecutable_assets_defs
}
partition_mappings_by_asset_dep: Dict[Tuple[NodeHandle, AssetKey], "PartitionMapping"] = {}

Expand Down Expand Up @@ -557,13 +569,16 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:

assets_defs_by_key = {
key: assets_def
for assets_def in assets_defs_by_outer_node_handle.values()
for assets_def in (
*assets_defs_by_outer_node_handle.values(),
*observable_assets_defs_by_node_handle.values(),
*unexecutable_assets_defs,
)
for key in assets_def.keys
}

source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}
for node_handle, source_asset in observable_source_assets_by_node_handle.items():
node_def = cast(NodeDefinition, source_asset.node_def)
for node_handle, assets_def in observable_assets_defs_by_node_handle.items():
node_def = assets_def.node_def
check.invariant(len(node_def.output_defs) == 1)
output_name = node_def.output_defs[0].name
# resolve graph output to the op output it comes from
Expand All @@ -575,9 +590,9 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
)

asset_info_by_output[node_output_handle] = AssetOutputInfo(
source_asset.key,
assets_def.key,
partitions_fn=None,
partitions_def=source_asset.partitions_def,
partitions_def=assets_def.partitions_def,
is_required=True,
code_version=inner_output_def.code_version,
)
Expand Down Expand Up @@ -606,7 +621,6 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
assets_defs_by_node_handle=assets_defs_by_node_handle,
dependency_node_handles_by_asset_key=dep_node_handles_by_asset_key,
assets_defs_by_key=assets_defs_by_key,
source_assets_by_key=source_assets_by_key,
io_manager_keys_by_asset_key=io_manager_by_asset,
dep_asset_keys_by_node_output_handle=dep_asset_keys_by_node_output_handle,
partition_mappings_by_asset_dep=partition_mappings_by_asset_dep,
Expand Down Expand Up @@ -724,13 +738,17 @@ def io_manager_key_for_asset(self, asset_key: AssetKey) -> str:
def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType:
if asset_key in self.assets_defs_by_key:
return self.assets_defs_by_key[asset_key].execution_type
elif asset_key in self.source_assets_by_key:
return self.source_assets_by_key[asset_key].execution_type
else:
check.failed(f"Couldn't find key {asset_key}")

def is_executable_for_asset(self, asset_key: AssetKey) -> bool:
asset = self.assets_defs_by_key.get(asset_key)
# TODO: remove existence check
return False if asset is None else asset.is_executable

def is_observable_for_asset(self, asset_key: AssetKey) -> bool:
asset = self.assets_defs_by_key.get(asset_key) or self.source_assets_by_key.get(asset_key)
asset = self.assets_defs_by_key.get(asset_key)
# TODO: remove existence check
return False if asset is None else asset.is_observable

def is_materializable_for_asset(self, asset_key: AssetKey) -> bool:
Expand All @@ -750,10 +768,7 @@ def code_version_for_asset(self, asset_key: AssetKey) -> Optional[str]:
def metadata_for_asset(
self, asset_key: AssetKey
) -> Optional[Mapping[str, ArbitraryMetadataMapping]]:
if asset_key in self.source_assets_by_key:
raw_metadata = self.source_assets_by_key[asset_key].raw_metadata
return raw_metadata or None
elif asset_key in self.assets_defs_by_key:
if asset_key in self.assets_defs_by_key:
return self.assets_defs_by_key[asset_key].metadata_by_key[asset_key]
else:
check.failed(f"Couldn't find key {asset_key}")
Expand All @@ -776,31 +791,18 @@ def asset_check_key_for_output(
return self.check_key_by_node_output_handle.get(NodeOutputHandle(node_handle, output_name))

def group_names_by_assets(self) -> Mapping[AssetKey, str]:
group_names: Dict[AssetKey, str] = {
return {
key: assets_def.group_names_by_key[key]
for key, assets_def in self.assets_defs_by_key.items()
if key in assets_def.group_names_by_key
}

group_names.update(
{
key: source_asset_def.group_name
for key, source_asset_def in self.source_assets_by_key.items()
}
)

return group_names

def partitions_def_for_asset(self, asset_key: AssetKey) -> Optional["PartitionsDefinition"]:
assets_def = self.assets_defs_by_key.get(asset_key)

# TODO: Remove existence check
if assets_def is not None:
return assets_def.partitions_def
else:
source_asset = self.source_assets_by_key.get(asset_key)
if source_asset is not None:
return source_asset.partitions_def

return None

def partition_mapping_for_node_input(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,7 @@ def _get_job_def_for_asset_selection(
check.opt_set_param(asset_check_selection, "asset_check_selection", AssetCheckKey)

nonexistent_assets = [
asset
for asset in asset_selection
if asset not in self.asset_layer.asset_keys
and asset not in self.asset_layer.source_assets_by_key
asset for asset in asset_selection if asset not in self.asset_layer.asset_keys
]
nonexistent_asset_strings = [
asset_str
Expand Down Expand Up @@ -818,7 +815,7 @@ def _get_job_def_for_asset_selection(
new_job = build_asset_selection_job(
name=self.name,
assets=self.asset_layer.assets_defs,
source_assets=self.asset_layer.source_assets_by_key.values(),
source_assets=[],
executor_def=self.executor_def,
resource_defs=self.resource_defs,
description=self.description,
Expand Down Expand Up @@ -1219,6 +1216,10 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) ->
"""For non-asset jobs that have some inputs that are fed from SourceAssets, constructs an
AssetLayer that includes those SourceAssets, converted to external assets and stored in
assets_defs_by_key.
"""
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
)

asset_keys_by_node_input_handle: Dict[NodeInputHandle, AssetKey] = {}
source_assets_list = []
source_asset_keys_set = set()
Expand Down Expand Up @@ -1257,9 +1258,8 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) ->
asset_info_by_node_output_handle={},
asset_deps={},
dependency_node_handles_by_asset_key={},
assets_defs_by_key={},
source_assets_by_key={
source_asset.key: source_asset for source_asset in source_assets_list
assets_defs_by_key={
sa.key: create_external_asset_from_source_asset(sa) for sa in source_assets_list
},
io_manager_keys_by_asset_key=io_manager_keys_by_asset_key,
dep_asset_keys_by_node_output_handle={},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,22 +299,14 @@ def get_implicit_job_def_for_assets(
"""
if self.has_job(ASSET_BASE_JOB_PREFIX):
base_job = self.get_job(ASSET_BASE_JOB_PREFIX)
if all(
key in base_job.asset_layer.assets_defs_by_key
or base_job.asset_layer.is_observable_for_asset(key)
for key in asset_keys
):
if all(base_job.asset_layer.is_executable_for_asset(key) for key in asset_keys):
return base_job
else:
i = 0
while self.has_job(f"{ASSET_BASE_JOB_PREFIX}_{i}"):
base_job = self.get_job(f"{ASSET_BASE_JOB_PREFIX}_{i}")

if all(
key in base_job.asset_layer.assets_defs_by_key
or base_job.asset_layer.is_observable_for_asset(key)
for key in asset_keys
):
if all(base_job.asset_layer.is_executable_for_asset(key) for key in asset_keys):
return base_job

i += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,6 @@ def _log_materialization_or_observation_events_for_asset(
assets_def = asset_layer.assets_def_for_node(step_context.node_handle)
if assets_def is not None:
execution_type = assets_def.execution_type
elif asset_key in asset_layer.source_assets_by_key:
execution_type = asset_layer.source_assets_by_key[asset_key].execution_type
else:
# This is a situation that shouldn't really ever occur, but appears to be able to happen
# when multiple output names point to the same asset key, which also shouldn't occur,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ def external_asset_nodes_from_defs(
downstream_asset_key=output_key
)

for assets_def in asset_layer.assets_defs:
for assets_def in [ad for ad in asset_layer.assets_defs if ad.is_executable]:
metadata_by_asset_key.update(assets_def.metadata_by_key)
freshness_policy_by_asset_key.update(assets_def.freshness_policies_by_key)
auto_materialize_policy_by_asset_key.update(assets_def.auto_materialize_policies_by_key)
Expand Down Expand Up @@ -1703,7 +1703,6 @@ def external_asset_nodes_from_defs(
while node_handle.parent:
node_handle = node_handle.parent
graph_name = node_handle.name

asset_nodes.append(
ExternalAssetNode(
asset_key=asset_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def handle_output(self, context, obj): ...
def load_input(self, context):
self.loaded_input = True
assert context.asset_key == source_asset.key
assert context.upstream_output.metadata == expected_metadata
for key, value in expected_metadata.items():
assert context.upstream_output.metadata[key] == value
return input_value

return MyIOManager()
Expand Down

0 comments on commit 42e33eb

Please sign in to comment.