Skip to content

Commit

Permalink
RuleEvaluationData -> SubsetWithMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Dec 16, 2023
1 parent aaf9fb2 commit 9e94345
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import datetime
import functools
from dataclasses import dataclass
from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional
from typing import TYPE_CHECKING, AbstractSet, Mapping, Optional, Sequence

from dagster._core.definitions.auto_materialize_rule_evaluation import RuleEvaluationResults
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.partition import PartitionsDefinition
Expand All @@ -16,6 +15,8 @@
from .asset_subset import AssetSubset

if TYPE_CHECKING:
from dagster._core.definitions.asset_automation_evaluator import AssetSubsetWithMetdata

from .asset_automation_evaluator import AutomationCondition, ConditionEvaluation
from .asset_daemon_context import AssetDaemonContext

Expand Down Expand Up @@ -101,7 +102,7 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse
return (
self.materialized_since_previous_tick_subset
| self.latest_evaluation.true_subset
| (self.latest_evaluation.discard_subset or self.empty_subset())
| (self.latest_evaluation.discard_subset(self.root_condition) or self.empty_subset())
)

@functools.cached_property
Expand Down Expand Up @@ -200,7 +201,7 @@ def get_new_asset_cursor(self, evaluation: "ConditionEvaluation") -> AssetDaemon
previous_handled_subset
| self.materialized_requested_or_discarded_since_previous_tick_subset
| evaluation.true_subset
| (evaluation.discard_subset or self.empty_subset())
| (evaluation.discard_subset(self.root_condition) or self.empty_subset())
)
return AssetDaemonAssetCursor(
asset_key=self.asset_key,
Expand Down Expand Up @@ -299,9 +300,9 @@ def materialized_requested_or_discarded_since_previous_tick_subset(self) -> Asse
return self.asset_context.materialized_requested_or_discarded_since_previous_tick_subset

@property
def previous_tick_results(self) -> RuleEvaluationResults:
def previous_tick_subsets_with_metadata(self) -> Sequence["AssetSubsetWithMetdata"]:
"""Returns the RuleEvaluationResults calculated on the previous tick for this condition."""
return self.latest_evaluation.results if self.latest_evaluation else []
return self.latest_evaluation.subsets_with_metadata if self.latest_evaluation else []

def empty_subset(self) -> AssetSubset:
return self.asset_context.empty_subset()
Expand Down
Loading

0 comments on commit 9e94345

Please sign in to comment.