External Assets Concept Page (#16935)
## Summary & Motivation

Adds an External Assets concept page (motivation described in
#16754).

This also contains a code change necessary because of the bug
demonstrated in #17077.

## How I Tested These Changes

Buildkite (BK). Also loaded the examples in `dagster dev`.

---------

Co-authored-by: Erin Cochran
Co-authored-by: Yuhan Luo
3 people authored Oct 12, 2023
1 parent 90d0ca5 commit 7319d02
Showing 23 changed files with 602 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
@@ -105,6 +105,10 @@
{
"title": "Asset checks (Experimental)",
"path": "/concepts/assets/asset-checks"
},
{
"title": "External assets (Experimental)",
"path": "/concepts/assets/external-assets"
}
]
},
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docs/content/concepts.mdx
@@ -45,6 +45,10 @@ An asset is an object in persistent storage, such as a table, file, or persisted
title="Asset checks (Experimental)"
href="/concepts/assets/asset-checks"
></ArticleListItem>
<ArticleListItem
title="External assets (Experimental)"
href="/concepts/assets/external-assets"
></ArticleListItem>
</ArticleList>

---
334 changes: 334 additions & 0 deletions docs/content/concepts/assets/external-assets.mdx
@@ -0,0 +1,334 @@
---
title: External Assets | Dagster
description: External assets model assets in Dagster that are not scheduled or materialized in Dagster.
---

# External Assets (Experimental)

An **external asset** is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. This allows you to model assets in Dagster and attach metadata and events to them without scheduling their materialization with Dagster.

**External assets are a good fit when data is**:

- Landed by an external source (e.g. an external file landing daily; Kafka landing data into Amazon S3)
- Created and processed using manual processes
- Materialized by existing pipelines that have their own scheduling and infrastructure, and that you don't want or need to migrate en masse

**With an external asset, you can:**

- Attach metadata to its definition for documentation, tracking ownership, and so on
- Track its data quality and version in Dagster
- Use [asset sensors](/concepts/partitions-schedules-sensors/asset-sensors) or auto-materialize policies to update downstream assets based on updates to external assets

**You cannot, however:**

- Schedule an external asset's materialization
- Backfill an external asset using Dagster
- Use the [Dagster UI](/concepts/webserver/ui) or [GraphQL API](/concepts/webserver/graphql) to instigate ad hoc materializations

<Note>
<strong>What about Source Assets?</strong> A common use case for external
assets is modeling data produced by a process not under Dagster's control. For
example, a daily file drop from a third party into Amazon S3. In most systems,
these are described as <strong>sources</strong>. Dagster is no exception: it
provides <PyObject object="SourceAsset" displayText="SourceAsset" />. As
external assets are a superset of source asset functionality,{" "}
<strong>
source assets will be supplanted by external assets in the near future
</strong>
.
</Note>

---

## Relevant APIs

| Name | Description |
| ------------------------------------------------ | ------------------------------------------------------------------------------------------- |
| <PyObject object="external_assets_from_specs" /> | Creates a list of <PyObject object="AssetsDefinition"/> objects that represent external assets |
| <PyObject object="AssetSpec" /> | An object that represents the metadata of a particular asset |

---

## Defining external assets

The following code declares a single external asset that represents a file in S3 and passes it to a <PyObject object="Definitions"/> object:

<TabGroup>
<TabItem name="Asset definition">

Click the **Asset in the Dagster UI** tab to see how this asset would be rendered in the Dagster UI.

```python file=/concepts/assets/external_assets/single_declaration.py
from dagster import AssetSpec, Definitions, external_asset_from_spec

defs = Definitions(assets=[external_asset_from_spec(AssetSpec("file_in_s3"))])
```

---

</TabItem>
<TabItem name="Asset in the Dagster UI">

Click the **Asset definition** tab to view how this asset is defined.

<Image
alt="The file_in_s3 external asset in the Asset Graph of the Dagster UI"
src="/images/concepts/assets/external-asset.png"
width={3024}
height={1654}
/>

---

</TabItem>
</TabGroup>

### External assets with dependencies

External assets can depend only on other external assets.

Dependencies are defined by using the `deps` argument of <PyObject object="AssetSpec" />. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.

In the following example, we have two assets: `raw_logs` and `processed_logs`. The `processed_logs` asset is produced by a scheduled computation in another orchestration system. Using external assets allows you to model both assets in Dagster.

<TabGroup>
<TabItem name="Asset definitions">

Click the **Assets in the Dagster UI** tab to see how these assets would be rendered in the Dagster UI.

```python file=/concepts/assets/external_assets/external_asset_deps.py
from dagster import AssetSpec, Definitions, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])

defs = Definitions(assets=external_assets_from_specs([raw_logs, processed_logs]))
```

---

</TabItem>
<TabItem name="Assets in the Dagster UI">

Click the **Asset definitions** tab to view how these assets are defined.

<Image
alt="External assets with dependencies in the Dagster UI"
src="/images/concepts/assets/external-assets-show-detail.png"
width={3024}
height={1654}
/>

---

</TabItem>
</TabGroup>

### Fully-managed assets with external asset dependencies

Fully-managed assets can depend on external assets. In this example, the `aggregated_logs` asset depends on `processed_logs`, which is an external asset:

<TabGroup>
<TabItem name="Asset definitions">

Click the **Assets in the Dagster UI** tab to see how these assets would be rendered in the Dagster UI.

```python file=/concepts/assets/external_assets/normal_asset_depending_on_external.py
from dagster import AssetSpec, Definitions, asset, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_logs" into memory and performs some aggregation
    ...


defs = Definitions(
    assets=[aggregated_logs, *external_assets_from_specs([raw_logs, processed_logs])]
)
```

</TabItem>
<TabItem name="Assets in the Dagster UI">

Click the **Asset definitions** tab to view how these assets are defined.

<Image
alt="An external asset as an upstream dependency to a fully-managed asset in the Dagster UI"
src="/images/concepts/assets/external-assets-normal-dep-on-external.png"
width={3024}
height={1654}
/>

</TabItem>
</TabGroup>

---

## Updating external asset metadata

As Dagster doesn't control scheduling or materializing external assets, it's up to you to keep their metadata updated. This also means that materialization for external assets will be disabled in the Dagster UI.

To keep your external assets updated, you can use any of the following approaches:

- [The REST API](#using-the-rest-api)
- [Sensors](#using-sensors)
- [The Python API](#using-the-python-api)
- [Logging events in unrelated ops](#logging-events-in-unrelated-ops)

### Using the REST API

Dagster OSS exposes a REST endpoint for reporting asset materializations. Refer to the following tabs for examples using a `curl` command, and for invoking the API in Python.

<TabGroup>
<TabItem name="Using curl">

The following demonstrates how to use a `curl` command in a shell script to communicate with the API:

```bash
curl --request POST \
    --url https://path/to/instance/report_asset_materialization/{asset_key} \
    --header 'Content-Type: application/json' \
    --data '{
        "metadata" : {
            "source": "From curl command"
        }
    }'
```

</TabItem>
<TabItem name="Using Python">

The following demonstrates how to invoke the API in Python using the `requests` library:

```python
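import requests

# Example asset key; replace with the key of your external asset
asset_key = "file_in_s3"
url = f"https://path/to/instance/report_asset_materialization/{asset_key}"
payload = {"metadata": {"source": "From python script"}}
headers = {"Content-Type": "application/json"}

response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()  # Raise if the server rejected the report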
```

</TabItem>
</TabGroup>

The API also has endpoints for reporting [asset observations](/concepts/assets/asset-observations) and [asset check evaluations](/concepts/assets/asset-checks).
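
As a minimal sketch of the request shape used in the tabs above, the following standard-library helper builds the POST request without sending it. The helper name `build_materialization_request`, the placeholder base URL, and the choice to percent-encode the asset key are assumptions made for illustration; only the `report_asset_materialization/{asset_key}` path and the `metadata` payload come from the examples above.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_materialization_request(base_url: str, asset_key: str, metadata: dict) -> Request:
    """Build (without sending) a POST request that reports a materialization.

    Hypothetical helper: base_url is a placeholder for wherever your
    Dagster webserver is reachable.
    """
    # Percent-encode the key so characters such as spaces are URL-safe.
    # Slashes are left as-is (quote's default); this is an assumption.
    url = f"{base_url.rstrip('/')}/report_asset_materialization/{quote(asset_key)}"
    body = json.dumps({"metadata": metadata}).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_materialization_request(
    "https://path/to/instance", "file_in_s3", {"source": "From python script"}
)
print(request.get_method(), request.full_url)
```

Passing the resulting object to `urllib.request.urlopen` would send it. Building the request separately makes it easy to inspect the URL and body before anything reaches the instance.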

### Using sensors

By using the `asset_events` parameter of <PyObject object="SensorResult" />, you can generate events to attach to external assets directly from a sensor. For example:

```python file=/concepts/assets/external_assets/external_asset_using_sensor.py
import datetime

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    SensorEvaluationContext,
    SensorResult,
    external_asset_from_spec,
    sensor,
)


def utc_now_str() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
    # Materialization happened in external system, but is recorded here
    return SensorResult(
        asset_events=[
            AssetMaterialization(
                asset_key="external_asset_a",
                metadata={
                    "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
                },
            )
        ]
    )


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
    sensors=[keep_external_asset_a_up_to_date],
)
```

### Using the Python API

You can insert events to attach to external assets directly from Dagster's Python API, using the `report_runless_asset_event` method of <PyObject object="DagsterInstance" />.

For example, this would be useful when writing a hand-rolled Python script to backfill metadata:

```python file=/concepts/assets/external_assets/external_asset_events_using_python_api.py startafter=start_python_api_marker endbefore=end_python_api_marker dedent=4
from dagster import AssetMaterialization

# instance is a DagsterInstance. Get using DagsterInstance.get()
instance.report_runless_asset_event(
    AssetMaterialization(
        "asset_one", metadata={"nrows": 10, "source": "From this script."}
    )
)
```
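
A hand-rolled backfill script of the kind mentioned above might loop over historical records and report one event per record. The sketch below is an illustration only: `backfill_metadata`, the record shape, and the sample values are hypothetical, and the reporting step is injected as a callable so the loop runs without a Dagster instance.

```python
from typing import Any, Callable, Iterable, List, Mapping, Tuple

# Hypothetical record shape: (asset key, metadata for one historical materialization)
Record = Tuple[str, Mapping[str, Any]]


def backfill_metadata(
    records: Iterable[Record],
    report: Callable[[str, Mapping[str, Any]], None],
) -> int:
    """Call report once per record and return how many events were reported."""
    count = 0
    for asset_key, metadata in records:
        report(asset_key, metadata)
        count += 1
    return count


# Stand-in reporter that collects events in a list. With Dagster available,
# report could instead wrap the call shown above, for example:
#   lambda key, md: instance.report_runless_asset_event(
#       AssetMaterialization(key, metadata=md)
#   )
reported: List[Record] = []
history = [
    ("asset_one", {"nrows": 10, "source": "From this script."}),
    ("asset_one", {"nrows": 12, "source": "From this script."}),
]
total = backfill_metadata(history, lambda key, md: reported.append((key, md)))
print(total)
```

Keeping the reporter injectable means the loop can be dry-run against a list first and pointed at a real instance afterwards.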

### Logging events in unrelated ops

You can log an <PyObject object="AssetMaterialization"/> from a bare op. In this case, use the `log_event` method of <PyObject object="OpExecutionContext"/> to report an asset materialization of an external asset. For example:

```python file=/concepts/assets/external_assets/update_external_asset_via_op.py
from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    OpExecutionContext,
    external_asset_from_spec,
    job,
    op,
)


@op
def an_op(context: OpExecutionContext) -> None:
    context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
    an_op()


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
)
```

---

## Related

<ArticleList>
<ArticleListItem
title="Software-defined Assets"
href="/concepts/assets/software-defined-assets"
></ArticleListItem>
<ArticleListItem
title="Asset sensors"
href="/concepts/partitions-schedules-sensors/asset-sensors"
></ArticleListItem>
<ArticleListItem
title="Asset checks"
href="/concepts/assets/asset-checks"
></ArticleListItem>
<ArticleListItem
title="Asset observations"
href="/concepts/assets/asset-observations"
></ArticleListItem>
</ArticleList>
@@ -0,0 +1,6 @@
from dagster import AssetSpec, Definitions, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])

defs = Definitions(assets=external_assets_from_specs([raw_logs, processed_logs]))
@@ -0,0 +1,21 @@
from dagster import AssetSpec, Definitions, external_assets_from_specs

asset_one = AssetSpec("asset_one")
asset_two = AssetSpec("asset_two", deps=[asset_one])

defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two]))


def do_report_runless_asset_event(instance) -> None:
    # start_python_api_marker
    from dagster import AssetMaterialization

    # instance is a DagsterInstance. Get using DagsterInstance.get()
    instance.report_runless_asset_event(
        AssetMaterialization(
            "asset_one", metadata={"nrows": 10, "source": "From this script."}
        )
    )


# end_python_api_marker

1 comment on commit 7319d02

@github-actions

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-utj8keik3-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 7319d02.
This pull request is being automatically deployed with vercel-action
