Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use DualStateContextResourcesContainer in OutputContext #15998

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 12 additions & 39 deletions python_modules/dagster/dagster/_core/execution/context/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import (
TYPE_CHECKING,
Any,
ContextManager,
Iterator,
List,
Mapping,
Expand All @@ -29,6 +28,7 @@
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.errors import DagsterInvalidMetadata, DagsterInvariantViolationError
from dagster._core.execution.context.dual_state_context import DualStateContextResourcesContainer
from dagster._core.execution.plan.utils import build_resources_for_manager

if TYPE_CHECKING:
Expand Down Expand Up @@ -79,10 +79,8 @@ def handle_output(self, context: OutputContext, obj):
_step_context: Optional["StepExecutionContext"]
_asset_info: Optional[AssetOutputInfo]
_warn_on_step_context_use: bool
_resources: Optional["Resources"]
_resources_cm: Optional[ContextManager["Resources"]]
_resources_contain_cm: Optional[bool]
_cm_scope_entered: Optional[bool]
_explicit_resources_provided: bool
_resources_container: DualStateContextResourcesContainer
_events: List["DagsterEvent"]
_user_events: List[Union[AssetMaterialization, AssetObservation]]

Expand All @@ -106,9 +104,6 @@ def __init__(
warn_on_step_context_use: bool = False,
partition_key: Optional[str] = None,
):
from dagster._core.definitions.resource_definition import IContainsGenerator, Resources
from dagster._core.execution.build_resources import build_resources

self._step_key = step_key
self._name = name
self._job_name = job_name
Expand All @@ -129,38 +124,22 @@ def __init__(
else:
self._partition_key = partition_key

if isinstance(resources, Resources):
self._resources_cm = None
self._resources = resources
else:
self._resources_cm = build_resources(
check.opt_mapping_param(resources, "resources", key_type=str)
)
self._resources = self._resources_cm.__enter__()
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
self._cm_scope_entered = False
self._explicit_resources_provided = bool(resources)
self._resources_container = DualStateContextResourcesContainer(resources)

self._events = []
self._user_events = []
self._user_generated_metadata = {}

def __enter__(self):
if self._resources_cm:
self._cm_scope_entered = True
def __enter__(self) -> "OutputContext":
self._resources_container.call_on_enter()
return self

def __exit__(self, *exc):
if self._resources_cm:
self._resources_cm.__exit__(*exc)
def __exit__(self, *exc) -> None:
self._resources_container.call_on_exit()

def __del__(self):
if (
hasattr(self, "_resources_cm")
and self._resources_cm
and self._resources_contain_cm
and not self._cm_scope_entered
):
self._resources_cm.__exit__(None, None, None)
self._resources_container.call_on_del()

@public
@property
Expand Down Expand Up @@ -284,19 +263,13 @@ def resources(self) -> Any:
"""The resources required by the output manager, specified by the `required_resource_keys`
parameter.
"""
if self._resources is None:
if not self._explicit_resources_provided:
raise DagsterInvariantViolationError(
"Attempting to access resources, "
"but it was not provided when constructing the OutputContext"
)

if self._resources_cm and self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
"open a context manager: `with build_output_context(...) as context:`"
)
return self._resources
return self._resources_container.get_resources("build_output_context")

@property
def asset_info(self) -> Optional[AssetOutputInfo]:
Expand Down