Skip to content

Commit

Permalink
[external-assets] Add support for auto_observe_interval_minutes on ex…
Browse files Browse the repository at this point in the history
…ternal observables
  • Loading branch information
smackesey committed Feb 13, 2024
1 parent 078c860 commit 3f2aba0
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 58 deletions.
28 changes: 28 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.asset_subset import ValidAssetSubset
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
Expand Down Expand Up @@ -141,6 +142,10 @@ def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]:
def freshness_policies_by_key(self) -> Mapping[AssetKey, Optional[FreshnessPolicy]]:
return self._freshness_policies_by_key

@property
def observable_keys(self) -> AbstractSet[AssetKey]:
return {key for key, is_observable in self._is_observable_by_key.items() if is_observable}

@property
def auto_materialize_policies_by_key(
self,
Expand All @@ -159,6 +164,10 @@ def from_assets(
all_assets: Iterable[Union[AssetsDefinition, SourceAsset]],
asset_checks: Optional[Sequence[AssetChecksDefinition]] = None,
) -> "InternalAssetGraph":
from dagster._core.definitions.external_asset import (
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
)

assets_defs: List[AssetsDefinition] = []
source_assets: List[SourceAsset] = []
partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {}
Expand Down Expand Up @@ -197,6 +206,25 @@ def from_assets(
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)

is_observable = asset.execution_type == AssetExecutionType.OBSERVATION
is_observable_by_key.update({key: is_observable for key in asset.keys})

# Set auto_observe_interval_minutes for external observable assets
# This can be removed when/if we have a a solution for mapping
# `auto_observe_interval_minutes` to an AutoMaterialzePolicy
first_key = next(iter(asset.keys), None)
if (
first_key
and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
in asset.metadata_by_key[first_key]
):
interval = asset.metadata_by_key[first_key][
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES
]
auto_observe_interval_minutes_by_key.update(
{key: interval for key in asset.keys}
)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
if len(all_required_keys) > 1:
Expand Down
10 changes: 9 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dagster._core.definitions.op_selection import get_graph_subset
from dagster._core.definitions.partition_mapping import MultiPartitionMapping
from dagster._core.definitions.resource_requirement import (
ExternalAssetIOManagerRequirement,
RequiresResources,
ResourceAddable,
ResourceRequirement,
Expand Down Expand Up @@ -1334,7 +1335,14 @@ def get_io_manager_key_for_asset_key(self, key: AssetKey) -> str:
)[0].io_manager_key

def get_resource_requirements(self) -> Iterator[ResourceRequirement]:
yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined]
if self.is_executable:
yield from self.node_def.get_resource_requirements() # type: ignore[attr-defined]
else:
for key in self.keys:
yield ExternalAssetIOManagerRequirement(
key=self.get_io_manager_key_for_asset_key(key),
asset_key=key.to_string(),
)
for source_key, resource_def in self.resource_defs.items():
yield from resource_def.get_resource_requirements(outer_context=source_key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def asset(
metadata: Optional[Mapping[str, Any]] = ...,
description: Optional[str] = ...,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = ...,
required_resource_keys: Optional[AbstractSet[str]] = ...,
resource_defs: Optional[Mapping[str, object]] = ...,
io_manager_def: Optional[object] = ...,
io_manager_key: Optional[str] = ...,
Expand Down Expand Up @@ -111,7 +111,7 @@ def asset(
metadata: Optional[ArbitraryMetadataMapping] = None,
description: Optional[str] = None,
config_schema: Optional[UserConfigSchema] = None,
required_resource_keys: Optional[Set[str]] = None,
required_resource_keys: Optional[AbstractSet[str]] = None,
resource_defs: Optional[Mapping[str, object]] = None,
io_manager_def: Optional[object] = None,
io_manager_key: Optional[str] = None,
Expand Down
83 changes: 50 additions & 33 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._utils.warnings import disable_dagster_warnings


def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
Expand Down Expand Up @@ -125,44 +126,60 @@ def _external_assets_def(context: AssetExecutionContext) -> None:
return assets_defs


# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
# in the auto-materialize daemon. It should eventually be eliminated in favor
# of an implementation of `auto_observe_interval_minutes` in terms of
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)

kwargs = {
"key": source_asset.key,
"metadata": source_asset.metadata,
"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
"_execution_type": (
AssetExecutionType.UNEXECUTABLE
if source_asset.observe_fn is None
else AssetExecutionType.OBSERVATION
observe_interval = source_asset.auto_observe_interval_minutes
metadata = {
**source_asset.raw_metadata,
**(
{SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES: observe_interval}
if observe_interval
else {}
),
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _shim_assets_def(context: AssetExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
with disable_dagster_warnings():

@asset(
key=source_asset.key,
metadata=metadata,
group_name=source_asset.group_name,
description=source_asset.description,
partitions_def=source_asset.partitions_def,
_execution_type=(
AssetExecutionType.UNEXECUTABLE
if source_asset.observe_fn is None
else AssetExecutionType.OBSERVATION
),
io_manager_key=source_asset.io_manager_key,
# We don't pass the `io_manager_def` because it will already be present in
# `resource_defs` (it is added during `SourceAsset` initialization).
resource_defs=source_asset.resource_defs,
# We need to access the raw attribute because the property will return a computed value that
# includes requirements for the io manager. Those requirements will be inferred again when
# we create an AssetsDefinition.
required_resource_keys=source_asset._required_resource_keys, # noqa: SLF001
)
return return_value
def _shim_assets_def(context: AssetExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
)
return return_value

check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.host_representation.external import ExternalRepository
Expand Down Expand Up @@ -153,18 +154,17 @@ def from_repository_handles_and_external_asset_nodes(
node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source
}

is_observable_by_key = {key: False for key in all_non_source_keys}
is_observable_by_key = {}
auto_observe_interval_minutes_by_key = {}

for repo_handle, node in repo_handle_external_asset_nodes:
is_observable_by_key[node.asset_key] = (
node.execution_type == AssetExecutionType.OBSERVATION
)
auto_observe_interval_minutes_by_key[
node.asset_key
] = node.auto_observe_interval_minutes
if node.is_source:
# We need to set this even if the node is a regular asset in another code location.
# `is_observable` will only ever be consulted in the source asset context.
is_observable_by_key[node.asset_key] = node.is_observable
auto_observe_interval_minutes_by_key[
node.asset_key
] = node.auto_observe_interval_minutes

if node.asset_key in all_non_source_keys:
# one location's source is another location's non-source
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,29 @@ def describe_requirement(self) -> str:
)


class ExternalAssetIOManagerRequirement(
NamedTuple(
"_ExternalAssetIOManagerRequirement",
[
("key", str),
("asset_key", Optional[str]),
],
),
ResourceRequirement,
):
@property
def expected_type(self) -> Type:
from ..storage.io_manager import IOManagerDefinition

return IOManagerDefinition

def describe_requirement(self) -> str:
external_asset_descriptor = (
f"external asset with key {self.asset_key}" if self.asset_key else "external asset"
)
return f"io manager with key '{self.key}' required by {external_asset_descriptor}"


class SourceAssetIOManagerRequirement(
NamedTuple(
"_InputManagerRequirement",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def _external_sensors(self) -> Dict[str, "ExternalSensor"]:
if asset_key not in covered_asset_keys:
default_sensor_asset_keys.add(asset_key)

for asset_key in asset_graph.source_asset_keys:
for asset_key in asset_graph.observable_keys:
if asset_graph.get_auto_observe_interval_minutes(asset_key) is None:
continue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,7 @@ def external_asset_nodes_from_defs(

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
).difference(source_assets_by_key.keys())
).difference({*source_assets_by_key.keys()})

asset_nodes = [
ExternalAssetNode(
Expand Down Expand Up @@ -1729,6 +1729,14 @@ def external_asset_nodes_from_defs(
node_handle = node_handle.parent
graph_name = node_handle.name

if asset_key in source_assets_by_key:
source_asset = source_assets_by_key[asset_key]
is_observable = source_asset.is_observable
auto_observe_interval_minutes = source_asset.auto_observe_interval_minutes
else:
is_observable = False
auto_observe_interval_minutes = None

asset_nodes.append(
ExternalAssetNode(
asset_key=asset_key,
Expand All @@ -1749,6 +1757,8 @@ def external_asset_nodes_from_defs(
partitions_def_data=partitions_def_data,
output_name=output_def.name,
metadata=asset_metadata,
is_observable=is_observable,
auto_observe_interval_minutes=auto_observe_interval_minutes,
# assets defined by Out(asset_key="k") do not have any group
# name specified we default to DEFAULT_GROUP_NAME here to ensure
# such assets are part of the default group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from pytest import fixture


Expand Down Expand Up @@ -44,59 +45,60 @@ def asset1():
)


@fixture
def single_auto_observe_source_asset_graph():
@fixture(params=[True, False], ids=["use_external_asset", "use_source_asset"])
def single_auto_observe_asset_graph(request):
@observable_source_asset(auto_observe_interval_minutes=30)
def asset1():
...

asset_graph = AssetGraph.from_assets([asset1])
observable = create_external_asset_from_source_asset(asset1) if request.param else asset1
asset_graph = AssetGraph.from_assets([observable])
return asset_graph


def test_single_observable_source_asset_no_prior_observe_requests(
single_auto_observe_source_asset_graph,
single_auto_observe_asset_graph,
):
run_requests = get_auto_observe_run_requests(
asset_graph=single_auto_observe_source_asset_graph,
asset_graph=single_auto_observe_asset_graph,
current_timestamp=1000,
last_observe_request_timestamp_by_asset_key={},
run_tags={},
auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys,
auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys,
)
assert len(run_requests) == 1
run_request = run_requests[0]
assert run_request.asset_selection == [AssetKey("asset1")]


def test_single_observable_source_asset_prior_observe_requests(
single_auto_observe_source_asset_graph,
single_auto_observe_asset_graph,
):
last_timestamp = 1000

run_requests = get_auto_observe_run_requests(
asset_graph=single_auto_observe_source_asset_graph,
asset_graph=single_auto_observe_asset_graph,
current_timestamp=last_timestamp + 30 * 60 + 5,
last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp},
run_tags={},
auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys,
auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys,
)
assert len(run_requests) == 1
run_request = run_requests[0]
assert run_request.asset_selection == [AssetKey("asset1")]


def test_single_observable_source_asset_prior_recent_observe_requests(
single_auto_observe_source_asset_graph,
single_auto_observe_asset_graph,
):
last_timestamp = 1000

run_requests = get_auto_observe_run_requests(
asset_graph=single_auto_observe_source_asset_graph,
asset_graph=single_auto_observe_asset_graph,
current_timestamp=last_timestamp + 30 * 60 - 5,
last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp},
run_tags={},
auto_observe_asset_keys=single_auto_observe_source_asset_graph.source_asset_keys,
auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys,
)
assert len(run_requests) == 0

Expand Down

0 comments on commit 3f2aba0

Please sign in to comment.