Skip to content

Commit

Permalink
Make AssetsDefinition.node_def optional (#22165)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Makes the `_node_def` property on `AssetsDefinition` optional. This
means that:
- `AssetsDefinition`s constructed using `external_assets_from_specs` no
longer include a dummy function.
- `AssetsDefinition`s created from `SourceAsset`s no longer include a
dummy function.
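- The `{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: "UNEXECUTABLE"}` metadata entry goes away; an `AssetsDefinition` with no node def now reports `UNEXECUTABLE` as its execution type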
- `external_assets_from_specs(spec)` now functions identically to
`[AssetsDefinition(specs=[spec])]`

A question that came up while implementing this was: what to do about IO
managers? `SourceAsset` has an `io_manager_key` attribute. IO manager
keys are currently stored inside `OutputDefinition`s. Previously, when
converting `SourceAsset`s to `AssetsDefinition`s, we were stashing the
`io_manager_key` inside the output defs on the dummy node. This PR adds
a "dagster/io_manager_key" asset definition metadata key for these
cases. An alternative would be to give `AssetSpec` an `io_manager_key`
attribute.

## How I Tested These Changes
  • Loading branch information
sryza authored Jun 13, 2024
1 parent 1382886 commit 26e03b8
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"


# SYSTEM_METADATA_KEY_IO_MANAGER_KEY lives on the metadata of an asset without a node def and
# determines the io_manager_key that can be used to load it. This is necessary because IO manager
# keys are otherwise encoded inside OutputDefinitions within NodeDefinitions.
SYSTEM_METADATA_KEY_IO_MANAGER_KEY = "dagster/io_manager_key"

# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
Expand Down
105 changes: 79 additions & 26 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@
)
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.definitions.utils import normalize_group_name, validate_asset_owner
from dagster._core.definitions.utils import (
DEFAULT_IO_MANAGER_KEY,
normalize_group_name,
validate_asset_owner,
)
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
Expand All @@ -60,7 +64,7 @@
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import ExperimentalWarning, disable_dagster_warnings

from .asset_spec import AssetSpec
from .asset_spec import SYSTEM_METADATA_KEY_IO_MANAGER_KEY, AssetSpec
from .dependency import NodeHandle
from .events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from .node_definition import NodeDefinition
Expand Down Expand Up @@ -102,7 +106,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
"owners_by_key",
}

_node_def: NodeDefinition
_node_def: Optional[NodeDefinition]
_keys_by_input_name: Mapping[str, AssetKey]
_keys_by_output_name: Mapping[str, AssetKey]
_partitions_def: Optional[PartitionsDefinition]
Expand All @@ -122,9 +126,9 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
def __init__(
self,
*,
keys_by_input_name: Mapping[str, AssetKey],
keys_by_output_name: Mapping[str, AssetKey],
node_def: NodeDefinition,
keys_by_input_name: Optional[Mapping[str, AssetKey]] = None,
keys_by_output_name: Optional[Mapping[str, AssetKey]] = None,
node_def: Optional[NodeDefinition] = None,
partitions_def: Optional[PartitionsDefinition] = None,
partition_mappings: Optional[Mapping[AssetKey, PartitionMapping]] = None,
asset_deps: Optional[Mapping[AssetKey, AbstractSet[AssetKey]]] = None,
Expand Down Expand Up @@ -162,14 +166,30 @@ def __init__(
if isinstance(node_def, GraphDefinition):
_validate_graph_def(node_def)

if node_def is None:
    # Without a node_def there are no inputs or outputs to map asset keys onto, and nothing
    # to execute, so every node-dependent argument must be left unset. Each message names
    # the argument that actually violated the invariant.
    check.invariant(
        not keys_by_input_name,
        "node_def is None, so keys_by_input_name must be empty",
    )
    check.invariant(
        not keys_by_output_name,
        "node_def is None, so keys_by_output_name must be empty",
    )
    check.invariant(
        backfill_policy is None, "node_def is None, so backfill_policy must be None"
    )
    check.invariant(not can_subset, "node_def is None, so can_subset must be False")

self._node_def = node_def
self._keys_by_input_name = check.mapping_param(
self._keys_by_input_name = check.opt_mapping_param(
keys_by_input_name,
"keys_by_input_name",
key_type=str,
value_type=AssetKey,
)
self._keys_by_output_name = check.mapping_param(
self._keys_by_output_name = check.opt_mapping_param(
keys_by_output_name,
"keys_by_output_name",
key_type=str,
Expand Down Expand Up @@ -222,7 +242,7 @@ def __init__(
resolved_specs = specs

else:
all_asset_keys = set(keys_by_output_name.values())
all_asset_keys = set(self._keys_by_output_name.values())

if asset_deps:
check.invariant(
Expand All @@ -236,13 +256,15 @@ def __init__(
if partition_mappings:
_validate_partition_mappings(
partition_mappings=partition_mappings,
input_asset_keys=set(keys_by_input_name.values()),
input_asset_keys=set(self._keys_by_input_name.values()),
all_asset_keys=all_asset_keys,
)

check.invariant(node_def, "Must provide node_def if not providing specs")

resolved_specs = _asset_specs_from_attr_key_params(
all_asset_keys=all_asset_keys,
keys_by_input_name=keys_by_input_name,
keys_by_input_name=self._keys_by_input_name,
deps_by_asset_key=asset_deps,
partition_mappings=partition_mappings,
tags_by_key=tags_by_key,
Expand All @@ -256,7 +278,7 @@ def __init__(
)

normalized_specs: List[AssetSpec] = []
output_names_by_key = {key: name for name, key in keys_by_output_name.items()}
output_names_by_key = {key: name for name, key in self._keys_by_output_name.items()}

for spec in resolved_specs:
if spec.owners:
Expand All @@ -265,13 +287,27 @@ def __init__(

group_name = normalize_group_name(spec.group_name)

output_def, _ = node_def.resolve_output_to_origin(output_names_by_key[spec.key], None)
metadata = {**output_def.metadata, **(spec.metadata or {})}
if node_def is not None:
output_def, _ = node_def.resolve_output_to_origin(
output_names_by_key[spec.key], None
)
node_def_description = node_def.description
output_def_metadata = output_def.metadata
output_def_description = output_def.description
output_def_code_version = output_def.code_version
skippable = not output_def.is_required
else:
node_def_description = None
output_def_metadata = {}
output_def_description = None
output_def_code_version = None
skippable = False

metadata = {**output_def_metadata, **(spec.metadata or {})}
# We construct description from three sources of truth here. This is
# highly unfortunate. See commentary in @multi_asset's call to dagster_internal_init.
description = spec.description or output_def.description or node_def.description
code_version = spec.code_version or output_def.code_version
skippable = not output_def.is_required
description = spec.description or output_def_description or node_def_description
code_version = spec.code_version or output_def_code_version

check.invariant(
not (
Expand All @@ -296,7 +332,9 @@ def __init__(
self._specs_by_key = {spec.key: spec for spec in normalized_specs}

self._partition_mappings = get_partition_mappings_from_deps(
{}, [dep for spec in normalized_specs for dep in spec.deps], node_def.name
{},
[dep for spec in normalized_specs for dep in spec.deps],
node_def.name if node_def else "external assets",
)
self._selected_asset_keys, self._selected_asset_check_keys = _resolve_selections(
all_asset_keys=self._specs_by_key.keys(),
Expand Down Expand Up @@ -362,8 +400,8 @@ def __call__(self, *args: object, **kwargs: object) -> object:
from .graph_definition import GraphDefinition

# defer to GraphDefinition.__call__ for graph backed assets, or if invoked in composition
if isinstance(self.node_def, GraphDefinition) or is_in_composition():
return self._node_def(*args, **kwargs)
if isinstance(self._node_def, GraphDefinition) or is_in_composition():
return check.not_none(self._node_def)(*args, **kwargs)

# invoke against self to allow assets def information to be used
return direct_invocation_result(self, *args, **kwargs)
Expand Down Expand Up @@ -781,7 +819,7 @@ def node_def(self) -> NodeDefinition:
"""NodeDefinition: Returns the OpDefinition or GraphDefinition that is used to materialize
the assets in this AssetsDefinition.
"""
return self._node_def
return check.not_none(self._node_def, "This AssetsDefinition has no node_def")

@public
@property
Expand Down Expand Up @@ -1024,6 +1062,9 @@ def check_key(self) -> AssetCheckKey:

@property
def execution_type(self) -> AssetExecutionType:
if self._node_def is None:
return AssetExecutionType.UNEXECUTABLE

value = self._get_external_asset_metadata_value(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if value is None:
return AssetExecutionType.MATERIALIZATION
Expand Down Expand Up @@ -1078,10 +1119,13 @@ def get_output_name_for_asset_check_key(self, key: AssetCheckKey) -> str:
f"Asset check key {key.to_user_string()} not found in AssetsDefinition"
)

def get_op_def_for_asset_key(self, key: AssetKey) -> OpDefinition:
def get_op_def_for_asset_key(self, key: AssetKey) -> Optional[OpDefinition]:
"""If this is an op-backed asset, returns the op def. If it's a graph-backed asset,
returns the op def within the graph that produces the given asset key. Returns None if
this AssetsDefinition has no node def.
"""
if self._node_def is None:
return None

output_name = self.get_output_name_for_asset_key(key)
return self.node_def.resolve_output_to_origin_op_def(output_name)

Expand Down Expand Up @@ -1387,10 +1431,19 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset:
)

def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str:
output_name = self.get_output_name_for_asset_key(key)
return self.node_def.resolve_output_to_origin(
output_name, NodeHandle(self.node_def.name, parent=None)
)[0].io_manager_key
if self._node_def is None:
return self._specs_by_key[key].metadata.get(
SYSTEM_METADATA_KEY_IO_MANAGER_KEY, DEFAULT_IO_MANAGER_KEY
)
else:
check.invariant(
SYSTEM_METADATA_KEY_IO_MANAGER_KEY not in self._specs_by_key[key].metadata
)

output_name = self.get_output_name_for_asset_key(key)
return self.node_def.resolve_output_to_origin(
output_name, NodeHandle(self.node_def.name, parent=None)
)[0].io_manager_key

def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
if self.is_executable:
Expand Down
Loading

0 comments on commit 26e03b8

Please sign in to comment.