From 76e3858ad62584a098caec9086b253b880ce5125 Mon Sep 17 00:00:00 2001 From: Alex Gaynor Date: Sat, 11 Nov 2023 17:06:45 -0500 Subject: [PATCH] Remove old impl of registery --- .../hazmat/backends/openssl/backend.py | 145 +----------------- .../hazmat/bindings/_rust/openssl/ciphers.pyi | 3 + src/rust/src/backend/cipher_registry.rs | 72 +++++---- src/rust/src/backend/ciphers.rs | 10 ++ src/rust/src/buf.rs | 18 +-- src/rust/src/types.rs | 7 + tests/hazmat/backends/test_openssl.py | 6 - tests/hazmat/primitives/test_pkcs12.py | 5 +- 8 files changed, 77 insertions(+), 189 deletions(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index d35c0a4a8a27..ba3473b6befe 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -6,7 +6,6 @@ import collections import contextlib -import itertools import typing from cryptography import utils, x509 @@ -32,27 +31,9 @@ ) from cryptography.hazmat.primitives.ciphers.algorithms import ( AES, - AES128, - AES256, - ARC4, - SM4, - Camellia, - ChaCha20, - TripleDES, - _BlowfishInternal, - _CAST5Internal, - _IDEAInternal, - _SEEDInternal, ) from cryptography.hazmat.primitives.ciphers.modes import ( CBC, - CFB, - CFB8, - CTR, - ECB, - GCM, - OFB, - XTS, Mode, ) from cryptography.hazmat.primitives.serialization.pkcs12 import ( @@ -68,7 +49,7 @@ # Not actually supported, just used as a marker for some serialization tests. class _RC2: - pass + key_size = 128 class Backend: @@ -127,12 +108,6 @@ def __init__(self) -> None: self._lib = self._binding.lib self._fips_enabled = rust_openssl.is_fips_enabled() - self._cipher_registry: dict[ - tuple[type[CipherAlgorithm], type[Mode]], - typing.Callable, - ] = {} - self._register_default_ciphers() - def __repr__(self) -> str: return "".format( self.openssl_version_text(), @@ -216,93 +191,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool: if not isinstance(cipher, self._fips_ciphers): return False - try: - adapter = self._cipher_registry[type(cipher), type(mode)] - except KeyError: - return False - evp_cipher = adapter(self, cipher, mode) - return self._ffi.NULL != evp_cipher - - def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None: - if (cipher_cls, mode_cls) in self._cipher_registry: - raise ValueError( - f"Duplicate registration for: {cipher_cls} {mode_cls}." - ) - self._cipher_registry[cipher_cls, mode_cls] = adapter - - def _register_default_ciphers(self) -> None: - for cipher_cls in [AES, AES128, AES256]: - for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: - self.register_cipher_adapter( - cipher_cls, - mode_cls, - GetCipherByName( - "{cipher.name}-{cipher.key_size}-{mode.name}" - ), - ) - for mode_cls in [CBC, CTR, ECB, OFB, CFB]: - self.register_cipher_adapter( - Camellia, - mode_cls, - GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), - ) - for mode_cls in [CBC, CFB, CFB8, OFB]: - self.register_cipher_adapter( - TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") - ) - self.register_cipher_adapter( - TripleDES, ECB, GetCipherByName("des-ede3") - ) - # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL - # it uses "chacha" - self.register_cipher_adapter( - ChaCha20, - type(None), - GetCipherByName( - "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20" - ), - ) - self.register_cipher_adapter(AES, XTS, _get_xts_cipher) - for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]: - self.register_cipher_adapter( - SM4, mode_cls, GetCipherByName("sm4-{mode.name}") - ) - # Don't register legacy ciphers if they're unavailable. Hypothetically - # this wouldn't be necessary because we test availability by seeing if - # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3 - # will return a valid pointer even though the cipher is unavailable. - if ( - self._binding._legacy_provider_loaded - or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER - ): - for mode_cls in [CBC, CFB, OFB, ECB]: - self.register_cipher_adapter( - _BlowfishInternal, - mode_cls, - GetCipherByName("bf-{mode.name}"), - ) - for mode_cls in [CBC, CFB, OFB, ECB]: - self.register_cipher_adapter( - _SEEDInternal, - mode_cls, - GetCipherByName("seed-{mode.name}"), - ) - for cipher_cls, mode_cls in itertools.product( - [_CAST5Internal, _IDEAInternal], - [CBC, OFB, CFB, ECB], - ): - self.register_cipher_adapter( - cipher_cls, - mode_cls, - GetCipherByName("{cipher.name}-{mode.name}"), - ) - self.register_cipher_adapter( - ARC4, type(None), GetCipherByName("rc4") - ) - # We don't actually support RC2, this is just used by some tests. - self.register_cipher_adapter( - _RC2, type(None), GetCipherByName("rc2") - ) + return rust_openssl.ciphers.cipher_supported(cipher, mode) def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: return self.hmac_supported(algorithm) @@ -904,34 +793,4 @@ def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]: return certs -class GetCipherByName: - def __init__(self, fmt: str): - self._fmt = fmt - - def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode): - cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() - evp_cipher = backend._lib.EVP_get_cipherbyname( - cipher_name.encode("ascii") - ) - - # try EVP_CIPHER_fetch if present - if ( - evp_cipher == backend._ffi.NULL - and backend._lib.Cryptography_HAS_300_EVP_CIPHER - ): - evp_cipher = backend._lib.EVP_CIPHER_fetch( - backend._ffi.NULL, - cipher_name.encode("ascii"), - backend._ffi.NULL, - ) - - backend._consume_errors() - return evp_cipher - - -def _get_xts_cipher(backend: Backend, cipher: AES, mode): - cipher_name = f"aes-{cipher.key_size // 2}-xts" - return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) - - backend = Backend() diff --git a/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi b/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi index a64d4c755abb..759f3b591cba 100644 --- a/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi +++ b/src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi @@ -23,6 +23,9 @@ def create_decryption_ctx( def create_decryption_ctx( algorithm: ciphers.CipherAlgorithm, mode: modes.Mode ) -> ciphers.CipherContext: ... +def cipher_supported( + algorithm: ciphers.CipherAlgorithm, mode: modes.Mode +) -> bool: ... def _advance( ctx: ciphers.AEADEncryptionContext | ciphers.AEADDecryptionContext, n: int ) -> None: ... diff --git a/src/rust/src/backend/cipher_registry.rs b/src/rust/src/backend/cipher_registry.rs index b4e4d137d21d..23e805fde2df 100644 --- a/src/rust/src/backend/cipher_registry.rs +++ b/src/rust/src/backend/cipher_registry.rs @@ -131,6 +131,7 @@ fn get_cipher_registry( let seed = types::SEED.get(py)?; let arc4 = types::ARC4.get(py)?; let chacha20 = types::CHACHA20.get(py)?; + let rc2 = types::RC2.get(py)?; let cbc = types::CBC.get(py)?; let cfb = types::CFB.get(py)?; @@ -247,42 +248,53 @@ fn get_cipher_registry( } } - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))] - { - m.add(seed, cbc, Some(128), Cipher::seed_cbc())?; - m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?; - m.add(seed, ofb, Some(128), Cipher::seed_ofb())?; - m.add(seed, ecb, Some(128), Cipher::seed_ecb())?; - } + #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] + m.add(chacha20, none_type, None, Cipher::chacha20())?; - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))] + // Don't register legacy ciphers if they're unavailable. In theory + // this should't be necessary but OpenSSL 3 will return an EVP_CIPHER + // even when the cipher is unavailable. + if cfg!(not(CRYPTOGRAPHY_OPENSSL_300_OR_GREATER)) + || types::LEGACY_PROVIDER_LOADED.get(py)?.is_true()? { - m.add(blowfish, cbc, None, Cipher::bf_cbc())?; - m.add(blowfish, cfb, None, Cipher::bf_cfb64())?; - m.add(blowfish, ofb, None, Cipher::bf_ofb())?; - m.add(blowfish, ecb, None, Cipher::bf_ecb())?; - } + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))] + { + m.add(blowfish, cbc, None, Cipher::bf_cbc())?; + m.add(blowfish, cfb, None, Cipher::bf_cfb64())?; + m.add(blowfish, ofb, None, Cipher::bf_ofb())?; + m.add(blowfish, ecb, None, Cipher::bf_ecb())?; + } + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))] + { + m.add(seed, cbc, Some(128), Cipher::seed_cbc())?; + m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?; + m.add(seed, ofb, Some(128), Cipher::seed_ofb())?; + m.add(seed, ecb, Some(128), Cipher::seed_ecb())?; + } - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))] - { - m.add(cast5, cbc, None, Cipher::cast5_cbc())?; - m.add(cast5, ecb, None, Cipher::cast5_ecb())?; - m.add(cast5, ofb, None, Cipher::cast5_ofb())?; - m.add(cast5, cfb, None, Cipher::cast5_cfb64())?; - } + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))] + { + m.add(cast5, cbc, None, Cipher::cast5_cbc())?; + m.add(cast5, ecb, None, Cipher::cast5_ecb())?; + m.add(cast5, ofb, None, Cipher::cast5_ofb())?; + m.add(cast5, cfb, None, Cipher::cast5_cfb64())?; + } - #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))] - { - m.add(idea, cbc, Some(128), Cipher::idea_cbc())?; - m.add(idea, ecb, Some(128), Cipher::idea_ecb())?; - m.add(idea, ofb, Some(128), Cipher::idea_ofb())?; - m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?; - } + #[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))] + { + m.add(idea, cbc, Some(128), Cipher::idea_cbc())?; + m.add(idea, ecb, Some(128), Cipher::idea_ecb())?; + m.add(idea, ofb, Some(128), Cipher::idea_ofb())?; + m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?; + } - #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] - m.add(chacha20, none_type, None, Cipher::chacha20())?; + m.add(arc4, none_type, None, Cipher::rc4())?; - m.add(arc4, none_type, None, Cipher::rc4())?; + // We don't actually support RC2, this is just used by some tests. + if let Some(rc2_cbc) = Cipher::from_nid(openssl::nid::Nid::RC2_CBC) { + m.add(rc2, cbc, None, rc2_cbc)?; + } + } Ok(m.build()) }) diff --git a/src/rust/src/backend/ciphers.rs b/src/rust/src/backend/ciphers.rs index fdfc183325e5..3695ca1d89df 100644 --- a/src/rust/src/backend/ciphers.rs +++ b/src/rust/src/backend/ciphers.rs @@ -523,6 +523,15 @@ fn create_decryption_ctx( } } +#[pyo3::prelude::pyfunction] +fn cipher_supported( + py: pyo3::Python<'_>, + algorithm: &pyo3::PyAny, + mode: &pyo3::PyAny, +) -> CryptographyResult { + Ok(cipher_registry::get_cipher(py, algorithm, mode.get_type())?.is_some()) +} + #[pyo3::prelude::pyfunction] fn _advance(ctx: &pyo3::PyAny, n: u64) { if let Ok(c) = ctx.downcast::>() { @@ -545,6 +554,7 @@ pub(crate) fn create_module(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelu let m = pyo3::prelude::PyModule::new(py, "ciphers")?; m.add_function(pyo3::wrap_pyfunction!(create_encryption_ctx, m)?)?; m.add_function(pyo3::wrap_pyfunction!(create_decryption_ctx, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(cipher_supported, m)?)?; m.add_function(pyo3::wrap_pyfunction!(_advance, m)?)?; m.add_function(pyo3::wrap_pyfunction!(_advance_aad, m)?)?; diff --git a/src/rust/src/buf.rs b/src/rust/src/buf.rs index 9399f71e3dc5..9cce2bb3b7a5 100644 --- a/src/rust/src/buf.rs +++ b/src/rust/src/buf.rs @@ -72,15 +72,9 @@ impl<'a> pyo3::conversion::FromPyObject<'a> for CffiMutBuf<'a> { .extract()?; let len = bufobj.len()?; - let ptr = if len == 0 { - ptr::NonNull::dangling().as_ptr() + let buf = if len == 0 { + &mut [] } else { - ptrval as *mut u8 - }; - - Ok(CffiMutBuf { - _pyobj: pyobj, - _bufobj: bufobj, // SAFETY: _extract_buffer_length ensures that we have a valid ptr // and length (and we ensure we meet slice's requirements for // 0-length slices above), we're keeping pyobj alive which ensures @@ -89,7 +83,13 @@ impl<'a> pyo3::conversion::FromPyObject<'a> for CffiMutBuf<'a> { // https://alexgaynor.net/2022/oct/23/buffers-on-the-edge/ // for details. This is the same as our cffi status quo ante, so // we're doing an unsound thing and living with it. - buf: unsafe { slice::from_raw_parts_mut(ptr, len) }, + unsafe { slice::from_raw_parts_mut(ptrval as *mut u8, len) } + }; + + Ok(CffiMutBuf { + _pyobj: pyobj, + _bufobj: bufobj, + buf, }) } } diff --git a/src/rust/src/types.rs b/src/rust/src/types.rs index 6dce3f878d96..058a0683d45b 100644 --- a/src/rust/src/types.rs +++ b/src/rust/src/types.rs @@ -500,6 +500,8 @@ pub static ARC4: LazyPyImport = LazyPyImport::new( "cryptography.hazmat.primitives.ciphers.algorithms", &["ARC4"], ); +pub static RC2: LazyPyImport = + LazyPyImport::new("cryptography.hazmat.backends.openssl.backend", &["_RC2"]); pub static MODE_WITH_INITIALIZATION_VECTOR: LazyPyImport = LazyPyImport::new( "cryptography.hazmat.primitives.ciphers.modes", @@ -534,6 +536,11 @@ pub static GCM: LazyPyImport = pub static XTS: LazyPyImport = LazyPyImport::new("cryptography.hazmat.primitives.ciphers.modes", &["XTS"]); +pub static LEGACY_PROVIDER_LOADED: LazyPyImport = LazyPyImport::new( + "cryptography.hazmat.bindings.openssl.binding", + &["Binding", "_legacy_provider_loaded"], +); + #[cfg(test)] mod tests { use super::LazyPyImport; diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index 012ba5c4c807..0fa09e0fba0a 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -14,8 +14,6 @@ from cryptography.hazmat.bindings._rust import openssl as rust_openssl from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import CBC from ...doubles import ( DummyAsymmetricPadding, @@ -93,10 +91,6 @@ def test_supports_cipher(self): is False ) - def test_register_duplicate_cipher_adapter(self): - with pytest.raises(ValueError): - backend.register_cipher_adapter(AES, CBC, None) - def test_openssl_assert(self): backend.openssl_assert(True) with pytest.raises(InternalError): diff --git a/tests/hazmat/primitives/test_pkcs12.py b/tests/hazmat/primitives/test_pkcs12.py index cd9c279ac4b0..f5a092ff5416 100644 --- a/tests/hazmat/primitives/test_pkcs12.py +++ b/tests/hazmat/primitives/test_pkcs12.py @@ -19,6 +19,7 @@ ed25519, rsa, ) +from cryptography.hazmat.primitives.ciphers import modes from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, @@ -81,7 +82,9 @@ def test_load_pkcs12_ec_keys(self, filename, password, backend): ], ) @pytest.mark.supported( - only_if=lambda backend: backend.cipher_supported(_RC2(), None), + only_if=lambda backend: backend.cipher_supported( + _RC2(), modes.CBC(initialization_vector=b"\x00" * 16) + ), skip_message="Does not support RC2", ) def test_load_pkcs12_ec_keys_rc2(self, filename, password, backend):