Skip to content

Commit

Permalink
Begin of stack to refactor asset check decorators to use DecoratorAss…
Browse files Browse the repository at this point in the history
…etsDefinitionBuilder
  • Loading branch information
schrockn committed Jun 9, 2024
1 parent 9d2e085 commit 3728e6f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,4 @@
from typing import (
AbstractSet,
Any,
Callable,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
)
from typing import AbstractSet, Any, Callable, Iterable, Mapping, Optional, Sequence, Set, Union

from typing_extensions import TypeAlias

Expand All @@ -30,9 +19,11 @@
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._utils.warnings import disable_dagster_warnings

from ..input import In
from .asset_decorator import make_asset_deps
from .decorator_assets_definition_builder import (
DecoratorAssetsDefinitionBuilder,
DecoratorAssetsDefinitionBuilderArgs,
NamedIn,
build_named_ins,
compute_required_resource_keys,
get_function_params_without_context_or_config_or_resources,
Expand All @@ -43,13 +34,13 @@
AssetCheckFunction: TypeAlias = Callable[..., AssetCheckFunctionReturn]


def _build_asset_check_input(
def _build_asset_check_named_ins(
name: str,
asset_key: AssetKey,
fn: Callable[..., Any],
additional_ins: Mapping[str, AssetIn],
additional_deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
) -> Mapping[AssetKey, NamedIn]:
fn_params = get_function_params_without_context_or_config_or_resources(fn)

if asset_key in (additional_deps or []):
Expand Down Expand Up @@ -192,7 +183,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
asset_key = AssetKey.from_coercible_or_definition(asset)

additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []])
input_tuples_by_asset_key = _build_asset_check_input(
named_in_by_asset_key = _build_asset_check_named_ins(
resolved_name,
asset_key,
fn,
Expand All @@ -202,7 +193,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:

# additional_deps on AssetCheckSpec holds the keys passed to additional_deps and
# additional_ins. We don't want to include the primary asset key in this set.
additional_ins_and_deps = input_tuples_by_asset_key.keys() - {asset_key}
additional_ins_and_deps = named_in_by_asset_key.keys() - {asset_key}

spec = AssetCheckSpec(
name=resolved_name,
Expand All @@ -215,18 +206,48 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:

resource_defs_for_execution = wrap_resources_for_execution(resource_defs)

op_required_resource_keys = compute_required_resource_keys(
required_resource_keys or set(),
resource_defs_for_execution,
fn=fn,
builder_args = DecoratorAssetsDefinitionBuilderArgs(
decorator_name="@asset_check",
name=name,
description=description,
required_resource_keys=required_resource_keys or set(),
config_schema=config_schema,
retry_policy=retry_policy,
specs=[],
check_specs=[spec],
can_subset=False,
compute_kind=compute_kind,
op_tags=op_tags,
op_def_resource_defs=resource_defs_for_execution,
assets_def_resource_defs=resource_defs_for_execution,
upstream_asset_deps=[],
# unsupported capabiltiies in asset checks
partitions_def=None,
code_version=None,
backfill_policy=None,
group_name=None,
# non-sensical args in this context
asset_deps={},
asset_in_map={},
asset_out_map={},
)

builder = DecoratorAssetsDefinitionBuilder(
named_ins_by_asset_key=named_in_by_asset_key,
named_outs_by_asset_key={},
internal_deps={},
op_name=spec.get_python_identifier(),
args=builder_args,
fn=fn,
)

op_required_resource_keys = builder.required_resource_keys

out = Out(dagster_type=None)

op_def = _Op(
name=spec.get_python_identifier(),
ins=dict(input_tuples_by_asset_key.values()),
ins=dict(named_in_by_asset_key.values()),
out=out,
# Any resource requirements specified as arguments will be identified as
# part of the Op definition instantiation
Expand All @@ -242,7 +263,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
return AssetChecksDefinition.create(
keys_by_input_name={
input_tuple[0]: asset_key
for asset_key, input_tuple in input_tuples_by_asset_key.items()
for asset_key, input_tuple in named_in_by_asset_key.items()
},
node_def=op_def,
resource_defs=resource_defs_for_execution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,18 +507,22 @@ def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]:
asset_name=self.op_name,
)

@cached_property
def required_resource_keys(self) -> AbstractSet[str]:
return compute_required_resource_keys(
required_resource_keys=self.args.required_resource_keys,
resource_defs=self.args.assets_def_resource_defs,
fn=self.fn,
decorator_name=self.args.decorator_name,
)

def _create_op_definition(self) -> OpDefinition:
return _Op(
name=self.op_name,
description=self.args.description,
ins=self.ins_by_input_names,
out=self.combined_outs_by_output_name,
required_resource_keys=compute_required_resource_keys(
required_resource_keys=self.args.required_resource_keys,
resource_defs=self.args.op_def_resource_defs,
fn=self.fn,
decorator_name=self.args.decorator_name,
),
required_resource_keys=self.required_resource_keys,
tags={
**({COMPUTE_KIND_TAG: self.args.compute_kind} if self.args.compute_kind else {}),
**(self.args.op_tags or {}),
Expand Down

0 comments on commit 3728e6f

Please sign in to comment.