Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 8, 2024
1 parent d173e51 commit 720a0cc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def my_function(asset0):
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
builder = create_builder_for_multi_asset(fn=fn, args=args)
builder = DecoratorAssetsDefinitionBuilder.from_args(fn=fn, args=args)

check.invariant(
len(builder.overlapping_output_names) == 0,
Expand All @@ -629,49 +629,6 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return inner


@staticmethod
def create_builder_for_multi_asset(
*, fn: Callable[..., Any], args: DecoratorAssetsDefinitionBuilderArgs
) -> "DecoratorAssetsDefinitionBuilder":
op_name = args.name or fn.__name__

if args.asset_out_map and args.specs:
raise DagsterInvalidDefinitionError("Must specify only outs or specs but not both.")

if args.specs:
check.invariant(
args.decorator_name == "@multi_asset", "Only hit this code path in multi_asset."
)
if args.upstream_asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs"
" directly."
)
if args.asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)
return DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
op_name=op_name,
passed_args=args,
asset_specs=args.specs,
can_subset=args.can_subset,
asset_in_map=args.asset_in_map,
)

return DecoratorAssetsDefinitionBuilder.from_asset_outs(
fn=fn,
op_name=op_name,
asset_in_map=args.asset_in_map,
asset_out_map=args.asset_out_map,
asset_deps=args.asset_deps,
upstream_asset_deps=args.upstream_asset_deps,
passed_args=args,
)


@overload
def graph_asset(
compose_fn: Callable[..., Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,48 @@ def __init__(
else named_ins_by_asset_key
)

@staticmethod
def from_args(
*, fn: Callable[..., Any], args: DecoratorAssetsDefinitionBuilderArgs
) -> "DecoratorAssetsDefinitionBuilder":
op_name = args.name or fn.__name__

if args.asset_out_map and args.specs:
raise DagsterInvalidDefinitionError("Must specify only outs or specs but not both.")

if args.specs:
check.invariant(
args.decorator_name == "@multi_asset", "Only hit this code path in multi_asset."
)
if args.upstream_asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs"
" directly."
)
if args.asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)
return DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
op_name=op_name,
passed_args=args,
asset_specs=args.specs,
can_subset=args.can_subset,
asset_in_map=args.asset_in_map,
)

return DecoratorAssetsDefinitionBuilder.from_asset_outs(
fn=fn,
op_name=op_name,
asset_in_map=args.asset_in_map,
asset_out_map=args.asset_out_map,
asset_deps=args.asset_deps,
upstream_asset_deps=args.upstream_asset_deps,
passed_args=args,
)

@staticmethod
def from_specs(
*,
Expand Down

0 comments on commit 720a0cc

Please sign in to comment.