diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 12d5858f2b91b..b3e173b81ea92 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -448,9 +448,17 @@ def create_assets_def_from_fn_and_decorator_args( can_subset=False, decorator_name="@asset", ) + builder = DecoratorAssetsDefinitionBuilder.from_asset_outs( - args=builder_args, fn=fn, op_name=out_asset_key.to_python_identifier() + fn=fn, + op_name=out_asset_key.to_python_identifier(), + asset_in_map=builder_args.asset_in_map, + asset_out_map=builder_args.asset_out_map, + asset_deps=builder_args.asset_deps, + upstream_asset_deps=builder_args.upstream_asset_deps, + passed_args=builder_args, ) + return builder.create_assets_definition() diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py index 40b997553bf99..46ef158dbbf02 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py @@ -286,27 +286,45 @@ def from_args( "Can not pass internal_asset_deps and specs to @multi_asset, specify deps on" " the AssetSpecs directly." ) - return DecoratorAssetsDefinitionBuilder.from_specs(fn=fn, op_name=op_name, args=args) + return DecoratorAssetsDefinitionBuilder.from_specs( + fn=fn, + op_name=op_name, + passed_args=args, + asset_specs=args.specs, + can_subset=args.can_subset, + asset_in_map=args.asset_in_map, + ) - return DecoratorAssetsDefinitionBuilder.from_asset_outs(fn=fn, op_name=op_name, args=args) + return DecoratorAssetsDefinitionBuilder.from_asset_outs( + fn=fn, + op_name=op_name, + asset_in_map=args.asset_in_map, + asset_out_map=args.asset_out_map, + asset_deps=args.asset_deps, + upstream_asset_deps=args.upstream_asset_deps, + passed_args=args, + ) @staticmethod def from_specs( *, fn: Callable[..., Any], op_name: str, - args: DecoratorAssetsDefinitionBuilderArgs, + asset_specs: Sequence[AssetSpec], + can_subset: bool, + asset_in_map: Mapping[str, AssetIn], + passed_args: DecoratorAssetsDefinitionBuilderArgs, ) -> "DecoratorAssetsDefinitionBuilder": - check.param_invariant(args.specs, "args", "Must use specs in this codepath") + check.param_invariant(passed_args.specs, "passed_args", "Must use specs in this codepath") named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {} - for asset_spec in args.specs: + for asset_spec in asset_specs: output_name = asset_spec.key.to_python_identifier() named_outs_by_asset_key[asset_spec.key] = NamedOut( output_name, Out( Nothing, - is_required=not (args.can_subset or asset_spec.skippable), + is_required=not (can_subset or asset_spec.skippable), description=asset_spec.description, code_version=asset_spec.code_version, metadata=asset_spec.metadata, @@ -314,7 +332,7 @@ def from_specs( ) upstream_keys = set() - for spec in args.specs: + for spec in asset_specs: for dep in spec.deps: if dep.asset_key not in named_outs_by_asset_key: upstream_keys.add(dep.asset_key) @@ -322,9 +340,8 @@ def from_specs( # self-dependent asset also needs to be considered an upstream_key upstream_keys.add(dep.asset_key) - explicit_ins = args.asset_in_map # get which asset keys have inputs set - loaded_upstreams = build_named_ins(fn, explicit_ins, deps=set()) + loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set()) unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys} if unexpected_upstreams: raise DagsterInvalidDefinitionError( @@ -332,11 +349,11 @@ def from_specs( " AssetSpec(s). Set the deps on the appropriate AssetSpec(s)." ) remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams} - named_ins_by_asset_key = build_named_ins(fn, explicit_ins, deps=remaining_upstream_keys) + named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys) internal_deps = { spec.key: {dep.asset_key for dep in spec.deps} - for spec in args.specs + for spec in asset_specs if spec.deps is not None } @@ -345,7 +362,7 @@ def from_specs( named_outs_by_asset_key=named_outs_by_asset_key, internal_deps=internal_deps, op_name=op_name, - args=args, + args=passed_args, fn=fn, ) @@ -354,23 +371,22 @@ def from_asset_outs( *, fn: Callable[..., Any], op_name: str, - args: DecoratorAssetsDefinitionBuilderArgs, + asset_in_map: Mapping[str, AssetIn], + asset_out_map: Mapping[str, AssetOut], + asset_deps: Mapping[str, Set[AssetKey]], + upstream_asset_deps: Optional[Iterable[AssetDep]], + passed_args: DecoratorAssetsDefinitionBuilderArgs, ): - check.param_invariant(not args.specs, "args", "This codepath for non-spec based create") - asset_out_map = args.asset_out_map + check.param_invariant( + not passed_args.specs, "args", "This codepath for non-spec based create" + ) named_ins_by_asset_key = build_named_ins( fn, - args.asset_in_map, - deps=( - {dep.asset_key for dep in args.upstream_asset_deps} - if args.upstream_asset_deps - else set() - ), + asset_in_map, + deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()), ) named_outs_by_asset_key = build_named_outs(asset_out_map) - asset_deps = args.asset_deps - # validate that the asset_ins are a subset of the upstream asset_deps. upstream_internal_asset_keys = set().union(*asset_deps.values()) asset_in_keys = set(named_ins_by_asset_key.keys()) @@ -409,7 +425,7 @@ def from_asset_outs( named_outs_by_asset_key=named_outs_by_asset_key, internal_deps=internal_deps, op_name=op_name, - args=args, + args=passed_args, fn=fn, )