Skip to content

Commit

Permalink
Merge branch 'master' into erin/dev-55-lesson-6
Browse files Browse the repository at this point in the history
  • Loading branch information
erinkcochran87 committed Feb 27, 2024
2 parents b03f8ab + 822b4a4 commit 57e925f
Show file tree
Hide file tree
Showing 5 changed files with 389 additions and 0 deletions.
11 changes: 11 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-5/1-overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
title: 'Lesson 5: Overview'
module: 'dagster_dbt'
lesson: '5'
---

# Overview

In Lesson 3, you loaded your dbt project's models as assets into Dagster. You also materialized some of those models.

In this lesson, we’ll integrate more dbt models with the rest of your Dagster project. You’ll use the existing Dagster assets as sources in your dbt project and learn how to further customize how Dagster maps your dbt project with the `DagsterDbtTranslator` class. To wrap things up, we’ll show you how to automate the running of the dbt models in Dagster.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
---
title: 'Lesson 5: Connecting dbt models to Dagster assets'
module: 'dagster_dbt'
lesson: '5'
---

# Connecting dbt models to Dagster assets

With where we left off, you may have noticed that the sources for your dbt projects are not just tables that exist in DuckDB, but also *assets* that Dagster created. However, the staging models that use those sources aren’t linked to the Dagster assets that produced them.

Let’s fix that by telling Dagster that the dbt sources are the tables that the `taxi_trips` and `taxi_zones` asset definitions produce. To match up these assets, we’ll override dbt’s asset key with the name `taxi_trips`. By having the asset keys line up, Dagster will know that these assets are the same and should merge them.

This is accomplished by changing the dbt source’s asset keys to be the same as the matching assets that Dagster makes. In this case, the dbt source’s default asset key is `raw_taxis/trips`, and the table that we’re making with Dagster has an asset key of `taxi_trips`.

To adjust how Dagster names the asset keys for your project’s dbt models, we’ll need to override the `dagster-dbt` integration’s default logic for how to interpret the dbt project. This mapping is contained in the `DagsterDbtTranslator` class.

---

## Customizing how Dagster understands dbt projects

The `DagsterDbtTranslator` class is the default mapping for how Dagster interprets and maps your dbt project. As Dagster loops through each of your dbt models, it will execute each of the translator’s functions and use the return value to configure your new Dagster asset.

However, you can override its methods by making a new class that inherits from and provides your logic for a dbt model. Refer to the `dagster-dbt` package’s [API Reference](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DagsterDbtTranslator) for more info on the different functions you can override in the `DagsterDbtTranslator` class.

For now, we’ll customize how asset keys are defined by overriding the translator’s `get_asset_key` method.

Open the `assets/dbt.py` file and do the following:

1. Update the imports to include:
- From the `dagster_dbt` module, import `DagsterDbtTranslator`
- From the `dagster` module, import `AssetKey`

2. Create a new class called `CustomizedDagsterDbtTranslator` that inherits from the `DagsterDbtTranslator`. Add this code after the imports in `assets/dbt.py`:

```python
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
```

3. In this class, create a method called `get_asset_key.`

This is a method of `DagsterDbtTranslator` class that we'll override and customize to do as we need. It is a `@classmethod`, so we'll annotate it with the `@classmethod` decorator and have its first argument be `cls`, to follow [Pythonic conventions](https://builtin.com/software-engineering-perspectives/python-cls). The second argument refers to a dictionary/JSON object for the dbt model’s properties, which is based on the manifest file from earlier. Let’s call that second argument `dbt_resource_props`. The return value of this function is an object of the `AssetKey` class.

```python
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):

@classmethod
def get_asset_key(cls, dbt_resource_props):
```

4. Now, let’s fill in the `get_asset_key` method with our own logic for defining asset keys.

1. There are two properties that we’ll want from `dbt_resource_props`: the `type` (ex., model or source) and the `name`, such as `trips` or `stg_trips`. Access both of those properties from the `dbt_resource_props` argument and store them in their own respective variables (`type` and `name`):

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
```

2. As mentioned above, the asset keys of our existing Dagster assets used by our dbt project are named `taxi_trips` and `taxi_zones`. If you were to print out the `name`, you’d see that the dbt sources are named `trips` and `zones`. Therefore, to match our asset keys up, we can prefix our keys with the string `taxi_` .

Copy and paste the following code to return an `AssetKey` of `AssetKey(f"taxi_{name}")`:

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]

return AssetKey(f"taxi_{name}")
```

3. You have full control over how each asset can be named, as you can define how asset keys are created. In our case we only want to rename the dbt sources, but we can keep the asset keys of the models the same.

The object-oriented pattern of the `DagsterDbtTranslator` means that we can leverage the existing implementations of the parent class by using the `super` method. We’ll use this pattern to customize how the sources are defined but default to the original logic for deciding the model asset keys. Copy and paste the code below to complete the `get_asset_key` function:

```python
@classmethod
def get_asset_key(cls, dbt_resource_props):
resource_type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if resource_type == "source":
return AssetKey(f"taxi_{name}")
else:
return super().get_asset_key(dbt_resource_props)
```

You’ve successfully written your first translator!

{% callout %}
> 💡 **Important!** dbt models and Dagster asset keys must be unique. If you're receiving a `DuplicateKeyError` , add some logging to verify that the logic in `get_asset_key` doesn't return two of the same key for different values!
{% /callout %}

5. Now, update the definition that uses `@dbt_assets` to be configured with an instance of the `CustomizedDagsterDbtTranslator`. The `@dbt_assets` decorator has a `dagster_dbt_translator` argument that you can pass this instance into. **Don’t forget to instantiate the class!**

Your code should look something like this:

```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```

At this point, your `dbt.py` file should match the following:

```python
import os
from dagster import AssetExecutionContext, AssetKey
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator

from .constants import DBT_DIRECTORY
from ..resources import dbt_resource


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props):
resource_type = dbt_resource_props["resource_type"]
name = dbt_resource_props["name"]
if resource_type == "source":
return AssetKey(f"taxi_{name}")
else:
return super().get_asset_key(dbt_resource_props)


dbt_resource.cli(["--quiet", "parse"]).wait()

if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"):
dbt_manifest_path = (
dbt_resource.cli(["--quiet", "parse"])
.wait()
.target_path.joinpath("manifest.json")
)
else:
dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json")


@dbt_assets(
manifest=dbt_manifest_path, dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---
title: 'Lesson 5: Creating assets that depend on dbt models'
module: 'dagster_dbt'
lesson: '5'
---

# Creating assets that depend on dbt models

At this point, you’ve loaded your dbt models as Dagster assets and linked the dependencies between the dbt assets and their source Dagster assets. However, a dbt model is typically not the last asset in a pipeline. For example, you might want to:

- Generate a chart,
- Update a dashboard, or
- Send data to Salesforce

In this section, you’ll learn how to do this by defining a new Dagster asset that depends on a dbt model. We’ll make some metrics in a dbt model and then use Python to generate a chart with that data.

If you’re familiar with New York City, you might know that there are three major airports - JFK, LGA, and EWR - in different parts of the metropolitan area. Hypothetically, you’re curious how their final destination impacts the airport they fly into. For example, how many people staying in Queens flew into LGA?

---

## Creating the dbt model

To answer these questions, let’s define a new dbt model that builds a series of metrics from the staging models you wrote earlier.

In the `analytics/models` directory:

1. Create a new directory called `marts`.
2. In the `marts` directory, create a new file called `location_metrics.sql`.
3. Copy and paste the following into `location_metrics.sql`:

```sql
with
trips as (
select *
from {{ ref('stg_trips') }}
),
zones as (
select *
from {{ ref('stg_zones') }}
),
trips_by_zone as (
select
pickup_zones.zone_name as zone,
dropoff_zones.borough as destination_borough,
pickup_zones.is_airport as from_airport,
count(*) as trips,
sum(trips.trip_distance) as total_distance,
sum(trips.duration) as total_duration,
sum(trips.total_amount) as fare,
sum(case when duration > 30 then 1 else 0 end) as trips_over_30_min
from trips
left join zones as pickup_zones on trips.pickup_zone_id = pickup_zones.zone_id
left join zones as dropoff_zones on trips.dropoff_zone_id = dropoff_zones.zone_id
group by all
)
select *
from trips_by_zone
```

4. In the Dagster UI, reload the code location.
5. Observe and materialize the new `location_metrics` dbt asset:

![The new location_metrics dbt asset in the Dagster UI](/images/dagster-dbt/lesson-5/new-location-metrics-asset.png)

---

## Creating the Dagster asset

Next, we’ll create an asset that uses some of the columns in the `location_metrics` model to chart the number of taxi trips that happen per major NYC airport and the borough they come from.

### Adding a new constant

Let's start by adding a new string constant to reference when building the new asset. This will make it easier for us to reference the correct location of the chart in the asset.
In the `assets/constants.py` file, add the following to the end of the file:
```python
AIRPORT_TRIPS_FILE_PATH = Path(__file__).joinpath("..", "..", "outputs", "airport_trips.png").resolve()
```
### Creating the airport_trips asset
Now we’re ready to create the asset!
1. Open the `assets/metrics.py` file.
2. At the end of the file, define a new asset called `airport_trips` with the context argument and the existing `DuckDBResource` named `database`:
```python
def airport_trips(context, database: DuckDBResource):
```
3. Add the asset decorator to the `airport_trips` function and specify the `location_metrics` model as a dependency:
```python
@asset(
deps=["location_metrics"],
)
def airport_trips(context, database: DuckDBResource):
```
**Note:** Because Dagster doesn’t discriminate and treats all dbt models as assets, you’ll add this dependency just like you would with any other asset.
4. Fill in the body of the function with the following code to follow a similar pattern to your project’s existing pipelines: query for the data, use a library to generate a chart, save the chart as a file, and embed the chart:
```python
@asset(
deps=["location_metrics"],
)
def airport_trips(context, database: DuckDBResource):
"""
A chart of where trips from the airport go
"""
query = """
select
zone,
destination_borough,
trips
from location_metrics
where from_airport
"""
with database.get_connection() as conn:
airport_trips = conn.execute(query).fetch_df()
fig = px.bar(
airport_trips,
x="zone",
y="trips",
color="destination_borough",
barmode="relative",
labels={
"zone": "Zone",
"trips": "Number of Trips",
"destination_borough": "Destination Borough"
},
)
pio.write_image(fig, constants.AIRPORT_TRIPS_FILE_PATH)
with open(constants.AIRPORT_TRIPS_FILE_PATH, 'rb') as file:
image_data = file.read()
# Convert the image data to base64
base64_data = base64.b64encode(image_data).decode('utf-8')
md_content = f"![Image](data:image/jpeg;base64,{base64_data})"
#TODO: Use `MaterializeResult` instead
context.add_output_metadata({
"preview": MetadataValue.md(md_content),
"data": MetadataValue.json(airport_trips.to_dict(orient="records"))
})
```
5. Reload your code location to see the new `airport_trips` asset within the `metrics` group. Notice how the asset graph links the dependency between the `location_metrics` dbt asset and the new `airport_trips` chart asset.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
title: 'Lesson 5: Automating dbt models in Dagster'
module: 'dagster_dbt'
lesson: '5'
---

# Automating dbt models in Dagster

Did you realize that your dbt models have already been scheduled to run on a regular basis because of an existing schedule within this Dagster project?

Check it out in the Dagster UI by clicking **Overview** in the top navigation bar, then the **Jobs** tab. Click `trip_update_job` to check out the job’s details. It looks like the dbt models are already attached to this job!

Pretty cool, right? Let’s check out the code that made this happen. Open the `dagster_university/jobs/__init__.py` and look at the definition for `trip_update_job`:

```python
trip_update_job = define_asset_job(
name="trip_update_job",
partitions_def=monthly_partition,
selection=AssetSelection.all() - trips_by_week - adhoc_request
)
```

The dbt models were included in this job because of the `AssetSelection.all()` call. This reinforces the idea that once you load your dbt project into your Dagster project, Dagster will recognize and treat all of your dbt models as assets.

---

## Excluding specific dbt models

Treating dbt models as assets is great, but one of the core tenets of Dagster’s dbt integration is respecting how dbt is used, along with meeting dbt users where they are. That’s why there are a few utility methods that should feel familiar to dbt users. Let’s use one of these methods to remove some of our dbt models from this job explicitly.

Pretend that you’re working with an analytics engineer, iterating on the `stg_trips` model and planning to add new models that depend on it soon. Therefore, you’d like to exclude `stg_trips` and any new hypothetical dbt models downstream of it until the pipeline stabilizes. The analytics engineer you’re working with is really strong with dbt, but not too familiar with Dagster.

This is where you’d lean on a function like [`build_dbt_asset_selection`](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.build_dbt_asset_selection). This utility method will help your analytics engineer contribute without needing to know Dagster’s asset selection syntax. It takes two arguments:

- A list of `@dbt_assets` definitions to select models from
- A string of the selector using [dbt’s selection syntax](https://docs.getdbt.com/reference/node-selection/syntax) of the models you want to select

The function will return an `AssetSelection` of the dbt models that match your dbt selector. Let’s put this into practice:

1. At the top of `jobs/__init__.py`, import `dbt_analytics` from the `assets.dbt` module:

```python
from ..assets.dbt import dbt_analytics
```

2. After the other selections, define a new variable called `dbt_trips_selection` and make a call to `build_dbt_asset_selection`. Pass in the `dbt_analytics` definition and a string that selects `stg_trips` and all dbt models downstream of it:

```python
dbt_trips_selection = build_dbt_asset_selection([dbt_analytics], "stg_trips+")
```

3. Next, update the `selection` argument in the `trip_update_job` to subtract the `dbt_trips_selection`:

```python
trip_update_job = define_asset_job(
name="trip_update_job",
partitions_def=monthly_partition,
selection=AssetSelection.all() - trips_by_week - adhoc_request - dbt_trips_selection
)
```

4. Reload the code location and confirm that the dbt models are not in the `trip_update_job` anymore!

You might notice that the `airport_trips` asset is still scheduled to run with this job! That’s because the `build_dbt_asset_selection` function only selects *dbt models* and **not** Dagster assets.

If you want to also exclude the new `airport_trips` asset from this job, modify the `dbt_trips_selection` to include all *downstream assets*, too. Because we’re using Dagster’s native functionality to select all downstream assets, we can now drop the `+` from the dbt selector:

```python
dbt_trips_selection = build_dbt_asset_selection([dbt_analytics], "stg_trips").downstream()
```

Reload the code location and look at the `trip_update_job` once more to verify that everything looks right.

{% callout %}
> 💡 **Want an even more convenient utility to do this work for you?** Consider using the similar [`build_schedule_from_dbt_selection`](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.build_schedule_from_dbt_selection) function to quickly create a job and schedule for a given dbt selection.
{% /callout %}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 57e925f

Please sign in to comment.