Skip to content

Commit

Permalink
Adjust asset job backfills to respect backfill policies (dagster-io#2…
Browse files Browse the repository at this point in the history
…1259)

## Summary & Motivation

Addresses issues discussed
[here](dagster-io/internal#7417).

This PR adds a `backfill_policy` to `JobDefinition`, which is then
accessed in the backfill daemon via `ExternalPartitionSet`. This allows
the backfill daemon to execute a job backfill (distinct from an asset
backfill) as a ranged backfill, if that is what is specified by the
policy. Details:

- Only asset jobs can have a non-null backfill policy. Non-asset jobs
have a `None` `backfill_policy`, which means status quo behavior is
preserved.
- All assets in an asset job must have the same backfill policy. This
matches the existing constraint that they must all have the same
partitions definition. An error is thrown at asset job resolution for
differing backfill policies. This means that all selected assets in a
job will always be able to executed together in a single run, which is a
tighter constraint than asset backfills have, but is consistent with how
jobs work in general.
- Passing a `PartitionedConfig` as the config for an assets job will
cause a resolution error if the job backfill policy specifies ranged
backfills. That is because `PartitionedConfig` behavior is currently
undefined for ranges (this can be addressed in a future PR). Passing a
static config still works.

Other notes:

- I first explored adding support for backfill policies on asset jobs by
modifying the asset backfill pathway to accept a job name. This ended up
being over-complicated and brittle. The further approach does not add
any new dependencies on the "job" concept, which is good as we want to
move away from the centrality of jobs.

## How I Tested These Changes

Manually and with new unit tests.

One snapshot updated to account for new backfill policy property on
ExternalPartitionSet.

Manual details:

Load these definitions:

```
from dagster import (
    AssetExecutionContext,
    BackfillPolicy,
    Config,
    Definitions,
    IOManager,
    StaticPartitionsDefinition,
    asset,
    define_asset_job,
)

parts = StaticPartitionsDefinition(["a", "b"])


class DummyIOManager(IOManager):
    def __init__(self):
        super().__init__()
        self._db = {}

    def handle_output(self, context, obj) -> None:
        pass

    def load_input(self, context) -> int:
        return 1


class FooConfig(Config):
    name: str


@asset(partitions_def=parts, backfill_policy=BackfillPolicy.single_run())
def foo(context: AssetExecutionContext, config: FooConfig):
    context.log.info(config.name)
    return {"a": 1, "b": 2}


@asset(partitions_def=parts, backfill_policy=BackfillPolicy.single_run())
def bar(context: AssetExecutionContext):
    return {"a": 1, "b": 2}


asset_job = define_asset_job(
    "asset_job",
    [foo, bar],
    tags={"alpha": "beta"},
    config={"ops": {"foo": {"config": {"name": "harry"}}}},
)

defs = Definitions(
    assets=[foo, bar],
    jobs=[asset_job,],
    resources={"io_manager": DummyIOManager()},
)
```

Select the `asset_job` in the UI

<img width="1456" alt="image"
src="https://github.com/dagster-io/dagster/assets/1531373/b0f7a47e-b46e-43f5-bdfc-41dc9460cb46">

Click "Materialize all" to launch a backfill. Note that it advises that
backfill policies will be respected. This is false (before this PR):

<img width="797" alt="image"
src="https://github.com/dagster-io/dagster/assets/1531373/2575738b-01ae-4c9f-93d9-7acfa1993b8d">

Result before this PR (2 runs launched!):

<img width="1328" alt="image"
src="https://github.com/dagster-io/dagster/assets/1531373/347fff85-1fd8-4cb8-a35c-9806113f8b4e">

Result after this PR (only 1 run launched):

<img width="1328" alt="image"
src="https://github.com/dagster-io/dagster/assets/1531373/941f1a24-5948-4347-99a8-8f43887b28fd">
  • Loading branch information
smackesey authored and danielgafni committed Jun 18, 2024
1 parent 9c51b87 commit 9c9b2fe
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def create_and_launch_partition_backfill(
graphene_info.context.instance,
create_workspace=lambda: graphene_info.context,
backfill_job=backfill,
partition_names=chunk,
partition_names_or_ranges=chunk,
)
if run_id is not None
)
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,9 @@ def _execute_backfill_command_at_location(
external_job,
job_partition_set,
backfill_job,
partition_data,
partition_data.name,
partition_data.tags,
partition_data.run_config,
)
if dagster_run:
instance.submit_run(dagster_run.run_id, workspace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import warnings
from datetime import datetime
from functools import update_wrapper
from functools import cached_property, update_wrapper
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand All @@ -26,6 +26,7 @@
from dagster._config.validate import validate_config
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.dependency import (
Node,
NodeHandle,
Expand Down Expand Up @@ -416,6 +417,20 @@ def partitions_def(self) -> Optional[PartitionsDefinition]:
"""
return None if not self.partitioned_config else self.partitioned_config.partitions_def

@cached_property
def backfill_policy(self) -> Optional[BackfillPolicy]:
from dagster._core.definitions.asset_job import ASSET_BASE_JOB_PREFIX

backfill_policies = {
self.asset_layer.get(k).backfill_policy for k in self.asset_layer.executable_asset_keys
}
if not self.name.startswith(ASSET_BASE_JOB_PREFIX) and not backfill_policies:
check.invariant(
len(backfill_policies) <= 1,
"All assets in non-asset base job a job must have the same backfill policy.",
)
return next(iter(backfill_policies), None)

@property
def hook_defs(self) -> AbstractSet[HookDefinition]:
return self._hook_defs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import dagster._check as check
from dagster._annotations import deprecated
from dagster._core.definitions import AssetKey
from dagster._core.definitions.asset_job import build_asset_job, get_asset_graph_for_job
from dagster._core.definitions.asset_job import (
build_asset_job,
get_asset_graph_for_job,
)
from dagster._core.definitions.run_request import RunRequest
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.instance import DynamicPartitionsStore

from .config import ConfigMapping
from .metadata import RawMetadataValue
from .partition import PartitionedConfig
from .policy import RetryPolicy

if TYPE_CHECKING:
Expand All @@ -19,7 +23,6 @@
ExecutorDefinition,
HookDefinition,
JobDefinition,
PartitionedConfig,
PartitionsDefinition,
ResourceDefinition,
)
Expand Down Expand Up @@ -187,6 +190,32 @@ def resolve(
f'Error resolving selection for asset job "{self.name}": {e}'
) from e

# Require that all assets in the job have the same backfill policy
backfill_policies = {
job_asset_graph.get(k).backfill_policy for k in job_asset_graph.executable_asset_keys
}
if len(backfill_policies) > 1:
raise DagsterInvalidDefinitionError(
f"Asset job {self.name} materializes asset with varying BackfillPolicies. All assets"
" in a job must share the same BackfillPolicy."
)

# Error if a PartitionedConfig is defined and any target asset has a backfill policy that
# materializes anything other than a single partition per run. This is because
# PartitionedConfig is a function that maps single partition keys to run config, so it's
# behavior is undefined for multiple-partition runs.
backfill_policy = next(iter(backfill_policies), None)
if (
backfill_policy
and backfill_policy.max_partitions_per_run != 1
and isinstance(self.config, PartitionedConfig)
):
raise DagsterInvalidDefinitionError(
f"Asset job {self.name} materializes an asset with a BackfillPolicy targeting multiple partitions per run,"
"but a PartitionedConfig was provided. PartitionedConfigs are not supported for "
"jobs with multi-partition-per-run backfill policies."
)

return build_asset_job(
self.name,
asset_graph=job_asset_graph,
Expand Down
Loading

0 comments on commit 9c9b2fe

Please sign in to comment.