diff --git a/docs/dagster-university/pages/dagster-dbt.md b/docs/dagster-university/pages/dagster-dbt.md index 7b759e5ed7880..3bace6a25eff6 100644 --- a/docs/dagster-university/pages/dagster-dbt.md +++ b/docs/dagster-university/pages/dagster-dbt.md @@ -33,4 +33,9 @@ title: Dagster + dbt - [Overview](/dagster-dbt/lesson-5/1-overview) - [Connecting dbt models to Dagster assets](/dagster-dbt/lesson-5/2-connecting-dbt-models-to-dagster-assets) - [Creating assets that depend on dbt models](/dagster-dbt/lesson-5/3-creating-assets-that-depend-on-dbt-models) - - [Automating dbt models in Dagster](/dagster-dbt/lesson-5/4-automating-dbt-models-in-dagster) \ No newline at end of file + - [Automating dbt models in Dagster](/dagster-dbt/lesson-5/4-automating-dbt-models-in-dagster) +- Lesson 6: Using Dagster to partition dbt models + - [Overview](/dagster-dbt/lesson-6/1-overview) + - [Creating an incremental model](/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model) + - [Creating a partitioned dbt asset](/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset) + - [Lesson recap](/dagster-dbt/lesson-6/4-lesson-recap) \ No newline at end of file diff --git a/docs/dagster-university/pages/dagster-dbt/lesson-6/1-overview.md b/docs/dagster-university/pages/dagster-dbt/lesson-6/1-overview.md new file mode 100644 index 0000000000000..d83e63cfd4190 --- /dev/null +++ b/docs/dagster-university/pages/dagster-dbt/lesson-6/1-overview.md @@ -0,0 +1,30 @@ +--- +title: 'Lesson 6: Overview' +module: 'dagster_dbt' +lesson: '6' +--- + +# Overview + +By default, dbt materializes tables by replacing the old table with the new one generated from the model. However, there are many situations where you’ll want to append new data to existing data without replacing it. dbt provides an `incremental` materialization pattern for this. Incremental materialization allows developers to define when the data was transformed and only transform data that hasn’t already been transformed. + +In some situations, a normal incremental model might be brittle. This might be because: + +1. **Incremental runs aren’t repeatable**. By this, we mean materializing an incremental model twice does two different things and those states cannot be easily replicated. One run might add 10 million rows, and immediately running it again might not add any rows at all. Incremental models are also difficult to roll back changes because the boundaries of what rows to insert are dynamic and not tracked by dbt. +2. **Re-running historical data requires you to re-run everything with a `full refresh`**. If you find out that the data from three incremental materializations ago used incorrect source data, and then the source data was corrected, you’ll have to rebuild your *entire* incremental model from scratch. This is problematic because incremental models are typically used for large and expensive tables. Therefore, doing a full rebuild is a long and costly process. + +All this said, incremental models aren’t *bad*. They could just be better. Incremental models would be better if they could be made predictably repeatable and didn’t require rebuilding an entire table if one portion was wrong. + +--- + +## Dagster partitions can help + +{% callout %} +> 💡 **Need a primer on partitions?** Check this [Dagster Short](https://www.youtube.com/watch?v=zfLBHFCbocE) on YouTube! +{% /callout %} + +Dagster partitions are predictable, repeatable, and don’t require a massive rebuild when one chunk of data is incorrect. A partition has discrete bounds, such as partitioning an asset by month or location, making it easy to understand what data will be created each time a partition is materialized. + +Partitions can also be run independently from each other. If only one partition went awry, then you can re-materialize just that single erroneous partition without remaking your entire asset. + +In this lesson, we’re going to write an incremental dbt model and make it easier to manage by partitioning it. \ No newline at end of file diff --git a/docs/dagster-university/pages/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model.md b/docs/dagster-university/pages/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model.md new file mode 100644 index 0000000000000..45308a0129097 --- /dev/null +++ b/docs/dagster-university/pages/dagster-dbt/lesson-6/2-creating-a-simple-incremental-model.md @@ -0,0 +1,58 @@ +--- +title: 'Lesson 6: Creating an incremental model' +module: 'dagster_dbt' +lesson: '6' +--- + +# Creating an incremental model + +As mentioned, partitions don’t *replace* incremental models, but you’ll soon see how you can expand the functionality of incremental models by partitioning them. In fact, we’ll first write an incremental dbt model and then show you how to use Dagster to partition it. + +This model will be a series of stats about all New York taxi trips. It would be expensive to compute this every day because of the granularity of the metrics and the fact that some of the measures are computationally expensive to calculate. Therefore, this model will be incremental. + +In your dbt project, create a new file called `daily_metrics.sql` in the `analytics/models/marts` directory. Copy and paste the following code into the file: + +```sql +{{ + config( + materialized='incremental', + unique_key='date_of_business' + ) +}} + +with + trips as ( + select * + from {{ ref('stg_trips') }} + ), + daily_summary as ( + select + date_trunc('day', pickup_datetime) as date_of_business, + count(*) as trip_count, + count(*) - lag(count(*), 1) over (order by date_trunc('day', pickup_datetime)) as trip_count_change, + sum(duration) as total_duration, + sum(duration) / count(*) as average_duration, + sum(total_amount) as total_amount, + sum(total_amount) / count(*) as average_amount, + sum(case when duration > 30 then 1 else 0 end) / count(*) as pct_over_30_min + from trips + group by all + ) +select * +from daily_summary +{% if is_incremental() %} + where date_of_business > (select max(date_of_business) from {{ this }}) +{% endif %} +``` + +This is a standard incremental model that we won’t spend a lot of time going over, but let’s cover the incremental logic here: + +```jsx +{% if is_incremental() %} + where date_of_business > (select max(date_of_business) from {{ this }}) +{% endif %} +``` + +This `where` clause is the most common way to define incremental logic. It’s also one of the biggest reasons for the model’s brittleness. + +What we’d like to do is partition this model and tell dbt to insert only the records that match that partition. For example, if we're running the Dagster partition range of `01/01/24` to`01/22/24`, then dbt should only select and insert records for that date range. Soon, we’ll update this clause to do exactly that! \ No newline at end of file diff --git a/docs/dagster-university/pages/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset.md b/docs/dagster-university/pages/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset.md new file mode 100644 index 0000000000000..7aec98b549d0b --- /dev/null +++ b/docs/dagster-university/pages/dagster-dbt/lesson-6/3-creating-a-partitioned-dbt-asset.md @@ -0,0 +1,177 @@ +--- +title: 'Lesson 6: Creating a partitioned dbt asset' +module: 'dagster_dbt' +lesson: '6' +--- + +# Creating a partitioned dbt asset + +We’ve built the foundation on the dbt side, and now we can make the appropriate changes on the Dagster side. We’ll refactor our existing Dagster code to tell dbt that the incremental models are partitioned and what data to fill in. + +We want to configure some of these models (the incremental ones) with partitions. In this section, we’ll show you a use case that has multiple `@dbt_assets` definitions. + +To partition an incremental dbt model, you’ll need first to partition your `@dbt_assets` definition. Then, when it runs, we’ll figure out what partition is running and tell dbt what the partition’s range is. Finally, we’ll modify our dbt model only to insert the records found in that range. + +--- + +## Defining a new daily partition + +Let’s start by defining a new daily partition for the model. + +In `dagster_university/partitions/init.py`, make the following changes: + +1. import `DailyPartitionsDefinition` from `dagster`, and +2. Define a new `daily_partition` like the following: + + ```python + from dagster import MonthlyPartitionsDefinition, WeeklyPartitionsDefinition, DailyPartitionsDefinition + + # ...existing partitions here + + daily_partition = DailyPartitionsDefinition( + start_date=start_date, + end_date=end_date + ) + ``` + +--- + +## Defining an incremental selector + +We have a few changes to make to our dbt setup to get things working. In `dagster_university/assets/dbt.py`: + +1. Add the following imports to the top of the file: + + ```python + from ..partitions import daily_partition + import json + ``` + + This imports the new `daily_partition` and the `json` standard module. We’ll use the `json` module to format how we tell dbt what partition to materialize. + +2. We now need a way to indicate that we’re selecting or excluding incremental models, so we’ll make a new constant in the `dbt.py` file called `INCREMENTAL_SELECTOR:` + + ```python + INCREMENTAL_SELECTOR = "config.materialized:incremental" + ``` + + This string follows dbt’s selection syntax to select all incremental models. In your own projects, you can customize this to select only the specific incremental models that you want to partition. + +--- + +## Creating a new @dbt_assets function + +Previously, we used the `@dbt_assets` decorator to say *“this function produces assets based on this dbt project”*. Now, we also want to say *“this function produces partitioned assets based on a selected set of models from this dbt project.”* We’ll write an additional `@dbt_assets` -decorated function to express this. + +1. In `dagster_university/assets/dbt.py`, define another `@dbt_assets` function below the original one. Name it `dbt_incremental_models` and have it use the same manifest that we’ve been using: + + ```python + @dbt_assets( + manifest=dbt_manifest_path, + dagster_dbt_translator=CustomizedDagsterDbtTranslator() + ) + def incremental_dbt_models( + context: AssetExecutionContext, + dbt: DbtCliResource + ): + yield from dbt.cli(["build"], context=context).stream() + ``` + +2. Next, add arguments to specify which models to select (`select`) and what partition (`partitions_def`) to use: + + ```python + @dbt_assets( + manifest=dbt_manifest_path, + dagster_dbt_translator=CustomizedDagsterDbtTranslator(), + select=INCREMENTAL_SELECTOR, # select only models with INCREMENTAL_SELECTOR + partitions_def=daily_partition # partition those models using daily_partition + ) + def incremental_dbt_models( + context: AssetExecutionContext, + dbt: DbtCliResource + ): + yield from dbt.cli(["build"], context=context).stream() + ``` + + This tells the function to only select models with `INCREMENTAL_SELECTOR` and to partition them using the `daily_partition.` + +--- + +## Partitioning the incremental_dbt_models function + +Now that the `@dbt_assets` definition has been created, it's time to fill in its body. We’ll start by using the `context` argument, which contains metadata about the Dagster run. + +One of these pieces of information is that we can fetch *the partition this execution is trying to materialize*! In our case, since it’s a time-based partition, we can get the *time window* of the partitions we’re materializing, such as `2023-03-04T00:00:00+00:00`to `2023-03-05T00:00:00+00:00`. + +First, add the following to the `@dbt_assets` function body, before the `yield`: + +```bash +time_window = context.partition_time_window +dbt_vars = { + "min_date": time_window.start.isoformat(), + "max_date": time_window.end.isoformat() +} +``` + +This fetches the time window and stores it as a variable (`time_window` ) so we can use it later. + +Now that we know *what* partitions we’re executing, the next step is to tell dbt the partition currently being materialized. To do that, we’ll take advantage of dbt’s `vars` argument to pass this information at runtime. +Because the `dbt.cli` function has the same capabilities as the `dbt` CLI, we can dynamically set the arguments we pass into it. To communicate this time window, we’ll pass in a `min_date` and `max_date` variable. Update the `yield` in the `@dbt_assets` definition to the following: + +```python +yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream() +``` + +--- + +## Updating the dbt_analytics function + +Now that you have a dedicated `@dbt_assets` definition for the incremental models, you’ll need to *exclude* these models from your original dbt execution. + +Modify the `dbt_analytics` definition to exclude the `INCREMENTAL_SELECTOR`: + +```python +@dbt_assets( + manifest=dbt_manifest_path, + dagster_dbt_translator=CustomizedDagsterDbtTranslator(), + exclude=INCREMENTAL_SELECTOR, # Add this here +) +def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() +``` + +At this point, the `dagster_university/assets/dbt.py` file should look like this: + +```python +TODO +``` + +--- + +## Updating the daily_metrics model + +Finally, we’ll modify the `daily_metrics.sql` file to reflect that dbt knows what partition range is being materialized. Since the partition range is passed in as variables at runtime, the dbt model can access them using the `var` dbt macro. + +In `analytics/models/marts/daily_metrics.sql`, update the model's incremental logic to the following: + +```python +{% if is_incremental() %} + where date_of_business >= strptime('{{ var('min_date') }}', '%c') and date_of_business < strptime('{{ var('max_date') }}', '%c') +{% endif %} +``` + +Here, we’ve changed the logic to say that we only want to select rows between the `min_date` and the `max_date`. Note that we are turning the variables into timestamps using `strptime` because they’re loaded as strings. + +--- + +## Running the pipeline + +That’s it! Now you can check out the new `daily_metrics` asset in Dagster. + +1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset. +2. Click the `daily_metrics` asset and then the **Materialize selected** button. You’ll be prompted to select some partitions first. +3. Once the run starts, navigate to the run’s details page to check out the event logs. The executed dbt command should look something like this: + + ```bash + dbt build --vars {"min_date": "2023-03-04T00:00:00+00:00", "max_date": "2023-03-05T00:00:00+00:00"} --select config.materialized:incremental + ``` \ No newline at end of file diff --git a/docs/dagster-university/pages/dagster-dbt/lesson-6/4-lesson-recap.md b/docs/dagster-university/pages/dagster-dbt/lesson-6/4-lesson-recap.md new file mode 100644 index 0000000000000..0670be6c7dd4a --- /dev/null +++ b/docs/dagster-university/pages/dagster-dbt/lesson-6/4-lesson-recap.md @@ -0,0 +1,19 @@ +--- +title: 'Lesson 6: Lesson recap' +module: 'dagster_dbt' +lesson: '6' +--- + +# Lesson recap + +In this lesson, you: + +- Learned the benefits of partitioning your incremental models +- Added a time-based partition to an incremental model +- Created a second `@dbt_assets` definition specifically for incremental dbt models + +The patterns you used are general enough that they can also be applied to any type of partition, allowing you to partition your incremental models by location, customer, or other dimensions. Tinker around with the `context.partition_key` property if you’re interested! + +{% callout %} +> 💡 **Tip:** Did you know dbt models can resolve schema changes on their own? Using `on_schema_change: "sync_all_columns"`, you can avoid needing to fully refresh your dbt models and instead only orchestrate with Dagster backfills. +{% /callout %} \ No newline at end of file