Skip to content

Commit

Permalink
[docs][guide] writing a multi-asset backed integration
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Jul 12, 2024
1 parent b286464 commit 07622a6
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ One of the most fundamental features that can be implemented in an integration i

- **Efficiency:** Allows defining multiple assets in a single function, reducing boilerplate code.
- **Simplicity:** Easier to manage related assets together.
- **Consistency:** Ensures that related assets are always defined and updated together.
- **Consistency:** Decorator based assets fit match the developer experience of the Dagster ecosystem.

### Cons

Expand All @@ -60,10 +60,7 @@ One of the most fundamental features that can be implemented in an integration i

### Tutorial

<Note>
A tutorial for writing a multi-asset decorator based integration is coming
soon!
</Note>
- [Writing a multi-asset decorator integration](/guides/integrations/writing-a-multi-asset-decorator-integration)

## Pipes protocol

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
---
title: "Writing a multi-asset decorator integration"
---

# Writing a multi-asset decorator integration

You may have noticed that Dagster makes frequent use of decorators when working with assets, jobs, ops, and more. If you have a service that produces many assets, it is possible to define it as a multi-asset decorator — offering a consistent and intuitive developer experience to existing Dagster APIs.

Existing integrations that implement this approach include:

- [dbt](https://docs.dagster.io/integrations/dbt)
- [Embedded ELT: dlt](https://docs.dagster.io/integrations/embedded-elt/dlt)
- [Embedded ELT: Sling](https://docs.dagster.io/integrations/embedded-elt/sling)

## Background

### What even is a decorator?

Before we dive into writing an integration that provides a multi-asset decorator, let's take a step back and review what a decorator is, and why you may consider using it.

A Python decorator is a function that wraps another function by adding an annotation above the function prefixed with an `@` symbol. It allows you to modify the behavior of an existing function without changing the source code of that function directly.

For example, say we have a function that performs some kind of computation.

```python
def add(a: int, b: int) -> int:
return a + b
```

Say we want to double the output of a function. We can create a decorator that modifies the output of the function that it wraps, and return the new modified value.

<Note>
A function that takes another function as input, or returns a function as a
result, is traditionally called a{" "}
<a href="https://en.wikipedia.org/wiki/Higher-order_function">
higher-order function
</a>
.
</Note>

To define a decorator, you write a function that takes a function as an input argument. Within this function, you define a wrapper function that encapsulates the function you're modifying, and then this wrapper function is returned by your decorator.

```python
def double(func):
def _wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if isinstance(result, (int, float)):
return result * 2
else:
return result
return _wrapper
```

We've defined a function called `double` that implements a `_wrapper` function. This function accepts variable `*args` and `**kwargs`, passing them to the inner function that it wraps. It calls the function it wraps, and modifies the output if the result is a numeric type.

```python
@double
def add(a: int, b: int) -> int:
return a + b

# >>> add(2, 3)
# 10
```

Decorators are extremely powerful in that you can perform operations before and after the function they wrap, and you can also manipulate the value returned by the function itself.

### Why decorators in Dagster

In the context of Dagster, decorators are helpful in that we are often wrapping some form of processing. For example, when writing an asset, you define your processing code, and by annotating that function with the <PyObject object="asset" decorator /> decorator. Then, the internal Dagster code can register the asset, assign metadata, pass in context data, or perform any other variety of operations that are required to integrate your asset code with the Dagster platform.

## Walkthrough

Now that we have a general idea of what a decorator is, and why it's a useful to in the context of developing Dagster code, let's walk through the development of a new multi-asset integration. This integration will take a YAML file, and produce a multi-asset, allowing the end-user to customize the translation of definition spec to how it maps to Dagster concepts.

### Input

This hypothetical tool is configured using a YAML definition file where someone can define source and destination databases, along with the tables that they would like to replicate.

```yaml
connections:
source:
type: duckdb
connection: example.duckdb
destination:
type: postgres
connection: postgresql://postgres:postgres@localhost/postgres

tables:
- name: users
primary_key: id
- name: products
primary_key: id
- name: activity
primary_key: id
```
For our integration, we would like to generate an asset for each table that is being replicated.
To keep this tutorial simple, let's assume that we have a library that provides us with a Python function that performs the replication process, and returns a dictionary with the status of each table.
```python
import yaml

from pathlib import Path
from typing import Mapping, Iterator, Any


def replicate(replication_configuration_yaml: Path) -> Iterator[Mapping[str, Any]]:
data = yaml.safe_load(replication_configuration_yaml.read_text())
for table in data.get("tables"):
# < perform replication here, and get status >
yield {"table": table.get("name"), "status": "success"}
```
### Implementation
First, let's define a `Project` object that takes in the path of our configuration YAML file. This will allow us to encapsulate the logic that gets metadata and table information from our project configuration.

```python
import yaml
from pathlib import Path
class ReplicationProject(self):
def __init__(self, replication_configuration_yaml: str):
self.replication_configuration_yaml = replication_configuration_yaml
def load(self):
return yaml.safe_load(Path(self.replication_configuration_yaml).read_text())
```

Here we define a function that returns a `multi_asset` function. The `multi_asset` function is a decorator itself, so this allows us to customize the behavior of `multi_asset` and create a new decorator of our own.

We load our replication project, and then iterate over the tables that were defined in the YAML file. Those are then used to create a list of `AssetSpec` objects that are passed to the `specs` parameter. This is what allows us to define the assets that are visible in the Dagster UI. However, we have not yet shown how to perform the actual execution of our replication function — that's what we will go over next!

```python
def custom_replication_assets(
*,
replication_project: ReplicationProject,
name: Optional[str] = None,
group_name: Optional[str] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
project = replication_project.load()
return multi_asset(
name=name,
group_name=group_name,
specs=[
AssetSpec(
key=table.get("name"),
)
for table in project.get("tables")
],
)
```

Thinking back to our previous example on how decorators work, they allow us to wrap a function that performs some operation. In the case of our `multi_asset` we are able to define `AssetSpec` objects for our project, and the actual processing that takes place will be in the body of the function we decorate.

In this function, we will perform the replication, and then yield `AssetMaterialization` objects indicating that the replication was successful for a given table.

```python
from dagster import AssetExecutionContext
replication_project_path = "replication.yaml"
replication_project = ReplicationProject(replication_project_path)
@custom_replication_assets(
replication_project=replication_project,
name="my_custom_replication_assets",
group_name="replication",
)
def my_assets(context: AssetExecutionContext):
results = replicate(Path(replication_project_path))
for table in results:
if table.get("status") == "SUCCESS":
yield AssetMaterialization(asset_key=str(table.get("name")), metadata=table)
```

You may have noticed, however, that there are a couple of limitations to this approach. Most importantly, we have not encapsulated the logic for replicating tables. Meaning, end-users who make use of the `custom_replication_assets` decorator would be responsible for yielding asset materializations themselves. Second, there is no way for the user to customize the attributes of the asset.

For the first limitation, we can resolve them by refactoring the code in the body of our asset function into a `Resource`.

### Moving our replication logic into a Resource

To accomplish this, we will extend the `ConfigurableResource` object to create our own custom resource. Then, we will define a `run` method that will perform the replication operation.

```python
from dagster import ConfigurableResource
from dagster._annotations import public
class ReplicationResource(ConfigurableResource):
@public
def run(
self, replication_project: ReplicationProject
) -> Iterator[AssetMaterialization]:
results = replicate(Path(replication_project.replication_configuration_yaml))
for table in results:
if table.get("status") == "SUCCESS":
# NOTE: this assumes that the table name is the same as the asset key
yield AssetMaterialization(
asset_key=str(table.get("name")), metadata=table
)
```

Now, we can refactor our `custom_replication_assets` instance to use this resource.

```python
@custom_replication_assets(
replication_project=replication_project,
name="my_custom_replication_assets",
group_name="replication",
)
def my_assets(replication_resource: ReplicationProject):
replication_resource.run(replication_project)
```

### Using translators

Previously we mentioned that there was no way for an end-user to customize the asset definitions, for example, changing the key of the asset. The recommended way to accomplish this is through the use of a translator class.

This class provides end-users with a way to override the translation between a configuration file, and how that maps to concepts in Dagster.

To start, we will define a translator method to map the table specification to a Dagster asset key associated, however, this approach can be mapped to many other attributes such as: dependencies, group name, metadata, and more. You can reference the implementation for other integrations for a full example.

```python
from dagster import AssetKey, _check as check
from dataclasses import dataclass
@dataclass
class ReplicationTranslator:
@public
def get_asset_key(self, table_definition: Mapping[str, str]) -> AssetKey:
return AssetKey(str(table_definition.get("name")))
```

Next, we can update our `custom_replication_assets` to use the translator when defining the `key` on the `AssetSpec`. You will note, that we have taken this opportunity to also include the replication project, and the translator instance on the `AssetSpec` metadata. This is a workaround that we tend to employ in this approach, making it possible to define these objects once, and then access them on the context of our asset.

```python
def custom_replication_assets(
*,
replication_project: ReplicationProject,
name: Optional[str] = None,
group_name: Optional[str] = None,
translator: Optional[ReplicationProject] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
project = replication_project.load()
translator = (
check.opt_inst_param(translator, "translator", ReplicationTranslator)
or ReplicationTranslator()
)
return multi_asset(
name=name,
group_name=group_name,
specs=[
AssetSpec(
key=translator.get_asset_key(table),
metadata={
"replication_project": project,
"replication_translator": translator,
},
)
for table in project.get("tables")
],
)
```

Finally, we have to update our resource to use the translator, and project that was provided on the metadata. We are using the `check` method provided by `dagster._check` to ensure that the type of the object is appropriate as we retrieve it from the metadata.

Now, we can use the same `translator.get_asset_key`when yielding the asset materialization, thus ensuring that our asset declarations match our asset materializations.

```python
class ReplicationResource(ConfigurableResource):
@public
def run(self, context: AssetExecutionContext) -> Iterator[AssetMaterialization]:
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
project = check.inst(
first_asset_metadata.get("replication_project"),
ReplicationProject,
)
translator = check.inst(
first_asset_metadata.get("replication_translator"),
ReplicationTranslator,
)
results = replicate(Path(project.replication_configuration_yaml))
for table in results:
if table.get("status") == "SUCCESS":
yield AssetMaterialization(
asset_key=translator.get_asset_key(table), metadata=table
)
```

### Conclusion

In this guide we walked through how to define a custom multi-asset decorator, a resource for encapsulating tool logic, and a translator for defining the logic to translate a specification to Dagster concepts.

Defining integrations with this approach aligns nicely with the overall development paradigm of Dagster, and is suitable for tools that generate many assets.

For more examples on how to build a multi-asset decorator, please reference the implementations for [dbt](https://docs.dagster.io/integrations/dbt), [dlt](https://docs.dagster.io/integrations/embedded-elt/dlt), and [Sling](https://docs.dagster.io/integrations/embedded-elt/sling).

0 comments on commit 07622a6

Please sign in to comment.