Skip to content

Commit

Permalink
feat: Per-device calibration (#359)
Browse files Browse the repository at this point in the history
* feat: Per-device calibration

* Per-device calibration

- Bring up to current codebase...
- Remove button entity testing
- Switch number entity to use runtime_data
- move to proper NumberMode class
- remove call to async_config_entry_first_refresh() in number.py
- fix unit_of_measurement attribute (use native, not processed)
- Fix restoration of value if None

* Linting, typing and comments

* remove attemped entity description in en.json
  • Loading branch information
agittins authored Nov 7, 2024
1 parent 6a84bce commit e18727a
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 87 deletions.
64 changes: 59 additions & 5 deletions custom_components/bermuda/bermuda_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, address, options) -> None:
self.local_name: str | None = None
self.prefname: str | None = None # "preferred" name - ideally local_name
self.address: str = address
self.ref_power: float = 0 # If non-zero, use in place of global ref_power.
self.options = options
self.unique_id: str | None = None # mac address formatted.
self.address_type = BDADDR_TYPE_UNKNOWN
Expand All @@ -78,6 +79,9 @@ def __init__(self, address, options) -> None:
self.create_sensor: bool = False # Create/update a sensor for this device
self.create_sensor_done: bool = False # Sensor should now exist
self.create_tracker_done: bool = False # device_tracker should now exist
self.create_number_done: bool = False
self.create_button_done: bool = False
self.create_all_done: bool = False # All platform entities are done and ready.
self.last_seen: float = 0 # stamp from most recent scanner spotting. MONOTONIC_TIME
self.scanners: dict[str, BermudaDeviceScanner] = {}

Expand Down Expand Up @@ -124,6 +128,54 @@ def __init__(self, address, options) -> None:
else:
self.address_type = BDADDR_TYPE_OTHER

def set_ref_power(self, value: float):
"""
Set a new reference power for this device and immediately apply
an interim distance calculation.
"""
self.ref_power = value
nearest_distance = 9999 # running tally to find closest scanner
nearest_scanner = None
for scanner in self.scanners.values():
rawdist = scanner.set_ref_power(value)
if rawdist < nearest_distance:
nearest_distance = rawdist
nearest_scanner = scanner
if nearest_scanner is not None:
self.apply_scanner_selection(nearest_scanner)

def apply_scanner_selection(self, closest_scanner: BermudaDeviceScanner | None):
"""
Given a DeviceScanner entry, apply the distance and area attributes
from it to this device.
Used to apply a "winning" scanner's data to the device for setting closest Area.
"""
if closest_scanner is not None:
# We found a winner
old_area = self.area_name
self.area_id = closest_scanner.area_id
self.area_name = closest_scanner.area_name
self.area_distance = closest_scanner.rssi_distance
self.area_rssi = closest_scanner.rssi
self.area_scanner = closest_scanner.name
if (old_area != self.area_name) and self.create_sensor:
# We check against area_name so we can know if the
# device's area changed names.
_LOGGER.debug(
"Device %s was in '%s', now in '%s'",
self.name,
old_area,
self.area_name,
)
else:
# Not close to any scanners!
self.area_id = None
self.area_name = None
self.area_distance = None
self.area_rssi = None
self.area_scanner = None

def calculate_data(self):
"""
Call after doing update_scanner() calls so that distances
Expand Down Expand Up @@ -166,19 +218,21 @@ def update_scanner(self, scanner_device: BermudaDevice, discoveryinfo: Bluetooth
if format_mac(scanner_device.address) in self.scanners:
# Device already exists, update it
self.scanners[format_mac(scanner_device.address)].update_advertisement(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id or "area_not_defined",
)
device_scanner = self.scanners[format_mac(scanner_device.address)]
else:
# Create it
self.scanners[format_mac(scanner_device.address)] = BermudaDeviceScanner(
self.address,
self,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id or "area_not_defined",
self.options,
scanner_device,
)
device_scanner = self.scanners[format_mac(scanner_device.address)]
device_scanner = self.scanners[format_mac(scanner_device.address)]
# On first creation, we also want to copy our ref_power to it (but not afterwards,
# since a metadevice might take over that role later)
device_scanner.ref_power = self.ref_power
# Let's see if we should update our last_seen based on this...
if device_scanner.stamp is not None and self.last_seen < device_scanner.stamp:
self.last_seen = device_scanner.stamp
Expand Down
163 changes: 120 additions & 43 deletions custom_components/bermuda/bermuda_device_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

from homeassistant.components.bluetooth import MONOTONIC_TIME, BluetoothScannerDevice

Expand Down Expand Up @@ -37,20 +37,25 @@ class BermudaDeviceScanner(dict):
"""
Represents details from a scanner relevant to a specific device.
A BermudaDevice will contain 0 or more of these depending on whether
it has been "seen" by that scanner.
Effectively a link between two BermudaDevices, being the tracked device
and the scanner device. So each transmitting device will have a collection
of these BermudaDeviceScanner entries, one for each scanner that has picked
up the advertisement.
This is created (and updated) by the receipt of an advertisement, which represents
a BermudaDevice hearing an advert from another BermudaDevice, if that makes sense!
A BermudaDevice's "scanners" property will contain one of these for each
scanner that has "seen" it.
Note that details on a scanner itself are BermudaDevice instances
in their own right.
"""

def __init__(
self,
device_address: str,
scandata: BluetoothScannerDevice,
area_id: str,
parent_device: BermudaDevice, # The device being tracked
scandata: BluetoothScannerDevice, # The advertisement info from the device, received by the scanner
options,
scanner_device: BermudaDevice,
scanner_device: BermudaDevice, # The scanner device that "saw" it.
) -> None:
# I am declaring these just to control their order in the dump,
# which is a bit silly, I suspect.
Expand All @@ -59,28 +64,38 @@ def __init__(
self.adapter: str = scandata.scanner.adapter
self.address = scanner_device.address
self.source: str = scandata.scanner.source
self.area_id: str = area_id
self.parent_device = device_address
self.area_id: str | None = scanner_device.area_id
self.area_name: str | None = scanner_device.area_name
self.parent_device = parent_device
self.parent_device_address = parent_device.address
self.scanner_device = scanner_device # links to the source device
self.options = options
self.stamp: float | None = 0
self.scanner_sends_stamps: bool = False
self.new_stamp: float | None = None # Set when a new advert is loaded from update
self.hist_stamp = []
self.rssi: float | None = None
self.tx_power: float | None = None
self.rssi_distance: float | None = None
self.rssi_distance_raw: float | None = None
self.ref_power: float = 0 # Override of global, set from parent device.
self.stale_update_count = 0 # How many times we did an update but no new stamps were found.
self.hist_stamp = []
self.hist_rssi = []
self.hist_distance = []
self.hist_distance_by_interval = [] # updated per-interval
self.hist_interval = [] # WARNING: This is actually "age of ad when we polled"
self.hist_velocity = [] # Effective velocity versus previous stamped reading
self.stale_update_count = 0 # How many times we did an update but no new stamps were found.
self.tx_power: float | None = None
self.rssi_distance: float | None = None
self.rssi_distance_raw: float | None = None
self.adverts: dict[str, bytes] = {}
self.adverts: dict[str, list] = {
"manufacturer_data": [],
"service_data": [],
"service_uuids": [],
"platform_data": [],
}

# Just pass the rest on to update...
self.update_advertisement(device_address, scandata, area_id)
self.update_advertisement(scandata)

def update_advertisement(self, device_address: str, scandata: BluetoothScannerDevice, area_id: str):
def update_advertisement(self, scandata: BluetoothScannerDevice):
"""
Update gets called every time we see a new packet or
every time we do a polled update.
Expand All @@ -90,9 +105,10 @@ def update_advertisement(self, device_address: str, scandata: BluetoothScannerDe
claims to have data.
"""
# In case the scanner has changed it's details since startup:
self.name: str = scandata.scanner.name
self.area_id: str = area_id
new_stamp: float | None = None
self.name = scandata.scanner.name
self.area_id = self.scanner_device.area_id
self.area_name = self.scanner_device.area_name
new_stamp = None

# Only remote scanners log timestamps here (local usb adaptors do not),
if hasattr(scandata.scanner, "_discovered_device_timestamps"):
Expand All @@ -104,7 +120,7 @@ def update_advertisement(self, device_address: str, scandata: BluetoothScannerDe
stamps = scandata.scanner._discovered_device_timestamps # type: ignore #noqa

# In this dict all MAC address keys are upper-cased
uppermac = device_address.upper()
uppermac = self.parent_device_address.upper()
if uppermac in stamps:
if self.stamp is None or (stamps[uppermac] is not None and stamps[uppermac] > self.stamp):
new_stamp = stamps[uppermac]
Expand All @@ -118,7 +134,7 @@ def update_advertisement(self, device_address: str, scandata: BluetoothScannerDe
_LOGGER.error(
"Scanner %s has no stamp for %s - very odd.",
scandata.scanner.source,
device_address,
self.parent_device_address,
)
new_stamp = None
else:
Expand All @@ -140,16 +156,13 @@ def update_advertisement(self, device_address: str, scandata: BluetoothScannerDe
new_stamp = None

if len(self.hist_stamp) == 0 or new_stamp is not None:
# this is the first entry or a new one...
# this is the first entry or a new one, bring in the new reading
# and calculate the distance.

self.rssi = scandata.advertisement.rssi
self.hist_rssi.insert(0, self.rssi)
self.rssi_distance_raw = rssi_to_metres(
self.rssi + self.options.get(CONF_RSSI_OFFSETS, {}).get(self.address, 0),
self.options.get(CONF_REF_POWER),
self.options.get(CONF_ATTENUATION),
)
self.hist_distance.insert(0, self.rssi_distance_raw)

self._update_raw_distance(reading_is_new=True)

# Note: this is not actually the interval between adverts,
# but rather a function of our UPDATE_INTERVAL plus the packet
Expand Down Expand Up @@ -180,15 +193,70 @@ def update_advertisement(self, device_address: str, scandata: BluetoothScannerDe
# Changing from warning to debug to quiet users' logs.
_LOGGER.debug(
"Device changed TX-POWER! That was unexpected: %s %sdB",
device_address,
self.parent_device_address,
scandata.advertisement.tx_power,
)
self.tx_power = scandata.advertisement.tx_power
for ad_str, ad_bytes in scandata.advertisement.service_data.items():
self.adverts[ad_str] = ad_bytes

# Track each advertisement element as or if they change.
for key, data in self.adverts.items():
new_data = getattr(scandata.advertisement, key, {})
if len(new_data) > 0:
if len(data) == 0 or data[0] != new_data:
data.insert(0, new_data)
# trim to keep size in check
del data[HIST_KEEP_COUNT:]

self.new_stamp = new_stamp

def _update_raw_distance(self, reading_is_new=True) -> float:
"""
Converts rssi to raw distance and updates history stack and
returns the new raw distance.
reading_is_new should only be called by the regular update
cycle, as it creates a new entry in the histories. Call with
false if you just need to set / override distance measurements
immediately, perhaps between cycles, in order to reflect a
setting change (such as altering a device's ref_power setting).
"""
# Check if we should use a device-based ref_power
if self.ref_power == 0:
ref_power = self.options.get(CONF_REF_POWER)
else:
ref_power = self.ref_power

distance = rssi_to_metres(
self.rssi + self.options.get(CONF_RSSI_OFFSETS, {}).get(self.address, 0),
ref_power,
self.options.get(CONF_ATTENUATION),
)
self.rssi_distance_raw = distance
if reading_is_new:
# Add a new historical reading
self.hist_distance.insert(0, distance)
# don't insert into hist_distance_by_interval, that's done by the caller.
else:
# We are over-riding readings between cycles. Force the
# new value in-place.
self.rssi_distance = distance
if len(self.hist_distance) > 0:
self.hist_distance[0] = distance
else:
self.hist_distance.append(distance)
if len(self.hist_distance_by_interval) > 0:
self.hist_distance_by_interval[0] = distance
# We don't else because we don't want to *add* a hist-by-interval reading, only
# modify in-place.
return distance

def set_ref_power(self, value: float):
"""Set a new reference power from the parent device and immediately update distance."""
# When the user updates the ref_power we want to reflect that change immediately,
# and not subject it to the normal smoothing algo.
self.ref_power = value
return self._update_raw_distance(False)

def calculate_data(self):
"""
Filter and update distance estimates.
Expand Down Expand Up @@ -293,9 +361,7 @@ def calculate_data(self):
# (not so for == 0 since it might still be an invalid retreat)
break

if velocity > peak_velocity:
# but on subsequent comparisons we only care if they're faster retreats
peak_velocity = velocity
peak_velocity = max(velocity, peak_velocity)
# we've been through the history and have peak velo retreat, or the most recent
# approach velo.
velocity = peak_velocity
Expand All @@ -306,10 +372,10 @@ def calculate_data(self):
self.hist_velocity.insert(0, velocity)

if velocity > self.options.get(CONF_MAX_VELOCITY):
if self.parent_device.upper() in self.options.get(CONF_DEVICES, []):
if self.parent_device_address.upper() in self.options.get(CONF_DEVICES, []):
_LOGGER.debug(
"This sparrow %s flies too fast (%2fm/s), ignoring",
self.parent_device,
self.parent_device_address,
velocity,
)
# Discard the bogus reading by duplicating the last.
Expand Down Expand Up @@ -366,10 +432,21 @@ def to_dict(self):
"""Convert class to serialisable dict for dump_devices."""
out = {}
for var, val in vars(self).items():
if var in ["options", "parent_device", "scanner_device"]:
# skip certain vars that we don't want in the dump output.
continue
if var == "adverts":
# FIXME: val is overwritten in loop
val = {} # noqa
for uuid, thebytes in self.adverts.items():
val[uuid] = thebytes.hex()
adout = {}
for adtype, adarray in val.items():
out_adarray = []
for ad_data in adarray:
if adtype in ["manufacturer_data", "service_data"]:
for ad_key, ad_value in ad_data.items():
out_adarray.append({ad_key: cast(bytes, ad_value).hex()})
else:
out_adarray.append(ad_data)
adout[adtype] = out_adarray
out[var] = adout
continue
out[var] = val
return out
4 changes: 3 additions & 1 deletion custom_components/bermuda/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@

# Platforms
BINARY_SENSOR = "binary_sensor"
BUTTON = "button"
SENSOR = "sensor"
SWITCH = "switch"
DEVICE_TRACKER = "device_tracker"
NUMBER = "number"
# PLATFORMS = [BINARY_SENSOR, SENSOR, SWITCH]
PLATFORMS = [SENSOR, DEVICE_TRACKER]
PLATFORMS = [SENSOR, DEVICE_TRACKER, NUMBER]

# Should probably retreive this from the component, but it's in "DOMAIN" *shrug*
DOMAIN_PRIVATE_BLE_DEVICE = "private_ble_device"
Expand Down
Loading

0 comments on commit e18727a

Please sign in to comment.