From 5d5ce3738b42c4ce98cc8e0513c7a8fe61e8b625 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 10:42:39 -0400 Subject: [PATCH] observable assets in a user space sensor feedback Extract out observe_fn_to_op_compute_fn so it can be reused in other contexts cp feedback experimental cp cp cp --- .../dagster/_core/definitions/source_asset.py | 33 ++- .../definitions_tests/test_external_assets.py | 218 +++++++++++++++++- 2 files changed, 237 insertions(+), 14 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 79f6ca213d571..51f0d9e7bd27f 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -62,6 +62,20 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( source_asset: "SourceAsset", +) -> "DecoratedOpFunction": + check.not_none(source_asset.observe_fn) # for runtime check + assert source_asset.observe_fn is not None # for type checker + return observe_fn_to_op_compute_fn( + observe_fn=source_asset.observe_fn, + partitions_def=source_asset.partitions_def, + asset_key=source_asset.key, + ) + + +def observe_fn_to_op_compute_fn( + observe_fn: SourceAssetObserveFunction, + partitions_def: Optional[PartitionsDefinition], + asset_key: AssetKey, ) -> "DecoratedOpFunction": from dagster._core.definitions.decorators.op_decorator import ( DecoratedOpFunction, @@ -71,11 +85,6 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( OpExecutionContext, ) - check.not_none(source_asset.observe_fn, "Must be an observable source asset") - assert source_asset.observe_fn # for type checker - - observe_fn = source_asset.observe_fn - observe_fn_has_context = is_context_provided(get_function_params(observe_fn)) def fn(context: OpExecutionContext) -> None: @@ -88,22 +97,22 @@ def fn(context: OpExecutionContext) -> None: ) if isinstance(observe_fn_return_value, 
DataVersion): - if source_asset.partitions_def is not None: + if partitions_def is not None: raise DagsterInvalidObservationError( - f"{source_asset.key} is partitioned, so its observe function should return a" + f"{asset_key} is partitioned, so its observe function should return a" " DataVersionsByPartition, not a DataVersion" ) context.log_event( AssetObservation( - asset_key=source_asset.key, + asset_key=asset_key, tags={DATA_VERSION_TAG: observe_fn_return_value.value}, ) ) elif isinstance(observe_fn_return_value, DataVersionsByPartition): - if source_asset.partitions_def is None: + if partitions_def is None: raise DagsterInvalidObservationError( - f"{source_asset.key} is not partitioned, so its observe function should return" + f"{asset_key} is not partitioned, so its observe function should return" " a DataVersion, not a DataVersionsByPartition" ) @@ -113,14 +122,14 @@ def fn(context: OpExecutionContext) -> None: ) in observe_fn_return_value.data_versions_by_partition.items(): context.log_event( AssetObservation( - asset_key=source_asset.key, + asset_key=asset_key, tags={DATA_VERSION_TAG: data_version.value}, partition=partition_key, ) ) else: raise DagsterInvalidObservationError( - f"Observe function for {source_asset.key} must return a DataVersion or" + f"Observe function for {asset_key} must return a DataVersion or" " DataVersionsByPartition, but returned a value of type" f" {type(observe_fn_return_value)}" ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 9bcb95e5f381c..6dcef1ce7866a 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -1,4 +1,4 @@ -from typing import AbstractSet, Iterable +from typing import AbstractSet, Any, Callable, Iterable, Optional import pytest from dagster import ( @@ -11,18 
+11,36 @@ Definitions, IOManager, JobDefinition, + SensorResult, SourceAsset, _check as check, asset, observable_source_asset, ) -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions import materialize +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + AssetSpec, +) +from dagster._core.definitions.data_version import DATA_VERSION_TAG +from dagster._core.definitions.decorators.sensor_decorator import sensor +from dagster._core.definitions.events import AssetObservation from dagster._core.definitions.external_asset import ( create_external_asset_from_source_asset, external_assets_from_specs, ) from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.sensor_definition import ( + SensorDefinition, + build_sensor_context, +) +from dagster._core.definitions.source_asset import ( + observe_fn_to_op_compute_fn, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.event_api import EventRecordsFilter +from dagster._core.events import DagsterEventType def test_external_asset_basic_creation() -> None: @@ -228,3 +246,199 @@ def an_observable_source_asset() -> DataVersion: all_materializations = result.get_asset_materialization_events() assert len(all_materializations) == 0 + + +def get_latest_asset_observation( + instance: DagsterInstance, asset_key: AssetKey +) -> AssetObservation: + event_records = instance.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_OBSERVATION, + asset_key=asset_key, + ), + limit=1, + ) + + assert len(event_records) == 1 + + event_record = event_records[0] + + return check.not_none(event_record.asset_observation) + + +def test_demonstrate_explicit_sensor_in_user_space() -> None: + def compute_data_version() -> str: + return "data_version" + + observing_only_asset_key = AssetKey("observing_only_asset") + + @asset( + 
key=observing_only_asset_key, + metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}, + ) + def observing_only_asset(context: AssetExecutionContext) -> None: + context.log_event( + AssetObservation( + asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()} + ) + ) + + asset_execution_instance = DagsterInstance.ephemeral() + + assert materialize(assets=[observing_only_asset], instance=asset_execution_instance).success + + assert ( + get_latest_asset_observation( + asset_execution_instance, observing_only_asset_key + ).data_version + == "data_version" + ) + + @sensor(job_name="observing_only_sensor") + def observing_only_asset_sensor() -> SensorResult: + return SensorResult( + asset_events=[ + AssetObservation( + asset_key=observing_only_asset_key, + tags={DATA_VERSION_TAG: compute_data_version()}, + ) + ] + ) + + sensor_instance = DagsterInstance.ephemeral() + + sensor_execution_data = observing_only_asset_sensor.evaluate_tick( + build_sensor_context(instance=sensor_instance) + ) + + assert len(sensor_execution_data.asset_events) == 1 + + asset_event = sensor_execution_data.asset_events[0] + + assert isinstance(asset_event, AssetObservation) + + +def create_observation_with_version( + asset_key: AssetKey, data_version: DataVersion +) -> AssetObservation: + return AssetObservation( + asset_key=asset_key, + tags={DATA_VERSION_TAG: data_version.value}, + ) + + +def assets_def_from_observe_fn(asset_key: AssetKey, observe_fn: Callable[..., Any]): + @asset( + key=asset_key, + metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}, + ) + def _asset(context: AssetExecutionContext, **kwargs) -> None: + assets_def = context.job_def.asset_layer.assets_def_for_asset(context.asset_key) + op_compute_fn = observe_fn_to_op_compute_fn( + observe_fn=observe_fn, + partitions_def=assets_def.partitions_def, + asset_key=context.asset_key, + ) + return op_compute_fn.decorated_fn(context, 
**kwargs) + + return _asset + + +def _get_auto_sensor_name(asset_key: AssetKey) -> str: + return f"__auto_observe_sensor{asset_key.to_python_identifier()}" + + +def sensor_def_from_observable_source_asset( + observable_source_asset: SourceAsset, + sensor_name: Optional[str] = None, +) -> SensorDefinition: + sensor_name = sensor_name if sensor_name else _get_auto_sensor_name(observable_source_asset.key) + + @sensor( + name=sensor_name, + minimum_interval_seconds=( + int(observable_source_asset.auto_observe_interval_minutes * 60) + if observable_source_asset.auto_observe_interval_minutes is not None + else 5 + * 60 # I could not find the default value (undocumented) so guessing it is 5 minutes? + ), + ) + def _sensor(context, **kwargs) -> SensorResult: + assert observable_source_asset.observe_fn + return SensorResult( + asset_events=[ + create_observation_with_version( + observable_source_asset.key, + observable_source_asset.observe_fn(**kwargs), + ), + ] + ) + + return _sensor + + +def test_framework_support_for_observable_source_assets_on_assets_def() -> None: + observing_only_asset_key = AssetKey("observing_only_asset") + + @observable_source_asset(name=observing_only_asset_key.to_python_identifier()) + def observing_only_source_asset() -> DataVersion: + return DataVersion("data_version") + + observing_only_assets_def = create_external_asset_from_source_asset(observing_only_source_asset) + + asset_execution_instance = DagsterInstance.ephemeral() + + assert materialize( + assets=[observing_only_assets_def], instance=asset_execution_instance + ).success + + assert ( + get_latest_asset_observation( + instance=asset_execution_instance, asset_key=observing_only_asset_key + ).data_version + == "data_version" + ) + + observing_only_asset_sensor = sensor_def_from_observable_source_asset( + observable_source_asset=observing_only_source_asset, + ) + + sensor_instance = DagsterInstance.ephemeral() + + sensor_execution_data = observing_only_asset_sensor.evaluate_tick( + 
build_sensor_context(instance=sensor_instance) + ) + + assert len(sensor_execution_data.asset_events) == 1 + + asset_event = sensor_execution_data.asset_events[0] + + assert isinstance(asset_event, AssetObservation) + + +def test_observable_source_adapter_ergonomics() -> None: + @observable_source_asset + def an_asset() -> DataVersion: + return DataVersion("data_version") + + # calling these helpers could be in the Definitions object itself + defs = Definitions( + assets=[create_external_asset_from_source_asset(an_asset)], + sensors=[sensor_def_from_observable_source_asset(an_asset)], + ) + + instance = DagsterInstance.ephemeral() + + result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) + assert result.success + + assert get_latest_asset_observation(instance, an_asset.key).data_version == "data_version" + + sensor_def = defs.get_sensor_def(_get_auto_sensor_name(an_asset.key)) + + sensor_result = sensor_def.evaluate_tick(build_sensor_context(instance=instance)) + + assert len(sensor_result.asset_events) == 1 + asset_observation = sensor_result.asset_events[0] + assert isinstance(asset_observation, AssetObservation) + assert asset_observation.asset_key == an_asset.key