Skip to content

Commit

Permalink
[embedded-elt] Deprecate Sling factory method and SlingSource and
Browse files Browse the repository at this point in the history
SlingTarget Resources
  • Loading branch information
PedramNavid committed Feb 28, 2024
1 parent 6fe87b0 commit a2a19d4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
MaterializeResult,
multi_asset,
)
from dagster._annotations import experimental
from dagster._annotations import deprecated
from dagster._utils.warnings import deprecation_warning

from dagster_embedded_elt.sling.resources import SlingMode, SlingResource


@experimental
@deprecated(
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` instead.",
)
def build_sling_asset(
asset_spec: AssetSpec,
source_stream: str,
Expand Down Expand Up @@ -76,6 +80,12 @@ def build_sling_asset(
required_resource_keys={sling_resource_key},
)
def sync(context: AssetExecutionContext) -> MaterializeResult:
deprecation_warning(
"build_sling_asset",
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` property instead.",
)

sling: SlingResource = getattr(context.resources, sling_resource_key)
last_row_count_observed = None
for stdout_line in sling.sync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
PermissiveConfig,
get_dagster_logger,
)
from dagster._annotations import experimental
from dagster._annotations import deprecated, experimental
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication
Expand All @@ -29,6 +30,7 @@
logger = get_dagster_logger()

ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections."


class SlingMode(str, Enum):
Expand All @@ -44,6 +46,10 @@ class SlingMode(str, Enum):
BACKFILL = "backfill"


@deprecated(
breaking_version="0.23.0",
additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingSourceConnection"),
)
class SlingSourceConnection(PermissiveConfig):
"""A Sling Source Connection defines the source connection used by :py:class:`~dagster_elt.sling.SlingResource`.
Expand Down Expand Up @@ -77,6 +83,10 @@ class SlingSourceConnection(PermissiveConfig):
)


@deprecated(
breaking_version="0.23.0",
additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingTargetConnection"),
)
class SlingTargetConnection(PermissiveConfig):
"""A Sling Target Connection defines the target connection used by :py:class:`~dagster_elt.sling.SlingResource`.
Expand Down Expand Up @@ -181,6 +191,22 @@ class SlingResource(ConfigurableResource):
@contextlib.contextmanager
def _setup_config(self) -> Generator[None, None, None]:
"""Uses environment variables to set the Sling source and target connections."""
if self.source_connection:
deprecation_warning(
"source_connection",
"0.23",
"source_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
stacklevel=4,
)

if self.target_connection:
deprecation_warning(
"target_connection",
"0.23",
"target_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
stacklevel=4,
)

sling_source = None
sling_target = None
if self.source_connection:
Expand Down Expand Up @@ -235,6 +261,10 @@ def _exec_sling_cmd(
if proc.returncode != 0:
raise Exception("Sling command failed with error code %s", proc.returncode)

@deprecated(
breaking_version="0.23.0",
additional_warn_text="sync has been deprecated, use `replicate` instead.",
)
def sync(
self,
source_stream: str,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
from dagster import Definitions, file_relative_path
from dagster_embedded_elt.sling import DagsterSlingTranslator, sling_assets
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource
from dagster import AssetSpec, Definitions, file_relative_path
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingMode,
build_sling_asset,
sling_assets,
)
from dagster_embedded_elt.sling.resources import (
SlingConnectionResource,
SlingResource,
SlingSourceConnection,
SlingTargetConnection,
)

replication_config = file_relative_path(__file__, "sling_replication.yaml")

Expand All @@ -19,6 +29,14 @@
]
)

asset_deprecated = build_sling_asset(
asset_spec=AssetSpec(key=["main", "dest_tbl"]),
source_stream="file:///tmp/test.csv",
target_object="main.dest_table",
mode=SlingMode.INCREMENTAL,
primary_key="id",
)


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
Expand All @@ -30,7 +48,17 @@ def my_assets(context, sling: SlingResource):
context.log.info(row)


sling_other_resource = SlingResource(
source_connection=SlingSourceConnection(
type="postgres",
connection_string="postgres://postgres:postgres@localhost:5432/finance?sslmode=disable",
),
target_connection=SlingTargetConnection(
type="duckdb", connection_string="duckdb:///var/tmp/duckdb.db"
),
)

defs = Definitions(
assets=[my_assets],
resources={"sling": sling_resource},
assets=[my_assets, asset_deprecated],
resources={"sling": sling_resource, "other": sling_other_resource},
)

0 comments on commit a2a19d4

Please sign in to comment.