From 157eee8c14da856be4e1f025a4eecd9f52a32399 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Tue, 6 Aug 2024 16:33:11 -0700
Subject: [PATCH 01/13] scaffold use-case repo
---
examples/use_case_repository/README.md | 7 +++++++
examples/use_case_repository/pyproject.toml | 7 +++++++
examples/use_case_repository/setup.cfg | 2 ++
examples/use_case_repository/setup.py | 14 +++++++++++++
examples/use_case_repository/tox.ini | 21 +++++++++++++++++++
.../test_use_cases.py | 2 ++
6 files changed, 53 insertions(+)
create mode 100644 examples/use_case_repository/README.md
create mode 100644 examples/use_case_repository/pyproject.toml
create mode 100644 examples/use_case_repository/setup.cfg
create mode 100644 examples/use_case_repository/setup.py
create mode 100644 examples/use_case_repository/tox.ini
create mode 100644 examples/use_case_repository/use_case_repository_tests/test_use_cases.py
diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md
new file mode 100644
index 0000000000000..395c34e5510c1
--- /dev/null
+++ b/examples/use_case_repository/README.md
@@ -0,0 +1,7 @@
+## Use Case Repository
+
+These use-cases are read by the dagster.io website to populate a list of available use cases for our use case repository.
+
+Use cases are fetched via an automated step during the build of the dagster.io website from the master branch of this repository.
+
+Both the markdown and python files are used on our website. Please ensure you update both when making changes. Only the python files are tested.
diff --git a/examples/use_case_repository/pyproject.toml b/examples/use_case_repository/pyproject.toml
new file mode 100644
index 0000000000000..ec8cf7b72ca48
--- /dev/null
+++ b/examples/use_case_repository/pyproject.toml
@@ -0,0 +1,7 @@
+[build-system]
+requires = ["setuptools"]
+build-backend = "setuptools.build_meta"
+
+[tool.dagster]
+module_name = "use_case_repository.definitions"
+code_location_name = "use_case_repository"
\ No newline at end of file
diff --git a/examples/use_case_repository/setup.cfg b/examples/use_case_repository/setup.cfg
new file mode 100644
index 0000000000000..eca5d2ef79c1d
--- /dev/null
+++ b/examples/use_case_repository/setup.cfg
@@ -0,0 +1,2 @@
+[metadata]
+name = use_case_repository
diff --git a/examples/use_case_repository/setup.py b/examples/use_case_repository/setup.py
new file mode 100644
index 0000000000000..88ce4833c595b
--- /dev/null
+++ b/examples/use_case_repository/setup.py
@@ -0,0 +1,14 @@
+from setuptools import find_packages, setup
+
+setup(
+ name="use_case_repository",
+ packages=find_packages(exclude=["use_case_repository_tests"]),
+ install_requires=[
+ "dagster",
+ "dagster-cloud",
+ "boto3",
+ "pandas",
+ "matplotlib",
+ ],
+ extras_require={"dev": ["dagster-webserver", "pytest"]},
+)
diff --git a/examples/use_case_repository/tox.ini b/examples/use_case_repository/tox.ini
new file mode 100644
index 0000000000000..a79e67a85984b
--- /dev/null
+++ b/examples/use_case_repository/tox.ini
@@ -0,0 +1,21 @@
+[tox]
+skipsdist = true
+
+[testenv]
+download = true
+passenv =
+ CI_*
+ COVERALLS_REPO_TOKEN
+install_command = uv pip install {opts} {packages}
+deps =
+ source: -e ../../python_modules/dagster[test]
+ source: -e ../../python_modules/dagster-pipes
+ pypi: dagster[test]
+ -e .[dev]
+allowlist_externals =
+ /bin/bash
+ uv
+ pytest
+commands =
+ source: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
+ pytest -c ../../pyproject.toml -vv
diff --git a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
new file mode 100644
index 0000000000000..8625176e85f58
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
@@ -0,0 +1,2 @@
+def test_use_cases():
+ assert True
From 0b50707ef7e27bd35ba32e728efc0807f36bdc45 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Tue, 6 Aug 2024 16:56:19 -0700
Subject: [PATCH 02/13] Add use case repository
---
examples/use_case_repository/Makefile | 19 ++
examples/use_case_repository/README.md | 36 ++-
examples/use_case_repository/setup.py | 19 +-
.../use_case_repository/__init__.py | 0
.../use_case_repository/external_script.sh | 2 +
.../use_case_repository/pipes_cli_command.md | 104 +++++++
.../use_case_repository/pipes_cli_command.py | 21 ++
.../snowflake_to_s3_embedded_elt.md | 119 ++++++++
.../snowflake_to_s3_embedded_elt.py | 47 ++++
.../use_case_repository_tests/__init__.py | 0
.../test_use_cases.py | 8 +-
.../use_case_repository/webserver/main.py | 259 ++++++++++++++++++
pyright/alt-1/requirements-pinned.txt | 24 +-
pyright/master/requirements-pinned.txt | 54 ++--
pyright/master/requirements.txt | 1 +
15 files changed, 666 insertions(+), 47 deletions(-)
create mode 100644 examples/use_case_repository/Makefile
create mode 100644 examples/use_case_repository/use_case_repository/__init__.py
create mode 100755 examples/use_case_repository/use_case_repository/external_script.sh
create mode 100644 examples/use_case_repository/use_case_repository/pipes_cli_command.md
create mode 100644 examples/use_case_repository/use_case_repository/pipes_cli_command.py
create mode 100644 examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md
create mode 100644 examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py
create mode 100644 examples/use_case_repository/use_case_repository_tests/__init__.py
create mode 100644 examples/use_case_repository/webserver/main.py
diff --git a/examples/use_case_repository/Makefile b/examples/use_case_repository/Makefile
new file mode 100644
index 0000000000000..8f606c67b6978
--- /dev/null
+++ b/examples/use_case_repository/Makefile
@@ -0,0 +1,19 @@
+.PHONY: install lint fix test webserver
+
+install:
+ pip install --upgrade uv
+ uv pip install -e .[dev]
+
+lint:
+ ruff check .
+ ruff format --check .
+
+fix:
+ ruff check --fix .
+ ruff format .
+
+test:
+ pytest
+
+webserver:
+ python -m webserver.main
diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md
index 395c34e5510c1..06ace7020faf7 100644
--- a/examples/use_case_repository/README.md
+++ b/examples/use_case_repository/README.md
@@ -1,7 +1,37 @@
## Use Case Repository
-These use-cases are read by the dagster.io website to populate a list of available use cases for our use case repository.
+This repository contains a collection of use cases that demonstrate various applications and implementations using Dagster.
-Use cases are fetched via an automated step during the build of the dagster.io website from the master branch of this repository.
+### Purpose
-Both the markdown and python files are used on our website. Please ensure you update both when making changes. Only the python files are tested.
+The use cases in this repository serve two main purposes:
+
+1. They are used to populate the list of available use cases on the Dagster.io website.
+2. They provide practical examples for developers and data engineers working with Dagster.
+
+### Integration with Dagster.io
+
+The use cases are automatically fetched from the master branch of this repository during the build process of the Dagster.io website. This ensures that the website always displays the most up-to-date examples. In `dagster-website/scripts/fetchUseCases.js` you can find the code that fetches the use cases from this repository and updates the website.
+
+The script fetches from the master branch of this repository, so you will need to push your changes to the master branch to see them on the website.
+
+### File Structure
+
+Each use case consists of two main components:
+
+1. A Markdown (.md) file: Contains the descriptive content and documentation, along with code snippets.
+2. A Python (.py) file: Contains the actual implementation code as a single file.
+
+Both files are utilized on the Dagster.io website. However, only the Python files are subject to automated testing.
+
+### Important Note
+
+When updating a use case, please make sure to modify both the Markdown and Python files to maintain consistency between the documentation and the implementation.
+
+### Local Preview
+
+To preview your changes locally before committing, you can start a local webserver by running the following command in your terminal:
+
+```
+make webserver
+```
\ No newline at end of file
diff --git a/examples/use_case_repository/setup.py b/examples/use_case_repository/setup.py
index 88ce4833c595b..ba61c21d5e0a3 100644
--- a/examples/use_case_repository/setup.py
+++ b/examples/use_case_repository/setup.py
@@ -5,10 +5,19 @@
packages=find_packages(exclude=["use_case_repository_tests"]),
install_requires=[
"dagster",
- "dagster-cloud",
- "boto3",
- "pandas",
- "matplotlib",
+ "dagster-embedded-elt",
+ "dagster-pipes",
+ "python-frontmatter",
+ "pymdown-extensions",
+ "markdown",
+ "flask",
+ "sling",
],
- extras_require={"dev": ["dagster-webserver", "pytest"]},
+ extras_require={
+ "dev": [
+ "dagster-webserver",
+ "pytest",
+ "ruff",
+ ]
+ },
)
diff --git a/examples/use_case_repository/use_case_repository/__init__.py b/examples/use_case_repository/use_case_repository/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/use_case_repository/use_case_repository/external_script.sh b/examples/use_case_repository/use_case_repository/external_script.sh
new file mode 100755
index 0000000000000..257462022080b
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/external_script.sh
@@ -0,0 +1,2 @@
+#! /bin/bash
+echo "Hello from CLI"
diff --git a/examples/use_case_repository/use_case_repository/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/pipes_cli_command.md
new file mode 100644
index 0000000000000..9b8bf5364cdde
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/pipes_cli_command.md
@@ -0,0 +1,104 @@
+---
+title: "Using Dagster Pipes Subprocess to Run a CLI Command"
+description: "This use case demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. The objective is to execute non-Python workloads and integrate their outputs into Dagster's data pipeline."
+tags: ["dagster pipes", "subprocess", "CLI"]
+---
+## Running CLI Commands with Dagster Pipes
+
+### Overview
+
+This guide demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. This is useful for integrating non-Python workloads, such as Bash scripts or other command-line tools, into your Dagster data pipeline.
+
+### Prerequisites
+
+- Dagster and Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
+- An existing CLI command or script that you want to run.
+
+### What You’ll Learn
+
+You will learn how to:
+- Define a Dagster asset that invokes a CLI command.
+- Use Dagster Pipes to manage subprocess execution.
+- Capture and utilize the output of the CLI command within Dagster.
+
+### Steps to Implement With Dagster
+
+1. **Step 1: Define the CLI Command Script**
+ - Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
+ ```bash
+ #!/bin/bash
+ echo "Hello from CLI"
+ ```
+
+2. **Step 2: Define the Dagster Asset**
+ - Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
+ ```python
+ import shutil
+ from dagster import asset, Definitions, AssetExecutionContext
+ from dagster_pipes import PipesSubprocessClient
+
+ @asset
+ def cli_command_asset(
+ context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
+ ):
+ cmd = [shutil.which("bash"), "external_script.sh"]
+ return pipes_subprocess_client.run(
+ command=cmd,
+ context=context,
+ env={"MY_ENV_VAR": "example_value"},
+ ).get_materialize_result()
+
+ defs = Definitions(
+ assets=[cli_command_asset],
+ resources={"pipes_subprocess_client": PipesSubprocessClient()},
+ )
+ ```
+
+3. **Step 3: Configure and Run the Asset**
+ - Ensure the script is executable and run the Dagster asset to see the output.
+ ```bash
+ chmod +x external_script.sh
+ dagit -f path_to_your_dagster_file.py
+ ```
+
+### Expected Outcomes
+
+By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
+
+### Troubleshooting
+
+- **Permission Denied**: Ensure the script file has executable permissions using `chmod +x`.
+- **Command Not Found**: Verify the command is available in the system's PATH or provide the full path to the command.
+
+### Additional Resources
+
+- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes)
+- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)
+
+### Next Steps
+
+Explore more advanced use cases with Dagster Pipes, such as integrating with other command-line tools or handling more complex workflows.
+
+The Steps MUST always be pythonic Dagster code. If the documentation includes @solids or @ops and repository, discard it. The documentation should only use the new Dagster APIs, such as @asset and Definitions.
+
+Use as many steps as necessary. 3 is the minimum number of steps.
+
+Do not add an `if name == __main__` block. Do not call the `materialize` function. Only provide the definition for the assets. Avoid the following words: certainly, simply, robust, ensure
+
+Here is a minimal Dagster code:
+
+```python
+from dagster import asset, Definitions, materialize
+
+@asset
+def example_asset():
+ return "Example output"
+
+@asset(deps=[example_asset])
+def another_asset():
+ return "Example output"
+
+defs = Definitions(
+ assets=[example_asset]
+)
+```
\ No newline at end of file
diff --git a/examples/use_case_repository/use_case_repository/pipes_cli_command.py b/examples/use_case_repository/use_case_repository/pipes_cli_command.py
new file mode 100644
index 0000000000000..91dede172a67f
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/pipes_cli_command.py
@@ -0,0 +1,21 @@
+import shutil
+
+from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset
+
+
+@asset
+def cli_command_asset(
+ context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
+):
+ cmd = [shutil.which("bash"), "external_script.sh"]
+ return pipes_subprocess_client.run(
+ command=cmd,
+ context=context,
+ env={"MY_ENV_VAR": "example_value"},
+ ).get_materialize_result()
+
+
+defs = Definitions(
+ assets=[cli_command_asset],
+ resources={"pipes_subprocess_client": PipesSubprocessClient()},
+)
diff --git a/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md
new file mode 100644
index 0000000000000..f695d47550bb6
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md
@@ -0,0 +1,119 @@
+---
+title: "Ingesting Data from S3 to Snowflake with Dagster and Sling"
+description: "This use case demonstrates how to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The objective is to automate the data ingestion process for efficient data management and analysis."
+tags: ["snowflake", "s3", "dagster", "sling", "data ingestion"]
+---
+## Ingesting Data from S3 to Snowflake with Dagster and Sling
+
+### Overview
+
+This guide provides a step-by-step approach to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The main objective is to automate the data ingestion process, making it efficient and reliable for data management and analysis.
+
+### Prerequisites
+
+Before you begin, ensure you have the following:
+
+- A Snowflake account with the necessary permissions.
+- An Amazon S3 bucket with data to ingest.
+- AWS credentials with access to the S3 bucket.
+- Python installed on your system.
+- Dagster and dagster-embedded-elt installed in your Python environment.
+
+### What You’ll Learn
+
+By following this guide, you will learn how to:
+
+- Set up connections to S3 and Snowflake using Sling.
+- Define and configure assets in Dagster to automate the data ingestion process.
+- Execute the data ingestion pipeline.
+
+### Steps to Implement With Dagster
+
+1. **Step 1: Install Required Packages**
+ - Ensure you have the necessary Python packages installed.
+ - Install Dagster and dagster-embedded-elt using pip.
+
+ ```bash
+ pip install dagster dagster-embedded-elt
+ ```
+
+2. **Step 2: Define Sling Connections**
+ - Define the connections to S3 and Snowflake using SlingConnectionResource.
+ - Use environment variables to securely manage sensitive information.
+
+ ```python
+ from dagster import EnvVar
+ from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
+
+ s3_connection = SlingConnectionResource(
+ name="MY_S3",
+ type="s3",
+ bucket="your-s3-bucket",
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
+ )
+
+ snowflake_connection = SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host="your-snowflake-host",
+ user="your-snowflake-user",
+ database="your-snowflake-database",
+ password=EnvVar("SNOWFLAKE_PASSWORD"),
+ role="your-snowflake-role",
+ )
+
+ sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
+ ```
+
+3. **Step 3: Define the Data Ingestion Asset**
+ - Use the `@sling_assets` decorator to define an asset that runs the Sling replication job.
+ - Configure the replication settings to specify the source and target.
+
+ ```python
+ from dagster import asset, Definitions
+ from dagster_embedded_elt.sling import sling_assets
+
+ replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SNOWFLAKE",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://your-s3-bucket/your-file.csv": {
+ "object": "your_snowflake_schema.your_table",
+ "primary_key": "id",
+ },
+ },
+ }
+
+ @sling_assets(replication_config=replication_config)
+ def ingest_s3_to_snowflake(context, sling: SlingResource):
+ yield from sling.replicate(context=context)
+
+ defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
+ ```
+
+### Expected Outcomes
+
+After implementing this use case, you should have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis.
+
+### Troubleshooting
+
+- **Connection Issues**: Ensure that your AWS and Snowflake credentials are correctly set in the environment variables.
+- **Data Format Issues**: Verify that the data format in S3 matches the expected format in Snowflake.
+- **Permissions**: Ensure that the Snowflake user has the necessary permissions to write to the target schema and table.
+
+### Additional Resources
+
+- [Dagster Documentation](https://docs.dagster.io/)
+- [Embedded ELT Documentation](https://docs.dagster.io/integrations/embedded-elt)
+- [Sling Documentation](https://docs.slingdata.io/)
+- [Snowflake Documentation](https://docs.snowflake.com/)
+
+### Next Steps
+
+- Explore more advanced configurations and transformations using Sling.
+- Integrate additional data sources and targets to expand your data pipeline.
+- Implement monitoring and alerting for your data ingestion pipeline.
+
+By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
\ No newline at end of file
diff --git a/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py b/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py
new file mode 100644
index 0000000000000..74975dd9a84a1
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py
@@ -0,0 +1,47 @@
+# pyright: reportCallIssue=none
+# pyright: reportOptionalMemberAccess=none
+
+from dagster import Definitions, EnvVar
+from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource, sling_assets
+
+# Step 2: Define Sling Connections
+s3_connection = SlingConnectionResource(
+ name="MY_S3",
+ type="s3",
+ bucket="your-s3-bucket",
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
+)
+
+snowflake_connection = SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host="your-snowflake-host",
+ user="your-snowflake-user",
+ database="your-snowflake-database",
+ password=EnvVar("SNOWFLAKE_PASSWORD"),
+ role="your-snowflake-role",
+)
+
+sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
+
+# Step 3: Define the Data Ingestion Asset
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SNOWFLAKE",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://your-s3-bucket/your-file.csv": {
+ "object": "your_snowflake_schema.your_table",
+ "primary_key": "id",
+ },
+ },
+}
+
+
+@sling_assets(replication_config=replication_config)
+def ingest_s3_to_snowflake(context, sling: SlingResource):
+ yield from sling.replicate(context=context)
+
+
+defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
diff --git a/examples/use_case_repository/use_case_repository_tests/__init__.py b/examples/use_case_repository/use_case_repository_tests/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
index 8625176e85f58..0bb99c30a46f3 100644
--- a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
+++ b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
@@ -1,2 +1,6 @@
-def test_use_cases():
- assert True
+from use_case_repository.snowflake_to_s3_embedded_elt import ingest_s3_to_snowflake, sling_resource
+
+
+def test_snowflake_to_s3_embedded_elt():
+ assert ingest_s3_to_snowflake
+ assert sling_resource
diff --git a/examples/use_case_repository/webserver/main.py b/examples/use_case_repository/webserver/main.py
new file mode 100644
index 0000000000000..d2dee0d02b6f7
--- /dev/null
+++ b/examples/use_case_repository/webserver/main.py
@@ -0,0 +1,259 @@
+import os
+
+import frontmatter
+import markdown
+from dagster import file_relative_path
+from flask import Flask, abort, render_template_string
+from pygments.formatters import HtmlFormatter
+
+app = Flask(__name__)
+
+USE_CASES_DIR = file_relative_path(__file__, "../use_case_repository")
+
+
+@app.route("/")
+def index():
+ use_cases = []
+ for filename in os.listdir(USE_CASES_DIR):
+ if filename.endswith(".md"):
+ with open(os.path.join(USE_CASES_DIR, filename), "r") as f:
+ post = frontmatter.load(f)
+ use_cases.append(
+ {
+ "title": post.get("title", "Untitled"),
+ "description": post.get("description", ""),
+ "tags": post.get("tags", []),
+ "slug": filename[:-3],
+ }
+ )
+
+ return render_template_string(
+ """
+
+
+
+
+
+ Use Case Library
+
+
+
+ Use Case Library
+
+
+
+ """,
+ use_cases=use_cases,
+ )
+
+
+@app.route("/use-case/")
+def use_case(slug):
+ try:
+ with open(os.path.join(USE_CASES_DIR, f"{slug}.md"), "r") as f:
+ post = frontmatter.load(f)
+ md = markdown.Markdown(
+ extensions=["pymdownx.superfences", "pymdownx.inlinehilite", "pymdownx.highlight"]
+ )
+ html_content = md.convert(post.content)
+ pygments_css = HtmlFormatter(style="default").get_style_defs(".codehilite")
+ return render_template_string(
+ """
+
+
+
+
+
+ {{ title }} | Use Case Library
+
+
+
+ ← Back to Index
+ {{ title }}
+ {{ description }}
+ Tags: {{ tags|join(', ') }}
+
+ {{ content|safe }}
+
+
+
+ """,
+ title=post.get("title", "Untitled"),
+ description=post.get("description", ""),
+ tags=post.get("tags", []),
+ content=html_content,
+ pygments_css=pygments_css,
+ )
+ except FileNotFoundError:
+ abort(404)
+
+
+if __name__ == "__main__":
+ app.run(debug=True)
diff --git a/pyright/alt-1/requirements-pinned.txt b/pyright/alt-1/requirements-pinned.txt
index 53fcc38da970b..6d15999394a54 100644
--- a/pyright/alt-1/requirements-pinned.txt
+++ b/pyright/alt-1/requirements-pinned.txt
@@ -1,7 +1,7 @@
agate==1.9.1
aiobotocore==2.13.1
aiofile==3.8.8
-aiohappyeyeballs==2.3.4
+aiohappyeyeballs==2.3.5
aiohttp==3.10.1
aioitertools==0.11.0
aiosignal==1.3.1
@@ -92,13 +92,13 @@ frozenlist==1.4.1
fsspec==2024.3.1
gcsfs==2024.3.1
google-api-core==2.19.1
-google-api-python-client==2.139.0
-google-auth==2.32.0
+google-api-python-client==2.140.0
+google-auth==2.33.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.1
google-cloud-bigquery==3.25.0
google-cloud-core==2.4.1
-google-cloud-storage==2.18.0
+google-cloud-storage==2.18.1
google-crc32c==1.5.0
google-resumable-media==2.7.1
googleapis-common-protos==1.63.2
@@ -116,7 +116,7 @@ httplib2==0.22.0
httptools==0.6.1
httpx==0.27.0
humanfriendly==10.0
-hypothesis==6.108.10
+hypothesis==6.110.0
idna==3.7
importlib-metadata==6.11.0
iniconfig==2.0.0
@@ -154,14 +154,14 @@ mako==1.3.5
markdown-it-py==3.0.0
markupsafe==2.1.5
mashumaro==3.13.1
-matplotlib==3.9.0
+matplotlib==3.9.1.post1
matplotlib-inline==0.1.7
mccabe==0.7.0
mdurl==0.1.2
minimal-snowplow-tracker==0.0.2
mistune==3.0.2
mock==3.0.5
-more-itertools==10.3.0
+more-itertools==10.4.0
morefs==0.2.2
msgpack==1.0.8
multidict==6.0.5
@@ -185,7 +185,7 @@ orjson==3.10.6
overrides==7.7.0
packaging==24.1
pandas==2.2.2
-pandas-stubs==2.2.2.240603
+pandas-stubs==2.2.2.240807
pandera==0.20.3
pandocfilters==1.5.1
parsedatetime==2.6
@@ -234,9 +234,9 @@ python-json-logger==2.0.7
python-slugify==8.0.4
pytimeparse==1.1.8
pytz==2024.1
-pyyaml==6.0.1
+pyyaml==6.0.2
pyzmq==26.1.0
-rapidfuzz==3.9.5
+rapidfuzz==3.9.6
referencing==0.35.1
requests==2.32.3
requests-oauthlib==2.0.0
@@ -279,7 +279,7 @@ tomli==2.0.1
tomlkit==0.13.0
toposort==1.10
tornado==6.4.1
-tox==4.17.0
+tox==4.17.1
tqdm==4.66.5
traitlets==5.14.3
typeguard==4.3.0
@@ -316,7 +316,7 @@ uvicorn==0.30.5
uvloop==0.19.0
virtualenv==20.26.3
watchdog==4.0.1
-watchfiles==0.22.0
+watchfiles==0.23.0
wcwidth==0.2.13
webcolors==24.6.0
webencodings==0.5.1
diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt
index fef1f961acd3c..48e198fee04d1 100644
--- a/pyright/master/requirements-pinned.txt
+++ b/pyright/master/requirements-pinned.txt
@@ -2,7 +2,7 @@ acryl-datahub==0.13.3.6
agate==1.9.1
aiofile==3.8.8
aiofiles==24.1.0
-aiohappyeyeballs==2.3.4
+aiohappyeyeballs==2.3.5
aiohttp==3.10.1
aiohttp-retry==2.8.3
aiosignal==1.3.1
@@ -15,13 +15,13 @@ ansicolors==1.1.8
anyio==4.4.0
apache-airflow==2.7.3
apache-airflow-providers-apache-spark==4.9.0
-apache-airflow-providers-cncf-kubernetes==8.3.3
-apache-airflow-providers-common-sql==1.14.2
+apache-airflow-providers-cncf-kubernetes==8.3.4
+apache-airflow-providers-common-sql==1.15.0
apache-airflow-providers-docker==3.9.1
-apache-airflow-providers-ftp==3.10.0
+apache-airflow-providers-ftp==3.10.1
apache-airflow-providers-http==4.1.0
apache-airflow-providers-imap==3.6.1
-apache-airflow-providers-sqlite==3.8.1
+apache-airflow-providers-sqlite==3.8.2
apeye==1.4.1
apeye-core==1.1.5
apispec==6.6.1
@@ -47,7 +47,7 @@ aws-sam-translator==1.89.0
aws-xray-sdk==2.14.0
azure-core==1.30.2
azure-identity==1.17.1
-azure-storage-blob==12.21.0
+azure-storage-blob==12.22.0
azure-storage-file-datalake==12.16.0
babel==2.15.0
backoff==2.2.1
@@ -58,8 +58,8 @@ bitmath==1.3.3.1
bleach==6.1.0
blinker==1.8.2
bokeh==3.5.1
-boto3==1.34.154
-botocore==1.34.154
+boto3==1.34.155
+botocore==1.34.155
buildkite-test-collector==0.1.8
cachecontrol==0.14.0
cached-property==1.5.2
@@ -163,8 +163,8 @@ daff==1.3.46
-e python_modules/libraries/dagster-wandb
-e python_modules/dagster-webserver
-e python_modules/libraries/dagstermill
-dask==2024.7.1
-dask-expr==1.1.9
+dask==2024.8.0
+dask-expr==1.1.10
dask-jobqueue==0.8.5
dask-kubernetes==2022.9.0
dask-yarn==0.9
@@ -192,7 +192,7 @@ dict2css==0.3.0.post1
diff-match-patch==20200713
dill==0.3.8
distlib==0.3.8
-distributed==2024.7.1
+distributed==2024.8.0
distro==1.9.0
dlt==0.5.2
dnspython==2.6.1
@@ -235,13 +235,13 @@ gitdb==4.0.11
gitpython==3.1.43
giturlparse==0.12.0
google-api-core==2.19.1
-google-api-python-client==2.139.0
-google-auth==2.32.0
+google-api-python-client==2.140.0
+google-auth==2.33.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.1
google-cloud-bigquery==3.25.0
google-cloud-core==2.4.1
-google-cloud-storage==2.18.0
+google-cloud-storage==2.18.1
google-crc32c==1.5.0
google-re2==1.1.20240702
google-resumable-media==2.7.1
@@ -266,7 +266,7 @@ httptools==0.6.1
httpx==0.27.0
humanfriendly==10.0
humanize==4.10.0
-hypothesis==6.108.10
+hypothesis==6.110.0
idna==3.7
ijson==3.3.0
imagesize==1.4.1
@@ -314,7 +314,7 @@ kubernetes==30.1.0
kubernetes-asyncio==30.1.0
langchain==0.2.7
langchain-community==0.2.7
-langchain-core==0.2.28
+langchain-core==0.2.29
langchain-openai==0.1.14
langchain-text-splitters==0.2.2
langsmith==0.1.98
@@ -335,7 +335,7 @@ marshmallow==3.21.3
marshmallow-oneofschema==3.1.1
marshmallow-sqlalchemy==0.26.1
mashumaro==3.13.1
-matplotlib==3.9.0
+matplotlib==3.9.1.post1
matplotlib-inline==0.1.3
mbstrdecoder==1.1.3
mdit-py-plugins==0.4.1
@@ -345,7 +345,7 @@ mistune==3.0.2
mixpanel==4.10.1
mlflow==1.27.0
mock==3.0.5
-more-itertools==10.3.0
+more-itertools==10.4.0
morefs==0.2.2
moto==4.2.14
mpmath==1.3.0
@@ -375,7 +375,7 @@ objgraph==3.6.1
onnx==1.16.2
onnxconverter-common==1.13.0
onnxruntime==1.18.1
-openai==1.40.0
+openai==1.40.1
openapi-schema-validator==0.6.2
openapi-spec-validator==0.7.1
opentelemetry-api==1.26.0
@@ -393,7 +393,7 @@ overrides==7.7.0
packaging==24.1
pandas==2.2.2
pandas-gbq==0.23.1
-pandas-stubs==2.2.2.240603
+pandas-stubs==2.2.2.240807
pandera==0.20.3
pandocfilters==1.5.1
papermill==2.6.0
@@ -441,6 +441,7 @@ pydata-google-auth==1.8.2
pyflakes==3.2.0
pygments==2.18.0
pyjwt==2.9.0
+pymdown-extensions==10.9
pynacl==1.5.0
pyopenssl==24.2.1
pyparsing==3.1.2
@@ -459,6 +460,7 @@ pytest-xdist==3.6.1
python-daemon==3.0.1
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
+python-frontmatter==1.1.0
python-jose==3.3.0
python-json-logger==2.0.7
python-multipart==0.0.9
@@ -468,10 +470,10 @@ python-utils==3.8.2
pytimeparse==1.1.8
pytz==2024.1
pytzdata==2020.1
-pyyaml==6.0.1
+pyyaml==6.0.2
pyzmq==26.1.0
querystring-parser==1.2.4
-rapidfuzz==3.9.5
+rapidfuzz==3.9.6
readme-renderer==44.0
referencing==0.35.1
regex==2024.7.24
@@ -489,11 +491,12 @@ rich-argparse==1.5.2
rpds-py==0.20.0
rsa==4.9
ruamel-yaml==0.17.17
+ruff==0.5.6
s3transfer==0.10.2
scikit-learn==1.5.1
scipy==1.14.0
scrapbook==0.5.0
-sdf-cli==0.3.12
+sdf-cli==0.3.16
seaborn==0.13.2
selenium==4.23.1
semver==3.0.2
@@ -520,7 +523,7 @@ soupsieve==2.5
sphinx==8.0.2
sphinx-autodoc-typehints==2.2.3
sphinx-jinja2-compat==0.3.0
-sphinx-prompt==1.5.0
+sphinx-prompt==1.9.0
sphinx-tabs==3.4.5
sphinx-toolbox==3.7.0
sphinxcontrib-applehelp==2.0.0
@@ -603,13 +606,14 @@ universal-pathlib==0.2.2
uri-template==1.3.0
uritemplate==4.1.1
urllib3==1.26.19
+-e examples/use_case_repository
uvicorn==0.30.5
uvloop==0.19.0
vine==5.1.0
virtualenv==20.25.0
wandb==0.17.5
watchdog==4.0.1
-watchfiles==0.22.0
+watchfiles==0.23.0
wcwidth==0.2.13
webcolors==24.6.0
webencodings==0.5.1
diff --git a/pyright/master/requirements.txt b/pyright/master/requirements.txt
index 3b76a960c7534..a39dfc76242b3 100644
--- a/pyright/master/requirements.txt
+++ b/pyright/master/requirements.txt
@@ -130,3 +130,4 @@ pendulum<3
-e examples/experimental/dagster-blueprints
-e examples/experimental/dagster-airlift[mwaa,dbt,test] # (includes airflow dependencies)
-e examples/experimental/dagster-airlift/examples/peering-with-dbt
+-e examples/use_case_repository[dev]
From 8025d727d5b6ec4723629902257a16d5cb02d6f5 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Tue, 6 Aug 2024 21:54:48 -0700
Subject: [PATCH 03/13] move files
---
.../use_case_repository/{ => guides}/__init__.py | 0
.../use_case_repository/{ => guides}/external_script.sh | 0
.../use_case_repository/{ => guides}/pipes_cli_command.md | 0
.../use_case_repository/{ => guides}/pipes_cli_command.py | 0
.../{ => guides}/snowflake_to_s3_embedded_elt.md | 0
.../{ => guides}/snowflake_to_s3_embedded_elt.py | 0
6 files changed, 0 insertions(+), 0 deletions(-)
rename examples/use_case_repository/use_case_repository/{ => guides}/__init__.py (100%)
rename examples/use_case_repository/use_case_repository/{ => guides}/external_script.sh (100%)
rename examples/use_case_repository/use_case_repository/{ => guides}/pipes_cli_command.md (100%)
rename examples/use_case_repository/use_case_repository/{ => guides}/pipes_cli_command.py (100%)
rename examples/use_case_repository/use_case_repository/{ => guides}/snowflake_to_s3_embedded_elt.md (100%)
rename examples/use_case_repository/use_case_repository/{ => guides}/snowflake_to_s3_embedded_elt.py (100%)
diff --git a/examples/use_case_repository/use_case_repository/__init__.py b/examples/use_case_repository/use_case_repository/guides/__init__.py
similarity index 100%
rename from examples/use_case_repository/use_case_repository/__init__.py
rename to examples/use_case_repository/use_case_repository/guides/__init__.py
diff --git a/examples/use_case_repository/use_case_repository/external_script.sh b/examples/use_case_repository/use_case_repository/guides/external_script.sh
similarity index 100%
rename from examples/use_case_repository/use_case_repository/external_script.sh
rename to examples/use_case_repository/use_case_repository/guides/external_script.sh
diff --git a/examples/use_case_repository/use_case_repository/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
similarity index 100%
rename from examples/use_case_repository/use_case_repository/pipes_cli_command.md
rename to examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
diff --git a/examples/use_case_repository/use_case_repository/pipes_cli_command.py b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py
similarity index 100%
rename from examples/use_case_repository/use_case_repository/pipes_cli_command.py
rename to examples/use_case_repository/use_case_repository/guides/pipes_cli_command.py
diff --git a/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
similarity index 100%
rename from examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.md
rename to examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
diff --git a/examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py
similarity index 100%
rename from examples/use_case_repository/use_case_repository/snowflake_to_s3_embedded_elt.py
rename to examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.py
From 188544dddb3d021d513e490a0be5f08b9b2a52c3 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Tue, 6 Aug 2024 22:00:54 -0700
Subject: [PATCH 04/13] run prettier
---
examples/use_case_repository/README.md | 4 +-
.../guides/pipes_cli_command.md | 72 ++++++-----
.../guides/snowflake_to_s3_embedded_elt.md | 120 +++++++++---------
.../use_case_repository/webserver/main.py | 4 +-
4 files changed, 105 insertions(+), 95 deletions(-)
diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md
index 06ace7020faf7..a190c13f71a8b 100644
--- a/examples/use_case_repository/README.md
+++ b/examples/use_case_repository/README.md
@@ -30,8 +30,8 @@ When updating a use case, please make sure to modify both the Markdown and Pytho
### Local Preview
-To preview your changes locally before committing, you can start a local webserver by running the following command in your terminal:
+To preview your changes locally before committing, you can start a local webserver by running the following command in your terminal:
```
make webserver
-```
\ No newline at end of file
+```
diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
index 9b8bf5364cdde..c31c8eaab42c1 100644
--- a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
+++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
@@ -3,6 +3,7 @@ title: "Using Dagster Pipes Subprocess to Run a CLI Command"
description: "This use case demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. The objective is to execute non-Python workloads and integrate their outputs into Dagster's data pipeline."
tags: ["dagster pipes", "subprocess", "CLI"]
---
+
## Running CLI Commands with Dagster Pipes
### Overview
@@ -17,6 +18,7 @@ This guide demonstrates how to use Dagster Pipes to run a CLI command within a D
### What You’ll Learn
You will learn how to:
+
- Define a Dagster asset that invokes a CLI command.
- Use Dagster Pipes to manage subprocess execution.
- Capture and utilize the output of the CLI command within Dagster.
@@ -24,42 +26,46 @@ You will learn how to:
### Steps to Implement With Dagster
1. **Step 1: Define the CLI Command Script**
- - Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
- ```bash
- #!/bin/bash
- echo "Hello from CLI"
- ```
+
+ - Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
+
+ ```bash
+ #!/bin/bash
+ echo "Hello from CLI"
+ ```
2. **Step 2: Define the Dagster Asset**
- - Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
- ```python
- import shutil
- from dagster import asset, Definitions, AssetExecutionContext
- from dagster_pipes import PipesSubprocessClient
-
- @asset
- def cli_command_asset(
- context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
- ):
- cmd = [shutil.which("bash"), "external_script.sh"]
- return pipes_subprocess_client.run(
- command=cmd,
- context=context,
- env={"MY_ENV_VAR": "example_value"},
- ).get_materialize_result()
-
- defs = Definitions(
- assets=[cli_command_asset],
- resources={"pipes_subprocess_client": PipesSubprocessClient()},
- )
- ```
+
+ - Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
+
+ ```python
+ import shutil
+ from dagster import asset, Definitions, AssetExecutionContext
+ from dagster_pipes import PipesSubprocessClient
+
+ @asset
+ def cli_command_asset(
+ context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
+ ):
+ cmd = [shutil.which("bash"), "external_script.sh"]
+ return pipes_subprocess_client.run(
+ command=cmd,
+ context=context,
+ env={"MY_ENV_VAR": "example_value"},
+ ).get_materialize_result()
+
+ defs = Definitions(
+ assets=[cli_command_asset],
+ resources={"pipes_subprocess_client": PipesSubprocessClient()},
+ )
+ ```
3. **Step 3: Configure and Run the Asset**
- - Ensure the script is executable and run the Dagster asset to see the output.
- ```bash
- chmod +x external_script.sh
- dagit -f path_to_your_dagster_file.py
- ```
+ - Ensure the script is executable and run the Dagster asset to see the output.
+ ```bash
+ chmod +x external_script.sh
+ dagit -f path_to_your_dagster_file.py
+ ```
### Expected Outcomes
@@ -101,4 +107,4 @@ def another_asset():
defs = Definitions(
assets=[example_asset]
)
-```
\ No newline at end of file
+```
diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
index f695d47550bb6..699f7d3445a3e 100644
--- a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
+++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
@@ -3,6 +3,7 @@ title: "Ingesting Data from S3 to Snowflake with Dagster and Sling"
description: "This use case demonstrates how to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The objective is to automate the data ingestion process for efficient data management and analysis."
tags: ["snowflake", "s3", "dagster", "sling", "data ingestion"]
---
+
## Ingesting Data from S3 to Snowflake with Dagster and Sling
### Overview
@@ -30,68 +31,71 @@ By following this guide, you will learn how to:
### Steps to Implement With Dagster
1. **Step 1: Install Required Packages**
- - Ensure you have the necessary Python packages installed.
- - Install Dagster and dagster-embedded-elt using pip.
- ```bash
- pip install dagster dagster-embedded-elt
- ```
+ - Ensure you have the necessary Python packages installed.
+ - Install Dagster and dagster-embedded-elt using pip.
+
+ ```bash
+ pip install dagster dagster-embedded-elt
+ ```
2. **Step 2: Define Sling Connections**
- - Define the connections to S3 and Snowflake using SlingConnectionResource.
- - Use environment variables to securely manage sensitive information.
-
- ```python
- from dagster import EnvVar
- from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
-
- s3_connection = SlingConnectionResource(
- name="MY_S3",
- type="s3",
- bucket="your-s3-bucket",
- access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
- secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
- )
-
- snowflake_connection = SlingConnectionResource(
- name="MY_SNOWFLAKE",
- type="snowflake",
- host="your-snowflake-host",
- user="your-snowflake-user",
- database="your-snowflake-database",
- password=EnvVar("SNOWFLAKE_PASSWORD"),
- role="your-snowflake-role",
- )
-
- sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
- ```
+
+ - Define the connections to S3 and Snowflake using SlingConnectionResource.
+ - Use environment variables to securely manage sensitive information.
+
+ ```python
+ from dagster import EnvVar
+ from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
+
+ s3_connection = SlingConnectionResource(
+ name="MY_S3",
+ type="s3",
+ bucket="your-s3-bucket",
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
+ )
+
+ snowflake_connection = SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host="your-snowflake-host",
+ user="your-snowflake-user",
+ database="your-snowflake-database",
+ password=EnvVar("SNOWFLAKE_PASSWORD"),
+ role="your-snowflake-role",
+ )
+
+ sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
+ ```
3. **Step 3: Define the Data Ingestion Asset**
- - Use the `@sling_assets` decorator to define an asset that runs the Sling replication job.
- - Configure the replication settings to specify the source and target.
-
- ```python
- from dagster import asset, Definitions
- from dagster_embedded_elt.sling import sling_assets
-
- replication_config = {
- "SOURCE": "MY_S3",
- "TARGET": "MY_SNOWFLAKE",
- "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
- "streams": {
- "s3://your-s3-bucket/your-file.csv": {
- "object": "your_snowflake_schema.your_table",
- "primary_key": "id",
- },
- },
- }
-
- @sling_assets(replication_config=replication_config)
- def ingest_s3_to_snowflake(context, sling: SlingResource):
- yield from sling.replicate(context=context)
-
- defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
- ```
+
+ - Use the `@sling_assets` decorator to define an asset that runs the Sling replication job.
+ - Configure the replication settings to specify the source and target.
+
+ ```python
+ from dagster import asset, Definitions
+ from dagster_embedded_elt.sling import sling_assets
+
+ replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SNOWFLAKE",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://your-s3-bucket/your-file.csv": {
+ "object": "your_snowflake_schema.your_table",
+ "primary_key": "id",
+ },
+ },
+ }
+
+ @sling_assets(replication_config=replication_config)
+ def ingest_s3_to_snowflake(context, sling: SlingResource):
+ yield from sling.replicate(context=context)
+
+ defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
+ ```
### Expected Outcomes
@@ -116,4 +120,4 @@ After implementing this use case, you should have an automated pipeline that ing
- Integrate additional data sources and targets to expand your data pipeline.
- Implement monitoring and alerting for your data ingestion pipeline.
-By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
\ No newline at end of file
+By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
diff --git a/examples/use_case_repository/webserver/main.py b/examples/use_case_repository/webserver/main.py
index d2dee0d02b6f7..1191994bb1bdd 100644
--- a/examples/use_case_repository/webserver/main.py
+++ b/examples/use_case_repository/webserver/main.py
@@ -8,7 +8,7 @@
app = Flask(__name__)
-USE_CASES_DIR = file_relative_path(__file__, "../use_case_repository")
+USE_CASES_DIR = file_relative_path(__file__, "../use_case_repository/guides")
@app.route("/")
@@ -256,4 +256,4 @@ def use_case(slug):
if __name__ == "__main__":
- app.run(debug=True)
+ app.run(debug=True, port=3001)
From 571ae3d246b9b7afda1183995f4779e8adff6373 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Tue, 6 Aug 2024 22:04:31 -0700
Subject: [PATCH 05/13] add use case test
---
examples/use_case_repository/tox.ini | 2 +-
.../use_case_repository_tests/test_use_cases.py | 10 +++++++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/examples/use_case_repository/tox.ini b/examples/use_case_repository/tox.ini
index a79e67a85984b..ba652eb9fe2c8 100644
--- a/examples/use_case_repository/tox.ini
+++ b/examples/use_case_repository/tox.ini
@@ -10,8 +10,8 @@ install_command = uv pip install {opts} {packages}
deps =
source: -e ../../python_modules/dagster[test]
source: -e ../../python_modules/dagster-pipes
+ source: -e .[dev]
pypi: dagster[test]
- -e .[dev]
allowlist_externals =
/bin/bash
uv
diff --git a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
index 0bb99c30a46f3..ad92479158d8e 100644
--- a/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
+++ b/examples/use_case_repository/use_case_repository_tests/test_use_cases.py
@@ -1,6 +1,14 @@
-from use_case_repository.snowflake_to_s3_embedded_elt import ingest_s3_to_snowflake, sling_resource
+from use_case_repository.guides.pipes_cli_command import cli_command_asset
+from use_case_repository.guides.snowflake_to_s3_embedded_elt import (
+ ingest_s3_to_snowflake,
+ sling_resource,
+)
def test_snowflake_to_s3_embedded_elt():
assert ingest_s3_to_snowflake
assert sling_resource
+
+
+def test_pipes_cli_command():
+ assert cli_command_asset
From 1583412d3feedf2d9f9a3b64caa782586da8a005 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Wed, 7 Aug 2024 10:59:54 -0700
Subject: [PATCH 06/13] ignore use case repository
---
python_modules/dagster/dagster/_generate/download.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/python_modules/dagster/dagster/_generate/download.py b/python_modules/dagster/dagster/_generate/download.py
index c8edbced3901d..acc73a81b09ee 100644
--- a/python_modules/dagster/dagster/_generate/download.py
+++ b/python_modules/dagster/dagster/_generate/download.py
@@ -9,7 +9,7 @@
from .generate import _should_skip_file
# Examples aren't that can't be downloaded from the dagster project CLI
-EXAMPLES_TO_IGNORE = ["docs_snippets", "experimental", "temp_pins.txt"]
+EXAMPLES_TO_IGNORE = ["docs_snippets", "experimental", "temp_pins.txt", "use_case_repository"]
# Hardcoded list of available examples. The list is tested against the examples folder in this mono
# repo to make sure it's up-to-date.
AVAILABLE_EXAMPLES = [
From 1a21daeda42add434b652beed81e55d46d62da5e Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Wed, 7 Aug 2024 11:21:39 -0700
Subject: [PATCH 07/13] fix tox?
---
examples/use_case_repository/tox.ini | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/examples/use_case_repository/tox.ini b/examples/use_case_repository/tox.ini
index ba652eb9fe2c8..8a579ac4e097d 100644
--- a/examples/use_case_repository/tox.ini
+++ b/examples/use_case_repository/tox.ini
@@ -5,17 +5,17 @@ skipsdist = true
download = true
passenv =
CI_*
+ BUILDKITE*
COVERALLS_REPO_TOKEN
install_command = uv pip install {opts} {packages}
deps =
source: -e ../../python_modules/dagster[test]
source: -e ../../python_modules/dagster-pipes
- source: -e .[dev]
pypi: dagster[test]
+ -e .[dev]
allowlist_externals =
/bin/bash
uv
- pytest
commands =
source: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest -c ../../pyproject.toml -vv
From c88806cd2d3b22057b26255c475c438e3256a9a3 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Wed, 7 Aug 2024 15:25:48 -0700
Subject: [PATCH 08/13] address comments
---
.../guides/pipes_cli_command.md | 36 ++++---------------
.../guides/snowflake_to_s3_embedded_elt.md | 6 ++--
2 files changed, 10 insertions(+), 32 deletions(-)
diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
index c31c8eaab42c1..83c3e4dbf242a 100644
--- a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
+++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
@@ -13,7 +13,6 @@ This guide demonstrates how to use Dagster Pipes to run a CLI command within a D
### Prerequisites
- Dagster and Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
-- An existing CLI command or script that you want to run.
### What You’ll Learn
@@ -32,16 +31,19 @@ You will learn how to:
```bash
#!/bin/bash
echo "Hello from CLI"
+ echo "My env var is: ${MY_ENV_VAR}"
```
2. **Step 2: Define the Dagster Asset**
- Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
- ```python
+ Save the following file to `dagster_pipes_cli.py`
+
+ ```python
import shutil
- from dagster import asset, Definitions, AssetExecutionContext
- from dagster_pipes import PipesSubprocessClient
+
+ from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset
@asset
def cli_command_asset(
@@ -64,7 +66,7 @@ You will learn how to:
- Ensure the script is executable and run the Dagster asset to see the output.
```bash
chmod +x external_script.sh
- dagit -f path_to_your_dagster_file.py
+ dagster dev -f path_to_your_dagster_file.py
```
### Expected Outcomes
@@ -84,27 +86,3 @@ By following these steps, you will have a Dagster asset that successfully runs a
### Next Steps
Explore more advanced use cases with Dagster Pipes, such as integrating with other command-line tools or handling more complex workflows.
-
-The Steps MUST always be pythonic Dagster code. If the documentation includes @solids or @ops and repository, discard it. The documentation should only use the new Dagster APIs, such as @asset and Definitions.
-
-Use as many steps as necessary. 3 is the minimum number of steps.
-
-Do not add an `if name == __main__` block. Do not call the `materialize` function. Only provide the definition for the assets. Avoid the following words: certainly, simply, robust, ensure
-
-Here is a minimal Dagster code:
-
-```python
-from dagster import asset, Definitions, materialize
-
-@asset
-def example_asset():
- return "Example output"
-
-@asset(deps=[example_asset])
-def another_asset():
- return "Example output"
-
-defs = Definitions(
- assets=[example_asset]
-)
-```
diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
index 699f7d3445a3e..5f6c2f99a25f3 100644
--- a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
+++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
@@ -33,10 +33,10 @@ By following this guide, you will learn how to:
1. **Step 1: Install Required Packages**
- Ensure you have the necessary Python packages installed.
- - Install Dagster and dagster-embedded-elt using pip.
+ - Install Dagster and the Dagster UI (`dagster-webserver`) and dagster-embedded-elt using pip. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info
```bash
- pip install dagster dagster-embedded-elt
+ pip install dagster dagster-embedded-elt dagster-webserver
```
2. **Step 2: Define Sling Connections**
@@ -75,7 +75,7 @@ By following this guide, you will learn how to:
- Configure the replication settings to specify the source and target.
```python
- from dagster import asset, Definitions
+ from dagster import Definitions
from dagster_embedded_elt.sling import sling_assets
replication_config = {
From bcce61fc618d88a6debbf630fa5443afe68fd637 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Wed, 7 Aug 2024 16:05:56 -0700
Subject: [PATCH 09/13] prettify
---
.../use_case_repository/guides/pipes_cli_command.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
index 83c3e4dbf242a..00dd8f9aba06b 100644
--- a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
+++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
@@ -38,9 +38,9 @@ You will learn how to:
- Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
- Save the following file to `dagster_pipes_cli.py`
+ Save the following file to `dagster_pipes_cli.py`
- ```python
+ ```python
import shutil
from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset
From 8c32d7d68aa10d6f651beada9d07e39e60acc7fe Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Thu, 8 Aug 2024 14:19:38 -0700
Subject: [PATCH 10/13] add template and update use cases
---
examples/use_case_repository/README.md | 5 +-
.../use_case_repository/guides/_TEMPLATE.md | 102 +++++++++++++
.../guides/pipes_cli_command.md | 116 +++++++-------
.../guides/snowflake_to_s3_embedded_elt.md | 143 ++++++++++--------
.../use_case_repository/webserver/main.py | 2 +-
5 files changed, 248 insertions(+), 120 deletions(-)
create mode 100644 examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md
index a190c13f71a8b..75f84b256e968 100644
--- a/examples/use_case_repository/README.md
+++ b/examples/use_case_repository/README.md
@@ -6,7 +6,7 @@ This repository contains a collection of use cases that demonstrate various appl
The use cases in this repository serve two main purposes:
-1. They are used to populate the list of available use cases on the Dagster.io website.
+1. They're used to populate the list of available use cases on the Dagster.io website.
2. They provide practical examples for developers and data engineers working with Dagster.
### Integration with Dagster.io
@@ -24,6 +24,9 @@ Each use case consists of two main components:
Both files are utilized on the Dagster.io website. However, only the Python files are subject to automated testing.
+The TEMPLATE.md file is used to create new use cases. The actual template lives on our external
+Scout platform.
+
### Important Note
When updating a use case, please make sure to modify both the Markdown and Python files to maintain consistency between the documentation and the implementation.
diff --git a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
new file mode 100644
index 0000000000000..374f30bf59a77
--- /dev/null
+++ b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
@@ -0,0 +1,102 @@
+---
+title: "Snowflake to S3 ETL with Dagster"
+description: "This use case demonstrates how to transfer data from Snowflake to Amazon S3 using Dagster. The objective is to automate the extraction of data from Snowflake and store it in S3 for further processing or archival."
+tags: ["snowflake", "s3"]
+---
+
+# [Insert a Use Case Name that has high SEO Value here]
+
+Provide a brief overview of the use case, including its objectives and the main problem it addresses. All use cases must use Dagster to accomplish tasks.
+
+---
+
+## What You'll Learn
+
+You will learn how to:
+
+- Define a Dagster asset that extracts data from an external source and writes it to a database
+- Add other bullets here
+- ...
+
+---
+
+## Prerequisites
+
+To follow the steps in this guide, you will need:
+
+- To have Dagster and the Dagster UI installed. Refer to the [Dagster Installation Guide](https://docs.dagster.io/getting-started/installation) for instructions.
+- A basic understanding of Dagster. Refer to the [Dagster Documentation](https://docs.dagster.io/getting-started/what-why-dagster) for more information.
+- List other prerequisites here
+
+
+---
+
+## Steps to Implement With Dagster
+
+By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
+
+### Step 1: Enter the Name of Step 1 Here
+
+Provide a brief description of what this step does. Prefer a small, working Dagster
+example as a first step. Here is an example of what this might look like:
+
+```python
+ from dagster import (
+ asset,
+ DailyPartitionsDefinition,
+ AssetExecutionContext,
+ Definitions,
+ )
+
+ import datetime
+
+ # Define the partitions
+ partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")
+
+
+ @asset(partitions_def=partitions_def)
+ def upstream_asset(context: AssetExecutionContext):
+    with open(f"data-{context.partition_key}.csv", "w") as f:
+        f.write(f"Data for partition {context.partition_key}")
+
+ snapshot_date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+ with open(f"data-{snapshot_date}.csv", "w") as f:
+        f.write(f"Data for snapshot {snapshot_date}")
+
+
+ defs = Definitions(assets=[upstream_asset])
+```
+
+### Step 2: Enter the Name of Step 2 Here
+
+Provide a brief description of what this step does.
+
+
+### Step 3: Enter the Name of Step 3 Here
+
+Provide a brief description of what this step does.
+
+---
+
+## Expected Outcomes
+
+Describe the expected outcomes of implementing the use case. Include any results or metrics that indicate success.
+
+---
+
+## Troubleshooting
+
+Provide solutions to common issues that might arise while implementing the use case.
+
+
+---
+
+## Next Steps
+
+What should the person try next after this?
+
+---
+
+## Additional Resources
+
+List any additional resources, such as documentation, tutorials, or community links, that could help users implement the use case.
\ No newline at end of file
diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
index 00dd8f9aba06b..2c7506cd94f33 100644
--- a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
+++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
@@ -4,85 +4,97 @@ description: "This use case demonstrates how to use Dagster Pipes to run a CLI c
tags: ["dagster pipes", "subprocess", "CLI"]
---
-## Running CLI Commands with Dagster Pipes
-
-### Overview
+# Running CLI Commands with Dagster Pipes
This guide demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. This is useful for integrating non-Python workloads, such as Bash scripts or other command-line tools, into your Dagster data pipeline.
-### Prerequisites
-
-- Dagster and Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
+---
-### What You’ll Learn
+## What You’ll Learn
You will learn how to:
- Define a Dagster asset that invokes a CLI command.
- Use Dagster Pipes to manage subprocess execution.
-- Capture and utilize the output of the CLI command within Dagster.
+- Capture and use the output of the CLI command within Dagster.
+
+---
+
+## Prerequisites
-### Steps to Implement With Dagster
+To follow the steps in this guide, you'll need:
-1. **Step 1: Define the CLI Command Script**
+- To have Dagster and the Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
- - Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
+---
- ```bash
- #!/bin/bash
- echo "Hello from CLI"
- echo "My env var is: ${MY_ENV_VAR}"
- ```
+## Steps to Implement with Dagster
-2. **Step 2: Define the Dagster Asset**
+By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
- - Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
+### Step 1: Define the CLI Command Script
- Save the following file to `dagster_pipes_cli.py`
+Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
- ```python
- import shutil
+```bash
+#!/bin/bash
+echo "Hello from CLI"
+echo "My env var is: ${MY_ENV_VAR}"
+```
- from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset
+### Step 2: Define the Dagster Asset
- @asset
- def cli_command_asset(
- context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
- ):
- cmd = [shutil.which("bash"), "external_script.sh"]
- return pipes_subprocess_client.run(
- command=cmd,
- context=context,
- env={"MY_ENV_VAR": "example_value"},
- ).get_materialize_result()
+Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
- defs = Definitions(
- assets=[cli_command_asset],
- resources={"pipes_subprocess_client": PipesSubprocessClient()},
- )
- ```
+Save the following file to `dagster_pipes_cli.py`:
-3. **Step 3: Configure and Run the Asset**
- - Ensure the script is executable and run the Dagster asset to see the output.
- ```bash
- chmod +x external_script.sh
- dagster dev -f path_to_your_dagster_file.py
- ```
+```python
+import shutil
-### Expected Outcomes
+from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset
-By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
+@asset
+def cli_command_asset(
+ context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
+):
+ cmd = [shutil.which("bash"), "external_script.sh"]
+ return pipes_subprocess_client.run(
+ command=cmd,
+ context=context,
+ env={"MY_ENV_VAR": "example_value"},
+ ).get_materialize_result()
-### Troubleshooting
+defs = Definitions(
+ assets=[cli_command_asset],
+ resources={"pipes_subprocess_client": PipesSubprocessClient()},
+)
+```
-- **Permission Denied**: Ensure the script file has executable permissions using `chmod +x`.
-- **Command Not Found**: Verify the command is available in the system's PATH or provide the full path to the command.
+### Step 3: Configure and Run the Asset
-### Additional Resources
+Ensure the script is executable and run the Dagster asset to see the output.
-- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes)
-- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)
+```bash
+chmod +x external_script.sh
+dagster dev -f path_to_your_dagster_file.py
+```
-### Next Steps
+---
+
+## Troubleshooting
+
+- **Permission Denied**: Ensure the script file has executable permissions using `chmod +x`.
+- **Command Not Found**: Verify the command is available in the system's `PATH` or provide the full path to the command.
+
+---
+
+## Next Steps
Explore more advanced use cases with Dagster Pipes, such as integrating with other command-line tools or handling more complex workflows.
+
+---
+
+## Additional Resources
+
+- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes)
+- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)
\ No newline at end of file
diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
index 5f6c2f99a25f3..1c8dd64ec5b8b 100644
--- a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
+++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
@@ -6,11 +6,21 @@ tags: ["snowflake", "s3", "dagster", "sling", "data ingestion"]
## Ingesting Data from S3 to Snowflake with Dagster and Sling
-### Overview
-
This guide provides a step-by-step approach to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The main objective is to automate the data ingestion process, making it efficient and reliable for data management and analysis.
-### Prerequisites
+---
+
+## What You'll Learn
+
+By following this guide, you will learn how to:
+
+- Set up connections to S3 and Snowflake using Sling.
+- Define and configure assets in Dagster to automate the data ingestion process.
+- Execute the data ingestion pipeline.
+
+---
+
+## Prerequisites
Before you begin, ensure you have the following:
@@ -20,104 +30,105 @@ Before you begin, ensure you have the following:
- Python installed on your system.
- Dagster and dagster-embedded-elt installed in your Python environment.
-### What You’ll Learn
+---
-By following this guide, you will learn how to:
+## Steps to Implement With Dagster
-- Set up connections to S3 and Snowflake using Sling.
-- Define and configure assets in Dagster to automate the data ingestion process.
-- Execute the data ingestion pipeline.
+By following these steps, you will have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis.
-### Steps to Implement With Dagster
+### Step 1: Install Required Packages
-1. **Step 1: Install Required Packages**
+Ensure you have the necessary Python packages installed. Install Dagster and the Dagster UI (`dagster-webserver`) and dagster-embedded-elt using pip. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
- - Ensure you have the necessary Python packages installed.
- - Install Dagster and the Dagster UI (`dagster-webserver`) and dagster-embedded-elt using pip. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info
+```bash
+pip install dagster dagster-embedded-elt dagster-webserver
+```
- ```bash
- pip install dagster dagster-embedded-elt dagster-webserver
- ```
+### Step 2: Define Sling Connections
-2. **Step 2: Define Sling Connections**
+Define the connections to S3 and Snowflake using `SlingConnectionResource`. Use environment variables to securely manage sensitive information.
- - Define the connections to S3 and Snowflake using SlingConnectionResource.
- - Use environment variables to securely manage sensitive information.
+```python
+from dagster import EnvVar
+from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
- ```python
- from dagster import EnvVar
- from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
+s3_connection = SlingConnectionResource(
+ name="MY_S3",
+ type="s3",
+ bucket="your-s3-bucket",
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
+)
- s3_connection = SlingConnectionResource(
- name="MY_S3",
- type="s3",
- bucket="your-s3-bucket",
- access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
- secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
- )
+snowflake_connection = SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host="your-snowflake-host",
+ user="your-snowflake-user",
+ database="your-snowflake-database",
+ password=EnvVar("SNOWFLAKE_PASSWORD"),
+ role="your-snowflake-role",
+)
- snowflake_connection = SlingConnectionResource(
- name="MY_SNOWFLAKE",
- type="snowflake",
- host="your-snowflake-host",
- user="your-snowflake-user",
- database="your-snowflake-database",
- password=EnvVar("SNOWFLAKE_PASSWORD"),
- role="your-snowflake-role",
- )
+sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
+```
- sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
- ```
+### Step 3: Define the Data Ingestion Asset
-3. **Step 3: Define the Data Ingestion Asset**
+Use the `@sling_assets` decorator to define an asset that runs the Sling replication job. Configure the replication settings to specify the source and target.
- - Use the `@sling_assets` decorator to define an asset that runs the Sling replication job.
- - Configure the replication settings to specify the source and target.
+```python
+from dagster import Definitions
+from dagster_embedded_elt.sling import sling_assets
- ```python
- from dagster import Definitions
- from dagster_embedded_elt.sling import sling_assets
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SNOWFLAKE",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://your-s3-bucket/your-file.csv": {
+ "object": "your_snowflake_schema.your_table",
+ "primary_key": "id",
+ },
+ },
+}
- replication_config = {
- "SOURCE": "MY_S3",
- "TARGET": "MY_SNOWFLAKE",
- "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
- "streams": {
- "s3://your-s3-bucket/your-file.csv": {
- "object": "your_snowflake_schema.your_table",
- "primary_key": "id",
- },
- },
- }
+@sling_assets(replication_config=replication_config)
+def ingest_s3_to_snowflake(context, sling: SlingResource):
+ yield from sling.replicate(context=context)
- @sling_assets(replication_config=replication_config)
- def ingest_s3_to_snowflake(context, sling: SlingResource):
- yield from sling.replicate(context=context)
+defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
+```
- defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
- ```
+---
-### Expected Outcomes
+## Expected Outcomes
After implementing this use case, you should have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis.
-### Troubleshooting
+---
+
+## Troubleshooting
- **Connection Issues**: Ensure that your AWS and Snowflake credentials are correctly set in the environment variables.
- **Data Format Issues**: Verify that the data format in S3 matches the expected format in Snowflake.
- **Permissions**: Ensure that the Snowflake user has the necessary permissions to write to the target schema and table.
-### Additional Resources
+---
+
+## Additional Resources
- [Dagster Documentation](https://docs.dagster.io/)
- [Embedded ELT Documentation](https://docs.dagster.io/integrations/embedded-elt)
- [Sling Documentation](https://docs.slingdata.io/)
- [Snowflake Documentation](https://docs.snowflake.com/)
-### Next Steps
+---
+
+## Next Steps
- Explore more advanced configurations and transformations using Sling.
- Integrate additional data sources and targets to expand your data pipeline.
- Implement monitoring and alerting for your data ingestion pipeline.
-By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
+By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
\ No newline at end of file
diff --git a/examples/use_case_repository/webserver/main.py b/examples/use_case_repository/webserver/main.py
index 1191994bb1bdd..1b0f932173f53 100644
--- a/examples/use_case_repository/webserver/main.py
+++ b/examples/use_case_repository/webserver/main.py
@@ -15,7 +15,7 @@
def index():
use_cases = []
for filename in os.listdir(USE_CASES_DIR):
- if filename.endswith(".md"):
+ if filename.endswith(".md") and not filename.startswith("_"):
with open(os.path.join(USE_CASES_DIR, filename), "r") as f:
post = frontmatter.load(f)
use_cases.append(
From 2c0ac76ebfd2c3a971cd3e70aaf60d3f160fd145 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Fri, 9 Aug 2024 09:03:21 -0700
Subject: [PATCH 11/13] Update _TEMPLATE.md
---
.../use_case_repository/guides/_TEMPLATE.md | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
index 374f30bf59a77..3e5724be37259 100644
--- a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
+++ b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
@@ -14,9 +14,9 @@ Provide a brief overview of the use case, including its objectives and the main
You will learn how to:
- - Define a Dagster asset that extracts data from an external source and writes it to a database
- - Add other bullets here
- - ...
+- Define a Dagster asset that extracts data from an external source and writes it to a database
+- Add other bullets here
+- ...
---
From 2107880d00a202345e22c34b715d2a884fd1c138 Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Fri, 9 Aug 2024 09:05:50 -0700
Subject: [PATCH 12/13] Update _TEMPLATE.md
---
.../use_case_repository/use_case_repository/guides/_TEMPLATE.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
index 3e5724be37259..1a5e0c1c15713 100644
--- a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
+++ b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
@@ -33,7 +33,7 @@ To follow the steps in this guide, you will need:
## Steps to Implement With Dagster
-By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
+By following these steps, you will [Provide a general description of what the user will wind up with by the end of the guide]. [Also provide a general description of what this enables them to do].
### Step 1: Enter the Name of Step 1 Here
From 03bd736839078aac8e284af974cee8089f900c6b Mon Sep 17 00:00:00 2001
From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com>
Date: Fri, 9 Aug 2024 09:11:43 -0700
Subject: [PATCH 13/13] make prettier
---
examples/use_case_repository/README.md | 2 +-
.../use_case_repository/guides/_TEMPLATE.md | 33 +++++++++----------
.../guides/pipes_cli_command.md | 2 +-
.../guides/snowflake_to_s3_embedded_elt.md | 2 +-
4 files changed, 18 insertions(+), 21 deletions(-)
diff --git a/examples/use_case_repository/README.md b/examples/use_case_repository/README.md
index 75f84b256e968..3d8a7cb60333f 100644
--- a/examples/use_case_repository/README.md
+++ b/examples/use_case_repository/README.md
@@ -25,7 +25,7 @@ Each use case consists of two main components:
Both files are utilized on the Dagster.io website. However, only the Python files are subject to automated testing.
The TEMPLATE.md file is used to create new use cases. The actual template lives on our external
-Scout platform.
+Scout platform.
### Important Note
diff --git a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
index 1a5e0c1c15713..d4eedb3a8ff57 100644
--- a/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
+++ b/examples/use_case_repository/use_case_repository/guides/_TEMPLATE.md
@@ -1,7 +1,7 @@
---
title: "Snowflake to S3 ETL with Dagster"
description: "This use case demonstrates how to transfer data from Snowflake to Amazon S3 using Dagster. The objective is to automate the extraction of data from Snowflake and store it in S3 for further processing or archival."
-tags: ["snowflake", "s3" ]
+tags: ["snowflake", "s3"]
---
# [Insert a Use Case Name that has high SEO Value here]
@@ -14,8 +14,8 @@ Provide a brief overview of the use case, including its objectives and the main
You will learn how to:
-- Define a Dagster asset that extracts data from an external source and writes it to a database
-- Add other bullets here
+- Define a Dagster asset that extracts data from an external source and writes it to a database
+- Add other bullets here
- ...
---
@@ -28,14 +28,13 @@ To follow the steps in this guide, you will need:
- A basic understanding of Dagster. Refer to the [Dagster Documentation](https://docs.dagster.io/getting-started/what-why-dagster) for more information.
- List other prerequisites here
-
---
## Steps to Implement With Dagster
By following these steps, you will [Provide a general description of what the user will wind up with by the end of the guide]. [Also provide a general description of what this enables them to do].
-### Step 1: Enter the Name of Step 1 Here
+### Step 1: Enter the Name of Step 1 Here
Provide a brief description of what this step does. Prefer a small, working Dagster
example as a first step. Here is an example of what this might look like:
@@ -47,34 +46,33 @@ example as a first step. Here is an example of what this might look like:
AssetExecutionContext,
Definitions,
)
-
+
import datetime
-
+
# Define the partitions
partitions_def = DailyPartitionsDefinition(start_date="2023-01-01")
-
-
+
+
     @asset(partitions_def=partitions_def)
     def upstream_asset(context: AssetExecutionContext):
         partition_date = context.partition_key
         with open(f"data-{partition_date}.csv", "w") as f:
             f.write(f"Data for partition {partition_date}")
-
+
         snapshot_date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
         with open(f"data-{snapshot_date}.csv", "w") as f:
             f.write(f"Data for partition {partition_date}")
-
-
+
+
defs = Definitions(assets=[upstream_asset])
```
### Step 2: Enter the Name of Step 2 Here
-Provide a brief description of what this step does.
-
+Provide a brief description of what this step does.
### Step 3: Enter the Name of Step 3 Here
-Provide a brief description of what this step does.
+Provide a brief description of what this step does.
---
@@ -88,15 +86,14 @@ Describe the expected outcomes of implementing the use case. Include any results
Provide solutions to common issues that might arise while implementing the use case.
-
---
## Next Steps
-What should the person try next after this?
+What should the person try next after this?
---
## Additional Resources
-List any additional resources, such as documentation, tutorials, or community links, that could help users implement the use case.
\ No newline at end of file
+List any additional resources, such as documentation, tutorials, or community links, that could help users implement the use case.
diff --git a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
index 2c7506cd94f33..7fe083edc9e36 100644
--- a/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
+++ b/examples/use_case_repository/use_case_repository/guides/pipes_cli_command.md
@@ -97,4 +97,4 @@ Explore more advanced use cases with Dagster Pipes, such as integrating with oth
## Additional Resources
- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes)
-- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)
\ No newline at end of file
+- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)
diff --git a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
index 1c8dd64ec5b8b..e5e8bd0ec950c 100644
--- a/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
+++ b/examples/use_case_repository/use_case_repository/guides/snowflake_to_s3_embedded_elt.md
@@ -131,4 +131,4 @@ After implementing this use case, you should have an automated pipeline that ing
- Integrate additional data sources and targets to expand your data pipeline.
- Implement monitoring and alerting for your data ingestion pipeline.
-By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
\ No newline at end of file
+By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.