diff --git a/Makefile b/Makefile index 7aebf0f..0a51e51 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ lint: ## Lint and static-check - black -l 79 --check airthings_ble + black -l 88 --check airthings_ble pylint airthings_ble mypy airthings_ble diff --git a/airthings_ble/const.py b/airthings_ble/const.py index c7c4196..e3ca581 100644 --- a/airthings_ble/const.py +++ b/airthings_ble/const.py @@ -18,9 +18,7 @@ CHAR_UUID_HUMIDITY = UUID("00002a6f-0000-1000-8000-00805f9b34fb") CHAR_UUID_RADON_1DAYAVG = UUID("b42e01aa-ade7-11e4-89d3-123b93f75cba") CHAR_UUID_RADON_LONG_TERM_AVG = UUID("b42e0a4c-ade7-11e4-89d3-123b93f75cba") -CHAR_UUID_ILLUMINANCE_ACCELEROMETER = UUID( - "b42e1348-ade7-11e4-89d3-123b93f75cba" -) +CHAR_UUID_ILLUMINANCE_ACCELEROMETER = UUID("b42e1348-ade7-11e4-89d3-123b93f75cba") CHAR_UUID_WAVE_PLUS_DATA = UUID("b42e2a68-ade7-11e4-89d3-123b93f75cba") CHAR_UUID_WAVE_2_DATA = UUID("b42e4dcc-ade7-11e4-89d3-123b93f75cba") CHAR_UUID_WAVEMINI_DATA = UUID("b42e3b98-ade7-11e4-89d3-123b93f75cba") diff --git a/airthings_ble/parser.py b/airthings_ble/parser.py index 8b287e3..ab296d3 100644 --- a/airthings_ble/parser.py +++ b/airthings_ble/parser.py @@ -65,9 +65,7 @@ CHAR_UUID_WAVEMINI_DATA, COMMAND_UUID, ] -sensors_characteristics_uuid_str = [ - str(x) for x in sensors_characteristics_uuid -] +sensors_characteristics_uuid_str = [str(x) for x in sensors_characteristics_uuid] def _decode_base( @@ -202,9 +200,7 @@ def __init__(self, name: str, format_type: str, cmd: bytes): self.format_type = format_type self.cmd = cmd - def decode_data( - self, raw_data: bytearray | None - ) -> dict[str, float | str | None]: + def decode_data(self, raw_data: bytearray | None) -> dict[str, float | str | None]: """Decoder returns dict with illuminance and battery""" if raw_data is None: return {} @@ -255,10 +251,7 @@ def get_absolute_pressure(elevation: int, data: float) -> float: return data + round(offset, 2) -sensor_decoders: dict[ - str, - Callable[[bytearray], dict[str, float | None | str]], -] = { +sensor_decoders: dict[str, Callable[[bytearray], dict[str, float | None | str]],] = { str(CHAR_UUID_WAVE_PLUS_DATA): __decode_wave_plus( name="Pluss", format_type="BBBBHHHHHHHH", scale=0 ), @@ -347,16 +340,13 @@ def notification_handler(self, _: Any, data: bytearray) -> None: async def _get_device_characteristics( self, client: BleakClient, device: AirthingsDevice ) -> AirthingsDevice: - device.identifier = short_address(client.address) + # device.identifier = short_address(client.address) device.address = client.address - for characteristic in device_info_characteristics: try: data = await client.read_gatt_char(characteristic.uuid) except BleakError as err: - self.logger.debug( - "Get device characteristics exception: %s", err - ) + self.logger.debug("Get device characteristics exception: %s", err) continue if characteristic.name == "hardware_rev": device.hw_version = data.decode(characteristic.format) @@ -366,6 +356,8 @@ async def _get_device_characteristics( continue if characteristic.name == "device_name": device.name = data.decode(characteristic.format) + if characteristic.name == "serial_nr": + device.identifier = data.decode(characteristic.format) return device async def _get_service_characteristics( @@ -387,9 +379,7 @@ async def _get_service_characteristics( continue data = await client.read_gatt_char(characteristic.uuid) - sensor_data = sensor_decoders[str(characteristic.uuid)]( - data - ) + sensor_data = sensor_decoders[str(characteristic.uuid)](data) # skip for now! sensor_data.pop("date_time") @@ -398,17 +388,15 @@ async def _get_service_characteristics( # manage radon values if d := sensor_data.get("radon_1day_avg"): - device.sensors["radon_1day_level"] = get_radon_level( - float(d) - ) + device.sensors["radon_1day_level"] = get_radon_level(float(d)) if not self.is_metric: device.sensors["radon_1day_avg"] = ( float(d) * BQ_TO_PCI_MULTIPLIER ) if d := sensor_data.get("radon_longterm_avg"): - device.sensors[ - "radon_longterm_level" - ] = get_radon_level(float(d)) + device.sensors["radon_longterm_level"] = get_radon_level( + float(d) + ) if not self.is_metric: device.sensors["radon_longterm_avg"] = ( float(d) * BQ_TO_PCI_MULTIPLIER @@ -417,9 +405,7 @@ async def _get_service_characteristics( # rel to abs pressure if pressure := sensor_data.get("rel_atm_pressure"): device.sensors["pressure"] = ( - get_absolute_pressure( - self.elevation, float(pressure) - ) + get_absolute_pressure(self.elevation, float(pressure)) if self.elevation is not None else pressure ) @@ -436,9 +422,7 @@ async def _get_service_characteristics( # send command to this 'indicate' characteristic await client.write_gatt_char( characteristic.uuid, - bytearray( - command_decoders[str(characteristic.uuid)].cmd - ), + bytearray(command_decoders[str(characteristic.uuid)].cmd), ) # Wait for up to one second to see if a # callback comes in. @@ -464,20 +448,14 @@ async def _get_service_characteristics( 0, min( 100, - round( - (bat - v_min) - / (v_max - v_min) - * 100 - ), + round((bat - v_min) / (v_max - v_min) * 100), ), ), )[0] device.sensors.update( { - "illuminance": command_sensor_data[ - "illuminance" - ], + "illuminance": command_sensor_data["illuminance"], "battery": bat_pct, } ) @@ -489,9 +467,7 @@ async def _get_service_characteristics( async def update_device(self, ble_device: BLEDevice) -> AirthingsDevice: """Connects to the device through BLE and retrieves relevant data""" - client = await establish_connection( - BleakClient, ble_device, ble_device.address - ) + client = await establish_connection(BleakClient, ble_device, ble_device.address) device = AirthingsDevice()