Skip to content

Commit

Permalink
Rename AssetConditionCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 9, 2024
1 parent dd4086f commit 26af907
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 266 deletions.
110 changes: 64 additions & 46 deletions python_modules/dagster/dagster/_core/definitions/asset_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)

Expand Down Expand Up @@ -41,6 +43,9 @@
from .auto_materialize_rule import AutoMaterializeRule


T = TypeVar("T")


@whitelist_for_serdes
class HistoricalAllPartitionsSubset(NamedTuple):
"""Serializable indicator that this value was an AllPartitionsSubset at serialization time, but
Expand Down Expand Up @@ -111,7 +116,7 @@ def unpack(


@whitelist_for_serdes(field_serializers={"candidate_subset": CandidateSubsetSerializer})
class AssetConditionEvaluation(NamedTuple):
class AssetConditionEvaluationResult(NamedTuple):
"""Internal representation of the results of evaluating a node in the evaluation tree."""

condition_snapshot: AssetConditionSnapshot
Expand All @@ -120,7 +125,7 @@ class AssetConditionEvaluation(NamedTuple):
start_timestamp: Optional[float]
end_timestamp: Optional[float]
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = []
child_evaluations: Sequence["AssetConditionEvaluation"] = []
child_evaluations: Sequence["AssetConditionEvaluationResult"] = []

@property
def asset_key(self) -> AssetKey:
Expand All @@ -141,7 +146,9 @@ def get_candidate_subset(
)
return self.candidate_subset

def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluation"]) -> bool:
def equivalent_to_stored_evaluation(
self, other: Optional["AssetConditionEvaluationResult"]
) -> bool:
"""Returns if all fields other than `run_ids` are equal."""
return (
other is not None
Expand Down Expand Up @@ -178,7 +185,9 @@ def get_requested_or_discarded_subset(self, condition: "AssetCondition") -> Asse
else:
return self.true_subset | discarded_subset

def for_child(self, child_condition: "AssetCondition") -> Optional["AssetConditionEvaluation"]:
def for_child(
self, child_condition: "AssetCondition"
) -> Optional["AssetConditionEvaluationResult"]:
"""Returns the evaluation of a given child condition by finding the child evaluation that
has an identical hash to the given condition.
"""
Expand All @@ -193,73 +202,92 @@ def with_run_ids(self, run_ids: AbstractSet[str]) -> "AssetConditionEvaluationWi
return AssetConditionEvaluationWithRunIds(evaluation=self, run_ids=frozenset(run_ids))


class AssetConditionEvaluationResult(NamedTuple):
@whitelist_for_serdes
class AssetConditionEvaluationInfo(NamedTuple):
"""Return value for the evaluate method of an AssetCondition."""

condition: "AssetCondition"
evaluation: AssetConditionEvaluation
extra_values_by_unique_id: Mapping[str, PackableValue]
asset_key: AssetKey
evaluation_result: AssetConditionEvaluationResult

max_storage_id: Optional[int]
timestamp: Optional[float]
extra_state_by_unique_id: Mapping[str, PackableValue]

@property
def true_subset(self) -> AssetSubset:
return self.evaluation.true_subset
return self.evaluation_result.true_subset

@staticmethod
def create_from_children(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
child_results: Sequence["AssetConditionEvaluationResult"],
) -> "AssetConditionEvaluationResult":
child_results: Sequence["AssetConditionEvaluationInfo"],
) -> "AssetConditionEvaluationInfo":
"""Returns a new AssetConditionEvaluationResult from the given child results."""
return AssetConditionEvaluationResult(
condition=context.condition,
evaluation=AssetConditionEvaluation(
context.condition.snapshot,
return AssetConditionEvaluationInfo(
asset_key=context.asset_key,
evaluation_result=AssetConditionEvaluationResult(
condition_snapshot=context.condition.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
subsets_with_metadata=[],
child_evaluations=[child_result.evaluation for child_result in child_results],
child_evaluations=[
child_result.evaluation_result for child_result in child_results
],
),
extra_values_by_unique_id=dict(
extra_state_by_unique_id=dict(
item
for child_result in child_results
for item in child_result.extra_values_by_unique_id.items()
for item in child_result.extra_state_by_unique_id.items()
),
max_storage_id=context.new_max_storage_id,
timestamp=context.evaluation_time.timestamp(),
)

@staticmethod
def create(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [],
extra_value: PackableValue = None,
) -> "AssetConditionEvaluationResult":
extra_state: PackableValue = None,
) -> "AssetConditionEvaluationInfo":
"""Returns a new AssetConditionEvaluationResult from the given parameters."""
return AssetConditionEvaluationResult(
condition=context.condition,
evaluation=AssetConditionEvaluation(
return AssetConditionEvaluationInfo(
asset_key=context.asset_key,
evaluation_result=AssetConditionEvaluationResult(
context.condition.snapshot,
true_subset=true_subset,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
),
extra_values_by_unique_id={context.condition.unique_id: extra_value}
if extra_value
extra_state_by_unique_id={context.condition.unique_id: extra_state}
if extra_state
else {},
max_storage_id=context.new_max_storage_id,
timestamp=context.evaluation_time.timestamp(),
)

def get_extra_state(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]:
"""Returns the value from the extras dict for the given condition, if it exists and is of
the expected type. Otherwise, returns None.
"""
extra_state = self.extra_state_by_unique_id.get(condition.unique_id)
if isinstance(extra_state, as_type):
return extra_state
return None


@whitelist_for_serdes
class AssetConditionEvaluationWithRunIds(NamedTuple):
"""A union of an AssetConditionEvaluation and the set of run IDs that have been launched in
response to it.
"""

evaluation: AssetConditionEvaluation
evaluation: AssetConditionEvaluationResult
run_ids: FrozenSet[str]

@property
Expand All @@ -286,9 +314,7 @@ def unique_id(self) -> str:
return hashlib.md5("".join(parts).encode()).hexdigest()

@abstractmethod
def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo:
raise NotImplementedError()

@abstractproperty
Expand Down Expand Up @@ -359,9 +385,7 @@ def unique_id(self) -> str:
def description(self) -> str:
return self.rule.description

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
Expand All @@ -383,17 +407,15 @@ class AndAssetCondition(
def description(self) -> str:
return "All of"

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
child_results: List[AssetConditionEvaluationResult] = []
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo:
child_results: List[AssetConditionEvaluationInfo] = []
true_subset = context.candidate_subset
for child in self.children:
child_context = context.for_child(condition=child, candidate_subset=true_subset)
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset &= child_result.true_subset
return AssetConditionEvaluationResult.create_from_children(
return AssetConditionEvaluationInfo.create_from_children(
context, true_subset, child_results
)

Expand All @@ -408,10 +430,8 @@ class OrAssetCondition(
def description(self) -> str:
return "Any of"

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
child_results: List[AssetConditionEvaluationResult] = []
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo:
child_results: List[AssetConditionEvaluationInfo] = []
true_subset = context.empty_subset()
for child in self.children:
child_context = context.for_child(
Expand All @@ -420,7 +440,7 @@ def evaluate(
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset |= child_result.true_subset
return AssetConditionEvaluationResult.create_from_children(
return AssetConditionEvaluationInfo.create_from_children(
context, true_subset, child_results
)

Expand All @@ -443,15 +463,13 @@ def description(self) -> str:
def child(self) -> AssetCondition:
return self.children[0]

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionEvaluationInfo:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
)
child_result = self.child.evaluate(child_context)
true_subset = context.candidate_subset - child_result.true_subset

return AssetConditionEvaluationResult.create_from_children(
return AssetConditionEvaluationInfo.create_from_children(
context, true_subset, [child_result]
)
Loading

0 comments on commit 26af907

Please sign in to comment.