Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[embedded-elt] Update Sling docs and examples #20028

Merged
merged 7 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content/_apidocs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi
</td>
<td>Provides support for storing PySpark DataFrames in DuckDB.</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-embedded-elt">Embedded ELT</a> (
<code>dagster-embedded-elt</code>)
</td>
<td>Provides support for running embedded ELT within Dagster</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-fivetran">Fivetran</a> (
Expand Down
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
278 changes: 188 additions & 90 deletions docs/content/integrations/embedded-elt.mdx

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@ provides a simple way to sync data between databases and file systems.

Related documentation pages: `embedded-elt </integrations/embedded-elt>`_.

.. currentmodule:: dagster_embedded_elt.sling

******
Sling
******
***************************
dagster-embedded-elt.sling
***************************

.. currentmodule:: dagster_embedded_elt.sling
Assets (Sling)
==============

Assets
======
.. autodecorator:: sling_assets

.. autofunction:: build_sling_asset
.. autoclass:: DagsterSlingTranslator

Resources
=========
Resources (Sling)
=================

.. autoclass:: SlingResource
:members: sync
:members: sync, replicate

.. autoclass:: SlingConnectionResource

Deprecated
-----------

.. autofunction:: build_sling_asset
.. autoclass:: dagster_embedded_elt.sling.resources.SlingSourceConnection
.. autoclass:: dagster_embedded_elt.sling.resources.SlingTargetConnection
Original file line number Diff line number Diff line change
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none

import os

from dagster_embedded_elt.sling import (
SlingMode,
DagsterSlingTranslator,
SlingConnectionResource,
SlingResource,
SlingSourceConnection,
SlingTargetConnection,
build_sling_asset,
sling_assets,
)

from dagster import AssetSpec
from dagster import EnvVar

source = SlingSourceConnection(
source = SlingConnectionResource(
name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
password=os.getenv("PG_PASS"),
password=EnvVar("PG_PASS"),
)

target = SlingTargetConnection(
target = SlingConnectionResource(
name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
password=os.getenv("SF_PASSWORD"),
password=EnvVar("SF_PASSWORD"),
role="role",
)

sling = SlingResource(source_connection=source, target_connection=target)

asset_def = build_sling_asset(
asset_spec=AssetSpec("my_asset_name"),
source_stream="public.my_table",
target_object="marts.my_table",
mode=SlingMode.INCREMENTAL,
primary_key="id",
sling = SlingResource(
connections=[
source,
target,
]
)
replication_config = {
"SOURCE": "MY_PG",
"TARGET": "MY_SF",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": None,
"public.finance_departments": {"object": "departments"},
},
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
replication_config = {
"SOURCE": "MY_POSTGRES",
"TARGET": "MY_DUCKDB",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": None,
"public.finance_departments": {"object": "departments"},
},
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none

import os

from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingConnectionResource,
SlingResource,
SlingSourceConnection,
SlingTargetConnection,
build_sling_asset,
sling_assets,
)

from dagster import AssetSpec
from dagster import EnvVar

target = SlingTargetConnection(
target = SlingConnectionResource(
name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
password=os.getenv("SF_PASSWORD"),
password=EnvVar("SF_PASSWORD"),
role="role",
)


# start_storage_config
source = SlingSourceConnection(
source = SlingConnectionResource(
name="MY_S3",
type="s3",
bucket="sling-bucket",
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

sling = SlingResource(source_connection=source, target_connection=target)
sling = SlingResource(connections=[source, target])

replication_config = {
"SOURCE": "MY_S3",
"TARGET": "MY_SF",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"s3://my-bucket/my_file.parquet": {
"object": "marts.my_table",
"primary_key": "id",
},
},
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)

asset_def = build_sling_asset(
asset_spec=AssetSpec("my_asset_name"),
source_stream="s3://my-bucket/my_file.parquet",
target_object="marts.my_table",
primary_key="id",
)

# end_storage_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# pyright: reportCallIssue=none
from dagster_embedded_elt.sling.resources import (
SlingConnectionResource,
SlingResource,
)

from dagster import EnvVar

sling_resource = SlingResource(
connections=[
# Using a connection string from an environment variable
SlingConnectionResource(
name="MY_POSTGRES",
type="postgres",
connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
),
# Using a hard-coded connection string
SlingConnectionResource(
name="MY_DUCKDB",
type="duckdb",
connection_string="duckdb:///var/tmp/duckdb.db",
),
# Using a keyword-argument constructor
SlingConnectionResource(
name="MY_SNOWFLAKE",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
role="REPORTING",
),
]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
sling_assets,
)

from dagster import Definitions, file_relative_path

replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
for row in sling.stream_raw_logs():
context.log.info(row)


defs = Definitions(
assets=[
my_assets,
],
resources={
"sling": sling_resource,
},
)
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from docs_snippets.integrations.embedded_elt.postgres_snowflake import (
asset_def as asset_def_postgres,
my_assets as asset_def_postgres,
)
from docs_snippets.integrations.embedded_elt.s3_snowflake import (
asset_def as asset_def_s3,
my_assets as asset_def_s3,
)
from docs_snippets.integrations.embedded_elt.sling_connection_resources import (
sling_resource,
)


def test_asset_defs():
def test_asset_defs() -> None:
assert asset_def_postgres
assert asset_def_s3
assert sling_resource
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def sling_assets(

Args:
replication_config: Union[Mapping[str, Any], str, Path]: A path to a Sling replication config, or a dictionary
of a replication config.
of a replication config.
dagster_sling_translator: DagsterSlingTranslator: Allows customization of how to map a Sling stream to a Dagster
AssetKey.
partitions_def: Optional[PartitionsDefinition]: The partitions definition for this asset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:

For example:

stream_definition = {"public.users":
{'sql': 'select all_user_id, name from public."all_Users"',
'object': 'public.all_users'}
}
.. code-block:: python

stream_definition = {"public.users":
{'sql': 'select all_user_id, name from public."all_Users"',
'object': 'public.all_users'}
}

By default, this returns the class's target_prefix paramater concatenated with the stream name.
A stream named "public.accounts" will create an AssetKey named "target_public_accounts".
Expand All @@ -78,11 +80,13 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
Examples:
Using a custom mapping for streams:

class CustomSlingTranslator(DagsterSlingTranslator):
@classmethod
def get_asset_key_for_target(self, stream_definition) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
.. code-block:: python

class CustomSlingTranslator(DagsterSlingTranslator):
@classmethod
def get_asset_key_for_target(self, stream_definition) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
"""
config = stream_definition.get("config", {}) or {}
object_key = config.get("object")
Expand Down Expand Up @@ -131,12 +135,13 @@ def get_deps_asset_key(cls, stream_definition: Mapping[str, Any]) -> Iterable[As
Examples:
Using a custom mapping for streams:

class CustomSlingTranslator(DagsterSlingTranslator):
@classmethod
def get_deps_asset_key(cls, stream_name: str) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
.. code-block:: python

class CustomSlingTranslator(DagsterSlingTranslator):
@classmethod
def get_deps_asset_key(cls, stream_name: str) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])

"""
config = stream_definition.get("config", {}) or {}
Expand Down
Loading
Loading