Skip to content

Commit

Permalink
[external-assets] Make observe support external observables
Browse files Browse the repository at this point in the history
[INTERNAL_BRANCH=sean/gql-repo-build-assets-job]
  • Loading branch information
smackesey committed Feb 20, 2024
1 parent fc799f7 commit 214c718
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 84 deletions.
1 change: 0 additions & 1 deletion pyrightconfig-alt-1.json

This file was deleted.

18 changes: 11 additions & 7 deletions python_modules/dagster/dagster/_core/definitions/observe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job
from dagster._utils.warnings import disable_dagster_warnings
Expand All @@ -13,7 +15,7 @@


def observe(
source_assets: Sequence[SourceAsset],
assets: Sequence[Union[AssetsDefinition, SourceAsset]],
run_config: Any = None,
instance: Optional[DagsterInstance] = None,
resources: Optional[Mapping[str, object]] = None,
Expand All @@ -26,8 +28,8 @@ def observe(
By default, will materialize assets to the local filesystem.
Args:
source_assets (Sequence[SourceAsset]):
The source assets to materialize.
assets (Sequence[Union[AssetsDefinition, SourceAsset]]):
The assets to observe.
resources (Optional[Mapping[str, object]]):
The resources needed for execution. Can provide resource instances
directly, or resource definitions. Note that if provided resources
Expand All @@ -41,15 +43,17 @@ def observe(
Returns:
ExecuteInProcessResult: The result of the execution.
"""
source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset))
assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset))
instance = check.opt_inst_param(instance, "instance", DagsterInstance)
partition_key = check.opt_str_param(partition_key, "partition_key")
resources = check.opt_mapping_param(resources, "resources", key_type=str)

with disable_dagster_warnings():
observation_job = define_asset_job("in_process_observation_job", source_assets)
observation_job = define_asset_job(
"in_process_observation_job", selection=AssetSelection.all(include_sources=True)
)
defs = Definitions(
assets=source_assets,
assets=assets,
jobs=[observation_job],
resources=resources,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def observe_sources(*args):
def observe_sources_fn(*, instance, times_by_key, **kwargs):
for arg in args:
key = AssetKey(arg)
observe(source_assets=[versioned_repo.source_assets_by_key[key]], instance=instance)
observe(assets=[versioned_repo.source_assets_by_key[key]], instance=instance)
latest_record = instance.get_latest_data_version_record(key, is_source=True)
latest_timestamp = latest_record.timestamp
times_by_key[key].append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_time_fn():
if run.is_observation:
observe(
instance=instance,
source_assets=[
assets=[
a
for a in self.assets
if isinstance(a, SourceAsset) and a.key in run.asset_keys
Expand Down
Loading

0 comments on commit 214c718

Please sign in to comment.