From 51b04dbe73e1b06bad900fd05f0a46719b8d0832 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Sat, 8 Jun 2024 06:20:19 -0400 Subject: [PATCH] refactor builder creation --- .../definitions/decorators/asset_decorator.py | 55 +++++++++++- .../decorator_assets_definition_builder.py | 88 ++++++++++++------- 2 files changed, 110 insertions(+), 33 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 12d5858f2b91b..faf0318856e15 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -448,9 +448,17 @@ def create_assets_def_from_fn_and_decorator_args( can_subset=False, decorator_name="@asset", ) + builder = DecoratorAssetsDefinitionBuilder.from_asset_outs( - args=builder_args, fn=fn, op_name=out_asset_key.to_python_identifier() + fn=fn, + op_name=out_asset_key.to_python_identifier(), + asset_in_map=builder_args.asset_in_map, + asset_out_map=builder_args.asset_out_map, + asset_deps=builder_args.asset_deps, + upstream_asset_deps=builder_args.upstream_asset_deps, + passed_args=builder_args, ) + return builder.create_assets_definition() @@ -608,7 +616,7 @@ def my_function(asset0): ) def inner(fn: Callable[..., Any]) -> AssetsDefinition: - builder = DecoratorAssetsDefinitionBuilder.from_args(args=args, fn=fn) + builder = create_builder_for_multi_asset(fn=fn, args=args) check.invariant( len(builder.overlapping_output_names) == 0, @@ -621,6 +629,49 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: return inner +@staticmethod +def create_builder_for_multi_asset( + *, fn: Callable[..., Any], args: DecoratorAssetsDefinitionBuilderArgs +) -> "DecoratorAssetsDefinitionBuilder": + op_name = args.name or fn.__name__ + + if args.asset_out_map and args.specs: + raise DagsterInvalidDefinitionError("Must specify only outs or specs but not both.") + + if args.specs: + check.invariant( + args.decorator_name == "@multi_asset", "Only hit this code path in multi_asset." + ) + if args.upstream_asset_deps: + raise DagsterInvalidDefinitionError( + "Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs" + " directly." + ) + if args.asset_deps: + raise DagsterInvalidDefinitionError( + "Can not pass internal_asset_deps and specs to @multi_asset, specify deps on" + " the AssetSpecs directly." + ) + return DecoratorAssetsDefinitionBuilder.from_specs( + fn=fn, + op_name=op_name, + passed_args=args, + asset_specs=args.specs, + can_subset=args.can_subset, + asset_in_map=args.asset_in_map, + ) + + return DecoratorAssetsDefinitionBuilder.from_asset_outs( + fn=fn, + op_name=op_name, + asset_in_map=args.asset_in_map, + asset_out_map=args.asset_out_map, + asset_deps=args.asset_deps, + upstream_asset_deps=args.upstream_asset_deps, + passed_args=args, + ) + + @overload def graph_asset( compose_fn: Callable[..., Any], diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py index 2f64e9c2a0006..a6393b69fc4a2 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py @@ -20,6 +20,7 @@ import dagster._check as check from dagster._config.config_schema import UserConfigSchema from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_in import AssetIn from dagster._core.definitions.asset_key import AssetKey @@ -99,7 +100,7 @@ def build_named_ins( "of the arguments to the decorated function" ) - ins_by_asset_key: Dict[AssetKey, NamedIn] = {} + named_ins_by_asset_key: Dict[AssetKey, NamedIn] = {} for input_name in all_input_names: asset_key = None @@ -117,23 +118,23 @@ def build_named_ins( asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name]))) - ins_by_asset_key[asset_key] = NamedIn( + named_ins_by_asset_key[asset_key] = NamedIn( input_name.replace("-", "_"), In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type), ) for asset_key in deps: - if asset_key in ins_by_asset_key: + if asset_key in named_ins_by_asset_key: raise DagsterInvalidDefinitionError( f"deps value {asset_key} also declared as input/AssetIn" ) # mypy doesn't realize that Nothing is a valid type here - ins_by_asset_key[asset_key] = NamedIn( + named_ins_by_asset_key[asset_key] = NamedIn( stringify_asset_key_to_input_name(asset_key), In(cast(type, Nothing)), ) - return ins_by_asset_key + return named_ins_by_asset_key def build_named_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "NamedOut"]: @@ -150,7 +151,7 @@ def build_named_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "N return named_outs_by_asset_key -def build_subsettable_asset_ins( +def build_subsettable_named_ins( asset_ins: Mapping[AssetKey, Tuple[str, In]], asset_outs: Mapping[AssetKey, Tuple[str, Out]], internal_upstream_deps: Iterable[AbstractSet[AssetKey]], @@ -252,7 +253,7 @@ def __init__( ( { **named_ins_by_asset_key, - **build_subsettable_asset_ins( + **build_subsettable_named_ins( named_ins_by_asset_key, named_outs_by_asset_key, self.internal_deps.values(), @@ -286,27 +287,45 @@ def from_args( "Can not pass internal_asset_deps and specs to @multi_asset, specify deps on" " the AssetSpecs directly." ) - return DecoratorAssetsDefinitionBuilder.from_specs(fn=fn, op_name=op_name, args=args) + return DecoratorAssetsDefinitionBuilder.from_specs( + fn=fn, + op_name=op_name, + passed_args=args, + asset_specs=args.specs, + can_subset=args.can_subset, + asset_in_map=args.asset_in_map, + ) - return DecoratorAssetsDefinitionBuilder.from_asset_outs(fn=fn, op_name=op_name, args=args) + return DecoratorAssetsDefinitionBuilder.from_asset_outs( + fn=fn, + op_name=op_name, + asset_in_map=args.asset_in_map, + asset_out_map=args.asset_out_map, + asset_deps=args.asset_deps, + upstream_asset_deps=args.upstream_asset_deps, + passed_args=args, + ) @staticmethod def from_specs( *, fn: Callable[..., Any], op_name: str, - args: DecoratorAssetsDefinitionBuilderArgs, + asset_specs: Sequence[AssetSpec], + can_subset: bool, + asset_in_map: Mapping[str, AssetIn], + passed_args: DecoratorAssetsDefinitionBuilderArgs, ) -> "DecoratorAssetsDefinitionBuilder": - check.param_invariant(args.specs, "args", "Must use specs in this codepath") + check.param_invariant(passed_args.specs, "passed_args", "Must use specs in this codepath") named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {} - for asset_spec in args.specs: + for asset_spec in asset_specs: output_name = asset_spec.key.to_python_identifier() named_outs_by_asset_key[asset_spec.key] = NamedOut( output_name, Out( Nothing, - is_required=not (args.can_subset or asset_spec.skippable), + is_required=not (can_subset or asset_spec.skippable), description=asset_spec.description, code_version=asset_spec.code_version, metadata=asset_spec.metadata, @@ -314,7 +333,7 @@ def from_specs( ) upstream_keys = set() - for spec in args.specs: + for spec in asset_specs: for dep in spec.deps: if dep.asset_key not in named_outs_by_asset_key: upstream_keys.add(dep.asset_key) @@ -322,9 +341,8 @@ def from_specs( # self-dependent asset also needs to be considered an upstream_key upstream_keys.add(dep.asset_key) - explicit_ins = args.asset_in_map # get which asset keys have inputs set - loaded_upstreams = build_named_ins(fn, explicit_ins, deps=set()) + loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set()) unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys} if unexpected_upstreams: raise DagsterInvalidDefinitionError( @@ -332,11 +350,11 @@ def from_specs( " AssetSpec(s). Set the deps on the appropriate AssetSpec(s)." ) remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams} - named_ins_by_asset_key = build_named_ins(fn, explicit_ins, deps=remaining_upstream_keys) + named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys) internal_deps = { spec.key: {dep.asset_key for dep in spec.deps} - for spec in args.specs + for spec in asset_specs if spec.deps is not None } @@ -345,7 +363,7 @@ def from_specs( named_outs_by_asset_key=named_outs_by_asset_key, internal_deps=internal_deps, op_name=op_name, - args=args, + args=passed_args, fn=fn, ) @@ -354,23 +372,22 @@ def from_asset_outs( *, fn: Callable[..., Any], op_name: str, - args: DecoratorAssetsDefinitionBuilderArgs, + asset_in_map: Mapping[str, AssetIn], + asset_out_map: Mapping[str, AssetOut], + asset_deps: Mapping[str, Set[AssetKey]], + upstream_asset_deps: Optional[Iterable[AssetDep]], + passed_args: DecoratorAssetsDefinitionBuilderArgs, ): - check.param_invariant(not args.specs, "args", "This codepath for non-spec based create") - asset_out_map = args.asset_out_map + check.param_invariant( + not passed_args.specs, "args", "This codepath for non-spec based create" + ) named_ins_by_asset_key = build_named_ins( fn, - args.asset_in_map, - deps=( - {dep.asset_key for dep in args.upstream_asset_deps} - if args.upstream_asset_deps - else set() - ), + asset_in_map, + deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()), ) named_outs_by_asset_key = build_named_outs(asset_out_map) - asset_deps = args.asset_deps - # validate that the asset_ins are a subset of the upstream asset_deps. upstream_internal_asset_keys = set().union(*asset_deps.values()) asset_in_keys = set(named_ins_by_asset_key.keys()) @@ -409,7 +426,7 @@ def from_asset_outs( named_outs_by_asset_key=named_outs_by_asset_key, internal_deps=internal_deps, op_name=op_name, - args=args, + args=passed_args, fn=fn, ) @@ -528,6 +545,15 @@ def create_assets_definition(self) -> AssetsDefinition: selected_asset_check_keys=None, # not a subset so this is none ) + def create_assets_checks_definition(self) -> AssetChecksDefinition: + return AssetChecksDefinition.create( + node_def=self._create_op_definition(), + resource_defs=self.args.assets_def_resource_defs, + keys_by_input_name=self.asset_keys_by_input_names, + check_specs_by_output_name=self.check_specs_by_output_name, + can_subset=self.args.can_subset, + ) + @cached_property def specs(self) -> Sequence[AssetSpec]: specs = self.args.specs if self.args.specs else self._synthesize_specs()