Skip to content

Commit

Permalink
refactor builder creation
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 8, 2024
1 parent 49b5c3d commit 13657b8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,17 @@ def create_assets_def_from_fn_and_decorator_args(
can_subset=False,
decorator_name="@asset",
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs(
args=builder_args, fn=fn, op_name=out_asset_key.to_python_identifier()
fn=fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
asset_deps=builder_args.asset_deps,
upstream_asset_deps=builder_args.upstream_asset_deps,
passed_args=builder_args,
)

return builder.create_assets_definition()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,57 +286,74 @@ def from_args(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)
return DecoratorAssetsDefinitionBuilder.from_specs(fn=fn, op_name=op_name, args=args)
return DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
op_name=op_name,
passed_args=args,
asset_specs=args.specs,
can_subset=args.can_subset,
asset_in_map=args.asset_in_map,
)

return DecoratorAssetsDefinitionBuilder.from_asset_outs(fn=fn, op_name=op_name, args=args)
return DecoratorAssetsDefinitionBuilder.from_asset_outs(
fn=fn,
op_name=op_name,
asset_in_map=args.asset_in_map,
asset_out_map=args.asset_out_map,
asset_deps=args.asset_deps,
upstream_asset_deps=args.upstream_asset_deps,
passed_args=args,
)

@staticmethod
def from_specs(
*,
fn: Callable[..., Any],
op_name: str,
args: DecoratorAssetsDefinitionBuilderArgs,
asset_specs: Sequence[AssetSpec],
can_subset: bool,
asset_in_map: Mapping[str, AssetIn],
passed_args: DecoratorAssetsDefinitionBuilderArgs,
) -> "DecoratorAssetsDefinitionBuilder":
check.param_invariant(args.specs, "args", "Must use specs in this codepath")
check.param_invariant(passed_args.specs, "passed_args", "Must use specs in this codepath")

named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {}
for asset_spec in args.specs:
for asset_spec in asset_specs:
output_name = asset_spec.key.to_python_identifier()
named_outs_by_asset_key[asset_spec.key] = NamedOut(
output_name,
Out(
Nothing,
is_required=not (args.can_subset or asset_spec.skippable),
is_required=not (can_subset or asset_spec.skippable),
description=asset_spec.description,
code_version=asset_spec.code_version,
metadata=asset_spec.metadata,
),
)

upstream_keys = set()
for spec in args.specs:
for spec in asset_specs:
for dep in spec.deps:
if dep.asset_key not in named_outs_by_asset_key:
upstream_keys.add(dep.asset_key)
if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None:
# self-dependent asset also needs to be considered an upstream_key
upstream_keys.add(dep.asset_key)

explicit_ins = args.asset_in_map
# get which asset keys have inputs set
loaded_upstreams = build_named_ins(fn, explicit_ins, deps=set())
loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set())
unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys}
if unexpected_upstreams:
raise DagsterInvalidDefinitionError(
f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed"
" AssetSpec(s). Set the deps on the appropriate AssetSpec(s)."
)
remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams}
named_ins_by_asset_key = build_named_ins(fn, explicit_ins, deps=remaining_upstream_keys)
named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys)

internal_deps = {
spec.key: {dep.asset_key for dep in spec.deps}
for spec in args.specs
for spec in asset_specs
if spec.deps is not None
}

Expand All @@ -345,7 +362,7 @@ def from_specs(
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
args=args,
args=passed_args,
fn=fn,
)

Expand All @@ -354,23 +371,22 @@ def from_asset_outs(
*,
fn: Callable[..., Any],
op_name: str,
args: DecoratorAssetsDefinitionBuilderArgs,
asset_in_map: Mapping[str, AssetIn],
asset_out_map: Mapping[str, AssetOut],
asset_deps: Mapping[str, Set[AssetKey]],
upstream_asset_deps: Optional[Iterable[AssetDep]],
passed_args: DecoratorAssetsDefinitionBuilderArgs,
):
check.param_invariant(not args.specs, "args", "This codepath for non-spec based create")
asset_out_map = args.asset_out_map
check.param_invariant(
not passed_args.specs, "args", "This codepath for non-spec based create"
)
named_ins_by_asset_key = build_named_ins(
fn,
args.asset_in_map,
deps=(
{dep.asset_key for dep in args.upstream_asset_deps}
if args.upstream_asset_deps
else set()
),
asset_in_map,
deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()),
)
named_outs_by_asset_key = build_named_outs(asset_out_map)

asset_deps = args.asset_deps

# validate that the asset_ins are a subset of the upstream asset_deps.
upstream_internal_asset_keys = set().union(*asset_deps.values())
asset_in_keys = set(named_ins_by_asset_key.keys())
Expand Down Expand Up @@ -409,7 +425,7 @@ def from_asset_outs(
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
args=args,
args=passed_args,
fn=fn,
)

Expand Down

0 comments on commit 13657b8

Please sign in to comment.