Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asset check decorator refactor placeholder #22380

7 changes: 4 additions & 3 deletions python_modules/dagster-graphql/dagster_graphql/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ def execute_query(
if "errors" in result_dict:
result_dict_errors = check.list_elem(result_dict, "errors", of_type=Exception)
result_errors = check.is_list(result.errors, of_type=Exception)
check.invariant(len(result_dict_errors) == len(result_errors)) #
check.invariant(len(result_dict_errors) == len(result_errors))
for python_error, error_dict in zip(result_errors, result_dict_errors):
if hasattr(python_error, "original_error") and python_error.original_error:
error_dict["stack_trace"] = get_stack_trace_array(python_error.original_error)
# Typing errors caught by making is_list typed -- schrockn 2024-06-09
if hasattr(python_error, "original_error") and python_error.original_error: # type: ignore
error_dict["stack_trace"] = get_stack_trace_array(python_error.original_error) # type: ignore

return result_dict

Expand Down
9 changes: 6 additions & 3 deletions python_modules/dagster/dagster/_check/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,18 +878,21 @@ def opt_list_elem(
return _check_iterable_items(value, of_type, "list")


TTypeOrTupleOfTTypes = Union[Type[T], Tuple[Type[T], ...]]


def is_list(
obj: object,
of_type: Optional[TypeOrTupleOfTypes] = None,
of_type: Optional[TTypeOrTupleOfTTypes[T]] = None,
additional_message: Optional[str] = None,
) -> List:
) -> List[T]:
if not isinstance(obj, list):
raise _type_mismatch_error(obj, list, additional_message)

if not of_type:
return obj

return _check_iterable_items(obj, of_type, "list")
return list(_check_iterable_items(obj, of_type, "list"))


# ########################
Expand Down
3 changes: 2 additions & 1 deletion python_modules/dagster/dagster/_config/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ def has_field(self, name: str) -> bool:
@property
def field_names(self) -> Sequence[str]:
fields = check.is_list(self.fields, of_type=ConfigFieldSnap)
return [fs.name for fs in fields]
# Typing error caught by making is_list typed -- schrockn 2024-06-09
return [fs.name for fs in fields] # type: ignore

def get_child_type_keys(self) -> Sequence[str]:
if ConfigTypeKind.is_closed_generic(self.kind):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.output import Out
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.execution.build_resources import wrap_resources_for_execution
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster._utils.warnings import disable_dagster_warnings

from ..input import In
from .asset_decorator import make_asset_deps
from .decorator_assets_definition_builder import (
build_asset_ins,
DecoratorAssetsDefinitionBuilder,
DecoratorAssetsDefinitionBuilderArgs,
NamedIn,
build_named_ins,
compute_required_resource_keys,
get_function_params_without_context_or_config_or_resources,
)
from .op_decorator import _Op
Expand All @@ -49,7 +51,25 @@ def _build_asset_check_input(
fn: Callable[..., Any],
additional_ins: Mapping[str, AssetIn],
additional_deps: Optional[AbstractSet[AssetKey]],
) -> Mapping[AssetKey, Tuple[str, In]]:
) -> Mapping[AssetKey, NamedIn]:
all_deps, all_ins = _build_all_deps_and_all_ins_for_check_input(
name, asset_key, fn, additional_ins, additional_deps
)

return build_named_ins(
fn=fn,
asset_ins=all_ins,
deps=all_deps,
)


def _build_all_deps_and_all_ins_for_check_input(
name: str,
asset_key: AssetKey,
fn: Callable[..., Any],
additional_ins: Mapping[str, AssetIn],
additional_deps: Optional[AbstractSet[AssetKey]],
) -> Tuple[AbstractSet[AssetKey], Mapping[str, AssetIn]]:
fn_params = get_function_params_without_context_or_config_or_resources(fn)

if asset_key in (additional_deps or []):
Expand Down Expand Up @@ -91,11 +111,7 @@ def _build_asset_check_input(
" the target asset or be specified in 'additional_ins'."
)

return build_asset_ins(
fn=fn,
asset_ins=all_ins,
deps=all_deps,
)
return all_deps, all_ins


def asset_check(
Expand Down Expand Up @@ -189,6 +205,9 @@ def my_asset_has_enough_rows(my_asset: DataFrame) -> AssetCheckResult:
def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
check.callable_param(fn, "fn")
resolved_name = name or fn.__name__

wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

asset_key = AssetKey.from_coercible_or_definition(asset)

additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []])
Expand All @@ -199,6 +218,13 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
additional_ins=additional_ins or {},
additional_deps=additional_dep_keys,
)
all_deps, all_ins = _build_all_deps_and_all_ins_for_check_input(
resolved_name,
asset_key,
fn,
additional_ins=additional_ins or {},
additional_deps=additional_dep_keys,
)

# additional_deps on AssetCheckSpec holds the keys passed to additional_deps and
# additional_ins. We don't want to include the primary asset key in this set.
Expand All @@ -213,18 +239,74 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
metadata=metadata,
)

arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
args = DecoratorAssetsDefinitionBuilderArgs(
decorator_name="@asset_check",
name=name,
description=description,
required_resource_keys=required_resource_keys or set(),
config_schema=config_schema,
retry_policy=retry_policy,
specs=[],
check_specs=[spec],
can_subset=False,
compute_kind=compute_kind,
op_tags=op_tags,
op_def_resource_defs=wrapped_resource_defs,
assets_def_resource_defs=wrapped_resource_defs,
upstream_asset_deps=[],
# unsupported capabiltiies in asset checks
partitions_def=None,
code_version=None,
backfill_policy=None,
group_name=None,
# non-sensical args in this context
asset_deps={},
asset_in_map={},
asset_out_map={},
)

if False:
builder = DecoratorAssetsDefinitionBuilder.from_specs(
fn=fn,
passed_args=args,
asset_specs=[],
can_subset=False,
asset_in_map=all_ins,
op_name=resolved_name,
)

return builder.create_asset_checks_definition()

additional_dep_keys = set([dep.asset_key for dep in make_asset_deps(additional_deps) or []])
input_tuples_by_asset_key = _build_asset_check_input(
resolved_name,
asset_key,
fn,
additional_ins=additional_ins or {},
additional_deps=additional_dep_keys,
)

# additional_deps on AssetCheckSpec holds the keys passed to additional_deps and
# additional_ins. We don't want to include the primary asset key in this set.
additional_ins_and_deps = input_tuples_by_asset_key.keys() - {asset_key}

check.param_invariant(
len(required_resource_keys or []) == 0 or len(arg_resource_keys) == 0,
"Cannot specify resource requirements in both @asset_check decorator and as arguments"
" to the decorated function",
spec = AssetCheckSpec(
name=resolved_name,
description=description,
asset=asset_key,
additional_deps=additional_ins_and_deps,
blocking=blocking,
metadata=metadata,
)

resource_defs_keys = set(resource_defs.keys() if resource_defs else [])
decorator_resource_keys = (required_resource_keys or set()) | resource_defs_keys
resource_defs_for_execution = wrap_resources_for_execution(resource_defs)

op_required_resource_keys = decorator_resource_keys - arg_resource_keys
op_required_resource_keys = compute_required_resource_keys(
required_resource_keys or set(),
resource_defs_for_execution,
fn=fn,
decorator_name="@asset_check",
)

out = Out(dagster_type=None)

Expand All @@ -249,7 +331,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
for asset_key, input_tuple in input_tuples_by_asset_key.items()
},
node_def=op_def,
resource_defs=wrap_resources_for_execution(resource_defs),
resource_defs=resource_defs_for_execution,
check_specs_by_output_name={op_def.output_defs[0].name: spec},
can_subset=False,
)
Expand Down Expand Up @@ -328,13 +410,89 @@ def checks():

def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
op_name = name or fn.__name__
arg_resource_keys = {arg.name for arg in get_resource_args(fn)}
op_required_resource_keys = required_resource_keys - arg_resource_keys
upstream_asset_deps_ = {spec.asset_key for spec in specs} | {
dep.asset_key for spec in specs for dep in spec.additional_deps or []
}
upstream_asset_deps = make_asset_deps(upstream_asset_deps_)
# List[AssetDep] = []
# make_asset_deps
# for check_spec in specs:
# upstream_asset_deps.append
# upstream_asset_deps.extend(check_spec.additional_deps)
deps_for_build_named_ins = {spec.asset_key for spec in specs} | {
dep.asset_key for spec in specs for dep in spec.additional_deps or []
}

args = DecoratorAssetsDefinitionBuilderArgs(
decorator_name="@multi_asset_check",
name=None,
description=description,
required_resource_keys=required_resource_keys,
config_schema=config_schema,
retry_policy=retry_policy,
specs=[],
check_specs=specs,
can_subset=can_subset,
compute_kind=compute_kind,
op_tags=op_tags,
op_def_resource_defs=resource_defs,
assets_def_resource_defs=resource_defs,
upstream_asset_deps=upstream_asset_deps,
# unsupported capabiltiies in asset checks
partitions_def=None,
code_version=None,
backfill_policy=None,
group_name=None,
# non-sensical args in this context
asset_deps={},
asset_in_map={},
asset_out_map={},
)

# builder = DecoratorAssetsDefinitionBuilder(
# fn=fn,
# op_name=op_name,
# internal_deps={},
# named_ins_by_asset_key={},
# named_outs_by_asset_key={},
# args=args
# )

if True:
builder = DecoratorAssetsDefinitionBuilder.from_specs(
op_name=op_name,
asset_specs=[],
# check_specs=specs,
can_subset=can_subset,
# compute_kind=compute_kind,
# op_tags=op_tags,
fn=fn,
# resource_defs=resource_defs,
# required_resource_keys=required_resource_keys,
passed_args=args,
asset_in_map={},
deps_for_build_named_ins=deps_for_build_named_ins,
)
# return AssetChecksDefinition.create(
# node_def=op_def,
# resource_defs=wrap_resources_for_execution(resource_defs),
# keys_by_input_name={
# input_tuple[0]: asset_key
# for asset_key, input_tuple in named_ins_by_asset_key.items()
# },
# check_specs_by_output_name={spec.get_python_identifier(): spec for spec in specs},
# can_subset=can_subset,
# )
return builder.create_asset_checks_definition()

op_required_resource_keys = compute_required_resource_keys(
required_resource_keys, resource_defs, fn=fn, decorator_name="@multi_asset_check"
)

outs = {
spec.get_python_identifier(): Out(None, is_required=not can_subset) for spec in specs
}
input_tuples_by_asset_key = build_asset_ins(
named_ins_by_asset_key = build_named_ins(
fn=fn,
asset_ins={},
deps={spec.asset_key for spec in specs}
Expand All @@ -345,7 +503,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
op_def = _Op(
name=op_name,
description=description,
ins=dict(input_tuples_by_asset_key.values()),
ins=dict(named_ins_by_asset_key.values()),
out=outs,
required_resource_keys=op_required_resource_keys,
tags={
Expand All @@ -361,7 +519,7 @@ def inner(fn: MultiAssetCheckFunction) -> AssetChecksDefinition:
resource_defs=wrap_resources_for_execution(resource_defs),
keys_by_input_name={
input_tuple[0]: asset_key
for asset_key, input_tuple in input_tuples_by_asset_key.items()
for asset_key, input_tuple in named_ins_by_asset_key.items()
},
check_specs_by_output_name={spec.get_python_identifier(): spec for spec in specs},
can_subset=can_subset,
Expand Down
Loading