From 1d157c64028738adcd83018166a740b290e758fd Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Sep 2023 10:13:20 -0400 Subject: [PATCH] wip add testing --- .../definitions/decorators/op_decorator.py | 5 + .../_core/definitions/op_definition.py | 28 ++- .../execution_tests/test_context.py | 238 +++++++++++++++++- 3 files changed, 269 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py index 5cf4c94be4ef1..085ac668db2d1 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/op_decorator.py @@ -284,6 +284,11 @@ def name(self): def has_context_arg(self) -> bool: return is_context_provided(get_function_params(self.decorated_fn)) + def get_context_arg(self) -> Parameter: + if self.has_context_arg(): + return get_function_params(self.decorated_fn)[0] + check.failed("Requested context arg on function that does not have one") + @lru_cache(maxsize=1) def _get_function_params(self) -> Sequence[Parameter]: return get_function_params(self.decorated_fn) diff --git a/python_modules/dagster/dagster/_core/definitions/op_definition.py b/python_modules/dagster/dagster/_core/definitions/op_definition.py index fb4c1c3dca5da..c75792f6c1d10 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/op_definition.py @@ -30,7 +30,11 @@ OutputManagerRequirement, ResourceRequirement, ) -from dagster._core.errors import DagsterInvalidInvocationError, DagsterInvariantViolationError +from dagster._core.errors import ( + DagsterInvalidDefinitionError, + DagsterInvalidInvocationError, + DagsterInvariantViolationError, +) from dagster._core.types.dagster_type import DagsterType, DagsterTypeKind from dagster._utils import IHasInternalInit from dagster._utils.warnings import normalize_renamed_param @@ -143,9 +147,11 @@ def __init__( exclude_nothing=True, ) self._compute_fn = compute_fn + _validate_context_type_hint(self._compute_fn.decorated_fn) else: resolved_input_defs = input_defs self._compute_fn = check.callable_param(compute_fn, "compute_fn") + _validate_context_type_hint(self._compute_fn) code_version = normalize_renamed_param( code_version, @@ -504,3 +510,23 @@ def _resolve_output_defs_from_outs( ) return output_defs + + +def _validate_context_type_hint(fn): + from inspect import _empty as EmptyAnnotation + + from dagster._core.decorator_utils import get_function_params + from dagster._core.definitions.decorators.op_decorator import is_context_provided + from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext + + params = get_function_params(fn) + if is_context_provided(params): + if ( + params[0].annotation is not AssetExecutionContext + and params[0].annotation is not OpExecutionContext + and params[0].annotation is not EmptyAnnotation + ): + raise DagsterInvalidDefinitionError( + f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`" + " must be annotated with AssetExecutionContext, OpExecutionContext, or left blank." + ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index bd4d523dfa6d5..674314da0ff72 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -1,7 +1,18 @@ import dagster._check as check -from dagster import OpExecutionContext, job, op +import pytest +from dagster import OpExecutionContext, asset, job, materialize, op +from dagster._core.definitions.asset_out import AssetOut +from dagster._core.definitions.decorators.asset_decorator import ( + graph_asset, + graph_multi_asset, + multi_asset, +) +from dagster._core.definitions.events import Output +from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.op_definition import OpDefinition +from dagster._core.errors import DagsterInvalidDefinitionError +from dagster._core.execution.context.compute import AssetExecutionContext from dagster._core.storage.dagster_run import DagsterRun @@ -20,3 +31,228 @@ def foo(): ctx_op() assert foo.execute_in_process().success + + +def test_context_provided_to_asset(): + @asset + def no_annotation(context): + assert isinstance(context, AssetExecutionContext) + + materialize([no_annotation]) + + @asset + def asset_annotation(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + materialize([asset_annotation]) + + @asset + def op_annotation(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + materialize([op_annotation]) + + +def test_context_provided_to_op(): + @op + def no_annotation(context): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @job + def no_annotation_job(): + no_annotation() + + assert no_annotation_job.execute_in_process().success + + @op + def asset_annotation(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + @job + def asset_annotation_job(): + asset_annotation() + + with pytest.raises( + DagsterInvalidDefinitionError, + match="Cannot annotate @op `context` parameter with type AssetExecutionContext", + ): + asset_annotation_job.execute_in_process() + + @op + def op_annotation(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @job + def op_annotation_job(): + op_annotation() + + assert op_annotation_job.execute_in_process().success + + +def test_context_provided_to_multi_asset(): + @multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}) + def no_annotation(context): + assert isinstance(context, AssetExecutionContext) + return None, None + + materialize([no_annotation]) + + @multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}) + def asset_annotation(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + return None, None + + materialize([asset_annotation]) + + @multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}) + def op_annotation(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + return None, None + + materialize([op_annotation]) + + +def test_context_provided_to_graph_asset(): + @op + def no_annotation_op(context): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @graph_asset + def no_annotation_asset(): + return no_annotation_op() + + materialize([no_annotation_asset]) + + @op + def asset_annotation_op(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + @graph_asset + def asset_annotation_asset(): + return asset_annotation_op() + + materialize([asset_annotation_asset]) + + @op + def op_annotation_op(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @graph_asset + def op_annotation_asset(): + return op_annotation_op() + + materialize([op_annotation_asset]) + + +def test_context_provided_to_graph_multi_asset(): + @op + def no_annotation_op(context): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @graph_multi_asset( + outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)} + ) + def no_annotation_asset(): + return no_annotation_op(), no_annotation_op() + + materialize([no_annotation_asset]) + + @op + def asset_annotation_op(context: AssetExecutionContext): + assert isinstance(context, AssetExecutionContext) + + @graph_multi_asset( + outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)} + ) + def asset_annotation_asset(): + return asset_annotation_op(), asset_annotation_op() + + materialize([asset_annotation_asset]) + + @op + def op_annotation_op(context: OpExecutionContext): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + + @graph_multi_asset( + outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)} + ) + def op_annotation_asset(): + return op_annotation_op(), op_annotation_op() + + materialize([op_annotation_asset]) + + +def test_context_provided_to_plain_python(): + # tests a job created using Definitions classes, not decorators + + def no_annotation(context, *args): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + yield Output(1) + + no_annotation_op = OpDefinition(compute_fn=no_annotation, name="no_annotation_op") + no_annotation_graph = GraphDefinition(name="no_annotation_graph", node_defs=[no_annotation_op]) + + no_annotation_graph.to_job(name="no_annotation_job").execute_in_process() + + def asset_annotation(context: AssetExecutionContext, *args): + assert False, "Test should error during context creation" + + asset_annotation_op = OpDefinition(compute_fn=asset_annotation, name="asset_annotation_op") + asset_annotation_graph = GraphDefinition( + name="asset_annotation_graph", node_defs=[asset_annotation_op] + ) + + with pytest.raises( + DagsterInvalidDefinitionError, + match="Cannot annotate @op `context` parameter with type AssetExecutionContext", + ): + asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process() + + def op_annotation(context: OpExecutionContext, *args): + assert isinstance(context, OpExecutionContext) + # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check + assert not isinstance(context, AssetExecutionContext) + yield Output(1) + + op_annotation_op = OpDefinition(compute_fn=op_annotation, name="op_annotation_op") + op_annotation_graph = GraphDefinition(name="op_annotation_graph", node_defs=[op_annotation_op]) + + op_annotation_graph.to_job(name="op_annotation_job").execute_in_process() + + +def test_error_on_invalid_context_annotation(): + with pytest.raises( + DagsterInvalidDefinitionError, + match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank", + ): + + @op + def the_op(context: int): + pass + + with pytest.raises( + DagsterInvalidDefinitionError, + match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank", + ): + + @asset + def the_asset(context: int): + pass