Skip to content

Commit

Permalink
[external-assets] Create external assets defs for undefined assets in…
Browse files Browse the repository at this point in the history
… repo construction
  • Loading branch information
smackesey committed Mar 3, 2024
1 parent 072ed7a commit c015d8f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets_job import (
get_base_asset_jobs,
is_base_asset_job_name,
Expand All @@ -32,7 +33,10 @@
AutoMaterializeSensorDefinition,
)
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_asset_from_spec,
)
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -229,6 +233,7 @@ def build_caching_repository_data_from_list(
elif isinstance(definition, SourceAsset):
source_assets.append(definition)
assets_defs.append(create_external_asset_from_source_asset(definition))
asset_keys.add(definition.key)
elif isinstance(definition, AssetChecksDefinition):
asset_checks_defs.append(definition)
else:
Expand All @@ -247,6 +252,16 @@ def build_caching_repository_data_from_list(
for ad in assets_defs
]

# Create unexecutable external asset definitions for any referenced keys for which no
# definition was provided.
all_referenced_asset_keys = {
*(key for asset_def in assets_defs for key in asset_def.dependency_keys),
*(checks_def.asset_key for checks_def in asset_checks_defs),
}
for key in all_referenced_asset_keys.difference(asset_keys):
assets_defs.append(external_asset_from_spec(AssetSpec(key=key)))
asset_keys.add(key)

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,20 +1651,7 @@ def external_asset_nodes_from_defs(

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys())

asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
execution_type=AssetExecutionType.UNEXECUTABLE,
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
)
for asset_key in asset_keys_without_definitions
]
asset_nodes: List[ExternalAssetNode] = []

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]
Expand Down Expand Up @@ -1744,11 +1731,7 @@ def external_asset_nodes_from_defs(

# Ensure any external assets that have no nodes in any job are included in the asset graph
for asset in assets_defs_by_key.values():
for key in [
key
for key in asset.keys
if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions
]:
for key in [key for key in asset.keys if key not in node_defs_by_asset_key]:
asset_nodes.append(
ExternalAssetNode(
asset_key=key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,21 +424,6 @@ def op1():
assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set()


def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]):
    """Check materializable roots when one upstream dep has no definition.

    ``bar`` depends on ``foo`` (via its parameter), on a declared source asset
    (``source_asset``), and on a key that is never defined
    (``this_source_is_fake``). The test asserts that ``foo`` is the only
    materializable root reported for ``bar``.
    """

    @asset
    def foo():
        pass

    # The parameter name ``foo`` wires ``bar`` to the ``foo`` asset; ``deps`` adds
    # one declared source asset and one key with no definition at all.
    @asset(deps=["this_source_is_fake", "source_asset"])
    def bar(foo):
        pass

    declared_source = SourceAsset("source_asset")
    graph = asset_graph_from_assets([foo, bar, declared_source])

    materializable_roots = graph.get_materializable_roots(AssetKey("bar"))
    assert materializable_roots == {AssetKey("foo")}


def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]):
partitions_def = DailyPartitionsDefinition(start_date="2022-01-01")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ def foo(bar):
),
key=lambda n: n.asset_key,
)

assert external_asset_nodes == [
ExternalAssetNode(
asset_key=AssetKey("bar"),
Expand Down

0 comments on commit c015d8f

Please sign in to comment.