Skip to content

Commit

Permalink
[docs][pipes] - Modifying existing code page
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhan committed Oct 9, 2023
1 parent 56a9006 commit 9420e15
Show file tree
Hide file tree
Showing 20 changed files with 366 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/content/concepts/assets/software-defined-assets.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ with defs.get_asset_value_loader() as loader:
## Examples

- [Multi-component asset keys](#multi-component-asset-keys)
- [Recording materialization metadata](#recording-materialization-metadata)
- [Reporting materialization metadata](#reporting-materialization-metadata)
- [Attaching definition metadata](#attaching-definition-metadata)

### Multi-component asset keys
Expand All @@ -613,7 +613,7 @@ def downstream_asset(upstream_asset):
return upstream_asset + [4]
```

### Recording materialization metadata
### Reporting materialization metadata

Dagster supports attaching arbitrary [metadata](/\_apidocs/ops#dagster.MetadataValue) to asset materializations. This metadata will be displayed on the "Activity" tab of the "Asset Details" page in the UI. If it's numeric, it will be plotted. To attach metadata, your asset's op can return an <PyObject object="Output" /> object that contains the output value and a dictionary of metadata:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,268 @@
---
title: "Modifying existing code to work with Dagster Pipes | Dagster Docs"
description: ""
description: "With Dagster Pipes, you can incorporate existing code into Dagster without huge refactors. This guide shows you how to modify existing code to work with Dagster Pipes."
---

# Modifying existing code to work with Dagster Pipes

In this guide, we’ll demonstrate how to modify existing code (e.g. an existing Python script) to work with [Dagster Pipes](/guides/dagster-pipes) and send metadata from the script to Dagster.

<Note>

This guide is specific to modifying existing code that is being orchestrated by Dagster. Looking for other uses of Dagster Pipes? Refer to the <a href="/guides/dagster-pipes#usage">Dagster Pipes Usage</a> documentation.

</Note>

## Prerequisites

<Note>

This guide assumes your code is being orchestrated by Dagster. If that’s not the case, please refer to <a href="integrating-subprocess-with-dagster-pipes">Integrating Dagster Pipes with Subprocess</a> for how to orchestrate your existing Python scripts using Dagster.

</Note>

To get started, you’ll need:

- To install the dagster-pipes library:

```bash
pipe install dagster-pipes
```

- **An existing Python script.** We’ll use the following Python script to demonstrate:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_original.py
import pandas as pd


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
print(f"processing total {total_orders} orders")


if __name__ == "__main__":
main()
```

---

## About Dagster

### Materialize Dagster assets

In Dagster, data assets - like a database table - are created during a process called _materialization._ During materialization, Dagster runs the function in the asset’s body to create the asset. In this case, the ‘function’ is your existing code.

Dagster Pipes automatically makes some pieces of data about assets available, such as those about materialization or data quality checks. You can also send your own structured metadata and execution logs back to Dagster. We’ll come back to this in later sections.

When your existing code is orchestrated by Dagster, you can run it from the Dagster UI. Navigate to the the corresponding asset page and click **Materialize** to run your code. Refer to [Viewing and materializing assets in the UI](/concepts/assets/software-defined-assets#viewing-and-materializing-assets-in-the-ui) for more info.

<Image
alt="Materialize Dagster assets"
src="/images/guides/dagster-pipes/modifying-code-1-materialize-dagster-assets.png"
width={1328}
height={836}
/>

### Monitor code execution in Dagster UI

To monitor the run, navigate to the [Run details page](/concepts/webserver/ui#run-details). This page includes timing, raw compute logs, and structured events. The bottom section shows filterable logs from the run, detailing events, their types, and specifics. There are two types of log: structured event and raw compute.

#### Structured event logs

Structured logs are enriched and categorized with metadata. For example, a label of what the log entry is about, links to an asset’s metadata, and the event type. This detailed structuring not only facilitates easier filtering and searching within the logs, but also populates the asset catalog and status information in the asset graph. We’ll come back to how you can log metadata from your existing code to Dagster in a little bit.

<Image
alt="Structured event logs"
src="/images/guides/dagster-pipes/modifying-code-2-structured-event-logs.png"
width={1524}
height={844}
/>

#### Raw compute logs

If your existing code produces stdout or stderr logs, like a `print` statement in our sample Python script, Dagster will display these in the UI's raw compute log view. To see the `stdout` log, toggle the log section to **stdout**.

<Image
alt="Raw compute logs"
src="/images/guides/dagster-pipes/modifying-code-3-raw-compute-logs.png"
width={1498}
height={854}
/>

---

## Stream information back to Dagster

### Make Dagster context available in existing code

Getting existing code to work with Dagster Pipes requires adding a few lines of code:

- Imports from `dagster-pipes`

- A call that connects to Dagster Pipes: <PyObject module="dagster_pipes" object="open_dagster_pipes"/> initializes the Dagster Pipes context that can be used to stream information back to Dagster. It is recommended to call this function near the entry point of a pipes process.

<Note>

The with <code>with open_dagster_pipes() as context:</code> is a context manager in Python, ensuring resource setup and cleanup for a specific segment of code. It's useful for tasks requiring initial setup and final teardown, like opening and closing connections. In this case, the context manager is used to initialize and close the Dagster Pipes connection.

</Note>

- An instance of the Dagster Pipes context via <PyObject module="dagster_pipes" object="PipesContext" method="get"/>.

In our sample Python script, the changes would look like the following:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_context.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
...


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
```

---

### Send log messages to Dagster

Dagster Pipes context offers a built-in logging capability that enables you to stream log messages back to Dagster:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_logs.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
# send log messages back to Dagster
context.log.info(f"processing total {total_orders} orders")


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
```

- <PyObject module="dagster_pipes" object="open_dagster_pipes" /> constructs a context
manager that establishes a connection with Dagster Pipes. It returns a <PyObject
module="dagster_pipes"
object="PipesContext"
/> object.
- Inside the context, `context.log.info(...)` is a method call that sends a log message back to Dagster.

Then, the log messages will show up in the Dagster UI. You can filter the log levels to only view `info` level messages:

1. Click the **Levels** filter next to the log filter field. This will present a dropdown of all log levels.
2. Select the **info** checkbox and deselect the others. This will show only the logs marked as **info** level.

<Image
alt="Send log messages to Dagster"
src="/images/guides/dagster-pipes/modifying-code-4-log-messages.png"
width={1326}
height={742}
/>

### Send structured metadata to Dagster

Sometimes, you may want to log information from your external code to be structured metadata in Dagster. Dagster Pipes context also comes the ability to log structured metadata back to Dagster.

#### Report asset materialization

Similar to [reporting materialization metadata within the Dagster process](/concepts/assets/software-defined-assets#reporting-materialization-metadata), you can also report asset materialization back to Dagster from the external process.

In this example, we’re passing a piece of metadata named `total_orders` to the `metadata` parameter of the <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_materialization" />. This payload will be sent from the external process back to Dagster:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_events.py startafter=start_events_marker endbefore=end_events_marker
# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(metadata={"total_orders": total_orders})
```

Then, total_orders will show up in the UI as structured metadata:

<Image
alt="Report asset materialization to Dagster"
src="/images/guides/dagster-pipes/modifying-code-5-1-report-asset-materialization.png"
width={1412}
height={806}
/>

This metadata will also be displayed on the Events tab of the Asset Details page in the UI:

<Image
alt="View materialization events in asset details page"
src="/images/guides/dagster-pipes/modifying-code-5-2-materialization-asset-details.png"
width={1286}
height={736}
/>

#### Report asset checks

Dagster allows you to define and execute data quality checks on assets. Refer to the [Asset Checks](/concepts/assets/asset-checks) documentation for more information.

If your asset has data quality checks defined, you can report to Dagster that an asset check has been performed via <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_check" />:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_events.py startafter=start_checks_marker endbefore=end_checks_marker dedent=4
# report data quality check result back to Dagster
context.report_asset_check(
asset_key="my_asset",
passed=orders_df[["item_id"]].notnull().all().bool(),
check_name="no_empty_order_check",
)
```

When Dagster executes the code, you’ll see an asset check event with the check result in the UI:

<Image
alt="Report asset checks to Dagster"
src="/images/guides/dagster-pipes/modifying-code-5-3-report-asset-checks.png"
width={1312}
height={734}
/>

This check result will also be displayed on the Checks tab of the Asset Details page in the UI:

<Image
alt="View checks in asset details page"
src="/images/guides/dagster-pipes/modifying-code-5-4-checks-asset-details.png"
width={1302}
height={730}
/>

---

## Troubleshooting

### When working with multiple assets

When working with multi_asset, `PipesContext.report_asset_materialization` may only be called once per unique asset key. If called more than once, an error similar to the following will surface:

```bash
Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it
```

Instead, you’ll need to set the `asset_key` parameter for each instance of <PyObject module="dagster_pipes" object="PipesContext" method="report_asset_materialization" />:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_with_multi_assets.py startafter=start_events_marker endbefore=end_events_marker
# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(
asset_key="orders", metadata={"total_orders": total_orders}
)
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import pandas as pd


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
print(f"processing total {total_orders} orders")


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
...


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# start_events_marker

# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(metadata={"total_orders": total_orders})

# end_events_marker

# start_checks_marker
# report data quality check result back to Dagster
context.report_asset_check(
asset_key="my_asset",
passed=orders_df[["item_id"]].notnull().all().bool(),
check_name="no_empty_order_check",
)
# end_checks_marker


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
# send log messages back to Dagster
context.log.info(f"processing total {total_orders} orders")


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# start_events_marker

# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(
asset_key="orders", metadata={"total_orders": total_orders}
)

# end_events_marker


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()
1 change: 1 addition & 0 deletions examples/docs_snippets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"dagster-gcp",
"dagster-graphql",
"dagster-k8s",
"dagster_pipes",
"dagster-postgres",
"dagster-slack",
"dagster-gcp-pandas",
Expand Down

0 comments on commit 9420e15

Please sign in to comment.