diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a2460e8f..ade31213 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -28,9 +28,12 @@ jobs:
         sudo chown 999:999 ssl/server.key
         chmod 600 ssl/pkey.key
 
-    - name: Set up Postgres container
+    - name: Build Postgres container
       run: |
-        docker compose -f docker-compose.yml up -d
+        docker build . --tag meltano/log_based
+    - name: Compose Postgres container
+      run: |
+        docker compose -f docker-compose.yml up -d --wait --wait-timeout=30
 
     - uses: isbang/compose-action@v1.5.1
 
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..478adf97
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+# Postgres 16 with the wal2json output plugin, built locally as
+# meltano/log_based for the postgres_log_based compose service.
+FROM postgres:16
+
+RUN apt-get update
+RUN apt-mark hold locales
+RUN apt-get install curl ca-certificates -y
+# Register the PGDG apt repository and the key that signs it.
+RUN install -d /usr/share/postgresql-common/pgdg
+RUN curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
+RUN sh -c 'echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
+RUN apt-get update
+RUN apt-get install postgresql-server-dev-16 -y
+# ENV persists into later layers and the running container; an `export`
+# inside a RUN step is discarded as soon as that step's shell exits.
+ENV PATH=/usr/lib/postgresql/16/bin:$PATH
+RUN apt-get install postgresql-16-wal2json -y
diff --git a/docker-compose.yml b/docker-compose.yml
index 1253d1ba..df081c77 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -40,7 +40,7 @@ services:
         ipv4_address: 10.5.0.5
 
   postgres_ssl:
-    image: postgres:latest
+    image: postgres:16
     command: postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key -c ssl_ca_file=/var/lib/postgresql/ca.crt -c hba_file=/var/lib/postgresql/pg_hba.conf
     environment:
       POSTGRES_USER: postgres
@@ -57,6 +57,19 @@ services:
     ports:
       - "5433:5432"
 
+  postgres_log_based:
+    image: meltano/log_based # Locally built
+    command: postgres -c wal_level=logical -c max_replication_slots=10 -c max_wal_senders=10
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: postgres
+    volumes:
+      - ./log_based:/docker-entrypoint-initdb.d
+    ports:
+      - "5434:5432"
+
+
 networks:
   inner:
     driver: bridge
diff --git a/log_based/init.sql b/log_based/init.sql
new file mode 100644
index 00000000..4cd87906
--- /dev/null
+++ b/log_based/init.sql
@@ -0,0 +1 @@
+SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json');
\ No newline at end of file
diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index 0ad4fef7..cbd8822a 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -306,8 +306,12 @@ def schema(self) -> dict:
         """Override schema for log-based replication adding _sdc columns."""
         schema_dict = typing.cast(dict, self._singer_catalog_entry.schema.to_dict())
         for property in schema_dict["properties"].values():
             if "null" not in property["type"]:
-                property["type"].append("null")
+                # A type may be a bare string (e.g. "object") rather than a list.
+                if isinstance(property["type"], list):
+                    property["type"].append("null")
+                else:
+                    property["type"] = [property["type"], "null"]
         if "required" in schema_dict:
             schema_dict.pop("required")
         schema_dict["properties"].update({"_sdc_deleted_at": {"type": ["string"]}})
diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py
index c04ed08b..0aebab29 100644
--- a/tap_postgres/tap.py
+++ b/tap_postgres/tap.py
@@ -515,7 +515,7 @@ def catalog_dict(self) -> dict:
         return self._catalog_dict
 
     @property
-    def catalog(self) -> Catalog:
+    def catalog(self) -> Catalog:  # noqa: C901
         """Get the tap's working catalog.
 
         Override to do LOG_BASED modifications.
@@ -531,8 +531,11 @@ def catalog(self) -> Catalog:
             if new_stream.replication_method == "LOG_BASED":
                 for property in new_stream.schema.properties.values():
                     if "null" not in property.type:
                         stream_modified = True
-                        property.type.append("null")
+                        if isinstance(property.type, list):
+                            property.type.append("null")
+                        else:
+                            property.type = [property.type, "null"]
                 if new_stream.schema.required:
                     stream_modified = True
                     new_stream.schema.required = None
diff --git a/tests/test_log_based.py b/tests/test_log_based.py
new file mode 100644
index 00000000..0048c0a4
--- /dev/null
+++ b/tests/test_log_based.py
@@ -0,0 +1,62 @@
+import json
+
+import sqlalchemy
+from sqlalchemy import Column, MetaData, Table
+from sqlalchemy.dialects.postgresql import BIGINT, TEXT
+from tap_postgres.tap import TapPostgres
+from tests.test_core import PostgresTestRunner
+
+
+LOG_BASED_CONFIG = {
+    "host": "localhost",
+    "port": 5434,
+    "user": "postgres",
+    "password": "postgres",
+    "database": "postgres",
+}
+
+
+def test_null_append():
+    """LOG_BASED syncs failed with string property types (issue #294).
+
+    This test checks that even when a catalog contains properties with types represented
+    as strings (ex: "object") instead of arrays (ex: ["object"] or ["object", "null"]),
+    LOG_BASED replication can still append the "null" option to a property's type.
+    """
+    table_name = "test_null_append"
+    engine = sqlalchemy.create_engine("postgresql://postgres:postgres@localhost:5434/postgres")
+
+    metadata_obj = MetaData()
+    table = Table(
+        table_name,
+        metadata_obj,
+        Column("id", BIGINT, primary_key=True),
+        Column("data", TEXT, nullable=True),
+    )
+    # engine.begin() commits on exit, so the table and row are visible to the
+    # separate connection the tap opens during discovery and sync.
+    with engine.begin() as conn:
+        table.drop(conn, checkfirst=True)
+        metadata_obj.create_all(conn)
+        insert = table.insert().values(id=123, data="hello world")
+        conn.execute(insert)
+    tap = TapPostgres(config=LOG_BASED_CONFIG)
+    tap_catalog = json.loads(tap.catalog_json_text)
+    altered_table_name = f"public-{table_name}"
+    for stream in tap_catalog["streams"]:
+        if stream.get("stream") and altered_table_name not in stream["stream"]:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = False
+        else:
+            stream["replication_method"] = "LOG_BASED"
+            stream["replication_key"] = "_sdc_lsn"
+            stream["schema"]["properties"]["data"]["type"] = "string"
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = True
+                if metadata["breadcrumb"] == []:
+                    metadata["metadata"]["replication-method"] = "LOG_BASED"
+
+    test_runner = PostgresTestRunner(
+        tap_class=TapPostgres, config=LOG_BASED_CONFIG, catalog=tap_catalog
+    )
+    test_runner.sync_all()