From 53c3d1d0dc71dd3a2eb741587ad4bb07b7484244 Mon Sep 17 00:00:00 2001 From: Koen Vervloesem Date: Fri, 10 Nov 2023 19:17:14 +0100 Subject: [PATCH] Update encryption code to Decoder 1.6.2 --- TheengsGateway/ble_gateway.py | 10 ++++++---- TheengsGateway/decryption.py | 16 +++++++--------- setup.py | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/TheengsGateway/ble_gateway.py b/TheengsGateway/ble_gateway.py index 0eace004..dc60b972 100644 --- a/TheengsGateway/ble_gateway.py +++ b/TheengsGateway/ble_gateway.py @@ -468,8 +468,8 @@ def decode_advertisement( self.hass_presence(decoded_json) # Handle encrypted payload - if decoded_json.get("encr", False): - self.handle_encrypted_advertisement( + if decoded_json.get("encr", 0) > 0: + decoded_json = self.handle_encrypted_advertisement( data_json, decoded_json, ) @@ -512,14 +512,14 @@ def handle_encrypted_advertisement( self, data_json: DataJSONType, decoded_json: DataJSONType, - ) -> None: + ) -> DataJSONType: """Handle encrypted advertisement.""" try: bindkey = bytes.fromhex( self.configuration["bindkeys"][get_address(decoded_json)], ) decryptor = create_decryptor( - decoded_json["model_id"], # type: ignore[arg-type] + decoded_json["encr"], # type: ignore[arg-type] ) decrypted_data = decryptor.decrypt( bindkey, @@ -564,6 +564,8 @@ def handle_encrypted_advertisement( ) except ValueError: logger.exception("Decryption failed") + finally: + return decoded_json # noqa: B012 def run(configuration: dict, config_path: Path) -> None: diff --git a/TheengsGateway/decryption.py b/TheengsGateway/decryption.py index ecdaefad..d3e85d84 100644 --- a/TheengsGateway/decryption.py +++ b/TheengsGateway/decryption.py @@ -136,11 +136,9 @@ def replace_encrypted_data( data_json["servicedata"] = bthome_service_data.hex() -_DECRYPTOR_MODELS = { - "LYWSD03MMC/MJWSD05MMC_PVVX_ENCR": LYWSD03MMC_PVVXDecryptor, - "SBBT_002C_ENCR": BTHomeV2Decryptor, - "SBDW_002C_ENCR": BTHomeV2Decryptor, - "SBMO_003Z_ENCR": BTHomeV2Decryptor, +_DECRYPTORS = { + 1: LYWSD03MMC_PVVXDecryptor, + 2: BTHomeV2Decryptor, } @@ -148,11 +146,11 @@ class UnsupportedEncryptionError(Exception): """Exception raised when trying to decrypt an unsupported device.""" -def create_decryptor(model_id: str) -> AdvertisementDecryptor: - """Return the decryptor class for the given model ID.""" - if model_id not in _DECRYPTOR_MODELS: +def create_decryptor(mode: int) -> AdvertisementDecryptor: + """Return the decryptor class for the given encryption mode.""" + if mode not in _DECRYPTORS: raise UnsupportedEncryptionError - return _DECRYPTOR_MODELS[model_id]() # type: ignore[abstract] + return _DECRYPTORS[mode]() # type: ignore[abstract] def reverse_address(address: str) -> str: diff --git a/setup.py b/setup.py index d0002f44..53a004d4 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ "importlib-metadata", "paho-mqtt>=1.6.1", "pycryptodomex>=3.18.0", - "TheengsDecoder>=1.5.5", + "TheengsDecoder>=1.6.2", ], use_scm_version={"version_scheme": "no-guess-dev"}, setup_requires=["setuptools_scm"],