Skip to content

Commit

Permalink
Extract compute_required_resource_keys
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 2, 2024
1 parent f483de3 commit e9e610f
Showing 1 changed file with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,19 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
)


def compute_required_resource_keys(required_resource_keys, resource_defs, fn) -> Set[str]:
bare_required_resource_keys = required_resource_keys.copy()
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function",
)
return required_resource_keys - arg_resource_keys


@experimental_param(param="resource_defs")
@deprecated_param(
param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."
Expand Down Expand Up @@ -646,10 +659,6 @@ def my_function(asset0):
additional_message="Only dicts are supported for asset config_schema.",
)

bare_required_resource_keys = required_resource_keys.copy()
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys

asset_out_map: Mapping[str, AssetOut] = {} if outs is None else outs

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
Expand Down Expand Up @@ -690,27 +699,20 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
can_subset=can_subset,
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function",
)

check.invariant(
len(in_out_mapper.overlapping_output_names) == 0,
f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}",
)

with disable_dagster_warnings():
op_required_resource_keys = required_resource_keys - arg_resource_keys

op = _Op(
name=op_name,
description=description,
ins=in_out_mapper.asset_ins_by_input_names,
out=in_out_mapper.combined_outs_by_output_name,
required_resource_keys=op_required_resource_keys,
required_resource_keys=compute_required_resource_keys(
required_resource_keys, resource_defs, fn
),
tags={
**({"kind": compute_kind} if compute_kind else {}),
**(op_tags or {}),
Expand Down

0 comments on commit e9e610f

Please sign in to comment.