diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
index 8db3dd77986eb..824d17e6519c8 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
@@ -179,7 +179,7 @@ def create_and_launch_partition_backfill(
                 graphene_info.context.instance,
                 create_workspace=lambda: graphene_info.context,
                 backfill_job=backfill,
-                partition_names=chunk,
+                partition_names_or_ranges=chunk,
             )
             if run_id is not None
         )
diff --git a/python_modules/dagster/dagster/_cli/job.py b/python_modules/dagster/dagster/_cli/job.py
index 52f9837704151..0389c8e330813 100644
--- a/python_modules/dagster/dagster/_cli/job.py
+++ b/python_modules/dagster/dagster/_cli/job.py
@@ -771,7 +771,9 @@ def _execute_backfill_command_at_location(
                 external_job,
                 job_partition_set,
                 backfill_job,
-                partition_data,
+                partition_data.name,
+                partition_data.tags,
+                partition_data.run_config,
             )
             if dagster_run:
                 instance.submit_run(dagster_run.run_id, workspace)
diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py
index 7f9a863c7e350..d7c9f70ed403a 100644
--- a/python_modules/dagster/dagster/_core/definitions/job_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py
@@ -2,7 +2,7 @@
 import os
 import warnings
 from datetime import datetime
-from functools import update_wrapper
+from functools import cached_property, update_wrapper
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -26,6 +26,7 @@
 from dagster._config.validate import validate_config
 from dagster._core.definitions.asset_check_spec import AssetCheckKey
 from dagster._core.definitions.asset_selection import AssetSelection
+from dagster._core.definitions.backfill_policy import BackfillPolicy
 from dagster._core.definitions.dependency import (
     Node,
     NodeHandle,
@@ -416,6 +417,20 @@ def partitions_def(self) -> Optional[PartitionsDefinition]:
         """
         return None if not self.partitioned_config else self.partitioned_config.partitions_def

+    @cached_property
+    def backfill_policy(self) -> Optional[BackfillPolicy]:
+        from dagster._core.definitions.asset_job import ASSET_BASE_JOB_PREFIX
+
+        backfill_policies = {
+            self.asset_layer.get(k).backfill_policy for k in self.asset_layer.executable_asset_keys
+        }
+        if not self.name.startswith(ASSET_BASE_JOB_PREFIX):
+            check.invariant(
+                len(backfill_policies) <= 1,
+                "All assets in a non-asset-base job must have the same backfill policy.",
+            )
+        return next(iter(backfill_policies), None)
+
     @property
     def hook_defs(self) -> AbstractSet[HookDefinition]:
         return self._hook_defs
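The `backfill_policy` property added to `JobDefinition` above can be exercised end to end through public APIs. A minimal sketch (asset and job names here are illustrative, not taken from this PR):

```python
from dagster import (
    BackfillPolicy,
    Definitions,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
)

partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])


@asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run())
def upstream(): ...


@asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run())
def downstream(upstream): ...


defs = Definitions(
    assets=[upstream, downstream],
    jobs=[define_asset_job("ranged_job", selection=[upstream, downstream])],
)

# The resolved JobDefinition reports the policy shared by all of its assets.
assert defs.get_job_def("ranged_job").backfill_policy == BackfillPolicy.single_run()
```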
diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
index fc9740f4323fe..8958f1bbe629e 100644
--- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py
@@ -4,13 +4,17 @@
 import dagster._check as check
 from dagster._annotations import deprecated
 from dagster._core.definitions import AssetKey
-from dagster._core.definitions.asset_job import build_asset_job, get_asset_graph_for_job
+from dagster._core.definitions.asset_job import (
+    build_asset_job,
+    get_asset_graph_for_job,
+)
 from dagster._core.definitions.run_request import RunRequest
 from dagster._core.errors import DagsterInvalidDefinitionError
 from dagster._core.instance import DynamicPartitionsStore

 from .config import ConfigMapping
 from .metadata import RawMetadataValue
+from .partition import PartitionedConfig
 from .policy import RetryPolicy

 if TYPE_CHECKING:
@@ -19,7 +23,6 @@
         ExecutorDefinition,
         HookDefinition,
         JobDefinition,
-        PartitionedConfig,
         PartitionsDefinition,
         ResourceDefinition,
     )
@@ -187,6 +190,32 @@ def resolve(
                 f'Error resolving selection for asset job "{self.name}": {e}'
             ) from e

+        # Require that all assets in the job have the same backfill policy
+        backfill_policies = {
+            job_asset_graph.get(k).backfill_policy for k in job_asset_graph.executable_asset_keys
+        }
+        if len(backfill_policies) > 1:
+            raise DagsterInvalidDefinitionError(
+                f"Asset job {self.name} materializes assets with varying BackfillPolicies. All"
+                " assets in a job must share the same BackfillPolicy."
+            )
+
+        # Error if a PartitionedConfig is defined and any target asset has a backfill policy that
+        # materializes anything other than a single partition per run. This is because a
+        # PartitionedConfig is a function that maps single partition keys to run config, so its
+        # behavior is undefined for multi-partition runs.
+        backfill_policy = next(iter(backfill_policies), None)
+        if (
+            backfill_policy
+            and backfill_policy.max_partitions_per_run != 1
+            and isinstance(self.config, PartitionedConfig)
+        ):
+            raise DagsterInvalidDefinitionError(
+                f"Asset job {self.name} materializes an asset with a BackfillPolicy targeting multiple partitions per run, "
+                "but a PartitionedConfig was provided. PartitionedConfigs are not supported for "
+                "jobs with multi-partition-per-run backfill policies."
+            )
+
         return build_asset_job(
             self.name,
             asset_graph=job_asset_graph,
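For reference, a sketch of the definition shape the second check above rejects (all names hypothetical). A `PartitionedConfig` maps one partition key to one run config dict, so there is no principled way to evaluate it for a run that spans a key range:

```python
from dagster import (
    BackfillPolicy,
    DagsterInvalidDefinitionError,
    Definitions,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
    static_partitioned_config,
)

partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])


@asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run())
def ranged_asset(): ...


@static_partitioned_config(partition_keys=partitions_def.get_partition_keys())
def per_key_config(partition_key: str):
    return {"ops": {"ranged_asset": {"config": {"partition": partition_key}}}}


try:
    Definitions(
        assets=[ranged_asset],
        jobs=[define_asset_job("bad_job", selection=[ranged_asset], config=per_key_config)],
    ).get_job_def("bad_job")
except DagsterInvalidDefinitionError as exc:
    print(exc)  # surfaces the PartitionedConfig / multi-partition-per-run conflict
```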
+            )
+
         return build_asset_job(
             self.name,
             asset_graph=job_asset_graph,
diff --git a/python_modules/dagster/dagster/_core/execution/job_backfill.py b/python_modules/dagster/dagster/_core/execution/job_backfill.py
index 99056c7744ede..3d07a143b8db2 100644
--- a/python_modules/dagster/dagster/_core/execution/job_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/job_backfill.py
@@ -1,8 +1,10 @@
 import logging
 import time
-from typing import Callable, Iterable, Mapping, Optional, Sequence, Tuple, cast
+from typing import Any, Callable, Iterable, Mapping, Optional, Sequence, Tuple, Union, cast

 import dagster._check as check
+from dagster._core.definitions.partition import PartitionsDefinition
+from dagster._core.definitions.partition_key_range import PartitionKeyRange
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterBackfillFailedError
 from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
@@ -14,12 +16,13 @@
     ExternalPartitionSet,
 )
 from dagster._core.remote_representation.external_data import (
-    ExternalPartitionExecutionParamData,
     ExternalPartitionSetExecutionParamData,
 )
 from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
 from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunsFilter
 from dagster._core.storage.tags import (
+    ASSET_PARTITION_RANGE_END_TAG,
+    ASSET_PARTITION_RANGE_START_TAG,
     PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
     PARTITION_SET_TAG,
@@ -58,7 +61,7 @@ def execute_job_backfill_iteration(
             f" {backfill.last_submitted_partition_name}"
         )

-    _check_repo_has_partition_set(workspace_process_context, backfill)
+    partition_set = _get_partition_set(workspace_process_context, backfill)

     has_more = True
     while has_more:
@@ -66,7 +69,11 @@ def execute_job_backfill_iteration(
             break

         chunk, checkpoint, has_more = _get_partitions_chunk(
-            instance, logger, backfill, CHECKPOINT_COUNT
+            instance,
+            logger,
+            backfill,
+            CHECKPOINT_COUNT,
+            partition_set,
         )
         check_for_debug_crash(debug_crash_flags, "BEFORE_SUBMIT")

@@ -101,9 +108,9 @@ def execute_job_backfill_iteration(
     yield None


-def _check_repo_has_partition_set(
+def _get_partition_set(
     workspace_process_context: IWorkspaceProcessContext, backfill_job: PartitionBackfill
-) -> None:
+) -> ExternalPartitionSet:
     origin = cast(RemotePartitionSetOrigin, backfill_job.partition_set_origin)

     location_name = origin.repository_origin.code_location_origin.location_name
@@ -124,6 +131,27 @@ def _get_partition_set(
         raise DagsterBackfillFailedError(
             f"Could not find partition set {partition_set_name} in repository {repo_name}. "
         )
+    return external_repo.get_external_partition_set(partition_set_name)
+
+
+def _subdivide_partition_key_range(
+    partitions_def: PartitionsDefinition,
+    partition_key_range: PartitionKeyRange,
+    instance: DagsterInstance,
+    max_range_size: Optional[int],
+) -> Sequence[PartitionKeyRange]:
+    """Take a partition key range and subdivide it into smaller ranges of size max_range_size.
+
+    This is done to satisfy backfill policies that limit the maximum number of partitions that
+    can be materialized in a single run.
+    """
+    if max_range_size is None:
+        return [partition_key_range]
+    else:
+        keys = partitions_def.get_partition_keys_in_range(
+            partition_key_range, dynamic_partitions_store=instance
+        )
+        chunks = [keys[i : i + max_range_size] for i in range(0, len(keys), max_range_size)]
+        return [PartitionKeyRange(start=chunk[0], end=chunk[-1]) for chunk in chunks]
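`_subdivide_partition_key_range` is private, so this standalone sketch inlines its chunking arithmetic to show the intended behavior: five keys with `max_range_size=2` become ceil(5 / 2) = 3 contiguous ranges, the last one short:

```python
from dagster import StaticPartitionsDefinition
from dagster._core.definitions.partition_key_range import PartitionKeyRange

partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e"])
max_range_size = 2

# Same slicing as _subdivide_partition_key_range, minus the instance plumbing.
keys = partitions_def.get_partition_keys_in_range(PartitionKeyRange(start="a", end="e"))
chunks = [keys[i : i + max_range_size] for i in range(0, len(keys), max_range_size)]
ranges = [PartitionKeyRange(start=c[0], end=c[-1]) for c in chunks]

assert ranges == [
    PartitionKeyRange("a", "b"),
    PartitionKeyRange("c", "d"),
    PartitionKeyRange("e", "e"),
]
```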
+ """ + if max_range_size is None: + return [partition_key_range] + else: + keys = partitions_def.get_partition_keys_in_range( + partition_key_range, dynamic_partitions_store=instance + ) + chunks = [keys[i : i + max_range_size] for i in range(0, len(keys), max_range_size)] + return [PartitionKeyRange(start=chunk[0], end=chunk[-1]) for chunk in chunks] def _get_partitions_chunk( @@ -131,7 +159,8 @@ def _get_partitions_chunk( logger: logging.Logger, backfill_job: PartitionBackfill, chunk_size: int, -) -> Tuple[Sequence[str], str, bool]: + partition_set: ExternalPartitionSet, +) -> Tuple[Sequence[Union[str, PartitionKeyRange]], str, bool]: partition_names = cast(Sequence[str], backfill_job.partition_names) checkpoint = backfill_job.last_submitted_partition_name @@ -151,20 +180,40 @@ def _get_partitions_chunk( partition_names.index(checkpoint) + 1 if checkpoint and checkpoint in partition_names else 0 ) partition_names = partition_names[initial_checkpoint:] - has_more = chunk_size < len(partition_names) - partitions_chunk = partition_names[:chunk_size] - next_checkpoint = partitions_chunk[-1] - to_skip = set(partitions_chunk).intersection(completed_partitions) - if to_skip: - logger.info( - f"Found {len(to_skip)} existing runs for backfill {backfill_job.backfill_id}, skipping" + partitions_def = partition_set.get_partitions_definition() + backfill_policy = partition_set.backfill_policy + if backfill_policy and backfill_policy.max_partitions_per_run != 1: + partitions_subset = partitions_def.subset_with_partition_keys(partition_names) + partition_key_ranges = partitions_subset.get_partition_key_ranges( + partitions_def, dynamic_partitions_store=instance ) - to_submit = [ - partition_name - for partition_name in partitions_chunk - if partition_name not in completed_partitions - ] + subdivided_ranges = [ + sr + for r in partition_key_ranges + for sr in _subdivide_partition_key_range( + partitions_def, r, instance, backfill_policy.max_partitions_per_run + ) + ] + ranges_to_launch = subdivided_ranges[:chunk_size] + has_more = chunk_size < len(subdivided_ranges) + next_checkpoint = ranges_to_launch[-1].end + to_submit = ranges_to_launch + else: + has_more = chunk_size < len(partition_names) + partitions_chunk = partition_names[:chunk_size] + next_checkpoint = partitions_chunk[-1] + to_skip = set(partitions_chunk).intersection(completed_partitions) + if to_skip: + logger.info( + f"Found {len(to_skip)} existing runs for backfill {backfill_job.backfill_id}, skipping" + ) + to_submit = [ + partition_name + for partition_name in partitions_chunk + if partition_name not in completed_partitions + ] + return to_submit, next_checkpoint, has_more @@ -172,7 +221,7 @@ def submit_backfill_runs( instance: DagsterInstance, create_workspace: Callable[[], BaseWorkspaceRequestContext], backfill_job: PartitionBackfill, - partition_names: Optional[Sequence[str]] = None, + partition_names_or_ranges: Optional[Sequence[Union[str, PartitionKeyRange]]] = None, ) -> Iterable[Optional[str]]: """Returns the run IDs of the submitted runs.""" origin = cast(RemotePartitionSetOrigin, backfill_job.partition_set_origin) @@ -181,12 +230,11 @@ def submit_backfill_runs( repo_name = repository_origin.repository_name location_name = repository_origin.code_location_origin.location_name - if not partition_names: - partition_names = cast(Sequence[str], backfill_job.partition_names) + if not partition_names_or_ranges: + partition_names_or_ranges = cast(Sequence[str], backfill_job.partition_names) workspace = create_workspace() code_location 
@@ -172,7 +221,7 @@ def submit_backfill_runs(
     instance: DagsterInstance,
     create_workspace: Callable[[], BaseWorkspaceRequestContext],
     backfill_job: PartitionBackfill,
-    partition_names: Optional[Sequence[str]] = None,
+    partition_names_or_ranges: Optional[Sequence[Union[str, PartitionKeyRange]]] = None,
 ) -> Iterable[Optional[str]]:
     """Returns the run IDs of the submitted runs."""
     origin = cast(RemotePartitionSetOrigin, backfill_job.partition_set_origin)
@@ -181,12 +230,11 @@ def submit_backfill_runs(
     repo_name = repository_origin.repository_name
     location_name = repository_origin.code_location_origin.location_name

-    if not partition_names:
-        partition_names = cast(Sequence[str], backfill_job.partition_names)
+    if not partition_names_or_ranges:
+        partition_names_or_ranges = cast(Sequence[str], backfill_job.partition_names)

     workspace = create_workspace()
     code_location = workspace.get_code_location(location_name)
-
     check.invariant(
         code_location.has_repository(repo_name),
         f"Could not find repository {repo_name} in location {code_location.name}",
@@ -194,11 +242,7 @@ def submit_backfill_runs(
     external_repo = code_location.get_repository(repo_name)
     partition_set_name = origin.partition_set_name
     external_partition_set = external_repo.get_external_partition_set(partition_set_name)
-    result = code_location.get_external_partition_set_execution_param_data(
-        external_repo.handle, partition_set_name, partition_names, instance
-    )
-    assert isinstance(result, ExternalPartitionSetExecutionParamData)

     if backfill_job.asset_selection:
         # need to make another call to the user code location to properly subset
         # for an asset selection
@@ -212,7 +256,51 @@ def submit_backfill_runs(
         external_job = code_location.get_external_job(pipeline_selector)
     else:
         external_job = external_repo.get_full_external_job(external_partition_set.job_name)
-    for partition_data in result.partition_data:
+
+    partition_data_target = check.is_list(
+        [partition_names_or_ranges[0].start]
+        if isinstance(partition_names_or_ranges[0], PartitionKeyRange)
+        else partition_names_or_ranges,
+        of_type=str,
+    )
+    partition_set_execution_data = code_location.get_external_partition_set_execution_param_data(
+        external_repo.handle,
+        partition_set_name,
+        partition_data_target,
+        instance,
+    )
+    assert isinstance(partition_set_execution_data, ExternalPartitionSetExecutionParamData)
+
+    # Partition-scoped run config is prohibited at the definitions level for jobs that materialize
+    # ranges, so we can assume that all partition data will have the same run config and tags as
+    # the first partition.
+    tags_by_key_or_range: Mapping[Union[str, PartitionKeyRange], Mapping[str, str]]
+    run_config_by_key_or_range: Mapping[Union[str, PartitionKeyRange], Mapping[str, Any]]
+    if isinstance(partition_names_or_ranges[0], PartitionKeyRange):
+        run_config = partition_set_execution_data.partition_data[0].run_config
+        tags = {
+            k: v
+            for k, v in partition_set_execution_data.partition_data[0].tags.items()
+            if k != PARTITION_NAME_TAG
+        }
+        run_config_by_key_or_range = {r: run_config for r in partition_names_or_ranges}
+        tags_by_key_or_range = {
+            r: {
+                **tags,
+                ASSET_PARTITION_RANGE_START_TAG: r.start,
+                ASSET_PARTITION_RANGE_END_TAG: r.end,
+            }
+            for r in check.is_list(partition_names_or_ranges, of_type=PartitionKeyRange)
+        }
+    else:
+        run_config_by_key_or_range = {
+            pd.name: pd.run_config for pd in partition_set_execution_data.partition_data
+        }
+        tags_by_key_or_range = {
+            pd.name: pd.tags for pd in partition_set_execution_data.partition_data
+        }
+
+    for key_or_range in partition_names_or_ranges:
         # Refresh the code location in case the workspace has reloaded mid-backfill
         workspace = create_workspace()
         code_location = workspace.get_code_location(location_name)
@@ -223,7 +311,9 @@ def submit_backfill_runs(
             external_job,
             external_partition_set,
             backfill_job,
-            partition_data,
+            key_or_range,
+            run_tags=tags_by_key_or_range[key_or_range],
+            run_config=run_config_by_key_or_range[key_or_range],
         )
         if dagster_run:
             # we skip runs in certain cases, e.g. we are running a `from_failure` backfill job
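The two tag shapes that `submit_backfill_runs` now stamps on runs, sketched side by side (the `alpha: beta` base tag is an example value echoing the tests below):

```python
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.storage.tags import (
    ASSET_PARTITION_RANGE_END_TAG,
    ASSET_PARTITION_RANGE_START_TAG,
    PARTITION_NAME_TAG,
)

base_tags = {"alpha": "beta"}  # example job-level tags

# Single-key runs keep the classic per-partition tag.
single_run_tags = {**base_tags, PARTITION_NAME_TAG: "a"}

# Range runs drop the per-partition tag and carry the range boundaries instead,
# mirroring tags_by_key_or_range above.
r = PartitionKeyRange(start="a", end="b")
range_run_tags = {
    **base_tags,
    ASSET_PARTITION_RANGE_START_TAG: r.start,  # "dagster/asset_partition_range_start"
    ASSET_PARTITION_RANGE_END_TAG: r.end,  # "dagster/asset_partition_range_end"
}
print(single_run_tags, range_run_tags)
```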
@@ -240,7 +330,9 @@ def create_backfill_run(
     external_pipeline: ExternalJob,
     external_partition_set: ExternalPartitionSet,
     backfill_job: PartitionBackfill,
-    partition_data: ExternalPartitionExecutionParamData,
+    partition_key_or_range: Union[str, PartitionKeyRange],
+    run_tags: Mapping[str, str],
+    run_config: Mapping[str, Any],
 ) -> Optional[DagsterRun]:
     from dagster._daemon.daemon import get_telemetry_daemon_session_id

@@ -256,7 +348,7 @@ def create_backfill_run(

     tags = merge_dicts(
         external_pipeline.tags,
-        partition_data.tags,
+        run_tags,
         DagsterRun.tags_for_backfill_id(backfill_job.backfill_id),
         backfill_job.tags,
     )
@@ -273,7 +365,7 @@ def create_backfill_run(
         op_selection = external_partition_set.op_selection

     elif backfill_job.from_failure:
-        last_run = _fetch_last_run(instance, external_partition_set, partition_data.name)
+        last_run = _fetch_last_run(instance, external_partition_set, partition_key_or_range)
         if not last_run or last_run.status != DagsterRunStatus.FAILURE:
             return None
         return instance.create_reexecuted_run(
@@ -282,12 +374,12 @@ def create_backfill_run(
             external_job=external_pipeline,
             strategy=ReexecutionStrategy.FROM_FAILURE,
             extra_tags=tags,
-            run_config=partition_data.run_config,
+            run_config=run_config,
             use_parent_run_tags=False,  # don't inherit tags from the previous run
         )
     else:  # backfill_job.reexecution_steps
-        last_run = _fetch_last_run(instance, external_partition_set, partition_data.name)
+        last_run = _fetch_last_run(instance, external_partition_set, partition_key_or_range)
         parent_run_id = last_run.run_id if last_run else None
         root_run_id = (last_run.root_run_id or last_run.run_id) if last_run else None
         if parent_run_id and root_run_id:
@@ -309,7 +401,7 @@ def create_backfill_run(

     external_execution_plan = code_location.get_external_execution_plan(
         external_pipeline,
-        partition_data.run_config,
+        run_config,
         step_keys_to_execute=step_keys_to_execute,
         known_state=known_state,
         instance=instance,
@@ -322,7 +414,7 @@ def create_backfill_run(
         job_name=external_pipeline.name,
         run_id=make_new_run_id(),
         resolved_op_selection=resolved_op_selection,
-        run_config=partition_data.run_config,
+        run_config=run_config,
         step_keys_to_execute=step_keys_to_execute,
         tags=tags,
         root_run_id=root_run_id,
@@ -340,19 +432,29 @@ def create_backfill_run(


 def _fetch_last_run(
-    instance: DagsterInstance, external_partition_set: ExternalPartitionSet, partition_name: str
+    instance: DagsterInstance,
+    external_partition_set: ExternalPartitionSet,
+    partition_key_or_range: Union[str, PartitionKeyRange],
 ) -> Optional[DagsterRun]:
     check.inst_param(instance, "instance", DagsterInstance)
     check.inst_param(external_partition_set, "external_partition_set", ExternalPartitionSet)
-    check.str_param(partition_name, "partition_name")
+    check.inst_param(partition_key_or_range, "partition_key_or_range", (str, PartitionKeyRange))
+
+    tags = (
+        {
+            PARTITION_NAME_TAG: partition_key_or_range,
+        }
+        if isinstance(partition_key_or_range, str)
+        else {
+            ASSET_PARTITION_RANGE_START_TAG: partition_key_or_range.start,
+            ASSET_PARTITION_RANGE_END_TAG: partition_key_or_range.end,
+        }
+    )

     runs = instance.get_runs(
         RunsFilter(
             job_name=external_partition_set.job_name,
-            tags={
-                PARTITION_SET_TAG: external_partition_set.name,
-                PARTITION_NAME_TAG: partition_name,
-            },
+            tags={PARTITION_SET_TAG: external_partition_set.name, **tags},
         ),
         limit=1,
     )
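`_fetch_last_run` has to locate the prior run by whichever tag shape it was launched with. A standalone sketch of that branching (the helper name is hypothetical; the tag constants and their string values are real):

```python
from typing import Mapping, Union

from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.storage.tags import (
    ASSET_PARTITION_RANGE_END_TAG,
    ASSET_PARTITION_RANGE_START_TAG,
    PARTITION_NAME_TAG,
)


def partition_filter_tags(key_or_range: Union[str, PartitionKeyRange]) -> Mapping[str, str]:
    # Single keys match on the partition-name tag; ranges match on the pair of
    # range tags stamped by submit_backfill_runs.
    if isinstance(key_or_range, str):
        return {PARTITION_NAME_TAG: key_or_range}
    return {
        ASSET_PARTITION_RANGE_START_TAG: key_or_range.start,
        ASSET_PARTITION_RANGE_END_TAG: key_or_range.end,
    }


assert partition_filter_tags("a") == {"dagster/partition": "a"}
assert partition_filter_tags(PartitionKeyRange("a", "d")) == {
    "dagster/asset_partition_range_start": "a",
    "dagster/asset_partition_range_end": "d",
}
```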
diff --git a/python_modules/dagster/dagster/_core/remote_representation/external.py b/python_modules/dagster/dagster/_core/remote_representation/external.py
index 2262d813868af..6cc0f68585d41 100644
--- a/python_modules/dagster/dagster/_core/remote_representation/external.py
+++ b/python_modules/dagster/dagster/_core/remote_representation/external.py
@@ -19,10 +19,12 @@
 from dagster import AssetSelection
 from dagster._config.snap import ConfigFieldSnap, ConfigSchemaSnapshot
 from dagster._core.definitions.asset_check_spec import AssetCheckKey
+from dagster._core.definitions.backfill_policy import BackfillPolicy
 from dagster._core.definitions.events import AssetKey
 from dagster._core.definitions.metadata import (
     MetadataValue,
 )
+from dagster._core.definitions.partition import PartitionsDefinition
 from dagster._core.definitions.run_request import InstigatorType
 from dagster._core.definitions.schedule_definition import DefaultScheduleStatus
 from dagster._core.definitions.selector import (
@@ -520,7 +522,7 @@ def root_config_key(self) -> Optional[str]:
         return self.get_mode_def_snap(DEFAULT_MODE_NAME).root_config_key

     @property
-    def tags(self) -> Mapping[str, object]:
+    def tags(self) -> Mapping[str, str]:
         return self._job_index.job_snapshot.tags

     @property
@@ -1010,6 +1012,10 @@ def mode(self) -> Optional[str]:
     def job_name(self) -> str:
         return self._external_partition_set_data.job_name

+    @property
+    def backfill_policy(self) -> Optional[BackfillPolicy]:
+        return self._external_partition_set_data.backfill_policy
+
     @property
     def repository_handle(self) -> RepositoryHandle:
         return self._handle.repository_handle
@@ -1026,9 +1032,13 @@ def has_partition_name_data(self) -> bool:
         # names
         return self._external_partition_set_data.external_partitions_data is not None

+    def get_partitions_definition(self) -> PartitionsDefinition:
+        return (
+            self._external_partition_set_data.external_partitions_data.get_partitions_definition()  # type: ignore
+        )
+
     def get_partition_names(self, instance: DagsterInstance) -> Sequence[str]:
         check.invariant(self.has_partition_name_data())
-        partitions = (
-            self._external_partition_set_data.external_partitions_data.get_partitions_definition()  # type: ignore
+        return self.get_partitions_definition().get_partition_keys(
+            dynamic_partitions_store=instance
         )
-        return partitions.get_partition_keys(dynamic_partitions_store=instance)
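All of the daemon-side branching keys off `max_partitions_per_run`, which is now readable from the partition set snapshot via the `backfill_policy` property added above. The constructor semantics, for reference:

```python
from dagster import BackfillPolicy

# None means "one run for the entire requested selection"; any other integer caps
# the number of partitions per run; 1 is the classic one-run-per-partition
# behavior, which the daemon treats as the non-range code path.
assert BackfillPolicy.single_run().max_partitions_per_run is None
assert BackfillPolicy.multi_run(max_partitions_per_run=2).max_partitions_per_run == 2
```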
diff --git a/python_modules/dagster/dagster/_core/remote_representation/external_data.py b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
index dbe9f76b8fa27..59346f57c74e4 100644
--- a/python_modules/dagster/dagster/_core/remote_representation/external_data.py
+++ b/python_modules/dagster/dagster/_core/remote_representation/external_data.py
@@ -903,6 +903,7 @@ class PartitionSetSnap(
         ("op_selection", Optional[Sequence[str]]),
         ("mode", Optional[str]),
         ("external_partitions_data", Optional[ExternalPartitionsDefinitionData]),
+        ("backfill_policy", Optional[BackfillPolicy]),
     ],
 )
 ):
@@ -913,6 +914,7 @@ def __new__(
         op_selection: Optional[Sequence[str]],
         mode: Optional[str],
         external_partitions_data: Optional[ExternalPartitionsDefinitionData] = None,
+        backfill_policy: Optional[BackfillPolicy] = None,
     ):
         return super(PartitionSetSnap, cls).__new__(
             cls,
@@ -925,6 +927,9 @@ def __new__(
                 "external_partitions_data",
                 ExternalPartitionsDefinitionData,
             ),
+            backfill_policy=check.opt_inst_param(
+                backfill_policy, "backfill_policy", BackfillPolicy
+            ),
         )

     @classmethod
@@ -955,6 +960,7 @@ def from_job_def(cls, job_def: JobDefinition) -> Self:
             op_selection=None,
             mode=DEFAULT_MODE_NAME,
             external_partitions_data=partitions_def_data,
+            backfill_policy=job_def.backfill_policy,
         )


@@ -999,7 +1005,7 @@ def __new__(cls, name: str, tags: Optional[Mapping[str, str]] = None):
 class ExternalPartitionExecutionParamData(
     NamedTuple(
         "_ExternalPartitionExecutionParamData",
-        [("name", str), ("tags", Mapping[str, object]), ("run_config", Mapping[str, object])],
+        [("name", str), ("tags", Mapping[str, str]), ("run_config", Mapping[str, object])],
     )
 ):
     def __new__(cls, name: str, tags: Mapping[str, str], run_config: Mapping[str, object]):
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py
index 6b3c2a28c47b0..64e9943e61459 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py
@@ -6,6 +6,7 @@
     AssetOut,
     AssetsDefinition,
     AssetSelection,
+    BackfillPolicy,
     DagsterEventType,
     DailyPartitionsDefinition,
     HourlyPartitionsDefinition,
@@ -814,3 +815,32 @@ def b():
     job1.execute_in_process(raise_on_error=False)

     assert tries == {"a": 3, "b": 4}
+
+
+def test_backfill_policy():
+    partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])
+
+    @asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run())
+    def foo(): ...
+
+    @asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.single_run())
+    def bar(): ...
+
+    @asset(partitions_def=partitions_def, backfill_policy=BackfillPolicy.multi_run(2))
+    def baz(): ...
+
+    assert create_test_asset_job([foo, bar]).backfill_policy == BackfillPolicy.single_run()
+    assert create_test_asset_job([baz]).backfill_policy == BackfillPolicy.multi_run(2)
+
+    # different backfill policies
+    with pytest.raises(DagsterInvalidDefinitionError, match="BackfillPolicy"):
+        create_test_asset_job([foo, bar, baz])
+
+    # can't use a PartitionedConfig with a multi-partition-per-run backfill policy
+    with pytest.raises(DagsterInvalidDefinitionError, match="PartitionedConfig"):

+        @static_partitioned_config(partition_keys=partitions_def.get_partition_keys())
+        def my_partitioned_config(partition_key: str):
+            return {"ops": {"foo": {"config": {"partition": partition_key}}}}
+
+        create_test_asset_job([foo], config=my_partitioned_config)
diff --git a/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_active_data.ambr b/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_active_data.ambr
index 655f38c2539e2..232dc14a513a3 100644
--- a/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_active_data.ambr
+++ b/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_active_data.ambr
@@ -1132,6 +1132,7 @@
     "external_partition_set_datas": [
       {
         "__class__": "ExternalPartitionSetData",
+        "backfill_policy": null,
         "external_partitions_data": {
           "__class__": "ExternalTimeWindowPartitionsDefinitionData",
           "cron_schedule": "15 0 * * *",
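Since `PartitionSetSnap` now carries the policy across the code-server boundary (the `"backfill_policy": null` in the updated snapshot above is the unset case), `BackfillPolicy` has to round-trip through dagster's serdes layer. A quick sketch, assuming the class is whitelisted for serdes as its presence in external data implies:

```python
from dagster import BackfillPolicy
from dagster._serdes import deserialize_value, serialize_value

policy = BackfillPolicy.multi_run(max_partitions_per_run=2)
assert deserialize_value(serialize_value(policy), BackfillPolicy) == policy
```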
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index 0cdc3aff5fd10..dd1b9b76f86b5 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -11,9 +11,11 @@
 from dagster import (
     AllPartitionMapping,
     Any,
+    AssetExecutionContext,
     AssetIn,
     AssetKey,
     AssetsDefinition,
+    Config,
     DagsterEventType,
     DagsterInstance,
     DailyPartitionsDefinition,
@@ -339,6 +341,34 @@ def asset_with_multi_run_backfill_policy():
     pass


+asset_job_partitions = StaticPartitionsDefinition(["a", "b", "c", "d"])
+
+
+class BpSingleRunConfig(Config):
+    name: str
+
+
+@asset(partitions_def=asset_job_partitions, backfill_policy=BackfillPolicy.single_run())
+def bp_single_run(context: AssetExecutionContext):
+    return {k: 1 for k in context.partition_keys}
+
+
+@asset(partitions_def=asset_job_partitions, backfill_policy=BackfillPolicy.single_run())
+def bp_single_run_config(context: AssetExecutionContext, config: BpSingleRunConfig):
+    context.log.info(config.name)
+    return {k: 1 for k in context.partition_keys}
+
+
+@asset(partitions_def=asset_job_partitions, backfill_policy=BackfillPolicy.multi_run(2))
+def bp_multi_run(context: AssetExecutionContext):
+    return {k: 1 for k in context.partition_keys}
+
+
+@asset(partitions_def=asset_job_partitions)
+def bp_none(context: AssetExecutionContext):
+    return 1
+
+
 @repository
 def the_repo():
     return [
@@ -371,6 +401,25 @@ def the_repo():
         asset_g,
         asset_with_single_run_backfill_policy,
         asset_with_multi_run_backfill_policy,
+        bp_single_run,
+        bp_single_run_config,
+        bp_multi_run,
+        bp_none,
+        define_asset_job(
+            "bp_single_run_asset_job",
+            selection=[bp_single_run_config, bp_single_run],
+            tags={"alpha": "beta"},
+            config={"ops": {"bp_single_run_config": {"config": {"name": "harry"}}}},
+        ),
+        define_asset_job(
+            "bp_multi_run_asset_job",
+            selection=[bp_multi_run],
+            tags={"alpha": "beta"},
+        ),
+        define_asset_job(
+            "bp_none_asset_job",
+            selection=[bp_none],
+        ),
     ]


@@ -1595,6 +1644,83 @@ def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
         ) in error_msg


+def _get_asset_job_backfill(external_repo: ExternalRepository, job_name: str) -> PartitionBackfill:
+    external_partition_set = external_repo.get_external_partition_set(f"{job_name}_partition_set")
+    return PartitionBackfill(
+        backfill_id="simple",
+        partition_set_origin=external_partition_set.get_external_origin(),
+        status=BulkActionStatus.REQUESTED,
+        partition_names=["a", "b", "c", "d"],
+        from_failure=False,
+        reexecution_steps=None,
+        tags=None,
+        backfill_timestamp=pendulum.now().timestamp(),
+    )
+
+
+def test_asset_job_backfill_single_run(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    external_repo: ExternalRepository,
+):
+    backfill = _get_asset_job_backfill(external_repo, "bp_single_run_asset_job")
+    assert instance.get_runs_count() == 0
+    instance.add_backfill(backfill)
+    list(
+        execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))
+    )
+
+    assert instance.get_runs_count() == 1
+    run = instance.get_runs()[0]
+    assert run.tags[BACKFILL_ID_TAG] == "simple"
+    assert run.tags[ASSET_PARTITION_RANGE_START_TAG] == "a"
+    assert run.tags[ASSET_PARTITION_RANGE_END_TAG] == "d"
+    assert run.tags["alpha"] == "beta"
+
+
+def test_asset_job_backfill_multi_run(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    external_repo: ExternalRepository,
+):
+    backfill = _get_asset_job_backfill(external_repo, "bp_multi_run_asset_job")
+    assert instance.get_runs_count() == 0
+    instance.add_backfill(backfill)
+    list(
+        execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))
+    )
+
+    assert instance.get_runs_count() == 2
+    run_1, run_2 = instance.get_runs()
+
+    assert run_1.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_1.tags[ASSET_PARTITION_RANGE_START_TAG] == "c"
+    assert run_1.tags[ASSET_PARTITION_RANGE_END_TAG] == "d"
+
+    assert run_2.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_2.tags[ASSET_PARTITION_RANGE_START_TAG] == "a"
+    assert run_2.tags[ASSET_PARTITION_RANGE_END_TAG] == "b"
+
+
+def test_asset_job_backfill_default(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    external_repo: ExternalRepository,
+):
+    backfill = _get_asset_job_backfill(external_repo, "bp_none_asset_job")
+    assert instance.get_runs_count() == 0
+    instance.add_backfill(backfill)
+    list(
+        execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))
+    )
+
+    assert instance.get_runs_count() == 4
+    run_1, run_2, run_3, run_4 = instance.get_runs()
+
+    assert run_1.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_1.tags[PARTITION_NAME_TAG] == "d"
+    assert run_2.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_2.tags[PARTITION_NAME_TAG] == "c"
+    assert run_3.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_3.tags[PARTITION_NAME_TAG] == "b"
+    assert run_4.tags[BACKFILL_ID_TAG] == "simple"
+    assert run_4.tags[PARTITION_NAME_TAG] == "a"
+
+
 def test_asset_backfill_with_single_run_backfill_policy(
     instance: DagsterInstance, workspace_context: WorkspaceProcessContext
 ):