Skip to content

Commit

Permalink
Eliminate middle args object
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jul 15, 2024
1 parent 7408811 commit 4d6f77b
Showing 1 changed file with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,13 @@ def my_asset(my_upstream_asset: int) -> int:
owners=owners,
)

from dagster._config.pythonic_config import validate_resource_annotated_function

compute_kind = check.opt_str_param(compute_kind, "compute_kind")
required_resource_keys = check.opt_set_param(required_resource_keys, "required_resource_keys")
upstream_asset_deps = _deps_and_non_argument_deps_to_asset_deps(
deps=deps, non_argument_deps=non_argument_deps
upstream_asset_deps = (
_deps_and_non_argument_deps_to_asset_deps(deps=deps, non_argument_deps=non_argument_deps)
or []
)
resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))

Expand Down Expand Up @@ -307,49 +310,47 @@ def my_asset(my_upstream_asset: int) -> int:
owners=owners,
)

from dagster._config.pythonic_config import validate_resource_annotated_function

fn = compute_fn

validate_resource_annotated_function(fn)

out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator(
key=args.key,
key_prefix=args.key_prefix,
name=args.name,
fn=fn,
key=key,
key_prefix=key_prefix,
name=name,
fn=compute_fn,
decorator_name="@asset",
)

resource_related_state = ResourceRelatedState(
io_manager_def=args.io_manager_def,
io_manager_key=args.io_manager_key,
resources=args.resource_defs,
io_manager_def=io_manager_def,
io_manager_key=io_manager_key,
resources=resource_defs,
out_asset_key=out_asset_key,
)

with disable_dagster_warnings():
builder_args = DecoratorAssetsDefinitionBuilderArgs(
name=args.name,
op_description=args.description,
check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs),
group_name=args.group_name,
partitions_def=args.partitions_def,
retry_policy=args.retry_policy,
code_version=args.code_version,
op_tags=args.op_tags,
config_schema=args.config_schema,
compute_kind=args.compute_kind,
required_resource_keys=args.required_resource_keys,
name=name,
op_description=description,
check_specs_by_output_name=create_check_specs_by_output_name(check_specs),
group_name=group_name,
partitions_def=partitions_def,
retry_policy=retry_policy,
code_version=code_version,
op_tags=op_tags,
config_schema=config_schema,
compute_kind=compute_kind,
required_resource_keys=required_resource_keys,
op_def_resource_defs=resource_related_state.op_resource_defs,
assets_def_resource_defs=resource_related_state.asset_resource_defs,
backfill_policy=args.backfill_policy,
backfill_policy=backfill_policy,
asset_out_map={
DEFAULT_OUTPUT: AssetOut(
key=out_asset_key,
metadata=args.metadata,
description=args.description,
is_required=args.output_required,
metadata=metadata,
description=description,
is_required=output_required,
io_manager_key=resource_related_state.resolved_io_manager_key,
dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel,
group_name=args.group_name,
Expand All @@ -361,8 +362,8 @@ def my_asset(my_upstream_asset: int) -> int:
tags=validate_tags_strict(args.tags),
)
},
upstream_asset_deps=args.deps,
asset_in_map=args.ins,
upstream_asset_deps=upstream_asset_deps,
asset_in_map=ins or {},
# We will not be using specs to construct here
# because they are assumption about output names. Non-spec
# construction path assumptions apply here
Expand All @@ -375,7 +376,7 @@ def my_asset(my_upstream_asset: int) -> int:
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
fn=fn,
fn=compute_fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
Expand Down

0 comments on commit 4d6f77b

Please sign in to comment.