Skip to content

Commit

Permalink
[external-assets] Make observe support external observables
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/gql-repo-build-assets-job]
  • Loading branch information
smackesey committed Feb 20, 2024
1 parent fb20575 commit fd7e3f9
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 84 deletions.
31 changes: 23 additions & 8 deletions python_modules/dagster/dagster/_core/definitions/observe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import deprecated_param
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._utils.warnings import disable_dagster_warnings
from dagster._utils.warnings import disable_dagster_warnings, normalize_renamed_param

from ..instance import DagsterInstance
from .source_asset import SourceAsset
Expand All @@ -12,22 +15,27 @@
from ..execution.execute_in_process_result import ExecuteInProcessResult


@deprecated_param(
param="source_assets", breaking_version="2.0.0", additional_warn_text="Use `assets` instead."
)
def observe(
source_assets: Sequence[SourceAsset],
assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None,
run_config: Any = None,
instance: Optional[DagsterInstance] = None,
resources: Optional[Mapping[str, object]] = None,
partition_key: Optional[str] = None,
raise_on_error: bool = True,
tags: Optional[Mapping[str, str]] = None,
*,
source_assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None,
) -> "ExecuteInProcessResult":
"""Executes a single-threaded, in-process run which observes the provided assets.
By default, will materialize assets to the local filesystem.
Args:
source_assets (Sequence[SourceAsset]):
The source assets to materialize.
assets (Sequence[Union[AssetsDefinition, SourceAsset]]):
The assets to observe.
resources (Optional[Mapping[str, object]]):
The resources needed for execution. Can provide resource instances
directly, or resource definitions. Note that if provided resources
Expand All @@ -37,19 +45,26 @@ def observe(
The string partition key that specifies the run config to execute. Can only be used
to select run config for assets with partitioned config.
tags (Optional[Mapping[str, str]]): Tags for the run.
source_assets (Optional[Sequence[Union[AssetsDefinition, SourceAsset]]]):
Deprecated alias for `assets`. Use `assets` instead.
Returns:
ExecuteInProcessResult: The result of the execution.
"""
source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset))
assets = check.not_none(
normalize_renamed_param(assets, "assets", source_assets, "source_assets")
)
assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
instance = check.opt_inst_param(instance, "instance", DagsterInstance)
partition_key = check.opt_str_param(partition_key, "partition_key")
resources = check.opt_mapping_param(resources, "resources", key_type=str)

with disable_dagster_warnings():
observation_job = define_asset_job("in_process_observation_job", source_assets)
observation_job = define_asset_job(
"in_process_observation_job", selection=AssetSelection.all(include_sources=True)
)
defs = Definitions(
assets=source_assets,
assets=assets,
jobs=[observation_job],
resources=resources,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def observe_sources(*args):
def observe_sources_fn(*, instance, times_by_key, **kwargs):
for arg in args:
key = AssetKey(arg)
observe(source_assets=[versioned_repo.source_assets_by_key[key]], instance=instance)
observe(assets=[versioned_repo.source_assets_by_key[key]], instance=instance)
latest_record = instance.get_latest_data_version_record(key, is_source=True)
latest_timestamp = latest_record.timestamp
times_by_key[key].append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_time_fn():
if run.is_observation:
observe(
instance=instance,
source_assets=[
assets=[
a
for a in self.assets
if isinstance(a, SourceAsset) and a.key in run.asset_keys
Expand Down
Loading

0 comments on commit fd7e3f9

Please sign in to comment.