Skip to content

Commit

Permalink
Move outs parameter construction to InOutMapper (#22224)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Continues on the path of decomposing the decorator and moving its complexity into the mapper. In this case, the construction of the `outs` parameter to `AssetsDefinition` moves to `InOutMapper`.

## How I Tested These Changes

BK
  • Loading branch information
schrockn authored and salazarm committed Jun 10, 2024
1 parent 146347b commit 7cbb81b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
asset_ins=asset_ins,
asset_outs=output_tuples_by_asset_key,
check_specs=check_specs or [],
can_subset=can_subset,
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
Expand All @@ -755,21 +756,10 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
" arguments to the decorated function",
)

check_outs_by_output_name: Mapping[str, Out] = {
output_name: Out(dagster_type=None, is_required=not can_subset)
for output_name in in_out_mapper.check_specs_by_output_name.keys()
}
overlapping_output_names = (
in_out_mapper.asset_outs_by_output_name.keys() & check_outs_by_output_name.keys()
)
check.invariant(
len(overlapping_output_names) == 0,
f"Check output names overlap with asset output names: {overlapping_output_names}",
len(in_out_mapper.overlapping_output_names) == 0,
f"Check output names overlap with asset output names: {in_out_mapper.overlapping_output_names}",
)
combined_outs_by_output_name: Mapping[str, Out] = {
**in_out_mapper.asset_outs_by_output_name,
**check_outs_by_output_name,
}

if specs:
internal_deps = {
Expand Down Expand Up @@ -800,7 +790,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
name=op_name,
description=description,
ins=dict(asset_ins.values()),
out=combined_outs_by_output_name,
out=in_out_mapper.combined_outs_by_output_name,
required_resource_keys=op_required_resource_keys,
tags={
**({"kind": compute_kind} if compute_kind else {}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ def __init__(
in_mappings: Mapping[AssetKey, InMapping],
out_mappings: Mapping[AssetKey, OutMapping],
check_specs: Sequence[AssetCheckSpec],
can_subset: bool,
) -> None:
self.in_mappings = in_mappings
self.out_mappings = out_mappings
self.check_specs = check_specs
self.can_subset = can_subset

@staticmethod
def from_asset_ins_and_asset_outs(
asset_ins: Mapping[AssetKey, Tuple[str, In]],
asset_outs: Mapping[AssetKey, Tuple[str, Out]],
check_specs: Sequence[AssetCheckSpec],
can_subset: bool,
):
in_mappings = {
asset_key: InMapping(input_name, in_)
Expand All @@ -52,7 +55,7 @@ def from_asset_ins_and_asset_outs(
asset_key: OutMapping(output_name, out_)
for asset_key, (output_name, out_) in asset_outs.items()
}
return InOutMapper(in_mappings, out_mappings, check_specs)
return InOutMapper(in_mappings, out_mappings, check_specs, can_subset)

@cached_property
def asset_outs_by_output_name(self) -> Mapping[str, Out]:
Expand All @@ -75,6 +78,26 @@ def check_specs_by_output_name(self) -> Mapping[str, AssetCheckSpec]:
self.check_specs, list(self.asset_keys)
)

@cached_property
def check_outs_by_output_name(self) -> Mapping[str, Out]:
    """Return an ``Out`` for every asset-check output name.

    The names are the keys of ``self.check_specs_by_output_name``. Each one
    gets its own ``Out`` with no dagster type; the output is required only
    when ``self.can_subset`` is false.
    """
    outs_by_name = {}
    for check_output_name in self.check_specs_by_output_name:
        # A fresh Out per name so that no two outputs share an instance.
        outs_by_name[check_output_name] = Out(
            dagster_type=None,
            is_required=not self.can_subset,
        )
    return outs_by_name

@cached_property
def combined_outs_by_output_name(self) -> Mapping[str, Out]:
    """Return the asset outs and the check outs merged into one mapping.

    Asset outs are inserted first, then check outs; if an output name
    appears in both, the check out's value replaces the asset out's.
    """
    merged_outs = dict(self.asset_outs_by_output_name)
    merged_outs.update(self.check_outs_by_output_name)
    return merged_outs

@cached_property
def overlapping_output_names(self) -> Set[str]:
    """Return the output names used by both an asset out and a check out.

    The result is the intersection of the keys of
    ``self.asset_outs_by_output_name`` and ``self.check_outs_by_output_name``;
    it is empty when the two sets of names are disjoint.
    """
    asset_output_names = set(self.asset_outs_by_output_name)
    # Iterating a mapping yields its keys, so this intersects key sets.
    return asset_output_names.intersection(self.check_outs_by_output_name)


def validate_and_assign_output_names_to_check_specs(
check_specs: Optional[Sequence[AssetCheckSpec]], valid_asset_keys: Sequence[AssetKey]
Expand Down

0 comments on commit 7cbb81b

Please sign in to comment.