Skip to content

Commit

Permalink
inmapping outmapping
Browse files: browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 4, 2024
1 parent b85de60 commit 977ee45
Showing 1 changed file with 51 additions and 68 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -74,7 +74,7 @@ def build_asset_ins(
fn: Callable[..., Any],
asset_ins: Mapping[str, AssetIn],
deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
) -> Mapping[AssetKey, "NamedIn"]:
"""Creates a mapping from AssetKey to (name of input, In object)."""
deps = check.opt_set_param(deps, "deps", AssetKey)

Expand All @@ -98,7 +98,7 @@ def build_asset_ins(
"of the arguments to the decorated function"
)

ins_by_asset_key: Dict[AssetKey, Tuple[str, In]] = {}
ins_by_asset_key: Dict[AssetKey, NamedIn] = {}
for input_name in all_input_names:
asset_key = None

Expand All @@ -116,7 +116,7 @@ def build_asset_ins(

asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name])))

ins_by_asset_key[asset_key] = (
ins_by_asset_key[asset_key] = NamedIn(
input_name.replace("-", "_"),
In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type),
)
Expand All @@ -127,33 +127,33 @@ def build_asset_ins(
f"deps value {asset_key} also declared as input/AssetIn"
)
# mypy doesn't realize that Nothing is a valid type here
ins_by_asset_key[asset_key] = (
ins_by_asset_key[asset_key] = NamedIn(
stringify_asset_key_to_input_name(asset_key),
In(cast(type, Nothing)),
)

return ins_by_asset_key


def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]:
def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, "NamedOut"]:
"""Creates a mapping from AssetKey to (name of output, Out object)."""
outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {}
named_outs_by_asset_key: Dict[AssetKey, NamedOut] = {}
for output_name, asset_out in asset_outs.items():
out = asset_out.to_out()
asset_key = asset_out.key or AssetKey(
list(filter(None, [*(asset_out.key_prefix or []), output_name]))
)

outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out)
named_outs_by_asset_key[asset_key] = NamedOut(output_name.replace("-", "_"), out)

return outs_by_asset_key
return named_outs_by_asset_key


def build_subsettable_asset_ins(
asset_ins: Mapping[AssetKey, Tuple[str, In]],
asset_outs: Mapping[AssetKey, Tuple[str, Out]],
internal_upstream_deps: Iterable[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
) -> Mapping[AssetKey, "NamedIn"]:
"""Creates a mapping from AssetKey to (name of input, In object) for any asset key that is not
currently an input, but may become one if this asset is subset.
Expand All @@ -165,18 +165,18 @@ def build_subsettable_asset_ins(
# set of asset keys which are upstream of another asset, and are not currently inputs
potential_deps = set().union(*internal_upstream_deps).difference(set(asset_ins.keys()))
return {
key: (f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing))
key: NamedIn(f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing))
for key, (name, _) in asset_outs.items()
if key in potential_deps
}


class InMapping(NamedTuple):
class NamedIn(NamedTuple):
input_name: str
input: In


class OutMapping(NamedTuple):
class NamedOut(NamedTuple):
output_name: str
output: Out

Expand Down Expand Up @@ -234,20 +234,34 @@ class AssetsDefinitionBuilder:
def __init__(
self,
*,
input_tuples_by_asset_key: Mapping[AssetKey, Tuple[str, In]],
output_tables_by_asset_key: Mapping[AssetKey, Tuple[str, Out]],
named_ins_by_asset_key: Mapping[AssetKey, NamedIn],
named_outs_by_asset_key: Mapping[AssetKey, NamedOut],
internal_deps: Mapping[AssetKey, Set[AssetKey]],
op_name: str,
multi_asset_args: AssetsDefinitionBuilderArgs,
fn: Callable[..., Any],
) -> None:
self._passed_input_tuples_by_asset_key = input_tuples_by_asset_key
self.output_tuples_by_asset_key = output_tables_by_asset_key
self.named_outs_by_asset_key = named_outs_by_asset_key
self.internal_deps = internal_deps
self.op_name = op_name
self.args = multi_asset_args
self.fn = fn

self.named_ins_by_asset_key = (
(
{
**named_ins_by_asset_key,
**build_subsettable_asset_ins(
named_ins_by_asset_key,
named_outs_by_asset_key,
self.internal_deps.values(),
),
}
)
if self.args.can_subset and self.internal_deps
else named_ins_by_asset_key
)

@staticmethod
def from_args(*, fn: Callable[..., Any], args: AssetsDefinitionBuilderArgs):
op_name = args.name or fn.__name__
Expand Down Expand Up @@ -279,10 +293,10 @@ def from_specs(
):
check.param_invariant(args.specs, "args", "Must use specs in this codepath")

output_tuples_by_asset_key = {}
named_outs_by_asset_key: Mapping[AssetKey, NamedOut] = {}
for asset_spec in args.specs:
output_name = asset_spec.key.to_python_identifier()
output_tuples_by_asset_key[asset_spec.key] = (
named_outs_by_asset_key[asset_spec.key] = NamedOut(
output_name,
Out(
Nothing,
Expand All @@ -296,12 +310,9 @@ def from_specs(
upstream_keys = set()
for spec in args.specs:
for dep in spec.deps:
if dep.asset_key not in output_tuples_by_asset_key:
if dep.asset_key not in named_outs_by_asset_key:
upstream_keys.add(dep.asset_key)
if (
dep.asset_key in output_tuples_by_asset_key
and dep.partition_mapping is not None
):
if dep.asset_key in named_outs_by_asset_key and dep.partition_mapping is not None:
# self-dependent asset also needs to be considered an upstream_key
upstream_keys.add(dep.asset_key)

Expand All @@ -324,8 +335,8 @@ def from_specs(
}

return AssetsDefinitionBuilder(
input_tuples_by_asset_key=input_tuples_by_asset_key,
output_tables_by_asset_key=output_tuples_by_asset_key,
named_ins_by_asset_key=input_tuples_by_asset_key,
named_outs_by_asset_key=named_outs_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
multi_asset_args=args,
Expand Down Expand Up @@ -388,8 +399,8 @@ def from_asset_outs(
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}

return AssetsDefinitionBuilder(
input_tuples_by_asset_key=inputs_tuples_by_asset_key,
output_tables_by_asset_key=output_tuples_by_asset_key,
named_ins_by_asset_key=inputs_tuples_by_asset_key,
named_outs_by_asset_key=output_tuples_by_asset_key,
internal_deps=internal_deps,
op_name=op_name,
multi_asset_args=args,
Expand All @@ -401,53 +412,26 @@ def group_name(self) -> Optional[str]:
return self.args.group_name

@cached_property
def input_tuples_by_asset_key(self) -> Mapping[AssetKey, Tuple[str, In]]:
if self.args.can_subset and self.internal_deps:
return {
**self._passed_input_tuples_by_asset_key,
**build_subsettable_asset_ins(
self._passed_input_tuples_by_asset_key,
self.output_tuples_by_asset_key,
self.internal_deps.values(),
),
}
else:
return self._passed_input_tuples_by_asset_key

@cached_property
def in_mappings(self) -> Mapping[AssetKey, InMapping]:
return {
asset_key: InMapping(input_name, in_)
for asset_key, (input_name, in_) in self.input_tuples_by_asset_key.items()
}

@cached_property
def out_mappings(self) -> Mapping[AssetKey, OutMapping]:
return {
asset_key: OutMapping(output_name, out_)
for asset_key, (output_name, out_) in self.output_tuples_by_asset_key.items()
}

@cached_property
def asset_outs_by_output_name(self) -> Mapping[str, Out]:
return dict(self.out_mappings.values())
def outs_by_output_name(self) -> Mapping[str, Out]:
return dict(self.named_outs_by_asset_key.values())

@cached_property
def asset_keys_by_input_name(self) -> Mapping[str, AssetKey]:
return {
in_mapping.input_name: asset_key for asset_key, in_mapping in self.in_mappings.items()
in_mapping.input_name: asset_key
for asset_key, in_mapping in self.named_ins_by_asset_key.items()
}

@cached_property
def asset_keys_by_output_name(self) -> Mapping[str, AssetKey]:
return {
out_mapping.output_name: asset_key
for asset_key, out_mapping in self.out_mappings.items()
for asset_key, out_mapping in self.named_outs_by_asset_key.items()
}

@cached_property
def asset_keys(self) -> Set[AssetKey]:
return set(self.out_mappings.keys())
return set(self.named_outs_by_asset_key.keys())

@cached_property
def check_specs_by_output_name(self) -> Mapping[str, AssetCheckSpec]:
Expand All @@ -465,24 +449,23 @@ def check_outs_by_output_name(self) -> Mapping[str, Out]:
@cached_property
def combined_outs_by_output_name(self) -> Mapping[str, Out]:
return {
**self.asset_outs_by_output_name,
**self.outs_by_output_name,
**self.check_outs_by_output_name,
}

@cached_property
def overlapping_output_names(self) -> Set[str]:
return set(self.asset_outs_by_output_name.keys()) & set(
self.check_outs_by_output_name.keys()
)
return set(self.outs_by_output_name.keys()) & set(self.check_outs_by_output_name.keys())

@cached_property
def asset_ins_by_input_names(self) -> Mapping[str, In]:
return {in_name: in_obj for in_name, in_obj in self.in_mappings.values()}
def ins_by_input_names(self) -> Mapping[str, In]:
return {in_name: in_obj for in_name, in_obj in self.named_ins_by_asset_key.values()}

@cached_property
def asset_keys_by_input_names(self) -> Mapping[str, AssetKey]:
return {
in_mapping.input_name: asset_key for asset_key, in_mapping in self.in_mappings.items()
in_mapping.input_name: asset_key
for asset_key, in_mapping in self.named_ins_by_asset_key.items()
}

@cached_property
Expand All @@ -506,7 +489,7 @@ def _create_op_definition(self) -> OpDefinition:
return _Op(
name=self.op_name,
description=self.args.description,
ins=self.asset_ins_by_input_names,
ins=self.ins_by_input_names,
out=self.combined_outs_by_output_name,
required_resource_keys=compute_required_resource_keys(
self.args.required_resource_keys,
Expand Down

0 comments on commit 977ee45

Please sign in to comment.