Skip to content

Commit

Permalink
Merge pull request #25 from thread/thread/postgres-snapshot-1
Browse files Browse the repository at this point in the history
Make sure that Postgres always get a consistent snapshot
  • Loading branch information
judahrand authored Apr 19, 2022
2 parents e8dc508 + 642b837 commit c9941cf
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 68 deletions.
164 changes: 107 additions & 57 deletions pipelinewise/fastsync/commons/tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import logging
import re
import sys
import textwrap
import time
import psycopg2
import psycopg2.errors
import psycopg2.extras

from typing import Dict
Expand All @@ -15,6 +18,12 @@
LOGGER = logging.getLogger(__name__)


def parse_lsn(lsn: str) -> int:
"""Parse string LSN format to integer"""
file, index = lsn.split('/')
return (int(file, 16) << 32) + int(index, 16)


class FastSyncTapPostgres:
"""
Common functions for fastsync from a Postgres database
Expand All @@ -25,9 +34,8 @@ def __init__(self, connection_config, tap_type_to_target_type, target_quote=None
self.tap_type_to_target_type = tap_type_to_target_type
self.target_quote = target_quote
self.conn = None
self.curr = None
self.primary_host_conn = None
self.primary_host_curr = None
self.version = None

@staticmethod
def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'):
Expand Down Expand Up @@ -184,7 +192,6 @@ def open_connection(self):
self.conn = self.get_connection(
self.connection_config, prioritize_primary=False
)
self.curr = self.conn.cursor()

def close_connection(self):
"""
Expand Down Expand Up @@ -221,7 +228,7 @@ def primary_host_query(self, query, params=None):
return []

# pylint: disable=no-member
def create_replication_slot(self):
def create_replication_slot(self) -> int:
"""
Create replication slot on the primary host
Expand All @@ -236,24 +243,76 @@ def create_replication_slot(self):
format you won't be able to do LOG_BASED replication from the same postgres
database by multiple taps. If that the case then you need to drop the old
replication slot and full-resync the new taps.
Returns the LSN at which the replication slot is consistent.
"""
try:
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)

# Create the replication host
# Create the replication host
return parse_lsn(
self.primary_host_query(
f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')"
)
except Exception as exc:
# ERROR: replication slot already exists SQL state: 42710
if hasattr(exc, 'pgcode') and exc.pgcode == '42710':
pass
)[0]['lsn']
)

def get_current_lsn(self) -> int:
"""Obtain the most recent LSN availiable."""
# is replica_host set ?
if self.connection_config.get('replica_host'):
# Get latest applied lsn from replica_host
if self.version >= 100000:
result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn')
elif self.version >= 90400:
result = self.query(
'SELECT pg_last_xlog_replay_location() AS current_lsn'
)
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
else:
# Get current lsn from primary host
if self.version >= 100000:
result = self.query('SELECT pg_current_wal_lsn() AS current_lsn')
elif self.version >= 90400:
result = self.query('SELECT pg_current_xlog_location() AS current_lsn')
else:
raise exc
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
return parse_lsn(result[0]['current_lsn'])

def get_confirmed_flush_lsn(self) -> int:
"""
Get the last flushed LSN for the replication slot.
For Postgres <9.6 this defaults to the restart_lsn as confirmed_flush_lsn
is not availiable.
"""
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)

res = self.primary_host_query(
textwrap.dedent(
f"""\
SELECT *
FROM pg_replication_slots
WHERE slot_name = '{slot_name}'
AND plugin = 'wal2json'
AND slot_type = 'logical'"""
)
)[0]

# confirmed_flush_lsn was introduced in Postgres 9.6 so fallback to
# restart_lsn if needed.
return parse_lsn(res.get('confirmed_flush_lsn', res['restart_lsn']))

# pylint: disable=too-many-branches,no-member,chained-comparison
def fetch_current_log_pos(self):
Expand All @@ -265,63 +324,52 @@ def fetch_current_log_pos(self):
self.primary_host_conn = self.get_connection(
self.connection_config, prioritize_primary=True
)
self.primary_host_curr = self.primary_host_conn.cursor()

# Make sure PostgreSQL version is 9.4 or higher
result = self.primary_host_query(
"SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'"
)
version = result[0].get('version')
self.version = result[0].get('version')

# Do not allow minor versions with PostgreSQL BUG #15114
if (version >= 110000) and (version < 110002):
if (self.version >= 110000) and (self.version < 110002):
raise Exception('PostgreSQL upgrade required to minor version 11.2')
if (version >= 100000) and (version < 100007):
if (self.version >= 100000) and (self.version < 100007):
raise Exception('PostgreSQL upgrade required to minor version 10.7')
if (version >= 90600) and (version < 90612):
if (self.version >= 90600) and (self.version < 90612):
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
if (version >= 90500) and (version < 90516):
if (self.version >= 90500) and (self.version < 90516):
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
if (version >= 90400) and (version < 90421):
if (self.version >= 90400) and (self.version < 90421):
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
if version < 90400:
if self.version < 90400:
raise Exception('Logical replication not supported before PostgreSQL 9.4')

# Create replication slot
self.create_replication_slot()
try:
# Create replication slot and obtain the oldest LSN
# that it can deliver.
oldest_lsn = self.create_replication_slot()
except psycopg2.errors.DuplicateObject:
# Replication slot exists already so just get the
# oldest LSN which the slot can deliver.
oldest_lsn = self.get_confirmed_flush_lsn()

# Close replication slot dedicated connection
self.primary_host_conn.close()

# is replica_host set ?
if self.connection_config.get('replica_host'):
# Get latest applied lsn from replica_host
if version >= 100000:
result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn')
elif version >= 90400:
result = self.query(
'SELECT pg_last_xlog_replay_location() AS current_lsn'
)
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
else:
# Get current lsn from primary host
if version >= 100000:
result = self.query('SELECT pg_current_wal_lsn() AS current_lsn')
elif version >= 90400:
result = self.query('SELECT pg_current_xlog_location() AS current_lsn')
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
current_lsn = self.get_current_lsn()

current_lsn = result[0].get('current_lsn')
file, index = current_lsn.split('/')
lsn = (int(file, 16) << 32) + int(index, 16)
if self.connection_config.get('replica_host'):
# Ensure that the newest LSN availiable on the replica is newer than
# the oldest LSN which is deliverable by the replication slot.
max_time_waited = 60 # seconds
now = time.time()
while oldest_lsn > current_lsn:
if time.time() - now > max_time_waited:
raise RuntimeError('Replica database is lagging too far behind Primary.')
current_lsn = self.get_current_lsn()

return {'lsn': lsn, 'version': 1}
return {'lsn': current_lsn, 'version': 1}

# pylint: disable=invalid-name
def fetch_current_incremental_key_pos(self, table, replication_key):
Expand Down Expand Up @@ -485,8 +533,9 @@ def copy_table(
split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000)
split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20)
"""
table_columns = self.get_table_columns(table_name, max_num, date_type)
column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns]
column_safe_sql_values = [
c.get('safe_sql_value') for c in self.get_table_columns(table_name, max_num, date_type)
]

# If self.get_table_columns returns zero row then table not exist
if len(column_safe_sql_values) == 0:
Expand All @@ -513,4 +562,5 @@ def copy_table(
)

with gzip_splitter as split_gzip_files:
self.curr.copy_expert(sql, split_gzip_files, size=131072)
with self.conn.cursor() as cur:
cur.copy_expert(sql, split_gzip_files, size=131072)
23 changes: 12 additions & 11 deletions tests/units/fastsync/commons/test_fastsync_tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ def setUp(self) -> None:
self.postgres.executed_queries_primary_host = []
self.postgres.executed_queries = []

def primary_host_query_mock(query, _=None):
self.postgres.executed_queries_primary_host.append(query)

self.postgres.primary_host_query = primary_host_query_mock

def test_generate_repl_slot_name(self):
"""Validate if the replication slot name generated correctly"""
# Provide only database name
Expand Down Expand Up @@ -70,17 +65,20 @@ def test_create_replication_slot_1(self):
Validate if replication slot creation SQL commands generated correctly in case no v15 slots exists
"""

def execute_mock(query):
def execute_mock(query, _=None):
print('Mocked execute called')
self.postgres.executed_queries_primary_host.append(query)

# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0)
# First query -> 0 rows, second query -> 1 row
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[0, 1])
cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}]

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con = MagicMock()
pg_con.__enter__.return_value = pg_con
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con
Expand All @@ -96,17 +94,20 @@ def test_create_replication_slot_2(self):
Validate if replication slot creation SQL commands generated correctly in case a v15 slots exists
"""

def execute_mock(query):
def execute_mock(query, _=None):
print('Mocked execute called')
self.postgres.executed_queries_primary_host.append(query)

# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1)
# First query -> 1 row, second query -> 1 row
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[1, 1])
cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}]

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con = MagicMock()
pg_con.__enter__.return_value = pg_con
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con
Expand Down

0 comments on commit c9941cf

Please sign in to comment.