From 4cb1fd67905a0abe8e6952e206ce8cfba8665ae8 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Thu, 6 Jun 2024 04:08:44 -0400 Subject: [PATCH] Extract compute_required_resource_keys (#22228) ## Summary & Motivation We have this logic that handles computing the resource keys to pass to underlying the op strewn about the creation pathways. We consolidate it into a function here. ## How I Tested These Changes BK --- .../definitions/decorators/asset_decorator.py | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 2d90de6fe5aa3..01aeb5553f0a1 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -500,6 +500,23 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition: ) +def compute_required_resource_keys( + required_resource_keys: Set[str], + resource_defs: Mapping[str, ResourceDefinition], + fn: Callable[..., Any], +) -> Set[str]: + bare_required_resource_keys = required_resource_keys.copy() + resource_defs_keys = set(resource_defs.keys()) + required_resource_keys = bare_required_resource_keys | resource_defs_keys + arg_resource_keys = {arg.name for arg in get_resource_args(fn)} + check.param_invariant( + len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0, + "Cannot specify resource requirements in both @multi_asset decorator and as" + " arguments to the decorated function", + ) + return required_resource_keys - arg_resource_keys + + @experimental_param(param="resource_defs") @deprecated_param( param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead." @@ -681,27 +698,20 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: can_subset=can_subset, ) - arg_resource_keys = {arg.name for arg in get_resource_args(fn)} - check.param_invariant( - len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0, - "Cannot specify resource requirements in both @multi_asset decorator and as" - " arguments to the decorated function", - ) - check.invariant( len(in_out_mapper.overlapping_output_names) == 0, f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}", ) with disable_dagster_warnings(): - op_required_resource_keys = required_resource_keys - arg_resource_keys - op = _Op( name=op_name, description=description, ins=in_out_mapper.asset_ins_by_input_names, out=in_out_mapper.combined_outs_by_output_name, - required_resource_keys=op_required_resource_keys, + required_resource_keys=compute_required_resource_keys( + required_resource_keys, resource_defs, fn + ), tags={ **({"kind": compute_kind} if compute_kind else {}), **(op_tags or {}),