[docs][pipes] - Modifying existing code page
Showing 20 changed files with 366 additions and 6 deletions.
264 changes: 263 additions & 1 deletion
264
...ent/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes.mdx
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,268 @@ | ||
--- | ||
title: "Modifying existing code to work with Dagster Pipes | Dagster Docs" | ||
description: "" | ||
description: "With Dagster Pipes, you can incorporate existing code into Dagster without huge refactors. This guide shows you how to modify existing code to work with Dagster Pipes." | ||
--- | ||
|
||
# Modifying existing code to work with Dagster Pipes | ||
|
||
In this guide, we’ll demonstrate how to modify existing code (e.g. an existing Python script) to work with [Dagster Pipes](/guides/dagster-pipes) and send metadata from the script to Dagster. | ||
|
||
<Note> | ||
|
||
This guide is specific to modifying existing code that is being orchestrated by Dagster. Looking for other uses of Dagster Pipes? Refer to the <a href="/guides/dagster-pipes#usage">Dagster Pipes Usage</a> documentation. | ||
|
||
</Note> | ||
|
||
## Prerequisites | ||
|
||
<Note> | ||
|
||
This guide assumes your code is being orchestrated by Dagster. If that’s not the case, please refer to <a href="integrating-subprocess-with-dagster-pipes">Integrating Dagster Pipes with Subprocess</a> for how to orchestrate your existing Python scripts using Dagster. | ||
|
||
</Note> | ||
|
||
To get started, you’ll need: | ||
|
||
- To install the dagster-pipes library: | ||
|
||
```bash | ||
pip install dagster-pipes | ||
``` | ||
|
||
- **An existing Python script.** We’ll use the following Python script to demonstrate: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_original.py | ||
import pandas as pd | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    print(f"processing total {total_orders} orders") | ||
|
||
|
||
if __name__ == "__main__": | ||
    main() | ||
``` | ||
|
||
--- | ||
|
||
## About Dagster | ||
|
||
### Materialize Dagster assets | ||
|
||
In Dagster, data assets - like a database table - are created during a process called _materialization._ During materialization, Dagster runs the function in the asset’s body to create the asset. In this case, the ‘function’ is your existing code. | ||
|
||
Dagster Pipes automatically makes some pieces of data about assets available, such as those about materialization or data quality checks. You can also send your own structured metadata and execution logs back to Dagster. We’ll come back to this in later sections. | ||
|
||
When your existing code is orchestrated by Dagster, you can run it from the Dagster UI. Navigate to the corresponding asset page and click **Materialize** to run your code. Refer to [Viewing and materializing assets in the UI](/concepts/assets/software-defined-assets#viewing-and-materializing-assets-in-the-ui) for more info. | ||
|
||
<Image | ||
alt="Materialize Dagster assets" | ||
src="/images/guides/dagster-pipes/modifying-code-1-materialize-dagster-assets.png" | ||
width={1328} | ||
height={836} | ||
/> | ||
|
||
### Monitor code execution in Dagster UI | ||
|
||
To monitor the run, navigate to the [Run details page](/concepts/webserver/ui#run-details). This page includes timing, raw compute logs, and structured events. The bottom section shows filterable logs from the run, detailing events, their types, and specifics. There are two types of logs: structured event logs and raw compute logs. | ||
|
||
#### Structured event logs | ||
|
||
Structured logs are enriched and categorized with metadata, such as a label describing what the log entry is about, links to an asset’s metadata, and the event type. This structure not only makes the logs easier to filter and search, but also populates the asset catalog and status information in the asset graph. We’ll come back to how you can log metadata from your existing code to Dagster in a little bit. | ||
|
||
<Image | ||
alt="Structured event logs" | ||
src="/images/guides/dagster-pipes/modifying-code-2-structured-event-logs.png" | ||
width={1524} | ||
height={844} | ||
/> | ||
|
||
#### Raw compute logs | ||
|
||
If your existing code produces stdout or stderr logs, like a `print` statement in our sample Python script, Dagster will display these in the UI's raw compute log view. To see the `stdout` log, toggle the log section to **stdout**. | ||
|
||
<Image | ||
alt="Raw compute logs" | ||
src="/images/guides/dagster-pipes/modifying-code-3-raw-compute-logs.png" | ||
width={1498} | ||
height={854} | ||
/> | ||
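As a rough illustration of why `print` output lands in the raw compute log, the sketch below uses only the Python standard library (no Dagster involved) to run a child process and capture its `stdout` and `stderr` separately. The child script and its messages are hypothetical stand-ins for existing code.

```python
import subprocess
import sys

# Hypothetical stand-in for an existing script: one line to stdout, one to stderr.
CHILD_CODE = (
    "import sys\n"
    "print('processing total 2 orders')\n"
    "print('something looks off', file=sys.stderr)\n"
)


def run_child() -> subprocess.CompletedProcess:
    """Run the child script and capture its two output streams separately."""
    return subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        capture_output=True,
        text=True,
        check=True,
    )


result = run_child()
print("stdout:", result.stdout.strip())
print("stderr:", result.stderr.strip())
```

The two streams stay separate end to end, which is why the UI offers a toggle between them.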
|
||
--- | ||
|
||
## Stream information back to Dagster | ||
|
||
### Make Dagster context available in existing code | ||
|
||
Getting existing code to work with Dagster Pipes requires adding a few lines of code: | ||
|
||
- Imports from `dagster-pipes` | ||
|
||
- A call that connects to Dagster Pipes: <PyObject module="dagster_pipes" object="open_dagster_pipes"/> initializes the Dagster Pipes context that can be used to stream information back to Dagster. It is recommended to call this function near the entry point of a pipes process. | ||
|
||
<Note> | ||
|
||
The <code>with open_dagster_pipes() as context:</code> statement uses a Python context manager, which ensures resource setup and cleanup for a specific segment of code. Context managers are useful for tasks requiring initial setup and final teardown, like opening and closing connections. In this case, the context manager is used to initialize and close the Dagster Pipes connection. | ||
|
||
</Note> | ||
|
||
- An instance of the Dagster Pipes context via <PyObject module="dagster_pipes" object="PipesContext" method="get"/>. | ||
|
||
In our sample Python script, the changes would look like the following: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_context.py | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    ... | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() | ||
``` | ||
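To make the relationship between opening the connection at the entry point and fetching the context inside `main()` more concrete, here is a toy sketch of the general pattern using only the standard library. `ToyContext` and `open_toy_pipes` are hypothetical names invented for illustration; this is not how `dagster_pipes` is implemented.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class ToyContext:
    """Hypothetical stand-in for a Pipes-style context; not Dagster's implementation."""

    _instance: Optional["ToyContext"] = None

    def __init__(self) -> None:
        self.messages: List[str] = []
        self.closed = False

    @classmethod
    def get(cls) -> "ToyContext":
        # Only valid while the connection opened by open_toy_pipes() is active.
        if cls._instance is None:
            raise RuntimeError("context is not initialized; call open_toy_pipes() first")
        return cls._instance


@contextmanager
def open_toy_pipes() -> Iterator[ToyContext]:
    context = ToyContext()
    ToyContext._instance = context  # setup: make the context globally reachable
    try:
        yield context
    finally:
        context.closed = True  # teardown: runs even if the body raises
        ToyContext._instance = None


def main() -> None:
    # Code deep in the call stack can fetch the context without it being passed in.
    context = ToyContext.get()
    context.messages.append("hello from main")


with open_toy_pipes() as ctx:
    main()

print(ctx.messages, ctx.closed)
```

The point of the pattern is that existing functions keep their signatures: only the entry point changes, and anything that needs the context asks for it.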
|
||
--- | ||
|
||
### Send log messages to Dagster | ||
|
||
The Dagster Pipes context offers a built-in logging capability that enables you to stream log messages back to Dagster: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_logs.py | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    # send log messages back to Dagster | ||
    context.log.info(f"processing total {total_orders} orders") | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() | ||
``` | ||
|
||
- <PyObject module="dagster_pipes" object="open_dagster_pipes" /> constructs a context | ||
manager that establishes a connection with Dagster Pipes. It returns a <PyObject | ||
module="dagster_pipes" | ||
object="PipesContext" | ||
/> object. | ||
- Inside the context, `context.log.info(...)` is a method call that sends a log message back to Dagster. | ||
|
||
Then, the log messages will show up in the Dagster UI. You can filter the log levels to only view `info` level messages: | ||
|
||
1. Click the **Levels** filter next to the log filter field. This will present a dropdown of all log levels. | ||
2. Select the **info** checkbox and deselect the others. This will show only the logs marked as **info** level. | ||
|
||
<Image | ||
alt="Send log messages to Dagster" | ||
src="/images/guides/dagster-pipes/modifying-code-4-log-messages.png" | ||
width={1326} | ||
height={742} | ||
/> | ||
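The level filter acts on the level each message was logged with. As a plain standard-library sketch (no Dagster involved, and assuming the usual `debug`/`info`/`warning` level names), the snippet below logs at three levels and then keeps only the `INFO` lines, which is roughly what selecting the **info** checkbox does. The logger name and the extra messages are hypothetical.

```python
import io
import logging

# Collect formatted log lines in memory so they can be filtered afterwards.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("toy_pipes_logger")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.propagate = False

total_orders = 2
logger.debug("loaded orders frame")
logger.info(f"processing total {total_orders} orders")
logger.warning("item 878 is low on stock")

lines = stream.getvalue().splitlines()
# Keep only the messages logged at the info level.
info_only = [line for line in lines if line.startswith("INFO ")]
print(info_only)
```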
|
||
### Send structured metadata to Dagster | ||
|
||
Sometimes, you may want information logged from your external code to appear as structured metadata in Dagster. The Dagster Pipes context also comes with the ability to log structured metadata back to Dagster. | ||
|
||
#### Report asset materialization | ||
|
||
Similar to [reporting materialization metadata within the Dagster process](/concepts/assets/software-defined-assets#reporting-materialization-metadata), you can also report asset materialization back to Dagster from the external process. | ||
|
||
In this example, we’re passing a piece of metadata named `total_orders` to the `metadata` parameter of the <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_materialization" />. This payload will be sent from the external process back to Dagster: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_events.py startafter=start_events_marker endbefore=end_events_marker | ||
# get the Dagster Pipes context | ||
context = PipesContext.get() | ||
# send structured metadata back to Dagster | ||
context.report_asset_materialization(metadata={"total_orders": total_orders}) | ||
``` | ||
|
||
Then, `total_orders` will show up in the UI as structured metadata: | ||
|
||
<Image | ||
alt="Report asset materialization to Dagster" | ||
src="/images/guides/dagster-pipes/modifying-code-5-1-report-asset-materialization.png" | ||
width={1412} | ||
height={806} | ||
/> | ||
|
||
This metadata will also be displayed on the Events tab of the Asset Details page in the UI: | ||
|
||
<Image | ||
alt="View materialization events in asset details page" | ||
src="/images/guides/dagster-pipes/modifying-code-5-2-materialization-asset-details.png" | ||
width={1286} | ||
height={736} | ||
/> | ||
|
||
#### Report asset checks | ||
|
||
Dagster allows you to define and execute data quality checks on assets. Refer to the [Asset Checks](/concepts/assets/asset-checks) documentation for more information. | ||
|
||
If your asset has data quality checks defined, you can report to Dagster that an asset check has been performed via <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_check" />: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_events.py startafter=start_checks_marker endbefore=end_checks_marker dedent=4 | ||
# report data quality check result back to Dagster | ||
context.report_asset_check( | ||
    asset_key="my_asset", | ||
    passed=bool(orders_df["item_id"].notnull().all()), | ||
    check_name="no_empty_order_check", | ||
) | ||
``` | ||
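The `passed` argument boils the whole check down to a single boolean. As a dependency-free sketch of the same "no empty `item_id`" logic, the helper below (a hypothetical name, written with plain dictionaries instead of pandas) returns `True` only when every order carries an item ID:

```python
from typing import Dict, List, Optional

Order = Dict[str, Optional[int]]


def no_empty_item_ids(orders: List[Order]) -> bool:
    """Return True only if every order has a non-null item_id."""
    return all(order.get("item_id") is not None for order in orders)


good = [{"order_id": 1, "item_id": 432}, {"order_id": 2, "item_id": 878}]
bad = [{"order_id": 1, "item_id": 432}, {"order_id": 2, "item_id": None}]

print(no_empty_item_ids(good))
print(no_empty_item_ids(bad))
```

Whatever library computes the result, it helps to convert it to a built-in `bool` before reporting it.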
|
||
When Dagster executes the code, you’ll see an asset check event with the check result in the UI: | ||
|
||
<Image | ||
alt="Report asset checks to Dagster" | ||
src="/images/guides/dagster-pipes/modifying-code-5-3-report-asset-checks.png" | ||
width={1312} | ||
height={734} | ||
/> | ||
|
||
This check result will also be displayed on the Checks tab of the Asset Details page in the UI: | ||
|
||
<Image | ||
alt="View checks in asset details page" | ||
src="/images/guides/dagster-pipes/modifying-code-5-4-checks-asset-details.png" | ||
width={1302} | ||
height={730} | ||
/> | ||
|
||
--- | ||
|
||
## Troubleshooting | ||
|
||
### When working with multiple assets | ||
|
||
When working with `multi_asset`, `PipesContext.report_asset_materialization` may only be called once per unique asset key. If called more than once for the same key, an error similar to the following will surface: | ||
|
||
```bash | ||
Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it | ||
``` | ||
|
||
Instead, you’ll need to set the `asset_key` parameter for each instance of <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_materialization" />: | ||
|
||
```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_multi_assets.py startafter=start_events_marker endbefore=end_events_marker | ||
# get the Dagster Pipes context | ||
context = PipesContext.get() | ||
# send structured metadata back to Dagster | ||
context.report_asset_materialization( | ||
    asset_key="orders", metadata={"total_orders": total_orders} | ||
) | ||
``` |
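The once-per-asset-key rule can be pictured with a small toy model. `ToyReporter` below is a hypothetical class written only to illustrate the behavior described above; it is not Dagster's implementation, and its error text is a simplified stand-in.

```python
from typing import Any, Dict, Optional, Set


class ToyReporter:
    """Hypothetical; illustrates the once-per-asset-key rule, not Dagster's code."""

    def __init__(self) -> None:
        self._materialized: Set[str] = set()
        self.reports: Dict[str, Dict[str, Any]] = {}

    def report_asset_materialization(
        self, asset_key: str, metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        # A second report for the same key is rejected.
        if asset_key in self._materialized:
            raise ValueError(f"Asset {asset_key} has already been materialized")
        self._materialized.add(asset_key)
        self.reports[asset_key] = metadata or {}


reporter = ToyReporter()
# Distinct keys are fine: one report per asset.
reporter.report_asset_materialization("orders", {"total_orders": 2})
reporter.report_asset_materialization("items", {"total_items": 2})

try:
    # Reporting "orders" again triggers the error.
    reporter.report_asset_materialization("orders", {"total_orders": 3})
except ValueError as err:
    duplicate_error = str(err)
    print(duplicate_error)
```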
Binary file added (+151 KB): ...lic/images/guides/dagster-pipes/modifying-code-1-materialize-dagster-assets.png
Binary file added (+335 KB): ...t/public/images/guides/dagster-pipes/modifying-code-2-structured-event-logs.png
Binary file added (+206 KB): docs/next/public/images/guides/dagster-pipes/modifying-code-3-raw-compute-logs.png
Binary file added (+162 KB): docs/next/public/images/guides/dagster-pipes/modifying-code-4-log-messages.png
Binary file added (+260 KB): ...images/guides/dagster-pipes/modifying-code-5-1-report-asset-materialization.png
Binary file added (+153 KB): ...mages/guides/dagster-pipes/modifying-code-5-2-materialization-asset-details.png
Binary file added (+234 KB): ...t/public/images/guides/dagster-pipes/modifying-code-5-3-report-asset-checks.png
Binary file added (+128 KB): .../public/images/guides/dagster-pipes/modifying-code-5-4-checks-asset-details.png
11 changes: 11 additions & 0 deletions
11
..._snippets/docs_snippets/guides/dagster/dagster_pipes/subprocess/external_code_original.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import pandas as pd | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    print(f"processing total {total_orders} orders") | ||
|
||
|
||
if __name__ == "__main__": | ||
    main() |
16 changes: 16 additions & 0 deletions
16
...ppets/docs_snippets/guides/dagster/dagster_pipes/subprocess/external_code_with_context.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    ... | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() |
30 changes: 30 additions & 0 deletions
30
...ippets/docs_snippets/guides/dagster/dagster_pipes/subprocess/external_code_with_events.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # start_events_marker | ||
|
||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    # send structured metadata back to Dagster | ||
    context.report_asset_materialization(metadata={"total_orders": total_orders}) | ||
|
||
    # end_events_marker | ||
|
||
    # start_checks_marker | ||
    # report data quality check result back to Dagster | ||
    context.report_asset_check( | ||
        asset_key="my_asset", | ||
        passed=bool(orders_df["item_id"].notnull().all()), | ||
        check_name="no_empty_order_check", | ||
    ) | ||
    # end_checks_marker | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() |
17 changes: 17 additions & 0 deletions
17
...snippets/docs_snippets/guides/dagster/dagster_pipes/subprocess/external_code_with_logs.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    # send log messages back to Dagster | ||
    context.log.info(f"processing total {total_orders} orders") | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() |
23 changes: 23 additions & 0 deletions
23
.../docs_snippets/guides/dagster/dagster_pipes/subprocess/external_code_with_multi_assets.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import pandas as pd | ||
from dagster_pipes import PipesContext, open_dagster_pipes | ||
|
||
|
||
def main(): | ||
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]}) | ||
    total_orders = len(orders_df) | ||
    # start_events_marker | ||
|
||
    # get the Dagster Pipes context | ||
    context = PipesContext.get() | ||
    # send structured metadata back to Dagster | ||
    context.report_asset_materialization( | ||
        asset_key="orders", metadata={"total_orders": total_orders} | ||
    ) | ||
|
||
    # end_events_marker | ||
|
||
|
||
if __name__ == "__main__": | ||
    # connect to Dagster Pipes | ||
    with open_dagster_pipes() as context: | ||
        main() |