diff --git a/python_modules/dagster/dagster/_core/execution/context/dual_state_context.py b/python_modules/dagster/dagster/_core/execution/context/dual_state_context.py index 3fe5c86ea8c42..e52c41d497f20 100644 --- a/python_modules/dagster/dagster/_core/execution/context/dual_state_context.py +++ b/python_modules/dagster/dagster/_core/execution/context/dual_state_context.py @@ -4,6 +4,7 @@ from dagster._core.definitions.scoped_resources_builder import IContainsGenerator, Resources from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.build_resources import build_resources, wrap_resources_for_execution +from dagster._core.instance import DagsterInstance class DualStateContextResourcesContainer: @@ -50,3 +51,17 @@ def get_resources(self, fn_name_for_err_msg: str) -> Resources: f"open a context manager: `with {fn_name_for_err_msg}(...) as context:`" ) return self._resources + + +class DualStateInstanceContainer: + def __init__(self, instance: Optional[DagsterInstance]): + from dagster._core.execution.api import ephemeral_instance_if_missing + + self._exit_stack = ExitStack() + self.instance = self._exit_stack.enter_context(ephemeral_instance_if_missing(instance)) + + def call_on_exit(self) -> None: + self._exit_stack.close() + + def call_on_del(self) -> None: + self._exit_stack.close() diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 19d9613d2f9b8..43a6d2b9e99af 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -1,4 +1,4 @@ -from contextlib import ExitStack, contextmanager +from contextlib import contextmanager from typing import ( AbstractSet, Any, @@ -57,7 +57,7 @@ from dagster._utils.merger import merge_dicts from .compute import OpExecutionContext -from .dual_state_context import DualStateContextResourcesContainer +from .dual_state_context import DualStateContextResourcesContainer, DualStateInstanceContainer from .system import StepExecutionContext, TypeCheckContext @@ -83,16 +83,12 @@ def __init__( mapping_key: Optional[str], assets_def: Optional[AssetsDefinition], ): - from dagster._core.execution.api import ephemeral_instance_if_missing from dagster._core.execution.context_creation_job import initialize_console_manager self._op_config = op_config self._mapping_key = mapping_key - self._exit_stack = ExitStack() - - # Construct ephemeral instance if missing - self._instance = self._exit_stack.enter_context(ephemeral_instance_if_missing(instance)) + self._instance_container = DualStateInstanceContainer(instance) self._resources_config = resources_config self._resources_container = DualStateContextResourcesContainer( @@ -118,11 +114,11 @@ def __enter__(self) -> "UnboundOpExecutionContext": def __exit__(self, *exc) -> None: self._resources_container.call_on_exit() - self._exit_stack.close() + self._instance_container.call_on_exit() def __del__(self) -> None: self._resources_container.call_on_del() - self._exit_stack.close() + self._instance_container.call_on_del() @property def op_config(self) -> Any: @@ -146,7 +142,7 @@ def dagster_run(self) -> DagsterRun: @property def instance(self) -> DagsterInstance: - return self._instance + return self._instance_container.instance @property def pdb(self) -> ForkedPdb: