Skip to content

Commit

Permalink
[embedded-elt] Decoupling connections from resource allocation (#18270)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR proposes an alternative pattern to using Sling that uses a new
Resource called `SlingConnectionResource` and builder method called
`build_assets_from_sling_stream`. Names tentative.

Developers that use these objects can compose and mix-and-match the
sources and targets interoperably by pushing the coupling down to the
builder method. ie. in this snippet from a unit test, two different
sources are synced to the same target database.

```python
sling_file_connection=SlingConnectionResource(type="file")
sling_staging_file_connection=SlingConnectionResource(type="file")
sling_sqlite_connection=SlingConnectionResource(type="sqlite", connection_string=f"sqlite://{temp_db}")
asset_def = build_assets_from_sling_stream(
    sling_file_connection,
    sling_sqlite_connection,
    stream=f"file://{test_csv}",
    target_object="main.tbl",
    mode=SlingMode.FULL_REFRESH,
    primary_key="SPECIES_CODE",
)

asset_def_two = build_assets_from_sling_stream(
    sling_staging_file_connection,
    sling_sqlite_connection,
    stream=f"file://{test_staging_csv}",
    target_object="main.staging_tbl",
    mode=SlingMode.FULL_REFRESH,
    primary_key="SPECIES_CODE",
)
```

## How I Tested These Changes
Unit tests
  • Loading branch information
tacastillo authored Nov 29, 2023
1 parent ac0cbb1 commit 822d653
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dagster_embedded_elt.sling.asset_defs import build_sling_asset
from dagster_embedded_elt.sling.asset_defs import build_assets_from_sling_stream, build_sling_asset
from dagster_embedded_elt.sling.resources import (
SlingConnectionResource,
SlingMode,
SlingResource,
SlingSourceConnection,
Expand All @@ -10,6 +11,8 @@
"SlingResource",
"SlingMode",
"build_sling_asset",
"build_assets_from_sling_stream",
"SlingSourceConnection",
"SlingTargetConnection",
"SlingConnectionResource",
]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
)
from dagster._annotations import experimental

from dagster_embedded_elt.sling.resources import SlingMode, SlingResource
from dagster_embedded_elt.sling.resources import (
SlingConnectionResource,
SlingMode,
SlingResource,
SlingStreamReplicator,
)


@experimental
Expand Down Expand Up @@ -99,3 +104,103 @@ def sync(context: AssetExecutionContext) -> MaterializeResult:
)

return sync


@experimental
def build_assets_from_sling_stream(
source: SlingConnectionResource,
target: SlingConnectionResource,
stream: str,
target_object: str = "{{target_schema}}.{{stream_schema}}_{{stream_table}}",
mode: SlingMode = SlingMode.FULL_REFRESH,
primary_key: Optional[Union[str, List[str]]] = None,
update_key: Optional[str] = None,
source_options: Optional[Dict[str, Any]] = None,
target_options: Optional[Dict[str, Any]] = None,
) -> AssetsDefinition:
"""Asset Factory for using Sling to sync data from a source stream to a target object.
Args:
source (SlingConnectionResource): The source SlingConnectionResource to use.
target (SlingConnectionResource): The target SlingConnectionResource to use.
stream (str): The source stream to sync from. This can be a table, a query, or a path.
target_object (str, optional): The target object to sync to. This can be a table, or a path. Defaults to the template of "{{target_schema}}.{{stream_schema}}_{{stream_table}}". See the Sling documentation for more information. https://docs.slingdata.io/sling-cli/replication
mode (SlingMode, optional): The sync mode to use when syncing. Defaults to `full-refresh`.
primary_key (Optional[Union[str, List[str]]], optional): The optional primary key to use when syncing.
update_key (Optional[str], optional): The optional update key to use when syncing.
source_options (Optional[Dict[str, Any]], optional): Any optional Sling source options to use when syncing.
target_options (Optional[Dict[str, Any]], optional): Any optional target options to use when syncing.
Examples:
Creating a Sling asset that syncs from a database to a data warehouse:
.. code-block:: python
source = SlingConnectionResource(
type="postgres",
host=EnvVar("POSTGRES_HOST"),
port=EnvVar("POSTGRES_PORT"),
user=EnvVar("POSTGRES_USER"),
password=EnvVar("POSTGRES_PASSWORD"),
database=EnvVar("POSTGRES_DATABASE"),
)
target = SlingConnectionResource(
type="snowflake",
account=EnvVar("SNOWFLAKE_ACCOUNT"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
database=EnvVar("SNOWFLAKE_DATABASE"),
warehouse=EnvVar("SNOWFLAKE_WAREHOUSE"),
)
asset_def = build_assets_from_sling_stream(
source=source,
target=target,
stream="main.orders",
target_object="main.orders",
mode=SlingMode.INCREMENTAL,
primary_key="id"
)
"""
if primary_key is not None and not isinstance(primary_key, list):
primary_key = [primary_key]

sling_replicator = SlingStreamReplicator(
source_connection=source,
target_connection=target,
)

# sanitize the stream name to a valid asset name, matching the regex A-Za-z0-9_
asset_name = stream.replace(".", "_")

if asset_name.startswith("file://"):
asset_name = asset_name.split("/")[-1]

specs = [AssetSpec(asset_name)]

@multi_asset(name=f"sling_sync__{asset_name}", compute_kind="sling", specs=specs)
def _sling_assets(context: AssetExecutionContext) -> MaterializeResult:
last_row_count_observed = None

for stdout_line in sling_replicator.sync(
source_stream=stream,
target_object=target_object,
mode=mode,
primary_key=primary_key,
update_key=update_key,
source_options=source_options,
target_options=target_options,
):
match = re.search(r"(\d+) rows", stdout_line)
if match:
last_row_count_observed = int(match.group(1))
context.log.info(stdout_line)

return MaterializeResult(
metadata=(
{} if last_row_count_observed is None else {"row_count": last_row_count_observed}
)
)

return _sling_assets
Loading

0 comments on commit 822d653

Please sign in to comment.