Skip to content

Commit

Permalink
[external-assets] Create external assets defs for undefined assets in…
Browse files Browse the repository at this point in the history
… repo construction (#20145)

## Summary & Motivation

Currently it is possible for assets to be referenced as dependencies
when they have no corresponding `AssetsDefinition` (only when using
`deps`). This complicates implementation of the `AssetGraph` and
`AssetLayer`.

This PR generates unexecutable external assets during repository
construction (the same place source assets are converted to external
assets) for any referenced keys without a corresponding user-provided
definition.

## How I Tested These Changes

Existing test suite. One test was removed-- this made sure the internal
asset graph could handle undefined asset references, but we no longer
rely on this behavior.
  • Loading branch information
smackesey authored Mar 5, 2024
1 parent ffb0346 commit db6db44
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 42 deletions.
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"

# SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET lives on the metadata of external assets that are
# created for undefined but referenced assets during asset graph normalization. For example, in the
# below definitions, `foo` is referenced by upstream `bar` but has no corresponding definition:
#
#
# @asset(deps=["foo"])
# def bar(context: AssetExecutionContext):
# ...
#
# defs=Definitions(assets=[bar])
#
# During normalization we create a "stub" definition for `foo` and attach this metadata to it.
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET = "dagster/auto_created_stub_asset"


@whitelist_for_serdes
class AssetExecutionType(Enum):
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
AssetExecutionType,
)
Expand Down Expand Up @@ -938,6 +939,15 @@ def auto_observe_interval_minutes(self) -> Optional[float]:
)
return value

# Applies to AssetsDefinition that were auto-created because some asset referenced a key as a
# dependency, but no definition was provided for that key.
@property
def is_auto_created_stub(self) -> bool:
return (
self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET)
is not None
)

def _get_external_asset_metadata_value(self, metadata_key: str) -> object:
first_key = next(iter(self.keys), None)
return self.metadata_by_key.get(first_key, {}).get(metadata_key) if first_key else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET,
AssetSpec,
)
from dagster._core.definitions.assets_job import (
get_base_asset_jobs,
is_base_asset_job_name,
Expand All @@ -32,7 +36,10 @@
AutoMaterializeSensorDefinition,
)
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_asset_from_spec,
)
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.internal_asset_graph import InternalAssetGraph
from dagster._core.definitions.job_definition import JobDefinition
Expand Down Expand Up @@ -229,6 +236,7 @@ def build_caching_repository_data_from_list(
elif isinstance(definition, SourceAsset):
source_assets.append(definition)
assets_defs.append(create_external_asset_from_source_asset(definition))
asset_keys.add(definition.key)
elif isinstance(definition, AssetChecksDefinition):
asset_checks_defs.append(definition)
else:
Expand All @@ -247,6 +255,20 @@ def build_caching_repository_data_from_list(
for ad in assets_defs
]

# Create unexecutable external assets definitions for any referenced keys for which no
# definition was provided.
all_referenced_asset_keys = {
*(key for asset_def in assets_defs for key in asset_def.dependency_keys),
*(checks_def.asset_key for checks_def in asset_checks_defs),
}
for key in all_referenced_asset_keys.difference(asset_keys):
assets_defs.append(
external_asset_from_spec(
AssetSpec(key=key, metadata={SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET: True})
)
)
asset_keys.add(key)

if assets_defs or source_assets or asset_checks_defs:
for job_def in get_base_asset_jobs(
assets=assets_defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,20 +1651,7 @@ def external_asset_nodes_from_defs(

group_name_by_asset_key.update(asset_layer.group_names_by_assets())

asset_keys_without_definitions = all_upstream_asset_keys.difference(assets_defs_by_key.keys())

asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
execution_type=AssetExecutionType.UNEXECUTABLE,
job_names=[],
group_name=group_name_by_asset_key.get(asset_key),
code_version=code_version_by_asset_key.get(asset_key),
)
for asset_key in asset_keys_without_definitions
]
asset_nodes: List[ExternalAssetNode] = []

for asset_key, node_tuple_list in node_defs_by_asset_key.items():
node_output_handle, job_def = node_tuple_list[0]
Expand Down Expand Up @@ -1744,11 +1731,15 @@ def external_asset_nodes_from_defs(

# Ensure any external assets that are have no nodes in any job are included in the asset graph
for asset in assets_defs_by_key.values():
for key in [
key
for key in asset.keys
if (key not in node_defs_by_asset_key) and key not in asset_keys_without_definitions
]:
for key in [key for key in asset.keys if key not in node_defs_by_asset_key]:
# This is in place to preserve an implicit behavior in the Dagster UI where stub
# dependencies were rendered as if they weren't part of the default asset group.
group_name = (
None
if asset.is_auto_created_stub
else group_name_by_asset_key.get(key, DEFAULT_GROUP_NAME)
)

asset_nodes.append(
ExternalAssetNode(
asset_key=key,
Expand All @@ -1758,7 +1749,7 @@ def external_asset_nodes_from_defs(
job_names=[],
op_description=asset.descriptions_by_key.get(key),
metadata=asset.metadata_by_key.get(key),
group_name=asset.group_names_by_key.get(key),
group_name=group_name,
is_source=True,
is_observable=asset.is_observable,
auto_observe_interval_minutes=asset.auto_observe_interval_minutes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,21 +405,6 @@ def op1(): ...
assert asset_graph.get_required_multi_asset_keys(asset_def.key) == set()


def test_get_non_source_roots_missing_source(asset_graph_from_assets: Callable[..., AssetGraph]):
@asset
def foo():
pass

@asset(deps=["this_source_is_fake", "source_asset"])
def bar(foo):
pass

source_asset = SourceAsset("source_asset")

asset_graph = asset_graph_from_assets([foo, bar, source_asset])
assert asset_graph.get_materializable_roots(AssetKey("bar")) == {AssetKey("foo")}


def test_partitioned_source_asset(asset_graph_from_assets: Callable[..., AssetGraph]):
partitions_def = DailyPartitionsDefinition(start_date="2022-01-01")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,16 @@ def my_sling_assets(sling: SlingResource):

def test_with_custom_name(replication_config: SlingReplicationParam):
@sling_assets(replication_config=replication_config)
def my_sling_assets():
...
def my_sling_assets(): ...

assert my_sling_assets.op.name == "my_sling_assets"

@sling_assets(replication_config=replication_config)
def my_other_assets():
...
def my_other_assets(): ...

assert my_other_assets.op.name == "my_other_assets"

@sling_assets(replication_config=replication_config, name="custom_name")
def my_third_sling_assets():
...
def my_third_sling_assets(): ...

assert my_third_sling_assets.op.name == "custom_name"

0 comments on commit db6db44

Please sign in to comment.