diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index 295f3faea6adf..4e28af03e2142 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -22,6 +22,7 @@ import toposort import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.asset_subset import ValidAssetSubset from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.errors import DagsterInvalidInvocationError @@ -141,6 +142,10 @@ def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]: def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolicy]]: return self._freshness_policies_by_key + @property + def observable_keys(self) -> AbstractSet[AssetKey]: + return {key for key, is_observable in self._is_observable_by_key.items() if is_observable} + @property def auto_materialize_policies_by_key( self, @@ -159,6 +164,10 @@ def from_assets( all_assets: Iterable[Union[AssetsDefinition, SourceAsset]], asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, ) -> "InternalAssetGraph": + from dagster._core.definitions.external_asset import ( + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES, + ) + assets_defs: List[AssetsDefinition] = [] source_assets: List[SourceAsset] = [] partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {} @@ -197,6 +206,25 @@ def from_assets( backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) code_versions_by_key.update(asset.code_versions_by_key) + is_observable = asset.execution_type == AssetExecutionType.OBSERVATION + is_observable_by_key.update({key: is_observable for key in asset.keys}) + + # Set auto_observe_interval_minutes for external observable assets + # This can be removed when/if we have a a solution for mapping + # `auto_observe_interval_minutes` to an AutoMaterialzePolicy + first_key = next(iter(asset.keys), None) + if ( + first_key + and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES + in asset.metadata_by_key[first_key] + ): + interval = asset.metadata_by_key[first_key][ + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES + ] + auto_observe_interval_minutes_by_key.update( + {key: interval for key in asset.keys} + ) + if not asset.can_subset: all_required_keys = {*asset.check_keys, *asset.keys} if len(all_required_keys) > 1: diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index e6496d915faa5..bf438ac8e6952 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -30,6 +30,7 @@ from dagster._core.definitions.op_selection import get_graph_subset from dagster._core.definitions.partition_mapping import MultiPartitionMapping from dagster._core.definitions.resource_requirement import ( + ExternalAssetIOManagerRequirement, RequiresResources, ResourceAddable, ResourceRequirement, @@ -1334,7 +1335,14 @@ def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str: )[0].io_manager_key def get_resource_requirements(self) -> Iterator[ResourceRequirement]: - yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined] + if self.is_executable: + yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined] + else: + for key in self.keys: + yield ExternalAssetIOManagerRequirement( + key=self.get_io_manager_key_for_asset_key(key), + asset_key=key.to_string(), + ) for source_key, resource_def in self.resource_defs.items(): yield from resource_def.get_resource_requirements(outer_context=source_key) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index c292104a1c743..648d817b40387 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -71,7 +71,7 @@ def asset( metadata: Optional[Mapping[str, Any]] = ..., description: Optional[str] = ..., config_schema: Optional[UserConfigSchema] = None, - required_resource_keys: Optional[Set[str]] = ..., + required_resource_keys: Optional[AbstractSet[str]] = ..., resource_defs: Optional[Mapping[str, object]] = ..., io_manager_def: Optional[object] = ..., io_manager_key: Optional[str] = ..., @@ -111,7 +111,7 @@ def asset( metadata: Optional[ArbitraryMetadataMapping] = None, description: Optional[str] = None, config_schema: Optional[UserConfigSchema] = None, - required_resource_keys: Optional[Set[str]] = None, + required_resource_keys: Optional[AbstractSet[str]] = None, resource_defs: Optional[Mapping[str, object]] = None, io_manager_def: Optional[object] = None, io_manager_key: Optional[str] = None, diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 57142ef838bc2..60f199239c869 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -15,6 +15,7 @@ ) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.context.compute import AssetExecutionContext +from dagster._utils.warnings import disable_dagster_warnings def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition: @@ -125,44 +126,60 @@ def _external_assets_def(context: AssetExecutionContext) -> None: return assets_defs +# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of +# external assets resulting from a source asset conversion. It contains the +# `auto_observe_interval_minutes` value from the source asset and is consulted +# in the auto-materialize daemon. It should eventually be eliminated in favor +# of an implementation of `auto_observe_interval_minutes` in terms of +# `AutoMaterializeRule`. +SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes" + + def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: - check.invariant( - source_asset.auto_observe_interval_minutes is None, - "Automatically observed external assets not supported yet: auto_observe_interval_minutes" - " should be None", - ) - - kwargs = { - "key": source_asset.key, - "metadata": source_asset.metadata, - "group_name": source_asset.group_name, - "description": source_asset.description, - "partitions_def": source_asset.partitions_def, - "_execution_type": ( - AssetExecutionType.UNEXECUTABLE - if source_asset.observe_fn is None - else AssetExecutionType.OBSERVATION + observe_interval = source_asset.auto_observe_interval_minutes + metadata = { + **source_asset.raw_metadata, + **( + {SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES: observe_interval} + if observe_interval + else {} ), } - if source_asset.io_manager_def: - kwargs["io_manager_def"] = source_asset.io_manager_def - elif source_asset.io_manager_key: - kwargs["io_manager_key"] = source_asset.io_manager_key - - @asset(**kwargs) - def _shim_assets_def(context: AssetExecutionContext): - if not source_asset.observe_fn: - raise NotImplementedError(f"Asset {source_asset.key} is not executable") - - op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) - return_value = op_function.decorated_fn(context) - check.invariant( - isinstance(return_value, Output) - and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata, - "The wrapped decorated_fn should return an Output with a special metadata key.", + with disable_dagster_warnings(): + + @asset( + key=source_asset.key, + metadata=metadata, + group_name=source_asset.group_name, + description=source_asset.description, + partitions_def=source_asset.partitions_def, + _execution_type=( + AssetExecutionType.UNEXECUTABLE + if source_asset.observe_fn is None + else AssetExecutionType.OBSERVATION + ), + io_manager_key=source_asset.io_manager_key, + # We don't pass the `io_manager_def` because it will already be present in + # `resource_defs` (it is added during `SourceAsset` initialization). + resource_defs=source_asset.resource_defs, + # We need to access the raw attribute because the property will return a computed value that + # includes requirements for the io manager. Those requirements will be inferred again when + # we create an AssetsDefinition. + required_resource_keys=source_asset._required_resource_keys, # noqa: SLF001 ) - return return_value + def _shim_assets_def(context: AssetExecutionContext): + if not source_asset.observe_fn: + raise NotImplementedError(f"Asset {source_asset.key} is not executable") + + op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) + return_value = op_function.decorated_fn(context) + check.invariant( + isinstance(return_value, Output) + and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata, + "The wrapped decorated_fn should return an Output with a special metadata key.", + ) + return return_value check.invariant(isinstance(_shim_assets_def, AssetsDefinition)) assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index ff09eef6b3f2f..76e0a341888c6 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -13,6 +13,7 @@ ) import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository @@ -153,18 +154,17 @@ def from_repository_handles_and_external_asset_nodes( node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source } - is_observable_by_key = {key: False for key in all_non_source_keys} + is_observable_by_key = {} auto_observe_interval_minutes_by_key = {} for repo_handle, node in repo_handle_external_asset_nodes: + is_observable_by_key[node.asset_key] = ( + node.execution_type == AssetExecutionType.OBSERVATION + ) + auto_observe_interval_minutes_by_key[ + node.asset_key + ] = node.auto_observe_interval_minutes if node.is_source: - # We need to set this even if the node is a regular asset in another code location. - # `is_observable` will only ever be consulted in the source asset context. - is_observable_by_key[node.asset_key] = node.is_observable - auto_observe_interval_minutes_by_key[ - node.asset_key - ] = node.auto_observe_interval_minutes - if node.asset_key in all_non_source_keys: # one location's source is another location's non-source continue diff --git a/python_modules/dagster/dagster/_core/definitions/resource_requirement.py b/python_modules/dagster/dagster/_core/definitions/resource_requirement.py index 7ddb26aa60ee4..bb8610ab06633 100644 --- a/python_modules/dagster/dagster/_core/definitions/resource_requirement.py +++ b/python_modules/dagster/dagster/_core/definitions/resource_requirement.py @@ -104,6 +104,29 @@ def describe_requirement(self) -> str: ) +class ExternalAssetIOManagerRequirement( + NamedTuple( + "_ExternalAssetIOManagerRequirement", + [ + ("key", str), + ("asset_key", Optional[str]), + ], + ), + ResourceRequirement, +): + @property + def expected_type(self) -> Type: + from ..storage.io_manager import IOManagerDefinition + + return IOManagerDefinition + + def describe_requirement(self) -> str: + external_asset_descriptor = ( + f"external asset with key {self.asset_key}" if self.asset_key else "external asset" + ) + return f"io manager with key '{self.key}' required by {external_asset_descriptor}" + + class SourceAssetIOManagerRequirement( NamedTuple( "_InputManagerRequirement", diff --git a/python_modules/dagster/dagster/_core/host_representation/external.py b/python_modules/dagster/dagster/_core/host_representation/external.py index 772edff995d8a..eb5cedfbf18a7 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external.py +++ b/python_modules/dagster/dagster/_core/host_representation/external.py @@ -216,7 +216,7 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]: if asset_key not in covered_asset_keys: default_sensor_asset_keys.add(asset_key) - for asset_key in asset_graph.source_asset_keys: + for asset_key in asset_graph.observable_keys: if asset_graph.get_auto_observe_interval_minutes(asset_key) is None: continue diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 34c1644c14f1c..cbf384cc5a376 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1636,7 +1636,7 @@ def external_asset_nodes_from_defs( asset_keys_without_definitions = all_upstream_asset_keys.difference( node_defs_by_asset_key.keys() - ).difference(source_assets_by_key.keys()) + ).difference({*source_assets_by_key.keys()}) asset_nodes = [ ExternalAssetNode( @@ -1729,6 +1729,14 @@ def external_asset_nodes_from_defs( node_handle = node_handle.parent graph_name = node_handle.name + if asset_key in source_assets_by_key: + source_asset = source_assets_by_key[asset_key] + is_observable = source_asset.is_observable + auto_observe_interval_minutes = source_asset.auto_observe_interval_minutes + else: + is_observable = False + auto_observe_interval_minutes = None + asset_nodes.append( ExternalAssetNode( asset_key=asset_key, @@ -1749,6 +1757,8 @@ def external_asset_nodes_from_defs( partitions_def_data=partitions_def_data, output_name=output_def.name, metadata=asset_metadata, + is_observable=is_observable, + auto_observe_interval_minutes=auto_observe_interval_minutes, # assets defined by Out(asset_key="k") do not have any group # name specified we default to DEFAULT_GROUP_NAME here to ensure # such assets are part of the default group diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py index 5fcb4088a7b19..6500202aef0ed 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py @@ -7,6 +7,7 @@ ) from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor from dagster._core.definitions.asset_graph import AssetGraph +from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from pytest import fixture @@ -44,25 +45,26 @@ def asset1(): ) -@fixture -def single_auto_observe_source_asset_graph(): +@fixture(params=[True, False], ids=["use_external_asset", "use_source_asset"]) +def single_auto_observe_asset_graph(request): @observable_source_asset(auto_observe_interval_minutes=30) def asset1(): ... - asset_graph = AssetGraph.from_assets([asset1]) + observable = create_external_asset_from_source_asset(asset1) if request.param else asset1 + asset_graph = AssetGraph.from_assets([observable]) return asset_graph def test_single_observable_source_asset_no_prior_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, current_timestamp=1000, last_observe_request_timestamp_by_asset_key={}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -70,16 +72,16 @@ def test_single_observable_source_asset_no_prior_observe_requests( def test_single_observable_source_asset_prior_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): last_timestamp = 1000 run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, current_timestamp=last_timestamp + 30 * 60 + 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -87,16 +89,16 @@ def test_single_observable_source_asset_prior_observe_requests( def test_single_observable_source_asset_prior_recent_observe_requests( - single_auto_observe_source_asset_graph, + single_auto_observe_asset_graph, ): last_timestamp = 1000 run_requests = get_auto_observe_run_requests( - asset_graph=single_auto_observe_source_asset_graph, + asset_graph=single_auto_observe_asset_graph, current_timestamp=last_timestamp + 30 * 60 - 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, ) assert len(run_requests) == 0