diff --git a/postgres-appliance/major_upgrade/inplace_upgrade.py b/postgres-appliance/major_upgrade/inplace_upgrade.py
index 2d4ade477..8de6ef18b 100644
--- a/postgres-appliance/major_upgrade/inplace_upgrade.py
+++ b/postgres-appliance/major_upgrade/inplace_upgrade.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+"""spilo ``--update-version`` machinery."""
 import json
 import logging
 import os
@@ -21,6 +22,14 @@
 def patch_wale_prefix(value, new_version):
+    """
+    Patch the WAL prefix with the new version if the old version is valid and different from the new version.
+
+    :param value: The WAL prefix to patch.
+    :param new_version: The new version to use.
+
+    :returns: The patched WAL prefix string if the old version is valid and differs from the new version,
+              otherwise the original value.
+    """
     from spilo_commons import is_valid_pg_version

     if '/spilo/' in value and '/wal/' in value:  # path crafted in the configure_spilo.py?
@@ -31,6 +40,16 @@
 def update_configs(new_version):
+    """
+    Update the Patroni configuration file: set the new Postgres version and adjust shared_preload_libraries.
+    Update the WAL-E/WAL-G envdir files by patching the WAL prefix with the new version if the old version
+    is valid and different from the new version.
+    If the extwlist.extensions parameter is set in the Patroni configuration file, append the new extensions to it.
+    Write the updated Patroni configuration file and WAL-E/WAL-G envdir files.
+
+    :param new_version: The new Postgres version to upgrade to.
+
+    :returns: The path to the WAL-E/WAL-G envdir if it exists, None otherwise.
+    """
     from spilo_commons import append_extensions, get_bin_dir, get_patroni_config, write_file, write_patroni_config

     config = get_patroni_config()
@@ -75,6 +94,10 @@
 def kill_patroni():
+    """
+    Restart the Patroni process.
+    Find the Patroni process and kill it. If the process is not found, nothing happens.
+    """
     logger.info('Restarting patroni')
     patroni = next(iter(filter(lambda p: p.info['name'] == 'patroni', psutil.process_iter(['name']))), None)
     if patroni:
@@ -82,8 +105,29 @@
 class InplaceUpgrade(object):
+    """
+    A class representing an in-place upgrade of a PostgreSQL cluster.
+
+    :ivar config: a dictionary containing the configuration parameters for the upgrade process.
+    :ivar postgresql: a PostgresqlUpgrade object representing the PostgreSQL instance being upgraded.
+    :ivar cluster_version: the version of the PostgreSQL cluster being upgraded.
+    :ivar desired_version: the version of PostgreSQL to which the cluster is being upgraded.
+    :ivar upgrade_required: a flag indicating whether an upgrade is required.
+    :ivar paused: a flag indicating whether the upgrade process is currently paused.
+    :ivar new_data_created: a flag indicating whether new data has been created during the upgrade process.
+    :ivar upgrade_complete: a flag indicating whether the upgrade process has been completed.
+    :ivar rsyncd_configs_created: a flag indicating whether rsyncd configurations have been created during the upgrade process.
+    :ivar rsyncd_started: a flag indicating whether rsyncd has been started during the upgrade process.
+    :ivar dcs: a distributed configuration store object.
+    :ivar request: a PatroniRequest object representing a request to the PostgreSQL cluster.
+    """

     def __init__(self, config):
+        """
+        Initialize the InplaceUpgrade object.
+
+        :param config: a dictionary containing the configuration parameters for the upgrade process.
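+
+        A minimal construction sketch (``Config`` and ``PATRONI_CONFIG_FILE`` are
+        the names used in ``main()`` below):
+
+            config = Config(PATRONI_CONFIG_FILE)
+            upgrade = InplaceUpgrade(config)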
+        """
         from patroni.dcs import get_dcs
         from patroni.request import PatroniRequest
         from pg_upgrade import PostgresqlUpgrade
@@ -109,6 +153,14 @@ def __init__(self, config):
     @staticmethod
     def get_desired_version():
+        """
+        Return the desired version of the PostgreSQL binary to be used for the upgrade.
+        First attempt to retrieve the binary directory from the SPILO_CONFIGURATION environment variable.
+        If that fails, retrieve the binary directory from the PGVERSION environment variable.
+        Finally, return the version of the PostgreSQL binary located in the binary directory.
+
+        :returns: str: The version of the PostgreSQL binary to be used for the upgrade.
+        """
         from spilo_commons import get_bin_dir, get_binary_version

         try:
@@ -123,6 +175,13 @@ def get_desired_version():
         return get_binary_version(bin_dir)

     def check_patroni_api(self, member):
+        """
+        Check the Patroni API of a given member.
+
+        :param member: The member whose API should be checked.
+
+        :returns: True if the API request was successful and returned a 200 status code, False otherwise.
+        """
         try:
             response = self.request(member, timeout=2, retries=0)
             return response.status == 200
@@ -130,6 +189,16 @@ def check_patroni_api(self, member):
         return logger.error('API request to %s name failed: %r', member.name, e)

     def toggle_pause(self, paused):
+        """
+        Enable or disable maintenance mode.
+        If the cluster is currently paused, this method will attempt to disable maintenance mode and
+        resume normal operation. If the cluster is not currently paused, this method will attempt to
+        enable maintenance mode and pause normal operation.
+
+        :param paused: A boolean value indicating whether to enable or disable maintenance mode.
+
+        :returns: bool: True if maintenance mode was successfully enabled or disabled, False otherwise.
+        """
         from patroni.config import get_global_config
         from patroni.utils import polling_loop
@@ -158,6 +227,13 @@ def toggle_pause(self, paused):
         return logger.error("%s members didn't recognized pause state after %s seconds", remaining, ttl)

     def resume_cluster(self):
+        """
+        Resume the cluster by disabling maintenance mode.
+        If the cluster is currently paused, this method will attempt to disable
+        maintenance mode and resume normal operation.
+
+        :raises: Exception: If an error occurs while resuming the cluster.
+        """
         if self.paused:
             try:
                 logger.info('Disabling maintenance mode')
@@ -167,8 +243,12 @@ def resume_cluster(self):

     def ensure_replicas_state(self, cluster):
         """
-        This method checks the satatus of all replicas and also tries to open connections
-        to all of them and puts into the `self.replica_connections` dict for a future usage.
+        Check the status of all replicas and also try to open connections
+        to all of them, putting them into the `self.replica_connections` dict for future usage.
+
+        :param cluster: cluster object representing the PostgreSQL cluster.
+
+        :returns: bool: True if all replicas are streaming from the primary and are healthy, False otherwise.
         """
         self.replica_connections = {}
         streaming = {a: l for a, l in self.postgresql.query(
@@ -177,6 +257,16 @@
                      .format(self.postgresql.wal_name, self.postgresql.lsn_name))}

         def ensure_replica_state(member):
+            """
+            Check the state of a single replica and open a connection to it.
+            Check whether the replication lag on the replica is too high (more than 16 MB).
+            If the replica is streaming from the primary and is healthy, open a connection to it and
+            put it into the `self.replica_connections` dictionary for future usage.
+
+            :param member: A member object representing the replica.
+
+            :returns: bool: True if the replica is streaming from the primary and is healthy, False otherwise.
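+
+            Illustrative form of the health check performed below (a sketch of
+            the same logic, not additional behaviour):
+
+                lag = streaming.get(member.conn_kwargs().get('host'))
+                healthy = lag is not None and lag < 16 * 1024 * 1024  # less than 16 MB behind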
+            """
             ip = member.conn_kwargs().get('host')
             lag = streaming.get(ip)
             if lag is None:
@@ -203,6 +293,13 @@ def ensure_replica_state(member):
         return all(ensure_replica_state(member) for member in cluster.members if member.name != self.postgresql.name)

     def sanity_checks(self, cluster):
+        """
+        Perform sanity checks before triggering an upgrade.
+
+        :param cluster: cluster object representing the PostgreSQL cluster.
+
+        :raises: Exception: If any of the sanity checks fail.
+        """
         from patroni.config import get_global_config

         if not cluster.initialize:
@@ -221,6 +318,13 @@ def sanity_checks(self, cluster):
         return self.ensure_replicas_state(cluster)

     def remove_initialize_key(self):
+        """
+        Remove the initialize key from the cluster.
+        Check whether the cluster has an initialize key set and, if so, attempt to remove it by canceling
+        the initialization process. A polling loop checks the cluster's state multiple times before giving up.
+
+        :returns: bool: True if the initialize key was successfully removed, False otherwise.
+        """
         from patroni.utils import polling_loop

         for _ in polling_loop(10):
@@ -233,6 +337,16 @@ def remove_initialize_key(self):
         logger.error('Failed to remove initialize key')

     def wait_for_replicas(self, checkpoint_lsn):
+        """
+        Ensure that all replica nodes of a PostgreSQL database have caught up with the primary node to a
+        specific Log Sequence Number (LSN).
+        Uses the polling_loop function from the patroni.utils module to repeatedly
+        check the replay progress of every replica.
+
+        :param checkpoint_lsn: Log Sequence Number (LSN) up to which the replica nodes need to catch up.
+
+        :returns: bool: True if all replicas have caught up to the checkpoint_lsn, False otherwise.
+        """
         from patroni.utils import polling_loop

         logger.info('Waiting for replica nodes to catch up with primary')
@@ -266,6 +380,18 @@
         logger.error('Node %s did not catched up. Lag=%s', name, checkpoint_lsn - lsn)

     def create_rsyncd_configs(self):
+        """
+        Create the configuration files for rsyncd, the rsync daemon; rsync is the tool used
+        for copying and synchronizing files across systems during the upgrade.
+
+        Creates the rsyncd configuration directory (/run/rsync) with a feedback subdirectory
+        (/run/rsync/feedback), the main configuration file (rsyncd.conf), and the secrets file
+        (rsyncd.secrets) inside the configuration directory.
+        The list of allowed auth users is built from the keys of the `replica_connections`
+        dictionary, and the list of replica IPs from the first element of its values.
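+
+        Resulting layout (a sketch; file contents abridged):
+
+            /run/rsync/rsyncd.conf      # main rsync daemon configuration
+            /run/rsync/rsyncd.secrets   # auth users and passwords, mode 0600
+            /run/rsync/feedback/        # replicas report their rsync status here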
+ """ self.rsyncd_configs_created = True self.rsyncd_conf_dir = '/run/rsync' self.rsyncd_feedback_dir = os.path.join(self.rsyncd_conf_dir, 'feedback') @@ -301,11 +427,24 @@ def create_rsyncd_configs(self): os.chmod(secrets_file, 0o600) def start_rsyncd(self): + """ + Start the rsync daemon for file synchronization. + Create the rsync daemon configuration file and starts the rsync daemon process. + The rsync daemon is started with the specified configuration file and runs in the foreground. + """ self.create_rsyncd_configs() self.rsyncd = subprocess.Popen(['rsync', '--daemon', '--no-detach', '--config=' + self.rsyncd_conf]) self.rsyncd_started = True def stop_rsyncd(self): + """ + Stop the rsync daemon and removes the rsync configuration directory. + If the rsync daemon is running, it will be killed. If the rsync configuration + directory exists, it will be removed. + + :raises: OSError: If there is an error killing the rsync daemon or removing the + rsync configuration directory. + """ if self.rsyncd_started: logger.info('Stopping rsyncd') try: @@ -322,6 +461,13 @@ def stop_rsyncd(self): logger.error('Failed to remove %s: %r', self.rsync_conf_dir, e) def checkpoint(self, member): + """ + Perform a checkpoint on a specific member. + + :param member: A tuple containing the name and the database connection of the member. + + :returns: A tuple containing the name of the member and a boolean indicating whether the checkpoint was successful. + """ name, (_, cur) = member try: cur.execute('CHECKPOINT') @@ -331,6 +477,15 @@ def checkpoint(self, member): return name, False def rsync_replicas(self, primary_ip): + """ + Responsible for synchronizing replicas using rsync, a tool used for copying and synchronizing files across systems. + + :param primary_ip: A string representing the IP address of the primary node. + :param ret: A boolean attribute set to True indicating that the rsync was successful. + :param status: A dictionary containing the status of the rsync operation. + + :returns: bool: True if the rsync operation was successful, False otherwise. + """ from patroni.utils import polling_loop logger.info('Notifying replicas %s to start rsync', ','.join(self.replica_connections.keys())) @@ -388,6 +543,13 @@ def rsync_replicas(self, primary_ip): return ret def wait_replica_restart(self, member): + """ + Wait for the replica to restart after a major upgrade. + + :param member: The replica member object. + + :returns: str: The name of the replica member if it has restarted successfully, None otherwise. + """ from patroni.utils import polling_loop for _ in polling_loop(10): @@ -403,6 +565,13 @@ def wait_replica_restart(self, member): logger.error('Patroni on replica %s was not restarted in 10 seconds', member.name) def wait_replicas_restart(self, cluster): + """ + Wait for the restart of patroni on replicas. + + :param cluster: The cluster object representing the Postgres cluster. + + :returns: bool: True if all replicas successfully restarted, False otherwise. + """ members = [member for member in cluster.members if member.name in self.replica_connections] logger.info('Waiting for restart of patroni on replicas %s', ', '.join(m.name for m in members)) pool = ThreadPool(len(members)) @@ -413,6 +582,13 @@ def wait_replicas_restart(self, cluster): return all(results) def reset_custom_statistics_target(self): + """ + Reset the non-default statistics target before performing analyze. + Retrieve the list of databases and their corresponding tables and columns + that have a custom statistics target set. 
+        """
         from patroni.postgresql.connection import get_connection_cursor

         logger.info('Resetting non-default statistics target before analyze')
@@ -432,6 +608,12 @@ def reset_custom_statistics_target(self):
         self._statistics[d[0]][table][column] = target

     def restore_custom_statistics_target(self):
+        """
+        Restore the custom statistics targets after the upgrade.
+        Connect to each database recorded in the `_statistics` attribute and execute
+        an ALTER TABLE statement for each table and column recorded there, setting
+        the statistics target of the column back to the stored value.
+        """
         from patroni.postgresql.connection import get_connection_cursor

         if not self._statistics:
@@ -453,6 +635,14 @@ def restore_custom_statistics_target(self):
         logger.error("Failed to execute '%s'", query)

     def reanalyze(self):
+        """
+        Reanalyze the tables in the PostgreSQL database using the ANALYZE command.
+        Iterate over the statistics stored in the `_statistics` attribute and execute the ANALYZE command
+        for each table in each database, using the `get_connection_cursor` function from the
+        `patroni.postgresql.connection` module to establish a connection to the local PostgreSQL instance.
+
+        :raises: Exception: If there is an error executing the ANALYZE command for any table.
+        """
         from patroni.postgresql.connection import get_connection_cursor

         if not self._statistics:
@@ -472,6 +662,14 @@ def reanalyze(self):
         logger.error("Failed to execute '%s'", query)

     def analyze(self):
+        """
+        Analyze the database while working around custom statistics targets.
+        First reset the custom statistics targets, then perform the database analysis,
+        and finally restore the custom statistics targets. Any error that occurs during
+        the process is logged.
+
+        :raises: Exception: If an error occurs during the reset or restore of custom statistics targets.
+        """
         try:
             self.reset_custom_statistics_target()
         except Exception as e:
@@ -483,6 +681,18 @@ def analyze(self):
         logger.error('Failed to restore custom statistics targets: %r', e)

     def do_upgrade(self):
+        """
+        Upgrade the PostgreSQL database cluster.
+        Performs the following steps: check whether an upgrade is required, check that the PostgreSQL
+        instance is running and there is a leader, check that the cluster is ready to be upgraded,
+        prepare the new PGDATA directory, drop possibly incompatible extensions, run the pg_upgrade
+        check, drop possibly incompatible objects, enable maintenance mode, stop the PostgreSQL
+        instance, start rsyncd, wait for replicas to catch up, run a CHECKPOINT on replicas, execute
+        pg_upgrade, switch the PGDATA directories, remove the initialize key, kill Patroni, wait for
+        Patroni to restart, start the PostgreSQL instance, update the configuration files, rsync the
+        replicas, wait for replicas to restart, run ANALYZE, update the extensions, initiate a new
+        backup, and perform the post-upgrade cleanup.
+
+        :returns: bool: True if the upgrade was successful, False otherwise.
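+
+        Sketch of the no-op path (the early return value is an assumption;
+        only the condition is part of this change):
+
+            if not self.upgrade_required:
+                return True  # nothing to do for this cluster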
+        """
         from patroni.utils import polling_loop

         if not self.upgrade_required:
@@ -650,6 +860,10 @@
         return ret

     def post_cleanup(self):
+        """
+        Perform post-cleanup tasks after the upgrade process.
+        Stop the rsync daemon, resume the cluster, and clean up the new PGDATA directory if it was created.
+        """
         self.stop_rsyncd()
         self.resume_cluster()
@@ -660,6 +874,14 @@ def post_cleanup(self):
             logger.error('Failed to remove new PGDATA %r', e)

     def try_upgrade(self, replica_count):
+        """
+        Try to perform the upgrade by setting the replica count and calling the do_upgrade method,
+        always running the post-cleanup operations afterwards.
+
+        :param replica_count: The number of replicas to set before performing the upgrade.
+
+        :returns: The result of the do_upgrade method.
+        """
         try:
             self.replica_count = replica_count
             return self.do_upgrade()
@@ -667,6 +889,11 @@ def try_upgrade(self, replica_count):
             self.post_cleanup()

     def start_backup(self, envdir):
+        """
+        Initiate a new backup by calling the postgres_backup.sh script with the given
+        environment directory and the current data directory.
+
+        :param envdir: The path string to the environment directory.
+        """
         logger.info('Initiating a new backup...')
         if not os.fork():
             subprocess.call(['nohup', 'envdir', envdir, '/scripts/postgres_backup.sh', self.postgresql.data_dir],
@@ -675,6 +902,20 @@

 # this function will be running in a clean environment, therefore we can't rely on DCS connection
 def rsync_replica(config, desired_version, primary_ip, pid):
+    """
+    Synchronize a replica with the primary during the upgrade process.
+    Uses the PostgresqlUpgrade class from pg_upgrade and the polling_loop helper from patroni.utils.
+    Check that the PostgreSQL version of the replica matches the desired version, stop the PostgreSQL
+    instance, and switch the PGDATA directory. Update the configuration files and restart Patroni,
+    then remove the recovery.conf file and restart Patroni once more.
+    Return the result of the cleanup_old_pgdata method.
+
+    :param config: A Config object representing the Patroni configuration.
+    :param desired_version: A string representing the desired version of the PostgreSQL binary to be used for the upgrade.
+    :param primary_ip: A string representing the IP address of the primary node.
+    :param pid: An integer representing the process ID of the PostgreSQL backend process.
+
+    :returns: int: 0 if the rsync was successful, 1 otherwise.
+    """
     from pg_upgrade import PostgresqlUpgrade
     from patroni.utils import polling_loop
@@ -760,6 +1001,15 @@ def rsync_replica(config, desired_version, primary_ip, pid):

 def main():
+    """
+    Entry point of the script.
+    Parse the command line arguments and perform either an rsync_replica operation or an in-place upgrade.
+
+    :returns:
+        0 if the operation is successful,
+        1 if the operation fails,
+        2 if the command line arguments are invalid.
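+
+    Typical invocation (illustrative; per the Spilo documentation the upgrade is
+    triggered with the number of cluster members N as the argument):
+
+        python3 /scripts/inplace_upgrade.py N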
+    """
     from patroni.config import Config
     from spilo_commons import PATRONI_CONFIG_FILE
diff --git a/postgres-appliance/major_upgrade/pg_upgrade.py b/postgres-appliance/major_upgrade/pg_upgrade.py
index dee894b99..da47fee4d 100644
--- a/postgres-appliance/major_upgrade/pg_upgrade.py
+++ b/postgres-appliance/major_upgrade/pg_upgrade.py
@@ -10,10 +10,29 @@
 class _PostgresqlUpgrade(Postgresql):
+    """
+    Represent the PostgreSQL upgrade process.
+    Extends the `Postgresql` class and provides methods for adjusting shared_preload_libraries,
+    starting the old cluster, dropping incompatible extensions and objects, updating extensions,
+    cleaning up old and new pgdata directories, switching pgdata directories, performing pg_upgrade,
+    preparing new pgdata, and analyzing the database.
+
+    :ivar _old_bin_dir: The old PostgreSQL binary directory.
+    :ivar _old_config_values: A dictionary of old configuration values.
+    :ivar _old_data_dir: The old PostgreSQL data directory.
+    :ivar _new_data_dir: The new PostgreSQL data directory.
+    :ivar _version_file: The PostgreSQL version file.
+    :ivar _INCOMPATIBLE_EXTENSIONS: A tuple of incompatible extensions.
+    """

     _INCOMPATIBLE_EXTENSIONS = ('amcheck_next', 'pg_repack',)

     def adjust_shared_preload_libraries(self, version):
+        """
+        Adjust the shared_preload_libraries parameter for the given version.
+
+        :param version: The string version of PostgreSQL being upgraded to.
+        """
         from spilo_commons import adjust_extensions

         shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries')
@@ -24,17 +43,37 @@ def adjust_shared_preload_libraries(self, version):
         adjust_extensions(shared_preload_libraries, version)

     def no_bg_mon(self):
+        """
+        Remove 'bg_mon' from the 'shared_preload_libraries' configuration parameter.
+        If the 'shared_preload_libraries' configuration parameter is set,
+        remove the 'bg_mon' library from the list of libraries.
+        """
         shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries')
         if shared_preload_libraries:
             tmp = filter(lambda a: a != "bg_mon", map(lambda a: a.strip(), shared_preload_libraries.split(",")))
             self.config.get('parameters')['shared_preload_libraries'] = ",".join(tmp)

     def restore_shared_preload_libraries(self):
+        """
+        Restore shared_preload_libraries to its original value.
+        If the _old_shared_preload_libraries attribute is set, restore the value of
+        shared_preload_libraries from it.
+
+        :returns: bool: True if the shared_preload_libraries value was successfully restored, False otherwise.
+        """
         if getattr(self, '_old_shared_preload_libraries'):
             self.config.get('parameters')['shared_preload_libraries'] = self._old_shared_preload_libraries
         return True

     def start_old_cluster(self, config, version):
+        """
+        Start the old cluster with the specified configuration and version.
+
+        :param config: The dict configuration for the old cluster.
+        :param version: The float version of the old cluster.
+
+        :returns: bool: True if the old cluster was successfully started, False otherwise.
+        """
         self.set_bin_dir(version)

         # make sure we don't archive wals from the old version
@@ -47,10 +86,20 @@ def start_old_cluster(self, config, version):
         return self.bootstrap.bootstrap(config)

     def get_cluster_version(self):
+        """
+        Get the version of the cluster.
+
+        :returns: str: The version of the cluster.
+        """
         with open(self._version_file) as f:
             return f.read().strip()

     def set_bin_dir(self, version):
+        """
+        Set the binary directory for the specified version.
+
+        :param version: The string version of PostgreSQL.
+        """
         from spilo_commons import get_bin_dir

         self._old_bin_dir = self._bin_dir
@@ -58,15 +107,33 @@

     @property
     def local_conn_kwargs(self):
+        """
+        Return the connection kwargs for the local database.
+        The returned kwargs include options for synchronous_commit, statement_timeout, and search_path;
+        the connect_timeout option is removed.
+
+        :returns: dict: The connection kwargs for the local database.
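+
+        Illustrative shape of the result (host and user values are hypothetical):
+
+            {'host': '/var/run/postgresql', 'user': 'postgres',
+             'options': '-c synchronous_commit=local -c statement_timeout=0 -c search_path='}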
+ """ conn_kwargs = self.config.local_connect_kwargs conn_kwargs['options'] = '-c synchronous_commit=local -c statement_timeout=0 -c search_path=' conn_kwargs.pop('connect_timeout', None) return conn_kwargs def _get_all_databases(self): + """ + Retrieve a list of all databases in the PostgreSQL cluster. + + :returns: list: A list of database names. + """ return [d[0] for d in self.query('SELECT datname FROM pg_catalog.pg_database WHERE datallowconn')] def drop_possibly_incompatible_extensions(self): + """ + Drop extensions from the cluster which could be incompatible. + Iterate over all databases in the cluster and drops the extensions + specified in the `_INCOMPATIBLE_EXTENSIONS` list if they exist. + Use the `patroni.postgresql.connection.get_connection_cursor` function to establish a connection to each database. + """ from patroni.postgresql.connection import get_connection_cursor logger.info('Dropping extensions from the cluster which could be incompatible') @@ -80,6 +147,11 @@ def drop_possibly_incompatible_extensions(self): cur.execute("DROP EXTENSION IF EXISTS {0}".format(ext)) def drop_possibly_incompatible_objects(self): + """ + Drop objects from the cluster which could be incompatible. + Iterate over all databases in the cluster and drops the objects from `_INCOMPATIBLE_EXTENSIONS`. + Use the `patroni.postgresql.connection.get_connection_cursor` function to establish a connection to each database. + """ from patroni.postgresql.connection import get_connection_cursor logger.info('Dropping objects from the cluster which could be incompatible') @@ -110,6 +182,13 @@ def drop_possibly_incompatible_objects(self): logger.error('Failed: %r', e) def update_extensions(self): + """ + Update the extensions in the PostgreSQL databases. + Connect to each database and executes the 'ALTER EXTENSION UPDATE' command + for each extension found in the database. + + :raises: Any exception raised during the execution of the 'ALTER EXTENSION UPDATE' command. + """ from patroni.postgresql.connection import get_connection_cursor conn_kwargs = self.local_conn_kwargs @@ -128,20 +207,40 @@ def update_extensions(self): @staticmethod def remove_new_data(d): + """ + Remove the new data directory. + + :param d: The string directory path to be removed. + """ if d.endswith('_new') and os.path.isdir(d): shutil.rmtree(d) def cleanup_new_pgdata(self): + """ + Clean up the new PostgreSQL data directory. + If the `_new_data_dir` attribute is set, this method removes the new data directory. + """ if getattr(self, '_new_data_dir', None): self.remove_new_data(self._new_data_dir) def cleanup_old_pgdata(self): + """ + Remove the old data directory if it exists. + + :returns: bool: True if the old data directory was successfully removed, False otherwise. + """ if os.path.exists(self._old_data_dir): logger.info('Removing %s', self._old_data_dir) shutil.rmtree(self._old_data_dir) return True def switch_pgdata(self): + """ + Switche the PostgreSQL data directory by renaming the current data directory to a old directory, + and renaming the new data directory to the current data directory. + + :returns: bool: True if the PostgreSQL data directory was successfully switched, False otherwise. + """ self._old_data_dir = self._data_dir + '_old' self.cleanup_old_pgdata() os.rename(self._data_dir, self._old_data_dir) @@ -151,6 +250,10 @@ def switch_pgdata(self): return True def switch_back_pgdata(self): + """ + Switche back to the original data directory by renaming the new data directory to the original data directory name. 
+        """
         if os.path.exists(self._data_dir):
             self._new_data_dir = self._data_dir + '_new'
             self.cleanup_new_pgdata()
@@ -158,6 +261,16 @@
         os.rename(self._old_data_dir, self._data_dir)

     def pg_upgrade(self, check=False):
+        """
+        Run the pg_upgrade tool to upgrade (or, with check=True, to validate) the cluster.
+        Sizes the number of parallel jobs from `psutil.cpu_count`, recreates the upgrade working
+        directory with `shutil.rmtree` and `os.makedirs`, changes into it with `os.chdir`, and
+        executes the `pg_upgrade` command via `subprocess.call`.
+
+        :param check: A boolean value indicating whether to only perform the pg_upgrade check.
+
+        :returns: bool: True if the pg_upgrade process was successful, False otherwise.
+        """
         upgrade_dir = self._data_dir + '_upgrade'
         if os.path.exists(upgrade_dir) and os.path.isdir(upgrade_dir):
             shutil.rmtree(upgrade_dir)
@@ -187,6 +300,17 @@ def pg_upgrade(self, check=False):
         return True

     def prepare_new_pgdata(self, version):
+        """
+        Prepare a new data directory for the PostgreSQL cluster upgrade.
+        Set the initdb config, which is used to initialize the new PostgreSQL database cluster,
+        and add the data-checksums option to it.
+        Prepare the paths of the old and new data directories: the old data directory is the current
+        data directory, and the new data directory is where the new database cluster will be initialized.
+
+        :param version: The string version of PostgreSQL.
+
+        :returns: bool: True if the new data directory was successfully prepared, False otherwise.
+        """
         from spilo_commons import append_extensions

         locale = self.query('SHOW lc_collate').fetchone()[0]
@@ -247,10 +371,25 @@ def prepare_new_pgdata(self, version):
         return True

     def do_upgrade(self):
+        """
+        Perform the upgrade: run pg_upgrade, restore shared_preload_libraries,
+        switch the data directories, and clean up the old one.
+
+        :returns: bool: True if every step of the upgrade process is successful, False otherwise.
+        """
         return self.pg_upgrade() and self.restore_shared_preload_libraries()\
             and self.switch_pgdata() and self.cleanup_old_pgdata()

     def analyze(self, in_stages=False):
+        """
+        Rebuild the statistics for the PostgreSQL cluster by running `vacuumdb` against every database.
+        If the `in_stages` parameter is True, `vacuumdb` is executed with the `--analyze-in-stages` option;
+        otherwise it is executed with the `-Z` and `-j` options.
+
+        :param in_stages: A boolean value indicating whether to perform the analyze in stages.
+
+        :returns: bool: True if the analyze process is successful, False otherwise.
+        """
         vacuumdb_args = ['--analyze-in-stages'] if in_stages else []
         logger.info('Rebuilding statistics (vacuumdb%s)', (' ' + vacuumdb_args[0] if in_stages else ''))
         if 'username' in self.config.superuser:
@@ -279,6 +418,13 @@

 def PostgresqlUpgrade(config):
+    """
+    Build a `_PostgresqlUpgrade` instance from the provided configuration.
+
+    :param config: A dictionary containing the PostgreSQL configuration.
+
+    :returns: _PostgresqlUpgrade: An instance of the _PostgresqlUpgrade class.
+    """
     config['postgresql'].update({'callbacks': {}, 'pg_ctl_timeout': 3600*24*7})

     # avoid unnecessary interactions with PGDATA and postgres