Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pythonic resources] Add support for Pythonic resource classes to @configured #18079

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions python_modules/dagster/dagster/_core/definitions/configurable.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,20 @@ def _check_configurable_param(configurable: ConfigurableDefinition) -> None:
" the `tag` or `alias` methods. For usage examples, see"
" https://docs.dagster.io/concepts/configuration/configured",
)
check.inst_param(
configurable,
"configurable",
ConfigurableDefinition,
"Only the following types can be used with the `configured` method: ResourceDefinition,"
" ExecutorDefinition, GraphDefinition, NodeDefinition, and LoggerDefinition."
" For usage examples of `configured`, see"
" https://docs.dagster.io/concepts/configuration/configured",
)
from dagster._config.pythonic_config import ConfigurableResourceFactory, safe_is_subclass

if safe_is_subclass(configurable, ConfigurableResourceFactory):
return
else:
check.inst_param(
configurable,
"configurable",
ConfigurableDefinition,
"Only the following types can be used with the `configured` method: ResourceDefinition,"
" ExecutorDefinition, GraphDefinition, NodeDefinition, and LoggerDefinition."
" For usage examples of `configured`, see"
" https://docs.dagster.io/concepts/configuration/configured",
)


T_Configurable = TypeVar(
Expand Down Expand Up @@ -312,6 +317,22 @@ def dev_s3(config):
"""
_check_configurable_param(configurable)

from dagster._config.pythonic_config import ConfigurableResourceFactory, safe_is_subclass
from dagster._core.definitions.resource_definition import ResourceDefinition

# we specially handle ConfigurableResources, treating it as @configured of the
# underlying resource definition (which is indeed a ConfigurableDefinition)
if safe_is_subclass(configurable, ConfigurableResourceFactory):
configurable_inner = cast(
ResourceDefinition,
( # type: ignore
cast(Type[ConfigurableResourceFactory], configurable)
.configure_at_launch()
.get_resource_definition()
),
)
return configured(configurable_inner, config_schema=config_schema, **kwargs) # type: ignore

if isinstance(configurable, NamedConfigurableDefinition):

def _configured(config_or_config_fn: object) -> T_Configurable:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from typing import Any, Dict

import pytest
from dagster import Config, ConfigurableResource, InitResourceContext, configured, job, op, resource
from dagster._check import CheckError


def test_config_mapping_return_resource_config_dict_noargs() -> None:
class MyResource(ConfigurableResource):
resource_param: str

@configured(MyResource)
def my_resource_noargs(_) -> Dict[str, Any]:
return {"resource_param": "foo"}

@op
def do_something(my_resource: ConfigurableResource) -> str:
return my_resource.resource_param

@job
def do_it_all() -> None:
do_something()

result = do_it_all.execute_in_process(resources={"my_resource": my_resource_noargs})
assert result.success
assert result.output_for_node("do_something") == "foo"


def test_config_mapping_return_resource_config_dict() -> None:
class MyResource(ConfigurableResource):
resource_param: str

@resource(config_schema={"resource_param": str})
def my_resource_legacy(context: InitResourceContext) -> MyResource:
return MyResource(resource_param=context.resource_config["resource_param"])

@configured(my_resource_legacy, config_schema={"simplified_param": str})
def my_resource_legacy_simplified(config_in) -> Dict[str, Any]:
return {"resource_param": config_in["simplified_param"]}

@op
def do_something(my_resource: ConfigurableResource) -> str:
return my_resource.resource_param

@job
def do_it_all() -> None:
do_something()

result = do_it_all.execute_in_process(
resources={
"my_resource": my_resource_legacy_simplified.configured({"simplified_param": "foo"})
}
)
assert result.success
assert result.output_for_node("do_something") == "foo"

class MyResourceSimplifiedConfig(Config):
simplified_param: str

# New, fancy config mapping takes in a Pythonic config object but returns normal config dict
@configured(MyResource)
def my_resource_simplified(config_in: MyResourceSimplifiedConfig) -> Dict[str, Any]:
return {"resource_param": config_in.simplified_param}

result = do_it_all.execute_in_process(
resources={"my_resource": my_resource_simplified.configured({"simplified_param": "foo"})}
)
assert result.success
assert result.output_for_node("do_something") == "foo"


def test_config_mapping_return_resource_object() -> None:
class MyResource(ConfigurableResource):
resource_param: str

@op
def do_something(my_resource: ConfigurableResource) -> str:
return my_resource.resource_param

@job
def do_it_all() -> None:
do_something()

class MyResourceSimplifiedConfig(Config):
simplified_param: str

# New, fancy config mapping takes in a Pythonic config object and returns a constructed resource
@configured(MyResource)
def my_resource_simplified(config_in: MyResourceSimplifiedConfig) -> MyResource:
return MyResource(resource_param=config_in.simplified_param)

result = do_it_all.execute_in_process(
resources={"my_resource": my_resource_simplified.configured({"simplified_param": "foo"})}
)
assert result.success
assert result.output_for_node("do_something") == "foo"


def test_config_annotation_no_config_schema_err() -> None:
class MyResource(ConfigurableResource):
resource_param: str

class MyResourceSimplifiedConfig(Config):
simplified_param: str

# Ensure that we error if we try to provide a config_schema to a @configured function
# which has a Config-annotated param - no need to provide a config_schema in this case
with pytest.raises(
CheckError,
match="Cannot provide config_schema to @configured function with Config-annotated param",
):

@configured(MyResource, config_schema={"simplified_param": str})
def my_resource_simplified(config_in: MyResourceSimplifiedConfig):
...


def test_config_annotation_extra_param_err() -> None:
class MyResource(ConfigurableResource):
resource_param: str

class MyResourceSimplifiedConfig(Config):
simplified_param: str

# Ensure that we error if the @configured function has an extra param
with pytest.raises(
CheckError,
match="@configured function should have exactly one parameter",
):

@configured(MyResource)
def my_resource_simplified(config_in: MyResourceSimplifiedConfig, useless_param: str):
...


def test_factory_resource_pattern_noargs() -> None:
class MyResource(ConfigurableResource):
resource_param: str

class MyResourceNoargs(ConfigurableResource):
def create_resource(self, context: InitResourceContext) -> Any:
return MyResource(resource_param="foo")

@op
def do_something(my_resource: ConfigurableResource) -> str:
return my_resource.resource_param

@job
def do_it_all() -> None:
do_something()

result = do_it_all.execute_in_process(resources={"my_resource": MyResourceNoargs()})
assert result.success
assert result.output_for_node("do_something") == "foo"


def test_factory_resource_pattern_args() -> None:
class MyResource(ConfigurableResource):
resource_param: str

class MyResourceFromInt(ConfigurableResource):
an_int: int

def create_resource(self, context: InitResourceContext) -> Any:
return MyResource(resource_param=str(self.an_int))

@op
def do_something(my_resource: ConfigurableResource) -> str:
return my_resource.resource_param

@job
def do_it_all() -> None:
do_something()

result = do_it_all.execute_in_process(resources={"my_resource": MyResourceFromInt(an_int=10)})
assert result.success
assert result.output_for_node("do_something") == "10"