Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pythonic resources][fix] fix setup/teardown behavior w/ nested pythonic resources #19852

Merged
merged 2 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions python_modules/dagster/dagster/_config/pythonic_config/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ def _is_cm_resource_cls(cls: Type["ConfigurableResourceFactory"]) -> bool:
return (
cls.yield_for_execution != ConfigurableResourceFactory.yield_for_execution
or cls.teardown_after_execution != ConfigurableResourceFactory.teardown_after_execution
# We assume that any resource which has nested resources needs to be treated as a
# context manager resource, since its nested resources may be context managers
# and need setup and teardown logic
or len(_get_resource_param_fields(cls)) > 0
Copy link
Member

@alangenfeld alangenfeld Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this equate to "if there are any resources as params, assume they are context managers"? Is that the case because they always are in the base class?

edit: not sure my question actually makes sense but i think it points towards putting a comment here explaining whats going on

Copy link
Member Author

@benpankow benpankow Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It basically equates to "if this resource has any other nested resources, let's assume that they're context managers, and treat this resource as a context manager"

This boolean affects a few things:

  • The underlying resource function for the ResourceDefinition created from this ConfigurableResource - it's a CM function if True, not a CM if False (the source of the bug this PR addresses). This is largely opaque to the user, so biasing towards True is low risk.

  • A few utility methods, like from_resource_context - this method is typically used for migrations from function-style resources which construct the Pythonic-style resource. These for the most part shouldn't use nesting:

    class FancyDbResource(ConfigurableResource):
        conn_string: str
    
    @resource(config_schema=FancyDbResource.to_config_schema())
    def fancy_db_resource(context: InitResourceContext) -> FancyDbResource:
        return FancyDbResource.from_resource_context(context)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it - think its worth a comment

)

@property
Expand Down Expand Up @@ -920,36 +924,39 @@ class ResourceDataWithAnnotation(NamedTuple):
annotation_metadata: List[str]


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}
data_with_annotation: List[ResourceDataWithAnnotation] = [
# No longer exists in Pydantic 2.x, will need to be updated when we upgrade
ResourceDataWithAnnotation(
key=field_name,
value=field_value,
annotation=fields_by_resolved_field_name[field_name].annotation,
annotation_metadata=fields_by_resolved_field_name[field_name].metadata,
)
for field_name, field_value in data.items()
if field_name in fields_by_resolved_field_name
]
def _get_resource_param_fields(cls: Type[BaseModel]) -> Set[str]:
"""Returns the set of field names in a structured config class which are annotated as resource types."""
# We need to grab metadata from the annotation in order to tell if
# this key was annotated with a typing.Annotated annotation (which we use for resource/resource deps),
# since Pydantic 2.0 strips that info out and sticks any Annotated metadata in the
# metadata field
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}

return {
field_name
for field_name in fields_by_resolved_field_name
if _is_annotated_as_resource_type(
fields_by_resolved_field_name[field_name].annotation,
fields_by_resolved_field_name[field_name].metadata,
)
}


def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
nested_resource_field_names = _get_resource_param_fields(cls)

resources = {}
non_resources = {}
for d in data_with_annotation:
if _is_annotated_as_resource_type(d.annotation, d.annotation_metadata):
resources[d.key] = d.value
for field_name, field_value in data.items():
if field_name in nested_resource_field_names:
resources[field_name] = field_value
else:
non_resources[d.key] = d.value
non_resources[field_name] = field_value

out = SeparatedResourceParams(
resources=resources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,3 +869,73 @@ def my_asset(main: MainResource):
)
assert executed["my_asset"] == "bar"
executed.clear()


def test_nested_resource_setup_teardown_inner() -> None:
log = []

class MyBoringOuterResource(ConfigurableResource):
more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"]

class SetupTeardownInnerResource(ConfigurableResource):
def setup_for_execution(self, context: InitResourceContext) -> None:
log.append("SetupTeardownInnerResource setup_for_execution")

def teardown_after_execution(self, context: InitResourceContext) -> None:
log.append("SetupTeardownInnerResource teardown_after_execution")

@asset
def my_asset(outer: MyBoringOuterResource) -> str:
log.append("my_asset")
return "foo"

defs = Definitions(
assets=[my_asset],
resources={
"outer": MyBoringOuterResource(
more_interesting_inner_resource=SetupTeardownInnerResource()
),
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert log == [
"SetupTeardownInnerResource setup_for_execution",
"my_asset",
"SetupTeardownInnerResource teardown_after_execution",
]


def test_nested_resource_yield_inner() -> None:
log = []

class MyBoringOuterResource(ConfigurableResource):
more_interesting_inner_resource: ResourceDependency["SetupTeardownInnerResource"]

class SetupTeardownInnerResource(ConfigurableResource):
@contextlib.contextmanager
def yield_for_execution(self, context: InitResourceContext):
log.append("SetupTeardownInnerResource yield_for_execution")
yield self
log.append("SetupTeardownInnerResource yield_for_execution done")

@asset
def my_asset(outer: MyBoringOuterResource) -> str:
log.append("my_asset")
return "foo"

defs = Definitions(
assets=[my_asset],
resources={
"outer": MyBoringOuterResource(
more_interesting_inner_resource=SetupTeardownInnerResource()
),
},
)

assert defs.get_implicit_global_asset_job_def().execute_in_process().success
assert log == [
"SetupTeardownInnerResource yield_for_execution",
"my_asset",
"SetupTeardownInnerResource yield_for_execution done",
]
Loading