Skip to content

Commit

Permalink
Add DualStateInstanceContainer to handle instance management in dual …
Browse files Browse the repository at this point in the history
…state contexts
  • Loading branch information
schrockn committed Aug 21, 2023
1 parent 656a8ea commit a93b8ea
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from contextlib import ExitStack
from typing import Any, Mapping
from typing import Any, Mapping, Optional

from dagster._core.definitions.scoped_resources_builder import IContainsGenerator, Resources
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import build_resources, wrap_resources_for_execution
from dagster._core.instance import DagsterInstance


class DualStateContextResourcesContainer:
Expand Down Expand Up @@ -44,3 +45,17 @@ def get_resources(self, fn_name_for_err_msg: str) -> Resources:
f"open a context manager: `with {fn_name_for_err_msg}(...) as context:`"
)
return self._resources


class DualStateInstanceContainer:
def __init__(self, instance: Optional[DagsterInstance]):
from dagster._core.execution.api import ephemeral_instance_if_missing

self._exit_stack = ExitStack()
self.instance = self._exit_stack.enter_context(ephemeral_instance_if_missing(instance))

def call_on_exit(self) -> None:
self._exit_stack.close()

def call_on_del(self) -> None:
self._exit_stack.close()
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from contextlib import ExitStack, contextmanager
from contextlib import contextmanager
from typing import (
AbstractSet,
Any,
Expand Down Expand Up @@ -57,7 +57,7 @@
from dagster._utils.merger import merge_dicts

from .compute import OpExecutionContext
from .dual_state_context import DualStateContextResourcesContainer
from .dual_state_context import DualStateContextResourcesContainer, DualStateInstanceContainer
from .system import StepExecutionContext, TypeCheckContext


Expand All @@ -83,16 +83,12 @@ def __init__(
mapping_key: Optional[str],
assets_def: Optional[AssetsDefinition],
):
from dagster._core.execution.api import ephemeral_instance_if_missing
from dagster._core.execution.context_creation_job import initialize_console_manager

self._op_config = op_config
self._mapping_key = mapping_key

self._exit_stack = ExitStack()

# Construct ephemeral instance if missing
self._instance = self._exit_stack.enter_context(ephemeral_instance_if_missing(instance))
self._instance_container = DualStateInstanceContainer(instance)

self._resources_config = resources_config
self._resources_container = DualStateContextResourcesContainer(resources_dict)
Expand All @@ -116,11 +112,11 @@ def __enter__(self) -> "UnboundOpExecutionContext":

def __exit__(self, *exc) -> None:
self._resources_container.call_on_exit()
self._exit_stack.close()
self._instance_container.call_on_exit()

def __del__(self) -> None:
self._resources_container.call_on_del()
self._exit_stack.close()
self._instance_container.call_on_del()

@property
def op_config(self) -> Any:
Expand All @@ -144,7 +140,7 @@ def dagster_run(self) -> DagsterRun:

@property
def instance(self) -> DagsterInstance:
return self._instance
return self._instance_container.instance

@property
def pdb(self) -> ForkedPdb:
Expand Down

0 comments on commit a93b8ea

Please sign in to comment.