From d7b5a78cf6c928ed09af8a48e26d026cd958e967 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Sun, 9 Jun 2024 08:34:28 -0400 Subject: [PATCH] Type /Users/schrockn/code/dagster-io/dagster/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py --- .../test_asset_check_decorator.py | 203 ++++++++++-------- 1 file changed, 108 insertions(+), 95 deletions(-) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py index 83c416bcf3cd2..234817af59eb1 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py @@ -1,5 +1,5 @@ import re -from typing import NamedTuple, Optional +from typing import Iterable, NamedTuple, Optional import pytest from dagster import ( @@ -18,6 +18,7 @@ ObserveResult, ResourceParam, SourceAsset, + _check as check, asset, asset_check, build_op_context, @@ -26,12 +27,14 @@ observable_source_asset, ) from dagster._core.definitions.asset_check_spec import AssetCheckKey +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.errors import ( DagsterInvalidDefinitionError, DagsterInvalidSubsetError, DagsterInvariantViolationError, ) from dagster._core.execution.context.compute import AssetCheckExecutionContext +from dagster._utils.error import SerializableErrorInfo def execute_assets_and_checks( @@ -57,9 +60,9 @@ def execute_assets_and_checks( return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance) -def test_asset_check_decorator(): +def test_asset_check_decorator() -> None: @asset_check(asset="asset1", description="desc", metadata={"foo": "bar"}) - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=True) spec = check1.get_spec_for_check_key(AssetCheckKey(AssetKey(["asset1"]), "check1")) @@ -69,21 +72,21 @@ def check1(): assert spec.metadata == {"foo": "bar"} -def test_asset_check_decorator_name(): +def test_asset_check_decorator_name() -> None: @asset_check(asset="asset1", description="desc", name="check1") - def _check(): + def _check() -> AssetCheckResult: return AssetCheckResult(passed=True) spec = _check.get_spec_for_check_key(AssetCheckKey(AssetKey(["asset1"]), "check1")) assert spec.name == "check1" -def test_asset_check_with_prefix(): +def test_asset_check_with_prefix() -> None: @asset(key_prefix="prefix") - def asset1(): ... + def asset1() -> None: ... @asset_check(asset=asset1) - def my_check(): + def my_check() -> AssetCheckResult: return AssetCheckResult(passed=True) spec = my_check.get_spec_for_check_key( @@ -92,12 +95,12 @@ def my_check(): assert spec.asset_key == AssetKey(["prefix", "asset1"]) -def test_asset_check_input_with_prefix(): +def test_asset_check_input_with_prefix() -> None: @asset(key_prefix="prefix") - def asset1(): ... + def asset1() -> None: ... @asset_check(asset=asset1) - def my_check(asset1): + def my_check(asset1) -> AssetCheckResult: return AssetCheckResult(passed=True) spec = my_check.get_spec_for_check_key( @@ -106,9 +109,9 @@ def my_check(asset1): assert spec.asset_key == AssetKey(["prefix", "asset1"]) -def test_execute_asset_and_check(): +def test_execute_asset_and_check() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset_check(asset=asset1, description="desc") def check1(context: AssetCheckExecutionContext): @@ -163,9 +166,9 @@ def check1(context: AssetCheckExecutionContext): ) -def test_execute_check_without_asset(): +def test_execute_check_without_asset() -> None: @asset_check(asset="asset1", description="desc") - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=True, metadata={"foo": "bar"}) result = execute_assets_and_checks(asset_checks=[check1]) @@ -212,12 +215,12 @@ def check1(context: AssetCheckExecutionContext): assert check_eval.target_materialization_data.timestamp == materialization_record.timestamp -def test_execute_check_and_unrelated_asset(): +def test_execute_check_and_unrelated_asset() -> None: @asset - def asset2(): ... + def asset2() -> None: ... @asset_check(asset="asset1", description="desc") - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=True) result = execute_assets_and_checks(assets=[asset2], asset_checks=[check1]) @@ -233,16 +236,17 @@ def check1(): assert check_eval.check_name == "check1" -def test_check_doesnt_execute_if_asset_fails(): +def test_check_doesnt_execute_if_asset_fails() -> None: check_executed = [False] @asset - def asset1(): + def asset1() -> None: raise ValueError() @asset_check(asset=asset1) - def asset1_check(context: AssetCheckExecutionContext): + def asset1_check(context: AssetCheckExecutionContext) -> AssetCheckResult: check_executed[0] = True + raise Exception("Should not execute") result = execute_assets_and_checks( assets=[asset1], asset_checks=[asset1_check], raise_on_error=False @@ -252,9 +256,9 @@ def asset1_check(context: AssetCheckExecutionContext): assert not check_executed[0] -def test_check_decorator_unexpected_asset_key(): +def test_check_decorator_unexpected_asset_key() -> None: @asset_check(asset="asset1", description="desc") - def asset1_check(): + def asset1_check() -> AssetCheckResult: return AssetCheckResult(asset_key=AssetKey("asset2"), passed=True) with pytest.raises( @@ -267,16 +271,16 @@ def asset1_check(): execute_assets_and_checks(asset_checks=[asset1_check]) -def test_asset_check_separate_op_downstream_still_executes(): +def test_asset_check_separate_op_downstream_still_executes() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset_check(asset=asset1) - def asset1_check(context): + def asset1_check(context) -> AssetCheckResult: return AssetCheckResult(passed=False) @asset(deps=[asset1]) - def asset2(): ... + def asset2() -> None: ... result = execute_assets_and_checks(assets=[asset1, asset2], asset_checks=[asset1_check]) assert result.success @@ -292,16 +296,23 @@ def asset2(): ... assert not check_eval.passed -def test_blocking_check_skip_downstream(): +def error_for_node(result: ExecuteInProcessResult, node_name: str) -> SerializableErrorInfo: + failure_data = result.failure_data_for_node(node_name) + assert failure_data + assert failure_data.error + return failure_data.error + + +def test_blocking_check_skip_downstream() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset_check(asset=asset1, blocking=True) - def check1(context): + def check1(context) -> AssetCheckResult: return AssetCheckResult(passed=False) @asset(deps=[asset1]) - def asset2(): ... + def asset2() -> None: ... result = execute_assets_and_checks( assets=[asset1, asset2], asset_checks=[check1], raise_on_error=False @@ -318,22 +329,22 @@ def asset2(): ... assert check_eval.check_name == "check1" assert not check_eval.passed - error = result.failure_data_for_node("asset1_check1").error + error = error_for_node(result, "asset1_check1") assert error.message.startswith( "dagster._core.errors.DagsterAssetCheckFailedError: Blocking check 'check1' for asset 'asset1'" " failed with ERROR severity." ) -def test_blocking_check_with_source_asset_fail(): +def test_blocking_check_with_source_asset_fail() -> None: asset1 = SourceAsset("asset1") @asset_check(asset=asset1, blocking=True) - def check1(context): + def check1(context) -> AssetCheckResult: return AssetCheckResult(passed=False) @asset(deps=[asset1]) - def asset2(): ... + def asset2() -> None: ... result = execute_assets_and_checks( assets=[asset1, asset2], asset_checks=[check1], raise_on_error=False @@ -350,29 +361,29 @@ def asset2(): ... assert check_eval.check_name == "check1" assert not check_eval.passed - error = result.failure_data_for_node("asset1_check1").error + error = error_for_node(result, "asset1_check1") assert error.message.startswith( "dagster._core.errors.DagsterAssetCheckFailedError: Blocking check 'check1' for asset 'asset1'" " failed with ERROR severity." ) -def test_error_severity_with_source_asset_success(): +def test_error_severity_with_source_asset_success() -> None: asset1 = SourceAsset("asset1", io_manager_key="asset1_io_manager") @asset_check(asset=asset1) - def check1(context): + def check1(context) -> AssetCheckResult: return AssetCheckResult(passed=True, severity=AssetCheckSeverity.ERROR) @asset - def asset2(asset1): + def asset2(asset1) -> None: assert asset1 == 5 class MyIOManager(IOManager): - def load_input(self, context): + def load_input(self, context) -> int: return 5 - def handle_output(self, context, obj): + def handle_output(self, context, obj) -> None: raise NotImplementedError() result = execute_assets_and_checks( @@ -394,10 +405,10 @@ def handle_output(self, context, obj): assert check_eval.passed -def test_definitions_conflicting_checks(): - def make_check(): +def test_definitions_conflicting_checks() -> None: + def make_check() -> AssetChecksDefinition: @asset_check(asset="asset1") - def check1(context): ... + def check1(context) -> AssetCheckResult: ... return check1 @@ -408,10 +419,10 @@ def check1(context): ... Definitions(asset_checks=[make_check(), make_check()]) -def test_definitions_same_name_different_asset(): +def test_definitions_same_name_different_asset() -> None: def make_check_for_asset(asset_key: str): @asset_check(asset=asset_key) - def check1(context): + def check1(context) -> AssetCheckResult: return AssetCheckResult(passed=True) return check1 @@ -419,10 +430,10 @@ def check1(context): Definitions(asset_checks=[make_check_for_asset("asset1"), make_check_for_asset("asset2")]) -def test_definitions_same_asset_different_name(): +def test_definitions_same_asset_different_name() -> None: def make_check(check_name: str): @asset_check(asset="asset1", name=check_name) - def _check(context): + def _check(context) -> AssetCheckResult: return AssetCheckResult(passed=True) return _check @@ -430,7 +441,7 @@ def _check(context): Definitions(asset_checks=[make_check("check1"), make_check("check2")]) -def test_resource_params(): +def test_resource_params() -> None: class MyResource(NamedTuple): value: int @@ -442,7 +453,7 @@ def check1(my_resource: ResourceParam[MyResource]): execute_assets_and_checks(asset_checks=[check1], resources={"my_resource": MyResource(5)}) -def test_resource_params_with_resource_defs(): +def test_resource_params_with_resource_defs() -> None: class MyResource(NamedTuple): value: int @@ -454,13 +465,13 @@ def check1(my_resource: ResourceParam[MyResource]): execute_assets_and_checks(asset_checks=[check1]) -def test_required_resource_keys(): +def test_required_resource_keys() -> None: @asset - def my_asset(): + def my_asset() -> None: pass @asset_check(asset=my_asset, required_resource_keys={"my_resource"}) - def my_check(context): + def my_check(context) -> AssetCheckResult: assert context.resources.my_resource == "foobar" return AssetCheckResult(passed=True) @@ -469,25 +480,25 @@ def my_check(context): ) -def test_resource_definitions(): +def test_resource_definitions() -> None: @asset - def my_asset(): + def my_asset() -> None: pass class MyResource(ConfigurableResource): name: str @asset_check(asset=my_asset, resource_defs={"my_resource": MyResource(name="foobar")}) - def my_check(context: AssetCheckExecutionContext): + def my_check(context: AssetCheckExecutionContext) -> AssetCheckResult: assert context.resources.my_resource.name == "foobar" return AssetCheckResult(passed=True) execute_assets_and_checks(assets=[my_asset], asset_checks=[my_check]) -def test_resource_definitions_satisfy_required_keys(): +def test_resource_definitions_satisfy_required_keys() -> None: @asset - def my_asset(): + def my_asset() -> None: pass class MyResource(ConfigurableResource): @@ -505,19 +516,19 @@ def my_check(context: AssetCheckExecutionContext): execute_assets_and_checks(assets=[my_asset], asset_checks=[my_check]) -def test_job_only_execute_checks_downstream_of_selected_assets(): +def test_job_only_execute_checks_downstream_of_selected_assets() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset - def asset2(): ... + def asset2() -> None: ... @asset_check(asset=asset1) - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=False) @asset_check(asset=asset2) - def check2(): + def check2() -> AssetCheckResult: return AssetCheckResult(passed=False) defs = Definitions( @@ -536,29 +547,29 @@ def check2(): assert check_eval.check_name == "check1" -def test_asset_not_provided(): +def test_asset_not_provided() -> None: with pytest.raises(Exception): + # testing case that fails typechecking + @asset_check(description="desc") # type: ignore + def check1() -> AssetCheckResult: ... - @asset_check(description="desc") - def check1(): ... - -def test_managed_input(): +def test_managed_input() -> None: @asset def asset1() -> int: return 4 @asset_check(asset=asset1, description="desc") - def check1(asset1): + def check1(asset1) -> AssetCheckResult: assert asset1 == 4 return AssetCheckResult(passed=True) class MyIOManager(IOManager): - def load_input(self, context): + def load_input(self, context) -> int: assert context.asset_key == asset1.key return 4 - def handle_output(self, context, obj): ... + def handle_output(self, context, obj) -> None: ... spec = check1.get_spec_for_check_key(AssetCheckKey(AssetKey(["asset1"]), "check1")) assert spec.name == "check1" @@ -576,7 +587,7 @@ def handle_output(self, context, obj): ... ).success -def test_multiple_managed_inputs(): +def test_multiple_managed_inputs() -> None: with pytest.raises( DagsterInvalidDefinitionError, match=re.escape( @@ -587,10 +598,10 @@ def test_multiple_managed_inputs(): ): @asset_check(asset="asset1", description="desc") - def check1(asset1, asset2): ... + def check1(asset1, asset2) -> AssetCheckResult: ... -def test_managed_input_with_context(): +def test_managed_input_with_context() -> None: @asset def asset1() -> int: return 4 @@ -608,12 +619,12 @@ def check1(context: AssetCheckExecutionContext, asset1): execute_assets_and_checks(assets=[asset1], asset_checks=[check1]) -def test_doesnt_invoke_io_manager(): +def test_doesnt_invoke_io_manager() -> None: class DummyIOManager(IOManager): - def handle_output(self, context, obj): + def handle_output(self, context, obj) -> None: assert False - def load_input(self, context): + def load_input(self, context) -> None: assert False @asset_check(asset="asset1", description="desc") @@ -644,12 +655,12 @@ def assert_check_eval( assert check_eval.target_materialization_data.timestamp == materialization_record.timestamp -def test_multi_asset_check(): +def test_multi_asset_check() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset - def asset2(): ... + def asset2() -> None: ... @multi_asset_check( specs=[ @@ -658,7 +669,7 @@ def asset2(): ... AssetCheckSpec("check3", asset=asset2), ] ) - def checks(context): + def checks(context) -> Iterable[AssetCheckResult]: asset1_records = instance.fetch_materializations(asset1.key, limit=1000).records asset2_records = instance.fetch_materializations(asset2.key, limit=1000).records materialization_records = [*asset1_records, *asset2_records] @@ -718,12 +729,12 @@ def checks(context): ) -def test_multi_asset_check_subset(): +def test_multi_asset_check_subset() -> None: @asset - def asset1(): ... + def asset1() -> None: ... @asset - def asset2(): ... + def asset2() -> None: ... asset1_check1 = AssetCheckSpec("check1", asset=asset1) asset2_check3 = AssetCheckSpec("check3", asset=asset2) @@ -732,7 +743,7 @@ def asset2(): ... specs=[asset1_check1, AssetCheckSpec("check2", asset=asset1), asset2_check3], can_subset=True, ) - def checks(context): + def checks(context) -> Iterable[AssetCheckResult]: assert context.selected_asset_check_keys == {asset1_check1.key, asset2_check3.key} yield AssetCheckResult(passed=True, asset_key="asset1", check_name="check1") @@ -783,13 +794,13 @@ def checks(context): ) -def test_target_materialization_observable_source_asset(): +def test_target_materialization_observable_source_asset() -> None: @observable_source_asset - def asset1(): + def asset1() -> ObserveResult: return ObserveResult() @asset_check(asset=asset1) - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=True) result = execute_assets_and_checks(assets=[asset1], asset_checks=[check1]) @@ -804,9 +815,9 @@ def check1(): assert check_eval.target_materialization_data is None -def test_direct_invocation(): +def test_direct_invocation() -> None: @asset_check(asset="asset1") - def check1(): + def check1() -> AssetCheckResult: return AssetCheckResult(passed=True) result = check1() @@ -814,9 +825,9 @@ def check1(): assert result.passed -def test_direct_invocation_with_context(): +def test_direct_invocation_with_context() -> None: @asset_check(asset="asset1") - def check1(context): + def check1(context) -> AssetCheckResult: return AssetCheckResult(passed=True) result = check1(build_op_context()) @@ -824,7 +835,7 @@ def check1(context): assert result.passed -def test_multi_check_direct_invocation(): +def test_multi_check_direct_invocation() -> None: @multi_asset_check( specs=[ AssetCheckSpec("check1", asset="asset1"), @@ -832,12 +843,14 @@ def test_multi_check_direct_invocation(): AssetCheckSpec("check3", asset="asset2"), ] ) - def checks(): + def checks() -> Iterable[AssetCheckResult]: yield AssetCheckResult(passed=True, asset_key="asset1", check_name="check1") yield AssetCheckResult(passed=False, asset_key="asset1", check_name="check2") yield AssetCheckResult(passed=True, asset_key="asset2") - results = list(checks()) + checks_ret_obj = checks() + assert isinstance(checks_ret_obj, Iterable) + results = check.is_list(list(checks_ret_obj), of_type=AssetCheckResult) assert len(results) == 3 assert all(isinstance(result, AssetCheckResult) for result in results) assert results[0].passed