From 1feff9a33256b488beb67f9e2484efb457dfb5e7 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Mon, 10 Jun 2024 18:24:25 -0400 Subject: [PATCH] eliminate middle function create_assets_def_from_fn_and_decorator_args --- .../definitions/decorators/asset_decorator.py | 158 +++++++++--------- 1 file changed, 77 insertions(+), 81 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index d261a8e2a32bd..47228d2fca22c 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -291,7 +291,83 @@ def my_asset(my_upstream_asset: int) -> int: owners=owners, ) - return create_assets_def_from_fn_and_decorator_args(args, compute_fn) + from dagster._config.pythonic_config import validate_resource_annotated_function + + fn = compute_fn + + validate_resource_annotated_function(fn) + + out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator( + key=args.key, + key_prefix=args.key_prefix, + name=args.name, + fn=fn, + decorator_name="@asset", + ) + + resource_related_state = ResourceRelatedState( + io_manager_def=args.io_manager_def, + io_manager_key=args.io_manager_key, + resources=args.resource_defs, + out_asset_key=out_asset_key, + ) + + with disable_dagster_warnings(): + builder_args = DecoratorAssetsDefinitionBuilderArgs( + name=args.name, + op_description=args.description, + check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs), + group_name=args.group_name, + partitions_def=args.partitions_def, + retry_policy=args.retry_policy, + code_version=args.code_version, + op_tags=args.op_tags, + config_schema=args.config_schema, + compute_kind=args.compute_kind, + required_resource_keys=args.required_resource_keys, + op_def_resource_defs=resource_related_state.op_resource_defs, + assets_def_resource_defs=resource_related_state.asset_resource_defs, + backfill_policy=args.backfill_policy, + asset_out_map={ + DEFAULT_OUTPUT: AssetOut( + key=out_asset_key, + metadata=args.metadata, + description=args.description, + is_required=args.output_required, + io_manager_key=resource_related_state.resolved_io_manager_key, + dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel, + group_name=args.group_name, + code_version=args.code_version, + freshness_policy=args.freshness_policy, + auto_materialize_policy=args.auto_materialize_policy, + backfill_policy=args.backfill_policy, + owners=args.owners, + tags=validate_tags_strict(args.tags), + ) + }, + upstream_asset_deps=args.deps, + asset_in_map=args.ins, + # We will not be using specs to construct here + # because they are assumption about output names. Non-spec + # construction path assumptions apply here + specs=[], + # no internal asset deps + asset_deps={}, + can_subset=False, + decorator_name="@asset", + ) + + builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator( + fn=fn, + op_name=out_asset_key.to_python_identifier(), + asset_in_map=builder_args.asset_in_map, + asset_out_map=builder_args.asset_out_map, + asset_deps=builder_args.asset_deps, + upstream_asset_deps=builder_args.upstream_asset_deps, + passed_args=builder_args, + ) + + return builder.create_assets_definition() def resolve_asset_key_and_name_for_decorator( @@ -395,86 +471,6 @@ def asset_resource_defs(self) -> Mapping[str, ResourceDefinition]: return wrap_resources_for_execution({**self.resources, **{io_manager_key: io_manager_def}}) -def create_assets_def_from_fn_and_decorator_args( - args: AssetDecoratorArgs, fn: Callable[..., Any] -) -> AssetsDefinition: - from dagster._config.pythonic_config import validate_resource_annotated_function - - validate_resource_annotated_function(fn) - - out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator( - key=args.key, - key_prefix=args.key_prefix, - name=args.name, - fn=fn, - decorator_name="@asset", - ) - - resource_related_state = ResourceRelatedState( - io_manager_def=args.io_manager_def, - io_manager_key=args.io_manager_key, - resources=args.resource_defs, - out_asset_key=out_asset_key, - ) - - with disable_dagster_warnings(): - builder_args = DecoratorAssetsDefinitionBuilderArgs( - name=args.name, - op_description=args.description, - check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs), - group_name=args.group_name, - partitions_def=args.partitions_def, - retry_policy=args.retry_policy, - code_version=args.code_version, - op_tags=args.op_tags, - config_schema=args.config_schema, - compute_kind=args.compute_kind, - required_resource_keys=args.required_resource_keys, - op_def_resource_defs=resource_related_state.op_resource_defs, - assets_def_resource_defs=resource_related_state.asset_resource_defs, - backfill_policy=args.backfill_policy, - asset_out_map={ - DEFAULT_OUTPUT: AssetOut( - key=out_asset_key, - metadata=args.metadata, - description=args.description, - is_required=args.output_required, - io_manager_key=resource_related_state.resolved_io_manager_key, - dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel, - group_name=args.group_name, - code_version=args.code_version, - freshness_policy=args.freshness_policy, - auto_materialize_policy=args.auto_materialize_policy, - backfill_policy=args.backfill_policy, - owners=args.owners, - tags=validate_tags_strict(args.tags), - ) - }, - upstream_asset_deps=args.deps, - asset_in_map=args.ins, - # We will not be using specs to construct here - # because they are assumption about output names. Non-spec - # construction path assumptions apply here - specs=[], - # no internal asset deps - asset_deps={}, - can_subset=False, - decorator_name="@asset", - ) - - builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator( - fn=fn, - op_name=out_asset_key.to_python_identifier(), - asset_in_map=builder_args.asset_in_map, - asset_out_map=builder_args.asset_out_map, - asset_deps=builder_args.asset_deps, - upstream_asset_deps=builder_args.upstream_asset_deps, - passed_args=builder_args, - ) - - return builder.create_assets_definition() - - @experimental_param(param="resource_defs") @deprecated_param( param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."