From 36a87baa762b8624c11e303e3683b3763ae0f32a Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 12 Oct 2023 15:34:08 -0400 Subject: [PATCH] figure out if graph asset correctly --- .../_core/execution/context/compute.py | 9 ++- .../dagster/_core/execution/context/system.py | 8 +++ .../execution_tests/test_context.py | 66 ++++++++++++++++++- 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 1eea4b72821ee..edaccaa2f041c 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1330,7 +1330,7 @@ def build_execution_context( op None OpExecutionContext """ is_sda_step = step_context.is_sda_step - is_op_in_graph_asset = is_sda_step and step_context.is_op_in_graph + is_op_in_graph_asset = step_context.is_in_graph_asset is_asset_check = step_context.is_asset_check_step context_annotation = EmptyAnnotation compute_fn = step_context.op_def._compute_fn # noqa: SLF001 @@ -1345,7 +1345,12 @@ def build_execution_context( # It would be nice to do this check at definition time, rather than at run time, but we don't # know if the op is part of an op job or a graph-backed asset until we have the step execution context - if context_annotation is AssetExecutionContext and not is_sda_step and not is_asset_check: + if ( + context_annotation is AssetExecutionContext + and not is_sda_step + and not is_asset_check + and not is_op_in_graph_asset + ): # AssetExecutionContext requires an AssetsDefinition during init, so an op in an op job # cannot be annotated with AssetExecutionContext raise DagsterInvalidDefinitionError( diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 3ae8bf4bea8f1..08173dae897fa 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -917,6 +917,14 @@ def is_sda_step(self) -> bool: return True return False + @property + def is_in_graph_asset(self) -> bool: + return ( + self.is_op_in_graph + and self.job_def.asset_layer.assets_defs_by_node_handle.get(self.node_handle) + is not None + ) + @property def is_asset_check_step(self) -> bool: """Whether this step corresponds to an asset check.""" diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 4169bd2bbf610..b64c759697663 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -3,6 +3,7 @@ import dagster._check as check import pytest from dagster import ( + AssetCheckResult, AssetExecutionContext, AssetOut, DagsterInstance, @@ -19,6 +20,7 @@ multi_asset, op, ) +from dagster._core.definitions.asset_checks import build_asset_with_blocking_check from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.op_definition import OpDefinition from dagster._core.errors import DagsterInvalidDefinitionError @@ -172,11 +174,17 @@ def op_annotation(context: OpExecutionContext): def test_context_provided_to_graph_asset(): + @op + def layered_op(context: AssetExecutionContext, x): + assert isinstance(context, AssetExecutionContext) + return x + 1 + @op def no_annotation_op(context): assert isinstance(context, OpExecutionContext) # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check assert not isinstance(context, AssetExecutionContext) + return 1 @graph_asset def no_annotation_asset(): @@ -187,10 +195,11 @@ def no_annotation_asset(): @op def asset_annotation_op(context: AssetExecutionContext): assert isinstance(context, AssetExecutionContext) + return 1 @graph_asset def asset_annotation_asset(): - return asset_annotation_op() + return layered_op(asset_annotation_op()) materialize([asset_annotation_asset]) @@ -199,26 +208,33 @@ def op_annotation_op(context: OpExecutionContext): assert isinstance(context, OpExecutionContext) # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check assert not isinstance(context, AssetExecutionContext) + return 1 @graph_asset def op_annotation_asset(): - return op_annotation_op() + return layered_op(op_annotation_op()) materialize([op_annotation_asset]) def test_context_provided_to_graph_multi_asset(): + @op + def layered_op(context: AssetExecutionContext, x): + assert isinstance(context, AssetExecutionContext) + return x + 1 + @op def no_annotation_op(context): assert isinstance(context, OpExecutionContext) # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check assert not isinstance(context, AssetExecutionContext) + return 1 @graph_multi_asset( outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)} ) def no_annotation_asset(): - return no_annotation_op(), no_annotation_op() + return layered_op(no_annotation_op()), layered_op(no_annotation_op()) materialize([no_annotation_asset]) @@ -322,6 +338,50 @@ def op_annotation(context: OpExecutionContext): execute_assets_and_checks(assets=[to_check], asset_checks=[op_annotation]) +def test_context_provided_to_blocking_asset_check(): + instance = DagsterInstance.ephemeral() + + def execute_assets_and_checks(assets=None, asset_checks=None, raise_on_error: bool = True): + defs = Definitions(assets=assets, asset_checks=asset_checks) + job_def = defs.get_implicit_global_asset_job_def() + return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance) + + @asset + def to_check(): + return 1 + + # @asset_check(asset=to_check) + # def no_annotation(context): + # assert isinstance(context, AssetExecutionContext) + # return AssetCheckResult(passed=True, check_name="no_annotation") + # no_annotation_blocking_asset = build_asset_with_blocking_check( + # asset_def=to_check, checks=[no_annotation] + # ) + # execute_assets_and_checks(assets=[no_annotation_blocking_asset]) + + @asset_check(asset=to_check) + def asset_annotation(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + return AssetCheckResult(passed=True, check_name="asset_annotation") + + asset_annotation_blocking_asset = build_asset_with_blocking_check( + asset_def=to_check, checks=[asset_annotation] + ) + execute_assets_and_checks(assets=[asset_annotation_blocking_asset]) + + @asset_check(asset=to_check) + def op_annotation(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + return AssetCheckResult(passed=True, check_name="op_annotation") + + op_annotation_blocking_asset = build_asset_with_blocking_check( + asset_def=to_check, checks=[op_annotation] + ) + execute_assets_and_checks(assets=[op_annotation_blocking_asset]) + + def test_error_on_invalid_context_annotation(): with pytest.raises( DagsterInvalidDefinitionError,