Skip to content

Commit

Permalink
use compute_required_resource_keys_for_underlying_op in @asset constr…
Browse files Browse the repository at this point in the history
…uction
  • Loading branch information
schrockn committed Jun 3, 2024
1 parent 9c6bec6 commit 687ebc0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
from dagster._core.definitions.resource_annotation import (
get_resource_args,
)
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.warnings import (
Expand Down Expand Up @@ -59,6 +56,7 @@
AssetsDefinitionBuilderArgs,
build_asset_ins,
build_asset_outs,
compute_required_resource_keys_for_underlying_op,
)


Expand Down Expand Up @@ -352,18 +350,19 @@ def invoke(args: AssetArgs, fn: Callable[..., Any]) -> AssetsDefinition:
)

with disable_dagster_warnings():
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}

bare_required_resource_keys = set(args.required_resource_keys)

resource_defs_dict = args.resource_defs
resource_defs_keys = set(resource_defs_dict.keys())
decorator_resource_keys = bare_required_resource_keys | resource_defs_keys

# TODO: rename op_resource_defs and asset_resource_defs and document
# the strange logic -- schrockn 2024-06-03
op_resource_defs = wrap_resources_for_execution(resource_defs_dict)

op_required_resource_keys = compute_required_resource_keys_for_underlying_op(
explicitly_passed_required_resource_keys=args.required_resource_keys,
resource_defs_bound_to_asset=op_resource_defs,
fn=fn,
decorator="@asset",
)

io_manager_key = args.io_manager_key
if args.io_manager_def:
if not io_manager_key:
Expand All @@ -382,16 +381,8 @@ def invoke(args: AssetArgs, fn: Callable[..., Any]) -> AssetsDefinition:

asset_resource_defs = wrap_resources_for_execution(resource_defs_dict)

check.param_invariant(
len(bare_required_resource_keys) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @asset decorator and as arguments"
" to the decorated function",
)

io_manager_key = cast(str, io_manager_key) if io_manager_key else DEFAULT_IO_MANAGER_KEY

op_required_resource_keys = decorator_resource_keys - arg_resource_keys

# check backfill policy is BackfillPolicyType.SINGLE_RUN for non-partitioned asset
if args.partitions_def is None:
check.param_invariant(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,15 @@ def make_keys_by_output_name(


def compute_required_resource_keys_for_underlying_op(
explicitly_passed_required_resource_keys: Set[str],
explicitly_passed_required_resource_keys: AbstractSet[str],
resource_defs_bound_to_asset: Mapping[str, ResourceDefinition],
fn: Callable[..., Any],
) -> Set[str]:
decorator: str,
) -> AbstractSet[str]:
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(explicitly_passed_required_resource_keys) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
f"Cannot specify resource requirements in both {decorator} decorator and as"
" arguments to the decorated function",
)
if arg_resource_keys:
Expand Down Expand Up @@ -244,7 +245,7 @@ class AssetsDefinitionBuilderArgs(NamedTuple):
config_schema: Optional[UserConfigSchema]
retry_policy: Optional[RetryPolicy]
compute_kind: Optional[str]
required_resource_keys: Set[str]
required_resource_keys: AbstractSet[str]
assets_def_resource_defs: Mapping[str, ResourceDefinition]
op_def_resource_defs: Mapping[str, ResourceDefinition]
backfill_policy: Optional[BackfillPolicy]
Expand Down Expand Up @@ -529,7 +530,10 @@ def _create_op_definition(self) -> OpDefinition:
ins=self.asset_ins_by_input_names,
out=self.combined_outs_by_output_name,
required_resource_keys=compute_required_resource_keys_for_underlying_op(
self.args.required_resource_keys, self.args.op_def_resource_defs, self.fn
self.args.required_resource_keys,
self.args.op_def_resource_defs,
self.fn,
"@multi_asset",
),
tags={
**({"kind": self.args.compute_kind} if self.args.compute_kind else {}),
Expand Down

0 comments on commit 687ebc0

Please sign in to comment.