From a3a815944ca50bf59843dfb5fd12cf5962181f6e Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 20 Jan 2023 21:06:46 +0200 Subject: [PATCH 1/8] Add retries to SCPMover.copy() --- trollmoves/movers.py | 56 ++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index e3219e52..41842368 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -357,6 +357,43 @@ def move(self): def copy(self): """Upload the file.""" + from scp import SCPError + + retries = 3 + success = False + while retries > 0: + retries -= 1 + + try: + scp = self._get_scp_client() + scp.put(self.origin, self.destination.path) + success = True + break + except OSError as osex: + if osex.errno == 2: + LOGGER.error("No such file or directory. File not transfered: " + "%s. Original error message: %s", + self.origin, str(osex)) + else: + LOGGER.error("OSError in scp.put: %s", str(osex)) + raise + except SCPError as err: + LOGGER.error("SCP failed: %s", str(err)) + except Exception as err: + LOGGER.error("Something went wrong with scp: %s", str(err)) + LOGGER.error("Exception name %s", type(err).__name__) + LOGGER.error("Exception args %s", str(err.args)) + raise + finally: + scp.close() + + if success: + break + + LOGGER.error("Retrying transfer in 2 seconds") + time.sleep(2) + + def _get_scp_client(self): from scp import SCPClient ssh_connection = self.get_connection(self.destination.hostname, @@ -369,24 +406,7 @@ def copy(self): LOGGER.error("Failed to initiate SCPClient: %s", str(err)) ssh_connection.close() raise - - try: - scp.put(self.origin, self.destination.path) - except OSError as osex: - if osex.errno == 2: - LOGGER.error("No such file or directory. File not transfered: " - "%s. Original error message: %s", - self.origin, str(osex)) - else: - LOGGER.error("OSError in scp.put: %s", str(osex)) - raise - except Exception as err: - LOGGER.error("Something went wrong with scp: %s", str(err)) - LOGGER.error("Exception name %s", type(err).__name__) - LOGGER.error("Exception args %s", str(err.args)) - raise - finally: - scp.close() + return scp class SftpMover(Mover): From 24f037e95bfe9bb695c618d146da2441e1cc3785 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Mon, 23 Jan 2023 16:21:42 +0200 Subject: [PATCH 2/8] Refactor scp retries --- trollmoves/movers.py | 123 +++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 41842368..f1808696 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -297,38 +297,52 @@ class ScpMover(Mover): def open_connection(self): """Open a connection.""" + retries = 3 + + ssh_connection = self._run_with_retries(self._open_connection, num_retries=retries) + if ssh_connection is None: + raise IOError("Failed to ssh connect after 3 attempts") + return ssh_connection + + def _open_connection(self): from paramiko import SSHClient, SSHException - retries = 3 ssh_key_filename = self.attrs.get("ssh_key_filename", None) timeout = self.attrs.get("ssh_connection_timeout", None) - while retries > 0: - retries -= 1 - try: - ssh_connection = SSHClient() - ssh_connection.load_system_host_keys() - ssh_connection.connect(self.destination.hostname, - username=self._dest_username, - port=self.destination.port or 22, - key_filename=ssh_key_filename, - timeout=timeout) - LOGGER.debug("Successfully connected to %s:%s as %s", - self.destination.hostname, - self.destination.port or 22, - self._dest_username) - except SSHException as sshe: - LOGGER.exception("Failed to init SSHClient: %s", str(sshe)) - except socket.timeout as sto: - LOGGER.exception("SSH connection timed out: %s", str(sto)) - except Exception as err: - LOGGER.exception("Unknown exception at init SSHClient: %s", str(err)) - else: - return ssh_connection + try: + ssh_connection = SSHClient() + ssh_connection.load_system_host_keys() + ssh_connection.connect(self.destination.hostname, + username=self._dest_username, + port=self.destination.port or 22, + key_filename=ssh_key_filename, + timeout=timeout) + LOGGER.debug("Successfully connected to %s:%s as %s", + self.destination.hostname, + self.destination.port or 22, + self._dest_username) + except SSHException as sshe: + LOGGER.exception("Failed to init SSHClient: %s", str(sshe)) + except socket.timeout as sto: + LOGGER.exception("SSH connection timed out: %s", str(sto)) + except Exception as err: + LOGGER.exception("Unknown exception at init SSHClient: %s", str(err)) + else: + return ssh_connection - ssh_connection.close() - time.sleep(2) - LOGGER.debug("Retrying ssh connect ...") - raise IOError("Failed to ssh connect after 3 attempts") + ssh_connection.close() + return None + + def _run_with_retries(self, func, num_retries=3): + res = None + for _ in range(num_retries): + res = func() + if res: + break + time.sleep(2) + LOGGER.debug(f"Retrying {self.__class__.__name__}.{func.__name__}() ...") + + return res @staticmethod def is_connected(connection): @@ -357,41 +371,36 @@ def move(self): def copy(self): """Upload the file.""" + retries = 3 + _ = self._run_with_retries(self._copy, num_retries=retries) + + def _copy(self): from scp import SCPError - retries = 3 success = False - while retries > 0: - retries -= 1 - - try: - scp = self._get_scp_client() - scp.put(self.origin, self.destination.path) - success = True - break - except OSError as osex: - if osex.errno == 2: - LOGGER.error("No such file or directory. File not transfered: " - "%s. Original error message: %s", - self.origin, str(osex)) - else: - LOGGER.error("OSError in scp.put: %s", str(osex)) - raise - except SCPError as err: - LOGGER.error("SCP failed: %s", str(err)) - except Exception as err: - LOGGER.error("Something went wrong with scp: %s", str(err)) - LOGGER.error("Exception name %s", type(err).__name__) - LOGGER.error("Exception args %s", str(err.args)) + try: + scp = self._get_scp_client() + scp.put(self.origin, self.destination.path) + success = True + except OSError as osex: + if osex.errno == 2: + LOGGER.error("No such file or directory. File not transfered: " + "%s. Original error message: %s", + self.origin, str(osex)) + else: + LOGGER.error("OSError in scp.put: %s", str(osex)) raise - finally: - scp.close() - - if success: - break + except SCPError as err: + LOGGER.error("SCP failed: %s", str(err)) + except Exception as err: + LOGGER.error("Something went wrong with scp: %s", str(err)) + LOGGER.error("Exception name %s", type(err).__name__) + LOGGER.error("Exception args %s", str(err.args)) + raise + finally: + scp.close() - LOGGER.error("Retrying transfer in 2 seconds") - time.sleep(2) + return success def _get_scp_client(self): from scp import SCPClient From b761352f0e0233300951cfc79656fcb95bed3199 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 08:57:47 +0200 Subject: [PATCH 3/8] Add textual description what is being retried --- trollmoves/movers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index f1808696..8a79cd33 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -299,7 +299,7 @@ def open_connection(self): """Open a connection.""" retries = 3 - ssh_connection = self._run_with_retries(self._open_connection, num_retries=retries) + ssh_connection = self._run_with_retries(self._open_connection, "ssh connect", num_retries=retries) if ssh_connection is None: raise IOError("Failed to ssh connect after 3 attempts") return ssh_connection @@ -333,14 +333,14 @@ def _open_connection(self): ssh_connection.close() return None - def _run_with_retries(self, func, num_retries=3): + def _run_with_retries(self, func, name, num_retries=3): res = None for _ in range(num_retries): res = func() if res: break time.sleep(2) - LOGGER.debug(f"Retrying {self.__class__.__name__}.{func.__name__}() ...") + LOGGER.debug(f"Retrying {name} ...") return res @@ -372,7 +372,7 @@ def move(self): def copy(self): """Upload the file.""" retries = 3 - _ = self._run_with_retries(self._copy, num_retries=retries) + _ = self._run_with_retries(self._copy, "SCP copy", num_retries=retries) def _copy(self): from scp import SCPError From 2ac0452f00c01103590b3370cce633250c156b96 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 09:05:12 +0200 Subject: [PATCH 4/8] Refactor creation of ssh connection --- trollmoves/movers.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 8a79cd33..372a8ea9 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -305,22 +305,10 @@ def open_connection(self): return ssh_connection def _open_connection(self): - from paramiko import SSHClient, SSHException + from paramiko import SSHException - ssh_key_filename = self.attrs.get("ssh_key_filename", None) - timeout = self.attrs.get("ssh_connection_timeout", None) try: - ssh_connection = SSHClient() - ssh_connection.load_system_host_keys() - ssh_connection.connect(self.destination.hostname, - username=self._dest_username, - port=self.destination.port or 22, - key_filename=ssh_key_filename, - timeout=timeout) - LOGGER.debug("Successfully connected to %s:%s as %s", - self.destination.hostname, - self.destination.port or 22, - self._dest_username) + ssh_connection = self._create_ssh_connection() except SSHException as sshe: LOGGER.exception("Failed to init SSHClient: %s", str(sshe)) except socket.timeout as sto: @@ -330,9 +318,27 @@ def _open_connection(self): else: return ssh_connection - ssh_connection.close() return None + def _create_ssh_connection(self): + from paramiko import SSHClient + + ssh_key_filename = self.attrs.get("ssh_key_filename", None) + timeout = self.attrs.get("ssh_connection_timeout", None) + + ssh_connection = SSHClient() + ssh_connection.load_system_host_keys() + ssh_connection.connect(self.destination.hostname, + username=self._dest_username, + port=self.destination.port or 22, + key_filename=ssh_key_filename, + timeout=timeout) + LOGGER.debug("Successfully connected to %s:%s as %s", + self.destination.hostname, + self.destination.port or 22, + self._dest_username) + return ssh_connection + def _run_with_retries(self, func, name, num_retries=3): res = None for _ in range(num_retries): From 9de9f1efc8349b4825517eba5f9c9b3fe4f0f782 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 09:08:34 +0200 Subject: [PATCH 5/8] Remove password from log messages --- trollmoves/movers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 372a8ea9..d77d7b81 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -115,8 +115,7 @@ def move(self): def get_connection(self, hostname, port, username=None): """Get the connection.""" with self.active_connection_lock: - LOGGER.debug("Destination username and passwd: %s %s", - self._dest_username, self._dest_password) + LOGGER.debug("Destination username: %s", self._dest_username) LOGGER.debug('Getting connection to %s@%s:%s', username, hostname, port) try: From 432dc4932162b8866068f5bf4b433f7e5becd915 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 13:41:36 +0200 Subject: [PATCH 6/8] Make SSH retries configurable --- trollmoves/movers.py | 10 ++++------ trollmoves/server.py | 8 ++++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index d77d7b81..bbbe2f19 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -296,9 +296,7 @@ class ScpMover(Mover): def open_connection(self): """Open a connection.""" - retries = 3 - - ssh_connection = self._run_with_retries(self._open_connection, "ssh connect", num_retries=retries) + ssh_connection = self._run_with_retries(self._open_connection, "ssh connect") if ssh_connection is None: raise IOError("Failed to ssh connect after 3 attempts") return ssh_connection @@ -338,7 +336,8 @@ def _create_ssh_connection(self): self._dest_username) return ssh_connection - def _run_with_retries(self, func, name, num_retries=3): + def _run_with_retries(self, func, name): + num_retries = self.attrs.get("num_ssh_retries", 3) res = None for _ in range(num_retries): res = func() @@ -376,8 +375,7 @@ def move(self): def copy(self): """Upload the file.""" - retries = 3 - _ = self._run_with_retries(self._copy, "SCP copy", num_retries=retries) + _ = self._run_with_retries(self._copy, "SCP copy") def _copy(self): from scp import SCPError diff --git a/trollmoves/server.py b/trollmoves/server.py index e1944d39..a7aa6bee 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -579,6 +579,7 @@ def _read_ini_config(filename): _parse_nameserver(res[section], cp_[section]) _parse_addresses(res[section]) _parse_delete(res[section], cp_[section]) + _parse_ssh_retries(res[section], cp_[section]) if not _check_origin_and_listen(res, section): continue if not _check_topic(res, section): @@ -594,6 +595,7 @@ def _set_config_defaults(conf): conf.setdefault("transfer_req_timeout", 10 * DEFAULT_REQ_TIMEOUT) conf.setdefault("ssh_key_filename", None) conf.setdefault("delete", False) + conf.setdefault("num_ssh_retries", 3) def _parse_nameserver(conf, raw_conf): @@ -617,6 +619,12 @@ def _parse_delete(conf, raw_conf): conf["delete"] = val +def _parse_ssh_retries(conf, raw_conf): + val = raw_conf.getint("num_ssh_retries") + if val is not None: + conf["num_ssh_retries"] = val + + def _check_origin_and_listen(res, section): if ("origin" not in res[section]) and ('listen' not in res[section]): LOGGER.warning("Incomplete section %s: add an 'origin' or 'listen' item.", section) From 19417531a99d104ca8bc69c6790f6f738b67df7e Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 13:43:59 +0200 Subject: [PATCH 7/8] Fix exception name --- trollmoves/movers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index bbbe2f19..ab7dccae 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -378,7 +378,7 @@ def copy(self): _ = self._run_with_retries(self._copy, "SCP copy") def _copy(self): - from scp import SCPError + from scp import SCPException success = False try: @@ -393,7 +393,7 @@ def _copy(self): else: LOGGER.error("OSError in scp.put: %s", str(osex)) raise - except SCPError as err: + except SCPException as err: LOGGER.error("SCP failed: %s", str(err)) except Exception as err: LOGGER.error("Something went wrong with scp: %s", str(err)) From ea70de18cd00e0e504ead34ebc0a4f0516ee411b Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 24 Jan 2023 15:14:45 +0200 Subject: [PATCH 8/8] Test Server default and ssh retry config options --- trollmoves/tests/test_server.py | 48 +++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index fc3090d9..3fd6bbe9 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -300,3 +300,51 @@ def test_requestmanager_is_delete_set_True(patch_validate_file_pattern): port = 9876 req_man = RequestManager(port, attrs={'delete': True}) assert req_man._is_delete_set() is True + + +CONFIG_MINIMAL = """ +[test] +origin = foo +listen = bar +""" +CONFIG_NUM_SSH_RETRIES = CONFIG_MINIMAL + """ +num_ssh_retries = 5 +""" + + +def test_config_defaults(): + """Test that config defaults are set.""" + from trollmoves.server import read_config + + with NamedTemporaryFile(mode='w') as tmp_file: + tmp_file.write(CONFIG_MINIMAL) + tmp_file.file.flush() + + config = read_config(tmp_file.name) + + test_section = config["test"] + assert "origin" in test_section + assert "listen" in test_section + assert test_section["working_directory"] is None + assert test_section["compression"] is False + assert test_section["req_timeout"] == 1 + assert test_section["transfer_req_timeout"] == 10 + assert test_section["ssh_key_filename"] is None + assert test_section["delete"] is False + assert test_section["num_ssh_retries"] == 3 + assert test_section["nameserver"] is None + assert test_section["addresses"] is None + + +def test_config_num_ssh_retries(): + """Test that config defaults are set.""" + from trollmoves.server import read_config + + with NamedTemporaryFile(mode='w') as tmp_file: + tmp_file.write(CONFIG_NUM_SSH_RETRIES) + tmp_file.file.flush() + + config = read_config(tmp_file.name) + + test_section = config["test"] + assert test_section["num_ssh_retries"] == 5