Skip to content

Commit

Permalink
re-revert "remove get_implicit_job_def_for_assets (#23372)" (#23798)
Browse files Browse the repository at this point in the history
This reverts commit a0fe2a8.

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
sryza authored Aug 21, 2024
1 parent 5844789 commit 6d78319
Show file tree
Hide file tree
Showing 21 changed files with 126 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def test_databricks_asset(databricks_client, capsys):
script_file=script_file,
dbfs_path="dbfs:/my_python_script.py",
) as script_file:
job_def = databricks_asset_defs.get_implicit_global_asset_job_def()
job_def = databricks_asset_defs.get_implicit_job_def_for_assets(
[AssetKey("databricks_asset")],
)
assert job_def
result = job_def.execute_in_process()
assert result.success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
repository,
)
from dagster._config.pythonic_config import Config
from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME
from dagster._core.definitions.data_version import DATA_VERSION_TAG, DataVersion
from dagster._core.definitions.decorators.source_asset_decorator import observable_source_asset
from dagster._core.definitions.definitions_class import Definitions
Expand Down Expand Up @@ -359,7 +358,7 @@ def _materialize_assets(
else None
)
selector = infer_job_selector(
context, IMPLICIT_ASSET_JOB_NAME, asset_selection=gql_asset_selection
context, repo.get_implicit_asset_job_names()[0], asset_selection=gql_asset_selection
)
if partition_keys:
results = []
Expand All @@ -382,9 +381,7 @@ def _materialize_assets(
return results
else:
selector = infer_job_selector(
context,
IMPLICIT_ASSET_JOB_NAME,
asset_selection=gql_asset_selection,
context, repo.get_implicit_asset_job_names()[0], asset_selection=gql_asset_selection
)
return execute_dagster_graphql(
context,
Expand All @@ -399,7 +396,7 @@ def _materialize_assets(


def _fetch_data_versions(context: WorkspaceRequestContext, repo: RepositoryDefinition):
selector = infer_job_selector(context, repo.get_implicit_global_asset_job_def().name)
selector = infer_job_selector(context, repo.get_implicit_asset_job_names()[0])
return execute_dagster_graphql(
context,
GET_ASSET_DATA_VERSIONS,
Expand Down
10 changes: 9 additions & 1 deletion python_modules/dagster/dagster/_cli/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,
asset_selection = AssetSelection.from_coercible(kwargs["select"].split(","))
asset_keys = asset_selection.resolve(repo_def.asset_graph)

implicit_job_def = repo_def.get_implicit_global_asset_job_def()
implicit_job_def = repo_def.get_implicit_job_def_for_assets(asset_keys)
# If we can't find an implicit job with all the given assets, it's because they couldn't be
# placed into the same implicit job, because of their conflicting PartitionsDefinitions.
if implicit_job_def is None:
raise DagsterInvalidSubsetError(
"All selected assets must share the same PartitionsDefinition or have no"
" PartitionsDefinition"
)

reconstructable_job = recon_job_from_origin(
JobPythonOrigin(implicit_job_def.name, repository_origin=repository_origin)
)
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
from .asset_check_spec import AssetCheckSpec


def is_base_asset_job_name(name: str) -> bool:
return name == IMPLICIT_ASSET_JOB_NAME


def get_base_asset_job_lambda(
asset_graph: AssetGraph,
resource_defs: Optional[Mapping[str, ResourceDefinition]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,22 @@ def get_all_job_defs(self) -> Sequence[JobDefinition]:
"""
return self.get_repository_def().get_all_jobs()

def has_implicit_global_asset_job_def(self) -> bool:
return self.get_repository_def().has_implicit_global_asset_job_def()

def get_implicit_global_asset_job_def(self) -> JobDefinition:
"""A useful conveninence method when there is a single defined global asset job.
This occurs when all assets in the code location use a single partitioning scheme.
If there are multiple partitioning schemes you must use get_implicit_job_def_for_assets
instead to access to the correct implicit asset one.
"""
return self.get_repository_def().get_implicit_global_asset_job_def()

def get_implicit_job_def_for_assets(
self, asset_keys: Iterable[AssetKey]
) -> Optional[JobDefinition]:
return self.get_repository_def().get_implicit_job_def_for_assets(asset_keys)

def get_assets_def(self, key: CoercibleToAssetKey) -> AssetsDefinition:
asset_key = AssetKey.from_coercible(key)
for assets_def in self.get_asset_graph().assets_defs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import dagster._check as check
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
from dagster._core.definitions.utils import DEFAULT_GROUP_NAME
from dagster._core.remote_representation.external import ExternalRepository
from dagster._core.remote_representation.handle import RepositoryHandle

from .backfill_policy import BackfillPolicy
Expand Down Expand Up @@ -355,6 +357,18 @@ def get_materialization_asset_keys_for_job(self, job_name: str) -> Sequence[Asse
if job_name in self.get_materialization_job_names(k)
]

def get_implicit_job_name_for_assets(
self,
asset_keys: Iterable[AssetKey],
external_repo: Optional[ExternalRepository],
) -> Optional[str]:
"""Returns the name of the asset base job that contains all the given assets, or None if there is no such
job.
Note: all asset_keys should be in the same repository.
"""
return IMPLICIT_ASSET_JOB_NAME

def split_asset_keys_by_repository(
self, asset_keys: AbstractSet[AssetKey]
) -> Sequence[AbstractSet[AssetKey]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
)
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME, get_base_asset_job_lambda
from dagster._core.definitions.asset_job import (
IMPLICIT_ASSET_JOB_NAME,
get_base_asset_job_lambda,
is_base_asset_job_name,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_sensor_definition import (
AutomationConditionSensorDefinition,
Expand Down Expand Up @@ -190,7 +194,7 @@ def build_caching_repository_data_from_list(
raise DagsterInvalidDefinitionError(
f"Duplicate job definition found for {definition.describe_target()}"
)
if definition.name == IMPLICIT_ASSET_JOB_NAME:
if is_base_asset_job_name(definition.name):
raise DagsterInvalidDefinitionError(
f"Attempted to provide job called {definition.name} to repository, which "
"is a reserved name. Please rename the job."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,20 @@ def asset_checks_defs_by_key(self) -> Mapping[AssetKey, "AssetChecksDefinition"]
"""Mapping[AssetCheckKey, AssetChecksDefinition]: The assets checks defined in the repository."""
return self._repository_data.get_asset_checks_defs_by_key()

def has_implicit_global_asset_job_def(self) -> bool:
return self.has_job(IMPLICIT_ASSET_JOB_NAME)

def get_implicit_global_asset_job_def(self) -> JobDefinition:
return self.get_job(IMPLICIT_ASSET_JOB_NAME)

def get_implicit_asset_job_names(self) -> Sequence[str]:
return [IMPLICIT_ASSET_JOB_NAME]

def get_implicit_job_def_for_assets(
self, asset_keys: Iterable[AssetKey]
) -> Optional[JobDefinition]:
return self.get_job(IMPLICIT_ASSET_JOB_NAME)

def get_maybe_subset_job_def(
self,
job_name: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,10 +1393,10 @@ def _run_requests_with_base_asset_jobs(
else:
asset_keys = outer_asset_selection.resolve(asset_graph)

base_job = check.not_none(context.repository_def).get_implicit_global_asset_job_def()
base_job = context.repository_def.get_implicit_job_def_for_assets(asset_keys) # type: ignore # (possible none)
result.append(
run_request.with_replaced_attrs(
job_name=base_job.name,
job_name=base_job.name, # type: ignore # (possible none)
asset_selection=list(asset_keys),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ class RunRequestExecutionData(NamedTuple):
external_execution_plan: ExternalExecutionPlan


def _get_implicit_job_name_for_assets(
asset_graph: RemoteAssetGraph, asset_keys: Sequence[AssetKey]
) -> Optional[str]:
job_names = set(asset_graph.get_materialization_job_names(asset_keys[0]))
for asset_key in asset_keys[1:]:
job_names &= set(asset_graph.get_materialization_job_names(asset_key))

return next((job_name for job_name in job_names if job_name == IMPLICIT_ASSET_JOB_NAME), None)


def _get_execution_plan_asset_keys(
execution_plan_snapshot: ExecutionPlanSnapshot,
) -> AbstractSet[AssetKey]:
Expand All @@ -49,14 +59,22 @@ def _get_job_execution_data_from_run_request(
cast(Sequence[AssetKey], run_request.asset_selection)[0]
)
location_name = repo_handle.code_location_origin.location_name
job_name = _get_implicit_job_name_for_assets(
asset_graph, cast(Sequence[AssetKey], run_request.asset_selection)
)
if job_name is None:
check.failed(
"Could not find an implicit asset job for the given assets:"
f" {run_request.asset_selection}"
)

if not run_request.asset_selection:
check.failed("Expected RunRequest to have an asset selection")

pipeline_selector = JobSubsetSelector(
location_name=location_name,
repository_name=repo_handle.repository_name,
job_name=IMPLICIT_ASSET_JOB_NAME,
job_name=job_name,
asset_selection=run_request.asset_selection,
asset_check_selection=run_request.asset_check_keys,
op_selection=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_job import IMPLICIT_ASSET_JOB_NAME
from dagster._core.definitions.asset_job import is_base_asset_job_name
from dagster._core.definitions.asset_sensor_definition import AssetSensorDefinition
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
Expand Down Expand Up @@ -516,9 +516,10 @@ def from_def(cls, sensor_def: SensorDefinition, repository_def: RepositoryDefini

if sensor_def.asset_selection is not None:
target_dict = {
IMPLICIT_ASSET_JOB_NAME: ExternalTargetData(
job_name=IMPLICIT_ASSET_JOB_NAME, mode=DEFAULT_MODE_NAME, op_selection=None
base_asset_job_name: ExternalTargetData(
job_name=base_asset_job_name, mode=DEFAULT_MODE_NAME, op_selection=None
)
for base_asset_job_name in repository_def.get_implicit_asset_job_names()
}

serializable_asset_selection = (
Expand Down Expand Up @@ -1209,7 +1210,7 @@ def _get_resource_job_usage(job_defs: Sequence[JobDefinition]) -> ResourceJobUsa

for job_def in job_defs:
job_name = job_def.name
if job_name == IMPLICIT_ASSET_JOB_NAME:
if is_base_asset_job_name(job_name):
continue

resource_usage: List[NodeHandleResourceUse] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ def myconfig(start, _end):
CheckError,
match=(
"Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition"
" than supplied for"
" than supplied for 'partitions_def'."
),
):
define_asset_job("job", config=myconfig).resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def baz():
return 1

with instance_for_test() as instance:
job_def = Definitions(assets=[foo, bar, baz]).get_implicit_global_asset_job_def()
job_def = Definitions(assets=[foo, bar, baz]).get_implicit_job_def_for_assets([foo.key])

# If the asset selection contains any materializable assets, source assets observations will not run
job_def.execute_in_process(partition_key="A", instance=instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def prior_repo():
)
for run_request in run_requests:
instance.create_run_for_job(
prior_repo.get_implicit_global_asset_job_def(),
prior_repo.get_implicit_job_def_for_assets(run_request.asset_selection),
asset_selection=set(run_request.asset_selection),
tags=run_request.tags,
)
Expand Down Expand Up @@ -416,7 +416,7 @@ def test_time_fn():
).evaluate()

for run_request in run_requests:
base_job = repo.get_implicit_global_asset_job_def()
base_job = repo.get_implicit_job_def_for_assets(run_request.asset_selection)
assert base_job is not None

return run_requests, cursor, evaluations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ def _with_run_with_status_for_assets(
) -> Self:
run_id = make_new_run_id()
with freeze_time(self.current_time):
job_def = self.scenario_spec.defs.get_implicit_global_asset_job_def()
job_def = self.scenario_spec.defs.get_implicit_job_def_for_assets(
asset_keys=list(asset_keys)
)
assert job_def
execution_plan = create_execution_plan(job_def, run_config={})
self.instance.create_run_for_job(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _evaluate_tick_fast(
# make sure these run requests are available on the instance
for request in new_run_requests:
asset_selection = check.not_none(request.asset_selection)
job_def = self.scenario_spec.defs.get_implicit_global_asset_job_def()
job_def = self.scenario_spec.defs.get_implicit_job_def_for_assets(asset_selection)
self.instance.create_run_for_job(
job_def=check.not_none(job_def),
asset_selection=set(asset_selection),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def asset_one():

defs = Definitions(assets=[asset_one])

assert defs.get_implicit_global_asset_job_def()
assert defs.has_implicit_global_asset_job_def()
assert len(defs.get_all_job_defs()) == 1


Expand All @@ -496,7 +496,7 @@ def asset_one():

defs = Definitions(assets=[asset_one], jobs=[define_asset_job("all_assets_job", selection="*")])

assert defs.get_implicit_global_asset_job_def()
assert defs.has_implicit_global_asset_job_def()
assert defs.get_job_def("all_assets_job")
assert defs.get_job_def("all_assets_job") is not defs.get_implicit_global_asset_job_def()

Expand Down Expand Up @@ -534,6 +534,8 @@ def downstream_of_source(source_asset):
defs = Definitions(assets=[source_asset, downstream_of_source])
assert defs.get_all_job_defs()
assert len(defs.get_all_job_defs()) == 1
assert defs.get_implicit_job_def_for_assets(asset_keys=[AssetKey("downstream_of_source")])
assert defs.has_implicit_global_asset_job_def()
assert defs.get_implicit_global_asset_job_def()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import AbstractSet, Iterable

import pytest
from dagster import (
AssetExecutionContext,
Expand All @@ -8,6 +10,7 @@
DataVersion,
Definitions,
IOManager,
JobDefinition,
SourceAsset,
_check as check,
asset,
Expand Down Expand Up @@ -192,6 +195,21 @@ def an_asset(source_asset: str) -> str:
assert result_two.output_for_node("an_asset") == "hardcoded-computed"


def get_job_for_assets(defs: Definitions, *coercibles_or_defs) -> JobDefinition:
job_def = defs.get_implicit_job_def_for_assets(set_from_coercibles_or_defs(coercibles_or_defs))
assert job_def, "Expected to find a job def"
return job_def


def set_from_coercibles_or_defs(coercibles_or_defs: Iterable) -> AbstractSet["AssetKey"]:
return set(
[
AssetKey.from_coercible_or_definition(coercible_or_def)
for coercible_or_def in coercibles_or_defs
]
)


def test_how_partitioned_source_assets_are_backwards_compatible() -> None:
class DummyIOManager(IOManager):
def handle_output(self, context, obj) -> None:
Expand All @@ -216,7 +234,7 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

instance = DagsterInstance.ephemeral()

job_def_without_shim = defs_with_source.get_implicit_global_asset_job_def()
job_def_without_shim = get_job_for_assets(defs_with_source, an_asset)

result_one = job_def_without_shim.execute_in_process(
instance=instance, partition_key="2021-01-02"
Expand All @@ -230,7 +248,7 @@ def an_asset(context: AssetExecutionContext, source_asset: str) -> str:

assert isinstance(defs_with_shim.get_assets_def("source_asset"), AssetsDefinition)

job_def_with_shim = defs_with_shim.get_implicit_global_asset_job_def()
job_def_with_shim = get_job_for_assets(defs_with_shim, an_asset)

result_two = job_def_with_shim.execute_in_process(
instance=instance,
Expand Down
Loading

1 comment on commit 6d78319

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-hmrw8if80-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 6d78319.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.