diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 8424f6addfa6b..5db2a7966ef7f 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -5,171 +5,253 @@ description: Lightweight ELT framework for building ELT pipelines with Dagster, # Dagster Embedded ELT -This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback. +This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback. -This package currently includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems. +This package includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems. We plan on adding additional embedded ELT tool integrations in the future. --- -## Getting started +## Overview -To get started with `dagster-embedded-elt` and Sling, familiarize yourself with Sling's replication configuration. +To get started with `dagster-embedded-elt` and Sling, first, familiarize yourself with Sling's replication configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The `dagser-embedded-elt` integration uses this configuration to build the assets for both sources and destinations. The typical pattern for building an ELT pipeline with Sling has three steps: 1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from. -2. Next, define a for each connection, ensuring you name the resource using the same name given to the connection in the Sling configuration. +2. Next, create a and pass a list of for each connection to the `connection` parameter, ensuring you name the resource using the same name given to the connection in the Sling configuration. -3. Create a Resource object and pass all the connections from the previous step to it. +3. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync. -4. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync. +Each step is explained in detail below: -````python --- -## Step 1: Setting up a Sling resource +## Step 1: Setting up a Sling replication configuration -A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the and classes. +Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python. -The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections). +This configuration is passed to the Sling CLI to run the replication job. -The simplest connection is a file connection, which can be defined as: +Here's an example of a `replication.yaml` file: -```python -from dagster_embedded_elt.sling import SlingSourceConnection -source = SlingSourceConnection(type="file") -sling = SlingResource(source_connection=source, ...) -```` +```yaml +SOURCE: MY_POSTGRES +TARGET: MY_SNOWFLAKE -Note that no path is required in the source connection, as that is provided by the asset itself. +defaults: + mode: full-refresh + object: "{stream_schema}_{stream_table}" -```python -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_file"), - source_stream=f"file://{path_to_file}", - ... -) +streams: + public.accounts: + public.users: + public.finance_departments: + object: "departments" ``` -For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation. - -```python -source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db") +Or in Python: + +```python file=/integrations/embedded_elt/replication_config.py +replication_config = { + "SOURCE": "MY_POSTGRES", + "TARGET": "MY_DUCKDB", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} ``` ---- +## Step 2: Creating a Sling resource -## Step 2: Creating a Sling sync +Next, you will need to create a object that contains references to the connections specified in the replication configuration. -To create a Sling sync, once you have defined your resource, you can use the factory to create an asset. +A takes a `connections` parameter, where each represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed. -```python +The `name` parameter in the should match the `SOURCE` and `TARGET` keys in the replication configuration. -sling_resource = SlingResource( - source_connection=SlingSourceConnection(type="file"), - target_connection=SlingTargetConnection( - type="sqlite", connection_string="sqlite://path/to/sqlite.db" - ), -) +You may pass a connection string or arbitrary keyword arguments to the to specify the connection details. See the Sling connections reference for the specific connection types and parameters. -asset_spec = AssetSpec( - key=["main", "tbl"], - group_name="etl", - description="ETL Test", - deps=["foo"], +```python file=/integrations/embedded_elt/sling_connection_resources.py +from dagster_embedded_elt.sling.resources import ( + SlingConnectionResource, + SlingResource, ) -asset_def = build_sling_asset( - asset_spec=asset_spec, - source_stream="file://path/to/file.csv", - target_object="main.dest_tbl", - mode=SlingMode.INCREMENTAL, - primary_key="id", +from dagster import EnvVar + +sling_resource = SlingResource( + connections=[ + # Using a connection string from an environment variable + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), + ), + # Using a hard-coded connection string + SlingConnectionResource( + name="MY_DUCKDB", + type="duckdb", + connection_string="duckdb:///var/tmp/duckdb.db", + ), + # Using a keyword-argument constructor + SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + role="REPORTING", + ), + ] ) +``` + +## Step 3: Define the Sling assets -sling_job = build_assets_job( - "sling_job", - [asset_def], - resource_defs={"sling": sling_resource}, +Now you can define a Sling asset using the decorator. Dagster will read the replication configuration to produce Assets. + +Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom object. + +```python file=/integrations/embedded_elt/sling_dagster_translator.py +from dagster_embedded_elt import sling +from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingResource, + sling_assets, ) +from dagster import Definitions, file_relative_path + +replication_config = file_relative_path(__file__, "../sling_replication.yaml") +sling_resource = SlingResource(connections=[...]) # Add connections here + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) + for row in sling.stream_raw_logs(): + context.log.info(row) + + +defs = Definitions( + assets=[ + my_assets, + ], + resources={ + "sling": sling_resource, + }, +) ``` ---- +That's it! You should now be able to view your assets in Dagster and run the replication job. ## Examples This is an example of how to setup a Sling sync between Postgres and Snowflake: ```python file=/integrations/embedded_elt/postgres_snowflake.py -import os - from dagster_embedded_elt.sling import ( - SlingMode, + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -source = SlingSourceConnection( +source = SlingConnectionResource()( + name="MY_PG", type="postgres", host="localhost", port=5432, database="my_database", user="my_user", - password=os.getenv("PG_PASS"), + password=EnvVar("PG_PASS"), ) -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="public.my_table", - target_object="marts.my_table", - mode=SlingMode.INCREMENTAL, - primary_key="id", +sling = SlingResource( + connections=[ + source, + target, + ] ) +replication_config = { + "SOURCE": "MY_PG", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) ``` Similarily, you can define file/storage connections: ```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py -source = SlingSourceConnection( +source = SlingConnectionResource()( + name="MY_S3", type="s3", bucket="sling-bucket", - access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="s3://my-bucket/my_file.parquet", - target_object="marts.my_table", - primary_key="id", -) +sling = SlingResource(connections=[source, target]) + +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://my-bucket/my_file.parquet": { + "object": "marts.my_table", + "primary_key": "id", + }, + }, +} + + +@sling_assets +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) ``` ## Relevant APIs -| Name | Description | -| -------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ | -| | The core Sling asset factory for building syncs | -| | The Sling Resource used for handing credentials to databases and object stores | -| | A translator for specifying how to map between Sling and Dagster types | +| Name | Description | +| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ | +| | The core Sling asset factory for building syncs | +| | The Sling Resource used for handing credentials to databases and object stores | +| | A translator for specifying how to map between Sling and Dagster types | +| | A Sling connection resource for specifying the connection details | diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index cb667468d6c27..cca09ed35d820 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -1,42 +1,56 @@ # pyright: reportCallIssue=none # pyright: reportOptionalMemberAccess=none -import os - from dagster_embedded_elt.sling import ( - SlingMode, + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -source = SlingSourceConnection( +source = SlingConnectionResource( + name="MY_PG", type="postgres", host="localhost", port=5432, database="my_database", user="my_user", - password=os.getenv("PG_PASS"), + password=EnvVar("PG_PASS"), ) -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="public.my_table", - target_object="marts.my_table", - mode=SlingMode.INCREMENTAL, - primary_key="id", +sling = SlingResource( + connections=[ + source, + target, + ] ) +replication_config = { + "SOURCE": "MY_PG", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py new file mode 100644 index 0000000000000..b2a5abca43540 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py @@ -0,0 +1,10 @@ +replication_config = { + "SOURCE": "MY_POSTGRES", + "TARGET": "MY_DUCKDB", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index c8f8a66580f2c..9d13fe66d1496 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -1,42 +1,56 @@ # pyright: reportCallIssue=none # pyright: reportOptionalMemberAccess=none -import os - from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) # start_storage_config -source = SlingSourceConnection( +source = SlingConnectionResource()( + name="MY_S3", type="s3", bucket="sling-bucket", - access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), ) -sling = SlingResource(source_connection=source, target_connection=target) +sling = SlingResource(connections=[source, target]) + +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://my-bucket/my_file.parquet": { + "object": "marts.my_table", + "primary_key": "id", + }, + }, +} + + +@sling_assets +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="s3://my-bucket/my_file.parquet", - target_object="marts.my_table", - primary_key="id", -) # end_storage_config diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py new file mode 100644 index 0000000000000..bd6c76014e188 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py @@ -0,0 +1,32 @@ +# pyright: reportCallIssue=none +from dagster_embedded_elt.sling.resources import ( + SlingConnectionResource, + SlingResource, +) + +from dagster import EnvVar + +sling_resource = SlingResource( + connections=[ + # Using a connection string from an environment variable + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), + ), + # Using a hard-coded connection string + SlingConnectionResource( + name="MY_DUCKDB", + type="duckdb", + connection_string="duckdb:///var/tmp/duckdb.db", + ), + # Using a keyword-argument constructor + SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + role="REPORTING", + ), + ] +) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py new file mode 100644 index 0000000000000..09226414221d5 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -0,0 +1,31 @@ +from dagster_embedded_elt import sling +from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingResource, + sling_assets, +) + +from dagster import Definitions, file_relative_path + +replication_config = file_relative_path(__file__, "../sling_replication.yaml") +sling_resource = SlingResource(connections=[...]) # Add connections here + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) + for row in sling.stream_raw_logs(): + context.log.info(row) + + +defs = Definitions( + assets=[ + my_assets, + ], + resources={ + "sling": sling_resource, + }, +) diff --git a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py index 8dde56f5a2e43..955598028019b 100644 --- a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py +++ b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py @@ -1,11 +1,15 @@ from docs_snippets.integrations.embedded_elt.postgres_snowflake import ( - asset_def as asset_def_postgres, + my_assets as asset_def_postgres, ) from docs_snippets.integrations.embedded_elt.s3_snowflake import ( - asset_def as asset_def_s3, + my_assets as asset_def_s3, +) +from docs_snippets.integrations.embedded_elt.sling_connection_resources import ( + sling_resource, ) -def test_asset_defs(): +def test_asset_defs() -> None: assert asset_def_postgres assert asset_def_s3 + assert sling_resource