Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract InOutMapper to begin refactoring AssetsDefinition construction process #22222

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.config import ConfigMapping
from dagster._core.definitions.decorators.assets_definition_factory import InOutMapper
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
from dagster._core.definitions.resource_annotation import get_resource_args
Expand Down Expand Up @@ -739,19 +740,17 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
f" {list(valid_asset_deps)[:20]}",
)

in_out_mapper = InOutMapper.from_asset_ins_and_asset_outs(
asset_ins=asset_ins, asset_outs=output_tuples_by_asset_key
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
check.param_invariant(
len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @multi_asset decorator and as"
" arguments to the decorated function",
)

asset_outs_by_output_name: Mapping[str, Out] = dict(output_tuples_by_asset_key.values())
keys_by_output_name = {
output_name: asset_key
for asset_key, (output_name, _) in output_tuples_by_asset_key.items()
}

check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs(
check_specs, list(output_tuples_by_asset_key.keys())
)
Expand All @@ -760,14 +759,14 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
for output_name in check_specs_by_output_name.keys()
}
overlapping_output_names = (
asset_outs_by_output_name.keys() & check_outs_by_output_name.keys()
in_out_mapper.asset_outs_by_output_name.keys() & check_outs_by_output_name.keys()
)
check.invariant(
len(overlapping_output_names) == 0,
f"Check output names overlap with asset output names: {overlapping_output_names}",
)
combined_outs_by_output_name: Mapping[str, Out] = {
**asset_outs_by_output_name,
**in_out_mapper.asset_outs_by_output_name,
**check_outs_by_output_name,
}

Expand All @@ -778,7 +777,9 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
if spec.deps is not None
}
else:
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}
internal_deps = {
in_out_mapper.keys_by_output_name[name]: asset_deps[name] for name in asset_deps
}

# when a subsettable multi-asset is defined, it is possible that it will need to be
# broken into two separate parts, one which depends on the other. in order to represent
Expand Down Expand Up @@ -833,7 +834,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
}
input_deps = list(input_deps_by_key.values())
for output_name, asset_out in asset_out_map.items():
key = keys_by_output_name[output_name]
key = in_out_mapper.keys_by_output_name[output_name]
if internal_asset_deps:
deps = [
input_deps_by_key.get(
Expand All @@ -857,7 +858,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:

return AssetsDefinition.dagster_internal_init(
keys_by_input_name=keys_by_input_name,
keys_by_output_name=keys_by_output_name,
keys_by_output_name=in_out_mapper.keys_by_output_name,
node_def=op,
partitions_def=partitions_def,
can_subset=can_subset,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from functools import cached_property
schrockn marked this conversation as resolved.
Show resolved Hide resolved
from typing import Mapping, NamedTuple, Tuple

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.input import In
from dagster._core.definitions.output import Out


class InMapping(NamedTuple):
schrockn marked this conversation as resolved.
Show resolved Hide resolved
input_name: str
input: In


class OutMapping(NamedTuple):
output_name: str
output: Out


class InOutMapper:
def __init__(
self,
in_mappings: Mapping[AssetKey, InMapping],
out_mappings: Mapping[AssetKey, OutMapping],
) -> None:
self.in_mappings = in_mappings
self.out_mappings = out_mappings

@staticmethod
def from_asset_ins_and_asset_outs(
asset_ins: Mapping[AssetKey, Tuple[str, In]],
asset_outs: Mapping[AssetKey, Tuple[str, Out]],
):
in_mappings = {
schrockn marked this conversation as resolved.
Show resolved Hide resolved
asset_key: InMapping(input_name, in_)
for asset_key, (input_name, in_) in asset_ins.items()
}
out_mappings = {
asset_key: OutMapping(output_name, out_)
for asset_key, (output_name, out_) in asset_outs.items()
}
return InOutMapper(in_mappings, out_mappings)

@cached_property
def asset_outs_by_output_name(self) -> Mapping[str, Out]:
schrockn marked this conversation as resolved.
Show resolved Hide resolved
return dict(self.out_mappings.values())

@cached_property
def keys_by_output_name(self) -> Mapping[str, AssetKey]:
return {
out_mapping.output_name: asset_key
for asset_key, out_mapping in self.out_mappings.items()
}