From 694564c2676c2664da23f1adc6e7f86749e87ba3 Mon Sep 17 00:00:00 2001 From: Alex Gaynor Date: Sat, 11 Nov 2023 17:06:45 -0500 Subject: [PATCH] Remove old impl of registery --- .../hazmat/backends/openssl/backend.py | 140 +----------------- .../hazmat/bindings/_rust/openssl/ciphers.pyi | 3 + src/rust/src/backend/cipher_registry.rs | 78 +++++----- src/rust/src/backend/ciphers.rs | 10 ++ src/rust/src/types.rs | 7 + tests/hazmat/backends/test_openssl.py | 6 - tests/hazmat/primitives/test_pkcs12.py | 5 +- 7 files changed, 71 insertions(+), 178 deletions(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 43ff4336062e9..d27ddc452b7a3 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -6,7 +6,6 @@ import collections import contextlib -import itertools import typing from cryptography import utils, x509 @@ -33,27 +32,9 @@ ) from cryptography.hazmat.primitives.ciphers.algorithms import ( AES, - AES128, - AES256, - ARC4, - SM4, - Camellia, - ChaCha20, - TripleDES, - _BlowfishInternal, - _CAST5Internal, - _IDEAInternal, - _SEEDInternal, ) from cryptography.hazmat.primitives.ciphers.modes import ( CBC, - CFB, - CFB8, - CTR, - ECB, - GCM, - OFB, - XTS, Mode, ) from cryptography.hazmat.primitives.serialization.pkcs12 import ( @@ -69,7 +50,7 @@ # Not actually supported, just used as a marker for some serialization tests. class _RC2: - pass + key_size = 128 class Backend: @@ -132,7 +113,6 @@ def __init__(self) -> None: tuple[type[CipherAlgorithm], type[Mode]], typing.Callable, ] = {} - self._register_default_ciphers() self._dh_types = [self._lib.EVP_PKEY_DH] if self._lib.Cryptography_HAS_EVP_PKEY_DHX: self._dh_types.append(self._lib.EVP_PKEY_DHX) @@ -220,93 +200,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool: if not isinstance(cipher, self._fips_ciphers): return False - try: - adapter = self._cipher_registry[type(cipher), type(mode)] - except KeyError: - return False - evp_cipher = adapter(self, cipher, mode) - return self._ffi.NULL != evp_cipher - - def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None: - if (cipher_cls, mode_cls) in self._cipher_registry: - raise ValueError( - f"Duplicate registration for: {cipher_cls} {mode_cls}." - ) - self._cipher_registry[cipher_cls, mode_cls] = adapter - - def _register_default_ciphers(self) -> None: - for cipher_cls in [AES, AES128, AES256]: - for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: - self.register_cipher_adapter( - cipher_cls, - mode_cls, - GetCipherByName( - "{cipher.name}-{cipher.key_size}-{mode.name}" - ), - ) - for mode_cls in [CBC, CTR, ECB, OFB, CFB]: - self.register_cipher_adapter( - Camellia, - mode_cls, - GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), - ) - for mode_cls in [CBC, CFB, CFB8, OFB]: - self.register_cipher_adapter( - TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") - ) - self.register_cipher_adapter( - TripleDES, ECB, GetCipherByName("des-ede3") - ) - # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL - # it uses "chacha" - self.register_cipher_adapter( - ChaCha20, - type(None), - GetCipherByName( - "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20" - ), - ) - self.register_cipher_adapter(AES, XTS, _get_xts_cipher) - for mode_cls in [ECB, CBC, OFB, CFB, CTR]: - self.register_cipher_adapter( - SM4, mode_cls, GetCipherByName("sm4-{mode.name}") - ) - # Don't register legacy ciphers if they're unavailable. Hypothetically - # this wouldn't be necessary because we test availability by seeing if - # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3 - # will return a valid pointer even though the cipher is unavailable. - if ( - self._binding._legacy_provider_loaded - or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER - ): - for mode_cls in [CBC, CFB, OFB, ECB]: - self.register_cipher_adapter( - _BlowfishInternal, - mode_cls, - GetCipherByName("bf-{mode.name}"), - ) - for mode_cls in [CBC, CFB, OFB, ECB]: - self.register_cipher_adapter( - _SEEDInternal, - mode_cls, - GetCipherByName("seed-{mode.name}"), - ) - for cipher_cls, mode_cls in itertools.product( - [_CAST5Internal, _IDEAInternal], - [CBC, OFB, CFB, ECB], - ): - self.register_cipher_adapter( - cipher_cls, - mode_cls, - GetCipherByName("{cipher.name}-{mode.name}"), - ) - self.register_cipher_adapter( - ARC4, type(None), GetCipherByName("rc4") - ) - # We don't actually support RC2, this is just used by some tests. - self.register_cipher_adapter( - _RC2, type(None), GetCipherByName("rc2") - ) + return rust_openssl.ciphers.cipher_supported(cipher, mode) def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: return self.hmac_supported(algorithm) @@ -1108,34 +1002,4 @@ def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]: return certs -class GetCipherByName: - def __init__(self, fmt: str): - self._fmt = fmt - - def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode): - cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() - evp_cipher = backend._lib.EVP_get_cipherbyname( - cipher_name.encode("ascii") - ) - - # try EVP_CIPHER_fetch if present - if ( - evp_cipher == backend._ffi.NULL - and backend._lib.Cryptography_HAS_300_EVP_CIPHER - ): - evp_cipher = backend._lib.EVP_CIPHER_fetch( - backend._ffi.NULL, - cipher_name.encode("ascii"), - backend._ffi.NULL, - ) - - backend._consume_errors() - return evp_cipher - - -def _get_xts_cipher(backend: Backend, cipher: AES, mode): - cipher_name = f"aes-{cipher.key_size // 2}-xts" - return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) - - backend = Backend() diff --git a/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi b/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi index a64d4c755abb6..759f3b591cba6 100644 --- a/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi +++ b/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi @@ -23,6 +23,9 @@ def create_decryption_ctx( def create_decryption_ctx( algorithm: ciphers.CipherAlgorithm, mode: modes.Mode ) -> ciphers.CipherContext: ... +def cipher_supported( + algorithm: ciphers.CipherAlgorithm, mode: modes.Mode +) -> bool: ... def _advance( ctx: ciphers.AEADEncryptionContext | ciphers.AEADDecryptionContext, n: int ) -> None: ... diff --git a/src/rust/src/backend/cipher_registry.rs b/src/rust/src/backend/cipher_registry.rs index e576884539453..a725296604dbd 100644 --- a/src/rust/src/backend/cipher_registry.rs +++ b/src/rust/src/backend/cipher_registry.rs @@ -112,6 +112,7 @@ fn get_cipher_registry( let seed = types::SEED.get(py)?; let arc4 = types::ARC4.get(py)?; let chacha20 = types::CHACHA20.get(py)?; + let rc2 = types::RC2.get(py)?; let cbc = types::CBC.get(py)?; let cfb = types::CFB.get(py)?; @@ -223,42 +224,53 @@ fn get_cipher_registry( m.add(sm4, ecb, Some(128), Cipher::sm4_ecb())?; } - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))] - { - m.add(seed, cbc, Some(128), Cipher::seed_cbc())?; - m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?; - m.add(seed, ofb, Some(128), Cipher::seed_ofb())?; - m.add(seed, ecb, Some(128), Cipher::seed_ecb())?; - } - - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))] - { - m.add(blowfish, cbc, None, Cipher::bf_cbc())?; - m.add(blowfish, cfb, None, Cipher::bf_cfb64())?; - m.add(blowfish, ofb, None, Cipher::bf_ofb())?; - m.add(blowfish, ecb, None, Cipher::bf_ecb())?; - } - - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))] - { - m.add(cast5, cbc, None, Cipher::cast5_cbc())?; - m.add(cast5, ecb, None, Cipher::cast5_ecb())?; - m.add(cast5, ofb, None, Cipher::cast5_ofb())?; - m.add(cast5, cfb, None, Cipher::cast5_cfb64())?; - } - - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))] - { - m.add(idea, cbc, Some(128), Cipher::idea_cbc())?; - m.add(idea, ecb, Some(128), Cipher::idea_ecb())?; - m.add(idea, ofb, Some(128), Cipher::idea_ofb())?; - m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?; - } - #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] m.add(chacha20, none_type, None, Cipher::chacha20())?; - m.add(arc4, none_type, None, Cipher::rc4())?; + // Don't register legacy ciphers if they're unavailable. In theory + // this should't be necessary but OpenSSL 3 will return an EVP_CIPHER + // even when the cipher is unavailable. + if types::LEGACY_PROVIDER_LOADED.get(py)?.is_true()? + || cfg!(not(CRYPTOGRAPHY_OPENSSL_300_OR_GREATER)) + { + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))] + { + m.add(blowfish, cbc, None, Cipher::bf_cbc())?; + m.add(blowfish, cfb, None, Cipher::bf_cfb64())?; + m.add(blowfish, ofb, None, Cipher::bf_ofb())?; + m.add(blowfish, ecb, None, Cipher::bf_ecb())?; + } + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))] + { + m.add(seed, cbc, Some(128), Cipher::seed_cbc())?; + m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?; + m.add(seed, ofb, Some(128), Cipher::seed_ofb())?; + m.add(seed, ecb, Some(128), Cipher::seed_ecb())?; + } + + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))] + { + m.add(cast5, cbc, None, Cipher::cast5_cbc())?; + m.add(cast5, ecb, None, Cipher::cast5_ecb())?; + m.add(cast5, ofb, None, Cipher::cast5_ofb())?; + m.add(cast5, cfb, None, Cipher::cast5_cfb64())?; + } + + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))] + { + m.add(idea, cbc, Some(128), Cipher::idea_cbc())?; + m.add(idea, ecb, Some(128), Cipher::idea_ecb())?; + m.add(idea, ofb, Some(128), Cipher::idea_ofb())?; + m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?; + } + + m.add(arc4, none_type, None, Cipher::rc4())?; + + // We don't actually support RC2, this is just used by some tests. + if let Some(rc2_cbc) = Cipher::from_nid(openssl::nid::Nid::RC2_CBC) { + m.add(rc2, cbc, None, rc2_cbc)?; + } + } Ok(m.build()) }) diff --git a/src/rust/src/backend/ciphers.rs b/src/rust/src/backend/ciphers.rs index a5b2c377a7803..b2494484895b1 100644 --- a/src/rust/src/backend/ciphers.rs +++ b/src/rust/src/backend/ciphers.rs @@ -497,6 +497,15 @@ fn create_decryption_ctx( } } +#[pyo3::prelude::pyfunction] +fn cipher_supported( + py: pyo3::Python<'_>, + algorithm: &pyo3::PyAny, + mode: &pyo3::PyAny, +) -> CryptographyResult { + Ok(cipher_registry::get_cipher(py, algorithm, mode.get_type())?.is_some()) +} + #[pyo3::prelude::pyfunction] fn _advance(ctx: &pyo3::PyAny, n: u64) { if let Ok(c) = ctx.downcast::>() { @@ -519,6 +528,7 @@ pub(crate) fn create_module(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelu let m = pyo3::prelude::PyModule::new(py, "ciphers")?; m.add_function(pyo3::wrap_pyfunction!(create_encryption_ctx, m)?)?; m.add_function(pyo3::wrap_pyfunction!(create_decryption_ctx, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(cipher_supported, m)?)?; m.add_function(pyo3::wrap_pyfunction!(_advance, m)?)?; m.add_function(pyo3::wrap_pyfunction!(_advance_aad, m)?)?; diff --git a/src/rust/src/types.rs b/src/rust/src/types.rs index a8b84801d5dac..b6e54a8c70f52 100644 --- a/src/rust/src/types.rs +++ b/src/rust/src/types.rs @@ -542,6 +542,8 @@ pub static ARC4: LazyPyImport = LazyPyImport::new( "cryptography.hazmat.primitives.ciphers.algorithms", &["ARC4"], ); +pub static RC2: LazyPyImport = + LazyPyImport::new("cryptography.hazmat.backends.openssl.backend", &["_RC2"]); pub static MODE_WITH_INITIALIZATION_VECTOR: LazyPyImport = LazyPyImport::new( "cryptography.hazmat.primitives.ciphers.modes", @@ -576,6 +578,11 @@ pub static GCM: LazyPyImport = pub static XTS: LazyPyImport = LazyPyImport::new("cryptography.hazmat.primitives.ciphers.modes", &["XTS"]); +pub static LEGACY_PROVIDER_LOADED: LazyPyImport = LazyPyImport::new( + "cryptography.hazmat.bindings.openssl.binding", + &["Binding", "_legacy_provider_loaded"], +); + #[cfg(test)] mod tests { use super::LazyPyImport; diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index e81b8f8a2ef18..73ea286fb75f9 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -13,8 +13,6 @@ from cryptography.hazmat.backends.openssl.backend import backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import CBC from ...doubles import ( DummyAsymmetricPadding, @@ -92,10 +90,6 @@ def test_supports_cipher(self): is False ) - def test_register_duplicate_cipher_adapter(self): - with pytest.raises(ValueError): - backend.register_cipher_adapter(AES, CBC, None) - def test_openssl_assert(self): backend.openssl_assert(True) with pytest.raises(InternalError): diff --git a/tests/hazmat/primitives/test_pkcs12.py b/tests/hazmat/primitives/test_pkcs12.py index 2159242bb2636..714cd49a179fa 100644 --- a/tests/hazmat/primitives/test_pkcs12.py +++ b/tests/hazmat/primitives/test_pkcs12.py @@ -19,6 +19,7 @@ ed25519, rsa, ) +from cryptography.hazmat.primitives.ciphers import modes from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, @@ -94,7 +95,9 @@ def test_load_pkcs12_ec_keys(self, filename, password, backend): ], ) @pytest.mark.supported( - only_if=lambda backend: backend.cipher_supported(_RC2(), None), + only_if=lambda backend: backend.cipher_supported( + _RC2(), modes.CBC(initialization_vector=b"\x00" * 16) + ), skip_message="Does not support RC2", ) def test_load_pkcs12_ec_keys_rc2(self, filename, password, backend):