Commit

[daggy-u] [dbt] - Add Lesson 6 (DEV-56) (#19955)
## Summary & Motivation

This PR adds Lesson 6 of the new dbt module to Dagster University.

TODOs:

- [ ] Add screenshots
- [ ] Verify final code snippets; move to import component

## How I Tested These Changes

---------

Co-authored-by: Tim Castillo <[email protected]>
erinkcochran87 and tacastillo authored Feb 27, 2024
1 parent 822b4a4 commit 2707d7c
Showing 5 changed files with 290 additions and 1 deletion.
7 changes: 6 additions & 1 deletion docs/dagster-university/pages/dagster-dbt.md
@@ -33,4 +33,9 @@ title: Dagster + dbt
- [Overview](/dagster-dbt/lesson-5/1-overview)
- [Connecting dbt models to Dagster assets](/dagster-dbt/lesson-5/2-connecting-dbt-models-to-dagster-assets)
- [Creating assets that depend on dbt models](/dagster-dbt/lesson-5/3-creating-assets-that-depend-on-dbt-models)
- [Automating dbt models in Dagster](/dagster-dbt/lesson-5/4-automating-dbt-models-in-dagster)
- Lesson 6: Using Dagster to partition dbt models
- [Overview](/dagster-dbt/lesson-6/1-overview)
- [Creating an incremental model](/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model)
- [Creating a partitioned dbt asset](/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset)
- [Lesson recap](/dagster-dbt/lesson-6/4-lesson-recap)
30 changes: 30 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-6/1-overview.md
@@ -0,0 +1,30 @@
---
title: 'Lesson 6: Overview'
module: 'dagster_dbt'
lesson: '6'
---

# Overview

By default, dbt materializes tables by replacing the old table with a new one generated from the model. However, in many situations you’ll want to append new data to existing data without replacing it. dbt provides the `incremental` materialization pattern for this: developers define which records are new, and dbt transforms only the data that hasn’t already been transformed.

In some situations, a normal incremental model might be brittle. This might be because:

1. **Incremental runs aren’t repeatable**. By this, we mean that materializing an incremental model twice does two different things, and those states can’t easily be replicated. One run might add 10 million rows, and immediately running it again might not add any rows at all. Incremental models are also difficult to roll back because the boundaries of which rows to insert are dynamic and not tracked by dbt.
2. **Re-running historical data requires you to re-run everything with a `--full-refresh`**. If you find out that the data from three incremental materializations ago used incorrect source data, and the source data has since been corrected, you’ll have to rebuild your *entire* incremental model from scratch. This is problematic because incremental models are typically used for large and expensive tables, so a full rebuild is a long and costly process.

All this said, incremental models aren’t *bad*; they could just be better. They’d be better if they were predictably repeatable and didn’t require rebuilding an entire table when one portion is wrong.

---

## Dagster partitions can help

{% callout %}
> 💡 **Need a primer on partitions?** Check out this [Dagster Short](https://www.youtube.com/watch?v=zfLBHFCbocE) on YouTube!
{% /callout %}

Dagster partitions are predictable, repeatable, and don’t require a massive rebuild when one chunk of data is incorrect. A partition has discrete bounds, such as partitioning an asset by month or location, making it easy to understand what data will be created each time a partition is materialized.

Partitions can also be run independently from each other. If only one partition went awry, then you can re-materialize just that single erroneous partition without remaking your entire asset.
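
For example, here’s a minimal sketch (with hypothetical dates) of how a daily partition defines discrete, repeatable bounds:

```python
from dagster import DailyPartitionsDefinition

# Hypothetical dates: each partition key below is one discrete, day-long chunk
daily = DailyPartitionsDefinition(start_date="2023-03-01", end_date="2023-03-05")

print(daily.get_partition_keys())
# ['2023-03-01', '2023-03-02', '2023-03-03', '2023-03-04']
```

Re-materializing the `2023-03-03` partition always targets that same one-day window, no matter how many times you run it.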

In this lesson, we’re going to write an incremental dbt model and make it easier to manage by partitioning it.
58 changes: 58 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model.md
@@ -0,0 +1,58 @@
---
title: 'Lesson 6: Creating an incremental model'
module: 'dagster_dbt'
lesson: '6'
---

# Creating an incremental model

As mentioned, partitions don’t *replace* incremental models; rather, you’ll soon see how you can expand the functionality of incremental models by partitioning them. To demonstrate, we’ll first write an incremental dbt model and then use Dagster to partition it.

This model will compute a series of daily stats about New York taxi trips. Recomputing it from scratch every day would be expensive because of the granularity of the metrics and because some of the measures are computationally expensive to calculate. Therefore, we’ll make this model incremental.

In your dbt project, create a new file called `daily_metrics.sql` in the `analytics/models/marts` directory. Copy and paste the following code into the file:

```sql
{{
    config(
        materialized='incremental',
        unique_key='date_of_business'
    )
}}

with
    trips as (
        select *
        from {{ ref('stg_trips') }}
    ),
    daily_summary as (
        select
            date_trunc('day', pickup_datetime) as date_of_business,
            count(*) as trip_count,
            count(*) - lag(count(*), 1) over (order by date_trunc('day', pickup_datetime)) as trip_count_change,
            sum(duration) as total_duration,
            sum(duration) / count(*) as average_duration,
            sum(total_amount) as total_amount,
            sum(total_amount) / count(*) as average_amount,
            sum(case when duration > 30 then 1 else 0 end) / count(*) as pct_over_30_min
        from trips
        group by all
    )

select *
from daily_summary

{% if is_incremental() %}
    where date_of_business > (select max(date_of_business) from {{ this }})
{% endif %}
```

This is a standard incremental model that we won’t spend a lot of time going over, but let’s cover the incremental logic here:

```sql
{% if is_incremental() %}
    where date_of_business > (select max(date_of_business) from {{ this }})
{% endif %}
```

This `where` clause is the most common way to define incremental logic. It’s also one of the biggest reasons for the model’s brittleness.

What we’d like to do is partition this model and tell dbt to insert only the records that match that partition. For example, if we're running the Dagster partition range of `01/01/24` to `01/22/24`, then dbt should only select and insert records for that date range. Soon, we’ll update this clause to do exactly that!
177 changes: 177 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset.md
@@ -0,0 +1,177 @@
---
title: 'Lesson 6: Creating a partitioned dbt asset'
module: 'dagster_dbt'
lesson: '6'
---

# Creating a partitioned dbt asset

We’ve built the foundation on the dbt side, and now we can make the appropriate changes on the Dagster side. We’ll refactor our existing Dagster code to tell dbt that the incremental models are partitioned and what data to fill in.

We want to configure some of these models (the incremental ones) with partitions. In this section, we’ll show you a use case that has multiple `@dbt_assets` definitions.

To partition an incremental dbt model, you’ll first need to partition your `@dbt_assets` definition. Then, when it runs, we’ll figure out which partition is running and tell dbt what that partition’s range is. Finally, we’ll modify our dbt model to insert only the records found in that range.

---

## Defining a new daily partition

Let’s start by defining a new daily partition for the model.

In `dagster_university/partitions/__init__.py`, make the following changes:

1. Import `DailyPartitionsDefinition` from `dagster`, and
2. Define a new `daily_partition` like the following:

```python
from dagster import MonthlyPartitionsDefinition, WeeklyPartitionsDefinition, DailyPartitionsDefinition

# ... existing partitions here

daily_partition = DailyPartitionsDefinition(
    start_date=start_date,
    end_date=end_date
)
```

---

## Defining an incremental selector

We have a few changes to make to our dbt setup to get things working. In `dagster_university/assets/dbt.py`:

1. Add the following imports to the top of the file:

```python
import json

from ..partitions import daily_partition
```

This imports the new `daily_partition` and the `json` module from the standard library. We’ll use the `json` module to format the partition information we pass to dbt at runtime.

2. We now need a way to indicate that we’re selecting or excluding incremental models, so we’ll make a new constant in the `dbt.py` file called `INCREMENTAL_SELECTOR`:

```python
INCREMENTAL_SELECTOR = "config.materialized:incremental"
```

This string follows dbt’s selection syntax to select all incremental models. In your own projects, you can customize this to select only the specific incremental models that you want to partition.
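
For example, here are a couple of hypothetical variations using dbt’s node selection syntax; the `partitioned` tag and model name below are illustrative, not part of this project:

```python
# Intersection: only incremental models that also carry a (hypothetical) tag.
# In dbt's selection syntax, a comma with no spaces means "and".
INCREMENTAL_SELECTOR = "config.materialized:incremental,tag:partitioned"

# Or target a single incremental model by name:
# INCREMENTAL_SELECTOR = "daily_metrics"
```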

---

## Creating a new @dbt_assets function

Previously, we used the `@dbt_assets` decorator to say *“this function produces assets based on this dbt project”*. Now, we also want to say *“this function produces partitioned assets based on a selected set of models from this dbt project.”* We’ll write an additional `@dbt_assets`-decorated function to express this.

1. In `dagster_university/assets/dbt.py`, define another `@dbt_assets` function below the original one. Name it `incremental_dbt_models` and have it use the same manifest that we’ve been using:

```python
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def incremental_dbt_models(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    yield from dbt.cli(["build"], context=context).stream()
```

2. Next, add arguments to specify which models to select (`select`) and what partition (`partitions_def`) to use:

```python
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    select=INCREMENTAL_SELECTOR,     # select only models with INCREMENTAL_SELECTOR
    partitions_def=daily_partition   # partition those models using daily_partition
)
def incremental_dbt_models(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    yield from dbt.cli(["build"], context=context).stream()
```

This tells the function to only select models with `INCREMENTAL_SELECTOR` and to partition them using the `daily_partition`.

---

## Partitioning the incremental_dbt_models function

Now that the `@dbt_assets` definition has been created, it's time to fill in its body. We’ll start by using the `context` argument, which contains metadata about the Dagster run.

One of these pieces of information is *the partition this execution is trying to materialize*! In our case, since it’s a time-based partition, we can get the *time window* of the partitions we’re materializing, such as `2023-03-04T00:00:00+00:00` to `2023-03-05T00:00:00+00:00`.

First, add the following to the `@dbt_assets` function body, before the `yield`:

```python
time_window = context.partition_time_window

dbt_vars = {
    "min_date": time_window.start.isoformat(),
    "max_date": time_window.end.isoformat()
}
```

This fetches the time window and stores it in a variable (`time_window`) so we can use it later.

Now that we know *what* partitions we’re executing, the next step is to tell dbt which partition is currently being materialized. To do that, we’ll take advantage of dbt’s `vars` argument to pass this information in at runtime.

Because the `dbt.cli` function has the same capabilities as the `dbt` CLI, we can dynamically set the arguments we pass into it. To communicate the time window, we’ll pass in `min_date` and `max_date` variables. Update the `yield` in the `@dbt_assets` definition to the following:

```python
yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
```

---

## Updating the dbt_analytics function

Now that you have a dedicated `@dbt_assets` definition for the incremental models, you’ll need to *exclude* these models from your original dbt execution.

Modify the `dbt_analytics` definition to exclude the `INCREMENTAL_SELECTOR`:

```python
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    exclude=INCREMENTAL_SELECTOR,  # Add this here
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```

At this point, the `dagster_university/assets/dbt.py` file should look something like the sketch below. The translator class and the import location of `dbt_manifest_path` carry over from earlier lessons in this module, so they’re abbreviated or assumed here and may differ slightly in your project.
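
```python
import json

from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from ..partitions import daily_partition
from .constants import dbt_manifest_path  # assumed location from earlier lessons

INCREMENTAL_SELECTOR = "config.materialized:incremental"


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
    ...  # customizations from Lesson 5, omitted here


@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    exclude=INCREMENTAL_SELECTOR,
)
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(
    context: AssetExecutionContext,
    dbt: DbtCliResource
):
    time_window = context.partition_time_window

    dbt_vars = {
        "min_date": time_window.start.isoformat(),
        "max_date": time_window.end.isoformat()
    }

    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
```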

---

## Updating the daily_metrics model

Finally, we’ll modify the `daily_metrics.sql` file so the model uses the partition range being materialized. Since the partition range is passed in as variables at runtime, the dbt model can access them using dbt’s `var` macro.

In `analytics/models/marts/daily_metrics.sql`, update the model's incremental logic to the following:

```sql
{% if is_incremental() %}
    where date_of_business >= strptime('{{ var('min_date') }}', '%c')
        and date_of_business < strptime('{{ var('max_date') }}', '%c')
{% endif %}
```

Here, we’ve changed the logic to select only the rows with a `date_of_business` on or after `min_date` and before `max_date`. Note that we turn the variables into timestamps using `strptime` because they’re passed in as strings.
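
To make the date handling concrete, here’s a quick standalone sketch (with hypothetical dates) of the exact strings dbt receives for a one-day partition; these are the same ISO-8601 values that the `strptime` calls above parse:

```python
import json
from datetime import datetime, timezone

# Sketch of the --vars payload for the 2023-03-04 daily partition
time_window_start = datetime(2023, 3, 4, tzinfo=timezone.utc)
time_window_end = datetime(2023, 3, 5, tzinfo=timezone.utc)

print(json.dumps({
    "min_date": time_window_start.isoformat(),
    "max_date": time_window_end.isoformat(),
}))
# {"min_date": "2023-03-04T00:00:00+00:00", "max_date": "2023-03-05T00:00:00+00:00"}
```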

---

## Running the pipeline

That’s it! Now you can check out the new `daily_metrics` asset in Dagster.

1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset.
2. Click the `daily_metrics` asset and then the **Materialize selected** button. You’ll be prompted to select some partitions first.
3. Once the run starts, navigate to the run’s details page to check out the event logs. The executed dbt command should look something like this:

```bash
dbt build --vars {"min_date": "2023-03-04T00:00:00+00:00", "max_date": "2023-03-05T00:00:00+00:00"} --select config.materialized:incremental
```
19 changes: 19 additions & 0 deletions docs/dagster-university/pages/dagster-dbt/lesson-6/4-lesson-recap.md
@@ -0,0 +1,19 @@
---
title: 'Lesson 6: Lesson recap'
module: 'dagster_dbt'
lesson: '6'
---

# Lesson recap

In this lesson, you:

- Learned the benefits of partitioning your incremental models
- Added a time-based partition to an incremental model
- Created a second `@dbt_assets` definition specifically for incremental dbt models

The patterns you used are general enough that they can also be applied to any type of partition, allowing you to partition your incremental models by location, customer, or other dimensions. Tinker around with the `context.partition_key` property if you’re interested!
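
For instance, here’s a rough sketch (not part of the lesson’s code) of what a location-based version could look like, using a static partition of NYC boroughs and `context.partition_key`. The selector tag, function name, and import location are hypothetical:

```python
import json

from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

from .constants import dbt_manifest_path  # assumed location, as in earlier lessons

# Hypothetical: one partition per NYC borough
borough_partition = StaticPartitionsDefinition(
    ["Manhattan", "Brooklyn", "Queens", "Bronx", "Staten Island"]
)


@dbt_assets(
    manifest=dbt_manifest_path,
    select="tag:partitioned_by_borough",  # hypothetical tag on the relevant models
    partitions_def=borough_partition,
)
def borough_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Static partitions have no time window, so pass the partition key itself
    dbt_vars = {"borough": context.partition_key}
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()
```

On the dbt side, the model’s incremental filter would then compare against `{{ var('borough') }}` instead of a date range.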

{% callout %}
> 💡 **Tip:** Did you know dbt models can resolve schema changes on their own? Using `on_schema_change: "sync_all_columns"`, you can avoid fully refreshing your dbt models and instead re-run historical data with Dagster backfills.
{% /callout %}

1 comment on commit 2707d7c

@github-actions

Deploy preview for dagster-university ready!

✅ Preview: https://dagster-university-6oczt78zm-elementl.vercel.app

Built with commit 2707d7c. This pull request is being automatically deployed with vercel-action.