From dc412a762df56299dbd5dc007767167abeff48b3 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 16:41:11 -0400 Subject: [PATCH] handle graph assets --- .../definitions/decorators/asset_decorator.py | 22 +------------------ .../_core/definitions/op_definition.py | 10 ++++++--- .../_core/execution/context/compute.py | 11 +++++++--- .../dagster/_core/execution/context/system.py | 4 ++++ .../execution_tests/test_context.py | 10 +++------ 5 files changed, 23 insertions(+), 34 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 16b24dada8ced..5a858d0010d90 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -21,12 +21,8 @@ from dagster._annotations import deprecated_param, experimental_param from dagster._builtins import Nothing from dagster._config import UserConfigSchema -<<<<<<< HEAD -from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations -from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep -======= from dagster._core.decorator_utils import get_function_params ->>>>>>> eed11d9bde (error on bad type annotation at def time) +from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.config import ConfigMapping from dagster._core.definitions.freshness_policy import FreshnessPolicy @@ -330,7 +326,6 @@ def __call__(self, fn: Callable) -> AssetsDefinition: from dagster._core.execution.build_resources import wrap_resources_for_execution validate_resource_annotated_function(fn) - _validate_context_type_hint(fn) asset_name = self.name or fn.__name__ asset_ins = build_asset_ins(fn, self.ins or {}, {dep.asset_key for dep in self.deps}) @@ -1315,18 +1310,3 @@ def _get_partition_mappings_from_deps( ) return partition_mappings - - -def _validate_context_type_hint(fn): - from inspect import _empty as EmptyAnnotation - - from dagster._core.decorator_utils import get_function_params, is_context_provided - from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext - - params = get_function_params(fn) - if is_context_provided(params): - if not isinstance(params[0], (AssetExecutionContext, OpExecutionContext, EmptyAnnotation)): - raise DagsterInvalidDefinitionError( - f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`" - " must be annotated with AssetExecutionContext, OpExecutionContext, or left blank." - ) diff --git a/python_modules/dagster/dagster/_core/definitions/op_definition.py b/python_modules/dagster/dagster/_core/definitions/op_definition.py index 30de8ca6b1574..e4c6f1ecb5a0f 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/op_definition.py @@ -516,12 +516,16 @@ def _validate_context_type_hint(fn): from inspect import _empty as EmptyAnnotation from dagster._core.decorator_utils import get_function_params, is_context_provided - from dagster._core.execution.context.compute import OpExecutionContext + from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext params = get_function_params(fn) if is_context_provided(params): - if not isinstance(params[0], (OpExecutionContext, EmptyAnnotation)): + if ( + params[0].annotation is not AssetExecutionContext + and params[0].annotation is not OpExecutionContext + and params[0].annotation is not EmptyAnnotation + ): raise DagsterInvalidDefinitionError( f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`" - " must be annotated with OpExecutionContext or left blank." + " must be annotated with AssetExecutionContext, OpExecutionContext, or left blank." ) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 186c6fbc69115..418b1f22f24b2 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1726,6 +1726,7 @@ def build_execution_context( """ is_sda_step = step_context.is_sda_step + is_op_in_graph_asset = step_context.is_graph_asset_op context_annotation = EmptyAnnotation compute_fn = step_context.op_def._compute_fn # noqa: SLF001 compute_fn = ( @@ -1740,9 +1741,13 @@ def build_execution_context( op_context = OpExecutionContext(step_context) if context_annotation is EmptyAnnotation: - # if no type hint has been given, default to AssetExecutionContext for sda steps and - # OpExecutionContext for non sda steps - return AssetExecutionContext(op_context) if is_sda_step else op_context + # if no type hint has been given, default to: + # * AssetExecutionContext for sda steps, not in graph-backed assets + # * OpExecutionContext for non sda steps + # * OpExecutionContext for ops in graph-backed assets + if is_op_in_graph_asset or not is_sda_step: + return op_context + return AssetExecutionContext(op_context) if context_annotation is AssetExecutionContext: return AssetExecutionContext(op_context) return op_context diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index e36f176ef9b27..80b1e42de08b7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -864,6 +864,10 @@ def op_config(self) -> Any: op_config = self.resolved_run_config.ops.get(str(self.node_handle)) return op_config.config if op_config else None + @property + def is_graph_asset_op(self) -> bool: + return self.step.node_handle.parent is not None + @property def is_sda_step(self) -> bool: """Whether this step corresponds to a software define asset, inferred by presence of asset info on outputs. diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py index 7b9ad96d8649f..d51c350e6ea00 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_context.py @@ -121,9 +121,7 @@ def test_context_provided_to_graph_asset(): def no_annotation_op(context): assert isinstance(context, OpExecutionContext) # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check - assert not isinstance( - context, AssetExecutionContext - ) # fails, context is an AssetExecutionContext in current impl + assert not isinstance(context, AssetExecutionContext) @graph_asset def no_annotation_asset(): @@ -159,9 +157,7 @@ def test_context_provided_to_graph_multi_asset(): def no_annotation_op(context): assert isinstance(context, OpExecutionContext) # AssetExecutionContext is an instance of OpExecutionContext, so add this additional check - assert not isinstance( - context, AssetExecutionContext - ) # fails, context is an AssetExecutionContext in current impl + assert not isinstance(context, AssetExecutionContext) @graph_multi_asset( outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)} @@ -238,7 +234,7 @@ def op_annotation(context: OpExecutionContext, *args): def test_error_on_invalid_context_annotation(): with pytest.raises( DagsterInvalidDefinitionError, - match="must be annotated with OpExecutionContext or left blank", + match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank", ): @op