Skip to content

Commit

Permalink
guide scaffoling
Browse files Browse the repository at this point in the history
add example code snipper

check point

change to single multi to avoid bug

refactor into helper

cp

self review

cp

cp
  • Loading branch information
schrockn committed Oct 9, 2023
1 parent 5674ab9 commit 98b8736
Show file tree
Hide file tree
Showing 17 changed files with 378 additions and 39 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@
{
"title": "Asset checks (Experimental)",
"path": "/concepts/assets/asset-checks"
},
{
"title": "External assets (Experimental)",
"path": "/concepts/assets/external-assets"
}
]
},
Expand Down
4 changes: 4 additions & 0 deletions docs/content/concepts.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ An asset is an object in persistent storage, such as a table, file, or persisted
title="Asset checks (Experimental)"
href="/concepts/assets/asset-checks"
></ArticleListItem>
<ArticleListItem
title="External assets (Experimental)"
href="/concepts/assets/external-assets"
></ArticleListItem>
</ArticleList>

---
Expand Down
150 changes: 150 additions & 0 deletions docs/content/concepts/assets/external-assets.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
---
title: External Assets | Dagster
description: External assets model assets in Dagster that are not scheduled or materialized in Dagster.
---

# External assets (Experimental)

An external asset is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. This allows a user to model assets in Dagster, attach metadata and events to those assets, without scheduling their materialization in data. Example use cases

- Data that is landed by an external source (e.g. a file landing daily in s3)
- Data that is created and processed using manual processes
- Data that is materialized by existing pipelines with their own scheduling and infrastructure that you do not want to or need to migrate en masse.

With an external assets you can:

- Attach metadata to its definition for documentation, tracking ownership, and so on
- Track its data quality in Dagster.
- Track its data version in Dagster
- Schedule downstream assets based on updates to external assets using asset sensors or auto-materialize policies

But you cannot

- Schedule its materialiation
- Backfill it using Dagster
- Use the UI or GraphQL API to instigate ad hoc materializations.

## Relevant APIs

| Name | Description |
| ------------------------------------------------ | --------------------------------------------------------------------------------------- |
| <PyObject object="external_assets_from_specs" /> | Create a list of external asset definitions from a list of specs |
| <PyObject object="AssetSpec" /> | An object that represents the metadata of a particular asset |
| <PyObject object="SensorResult" /> | Result object for a sensor evalation where you pass events to attach to external assets |

## Defining External Assets

### A single external asset

The following code declares a single external asset and passes it to <PyObject object="Definitions"/>

```python file=/concepts/assets/external_assets/single_declaration.py
from dagster import AssetSpec, Definitions, external_asset_from_spec

defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))])
```

### External assets with dependencies

External assets can depend on other external assets. You do this by using the `deps` argument of <PyObject object="AssetSpec" />.

```python file=/concepts/assets/external_assets/external_asset_deps.py
from dagster import AssetSpec, Definitions, external_assets_from_specs

asset_one = AssetSpec("asset_one")
asset_two = AssetSpec("asset_two", deps=[asset_one])

defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two]))
```

## Keeping an external asset's metadata up to date.

Since with external assets Dagster does not control scheduling or materialization, it is up to the user to keep the metadata for an external asset up-to-date. There are a number of mechanisms for doing so:

- REST APIs
- Sensors
- Log events in ops

### Using REST API

TODO: joe

### Using Sensors

You can generate events to attach to external assets and provide them to sensors directly, by using the `asset_events` param of \<PyObject object="SensorResult/>.

```python file=/concepts/assets/external_assets/external_asset_using_sensor.py
import datetime

from dagster import (
AssetMaterialization,
AssetSpec,
Definitions,
SensorResult,
external_asset_from_spec,
sensor,
)
from dagster._core.definitions.sensor_definition import SensorEvaluationContext


def utc_now_str() -> str:
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
# materialization happened in external system, but is recorded here
return SensorResult(
asset_events=[
AssetMaterialization(
asset_key="external_asset_a",
metadata={
"source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
},
)
]
)


defs = Definitions(
assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
sensors=[keep_external_asset_a_up_to_date],
)
```

### Log events in unrelated ops

A user can log an <PyObject object="AssetMaterialization"/> from a bare op. In this case we use the `log_event` method of <PyObject object="OpExecutionContext"/> to report an asset materialization of an external asset.

```python file=/concepts/assets/external_assets/update_external_asset_via_op.py
from dagster import (
AssetMaterialization,
AssetSpec,
Definitions,
external_asset_from_spec,
job,
op,
)
from dagster._core.execution.context.compute import OpExecutionContext


@op
def an_op(context: OpExecutionContext) -> None:
context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
an_op()


defs = Definitions(
assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
)
```

### What about Source Assets?

A common use case for external assets is modeling data produced by an process not under Dagster's control. For example a daily drop of a file from a third party in s3.

In most systems these are described as sources. This includes Dagster, which includes <PyObject object="SourceAsset" displayText="SourceAsset" /> which will be supplanted by external assets in the near-term future, as external assets are a superset of the functionality of Source Assets.
2 changes: 1 addition & 1 deletion docs/content/concepts/assets/multi-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ A multi-asset represents a set of software-defined assets that are all updated b

When working with [software-defined assets](/concepts/assets/software-defined-assets), it's sometimes inconvenient or impossible to map each persisted asset to a unique [op](/concepts/ops-jobs-graphs/ops) or [graph](/concepts/ops-jobs-graphs/graphs). A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once.

Multi-assets may be useful in the following scenarios:
Multi-assets may be useful in the following scenarios:

- A single call to an API results in multiple tables being updated (e.g. Airbyte, Fivetran, dbt).
- The same in-memory object is used to compute multiple assets.
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from dagster import AssetSpec, Definitions, external_assets_from_specs

asset_one = AssetSpec("asset_one")
asset_two = AssetSpec("asset_two", deps=[asset_one])

defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two]))
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import datetime

from dagster import (
AssetMaterialization,
AssetSpec,
Definitions,
SensorResult,
external_asset_from_spec,
sensor,
)
from dagster._core.definitions.sensor_definition import SensorEvaluationContext


def utc_now_str() -> str:
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
# materialization happened in external system, but is recorded here
return SensorResult(
asset_events=[
AssetMaterialization(
asset_key="external_asset_a",
metadata={
"source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
},
)
]
)


defs = Definitions(
assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
sensors=[keep_external_asset_a_up_to_date],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from dagster import AssetSpec, Definitions, external_asset_from_spec

defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))])
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dagster import (
AssetMaterialization,
AssetSpec,
Definitions,
external_asset_from_spec,
job,
op,
)
from dagster._core.execution.context.compute import OpExecutionContext


@op
def an_op(context: OpExecutionContext) -> None:
context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
an_op()


defs = Definitions(
assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from dagster import (
AssetKey,
AssetMaterialization,
DagsterInstance,
SensorResult,
build_sensor_context,
)
from docs_snippets.concepts.assets.external_assets.external_asset_using_sensor import (
keep_external_asset_a_up_to_date,
)


def test_keep_external_asset_a_up_to_date() -> None:
instance = DagsterInstance.ephemeral()
result = keep_external_asset_a_up_to_date(
build_sensor_context(
instance=instance, sensor_name="keep_external_asset_a_up_to_date"
)
)
assert isinstance(result, SensorResult)
assert len(result.asset_events) == 1
assert isinstance(result.asset_events[0], AssetMaterialization)
assert result.asset_events[0].asset_key == AssetKey("external_asset_a")
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import docs_snippets.concepts.assets.external_assets.external_asset_deps
import docs_snippets.concepts.assets.external_assets.single_declaration
from dagster import AssetKey, Definitions


def test_docs_snippets_concepts_external_asset_single_decl() -> None:
single_decl_defs: Definitions = (
docs_snippets.concepts.assets.external_assets.single_declaration.defs
)
assert single_decl_defs.get_assets_def("my_asset")


def test_docs_snippets_concepts_external_asset_external_asset_deps() -> None:
defs_with_deps: Definitions = (
docs_snippets.concepts.assets.external_assets.external_asset_deps.defs
)
assert defs_with_deps.get_assets_def("asset_one")
assert defs_with_deps.get_assets_def("asset_two")
assert defs_with_deps.get_assets_def("asset_two").asset_deps == {
AssetKey("asset_one"): set(),
AssetKey("asset_two"): {AssetKey("asset_one")},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import docs_snippets.concepts.assets.external_assets.update_external_asset_via_op
from dagster import AssetKey, DagsterInstance, Definitions


def test_external_assets_update_external_asset_via_op_0() -> None:
defs: Definitions = (
docs_snippets.concepts.assets.external_assets.update_external_asset_via_op.defs
)
a_job_def = defs.get_job_def("a_job")
instance = DagsterInstance.ephemeral()
result = a_job_def.execute_in_process(instance=instance)
assert result.success

assert instance.get_latest_materialization_event(AssetKey("external_asset"))
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@
multiple_process_executor_requirements as multiple_process_executor_requirements,
multiprocess_executor as multiprocess_executor,
)
from dagster._core.definitions.external_asset import (
external_asset_from_spec as external_asset_from_spec,
external_assets_from_specs as external_assets_from_specs,
)
from dagster._core.definitions.freshness_policy import FreshnessPolicy as FreshnessPolicy
from dagster._core.definitions.freshness_policy_sensor_definition import (
FreshnessPolicySensorContext as FreshnessPolicySensorContext,
Expand Down
Loading

0 comments on commit 98b8736

Please sign in to comment.