[daggy-u] update usage of context.add_output_metadata with `MaterializeResult`

cmpadden committed Feb 2, 2024
1 parent 059e4d7 commit e26ae9e
Showing 3 changed files with 62 additions and 49 deletions.

@@ -93,31 +93,27 @@ In Lesson 9, you created the `adhoc_request` asset. During materialization, the

```python
import base64
from dagster import MaterializeResult
```

4. After the last line in the asset, add the following code:

```python
with open(file_path, 'rb') as file:
    image_data = file.read()
```

5. Next, we’ll use base64 encoding to embed the chart in Markdown. After the `image_data` line, add the following code:

```python
base64_data = base64.b64encode(image_data).decode('utf-8')
md_content = f"![Image](data:image/jpeg;base64,{base64_data})"

return MaterializeResult(
    metadata={
        "preview": MetadataValue.md(md_content)
    }
)
```

Let’s break down what’s happening here:
@@ -126,13 +122,13 @@ In Lesson 9, you created the `adhoc_request` asset. During materialization, the
2. `base64.b64encode` encodes the image’s binary data (`image_data`) into base64 format.
3. The encoded bytes are then converted to a Python string with the `decode('utf-8')` method.
4. Next, a variable named `md_content` is created. Its value is a Markdown image whose source is a `data:` URI that declares a JPEG image and carries the base64-encoded data.
5. Returning a `MaterializeResult` instance whose `metadata` dictionary contains the image attaches it to the asset’s materialization. The metadata will have a `preview` label in the Dagster UI.
6. Using `MetadataValue.md`, the `md_content` is typed as Markdown. This ensures Dagster will correctly render the chart.
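The encoding steps above can be reproduced with the Python standard library alone. The sketch below wraps them in a hypothetical helper, `image_file_to_markdown`, which is not a Dagster API; it assumes the chart has already been written to disk, and the placeholder bytes stand in for a real image. Inside the asset, the returned string is what gets wrapped in `MetadataValue.md`.

```python
import base64
import os
import tempfile


def image_file_to_markdown(file_path: str, mime_type: str = "image/jpeg") -> str:
    """Return a Markdown image tag that inlines the file at file_path.

    Hypothetical helper mirroring the `adhoc_request` steps; not part of Dagster.
    """
    # Read the raw image bytes in binary mode, as the asset does
    with open(file_path, "rb") as file:
        image_data = file.read()

    # base64-encode the bytes, then decode the ASCII result into a str
    base64_data = base64.b64encode(image_data).decode("utf-8")

    # Embed the encoded data as a data URI inside a Markdown image
    return f"![Image](data:{mime_type};base64,{base64_data})"


# Usage: write placeholder bytes to a temporary file and convert them
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
    tmp.write(b"abc")  # stand-in for real image bytes
md_content = image_file_to_markdown(tmp.name)
os.remove(tmp.name)
print(md_content)  # ![Image](data:image/jpeg;base64,YWJj)
```

The `mime_type` parameter is an addition for illustration; the asset itself always declares `image/jpeg` in the data URI.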

At this point, the code for the `adhoc_request` asset should look like this:

```python
from dagster import Config, asset, MaterializeResult, MetadataValue, get_dagster_logger
from dagster_duckdb import DuckDBResource

import plotly.express as px
@@ -148,7 +144,7 @@ class AdhocRequestConfig(Config):
    end_date: str

@asset
def adhoc_request(config: AdhocRequestConfig, taxi_zones, taxi_trips, database: DuckDBResource):
    """
    The response to a request made in the `requests` directory.
    See `requests/README.md` for more information.
@@ -210,9 +206,11 @@ def adhoc_request(context**,** config: AdhocRequestConfig, taxi_zones, taxi_trip
    base64_data = base64.b64encode(image_data).decode('utf-8')
    md_content = f"![Image](data:image/jpeg;base64,{base64_data})"

    return MaterializeResult(
        metadata={
            "preview": MetadataValue.md(md_content)
        }
    )
```

---

@@ -18,7 +18,7 @@ The metadata you built should look similar to the code contained in the **View a
@asset(
    group_name="raw_files",
)
def taxi_zones_file():
    """
    The raw CSV file for the taxi zones dataset. Sourced from the NYC Open Data portal.
    """
@@ -29,5 +29,10 @@ def taxi_zones_file(context):
    with open(constants.TAXI_ZONES_FILE_PATH, "wb") as output_file:
        output_file.write(raw_taxi_zones.content)
    num_rows = len(pd.read_csv(constants.TAXI_ZONES_FILE_PATH))

    return MaterializeResult(
        metadata={
            'Number of records': MetadataValue.int(num_rows)
        }
    )
```
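The record count that `len(pd.read_csv(...))` reports is the number of data rows; pandas uses the header row for column names rather than counting it. The sketch below reproduces that count with the standard-library `csv` module instead of pandas (a swapped-in technique, not what the asset uses). The sample rows and the `count_csv_records` helper are hypothetical and do not reflect the real taxi zones file.

```python
import csv
import io

# Hypothetical sample CSV; not the real taxi zones dataset
SAMPLE_CSV = """LocationID,zone,borough
1,Newark Airport,EWR
2,Jamaica Bay,Queens
3,Allerton/Pelham Gardens,Bronx
"""


def count_csv_records(text: str) -> int:
    """Count data rows in CSV text, excluding the header row."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header, which pandas treats as column names
    return sum(1 for _ in reader)


num_rows = count_csv_records(SAMPLE_CSV)
print(num_rows)  # 3
```

The result is a plain `int`, which is the kind of value `MetadataValue.int` expects to wrap.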

@@ -14,8 +14,8 @@ Now that we’ve covered definition metadata, let’s dive into the other type o

To add metadata to an asset, you need to do two things:

- Use the `MetadataValue` utility class to wrap the data, ensuring it displays correctly in the UI
- Return a `MaterializeResult` instance from your asset, passing the wrapped data through its `metadata` parameter

Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This will add the count of records to the asset’s materialization metadata.

@@ -34,19 +34,19 @@ Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This
    group_name="raw_files",
)
def taxi_trips_file(context):
    """
    The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
    """

    partition_date_str = context.asset_partition_key_for_output()
    month_to_fetch = partition_date_str[:-3]

    raw_trips = requests.get(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )

    with open(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb") as output_file:
        output_file.write(raw_trips.content)
```

3. First, we need to calculate the number of records contained in the file. Copy and paste the following after the last line in the asset:
@@ -55,15 +55,19 @@ Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This
num_rows = len(pd.read_parquet(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch)))
```

4. Next, we’ll return the row count as integer-typed metadata:

```python
return MaterializeResult(
    metadata={
        'Number of records': MetadataValue.int(num_rows)
    }
)
```

Let’s break down what’s happening here:

- The `metadata` parameter accepts a `dict`, where each key is the label of a metadata entry and each value is the data itself. In this case, the key is `Number of records` and the value is `MetadataValue.int(num_rows)`.
- Using `MetadataValue.int`, the value of the `num_rows` variable is typed as an integer. This tells Dagster to render the data as an integer.

At this point, the asset should look like this:
@@ -73,25 +77,31 @@ Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This
from dagster import asset, MaterializeResult, MetadataValue

@asset(
    partitions_def=monthly_partition,
    group_name="raw_files",
)
def taxi_trips_file(context):
    """
    The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
    """

    partition_date_str = context.asset_partition_key_for_output()
    month_to_fetch = partition_date_str[:-3]

    raw_trips = requests.get(
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )

    with open(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch), "wb") as output_file:
        output_file.write(raw_trips.content)

    num_rows = len(pd.read_parquet(constants.TAXI_TRIPS_TEMPLATE_FILE_PATH.format(month_to_fetch)))

    return MaterializeResult(
        metadata={
            'Number of records': MetadataValue.int(num_rows)
        }
    )
```
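The `partition_date_str[:-3]` slice relies on the monthly partition keys being formatted as `YYYY-MM-DD`, so dropping the last three characters leaves the `YYYY-MM` segment used in the download URL. The standard-library sketch below illustrates that step with hypothetical helper names; the strict variant is an alternative, not what the asset does, and raises `ValueError` instead of silently producing a wrong month if a key arrives in another format.

```python
from datetime import date, datetime

# URL template copied from the asset above
TRIPS_URL_TEMPLATE = (
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{}.parquet"
)


def month_from_partition_key(partition_key: str) -> str:
    """Turn a 'YYYY-MM-DD' key into 'YYYY-MM' by dropping the '-DD' suffix."""
    return partition_key[:-3]


def month_from_partition_key_strict(partition_key: str) -> str:
    """Same result, but raises ValueError if the key is not a YYYY-MM-DD date."""
    return datetime.strptime(partition_key, "%Y-%m-%d").strftime("%Y-%m")


partition_key = date(2023, 3, 1).isoformat()  # '2023-03-01'
month_to_fetch = month_from_partition_key(partition_key)
url = TRIPS_URL_TEMPLATE.format(month_to_fetch)
print(url)
```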

---