Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs][pipes] - Subprocess page #17096

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,244 @@
---
title: "Integrating Subprocess with Dagster Pipes | Dagster Docs"
description: ""
description: "Learn how to use Dagster Pipes's built-in subprocess implementation to invoke a subprocess with the given command and environment."
---

# Integrating Subprocess with Dagster Pipes

In this guide, we’ll show you how to use [Dagster Pipes](/guides/dagster-pipes) with Dagster’s built-in subprocess implementation to run a subprocess with the given command and environment. You can then send information such as structured metadata and logging back to Dagster from the subprocess, where it will be visible in the Dagster UI.

<Note>

This guide focuses on using an out-of-the-box `PipesSubprocessClient` resource. For further customization with the subprocess invocation, use <PyObject module="dagster_pipes" object="open_dagster_pipes"/> approach instead. Refer to [Customizing Dagster Pipes protocols](/guides/dagster-pipes/customizing-dagster-pipes-protocols) for more info.

</Note>

---

## Prerequisites

To use Dagster Pipes to run a subprocess , you’ll need:

- To have Dagster (`dagster`) and the Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](/getting-started/install) for more info.

---

## Step 1: Create a Dagster asset that executes a subprocess

In this step, you’ll create a Dagster asset that, when materialized, opens a Dagster pipes session and invokes subprocess that executes some external code.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads awkwardly as "when materialized" could be interpreted as "when the external script has completed"

"asset that, in its execution function" could be good language.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we refer to pipes process elsewhere, pipes session; what is the noun?


### Step 1.1: Define a Dagster asset that executes a subprocess

1. Create a new file named `dagster_subprocess_pipes.py` in your Dagster project.
2. First, you’ll define the asset. Copy and paste the following into the file:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_asset_marker endbefore=end_asset_marker lines=-16
import shutil

from dagster import (
AssetExecutionContext,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(command=cmd, context=context).get_results()
```

Here’s what we did in this example:

- Created an asset named `subprocess_asset`
- Provided <PyObject object="AssetExecutionContext" /> as the `context` argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
- Specified a resource for the asset to use, `PipesSubprocessClient`. We’ll also come back to this in a little bit.
- Declared a command list `cmd` to run the external script. In the list:
- First, found the path to the Python executable on the system using `shutil.which("python")`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would note that this is fairly contrived and that this becomes more compelling if the external script is either 1) invoked using a different python virtual env or 2) is a different programming language.

- Then, provided the file path to the file that we want to execute. In this case, it’s a file called `external_code.py` in the same directory as `dagster_subprocess_pipes.py`. Refer to [Modifying existing code to work with Dagster Pipes](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes) for what the file would look like.

3. Then, invoke the subprocess from the asset using the `pipes_subprocess_client` resource:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_asset_marker endbefore=end_asset_marker
import shutil

from dagster import (
AssetExecutionContext,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(command=cmd, context=context).get_results()
yuhan marked this conversation as resolved.
Show resolved Hide resolved
```

Let’s take a look at what this code does:

- The `PipesSubprocessClient` resource used by the asset contains a `run` method.
- When the asset is executed, this method will synchronously execute the subprocess with in a pipes session, and it will return a `PipesClientCompletedInvocation` object.
- This object contains a `get_results` method, which you can use to access the results reported by the subprocess.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is results here?

I think it'd also be worth mentioning why PipesSubprocessClient exists instead of just opening a subprocess manually.

- Lastly, return the results of the subprocess.

### Step 1.2: Define a Definitions object

To make the asset and subprocess resource loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster Cloud, you’ll create a `Definitions` object which contains them.

Copy and paste the following to the bottom of `dagster_subprocess_pipes.py`:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions

defs = Definitions(
assets=[subprocess_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```

At this point, `dagster_subprocess_pipes.py` should look like the following:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes_finished.py
import shutil

from dagster import (
AssetExecutionContext,
Definitions,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = Definitions(
assets=[subprocess_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```

---

## Step 2: Modify the external code and run it from the Dagster UI

The next step is to modify the external code to work with Dagster Pipes and execute the code from Dagster. Refer to [Modifying existing code to work with Dagster Pipes](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes) for how to:

- [Make Dagster context available in existing code](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#make-dagster-context-available-in-existing-code)
- [Stream log messages from subprocess back to Dagster](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#send-log-messages-to-dagster)
- [Stream structured metadata from subprocess back to Dagster](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#send-structured-metadata-to-dagster)

---

## Step 3: Run the subprocess from the Dagster UI

After completing Step 2, you should have created a standalone Python file called `external_code.py` which looks like the following:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_finished.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(metadata={"total_orders": total_orders})
# report data quality check result back to Dagster
context.report_asset_check(
asset_key="my_asset",
yuhan marked this conversation as resolved.
Show resolved Hide resolved
passed=orders_df[["item_id"]].notnull().all().bool(),
check_name="no_empty_order_check",
)


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
yuhan marked this conversation as resolved.
Show resolved Hide resolved
main()
```

In this step, you’ll execute the subprocess you created in Step 1.2 from the Dagster UI.

1. In a new command line session, run the following to start the UI:

```bash
dagster dev
```

2. Navigate to localhost:3000, where you should see the UI:

<Image
alt="Asset in the UI"
src="/images/guides/dagster-pipes/subprocess-1-asset.png"
width={1264}
height={714}
/>

3. Click **Materialize** located in the top right to run your code:

<Image
alt="Materialize asset"
src="/images/guides/dagster-pipes/subprocess-2-materialize.png"
width={1326}
height={744}
/>

4. Navigate to the [Run details](/concepts/webserver/ui#run-details) page, where you should see the structured asset metadata and asset check result:

<Image
alt="Events in the run details page"
src="/images/guides/dagster-pipes/subprocess-3-events.png"
width={1272}
height={718}
/>

---

## Examples

### Specify environment variables and extras to a subprocess

The `run` method to the `PipesSubprocessClient` resource also accepts `env` and `extras` , which allow you to specify environment variables and extra arguments when executing the subprocess:
yuhan marked this conversation as resolved.
Show resolved Hide resolved

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_env_extras.py
import os
import shutil

from dagster import (
AssetExecutionContext,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(
command=cmd,
context=context,
extras={"foo": "bar"},
yuhan marked this conversation as resolved.
Show resolved Hide resolved
env={
"MY_ENV_VAR_IN_SUBPROCESS": os.environ["MY_ENV_VAR"],
},
).get_results()
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import shutil

from dagster import (
AssetExecutionContext,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(
command=cmd,
context=context,
extras={"foo": "bar"},
env={
"MY_ENV_VAR_IN_SUBPROCESS": os.environ["MY_ENV_VAR"],
},
).get_results()
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# start_asset_marker
import shutil

from dagster import (
AssetExecutionContext,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(command=cmd, context=context).get_results()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions

defs = Definitions(
assets=[subprocess_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
# end_definitions_marker
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import shutil

from dagster import (
AssetExecutionContext,
Definitions,
PipesSubprocessClient,
asset,
file_relative_path,
)


@asset
def subprocess_asset(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = Definitions(
assets=[subprocess_asset],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
total_orders = len(orders_df)
# get the Dagster Pipes context
context = PipesContext.get()
# send structured metadata back to Dagster
context.report_asset_materialization(metadata={"total_orders": total_orders})
# report data quality check result back to Dagster
context.report_asset_check(
asset_key="my_asset",
passed=orders_df[["item_id"]].notnull().all().bool(),
check_name="no_empty_order_check",
)


if __name__ == "__main__":
# connect to Dagster Pipes
with open_dagster_pipes() as context:
main()