From dd8151236d3383e2252c783419844efdc46efb32 Mon Sep 17 00:00:00 2001 From: Timofey Barmin Date: Fri, 21 Apr 2023 20:05:55 -0700 Subject: [PATCH 1/2] Add support for xmlsec1 1.3 --- src/saml2/sigver.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 01a12a71d..4609dc461 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -471,18 +471,25 @@ def import_rsa_key_from_file(filename): return key -def parse_xmlsec_output(output): +def parse_xmlsec_verify_output(xmlsec_vsn, output): """Parse the output from xmlsec to try to find out if the command was successfull or not. :param output: The output from Popen :return: A boolean; True if the command was a success otherwise False """ - for line in output.splitlines(): - if line == "OK": - return True - elif line == "FAIL": - raise XmlsecError(output) + if xmlsec_vsn < (1, 3): + for line in output.splitlines(): + if line == "OK": + return True + elif line == "FAIL": + raise XmlsecError(output) + else: + for line in output.splitlines(): + if line == 'Verification status: OK': + return True + elif line == 'Verification status: FAILED': + raise XmlsecError(output) raise XmlsecError(output) @@ -629,6 +636,9 @@ def __init__(self, xmlsec_binary, delete_tmpfiles=True, **kwargs): raise ValueError("xmlsec_binary should be of type string") self.xmlsec = xmlsec_binary self.delete_tmpfiles = delete_tmpfiles + vsn = self.version() + [maj_num_str, min_num_str] = vsn.split('.')[0:2] + self.vsn = (int(maj_num_str), int(min_num_str)) try: self.non_xml_crypto = RSACrypto(kwargs["rsa_key"]) except KeyError: @@ -824,7 +834,7 @@ def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_i except XmlsecError as e: raise SignatureError(com_list) from e - return parse_xmlsec_output(stderr) + return parse_xmlsec_verify_output(self.vsn, stderr) def _run_xmlsec(self, com_list, extra_args): """ @@ -836,6 +846,8 @@ def _run_xmlsec(self, com_list, extra_args): """ with NamedTemporaryFile(suffix=".xml") as ntf: com_list.extend(["--output", ntf.name]) + if self.vsn >= (1, 3): + com_list.extend(['--lax-key-search']) com_list += extra_args logger.debug("xmlsec command: %s", " ".join(com_list)) From 6aaf1ec90d8ab14533208ac31fc676eab659d5d8 Mon Sep 17 00:00:00 2001 From: Ivan Kanakarakis Date: Fri, 9 Jun 2023 11:03:03 +0300 Subject: [PATCH 2/2] Add version and version_nums properties to CryptoBackends Signed-off-by: Ivan Kanakarakis --- src/saml2/sigver.py | 32 +++++++++++++++++++++----------- tests/test_40_sigver.py | 32 ++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py index 4609dc461..72f1546b1 100644 --- a/src/saml2/sigver.py +++ b/src/saml2/sigver.py @@ -471,14 +471,14 @@ def import_rsa_key_from_file(filename): return key -def parse_xmlsec_verify_output(xmlsec_vsn, output): +def parse_xmlsec_verify_output(output, version=None): """Parse the output from xmlsec to try to find out if the command was successfull or not. :param output: The output from Popen :return: A boolean; True if the command was a success otherwise False """ - if xmlsec_vsn < (1, 3): + if version is None or version < (1, 3): for line in output.splitlines(): if line == "OK": return True @@ -600,9 +600,18 @@ def verify_redirect_signature(saml_msg, crypto, cert=None, sigkey=None): class CryptoBackend: + @property def version(self): raise NotImplementedError() + @property + def version_nums(self): + try: + vns = tuple(int(t) for t in self.version) + except ValueError: + vns = (0, 0, 0) + return vns + def encrypt(self, text, recv_key, template, key_type): raise NotImplementedError() @@ -636,14 +645,12 @@ def __init__(self, xmlsec_binary, delete_tmpfiles=True, **kwargs): raise ValueError("xmlsec_binary should be of type string") self.xmlsec = xmlsec_binary self.delete_tmpfiles = delete_tmpfiles - vsn = self.version() - [maj_num_str, min_num_str] = vsn.split('.')[0:2] - self.vsn = (int(maj_num_str), int(min_num_str)) try: self.non_xml_crypto = RSACrypto(kwargs["rsa_key"]) except KeyError: pass + @property def version(self): com_list = [self.xmlsec, "--version"] pof = Popen(com_list, stderr=PIPE, stdout=PIPE) @@ -652,7 +659,7 @@ def version(self): try: return content.split(" ")[1] except IndexError: - return "" + return "0.0.0" def encrypt(self, text, recv_key, template, session_key_type, xpath=""): """ @@ -834,7 +841,7 @@ def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_i except XmlsecError as e: raise SignatureError(com_list) from e - return parse_xmlsec_verify_output(self.vsn, stderr) + return parse_xmlsec_verify_output(stderr, self.version_nums) def _run_xmlsec(self, com_list, extra_args): """ @@ -846,7 +853,7 @@ def _run_xmlsec(self, com_list, extra_args): """ with NamedTemporaryFile(suffix=".xml") as ntf: com_list.extend(["--output", ntf.name]) - if self.vsn >= (1, 3): + if self.version_nums >= (1, 3): com_list.extend(['--lax-key-search']) com_list += extra_args @@ -882,10 +889,13 @@ class CryptoBackendXMLSecurity(CryptoBackend): def __init__(self): CryptoBackend.__init__(self) + @property def version(self): - # XXX if XMLSecurity.__init__ included a __version__, that would be - # better than static 0.0 here. - return "XMLSecurity 0.0" + try: + import xmlsec + return xmlsec.__version__ + except (ImportError, AttributeError): + return "0.0.0" def sign_statement(self, statement, node_name, key_file, node_id): """ diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py index f026fa393..0049e1711 100644 --- a/tests/test_40_sigver.py +++ b/tests/test_40_sigver.py @@ -192,7 +192,7 @@ def test_sign_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass) @@ -209,7 +209,7 @@ def test_multiple_signatures_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass, must=True) @@ -498,7 +498,7 @@ def test_sign_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass) @@ -515,7 +515,7 @@ def test_multiple_signatures_assertion(self): assert sass.id == "id-11111" assert time_util.str_to_time(sass.issue_instant) - print(f"Crypto version : {self.sec.crypto.version()}") + print(f"Crypto version : {self.sec.crypto.version}") item = self.sec.check_signature(sass, class_name(sass), sign_ass, must=True) @@ -1079,18 +1079,34 @@ def test_sha256_signing_non_ascii_ava(): def test_xmlsec_output_line_parsing(): output1 = "prefix\nOK\npostfix" - assert sigver.parse_xmlsec_output(output1) + assert sigver.parse_xmlsec_verify_output(output1) output2 = "prefix\nFAIL\npostfix" with raises(sigver.XmlsecError): - sigver.parse_xmlsec_output(output2) + sigver.parse_xmlsec_verify_output(output2) output3 = "prefix\r\nOK\r\npostfix" - assert sigver.parse_xmlsec_output(output3) + assert sigver.parse_xmlsec_verify_output(output3) output4 = "prefix\r\nFAIL\r\npostfix" with raises(sigver.XmlsecError): - sigver.parse_xmlsec_output(output4) + sigver.parse_xmlsec_verify_output(output4) + + +def test_xmlsec_v1_3_x_output_line_parsing(): + output1 = "prefix\nVerification status: OK\npostfix" + assert sigver.parse_xmlsec_verify_output(output1, version=(1, 3)) + + output2 = "prefix\nVerification status: FAILED\npostfix" + with raises(sigver.XmlsecError): + sigver.parse_xmlsec_verify_output(output2, version=(1, 3)) + + output3 = "prefix\r\nVerification status: OK\r\npostfix" + assert sigver.parse_xmlsec_verify_output(output3, version=(1, 3)) + + output4 = "prefix\r\nVerification status: FAILED\r\npostfix" + with raises(sigver.XmlsecError): + sigver.parse_xmlsec_verify_output(output4, version=(1, 3)) def test_cert_trailing_newlines_ignored():