Skip to content

Commit

Permalink
AssetAutomationCondition -> AssetCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 18, 2024
1 parent 643167c commit d6143ec
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 223 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
from .asset_subset import AssetSubset

if TYPE_CHECKING:
from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata
from dagster._core.definitions.asset_condition import AssetSubsetWithMetdata

from .asset_automation_evaluator import AutomationCondition, ConditionEvaluation
from .asset_condition import AssetCondition, AssetConditionEvaluation
from .asset_daemon_context import AssetDaemonContext


@dataclass(frozen=True)
class AssetAutomationEvaluationContext:
class RootAssetConditionEvaluationContext:
"""Context object containing methods and properties used for evaluating the entire state of an
asset's automation rules.
"""

asset_key: AssetKey
asset_cursor: Optional[AssetDaemonAssetCursor]
root_condition: "AutomationCondition"
root_condition: "AssetCondition"

instance_queryer: CachingInstanceQueryer
data_time_resolver: CachingDataTimeResolver
daemon_context: "AssetDaemonContext"

evaluation_results_by_key: Mapping[AssetKey, "ConditionEvaluation"]
evaluation_results_by_key: Mapping[AssetKey, "AssetConditionEvaluation"]
expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]]

@property
Expand All @@ -52,7 +52,7 @@ def evaluation_time(self) -> datetime.datetime:
return self.instance_queryer.evaluation_time

@functools.cached_property
def latest_evaluation(self) -> Optional["ConditionEvaluation"]:
def latest_evaluation(self) -> Optional["AssetConditionEvaluation"]:
if not self.asset_cursor:
return None
return self.asset_cursor.latest_evaluation
Expand Down Expand Up @@ -175,9 +175,9 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) ->
def empty_subset(self) -> AssetSubset:
return AssetSubset.empty(self.asset_key, self.partitions_def)

def get_root_condition_context(self) -> "AssetAutomationConditionEvaluationContext":
return AssetAutomationConditionEvaluationContext(
asset_context=self,
def get_root_condition_context(self) -> "AssetConditionEvaluationContext":
return AssetConditionEvaluationContext(
root_context=self,
condition=self.root_condition,
candidate_subset=AssetSubset.all(
asset_key=self.asset_key,
Expand All @@ -188,7 +188,9 @@ def get_root_condition_context(self) -> "AssetAutomationConditionEvaluationConte
latest_evaluation=self.latest_evaluation,
)

def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemonAssetCursor:
def get_new_asset_cursor(
self, evaluation: "AssetConditionEvaluation"
) -> AssetDaemonAssetCursor:
"""Returns a new AssetDaemonAssetCursor based on the current cursor and the results of
this tick's evaluation.
"""
Expand All @@ -213,33 +215,33 @@ def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemon


@dataclass(frozen=True)
class AssetAutomationConditionEvaluationContext:
"""Context object containing methods and properties used for evaluating a particular AutomationCondition."""
class AssetConditionEvaluationContext:
"""Context object containing methods and properties used for evaluating a particular AssetCondition."""

asset_context: AssetAutomationEvaluationContext
condition: "AutomationCondition"
root_context: RootAssetConditionEvaluationContext
condition: "AssetCondition"
candidate_subset: AssetSubset
latest_evaluation: Optional["ConditionEvaluation"]
latest_evaluation: Optional["AssetConditionEvaluation"]

@property
def asset_key(self) -> AssetKey:
return self.asset_context.asset_key
return self.root_context.asset_key

@property
def partitions_def(self) -> Optional[PartitionsDefinition]:
return self.asset_context.partitions_def
return self.root_context.partitions_def

@property
def asset_cursor(self) -> Optional[AssetDaemonAssetCursor]:
return self.asset_context.asset_cursor
return self.root_context.asset_cursor

@property
def asset_graph(self) -> AssetGraph:
return self.asset_context.asset_graph
return self.root_context.asset_graph

@property
def instance_queryer(self) -> CachingInstanceQueryer:
return self.asset_context.instance_queryer
return self.root_context.instance_queryer

@property
def max_storage_id(self) -> Optional[int]:
Expand All @@ -264,9 +266,9 @@ def parent_has_updated_subset(self) -> AssetSubset:
return AssetSubset.from_asset_partitions_set(
self.asset_key,
self.partitions_def,
self.asset_context.instance_queryer.asset_partitions_with_newly_updated_parents(
self.root_context.instance_queryer.asset_partitions_with_newly_updated_parents(
latest_storage_id=self.max_storage_id,
child_asset_key=self.asset_context.asset_key,
child_asset_key=self.root_context.asset_key,
map_old_time_partitions=False,
),
)
Expand All @@ -277,7 +279,7 @@ def candidate_parent_has_or_will_update_subset(self) -> AssetSubset:
the previous tick, or will update on this tick.
"""
return self.candidate_subset & (
self.parent_has_updated_subset | self.asset_context.parent_will_update_subset
self.parent_has_updated_subset | self.root_context.parent_will_update_subset
)

@property
Expand All @@ -292,26 +294,26 @@ def candidates_not_evaluated_on_previous_tick_subset(self) -> AssetSubset:
@property
def materialized_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized since the previous tick."""
return self.asset_context.materialized_since_previous_tick_subset
return self.root_context.materialized_since_previous_tick_subset

@property
def materialized_requested_or_discarded_since_previous_tick_subset(self) -> AssetSubset:
"""Returns the set of asset partitions that were materialized, requested, or discarded since the previous tick."""
return self.asset_context.materialized_requested_or_discarded_since_previous_tick_subset
return self.root_context.materialized_requested_or_discarded_since_previous_tick_subset

@property
def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetdata"]:
"""Returns the subsets with metadata calculated on the previous tick for this condition, or an empty list if there is no previous evaluation."""
return self.latest_evaluation.subsets_with_metadata if self.latest_evaluation else []

def empty_subset(self) -> AssetSubset:
return self.asset_context.empty_subset()
return self.root_context.empty_subset()

def for_child(
self, condition: "AutomationCondition", candidate_subset: AssetSubset
) -> "AssetAutomationConditionEvaluationContext":
return AssetAutomationConditionEvaluationContext(
asset_context=self.asset_context,
self, condition: "AssetCondition", candidate_subset: AssetSubset, child_index: int
) -> "AssetConditionEvaluationContext":
return AssetConditionEvaluationContext(
root_context=self.root_context,
condition=condition,
candidate_subset=candidate_subset,
latest_evaluation=self.latest_evaluation.for_child(condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

from ... import PartitionKeyRange
from ..storage.tags import ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG
from .asset_automation_condition_context import AssetAutomationEvaluationContext
from .asset_automation_evaluator import ConditionEvaluation
from .asset_condition import AssetConditionEvaluation
from .asset_condition_evaluation_context import RootAssetConditionEvaluationContext
from .asset_daemon_cursor import AssetDaemonAssetCursor, AssetDaemonCursor
from .asset_graph import AssetGraph
from .auto_materialize_rule import AutoMaterializeRule
Expand Down Expand Up @@ -220,9 +220,9 @@ def get_new_latest_storage_id(self) -> Optional[int]:
def evaluate_asset(
self,
asset_key: AssetKey,
evaluation_results_by_key: Mapping[AssetKey, ConditionEvaluation],
evaluation_results_by_key: Mapping[AssetKey, AssetConditionEvaluation],
expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]],
) -> Tuple[ConditionEvaluation, AssetDaemonAssetCursor, Optional[datetime.datetime]]:
) -> Tuple[AssetConditionEvaluation, AssetDaemonAssetCursor, Optional[datetime.datetime]]:
"""Evaluates the auto materialize policy of a given asset key.
Params:
Expand All @@ -236,21 +236,25 @@ def evaluate_asset(
"""
# convert the legacy AutoMaterializePolicy to an AssetCondition
auto_materialize_policy_evaluator = check.not_none(
asset_condition = check.not_none(
self.asset_graph.auto_materialize_policies_by_key.get(asset_key)
).to_auto_materialize_policy_evaluator()
).to_asset_condition()

context = AssetAutomationEvaluationContext(
context = RootAssetConditionEvaluationContext(
asset_key=asset_key,
asset_cursor=self.cursor.asset_cursor_for_key(asset_key, self.asset_graph),
root_condition=auto_materialize_policy_evaluator.condition,
root_condition=asset_condition,
instance_queryer=self.instance_queryer,
data_time_resolver=self.data_time_resolver,
daemon_context=self,
evaluation_results_by_key=evaluation_results_by_key,
expected_data_time_mapping=expected_data_time_mapping,
)
evaluation, asset_cursor = auto_materialize_policy_evaluator.evaluate(context)
condition_context = context.get_root_condition_context()

evaluation = asset_condition.evaluate(condition_context)
asset_cursor = context.get_new_asset_cursor(evaluation=evaluation)

expected_data_time = get_expected_data_time_for_asset_key(
context, will_materialize=evaluation.true_subset.size > 0
)
Expand All @@ -269,7 +273,7 @@ def get_auto_materialize_asset_evaluations(
"""
asset_cursors: List[AssetDaemonAssetCursor] = []

evaluation_results_by_key: Dict[AssetKey, ConditionEvaluation] = {}
evaluation_results_by_key: Dict[AssetKey, AssetConditionEvaluation] = {}
legacy_evaluation_results_by_key: Dict[AssetKey, AutoMaterializeAssetEvaluation] = {}
expected_data_time_mapping: Dict[AssetKey, Optional[datetime.datetime]] = defaultdict()
to_request: Set[AssetKeyPartitionKey] = set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .partition import PartitionsSubset

if TYPE_CHECKING:
from .asset_automation_evaluator import ConditionEvaluation
from .asset_condition import AssetConditionEvaluation


class AssetDaemonAssetCursor(NamedTuple):
Expand All @@ -38,7 +38,7 @@ class AssetDaemonAssetCursor(NamedTuple):
asset_key: AssetKey
latest_storage_id: Optional[int]
latest_evaluation_timestamp: Optional[float]
latest_evaluation: Optional["ConditionEvaluation"]
latest_evaluation: Optional["AssetConditionEvaluation"]
materialized_requested_or_discarded_subset: AssetSubset


Expand Down Expand Up @@ -75,7 +75,7 @@ def was_previously_handled(self, asset_key: AssetKey) -> bool:
def asset_cursor_for_key(
self, asset_key: AssetKey, asset_graph: AssetGraph
) -> AssetDaemonAssetCursor:
from .asset_automation_evaluator import ConditionEvaluation
from .asset_condition import AssetConditionEvaluation

partitions_def = asset_graph.get_partitions_def(asset_key)
handled_partitions_subset = self.handled_root_partitions_by_asset_key.get(asset_key)
Expand All @@ -85,16 +85,14 @@ def asset_cursor_for_key(
handled_subset = AssetSubset(asset_key=asset_key, value=True)
else:
handled_subset = AssetSubset.empty(asset_key, partitions_def)
condition = (
check.not_none(asset_graph.get_auto_materialize_policy(asset_key))
.to_auto_materialize_policy_evaluator()
.condition
)
condition = check.not_none(
asset_graph.get_auto_materialize_policy(asset_key)
).to_asset_condition()
return AssetDaemonAssetCursor(
asset_key=asset_key,
latest_storage_id=self.latest_storage_id,
latest_evaluation_timestamp=self.latest_evaluation_timestamp,
latest_evaluation=ConditionEvaluation.from_evaluation(
latest_evaluation=AssetConditionEvaluation.from_evaluation(
condition=condition,
evaluation=self.latest_evaluation_by_asset_key.get(asset_key),
asset_graph=asset_graph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)

if TYPE_CHECKING:
from dagster._core.definitions.asset_automation_evaluator import AssetAutomationEvaluator
from dagster._core.definitions.asset_condition import AssetCondition
from dagster._core.definitions.auto_materialize_rule import (
AutoMaterializeRule,
AutoMaterializeRuleSnapshot,
Expand Down Expand Up @@ -253,38 +253,37 @@ def policy_type(self) -> AutoMaterializePolicyType:
def rule_snapshots(self) -> Sequence["AutoMaterializeRuleSnapshot"]:
return [rule.to_snapshot() for rule in self.rules]

def to_auto_materialize_policy_evaluator(self) -> "AssetAutomationEvaluator":
def to_asset_condition(self) -> "AssetCondition":
"""Converts a set of materialize / skip rules into a single binary expression."""
from .asset_automation_evaluator import (
AndAutomationCondition,
AssetAutomationEvaluator,
NotAutomationCondition,
OrAutomationCondition,
from .asset_condition import (
AndAssetCondition,
NotAssetCondition,
OrAssetCondition,
RuleCondition,
)
from .auto_materialize_rule import DiscardOnMaxMaterializationsExceededRule

materialize_condition = OrAutomationCondition(
materialize_condition = OrAssetCondition(
children=[
RuleCondition(rule)
for rule in sorted(self.materialize_rules, key=lambda rule: rule.description)
]
)
skip_condition = OrAutomationCondition(
skip_condition = OrAssetCondition(
children=[
RuleCondition(rule)
for rule in sorted(self.skip_rules, key=lambda rule: rule.description)
]
)
children = [
materialize_condition,
NotAutomationCondition([skip_condition]),
NotAssetCondition([skip_condition]),
]
if self.max_materializations_per_minute:
discard_condition = RuleCondition(
DiscardOnMaxMaterializationsExceededRule(self.max_materializations_per_minute)
)
children.append(NotAutomationCondition([discard_condition]))
children.append(NotAssetCondition([discard_condition]))

# results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d
return AssetAutomationEvaluator(condition=AndAutomationCondition(children))
return AndAssetCondition(children)
Loading

0 comments on commit d6143ec

Please sign in to comment.