Skip to content

Commit

Permalink
with unbind
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 2, 2023
1 parent c9078de commit 1387482
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,11 @@ def direct_invocation_result(
resource_args=resource_arg_mapping,
)

return _type_check_output_wrapper(op_def, result, bound_context)
res = _type_check_output_wrapper(op_def, result, bound_context)

bound_context.unbind()

return res


def _resolve_inputs(
Expand Down
119 changes: 117 additions & 2 deletions python_modules/dagster/dagster/_core/execution/context/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def __init__(
from dagster._core.execution.context_creation_job import initialize_console_manager

self._op_config = op_config
self._init_op_config = op_config # so we can unbind the context back to it's original state
self._mapping_key = mapping_key

self._exit_stack = ExitStack()
Expand All @@ -102,6 +103,9 @@ def __init__(
resource_config=resources_config,
)
)
self._init_resources = (
self._resources
) # so we can unbind the context back to it's original state
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)

self._log = initialize_console_manager(None)
Expand Down Expand Up @@ -155,6 +159,22 @@ def bind(
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationOpExecutionContext":
return self.bind_self(
op_def=op_def,
pending_invocation=pending_invocation,
assets_def=assets_def,
config_from_args=config_from_args,
resources_from_args=resources_from_args,
)

def bind_copy(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationOpExecutionContext":
from dagster._core.definitions.resource_invocation import resolve_bound_config

Expand Down Expand Up @@ -240,13 +260,107 @@ def bind(
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
bound_ctx._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) # noqa: SLF001

bound_ctx._seen_outputs = {} # noqa: SLF001
bound_ctx._requires_typed_event_stream = False # noqa: SLF001
bound_ctx._typed_event_stream_error_message = None # noqa: SLF001
bound_ctx._bound = True # noqa: SLF001

return bound_ctx

def bind_self(
self,
op_def: OpDefinition,
pending_invocation: Optional[PendingNodeInvocation[OpDefinition]],
assets_def: Optional[AssetsDefinition],
config_from_args: Optional[Mapping[str, Any]],
resources_from_args: Optional[Mapping[str, Any]],
) -> "DirectInvocationOpExecutionContext":
from dagster._core.definitions.resource_invocation import resolve_bound_config

if self._bound:
raise DagsterInvalidInvocationError(
"Cannot call bind() on a DirectInvocationOpExecutionContext that has already been bound."
)

# update the bound context with properties relevant to the execution of the op
self._op_def = op_def

invocation_tags = (
pending_invocation.tags
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
self._tags = (
merge_dicts(self._op_def.tags, invocation_tags)
if invocation_tags
else self._op_def.tags
)

self._hook_defs = (
pending_invocation.hook_defs
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
invocation_alias = (
pending_invocation.given_alias
if isinstance(pending_invocation, PendingNodeInvocation)
else None
)
self._alias = invocation_alias if invocation_alias else self._op_def.name

self._assets_def = assets_def

if resources_from_args:
if self._resource_defs:
raise DagsterInvalidInvocationError(
"Cannot provide resources in both context and kwargs"
)
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
self._resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance)
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
if key in self._resource_defs:
raise DagsterInvalidInvocationError(
f"Error when invoking {assets_def!s} resource '{key}' "
"provided on both the definition and invocation context. Please "
"provide on only one or the other."
)
resource_defs = wrap_resources_for_execution(
{**self._resource_defs, **assets_def.resource_defs}
)
# add new resources context to the stack to be cleared on exit
self._resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance, self._resources_config)
)
else:
self._resources = self.resources
resource_defs = self._resource_defs

_validate_resource_requirements(resource_defs, op_def)

if self.op_config and config_from_args:
raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs")
self._op_config = resolve_bound_config(config_from_args or self.op_config, op_def)

self._requires_typed_event_stream = False
self._typed_event_stream_error_message = None
self._bound = True

return self

def unbind(self):
self._op_def = None
self._alias = None
self._hook_defs = None
self._tags = {}
self._seen_outputs = {}
self._resources = self._init_resources
self._op_config = self._init_op_config

self._bound = False

@property
def op_config(self) -> Any:
return self._op_config
Expand All @@ -257,7 +371,8 @@ def resource_keys(self) -> AbstractSet[str]:

@property
def resources(self) -> Resources:
# TODO - figure out with context manager stuff
if self._bound:
return self._resources
if self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
Expand Down

0 comments on commit 1387482

Please sign in to comment.