[docs] [dagster-aws] add docs for PipesGlueClient (#22969)
Adding docs for Glue Dagster Pipes

---------

Co-authored-by: Marco polo <[email protected]>
Co-authored-by: prha <[email protected]>
Co-authored-by: Erin Cochran <[email protected]>
(cherry picked from commit c4328ab)
danielgafni authored and jmsanders committed Jul 29, 2024
1 parent fb8a574 commit 6eb6027
Showing 11 changed files with 234 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
@@ -365,6 +365,10 @@
}
]
},
{
"title": "Dagster Pipes + AWS Glue",
"path": "/concepts/dagster-pipes/aws-glue"
},
{
"title": "Dagster Pipes + AWS Lambda",
"path": "/concepts/dagster-pipes/aws-lambda"
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
4 changes: 4 additions & 0 deletions docs/content/concepts.mdx
@@ -224,6 +224,10 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa
title="Dagster Pipes tutorial"
href="/concepts/dagster-pipes/subprocess"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS Glue"
href="/concepts/dagster-pipes/aws-glue"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS Lambda"
href="/concepts/dagster-pipes/aws-lambda"
148 changes: 148 additions & 0 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
@@ -0,0 +1,148 @@
---
title: "Integrating AWS Glue with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS Glue to launch external code from Dagster assets."
---

# AWS Glue & Dagster Pipes

This tutorial gives a short overview of how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS Glue](https://aws.amazon.com/glue/).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesGlueClient" module="dagster_aws.pipes" /> resource which can be used to launch AWS Glue jobs from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the job side.

---

## Prerequisites

- **In the orchestration environment**, you'll need to:

  - Install the following packages:

    ```shell
    pip install dagster dagster-webserver dagster-aws
    ```

    Refer to the [Dagster installation guide](/getting-started/install) for more info.

  - **Have a boto3 client that can authenticate to AWS.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html).

- **In AWS**, you'll need:

  - An existing AWS account
  - An AWS Glue job with a Python 3.8+ runtime environment

---

## Step 1: Provide the dagster-pipes module

Provide the `dagster-pipes` module to the AWS Glue job either by installing it in the Glue job environment or packaging it along with the job script.
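One way to install the module is Glue's special `--additional-python-modules` job parameter, which takes a comma-separated list of pip requirements to install when the job starts (check the AWS Glue documentation for which Glue versions and job types support it). The following is a minimal sketch, assuming you manage the job's `DefaultArguments` dict yourself before creating or updating the job; the helper name `with_additional_python_modules` is hypothetical.

```python
from typing import Dict

# Glue's special job parameter for installing extra pip packages at job start.
ADDITIONAL_MODULES_KEY = "--additional-python-modules"


def with_additional_python_modules(
    default_arguments: Dict[str, str], *modules: str
) -> Dict[str, str]:
    """Return a copy of a job's DefaultArguments that also installs `modules`."""
    current = default_arguments.get(ADDITIONAL_MODULES_KEY, "")
    # The parameter value is a comma-separated list of pip requirements.
    merged = [m.strip() for m in current.split(",") if m.strip()]
    # Append each requested module once (exact string match, order preserved).
    for module in modules:
        if module not in merged:
            merged.append(module)
    return {**default_arguments, ADDITIONAL_MODULES_KEY: ",".join(merged)}


arguments = with_additional_python_modules({"--job-language": "python"}, "dagster-pipes")
print(arguments)
```

The returned dict could then be passed as `DefaultArguments` to boto3's Glue `create_job` or `update_job` calls. Because de-duplication is a plain string comparison, a pinned requirement such as `dagster-pipes==<version>` is treated as distinct from an unpinned `dagster-pipes`.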

---

## Step 2: Add dagster-pipes to the Glue job

Call `open_dagster_pipes` in the Glue job script to create a context that can be used to send messages to Dagster:

```python file=/guides/dagster/dagster_pipes/glue/glue_script.py
import boto3
from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesS3ContextLoader,
    PipesS3MessageWriter,
    open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
    with open_dagster_pipes(
        context_loader=context_loader,
        message_writer=message_writer,
        params_loader=params_loader,
    ) as pipes:
        pipes.log.info("Hello from AWS Glue job!")


if __name__ == "__main__":
    main()
```
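If the Glue job also needs its own parameters (such as the `some_parameter_value` argument passed by the asset in Step 3), the script can read them without interfering with `PipesCliArgsParamsLoader`. A minimal sketch using only `argparse`, assuming the parameter reaches the script as a `--some_parameter_value` command-line flag; how Glue forwards argument keys to the script is an assumption here, and inside Glue the `awsglue.utils.getResolvedOptions` helper is a common alternative. The function name `read_some_parameter` is hypothetical.

```python
import argparse
from typing import List, Optional


def read_some_parameter(argv: Optional[List[str]] = None) -> Optional[str]:
    """Return the hypothetical --some_parameter_value job argument, if present."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--some_parameter_value", default=None)
    # parse_known_args ignores flags this parser doesn't define, such as the
    # bootstrap arguments consumed by PipesCliArgsParamsLoader.
    # When argv is None, argparse reads sys.argv[1:].
    known, _unknown = parser.parse_known_args(argv)
    return known.some_parameter_value


value = read_some_parameter(["--some_parameter_value", "1", "--other-flag", "x"])
print(value)
```

Using `parse_known_args` rather than `parse_args` keeps the job script tolerant of any extra arguments that Glue or Dagster add to the command line.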

---

## Step 3: Add the PipesGlueClient to Dagster code

In the Dagster asset/op code, use the `PipesGlueClient` resource to launch the job:

```python file=/guides/dagster/dagster_pipes/glue/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
import os

# dagster_glue_pipes.py
import boto3
from dagster_aws.pipes import PipesGlueClient

from dagster import AssetExecutionContext, asset


@asset
def glue_pipes_asset(
    context: AssetExecutionContext, pipes_glue_client: PipesGlueClient
):
    return pipes_glue_client.run(
        context=context,
        job_name="Example Job",
        arguments={"some_parameter_value": "1"},
    ).get_materialize_result()
```

This will launch the AWS Glue job and monitor its status until it either fails or succeeds. A job failure will also cause the Dagster run to fail with an exception.
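`PipesGlueClient` handles this monitoring for you. To make the behavior concrete, here is a simplified, hypothetical polling loop (not the client's actual implementation) built on the Glue `GetJobRun` API as exposed by boto3's `get_job_run`. It treats any terminal state other than `SUCCEEDED` as a failure and raises, which is analogous to a failed Glue job failing the Dagster run.

```python
import time
from typing import Any, Callable

# JobRunState values after which a Glue job run will not change state again.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_glue_job(
    glue_client: Any,
    job_name: str,
    run_id: str,
    poll_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll a Glue job run until it finishes; raise unless it succeeded."""
    while True:
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = job_run["JobRunState"]
        if state == "SUCCEEDED":
            return state
        if state in TERMINAL_STATES:
            # Any other terminal state is treated as a failure.
            raise RuntimeError(f"Glue job run {run_id} ended in state {state}")
        # STARTING, RUNNING, STOPPING, WAITING: wait and poll again.
        sleep(poll_seconds)
```

With a real client, this could be called as `wait_for_glue_job(glue, "Example Job", glue.start_job_run(JobName="Example Job")["JobRunId"])`, where `glue = boto3.client("glue")`. The injectable `sleep` parameter exists only so the loop can be exercised without waiting.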

---

## Step 4: Create Dagster definitions

Next, add the `PipesGlueClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/glue/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]


defs = Definitions(
    assets=[glue_pipes_asset],
    resources={
        "pipes_glue_client": PipesGlueClient(
            client=boto3.client("glue"),
            context_injector=PipesGlueContextInjector(
                client=boto3.client("s3"),
                bucket=bucket,
            ),
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"), bucket=bucket
            ),
        )
    },
)
```

Dagster will now be able to launch the AWS Glue job from the `glue_pipes_asset` asset.
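Both `PipesGlueContextInjector` and `PipesS3MessageReader` are configured with the same bucket because S3 acts as the channel between Dagster and the Glue job: the run context flows in, and messages flow back out. The toy model below illustrates that round trip with a plain dict standing in for the bucket. The key layout, chunk numbering, and message shape are hypothetical simplifications, not the actual dagster-pipes wire format, and all function names are illustrative.

```python
import json
from typing import Dict, List

# Hypothetical in-memory stand-in for one S3 bucket: object key -> body.
Bucket = Dict[str, str]


def write_context(bucket: Bucket, key: str, context: dict) -> None:
    # Orchestration side: the context injector uploads the run context.
    bucket[key] = json.dumps(context)


def run_external_job(bucket: Bucket, context_key: str, messages_prefix: str) -> None:
    # Job side: read the context, then upload messages as numbered chunks.
    context = json.loads(bucket[context_key])
    chunks = [
        [{"method": "log", "params": {"message": f"started {context['job']}"}}],
        [{"method": "log", "params": {"message": "finished"}}],
    ]
    for index, chunk in enumerate(chunks, start=1):
        body = "\n".join(json.dumps(message) for message in chunk)
        bucket[f"{messages_prefix}/{index}.json"] = body


def read_messages(bucket: Bucket, messages_prefix: str) -> List[dict]:
    # Orchestration side: the message reader fetches chunks in order
    # until the next chunk key does not exist yet.
    messages: List[dict] = []
    index = 1
    while f"{messages_prefix}/{index}.json" in bucket:
        for line in bucket[f"{messages_prefix}/{index}.json"].splitlines():
            messages.append(json.loads(line))
        index += 1
    return messages


bucket: Bucket = {}
write_context(bucket, "run-1/context.json", {"job": "Example Job"})
run_external_job(bucket, "run-1/context.json", "run-1/messages")
for message in read_messages(bucket, "run-1/messages"):
    print(message["params"]["message"])
```

In the real setup the two sides run in different processes (Dagster and AWS Glue), so the reader polls the bucket while the job is still running rather than reading once at the end.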

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/concepts/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="AWS Glue Pipes API reference"
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesGlueClient"
></ArticleListItem>
</ArticleList>
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
@@ -100,6 +100,8 @@ Pipes

.. autoclass:: dagster_aws.pipes.PipesLambdaClient

.. autoclass:: dagster_aws.pipes.PipesGlueClient

Legacy
--------

2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-pipes.rst
@@ -51,6 +51,8 @@ Params loaders load the bootstrap payload from some globally accessible key-valu

.. autoclass:: PipesEnvVarParamsLoader

.. autoclass:: PipesCliArgsParamsLoader

----

Message writers
@@ -0,0 +1,49 @@
# start_asset_marker
import os

# dagster_glue_pipes.py
import boto3
from dagster_aws.pipes import PipesGlueClient

from dagster import AssetExecutionContext, asset


@asset
def glue_pipes_asset(
    context: AssetExecutionContext, pipes_glue_client: PipesGlueClient
):
    return pipes_glue_client.run(
        context=context,
        job_name="Example Job",
        arguments={"some_parameter_value": "1"},
    ).get_materialize_result()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]


defs = Definitions(
    assets=[glue_pipes_asset],
    resources={
        "pipes_glue_client": PipesGlueClient(
            client=boto3.client("glue"),
            context_injector=PipesGlueContextInjector(
                client=boto3.client("s3"),
                bucket=bucket,
            ),
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"), bucket=bucket
            ),
        )
    },
)

# end_definitions_marker
@@ -0,0 +1,25 @@
import boto3
from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesS3ContextLoader,
    PipesS3MessageWriter,
    open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
    with open_dagster_pipes(
        context_loader=context_loader,
        message_writer=message_writer,
        params_loader=params_loader,
    ) as pipes:
        pipes.log.info("Hello from AWS Glue job!")


if __name__ == "__main__":
    main()

1 comment on commit 6eb6027

@github-actions
Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-apylyxb11-elementl.vercel.app
https://release-1-7-16.dagster.dagster-docs.io

Built with commit 6eb6027.
This pull request is being automatically deployed with vercel-action
