diff --git a/python_modules/dagster/dagster/_config/pythonic_config/resource.py b/python_modules/dagster/dagster/_config/pythonic_config/resource.py index 5887989f6071f..422d03e9841db 100644 --- a/python_modules/dagster/dagster/_config/pythonic_config/resource.py +++ b/python_modules/dagster/dagster/_config/pythonic_config/resource.py @@ -399,6 +399,10 @@ def _is_cm_resource_cls(cls: Type["ConfigurableResourceFactory"]) -> bool: return ( cls.yield_for_execution != ConfigurableResourceFactory.yield_for_execution or cls.teardown_after_execution != ConfigurableResourceFactory.teardown_after_execution + # We assume that any resource which has nested resources needs to be treated as a + # context manager resource, since its nested resources may be context managers + # and need setup and teardown logic + or len(_get_resource_param_fields(cls)) > 0 ) @property @@ -920,36 +924,39 @@ class ResourceDataWithAnnotation(NamedTuple): annotation_metadata: List[str] -def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams: - """Separates out the key/value inputs of fields in a structured config Resource class which - are marked as resources (ie, using ResourceDependency) from those which are not. - """ - fields_by_resolved_field_name = { - field.alias if field.alias else key: field for key, field in model_fields(cls).items() - } - data_with_annotation: List[ResourceDataWithAnnotation] = [ - # No longer exists in Pydantic 2.x, will need to be updated when we upgrade - ResourceDataWithAnnotation( - key=field_name, - value=field_value, - annotation=fields_by_resolved_field_name[field_name].annotation, - annotation_metadata=fields_by_resolved_field_name[field_name].metadata, - ) - for field_name, field_value in data.items() - if field_name in fields_by_resolved_field_name - ] +def _get_resource_param_fields(cls: Type[BaseModel]) -> Set[str]: + """Returns the set of field names in a structured config class which are annotated as resource types.""" # We need to grab metadata from the annotation in order to tell if # this key was annotated with a typing.Annotated annotation (which we use for resource/resource deps), # since Pydantic 2.0 strips that info out and sticks any Annotated metadata in the # metadata field + fields_by_resolved_field_name = { + field.alias if field.alias else key: field for key, field in model_fields(cls).items() + } + + return { + field_name + for field_name in fields_by_resolved_field_name + if _is_annotated_as_resource_type( + fields_by_resolved_field_name[field_name].annotation, + fields_by_resolved_field_name[field_name].metadata, + ) + } + + +def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams: + """Separates out the key/value inputs of fields in a structured config Resource class which + are marked as resources (ie, using ResourceDependency) from those which are not. + """ + nested_resource_field_names = _get_resource_param_fields(cls) resources = {} non_resources = {} - for d in data_with_annotation: - if _is_annotated_as_resource_type(d.annotation, d.annotation_metadata): - resources[d.key] = d.value + for field_name, field_value in data.items(): + if field_name in nested_resource_field_names: + resources[field_name] = field_value else: - non_resources[d.key] = d.value + non_resources[field_name] = field_value out = SeparatedResourceParams( resources=resources, diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py index 4d4883bd258d4..9dd17e43fd3ff 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_nesting.py @@ -869,3 +869,73 @@ def my_asset(main: MainResource): ) assert executed["my_asset"] == "bar" executed.clear() + + +def test_nested_resource_setup_teardown_inner() -> None: + log = [] + + class MyBoringOuterResource(ConfigurableResource): + more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"] + + class SetupTeardownInnerResource(ConfigurableResource): + def setup_for_execution(self, context: InitResourceContext) -> None: + log.append("SetupTeardownInnerResource setup_for_execution") + + def teardown_after_execution(self, context: InitResourceContext) -> None: + log.append("SetupTeardownInnerResource teardown_after_execution") + + @asset + def my_asset(outer: MyBoringOuterResource) -> str: + log.append("my_asset") + return "foo" + + defs = Definitions( + assets=[my_asset], + resources={ + "outer": MyBoringOuterResource( + more_interesting_inner_resource=SetupTeardownInnerResource() + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert log == [ + "SetupTeardownInnerResource setup_for_execution", + "my_asset", + "SetupTeardownInnerResource teardown_after_execution", + ] + + +def test_nested_resource_yield_inner() -> None: + log = [] + + class MyBoringOuterResource(ConfigurableResource): + more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"] + + class SetupTeardownInnerResource(ConfigurableResource): + @contextlib.contextmanager + def yield_for_execution(self, context: InitResourceContext): + log.append("SetupTeardownInnerResource yield_for_execution") + yield self + log.append("SetupTeardownInnerResource yield_for_execution done") + + @asset + def my_asset(outer: MyBoringOuterResource) -> str: + log.append("my_asset") + return "foo" + + defs = Definitions( + assets=[my_asset], + resources={ + "outer": MyBoringOuterResource( + more_interesting_inner_resource=SetupTeardownInnerResource() + ), + }, + ) + + assert defs.get_implicit_global_asset_job_def().execute_in_process().success + assert log == [ + "SetupTeardownInnerResource yield_for_execution", + "my_asset", + "SetupTeardownInnerResource yield_for_execution done", + ]