Skip to content

Commit

Permalink
Resolve private addresses for known IRKs (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
koenvervloesem authored Nov 2, 2023
1 parent 8109783 commit 717e7d6
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 9 deletions.
19 changes: 16 additions & 3 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

from .decryption import UnsupportedEncryptionError, create_decryptor
from .diagnose import diagnostics
from .privacy import resolve_private_address

if platform.system() == "Linux":
from bleak.assigned_numbers import AdvertisementDataType
Expand Down Expand Up @@ -396,8 +397,20 @@ def detection_callback(
"""Detect device in received advertisement data."""
logger.debug("%s:%s", device.address, advertisement_data)

# Try to resolve private addresses with known IRKs
address = device.address
for identity, irk in self.configuration["identities"].items():
if resolve_private_address(address, irk):
address = identity
logger.debug(
"Using identity address %s instead of random private address %s",
address,
device.address,
)
break

# Try to add the device to dictionary of clocks to synchronize time.
self.add_clock(device.address)
self.add_clock(address)

data_json: DataJSONType = {}
company_id = None
Expand All @@ -418,7 +431,7 @@ def detection_callback(
data_json["name"] = advertisement_data.local_name

if advertisement_data.service_data:
data_json["id"] = device.address
data_json["id"] = address
data_json["rssi"] = advertisement_data.rssi
# Try to decode advertisement with service data for each UUID separately.
for uuid, data in advertisement_data.service_data.items():
Expand All @@ -428,7 +441,7 @@ def detection_callback(
data_json_copy["servicedata"] = data.hex()
self.decode_advertisement(data_json_copy, company_id)
elif data_json:
data_json["id"] = device.address
data_json["id"] = address
data_json["rssi"] = advertisement_data.rssi
self.decode_advertisement(data_json, company_id)

Expand Down
10 changes: 9 additions & 1 deletion TheengsGateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"bindkeys": {},
"enable_tls": 0,
"enable_websocket": 0,
"identities": {},
}


Expand Down Expand Up @@ -216,6 +217,13 @@ def parse_args() -> argparse.Namespace:
type=int,
help="Enable (1) or disable (0) WebSocket (default: 0)",
)
parser.add_argument(
"-id",
"--identities",
nargs="+",
metavar=("ADDRESS", "IRK"),
help="Identity addresses and their IRKs: ADDR1 IRK1 ADDR2 IRK2",
)
return parser.parse_args()


Expand Down Expand Up @@ -257,7 +265,7 @@ def merge_args_with_config(config: dict, args: argparse.Namespace) -> None:
for key, value in args.__dict__.items():
if value is not None:
if isinstance(value, list):
if key == "bindkeys":
if key in {"bindkeys", "identities"}:
config[key].update(
dict(zip(value[::2], value[1::2])),
)
Expand Down
9 changes: 5 additions & 4 deletions TheengsGateway/diagnose.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def _anonymize_addresses(addresses: list[str]) -> list[str]:
return [_anonymize_address(address) for address in addresses]


def _anonymize_bindkeys(bindkeys: dict[str, str]) -> dict[str, str]:
"""Anonymize the addresses and bindkeys in a dictionary."""
return {_anonymize_address(address): "***" for address in bindkeys}
def _anonymize_addr_keys(addr_keys: dict[str, str]) -> dict[str, str]:
"""Anonymize the addresses and corresponding keys in a dictionary."""
return {_anonymize_address(address): "***" for address in addr_keys}


# This function is taken from Textual
Expand Down Expand Up @@ -123,7 +123,8 @@ def _config(config_path: Path) -> None:
config = json.load(config_file)
_anonymize_strings(["user", "pass"], config)
config["time_sync"] = _anonymize_addresses(config["time_sync"])
config["bindkeys"] = _anonymize_bindkeys(config["bindkeys"])
config["bindkeys"] = _anonymize_addr_keys(config["bindkeys"])
config["identities"] = _anonymize_addr_keys(config["identities"])
print("```")
print(json.dumps(config, sort_keys=True, indent=4))
print("```")
Expand Down
42 changes: 42 additions & 0 deletions TheengsGateway/privacy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Privacy module for Theengs Gateway.
This module is used to resolve resolvable private addresses (RPAs) if you know their
corresponding identity resolving keys (IRKs).
"""
from __future__ import annotations

from base64 import b64decode

from Cryptodome.Cipher import AES

_MSB_MASK = 0b11000000
_MSB_RPA = 0b01000000


def resolve_private_address(address: str, irk: str) -> bool:
"""Return `True` if the address can be resolved with the IRK."""
rpa = bytes.fromhex(address.replace(":", ""))
# Format of a resolvable private address:
# MSB LSB
# 01 Random part of prand Hash value
# <- prand (24 bits) -><- hash (24 bits) ->
if rpa[0] & _MSB_MASK != _MSB_RPA:
# Address is not an RPA
return False

prand = rpa[:3]
hash_value = rpa[3:]
try:
# Suppose the key is in hex format
key = bytes.fromhex(irk)[::-1]
except ValueError:
# If that doesn't work, try to decode it as Base64 format
key = b64decode(irk)[::-1]
cipher = AES.new(key, AES.MODE_ECB)
localhash = cipher.encrypt(b"\x00" * 13 + prand)

if localhash[13:] != hash_value:
# 24 bits of local hash don't match the RPA's hash
return False

return True
15 changes: 14 additions & 1 deletion docs/use/use.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ usage: [-h] [-H HOST] [-P PORT] [-u USER] [-p PWD] [-pt PUB_TOPIC] [-Lt LWT_T
[-a ADAPTER] [-s {active,passive}] [-ts TIME_SYNC [TIME_SYNC ...]]
[-tf TIME_FORMAT] [-padv PUBLISH_ADVDATA]
[-bk ADDRESS [BINDKEY ...]] [-tls ENABLE_TLS]
[-ws ENABLE_WEBSOCKET]
[-ws ENABLE_WEBSOCKET] [-id ADDRESS [IRK ...]]

optional arguments:
-h, --help show this help message and exit
Expand Down Expand Up @@ -108,6 +108,8 @@ optional arguments:
Enable (1) or disable (0) TLS (default: 0)
-ws ENABLE_WEBSOCKET, --enable_websocket ENABLE_WEBSOCKET
Enable (1) or disable (0) WebSocket (default: 0)
-id ADDRESS [IRK ...], --identities ADDRESS [IRK ...]
Identity addresses and their IRKs: ADDR1 IRK1 ADDR2 IRK2
```
### For a Docker container
Expand Down Expand Up @@ -236,3 +238,14 @@ TheengsGateway --bindkeys 00:11:22:33:44:55:66 0dc540f3025b474b9ef1085e051b1add
```
Theengs Gateway will then use the bindkey 0dc540f3025b474b9ef1085e051b1add to decrypt all advertisements from device 00:11:22:33:44:55:66 and bindkey 6385424e1b0341109942ad2a6bb42e58 for all advertisements from device AA:BB:CC:DD:EE:FF.
## Resolving random private addresses
If you want to resolve random private addresses into a device's identity address, you need to add an identity resolving key (IRK) for each identity address with the `--identities` argument. For example:
```
TheengsGateway --identities 00:11:22:33:44:55:66 0dc540f3025b474b9ef1085e051b1add AA:BB:CC:DD:EE:FF 6385424e1b0341109942ad2a6bb42e58
```
Theengs Gateway will then use the identity resolving key 0dc540f3025b474b9ef1085e051b1add to resolve random private addresses from device 00:11:22:33:44:55:66 and identity resolving key 6385424e1b0341109942ad2a6bb42e58 to resolve random private addresses from device AA:BB:CC:DD:EE:FF.
The identity resolving key can also be specified as a Base64 encoded string, such as `"MGRjNTQwZjMwMjViNDc0YjllZjEwODVlMDUxYjFhZGQ="`.

0 comments on commit 717e7d6

Please sign in to comment.