Skip to content

Commit

Permalink
Fix non-subsettable multi-asset handling
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 9, 2024
1 parent 460e961 commit dd4086f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ def get_asset_condition_evaluations(
num_checked_assets = 0
num_auto_materialize_asset_keys = len(self.auto_materialize_asset_keys)

visited_multi_asset_keys = set()
for asset_key in itertools.chain(*self.asset_graph.toposort_asset_keys()):
# an asset may have already been visited if it was part of a non-subsettable multi-asset
if asset_key not in self.auto_materialize_asset_keys:
Expand All @@ -257,10 +256,6 @@ def get_asset_condition_evaluations(
f" {asset_key.to_user_string()} ({num_checked_assets}/{num_auto_materialize_asset_keys})"
)

if asset_key in visited_multi_asset_keys:
self._verbose_log_fn(f"Asset {asset_key.to_user_string()} already visited")
continue

(evaluation, asset_cursor_for_asset, expected_data_time) = self.evaluate_asset(
asset_key, evaluation_results_by_key, expected_data_time_mapping
)
Expand Down Expand Up @@ -288,7 +283,14 @@ def get_asset_condition_evaluations(
if num_requested > 0:
for neighbor_key in self.asset_graph.get_required_multi_asset_keys(asset_key):
expected_data_time_mapping[neighbor_key] = expected_data_time
visited_multi_asset_keys.add(neighbor_key)

# make sure that the true_subset of the neighbor is accurate
if neighbor_key in evaluation_results_by_key:
neighbor_evaluation = evaluation_results_by_key[neighbor_key]
evaluation_results_by_key[neighbor_key] = neighbor_evaluation._replace(
true_subset=neighbor_evaluation.true_subset
| evaluation.true_subset._replace(asset_key=neighbor_key)
)
to_request |= {
ap._replace(asset_key=neighbor_key)
for ap in evaluation.true_subset.asset_partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeRuleEvaluationData,
)
from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.definitions.events import AssetKeyPartitionKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import in_process_executor
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.host_representation.origin import InProcessCodeLocationOrigin
from dagster._core.scheduler.instigation import TickStatus
from dagster._core.storage.tags import PARTITION_NAME_TAG
Expand Down Expand Up @@ -171,6 +173,12 @@ class AssetSpecWithPartitionsDef(
...


class MultiAssetSpec(NamedTuple):
    """Describes a multi-asset definition for use in an asset daemon scenario.

    The scenario state turns each instance into a definition by passing every
    field as a keyword argument to the ``multi_asset`` decorator
    (``multi_asset(**spec._asdict())``), so the field names here must match
    that decorator's parameter names.
    """

    # The specs of the individual assets that make up the multi-asset.
    specs: Sequence[AssetSpec]
    # Partitions definition for the multi-asset as a whole; None when not set.
    partitions_def: Optional[PartitionsDefinition] = None
    # Forwarded to ``multi_asset`` as ``can_subset``; off unless explicitly enabled.
    can_subset: bool = False


class AssetDaemonScenarioState(NamedTuple):
"""Specifies the state of a given AssetDaemonScenario. This state can be modified by changing
the set of asset definitions it contains, executing runs, updating the time, evaluating ticks, etc.
Expand All @@ -185,7 +193,7 @@ class AssetDaemonScenarioState(NamedTuple):
current_time (datetime): The current time of the scenario.
"""

asset_specs: Sequence[Union[AssetSpec, AssetSpecWithPartitionsDef]]
asset_specs: Sequence[Union[AssetSpec, AssetSpecWithPartitionsDef, MultiAssetSpec]]
current_time: datetime.datetime = pendulum.now()
run_requests: Sequence[RunRequest] = []
serialized_cursor: str = AssetDaemonCursor.empty(0).serialize()
Expand All @@ -207,26 +215,35 @@ def compute_fn(context: AssetExecutionContext) -> None:
AssetKey.from_coercible(s)
for s in json.loads(context.run.tags.get(FAIL_TAG) or "[]")
}
if context.asset_key in fail_keys:
raise Exception("Asset failed")
for asset_key in context.selected_asset_keys:
if asset_key in fail_keys:
raise Exception("Asset failed")

assets = []
params = {
"key",
"deps",
"group_name",
"code_version",
"auto_materialize_policy",
"freshness_policy",
"partitions_def",
}
for spec in self.asset_specs:
assets.append(
asset(
compute_fn=compute_fn,
**{k: v for k, v in spec._asdict().items() if k in params},
if isinstance(spec, MultiAssetSpec):

@multi_asset(**spec._asdict())
def _multi_asset(context: AssetExecutionContext):
return compute_fn(context)

assets.append(_multi_asset)
else:
params = {
"key",
"deps",
"group_name",
"code_version",
"auto_materialize_policy",
"freshness_policy",
"partitions_def",
}
assets.append(
asset(
compute_fn=compute_fn,
**{k: v for k, v in spec._asdict().items() if k in params},
)
)
)
return assets

@property
Expand All @@ -243,16 +260,28 @@ def with_asset_properties(
"""Convenience method to update the properties of one or more assets in the scenario state."""
new_asset_specs = []
for spec in self.asset_specs:
if keys is None or spec.key in {AssetKey.from_coercible(key) for key in keys}:
if "partitions_def" in kwargs:
# partitions_def is not a field on AssetSpec, so we need to do this hack
new_asset_specs.append(
AssetSpecWithPartitionsDef(**{**spec._asdict(), **kwargs})
)
else:
new_asset_specs.append(spec._replace(**kwargs))
if isinstance(spec, MultiAssetSpec):
partitions_def = kwargs.get("partitions_def", spec.partitions_def)
new_multi_specs = [
s._replace(**{k: v for k, v in kwargs.items() if k != "partitions_def"})
if keys is None or s.key in keys
else s
for s in spec.specs
]
new_asset_specs.append(
spec._replace(partitions_def=partitions_def, specs=new_multi_specs)
)
else:
new_asset_specs.append(spec)
if keys is None or spec.key in {AssetKey.from_coercible(key) for key in keys}:
if "partitions_def" in kwargs:
# partitions_def is not a field on AssetSpec, so we need to do this hack
new_asset_specs.append(
AssetSpecWithPartitionsDef(**{**spec._asdict(), **kwargs})
)
else:
new_asset_specs.append(spec._replace(**kwargs))
else:
new_asset_specs.append(spec)
return self._replace(asset_specs=new_asset_specs)

def with_serialized_cursor(self, serialized_cursor: str) -> "AssetDaemonScenarioState":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
HourlyPartitionsDefinition,
)

from ..asset_daemon_scenario import AssetDaemonScenarioState
from ..asset_daemon_scenario import AssetDaemonScenarioState, MultiAssetSpec

############
# PARTITIONS
Expand Down Expand Up @@ -63,6 +63,14 @@
]
)

# Scenario state containing a single multi-asset made up of assets A, B, and C.
# `can_subset` is not passed, so the MultiAssetSpec default applies.
three_assets_not_subsettable = AssetDaemonScenarioState(
    asset_specs=[
        MultiAssetSpec(
            specs=[AssetSpec("A"), AssetSpec("B"), AssetSpec("C")],
        )
    ]
)

##################
# PARTITION STATES
##################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
hourly_partitions_def,
one_asset,
one_asset_depends_on_two,
three_assets_not_subsettable,
time_partitions_start_str,
)

Expand Down Expand Up @@ -56,6 +57,23 @@ def get_cron_policy(
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_unpartitioned_multi_asset",
initial_state=three_assets_not_subsettable.with_asset_properties(
auto_materialize_policy=get_cron_policy(basic_hourly_cron_rule)
).with_current_time("2020-01-01T00:05"),
execution_fn=lambda state: state.evaluate_tick()
.assert_requested_runs(run_request(["A", "B", "C"]))
.with_current_time_advanced(seconds=30)
.evaluate_tick()
.assert_requested_runs()
.with_current_time_advanced(hours=1)
.evaluate_tick()
.assert_requested_runs(run_request(["A", "B", "C"]))
.with_current_time_advanced(seconds=30)
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_partitioned",
initial_state=one_asset.with_asset_properties(
Expand Down

0 comments on commit dd4086f

Please sign in to comment.