Skip to content

Commit

Permalink
use better pattern for decorator compute_fn is Nomne
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 13, 2024
1 parent eb20b87 commit 08d3ace
Showing 1 changed file with 40 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,51 @@ def asset(
def my_asset(my_upstream_asset: int) -> int:
return my_upstream_asset + 1
"""
if compute_fn is None:
return lambda fn: asset( # type: ignore
compute_fn=fn,
name=name,
key_prefix=key_prefix,
ins=ins,
deps=deps,
metadata=metadata,
tags=tags,
description=description,
config_schema=config_schema,
required_resource_keys=required_resource_keys,
resource_defs=resource_defs,
io_manager_def=io_manager_def,
io_manager_key=io_manager_key,
compute_kind=compute_kind,
dagster_type=dagster_type,
partitions_def=partitions_def,
op_tags=op_tags,
group_name=group_name,
output_required=output_required,
freshness_policy=freshness_policy,
auto_materialize_policy=auto_materialize_policy,
backfill_policy=backfill_policy,
retry_policy=retry_policy,
code_version=code_version,
key=key,
non_argument_deps=non_argument_deps,
check_specs=check_specs,
owners=owners,
)

compute_kind = check.opt_str_param(compute_kind, "compute_kind")
required_resource_keys = check.opt_set_param(required_resource_keys, "required_resource_keys")
upstream_asset_deps = _deps_and_non_argument_deps_to_asset_deps(
deps=deps, non_argument_deps=non_argument_deps
)
resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs"))

check.invariant(
not (io_manager_key and io_manager_def),
"Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please"
" provide one or the other. ",
)

args = AssetDecoratorArgs(
name=name,
key_prefix=key_prefix,
Expand Down Expand Up @@ -253,18 +291,7 @@ def my_asset(my_upstream_asset: int) -> int:
owners=owners,
)

if compute_fn is not None:
return create_assets_def_from_fn_and_decorator_args(args, compute_fn)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
check.invariant(
not (io_manager_key and io_manager_def),
"Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please"
" provide one or the other. ",
)
return create_assets_def_from_fn_and_decorator_args(args, fn)

return inner
return create_assets_def_from_fn_and_decorator_args(args, compute_fn)


def resolve_asset_key_and_name_for_decorator(
Expand Down Expand Up @@ -729,7 +756,7 @@ def slack_files_table():
return store_files(fetch_files_from_slack())
"""
if compose_fn is None:
return lambda fn: graph_asset(
return lambda fn: graph_asset( # type: ignore
fn,
name=name,
description=description,
Expand Down

0 comments on commit 08d3ace

Please sign in to comment.