Skip to content

Commit

Permalink
Remove old impl of registery
Browse files Browse the repository at this point in the history
  • Loading branch information
alex committed Nov 11, 2023
1 parent 4e5d1c9 commit 694564c
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 178 deletions.
140 changes: 2 additions & 138 deletions src/cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import collections
import contextlib
import itertools
import typing

from cryptography import utils, x509
Expand All @@ -33,27 +32,9 @@
)
from cryptography.hazmat.primitives.ciphers.algorithms import (
AES,
AES128,
AES256,
ARC4,
SM4,
Camellia,
ChaCha20,
TripleDES,
_BlowfishInternal,
_CAST5Internal,
_IDEAInternal,
_SEEDInternal,
)
from cryptography.hazmat.primitives.ciphers.modes import (
CBC,
CFB,
CFB8,
CTR,
ECB,
GCM,
OFB,
XTS,
Mode,
)
from cryptography.hazmat.primitives.serialization.pkcs12 import (
Expand All @@ -69,7 +50,7 @@

# Not actually supported, just used as a marker for some serialization tests.
class _RC2:
pass
key_size = 128


class Backend:
Expand Down Expand Up @@ -132,7 +113,6 @@ def __init__(self) -> None:
tuple[type[CipherAlgorithm], type[Mode]],
typing.Callable,
] = {}
self._register_default_ciphers()
self._dh_types = [self._lib.EVP_PKEY_DH]
if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
self._dh_types.append(self._lib.EVP_PKEY_DHX)
Expand Down Expand Up @@ -220,93 +200,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
if not isinstance(cipher, self._fips_ciphers):
return False

try:
adapter = self._cipher_registry[type(cipher), type(mode)]
except KeyError:
return False
evp_cipher = adapter(self, cipher, mode)
return self._ffi.NULL != evp_cipher

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
if (cipher_cls, mode_cls) in self._cipher_registry:
raise ValueError(
f"Duplicate registration for: {cipher_cls} {mode_cls}."
)
self._cipher_registry[cipher_cls, mode_cls] = adapter

def _register_default_ciphers(self) -> None:
for cipher_cls in [AES, AES128, AES256]:
for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName(
"{cipher.name}-{cipher.key_size}-{mode.name}"
),
)
for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
self.register_cipher_adapter(
Camellia,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
)
for mode_cls in [CBC, CFB, CFB8, OFB]:
self.register_cipher_adapter(
TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
)
self.register_cipher_adapter(
TripleDES, ECB, GetCipherByName("des-ede3")
)
# ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
# it uses "chacha"
self.register_cipher_adapter(
ChaCha20,
type(None),
GetCipherByName(
"chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
),
)
self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
self.register_cipher_adapter(
SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
)
# Don't register legacy ciphers if they're unavailable. Hypothetically
# this wouldn't be necessary because we test availability by seeing if
# we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
# will return a valid pointer even though the cipher is unavailable.
if (
self._binding._legacy_provider_loaded
or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
):
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
_BlowfishInternal,
mode_cls,
GetCipherByName("bf-{mode.name}"),
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
_SEEDInternal,
mode_cls,
GetCipherByName("seed-{mode.name}"),
)
for cipher_cls, mode_cls in itertools.product(
[_CAST5Internal, _IDEAInternal],
[CBC, OFB, CFB, ECB],
):
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName("{cipher.name}-{mode.name}"),
)
self.register_cipher_adapter(
ARC4, type(None), GetCipherByName("rc4")
)
# We don't actually support RC2, this is just used by some tests.
self.register_cipher_adapter(
_RC2, type(None), GetCipherByName("rc2")
)
return rust_openssl.ciphers.cipher_supported(cipher, mode)

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
return self.hmac_supported(algorithm)
Expand Down Expand Up @@ -1108,34 +1002,4 @@ def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]:
return certs


class GetCipherByName:
def __init__(self, fmt: str):
self._fmt = fmt

def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
evp_cipher = backend._lib.EVP_get_cipherbyname(
cipher_name.encode("ascii")
)

# try EVP_CIPHER_fetch if present
if (
evp_cipher == backend._ffi.NULL
and backend._lib.Cryptography_HAS_300_EVP_CIPHER
):
evp_cipher = backend._lib.EVP_CIPHER_fetch(
backend._ffi.NULL,
cipher_name.encode("ascii"),
backend._ffi.NULL,
)

backend._consume_errors()
return evp_cipher


def _get_xts_cipher(backend: Backend, cipher: AES, mode):
cipher_name = f"aes-{cipher.key_size // 2}-xts"
return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))


backend = Backend()
3 changes: 3 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/openssl/ciphers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ def create_decryption_ctx(
def create_decryption_ctx(
algorithm: ciphers.CipherAlgorithm, mode: modes.Mode
) -> ciphers.CipherContext: ...
def cipher_supported(
algorithm: ciphers.CipherAlgorithm, mode: modes.Mode
) -> bool: ...
def _advance(
ctx: ciphers.AEADEncryptionContext | ciphers.AEADDecryptionContext, n: int
) -> None: ...
Expand Down
78 changes: 45 additions & 33 deletions src/rust/src/backend/cipher_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ fn get_cipher_registry(
let seed = types::SEED.get(py)?;
let arc4 = types::ARC4.get(py)?;
let chacha20 = types::CHACHA20.get(py)?;
let rc2 = types::RC2.get(py)?;

let cbc = types::CBC.get(py)?;
let cfb = types::CFB.get(py)?;
Expand Down Expand Up @@ -223,42 +224,53 @@ fn get_cipher_registry(
m.add(sm4, ecb, Some(128), Cipher::sm4_ecb())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))]
{
m.add(seed, cbc, Some(128), Cipher::seed_cbc())?;
m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?;
m.add(seed, ofb, Some(128), Cipher::seed_ofb())?;
m.add(seed, ecb, Some(128), Cipher::seed_ecb())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))]
{
m.add(blowfish, cbc, None, Cipher::bf_cbc())?;
m.add(blowfish, cfb, None, Cipher::bf_cfb64())?;
m.add(blowfish, ofb, None, Cipher::bf_ofb())?;
m.add(blowfish, ecb, None, Cipher::bf_ecb())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))]
{
m.add(cast5, cbc, None, Cipher::cast5_cbc())?;
m.add(cast5, ecb, None, Cipher::cast5_ecb())?;
m.add(cast5, ofb, None, Cipher::cast5_ofb())?;
m.add(cast5, cfb, None, Cipher::cast5_cfb64())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))]
{
m.add(idea, cbc, Some(128), Cipher::idea_cbc())?;
m.add(idea, ecb, Some(128), Cipher::idea_ecb())?;
m.add(idea, ofb, Some(128), Cipher::idea_ofb())?;
m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?;
}

#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
m.add(chacha20, none_type, None, Cipher::chacha20())?;

m.add(arc4, none_type, None, Cipher::rc4())?;
// Don't register legacy ciphers if they're unavailable. In theory
// this should't be necessary but OpenSSL 3 will return an EVP_CIPHER
// even when the cipher is unavailable.
if types::LEGACY_PROVIDER_LOADED.get(py)?.is_true()?
|| cfg!(not(CRYPTOGRAPHY_OPENSSL_300_OR_GREATER))
{
#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_BF"))]
{
m.add(blowfish, cbc, None, Cipher::bf_cbc())?;
m.add(blowfish, cfb, None, Cipher::bf_cfb64())?;
m.add(blowfish, ofb, None, Cipher::bf_ofb())?;
m.add(blowfish, ecb, None, Cipher::bf_ecb())?;
}
#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_SEED"))]
{
m.add(seed, cbc, Some(128), Cipher::seed_cbc())?;
m.add(seed, cfb, Some(128), Cipher::seed_cfb128())?;
m.add(seed, ofb, Some(128), Cipher::seed_ofb())?;
m.add(seed, ecb, Some(128), Cipher::seed_ecb())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_CAST"))]
{
m.add(cast5, cbc, None, Cipher::cast5_cbc())?;
m.add(cast5, ecb, None, Cipher::cast5_ecb())?;
m.add(cast5, ofb, None, Cipher::cast5_ofb())?;
m.add(cast5, cfb, None, Cipher::cast5_cfb64())?;
}

#[cfg(not(CRYPTOGRAPHY_OSSLCONF = "OPENSSL_NO_IDEA"))]
{
m.add(idea, cbc, Some(128), Cipher::idea_cbc())?;
m.add(idea, ecb, Some(128), Cipher::idea_ecb())?;
m.add(idea, ofb, Some(128), Cipher::idea_ofb())?;
m.add(idea, cfb, Some(128), Cipher::idea_cfb64())?;
}

m.add(arc4, none_type, None, Cipher::rc4())?;

// We don't actually support RC2, this is just used by some tests.
if let Some(rc2_cbc) = Cipher::from_nid(openssl::nid::Nid::RC2_CBC) {
m.add(rc2, cbc, None, rc2_cbc)?;
}
}

Ok(m.build())
})
Expand Down
10 changes: 10 additions & 0 deletions src/rust/src/backend/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,15 @@ fn create_decryption_ctx(
}
}

#[pyo3::prelude::pyfunction]
fn cipher_supported(
py: pyo3::Python<'_>,
algorithm: &pyo3::PyAny,
mode: &pyo3::PyAny,
) -> CryptographyResult<bool> {
Ok(cipher_registry::get_cipher(py, algorithm, mode.get_type())?.is_some())
}

#[pyo3::prelude::pyfunction]
fn _advance(ctx: &pyo3::PyAny, n: u64) {
if let Ok(c) = ctx.downcast::<pyo3::PyCell<PyAEADEncryptionContext>>() {
Expand All @@ -519,6 +528,7 @@ pub(crate) fn create_module(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelu
let m = pyo3::prelude::PyModule::new(py, "ciphers")?;
m.add_function(pyo3::wrap_pyfunction!(create_encryption_ctx, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(create_decryption_ctx, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(cipher_supported, m)?)?;

m.add_function(pyo3::wrap_pyfunction!(_advance, m)?)?;
m.add_function(pyo3::wrap_pyfunction!(_advance_aad, m)?)?;
Expand Down
7 changes: 7 additions & 0 deletions src/rust/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ pub static ARC4: LazyPyImport = LazyPyImport::new(
"cryptography.hazmat.primitives.ciphers.algorithms",
&["ARC4"],
);
pub static RC2: LazyPyImport =
LazyPyImport::new("cryptography.hazmat.backends.openssl.backend", &["_RC2"]);

pub static MODE_WITH_INITIALIZATION_VECTOR: LazyPyImport = LazyPyImport::new(
"cryptography.hazmat.primitives.ciphers.modes",
Expand Down Expand Up @@ -576,6 +578,11 @@ pub static GCM: LazyPyImport =
pub static XTS: LazyPyImport =
LazyPyImport::new("cryptography.hazmat.primitives.ciphers.modes", &["XTS"]);

pub static LEGACY_PROVIDER_LOADED: LazyPyImport = LazyPyImport::new(
"cryptography.hazmat.bindings.openssl.binding",
&["Binding", "_legacy_provider_loaded"],
);

#[cfg(test)]
mod tests {
use super::LazyPyImport;
Expand Down
6 changes: 0 additions & 6 deletions tests/hazmat/backends/test_openssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from cryptography.hazmat.backends.openssl.backend import backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import CBC

from ...doubles import (
DummyAsymmetricPadding,
Expand Down Expand Up @@ -92,10 +90,6 @@ def test_supports_cipher(self):
is False
)

def test_register_duplicate_cipher_adapter(self):
with pytest.raises(ValueError):
backend.register_cipher_adapter(AES, CBC, None)

def test_openssl_assert(self):
backend.openssl_assert(True)
with pytest.raises(InternalError):
Expand Down
5 changes: 4 additions & 1 deletion tests/hazmat/primitives/test_pkcs12.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ed25519,
rsa,
)
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
Expand Down Expand Up @@ -94,7 +95,9 @@ def test_load_pkcs12_ec_keys(self, filename, password, backend):
],
)
@pytest.mark.supported(
only_if=lambda backend: backend.cipher_supported(_RC2(), None),
only_if=lambda backend: backend.cipher_supported(
_RC2(), modes.CBC(initialization_vector=b"\x00" * 16)
),
skip_message="Does not support RC2",
)
def test_load_pkcs12_ec_keys_rc2(self, filename, password, backend):
Expand Down

0 comments on commit 694564c

Please sign in to comment.