Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert symmetric ciphers to Rust #9859

Merged
merged 1 commit into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 3 additions & 161 deletions src/cryptography/hazmat/backends/openssl/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,12 @@

import collections
import contextlib
import itertools
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.bindings.openssl import binding
from cryptography.hazmat.decrepit.ciphers.algorithms import (
ARC4,
CAST5,
IDEA,
RC2,
SEED,
Blowfish,
TripleDES,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric import ec
Expand All @@ -41,21 +30,9 @@
)
from cryptography.hazmat.primitives.ciphers.algorithms import (
AES,
AES128,
AES256,
SM4,
Camellia,
ChaCha20,
)
from cryptography.hazmat.primitives.ciphers.modes import (
CBC,
CFB,
CFB8,
CTR,
ECB,
GCM,
OFB,
XTS,
Mode,
)
from cryptography.hazmat.primitives.serialization.pkcs12 import (
Expand Down Expand Up @@ -113,25 +90,15 @@ def __init__(self) -> None:
self._lib = self._binding.lib
self._fips_enabled = rust_openssl.is_fips_enabled()

self._cipher_registry: dict[
tuple[type[CipherAlgorithm], type[Mode]],
typing.Callable,
] = {}
self._register_default_ciphers()

def __repr__(self) -> str:
return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format(
self.openssl_version_text(),
self._fips_enabled,
rust_openssl._legacy_provider_loaded,
)

def openssl_assert(
self,
ok: bool,
errors: list[rust_openssl.OpenSSLError] | None = None,
) -> None:
return binding._openssl_assert(ok, errors=errors)
def openssl_assert(self, ok: bool) -> None:
return binding._openssl_assert(ok)

def _enable_fips(self) -> None:
# This function enables FIPS mode for OpenSSL 3.0.0 on installs that
Expand Down Expand Up @@ -204,102 +171,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
if not isinstance(cipher, self._fips_ciphers):
return False

try:
adapter = self._cipher_registry[type(cipher), type(mode)]
except KeyError:
return False
evp_cipher = adapter(self, cipher, mode)
return self._ffi.NULL != evp_cipher

def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None:
if (cipher_cls, mode_cls) in self._cipher_registry:
raise ValueError(
f"Duplicate registration for: {cipher_cls} {mode_cls}."
)
self._cipher_registry[cipher_cls, mode_cls] = adapter

def _register_default_ciphers(self) -> None:
for cipher_cls in [AES, AES128, AES256]:
for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName(
"{cipher.name}-{cipher.key_size}-{mode.name}"
),
)
for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
self.register_cipher_adapter(
Camellia,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
)
for mode_cls in [CBC, CFB, CFB8, OFB]:
self.register_cipher_adapter(
TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
)
self.register_cipher_adapter(
TripleDES, ECB, GetCipherByName("des-ede3")
)
# ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL
# it uses "chacha"
self.register_cipher_adapter(
ChaCha20,
type(None),
GetCipherByName(
"chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20"
),
)
self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]:
self.register_cipher_adapter(
SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
)
# Don't register legacy ciphers if they're unavailable. Hypothetically
# this wouldn't be necessary because we test availability by seeing if
# we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3
# will return a valid pointer even though the cipher is unavailable.
if (
rust_openssl._legacy_provider_loaded
or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
):
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
Blowfish,
mode_cls,
GetCipherByName("bf-{mode.name}"),
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
SEED,
mode_cls,
GetCipherByName("seed-{mode.name}"),
)
for cipher_cls, mode_cls in itertools.product(
[CAST5, IDEA],
[CBC, OFB, CFB, ECB],
):
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName("{cipher.name}-{mode.name}"),
)
self.register_cipher_adapter(
ARC4, type(None), GetCipherByName("rc4")
)
self.register_cipher_adapter(
RC2, CBC, GetCipherByName("{cipher.name}-{mode.name}")
)

def create_symmetric_encryption_ctx(
self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)

def create_symmetric_decryption_ctx(
self, cipher: CipherAlgorithm, mode: Mode
) -> _CipherContext:
return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
return rust_openssl.ciphers.cipher_supported(cipher, mode)

def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
return self.hmac_supported(algorithm)
Expand Down Expand Up @@ -834,34 +706,4 @@ def pkcs7_supported(self) -> bool:
return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL


class GetCipherByName:
def __init__(self, fmt: str):
self._fmt = fmt

def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
evp_cipher = backend._lib.EVP_get_cipherbyname(
cipher_name.encode("ascii")
)

# try EVP_CIPHER_fetch if present
if (
evp_cipher == backend._ffi.NULL
and backend._lib.Cryptography_HAS_300_EVP_CIPHER
):
evp_cipher = backend._lib.EVP_CIPHER_fetch(
backend._ffi.NULL,
cipher_name.encode("ascii"),
backend._ffi.NULL,
)

backend._consume_errors()
return evp_cipher


def _get_xts_cipher(backend: Backend, cipher: AES, mode):
cipher_name = f"aes-{cipher.key_size // 2}-xts"
return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))


backend = Backend()
Loading