Skip to content

Commit

Permalink
[pythonic resources][fix] fix setup/teardown behavior w/ nested pytho…
Browse files Browse the repository at this point in the history
…nic resources
  • Loading branch information
benpankow committed Feb 15, 2024
1 parent 069b7d0 commit 51f761f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 22 deletions.
48 changes: 26 additions & 22 deletions python_modules/dagster/dagster/_config/pythonic_config/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def _is_cm_resource_cls(cls: Type["ConfigurableResourceFactory"]) -> bool:
return (
cls.yield_for_execution != ConfigurableResourceFactory.yield_for_execution
or cls.teardown_after_execution != ConfigurableResourceFactory.teardown_after_execution
or len(_get_resource_param_fields(cls)) > 0
)

@property
Expand Down Expand Up @@ -920,36 +921,39 @@ class ResourceDataWithAnnotation(NamedTuple):
annotation_metadata: List[str]


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}
data_with_annotation: List[ResourceDataWithAnnotation] = [
# No longer exists in Pydantic 2.x, will need to be updated when we upgrade
ResourceDataWithAnnotation(
key=field_name,
value=field_value,
annotation=fields_by_resolved_field_name[field_name].annotation,
annotation_metadata=fields_by_resolved_field_name[field_name].metadata,
)
for field_name, field_value in data.items()
if field_name in fields_by_resolved_field_name
]
def _get_resource_param_fields(cls: Type[BaseModel]) -> Set[str]:
"""Returns the set of field names in a structured config class which are annotated as resource types."""
# We need to grab metadata from the annotation in order to tell if
# this key was annotated with a typing.Annotated annotation (which we use for resource/resource deps),
# since Pydantic 2.0 strips that info out and sticks any Annotated metadata in the
# metadata field
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}

return {
field_name
for field_name in fields_by_resolved_field_name
if _is_annotated_as_resource_type(
fields_by_resolved_field_name[field_name].annotation,
fields_by_resolved_field_name[field_name].metadata,
)
}


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
nested_resource_field_names = _get_resource_param_fields(cls)

resources = {}
non_resources = {}
for d in data_with_annotation:
if _is_annotated_as_resource_type(d.annotation, d.annotation_metadata):
resources[d.key] = d.value
for field_name, field_value in data.items():
if field_name in nested_resource_field_names:
resources[field_name] = field_value
else:
non_resources[d.key] = d.value
non_resources[field_name] = field_value

out = SeparatedResourceParams(
resources=resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,3 +869,73 @@ def my_asset(main: MainResource):
)
assert executed["my_asset"] == "bar"
executed.clear()


def test_nested_resource_setup_teardown_inner() -> None:
log = []

class MyBoringOuterResource(ConfigurableResource):
more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"]

class SetupTeardownInnerResource(ConfigurableResource):
def setup_for_execution(self, context: InitResourceContext) -> None:
log.append("SetupTeardownInnerResource setup_for_execution")

def teardown_after_execution(self, context: InitResourceContext) -> None:
log.append("SetupTeardownInnerResource teardown_after_execution")

@asset
def my_asset(outer: MyBoringOuterResource) -> str:
log.append("my_asset")
return "foo"

defs = Definitions(
assets=[my_asset],
resources={
"outer": MyBoringOuterResource(
more_interesting_inner_resource=SetupTeardownInnerResource()
),
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert log == [
"SetupTeardownInnerResource setup_for_execution",
"my_asset",
"SetupTeardownInnerResource teardown_after_execution",
]


def test_nested_resource_yield_inner() -> None:
log = []

class MyBoringOuterResource(ConfigurableResource):
more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"]

class SetupTeardownInnerResource(ConfigurableResource):
@contextlib.contextmanager
def yield_for_execution(self, context: InitResourceContext):
log.append("SetupTeardownInnerResource yield_for_execution")
yield self
log.append("SetupTeardownInnerResource yield_for_execution done")

@asset
def my_asset(outer: MyBoringOuterResource) -> str:
log.append("my_asset")
return "foo"

defs = Definitions(
assets=[my_asset],
resources={
"outer": MyBoringOuterResource(
more_interesting_inner_resource=SetupTeardownInnerResource()
),
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert log == [
"SetupTeardownInnerResource yield_for_execution",
"my_asset",
"SetupTeardownInnerResource yield_for_execution done",
]

0 comments on commit 51f761f

Please sign in to comment.