From 046bc7f9614de0fd6e70d5772a4e2c21dc77a8e5 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Thu, 8 Feb 2024 11:54:49 -0500 Subject: [PATCH] [external-assets] Make execution_type an AssetsDefinition property --- .../dagster_graphql/implementation/loader.py | 2 + .../dagster/_core/definitions/asset_checks.py | 1 + .../dagster/_core/definitions/asset_layer.py | 11 ++ .../dagster/_core/definitions/asset_spec.py | 10 +- .../dagster/_core/definitions/assets.py | 53 ++++--- .../definitions/decorators/asset_decorator.py | 18 ++- .../_core/definitions/external_asset.py | 27 +--- .../dagster/_core/definitions/source_asset.py | 9 ++ .../_core/execution/plan/execute_step.py | 2 +- .../host_representation/external_data.py | 60 ++++++-- .../test_external_data.py | 38 ++++- .../definitions_tests/test_external_assets.py | 7 +- .../definitions_tests/test_observe_result.py | 141 ++++++++++-------- 13 files changed, 248 insertions(+), 131 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py index c239a6fa03546..c63b97fdba93d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py @@ -7,6 +7,7 @@ DagsterInstance, _check as check, ) +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import CachingStaleStatusResolver from dagster._core.definitions.events import AssetKey from dagster._core.events.log import EventLogEntry @@ -315,6 +316,7 @@ def _build_cross_repo_deps( ) ], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, ) return sink_assets, external_asset_deps diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 75e6dcf3cedbc..feebefcfea5f0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -255,4 +255,5 @@ def blocking_asset(**kwargs): auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), backfill_policy=asset_def.backfill_policy, config=None, # gets config from asset_def.op + _execution_type=asset_def.execution_type, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f8ab025115d64..ea27c80eb7e92 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -20,6 +20,9 @@ import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.asset_spec import ( + AssetExecutionType, +) from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.metadata import ( ArbitraryMetadataMapping, @@ -706,6 +709,14 @@ def input_for_asset_key(self, node_handle: NodeHandle, key: AssetKey) -> Optiona def io_manager_key_for_asset(self, asset_key: AssetKey) -> str: return self.io_manager_keys_by_asset_key.get(asset_key, "io_manager") + def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: + if asset_key in self.assets_defs_by_key: + return self.assets_defs_by_key[asset_key].execution_type + elif asset_key in self.source_assets_by_key: + return self.source_assets_by_key[asset_key].execution_type + else: + check.failed(f"Couldn't find key {asset_key}") + def is_observable_for_asset(self, asset_key: AssetKey) -> bool: return ( asset_key in self.source_assets_by_key diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 62c4d4a8589fa..ae86a034261b8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -3,6 +3,7 @@ import dagster._check as check from dagster._annotations import PublicAttr +from dagster._serdes.serdes import whitelist_for_serdes from .auto_materialize_policy import AutoMaterializePolicy from .events import ( @@ -15,15 +16,8 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep -# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset -# (which currently ends up on the Output associated with the asset key) -# whih encodes the execution type the of asset. "Unexecutable" assets are assets -# that cannot be materialized in Dagster, but can have events in the event -# log keyed off of them, making Dagster usable as a observability and lineage tool -# for externally materialized assets. -SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type" - +@whitelist_for_serdes class AssetExecutionType(Enum): OBSERVATION = "OBSERVATION" UNEXECUTABLE = "UNEXECUTABLE" diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 270f62a3a632c..e6496d915faa5 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -94,6 +94,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit): _descriptions_by_key: Mapping[AssetKey, str] _selected_asset_check_keys: AbstractSet[AssetCheckKey] _is_subset: bool + _execution_type: AssetExecutionType def __init__( self, @@ -116,6 +117,7 @@ def __init__( check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, is_subset: bool = False, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, # if adding new fields, make sure to handle them in the with_attributes, from_graph, and # get_attributes_dict methods ): @@ -315,6 +317,9 @@ def __init__( ) self._is_subset = check.bool_param(is_subset, "is_subset") + self._execution_type = check.inst_param( + _execution_type, "_execution_type", AssetExecutionType + ) @staticmethod def dagster_internal_init( @@ -337,6 +342,7 @@ def dagster_internal_init( check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]], selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], is_subset: bool, + _execution_type: AssetExecutionType, ) -> "AssetsDefinition": return AssetsDefinition( keys_by_input_name=keys_by_input_name, @@ -357,6 +363,7 @@ def dagster_internal_init( check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=selected_asset_check_keys, is_subset=is_subset, + _execution_type=_execution_type, ) def __call__(self, *args: object, **kwargs: object) -> object: @@ -394,6 +401,7 @@ def from_graph( backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> "AssetsDefinition": """Constructs an AssetsDefinition from a GraphDefinition. @@ -466,6 +474,7 @@ def from_graph( backfill_policy=backfill_policy, can_subset=can_subset, check_specs=check_specs, + _execution_type=_execution_type, ) @public @@ -489,6 +498,7 @@ def from_op( ] = None, backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> "AssetsDefinition": """Constructs an AssetsDefinition from an OpDefinition. @@ -554,6 +564,7 @@ def from_op( auto_materialize_policies_by_output_name=auto_materialize_policies_by_output_name, backfill_policy=backfill_policy, can_subset=can_subset, + _execution_type=_execution_type, ) @staticmethod @@ -578,6 +589,7 @@ def _from_node( backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> "AssetsDefinition": from dagster._core.definitions.decorators.asset_decorator import ( _assign_output_names_to_check_specs, @@ -715,6 +727,7 @@ def _from_node( check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, is_subset=False, + _execution_type=_execution_type, ) @public @@ -908,33 +921,25 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]: """ return self._selected_asset_check_keys - def is_asset_executable(self, asset_key: AssetKey) -> bool: - """Returns True if the asset key is materializable by this AssetsDefinition. - - Args: - asset_key (AssetKey): The asset key to check. + @property + def execution_type(self) -> AssetExecutionType: + return self._execution_type - Returns: - bool: True if the asset key is materializable by this AssetsDefinition. - """ - from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, - ) + @property + def is_external(self) -> bool: + return self.execution_type != AssetExecutionType.MATERIALIZATION - return AssetExecutionType.is_executable( - self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - ) + @property + def is_observable(self) -> bool: + return self.execution_type == AssetExecutionType.OBSERVATION - def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: - from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, - ) + @property + def is_materializable(self) -> bool: + return self.execution_type == AssetExecutionType.MATERIALIZATION - return AssetExecutionType.str_to_enum( - self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - ) + @property + def is_executable(self) -> bool: + return self.execution_type != AssetExecutionType.UNEXECUTABLE def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) @@ -1133,6 +1138,7 @@ def with_attributes( selected_asset_check_keys=selected_asset_check_keys if selected_asset_check_keys else self._selected_asset_check_keys, + _execution_type=self.execution_type, ) return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) @@ -1378,6 +1384,7 @@ def get_attributes_dict(self) -> Dict[str, Any]: descriptions_by_key=self._descriptions_by_key, check_specs_by_output_name=self._check_specs_by_output_name, selected_asset_check_keys=self._selected_asset_check_keys, + _execution_type=self._execution_type, ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index ddb9b46a0f258..c292104a1c743 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -40,7 +40,7 @@ from ..asset_check_spec import AssetCheckSpec from ..asset_in import AssetIn from ..asset_out import AssetOut -from ..asset_spec import AssetSpec +from ..asset_spec import AssetExecutionType, AssetSpec from ..assets import ASSET_SUBSET_INPUT_PREFIX, AssetsDefinition from ..backfill_policy import BackfillPolicy, BackfillPolicyType from ..decorators.graph_decorator import graph @@ -89,6 +89,7 @@ def asset( key: Optional[CoercibleToAssetKey] = None, non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = ..., + _execution_type: AssetExecutionType = ..., ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -128,6 +129,7 @@ def asset( key: Optional[CoercibleToAssetKey] = None, non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Create a definition for how to compute an asset. @@ -241,6 +243,7 @@ def create_asset(): code_version=code_version, check_specs=check_specs, key=key, + execution_type=_execution_type, ) if compute_fn is not None: @@ -313,6 +316,7 @@ def __init__( code_version: Optional[str] = None, key: Optional[CoercibleToAssetKey] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ): self.name = name self.key_prefix = key_prefix @@ -340,6 +344,7 @@ def __init__( self.code_version = code_version self.check_specs = check_specs self.key = key + self.execution_type = execution_type def __call__(self, fn: Callable) -> AssetsDefinition: from dagster._config.pythonic_config import ( @@ -484,6 +489,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition: check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, # no subselection in decorator is_subset=False, + _execution_type=self.execution_type, ) @@ -512,6 +518,7 @@ def multi_asset( code_version: Optional[str] = None, specs: Optional[Sequence[AssetSpec]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, # deprecated non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: @@ -882,6 +889,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, # no subselection in decorator is_subset=False, + _execution_type=_execution_type, ) return inner @@ -1019,6 +1027,7 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, + _execution_type: AssetExecutionType = ..., ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ... @@ -1040,6 +1049,7 @@ def graph_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. @@ -1117,6 +1127,7 @@ def slack_files_table(): resource_defs=resource_defs, check_specs=check_specs, key=key, + _execution_type=_execution_type, ) else: return graph_asset_no_defaults( @@ -1135,6 +1146,7 @@ def slack_files_table(): resource_defs=resource_defs, check_specs=check_specs, key=key, + _execution_type=_execution_type, ) @@ -1155,6 +1167,7 @@ def graph_asset_no_defaults( resource_defs: Optional[Mapping[str, ResourceDefinition]], check_specs: Optional[Sequence[AssetCheckSpec]], key: Optional[CoercibleToAssetKey], + _execution_type: AssetExecutionType, ) -> AssetsDefinition: ins = ins or {} asset_ins = build_asset_ins(compose_fn, ins or {}, set()) @@ -1210,6 +1223,7 @@ def graph_asset_no_defaults( descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, check_specs=check_specs, + _execution_type=_execution_type, ) @@ -1225,6 +1239,7 @@ def graph_multi_asset( resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, config: Optional[Union[ConfigMapping, Mapping[str, Any]]] = None, + _execution_type: AssetExecutionType = AssetExecutionType.MATERIALIZATION, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a combined definition of multiple assets that are computed using the same graph of ops, and the same upstream assets. @@ -1337,6 +1352,7 @@ def inner(fn: Callable) -> AssetsDefinition: descriptions_by_output_name=descriptions_by_output_name, resource_defs=resource_defs, check_specs=check_specs, + _execution_type=_execution_type, ) return inner diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index dea5cf03085bc..57142ef838bc2 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -2,7 +2,6 @@ from dagster import _check as check from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, AssetExecutionType, AssetSpec, ) @@ -109,17 +108,11 @@ def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinit key=spec.key, description=spec.description, group_name=spec.group_name, - metadata={ - **(spec.metadata or {}), - **{ - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( - AssetExecutionType.UNEXECUTABLE.value - ) - }, - }, + metadata=spec.metadata, deps=spec.deps, ) ], + _execution_type=AssetExecutionType.UNEXECUTABLE, ) def _external_assets_def(context: AssetExecutionContext) -> None: raise DagsterInvariantViolationError( @@ -139,21 +132,17 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets " should be None", ) - injected_metadata = ( - {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} - if source_asset.observe_fn is None - else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} - ) - kwargs = { "key": source_asset.key, - "metadata": { - **source_asset.metadata, - **injected_metadata, - }, + "metadata": source_asset.metadata, "group_name": source_asset.group_name, "description": source_asset.description, "partitions_def": source_asset.partitions_def, + "_execution_type": ( + AssetExecutionType.UNEXECUTABLE + if source_asset.observe_fn is None + else AssetExecutionType.OBSERVATION + ), } if source_asset.io_manager_def: diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 7073382cad2d7..f1404f8e05452 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -15,6 +15,7 @@ import dagster._check as check from dagster._annotations import PublicAttr, experimental_param, public from dagster._core.decorator_utils import get_function_params +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import ( DATA_VERSION_TAG, DataVersion, @@ -269,6 +270,14 @@ def op(self) -> OpDefinition: ) return cast(OpDefinition, self.node_def) + @property + def execution_type(self) -> AssetExecutionType: + return ( + AssetExecutionType.OBSERVATION + if self.is_observable + else AssetExecutionType.UNEXECUTABLE + ) + @public @property def is_observable(self) -> bool: diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 311b808db893b..deee4e9f1ed30 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -880,7 +880,7 @@ def _log_asset_materialization_events_for_asset( if asset_key: asset_layer = step_context.job_def.asset_layer execution_type = ( - asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) + asset_layer.assets_def_for_asset(asset_key).execution_type if asset_layer.has_assets_def_for_asset(asset_key) else AssetExecutionType.MATERIALIZATION ) diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 2a1c58267232f..34c1644c14f1c 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -50,7 +50,6 @@ from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, AssetExecutionType, ) from dagster._core.definitions.assets import AssetsDefinition @@ -99,7 +98,9 @@ from dagster._core.snap.mode import ResourceDefSnap, build_resource_def_snap from dagster._core.storage.io_manager import IOManagerDefinition from dagster._serdes import whitelist_for_serdes -from dagster._serdes.serdes import is_whitelisted_for_serdes_object +from dagster._serdes.serdes import ( + is_whitelisted_for_serdes_object, +) from dagster._utils.error import SerializableErrorInfo if TYPE_CHECKING: @@ -1160,6 +1161,11 @@ def key(self) -> AssetCheckKey: return AssetCheckKey(asset_key=self.asset_key, name=self.name) +# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE used to live on the metadata of external assets and +# encoded the execution type the of asset. +_SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type" + + @whitelist_for_serdes( storage_field_names={"metadata": "metadata_entries"}, field_serializers={"metadata": MetadataFieldSerializer}, @@ -1171,6 +1177,7 @@ class ExternalAssetNode( ("asset_key", AssetKey), ("dependencies", Sequence[ExternalAssetDependency]), ("depended_by", Sequence[ExternalAssetDependedBy]), + ("execution_type", AssetExecutionType), ("compute_kind", Optional[str]), ("op_name", Optional[str]), ("op_names", Sequence[str]), @@ -1210,6 +1217,7 @@ def __new__( asset_key: AssetKey, dependencies: Sequence[ExternalAssetDependency], depended_by: Sequence[ExternalAssetDependedBy], + execution_type: Optional[AssetExecutionType] = None, compute_kind: Optional[str] = None, op_name: Optional[str] = None, op_names: Optional[Sequence[str]] = None, @@ -1232,6 +1240,32 @@ def __new__( backfill_policy: Optional[BackfillPolicy] = None, auto_observe_interval_minutes: Optional[float] = None, ): + metadata = normalize_metadata(check.opt_mapping_param(metadata, "metadata", key_type=str)) + + # backcompat logic for execution type specified via metadata + if _SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE in metadata: + val = metadata[_SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] + if not isinstance(val, TextMetadataValue): + check.failed( + f"Expected metadata value for key {_SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE} to be a TextMetadataValue, got {val}" + ) + metadata_execution_type = AssetExecutionType.str_to_enum(val.value) + if execution_type is not None: + check.invariant( + execution_type == metadata_execution_type, + f"Execution type {execution_type} in metadata does not match type inferred from metadata {metadata_execution_type}", + ) + execution_type = metadata_execution_type + else: + execution_type = ( + check.opt_inst_param( + execution_type, + "execution_type", + AssetExecutionType, + ) + or AssetExecutionType.MATERIALIZATION + ) + # backcompat logic to handle ExternalAssetNodes serialized without op_names/graph_name if not op_names: op_names = list(filter(None, [op_name])) @@ -1266,9 +1300,7 @@ def __new__( ), output_name=check.opt_str_param(output_name, "output_name"), output_description=check.opt_str_param(output_description, "output_description"), - metadata=normalize_metadata( - check.opt_mapping_param(metadata, "metadata", key_type=str) - ), + metadata=metadata, group_name=check.opt_str_param(group_name, "group_name"), freshness_policy=check.opt_inst_param( freshness_policy, "freshness_policy", FreshnessPolicy @@ -1292,19 +1324,12 @@ def __new__( auto_observe_interval_minutes=check.opt_numeric_param( auto_observe_interval_minutes, "auto_observe_interval_minutes" ), + execution_type=check.inst_param(execution_type, "execution_type", AssetExecutionType), ) @property def is_executable(self) -> bool: - metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - if not metadata_value: - varietal_text = None - else: - check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error - assert isinstance(metadata_value, TextMetadataValue) # for type checker - varietal_text = metadata_value.value - - return AssetExecutionType.is_executable(varietal_text) + return self.execution_type != AssetExecutionType.UNEXECUTABLE ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] @@ -1546,6 +1571,7 @@ def external_asset_nodes_from_defs( group_name_by_asset_key: Dict[AssetKey, str] = {} descriptions_by_asset_key: Dict[AssetKey, str] = {} atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {} + execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {} for job_def in job_defs: asset_layer = job_def.asset_layer @@ -1569,6 +1595,9 @@ def external_asset_nodes_from_defs( all_upstream_asset_keys.update(upstream_asset_keys) node_defs_by_asset_key[output_key].append((node_output_handle, job_def)) asset_info_by_asset_key[output_key] = asset_info + execution_types_by_asset_key[output_key] = asset_layer.execution_type_for_asset( + output_key + ) for upstream_key in upstream_asset_keys: partition_mapping = asset_layer.partition_mapping_for_node_input( @@ -1614,6 +1643,7 @@ def external_asset_nodes_from_defs( asset_key=asset_key, dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=group_name_by_asset_key.get(asset_key), code_version=code_version_by_asset_key.get(asset_key), @@ -1647,6 +1677,7 @@ def external_asset_nodes_from_defs( asset_key=source_asset.key, dependencies=list(deps[source_asset.key].values()), depended_by=list(dep_by[source_asset.key].values()), + execution_type=source_asset.execution_type, job_names=job_names, op_description=source_asset.description, metadata=source_asset.metadata, @@ -1703,6 +1734,7 @@ def external_asset_nodes_from_defs( asset_key=asset_key, dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), + execution_type=execution_types_by_asset_key[asset_key], compute_kind=node_def.tags.get("kind"), # backcompat op_name=graph_name diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index a8a873f15a6a7..f742bde72a8a2 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -20,7 +20,7 @@ from dagster._check import ParameterCheckError from dagster._core.definitions import AssetIn, SourceAsset, asset, build_assets_job, multi_asset from dagster._core.definitions.asset_graph import AssetGraph -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.external_asset import external_assets_from_specs from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata @@ -57,6 +57,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -82,6 +83,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -108,6 +110,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -152,6 +155,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -229,6 +233,7 @@ def asset2(asset1): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -242,6 +247,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, @@ -269,6 +275,7 @@ def something(result): asset_key=AssetKey("not_result"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("something"))], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, ), @@ -276,6 +283,7 @@ def something(result): asset_key=AssetKey("something"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("not_result"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="something", node_definition_name="something", graph_name=None, @@ -320,6 +328,7 @@ def c2(c): asset_key=AssetKey("a"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("a2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc", node_definition_name="abc", graph_name=None, @@ -337,6 +346,7 @@ def c2(c): asset_key=AssetKey("c"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("c2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc", node_definition_name="abc", graph_name=None, @@ -373,6 +383,7 @@ def asset2_b(asset1): ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2_a")), ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2_b")), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -386,6 +397,7 @@ def asset2_b(asset1): asset_key=AssetKey("asset2_a"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2_a", node_definition_name="asset2_a", graph_name=None, @@ -399,6 +411,7 @@ def asset2_b(asset1): asset_key=AssetKey("asset2_b"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2_b", node_definition_name="asset2_b", graph_name=None, @@ -431,6 +444,7 @@ def asset2(asset1): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -444,6 +458,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, @@ -471,6 +486,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -505,6 +521,7 @@ def assets(): asset_key=AssetKey(f"asset{i}"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -573,6 +590,7 @@ def assets(in1, in2): ExternalAssetDependency(upstream_asset_key=AssetKey(["only_out"])), ], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="downstream", node_definition_name="downstream", graph_name=None, @@ -590,6 +608,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["mixed"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_in"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="in1", node_definition_name="in1", graph_name=None, @@ -604,6 +623,7 @@ def assets(in1, in2): asset_key=AssetKey(["in2"]), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_in"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="in2", node_definition_name="in2", graph_name=None, @@ -624,6 +644,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["downstream"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_out"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -644,6 +665,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["mixed"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_out"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -663,6 +685,7 @@ def assets(in1, in2): depended_by=[ ExternalAssetDependedBy(downstream_asset_key=AssetKey(["downstream"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -688,6 +711,7 @@ def bar(foo): assert external_asset_nodes == [ ExternalAssetNode( asset_key=AssetKey("foo"), + execution_type=AssetExecutionType.UNEXECUTABLE, op_description=None, dependencies=[], depended_by=[ExternalAssetDependedBy(AssetKey("bar"))], @@ -696,6 +720,7 @@ def bar(foo): ), ExternalAssetNode( asset_key=AssetKey("bar"), + execution_type=AssetExecutionType.MATERIALIZATION, op_name="bar", node_definition_name="bar", graph_name=None, @@ -723,6 +748,7 @@ def test_unused_source_asset(): op_description="abc", dependencies=[], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -732,6 +758,7 @@ def test_unused_source_asset(): op_description="def", dependencies=[], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -757,6 +784,7 @@ def foo(bar): op_description="def", dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey(["foo"]))], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -770,6 +798,7 @@ def foo(bar): op_description=None, dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey(["bar"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, job_names=["job1"], output_name="result", group_name=DEFAULT_GROUP_NAME, @@ -832,6 +861,7 @@ def zero(): asset_key=AssetKey(["three"]), dependencies=[ExternalAssetDependency(AssetKey(["zero"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="three", node_definition_name="add_one", graph_name="three", @@ -846,6 +876,7 @@ def zero(): asset_key=AssetKey(["zero"]), dependencies=[], depended_by=[ExternalAssetDependedBy(AssetKey(["three"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="zero", node_definition_name="zero", graph_name=None, @@ -946,6 +977,7 @@ def create_twenty(thirteen, six): ExternalAssetDependency(AssetKey(["zero"])), ], depended_by=[ExternalAssetDependedBy(AssetKey(["twenty"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="create_thirteen_and_six", node_definition_name="add_one", graph_name="create_thirteen_and_six", @@ -969,6 +1001,7 @@ def create_twenty(thirteen, six): ExternalAssetDependency(AssetKey(["thirteen"])), ], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="create_twenty", node_definition_name="add_one", graph_name="create_twenty", @@ -988,6 +1021,7 @@ def create_twenty(thirteen, six): ExternalAssetDependedBy(AssetKey(["six"])), ExternalAssetDependedBy(AssetKey(["thirteen"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="zero", node_definition_name="zero", graph_name=None, @@ -1018,6 +1052,7 @@ def asset2(asset1): asset_key=AssetKey(["abc", "asset1"]), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc__asset1", node_definition_name="abc__asset1", graph_name=None, @@ -1031,6 +1066,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey(["abc", "asset1"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 720bc97f36e2f..d164d7e349af6 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -51,7 +51,7 @@ def test_external_asset_basic_creation() -> None: assert assets_def.metadata_by_key[expected_key]["user_metadata"] == "value" assert assets_def.group_names_by_key[expected_key] == "a_group" assert assets_def.descriptions_by_key[expected_key] == "desc" - assert assets_def.is_asset_executable(expected_key) is False + assert not assets_def.is_executable def test_multi_external_asset_basic_creation() -> None: @@ -92,7 +92,7 @@ def test_normal_asset_materializeable() -> None: def an_asset() -> None: ... - assert an_asset.is_asset_executable(AssetKey(["an_asset"])) is True + assert an_asset.is_executable def test_external_asset_creation_with_deps() -> None: @@ -234,7 +234,8 @@ def an_observable_source_asset() -> DataVersion: return DataVersion("foo") assets_def = create_external_asset_from_source_asset(an_observable_source_asset) - assert assets_def.is_asset_executable(an_observable_source_asset.key) + assert assets_def.is_executable + assert assets_def.is_observable defs = Definitions(assets=[assets_def]) instance = DagsterInstance.ephemeral() diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py index 099f903ff7774..fe81689de03d3 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any, Callable, Dict, Generator, Tuple +from typing import Generator, Tuple import pytest from dagster import ( @@ -19,7 +19,6 @@ ) from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, AssetExecutionType, ) from dagster._core.definitions.assets import AssetsDefinition @@ -35,38 +34,8 @@ def _exec_asset(asset_def, selection=None, partition_key=None): return result.asset_observations_for_node(asset_def.node_def.name) -def _with_observe_metadata(kwargs: Dict[str, Any]) -> Dict[str, Any]: - metadata = kwargs.pop("metadata", {}) - metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] = AssetExecutionType.OBSERVATION.value - return {**kwargs, "metadata": metadata} - - -def _external_observable_asset(**kwargs) -> Callable[..., AssetsDefinition]: - def _decorator(fn: Callable[..., Any]) -> AssetsDefinition: - new_kwargs = _with_observe_metadata(kwargs) - return asset(**new_kwargs)(fn) - - return _decorator - - -def _external_observable_multi_asset(**kwargs) -> Callable[..., AssetsDefinition]: - def _decorator(fn: Callable[..., Any]) -> AssetsDefinition: - if "outs" in kwargs: - kwargs["outs"] = { - name: AssetOut(**_with_observe_metadata(out._asdict())) - for name, out in kwargs["outs"].items() - } - elif "specs" in kwargs: - kwargs["specs"] = [ - AssetSpec(**_with_observe_metadata(spec._asdict())) for spec in kwargs["specs"] - ] - return multi_asset(**kwargs)(fn) - - return _decorator - - def test_observe_result_asset(): - @_external_observable_asset() + @asset(_execution_type=AssetExecutionType.OBSERVATION) def ret_untyped(context: AssetExecutionContext): return ObserveResult( metadata={"one": 1}, @@ -77,7 +46,7 @@ def ret_untyped(context: AssetExecutionContext): assert "one" in observations[0].metadata # key mismatch - @_external_observable_asset() + @asset(_execution_type=AssetExecutionType.OBSERVATION) def ret_mismatch(context: AssetExecutionContext): return ObserveResult( asset_key="random", @@ -99,7 +68,7 @@ def ret_mismatch(context: AssetExecutionContext): ret_mismatch(build_asset_context()) # tuple - @_external_observable_asset() + @asset(_execution_type=AssetExecutionType.OBSERVATION) def ret_two(): return ObserveResult(metadata={"one": 1}), ObserveResult(metadata={"two": 2}) @@ -115,8 +84,9 @@ def ret_two(): def test_return_observe_result_with_asset_checks(): with instance_for_test() as instance: - @_external_observable_asset( - check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey("ret_checks"))] + @asset( + check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey("ret_checks"))], + _execution_type=AssetExecutionType.OBSERVATION, ) def ret_checks(context: AssetExecutionContext): return ObserveResult( @@ -141,7 +111,9 @@ def ret_checks(context: AssetExecutionContext): def test_multi_asset_observe_result(): - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, _execution_type=AssetExecutionType.OBSERVATION + ) def outs_multi_asset(): return ObserveResult(asset_key="one", metadata=({"foo": "bar"})), ObserveResult( asset_key="two", metadata={"baz": "qux"} @@ -153,11 +125,12 @@ def outs_multi_asset(): assert res[0].metadata["foo"] == "bar" assert res[1].metadata["baz"] == "qux" - @_external_observable_multi_asset( + @multi_asset( specs=[ AssetSpec(["prefix", "one"]), AssetSpec(["prefix", "two"]), - ] + ], + _execution_type=AssetExecutionType.OBSERVATION, ) def specs_multi_asset(): return ObserveResult(asset_key=["prefix", "one"], metadata={"foo": "bar"}), ObserveResult( @@ -175,7 +148,10 @@ def test_yield_materialization_multi_asset(): # # yield successful # - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def multi(): yield ObserveResult( asset_key="one", @@ -198,7 +174,10 @@ def multi(): # # missing a non optional out # - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def missing(): yield ObserveResult( asset_key="one", @@ -223,7 +202,10 @@ def missing(): # # missing asset_key # - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def no_key(): yield ObserveResult( metadata={"one": 1}, @@ -253,7 +235,10 @@ def no_key(): # # return tuple success # - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def ret_multi(): return ( ObserveResult( @@ -278,7 +263,10 @@ def ret_multi(): # # return list error # - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def ret_list(): return [ ObserveResult( @@ -323,7 +311,7 @@ def handle_output(self, context, obj): def load_input(self, context): return 1 - @_external_observable_asset() + @asset(_execution_type=AssetExecutionType.OBSERVATION) def asset_with_type_annotation() -> ObserveResult: return ObserveResult(metadata={"foo": "bar"}) @@ -331,7 +319,10 @@ def asset_with_type_annotation() -> ObserveResult: [asset_with_type_annotation], resources={"io_manager": TestingIOManager()} ).success - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def multi_asset_with_outs_and_type_annotation() -> Tuple[ObserveResult, ObserveResult]: return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") @@ -339,7 +330,13 @@ def multi_asset_with_outs_and_type_annotation() -> Tuple[ObserveResult, ObserveR [multi_asset_with_outs_and_type_annotation], resources={"io_manager": TestingIOManager()} ).success - @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) + @multi_asset( + specs=[ + AssetSpec(["one"]), + AssetSpec(["two"]), + ], + _execution_type=AssetExecutionType.OBSERVATION, + ) def multi_asset_with_specs_and_type_annotation() -> Tuple[ObserveResult, ObserveResult]: return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") @@ -347,7 +344,13 @@ def multi_asset_with_specs_and_type_annotation() -> Tuple[ObserveResult, Observe [multi_asset_with_specs_and_type_annotation], resources={"io_manager": TestingIOManager()} ).success - @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) + @multi_asset( + specs=[ + AssetSpec(["one"]), + AssetSpec(["two"]), + ], + _execution_type=AssetExecutionType.OBSERVATION, + ) def multi_asset_with_specs_and_no_type_annotation(): return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") @@ -356,11 +359,12 @@ def multi_asset_with_specs_and_no_type_annotation(): resources={"io_manager": TestingIOManager()}, ).success - @_external_observable_asset( + @asset( check_specs=[ AssetCheckSpec(name="check_one", asset="with_checks"), AssetCheckSpec(name="check_two", asset="with_checks"), - ] + ], + _execution_type=AssetExecutionType.OBSERVATION, ) def with_checks(context: AssetExecutionContext) -> ObserveResult: return ObserveResult( @@ -381,7 +385,7 @@ def with_checks(context: AssetExecutionContext) -> ObserveResult: resources={"io_manager": TestingIOManager()}, ).success - @_external_observable_multi_asset( + @multi_asset( specs=[ AssetSpec("asset_one"), AssetSpec("asset_two"), @@ -390,6 +394,7 @@ def with_checks(context: AssetExecutionContext) -> ObserveResult: AssetCheckSpec(name="check_one", asset="asset_one"), AssetCheckSpec(name="check_two", asset="asset_two"), ], + _execution_type=AssetExecutionType.OBSERVATION, ) def multi_checks(context: AssetExecutionContext) -> Tuple[ObserveResult, ObserveResult]: return ObserveResult( @@ -439,7 +444,7 @@ def generator_asset() -> Generator[ObserveResult, None, None]: def test_observe_result_generators(): - @_external_observable_asset() + @asset(_execution_type=AssetExecutionType.OBSERVATION) def generator_asset() -> Generator[ObserveResult, None, None]: yield ObserveResult(metadata={"foo": "bar"}) @@ -451,7 +456,10 @@ def generator_asset() -> Generator[ObserveResult, None, None]: assert len(res) == 1 assert res[0].metadata["foo"] == "bar" - @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) + @multi_asset( + specs=[AssetSpec("one"), AssetSpec("two")], + _execution_type=AssetExecutionType.OBSERVATION, + ) def generator_specs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) @@ -466,7 +474,10 @@ def generator_specs_multi_asset(): assert res[0].metadata["foo"] == "bar" assert res[1].metadata["baz"] == "qux" - @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) + @multi_asset( + outs={"one": AssetOut(), "two": AssetOut()}, + _execution_type=AssetExecutionType.OBSERVATION, + ) def generator_outs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) @@ -481,7 +492,10 @@ def generator_outs_multi_asset(): assert res[0].metadata["foo"] == "bar" assert res[1].metadata["baz"] == "qux" - @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) + @multi_asset( + specs=[AssetSpec("one"), AssetSpec("two")], + _execution_type=AssetExecutionType.OBSERVATION, + ) async def async_specs_multi_asset(): return ObserveResult(asset_key="one", metadata={"foo": "bar"}), ObserveResult( asset_key="two", metadata={"baz": "qux"} @@ -497,7 +511,10 @@ async def async_specs_multi_asset(): assert res[0].metadata["foo"] == "bar" assert res[1].metadata["baz"] == "qux" - @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) + @multi_asset( + specs=[AssetSpec("one"), AssetSpec("two")], + _execution_type=AssetExecutionType.OBSERVATION, + ) async def async_gen_specs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) @@ -520,8 +537,9 @@ async def _run_async_gen(): def test_observe_result_with_partitions(): - @_external_observable_asset( - partitions_def=StaticPartitionsDefinition(["red", "blue", "yellow"]) + @asset( + partitions_def=StaticPartitionsDefinition(["red", "blue", "yellow"]), + _execution_type=AssetExecutionType.OBSERVATION, ) def partitioned_asset(context: AssetExecutionContext) -> ObserveResult: return ObserveResult(metadata={"key": context.partition_key}) @@ -532,8 +550,9 @@ def partitioned_asset(context: AssetExecutionContext) -> ObserveResult: def test_observe_result_with_partitions_direct_invocation(): - @_external_observable_asset( - partitions_def=StaticPartitionsDefinition(["red", "blue", "yellow"]) + @asset( + partitions_def=StaticPartitionsDefinition(["red", "blue", "yellow"]), + _execution_type=AssetExecutionType.OBSERVATION, ) def partitioned_asset(context: AssetExecutionContext) -> ObserveResult: return ObserveResult(metadata={"key": context.partition_key})