Skip to content

Commit

Permalink
Push backfill policy check into generic builder path
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Jun 22, 2024
1 parent 009f2da commit 7d45074
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..assets import AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..backfill_policy import BackfillPolicy
from ..decorators.graph_decorator import graph
from ..events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from ..input import GraphIn
Expand Down Expand Up @@ -390,19 +390,6 @@ def create_assets_def_from_fn_and_decorator_args(
out_asset_key=out_asset_key,
)

with disable_dagster_warnings():
# check backfill policy is BackfillPolicyType.SINGLE_RUN for non-partitioned asset
if args.partitions_def is None:
check.param_invariant(
(
args.backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
if args.backfill_policy
else True
),
"backfill_policy",
"Non partitioned asset can only have single run backfill policy",
)

with disable_dagster_warnings():
builder_args = DecoratorAssetsDefinitionBuilderArgs(
name=args.name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
AssetsDefinition,
get_partition_mappings_from_deps,
)
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.input import In
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.output import Out
Expand Down Expand Up @@ -268,6 +268,17 @@ def __init__(
else named_ins_by_asset_key
)

if args.partitions_def is None:
check.param_invariant(
(
args.backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
if args.backfill_policy
else True
),
"backfill_policy",
"Non partitioned asset can only have single run backfill policy",
)

@staticmethod
def for_multi_asset(
*, fn: Callable[..., Any], args: DecoratorAssetsDefinitionBuilderArgs
Expand Down

0 comments on commit 7d45074

Please sign in to comment.