From d32e2513e7115de0eeb1952ffe02a3aa8ea5839a Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Sun, 18 Feb 2024 21:03:30 -0500 Subject: [PATCH] [external-assets] Make `observe` support external observables --- .../dagster/_core/definitions/observe.py | 18 +- .../core_tests/test_data_time.py | 2 +- .../auto_materialize_tests/base_scenario.py | 2 +- .../definitions_tests/test_observe_result.py | 158 ++++++++++-------- 4 files changed, 97 insertions(+), 83 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/observe.py b/python_modules/dagster/dagster/_core/definitions/observe.py index 87c58245478a2..59f657c3096e1 100644 --- a/python_modules/dagster/dagster/_core/definitions/observe.py +++ b/python_modules/dagster/dagster/_core/definitions/observe.py @@ -1,6 +1,8 @@ -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union import dagster._check as check +from dagster._core.definitions.asset_selection import AssetSelection +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job from dagster._utils.warnings import disable_dagster_warnings @@ -13,7 +15,7 @@ def observe( - source_assets: Sequence[SourceAsset], + assets: Sequence[Union[AssetsDefinition, SourceAsset]], run_config: Any = None, instance: Optional[DagsterInstance] = None, resources: Optional[Mapping[str, object]] = None, @@ -26,8 +28,8 @@ def observe( By default, will materialize assets to the local filesystem. Args: - source_assets (Sequence[SourceAsset]): - The source assets to materialize. + assets (Sequence[Union[AssetsDefinition, SourceAsset]]): + The assets to observe. resources (Optional[Mapping[str, object]]): The resources needed for execution. Can provide resource instances directly, or resource definitions. Note that if provided resources @@ -41,15 +43,17 @@ def observe( Returns: ExecuteInProcessResult: The result of the execution. """ - source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset)) + assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) instance = check.opt_inst_param(instance, "instance", DagsterInstance) partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) with disable_dagster_warnings(): - observation_job = define_asset_job("in_process_observation_job", source_assets) + observation_job = define_asset_job( + "in_process_observation_job", selection=AssetSelection.all(include_sources=True) + ) defs = Definitions( - assets=source_assets, + assets=assets, jobs=[observation_job], resources=resources, ) diff --git a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py index 66c0ca49a6ce2..eb89811d21cbb 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py @@ -333,7 +333,7 @@ def observe_sources(*args): def observe_sources_fn(*, instance, times_by_key, **kwargs): for arg in args: key = AssetKey(arg) - observe(source_assets=[versioned_repo.source_assets_by_key[key]], instance=instance) + observe(assets=[versioned_repo.source_assets_by_key[key]], instance=instance) latest_record = instance.get_latest_data_version_record(key, is_source=True) latest_timestamp = latest_record.timestamp times_by_key[key].append( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index ed2c9652f75d7..cd36f950dbcfd 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -357,7 +357,7 @@ def test_time_fn(): if run.is_observation: observe( instance=instance, - source_assets=[ + assets=[ a for a in self.assets if isinstance(a, SourceAsset) and a.key in run.asset_keys diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py index 099f903ff7774..422b4503119dd 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py @@ -14,7 +14,6 @@ asset, build_op_context, instance_for_test, - materialize, multi_asset, ) from dagster._core.definitions.asset_check_spec import AssetCheckKey @@ -23,18 +22,13 @@ AssetExecutionType, ) from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.observe import observe from dagster._core.definitions.result import ObserveResult from dagster._core.errors import DagsterInvariantViolationError, DagsterStepOutputNotFoundError from dagster._core.execution.context.invocation import build_asset_context from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus -def _exec_asset(asset_def, selection=None, partition_key=None): - result = materialize([asset_def], selection=selection, partition_key=partition_key) - assert result.success - return result.asset_observations_for_node(asset_def.node_def.name) - - def _with_observe_metadata(kwargs: Dict[str, Any]) -> Dict[str, Any]: metadata = kwargs.pop("metadata", {}) metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] = AssetExecutionType.OBSERVATION.value @@ -72,7 +66,9 @@ def ret_untyped(context: AssetExecutionContext): metadata={"one": 1}, ) - observations = _exec_asset(ret_untyped) + result = observe([ret_untyped]) + assert result.success + observations = result.asset_observations_for_node(ret_untyped.node_def.name) assert len(observations) == 1, observations assert "one" in observations[0].metadata @@ -89,7 +85,7 @@ def ret_mismatch(context: AssetExecutionContext): DagsterInvariantViolationError, match="Asset key random not found in AssetsDefinition", ): - materialize([ret_mismatch]) + observe([ret_mismatch]) # direct invocation with pytest.raises( @@ -104,7 +100,7 @@ def ret_two(): return ObserveResult(metadata={"one": 1}), ObserveResult(metadata={"two": 2}) # core execution - result = materialize([ret_two]) + result = observe([ret_two]) assert result.success # direct invocation @@ -126,7 +122,7 @@ def ret_checks(context: AssetExecutionContext): ) # core execution - materialize([ret_checks], instance=instance) + observe([ret_checks], instance=instance) asset_check_executions = instance.event_log_storage.get_asset_check_execution_history( AssetCheckKey(asset_key=ret_checks.key, name="foo_check"), limit=1, @@ -147,7 +143,7 @@ def outs_multi_asset(): asset_key="two", metadata={"baz": "qux"} ) - assert materialize([outs_multi_asset]).success + assert observe([outs_multi_asset]).success res = outs_multi_asset() assert res[0].metadata["foo"] == "bar" @@ -164,7 +160,7 @@ def specs_multi_asset(): asset_key=["prefix", "two"], metadata={"baz": "qux"} ) - assert materialize([specs_multi_asset]).success + assert observe([specs_multi_asset]).success res = specs_multi_asset() assert res[0].metadata["foo"] == "bar" @@ -186,11 +182,12 @@ def multi(): metadata={"two": 2}, ) - mats = _exec_asset(multi) - - assert len(mats) == 2, mats - assert "one" in mats[0].metadata - assert "two" in mats[1].metadata + result = observe([multi]) + assert result.success + observations = result.asset_observations_for_node(multi.node_def.name) + assert len(observations) == 2 + assert "one" in observations[0].metadata + assert "two" in observations[1].metadata direct_results = list(multi()) assert len(direct_results) == 2 @@ -212,7 +209,7 @@ def missing(): 'Core compute for op "missing" did not return an output for non-optional output "two"' ), ): - _exec_asset(missing) + observe([missing]) with pytest.raises( DagsterInvariantViolationError, @@ -239,7 +236,7 @@ def no_key(): " asset_key, options are:" ), ): - _exec_asset(no_key) + observe([no_key]) with pytest.raises( DagsterInvariantViolationError, @@ -266,11 +263,12 @@ def ret_multi(): ), ) - mats = _exec_asset(ret_multi) - - assert len(mats) == 2, mats - assert "one" in mats[0].metadata - assert "two" in mats[1].metadata + result = observe([ret_multi]) + assert result.success + observations = result.asset_observations_for_node(ret_multi.node_def.name) + assert len(observations) == 2 + assert "one" in observations[0].metadata + assert "two" in observations[1].metadata res = ret_multi() assert len(res) == 2 @@ -299,7 +297,7 @@ def ret_list(): " value for each output." ), ): - _exec_asset(ret_list) + observe([ret_list]) with pytest.raises( DagsterInvariantViolationError, @@ -327,7 +325,7 @@ def load_input(self, context): def asset_with_type_annotation() -> ObserveResult: return ObserveResult(metadata={"foo": "bar"}) - assert materialize( + assert observe( [asset_with_type_annotation], resources={"io_manager": TestingIOManager()} ).success @@ -335,7 +333,7 @@ def asset_with_type_annotation() -> ObserveResult: def multi_asset_with_outs_and_type_annotation() -> Tuple[ObserveResult, ObserveResult]: return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") - assert materialize( + assert observe( [multi_asset_with_outs_and_type_annotation], resources={"io_manager": TestingIOManager()} ).success @@ -343,7 +341,7 @@ def multi_asset_with_outs_and_type_annotation() -> Tuple[ObserveResult, ObserveR def multi_asset_with_specs_and_type_annotation() -> Tuple[ObserveResult, ObserveResult]: return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") - assert materialize( + assert observe( [multi_asset_with_specs_and_type_annotation], resources={"io_manager": TestingIOManager()} ).success @@ -351,7 +349,7 @@ def multi_asset_with_specs_and_type_annotation() -> Tuple[ObserveResult, Observe def multi_asset_with_specs_and_no_type_annotation(): return ObserveResult(asset_key="one"), ObserveResult(asset_key="two") - assert materialize( + assert observe( [multi_asset_with_specs_and_no_type_annotation], resources={"io_manager": TestingIOManager()}, ).success @@ -376,7 +374,7 @@ def with_checks(context: AssetExecutionContext) -> ObserveResult: ] ) - assert materialize( + assert observe( [with_checks], resources={"io_manager": TestingIOManager()}, ).success @@ -412,7 +410,7 @@ def multi_checks(context: AssetExecutionContext) -> Tuple[ObserveResult, Observe ], ) - assert materialize( + assert observe( [multi_checks], resources={"io_manager": TestingIOManager()}, ).success @@ -435,7 +433,7 @@ def load_input(self, context): def generator_asset() -> Generator[ObserveResult, None, None]: yield ObserveResult(metadata={"foo": "bar"}) - materialize([generator_asset], resources={"io_manager": TestingIOManager()}) + observe([generator_asset], resources={"io_manager": TestingIOManager()}) def test_observe_result_generators(): @@ -443,43 +441,49 @@ def test_observe_result_generators(): def generator_asset() -> Generator[ObserveResult, None, None]: yield ObserveResult(metadata={"foo": "bar"}) - res = _exec_asset(generator_asset) - assert len(res) == 1 - assert res[0].metadata["foo"].value == "bar" + result = observe([generator_asset]) + assert result.success + observations = result.asset_observations_for_node(generator_asset.node_def.name) + assert len(observations) == 1 + assert observations[0].metadata["foo"].value == "bar" - res = list(generator_asset()) - assert len(res) == 1 - assert res[0].metadata["foo"] == "bar" + result = list(generator_asset()) + assert len(result) == 1 + assert result[0].metadata["foo"] == "bar" @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) def generator_specs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) - res = _exec_asset(generator_specs_multi_asset) - assert len(res) == 2 - assert res[0].metadata["foo"].value == "bar" - assert res[1].metadata["baz"].value == "qux" + result = observe([generator_specs_multi_asset]) + assert result.success + observations = result.asset_observations_for_node(generator_specs_multi_asset.node_def.name) + assert len(observations) == 2 + assert observations[0].metadata["foo"].value == "bar" + assert observations[1].metadata["baz"].value == "qux" - res = list(generator_specs_multi_asset()) - assert len(res) == 2 - assert res[0].metadata["foo"] == "bar" - assert res[1].metadata["baz"] == "qux" + result = list(generator_specs_multi_asset()) + assert len(result) == 2 + assert result[0].metadata["foo"] == "bar" + assert result[1].metadata["baz"] == "qux" @_external_observable_multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) def generator_outs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) - res = _exec_asset(generator_outs_multi_asset) - assert len(res) == 2 - assert res[0].metadata["foo"].value == "bar" - assert res[1].metadata["baz"].value == "qux" + result = observe([generator_outs_multi_asset]) + assert result.success + observations = result.asset_observations_for_node(generator_outs_multi_asset.node_def.name) + assert len(observations) == 2 + assert observations[0].metadata["foo"].value == "bar" + assert observations[1].metadata["baz"].value == "qux" - res = list(generator_outs_multi_asset()) - assert len(res) == 2 - assert res[0].metadata["foo"] == "bar" - assert res[1].metadata["baz"] == "qux" + result = list(generator_outs_multi_asset()) + assert len(result) == 2 + assert result[0].metadata["foo"] == "bar" + assert result[1].metadata["baz"] == "qux" @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) async def async_specs_multi_asset(): @@ -487,25 +491,29 @@ async def async_specs_multi_asset(): asset_key="two", metadata={"baz": "qux"} ) - res = _exec_asset(async_specs_multi_asset) - assert len(res) == 2 - assert res[0].metadata["foo"].value == "bar" - assert res[1].metadata["baz"].value == "qux" + result = observe([async_specs_multi_asset]) + assert result.success + observations = result.asset_observations_for_node(async_specs_multi_asset.node_def.name) + assert len(observations) == 2 + assert observations[0].metadata["foo"].value == "bar" + assert observations[1].metadata["baz"].value == "qux" - res = asyncio.run(async_specs_multi_asset()) - assert len(res) == 2 - assert res[0].metadata["foo"] == "bar" - assert res[1].metadata["baz"] == "qux" + result = asyncio.run(async_specs_multi_asset()) + assert len(result) == 2 + assert result[0].metadata["foo"] == "bar" + assert result[1].metadata["baz"] == "qux" @_external_observable_multi_asset(specs=[AssetSpec("one"), AssetSpec("two")]) async def async_gen_specs_multi_asset(): yield ObserveResult(asset_key="one", metadata={"foo": "bar"}) yield ObserveResult(asset_key="two", metadata={"baz": "qux"}) - res = _exec_asset(async_gen_specs_multi_asset) - assert len(res) == 2 - assert res[0].metadata["foo"].value == "bar" - assert res[1].metadata["baz"].value == "qux" + result = observe([async_gen_specs_multi_asset]) + assert result.success + observations = result.asset_observations_for_node(async_gen_specs_multi_asset.node_def.name) + assert len(observations) == 2 + assert observations[0].metadata["foo"].value == "bar" + assert observations[1].metadata["baz"].value == "qux" async def _run_async_gen(): results = [] @@ -513,10 +521,10 @@ async def _run_async_gen(): results.append(result) return results - res = asyncio.run(_run_async_gen()) - assert len(res) == 2 - assert res[0].metadata["foo"] == "bar" - assert res[1].metadata["baz"] == "qux" + result = asyncio.run(_run_async_gen()) + assert len(result) == 2 + assert result[0].metadata["foo"] == "bar" + assert result[1].metadata["baz"] == "qux" def test_observe_result_with_partitions(): @@ -526,9 +534,11 @@ def test_observe_result_with_partitions(): def partitioned_asset(context: AssetExecutionContext) -> ObserveResult: return ObserveResult(metadata={"key": context.partition_key}) - mats = _exec_asset(partitioned_asset, partition_key="red") - assert len(mats) == 1, mats - assert mats[0].metadata["key"].text == "red" + result = observe([partitioned_asset], partition_key="red") + assert result.success + observations = result.asset_observations_for_node(partitioned_asset.node_def.name) + assert len(observations) == 1 + assert observations[0].metadata["key"].text == "red" def test_observe_result_with_partitions_direct_invocation():