Skip to content

Commit

Permalink
Update AutoMaterializeRul return type
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 8, 2024
1 parent 1b2b0d4 commit 59ad413
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,12 @@ def evaluate(
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Evaluating rule: {self.rule.to_snapshot()}"
)
true_subset, subsets_with_metadata, extra_value = self.rule.evaluate_for_asset(context)
evaluation_result = self.rule.evaluate_for_asset(context)
context.root_context.daemon_context._verbose_log_fn( # noqa
f"Rule returned {true_subset.size} partitions" f"{true_subset}"
)
return AssetConditionEvaluationResult.create(
context=context,
true_subset=true_subset,
subsets_with_metadata=subsets_with_metadata,
extra_value=extra_value,
f"Rule returned {evaluation_result.true_subset.size} partitions:"
f"{evaluation_result.true_subset}"
)
return evaluation_result


class AndAssetCondition(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import dataclasses
import datetime
import functools
import operator
from collections import defaultdict
from dataclasses import dataclass
from typing import TYPE_CHECKING, AbstractSet, Any, Callable, Mapping, Optional, Sequence, Tuple
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
FrozenSet,
Mapping,
Optional,
Sequence,
Tuple,
)

from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
Expand Down Expand Up @@ -290,5 +303,54 @@ def will_update_asset_partition(self, asset_partition: AssetKeyPartitionKey) ->
return False
return asset_partition in parent_evaluation.true_subset

def add_evaluation_data_from_previous_tick(
self,
asset_partitions_by_frozen_metadata: Mapping[
FrozenSet[Tuple[str, MetadataValue]], AbstractSet[AssetKeyPartitionKey]
],
ignore_subset: AssetSubset,
) -> Tuple[AssetSubset, Sequence["AssetSubsetWithMetadata"]]:
"""Combines information calculated on this tick with information from the previous tick,
returning a tuple of the combined true subset and the combined subsets with metadata.
Args:
asset_partitions_by_frozen_metadata: A mapping from metadata to the set of asset
partitions that the rule applies to.
ignore_subset: An AssetSubset which represents information that we should *not* carry
forward from the previous tick.
"""
from .asset_condition import AssetSubsetWithMetadata

mapping = defaultdict(lambda: self.empty_subset())
for frozen_metadata, asset_partitions in asset_partitions_by_frozen_metadata.items():
mapping[frozen_metadata] = AssetSubset.from_asset_partitions_set(
self.asset_key, self.partitions_def, asset_partitions
)

# get the set of all things we have metadata for
has_new_metadata_subset = functools.reduce(
operator.or_, mapping.values(), self.empty_subset()
)

# don't use information from the previous tick if we have explicit metadata for it or
# we've explicitly said to ignore it
ignore_subset = has_new_metadata_subset | ignore_subset

for elt in self.previous_subsets_with_metadata:
carry_forward_subset = elt.subset - ignore_subset
if carry_forward_subset.size > 0:
mapping[elt.frozen_metadata] |= carry_forward_subset

# for now, an asset is in the "true" subset if and only if we have some metadata for it
true_subset = functools.reduce(operator.or_, mapping.values(), self.empty_subset())

return (
self.candidate_subset & true_subset,
[
AssetSubsetWithMetadata(subset, dict(metadata))
for metadata, subset in mapping.items()
],
)

def empty_subset(self) -> AssetSubset:
return AssetSubset.empty(self.asset_key, self.partitions_def)
Loading

0 comments on commit 59ad413

Please sign in to comment.