diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 295f3faea6adf..4f12c5ca8d003 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -22,6 +22,7 @@ import toposort import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.asset_subset import ValidAssetSubset from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.errors import DagsterInvalidInvocationError @@ -141,6 +142,10 @@ def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]: def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolicy]]: return self._freshness_policies_by_key + @property + def observable_keys(self) -> AbstractSet[AssetKey]: + return {key for key, is_observable in self._is_observable_by_key.items() if is_observable} + @property def auto_materialize_policies_by_key( self, @@ -197,6 +202,16 @@ def from_assets( backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) code_versions_by_key.update(asset.code_versions_by_key) + is_observable = asset.execution_type == AssetExecutionType.OBSERVATION + is_observable_by_key.update({key: is_observable for key in asset.keys}) + + # Set auto_observe_interval_minutes for external observable assets + # This can be removed when/if we have a solution for mapping + # `auto_observe_interval_minutes` to an AutoMaterializePolicy + auto_observe_interval_minutes_by_key.update( + {key: asset.auto_observe_interval_minutes for key in asset.keys} + ) + if not asset.can_subset: all_required_keys = {*asset.check_keys, *asset.keys} if len(all_required_keys) > 1: diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py 
b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index fccd41cb78465..26eaa24a8dbb2 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -24,6 +24,14 @@ # for externally materialized assets. SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type" +# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of +# external assets resulting from a source asset conversion. It contains the +# `auto_observe_interval_minutes` value from the source asset and is consulted +# in the auto-materialize daemon. It should eventually be eliminated in favor +# of an implementation of `auto_observe_interval_minutes` in terms of +# `AutoMaterializeRule`. +SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes" + @whitelist_for_serdes class AssetExecutionType(Enum): diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 6ed49ea1a5e00..d3bdbee22af22 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -23,6 +23,7 @@ from dagster._core.definitions.asset_layer import get_dep_node_handles_of_graph_backed_asset from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES, AssetExecutionType, ) from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy @@ -34,6 +35,7 @@ from dagster._core.definitions.op_selection import get_graph_subset from dagster._core.definitions.partition_mapping import MultiPartitionMapping from dagster._core.definitions.resource_requirement import ( + ExternalAssetIOManagerRequirement, RequiresResources, ResourceAddable, ResourceRequirement, @@ -902,6 +904,18 @@ def freshness_policies_by_key(self) -> 
Mapping[AssetKey, FreshnessPolicy]: def auto_materialize_policies_by_key(self) -> Mapping[AssetKey, AutoMaterializePolicy]: return self._auto_materialize_policies_by_key + # Applies only to external observable assets. Can be removed when we fold + # `auto_observe_interval_minutes` into auto-materialize policies. + @property + def auto_observe_interval_minutes(self) -> Optional[float]: + first_key = next(iter(self.keys), None) + if first_key: + return self.metadata_by_key.get(first_key, {}).get( + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES + ) + else: + return None + @property def backfill_policy(self) -> Optional[BackfillPolicy]: return self._backfill_policy @@ -1379,7 +1393,16 @@ def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str: )[0].io_manager_key def get_resource_requirements(self) -> Iterator[ResourceRequirement]: - yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined] + if self.is_executable: + yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined] + else: + for key in self.keys: + # This matches how SourceAsset emits requirements except we emit + # ExternalAssetIOManagerRequirement instead of SourceAssetIOManagerRequirement + yield ExternalAssetIOManagerRequirement( + key=self.get_io_manager_key_for_asset_key(key), + asset_key=key.to_string(), + ) for source_key, resource_def in self.resource_defs.items(): yield from resource_def.get_resource_requirements(outer_context=source_key) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 211ad882cd129..14ecd2728674c 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -71,7 +71,7 @@ def asset( metadata: Optional[Mapping[str, Any]] = ..., description: Optional[str] = ..., config_schema: 
Optional[UserConfigSchema] = None, - required_resource_keys: Optional[Set[str]] = ..., + required_resource_keys: Optional[AbstractSet[str]] = ..., resource_defs: Optional[Mapping[str, object]] = ..., io_manager_def: Optional[object] = ..., io_manager_key: Optional[str] = ..., @@ -112,7 +112,7 @@ def asset( metadata: Optional[ArbitraryMetadataMapping] = None, description: Optional[str] = None, config_schema: Optional[UserConfigSchema] = None, - required_resource_keys: Optional[Set[str]] = None, + required_resource_keys: Optional[AbstractSet[str]] = None, resource_defs: Optional[Mapping[str, object]] = None, io_manager_def: Optional[object] = None, io_manager_key: Optional[str] = None, diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index dea5cf03085bc..0034549f19ec8 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -3,6 +3,7 @@ from dagster import _check as check from dagster._core.definitions.asset_spec import ( SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES, AssetExecutionType, AssetSpec, ) @@ -16,6 +17,7 @@ ) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.context.compute import AssetExecutionContext +from dagster._utils.warnings import disable_dagster_warnings def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition: @@ -133,49 +135,50 @@ def _external_assets_def(context: AssetExecutionContext) -> None: def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: - check.invariant( - source_asset.auto_observe_interval_minutes is None, - "Automatically observed external assets not supported yet: auto_observe_interval_minutes" - " should be None", - ) - - injected_metadata = ( - {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: 
AssetExecutionType.UNEXECUTABLE.value} + observe_interval = source_asset.auto_observe_interval_minutes + execution_type = ( + AssetExecutionType.UNEXECUTABLE.value if source_asset.observe_fn is None - else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} + else AssetExecutionType.OBSERVATION.value ) - - kwargs = { - "key": source_asset.key, - "metadata": { - **source_asset.metadata, - **injected_metadata, - }, - "group_name": source_asset.group_name, - "description": source_asset.description, - "partitions_def": source_asset.partitions_def, + metadata = { + **source_asset.raw_metadata, + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: execution_type, + **( + {SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES: observe_interval} + if observe_interval + else {} + ), } - if source_asset.io_manager_def: - kwargs["io_manager_def"] = source_asset.io_manager_def - elif source_asset.io_manager_key: - kwargs["io_manager_key"] = source_asset.io_manager_key - - @asset(**kwargs) - def _shim_assets_def(context: AssetExecutionContext): - if not source_asset.observe_fn: - raise NotImplementedError(f"Asset {source_asset.key} is not executable") - - op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) - return_value = op_function.decorated_fn(context) - check.invariant( - isinstance(return_value, Output) - and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata, - "The wrapped decorated_fn should return an Output with a special metadata key.", + with disable_dagster_warnings(): + + @asset( + key=source_asset.key, + metadata=metadata, + group_name=source_asset.group_name, + description=source_asset.description, + partitions_def=source_asset.partitions_def, + io_manager_key=source_asset.io_manager_key, + # We don't pass the `io_manager_def` because it will already be present in + # `resource_defs` (it is added during `SourceAsset` initialization). 
+ resource_defs=source_asset.resource_defs, + # We need to access the raw attribute because the property will return a computed value that + # includes requirements for the io manager. Those requirements will be inferred again when + # we create an AssetsDefinition. + required_resource_keys=source_asset._required_resource_keys, # noqa: SLF001 ) - return return_value - - check.invariant(isinstance(_shim_assets_def, AssetsDefinition)) - assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright + def _shim_assets_def(context: AssetExecutionContext): + if not source_asset.observe_fn: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) + return_value = op_function.decorated_fn(context) + check.invariant( + isinstance(return_value, Output) + and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata, + "The wrapped decorated_fn should return an Output with a special metadata key.", + ) + return return_value return _shim_assets_def diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index ff09eef6b3f2f..76e0a341888c6 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -13,6 +13,7 @@ ) import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository @@ -153,18 +154,17 @@ def from_repository_handles_and_external_asset_nodes( node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source } - is_observable_by_key = {key: False for key in 
all_non_source_keys} + is_observable_by_key = {} auto_observe_interval_minutes_by_key = {} for repo_handle, node in repo_handle_external_asset_nodes: + is_observable_by_key[node.asset_key] = ( + node.execution_type == AssetExecutionType.OBSERVATION + ) + auto_observe_interval_minutes_by_key[ + node.asset_key + ] = node.auto_observe_interval_minutes if node.is_source: - # We need to set this even if the node is a regular asset in another code location. - # `is_observable` will only ever be consulted in the source asset context. - is_observable_by_key[node.asset_key] = node.is_observable - auto_observe_interval_minutes_by_key[ - node.asset_key - ] = node.auto_observe_interval_minutes - if node.asset_key in all_non_source_keys: # one location's source is another location's non-source continue diff --git a/python_modules/dagster/dagster/_core/definitions/resource_requirement.py b/python_modules/dagster/dagster/_core/definitions/resource_requirement.py index 7ddb26aa60ee4..c99ab7f323c14 100644 --- a/python_modules/dagster/dagster/_core/definitions/resource_requirement.py +++ b/python_modules/dagster/dagster/_core/definitions/resource_requirement.py @@ -104,6 +104,31 @@ def describe_requirement(self) -> str: ) +# The ResourceRequirement for unexecutable external assets. Is an analogue to +# `SourceAssetIOManagerRequirement`. 
+class ExternalAssetIOManagerRequirement( + NamedTuple( + "_ExternalAssetIOManagerRequirement", + [ + ("key", str), + ("asset_key", Optional[str]), + ], + ), + ResourceRequirement, +): + @property + def expected_type(self) -> Type: + from ..storage.io_manager import IOManagerDefinition + + return IOManagerDefinition + + def describe_requirement(self) -> str: + external_asset_descriptor = ( + f"external asset with key {self.asset_key}" if self.asset_key else "external asset" + ) + return f"io manager with key '{self.key}' required by {external_asset_descriptor}" + + class SourceAssetIOManagerRequirement( NamedTuple( "_InputManagerRequirement", diff --git a/python_modules/dagster/dagster/_core/host_representation/external.py b/python_modules/dagster/dagster/_core/host_representation/external.py index e1af8b63420f6..ba322c45f8cc0 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external.py +++ b/python_modules/dagster/dagster/_core/host_representation/external.py @@ -216,7 +216,7 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]: if asset_key not in covered_asset_keys: default_sensor_asset_keys.add(asset_key) - for asset_key in asset_graph.source_asset_keys: + for asset_key in asset_graph.observable_keys: if asset_graph.get_auto_observe_interval_minutes(asset_key) is None: continue diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index db086e94a2954..60a6871c41d80 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1641,7 +1641,7 @@ def external_asset_nodes_from_defs( asset_keys_without_definitions = all_upstream_asset_keys.difference( node_defs_by_asset_key.keys() - ).difference(source_assets_by_key.keys()) + ).difference({*source_assets_by_key.keys()}) asset_nodes = [ ExternalAssetNode( @@ -1734,6 
+1734,14 @@ def external_asset_nodes_from_defs( node_handle = node_handle.parent graph_name = node_handle.name + if asset_key in source_assets_by_key: + source_asset = source_assets_by_key[asset_key] + is_observable = source_asset.is_observable + auto_observe_interval_minutes = source_asset.auto_observe_interval_minutes + else: + is_observable = False + auto_observe_interval_minutes = None + asset_nodes.append( ExternalAssetNode( asset_key=asset_key, @@ -1754,6 +1762,8 @@ def external_asset_nodes_from_defs( partitions_def_data=partitions_def_data, output_name=output_def.name, metadata=asset_metadata, + is_observable=is_observable, + auto_observe_interval_minutes=auto_observe_interval_minutes, # assets defined by Out(asset_key="k") do not have any group # name specified we default to DEFAULT_GROUP_NAME here to ensure # such assets are part of the default group diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py index 5fcb4088a7b19..6500202aef0ed 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py @@ -7,6 +7,7 @@ ) from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from pytest import fixture @@ -44,25 +45,26 @@ def asset1(): ) -@fixture -def single_auto_observe_source_asset_graph(): +@fixture(params=[True, False], ids=["use_external_asset", "use_source_asset"]) +def single_auto_observe_asset_graph(request): @observable_source_asset(auto_observe_interval_minutes=30) def asset1(): ... 
- asset_graph = AssetGraph.from_assets([asset1]) + observable = create_external_asset_from_source_asset(asset1) if request.param else asset1 + asset_graph = AssetGraph.from_assets([observable]) return asset_graph def test_single_observable_source_asset_no_prior_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, current_timestamp=1000, last_observe_request_timestamp_by_asset_key={}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -70,16 +72,16 @@ def test_single_observable_source_asset_no_prior_observe_requests( def test_single_observable_source_asset_prior_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): last_timestamp = 1000 run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, current_timestamp=last_timestamp + 30 * 60 + 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -87,16 +89,16 @@ def test_single_observable_source_asset_prior_observe_requests( def test_single_observable_source_asset_prior_recent_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): last_timestamp = 1000 run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, 
current_timestamp=last_timestamp + 30 * 60 - 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_definitions_class.py b/python_modules/dagster/dagster_tests/definitions_tests/test_definitions_class.py index c5230af770fa4..66d020b84ac44 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_definitions_class.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_definitions_class.py @@ -31,6 +31,7 @@ ) from dagster._core.definitions.decorators.job_decorator import job from dagster._core.definitions.executor_definition import executor +from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.logger_definition import logger from dagster._core.definitions.repository_definition import ( @@ -624,6 +625,15 @@ def asset_foo(context): ): Definitions(assets=[source_asset_io_req]) + external_asset_io_req = create_external_asset_from_source_asset(source_asset_io_req) + with pytest.raises( + DagsterInvalidDefinitionError, + match=re.escape( + "io manager with key 'foo' required by external asset with key [\"foo\"] was not provided" + ), + ): + Definitions(assets=[external_asset_io_req]) + def test_assets_with_executor(): @asset