From 66268de8a382168ce700ed2efc60199279446466 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 14 Feb 2024 14:03:00 -0500 Subject: [PATCH] [external-assets] Make execution_type an ExternalAssetNode property (#19684) ## Summary & Motivation This adds `execution_type` as a property on `ExternalAssetsNode` and defines execution-type related helper properties on `AssetsDefinition`. Logic is in place on `ExternalAssetNode` to ensure that `execution_type` specified as a param agrees with metadata. ## How I Tested These Changes Existing test suite. --- .../dagster_graphql/implementation/loader.py | 2 + .../dagster/_core/definitions/asset_layer.py | 11 ++++ .../dagster/_core/definitions/asset_spec.py | 2 + .../dagster/_core/definitions/assets.py | 52 ++++++++++-------- .../dagster/_core/definitions/source_asset.py | 14 +++++ .../_core/execution/plan/execute_step.py | 2 +- .../host_representation/external_data.py | 54 ++++++++++++++----- .../test_external_data.py | 38 ++++++++++++- .../definitions_tests/test_external_assets.py | 7 +-- 9 files changed, 141 insertions(+), 41 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py index c239a6fa03546..c63b97fdba93d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/loader.py @@ -7,6 +7,7 @@ DagsterInstance, _check as check, ) +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import CachingStaleStatusResolver from dagster._core.definitions.events import AssetKey from dagster._core.events.log import EventLogEntry @@ -315,6 +316,7 @@ def _build_cross_repo_deps( ) ], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, ) return sink_assets, external_asset_deps diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f8ab025115d64..ea27c80eb7e92 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -20,6 +20,9 @@ import dagster._check as check from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.asset_spec import ( + AssetExecutionType, +) from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.metadata import ( ArbitraryMetadataMapping, @@ -706,6 +709,14 @@ def input_for_asset_key(self, node_handle: NodeHandle, key: AssetKey) -> Optiona def io_manager_key_for_asset(self, asset_key: AssetKey) -> str: return self.io_manager_keys_by_asset_key.get(asset_key, "io_manager") + def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: + if asset_key in self.assets_defs_by_key: + return self.assets_defs_by_key[asset_key].execution_type + elif asset_key in self.source_assets_by_key: + return self.source_assets_by_key[asset_key].execution_type + else: + check.failed(f"Couldn't find key {asset_key}") + def is_observable_for_asset(self, asset_key: AssetKey) -> bool: return ( asset_key in self.source_assets_by_key diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 8aa967088d44c..fccd41cb78465 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -3,6 +3,7 @@ import dagster._check as check from dagster._annotations import PublicAttr, experimental_param +from dagster._serdes.serdes import whitelist_for_serdes from .auto_materialize_policy import AutoMaterializePolicy from .events import ( @@ -24,6 +25,7 @@ SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type" +@whitelist_for_serdes class AssetExecutionType(Enum): OBSERVATION = "OBSERVATION" UNEXECUTABLE = "UNEXECUTABLE" diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 2699e5b12b322..6ed49ea1a5e00 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -21,7 +21,10 @@ from dagster._annotations import experimental_param, public from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSpec from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset -from dagster._core.definitions.asset_spec import AssetExecutionType +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, +) from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType from dagster._core.definitions.freshness_policy import FreshnessPolicy @@ -953,33 +956,36 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]: """ return self._selected_asset_check_keys - def is_asset_executable(self, asset_key: AssetKey) -> bool: - """Returns True if the asset key is materializable by this AssetsDefinition. + @property + def execution_type(self) -> AssetExecutionType: + first_key = next(iter(self.keys), None) + # Currently all assets in an AssetsDefinition have the same execution type + if first_key: + return AssetExecutionType.str_to_enum( + self.metadata_by_key.get(first_key, {}).get( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE + ) + ) + else: + return AssetExecutionType.UNEXECUTABLE - Args: - asset_key (AssetKey): The asset key to check. + return self._execution_type - Returns: - bool: True if the asset key is materializable by this AssetsDefinition. - """ - from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, - ) + @property + def is_external(self) -> bool: + return self.execution_type != AssetExecutionType.MATERIALIZATION - return AssetExecutionType.is_executable( - self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - ) + @property + def is_observable(self) -> bool: + return self.execution_type == AssetExecutionType.OBSERVATION - def asset_execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: - from dagster._core.definitions.asset_spec import ( - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, - AssetExecutionType, - ) + @property + def is_materializable(self) -> bool: + return self.execution_type == AssetExecutionType.MATERIALIZATION - return AssetExecutionType.str_to_enum( - self._metadata_by_key.get(asset_key, {}).get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - ) + @property + def is_executable(self) -> bool: + return self.execution_type != AssetExecutionType.UNEXECUTABLE def get_partition_mapping_for_input(self, input_name: str) -> Optional[PartitionMapping]: return self._partition_mappings.get(self._keys_by_input_name[input_name]) diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 7073382cad2d7..e26db6fb3b856 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -15,6 +15,7 @@ import dagster._check as check from dagster._annotations import PublicAttr, experimental_param, public from dagster._core.decorator_utils import get_function_params +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.data_version import ( DATA_VERSION_TAG, DataVersion, @@ -269,6 +270,19 @@ def op(self) -> OpDefinition: ) return cast(OpDefinition, self.node_def) + @property + def execution_type(self) -> AssetExecutionType: + return ( + AssetExecutionType.OBSERVATION + if self.is_observable + else AssetExecutionType.UNEXECUTABLE + ) + + @property + def is_executable(self) -> bool: + """bool: Whether the asset is observable.""" + return self.is_observable + @public @property def is_observable(self) -> bool: diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 311b808db893b..deee4e9f1ed30 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -880,7 +880,7 @@ def _log_asset_materialization_events_for_asset( if asset_key: asset_layer = step_context.job_def.asset_layer execution_type = ( - asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) + asset_layer.assets_def_for_asset(asset_key).execution_type if asset_layer.has_assets_def_for_asset(asset_key) else AssetExecutionType.MATERIALIZATION ) diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index f9f815186e9bb..db086e94a2954 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -103,7 +103,9 @@ from dagster._core.snap.mode import ResourceDefSnap, build_resource_def_snap from dagster._core.storage.io_manager import IOManagerDefinition from dagster._serdes import whitelist_for_serdes -from dagster._serdes.serdes import is_whitelisted_for_serdes_object +from dagster._serdes.serdes import ( + is_whitelisted_for_serdes_object, +) from dagster._utils.error import SerializableErrorInfo if TYPE_CHECKING: @@ -1175,6 +1177,7 @@ class ExternalAssetNode( ("asset_key", AssetKey), ("dependencies", Sequence[ExternalAssetDependency]), ("depended_by", Sequence[ExternalAssetDependedBy]), + ("execution_type", AssetExecutionType), ("compute_kind", Optional[str]), ("op_name", Optional[str]), ("op_names", Sequence[str]), @@ -1215,6 +1218,7 @@ def __new__( asset_key: AssetKey, dependencies: Sequence[ExternalAssetDependency], depended_by: Sequence[ExternalAssetDependedBy], + execution_type: Optional[AssetExecutionType] = None, compute_kind: Optional[str] = None, op_name: Optional[str] = None, op_names: Optional[Sequence[str]] = None, @@ -1238,6 +1242,32 @@ def __new__( auto_observe_interval_minutes: Optional[float] = None, owners: Optional[Sequence[str]] = None, ): + metadata = normalize_metadata(check.opt_mapping_param(metadata, "metadata", key_type=str)) + + # backcompat logic for execution type specified via metadata + if SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE in metadata: + val = metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] + if not isinstance(val, TextMetadataValue): + check.failed( + f"Expected metadata value for key {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE} to be a TextMetadataValue, got {val}" + ) + metadata_execution_type = AssetExecutionType.str_to_enum(val.value) + if execution_type is not None: + check.invariant( + execution_type == metadata_execution_type, + f"Execution type {execution_type} in metadata does not match type inferred from metadata {metadata_execution_type}", + ) + execution_type = metadata_execution_type + else: + execution_type = ( + check.opt_inst_param( + execution_type, + "execution_type", + AssetExecutionType, + ) + or AssetExecutionType.MATERIALIZATION + ) + # backcompat logic to handle ExternalAssetNodes serialized without op_names/graph_name if not op_names: op_names = list(filter(None, [op_name])) @@ -1272,9 +1302,7 @@ def __new__( ), output_name=check.opt_str_param(output_name, "output_name"), output_description=check.opt_str_param(output_description, "output_description"), - metadata=normalize_metadata( - check.opt_mapping_param(metadata, "metadata", key_type=str) - ), + metadata=metadata, group_name=check.opt_str_param(group_name, "group_name"), freshness_policy=check.opt_inst_param( freshness_policy, "freshness_policy", FreshnessPolicy @@ -1299,19 +1327,12 @@ def __new__( auto_observe_interval_minutes, "auto_observe_interval_minutes" ), owners=check.opt_sequence_param(owners, "owners", of_type=str), + execution_type=check.inst_param(execution_type, "execution_type", AssetExecutionType), ) @property def is_executable(self) -> bool: - metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) - if not metadata_value: - varietal_text = None - else: - check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error - assert isinstance(metadata_value, TextMetadataValue) # for type checker - varietal_text = metadata_value.value - - return AssetExecutionType.is_executable(varietal_text) + return self.execution_type != AssetExecutionType.UNEXECUTABLE ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] @@ -1554,6 +1575,7 @@ def external_asset_nodes_from_defs( descriptions_by_asset_key: Dict[AssetKey, str] = {} atomic_execution_unit_ids_by_key: Dict[Union[AssetKey, AssetCheckKey], str] = {} owners_by_asset_key: Dict[AssetKey, Sequence[AssetOwner]] = {} + execution_types_by_asset_key: Dict[AssetKey, AssetExecutionType] = {} for job_def in job_defs: asset_layer = job_def.asset_layer @@ -1577,6 +1599,9 @@ def external_asset_nodes_from_defs( all_upstream_asset_keys.update(upstream_asset_keys) node_defs_by_asset_key[output_key].append((node_output_handle, job_def)) asset_info_by_asset_key[output_key] = asset_info + execution_types_by_asset_key[output_key] = asset_layer.execution_type_for_asset( + output_key + ) for upstream_key in upstream_asset_keys: partition_mapping = asset_layer.partition_mapping_for_node_input( @@ -1623,6 +1648,7 @@ def external_asset_nodes_from_defs( asset_key=asset_key, dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=group_name_by_asset_key.get(asset_key), code_version=code_version_by_asset_key.get(asset_key), @@ -1656,6 +1682,7 @@ def external_asset_nodes_from_defs( asset_key=source_asset.key, dependencies=list(deps[source_asset.key].values()), depended_by=list(dep_by[source_asset.key].values()), + execution_type=source_asset.execution_type, job_names=job_names, op_description=source_asset.description, metadata=source_asset.metadata, @@ -1712,6 +1739,7 @@ def external_asset_nodes_from_defs( asset_key=asset_key, dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), + execution_type=execution_types_by_asset_key[asset_key], compute_kind=node_def.tags.get("kind"), # backcompat op_name=graph_name diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index a8a873f15a6a7..f742bde72a8a2 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -20,7 +20,7 @@ from dagster._check import ParameterCheckError from dagster._core.definitions import AssetIn, SourceAsset, asset, build_assets_job, multi_asset from dagster._core.definitions.asset_graph import AssetGraph -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_spec import AssetExecutionType, AssetSpec from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.external_asset import external_assets_from_specs from dagster._core.definitions.metadata import MetadataValue, TextMetadataValue, normalize_metadata @@ -57,6 +57,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -82,6 +83,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -108,6 +110,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -152,6 +155,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", graph_name=None, op_names=["asset1"], @@ -229,6 +233,7 @@ def asset2(asset1): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -242,6 +247,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, @@ -269,6 +275,7 @@ def something(result): asset_key=AssetKey("not_result"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("something"))], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, ), @@ -276,6 +283,7 @@ def something(result): asset_key=AssetKey("something"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("not_result"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="something", node_definition_name="something", graph_name=None, @@ -320,6 +328,7 @@ def c2(c): asset_key=AssetKey("a"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("a2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc", node_definition_name="abc", graph_name=None, @@ -337,6 +346,7 @@ def c2(c): asset_key=AssetKey("c"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("c2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc", node_definition_name="abc", graph_name=None, @@ -373,6 +383,7 @@ def asset2_b(asset1): ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2_a")), ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2_b")), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -386,6 +397,7 @@ def asset2_b(asset1): asset_key=AssetKey("asset2_a"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2_a", node_definition_name="asset2_a", graph_name=None, @@ -399,6 +411,7 @@ def asset2_b(asset1): asset_key=AssetKey("asset2_b"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2_b", node_definition_name="asset2_b", graph_name=None, @@ -431,6 +444,7 @@ def asset2(asset1): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -444,6 +458,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, @@ -471,6 +486,7 @@ def asset1(): asset_key=AssetKey("asset1"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset1", node_definition_name="asset1", graph_name=None, @@ -505,6 +521,7 @@ def assets(): asset_key=AssetKey(f"asset{i}"), dependencies=[], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -573,6 +590,7 @@ def assets(in1, in2): ExternalAssetDependency(upstream_asset_key=AssetKey(["only_out"])), ], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="downstream", node_definition_name="downstream", graph_name=None, @@ -590,6 +608,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["mixed"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_in"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="in1", node_definition_name="in1", graph_name=None, @@ -604,6 +623,7 @@ def assets(in1, in2): asset_key=AssetKey(["in2"]), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_in"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="in2", node_definition_name="in2", graph_name=None, @@ -624,6 +644,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["downstream"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_out"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -644,6 +665,7 @@ def assets(in1, in2): ExternalAssetDependedBy(downstream_asset_key=AssetKey(["mixed"])), ExternalAssetDependedBy(downstream_asset_key=AssetKey(["only_out"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -663,6 +685,7 @@ def assets(in1, in2): depended_by=[ ExternalAssetDependedBy(downstream_asset_key=AssetKey(["downstream"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="assets", node_definition_name="assets", graph_name=None, @@ -688,6 +711,7 @@ def bar(foo): assert external_asset_nodes == [ ExternalAssetNode( asset_key=AssetKey("foo"), + execution_type=AssetExecutionType.UNEXECUTABLE, op_description=None, dependencies=[], depended_by=[ExternalAssetDependedBy(AssetKey("bar"))], @@ -696,6 +720,7 @@ def bar(foo): ), ExternalAssetNode( asset_key=AssetKey("bar"), + execution_type=AssetExecutionType.MATERIALIZATION, op_name="bar", node_definition_name="bar", graph_name=None, @@ -723,6 +748,7 @@ def test_unused_source_asset(): op_description="abc", dependencies=[], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -732,6 +758,7 @@ def test_unused_source_asset(): op_description="def", dependencies=[], depended_by=[], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -757,6 +784,7 @@ def foo(bar): op_description="def", dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey(["foo"]))], + execution_type=AssetExecutionType.UNEXECUTABLE, job_names=[], group_name=DEFAULT_GROUP_NAME, is_source=True, @@ -770,6 +798,7 @@ def foo(bar): op_description=None, dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey(["bar"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, job_names=["job1"], output_name="result", group_name=DEFAULT_GROUP_NAME, @@ -832,6 +861,7 @@ def zero(): asset_key=AssetKey(["three"]), dependencies=[ExternalAssetDependency(AssetKey(["zero"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="three", node_definition_name="add_one", graph_name="three", @@ -846,6 +876,7 @@ def zero(): asset_key=AssetKey(["zero"]), dependencies=[], depended_by=[ExternalAssetDependedBy(AssetKey(["three"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="zero", node_definition_name="zero", graph_name=None, @@ -946,6 +977,7 @@ def create_twenty(thirteen, six): ExternalAssetDependency(AssetKey(["zero"])), ], depended_by=[ExternalAssetDependedBy(AssetKey(["twenty"]))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="create_thirteen_and_six", node_definition_name="add_one", graph_name="create_thirteen_and_six", @@ -969,6 +1001,7 @@ def create_twenty(thirteen, six): ExternalAssetDependency(AssetKey(["thirteen"])), ], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="create_twenty", node_definition_name="add_one", graph_name="create_twenty", @@ -988,6 +1021,7 @@ def create_twenty(thirteen, six): ExternalAssetDependedBy(AssetKey(["six"])), ExternalAssetDependedBy(AssetKey(["thirteen"])), ], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="zero", node_definition_name="zero", graph_name=None, @@ -1018,6 +1052,7 @@ def asset2(asset1): asset_key=AssetKey(["abc", "asset1"]), dependencies=[], depended_by=[ExternalAssetDependedBy(downstream_asset_key=AssetKey("asset2"))], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="abc__asset1", node_definition_name="abc__asset1", graph_name=None, @@ -1031,6 +1066,7 @@ def asset2(asset1): asset_key=AssetKey("asset2"), dependencies=[ExternalAssetDependency(upstream_asset_key=AssetKey(["abc", "asset1"]))], depended_by=[], + execution_type=AssetExecutionType.MATERIALIZATION, op_name="asset2", node_definition_name="asset2", graph_name=None, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 720bc97f36e2f..d164d7e349af6 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -51,7 +51,7 @@ def test_external_asset_basic_creation() -> None: assert assets_def.metadata_by_key[expected_key]["user_metadata"] == "value" assert assets_def.group_names_by_key[expected_key] == "a_group" assert assets_def.descriptions_by_key[expected_key] == "desc" - assert assets_def.is_asset_executable(expected_key) is False + assert not assets_def.is_executable def test_multi_external_asset_basic_creation() -> None: @@ -92,7 +92,7 @@ def test_normal_asset_materializeable() -> None: def an_asset() -> None: ... - assert an_asset.is_asset_executable(AssetKey(["an_asset"])) is True + assert an_asset.is_executable def test_external_asset_creation_with_deps() -> None: @@ -234,7 +234,8 @@ def an_observable_source_asset() -> DataVersion: return DataVersion("foo") assets_def = create_external_asset_from_source_asset(an_observable_source_asset) - assert assets_def.is_asset_executable(an_observable_source_asset.key) + assert assets_def.is_executable + assert assets_def.is_observable defs = Definitions(assets=[assets_def]) instance = DagsterInstance.ephemeral()