diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index ce3d030c3bcf3..31cc6b4025a68 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -105,6 +105,10 @@
{
"title": "Asset checks (Experimental)",
"path": "/concepts/assets/asset-checks"
+ },
+ {
+ "title": "External assets (Experimental)",
+ "path": "/concepts/assets/external-assets"
}
]
},
diff --git a/docs/content/concepts.mdx b/docs/content/concepts.mdx
index 72155dc186335..22b2d01b2e173 100644
--- a/docs/content/concepts.mdx
+++ b/docs/content/concepts.mdx
@@ -45,6 +45,10 @@ An asset is an object in persistent storage, such as a table, file, or persisted
title="Asset checks (Experimental)"
href="/concepts/assets/asset-checks"
>
+
---
diff --git a/docs/content/concepts/assets/external-assets.mdx b/docs/content/concepts/assets/external-assets.mdx
new file mode 100644
index 0000000000000..a47832a432b00
--- /dev/null
+++ b/docs/content/concepts/assets/external-assets.mdx
@@ -0,0 +1,150 @@
+---
+title: External Assets | Dagster
+description: External assets model assets in Dagster that are not scheduled or materialized in Dagster.
+---
+
+# External assets (Experimental)
+
+An external asset is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. This allows a user to model assets in Dagster, attach metadata and events to those assets, without scheduling their materialization in data. Example use cases
+
+- Data that is landed by an external source (e.g. a file landing daily in s3)
+- Data that is created and processed using manual processes
+- Data that is materialized by existing pipelines with their own scheduling and infrastructure that you do not want to or need to migrate en masse.
+
+With an external assets you can:
+
+- Attach metadata to its definition for documentation, tracking ownership, and so on
+- Track its data quality in Dagster.
+- Track its data version in Dagster
+- Schedule downstream assets based on updates to external assets using asset sensors or auto-materialize policies
+
+But you cannot
+
+- Schedule its materialiation
+- Backfill it using Dagster
+- Use the UI or GraphQL API to instigate ad hoc materializations.
+
+## Relevant APIs
+
+| Name | Description |
+| ------------------------------------------------ | --------------------------------------------------------------------------------------- |
+| | Create a list of external asset definitions from a list of specs |
+| | An object that represents the metadata of a particular asset |
+| | Result object for a sensor evalation where you pass events to attach to external assets |
+
+## Defining External Assets
+
+### A single external asset
+
+The following code declares a single external asset and passes it to
+
+```python file=/concepts/assets/external_assets/single_declaration.py
+from dagster import AssetSpec, Definitions, external_asset_from_spec
+
+defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))])
+```
+
+### External assets with dependencies
+
+External assets can depend on other external assets. You do this by using the `deps` argument of .
+
+```python file=/concepts/assets/external_assets/external_asset_deps.py
+from dagster import AssetSpec, Definitions, external_assets_from_specs
+
+asset_one = AssetSpec("asset_one")
+asset_two = AssetSpec("asset_two", deps=[asset_one])
+
+defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two]))
+```
+
+## Keeping an external asset's metadata up to date.
+
+Since with external assets Dagster does not control scheduling or materialization, it is up to the user to keep the metadata for an external asset up-to-date. There are a number of mechanisms for doing so:
+
+- REST APIs
+- Sensors
+- Log events in ops
+
+### Using REST API
+
+TODO: joe
+
+### Using Sensors
+
+You can generate events to attach to external assets and provide them to sensors directly, by using the `asset_events` param of \ SensorResult:
+ # materialization happened in external system, but is recorded here
+ return SensorResult(
+ asset_events=[
+ AssetMaterialization(
+ asset_key="external_asset_a",
+ metadata={
+ "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
+ },
+ )
+ ]
+ )
+
+
+defs = Definitions(
+ assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
+ sensors=[keep_external_asset_a_up_to_date],
+)
+```
+
+### Log events in unrelated ops
+
+A user can log an from a bare op. In this case we use the `log_event` method of to report an asset materialization of an external asset.
+
+```python file=/concepts/assets/external_assets/update_external_asset_via_op.py
+from dagster import (
+ AssetMaterialization,
+ AssetSpec,
+ Definitions,
+ external_asset_from_spec,
+ job,
+ op,
+)
+from dagster._core.execution.context.compute import OpExecutionContext
+
+
+@op
+def an_op(context: OpExecutionContext) -> None:
+ context.log_event(AssetMaterialization(asset_key="external_asset"))
+
+
+@job
+def a_job() -> None:
+ an_op()
+
+
+defs = Definitions(
+ assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
+)
+```
+
+### What about Source Assets?
+
+A common use case for external assets is modeling data produced by an process not under Dagster's control. For example a daily drop of a file from a third party in s3.
+
+In most systems these are described as sources. This includes Dagster, which includes which will be supplanted by external assets in the near-term future, as external assets are a superset of the functionality of Source Assets.
diff --git a/docs/content/concepts/assets/multi-assets.mdx b/docs/content/concepts/assets/multi-assets.mdx
index b7098ef6dd497..9366d9f9cc365 100644
--- a/docs/content/concepts/assets/multi-assets.mdx
+++ b/docs/content/concepts/assets/multi-assets.mdx
@@ -17,7 +17,7 @@ A multi-asset represents a set of software-defined assets that are all updated b
When working with [software-defined assets](/concepts/assets/software-defined-assets), it's sometimes inconvenient or impossible to map each persisted asset to a unique [op](/concepts/ops-jobs-graphs/ops) or [graph](/concepts/ops-jobs-graphs/graphs). A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once.
-Multi-assets may be useful in the following scenarios:
+ Multi-assets may be useful in the following scenarios:
- A single call to an API results in multiple tables being updated (e.g. Airbyte, Fivetran, dbt).
- The same in-memory object is used to compute multiple assets.
diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/__init__.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py
new file mode 100644
index 0000000000000..59fe9a31dcfd8
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py
@@ -0,0 +1,6 @@
+from dagster import AssetSpec, Definitions, external_assets_from_specs
+
+asset_one = AssetSpec("asset_one")
+asset_two = AssetSpec("asset_two", deps=[asset_one])
+
+defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two]))
diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py
new file mode 100644
index 0000000000000..e2fe09460d0df
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py
@@ -0,0 +1,36 @@
+import datetime
+
+from dagster import (
+ AssetMaterialization,
+ AssetSpec,
+ Definitions,
+ SensorResult,
+ external_asset_from_spec,
+ sensor,
+)
+from dagster._core.definitions.sensor_definition import SensorEvaluationContext
+
+
+def utc_now_str() -> str:
+ return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")
+
+
+@sensor()
+def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
+ # materialization happened in external system, but is recorded here
+ return SensorResult(
+ asset_events=[
+ AssetMaterialization(
+ asset_key="external_asset_a",
+ metadata={
+ "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
+ },
+ )
+ ]
+ )
+
+
+defs = Definitions(
+ assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
+ sensors=[keep_external_asset_a_up_to_date],
+)
diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py
new file mode 100644
index 0000000000000..0316170da4b34
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py
@@ -0,0 +1,3 @@
+from dagster import AssetSpec, Definitions, external_asset_from_spec
+
+defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))])
diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py
new file mode 100644
index 0000000000000..e77293c4222cb
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py
@@ -0,0 +1,24 @@
+from dagster import (
+ AssetMaterialization,
+ AssetSpec,
+ Definitions,
+ external_asset_from_spec,
+ job,
+ op,
+)
+from dagster._core.execution.context.compute import OpExecutionContext
+
+
+@op
+def an_op(context: OpExecutionContext) -> None:
+ context.log_event(AssetMaterialization(asset_key="external_asset"))
+
+
+@job
+def a_job() -> None:
+ an_op()
+
+
+defs = Definitions(
+ assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
+)
diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/__init__.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py
new file mode 100644
index 0000000000000..f668d5ea61eab
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py
@@ -0,0 +1,23 @@
+from dagster import (
+ AssetKey,
+ AssetMaterialization,
+ DagsterInstance,
+ SensorResult,
+ build_sensor_context,
+)
+from docs_snippets.concepts.assets.external_assets.external_asset_using_sensor import (
+ keep_external_asset_a_up_to_date,
+)
+
+
+def test_keep_external_asset_a_up_to_date() -> None:
+ instance = DagsterInstance.ephemeral()
+ result = keep_external_asset_a_up_to_date(
+ build_sensor_context(
+ instance=instance, sensor_name="keep_external_asset_a_up_to_date"
+ )
+ )
+ assert isinstance(result, SensorResult)
+ assert len(result.asset_events) == 1
+ assert isinstance(result.asset_events[0], AssetMaterialization)
+ assert result.asset_events[0].asset_key == AssetKey("external_asset_a")
diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py
new file mode 100644
index 0000000000000..39a225815eb23
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py
@@ -0,0 +1,22 @@
+import docs_snippets.concepts.assets.external_assets.external_asset_deps
+import docs_snippets.concepts.assets.external_assets.single_declaration
+from dagster import AssetKey, Definitions
+
+
+def test_docs_snippets_concepts_external_asset_single_decl() -> None:
+ single_decl_defs: Definitions = (
+ docs_snippets.concepts.assets.external_assets.single_declaration.defs
+ )
+ assert single_decl_defs.get_assets_def("my_asset")
+
+
+def test_docs_snippets_concepts_external_asset_external_asset_deps() -> None:
+ defs_with_deps: Definitions = (
+ docs_snippets.concepts.assets.external_assets.external_asset_deps.defs
+ )
+ assert defs_with_deps.get_assets_def("asset_one")
+ assert defs_with_deps.get_assets_def("asset_two")
+ assert defs_with_deps.get_assets_def("asset_two").asset_deps == {
+ AssetKey("asset_one"): set(),
+ AssetKey("asset_two"): {AssetKey("asset_one")},
+ }
diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py
new file mode 100644
index 0000000000000..66e6b4c5620d0
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py
@@ -0,0 +1,14 @@
+import docs_snippets.concepts.assets.external_assets.update_external_asset_via_op
+from dagster import AssetKey, DagsterInstance, Definitions
+
+
+def test_external_assets_update_external_asset_via_op_0() -> None:
+ defs: Definitions = (
+ docs_snippets.concepts.assets.external_assets.update_external_asset_via_op.defs
+ )
+ a_job_def = defs.get_job_def("a_job")
+ instance = DagsterInstance.ephemeral()
+ result = a_job_def.execute_in_process(instance=instance)
+ assert result.success
+
+ assert instance.get_latest_materialization_event(AssetKey("external_asset"))
diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py
index 17176ae4ef2e7..f655e3f9203b4 100644
--- a/python_modules/dagster/dagster/__init__.py
+++ b/python_modules/dagster/dagster/__init__.py
@@ -199,6 +199,10 @@
multiple_process_executor_requirements as multiple_process_executor_requirements,
multiprocess_executor as multiprocess_executor,
)
+from dagster._core.definitions.external_asset import (
+ external_asset_from_spec as external_asset_from_spec,
+ external_assets_from_specs as external_assets_from_specs,
+)
from dagster._core.definitions.freshness_policy import FreshnessPolicy as FreshnessPolicy
from dagster._core.definitions.freshness_policy_sensor_definition import (
FreshnessPolicySensorContext as FreshnessPolicySensorContext,
diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py
index 1f813fb63a1a4..2d543fb667329 100644
--- a/python_modules/dagster/dagster/_core/definitions/external_asset.py
+++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py
@@ -16,6 +16,10 @@
from dagster._core.execution.context.compute import AssetExecutionContext
+def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
+ return external_assets_from_specs([spec])[0]
+
+
def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinition]:
"""Create an external assets definition from a sequence of asset specs.
@@ -81,48 +85,40 @@ def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinit
Args:
specs (Sequence[AssetSpec]): The specs for the assets.
"""
- assets_defs = []
- for spec in specs:
- check.invariant(
- spec.auto_materialize_policy is None,
- "auto_materialize_policy must be None since it is ignored",
- )
- check.invariant(spec.code_version is None, "code_version must be None since it is ignored")
- check.invariant(
- spec.freshness_policy is None, "freshness_policy must be None since it is ignored"
- )
- check.invariant(
- spec.skippable is False,
- "skippable must be False since it is ignored and False is the default",
- )
- @multi_asset(
- specs=[
- AssetSpec(
- key=spec.key,
- description=spec.description,
- group_name=spec.group_name,
- metadata={
- **(spec.metadata or {}),
- **{
- SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: (
- AssetExecutionType.UNEXECUTABLE.value
- )
- },
- },
- deps=spec.deps,
- )
- ]
+ @multi_asset(specs=list(map(create_spec_for_inner_assets_def, specs)))
+ def _external_assets_def(context: AssetExecutionContext):
+ raise DagsterInvariantViolationError(
+ "You have attempted to execute an unexecutable asset"
+ f" {context.asset_key.to_user_string()}."
)
- def _external_assets_def(context: AssetExecutionContext) -> None:
- raise DagsterInvariantViolationError(
- "You have attempted to execute an unexecutable asset"
- f" {context.asset_key.to_user_string}."
- )
- assets_defs.append(_external_assets_def)
+ return [_external_assets_def]
+
- return assets_defs
+def create_spec_for_inner_assets_def(spec: AssetSpec) -> AssetSpec:
+ check.invariant(
+ spec.auto_materialize_policy is None,
+ "auto_materialize_policy must be None since it is ignored",
+ )
+ check.invariant(spec.code_version is None, "code_version must be None since it is ignored")
+ check.invariant(
+ spec.freshness_policy is None, "freshness_policy must be None since it is ignored"
+ )
+ check.invariant(
+ spec.skippable is False,
+ "skippable must be False since it is ignored and False is the default",
+ )
+ return AssetSpec(
+ key=spec.key,
+ description=spec.description,
+ group_name=spec.group_name,
+ metadata={
+ **(spec.metadata or {}),
+ **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
+ },
+ deps=spec.deps,
+ )
def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
diff --git a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py
index bba4703b30947..6e4507b96ab2e 100644
--- a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py
+++ b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py
@@ -196,6 +196,10 @@ def __exit__(self, *exc) -> None:
def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]:
return self._resource_defs
+ @property
+ def sensor_name(self) -> str:
+ return check.not_none(self._sensor_name, "Only valid when sensor name provided")
+
def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluationContext":
"""Merge the specified resources into this context.
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py
index 3a8470691c5d9..c597adf63f4e3 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py
@@ -17,6 +17,7 @@
observable_source_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
+from dagster._core.definitions.decorators.asset_decorator import multi_asset
from dagster._core.definitions.external_asset import (
create_external_asset_from_source_asset,
external_assets_from_specs,
@@ -228,3 +229,51 @@ def an_observable_source_asset() -> DataVersion:
# Note this does not make sense. We need to make framework changes to allow for the omission of
# a materialzation event
assert len(all_materializations) == 1
+
+
+def test_external_assets_with_dependencies_manual_construction() -> None:
+ upstream_asset = AssetSpec("upstream_asset")
+ downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset])
+
+ @multi_asset(name="_generated_asset_def_1", specs=[upstream_asset])
+ def _upstream_def(context: AssetExecutionContext) -> None:
+ raise Exception("do not execute")
+
+ @multi_asset(name="_generated_asset_def_2", specs=[downstream_asset])
+ def _downstream_asset(context: AssetExecutionContext) -> None:
+ raise Exception("do not execute")
+
+ defs = Definitions(assets=[_upstream_def, _downstream_asset])
+ assert defs
+
+ assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[
+ AssetKey("downstream_asset")
+ ] == {AssetKey("upstream_asset")}
+
+
+def test_external_asset_multi_asset() -> None:
+ upstream_asset = AssetSpec("upstream_asset")
+ downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset])
+
+ @multi_asset(specs=[downstream_asset, upstream_asset])
+ def _generated_asset_def(context: AssetExecutionContext):
+ raise Exception("do not execute")
+
+ defs = Definitions(assets=[_generated_asset_def])
+ assert defs
+
+ assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[
+ AssetKey("downstream_asset")
+ ] == {AssetKey("upstream_asset")}
+
+
+def test_external_assets_with_dependencies() -> None:
+ upstream_asset = AssetSpec("upstream_asset")
+ downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset])
+
+ defs = Definitions(assets=external_assets_from_specs([upstream_asset, downstream_asset]))
+ assert defs
+
+ assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[
+ AssetKey("downstream_asset")
+ ] == {AssetKey("upstream_asset")}