Skip to content

Commit

Permalink
make asset execution context a subclass of op execution context
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 25, 2023
1 parent 0c2765e commit c355260
Showing 1 changed file with 32 additions and 10 deletions.
42 changes: 32 additions & 10 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC, abstractmethod
from abc import ABC, ABCMeta, abstractmethod
from typing import (
AbstractSet,
Any,
Expand All @@ -12,8 +12,6 @@
cast,
)

from typing_extensions import TypeAlias

import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
Expand Down Expand Up @@ -46,11 +44,19 @@
from dagster._core.log_manager import DagsterLogManager
from dagster._core.storage.dagster_run import DagsterRun
from dagster._utils.forked_pdb import ForkedPdb
from dagster._utils.warnings import (
deprecation_warning,
)

from .system import StepExecutionContext


class AbstractComputeExecutionContext(ABC):
# This metaclass has to exist for OpExecutionContext to have a metaclass
class AbstractComputeMetaclass(ABCMeta):
pass


class AbstractComputeExecutionContext(ABC, metaclass=AbstractComputeMetaclass):
"""Base class for op context implemented by OpExecutionContext and DagstermillExecutionContext."""

@abstractmethod
Expand Down Expand Up @@ -97,7 +103,25 @@ def op_config(self) -> Any:
"""The parsed config specific to this op."""


class OpExecutionContext(AbstractComputeExecutionContext):
class OpExecutionContextMetaClass(AbstractComputeMetaclass):
def __instancecheck__(cls, instance) -> bool:
# This makes isinstance(context, OpExecutionContext) throw a deprecation warning when
# context is an AssetExecutionContext. This metaclass can be deleted once AssetExecutionContext
# has been split into it's own class in 1.7.0
if type(instance) is AssetExecutionContext:
deprecation_warning(
subject="AssetExecutionContext",
additional_warn_text=(
"Starting in version 1.7.0 AssetExecutionContext will no longer be a subclass"
" of OpExecutionContext."
),
breaking_version="1.7.0",
stacklevel=1,
)
return super().__instancecheck__(instance)


class OpExecutionContext(AbstractComputeExecutionContext, metaclass=OpExecutionContextMetaClass):
"""The ``context`` object that can be made available as the first argument to the function
used for computing an op or asset.
Expand Down Expand Up @@ -1236,8 +1260,6 @@ def set_requires_typed_event_stream(self, *, error_message: Optional[str] = None
self._step_execution_context.set_requires_typed_event_stream(error_message=error_message)


# actually forking the object type for assets is tricky for users in the cases of:
# * manually constructing ops to make AssetsDefinitions
# * having ops in a graph that form a graph backed asset
# so we have a single type that users can call by their preferred name where appropriate
AssetExecutionContext: TypeAlias = OpExecutionContext
class AssetExecutionContext(OpExecutionContext):
def __init__(self, step_execution_context: StepExecutionContext):
super().__init__(step_execution_context=step_execution_context)

0 comments on commit c355260

Please sign in to comment.