Skip to content

Commit

Permalink
fix: Log based testing, upgrade to postgres16 for testing (#316)
Browse files Browse the repository at this point in the history
Setup allowing pytest for log_based syncs

Closes #294

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
Co-authored-by: Derek Visch <[email protected]>
  • Loading branch information
3 people authored Dec 15, 2023
1 parent 2eddf32 commit acaca95
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 7 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ jobs:
sudo chown 999:999 ssl/server.key
chmod 600 ssl/pkey.key
- name: Set up Postgres container
- name: Build Postgres container
run: |
docker compose -f docker-compose.yml up -d
docker build . --tag meltano/log_based
- name: Compose Postgres container
run: |
docker compose -f docker-compose.yml up -d --wait --wait-timeout=30
- uses: isbang/[email protected]

Expand Down
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM postgres:16

RUN apt-get update
RUN apt-mark hold locales
RUN apt-get install curl ca-certificates -y
RUN install -d /usr/share/postgresql-common/pgdg
RUN curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
RUN sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
RUN apt-get update
RUN apt-get install postgresql-server-dev-16 -y
RUN sh -c 'export PATH=/usr/lib/postgresql/16/bin:$PATH'
RUN apt-get install postgresql-16-wal2json -y
15 changes: 14 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ services:
ipv4_address: 10.5.0.5

postgres_ssl:
image: postgres:latest
image: postgres:16
command: postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key -c ssl_ca_file=/var/lib/postgresql/ca.crt -c hba_file=/var/lib/postgresql/pg_hba.conf
environment:
POSTGRES_USER: postgres
Expand All @@ -57,6 +57,19 @@ services:
ports:
- "5433:5432"

postgres_log_based:
image: meltano/log_based # Locally built
command: postgres -c wal_level=logical -c max_replication_slots=10 -c max_wal_senders=10
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- ./log_based:/docker-entrypoint-initdb.d
ports:
- "5434:5432"


networks:
inner:
driver: bridge
Expand Down
1 change: 1 addition & 0 deletions log_based/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json');
4 changes: 3 additions & 1 deletion tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,10 @@ def schema(self) -> dict:
"""Override schema for log-based replication adding _sdc columns."""
schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict())
for property in schema_dict["properties"].values():
if "null" not in property["type"]:
if isinstance(property["type"], list):
property["type"].append("null")
else:
property["type"] = [property["type"], "null"]
if "required" in schema_dict:
schema_dict.pop("required")
schema_dict["properties"].update({"_sdc_deleted_at": {"type": ["string"]}})
Expand Down
8 changes: 5 additions & 3 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def catalog_dict(self) -> dict:
return self._catalog_dict

@property
def catalog(self) -> Catalog:
def catalog(self) -> Catalog: # noqa: C901
"""Get the tap's working catalog.
Override to do LOG_BASED modifications.
Expand All @@ -531,8 +531,10 @@ def catalog(self) -> Catalog:
if new_stream.replication_method == "LOG_BASED":
for property in new_stream.schema.properties.values():
if "null" not in property.type:
stream_modified = True
property.type.append("null")
if isinstance(property.type, list):
property.type.append("null")
else:
property.type = [property.type, "null"]
if new_stream.schema.required:
stream_modified = True
new_stream.schema.required = None
Expand Down
59 changes: 59 additions & 0 deletions tests/test_log_based.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json

import sqlalchemy
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.dialects.postgresql import BIGINT, TEXT
from tap_postgres.tap import TapPostgres
from tests.test_core import PostgresTestRunner


LOG_BASED_CONFIG = {
"host": "localhost",
"port": 5434,
"user": "postgres",
"password": "postgres",
"database": "postgres",
}

def test_null_append():
"""LOG_BASED syncs failed with string property types. (issue #294).
This test checks that even when a catalog contains properties with types represented
as strings (ex: "object") instead of arrays (ex: ["object"] or ["object", "null"]),
LOG_BASED replication can still append the "null" option to a property's type.
"""
table_name = "test_null_append"
engine = sqlalchemy.create_engine("postgresql://postgres:postgres@localhost:5434/postgres")

metadata_obj = MetaData()
table = Table(
table_name,
metadata_obj,
Column("id", BIGINT, primary_key = True),
Column("data", TEXT, nullable = True)
)
with engine.connect() as conn:
table.drop(conn, checkfirst=True)
metadata_obj.create_all(conn)
insert = table.insert().values(id=123, data="hello world")
conn.execute(insert)
tap = TapPostgres(config=LOG_BASED_CONFIG)
tap_catalog = json.loads(tap.catalog_json_text)
altered_table_name = f"public-{table_name}"
for stream in tap_catalog["streams"]:
if stream.get("stream") and altered_table_name not in stream["stream"]:
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = False
else:
stream["replication_method"] = "LOG_BASED"
stream["replication_key"] = "_sdc_lsn"
stream["schema"]["properties"]["data"]["type"] = "string"
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = True
if metadata["breadcrumb"] == []:
metadata["metadata"]["replication-method"] = "LOG_BASED"

test_runner = PostgresTestRunner(
tap_class=TapPostgres, config=LOG_BASED_CONFIG, catalog=tap_catalog
)
test_runner.sync_all()

0 comments on commit acaca95

Please sign in to comment.