diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 77ba9ef72825e..411d2417c5cf7 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC, ABCMeta, abstractmethod from typing import ( AbstractSet, Any, @@ -12,8 +12,6 @@ cast, ) -from typing_extensions import TypeAlias - import dagster._check as check from dagster._annotations import deprecated, experimental, public from dagster._core.definitions.asset_check_spec import AssetCheckSpec @@ -46,11 +44,19 @@ from dagster._core.log_manager import DagsterLogManager from dagster._core.storage.dagster_run import DagsterRun from dagster._utils.forked_pdb import ForkedPdb +from dagster._utils.warnings import ( + deprecation_warning, +) from .system import StepExecutionContext -class AbstractComputeExecutionContext(ABC): +# This metaclass has to exist for OpExecutionContext to have a metaclass +class AbstractComputeMetaclass(ABCMeta): + pass + + +class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass): """Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext.""" @abstractmethod @@ -97,7 +103,25 @@ def op_config(self) -> Any: """The parsed config specific to this op.""" -class OpExecutionContext(AbstractComputeExecutionContext): +class OpExecutionContextMetaClass(AbstractComputeMetaclass): + def __instancecheck__(cls, instance) -> bool: + # This makes isinstance(context, OpExecutionContext) throw a deprecation warning when + # context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext + # has been split into it's own class in 1.7.0 + if type(instance) is AssetExecutionContext: + deprecation_warning( + subject="AssetExecutionContext", + additional_warn_text=( + "Starting in version 1.7.0 AssetExecutionContext will no longer be a subclass" + " of OpExecutionContext." + ), + breaking_version="1.7.0", + stacklevel=1, + ) + return super().__instancecheck__(instance) + + +class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass): """The ``context`` object that can be made available as the first argument to the function used for computing an op or asset. @@ -1236,8 +1260,6 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None self._step_execution_context.set_requires_typed_event_stream(error_message=error_message) -# actually forking the object type for assets is tricky for users in the cases of: -# * manually constructing ops to make AssetsDefinitions -# * having ops in a graph that form a graph backed asset -# so we have a single type that users can call by their preferred name where appropriate -AssetExecutionContext: TypeAlias = OpExecutionContext +class AssetExecutionContext(OpExecutionContext): + def __init__(self, step_execution_context: StepExecutionContext): + super().__init__(step_execution_context=step_execution_context)