From 98b873633e42a8235659b081684c22bcf9e85910 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Sat, 30 Sep 2023 05:58:19 -0400 Subject: [PATCH] guide scaffoling add example code snipper check point change to single multi to avoid bug refactor into helper cp self review cp cp --- docs/content/_navigation.json | 4 + docs/content/concepts.mdx | 4 + .../concepts/assets/external-assets.mdx | 150 ++++++++++++++++++ docs/content/concepts/assets/multi-assets.mdx | 2 +- .../assets/external_assets/__init__.py | 0 .../external_assets/external_asset_deps.py | 6 + .../external_asset_using_sensor.py | 36 +++++ .../external_assets/single_declaration.py | 3 + .../update_external_asset_via_op.py | 24 +++ .../external_asset_tests/__init__.py | 0 .../test_external_asset_sensor.py | 23 +++ .../test_external_assets_decls.py | 22 +++ .../test_external_assets_with_ops.py | 14 ++ python_modules/dagster/dagster/__init__.py | 4 + .../_core/definitions/external_asset.py | 72 ++++----- .../_core/definitions/sensor_definition.py | 4 + .../definitions_tests/test_external_assets.py | 49 ++++++ 17 files changed, 378 insertions(+), 39 deletions(-) create mode 100644 docs/content/concepts/assets/external-assets.mdx create mode 100644 examples/docs_snippets/docs_snippets/concepts/assets/external_assets/__init__.py create mode 100644 examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py create mode 100644 examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py create mode 100644 examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py create mode 100644 examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py create mode 100644 examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/__init__.py create mode 100644 examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py create mode 100644 examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py create mode 100644 examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index ce3d030c3bcf3..31cc6b4025a68 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -105,6 +105,10 @@ { "title": "Asset checks (Experimental)", "path": "/concepts/assets/asset-checks" + }, + { + "title": "External assets (Experimental)", + "path": "/concepts/assets/external-assets" } ] }, diff --git a/docs/content/concepts.mdx b/docs/content/concepts.mdx index 72155dc186335..22b2d01b2e173 100644 --- a/docs/content/concepts.mdx +++ b/docs/content/concepts.mdx @@ -45,6 +45,10 @@ An asset is an object in persistent storage, such as a table, file, or persisted title="Asset checks (Experimental)" href="/concepts/assets/asset-checks" > + --- diff --git a/docs/content/concepts/assets/external-assets.mdx b/docs/content/concepts/assets/external-assets.mdx new file mode 100644 index 0000000000000..a47832a432b00 --- /dev/null +++ b/docs/content/concepts/assets/external-assets.mdx @@ -0,0 +1,150 @@ +--- +title: External Assets | Dagster +description: External assets model assets in Dagster that are not scheduled or materialized in Dagster. +--- + +# External assets (Experimental) + +An external asset is an asset that is not materialized by Dagster, but is tracked in the asset graph and asset catalog. This allows a user to model assets in Dagster, attach metadata and events to those assets, without scheduling their materialization in data. Example use cases + +- Data that is landed by an external source (e.g. a file landing daily in s3) +- Data that is created and processed using manual processes +- Data that is materialized by existing pipelines with their own scheduling and infrastructure that you do not want to or need to migrate en masse. + +With an external assets you can: + +- Attach metadata to its definition for documentation, tracking ownership, and so on +- Track its data quality in Dagster. +- Track its data version in Dagster +- Schedule downstream assets based on updates to external assets using asset sensors or auto-materialize policies + +But you cannot + +- Schedule its materialiation +- Backfill it using Dagster +- Use the UI or GraphQL API to instigate ad hoc materializations. + +## Relevant APIs + +| Name | Description | +| ------------------------------------------------ | --------------------------------------------------------------------------------------- | +| | Create a list of external asset definitions from a list of specs | +| | An object that represents the metadata of a particular asset | +| | Result object for a sensor evalation where you pass events to attach to external assets | + +## Defining External Assets + +### A single external asset + +The following code declares a single external asset and passes it to + +```python file=/concepts/assets/external_assets/single_declaration.py +from dagster import AssetSpec, Definitions, external_asset_from_spec + +defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))]) +``` + +### External assets with dependencies + +External assets can depend on other external assets. You do this by using the `deps` argument of . + +```python file=/concepts/assets/external_assets/external_asset_deps.py +from dagster import AssetSpec, Definitions, external_assets_from_specs + +asset_one = AssetSpec("asset_one") +asset_two = AssetSpec("asset_two", deps=[asset_one]) + +defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two])) +``` + +## Keeping an external asset's metadata up to date. + +Since with external assets Dagster does not control scheduling or materialization, it is up to the user to keep the metadata for an external asset up-to-date. There are a number of mechanisms for doing so: + +- REST APIs +- Sensors +- Log events in ops + +### Using REST API + +TODO: joe + +### Using Sensors + +You can generate events to attach to external assets and provide them to sensors directly, by using the `asset_events` param of \ SensorResult: + # materialization happened in external system, but is recorded here + return SensorResult( + asset_events=[ + AssetMaterialization( + asset_key="external_asset_a", + metadata={ + "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"' + }, + ) + ] + ) + + +defs = Definitions( + assets=[external_asset_from_spec(AssetSpec("external_asset_a"))], + sensors=[keep_external_asset_a_up_to_date], +) +``` + +### Log events in unrelated ops + +A user can log an from a bare op. In this case we use the `log_event` method of to report an asset materialization of an external asset. + +```python file=/concepts/assets/external_assets/update_external_asset_via_op.py +from dagster import ( + AssetMaterialization, + AssetSpec, + Definitions, + external_asset_from_spec, + job, + op, +) +from dagster._core.execution.context.compute import OpExecutionContext + + +@op +def an_op(context: OpExecutionContext) -> None: + context.log_event(AssetMaterialization(asset_key="external_asset")) + + +@job +def a_job() -> None: + an_op() + + +defs = Definitions( + assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job] +) +``` + +### What about Source Assets? + +A common use case for external assets is modeling data produced by an process not under Dagster's control. For example a daily drop of a file from a third party in s3. + +In most systems these are described as sources. This includes Dagster, which includes which will be supplanted by external assets in the near-term future, as external assets are a superset of the functionality of Source Assets. diff --git a/docs/content/concepts/assets/multi-assets.mdx b/docs/content/concepts/assets/multi-assets.mdx index b7098ef6dd497..9366d9f9cc365 100644 --- a/docs/content/concepts/assets/multi-assets.mdx +++ b/docs/content/concepts/assets/multi-assets.mdx @@ -17,7 +17,7 @@ A multi-asset represents a set of software-defined assets that are all updated b When working with [software-defined assets](/concepts/assets/software-defined-assets), it's sometimes inconvenient or impossible to map each persisted asset to a unique [op](/concepts/ops-jobs-graphs/ops) or [graph](/concepts/ops-jobs-graphs/graphs). A multi-asset is a way to define a single op or graph that will produce the contents of multiple data assets at once. -Multi-assets may be useful in the following scenarios: + Multi-assets may be useful in the following scenarios: - A single call to an API results in multiple tables being updated (e.g. Airbyte, Fivetran, dbt). - The same in-memory object is used to compute multiple assets. diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/__init__.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py new file mode 100644 index 0000000000000..59fe9a31dcfd8 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_deps.py @@ -0,0 +1,6 @@ +from dagster import AssetSpec, Definitions, external_assets_from_specs + +asset_one = AssetSpec("asset_one") +asset_two = AssetSpec("asset_two", deps=[asset_one]) + +defs = Definitions(assets=external_assets_from_specs([asset_one, asset_two])) diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py new file mode 100644 index 0000000000000..e2fe09460d0df --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/external_asset_using_sensor.py @@ -0,0 +1,36 @@ +import datetime + +from dagster import ( + AssetMaterialization, + AssetSpec, + Definitions, + SensorResult, + external_asset_from_spec, + sensor, +) +from dagster._core.definitions.sensor_definition import SensorEvaluationContext + + +def utc_now_str() -> str: + return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S") + + +@sensor() +def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult: + # materialization happened in external system, but is recorded here + return SensorResult( + asset_events=[ + AssetMaterialization( + asset_key="external_asset_a", + metadata={ + "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"' + }, + ) + ] + ) + + +defs = Definitions( + assets=[external_asset_from_spec(AssetSpec("external_asset_a"))], + sensors=[keep_external_asset_a_up_to_date], +) diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py new file mode 100644 index 0000000000000..0316170da4b34 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/single_declaration.py @@ -0,0 +1,3 @@ +from dagster import AssetSpec, Definitions, external_asset_from_spec + +defs = Definitions(assets=[external_asset_from_spec(AssetSpec("my_asset"))]) diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py new file mode 100644 index 0000000000000..e77293c4222cb --- /dev/null +++ b/examples/docs_snippets/docs_snippets/concepts/assets/external_assets/update_external_asset_via_op.py @@ -0,0 +1,24 @@ +from dagster import ( + AssetMaterialization, + AssetSpec, + Definitions, + external_asset_from_spec, + job, + op, +) +from dagster._core.execution.context.compute import OpExecutionContext + + +@op +def an_op(context: OpExecutionContext) -> None: + context.log_event(AssetMaterialization(asset_key="external_asset")) + + +@job +def a_job() -> None: + an_op() + + +defs = Definitions( + assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job] +) diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/__init__.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py new file mode 100644 index 0000000000000..f668d5ea61eab --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_asset_sensor.py @@ -0,0 +1,23 @@ +from dagster import ( + AssetKey, + AssetMaterialization, + DagsterInstance, + SensorResult, + build_sensor_context, +) +from docs_snippets.concepts.assets.external_assets.external_asset_using_sensor import ( + keep_external_asset_a_up_to_date, +) + + +def test_keep_external_asset_a_up_to_date() -> None: + instance = DagsterInstance.ephemeral() + result = keep_external_asset_a_up_to_date( + build_sensor_context( + instance=instance, sensor_name="keep_external_asset_a_up_to_date" + ) + ) + assert isinstance(result, SensorResult) + assert len(result.asset_events) == 1 + assert isinstance(result.asset_events[0], AssetMaterialization) + assert result.asset_events[0].asset_key == AssetKey("external_asset_a") diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py new file mode 100644 index 0000000000000..39a225815eb23 --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_decls.py @@ -0,0 +1,22 @@ +import docs_snippets.concepts.assets.external_assets.external_asset_deps +import docs_snippets.concepts.assets.external_assets.single_declaration +from dagster import AssetKey, Definitions + + +def test_docs_snippets_concepts_external_asset_single_decl() -> None: + single_decl_defs: Definitions = ( + docs_snippets.concepts.assets.external_assets.single_declaration.defs + ) + assert single_decl_defs.get_assets_def("my_asset") + + +def test_docs_snippets_concepts_external_asset_external_asset_deps() -> None: + defs_with_deps: Definitions = ( + docs_snippets.concepts.assets.external_assets.external_asset_deps.defs + ) + assert defs_with_deps.get_assets_def("asset_one") + assert defs_with_deps.get_assets_def("asset_two") + assert defs_with_deps.get_assets_def("asset_two").asset_deps == { + AssetKey("asset_one"): set(), + AssetKey("asset_two"): {AssetKey("asset_one")}, + } diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py new file mode 100644 index 0000000000000..66e6b4c5620d0 --- /dev/null +++ b/examples/docs_snippets/docs_snippets_tests/concepts_tests/external_asset_tests/test_external_assets_with_ops.py @@ -0,0 +1,14 @@ +import docs_snippets.concepts.assets.external_assets.update_external_asset_via_op +from dagster import AssetKey, DagsterInstance, Definitions + + +def test_external_assets_update_external_asset_via_op_0() -> None: + defs: Definitions = ( + docs_snippets.concepts.assets.external_assets.update_external_asset_via_op.defs + ) + a_job_def = defs.get_job_def("a_job") + instance = DagsterInstance.ephemeral() + result = a_job_def.execute_in_process(instance=instance) + assert result.success + + assert instance.get_latest_materialization_event(AssetKey("external_asset")) diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py index 17176ae4ef2e7..f655e3f9203b4 100644 --- a/python_modules/dagster/dagster/__init__.py +++ b/python_modules/dagster/dagster/__init__.py @@ -199,6 +199,10 @@ multiple_process_executor_requirements as multiple_process_executor_requirements, multiprocess_executor as multiprocess_executor, ) +from dagster._core.definitions.external_asset import ( + external_asset_from_spec as external_asset_from_spec, + external_assets_from_specs as external_assets_from_specs, +) from dagster._core.definitions.freshness_policy import FreshnessPolicy as FreshnessPolicy from dagster._core.definitions.freshness_policy_sensor_definition import ( FreshnessPolicySensorContext as FreshnessPolicySensorContext, diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 1f813fb63a1a4..2d543fb667329 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -16,6 +16,10 @@ from dagster._core.execution.context.compute import AssetExecutionContext +def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition: + return external_assets_from_specs([spec])[0] + + def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinition]: """Create an external assets definition from a sequence of asset specs. @@ -81,48 +85,40 @@ def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinit Args: specs (Sequence[AssetSpec]): The specs for the assets. """ - assets_defs = [] - for spec in specs: - check.invariant( - spec.auto_materialize_policy is None, - "auto_materialize_policy must be None since it is ignored", - ) - check.invariant(spec.code_version is None, "code_version must be None since it is ignored") - check.invariant( - spec.freshness_policy is None, "freshness_policy must be None since it is ignored" - ) - check.invariant( - spec.skippable is False, - "skippable must be False since it is ignored and False is the default", - ) - @multi_asset( - specs=[ - AssetSpec( - key=spec.key, - description=spec.description, - group_name=spec.group_name, - metadata={ - **(spec.metadata or {}), - **{ - SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( - AssetExecutionType.UNEXECUTABLE.value - ) - }, - }, - deps=spec.deps, - ) - ] + @multi_asset(specs=list(map(create_spec_for_inner_assets_def, specs))) + def _external_assets_def(context: AssetExecutionContext): + raise DagsterInvariantViolationError( + "You have attempted to execute an unexecutable asset" + f" {context.asset_key.to_user_string()}." ) - def _external_assets_def(context: AssetExecutionContext) -> None: - raise DagsterInvariantViolationError( - "You have attempted to execute an unexecutable asset" - f" {context.asset_key.to_user_string}." - ) - assets_defs.append(_external_assets_def) + return [_external_assets_def] + - return assets_defs +def create_spec_for_inner_assets_def(spec: AssetSpec) -> AssetSpec: + check.invariant( + spec.auto_materialize_policy is None, + "auto_materialize_policy must be None since it is ignored", + ) + check.invariant(spec.code_version is None, "code_version must be None since it is ignored") + check.invariant( + spec.freshness_policy is None, "freshness_policy must be None since it is ignored" + ) + check.invariant( + spec.skippable is False, + "skippable must be False since it is ignored and False is the default", + ) + return AssetSpec( + key=spec.key, + description=spec.description, + group_name=spec.group_name, + metadata={ + **(spec.metadata or {}), + **{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, + }, + deps=spec.deps, + ) def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: diff --git a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py index bba4703b30947..6e4507b96ab2e 100644 --- a/python_modules/dagster/dagster/_core/definitions/sensor_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/sensor_definition.py @@ -196,6 +196,10 @@ def __exit__(self, *exc) -> None: def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]: return self._resource_defs + @property + def sensor_name(self) -> str: + return check.not_none(self._sensor_name, "Only valid when sensor name provided") + def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluationContext": """Merge the specified resources into this context. diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py index 3a8470691c5d9..c597adf63f4e3 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_external_assets.py @@ -17,6 +17,7 @@ observable_source_asset, ) from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.decorators.asset_decorator import multi_asset from dagster._core.definitions.external_asset import ( create_external_asset_from_source_asset, external_assets_from_specs, @@ -228,3 +229,51 @@ def an_observable_source_asset() -> DataVersion: # Note this does not make sense. We need to make framework changes to allow for the omission of # a materialzation event assert len(all_materializations) == 1 + + +def test_external_assets_with_dependencies_manual_construction() -> None: + upstream_asset = AssetSpec("upstream_asset") + downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset]) + + @multi_asset(name="_generated_asset_def_1", specs=[upstream_asset]) + def _upstream_def(context: AssetExecutionContext) -> None: + raise Exception("do not execute") + + @multi_asset(name="_generated_asset_def_2", specs=[downstream_asset]) + def _downstream_asset(context: AssetExecutionContext) -> None: + raise Exception("do not execute") + + defs = Definitions(assets=[_upstream_def, _downstream_asset]) + assert defs + + assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ + AssetKey("downstream_asset") + ] == {AssetKey("upstream_asset")} + + +def test_external_asset_multi_asset() -> None: + upstream_asset = AssetSpec("upstream_asset") + downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset]) + + @multi_asset(specs=[downstream_asset, upstream_asset]) + def _generated_asset_def(context: AssetExecutionContext): + raise Exception("do not execute") + + defs = Definitions(assets=[_generated_asset_def]) + assert defs + + assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ + AssetKey("downstream_asset") + ] == {AssetKey("upstream_asset")} + + +def test_external_assets_with_dependencies() -> None: + upstream_asset = AssetSpec("upstream_asset") + downstream_asset = AssetSpec("downstream_asset", deps=[upstream_asset]) + + defs = Definitions(assets=external_assets_from_specs([upstream_asset, downstream_asset])) + assert defs + + assert defs.get_implicit_global_asset_job_def().asset_layer.asset_deps[ + AssetKey("downstream_asset") + ] == {AssetKey("upstream_asset")}