From 670b36248a2842a51032aa81f5ad3b4b5955e313 Mon Sep 17 00:00:00 2001 From: benpankow Date: Thu, 16 Nov 2023 11:48:01 -0800 Subject: [PATCH 1/4] [pythonic resources] Add support for Pythonic resource classes to @configured --- .../dagster/_core/definitions/configurable.py | 15 +++ .../pythonic_resources/test_configured.py | 112 ++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py diff --git a/python_modules/dagster/dagster/_core/definitions/configurable.py b/python_modules/dagster/dagster/_core/definitions/configurable.py index e6f92c010f705..2dfaa5660ec0d 100644 --- a/python_modules/dagster/dagster/_core/definitions/configurable.py +++ b/python_modules/dagster/dagster/_core/definitions/configurable.py @@ -173,6 +173,11 @@ def _check_configurable_param(configurable: ConfigurableDefinition) -> None: " the `tag` or `alias` methods. For usage examples, see" " https://docs.dagster.io/concepts/configuration/configured", ) + from dagster._config.pythonic_config import ConfigurableResourceFactory, safe_is_subclass + + if safe_is_subclass(configurable, ConfigurableResourceFactory): + return + check.inst_param( configurable, "configurable", @@ -312,6 +317,16 @@ def dev_s3(config): """ _check_configurable_param(configurable) + from dagster._config.pythonic_config import ConfigurableResourceFactory, safe_is_subclass + + if safe_is_subclass(configurable, ConfigurableResourceFactory): + configurable_inner = ( + cast(Type[ConfigurableResourceFactory], configurable) + .configure_at_launch() + .get_resource_definition() + ) + return configured(configurable_inner, config_schema=config_schema, **kwargs) + if isinstance(configurable, NamedConfigurableDefinition): def _configured(config_or_config_fn: object) -> T_Configurable: diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py new file mode 100644 index 0000000000000..13bae93968550 --- /dev/null +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py @@ -0,0 +1,112 @@ +from typing import Any, Dict + +import pytest +from dagster import Config, ConfigurableResource, InitResourceContext, configured, job, op, resource +from dagster._check import CheckError + + +def test_config_mapping_return_resource_config_dict() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + @resource(config_schema={"resource_param": str}) + def my_resource_legacy(context: InitResourceContext) -> MyResource: + return MyResource(resource_param=context.resource_config["resource_param"]) + + @configured(my_resource_legacy, config_schema={"simplified_param": str}) + def my_resource_legacy_simplified(config_in) -> Dict[str, Any]: + return {"resource_param": config_in["simplified_param"]} + + @op + def do_something(my_resource: ConfigurableResource) -> str: + return my_resource.resource_param + + @job + def do_it_all() -> None: + do_something() + + result = do_it_all.execute_in_process( + resources={ + "my_resource": my_resource_legacy_simplified.configured({"simplified_param": "foo"}) + } + ) + assert result.success + assert result.output_for_node("do_something") == "foo" + + class MyResourceSimplifiedConfig(Config): + simplified_param: str + + # New, fancy config mapping takes in a Pythonic config object but returns normal config dict + @configured(MyResource) + def my_resource_simplified(config_in: MyResourceSimplifiedConfig) -> Dict[str, Any]: + return {"resource_param": config_in.simplified_param} + + result = do_it_all.execute_in_process( + resources={"my_resource": my_resource_simplified.configured({"simplified_param": "foo"})} + ) + assert result.success + assert result.output_for_node("do_something") == "foo" + + +def test_config_mapping_return_resource_object() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + @op + def do_something(my_resource: ConfigurableResource) -> str: + return my_resource.resource_param + + @job + def do_it_all() -> None: + do_something() + + class MyResourceSimplifiedConfig(Config): + simplified_param: str + + # New, fancy config mapping takes in a Pythonic config object and returns a constructed resource + @configured(MyResource) + def my_resource_simplified(config_in: MyResourceSimplifiedConfig) -> MyResource: + return MyResource(resource_param=config_in.simplified_param) + + result = do_it_all.execute_in_process( + resources={"my_resource": my_resource_simplified.configured({"simplified_param": "foo"})} + ) + assert result.success + assert result.output_for_node("do_something") == "foo" + + +def test_config_annotation_no_config_schema_err() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + class MyResourceSimplifiedConfig(Config): + simplified_param: str + + # Ensure that we error if we try to provide a config_schema to a @configured function + # which has a Config-annotated param - no need to provide a config_schema in this case + with pytest.raises( + CheckError, + match="Cannot provide config_schema to @configured function with Config-annotated param", + ): + + @configured(MyResource, config_schema={"simplified_param": str}) + def my_resource_simplified(config_in: MyResourceSimplifiedConfig): + ... + + +def test_config_annotation_extra_param_err() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + class MyResourceSimplifiedConfig(Config): + simplified_param: str + + # Ensure that we error if the @configured function has an extra param + with pytest.raises( + CheckError, + match="@configured function should have exactly one parameter", + ): + + @configured(MyResource) + def my_resource_simplified(config_in: MyResourceSimplifiedConfig, useless_param: str): + ... From 4fe6edcbecfd481990da1567234e786d15db07ed Mon Sep 17 00:00:00 2001 From: benpankow Date: Thu, 16 Nov 2023 12:02:59 -0800 Subject: [PATCH 2/4] new tests --- .../pythonic_resources/test_configured.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py index 13bae93968550..8186eb6973c4a 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/pythonic_resources/test_configured.py @@ -5,6 +5,27 @@ from dagster._check import CheckError +def test_config_mapping_return_resource_config_dict_noargs() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + @configured(MyResource) + def my_resource_noargs(_) -> Dict[str, Any]: + return {"resource_param": "foo"} + + @op + def do_something(my_resource: ConfigurableResource) -> str: + return my_resource.resource_param + + @job + def do_it_all() -> None: + do_something() + + result = do_it_all.execute_in_process(resources={"my_resource": my_resource_noargs}) + assert result.success + assert result.output_for_node("do_something") == "foo" + + def test_config_mapping_return_resource_config_dict() -> None: class MyResource(ConfigurableResource): resource_param: str @@ -110,3 +131,47 @@ class MyResourceSimplifiedConfig(Config): @configured(MyResource) def my_resource_simplified(config_in: MyResourceSimplifiedConfig, useless_param: str): ... + + +def test_factory_resource_pattern_noargs() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + class MyResourceNoargs(ConfigurableResource): + def create_resource(self, context: InitResourceContext) -> Any: + return MyResource(resource_param="foo") + + @op + def do_something(my_resource: ConfigurableResource) -> str: + return my_resource.resource_param + + @job + def do_it_all() -> None: + do_something() + + result = do_it_all.execute_in_process(resources={"my_resource": MyResourceNoargs()}) + assert result.success + assert result.output_for_node("do_something") == "foo" + + +def test_factory_resource_pattern_args() -> None: + class MyResource(ConfigurableResource): + resource_param: str + + class MyResourceFromInt(ConfigurableResource): + an_int: int + + def create_resource(self, context: InitResourceContext) -> Any: + return MyResource(resource_param=str(self.an_int)) + + @op + def do_something(my_resource: ConfigurableResource) -> str: + return my_resource.resource_param + + @job + def do_it_all() -> None: + do_something() + + result = do_it_all.execute_in_process(resources={"my_resource": MyResourceFromInt(an_int=10)}) + assert result.success + assert result.output_for_node("do_something") == "10" From d44c77209f1f3d19fa8e8e91601a7d90ecef9665 Mon Sep 17 00:00:00 2001 From: benpankow Date: Thu, 16 Nov 2023 13:16:21 -0800 Subject: [PATCH 3/4] pyright --- .../dagster/_core/definitions/configurable.py | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/configurable.py b/python_modules/dagster/dagster/_core/definitions/configurable.py index 2dfaa5660ec0d..08f0835cbeb88 100644 --- a/python_modules/dagster/dagster/_core/definitions/configurable.py +++ b/python_modules/dagster/dagster/_core/definitions/configurable.py @@ -177,16 +177,16 @@ def _check_configurable_param(configurable: ConfigurableDefinition) -> None: if safe_is_subclass(configurable, ConfigurableResourceFactory): return - - check.inst_param( - configurable, - "configurable", - ConfigurableDefinition, - "Only the following types can be used with the `configured` method: ResourceDefinition," - " ExecutorDefinition, GraphDefinition, NodeDefinition, and LoggerDefinition." - " For usage examples of `configured`, see" - " https://docs.dagster.io/concepts/configuration/configured", - ) + else: + check.inst_param( + configurable, + "configurable", + ConfigurableDefinition, + "Only the following types can be used with the `configured` method: ResourceDefinition," + " ExecutorDefinition, GraphDefinition, NodeDefinition, and LoggerDefinition." + " For usage examples of `configured`, see" + " https://docs.dagster.io/concepts/configuration/configured", + ) T_Configurable = TypeVar( @@ -318,14 +318,20 @@ def dev_s3(config): _check_configurable_param(configurable) from dagster._config.pythonic_config import ConfigurableResourceFactory, safe_is_subclass + from dagster._core.definitions.resource_definition import ResourceDefinition + # we specially handle ConfigurableResources, treating it as @configured of the + # underlying resource definition (which is indeed a ConfigurableDefinition) if safe_is_subclass(configurable, ConfigurableResourceFactory): - configurable_inner = ( - cast(Type[ConfigurableResourceFactory], configurable) - .configure_at_launch() - .get_resource_definition() + configurable_inner = cast( + ResourceDefinition, + ( + cast(Type[ConfigurableResourceFactory], configurable) + .configure_at_launch() + .get_resource_definition() + ), ) - return configured(configurable_inner, config_schema=config_schema, **kwargs) + return configured(configurable_inner, config_schema=config_schema, **kwargs) # type: ignore if isinstance(configurable, NamedConfigurableDefinition): From 77d2bfe2ac5b0a9faec10268b1a61ccbd2adee2b Mon Sep 17 00:00:00 2001 From: benpankow Date: Mon, 27 Nov 2023 08:30:51 -0600 Subject: [PATCH 4/4] type ignore --- .../dagster/dagster/_core/definitions/configurable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/configurable.py b/python_modules/dagster/dagster/_core/definitions/configurable.py index 08f0835cbeb88..d8f2d39a9495e 100644 --- a/python_modules/dagster/dagster/_core/definitions/configurable.py +++ b/python_modules/dagster/dagster/_core/definitions/configurable.py @@ -325,7 +325,7 @@ def dev_s3(config): if safe_is_subclass(configurable, ConfigurableResourceFactory): configurable_inner = cast( ResourceDefinition, - ( + ( # type: ignore cast(Type[ConfigurableResourceFactory], configurable) .configure_at_launch() .get_resource_definition()