Skip to content

Commit

Permalink
Extract compute_required_resource_keys (dagster-io#22228)
Browse files Browse the repository at this point in the history
## Summary & Motivation

We have this logic that handles computing the resource keys to pass to underlying the op strewn about the creation pathways. We consolidate it into a function here.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored and danielgafni committed Jun 18, 2024
1 parent 905465e commit aa378b7
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,23 @@ def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition:
)


def compute_required_resource_keys(
required_resource_keys: Set[str],
resource_defs: Mapping[str, ResourceDefinition],
fn: Callable[..., Any],
) -> Set[str]:
bare_required_resource_keys = required_resource_keys.copy()
resource_defs_keys = set(resource_defs.keys())
required_resource_keys = bare_required_resource_keys | resource_defs_keys
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function",
)
return required_resource_keys - arg_resource_keys


@experimental_param(param="resource_defs")
@deprecated_param(
param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."
Expand Down Expand Up @@ -681,27 +698,20 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
can_subset=can_subset,
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function",
)

check.invariant(
len(in_out_mapper.overlapping_output_names) == 0,
f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}",
)

with disable_dagster_warnings():
op_required_resource_keys = required_resource_keys - arg_resource_keys

op = _Op(
name=op_name,
description=description,
ins=in_out_mapper.asset_ins_by_input_names,
out=in_out_mapper.combined_outs_by_output_name,
required_resource_keys=op_required_resource_keys,
required_resource_keys=compute_required_resource_keys(
required_resource_keys, resource_defs, fn
),
tags={
**({"kind": compute_kind} if compute_kind else {}),
**(op_tags or {}),
Expand Down

0 comments on commit aa378b7

Please sign in to comment.