Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 9, 2024
1 parent 433ad2f commit 962222a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
from typing import AbstractSet, Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Set, Union
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Union,
)

from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey
from typing_extensions import TypeAlias

from dagster import _check as check
Expand Down Expand Up @@ -33,6 +44,9 @@
)
from .op_decorator import _Op

if TYPE_CHECKING:
from dagster._core.definitions.base_asset_graph import AssetKeyOrCheckKey

AssetCheckFunctionReturn: TypeAlias = AssetCheckResult
AssetCheckFunction: TypeAlias = Callable[..., AssetCheckFunctionReturn]

Expand Down Expand Up @@ -248,24 +262,31 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
fn=fn,
)

op_required_resource_keys = builder.required_resource_keys

out = Out(dagster_type=None)

op_def = _Op(
name=spec.get_python_identifier(),
ins=dict(named_in_by_asset_key.values()),
out=out,
# Any resource requirements specified as arguments will be identified as
# part of the Op definition instantiation
required_resource_keys=op_required_resource_keys,
tags={
**({COMPUTE_KIND_TAG: compute_kind} if compute_kind else {}),
**(op_tags or {}),
},
config_schema=config_schema,
retry_policy=retry_policy,
)(fn)
# op_required_resource_keys = builder.required_resource_keys

# out = Out(dagster_type=None)

# old_op_def = _Op(
# name=spec.get_python_identifier(),
# ins=dict(named_in_by_asset_key.values()),
# out=out,
# # Any resource requirements specified as arguments will be identified as
# # part of the Op definition instantiation
# required_resource_keys=op_required_resource_keys,
# tags={
# **({COMPUTE_KIND_TAG: compute_kind} if compute_kind else {}),
# **(op_tags or {}),
# },
# config_schema=config_schema,
# retry_policy=retry_policy,
# )(fn)

op_def = builder.create_op_definition()

# check.invariant(
# [od.name for od in old_op_def.output_defs] == [od.name for od in op_def.output_defs],
# f"Comparing {old_op_def.output_defs} to {op_def.output_defs}",
# )

return AssetChecksDefinition.create(
keys_by_input_name={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ def from_multi_asset_specs(
if spec.deps is not None
}

_validate_check_specs_target_relevant_asset_keys(
passed_args.check_specs, [spec.key for spec in asset_specs]
)

return DecoratorAssetsDefinitionBuilder(
named_ins_by_asset_key=named_ins_by_asset_key,
named_outs_by_asset_graph_key=named_outs_by_asset_graph_key,
Expand Down Expand Up @@ -425,9 +429,13 @@ def from_asset_outs_in_asset_centric_decorator(
keys_by_output_name = make_keys_by_output_name(named_outs_by_asset_key)
internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps}

_validate_check_specs_target_relevant_asset_keys(
passed_args.check_specs, list(named_outs_by_asset_key.keys())
)

return DecoratorAssetsDefinitionBuilder(
named_ins_by_asset_key=named_ins_by_asset_key,
named_outs_by_asset_graph_key=named_outs_by_asset_key,
named_outs_by_asset_graph_key=named_outs_by_asset_key, # type: ignore
internal_deps=internal_deps,
op_name=op_name,
args=passed_args,
Expand Down Expand Up @@ -478,6 +486,9 @@ def check_outs_by_output_name(self) -> Mapping[str, Out]:

@cached_property
def combined_outs_by_output_name(self) -> Mapping[str, Out]:
if self.args.decorator_name == "@asset_check":
return self.outs_by_output_name

return {
**self.outs_by_output_name,
**self.check_outs_by_output_name,
Expand Down Expand Up @@ -524,7 +535,7 @@ def required_resource_keys(self) -> AbstractSet[str]:
decorator_name=self.args.decorator_name,
)

def _create_op_definition(self) -> OpDefinition:
def create_op_definition(self) -> OpDefinition:
return _Op(
name=self.op_name,
description=self.args.description,
Expand All @@ -544,7 +555,7 @@ def create_assets_definition(self) -> AssetsDefinition:
return AssetsDefinition.dagster_internal_init(
keys_by_input_name=self.asset_keys_by_input_names,
keys_by_output_name=self.asset_keys_by_output_name,
node_def=self._create_op_definition(),
node_def=self.create_op_definition(),
partitions_def=self.args.partitions_def,
can_subset=self.args.can_subset,
resource_defs=self.args.assets_def_resource_defs,
Expand Down Expand Up @@ -603,7 +614,7 @@ def _synthesize_specs(self) -> Sequence[AssetSpec]:
def validate_and_assign_output_names_to_check_specs(
check_specs: Optional[Sequence[AssetCheckSpec]], valid_asset_keys: Sequence[AssetKey]
) -> Mapping[str, AssetCheckSpec]:
_validate_check_specs_target_relevant_asset_keys(check_specs, valid_asset_keys)
# _validate_check_specs_target_relevant_asset_keys(check_specs, valid_asset_keys)
return _assign_output_names_to_check_specs(check_specs)


Expand Down

0 comments on commit 962222a

Please sign in to comment.