From 82f939348b8f1e1fc6d20ff5f088b67235f1a39c Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Thu, 22 Feb 2024 18:41:50 -0800 Subject: [PATCH] [embedded-elt] Deprecate Sling factory method and SlingSource and SlingTarget Resources --- .../dagster_embedded_elt/sling/asset_defs.py | 14 ++++++- .../dagster_embedded_elt/sling/resources.py | 32 +++++++++++++++- .../pg_to_duckdb_with_dag_config/sling_dag.py | 38 ++++++++++++++++--- 3 files changed, 76 insertions(+), 8 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py index d15550b144474..faba095553eca 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_defs.py @@ -8,12 +8,16 @@ MaterializeResult, multi_asset, ) -from dagster._annotations import experimental +from dagster._annotations import deprecated +from dagster._utils.warnings import deprecation_warning from dagster_embedded_elt.sling.resources import SlingMode, SlingResource -@experimental +@deprecated( + breaking_version="0.23.0", + additional_warn_text="Use `@sling_assets` instead.", +) def build_sling_asset( asset_spec: AssetSpec, source_stream: str, @@ -76,6 +80,12 @@ def build_sling_asset( required_resource_keys={sling_resource_key}, ) def sync(context: AssetExecutionContext) -> MaterializeResult: + deprecation_warning( + "build_sling_asset", + breaking_version="0.23.0", + additional_warn_text="Use `@sling_assets` property instead.", + ) + sling: SlingResource = getattr(context.resources, sling_resource_key) last_row_count_observed = None for stdout_line in sling.sync( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 6c9f2d26f4517..d7afbeefdfcd9 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -18,8 +18,9 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import experimental +from dagster._annotations import deprecated, experimental from dagster._utils.env import environ +from dagster._utils.warnings import deprecation_warning from pydantic import Field from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication @@ -29,6 +30,7 @@ logger = get_dagster_logger() ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") +DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections." class SlingMode(str, Enum): @@ -44,6 +46,10 @@ class SlingMode(str, Enum): BACKFILL = "backfill" +@deprecated( + breaking_version="0.23.0", + additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingSourceConnection"), +) class SlingSourceConnection(PermissiveConfig): """A Sling Source Connection defines the source connection used by :py:class:`~dagster_elt.sling.SlingResource`. @@ -77,6 +83,10 @@ class SlingSourceConnection(PermissiveConfig): ) +@deprecated( + breaking_version="0.23.0", + additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingTargetConnection"), +) class SlingTargetConnection(PermissiveConfig): """A Sling Target Connection defines the target connection used by :py:class:`~dagster_elt.sling.SlingResource`. @@ -181,6 +191,22 @@ class SlingResource(ConfigurableResource): @contextlib.contextmanager def _setup_config(self) -> Generator[None, None, None]: """Uses environment variables to set the Sling source and target connections.""" + if self.source_connection: + deprecation_warning( + "source_connection", + "0.23", + "source_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.", + stacklevel=4, + ) + + if self.target_connection: + deprecation_warning( + "target_connection", + "0.23", + "target_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.", + stacklevel=4, + ) + sling_source = None sling_target = None if self.source_connection: @@ -235,6 +261,10 @@ def _exec_sling_cmd( if proc.returncode != 0: raise Exception("Sling command failed with error code %s", proc.returncode) + @deprecated( + breaking_version="0.23.0", + additional_warn_text="sync has been deprecated, use `replicate` instead.", + ) def sync( self, source_stream: str, diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/replication_configs/pg_to_duckdb_with_dag_config/sling_dag.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/replication_configs/pg_to_duckdb_with_dag_config/sling_dag.py index 164c5fb0a3c4e..d5312b78d25c2 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/replication_configs/pg_to_duckdb_with_dag_config/sling_dag.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/replication_configs/pg_to_duckdb_with_dag_config/sling_dag.py @@ -1,6 +1,16 @@ -from dagster import Definitions, file_relative_path -from dagster_embedded_elt.sling import DagsterSlingTranslator, sling_assets -from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource +from dagster import AssetSpec, Definitions, file_relative_path +from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingMode, + build_sling_asset, + sling_assets, +) +from dagster_embedded_elt.sling.resources import ( + SlingConnectionResource, + SlingResource, + SlingSourceConnection, + SlingTargetConnection, +) replication_config = file_relative_path(__file__, "sling_replication.yaml") @@ -19,6 +29,14 @@ ] ) +asset_deprecated = build_sling_asset( + asset_spec=AssetSpec(key=["main", "dest_tbl"]), + source_stream="file:///tmp/test.csv", + target_object="main.dest_table", + mode=SlingMode.INCREMENTAL, + primary_key="id", +) + @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): @@ -30,7 +48,17 @@ def my_assets(context, sling: SlingResource): context.log.info(row) +sling_other_resource = SlingResource( + source_connection=SlingSourceConnection( + type="postgres", + connection_string="postgres://postgres:postgres@localhost:5432/finance?sslmode=disable", + ), + target_connection=SlingTargetConnection( + type="duckdb", connection_string="duckdb:///var/tmp/duckdb.db" + ), +) + defs = Definitions( - assets=[my_assets], - resources={"sling": sling_resource}, + assets=[my_assets, asset_deprecated], + resources={"sling": sling_resource, "other": sling_other_resource}, )