[docs][pipes] - Subprocess page
yuhan committed Oct 9, 2023
1 parent 2b54bfe commit c8629f0
Showing 8 changed files with 338 additions and 0 deletions.
@@ -4,3 +4,241 @@ description: ""
---

# Integrating Subprocess with Dagster Pipes

In this guide, we’ll show you how to use [Dagster Pipes](/guides/dagster-pipes) with Dagster’s built-in subprocess implementation to run a subprocess with the given command and environment. You can then send information such as structured metadata and logging back to Dagster from the subprocess, where it will be visible in the Dagster UI.

<Note>

This guide focuses on using the out-of-the-box `PipesSubprocessClient` resource. To further customize the subprocess invocation, use the <PyObject module="dagster_pipes" object="open_dagster_pipes"/> approach instead. Refer to [Customizing Dagster Pipes protocols](/guides/dagster-pipes/customizing-dagster-pipes-protocols) for more info.

</Note>

---

## Prerequisites

To use Dagster Pipes to run a subprocess, you’ll need:

- To have Dagster (`dagster`) and the Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](/getting-started/install) for more info.

---

## Step 1: Create a Dagster asset that executes a subprocess

In this step, you’ll create a Dagster asset that, when materialized, opens a Dagster Pipes session and invokes a subprocess that executes some external code.

### Step 1.1: Define a Dagster asset that executes a subprocess

1. Create a new file named `dagster_subprocess_pipes.py` in your Dagster project.
2. First, you’ll define the asset. Copy and paste the following into the file:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_asset_marker endbefore=end_asset_marker lines=-16
import shutil

from dagster import (
    AssetExecutionContext,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()
```

Here’s what we did in this example:

- Created an asset named `subprocess_asset`
- Provided <PyObject object="AssetExecutionContext" /> as the `context` argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
- Specified a resource for the asset to use, `PipesSubprocessClient`. We’ll also come back to this in a little bit.
- Declared a command list `cmd` to run the external script. In the list:
  - First, found the path to the Python executable on the system using `shutil.which("python")`.
  - Then, provided the file path to the file that we want to execute. In this case, it’s a file called `external_code.py` in the same directory as `dagster_subprocess_pipes.py`. Refer to [Modifying existing code to work with Dagster Pipes](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes) for what the file would look like.

3. Then, invoke the subprocess from the asset using the `pipes_subprocess_client` resource:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_asset_marker endbefore=end_asset_marker
import shutil

from dagster import (
    AssetExecutionContext,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()
```

Let’s take a look at what this code does:

- The `PipesSubprocessClient` resource used by the asset contains a `run` method.
- When the asset is executed, this method synchronously executes the subprocess within a Pipes session and returns a `PipesClientCompletedInvocation` object.
- This object contains a `get_results` method, which you can use to access the results reported by the subprocess.
- Lastly, the asset returns the results of the subprocess.
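
For intuition about what "synchronously" means here, the following standard-library sketch builds a command list in a similar way and blocks until the child process exits. It is only an analogy: it calls `subprocess.run` directly instead of `PipesSubprocessClient`, it does not open a Pipes session, and the helper names `build_command` and `run_blocking` are hypothetical. It also uses `sys.executable` rather than `shutil.which("python")`, and an inline `-c` script stands in for `external_code.py` so the block runs on its own.

```python
import os
import subprocess
import sys
from typing import List


def build_command(script_name: str, base_file: str) -> List[str]:
    """Return [interpreter, script path], resolving the script next to base_file."""
    script_dir = os.path.dirname(os.path.abspath(base_file))
    return [sys.executable, os.path.join(script_dir, script_name)]


def run_blocking(command: List[str]) -> subprocess.CompletedProcess:
    """Run the command and wait for it to exit before returning."""
    return subprocess.run(command, capture_output=True, text=True, check=False)


# An inline script stands in for external_code.py (assumption for this sketch).
completed = run_blocking([sys.executable, "-c", "print('hello from the subprocess')"])
print(completed.returncode, completed.stdout.strip())
```

The call to `run_blocking` does not return until the child process has finished, which is the same ordering guarantee the asset relies on before it calls `get_results`.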

### Step 1.2: Define a Definitions object

To make the asset and subprocess resource loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster Cloud, you’ll create a `Definitions` object which contains them.

Copy and paste the following to the bottom of `dagster_subprocess_pipes.py`:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```

At this point, `dagster_subprocess_pipes.py` should look like the following:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_pipes_finished.py
import shutil

from dagster import (
    AssetExecutionContext,
    Definitions,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```
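
One detail worth noting in this file: the key `pipes_subprocess_client` in `resources` is the same as the name of the asset function's resource parameter. The toy check below illustrates that name matching in plain Python. It is not how Dagster resolves resources internally; `check_resource_keys` is a hypothetical helper, and the asset here is a bare function with no Dagster decorator.

```python
import inspect


def check_resource_keys(asset_fn, resources: dict) -> list:
    """Return parameter names (other than `context`) with no matching resource key."""
    params = inspect.signature(asset_fn).parameters
    return [name for name in params if name != "context" and name not in resources]


def subprocess_asset(context, pipes_subprocess_client):
    """Stand-in for the asset function; the body is irrelevant to the check."""


# Keys match the parameter name: nothing is missing.
matching = check_resource_keys(subprocess_asset, {"pipes_subprocess_client": object()})
# Key differs from the parameter name: the parameter is reported as unresolved.
mismatched = check_resource_keys(subprocess_asset, {"subprocess_client": object()})
print(matching, mismatched)
```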

---

## Step 2: Modify the external code to work with Dagster Pipes

The next step is to modify the external code to work with Dagster Pipes and execute the code from Dagster. Refer to [Modifying existing code to work with Dagster Pipes](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes) for how to:

- [Make Dagster context available in existing code](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#make-dagster-context-available-in-existing-code)
- [Stream log messages from the subprocess back to Dagster](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#send-log-messages-to-dagster)
- [Stream structured metadata from the subprocess back to Dagster](/guides/dagster-pipes/modifying-existing-code-to-work-with-dagster-pipes#send-structured-metadata-to-dagster)

---

## Step 3: Run the subprocess from the Dagster UI

After completing Step 2, you should have created a standalone Python file called `external_code.py` which looks like the following:

```python file=/guides/dagster/dagster_pipes/subprocess/external_code_finished.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        passed=orders_df[["item_id"]].notnull().all().bool(),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes() as context:
        main()
```

In this step, you’ll execute the subprocess asset you defined in Step 1 from the Dagster UI.

1. In a new command line session, run the following to start the UI:

```bash
dagster dev
```

2. Navigate to localhost:3000, where you should see the UI:

<Image
alt="Asset in the UI"
src="/images/guides/dagster-pipes/subprocess-1-asset.png"
width={1264}
height={714}
/>

3. Click **Materialize** located in the top right to run your code:

<Image
alt="Materialize asset"
src="/images/guides/dagster-pipes/subprocess-2-materialize.png"
width={1326}
height={744}
/>

4. Navigate to the [Run details](/concepts/webserver/ui#run-details) page, where you should see the structured asset metadata and asset check result:

<Image
alt="Events in the run details page"
src="/images/guides/dagster-pipes/subprocess-3-events.png"
width={1272}
height={718}
/>

---

## Examples

### Specify environment variables and extras to a subprocess

The `run` method of the `PipesSubprocessClient` resource also accepts `env` and `extras`, which allow you to specify environment variables and extra arguments when executing the subprocess:

```python file=/guides/dagster/dagster_pipes/subprocess/dagster_subprocess_env_extras.py
import os
import shutil

from dagster import (
    AssetExecutionContext,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        extras={"foo": "bar"},
        env={
            "MY_ENV_VAR_IN_SUBPROCESS": os.environ["MY_ENV_VAR"],
        },
    ).get_results()
```
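
Note that `os.environ["MY_ENV_VAR"]` raises `KeyError` if the variable is not set in the parent process. The standard-library sketch below shows the same idea of handing a renamed variable to a child process, with a fallback value instead of a hard failure. It uses `subprocess.run` rather than `PipesSubprocessClient`, and merging with `os.environ` and the `"default-value"` fallback are choices made for this illustration, not behavior of the Dagster client.

```python
import os
import subprocess
import sys

# Fall back to a default instead of raising KeyError when MY_ENV_VAR is unset.
value = os.environ.get("MY_ENV_VAR", "default-value")

# The child process prints the variable it received under the new name.
child_code = "import os; print(os.environ['MY_ENV_VAR_IN_SUBPROCESS'])"
completed = subprocess.run(
    [sys.executable, "-c", child_code],
    env={**os.environ, "MY_ENV_VAR_IN_SUBPROCESS": value},
    capture_output=True,
    text=True,
    check=True,
)
seen_by_child = completed.stdout.strip()
print(seen_by_child)
```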
@@ -0,0 +1,24 @@
import os
import shutil

from dagster import (
    AssetExecutionContext,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        extras={"foo": "bar"},
        env={
            "MY_ENV_VAR_IN_SUBPROCESS": os.environ["MY_ENV_VAR"],
        },
    ).get_results()
@@ -0,0 +1,30 @@
# start_asset_marker
import shutil

from dagster import (
    AssetExecutionContext,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
# end_definitions_marker
@@ -0,0 +1,23 @@
import shutil

from dagster import (
    AssetExecutionContext,
    Definitions,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(command=cmd, context=context).get_results()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
@@ -0,0 +1,23 @@
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        passed=orders_df[["item_id"]].notnull().all().bool(),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes() as context:
        main()
