Skip to content

Commit

Permalink
Add asset_condition argument to AssetsDefinition etal.
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Dec 29, 2023
1 parent 21a936e commit a3e232e
Show file tree
Hide file tree
Showing 21 changed files with 200 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def blocking_asset(**kwargs):
resource_defs=asset_def.resource_defs,
metadata=asset_def.metadata_by_key.get(asset_def.key),
freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key),
auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
asset_condition=asset_def.asset_conditions_by_key.get(asset_def.key),
backfill_policy=asset_def.backfill_policy,
config=None, # gets config from asset_def.op
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
from dagster._core.definitions.metadata import MetadataMapping, MetadataValue
from dagster._serdes.serdes import whitelist_for_serdes

from .asset_condition_evaluation_context import (
AssetConditionEvaluationContext,
)
from .asset_subset import AssetSubset

if TYPE_CHECKING:
from .asset_condition_evaluation_context import AssetConditionEvaluationContext
from .auto_materialize_rule import AutoMaterializeRule


Expand Down Expand Up @@ -144,7 +142,7 @@ def unique_id(self) -> str:

@abstractmethod
def evaluate(
self, context: AssetConditionEvaluationContext
self, context: "AssetConditionEvaluationContext"
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
raise NotImplementedError()

Expand Down Expand Up @@ -217,7 +215,7 @@ def description(self) -> str:
return self.rule.description

def evaluate(
self, context: AssetConditionEvaluationContext
self, context: "AssetConditionEvaluationContext"
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
Expand Down Expand Up @@ -245,7 +243,7 @@ def description(self) -> str:
return "All of"

def evaluate(
self, context: AssetConditionEvaluationContext
self, context: "AssetConditionEvaluationContext"
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_evaluations: List[AssetConditionEvaluation] = []
child_extras: List[AssetConditionCursorExtras] = []
Expand Down Expand Up @@ -275,7 +273,7 @@ def description(self) -> str:
return "Any of"

def evaluate(
self, context: AssetConditionEvaluationContext
self, context: "AssetConditionEvaluationContext"
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_evaluations: List[AssetConditionEvaluation] = []
child_extras: List[AssetConditionCursorExtras] = []
Expand Down Expand Up @@ -315,7 +313,7 @@ def child(self) -> AssetCondition:
return self.children[0]

def evaluate(
self, context: AssetConditionEvaluationContext
self, context: "AssetConditionEvaluationContext"
) -> Tuple[AssetConditionEvaluation, Sequence[AssetConditionCursorExtras]]:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from ... import PartitionKeyRange
from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG
from .asset_condition import AssetConditionEvaluation
from .asset_condition import AssetCondition, AssetConditionEvaluation
from .asset_condition_evaluation_context import (
AssetConditionEvaluationContext,
)
Expand All @@ -50,12 +50,12 @@
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer # expensive import


def get_implicit_auto_materialize_policy(
def get_implicit_asset_condition(
asset_key: AssetKey, asset_graph: AssetGraph
) -> Optional[AutoMaterializePolicy]:
) -> Optional[AssetCondition]:
"""For backcompat with pre-auto materialize policy graphs, assume a default scope of 1 day."""
auto_materialize_policy = asset_graph.get_auto_materialize_policy(asset_key)
if auto_materialize_policy is None:
asset_condition = asset_graph.get_asset_condition(asset_key)
if asset_condition is None:
time_partitions_def = get_time_partitions_def(asset_graph.get_partitions_def(asset_key))
if time_partitions_def is None:
max_materializations_per_minute = None
Expand All @@ -76,8 +76,8 @@ def get_implicit_auto_materialize_policy(
return AutoMaterializePolicy(
rules=rules,
max_materializations_per_minute=max_materializations_per_minute,
)
return auto_materialize_policy
).to_asset_condition()
return asset_condition


class AssetDaemonContext:
Expand Down Expand Up @@ -192,9 +192,7 @@ def evaluate_asset(
"""
# convert the legacy AutoMaterializePolicy to an Evaluator
asset_condition = check.not_none(
self.asset_graph.auto_materialize_policies_by_key.get(asset_key)
).to_asset_condition()
asset_condition = check.not_none(self.asset_graph.get_asset_condition(asset_key))

asset_cursor = self.cursor.get_asset_cursor(asset_key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
TypeVar,
)

from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.auto_materialize_rule_evaluation import (
BackcompatAutoMaterializeAssetEvaluationSerializer,
Expand All @@ -29,10 +28,9 @@
whitelist_for_serdes,
)

from .asset_graph import AssetGraph

if TYPE_CHECKING:
from .asset_condition import AssetCondition, AssetConditionEvaluation, AssetConditionSnapshot
from .asset_graph import AssetGraph

T = TypeVar("T")

Expand Down Expand Up @@ -118,7 +116,9 @@ def empty(evaluation_id: int) -> "AssetDaemonCursor":

@staticmethod
def from_serialized(
raw_cursor: Optional[str], asset_graph: Optional[AssetGraph], default_evaluation_id: int = 0
raw_cursor: Optional[str],
asset_graph: Optional["AssetGraph"],
default_evaluation_id: int = 0,
) -> "AssetDaemonCursor":
"""Deserializes an AssetDaemonCursor from a string. Provides a backcompat layer for the old
manually-serialized cursor format.
Expand Down Expand Up @@ -215,12 +215,13 @@ def get_backcompat_asset_condition_cursor(


def backcompat_deserialize_asset_daemon_cursor_str(
cursor_str: str, asset_graph: Optional[AssetGraph], default_evaluation_id: int
cursor_str: str, asset_graph: Optional["AssetGraph"], default_evaluation_id: int
) -> AssetDaemonCursor:
"""This serves as a backcompat layer for deserializing the old cursor format. Some information
is impossible to fully recover, this will recover enough to continue operating as normal.
"""
from .asset_condition import AssetConditionEvaluationWithRunIds
from .asset_graph_subset import AssetGraphSubset

data = json.loads(cursor_str)

Expand Down
26 changes: 12 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.asset_condition import AssetCondition
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.selector.subset_selector import (
Expand Down Expand Up @@ -79,7 +79,7 @@ def __init__(
partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]],
group_names_by_key: Mapping[AssetKey, Optional[str]],
freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]],
auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]],
asset_conditions_by_key: Mapping[AssetKey, Optional[AssetCondition]],
backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]],
code_versions_by_key: Mapping[AssetKey, Optional[str]],
is_observable_by_key: Mapping[AssetKey, bool],
Expand All @@ -94,7 +94,7 @@ def __init__(
self._partition_mappings_by_key = partition_mappings_by_key
self._group_names_by_key = group_names_by_key
self._freshness_policies_by_key = freshness_policies_by_key
self._auto_materialize_policies_by_key = auto_materialize_policies_by_key
self._asset_conditions_by_key = asset_conditions_by_key
self._backfill_policies_by_key = backfill_policies_by_key
self._code_versions_by_key = code_versions_by_key
self._is_observable_by_key = is_observable_by_key
Expand Down Expand Up @@ -141,10 +141,8 @@ def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolic
return self._freshness_policies_by_key

@property
def auto_materialize_policies_by_key(
self,
) -> Mapping[AssetKey, Optional[AutoMaterializePolicy]]:
return self._auto_materialize_policies_by_key
def asset_condtitions_by_key(self) -> Mapping[AssetKey, Optional[AssetCondition]]:
return self._asset_conditions_by_key

@property
def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]:
Expand All @@ -166,7 +164,7 @@ def from_assets(
] = {}
group_names_by_key: Dict[AssetKey, Optional[str]] = {}
freshness_policies_by_key: Dict[AssetKey, Optional[FreshnessPolicy]] = {}
auto_materialize_policies_by_key: Dict[AssetKey, Optional[AutoMaterializePolicy]] = {}
asset_conditions_by_key: Dict[AssetKey, Optional[AssetCondition]] = {}
backfill_policies_by_key: Dict[AssetKey, Optional[BackfillPolicy]] = {}
code_versions_by_key: Dict[AssetKey, Optional[str]] = {}
is_observable_by_key: Dict[AssetKey, bool] = {}
Expand All @@ -192,7 +190,7 @@ def from_assets(
partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys})
group_names_by_key.update(asset.group_names_by_key)
freshness_policies_by_key.update(asset.freshness_policies_by_key)
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
asset_conditions_by_key.update(asset.asset_conditions_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)

Expand All @@ -209,7 +207,7 @@ def from_assets(
partition_mappings_by_key=partition_mappings_by_key,
group_names_by_key=group_names_by_key,
freshness_policies_by_key=freshness_policies_by_key,
auto_materialize_policies_by_key=auto_materialize_policies_by_key,
asset_conditions_by_key=asset_conditions_by_key,
backfill_policies_by_key=backfill_policies_by_key,
assets=assets_defs,
asset_checks=asset_checks or [],
Expand Down Expand Up @@ -531,8 +529,8 @@ def toposort_asset_keys(self) -> Sequence[AbstractSet[AssetKey]]:
{key for key in level} for level in toposort.toposort(self._asset_dep_graph["upstream"])
]

def get_auto_materialize_policy(self, asset_key: AssetKey) -> Optional[AutoMaterializePolicy]:
return self.auto_materialize_policies_by_key.get(asset_key)
def get_asset_condition(self, asset_key: AssetKey) -> Optional[AssetCondition]:
return self.asset_condtitions_by_key.get(asset_key)

def get_backfill_policy(self, asset_key: AssetKey) -> Optional[BackfillPolicy]:
return self.backfill_policies_by_key.get(asset_key)
Expand Down Expand Up @@ -707,7 +705,7 @@ def __init__(
partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]],
group_names_by_key: Mapping[AssetKey, Optional[str]],
freshness_policies_by_key: Mapping[AssetKey, Optional[FreshnessPolicy]],
auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]],
asset_conditions_by_key: Mapping[AssetKey, Optional[AssetCondition]],
backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]],
assets: Sequence[AssetsDefinition],
source_assets: Sequence[SourceAsset],
Expand All @@ -726,7 +724,7 @@ def __init__(
partition_mappings_by_key=partition_mappings_by_key,
group_names_by_key=group_names_by_key,
freshness_policies_by_key=freshness_policies_by_key,
auto_materialize_policies_by_key=auto_materialize_policies_by_key,
asset_conditions_by_key=asset_conditions_by_key,
backfill_policies_by_key=backfill_policies_by_key,
code_versions_by_key=code_versions_by_key,
is_observable_by_key=is_observable_by_key,
Expand Down
Loading

0 comments on commit a3e232e

Please sign in to comment.