Skip to content

Commit

Permalink
eliminate middle function create_assets_def_from_fn_and_decorator_args
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 13, 2024
1 parent 08d3ace commit 1feff9a
Showing 1 changed file with 77 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,83 @@ def my_asset(my_upstream_asset: int) -> int:
owners=owners,
)

return create_assets_def_from_fn_and_decorator_args(args, compute_fn)
from dagster._config.pythonic_config import validate_resource_annotated_function

fn = compute_fn

validate_resource_annotated_function(fn)

out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator(
key=args.key,
key_prefix=args.key_prefix,
name=args.name,
fn=fn,
decorator_name="@asset",
)

resource_related_state = ResourceRelatedState(
io_manager_def=args.io_manager_def,
io_manager_key=args.io_manager_key,
resources=args.resource_defs,
out_asset_key=out_asset_key,
)

with disable_dagster_warnings():
builder_args = DecoratorAssetsDefinitionBuilderArgs(
name=args.name,
op_description=args.description,
check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs),
group_name=args.group_name,
partitions_def=args.partitions_def,
retry_policy=args.retry_policy,
code_version=args.code_version,
op_tags=args.op_tags,
config_schema=args.config_schema,
compute_kind=args.compute_kind,
required_resource_keys=args.required_resource_keys,
op_def_resource_defs=resource_related_state.op_resource_defs,
assets_def_resource_defs=resource_related_state.asset_resource_defs,
backfill_policy=args.backfill_policy,
asset_out_map={
DEFAULT_OUTPUT: AssetOut(
key=out_asset_key,
metadata=args.metadata,
description=args.description,
is_required=args.output_required,
io_manager_key=resource_related_state.resolved_io_manager_key,
dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel,
group_name=args.group_name,
code_version=args.code_version,
freshness_policy=args.freshness_policy,
auto_materialize_policy=args.auto_materialize_policy,
backfill_policy=args.backfill_policy,
owners=args.owners,
tags=validate_tags_strict(args.tags),
)
},
upstream_asset_deps=args.deps,
asset_in_map=args.ins,
# We will not be using specs to construct here
# because they are assumption about output names. Non-spec
# construction path assumptions apply here
specs=[],
# no internal asset deps
asset_deps={},
can_subset=False,
decorator_name="@asset",
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
fn=fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
asset_deps=builder_args.asset_deps,
upstream_asset_deps=builder_args.upstream_asset_deps,
passed_args=builder_args,
)

return builder.create_assets_definition()


def resolve_asset_key_and_name_for_decorator(
Expand Down Expand Up @@ -395,86 +471,6 @@ def asset_resource_defs(self) -> Mapping[str, ResourceDefinition]:
return wrap_resources_for_execution({**self.resources, **{io_manager_key: io_manager_def}})


def create_assets_def_from_fn_and_decorator_args(
args: AssetDecoratorArgs, fn: Callable[..., Any]
) -> AssetsDefinition:
from dagster._config.pythonic_config import validate_resource_annotated_function

validate_resource_annotated_function(fn)

out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator(
key=args.key,
key_prefix=args.key_prefix,
name=args.name,
fn=fn,
decorator_name="@asset",
)

resource_related_state = ResourceRelatedState(
io_manager_def=args.io_manager_def,
io_manager_key=args.io_manager_key,
resources=args.resource_defs,
out_asset_key=out_asset_key,
)

with disable_dagster_warnings():
builder_args = DecoratorAssetsDefinitionBuilderArgs(
name=args.name,
op_description=args.description,
check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs),
group_name=args.group_name,
partitions_def=args.partitions_def,
retry_policy=args.retry_policy,
code_version=args.code_version,
op_tags=args.op_tags,
config_schema=args.config_schema,
compute_kind=args.compute_kind,
required_resource_keys=args.required_resource_keys,
op_def_resource_defs=resource_related_state.op_resource_defs,
assets_def_resource_defs=resource_related_state.asset_resource_defs,
backfill_policy=args.backfill_policy,
asset_out_map={
DEFAULT_OUTPUT: AssetOut(
key=out_asset_key,
metadata=args.metadata,
description=args.description,
is_required=args.output_required,
io_manager_key=resource_related_state.resolved_io_manager_key,
dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel,
group_name=args.group_name,
code_version=args.code_version,
freshness_policy=args.freshness_policy,
auto_materialize_policy=args.auto_materialize_policy,
backfill_policy=args.backfill_policy,
owners=args.owners,
tags=validate_tags_strict(args.tags),
)
},
upstream_asset_deps=args.deps,
asset_in_map=args.ins,
# We will not be using specs to construct here
# because they are assumption about output names. Non-spec
# construction path assumptions apply here
specs=[],
# no internal asset deps
asset_deps={},
can_subset=False,
decorator_name="@asset",
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
fn=fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
asset_deps=builder_args.asset_deps,
upstream_asset_deps=builder_args.upstream_asset_deps,
passed_args=builder_args,
)

return builder.create_assets_definition()


@experimental_param(param="resource_defs")
@deprecated_param(
param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead."
Expand Down

0 comments on commit 1feff9a

Please sign in to comment.