diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 02d5cde96ce63..ef9ee912d40a9 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -1,4 +1,4 @@ -from typing import List, Sequence +from typing import List, Optional, Sequence from dagster import _check as check from dagster._core.definitions.asset_spec import ( @@ -7,7 +7,14 @@ AssetSpec, ) from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset +from dagster._core.definitions.decorators.sensor_decorator import sensor +from dagster._core.definitions.events import AssetKey, AssetObservation +from dagster._core.definitions.run_request import SensorResult +from dagster._core.definitions.sensor_definition import ( + SensorDefinition, +) from dagster._core.definitions.source_asset import ( SourceAsset, wrap_source_asset_observe_fn_in_op_compute_fn, @@ -177,3 +184,68 @@ def _shim_assets_def(context: AssetExecutionContext): assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright return _shim_assets_def + + +def create_observation_with_version( + asset_key: AssetKey, data_version: DataVersion +) -> AssetObservation: + return AssetObservation( + asset_key=asset_key, + tags={DATA_VERSION_TAG: data_version.value}, + ) + + +def get_auto_sensor_name(asset_key: AssetKey) -> str: + return f"__auto_observe_sensor{asset_key.to_python_identifier()}" + + +def sensor_def_from_observable_source_assets( + observable_source_assets: Sequence[SourceAsset], + sensor_name: Optional[str] = None, + interval_minutes: Optional[float] = None, +) -> SensorDefinition: + """Given an existing observable source asset, generate a sensor that observes it on a regular + interval as specified by `SourceAsset.auto_observe_internal_minutes`. + """ + for source_asset in observable_source_assets: + check.param_invariant( + source_asset.observe_fn, + "observable_source_assets", + "All source assets must have observe_fn in this code path", + ) + + def _get_sensor_name() -> str: + if sensor_name: + return sensor_name + if len(observable_source_assets) == 1: + return get_auto_sensor_name(observable_source_assets[0].key) + raise DagsterInvariantViolationError( + "Must specific sensor name explicitly if there is more than one source asset" + ) + + def _get_interval_seconds() -> Optional[int]: + if interval_minutes: + return int(interval_minutes * 60) + + seen_interval_minutes = None + for source_asset in observable_source_assets: + if seen_interval_minutes is None: + seen_interval_minutes = source_asset.auto_observe_interval_minutes + else: + check.invariant( + seen_interval_minutes == source_asset.auto_observe_interval_minutes, + "All interval minutes in source assets must be the same", + ) + return int(seen_interval_minutes * 60) if seen_interval_minutes else None + + @sensor(name=_get_sensor_name(), minimum_interval_seconds=_get_interval_seconds()) + def _sensor(context, **kwargs) -> SensorResult: + asset_events = [] + for source_asset in observable_source_assets: + assert source_asset.observe_fn # appease pyright + asset_events.append( + create_observation_with_version(source_asset.key, source_asset.observe_fn(**kwargs)) + ) + return SensorResult(asset_events=asset_events) + + return _sensor diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 79f6ca213d571..51f0d9e7bd27f 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -62,6 +62,20 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( source_asset: "SourceAsset", +) -> "DecoratedOpFunction": + check.not_none(source_asset.observe_fn) # for runtime check + assert source_asset.observe_fn is not None # for type checker + return observe_fn_to_op_compute_fn( + observe_fn=source_asset.observe_fn, + partitions_def=source_asset.partitions_def, + asset_key=source_asset.key, + ) + + +def observe_fn_to_op_compute_fn( + observe_fn: SourceAssetObserveFunction, + partitions_def: Optional[PartitionsDefinition], + asset_key: AssetKey, ) -> "DecoratedOpFunction": from dagster._core.definitions.decorators.op_decorator import ( DecoratedOpFunction, @@ -71,11 +85,6 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( OpExecutionContext, ) - check.not_none(source_asset.observe_fn, "Must be an observable source asset") - assert source_asset.observe_fn # for type checker - - observe_fn = source_asset.observe_fn - observe_fn_has_context = is_context_provided(get_function_params(observe_fn)) def fn(context: OpExecutionContext) -> None: @@ -88,22 +97,22 @@ def fn(context: OpExecutionContext) -> None: ) if isinstance(observe_fn_return_value, DataVersion): - if source_asset.partitions_def is not None: + if partitions_def is not None: raise DagsterInvalidObservationError( - f"{source_asset.key} is partitioned, so its observe function should return a" + f"{asset_key} is partitioned, so its observe function should return a" " DataVersionsByPartition, not a DataVersion" ) context.log_event( AssetObservation( - asset_key=source_asset.key, + asset_key=asset_key, tags={DATA_VERSION_TAG: observe_fn_return_value.value}, ) ) elif isinstance(observe_fn_return_value, DataVersionsByPartition): - if source_asset.partitions_def is None: + if partitions_def is None: raise DagsterInvalidObservationError( - f"{source_asset.key} is not partitioned, so its observe function should return" + f"{asset_key} is not partitioned, so its observe function should return" " a DataVersion, not a DataVersionsByPartition" ) @@ -113,14 +122,14 @@ def fn(context: OpExecutionContext) -> None: ) in observe_fn_return_value.data_versions_by_partition.items(): context.log_event( AssetObservation( - asset_key=source_asset.key, + asset_key=asset_key, tags={DATA_VERSION_TAG: data_version.value}, partition=partition_key, ) ) else: raise DagsterInvalidObservationError( - f"Observe function for {source_asset.key} must return a DataVersion or" + f"Observe function for {asset_key} must return a DataVersion or" " DataVersionsByPartition, but returned a value of type" f" {type(observe_fn_return_value)}" ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 9bcb95e5f381c..a913fcb4ad229 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -11,18 +11,34 @@ Definitions, IOManager, JobDefinition, + SensorResult, SourceAsset, _check as check, asset, observable_source_asset, ) -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions import materialize +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + AssetSpec, +) +from dagster._core.definitions.data_version import DATA_VERSION_TAG +from dagster._core.definitions.decorators.sensor_decorator import sensor +from dagster._core.definitions.events import AssetObservation from dagster._core.definitions.external_asset import ( create_external_asset_from_source_asset, external_assets_from_specs, + get_auto_sensor_name, + sensor_def_from_observable_source_assets, ) from dagster._core.definitions.freshness_policy import FreshnessPolicy +from dagster._core.definitions.sensor_definition import ( + build_sensor_context, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.event_api import EventRecordsFilter +from dagster._core.events import DagsterEventType def test_external_asset_basic_creation() -> None: @@ -228,3 +244,183 @@ def an_observable_source_asset() -> DataVersion: all_materializations = result.get_asset_materialization_events() assert len(all_materializations) == 0 + + +def get_latest_asset_observation( + instance: DagsterInstance, asset_key: AssetKey +) -> AssetObservation: + event_records = instance.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_OBSERVATION, + asset_key=asset_key, + ), + limit=1, + ) + + assert len(event_records) == 1 + + event_record = event_records[0] + + return check.not_none(event_record.asset_observation) + + +def test_demonstrate_explicit_sensor_in_user_space() -> None: + def compute_data_version() -> str: + return "data_version" + + observing_only_asset_key = AssetKey("observing_only_asset") + + @asset( + key=observing_only_asset_key, + metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}, + ) + def observing_only_asset(context: AssetExecutionContext) -> None: + context.log_event( + AssetObservation( + asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()} + ) + ) + + asset_execution_instance = DagsterInstance.ephemeral() + + assert materialize(assets=[observing_only_asset], instance=asset_execution_instance).success + + assert ( + get_latest_asset_observation( + asset_execution_instance, observing_only_asset_key + ).data_version + == "data_version" + ) + + @sensor(job_name="observing_only_sensor") + def observing_only_asset_sensor() -> SensorResult: + return SensorResult( + asset_events=[ + AssetObservation( + asset_key=observing_only_asset_key, + tags={DATA_VERSION_TAG: compute_data_version()}, + ) + ] + ) + + sensor_instance = DagsterInstance.ephemeral() + + sensor_execution_data = observing_only_asset_sensor.evaluate_tick( + build_sensor_context(instance=sensor_instance) + ) + + assert len(sensor_execution_data.asset_events) == 1 + + asset_event = sensor_execution_data.asset_events[0] + + assert isinstance(asset_event, AssetObservation) + + +def test_framework_support_for_observable_source_assets_on_assets_def() -> None: + observing_only_asset_key = AssetKey("observing_only_asset") + + @observable_source_asset(name=observing_only_asset_key.to_python_identifier()) + def observing_only_source_asset() -> DataVersion: + return DataVersion("data_version") + + observing_only_assets_def = create_external_asset_from_source_asset(observing_only_source_asset) + + asset_execution_instance = DagsterInstance.ephemeral() + + assert materialize( + assets=[observing_only_assets_def], instance=asset_execution_instance + ).success + + assert ( + get_latest_asset_observation( + instance=asset_execution_instance, asset_key=observing_only_asset_key + ).data_version + == "data_version" + ) + + observing_only_asset_sensor = sensor_def_from_observable_source_assets( + [observing_only_source_asset] + ) + + sensor_instance = DagsterInstance.ephemeral() + + sensor_execution_data = observing_only_asset_sensor.evaluate_tick( + build_sensor_context(instance=sensor_instance) + ) + + assert len(sensor_execution_data.asset_events) == 1 + + asset_event = sensor_execution_data.asset_events[0] + + assert isinstance(asset_event, AssetObservation) + + +def test_observable_source_adapter_ergonomics() -> None: + @observable_source_asset + def an_asset() -> DataVersion: + return DataVersion("data_version") + + # calling these helpers could be in the Definitions object itself + defs = Definitions( + assets=[create_external_asset_from_source_asset(an_asset)], + sensors=[sensor_def_from_observable_source_assets([an_asset])], + ) + + instance = DagsterInstance.ephemeral() + + result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) + assert result.success + + assert get_latest_asset_observation(instance, an_asset.key).data_version == "data_version" + sensor_def = defs.get_sensor_def(get_auto_sensor_name(an_asset.key)) + + sensor_result = sensor_def.evaluate_tick(build_sensor_context(instance=instance)) + + assert len(sensor_result.asset_events) == 1 + asset_observation = sensor_result.asset_events[0] + assert isinstance(asset_observation, AssetObservation) + assert asset_observation.asset_key == an_asset.key + + +def test_multi_observable_source_adapter() -> None: + @observable_source_asset + def asset_one() -> DataVersion: + return DataVersion("data_version_one") + + @observable_source_asset + def asset_two() -> DataVersion: + return DataVersion("data_version_two") + + # calling these helpers could be in the Definitions object itself + defs = Definitions( + assets=[create_external_asset_from_source_asset(sa) for sa in [asset_one, asset_two]], + sensors=[ + sensor_def_from_observable_source_assets( + [asset_one, asset_two], sensor_name="observe_all" + ) + ], + ) + + instance = DagsterInstance.ephemeral() + + result = defs.get_implicit_global_asset_job_def().execute_in_process(instance=instance) + assert result.success + + assert get_latest_asset_observation(instance, asset_one.key).data_version == "data_version_one" + assert get_latest_asset_observation(instance, asset_two.key).data_version == "data_version_two" + + sensor_def = defs.get_sensor_def("observe_all") + + sensor_result = sensor_def.evaluate_tick(build_sensor_context(instance=instance)) + + assert len(sensor_result.asset_events) == 2 + + def _get_observation_for(asset_key: AssetKey) -> AssetObservation: + for asset_observation in sensor_result.asset_events: + assert isinstance(asset_observation, AssetObservation) + if asset_observation.asset_key == asset_key: + return asset_observation + check.failed("not found") + + assert _get_observation_for(asset_one.key).data_version == "data_version_one" + assert _get_observation_for(asset_two.key).data_version == "data_version_two"