Skip to content

Commit

Permalink
Rename AssetConditionCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 17, 2024
1 parent 1bb1419 commit 7add39d
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 312 deletions.
241 changes: 128 additions & 113 deletions python_modules/dagster/dagster/_core/definitions/asset_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
FrozenSet,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)

Expand All @@ -22,12 +24,7 @@
from dagster._core.definitions.metadata import MetadataMapping, MetadataValue
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._serdes.serdes import (
FieldSerializer,
PackableValue,
UnpackContext,
WhitelistMap,
pack_value,
unpack_value,
whitelist_for_serdes,
)

Expand All @@ -38,6 +35,9 @@
from .auto_materialize_rule import AutoMaterializeRule


T = TypeVar("T")


@whitelist_for_serdes
class HistoricalAllPartitionsSubsetSentinel(NamedTuple):
"""Serializable indicator that this value was an AllPartitionsSubset at serialization time, but
Expand Down Expand Up @@ -77,58 +77,100 @@ def get_serializable_candidate_subset(
return candidate_subset


class CandidateSubsetSerializer(FieldSerializer):
def pack(
self,
candidate_subset: AssetSubset,
whitelist_map: WhitelistMap,
descent_path: str,
) -> Optional[Mapping[str, Any]]:
# On all ticks, the root condition starts with an AllPartitionsSubset as the candidate
# subset. This would be wasteful to calculate and serialize in its entirety, so we instead
# store this as `None` and reconstruct it as needed.
# This does mean that if new partitions are added between serialization time and read time,
# the candidate subset will contain those new partitions.
return pack_value(
get_serializable_candidate_subset(candidate_subset), whitelist_map, descent_path
class AssetConditionResult(NamedTuple):
condition: "AssetCondition"
start_timestamp: float
end_timestamp: float

true_subset: AssetSubset
candidate_subset: AssetSubset
subsets_with_metadata: Sequence[AssetSubsetWithMetadata]
extra_state: PackableValue

child_results: Sequence["AssetConditionResult"]

@staticmethod
def create_from_children(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
child_results: Sequence["AssetConditionResult"],
) -> "AssetConditionResult":
"""Returns a new AssetConditionEvaluation from the given child results."""
return AssetConditionResult(
condition=context.condition,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
true_subset=true_subset,
candidate_subset=context.candidate_subset,
subsets_with_metadata=[],
child_results=child_results,
extra_state=None,
)

def unpack(
self,
serialized_candidate_subset: Optional[Mapping[str, Any]],
whitelist_map: WhitelistMap,
context: UnpackContext,
) -> Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel]:
return unpack_value(
serialized_candidate_subset,
(AssetSubset, HistoricalAllPartitionsSubsetSentinel),
whitelist_map,
context,
@staticmethod
def create(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [],
extra_state: PackableValue = None,
) -> "AssetConditionResult":
"""Returns a new AssetConditionEvaluation from the given parameters."""
return AssetConditionResult(
condition=context.condition,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
true_subset=true_subset,
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
child_results=[],
extra_state=extra_state,
)


@whitelist_for_serdes(field_serializers={"candidate_subset": CandidateSubsetSerializer})
@whitelist_for_serdes
class AssetConditionEvaluation(NamedTuple):
"""Internal representation of the results of evaluating a node in the evaluation tree."""
"""Serializable representation of the results of evaluating a node in the evaluation tree."""

condition_snapshot: AssetConditionSnapshot
true_subset: AssetSubset
candidate_subset: Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel]
start_timestamp: Optional[float]
end_timestamp: Optional[float]
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = []
child_evaluations: Sequence["AssetConditionEvaluation"] = []

true_subset: AssetSubset
candidate_subset: Union[AssetSubset, HistoricalAllPartitionsSubsetSentinel]
subsets_with_metadata: Sequence[AssetSubsetWithMetadata]

child_evaluations: Sequence["AssetConditionEvaluation"]

@property
def asset_key(self) -> AssetKey:
return self.true_subset.asset_key

@staticmethod
def from_result(result: AssetConditionResult) -> "AssetConditionEvaluation":
return AssetConditionEvaluation(
condition_snapshot=result.condition.snapshot,
start_timestamp=result.start_timestamp,
end_timestamp=result.end_timestamp,
true_subset=result.true_subset,
candidate_subset=get_serializable_candidate_subset(result.candidate_subset),
subsets_with_metadata=result.subsets_with_metadata,
child_evaluations=[
AssetConditionEvaluation.from_result(child_result)
for child_result in result.child_results
],
)

def equivalent_to_stored_evaluation(self, other: Optional["AssetConditionEvaluation"]) -> bool:
"""Returns if all fields other than `run_ids` are equal."""
"""Returns if this evaluation is functionally equivalent to the given stored evaluation.
This is used to determine if it is necessary to store this new evaluation in the database.
"""
return (
other is not None
and self.condition_snapshot == other.condition_snapshot
and self.true_subset == other.true_subset
# if any partitions are requested, then the state of the world must have meaninfully
# changed since the previous evaluation
and self.true_subset.size == 0
and other.true_subset.size == 0
# the candidate subset gets modified during serialization
and get_serializable_candidate_subset(self.candidate_subset)
== get_serializable_candidate_subset(other.candidate_subset)
Expand Down Expand Up @@ -175,65 +217,54 @@ def with_run_ids(self, run_ids: AbstractSet[str]) -> "AssetConditionEvaluationWi
return AssetConditionEvaluationWithRunIds(evaluation=self, run_ids=frozenset(run_ids))


class AssetConditionEvaluationResult(NamedTuple):
"""Return value for the evaluate method of an AssetCondition."""
@whitelist_for_serdes
class AssetConditionEvaluationState(NamedTuple):
"""Incremental state calculated during the evaluation of an AssetCondition. This may be used
on the subsequent evaluation to make the computation more efficient.
"""

condition: "AssetCondition"
evaluation: AssetConditionEvaluation
extra_values_by_unique_id: Mapping[str, PackableValue]
evaluation_timestamp: Optional[float]

max_storage_id: Optional[int]
extra_state_by_unique_id: Mapping[str, PackableValue]

@property
def asset_key(self) -> AssetKey:
return self.evaluation.asset_key

@property
def true_subset(self) -> AssetSubset:
return self.evaluation.true_subset

@staticmethod
def create_from_children(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
child_results: Sequence["AssetConditionEvaluationResult"],
) -> "AssetConditionEvaluationResult":
"""Returns a new AssetConditionEvaluationResult from the given child results."""
return AssetConditionEvaluationResult(
condition=context.condition,
evaluation=AssetConditionEvaluation(
context.condition.snapshot,
true_subset=true_subset,
candidate_subset=context.candidate_subset,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
subsets_with_metadata=[],
child_evaluations=[child_result.evaluation for child_result in child_results],
),
extra_values_by_unique_id=dict(
item
for child_result in child_results
for item in child_result.extra_values_by_unique_id.items()
),
)

@staticmethod
def create(
context: "AssetConditionEvaluationContext",
true_subset: AssetSubset,
subsets_with_metadata: Sequence[AssetSubsetWithMetadata] = [],
extra_value: PackableValue = None,
) -> "AssetConditionEvaluationResult":
"""Returns a new AssetConditionEvaluationResult from the given parameters."""
return AssetConditionEvaluationResult(
condition=context.condition,
evaluation=AssetConditionEvaluation(
context.condition.snapshot,
true_subset=true_subset,
start_timestamp=context.start_timestamp,
end_timestamp=pendulum.now("UTC").timestamp(),
candidate_subset=context.candidate_subset,
subsets_with_metadata=subsets_with_metadata,
),
extra_values_by_unique_id={context.condition.unique_id: extra_value}
if extra_value
else {},
context: "AssetConditionEvaluationContext", root_result: AssetConditionResult
) -> "AssetConditionEvaluationState":
def _flatten_extra_state(r: AssetConditionResult) -> Mapping[str, PackableValue]:
extra_state: Dict[str, PackableValue] = (
{r.condition.unique_id: r.extra_state} if r.extra_state else {}
)
for child in r.child_results:
extra_state.update(_flatten_extra_state(child))
return extra_state

return AssetConditionEvaluationState(
evaluation=AssetConditionEvaluation.from_result(root_result),
evaluation_timestamp=context.evaluation_time.timestamp(),
max_storage_id=context.new_max_storage_id,
extra_state_by_unique_id=_flatten_extra_state(root_result),
)

def get_extra_state(self, condition: "AssetCondition", as_type: Type[T]) -> Optional[T]:
"""Returns the value from the extras dict for the given condition, if it exists and is of
the expected type. Otherwise, returns None.
"""
extra_state = self.extra_state_by_unique_id.get(condition.unique_id)
if isinstance(extra_state, as_type):
return extra_state
return None


@whitelist_for_serdes
class AssetConditionEvaluationWithRunIds(NamedTuple):
Expand Down Expand Up @@ -268,9 +299,7 @@ def unique_id(self) -> str:
return hashlib.md5("".join(parts).encode()).hexdigest()

@abstractmethod
def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
raise NotImplementedError()

@abstractproperty
Expand Down Expand Up @@ -341,9 +370,7 @@ def unique_id(self) -> str:
def description(self) -> str:
return self.rule.description

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
Expand All @@ -365,19 +392,15 @@ class AndAssetCondition(
def description(self) -> str:
return "All of"

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
child_results: List[AssetConditionEvaluationResult] = []
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
child_results: List[AssetConditionResult] = []
true_subset = context.candidate_subset
for child in self.children:
child_context = context.for_child(condition=child, candidate_subset=true_subset)
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset &= child_result.true_subset
return AssetConditionEvaluationResult.create_from_children(
context, true_subset, child_results
)
return AssetConditionResult.create_from_children(context, true_subset, child_results)


class OrAssetCondition(
Expand All @@ -390,10 +413,8 @@ class OrAssetCondition(
def description(self) -> str:
return "Any of"

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
child_results: List[AssetConditionEvaluationResult] = []
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
child_results: List[AssetConditionResult] = []
true_subset = context.empty_subset()
for child in self.children:
child_context = context.for_child(
Expand All @@ -402,9 +423,7 @@ def evaluate(
child_result = child.evaluate(child_context)
child_results.append(child_result)
true_subset |= child_result.true_subset
return AssetConditionEvaluationResult.create_from_children(
context, true_subset, child_results
)
return AssetConditionResult.create_from_children(context, true_subset, child_results)


class NotAssetCondition(
Expand All @@ -425,15 +444,11 @@ def description(self) -> str:
def child(self) -> AssetCondition:
return self.children[0]

def evaluate(
self, context: "AssetConditionEvaluationContext"
) -> AssetConditionEvaluationResult:
def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
)
child_result = self.child.evaluate(child_context)
true_subset = context.candidate_subset - child_result.true_subset

return AssetConditionEvaluationResult.create_from_children(
context, true_subset, [child_result]
)
return AssetConditionResult.create_from_children(context, true_subset, [child_result])
Loading

0 comments on commit 7add39d

Please sign in to comment.