diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index c1d46b03a..f1f2ab3cf 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -3,7 +3,10 @@ import logging import re import sys +import textwrap +import time import psycopg2 +import psycopg2.errors import psycopg2.extras from typing import Dict @@ -15,6 +18,12 @@ LOGGER = logging.getLogger(__name__) +def parse_lsn(lsn: str) -> int: + """Parse string LSN format to integer""" + file, index = lsn.split('/') + return (int(file, 16) << 32) + int(index, 16) + + class FastSyncTapPostgres: """ Common functions for fastsync from a Postgres database @@ -25,9 +34,8 @@ def __init__(self, connection_config, tap_type_to_target_type, target_quote=None self.tap_type_to_target_type = tap_type_to_target_type self.target_quote = target_quote self.conn = None - self.curr = None self.primary_host_conn = None - self.primary_host_curr = None + self.version = None @staticmethod def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'): @@ -184,7 +192,6 @@ def open_connection(self): self.conn = self.get_connection( self.connection_config, prioritize_primary=False ) - self.curr = self.conn.cursor() def close_connection(self): """ @@ -221,7 +228,7 @@ def primary_host_query(self, query, params=None): return [] # pylint: disable=no-member - def create_replication_slot(self): + def create_replication_slot(self) -> int: """ Create replication slot on the primary host @@ -236,24 +243,76 @@ def create_replication_slot(self): format you won't be able to do LOG_BASED replication from the same postgres database by multiple taps. If that the case then you need to drop the old replication slot and full-resync the new taps. + + Returns the LSN at which the replication slot is consistent. """ - try: - slot_name = self.__get_slot_name( - self.primary_host_conn, - self.connection_config['dbname'], - self.connection_config['tap_id'], - ) + slot_name = self.__get_slot_name( + self.primary_host_conn, + self.connection_config['dbname'], + self.connection_config['tap_id'], + ) - # Create the replication host + # Create the replication host + return parse_lsn( self.primary_host_query( f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')" - ) - except Exception as exc: - # ERROR: replication slot already exists SQL state: 42710 - if hasattr(exc, 'pgcode') and exc.pgcode == '42710': - pass + )[0]['lsn'] + ) + + def get_current_lsn(self) -> int: + """Obtain the most recent LSN availiable.""" + # is replica_host set ? + if self.connection_config.get('replica_host'): + # Get latest applied lsn from replica_host + if self.version >= 100000: + result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn') + elif self.version >= 90400: + result = self.query( + 'SELECT pg_last_xlog_replay_location() AS current_lsn' + ) + else: + raise Exception( + 'Logical replication not supported before PostgreSQL 9.4' + ) + else: + # Get current lsn from primary host + if self.version >= 100000: + result = self.query('SELECT pg_current_wal_lsn() AS current_lsn') + elif self.version >= 90400: + result = self.query('SELECT pg_current_xlog_location() AS current_lsn') else: - raise exc + raise Exception( + 'Logical replication not supported before PostgreSQL 9.4' + ) + return parse_lsn(result[0]['current_lsn']) + + def get_confirmed_flush_lsn(self) -> int: + """ + Get the last flushed LSN for the replication slot. + + For Postgres <9.6 this defaults to the restart_lsn as confirmed_flush_lsn + is not availiable. + """ + slot_name = self.__get_slot_name( + self.primary_host_conn, + self.connection_config['dbname'], + self.connection_config['tap_id'], + ) + + res = self.primary_host_query( + textwrap.dedent( + f"""\ + SELECT * + FROM pg_replication_slots + WHERE slot_name = '{slot_name}' + AND plugin = 'wal2json' + AND slot_type = 'logical'""" + ) + )[0] + + # confirmed_flush_lsn was introduced in Postgres 9.6 so fallback to + # restart_lsn if needed. + return parse_lsn(res.get('confirmed_flush_lsn', res['restart_lsn'])) # pylint: disable=too-many-branches,no-member,chained-comparison def fetch_current_log_pos(self): @@ -265,63 +324,52 @@ def fetch_current_log_pos(self): self.primary_host_conn = self.get_connection( self.connection_config, prioritize_primary=True ) - self.primary_host_curr = self.primary_host_conn.cursor() # Make sure PostgreSQL version is 9.4 or higher result = self.primary_host_query( "SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'" ) - version = result[0].get('version') + self.version = result[0].get('version') # Do not allow minor versions with PostgreSQL BUG #15114 - if (version >= 110000) and (version < 110002): + if (self.version >= 110000) and (self.version < 110002): raise Exception('PostgreSQL upgrade required to minor version 11.2') - if (version >= 100000) and (version < 100007): + if (self.version >= 100000) and (self.version < 100007): raise Exception('PostgreSQL upgrade required to minor version 10.7') - if (version >= 90600) and (version < 90612): + if (self.version >= 90600) and (self.version < 90612): raise Exception('PostgreSQL upgrade required to minor version 9.6.12') - if (version >= 90500) and (version < 90516): + if (self.version >= 90500) and (self.version < 90516): raise Exception('PostgreSQL upgrade required to minor version 9.5.16') - if (version >= 90400) and (version < 90421): + if (self.version >= 90400) and (self.version < 90421): raise Exception('PostgreSQL upgrade required to minor version 9.4.21') - if version < 90400: + if self.version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') - # Create replication slot - self.create_replication_slot() + try: + # Create replication slot and obtain the oldest LSN + # that it can deliver. + oldest_lsn = self.create_replication_slot() + except psycopg2.errors.DuplicateObject: + # Replication slot exists already so just get the + # oldest LSN which the slot can deliver. + oldest_lsn = self.get_confirmed_flush_lsn() # Close replication slot dedicated connection self.primary_host_conn.close() - # is replica_host set ? - if self.connection_config.get('replica_host'): - # Get latest applied lsn from replica_host - if version >= 100000: - result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn') - elif version >= 90400: - result = self.query( - 'SELECT pg_last_xlog_replay_location() AS current_lsn' - ) - else: - raise Exception( - 'Logical replication not supported before PostgreSQL 9.4' - ) - else: - # Get current lsn from primary host - if version >= 100000: - result = self.query('SELECT pg_current_wal_lsn() AS current_lsn') - elif version >= 90400: - result = self.query('SELECT pg_current_xlog_location() AS current_lsn') - else: - raise Exception( - 'Logical replication not supported before PostgreSQL 9.4' - ) + current_lsn = self.get_current_lsn() - current_lsn = result[0].get('current_lsn') - file, index = current_lsn.split('/') - lsn = (int(file, 16) << 32) + int(index, 16) + if self.connection_config.get('replica_host'): + # Ensure that the newest LSN availiable on the replica is newer than + # the oldest LSN which is deliverable by the replication slot. + max_time_waited = 60 # seconds + now = time.time() + while oldest_lsn > current_lsn: + if time.time() - now > max_time_waited: + raise RuntimeError('Replica database is lagging too far behind Primary.') + current_lsn = self.get_current_lsn() - return {'lsn': lsn, 'version': 1} + return {'lsn': current_lsn, 'version': 1} # pylint: disable=invalid-name def fetch_current_incremental_key_pos(self, table, replication_key): @@ -485,8 +533,9 @@ def copy_table( split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000) split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20) """ - table_columns = self.get_table_columns(table_name, max_num, date_type) - column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns] + column_safe_sql_values = [ + c.get('safe_sql_value') for c in self.get_table_columns(table_name, max_num, date_type) + ] # If self.get_table_columns returns zero row then table not exist if len(column_safe_sql_values) == 0: @@ -513,4 +562,5 @@ def copy_table( ) with gzip_splitter as split_gzip_files: - self.curr.copy_expert(sql, split_gzip_files, size=131072) + with self.conn.cursor() as cur: + cur.copy_expert(sql, split_gzip_files, size=131072) diff --git a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py index 7dafa9b34..dee154041 100644 --- a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py +++ b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py @@ -21,11 +21,6 @@ def setUp(self) -> None: self.postgres.executed_queries_primary_host = [] self.postgres.executed_queries = [] - def primary_host_query_mock(query, _=None): - self.postgres.executed_queries_primary_host.append(query) - - self.postgres.primary_host_query = primary_host_query_mock - def test_generate_repl_slot_name(self): """Validate if the replication slot name generated correctly""" # Provide only database name @@ -70,17 +65,20 @@ def test_create_replication_slot_1(self): Validate if replication slot creation SQL commands generated correctly in case no v15 slots exists """ - def execute_mock(query): + def execute_mock(query, _=None): print('Mocked execute called') self.postgres.executed_queries_primary_host.append(query) # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock - type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0) + # First query -> 0 rows, second query -> 1 row + type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[0, 1]) + cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}] # mock PG connection instance with ability to open cursor - pg_con = Mock() + pg_con = MagicMock() + pg_con.__enter__.return_value = pg_con pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con @@ -96,17 +94,20 @@ def test_create_replication_slot_2(self): Validate if replication slot creation SQL commands generated correctly in case a v15 slots exists """ - def execute_mock(query): + def execute_mock(query, _=None): print('Mocked execute called') self.postgres.executed_queries_primary_host.append(query) # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock - type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1) + # First query -> 1 row, second query -> 1 row + type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[1, 1]) + cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}] # mock PG connection instance with ability to open cursor - pg_con = Mock() + pg_con = MagicMock() + pg_con.__enter__.return_value = pg_con pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con