From 687ebc06c0b5696a70f93125ecdb8e18113178e6 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Mon, 3 Jun 2024 00:57:19 -0400 Subject: [PATCH] use compute_required_resource_keys_for_underlying_op in @asset construction --- .../definitions/decorators/asset_decorator.py | 25 ++++++------------- .../decorators/assets_definition_factory.py | 14 +++++++---- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 9eff8557d55c6..95550e3902264 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -27,9 +27,6 @@ ) from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping -from dagster._core.definitions.resource_annotation import ( - get_resource_args, -) from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError from dagster._core.types.dagster_type import DagsterType from dagster._utils.warnings import ( @@ -59,6 +56,7 @@ AssetsDefinitionBuilderArgs, build_asset_ins, build_asset_outs, + compute_required_resource_keys_for_underlying_op, ) @@ -352,18 +350,19 @@ def invoke(args: AssetArgs, fn: Callable[..., Any]) -> AssetsDefinition: ) with disable_dagster_warnings(): - arg_resource_keys = {arg.name for arg in get_resource_args(fn)} - - bare_required_resource_keys = set(args.required_resource_keys) - resource_defs_dict = args.resource_defs - resource_defs_keys = set(resource_defs_dict.keys()) - decorator_resource_keys = bare_required_resource_keys | resource_defs_keys # TODO: rename op_resource_defs and asset_resource_defs and document # the strange logic -- schrockn 2024-06-03 op_resource_defs = wrap_resources_for_execution(resource_defs_dict) + op_required_resource_keys = compute_required_resource_keys_for_underlying_op( + explicitly_passed_required_resource_keys=args.required_resource_keys, + resource_defs_bound_to_asset=op_resource_defs, + fn=fn, + decorator="@asset", + ) + io_manager_key = args.io_manager_key if args.io_manager_def: if not io_manager_key: @@ -382,16 +381,8 @@ def invoke(args: AssetArgs, fn: Callable[..., Any]) -> AssetsDefinition: asset_resource_defs = wrap_resources_for_execution(resource_defs_dict) - check.param_invariant( - len(bare_required_resource_keys) == 0 or len(arg_resource_keys) == 0, - "Cannot specify resource requirements in both @asset decorator and as arguments" - " to the decorated function", - ) - io_manager_key = cast(str, io_manager_key) if io_manager_key else DEFAULT_IO_MANAGER_KEY - op_required_resource_keys = decorator_resource_keys - arg_resource_keys - # check backfill policy is BackfillPolicyType.SINGLE_RUN for non-partitioned asset if args.partitions_def is None: check.param_invariant( diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py index 9a439a40bf86f..b8c590418b120 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py @@ -206,14 +206,15 @@ def make_keys_by_output_name( def compute_required_resource_keys_for_underlying_op( - explicitly_passed_required_resource_keys: Set[str], + explicitly_passed_required_resource_keys: AbstractSet[str], resource_defs_bound_to_asset: Mapping[str, ResourceDefinition], fn: Callable[..., Any], -) -> Set[str]: + decorator: str, +) -> AbstractSet[str]: arg_resource_keys = {arg.name for arg in get_resource_args(fn)} check.param_invariant( len(explicitly_passed_required_resource_keys) == 0 or len(arg_resource_keys) == 0, - "Cannot specify resource requirements in both @multi_asset decorator and as" + f"Cannot specify resource requirements in both {decorator} decorator and as" " arguments to the decorated function", ) if arg_resource_keys: @@ -244,7 +245,7 @@ class AssetsDefinitionBuilderArgs(NamedTuple): config_schema: Optional[UserConfigSchema] retry_policy: Optional[RetryPolicy] compute_kind: Optional[str] - required_resource_keys: Set[str] + required_resource_keys: AbstractSet[str] assets_def_resource_defs: Mapping[str, ResourceDefinition] op_def_resource_defs: Mapping[str, ResourceDefinition] backfill_policy: Optional[BackfillPolicy] @@ -529,7 +530,10 @@ def _create_op_definition(self) -> OpDefinition: ins=self.asset_ins_by_input_names, out=self.combined_outs_by_output_name, required_resource_keys=compute_required_resource_keys_for_underlying_op( - self.args.required_resource_keys, self.args.op_def_resource_defs, self.fn + self.args.required_resource_keys, + self.args.op_def_resource_defs, + self.fn, + "@multi_asset", ), tags={ **({"kind": self.args.compute_kind} if self.args.compute_kind else {}),