From bed5fdcf789970d3b2c0995f8b795e9983ceeb8f Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Fri, 9 Aug 2024 10:07:12 -0700 Subject: [PATCH] Add code for a use-case repository (#23460) ## Summary & Motivation This adds the `use_case_repository` package under examples which will be used by `dagster-website` to populate the the Repository on our website. All `.md` and `.py` files in `use_case_repository/guides` from the master branch will be loaded to our website during the build process. A local server is provided in order to quickly view your markdown files using `make webserver`. Refer to the `README.md` for more information. ## How I Tested These Changes local testing --- examples/use_case_repository/Makefile | 19 ++ examples/use_case_repository/README.md | 40 +++ examples/use_case_repository/pyproject.toml | 7 + examples/use_case_repository/setup.cfg | 2 + examples/use_case_repository/setup.py | 23 ++ examples/use_case_repository/tox.ini | 21 ++ .../use_case_repository/guides/_TEMPLATE.md | 99 +++++++ .../use_case_repository/guides/__init__.py | 0 .../guides/external_script.sh | 2 + .../guides/pipes_cli_command.md | 100 +++++++ .../guides/pipes_cli_command.py | 21 ++ .../guides/snowflake_to_s3_embedded_elt.md | 134 +++++++++ .../guides/snowflake_to_s3_embedded_elt.py | 47 ++++ .../use_case_repository_tests/__init__.py | 0 .../test_use_cases.py | 14 + .../use_case_repository/webserver/main.py | 259 ++++++++++++++++++ pyright/alt-1/requirements-pinned.txt | 24 +- pyright/master/requirements-pinned.txt | 54 ++-- pyright/master/requirements.txt | 1 + .../dagster/dagster/_generate/download.py | 2 +- 20 files changed, 831 insertions(+), 38 deletions(-) create mode 100644 examples/use_case_repository/Makefile create mode 100644 examples/use_case_repository/README.md create mode 100644 examples/use_case_repository/pyproject.toml create mode 100644 examples/use_case_repository/setup.cfg create mode 100644 
examples/use_case_repository/setup.py create mode 100644 examples/use_case_repository/tox.ini create mode 100644 examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md create mode 100644 examples/use_case_repository/use_case_repository/guides/__init__.py create mode 100755 examples/use_case_repository/use_case_repository/guides/external_script.sh create mode 100644 examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md create mode 100644 examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py create mode 100644 examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md create mode 100644 examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py create mode 100644 examples/use_case_repository/use_case_repository_tests/__init__.py create mode 100644 examples/use_case_repository/use_case_repository_tests/test_use_cases.py create mode 100644 examples/use_case_repository/webserver/main.py diff --git a/examples/use_case_repository/Makefile b/examples/use_case_repository/Makefile new file mode 100644 index 0000000000000..8f606c67b6978 --- /dev/null +++ b/examples/use_case_repository/Makefile @@ -0,0 +1,19 @@ +.PHONY: install lint fix test webserver + +install: + pip install --upgrade uv + uv pip install -e .[dev] + +lint: + ruff check . + ruff format --check . + +fix: + ruff check --fix . + ruff format . + +test: + pytest + +webserver: + python -m webserver.main diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md new file mode 100644 index 0000000000000..3d8a7cb60333f --- /dev/null +++ b/examples/use_case_repository/README.md @@ -0,0 +1,40 @@ +## Use Case Repository + +This repository contains a collection of use cases that demonstrate various applications and implementations using Dagster. + +### Purpose + +The use cases in this repository serve two main purposes: + +1. 
They're used to populate the list of available use cases on the Dagster.io website. +2. They provide practical examples for developers and data engineers working with Dagster. + +### Integration with Dagster.io + +The use cases are automatically fetched from the master branch of this repository during the build process of the Dagster.io website. This ensures that the website always displays the most up-to-date examples. In `dagster-website/scripts/fetchUseCases.js` you can find the code that fetches the use cases from this repository and updates the website. + +The script fetches from the master branch of this repository, so you will need to push your changes to the master branch to see them on the website. + +### File Structure + +Each use case consists of two main components: + +1. A Markdown (.md) file: Contains the descriptive content and documentation, along with code snippets. +2. A Python (.py) file: Contains the actual implementation code as a single file. + +Both files are utilized on the Dagster.io website. However, only the Python files are subject to automated testing. + +The `_TEMPLATE.md` file is used to create new use cases. The actual template lives on our external +Scout platform. + +### Important Note + +When updating a use case, please make sure to modify both the Markdown and Python files to maintain consistency between the documentation and the implementation. 
+ +### Local Preview + +To preview your changes locally before committing, you can start a local webserver by running the following command in your terminal: + +``` +make webserver +``` diff --git a/examples/use_case_repository/pyproject.toml b/examples/use_case_repository/pyproject.toml new file mode 100644 index 0000000000000..ec8cf7b72ca48 --- /dev/null +++ b/examples/use_case_repository/pyproject.toml @@ -0,0 +1,7 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "use_case_repository.definitions" +code_location_name = "use_case_repository" \ No newline at end of file diff --git a/examples/use_case_repository/setup.cfg b/examples/use_case_repository/setup.cfg new file mode 100644 index 0000000000000..eca5d2ef79c1d --- /dev/null +++ b/examples/use_case_repository/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +name = use_case_repository diff --git a/examples/use_case_repository/setup.py b/examples/use_case_repository/setup.py new file mode 100644 index 0000000000000..ba61c21d5e0a3 --- /dev/null +++ b/examples/use_case_repository/setup.py @@ -0,0 +1,23 @@ +from setuptools import find_packages, setup + +setup( + name="use_case_repository", + packages=find_packages(exclude=["use_case_repository_tests"]), + install_requires=[ + "dagster", + "dagster-embedded-elt", + "dagster-pipes", + "python-frontmatter", + "pymdown-extensions", + "markdown", + "flask", + "sling", + ], + extras_require={ + "dev": [ + "dagster-webserver", + "pytest", + "ruff", + ] + }, +) diff --git a/examples/use_case_repository/tox.ini b/examples/use_case_repository/tox.ini new file mode 100644 index 0000000000000..8a579ac4e097d --- /dev/null +++ b/examples/use_case_repository/tox.ini @@ -0,0 +1,21 @@ +[tox] +skipsdist = true + +[testenv] +download = true +passenv = + CI_* + BUILDKITE* + COVERALLS_REPO_TOKEN +install_command = uv pip install {opts} {packages} +deps = + source: -e ../../python_modules/dagster[test] + source: -e 
../../python_modules/dagster-pipes + pypi: dagster[test] + -e .[dev] +allowlist_externals = + /bin/bash + uv +commands = + source: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' + pytest -c ../../pyproject.toml -vv diff --git a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md new file mode 100644 index 0000000000000..d4eedb3a8ff57 --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md @@ -0,0 +1,99 @@ +--- +title: "Snowflake to S3 ETL with Dagster" +description: "This use case demonstrates how to transfer data from Snowflake to Amazon S3 using Dagster. The objective is to automate the extraction of data from Snowflake and store it in S3 for further processing or archival." +tags: ["snowflake", "s3"] +--- + +# [Insert a Use Case Name that has high SEO Value here] + +Provide a brief overview of the use case, including its objectives and the main problem it addresses. All use cases must use Dagster to accomplish tasks. + +--- + +## What You'll Learn + +You will learn how to: + +- Define a Dagster asset that extracts data from an external source and writes it to a database +- Add other bullets here +- ... + +--- + +## Prerequisites + +To follow the steps in this guide, you will need: + +- To have Dagster and the Dagster UI installed. Refer to the [Dagster Installation Guide](https://docs.dagster.io/getting-started/installation) for instructions. +- A basic understanding of Dagster. Refer to the [Dagster Documentation](https://docs.dagster.io/getting-started/what-why-dagster) for more information. +- List other prerequisites here + +--- + +## Steps to Implement With Dagster + +By following these steps, you will [Provide a general description of what the user will wind up with by the end of the guide]. [Also provide a general description of what this enables them to do]. 
+ +### Step 1: Enter the Name of Step 1 Here + +Provide a brief description of what this step does. Prefer a small, working Dagster +example as a first step. Here is an example of what this might look like: + +```python + from dagster import ( + asset, + DailyPartitionsDefinition, + AssetExecutionContext, + Definitions, + ) + + import datetime + + # Define the partitions + partitions_def = DailyPartitionsDefinition(start_date="2023-01-01") + + + @asset(partitions_def=partitions_def) + def upstream_asset(context: AssetExecutionContext): + partition_date = context.partition_key + with open(f"data-{partition_date}.csv", "w") as f: + f.write(f"Data for partition {partition_date}") + + snapshot_date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + with open(f"data-{snapshot_date}.csv", "w") as f: + f.write(f"Data for partition {partition_date}") + + + defs = Definitions(assets=[upstream_asset]) +``` + +### Step 2: Enter the Name of Step 2 Here + +Provide a brief description of what this step does. + +### Step 3: Enter the Name of Step 3 Here + +Provide a brief description of what this step does. + +--- + +## Expected Outcomes + +Describe the expected outcomes of implementing the use case. Include any results or metrics that indicate success. + +--- + +## Troubleshooting + +Provide solutions to common issues that might arise while implementing the use case. + +--- + +## Next Steps + +What should the person try next after this? + +--- + +## Additional Resources + +List any additional resources, such as documentation, tutorials, or community links, that could help users implement the use case. 
diff --git a/examples/use_case_repository/use_case_repository/guides/__init__.py b/examples/use_case_repository/use_case_repository/guides/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/use_case_repository/use_case_repository/guides/external_script.sh b/examples/use_case_repository/use_case_repository/guides/external_script.sh new file mode 100755 index 0000000000000..257462022080b --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/external_script.sh @@ -0,0 +1,2 @@ +#! /bin/bash +echo "Hello from CLI" diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md new file mode 100644 index 0000000000000..7fe083edc9e36 --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md @@ -0,0 +1,100 @@ +--- +title: "Using Dagster Pipes Subprocess to Run a CLI Command" +description: "This use case demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. The objective is to execute non-Python workloads and integrate their outputs into Dagster's data pipeline." +tags: ["dagster pipes", "subprocess", "CLI"] +--- + +# Running CLI Commands with Dagster Pipes + +This guide demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. This is useful for integrating non-Python workloads, such as Bash scripts or other command-line tools, into your Dagster data pipeline. + +--- + +## What You’ll Learn + +You will learn how to: + +- Define a Dagster asset that invokes a CLI command. +- Use Dagster Pipes to manage subprocess execution. +- Capture and use the output of the CLI command within Dagster. + +--- + +## Prerequisites + +To follow the steps in this guide, you'll need: + +- To have Dagster and the Dagster UI (`dagster-webserver`) installed. 
Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info. + +--- + +## Steps to Implement with Dagster + +By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline. + +### Step 1: Define the CLI Command Script + +Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content: + +```bash +#!/bin/bash +echo "Hello from CLI" +echo "My env var is: ${MY_ENV_VAR}" +``` + +### Step 2: Define the Dagster Asset + +Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters. + +Save the following file to `dagster_pipes_cli.py`: + +```python +import shutil + +from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset + +@asset +def cli_command_asset( + context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient +): + cmd = [shutil.which("bash"), "external_script.sh"] + return pipes_subprocess_client.run( + command=cmd, + context=context, + env={"MY_ENV_VAR": "example_value"}, + ).get_materialize_result() + +defs = Definitions( + assets=[cli_command_asset], + resources={"pipes_subprocess_client": PipesSubprocessClient()}, +) +``` + +### Step 3: Configure and Run the Asset + +Ensure the script is executable and run the Dagster asset to see the output. + +```bash +chmod +x external_script.sh +dagster dev -f path_to_your_dagster_file.py +``` + +--- + +## Troubleshooting + +- **Permission Denied**: Ensure the script file has executable permissions using `chmod +x`. +- **Command Not Found**: Verify the command is available in the system's `PATH` or provide the full path to the command. 
+ +--- + +## Next Steps + +Explore more advanced use cases with Dagster Pipes, such as integrating with other command-line tools or handling more complex workflows. + +--- + +## Additional Resources + +- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes) +- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install) diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py new file mode 100644 index 0000000000000..91dede172a67f --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py @@ -0,0 +1,21 @@ +import shutil + +from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset + + +@asset +def cli_command_asset( + context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient +): + cmd = [shutil.which("bash"), "external_script.sh"] + return pipes_subprocess_client.run( + command=cmd, + context=context, + env={"MY_ENV_VAR": "example_value"}, + ).get_materialize_result() + + +defs = Definitions( + assets=[cli_command_asset], + resources={"pipes_subprocess_client": PipesSubprocessClient()}, +) diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md new file mode 100644 index 0000000000000..e5e8bd0ec950c --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md @@ -0,0 +1,134 @@ +--- +title: "Ingesting Data from S3 to Snowflake with Dagster and Sling" +description: "This use case demonstrates how to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The objective is to automate the data ingestion process for efficient data management and analysis." 
+tags: ["snowflake", "s3", "dagster", "sling", "data ingestion"] +--- + +## Ingesting Data from S3 to Snowflake with Dagster and Sling + +This guide provides a step-by-step approach to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The main objective is to automate the data ingestion process, making it efficient and reliable for data management and analysis. + +--- + +## What You'll Learn + +By following this guide, you will learn how to: + +- Set up connections to S3 and Snowflake using Sling. +- Define and configure assets in Dagster to automate the data ingestion process. +- Execute the data ingestion pipeline. + +--- + +## Prerequisites + +Before you begin, ensure you have the following: + +- A Snowflake account with the necessary permissions. +- An Amazon S3 bucket with data to ingest. +- AWS credentials with access to the S3 bucket. +- Python installed on your system. +- Dagster and dagster-embedded-elt installed in your Python environment. + +--- + +## Steps to Implement With Dagster + +By following these steps, you will have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis. + +### Step 1: Install Required Packages + +Ensure you have the necessary Python packages installed. Install Dagster and the Dagster UI (`dagster-webserver`) and dagster-embedded-elt using pip. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info. + +```bash +pip install dagster dagster-embedded-elt dagster-webserver +``` + +### Step 2: Define Sling Connections + +Define the connections to S3 and Snowflake using SlingConnectionResource. Use environment variables to securely manage sensitive information. 
+ +```python +from dagster import EnvVar +from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource + +s3_connection = SlingConnectionResource( + name="MY_S3", + type="s3", + bucket="your-s3-bucket", + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), +) + +snowflake_connection = SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host="your-snowflake-host", + user="your-snowflake-user", + database="your-snowflake-database", + password=EnvVar("SNOWFLAKE_PASSWORD"), + role="your-snowflake-role", +) + +sling_resource = SlingResource(connections=[s3_connection, snowflake_connection]) +``` + +### Step 3: Define the Data Ingestion Asset + +Use the `@sling_assets` decorator to define an asset that runs the Sling replication job. Configure the replication settings to specify the source and target. + +```python +from dagster import Definitions +from dagster_embedded_elt.sling import sling_assets + +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SNOWFLAKE", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://your-s3-bucket/your-file.csv": { + "object": "your_snowflake_schema.your_table", + "primary_key": "id", + }, + }, +} + +@sling_assets(replication_config=replication_config) +def ingest_s3_to_snowflake(context, sling: SlingResource): + yield from sling.replicate(context=context) + +defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource}) +``` + +--- + +## Expected Outcomes + +After implementing this use case, you should have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis. + +--- + +## Troubleshooting + +- **Connection Issues**: Ensure that your AWS and Snowflake credentials are correctly set in the environment variables. 
+- **Data Format Issues**: Verify that the data format in S3 matches the expected format in Snowflake. +- **Permissions**: Ensure that the Snowflake user has the necessary permissions to write to the target schema and table. + +--- + +## Additional Resources + +- [Dagster Documentation](https://docs.dagster.io/) +- [Embedded ELT Documentation](https://docs.dagster.io/integrations/embedded-elt) +- [Sling Documentation](https://docs.slingdata.io/) +- [Snowflake Documentation](https://docs.snowflake.com/) + +--- + +## Next Steps + +- Explore more advanced configurations and transformations using Sling. +- Integrate additional data sources and targets to expand your data pipeline. +- Implement monitoring and alerting for your data ingestion pipeline. + +By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling. diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py new file mode 100644 index 0000000000000..74975dd9a84a1 --- /dev/null +++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py @@ -0,0 +1,47 @@ +# pyright: reportCallIssue=none +# pyright: reportOptionalMemberAccess=none + +from dagster import Definitions, EnvVar +from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource, sling_assets + +# Step 2: Define Sling Connections +s3_connection = SlingConnectionResource( + name="MY_S3", + type="s3", + bucket="your-s3-bucket", + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), +) + +snowflake_connection = SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host="your-snowflake-host", + user="your-snowflake-user", + database="your-snowflake-database", + password=EnvVar("SNOWFLAKE_PASSWORD"), + role="your-snowflake-role", +) + +sling_resource = 
SlingResource(connections=[s3_connection, snowflake_connection]) + +# Step 3: Define the Data Ingestion Asset +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SNOWFLAKE", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://your-s3-bucket/your-file.csv": { + "object": "your_snowflake_schema.your_table", + "primary_key": "id", + }, + }, +} + + +@sling_assets(replication_config=replication_config) +def ingest_s3_to_snowflake(context, sling: SlingResource): + yield from sling.replicate(context=context) + + +defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource}) diff --git a/examples/use_case_repository/use_case_repository_tests/__init__.py b/examples/use_case_repository/use_case_repository_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py new file mode 100644 index 0000000000000..ad92479158d8e --- /dev/null +++ b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py @@ -0,0 +1,14 @@ +from use_case_repository.guides.pipes_cli_command import cli_command_asset +from use_case_repository.guides.snowflake_to_s3_embedded_elt import ( + ingest_s3_to_snowflake, + sling_resource, +) + + +def test_snowflake_to_s3_embedded_elt(): + assert ingest_s3_to_snowflake + assert sling_resource + + +def test_pipes_cli_command(): + assert cli_command_asset diff --git a/examples/use_case_repository/webserver/main.py b/examples/use_case_repository/webserver/main.py new file mode 100644 index 0000000000000..1b0f932173f53 --- /dev/null +++ b/examples/use_case_repository/webserver/main.py @@ -0,0 +1,259 @@ +import os + +import frontmatter +import markdown +from dagster import file_relative_path +from flask import Flask, abort, render_template_string +from pygments.formatters import HtmlFormatter + 
+app = Flask(__name__) + +USE_CASES_DIR = file_relative_path(__file__, "../use_case_repository/guides") + + +@app.route("/") +def index(): + use_cases = [] + for filename in os.listdir(USE_CASES_DIR): + if filename.endswith(".md") and not filename.startswith("_"): + with open(os.path.join(USE_CASES_DIR, filename), "r") as f: + post = frontmatter.load(f) + use_cases.append( + { + "title": post.get("title", "Untitled"), + "description": post.get("description", ""), + "tags": post.get("tags", []), + "slug": filename[:-3], + } + ) + + return render_template_string( + """ + + +
+ + +{{ use_case.description }}
+ +{{ description }}
+ +