[daggy-u] [dbt] - Add Lesson 5 (DEV-57) #19947
Merged
14 commits
ad5288e
Lesson 5, part 1
erinkcochran87 9c45856
Lesson 5, part 2
erinkcochran87 0be0f23
Lesson 5, part 3
erinkcochran87 d87f4a7
Lesson 5, part 4
erinkcochran87 90d4238
Merge branch 'master' into erin/dev-57-lesson-5
erinkcochran87 9a7150c
Merge branch 'master' into erin/dev-57-lesson-5
erinkcochran87 72bdf13
Add full dbt.py
erinkcochran87 fcce3a2
Cleanup
erinkcochran87 0f30f22
Merge branch 'master' into erin/dev-57-lesson-5
erinkcochran87 d501548
Update category page
erinkcochran87 aa4f689
Update docs/dagster-university/pages/dagster-dbt/lesson-5/2-connectin…
erinkcochran87 ac5f886
Update docs/dagster-university/pages/dagster-dbt/lesson-5/2-connectin…
erinkcochran87 d3cd955
Update docs/dagster-university/pages/dagster-dbt/lesson-5/3-creating-…
erinkcochran87 cb52811
Update docs/dagster-university/pages/dagster-dbt/lesson-5/3-creating-…
erinkcochran87
11 changes: 11 additions & 0 deletions
docs/dagster-university/pages/dagster-dbt/lesson-5/1-overview.md
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
--- | ||
title: 'Lesson 5: Overview' | ||
module: 'dagster_dbt' | ||
lesson: '5' | ||
--- | ||
|
||
# Overview | ||
|
||
In Lesson 3, you loaded your dbt project's models as assets into Dagster. You also materialized some of those models. | ||
|
||
In this lesson, we’ll integrate more dbt models with the rest of your Dagster project. You’ll use the existing Dagster assets as sources in your dbt project and learn how to further customize how Dagster maps your dbt project with the `DagsterDbtTranslator` class. To wrap things up, we’ll show you how to automate the running of the dbt models in Dagster. |
147 changes: 147 additions & 0 deletions
...versity/pages/dagster-dbt/lesson-5/2-connecting-dbt-models-to-dagster-assets.md
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
--- | ||
title: 'Lesson 5: Connecting dbt models to Dagster assets' | ||
module: 'dagster_dbt' | ||
lesson: '5' | ||
--- | ||
|
||
# Connecting dbt models to Dagster assets | ||
|
||
When we left off, you may have noticed that the sources for your dbt project are not just tables that exist in DuckDB, but also *assets* that Dagster created. However, the staging models that use those sources aren’t linked to the Dagster assets that produced them. | ||
|
||
Let’s fix that by telling Dagster that the dbt sources are the tables that the `taxi_trips` and `taxi_zones` asset definitions produce. To match up these assets, we’ll override dbt’s asset key with the name `taxi_trips`. By having the asset keys line up, Dagster will know that these assets are the same and should merge them. | ||
|
||
This is accomplished by changing the dbt source’s asset keys to be the same as the matching assets that Dagster makes. In this case, the dbt source’s default asset key is `raw_taxis/trips`, and the table that we’re making with Dagster has an asset key of `taxi_trips`. | ||
|
||
To adjust how Dagster names the asset keys for your project’s dbt models, we’ll need to override the `dagster-dbt` integration’s default logic for how to interpret the dbt project. This mapping is contained in the `DagsterDbtTranslator` class. | ||
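
To make the mismatch concrete, here is a minimal sketch that models asset keys as plain tuples instead of real `AssetKey` objects. The set names and the `linked_keys` helper are hypothetical, and the default key for the `zones` source is assumed to follow the same `raw_taxis/...` pattern as `trips`. The point is only that two assets are merged when their keys are identical:

```python
# Asset keys modeled as tuples of path segments; stand-ins, not dagster.AssetKey.
dagster_asset_keys = {("taxi_trips",), ("taxi_zones",)}

# Default keys for the dbt sources (raw_taxis/trips is from the lesson;
# raw_taxis/zones is assumed to follow the same pattern).
default_source_keys = {("raw_taxis", "trips"), ("raw_taxis", "zones")}

# Keys after prefixing each source's table name with "taxi_".
renamed_source_keys = {(f"taxi_{name}",) for _, name in default_source_keys}


def linked_keys(dagster_keys, dbt_keys):
    """Return the keys present on both sides, i.e. the assets that line up."""
    return dagster_keys & dbt_keys


print(linked_keys(dagster_asset_keys, default_source_keys))  # nothing lines up
print(linked_keys(dagster_asset_keys, renamed_source_keys))  # both sources line up
```

With the default keys nothing overlaps, which is why the staging models appear disconnected from the assets that feed them.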
|
||
--- | ||
|
||
## Customizing how Dagster understands dbt projects | ||
|
||
The `DagsterDbtTranslator` class is the default mapping for how Dagster interprets and maps your dbt project. As Dagster loops through each of your dbt models, it will execute each of the translator’s functions and use the return value to configure your new Dagster asset. | ||
|
||
However, you can override its methods by making a new class that inherits from `DagsterDbtTranslator` and provides your own logic for a dbt model. Refer to the `dagster-dbt` package’s [API Reference](https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.DagsterDbtTranslator) for more info on the different functions you can override in the `DagsterDbtTranslator` class. | ||
|
||
For now, we’ll customize how asset keys are defined by overriding the translator’s `get_asset_key` method. | ||
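
Before touching the real class, the override pattern can be sketched with a dependency-free stand-in. `BaseTranslator` below is a hypothetical substitute for `DagsterDbtTranslator`, its default key logic is an assumption made for illustration, and keys are plain tuples instead of `AssetKey` objects:

```python
class BaseTranslator:
    """Hypothetical stand-in for DagsterDbtTranslator; not the real class."""

    @classmethod
    def get_asset_key(cls, dbt_resource_props):
        # Assumed default: sources are keyed by (source_name, name), models by (name,).
        if dbt_resource_props["resource_type"] == "source":
            return (dbt_resource_props["source_name"], dbt_resource_props["name"])
        return (dbt_resource_props["name"],)


class CustomizedTranslator(BaseTranslator):
    @classmethod
    def get_asset_key(cls, dbt_resource_props):
        # Rename sources only; defer to the parent class for everything else.
        if dbt_resource_props["resource_type"] == "source":
            return (f"taxi_{dbt_resource_props['name']}",)
        return super().get_asset_key(dbt_resource_props)


source = {"resource_type": "source", "source_name": "raw_taxis", "name": "trips"}
model = {"resource_type": "model", "name": "stg_trips"}

print(BaseTranslator.get_asset_key(source))
print(CustomizedTranslator.get_asset_key(source))
print(CustomizedTranslator.get_asset_key(model))
```

Only the source's key changes; the model falls through to the parent implementation, which is the behavior the steps below build up.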
|
||
Open the `assets/dbt.py` file and do the following: | ||
|
||
1. Update the imports to include: | ||
- From the `dagster_dbt` module, import `DagsterDbtTranslator` | ||
- From the `dagster` module, import `AssetKey` | ||
|
||
2. Create a new class called `CustomizedDagsterDbtTranslator` that inherits from the `DagsterDbtTranslator`. Add this code after the imports in `assets/dbt.py`: | ||
|
||
```python | ||
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator): | ||
``` | ||
|
||
3. In this class, create a method called `get_asset_key`. | ||
|
||
This is a method of the `DagsterDbtTranslator` class that we'll override and customize as needed. It is a `@classmethod`, so we'll annotate it with the `@classmethod` decorator and have its first argument be `cls`, to follow [Pythonic conventions](https://builtin.com/software-engineering-perspectives/python-cls). The second argument is a dictionary/JSON object of the dbt resource’s properties, which is based on the manifest file from earlier. Let’s call that second argument `dbt_resource_props`. The return value of this function is an object of the `AssetKey` class. | ||
|
||
```python | ||
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator): | ||
|
||
    @classmethod | ||
    def get_asset_key(cls, dbt_resource_props): | ||
``` | ||
|
||
4. Now, let’s fill in the `get_asset_key` method with our own logic for defining asset keys. | ||
|
||
1. There are two properties that we’ll want from `dbt_resource_props`: the `resource_type` (e.g., model or source) and the `name`, such as `trips` or `stg_trips`. Access both of those properties from the `dbt_resource_props` argument and store them in their own respective variables (`resource_type` and `name`): | ||
|
||
```python | ||
@classmethod | ||
def get_asset_key(cls, dbt_resource_props): | ||
    resource_type = dbt_resource_props["resource_type"] | ||
    name = dbt_resource_props["name"] | ||
``` | ||
|
||
2. As mentioned above, the asset keys of our existing Dagster assets used by our dbt project are named `taxi_trips` and `taxi_zones`. If you were to print out the `name`, you’d see that the dbt sources are named `trips` and `zones`. Therefore, to match our asset keys up, we can prefix our keys with the string `taxi_`. | ||
|
||
Copy and paste the following code to return an `AssetKey` of `AssetKey(f"taxi_{name}")`: | ||
|
||
```python | ||
@classmethod | ||
def get_asset_key(cls, dbt_resource_props): | ||
    resource_type = dbt_resource_props["resource_type"] | ||
    name = dbt_resource_props["name"] | ||
|
||
    return AssetKey(f"taxi_{name}") | ||
``` | ||
|
||
3. You have full control over how each asset can be named, as you can define how asset keys are created. In our case we only want to rename the dbt sources, but we can keep the asset keys of the models the same. | ||
|
||
The object-oriented pattern of the `DagsterDbtTranslator` means that we can reuse the parent class’s existing implementation by calling `super()`. We’ll use this pattern to customize how the sources are defined but default to the original logic for deciding the model asset keys. Copy and paste the code below to complete the `get_asset_key` method: | ||
|
||
```python | ||
@classmethod | ||
def get_asset_key(cls, dbt_resource_props): | ||
    resource_type = dbt_resource_props["resource_type"] | ||
    name = dbt_resource_props["name"] | ||
    if resource_type == "source": | ||
        return AssetKey(f"taxi_{name}") | ||
    else: | ||
        return super().get_asset_key(dbt_resource_props) | ||
``` | ||
|
||
You’ve successfully written your first translator! | ||
|
||
{% callout %} | ||
> 💡 **Important!** dbt models and Dagster asset keys must be unique. If you're receiving a `DuplicateKeyError`, add some logging to verify that the logic in `get_asset_key` doesn't return the same key for two different resources! | ||
{% /callout %} | ||
|
||
5. Now, update the definition that uses `@dbt_assets` to be configured with an instance of the `CustomizedDagsterDbtTranslator`. The `@dbt_assets` decorator has a `dagster_dbt_translator` argument that you can pass this instance into. **Don’t forget to instantiate the class!** | ||
|
||
Your code should look something like this: | ||
|
||
```python | ||
@dbt_assets( | ||
    manifest=dbt_manifest_path, | ||
    dagster_dbt_translator=CustomizedDagsterDbtTranslator() | ||
) | ||
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource): | ||
    yield from dbt.cli(["build"], context=context).stream() | ||
``` | ||
|
||
At this point, your `dbt.py` file should match the following: | ||
|
||
```python | ||
import os | ||
from dagster import AssetExecutionContext, AssetKey | ||
from dagster_dbt import dbt_assets, DbtCliResource, DagsterDbtTranslator | ||
|
||
from .constants import DBT_DIRECTORY | ||
from ..resources import dbt_resource | ||
|
||
|
||
class CustomizedDagsterDbtTranslator(DagsterDbtTranslator): | ||
    @classmethod | ||
    def get_asset_key(cls, dbt_resource_props): | ||
        resource_type = dbt_resource_props["resource_type"] | ||
        name = dbt_resource_props["name"] | ||
        if resource_type == "source": | ||
            return AssetKey(f"taxi_{name}") | ||
        else: | ||
            return super().get_asset_key(dbt_resource_props) | ||
|
||
|
||
dbt_resource.cli(["--quiet", "parse"]).wait() | ||
|
||
if os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD"): | ||
    dbt_manifest_path = ( | ||
        dbt_resource.cli(["--quiet", "parse"]) | ||
        .wait() | ||
        .target_path.joinpath("manifest.json") | ||
    ) | ||
else: | ||
    dbt_manifest_path = DBT_DIRECTORY.joinpath("target", "manifest.json") | ||
|
||
|
||
@dbt_assets( | ||
    manifest=dbt_manifest_path, dagster_dbt_translator=CustomizedDagsterDbtTranslator() | ||
) | ||
def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource): | ||
    yield from dbt.cli(["build"], context=context).stream() | ||
``` |
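
If a `DuplicateKeyError` does come up, one way to narrow it down is to run the key logic over the resource properties and count the results. The sketch below is a hypothetical, dependency-free debugging helper: `find_duplicate_keys`, both key functions, and the sample resources are made up for illustration, and keys are plain strings instead of `AssetKey` objects.

```python
from collections import Counter


def find_duplicate_keys(resources, key_fn):
    """Return, in sorted order, every key that key_fn produces more than once."""
    counts = Counter(key_fn(props) for props in resources)
    return sorted(key for key, n in counts.items() if n > 1)


def prefix_everything(props):
    # Naive logic: prefixes models as well as sources.
    return f"taxi_{props['name']}"


def prefix_sources_only(props):
    # Same branching as the translator above: only sources are renamed.
    if props["resource_type"] == "source":
        return f"taxi_{props['name']}"
    return props["name"]


# Made-up resources; the last entry is a hypothetical model named like the source.
resources = [
    {"resource_type": "source", "name": "trips"},
    {"resource_type": "model", "name": "stg_trips"},
    {"resource_type": "model", "name": "trips"},
]

print(find_duplicate_keys(resources, prefix_everything))
print(find_duplicate_keys(resources, prefix_sources_only))
```

The naive function maps both the source and the like-named model to `taxi_trips`, while branching on `resource_type` keeps every key distinct.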
155 changes: 155 additions & 0 deletions
...rsity/pages/dagster-dbt/lesson-5/3-creating-assets-that-depend-on-dbt-models.md
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
--- | ||
title: 'Lesson 5: Creating assets that depend on dbt models' | ||
module: 'dagster_dbt' | ||
lesson: '5' | ||
--- | ||
|
||
# Creating assets that depend on dbt models | ||
|
||
At this point, you’ve loaded your dbt models as Dagster assets and linked the dependencies between the dbt assets and their source Dagster assets. However, a dbt model is typically not the last asset in a pipeline. For example, you might want to: | ||
|
||
- Generate a chart, | ||
- Update a dashboard, or | ||
- Send data to Salesforce | ||
|
||
In this section, you’ll learn how to do this by defining a new Dagster asset that depends on a dbt model. We’ll make some metrics in a dbt model and then use Python to generate a chart with that data. | ||
|
||
If you’re familiar with New York City, you might know that there are three major airports - JFK, LGA, and EWR - in different parts of the metropolitan area. Suppose you’re curious whether a traveler’s final destination affects which airport they fly into. For example, how many people staying in Queens flew into LGA? | ||
|
||
--- | ||
|
||
## Creating the dbt model | ||
|
||
To answer these questions, let’s define a new dbt model that builds a series of metrics from the staging models you wrote earlier. | ||
|
||
In the `analytics/models` directory: | ||
|
||
1. Create a new directory called `marts`. | ||
2. In the `marts` directory, create a new file called `location_metrics.sql`. | ||
3. Copy and paste the following into `location_metrics.sql`: | ||
|
||
```sql | ||
with | ||
    trips as ( | ||
        select * | ||
        from {{ ref('stg_trips') }} | ||
    ), | ||
    zones as ( | ||
        select * | ||
        from {{ ref('stg_zones') }} | ||
    ), | ||
    trips_by_zone as ( | ||
        select | ||
            pickup_zones.zone_name as zone, | ||
            dropoff_zones.borough as destination_borough, | ||
            pickup_zones.is_airport as from_airport, | ||
            count(*) as trips, | ||
            sum(trips.trip_distance) as total_distance, | ||
            sum(trips.duration) as total_duration, | ||
            sum(trips.total_amount) as fare, | ||
            sum(case when duration > 30 then 1 else 0 end) as trips_over_30_min | ||
        from trips | ||
        left join zones as pickup_zones on trips.pickup_zone_id = pickup_zones.zone_id | ||
        left join zones as dropoff_zones on trips.dropoff_zone_id = dropoff_zones.zone_id | ||
        group by all | ||
    ) | ||
select * | ||
from trips_by_zone | ||
``` | ||
|
||
4. In the Dagster UI, reload the code location. | ||
5. Observe and materialize the new `location_metrics` dbt asset: | ||
|
||
![The new location_metrics dbt asset in the Dagster UI](/images/dagster-dbt/lesson-5/new-location-metrics-asset.png) | ||
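
To see what the double join on `zones` in `location_metrics.sql` produces, here is a small stand-alone sketch. It uses Python's built-in `sqlite3` instead of DuckDB, so `group by all` is spelled out as an explicit column list and the count is aliased as `trip_count`. The table contents are made up, and only the columns needed for the count are included:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table zones (zone_id integer, zone_name text, borough text, is_airport integer);
    create table trips (pickup_zone_id integer, dropoff_zone_id integer);
    -- Made-up sample data: one airport zone and two non-airport zones.
    insert into zones values
        (1, 'JFK Airport', 'Queens', 1),
        (2, 'Astoria', 'Queens', 0),
        (3, 'Midtown', 'Manhattan', 0);
    insert into trips values (1, 2), (1, 3), (1, 3), (2, 3);
    """
)

# zones is joined twice: once for the pickup side and once for the dropoff side.
rows = conn.execute(
    """
    select
        pickup_zones.zone_name as zone,
        dropoff_zones.borough as destination_borough,
        pickup_zones.is_airport as from_airport,
        count(*) as trip_count
    from trips
    left join zones as pickup_zones on trips.pickup_zone_id = pickup_zones.zone_id
    left join zones as dropoff_zones on trips.dropoff_zone_id = dropoff_zones.zone_id
    group by pickup_zones.zone_name, dropoff_zones.borough, pickup_zones.is_airport
    order by zone, destination_borough
    """
).fetchall()

for row in rows:
    print(row)
```

Each output row is one (pickup zone, destination borough) pair, which is the grain the chart asset later filters with `where from_airport`.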
|
||
--- | ||
|
||
## Creating the Dagster asset | ||
|
||
Next, we’ll create an asset that uses some of the columns in the `location_metrics` model to chart the number of taxi trips that start at each major NYC airport, broken down by the borough where they end. | ||
|
||
### Adding a new constant | ||
|
||
Let's start by adding a new string constant to reference when building the new asset. This will make it easier for us to reference the correct location of the chart in the asset. | ||
|
||
In the `assets/constants.py` file, add the following to the end of the file: | ||
|
||
```python | ||
AIRPORT_TRIPS_FILE_PATH = Path(__file__).joinpath("..", "..", "outputs", "airport_trips.png").resolve() | ||
``` | ||
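
The two `".."` segments can look like one too many, so here is a small sketch of how the path collapses. The project location `/project/dagster_university` is a made-up example, and `posixpath.normpath` is used as a purely lexical stand-in for `.resolve()`, which additionally follows symlinks on a real filesystem:

```python
import posixpath
from pathlib import PurePosixPath

# Hypothetical location of constants.py, for illustration only.
constants_file = PurePosixPath("/project/dagster_university/assets/constants.py")

# Same joinpath arguments as AIRPORT_TRIPS_FILE_PATH.
raw = constants_file.joinpath("..", "..", "outputs", "airport_trips.png")

# The first ".." cancels "constants.py" itself; the second steps out of "assets".
normalized = posixpath.normpath(str(raw))

print(raw)
print(normalized)
```

Because the path starts from the file instead of its directory, the chart ends up in an `outputs` directory that sits next to `assets`.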
|
||
### Creating the airport_trips asset | ||
|
||
Now we’re ready to create the asset! | ||
|
||
1. Open the `assets/metrics.py` file. | ||
2. At the end of the file, define a new asset called `airport_trips` with the context argument and the existing `DuckDBResource` named `database`: | ||
|
||
```python | ||
def airport_trips(context, database: DuckDBResource): | ||
``` | ||
|
||
3. Add the asset decorator to the `airport_trips` function and specify the `location_metrics` model as a dependency: | ||
|
||
```python | ||
@asset( | ||
    deps=["location_metrics"], | ||
) | ||
def airport_trips(context, database: DuckDBResource): | ||
``` | ||
|
||
**Note:** Because Dagster doesn’t discriminate and treats all dbt models as assets, you’ll add this dependency just like you would with any other asset. | ||
|
||
4. Fill in the body of the function with the following code to follow a similar pattern to your project’s existing pipelines: query for the data, use a library to generate a chart, save the chart as a file, and embed the chart: | ||
|
||
```python | ||
@asset( | ||
    deps=["location_metrics"], | ||
) | ||
def airport_trips(context, database: DuckDBResource): | ||
    """ | ||
    A chart of where trips from the airport go | ||
    """ | ||
|
||
    query = """ | ||
        select | ||
            zone, | ||
            destination_borough, | ||
            trips | ||
        from location_metrics | ||
        where from_airport | ||
    """ | ||
|
||
    with database.get_connection() as conn: | ||
        airport_trips = conn.execute(query).fetch_df() | ||
|
||
    fig = px.bar( | ||
        airport_trips, | ||
        x="zone", | ||
        y="trips", | ||
        color="destination_borough", | ||
        barmode="relative", | ||
        labels={ | ||
            "zone": "Zone", | ||
            "trips": "Number of Trips", | ||
            "destination_borough": "Destination Borough" | ||
        }, | ||
    ) | ||
|
||
    pio.write_image(fig, constants.AIRPORT_TRIPS_FILE_PATH) | ||
|
||
    with open(constants.AIRPORT_TRIPS_FILE_PATH, 'rb') as file: | ||
        image_data = file.read() | ||
|
||
    # Convert the image data to base64 (the chart is saved as a PNG) | ||
    base64_data = base64.b64encode(image_data).decode('utf-8') | ||
    md_content = f"![Image](data:image/png;base64,{base64_data})" | ||
|
||
    #TODO: Use `MaterializeResult` instead | ||
    context.add_output_metadata({ | ||
        "preview": MetadataValue.md(md_content), | ||
        "data": MetadataValue.json(airport_trips.to_dict(orient="records")) | ||
    }) | ||
``` | ||
|
||
5. Reload your code location to see the new `airport_trips` asset within the `metrics` group. Notice how the asset graph links the dependency between the `location_metrics` dbt asset and the new `airport_trips` chart asset. |
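
The chart preview works by embedding the image bytes directly in a Markdown image tag as a base64 data URI. A minimal, dependency-free sketch of that step follows; `image_to_markdown` is a hypothetical helper name, and the MIME type is assumed to be PNG to match the `.png` file the asset writes:

```python
import base64


def image_to_markdown(image_bytes: bytes, mime_type: str = "image/png") -> str:
    """Wrap raw image bytes in a Markdown image tag that uses a base64 data URI."""
    encoded = base64.b64encode(image_bytes).decode("utf-8")
    return f"![Image](data:{mime_type};base64,{encoded})"


# The 8-byte PNG file signature stands in for a real chart file here.
png_signature = b"\x89PNG\r\n\x1a\n"
md_content = image_to_markdown(png_signature)

print(md_content)
```

In the asset, the resulting string is what gets passed to `MetadataValue.md` so the UI can render the chart inline.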
Do we also need a reference to Lesson 4 here?