From 7d08a7d9fbc5d685a916d662a5f1658157eb2e39 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Tue, 19 Sep 2023 10:42:39 -0400 Subject: [PATCH] observable assets in a user space sensor --- .../_core/definitions/sensor_definition.py | 66 ++++++++++++++- .../dagster/dagster/_daemon/sensor.py | 3 + .../test_observable_assets.py | 81 ++++++++++++++++++- 3 files changed, 148 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py index 92227e5e18275..38f9356dacd4e 100644 --- a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py @@ -28,7 +28,13 @@ import dagster._check as check from dagster._annotations import public +from dagster._core.definitions.events import ( + AssetMaterialization, + AssetObservation, + UserEvent, +) from dagster._core.definitions.instigation_logger import InstigationLogger +from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.partition import ( CachingDynamicPartitionsLoader, ) @@ -55,7 +61,6 @@ ) from .asset_selection import AssetSelection from .graph_definition import GraphDefinition -from .job_definition import JobDefinition from .run_request import ( AddDynamicPartitionsRequest, DagsterRunReaction, @@ -72,6 +77,7 @@ from dagster import ResourceDefinition from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.repository_definition import RepositoryDefinition + from dagster._core.events import DagsterEvent @whitelist_for_serdes @@ -178,6 +184,7 @@ def __init__( ) self._logger: Optional[InstigationLogger] = None self._cursor_updated = False + self._events: List["DagsterEvent"] = [] def __enter__(self) -> "SensorEvaluationContext": self._cm_scope_entered = True @@ -373,6 +380,55 @@ def has_captured_logs(self): def log_key(self) -> 
Optional[List[str]]:
         return self._log_key
 
+    @public
+    def log_event(self, event: UserEvent) -> None:
+        """Log an AssetMaterialization or AssetObservation from within a sensor evaluation.
+
+        Logged events are buffered on this context and handed back with the tick's SensorExecutionData.
+
+        Args:
+            event (Union[AssetMaterialization, AssetObservation]): The event to log. Any other type raises DagsterInvariantViolationError.
+
+        **Examples:**
+
+        .. code-block:: python
+
+            from dagster import sensor, AssetObservation
+
+            @sensor(job_name="my_job")
+            def my_sensor(context):
+                context.log_event(AssetObservation("foo"))
+        """
+        from dagster._core.events import (
+            AssetObservationData,
+            DagsterEvent,
+            DagsterEventType,
+            StepMaterializationData,
+        )
+
+        dagster_event = None
+        if isinstance(event, AssetMaterialization):
+            event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value
+            data_payload = StepMaterializationData(event)
+            dagster_event = DagsterEvent(
+                event_type_value=event_type_value,
+                event_specific_data=data_payload,
+                job_name="",  # RUNLESS_JOB_NAME
+            )
+
+        elif isinstance(event, AssetObservation):
+            event_type_value = DagsterEventType.ASSET_OBSERVATION.value
+            data_payload = AssetObservationData(event)
+            dagster_event = DagsterEvent(
+                event_type_value=event_type_value,
+                event_specific_data=data_payload,
+                job_name="",  # RUNLESS_JOB_NAME
+            )
+        else:
+            raise DagsterInvariantViolationError(f"Unsupported event type: {type(event)}")
+
+        self._events.append(dagster_event)
+
 
 RawSensorEvaluationFunctionReturn = Union[
     Iterator[Union[SkipReason, RunRequest, DagsterRunReaction, SensorResult]],
@@ -799,6 +855,7 @@ def evaluate_tick(self, context: "SensorEvaluationContext") -> "SensorExecutionD
             dagster_run_reactions,
             captured_log_key=context.log_key if context.has_captured_logs() else None,
             dynamic_partitions_requests=dynamic_partitions_requests,
+            dagster_events=context._events,  # noqa: SLF001
         )
 
     def has_loadable_targets(self) -> bool:
@@ -939,6 +996,10 @@ class SensorExecutionData(
Sequence[Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]] ], ), + ( + "dagster_events", + List["DagsterEvent"], + ), ], ) ): @@ -954,6 +1015,7 @@ def __new__( dynamic_partitions_requests: Optional[ Sequence[Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]] ] = None, + dagster_events: Optional[Iterable["DagsterEvent"]] = None, ): check.opt_sequence_param(run_requests, "run_requests", RunRequest) check.opt_str_param(skip_message, "skip_message") @@ -968,6 +1030,7 @@ def __new__( check.invariant( not (run_requests and skip_message), "Found both skip data and run request data" ) + # TODO DagsterEvent type check return super(SensorExecutionData, cls).__new__( cls, run_requests=run_requests, @@ -976,6 +1039,7 @@ def __new__( dagster_run_reactions=dagster_run_reactions, captured_log_key=captured_log_key, dynamic_partitions_requests=dynamic_partitions_requests, + dagster_events=list(dagster_events) if dagster_events else [], ) diff --git a/python_modules/dagster/dagster/_daemon/sensor.py b/python_modules/dagster/dagster/_daemon/sensor.py index dd2cad76dda0e..bb88d4dcb3034 100644 --- a/python_modules/dagster/dagster/_daemon/sensor.py +++ b/python_modules/dagster/dagster/_daemon/sensor.py @@ -679,6 +679,9 @@ def _evaluate_sensor( assert isinstance(sensor_runtime_data, SensorExecutionData) + for dagster_event in sensor_runtime_data.dagster_events or []: + instance.report_dagster_event(dagster_event, run_id="") + if sensor_runtime_data.dynamic_partitions_requests: for request in sensor_runtime_data.dynamic_partitions_requests: existent_partitions = [] diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 1b27ffba0a72b..ecaf5bb299655 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ 
-16,13 +16,28 @@ asset, observable_source_asset, ) -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions import materialize +from dagster._core.definitions.asset_spec import ( + SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE, + AssetExecutionType, + AssetSpec, +) +from dagster._core.definitions.data_version import DATA_VERSION_TAG +from dagster._core.definitions.decorators.sensor_decorator import sensor +from dagster._core.definitions.events import AssetObservation from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.observable_asset import ( create_assets_def_from_source_asset, create_unexecutable_observable_assets_def, ) +from dagster._core.definitions.run_request import SkipReason +from dagster._core.definitions.sensor_definition import ( + SensorEvaluationContext, + build_sensor_context, +) from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition +from dagster._core.event_api import EventRecordsFilter +from dagster._core.events import DagsterEventType def test_observable_asset_basic_creation() -> None: @@ -215,3 +230,67 @@ def an_observable_source_asset() -> DataVersion: all_materializations = result.get_asset_materialization_events() assert len(all_materializations) == 0 + + +def get_latest_asset_observation( + instance: DagsterInstance, asset_key: AssetKey +) -> AssetObservation: + event_records = instance.get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.ASSET_OBSERVATION, + asset_key=asset_key, + ), + limit=1, + ) + + assert len(event_records) == 1 + + event_record = event_records[0] + + return check.not_none(event_record.asset_observation) + + +def test_demonstrate_explicit_sensor_in_user_space() -> None: + def compute_data_version() -> str: + return "data_version" + + observing_only_asset_key = AssetKey("observing_only_asset") + + @asset( + key=observing_only_asset_key, + metadata={SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: 
AssetExecutionType.OBSERVATION.value}, + ) + def observing_only_asset(context: AssetExecutionContext) -> None: + context.log_event( + AssetObservation( + asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()} + ) + ) + + asset_execution_instance = DagsterInstance.ephemeral() + + assert materialize(assets=[observing_only_asset], instance=asset_execution_instance).success + + assert ( + get_latest_asset_observation( + asset_execution_instance, observing_only_asset_key + ).data_version + == "data_version" + ) + + @sensor(job_name="observing_only_sensor") + def observing_only_asset_sensor(context: SensorEvaluationContext) -> SkipReason: + context.log_event( + AssetObservation( + asset_key=observing_only_asset_key, tags={DATA_VERSION_TAG: compute_data_version()} + ) + ) + return SkipReason("Never kicks off run") + + sensor_instance = DagsterInstance.ephemeral() + + sensor_execution_data = observing_only_asset_sensor.evaluate_tick( + build_sensor_context(instance=sensor_instance) + ) + + assert len(sensor_execution_data.dagster_events) == 1