Skip to content

Commit

Permalink
refactor builder creation
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 8, 2024
1 parent abb0f39 commit 51b04db
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,17 @@ def create_assets_def_from_fn_and_decorator_args(
can_subset=False,
decorator_name="@asset",
)

builder = DecoratorAssetsDefinitionBuilder.from_asset_outs(
args=builder_args, fn=fn, op_name=out_asset_key.to_python_identifier()
fn=fn,
op_name=out_asset_key.to_python_identifier(),
asset_in_map=builder_args.asset_in_map,
asset_out_map=builder_args.asset_out_map,
asset_deps=builder_args.asset_deps,
upstream_asset_deps=builder_args.upstream_asset_deps,
passed_args=builder_args,
)

return builder.create_assets_definition()


Expand Down Expand Up @@ -608,7 +616,7 @@ def my_function(asset0):
)

def inner(fn: Callable[..., Any]) -> AssetsDefinition:
builder = DecoratorAssetsDefinitionBuilder.from_args(args=args, fn=fn)
builder = create_builder_for_multi_asset(fn=fn, args=args)

check.invariant(
len(builder.overlapping_output_names) == 0,
Expand All @@ -621,6 +629,49 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
return inner


@staticmethod
def create_builder_for_multi_asset(
*, fn: Callable[..., Any], args: DecoratorAssetsDefinitionBuilderArgs
) -> "DecoratorAssetsDefinitionBuilder":
op_name = args.name or fn.__name__

if args.asset_out_map and args.specs:
raise DagsterInvalidDefinitionError("Must specify only outs or specs but not both.")

if args.specs:
check.invariant(
args.decorator_name == "@multi_asset", "Only hit this code path in multi_asset."
)
if args.upstream_asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs"
" directly."
)
if args.asset_deps:
raise DagsterInvalidDefinitionError(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)
return DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
op_name=op_name,
passed_args=args,
asset_specs=args.specs,
can_subset=args.can_subset,
asset_in_map=args.asset_in_map,
)

return DecoratorAssetsDefinitionBuilder.from_asset_outs(
fn=fn,
op_name=op_name,
asset_in_map=args.asset_in_map,
asset_out_map=args.asset_out_map,
asset_deps=args.asset_deps,
upstream_asset_deps=args.upstream_asset_deps,
passed_args=args,
)


@overload
def graph_asset(
compose_fn: Callable[..., Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_in import AssetIn
from dagster._core.definitions.asset_key import AssetKey
Expand Down Expand Up @@ -99,7 +100,7 @@ def build_named_ins(
"of the arguments to the decorated function"
)

ins_by_asset_key: Dict[AssetKey, NamedIn] = {}
named_ins_by_asset_key: Dict[AssetKey, NamedIn] = {}
for input_name in all_input_names:
asset_key = None

Expand All @@ -117,23 +118,23 @@ def build_named_ins(

asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name])))

ins_by_asset_key[asset_key] = NamedIn(
named_ins_by_asset_key[asset_key] = NamedIn(
input_name.replace("-", "_"),
In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type),
)

for asset_key in deps:
if asset_key in ins_by_asset_key:
if asset_key in named_ins_by_asset_key:
raise DagsterInvalidDefinitionError(
f"deps value {asset_key} also declared as input/AssetIn"
)
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = NamedIn(
named_ins_by_asset_key[asset_key] = NamedIn(
stringify_asset_key_to_input_name(asset_key),
In(cast(type, Nothing)),
)

return ins_by_asset_key
return named_ins_by_asset_key


def build_named_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "NamedOut"]:
Expand All @@ -150,7 +151,7 @@ def build_named_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "N
return named_outs_by_asset_key


def build_subsettable_asset_ins(
def build_subsettable_named_ins(
asset_ins: Mapping[AssetKey, Tuple[str, In]],
asset_outs: Mapping[AssetKey, Tuple[str, Out]],
internal_upstream_deps: Iterable[AbstractSet[AssetKey]],
Expand Down Expand Up @@ -252,7 +253,7 @@ def __init__(
(
{
**named_ins_by_asset_key,
**build_subsettable_asset_ins(
**build_subsettable_named_ins(
named_ins_by_asset_key,
named_outs_by_asset_key,
self.internal_deps.values(),
Expand Down Expand Up @@ -286,57 +287,74 @@ def from_args(
"Can not pass internal_asset_deps and specs to @multi_asset, specify deps on"
" the AssetSpecs directly."
)
return DecoratorAssetsDefinitionBuilder.from_specs(fn=fn, op_name=op_name, args=args)
return DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
op_name=op_name,
passed_args=args,
asset_specs=args.specs,
can_subset=args.can_subset,
asset_in_map=args.asset_in_map,
)

return DecoratorAssetsDefinitionBuilder.from_asset_outs(fn=fn, op_name=op_name, args=args)
return DecoratorAssetsDefinitionBuilder.from_asset_outs(
fn=fn,
op_name=op_name,
asset_in_map=args.asset_in_map,
asset_out_map=args.asset_out_map,
asset_deps=args.asset_deps,
upstream_asset_deps=args.upstream_asset_deps,
passed_args=args,
)

@staticmethod
def from_specs(
*,
fn: Callable[..., Any],
op_name: str,
args: DecoratorAssetsDefinitionBuilderArgs,
asset_specs: Sequence[AssetSpec],
can_subset: bool,
asset_in_map: Mapping[str, AssetIn],
passed_args: DecoratorAssetsDefinitionBuilderArgs,
) -> "DecoratorAssetsDefinitionBuilder":
check.param_invariant(args.specs, "args", "Must use specs in this codepath")
check.param_invariant(passed_args.specs, "passed_args", "Must use specs in this codepath")

named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {}
for asset_spec in args.specs:
for asset_spec in asset_specs:
output_name = asset_spec.key.to_python_identifier()
named_outs_by_asset_key[asset_spec.key] = NamedOut(
output_name,
Out(
Nothing,
is_required=not (args.can_subset or asset_spec.skippable),
is_required=not (can_subset or asset_spec.skippable),
description=asset_spec.description,
code_version=asset_spec.code_version,
metadata=asset_spec.metadata,
),
)

upstream_keys = set()
for spec in args.specs:
for spec in asset_specs:
for dep in spec.deps:
if dep.asset_key not in named_outs_by_asset_key:
upstream_keys.add(dep.asset_key)
if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None:
# self-dependent asset also needs to be considered an upstream_key
upstream_keys.add(dep.asset_key)

explicit_ins = args.asset_in_map
# get which asset keys have inputs set
loaded_upstreams = build_named_ins(fn, explicit_ins, deps=set())
loaded_upstreams = build_named_ins(fn, asset_in_map, deps=set())
unexpected_upstreams = {key for key in loaded_upstreams.keys() if key not in upstream_keys}
if unexpected_upstreams:
raise DagsterInvalidDefinitionError(
f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed"
" AssetSpec(s). Set the deps on the appropriate AssetSpec(s)."
)
remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams}
named_ins_by_asset_key = build_named_ins(fn, explicit_ins, deps=remaining_upstream_keys)
named_ins_by_asset_key = build_named_ins(fn, asset_in_map, deps=remaining_upstream_keys)

internal_deps = {
spec.key: {dep.asset_key for dep in spec.deps}
for spec in args.specs
for spec in asset_specs
if spec.deps is not None
}

Expand All @@ -345,7 +363,7 @@ def from_specs(
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
args=args,
args=passed_args,
fn=fn,
)

Expand All @@ -354,23 +372,22 @@ def from_asset_outs(
*,
fn: Callable[..., Any],
op_name: str,
args: DecoratorAssetsDefinitionBuilderArgs,
asset_in_map: Mapping[str, AssetIn],
asset_out_map: Mapping[str, AssetOut],
asset_deps: Mapping[str, Set[AssetKey]],
upstream_asset_deps: Optional[Iterable[AssetDep]],
passed_args: DecoratorAssetsDefinitionBuilderArgs,
):
check.param_invariant(not args.specs, "args", "This codepath for non-spec based create")
asset_out_map = args.asset_out_map
check.param_invariant(
not passed_args.specs, "args", "This codepath for non-spec based create"
)
named_ins_by_asset_key = build_named_ins(
fn,
args.asset_in_map,
deps=(
{dep.asset_key for dep in args.upstream_asset_deps}
if args.upstream_asset_deps
else set()
),
asset_in_map,
deps=({dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set()),
)
named_outs_by_asset_key = build_named_outs(asset_out_map)

asset_deps = args.asset_deps

# validate that the asset_ins are a subset of the upstream asset_deps.
upstream_internal_asset_keys = set().union(*asset_deps.values())
asset_in_keys = set(named_ins_by_asset_key.keys())
Expand Down Expand Up @@ -409,7 +426,7 @@ def from_asset_outs(
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
args=args,
args=passed_args,
fn=fn,
)

Expand Down Expand Up @@ -528,6 +545,15 @@ def create_assets_definition(self) -> AssetsDefinition:
selected_asset_check_keys=None, # not a subset so this is none
)

def create_assets_checks_definition(self) -> AssetChecksDefinition:
return AssetChecksDefinition.create(
node_def=self._create_op_definition(),
resource_defs=self.args.assets_def_resource_defs,
keys_by_input_name=self.asset_keys_by_input_names,
check_specs_by_output_name=self.check_specs_by_output_name,
can_subset=self.args.can_subset,
)

@cached_property
def specs(self) -> Sequence[AssetSpec]:
specs = self.args.specs if self.args.specs else self._synthesize_specs()
Expand Down

0 comments on commit 51b04db

Please sign in to comment.