Skip to content

Commit

Permalink
[daggy u] Allow dbt/jinja template in code block
Browse files Browse the repository at this point in the history
  • Loading branch information
hellendag committed Feb 29, 2024
1 parent 015a47e commit 2ee0007
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 73 deletions.
8 changes: 7 additions & 1 deletion docs/dagster-university/markdoc/nodes/fence.markdoc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {nodes} from '@markdoc/markdoc';
import {Tag, nodes} from '@markdoc/markdoc';
import {CodeBlock} from '../../components';

export const fence = {
Expand All @@ -9,4 +9,10 @@ export const fence = {
},
...nodes.fence.attributes,
},
transform(node, config) {
const attributes = node.transformAttributes(config);
const {language} = node.children[0].attributes;
const content = node.attributes.content;
return new Tag(this.render, {...attributes, language}, [content]);
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -20,88 +20,88 @@ Let’s start by defining a new daily partition for the model.

In `dagster_university/partitions/init.py`, make the following changes:

1. import `DailyPartitionsDefinition` from `dagster`, and
1. import `DailyPartitionsDefinition` from `dagster`, and
2. Define a new `daily_partition` like the following:
```python
from dagster import MonthlyPartitionsDefinition, WeeklyPartitionsDefinition, DailyPartitionsDefinition
# ...existing partitions here
daily_partition = DailyPartitionsDefinition(
start_date=start_date,
end_date=end_date
)
```

```python
from dagster import MonthlyPartitionsDefinition, WeeklyPartitionsDefinition, DailyPartitionsDefinition

# ...existing partitions here

daily_partition = DailyPartitionsDefinition(
start_date=start_date,
end_date=end_date
)
```

---

## Defining an incremental selector

We have a few changes to make to our dbt setup to get things working. In `dagster_university/assets/dbt.py`:
We have a few changes to make to our dbt setup to get things working. In `dagster_university/assets/dbt.py`:

1. Add the following imports to the top of the file:
```python
from ..partitions import daily_partition
import json
```
This imports the new `daily_partition` and the `json` standard module. We’ll use the `json` module to format how we tell dbt what partition to materialize.

```python
from ..partitions import daily_partition
import json
```

This imports the new `daily_partition` and the `json` standard module. We’ll use the `json` module to format how we tell dbt what partition to materialize.

2. We now need a way to indicate that we’re selecting or excluding incremental models, so we’ll make a new constant in the `dbt.py` file called `INCREMENTAL_SELECTOR:`
```python
INCREMENTAL_SELECTOR = "config.materialized:incremental"
```
This string follows dbt’s selection syntax to select all incremental models. In your own projects, you can customize this to select only the specific incremental models that you want to partition.

```python
INCREMENTAL_SELECTOR = "config.materialized:incremental"
```

This string follows dbt’s selection syntax to select all incremental models. In your own projects, you can customize this to select only the specific incremental models that you want to partition.

---

## Creating a new @dbt_assets function

Previously, we used the `@dbt_assets` decorator to say *“this function produces assets based on this dbt project”*. Now, we also want to say *“this function produces partitioned assets based on a selected set of models from this dbt project.”* We’ll write an additional `@dbt_assets` -decorated function to express this.
Previously, we used the `@dbt_assets` decorator to say _“this function produces assets based on this dbt project”_. Now, we also want to say _“this function produces partitioned assets based on a selected set of models from this dbt project.”_ We’ll write an additional `@dbt_assets` -decorated function to express this.

1. In `dagster_university/assets/dbt.py`, define another `@dbt_assets` function below the original one. Name it `dbt_incremental_models` and have it use the same manifest that we’ve been using:
```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def incremental_dbt_models(
context: AssetExecutionContext,
dbt: DbtCliResource
):
yield from dbt.cli(["build"], context=context).stream()
```

```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator()
)
def incremental_dbt_models(
context: AssetExecutionContext,
dbt: DbtCliResource
):
yield from dbt.cli(["build"], context=context).stream()
```

2. Next, add arguments to specify which models to select (`select`) and what partition (`partitions_def`) to use:
```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
select=INCREMENTAL_SELECTOR, # select only models with INCREMENTAL_SELECTOR
partitions_def=daily_partition # partition those models using daily_partition
)
def incremental_dbt_models(
context: AssetExecutionContext,
dbt: DbtCliResource
):
yield from dbt.cli(["build"], context=context).stream()
```
This tells the function to only select models with `INCREMENTAL_SELECTOR` and to partition them using the `daily_partition.`

```python
@dbt_assets(
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
select=INCREMENTAL_SELECTOR, # select only models with INCREMENTAL_SELECTOR
partitions_def=daily_partition # partition those models using daily_partition
)
def incremental_dbt_models(
context: AssetExecutionContext,
dbt: DbtCliResource
):
yield from dbt.cli(["build"], context=context).stream()
```

This tells the function to only select models with `INCREMENTAL_SELECTOR` and to partition them using the `daily_partition.`

---

## Partitioning the incremental_dbt_models function

Now that the `@dbt_assets` definition has been created, it's time to fill in its body. We’ll start by using the `context` argument, which contains metadata about the Dagster run.
Now that the `@dbt_assets` definition has been created, it's time to fill in its body. We’ll start by using the `context` argument, which contains metadata about the Dagster run.

One of these pieces of information is that we can fetch *the partition this execution is trying to materialize*! In our case, since it’s a time-based partition, we can get the *time window* of the partitions we’re materializing, such as `2023-03-04T00:00:00+00:00`to `2023-03-05T00:00:00+00:00`.
One of these pieces of information is that we can fetch _the partition this execution is trying to materialize_! In our case, since it’s a time-based partition, we can get the _time window_ of the partitions we’re materializing, such as `2023-03-04T00:00:00+00:00`to `2023-03-05T00:00:00+00:00`.

First, add the following to the `@dbt_assets` function body, before the `yield`:

Expand All @@ -113,10 +113,10 @@ dbt_vars = {
}
```

This fetches the time window and stores it as a variable (`time_window` ) so we can use it later.
This fetches the time window and stores it as a variable (`time_window` ) so we can use it later.

Now that we know *what* partitions we’re executing, the next step is to tell dbt the partition currently being materialized. To do that, we’ll take advantage of dbt’s `vars` argument to pass this information at runtime.
Because the `dbt.cli` function has the same capabilities as the `dbt` CLI, we can dynamically set the arguments we pass into it. To communicate this time window, we’ll pass in a `min_date` and `max_date` variable. Update the `yield` in the `@dbt_assets` definition to the following:
Now that we know _what_ partitions we’re executing, the next step is to tell dbt the partition currently being materialized. To do that, we’ll take advantage of dbt’s `vars` argument to pass this information at runtime.
Because the `dbt.cli` function has the same capabilities as the `dbt` CLI, we can dynamically set the arguments we pass into it. To communicate this time window, we’ll pass in a `min_date` and `max_date` variable. Update the `yield` in the `@dbt_assets` definition to the following:

```python
yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
Expand All @@ -126,7 +126,7 @@ yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).s

## Updating the dbt_analytics function

Now that you have a dedicated `@dbt_assets` definition for the incremental models, you’ll need to *exclude* these models from your original dbt execution.
Now that you have a dedicated `@dbt_assets` definition for the incremental models, you’ll need to _exclude_ these models from your original dbt execution.

Modify the `dbt_analytics` definition to exclude the `INCREMENTAL_SELECTOR`:

Expand Down Expand Up @@ -154,11 +154,11 @@ Finally, we’ll modify the `daily_metrics.sql` file to reflect that dbt knows w

In `analytics/models/marts/daily_metrics.sql`, update the model's incremental logic to the following:

```sql
`{% if is_incremental() %}`
```python
where date_of_business >= strptime('{{ var('min_date') }}', '%c') and date_of_business < strptime('{{ var('max_date') }}', '%c')
```
`{% endif %}`
```

**TODO: AWARE THIS IS BROKEN - NEEDS A MARKDOC FIX**

Expand All @@ -168,12 +168,11 @@ Here, we’ve changed the logic to say that we only want to select rows between

## Running the pipeline

That’s it! Now you can check out the new `daily_metrics` asset in Dagster.
That’s it! Now you can check out the new `daily_metrics` asset in Dagster.

1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset.
2. Click the `daily_metrics` asset and then the **Materialize selected** button. You’ll be prompted to select some partitions first.
1. In the Dagster UI, reload the code location. Once loaded, you should see the new partitioned `daily_metrics` asset.
2. Click the `daily_metrics` asset and then the **Materialize selected** button. You’ll be prompted to select some partitions first.
3. Once the run starts, navigate to the run’s details page to check out the event logs. The executed dbt command should look something like this:

```bash
dbt build --vars {"min_date": "2023-03-04T00:00:00+00:00", "max_date": "2023-03-05T00:00:00+00:00"} --select config.materialized:incremental
```
```
1 change: 1 addition & 0 deletions docs/dagster-university/styles/globals.css
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ main {
margin: 0 auto;
flex-grow: 1;
font-size: 16px;
padding-bottom: 32px;
}

.codeBlock code[class*='language-'],
Expand Down

0 comments on commit 2ee0007

Please sign in to comment.