From 764ba26f7eb9a86bc460a81c5a9fb943fc052ebc Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 21 Jun 2024 14:29:08 -0400 Subject: [PATCH 1/7] Add record/replay support. --- dbt/adapters/postgres/connections.py | 34 ++++++++++++++++++------- dbt/adapters/postgres/record.py | 37 ++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 9 deletions(-) create mode 100644 dbt/adapters/postgres/record.py diff --git a/dbt/adapters/postgres/connections.py b/dbt/adapters/postgres/connections.py index 83f26957..6e0ca443 100644 --- a/dbt/adapters/postgres/connections.py +++ b/dbt/adapters/postgres/connections.py @@ -5,10 +5,12 @@ from dbt.adapters.contracts.connection import AdapterResponse, Credentials from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import TypeCodeNotFound +from dbt.adapters.postgres.record import PostgresRecordReplayHandle from dbt.adapters.sql import SQLConnectionManager from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError from dbt_common.events.functions import warn_or_error from dbt_common.helper_types import Port +from dbt_common.record import get_record_mode_from_env, RecorderMode from mashumaro.jsonschema.annotations import Maximum, Minimum import psycopg2 from typing_extensions import Annotated @@ -132,17 +134,31 @@ def open(cls, connection): kwargs["application_name"] = credentials.application_name def connect(): - handle = psycopg2.connect( - dbname=credentials.database, - user=credentials.user, - host=credentials.host, - password=credentials.password, - port=credentials.port, - connect_timeout=credentials.connect_timeout, - **kwargs, - ) + handle = None + + # In replay mode, we won't connect to a real database at all, while + # in record and diff modes we do, but insert an intermediate handle + # object which monitors native connection activity. + rec_mode = get_record_mode_from_env() + if rec_mode != RecorderMode.REPLAY: + handle = psycopg2.connect( + dbname=credentials.database, + user=credentials.user, + host=credentials.host, + password=credentials.password, + port=credentials.port, + connect_timeout=credentials.connect_timeout, + **kwargs + ) + + if rec_mode is not None: + # If using the record/replay mechanism, regardless of mode, we + # use a wrapper. + handle = PostgresRecordReplayHandle(handle, connection) + if credentials.role: handle.cursor().execute("set role {}".format(credentials.role)) + return handle retryable_exceptions = [ diff --git a/dbt/adapters/postgres/record.py b/dbt/adapters/postgres/record.py new file mode 100644 index 00000000..09981b1e --- /dev/null +++ b/dbt/adapters/postgres/record.py @@ -0,0 +1,37 @@ +import dataclasses +from typing import Optional + +from dbt.adapters.record import RecordReplayHandle, RecordReplayCursor +from dbt_common.record import record_function, Record, Recorder + + +class PostgresRecordReplayHandle(RecordReplayHandle): + # PAW: Wrap + def cursor(self): + cursor = None if self.native_handle is None else self.native_handle.cursor() + return PostgresRecordReplayCursor(cursor, self.connection) + + +@dataclasses.dataclass +class CursorGetStatusMessageParams: + connection_name: str + + +@dataclasses.dataclass +class CursorGetStatusMessageResult: + msg: Optional[str] + + +class CursorGetStatusMessageRecord(Record): + params_cls = CursorGetStatusMessageParams + result_cls = CursorGetStatusMessageResult + + +Recorder.register_record_type(CursorGetStatusMessageRecord) + + +class PostgresRecordReplayCursor(RecordReplayCursor): + @property + @record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name") + def statusmessage(self): + return self.native_cursor.statusmessage From 492b412e1d9181d9e420368eae0a2787879bce5d Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 21 Jun 2024 14:40:10 -0400 Subject: [PATCH 2/7] Formatting fix --- dbt/adapters/postgres/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/postgres/connections.py b/dbt/adapters/postgres/connections.py index 6e0ca443..e8f0abe5 100644 --- a/dbt/adapters/postgres/connections.py +++ b/dbt/adapters/postgres/connections.py @@ -148,7 +148,7 @@ def connect(): password=credentials.password, port=credentials.port, connect_timeout=credentials.connect_timeout, - **kwargs + **kwargs, ) if rec_mode is not None: From 64e2ea6b69ee3b4f96069c9b332e40bf78055cef Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Sun, 7 Jul 2024 15:06:00 -0400 Subject: [PATCH 3/7] Add group to record types. --- dbt/adapters/postgres/record.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/postgres/record.py b/dbt/adapters/postgres/record.py index 09981b1e..52248a7f 100644 --- a/dbt/adapters/postgres/record.py +++ b/dbt/adapters/postgres/record.py @@ -6,7 +6,6 @@ class PostgresRecordReplayHandle(RecordReplayHandle): - # PAW: Wrap def cursor(self): cursor = None if self.native_handle is None else self.native_handle.cursor() return PostgresRecordReplayCursor(cursor, self.connection) @@ -22,12 +21,11 @@ class CursorGetStatusMessageResult: msg: Optional[str] +@Recorder.register_record_type class CursorGetStatusMessageRecord(Record): params_cls = CursorGetStatusMessageParams result_cls = CursorGetStatusMessageResult - - -Recorder.register_record_type(CursorGetStatusMessageRecord) + group = "Database" class PostgresRecordReplayCursor(RecordReplayCursor): From bdbfc3b2bc919796f68991e9738c2cee9f2d30a3 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Tue, 16 Jul 2024 17:19:10 -0400 Subject: [PATCH 4/7] Reorganize record/replay code to match dbt-adapters. --- dbt/adapters/postgres/record.py | 35 ------------------- dbt/adapters/postgres/record/__init__.py | 2 ++ dbt/adapters/postgres/record/cursor/cursor.py | 13 +++++++ dbt/adapters/postgres/record/cursor/status.py | 20 +++++++++++ dbt/adapters/postgres/record/handle.py | 8 +++++ 5 files changed, 43 insertions(+), 35 deletions(-) delete mode 100644 dbt/adapters/postgres/record.py create mode 100644 dbt/adapters/postgres/record/__init__.py create mode 100644 dbt/adapters/postgres/record/cursor/cursor.py create mode 100644 dbt/adapters/postgres/record/cursor/status.py create mode 100644 dbt/adapters/postgres/record/handle.py diff --git a/dbt/adapters/postgres/record.py b/dbt/adapters/postgres/record.py deleted file mode 100644 index 52248a7f..00000000 --- a/dbt/adapters/postgres/record.py +++ /dev/null @@ -1,35 +0,0 @@ -import dataclasses -from typing import Optional - -from dbt.adapters.record import RecordReplayHandle, RecordReplayCursor -from dbt_common.record import record_function, Record, Recorder - - -class PostgresRecordReplayHandle(RecordReplayHandle): - def cursor(self): - cursor = None if self.native_handle is None else self.native_handle.cursor() - return PostgresRecordReplayCursor(cursor, self.connection) - - -@dataclasses.dataclass -class CursorGetStatusMessageParams: - connection_name: str - - -@dataclasses.dataclass -class CursorGetStatusMessageResult: - msg: Optional[str] - - -@Recorder.register_record_type -class CursorGetStatusMessageRecord(Record): - params_cls = CursorGetStatusMessageParams - result_cls = CursorGetStatusMessageResult - group = "Database" - - -class PostgresRecordReplayCursor(RecordReplayCursor): - @property - @record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name") - def statusmessage(self): - return self.native_cursor.statusmessage diff --git a/dbt/adapters/postgres/record/__init__.py b/dbt/adapters/postgres/record/__init__.py new file mode 100644 index 00000000..9b8b9b3a --- /dev/null +++ b/dbt/adapters/postgres/record/__init__.py @@ -0,0 +1,2 @@ +from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor +from dbt.adapters.postgres.record.handle import PostgresRecordReplayHandle diff --git a/dbt/adapters/postgres/record/cursor/cursor.py b/dbt/adapters/postgres/record/cursor/cursor.py new file mode 100644 index 00000000..fc13e3cd --- /dev/null +++ b/dbt/adapters/postgres/record/cursor/cursor.py @@ -0,0 +1,13 @@ +from dbt_common.record import record_function + +from dbt.adapters.record import RecordReplayCursor + +from dbt.adapters.postgres.record.cursor.status import CursorGetStatusMessageRecord + + +class PostgresRecordReplayCursor(RecordReplayCursor): + @property + @record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name") + def statusmessage(self): + return self.native_cursor.statusmessage + diff --git a/dbt/adapters/postgres/record/cursor/status.py b/dbt/adapters/postgres/record/cursor/status.py new file mode 100644 index 00000000..55b40751 --- /dev/null +++ b/dbt/adapters/postgres/record/cursor/status.py @@ -0,0 +1,20 @@ +import dataclasses +from typing import Optional + +from dbt_common.record import Record, Recorder + +@dataclasses.dataclass +class CursorGetStatusMessageParams: + connection_name: str + + +@dataclasses.dataclass +class CursorGetStatusMessageResult: + msg: Optional[str] + + +@Recorder.register_record_type +class CursorGetStatusMessageRecord(Record): + params_cls = CursorGetStatusMessageParams + result_cls = CursorGetStatusMessageResult + group = "Database" diff --git a/dbt/adapters/postgres/record/handle.py b/dbt/adapters/postgres/record/handle.py new file mode 100644 index 00000000..c6a589a7 --- /dev/null +++ b/dbt/adapters/postgres/record/handle.py @@ -0,0 +1,8 @@ +from dbt.adapters.record import RecordReplayHandle + +from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor + +class PostgresRecordReplayHandle(RecordReplayHandle): + def cursor(self): + cursor = None if self.native_handle is None else self.native_handle.cursor() + return PostgresRecordReplayCursor(cursor, self.connection) From b8cf4be80128a399340b916915ab033f078ab92c Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Tue, 16 Jul 2024 17:23:38 -0400 Subject: [PATCH 5/7] Add docstrings --- dbt/adapters/postgres/record/cursor/cursor.py | 2 ++ dbt/adapters/postgres/record/handle.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/dbt/adapters/postgres/record/cursor/cursor.py b/dbt/adapters/postgres/record/cursor/cursor.py index fc13e3cd..a7edfe2e 100644 --- a/dbt/adapters/postgres/record/cursor/cursor.py +++ b/dbt/adapters/postgres/record/cursor/cursor.py @@ -6,6 +6,8 @@ class PostgresRecordReplayCursor(RecordReplayCursor): + """A custom extension of RecordReplayCursor that adds the statusmessage + property which is specific to psycopg.""" @property @record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name") def statusmessage(self): diff --git a/dbt/adapters/postgres/record/handle.py b/dbt/adapters/postgres/record/handle.py index c6a589a7..d75e0e21 100644 --- a/dbt/adapters/postgres/record/handle.py +++ b/dbt/adapters/postgres/record/handle.py @@ -3,6 +3,9 @@ from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor class PostgresRecordReplayHandle(RecordReplayHandle): + """A custom extension of RecordReplayHandle that returns + a psycopg-specific PostgresRecordReplayCursor object.""" + def cursor(self): cursor = None if self.native_handle is None else self.native_handle.cursor() return PostgresRecordReplayCursor(cursor, self.connection) From fd7715c11f16bcd33a1c9a4d096ea86239d43ae5 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Tue, 16 Jul 2024 17:49:18 -0400 Subject: [PATCH 6/7] Formatting fixes. --- dbt/adapters/postgres/record/cursor/cursor.py | 2 +- dbt/adapters/postgres/record/cursor/status.py | 1 + dbt/adapters/postgres/record/handle.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/postgres/record/cursor/cursor.py b/dbt/adapters/postgres/record/cursor/cursor.py index a7edfe2e..a096c9f5 100644 --- a/dbt/adapters/postgres/record/cursor/cursor.py +++ b/dbt/adapters/postgres/record/cursor/cursor.py @@ -8,8 +8,8 @@ class PostgresRecordReplayCursor(RecordReplayCursor): """A custom extension of RecordReplayCursor that adds the statusmessage property which is specific to psycopg.""" + @property @record_function(CursorGetStatusMessageRecord, method=True, id_field_name="connection_name") def statusmessage(self): return self.native_cursor.statusmessage - diff --git a/dbt/adapters/postgres/record/cursor/status.py b/dbt/adapters/postgres/record/cursor/status.py index 55b40751..1e8d9620 100644 --- a/dbt/adapters/postgres/record/cursor/status.py +++ b/dbt/adapters/postgres/record/cursor/status.py @@ -3,6 +3,7 @@ from dbt_common.record import Record, Recorder + @dataclasses.dataclass class CursorGetStatusMessageParams: connection_name: str diff --git a/dbt/adapters/postgres/record/handle.py b/dbt/adapters/postgres/record/handle.py index d75e0e21..119dc2f1 100644 --- a/dbt/adapters/postgres/record/handle.py +++ b/dbt/adapters/postgres/record/handle.py @@ -2,6 +2,7 @@ from dbt.adapters.postgres.record.cursor.cursor import PostgresRecordReplayCursor + class PostgresRecordReplayHandle(RecordReplayHandle): """A custom extension of RecordReplayHandle that returns a psycopg-specific PostgresRecordReplayCursor object.""" From b2247cfde25b7fcd52934cae3f394b6251db1c6c Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Tue, 16 Jul 2024 17:54:37 -0400 Subject: [PATCH 7/7] Add changelog entry. --- .changes/unreleased/Under the Hood-20240716-172442.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Under the Hood-20240716-172442.yaml diff --git a/.changes/unreleased/Under the Hood-20240716-172442.yaml b/.changes/unreleased/Under the Hood-20240716-172442.yaml new file mode 100644 index 00000000..8777edbb --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240716-172442.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Add support for experimental record/replay testing. +time: 2024-07-16T17:24:42.271859-04:00 +custom: + Author: peterallenwebb + Issue: "123"