Skip to content

Commit

Permalink
Update encryption code to Decoder 1.6.2 (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem authored Nov 10, 2023
1 parent 717e7d6 commit 48ebddc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
10 changes: 6 additions & 4 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ def decode_advertisement(
self.hass_presence(decoded_json)

# Handle encrypted payload
if decoded_json.get("encr", False):
self.handle_encrypted_advertisement(
if decoded_json.get("encr", 0) > 0:
decoded_json = self.handle_encrypted_advertisement(
data_json,
decoded_json,
)
Expand Down Expand Up @@ -512,14 +512,14 @@ def handle_encrypted_advertisement(
self,
data_json: DataJSONType,
decoded_json: DataJSONType,
) -> None:
) -> DataJSONType:
"""Handle encrypted advertisement."""
try:
bindkey = bytes.fromhex(
self.configuration["bindkeys"][get_address(decoded_json)],
)
decryptor = create_decryptor(
decoded_json["model_id"], # type: ignore[arg-type]
decoded_json["encr"], # type: ignore[arg-type]
)
decrypted_data = decryptor.decrypt(
bindkey,
Expand Down Expand Up @@ -564,6 +564,8 @@ def handle_encrypted_advertisement(
)
except ValueError:
logger.exception("Decryption failed")
finally:
return decoded_json # noqa: B012


def run(configuration: dict, config_path: Path) -> None:
Expand Down
16 changes: 7 additions & 9 deletions TheengsGateway/decryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,23 +136,21 @@ def replace_encrypted_data(
data_json["servicedata"] = bthome_service_data.hex()


_DECRYPTOR_MODELS = {
"LYWSD03MMC/MJWSD05MMC_PVVX_ENCR": LYWSD03MMC_PVVXDecryptor,
"SBBT_002C_ENCR": BTHomeV2Decryptor,
"SBDW_002C_ENCR": BTHomeV2Decryptor,
"SBMO_003Z_ENCR": BTHomeV2Decryptor,
_DECRYPTORS = {
1: LYWSD03MMC_PVVXDecryptor,
2: BTHomeV2Decryptor,
}


class UnsupportedEncryptionError(Exception):
"""Exception raised when trying to decrypt an unsupported device."""


def create_decryptor(model_id: str) -> AdvertisementDecryptor:
"""Return the decryptor class for the given model ID."""
if model_id not in _DECRYPTOR_MODELS:
def create_decryptor(mode: int) -> AdvertisementDecryptor:
"""Return the decryptor class for the given encryption mode."""
if mode not in _DECRYPTORS:
raise UnsupportedEncryptionError
return _DECRYPTOR_MODELS[model_id]() # type: ignore[abstract]
return _DECRYPTORS[mode]() # type: ignore[abstract]


def reverse_address(address: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"importlib-metadata",
"paho-mqtt>=1.6.1",
"pycryptodomex>=3.18.0",
"TheengsDecoder>=1.5.5",
"TheengsDecoder>=1.6.2",
],
use_scm_version={"version_scheme": "no-guess-dev"},
setup_requires=["setuptools_scm"],
Expand Down

0 comments on commit 48ebddc

Please sign in to comment.