Skip to content

Commit

Permalink
[daggy-u] [dbt] - Add Lesson 5 (DEV-57) (#19947)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR adds Lesson 5 of the new dbt module to Dagster University.

TODOs

- [x] Add screenshots
- [ ] Update code snippets to use file import

## How I Tested These Changes

---------

Co-authored-by: Tim Castillo <[email protected]>
  • Loading branch information
2 people authored and cmpadden committed Feb 28, 2024
1 parent 226c587 commit e98b947
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 1 deletion.
7 changes: 6 additions & 1 deletion docs/dagster-university/pages/dagster-dbt.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ title: Dagster + dbt
- [Overview](/dagster-dbt/lesson-4/1-overview)
- [Speeding up the development cycle](/dagster-dbt/lesson-4/2-speeding-up-the-development-cycle)
- [Debugging failed runs](/dagster-dbt/lesson-4/3-debugging-failed-runs)
- [Customizing your execution](/dagster-dbt/lesson-4/4-customizing-your-execution)
- [Customizing your execution](/dagster-dbt/lesson-4/4-customizing-your-execution)
- Lesson 5: Adding dependencies and automation to dbt models
- [Overview](/dagster-dbt/lesson-5/1-overview)
- [Connecting dbt models to Dagster assets](/dagster-dbt/lesson-5/2-connecting-dbt-models-to-dagster-assets)
- [Creating assets that depend on dbt models](/dagster-dbt/lesson-5/3-creating-assets-that-depend-on-dbt-models)
- [Automating dbt models in Dagster](/dagster-dbt/lesson-5/4-automating-dbt-models-in-dagster)
11 changes: 11 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-5/1-overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
title: 'Lesson 5: Overview'
module: 'dagster_dbt'
lesson: '5'
---

# Overview

In Lesson 3, you loaded your dbt project's models as assets into Dagster. You also materialized some of those models.

In this lesson, we’ll integrate more dbt models with the rest of your Dagster project. You’ll use the existing Dagster assets as sources in your dbt project and learn how to further customize how Dagster maps your dbt project with the `DagsterDbtTranslator` class. To wrap things up, we’ll show you how to automate the running of the dbt models in Dagster.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
---
title: 'Lesson 5: Connecting dbt models to Dagster assets'
module: 'dagster_dbt'
lesson: '5'
---

# Connecting dbt models to Dagster assets

With where we left off, you may have noticed that the sources for your dbt projects are not just tables that exist in DuckDB, but also *assets* that Dagster created. However, the staging models that use those sources aren’t linked to the Dagster assets that produced them.

Let’s fix that by telling Dagster that the dbt sources are the tables that the `taxi_trips` and `taxi_zones` asset definitions produce. To match up these assets, we’ll override dbt’s asset key with the name `taxi_trips`. By having the asset keys line up, Dagster will know that these assets are the same and should merge them.

This is accomplished by changing the dbt source’s asset keys to be the same as the matching assets that Dagster makes. In this case, the dbt source’s default asset key is `raw_taxis/trips`, and the table that we’re making with Dagster has an asset key of `taxi_trips`.

To adjust how Dagster names the asset keys for your project’s dbt models, we’ll need to override the `dagster-dbt` integration’s default logic for how to interpret the dbt project. This mapping is contained in the `DagsterDbtTranslator` class.

---

## Customizing how Dagster understands dbt projects

The `DagsterDbtTranslator` class is the default mapping for how Dagster interprets and maps your dbt project. As Dagster loops through each of your dbt models, it will execute each of the translator’s functions and use the return value to configure your new Dagster asset.

However, you can override its methods by making a new class that inherits from and provides your logic for a dbt model. Refer to the `dagster-dbt` package’s [API Reference](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DagsterDbtTranslator) for more info on the different functions you can override in the `DagsterDbtTranslator` class.

For now, we’ll customize how asset keys are defined by overriding the translator’s `get_asset_key` method.

Open the `assets/dbt.py` file and do the following:

1. Update the imports to include:
- From the `dagster_dbt` module, import `DagsterDbtTranslator`
- From the `dagster` module, import `AssetKey`

2. Create a new class called `CustomizedDagsterDbtTranslator` that inherits from the `DagsterDbtTranslator`. Add this code after the imports in `assets/dbt.py`:

```python
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
```

3. In this class, create a method called `get_asset_key.`

This is a method of `DagsterDbtTranslator` class that we'll override and customize to do as we need. It is a `@classmethod`, so we'll annotate it with the `@classmethod` decorator and have its first argument be `cls`, to follow [Pythonic conventions](https://builtin.com/software-engineering-perspectives/python-cls). The second argument refers to a dictionary/JSON object for the dbt model’s properties, which is based on the manifest file from earlier. Let’s call that second argument `dbt_resource_props`. The return value of this function is an object of the `AssetKey` class.

```python
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):

@classmethod
def get_asset_key(cls, dbt_resource_props):
```

4. Now, let’s fill in the `get_asset_key` method with our own logic for defining asset keys.

1. There are two properties that we’ll want from `dbt_resource_props`: the `type` (ex., model or source) and the `name`, such as `trips` or `stg_trips`. Access both of those properties from the `dbt_resource_props` argument and store them in their own respective variables (`type` and `name`):

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
```

2. As mentioned above, the asset keys of our existing Dagster assets used by our dbt project are named `taxi_trips` and `taxi_zones`. If you were to print out the `name`, you’d see that the dbt sources are named `trips` and `zones`. Therefore, to match our asset keys up, we can prefix our keys with the string `taxi_` .

Copy and paste the following code to return an `AssetKey` of `AssetKey(f"taxi_{name}")`:

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]

return AssetKey(f"taxi_{name}")
```

3. You have full control over how each asset can be named, as you can define how asset keys are created. In our case we only want to rename the dbt sources, but we can keep the asset keys of the models the same.

The object-oriented pattern of the `DagsterDbtTranslator` means that we can leverage the existing implementations of the parent class by using the `super` method. We’ll use this pattern to customize how the sources are defined but default to the original logic for deciding the model asset keys. Copy and paste the code below to complete the `get_asset_key` function:

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
resource_type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if resource_type == "source":
return AssetKey(f"taxi_{name}")
else:
return super().get_asset_key(dbt_resource_props)
```

You’ve successfully written your first translator!

{% callout %}
> 💡 **Important!** dbt models and Dagster asset keys must be unique. If you're receiving a `DuplicateKeyError` , add some logging to verify that the logic in `get_asset_key` doesn't return two of the same key for different values!
{% /callout %}

5. Now, update the definition that uses `@dbt_assets` to be configured with an instance of the `CustomizedDagsterDbtTranslator`. The `@dbt_assets` decorator has a `dagster_dbt_translator` argument that you can pass this instance into. **Don’t forget to instantiate the class!**

Your code should look something like this:

```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```

At this point, your `dbt.py` file should match the following:

```python
import os
from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator

from .constants import DBT_DIRECTORY
from ..resources import dbt_resource


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props):
resource_type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if resource_type == "source":
return AssetKey(f"taxi_{name}")
else:
return super().get_asset_key(dbt_resource_props)


dbt_resource.cli(["--quiet", "parse"]).wait()

if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"])
.wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json")


@dbt_assets(
manifest=dbt_manifest_path, dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---
title: 'Lesson 5: Creating assets that depend on dbt models'
module: 'dagster_dbt'
lesson: '5'
---

# Creating assets that depend on dbt models

At this point, you’ve loaded your dbt models as Dagster assets and linked the dependencies between the dbt assets and their source Dagster assets. However, a dbt model is typically not the last asset in a pipeline. For example, you might want to:

- Generate a chart,
- Update a dashboard, or
- Send data to Salesforce

In this section, you’ll learn how to do this by defining a new Dagster asset that depends on a dbt model. We’ll make some metrics in a dbt model and then use Python to generate a chart with that data.

If you’re familiar with New York City, you might know that there are three major airports - JFK, LGA, and EWR - in different parts of the metropolitan area. Hypothetically, you’re curious how their final destination impacts the airport they fly into. For example, how many people staying in Queens flew into LGA?

---

## Creating the dbt model

To answer these questions, let’s define a new dbt model that builds a series of metrics from the staging models you wrote earlier.

In the `analytics/models` directory:

1. Create a new directory called `marts`.
2. In the `marts` directory, create a new file called `location_metrics.sql`.
3. Copy and paste the following into `location_metrics.sql`:

```sql
with
trips as (
select *
from {{ ref('stg_trips') }}
),
zones as (
select *
from {{ ref('stg_zones') }}
),
trips_by_zone as (
select
pickup_zones.zone_name as zone,
dropoff_zones.borough as destination_borough,
pickup_zones.is_airport as from_airport,
count(*) as trips,
sum(trips.trip_distance) as total_distance,
sum(trips.duration) as total_duration,
sum(trips.total_amount) as fare,
sum(case when duration > 30 then 1 else 0 end) as trips_over_30_min
from trips
left join zones as pickup_zones on trips.pickup_zone_id = pickup_zones.zone_id
left join zones as dropoff_zones on trips.dropoff_zone_id = dropoff_zones.zone_id
group by all
)
select *
from trips_by_zone
```

4. In the Dagster UI, reload the code location.
5. Observe and materialize the new `location_metrics` dbt asset:

![The new location_metrics dbt asset in the Dagster UI](/images/dagster-dbt/lesson-5/new-location-metrics-asset.png)

---

## Creating the Dagster asset

Next, we’ll create an asset that uses some of the columns in the `location_metrics` model to chart the number of taxi trips that happen per major NYC airport and the borough they come from.

### Adding a new constant

Let's start by adding a new string constant to reference when building the new asset. This will make it easier for us to reference the correct location of the chart in the asset.
In the `assets/constants.py` file, add the following to the end of the file:
```python
AIRPORT_TRIPS_FILE_PATH = Path(__file__).joinpath("..", "..", "outputs", "airport_trips.png").resolve()
```
### Creating the airport_trips asset
Now we’re ready to create the asset!
1. Open the `assets/metrics.py` file.
2. At the end of the file, define a new asset called `airport_trips` with the context argument and the existing `DuckDBResource` named `database`:
```python
def airport_trips(context, database: DuckDBResource):
```
3. Add the asset decorator to the `airport_trips` function and specify the `location_metrics` model as a dependency:
```python
@asset(
deps=["location_metrics"],
)
def airport_trips(context, database: DuckDBResource):
```
**Note:** Because Dagster doesn’t discriminate and treats all dbt models as assets, you’ll add this dependency just like you would with any other asset.
4. Fill in the body of the function with the following code to follow a similar pattern to your project’s existing pipelines: query for the data, use a library to generate a chart, save the chart as a file, and embed the chart:
```python
@asset(
deps=["location_metrics"],
)
def airport_trips(context, database: DuckDBResource):
"""
A chart of where trips from the airport go
"""
query = """
select
zone,
destination_borough,
trips
from location_metrics
where from_airport
"""
with database.get_connection() as conn:
airport_trips = conn.execute(query).fetch_df()
fig = px.bar(
airport_trips,
x="zone",
y="trips",
color="destination_borough",
barmode="relative",
labels={
"zone": "Zone",
"trips": "Number of Trips",
"destination_borough": "Destination Borough"
},
)
pio.write_image(fig, constants.AIRPORT_TRIPS_FILE_PATH)
with open(constants.AIRPORT_TRIPS_FILE_PATH, 'rb') as file:
image_data = file.read()
# Convert the image data to base64
base64_data = base64.b64encode(image_data).decode('utf-8')
md_content = f"![Image](data:image/jpeg;base64,{base64_data})"
#TODO: Use `MaterializeResult` instead
context.add_output_metadata({
"preview": MetadataValue.md(md_content),
"data": MetadataValue.json(airport_trips.to_dict(orient="records"))
})
```
5. Reload your code location to see the new `airport_trips` asset within the `metrics` group. Notice how the asset graph links the dependency between the `location_metrics` dbt asset and the new `airport_trips` chart asset.
Loading

0 comments on commit e98b947

Please sign in to comment.