Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external-assets] Create external assets defs for undefined assets in repo construction #20145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"

# SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET lives on the metadata of external assets that are
# created for undefined but referenced assets during asset graph normalization. For example, in the
# below definitions, `foo` is referenced by upstream `bar` but has no corresponding definition:
#
#
# @asset(deps=["foo"])
# def bar(context: AssetExecutionContext):
# ...
#
# defs=Definitions(assets=[bar])
#
# During normalization we create a "stub" definition for `foo` and attach this metadata to it.
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET = "dagster/auto_created_stub_asset"


@whitelist_for_serdes
class AssetExecutionType(Enum):
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
AssetExecutionType,
)
Expand Down Expand Up @@ -938,6 +939,15 @@ def auto_observe_interval_minutes(self) -> Optional[float]:
)
return value

# Applies to AssetsDefinition that were auto-created because some asset referenced a key as a
# dependency, but no definition was provided for that key.
@property
def is_auto_created_stub(self) -> bool:
return (
self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET)
is not None
)

def _get_external_asset_metadata_value(self, metadata_key: str) -> object:
first_key = next(iter(self.keys), None)
return self.metadata_by_key.get(first_key, {}).get(metadata_key) if first_key else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
AssetSpec,
)
from dagster._core.definitions.assets_job import (
get_base_asset_jobs,
is_base_asset_job_name,
Expand All @@ -32,7 +36,10 @@
AutoMaterializeSensorDefinition,
)
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_asset_from_spec,
)
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -229,6 +236,7 @@ def build_caching_repository_data_from_list(
elif isinstance(definition, SourceAsset):
source_assets.append(definition)
assets_defs.append(create_external_asset_from_source_asset(definition))
asset_keys.add(definition.key)
elif isinstance(definition, AssetChecksDefinition):
asset_checks_defs.append(definition)
else:
Expand All @@ -247,6 +255,20 @@ def build_caching_repository_data_from_list(
for ad in assets_defs
]

# Create unexecutable external assets definitions for any referenced keys for which no
# definition was provided.
all_referenced_asset_keys = {
*(key for asset_def in assets_defs for key in asset_def.dependency_keys),
*(checks_def.asset_key for checks_def in asset_checks_defs),
}
for key in all_referenced_asset_keys.difference(asset_keys):
assets_defs.append(
external_asset_from_spec(
AssetSpec(key=key, metadata={SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET: True})
)
)
asset_keys.add(key)

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,20 +1651,7 @@ def external_asset_nodes_from_defs(

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys())

asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
execution_type=AssetExecutionType.UNEXECUTABLE,
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
)
for asset_key in asset_keys_without_definitions
]
asset_nodes: List[ExternalAssetNode] = []

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]
Expand Down Expand Up @@ -1744,11 +1731,15 @@ def external_asset_nodes_from_defs(

# Ensure any external assets that are have no nodes in any job are included in the asset graph
for asset in assets_defs_by_key.values():
for key in [
key
for key in asset.keys
if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions
]:
for key in [key for key in asset.keys if key not in node_defs_by_asset_key]:
# This is in place to preserve an implicit behavior in the Dagster UI where stub
# dependencies were rendered as if they weren't part of the default asset group.
group_name = (
None
if asset.is_auto_created_stub
else group_name_by_asset_key.get(key, DEFAULT_GROUP_NAME)
)

asset_nodes.append(
ExternalAssetNode(
asset_key=key,
Expand All @@ -1758,7 +1749,7 @@ def external_asset_nodes_from_defs(
job_names=[],
op_description=asset.descriptions_by_key.get(key),
metadata=asset.metadata_by_key.get(key),
group_name=asset.group_names_by_key.get(key),
group_name=group_name,
is_source=True,
is_observable=asset.is_observable,
auto_observe_interval_minutes=asset.auto_observe_interval_minutes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,21 +405,6 @@ def op1(): ...
assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set()


def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]):
@asset
def foo():
pass

@asset(deps=["this_source_is_fake", "source_asset"])
def bar(foo):
pass

source_asset = SourceAsset("source_asset")

asset_graph = asset_graph_from_assets([foo, bar, source_asset])
assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")}


def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]):
partitions_def = DailyPartitionsDefinition(start_date="2022-01-01")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,16 @@ def my_sling_assets(sling: SlingResource):

def test_with_custom_name(replication_config: SlingReplicationParam):
@sling_assets(replication_config=replication_config)
def my_sling_assets():
...
def my_sling_assets(): ...

assert my_sling_assets.op.name == "my_sling_assets"

@sling_assets(replication_config=replication_config)
def my_other_assets():
...
def my_other_assets(): ...

assert my_other_assets.op.name == "my_other_assets"

@sling_assets(replication_config=replication_config, name="custom_name")
def my_third_sling_assets():
...
def my_third_sling_assets(): ...

assert my_third_sling_assets.op.name == "custom_name"