From e98ae1eb5a2136fe3b60bc58fc506f5b2121f568 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Thu, 6 Jun 2024 03:56:22 -0400 Subject: [PATCH] Extract InOutMapper to begin refactoring AssetsDefinition construction process (#22222) ## Summary & Motivation `multi_asset` is a beast, as well as the entire `AssetsDefinition` creation machinery. This class introduces a class `InOutMapper` (renamed to `AssetsDefinitionFactory` upstack once its mandate increased) that begins to tease apart `multi_asset` which is nearly 300 LoC, has 37 local variables, and a huge dynamically scoped inner function. To see where this is going I have created a digest [PR](https://github.com/dagster-io/dagster/pull/22238) that demonstrate the before after. This also sets us up to consolidate the `AssetsDefinition` creation code paths, as it contains tons of duplicated code strewn about. Instead we will be able to invoke the decomposed code in the new factory functions. This was motivated by the discussion in https://github.com/dagster-io/dagster/pull/22221 that suggested we move a propose class to be within the inheritance hierarchy of `AssetsDefinition`. The complexity of logic surrounding the construction of said object is completely out of control, and I found it effectively intractable to do an inheritance scheme cleanly. ## How I Tested These Changes BK --- .../definitions/decorators/asset_decorator.py | 23 ++++---- .../decorators/assets_definition_factory.py | 52 +++++++++++++++++++ 2 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index d969639a0830d..1bd7809974cbf 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -25,6 +25,7 @@ from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.config import ConfigMapping +from dagster._core.definitions.decorators.assets_definition_factory import InOutMapper from dagster._core.definitions.freshness_policy import FreshnessPolicy from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping from dagster._core.definitions.resource_annotation import get_resource_args @@ -739,6 +740,10 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: f" {list(valid_asset_deps)[:20]}", ) + in_out_mapper = InOutMapper.from_asset_ins_and_asset_outs( + asset_ins=asset_ins, asset_outs=output_tuples_by_asset_key + ) + arg_resource_keys = {arg.name for arg in get_resource_args(fn)} check.param_invariant( len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0, @@ -746,12 +751,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: " arguments to the decorated function", ) - asset_outs_by_output_name: Mapping[str, Out] = dict(output_tuples_by_asset_key.values()) - keys_by_output_name = { - output_name: asset_key - for asset_key, (output_name, _) in output_tuples_by_asset_key.items() - } - check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( check_specs, list(output_tuples_by_asset_key.keys()) ) @@ -760,14 +759,14 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: for output_name in check_specs_by_output_name.keys() } overlapping_output_names = ( - asset_outs_by_output_name.keys() & check_outs_by_output_name.keys() + in_out_mapper.asset_outs_by_output_name.keys() & check_outs_by_output_name.keys() ) check.invariant( len(overlapping_output_names) == 0, f"Check output names overlap with asset output names: {overlapping_output_names}", ) combined_outs_by_output_name: Mapping[str, Out] = { - **asset_outs_by_output_name, + **in_out_mapper.asset_outs_by_output_name, **check_outs_by_output_name, } @@ -778,7 +777,9 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: if spec.deps is not None } else: - internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps} + internal_deps = { + in_out_mapper.keys_by_output_name[name]: asset_deps[name] for name in asset_deps + } # when a subsettable multi-asset is defined, it is possible that it will need to be # broken into two separate parts, one which depends on the other. in order to represent @@ -833,7 +834,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: } input_deps = list(input_deps_by_key.values()) for output_name, asset_out in asset_out_map.items(): - key = keys_by_output_name[output_name] + key = in_out_mapper.keys_by_output_name[output_name] if internal_asset_deps: deps = [ input_deps_by_key.get( @@ -857,7 +858,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: return AssetsDefinition.dagster_internal_init( keys_by_input_name=keys_by_input_name, - keys_by_output_name=keys_by_output_name, + keys_by_output_name=in_out_mapper.keys_by_output_name, node_def=op, partitions_def=partitions_def, can_subset=can_subset, diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py new file mode 100644 index 0000000000000..efce0a3c7af0c --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/decorators/assets_definition_factory.py @@ -0,0 +1,52 @@ +from functools import cached_property +from typing import Mapping, NamedTuple, Tuple + +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.input import In +from dagster._core.definitions.output import Out + + +class InMapping(NamedTuple): + input_name: str + input: In + + +class OutMapping(NamedTuple): + output_name: str + output: Out + + +class InOutMapper: + def __init__( + self, + in_mappings: Mapping[AssetKey, InMapping], + out_mappings: Mapping[AssetKey, OutMapping], + ) -> None: + self.in_mappings = in_mappings + self.out_mappings = out_mappings + + @staticmethod + def from_asset_ins_and_asset_outs( + asset_ins: Mapping[AssetKey, Tuple[str, In]], + asset_outs: Mapping[AssetKey, Tuple[str, Out]], + ): + in_mappings = { + asset_key: InMapping(input_name, in_) + for asset_key, (input_name, in_) in asset_ins.items() + } + out_mappings = { + asset_key: OutMapping(output_name, out_) + for asset_key, (output_name, out_) in asset_outs.items() + } + return InOutMapper(in_mappings, out_mappings) + + @cached_property + def asset_outs_by_output_name(self) -> Mapping[str, Out]: + return dict(self.out_mappings.values()) + + @cached_property + def keys_by_output_name(self) -> Mapping[str, AssetKey]: + return { + out_mapping.output_name: asset_key + for asset_key, out_mapping in self.out_mappings.items() + }