Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eliminate middle function create_assets_def_from_fn_and_decorator_args #22494

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,84 @@ def my_asset(my_upstream_asset: int) -> int:
owners=owners,
)

return create_assets_def_from_fn_and_decorator_args(args, compute_fn)
from dagster._config.pythonic_config import validate_resource_annotated_function

fn = compute_fn

validate_resource_annotated_function(fn)

out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator(
key=args.key,
key_prefix=args.key_prefix,
name=args.name,
fn=fn,
decorator_name="@asset",
)

resource_related_state = ResourceRelatedState(
io_manager_def=args.io_manager_def,
io_manager_key=args.io_manager_key,
resources=args.resource_defs,
out_asset_key=out_asset_key,
)

with disable_dagster_warnings():
builder_args = DecoratorAssetsDefinitionBuilderArgs(
name=args.name,
op_description=args.description,
check_specs_by_output_name=create_check_specs_by_output_name(args.check_specs),
group_name=args.group_name,
partitions_def=args.partitions_def,
retry_policy=args.retry_policy,
code_version=args.code_version,
op_tags=args.op_tags,
config_schema=args.config_schema,
compute_kind=args.compute_kind,
required_resource_keys=args.required_resource_keys,
op_def_resource_defs=resource_related_state.op_resource_defs,
assets_def_resource_defs=resource_related_state.asset_resource_defs,
backfill_policy=args.backfill_policy,
asset_out_map={
DEFAULT_OUTPUT: AssetOut(
key=out_asset_key,
metadata=args.metadata,
description=args.description,
is_required=args.output_required,
io_manager_key=resource_related_state.resolved_io_manager_key,
dagster_type=args.dagster_type if args.dagster_type else NoValueSentinel,
group_name=args.group_name,
code_version=args.code_version,
freshness_policy=args.freshness_policy,
automation_condition=args.automation_condition,
backfill_policy=args.backfill_policy,
owners=args.owners,
tags=validate_tags_strict(args.tags),
)
},
upstream_asset_deps=args.deps,
asset_in_map=args.ins,
# We will not be using specs to construct here
# because they are assumption about output names. Non-spec
# construction path assumptions apply here
specs=[],
# no internal asset deps
asset_deps={},
can_subset=False,
decorator_name="@asset",
execution_type=AssetExecutionType.MATERIALIZATION, # TODO is this correcT?
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
fn=fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
asset_deps=builder_args.asset_deps,
upstream_asset_deps=builder_args.upstream_asset_deps,
passed_args=builder_args,
)

return builder.create_assets_definition()


def resolve_asset_key_and_name_for_decorator(
Expand Down