Skip to content

Commit

Permalink
Add dagster-elt (#16681)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This adds the `dagster-elt` integration which aims to be a set of embedded ELT/ETL tooling to help data engineers with common ingestion/transform tasks such as ingesting data to and from databases,
warehouses, and files.

This first iteration is a wrapper around the [sling](https://slingdata.io) package. Sling is a lightweight CLI tool that allows users to sync data from a source/destination without the extra burden of heavyweight tooling that other solutions require.

## How I Tested These Changes

Unit tests, local testing.

---------

Co-authored-by: Ben Pankow <[email protected]>
  • Loading branch information
2 people authored and yuhan committed Oct 3, 2023
1 parent 0a26ec4 commit 8aa7d64
Show file tree
Hide file tree
Showing 32 changed files with 1,002 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@
}
]
},
{
"title": "Embedded ELT",
"path": "/integrations/embedded-elt"

},
{
"title": "Fivetran",
"path": "/integrations/fivetran"
Expand Down
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions docs/content/integrations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ Using our integration guides and libraries, you can extend Dagster to interopera
href="/integrations/dbt-cloud"
></ArticleListItem>
<ArticleListItem title="DuckDB" href="/integrations/duckdb"></ArticleListItem>
<ArticleListItem
title="Embedded ELT"
href="/integrations/embedded-elt"
></ArticleListItem>
<ArticleListItem
title="Fivetran"
href="/integrations/fivetran"
Expand Down Expand Up @@ -125,6 +129,10 @@ Explore libraries that are maintained by the Dagster core team.
title="dbt"
href="/_apidocs/libraries/dagster-dbt"
></ArticleListItem>
<ArticleListItem
title="Embedded ELT"
href="/_apidocs/libraries/dagster-embedded-elt"
></ArticleListItem>
<ArticleListItem
title="Fivetran"
href="/_apidocs/libraries/dagster-fivetran"
Expand Down
135 changes: 135 additions & 0 deletions docs/content/integrations/embedded-elt.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
---
title: "Dagster Embedded ELT"
description: Lightweight ELT framework for building ELT pipelines with Dagster, through helpful pre-built assets and resources
---

# Dagster Embedded ELT

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback.

This package currently includes a single implementation using <a href="https://slingdata.io">Sling</a>, which provides a simple way to sync data between databases and file systems.

We plan on adding additional embedded ELT tool integrations in the future.

---

## Relevant APIs

| Name | Description |
| --------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
| <PyObject module="dagster_embedded_elt.sling" object="build_sling_asset" /> | The core Sling asset factory for building syncs |
| <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> | The Sling Resource used for handing credentials to databases and object stores |

---

## Getting started

To get started with `dagster-embedded-elt` and Sling, familiarize yourself with <a href="https://docs.slingdata.io/sling-cli/running-tasks">Sling</a> by reading their docs which describe how sources and targets are configured.

The typical pattern for building an ELT pipeline with Sling has three steps:

1. First, create a <PyObject module="dagster-embedded-elt.sling" object="SlingResource" /> which is a container for the source and the target.
2. In the <PyObject module="dagster-embedded-elt.sling" object="SlingResource" /> define both a <PyObject module="dagster-embedded-elt.sling" object="SlingSourceConnection" /> and a <PyObject module="dagster-embedded-elt.sling" object="SlingTargetConnection" /> which holds the source and target credentials that Sling will use to sync data.
3. Finally, create an asset that syncs between two connections. You can use the <PyObject module="dagster-embedded-elt.sling" object="build_sling_asset" /> factory for most use cases.

---

## Step 1: Setting up a Sling resource

A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the <PyObject module="dagster-embedded-elt.sling" object="SlingSourceConnection" /> and <PyObject module="dagster-embedded-elt.sling" object="SlingTargetConnection" /> classes.

The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections).

The simplest connection is a file connection, which can be defined as:

```python
from dagster_embedded_elt.sling import SlingSourceConnection
source = SlingSourceConnection(type="file")
sling = SlingResource(source_connection=source, ...)
```

Note that no path is required in the source connection, as that is provided by the asset itself.

````python
asset_def = build_sling_asset(
asset_spec=AssetSpec("my_file"),
source_stream=f"file://{path_to_file}",
...
)

For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation.

```python
source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db")
````

---

## Step 2: Creating a Sling sync

To create a Sling sync, once you have defined your resource, you can use the <PyObject module="dagster_embedded_elt.sling" object="build_sling_asset" /> factory to create an asset.

```python

sling_resource = SlingResource(
source_connection=SlingSourceConnection(type="file"),
target_connection=SlingTargetConnection(
type="sqlite", connection_string="sqlite://path/to/sqlite.db"
),
)

asset_spec = AssetSpec(
key=["main", "tbl"],
group_name="etl",
description="ETL Test",
deps=["foo"],
)

asset_def = build_sling_asset(
asset_spec=asset_spec,
source_stream="file://path/to/file.csv",
target_object="main.dest_tbl",
mode=SlingMode.INCREMENTAL,
primary_key="id",
)

sling_job = build_assets_job(
"sling_job",
[asset_def],
resource_defs={"sling": sling_resource},
)

```

---

## Examples

This is an example of how to setup a Sling sync between Postgres and Snowflake:

```python
import os
from dagster_embedded_elt.sling import SlingResource, SlingSourceConnection, SlingTargetConnection

source = SlingSourceConnection(
type="postgres", host="localhost", port=5432, database="my_database",
user="my_user", password=os.getenv("PG_PASS")
)

target = SlingTargetConnection(
type="snowflake", host="hostname.snowflake", user="username",
database="database", password=os.getenv("SF_PASSWORD"), role="role"
)

sling = SlingResource(source_connection=source, target_connection=target)
```

Similarily, you can define file/storage connections:

```python
source = SlingSourceConnection(
type="s3", bucket="sling-bucket",
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)
```
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
"../../python_modules/libraries/dagster-datadog",
"../../python_modules/libraries/dagster-datahub",
"../../python_modules/libraries/dagster-docker",
"../../python_modules/libraries/dagster-embedded-elt",
"../../python_modules/libraries/dagster-embedded-elt/sling",
"../../python_modules/libraries/dagster-fivetran",
"../../python_modules/libraries/dagster-github",
"../../python_modules/libraries/dagster-k8s",
Expand Down
1 change: 1 addition & 0 deletions docs/sphinx/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
sections/api/apidocs/libraries/dagster-duckdb-pandas
sections/api/apidocs/libraries/dagster-duckdb-pyspark
sections/api/apidocs/libraries/dagster-duckdb-polars
sections/api/apidocs/libraries/dagster-embedded-elt
sections/api/apidocs/libraries/dagster-fivetran
sections/api/apidocs/libraries/dagster-docker
sections/api/apidocs/libraries/dagster-gcp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
####################################
embedded-elt (dagster-embedded-elt)
####################################

This package provides a framework for building ELT pipelines with Dagster through
helpful pre-built assets and resources.

This package currently includes a `Sling <https://slingdata.io>`_ integration which
provides a simple way to sync data between databases and file systems.

Related documentation pages: `embedded-elt </integrations/embedded-elt>`_.


******
Sling
******

.. currentmodule:: dagster_embedded_elt.sling

Assets
======

.. autofunction:: build_sling_asset

Resources
=========

.. autoclass:: SlingResource
:members: sync

.. autoclass:: dagster_embedded_elt.sling.resources.SlingSourceConnection
.. autoclass:: dagster_embedded_elt.sling.resources.SlingTargetConnection
1 change: 1 addition & 0 deletions docs/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ deps =
-e ../python_modules/libraries/dagster-duckdb
-e ../python_modules/libraries/dagster-dbt
-e ../python_modules/libraries/dagster-wandb
-e ../python_modules/libraries/dagster-embedded-elt

commands =
make --directory=sphinx clean
Expand Down
1 change: 1 addition & 0 deletions pyright/master/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ cycler==0.11.0
-e python_modules/libraries/dagster-duckdb-pandas
-e python_modules/libraries/dagster-duckdb-polars
-e python_modules/libraries/dagster-duckdb-pyspark
-e python_modules/libraries/dagster-embedded-elt
-e python_modules/libraries/dagster-fivetran
-e python_modules/libraries/dagster-gcp
-e python_modules/libraries/dagster-gcp-pandas
Expand Down
1 change: 1 addition & 0 deletions pyright/master/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
-e python_modules/libraries/dagster-duckdb-pandas/
-e python_modules/libraries/dagster-duckdb-pyspark/
-e python_modules/libraries/dagster-duckdb-polars/
-e python_modules/libraries/dagster-embedded-elt
-e python_modules/libraries/dagster-fivetran/
-e python_modules/libraries/dagster-gcp/
-e python_modules/libraries/dagster-gcp-pandas[test]
Expand Down
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-embedded-elt/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
branch = True
Loading

0 comments on commit 8aa7d64

Please sign in to comment.