Skip to content

Commit

Permalink
Add ANSI escape sequence cleanup and replication timing to SlingResource
Browse files Browse the repository at this point in the history
- Yield sanitized stdout with timings from `replicate` method in SlingResource.
- Capture raw logs in `_stdout` list for later streaming.
- Include elapsed time metadata in `MaterializeResult` when yielding sync results.
- Add Docker-related files for testing Postgres to DuckDB replication using Sling.
  • Loading branch information
PedramNavid committed Feb 25, 2024
1 parent adaaa2d commit 284a60c
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import json
import os
import re
import sys
import tempfile
import time
import uuid
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
Expand All @@ -26,6 +28,8 @@

logger = get_dagster_logger()

# Matches ANSI terminal escape sequences: ESC (\x1B) followed by either a
# single character in the ranges @-Z or \-_, or a "["-introduced sequence made
# of any number of 0-? characters, any number of space-/ characters, and one
# final character in the range @-~.
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


class SlingMode(str, Enum):
"""The mode to use when syncing.
Expand Down Expand Up @@ -172,6 +176,7 @@ class SlingResource(ConfigurableResource):
source_connection: Optional[SlingSourceConnection] = None
target_connection: Optional[SlingTargetConnection] = None
connections: List[SlingConnectionResource] = []
_stdout: List[str] = []

@contextlib.contextmanager
def _setup_config(self) -> Generator[None, None, None]:
Expand Down Expand Up @@ -207,21 +212,23 @@ def _setup_config(self) -> Generator[None, None, None]:
):
yield

def process_stdout(self, stdout: IO[AnyStr], encoding="utf8") -> Iterator[str]:
def _clean_line(self, line: str) -> str:
    """Strip terminal formatting and the standalone ``INF`` tag from a line.

    Args:
        line: One decoded line of output.

    Returns:
        The line with ANSI escape sequences removed and every standalone
        ``INF`` token deleted. Surrounding whitespace is left untouched.
    """
    without_ansi = ANSI_ESCAPE.sub("", line)
    # Only drop ``INF`` when it stands alone as a token. A blanket
    # str.replace would also mangle words that merely contain it
    # (e.g. "INFO" -> "O", "INFORMATION_SCHEMA" -> "ORMATION_SCHEMA").
    return re.sub(r"\bINF\b", "", without_ansi)

def _process_stdout(self, stdout: IO[AnyStr], encoding="utf8") -> Iterator[str]:
"""Process stdout from the Sling CLI."""
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
for line in stdout:
assert isinstance(line, bytes)
fmt_line = bytes.decode(line, encoding=encoding, errors="replace")
clean_line: str = ansi_escape.sub("", fmt_line).replace("INF", "")
yield clean_line
yield self._clean_line(fmt_line)

def _exec_sling_cmd(
self, cmd, stdin=None, stdout=PIPE, stderr=STDOUT, encoding="utf8"
) -> Generator[str, None, None]:
with Popen(cmd, shell=True, stdin=stdin, stdout=stdout, stderr=stderr) as proc:
if proc.stdout:
for line in self.process_stdout(proc.stdout, encoding=encoding):
for line in self._process_stdout(proc.stdout, encoding=encoding):
yield line

proc.wait()
Expand All @@ -242,10 +249,18 @@ def sync(
"""Runs a Sling sync from the given source table to the given destination table. Generates
output lines from the Sling CLI.
"""
if self.source_connection.type == "file" and not source_stream.startswith("file://"):
if (
self.source_connection
and self.source_connection.type == "file"
and not source_stream.startswith("file://")
):
source_stream = "file://" + source_stream

if self.target_connection.type == "file" and not target_object.startswith("file://"):
if (
self.target_connection
and self.target_connection.type == "file"
and not target_object.startswith("file://")
):
target_object = "file://" + target_object

with self._setup_config():
Expand Down Expand Up @@ -300,17 +315,30 @@ def replicate(

logger.debug(f"Running Sling replication with command: {cmd}")

# Get start time from wall clock
start_time = time.time()
results = sling._run( # noqa
cmd=cmd,
temp_file=temp_file,
return_output=True,
env=env,
)
for row in results.split("\n"):
clean_line = self._clean_line(row)
sys.stdout.write(clean_line + "\n")
self._stdout.append(clean_line)

end_time = time.time()

logger.info(results)
for stream in stream_definition:
output_name = dagster_sling_translator.get_asset_key(stream)
yield MaterializeResult(asset_key=output_name)
yield MaterializeResult(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)

def stream_raw_logs(self) -> Generator[str, None, None]:
    """Yield, in order, each line currently stored in ``self._stdout``."""
    for captured_line in self._stdout:
        yield captured_line


def _process_env_vars(config: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Postgres image used for manually testing the Sling integration.
FROM postgres:15

# Database name and credentials. These match the connection string used in
# sling_dag.py and in the Makefile's `sync` target. Local testing only.
ENV POSTGRES_DB=finance
ENV POSTGRES_USER=postgres
ENV POSTGRES_PASSWORD=postgres

# Seed script. Presumably executed by the image's entrypoint when the data
# directory is first initialized -- confirm against the postgres image docs.
COPY init_finance_db.sql /docker-entrypoint-initdb.d/
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Helper targets for manually exercising the Sling integration against a
# throwaway Postgres container. None of these targets produce files, so all of
# them are declared phony.
.PHONY: build run sync

# Build the Postgres image seeded by init_finance_db.sql.
build:
	docker build -t finance-postgres .

# Start the database in the foreground; --rm discards the container on exit.
run: build
	docker run --name finance-db -p 5432:5432 -ti --rm finance-postgres

# Run the replication directly with the Sling CLI. The environment variable
# names must match the `source` / `target` connection names declared in
# sling_replication.yaml (MY_POSTGRES and MY_DUCKDB).
sync:
	MY_POSTGRES='postgres://postgres:postgres@localhost:5432/finance?sslmode=disable' \
	MY_DUCKDB='duckdb:///var/tmp/duckdb.db' \
	sling run -r sling_replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
This is some helper code for internally testing the integration with Sling.

Run `make run` to build and run a simple Postgres instance that is fed with some
sample data.

Run the job in `sling_dag.py` with `dagster dev -f sling_dag.py` to see Dagster
load the assets from the replication file, and sync data from PG to DuckDB using
Sling.

You can then inspect the resulting DuckDB database, which defaults to `/var/tmp/duckdb.db`.

This folder is not currently used for automated testing.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- Seed script for the test Postgres instance.
-- Uses psql meta-commands (\c), so it must be run through psql.

-- Connect to the maintenance database so `finance` can be dropped and recreated.
\c postgres

DROP DATABASE IF EXISTS finance;
DROP DATABASE IF EXISTS clone;
CREATE DATABASE finance;
-- `clone` is created empty; nothing else in this script populates it.
CREATE DATABASE clone;

\c finance

CREATE TABLE IF NOT EXISTS accounts (
    account_id serial PRIMARY KEY,
    user_id int,
    balance decimal(15,2) NOT NULL
);

CREATE TABLE IF NOT EXISTS users (
    user_id serial PRIMARY KEY,
    name varchar(255) NOT NULL,
    email varchar(255) UNIQUE NOT NULL,
    department_id int
);

CREATE TABLE IF NOT EXISTS finance_departments_old (
    department_id serial PRIMARY KEY,
    name varchar(255) NOT NULL
);

-- Quoted, mixed-case identifiers: these tables must be referenced with the
-- same quoting and casing (e.g. public."Transactions").
CREATE TABLE IF NOT EXISTS "Transactions"(
    id serial PRIMARY KEY,
    account_id int,
    amount decimal(15,2) NOT NULL,
    last_updated_at timestamp without time zone NOT NULL
);

CREATE TABLE IF NOT EXISTS "all_Users" (
    all_user_id serial PRIMARY KEY,
    name varchar(255) NOT NULL
);

INSERT INTO accounts (user_id, balance) VALUES
    (1, 1000.00),
    (2, 1500.00);

-- Each row needs a distinct address: `email` is declared UNIQUE, so repeated
-- values would make this INSERT fail.
INSERT INTO users (name, email, department_id) VALUES
    ('John Doe', 'john.doe@example.com', 1),
    ('Jane Smith', 'jane.smith@example.com', 2);

INSERT INTO finance_departments_old (name) VALUES ('Accounting'), ('Human Resources'), ('Engineering');

INSERT INTO "Transactions" (account_id, amount, last_updated_at) VALUES
    (1, -200.00, '2023-01-15 14:30:00'),
    (1, 300.00, '2023-02-15 10:00:00'),
    (2, -150.00, '2023-01-20 09:00:00');

INSERT INTO "all_Users" (name) VALUES ('Alice Johnson'), ('Bob Williams'), ('Charlie Miller');

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dagster import Definitions, file_relative_path
from dagster_embedded_elt.sling import DagsterSlingTranslator, sling_assets
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource

replication_config = file_relative_path(__file__, "sling_replication.yaml")

# Sling resource exposing the two connections that sling_replication.yaml
# refers to by name as its `source` (MY_POSTGRES) and `target` (MY_DUCKDB).
sling_resource = SlingResource(
    connections=[
        # Source: Postgres on localhost:5432, database `finance`.
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string="postgres://postgres:postgres@localhost:5432/finance?sslmode=disable",
        ),
        # Target: DuckDB database file at /var/tmp/duckdb.db.
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    # Run the replication and forward every result it yields.
    translator = DagsterSlingTranslator()
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=translator,
    )
    # Once the replication generator is exhausted, write each line returned by
    # stream_raw_logs() to the context logger at INFO level.
    for log_line in sling.stream_raw_logs():
        context.log.info(log_line)


# Definitions object for this file: registers the Sling-backed assets and binds
# the resource key "sling", which matches the `sling` parameter of my_assets.
defs = Definitions(
    assets=[my_assets],
    resources={"sling": sling_resource},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
source: MY_POSTGRES
target: MY_DUCKDB

defaults:
mode: full-refresh

object: '{stream_schema}_{stream_table}'

streams:
public.accounts:
public.users:
disabled: true
public.finance_departments_old:
object: 'departments' # overwrite default object
source_options:
empty_as_null: false
meta:
dagster_source: boo

public."Transactions":
mode: incremental # overwrite default mode
primary_key: id
update_key: last_updated_at

public.all_users:
sql: |
select all_user_id, name
from public."all_Users"
object: public.all_users # need to add 'object' key for custom SQL

env:
SLING_LOADED_AT_COLUMN: true # adds the _sling_loaded_at timestamp column
SLING_STREAM_URL_COLUMN: true # if source is file, adds a _sling_stream_url column with file path / url
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,5 @@ def test_update_mode(
],
)
def test_non_unicode_stdout(text, encoding, expected, sling_sqlite_resource: SlingResource):
lines = sling_sqlite_resource.process_stdout(text, encoding)
lines = sling_sqlite_resource._process_stdout(text, encoding) # noqa
assert list(lines) == expected

0 comments on commit 284a60c

Please sign in to comment.