diff --git a/python_modules/dagster/dagster/_core/definitions/op_invocation.py b/python_modules/dagster/dagster/_core/definitions/op_invocation.py index 7b38150ad329d..3ea9385253066 100644 --- a/python_modules/dagster/dagster/_core/definitions/op_invocation.py +++ b/python_modules/dagster/dagster/_core/definitions/op_invocation.py @@ -219,7 +219,11 @@ def direct_invocation_result( resource_args=resource_arg_mapping, ) - return _type_check_output_wrapper(op_def, result, bound_context) + res = _type_check_output_wrapper(op_def, result, bound_context) + + bound_context.unbind() + + return res def _resolve_inputs( diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 44fab07575c8d..b20b7c2e800c8 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -84,6 +84,7 @@ def __init__( from dagster._core.execution.context_creation_job import initialize_console_manager self._op_config = op_config + self._init_op_config = op_config # so we can unbind the context back to it's original state self._mapping_key = mapping_key self._exit_stack = ExitStack() @@ -102,6 +103,9 @@ def __init__( resource_config=resources_config, ) ) + self._init_resources = ( + self._resources + ) # so we can unbind the context back to it's original state self._resources_contain_cm = isinstance(self._resources, IContainsGenerator) self._log = initialize_console_manager(None) @@ -155,6 +159,22 @@ def bind( assets_def: Optional[AssetsDefinition], config_from_args: Optional[Mapping[str, Any]], resources_from_args: Optional[Mapping[str, Any]], + ) -> "DirectInvocationOpExecutionContext": + return self.bind_self( + op_def=op_def, + pending_invocation=pending_invocation, + assets_def=assets_def, + config_from_args=config_from_args, + resources_from_args=resources_from_args, + ) + + def bind_copy( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], ) -> "DirectInvocationOpExecutionContext": from dagster._core.definitions.resource_invocation import resolve_bound_config @@ -240,13 +260,107 @@ def bind( raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") bound_ctx._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) # noqa: SLF001 - bound_ctx._seen_outputs = {} # noqa: SLF001 bound_ctx._requires_typed_event_stream = False # noqa: SLF001 bound_ctx._typed_event_stream_error_message = None # noqa: SLF001 bound_ctx._bound = True # noqa: SLF001 return bound_ctx + def bind_self( + self, + op_def: OpDefinition, + pending_invocation: Optional[PendingNodeInvocation[OpDefinition]], + assets_def: Optional[AssetsDefinition], + config_from_args: Optional[Mapping[str, Any]], + resources_from_args: Optional[Mapping[str, Any]], + ) -> "DirectInvocationOpExecutionContext": + from dagster._core.definitions.resource_invocation import resolve_bound_config + + if self._bound: + raise DagsterInvalidInvocationError( + "Cannot call bind() on a DirectInvocationOpExecutionContext that has already been bound." + ) + + # update the bound context with properties relevant to the execution of the op + self._op_def = op_def + + invocation_tags = ( + pending_invocation.tags + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + self._tags = ( + merge_dicts(self._op_def.tags, invocation_tags) + if invocation_tags + else self._op_def.tags + ) + + self._hook_defs = ( + pending_invocation.hook_defs + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + invocation_alias = ( + pending_invocation.given_alias + if isinstance(pending_invocation, PendingNodeInvocation) + else None + ) + self._alias = invocation_alias if invocation_alias else self._op_def.name + + self._assets_def = assets_def + + if resources_from_args: + if self._resource_defs: + raise DagsterInvalidInvocationError( + "Cannot provide resources in both context and kwargs" + ) + resource_defs = wrap_resources_for_execution(resources_from_args) + # add new resources context to the stack to be cleared on exit + self._resources = self._exit_stack.enter_context( + build_resources(resource_defs, self.instance) + ) + elif assets_def and assets_def.resource_defs: + for key in sorted(list(assets_def.resource_defs.keys())): + if key in self._resource_defs: + raise DagsterInvalidInvocationError( + f"Error when invoking {assets_def!s} resource '{key}' " + "provided on both the definition and invocation context. Please " + "provide on only one or the other." + ) + resource_defs = wrap_resources_for_execution( + {**self._resource_defs, **assets_def.resource_defs} + ) + # add new resources context to the stack to be cleared on exit + self._resources = self._exit_stack.enter_context( + build_resources(resource_defs, self.instance, self._resources_config) + ) + else: + self._resources = self.resources + resource_defs = self._resource_defs + + _validate_resource_requirements(resource_defs, op_def) + + if self.op_config and config_from_args: + raise DagsterInvalidInvocationError("Cannot provide config in both context and kwargs") + self._op_config = resolve_bound_config(config_from_args or self.op_config, op_def) + + self._requires_typed_event_stream = False + self._typed_event_stream_error_message = None + self._bound = True + + return self + + def unbind(self): + self._op_def = None + self._alias = None + self._hook_defs = None + self._tags = {} + self._seen_outputs = {} + self._resources = self._init_resources + self._op_config = self._init_op_config + + self._bound = False + @property def op_config(self) -> Any: return self._op_config @@ -257,7 +371,8 @@ def resource_keys(self) -> AbstractSet[str]: @property def resources(self) -> Resources: - # TODO - figure out with context manager stuff + if self._bound: + return self._resources if self._resources_contain_cm and not self._cm_scope_entered: raise DagsterInvariantViolationError( "At least one provided resource is a generator, but attempting to access "