From 28a69f4b3265e3830e5bf1e4d2f403edd2eb76dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 21 Dec 2023 14:34:19 -0600 Subject: [PATCH 1/5] ci: Run mypy in CI --- poetry.lock | 32 +++++++++++++++++++++++++- pyproject.toml | 7 ++++-- tap_postgres/client.py | 27 +++++++++++----------- tap_postgres/tap.py | 51 ++++++++++++++++++++---------------------- tox.ini | 2 +- 5 files changed, 74 insertions(+), 45 deletions(-) diff --git a/poetry.lock b/poetry.lock index ab26f1e6..e459bbf8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1832,6 +1832,8 @@ files = [ [package.dependencies] greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"} +mypy = {version = ">=0.910", optional = true, markers = "python_version >= \"3\" and extra == \"mypy\""} +sqlalchemy2-stubs = {version = "*", optional = true, markers = "extra == \"mypy\""} [package.extras] aiomysql = ["aiomysql (>=0.2.0)", "greenlet (!=0.4.17)"] @@ -1854,6 +1856,20 @@ postgresql-psycopg2cffi = ["psycopg2cffi"] pymysql = ["pymysql", "pymysql (<1)"] sqlcipher = ["sqlcipher3_binary"] +[[package]] +name = "sqlalchemy2-stubs" +version = "0.0.2a37" +description = "Typing Stubs for SQLAlchemy 1.4" +optional = false +python-versions = ">=3.6" +files = [ + {file = "sqlalchemy2-stubs-0.0.2a37.tar.gz", hash = "sha256:619bd131f8ad7eeb88af8dfabb0e6ba07593db91868cdc49674d2cd4ca61e986"}, + {file = "sqlalchemy2_stubs-0.0.2a37-py3-none-any.whl", hash = "sha256:43067e3f67bd16a7fb2b574ee696f7ed53bf60f3e73329cf234a7b4dd4d3816a"}, +] + +[package.dependencies] +typing-extensions = ">=3.7.4" + [[package]] name = "sshtunnel" version = "0.4.0" @@ -1911,6 +1927,20 @@ virtualenv = ">=20.25" docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-argparse-cli (>=1.11.1)", "sphinx-autodoc-typehints (>=1.25.2)", "sphinx-copybutton (>=0.5.2)", "sphinx-inline-tabs (>=2023.4.21)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=23.11)"] testing = ["build[virtualenv] (>=1.0.3)", "covdefaults (>=2.3)", "detect-test-pollution (>=1.2)", "devpi-process (>=1)", "diff-cover (>=8.0.2)", "distlib (>=0.3.8)", "flaky (>=3.7)", "hatch-vcs (>=0.4)", "hatchling (>=1.21)", "psutil (>=5.9.7)", "pytest (>=7.4.4)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-xdist (>=3.5)", "re-assert (>=1.1)", "time-machine (>=2.13)", "wheel (>=0.42)"] +[[package]] +name = "types-paramiko" +version = "3.3.0.2" +description = "Typing stubs for paramiko" +optional = false +python-versions = ">=3.7" +files = [ + {file = "types-paramiko-3.3.0.2.tar.gz", hash = "sha256:4615fa0bc5b78c0f1b68b106071dc29737cb2cf53903712df72785dad5b359c3"}, + {file = "types_paramiko-3.3.0.2-py3-none-any.whl", hash = "sha256:18fe96e6ef78ca04b2ac2a111f9404409b31a1d28bb2f5dca1a1afd62b8351a9"}, +] + +[package.dependencies] +cryptography = ">=37.0.0" + [[package]] name = "typing-extensions" version = "4.9.0" @@ -1987,4 +2017,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "<3.13,>=3.8.1" -content-hash = "be7623681c33f3fbcb99780e77d59c583442841d5c413d79243404166c18edcc" +content-hash = "c11d9899c1e18797e005f7d6782c5c1493dc3d70265cafeef91282990251596a" diff --git a/pyproject.toml b/pyproject.toml index 00a4bb08..407a4579 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,8 +47,10 @@ mypy = "1.8.0" pendulum = "~=3.0" pre-commit = "^3.0.4" pydocstyle = "^6.1.1" -singer-sdk = {version = "*", extras = ["testing"]} +singer-sdk = { version = "*", extras = ["testing"] } +sqlalchemy = { version = "<2", extras = ["mypy"] } tox = "^4" +types-paramiko = "^3.3.0.2" [tool.isort] profile = "black" @@ -57,13 +59,14 @@ src_paths = "tap_postgres" [tool.mypy] exclude = "tests" -python_version = "3.9" +python_version = "3.11" warn_unused_configs = true warn_unused_ignores = true [[tool.mypy.overrides]] ignore_missing_imports = true module = [ + "psycopg2", "sshtunnel", ] diff --git a/tap_postgres/client.py b/tap_postgres/client.py index f09ea74e..dec9b73d 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -117,7 +117,7 @@ def __init__( # Note super is static, we can get away with this because this is called once # and is luckily referenced via the instance of the class - def to_jsonschema_type( + def to_jsonschema_type( # type: ignore[override] self, sql_type: Union[ str, @@ -211,12 +211,12 @@ def sdk_typing_object( | th.StringType | th.BooleanType, ] = { - "jsonb": th.CustomType( - {"type": ["string", "number", "integer", "array", "object", "boolean"]} - ), - "json": th.CustomType( - {"type": ["string", "number", "integer", "array", "object", "boolean"]} - ), + "jsonb": th.CustomType({ + "type": ["string", "number", "integer", "array", "object", "boolean"] + }), + "json": th.CustomType({ + "type": ["string", "number", "integer", "array", "object", "boolean"] + }), "timestamp": th.DateTimeType(), "datetime": th.DateTimeType(), "date": th.DateType(), @@ -475,13 +475,11 @@ def consume(self, message) -> dict | None: elif message_payload["action"] in delete_actions: for column in message_payload["identity"]: row.update({column["name"]: column["value"]}) - row.update( - { - "_sdc_deleted_at": datetime.datetime.utcnow().strftime( - r"%Y-%m-%dT%H:%M:%SZ" - ) - } - ) + row.update({ + "_sdc_deleted_at": datetime.datetime.utcnow().strftime( + r"%Y-%m-%dT%H:%M:%SZ" + ) + }) row.update({"_sdc_lsn": message.data_start}) elif message_payload["action"] in truncate_actions: self.logger.debug( @@ -529,6 +527,7 @@ def logical_replication_connection(self): # TODO: Make this change upstream in the SDK? # I'm not sure if in general SQL databases don't guarantee order of records log # replication, but at least Postgres does not. + @property def is_sorted(self) -> bool: """Return True if the stream is sorted by the replication key.""" return self.replication_method == REPLICATION_INCREMENTAL diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 0aebab29..5251ec09 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -1,4 +1,5 @@ """Postgres tap class.""" + from __future__ import annotations import atexit @@ -10,7 +11,7 @@ from typing import Any, Dict, cast import paramiko -from singer_sdk import SQLTap, Stream +from singer_sdk import SQLTap from singer_sdk import typing as th from singer_sdk._singerlib import ( # JSON schema typing helpers Catalog, @@ -405,7 +406,7 @@ def connector(self) -> PostgresConnector: """ # We mutate this url to use the ssh tunnel if enabled - url = make_url(self.get_sqlalchemy_url(config=self.config)) + url = make_url(self.get_sqlalchemy_url(config=self.config)) # type: ignore[arg-type] ssh_config = self.config.get("ssh_tunnel", {}) if ssh_config.get("enable", False): @@ -529,7 +530,7 @@ def catalog(self) -> Catalog: # noqa: C901 stream_modified = False new_stream = copy.deepcopy(stream) if new_stream.replication_method == "LOG_BASED": - for property in new_stream.schema.properties.values(): + for property in new_stream.schema.properties.values(): # type: ignore[union-attr] if "null" not in property.type: if isinstance(property.type, list): property.type.append("null") @@ -538,30 +539,26 @@ def catalog(self) -> Catalog: # noqa: C901 if new_stream.schema.required: stream_modified = True new_stream.schema.required = None - if "_sdc_deleted_at" not in new_stream.schema.properties: + if "_sdc_deleted_at" not in new_stream.schema.properties: # type: ignore[operator] stream_modified = True - new_stream.schema.properties.update( - {"_sdc_deleted_at": Schema(type=["string", "null"])} - ) - new_stream.metadata.update( - { - ("properties", "_sdc_deleted_at"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - } - ) - if "_sdc_lsn" not in new_stream.schema.properties: + new_stream.schema.properties.update({ # type: ignore[union-attr] + "_sdc_deleted_at": Schema(type=["string", "null"]) + }) + new_stream.metadata.update({ + ("properties", "_sdc_deleted_at"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + }) + if "_sdc_lsn" not in new_stream.schema.properties: # type: ignore[operator] stream_modified = True - new_stream.schema.properties.update( - {"_sdc_lsn": Schema(type=["integer", "null"])} - ) - new_stream.metadata.update( - { - ("properties", "_sdc_lsn"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - } - ) + new_stream.schema.properties.update({ # type: ignore[union-attr] + "_sdc_lsn": Schema(type=["integer", "null"]) + }) + new_stream.metadata.update({ + ("properties", "_sdc_lsn"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + }) if stream_modified: modified_streams.append(new_stream.tap_stream_id) new_catalog.add_stream(new_stream) @@ -573,13 +570,13 @@ def catalog(self) -> Catalog: # noqa: C901 ) return new_catalog - def discover_streams(self) -> list[Stream]: + def discover_streams(self) -> list[PostgresStream | PostgresLogBasedStream]: # type: ignore[override] """Initialize all available streams and return them as a list. Returns: List of discovered Stream objects. """ - streams = [] + streams: list[PostgresStream | PostgresLogBasedStream] = [] for catalog_entry in self.catalog_dict["streams"]: if catalog_entry["replication_method"] == "LOG_BASED": streams.append( diff --git a/tox.ini b/tox.ini index ccbf3b21..031ff4bd 100644 --- a/tox.ini +++ b/tox.ini @@ -42,7 +42,7 @@ commands = poetry run flake8 tap_postgres poetry run pydocstyle tap_postgres # refer to pyproject.toml for specific settings - # poetry run mypy . + poetry run mypy . [flake8] ignore = W503 From ce2ec74b1004ff8c9989832a2726fa07f8e83652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 21 Dec 2023 14:37:56 -0600 Subject: [PATCH 2/5] Make black happy --- tap_postgres/client.py | 12 +++++++----- tap_postgres/tap.py | 40 ++++++++++++++++++++++++---------------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index dec9b73d..d40c6a18 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -475,11 +475,13 @@ def consume(self, message) -> dict | None: elif message_payload["action"] in delete_actions: for column in message_payload["identity"]: row.update({column["name"]: column["value"]}) - row.update({ - "_sdc_deleted_at": datetime.datetime.utcnow().strftime( - r"%Y-%m-%dT%H:%M:%SZ" - ) - }) + row.update( + { + "_sdc_deleted_at": datetime.datetime.utcnow().strftime( + r"%Y-%m-%dT%H:%M:%SZ" + ) + } + ) row.update({"_sdc_lsn": message.data_start}) elif message_payload["action"] in truncate_actions: self.logger.debug( diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 5251ec09..88638a2e 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -541,24 +541,32 @@ def catalog(self) -> Catalog: # noqa: C901 new_stream.schema.required = None if "_sdc_deleted_at" not in new_stream.schema.properties: # type: ignore[operator] stream_modified = True - new_stream.schema.properties.update({ # type: ignore[union-attr] - "_sdc_deleted_at": Schema(type=["string", "null"]) - }) - new_stream.metadata.update({ - ("properties", "_sdc_deleted_at"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - }) + new_stream.schema.properties.update( + { # type: ignore[union-attr] + "_sdc_deleted_at": Schema(type=["string", "null"]) + } + ) + new_stream.metadata.update( + { + ("properties", "_sdc_deleted_at"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + } + ) if "_sdc_lsn" not in new_stream.schema.properties: # type: ignore[operator] stream_modified = True - new_stream.schema.properties.update({ # type: ignore[union-attr] - "_sdc_lsn": Schema(type=["integer", "null"]) - }) - new_stream.metadata.update({ - ("properties", "_sdc_lsn"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - }) + new_stream.schema.properties.update( + { # type: ignore[union-attr] + "_sdc_lsn": Schema(type=["integer", "null"]) + } + ) + new_stream.metadata.update( + { + ("properties", "_sdc_lsn"): Metadata( + Metadata.InclusionType.AVAILABLE, True, None + ) + } + ) if stream_modified: modified_streams.append(new_stream.tap_stream_id) new_catalog.add_stream(new_stream) From 95b6490df9a4539075dacd1a1265e11e2ccac129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 21 Dec 2023 14:44:17 -0600 Subject: [PATCH 3/5] Ignore long lines --- tap_postgres/tap.py | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 88638a2e..8089d784 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -406,7 +406,7 @@ def connector(self) -> PostgresConnector: """ # We mutate this url to use the ssh tunnel if enabled - url = make_url(self.get_sqlalchemy_url(config=self.config)) # type: ignore[arg-type] + url = make_url(self.get_sqlalchemy_url(config=self.config)) # type: ignore[arg-type] # noqa: E501 ssh_config = self.config.get("ssh_tunnel", {}) if ssh_config.get("enable", False): @@ -530,7 +530,7 @@ def catalog(self) -> Catalog: # noqa: C901 stream_modified = False new_stream = copy.deepcopy(stream) if new_stream.replication_method == "LOG_BASED": - for property in new_stream.schema.properties.values(): # type: ignore[union-attr] + for property in new_stream.schema.properties.values(): # type: ignore[union-attr] # noqa: E501 if "null" not in property.type: if isinstance(property.type, list): property.type.append("null") @@ -539,33 +539,37 @@ def catalog(self) -> Catalog: # noqa: C901 if new_stream.schema.required: stream_modified = True new_stream.schema.required = None - if "_sdc_deleted_at" not in new_stream.schema.properties: # type: ignore[operator] + if "_sdc_deleted_at" not in new_stream.schema.properties: # type: ignore[operator] # noqa: E501 stream_modified = True - new_stream.schema.properties.update( - { # type: ignore[union-attr] - "_sdc_deleted_at": Schema(type=["string", "null"]) - } + new_stream.schema.properties.update( # type: ignore[union-attr] + { + "_sdc_deleted_at": Schema(type=["string", "null"]), + }, ) new_stream.metadata.update( { ("properties", "_sdc_deleted_at"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - } + Metadata.InclusionType.AVAILABLE, + True, + None, + ), + }, ) - if "_sdc_lsn" not in new_stream.schema.properties: # type: ignore[operator] + if "_sdc_lsn" not in new_stream.schema.properties: # type: ignore[operator] # noqa: E501 stream_modified = True - new_stream.schema.properties.update( - { # type: ignore[union-attr] - "_sdc_lsn": Schema(type=["integer", "null"]) - } + new_stream.schema.properties.update( # type: ignore[union-attr] + { + "_sdc_lsn": Schema(type=["integer", "null"]), + }, ) new_stream.metadata.update( { ("properties", "_sdc_lsn"): Metadata( - Metadata.InclusionType.AVAILABLE, True, None - ) - } + Metadata.InclusionType.AVAILABLE, + True, + None, + ), + }, ) if stream_modified: modified_streams.append(new_stream.tap_stream_id) @@ -578,7 +582,7 @@ def catalog(self) -> Catalog: # noqa: C901 ) return new_catalog - def discover_streams(self) -> list[PostgresStream | PostgresLogBasedStream]: # type: ignore[override] + def discover_streams(self) -> list[PostgresStream | PostgresLogBasedStream]: # type: ignore[override] # noqa: E501 """Initialize all available streams and return them as a list. Returns: From 0238b82f3d4d2654687b189159cb753f97e3e53e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 10 Jan 2024 16:56:28 -0600 Subject: [PATCH 4/5] Treat is_sorted as a property --- tap_postgres/client.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index d40c6a18..4fdabd3e 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -379,7 +379,7 @@ def _increment_stream_state( f"stream(replication method={self.replication_method})" ) raise ValueError(msg) - treat_as_sorted = self.is_sorted() + treat_as_sorted = self.is_sorted if not treat_as_sorted and self.state_partitioning_keys is not None: # Streams with custom state partitioning are not resumable. treat_as_sorted = False @@ -475,13 +475,11 @@ def consume(self, message) -> dict | None: elif message_payload["action"] in delete_actions: for column in message_payload["identity"]: row.update({column["name"]: column["value"]}) - row.update( - { - "_sdc_deleted_at": datetime.datetime.utcnow().strftime( - r"%Y-%m-%dT%H:%M:%SZ" - ) - } - ) + row.update({ + "_sdc_deleted_at": datetime.datetime.utcnow().strftime( + r"%Y-%m-%dT%H:%M:%SZ" + ) + }) row.update({"_sdc_lsn": message.data_start}) elif message_payload["action"] in truncate_actions: self.logger.debug( From 9329a43fc7ed163c7b6efe6d836f84973046bee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 10 Jan 2024 17:04:33 -0600 Subject: [PATCH 5/5] Fix additional errors --- .pre-commit-config.yaml | 19 ++++++------------- README.md | 2 +- log_based/init.sql | 2 +- poetry.lock | 2 +- tap_postgres/client.py | 28 ++++++++++++++++------------ tests/test_core.py | 6 +++--- tests/test_log_based.py | 11 +++++++---- 7 files changed, 35 insertions(+), 35 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cdf38d9..f64ed824 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,32 +1,25 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.3.0 + rev: v4.5.0 hooks: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/psf/black - rev: 22.10.0 + rev: 23.12.1 hooks: - id: black - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 5.13.2 hooks: - id: isort - repo: https://github.com/python-poetry/poetry - rev: 1.3.2 + rev: 1.7.0 hooks: - id: poetry-check - id: poetry-lock args: [--no-update] -- repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.991' - hooks: - - id: mypy - exclude: tests - additional_dependencies: - - types-paramiko - repo: https://github.com/pycqa/flake8 - rev: 6.1.0 + rev: 7.0.0 hooks: - - id: flake8 \ No newline at end of file + - id: flake8 diff --git a/README.md b/README.md index 7f538dc1..226d317e 100644 --- a/README.md +++ b/README.md @@ -261,4 +261,4 @@ Note also that using log-based replication will cause the replication key for al "*": replication_method: LOG_BASED replication_key: _sdc_lsn - ``` \ No newline at end of file + ``` diff --git a/log_based/init.sql b/log_based/init.sql index 4cd87906..a700552f 100644 --- a/log_based/init.sql +++ b/log_based/init.sql @@ -1 +1 @@ -SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json'); \ No newline at end of file +SELECT * FROM pg_create_logical_replication_slot('tappostgres', 'wal2json'); diff --git a/poetry.lock b/poetry.lock index e459bbf8..e672c505 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand. [[package]] name = "appdirs" diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 4fdabd3e..141c6454 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -178,6 +178,7 @@ def sdk_typing_object( | th.DateType | th.StringType | th.BooleanType + | th.CustomType ): """Return the JSON Schema dict that describes the sql type. @@ -209,14 +210,15 @@ def sdk_typing_object( | th.IntegerType | th.DateType | th.StringType - | th.BooleanType, + | th.BooleanType + | th.CustomType, ] = { - "jsonb": th.CustomType({ - "type": ["string", "number", "integer", "array", "object", "boolean"] - }), - "json": th.CustomType({ - "type": ["string", "number", "integer", "array", "object", "boolean"] - }), + "jsonb": th.CustomType( + {"type": ["string", "number", "integer", "array", "object", "boolean"]} + ), + "json": th.CustomType( + {"type": ["string", "number", "integer", "array", "object", "boolean"]} + ), "timestamp": th.DateTimeType(), "datetime": th.DateTimeType(), "date": th.DateType(), @@ -475,11 +477,13 @@ def consume(self, message) -> dict | None: elif message_payload["action"] in delete_actions: for column in message_payload["identity"]: row.update({column["name"]: column["value"]}) - row.update({ - "_sdc_deleted_at": datetime.datetime.utcnow().strftime( - r"%Y-%m-%dT%H:%M:%SZ" - ) - }) + row.update( + { + "_sdc_deleted_at": datetime.datetime.utcnow().strftime( + r"%Y-%m-%dT%H:%M:%SZ" + ) + } + ) row.update({"_sdc_lsn": message.data_start}) elif message_payload["action"] in truncate_actions: self.logger.debug( diff --git a/tests/test_core.py b/tests/test_core.py index 1bb8614d..9736ca80 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -11,13 +11,13 @@ from singer_sdk.testing.runners import TapTestRunner from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, String, Table, text from sqlalchemy.dialects.postgresql import ( + ARRAY, BIGINT, DATE, JSON, JSONB, TIME, TIMESTAMP, - ARRAY, ) from tests.settings import DB_SCHEMA_NAME, DB_SQLALCHEMY_URL from tests.test_replication_key import TABLE_NAME, TapTestReplicationKey @@ -418,13 +418,13 @@ def run_sync_dry_run(self) -> bool: return True -def test_invalid_python_dates(): +def test_invalid_python_dates(): # noqa: C901 """Some dates are invalid in python, but valid in Postgres Check out https://www.psycopg.org/psycopg3/docs/advanced/adapt.html#example-handling-infinity-date for more information. - """ + """ # noqa: E501 table_name = "test_invalid_python_dates" engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"], future=True) diff --git a/tests/test_log_based.py b/tests/test_log_based.py index 0048c0a4..889ee12b 100644 --- a/tests/test_log_based.py +++ b/tests/test_log_based.py @@ -3,9 +3,9 @@ import sqlalchemy from sqlalchemy import Column, MetaData, Table from sqlalchemy.dialects.postgresql import BIGINT, TEXT -from tap_postgres.tap import TapPostgres from tests.test_core import PostgresTestRunner +from tap_postgres.tap import TapPostgres LOG_BASED_CONFIG = { "host": "localhost", @@ -15,6 +15,7 @@ "database": "postgres", } + def test_null_append(): """LOG_BASED syncs failed with string property types. (issue #294). @@ -23,14 +24,16 @@ def test_null_append(): LOG_BASED replication can still append the "null" option to a property's type. """ table_name = "test_null_append" - engine = sqlalchemy.create_engine("postgresql://postgres:postgres@localhost:5434/postgres") + engine = sqlalchemy.create_engine( + "postgresql://postgres:postgres@localhost:5434/postgres" + ) metadata_obj = MetaData() table = Table( table_name, metadata_obj, - Column("id", BIGINT, primary_key = True), - Column("data", TEXT, nullable = True) + Column("id", BIGINT, primary_key=True), + Column("data", TEXT, nullable=True), ) with engine.connect() as conn: table.drop(conn, checkfirst=True)