Skip to content

Commit

Permalink
[external-assets] Change build_assets_job interface to use assets_to_…
Browse files Browse the repository at this point in the history
…execute
  • Loading branch information
smackesey committed Feb 13, 2024
1 parent 046bc7f commit 19f636e
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1335,8 +1335,8 @@ def never_runs_asset(

hanging_job = build_assets_job(
name="hanging_job",
source_assets=[dummy_source_asset],
assets=[first_asset, hanging_asset, never_runs_asset],
other_assets=[dummy_source_asset],
assets_to_execute=[first_asset, hanging_asset, never_runs_asset],
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
"hanging_asset_resource": hanging_asset_resource,
Expand Down Expand Up @@ -1383,7 +1383,7 @@ def downstream_asset(hanging_graph):

hanging_graph_asset_job = build_assets_job(
name="hanging_graph_asset_job",
assets=[hanging_graph_asset, downstream_asset],
assets_to_execute=[hanging_graph_asset, downstream_asset],
resource_defs={
"hanging_asset_resource": hanging_asset_resource,
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1401,7 +1401,7 @@ def asset_two(asset_one):
return asset_one + 1


two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two])
two_assets_job = build_assets_job(name="two_assets_job", assets_to_execute=[asset_one, asset_two])


@asset
Expand All @@ -1412,7 +1412,7 @@ def executable_asset() -> None:
unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")])))

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
name="executable_test_job", assets_to_execute=[executable_asset, unexecutable_asset]
)

static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"])
Expand Down Expand Up @@ -1455,7 +1455,7 @@ def downstream_dynamic_partitioned_asset(

dynamic_partitioned_assets_job = build_assets_job(
"dynamic_partitioned_assets_job",
assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
assets_to_execute=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset],
)


Expand Down Expand Up @@ -1529,7 +1529,7 @@ def yield_partition_materialization():

partition_materialization_job = build_assets_job(
"partition_materialization_job",
assets=[yield_partition_materialization],
assets_to_execute=[yield_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1543,7 +1543,7 @@ def fail_partition_materialization(context):

fail_partition_materialization_job = build_assets_job(
"fail_partition_materialization_job",
assets=[fail_partition_materialization],
assets_to_execute=[fail_partition_materialization],
executor_def=in_process_executor,
)

Expand All @@ -1562,7 +1562,7 @@ def hanging_partition_asset(context):

hanging_partition_asset_job = build_assets_job(
"hanging_partition_asset_job",
assets=[hanging_partition_asset],
assets_to_execute=[hanging_partition_asset],
executor_def=in_process_executor,
resource_defs={
"io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()),
Expand All @@ -1580,7 +1580,7 @@ def asset_yields_observation():

observation_job = build_assets_job(
"observation_job",
assets=[asset_yields_observation],
assets_to_execute=[asset_yields_observation],
executor_def=in_process_executor,
)

Expand Down
25 changes: 13 additions & 12 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,23 +894,24 @@ def build_asset_selection_job(
f"{partitions_def}.",
)

if len(included_assets) or len(included_checks_defs) > 0:
# Job materializes assets and/or executes checks
final_assets = included_assets
final_asset_checks = included_checks_defs
final_source_assets = [*source_assets, *excluded_assets]
else:
# Job only observes source assets
final_assets = []
final_asset_checks = []
final_source_assets = included_source_assets
included_observables = [asset for asset in included_source_assets if asset.is_observable]
final_assets = [*included_assets, *included_observables]
final_asset_checks = included_checks_defs
final_source_assets = [
*(
source_asset
for source_asset in source_assets
if source_asset not in included_observables
),
*excluded_assets,
]

return build_assets_job(
name=name,
assets=final_assets,
assets_to_execute=final_assets,
asset_checks=final_asset_checks,
config=config,
source_assets=final_source_assets,
other_assets=final_source_assets,
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
Expand Down
93 changes: 49 additions & 44 deletions python_modules/dagster/dagster/_core/definitions/assets_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,60 +68,64 @@ def get_base_asset_jobs(
resource_defs: Optional[Mapping[str, ResourceDefinition]],
executor_def: Optional[ExecutorDefinition],
) -> Sequence[JobDefinition]:
assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[AssetsDefinition]
executable_assets = [
*(ad for ad in assets if ad.is_executable),
*(sa for sa in source_assets if sa.is_observable),
]
unexecutable_assets = [
*(ad for ad in assets if not ad.is_executable),
*(sa for sa in source_assets if not sa.is_observable),
]

executable_assets_by_partitions_def: Dict[
Optional[PartitionsDefinition], List[Union[AssetsDefinition, SourceAsset]]
] = defaultdict(list)
for assets_def in assets:
assets_by_partitions_def[assets_def.partitions_def].append(assets_def)

# We need to create "empty" jobs for each partitions def that is used by an observable but no
# materializable asset. They are empty because we don't assign the source asset to the `assets`,
# but rather the `source_assets` argument of `build_assets_job`.
for observable in [sa for sa in source_assets if sa.is_observable]:
if observable.partitions_def not in assets_by_partitions_def:
assets_by_partitions_def[observable.partitions_def] = []
if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}:
for asset in executable_assets:
executable_assets_by_partitions_def[asset.partitions_def].append(asset)
# sort to ensure some stability in the ordering
all_partitions_defs = sorted(
[p for p in executable_assets_by_partitions_def.keys() if p], key=repr
)

if len(all_partitions_defs) == 0:
return [
build_assets_job(
name=ASSET_BASE_JOB_PREFIX,
assets=assets,
assets_to_execute=executable_assets,
other_assets=unexecutable_assets,
asset_checks=asset_checks,
source_assets=source_assets,
executor_def=executor_def,
resource_defs=resource_defs,
)
]
else:
unpartitioned_assets = assets_by_partitions_def.get(None, [])
partitioned_assets_by_partitions_def = {
k: v for k, v in assets_by_partitions_def.items() if k is not None
}
unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, [])
jobs = []

# sort to ensure some stability in the ordering
for i, (partitions_def, assets_with_partitions) in enumerate(
sorted(partitioned_assets_by_partitions_def.items(), key=lambda item: repr(item[0]))
):
# all partition base jobs contain all unpartitioned assets
for i, partitions_def in enumerate(all_partitions_defs):
partitioned_executable_assets = executable_assets_by_partitions_def[partitions_def]
assets_to_execute = [*partitioned_executable_assets, *unpartitioned_executable_assets]
jobs.append(
build_assets_job(
f"{ASSET_BASE_JOB_PREFIX}_{i}",
assets=[*assets_with_partitions, *unpartitioned_assets],
source_assets=[*source_assets, *assets],
assets_to_execute=assets_to_execute,
other_assets=[
*(asset for asset in executable_assets if asset not in assets_to_execute),
*unexecutable_assets,
],
asset_checks=asset_checks,
resource_defs=resource_defs,
executor_def=executor_def,
# Only explicitly set partitions_def for observable-only jobs since it can't be
# auto-detected from the passed assets (which is an empty list).
partitions_def=partitions_def if len(assets_with_partitions) == 0 else None,
)
)
return jobs


def build_assets_job(
name: str,
assets: Sequence[AssetsDefinition],
source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
assets_to_execute: Sequence[Union[AssetsDefinition, SourceAsset]],
other_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
resource_defs: Optional[Mapping[str, object]] = None,
description: Optional[str] = None,
Expand Down Expand Up @@ -173,40 +177,41 @@ def asset2(asset1):
from dagster._core.execution.build_resources import wrap_resources_for_execution

check.str_param(name, "name")
check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
source_assets = check.opt_sequence_param(
source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset))
other_assets = check.opt_sequence_param(
other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition)
)
asset_checks = check.opt_sequence_param(
asset_checks, "asset_checks", of_type=AssetChecksDefinition
)
check.opt_str_param(description, "description")
check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resource_defs = check.opt_mapping_param(resource_defs, "resource_defs")
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)

assets = [asset for asset in assets_to_execute if isinstance(asset, AssetsDefinition)]
# turn any AssetsDefinitions into SourceAssets
resolved_source_assets: List[SourceAsset] = []
for asset in source_assets or []:
source_assets: List[SourceAsset] = []
for asset in other_assets or []:
if isinstance(asset, AssetsDefinition):
resolved_source_assets += asset.to_source_assets()
source_assets += asset.to_source_assets()
elif isinstance(asset, SourceAsset):
resolved_source_assets.append(asset)
source_assets.append(asset)

# figure out what partitions (if any) exist for this job
partitions_def = partitions_def or build_job_partitions_from_assets(assets)

resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)
deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
)

# attempt to resolve cycles using multi-asset subsetting
if _has_cycles(deps):
assets = _attempt_resolve_cycles(assets, resolved_source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets)
assets = _attempt_resolve_cycles(assets, source_assets)
resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets)

deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps(
assets, asset_checks, resolved_asset_deps
Expand Down Expand Up @@ -244,14 +249,14 @@ def asset2(asset1):
asset_layer = AssetLayer.from_graph_and_assets_node_mapping(
graph_def=graph,
asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle,
source_assets=resolved_source_assets,
source_assets=source_assets,
resolved_asset_deps=resolved_asset_deps,
assets_defs_by_outer_node_handle=assets_defs_by_node_handle,
observable_source_assets_by_node_handle=observable_source_assets_by_node_handle,
)

all_resource_defs = get_all_resource_defs(
assets, asset_checks, resolved_source_assets, wrapped_resource_defs
assets, asset_checks, source_assets, wrapped_resource_defs
)

if _asset_selection_data:
Expand Down
35 changes: 26 additions & 9 deletions python_modules/dagster/dagster/_core/definitions/observe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import deprecated_param
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import build_assets_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.warnings import disable_dagster_warnings
from dagster._utils.warnings import disable_dagster_warnings, normalize_renamed_param

from ..instance import DagsterInstance
from .source_asset import SourceAsset
Expand All @@ -12,22 +14,27 @@
from ..execution.execute_in_process_result import ExecuteInProcessResult


@deprecated_param(
param="source_assets", breaking_version="2.0", additional_warn_text="Use `assets` instead."
)
def observe(
source_assets: Sequence[SourceAsset],
assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None,
run_config: Any = None,
instance: Optional[DagsterInstance] = None,
resources: Optional[Mapping[str, object]] = None,
partition_key: Optional[str] = None,
raise_on_error: bool = True,
tags: Optional[Mapping[str, str]] = None,
*,
source_assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None,
) -> "ExecuteInProcessResult":
"""Executes a single-threaded, in-process run which observes provided source assets.
"""Executes a single-threaded, in-process run which observes provided observable assets.
By default, will materialize assets to the local filesystem.
Args:
source_assets (Sequence[SourceAsset]):
The source assets to materialize.
assets (Sequence[Union[SourceAsset, AssetsDefinition]]):
The assets to observe. Assets must be observable.
resources (Optional[Mapping[str, object]]):
The resources needed for execution. Can provide resource instances
directly, or resource definitions. Note that if provided resources
Expand All @@ -37,19 +44,29 @@ def observe(
The string partition key that specifies the run config to execute. Can only be used
to select run config for assets with partitioned config.
tags (Optional[Mapping[str, str]]): Tags for the run.
source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]):
Deprecated alias for `assets`; use `assets` instead.
Returns:
ExecuteInProcessResult: The result of the execution.
"""
source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset))
assets = check.not_none(
normalize_renamed_param(
assets,
"assets",
source_assets,
"source_assets",
)
)
assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
instance = check.opt_inst_param(instance, "instance", DagsterInstance)
partition_key = check.opt_str_param(partition_key, "partition_key")
resources = check.opt_mapping_param(resources, "resources", key_type=str)

with disable_dagster_warnings():
observation_job = build_assets_job("in_process_observation_job", [], source_assets)
observation_job = build_assets_job("in_process_observation_job", assets)
defs = Definitions(
assets=source_assets,
assets=assets,
jobs=[observation_job],
resources=resources,
)
Expand Down
Loading

0 comments on commit 19f636e

Please sign in to comment.