From 7d9da99394d49aaccfb946030b231d03ff057bb6 Mon Sep 17 00:00:00 2001 From: DigiH Date: Sun, 8 Dec 2024 17:41:23 +0100 Subject: [PATCH] Device Tracker sync across gateways (#265) Device Tracker sync across gateways - OpenMQTTGateway & Theengs Gateway --- TheengsGateway/__init__.py | 7 +++ TheengsGateway/ble_gateway.py | 95 ++++++++++++++++++++++++++--------- TheengsGateway/config.py | 16 +++++- TheengsGateway/discovery.py | 15 +++++- 4 files changed, 106 insertions(+), 27 deletions(-) diff --git a/TheengsGateway/__init__.py b/TheengsGateway/__init__.py index ece2e39..2846a82 100644 --- a/TheengsGateway/__init__.py +++ b/TheengsGateway/__init__.py @@ -19,6 +19,7 @@ """ import sys +import uuid from pathlib import Path from .ble_gateway import run @@ -48,6 +49,12 @@ def main() -> None: if configuration["discovery_topic"].endswith("/sensor"): configuration["discovery_topic"] = configuration["discovery_topic"][:-7] + # Get the MAC address of the gateway. + mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:] + configuration["gateway_id"] = ":".join( + [mac_address[i : i + 2] for i in range(0, 12, 2)] + ).upper() + if not configuration["host"]: sys.exit("MQTT host is not specified") diff --git a/TheengsGateway/ble_gateway.py b/TheengsGateway/ble_gateway.py index f538a85..f0ce6e3 100644 --- a/TheengsGateway/ble_gateway.py +++ b/TheengsGateway/ble_gateway.py @@ -152,6 +152,8 @@ def on_connect( retain=True, ) self.subscribe(self.configuration["subscribe_topic"]) + if self.configuration["enable_multi_gtw_sync"]: + self.subscribe(self.configuration["trackersync_topic"]) else: logger.error( "Failed to connect to MQTT broker %s:%d reason code: %s", @@ -227,28 +229,52 @@ def subscribe(self, sub_topic: str) -> None: """Subscribe to MQTT topic .""" def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001 - logger.info( - "Received `%s` from `%s` topic", - msg.payload.decode(), - msg.topic, - ) - try: - msg_json = json.loads(msg.payload.decode()) - except (json.JSONDecodeError, UnicodeDecodeError) as exception: - logger.warning( - "Invalid JSON message %s: %s", msg.payload.decode(), exception - ) - return + # Evaluate trackersync messages + if ( + msg.topic == self.configuration["trackersync_topic"] + and self.configuration["enable_multi_gtw_sync"] + ): + msg_json = json.loads(msg.payload) + logger.debug("trackersync message: %s", msg_json) - try: - msg_json["id"] = self.rpa2id(msg_json["id"]) - except KeyError: - logger.warning( - "JSON message %s doesn't contain id", msg.payload.decode() + if ( + msg_json["gatewayid"] != self.configuration["gateway_id"] + and msg_json["trackerid"] in self.discovered_trackers + and self.discovered_trackers[msg_json["trackerid"]].time != 0 + ): + self.discovered_trackers[msg_json["trackerid"]].time = 0 + logger.debug( + "Tracker %s disassociated by gateway %s", + msg_json["trackerid"], + msg_json["gatewayid"], + ) + + logger.debug( + "[DIS] Discovered Trackers: %s", self.discovered_trackers + ) + else: + logger.info( + "Received `%s` from `%s` topic", + msg.payload.decode(), + msg.topic, ) - return + try: + msg_json = json.loads(msg.payload.decode()) + except (json.JSONDecodeError, UnicodeDecodeError) as exception: + logger.warning( + "Invalid JSON message %s: %s", msg.payload.decode(), exception + ) + return + + try: + msg_json["id"] = self.rpa2id(msg_json["id"]) + except KeyError: + logger.warning( + "JSON message %s doesn't contain id", msg.payload.decode() + ) + return - self.decode_advertisement(msg_json) + self.decode_advertisement(msg_json) self.client.subscribe(sub_topic) self.client.on_message = on_message @@ -370,14 +396,11 @@ def check_tracker_timeout(self) -> None: if ( round(time()) - time_model.time >= self.configuration["tracker_timeout"] and time_model.time != 0 - and ( - self.configuration["discovery"] - or self.configuration["general_presence"] - ) ): if ( time_model.model_id in ("APPLEWATCH", "APPLEDEVICE") and not self.configuration["discovery"] + and self.configuration["general_presence"] ): message = json.dumps( {"id": address, "presence": "absent", "unlocked": False} @@ -391,9 +414,12 @@ def check_tracker_timeout(self) -> None: + "/" + address.replace(":", ""), ) + time_model.time = 0 self.discovered_trackers[address] = time_model - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + + logger.info("Tracker %s timed out", address) + logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers) async def ble_scan_loop(self) -> None: """Scan for BLE devices.""" @@ -440,6 +466,10 @@ async def ble_scan_loop(self) -> None: "Sent %s messages to MQTT", self.published_messages, ) + + # Check tracker timeouts + self.check_tracker_timeout() + await asyncio.sleep( self.configuration["ble_time_between_scans"], ) @@ -611,11 +641,26 @@ def publish_json( + "/" + get_address(data_json).replace(":", ""), ) + + # Update tracker last received time self.discovered_trackers[str(data_json["id"])] = TnM( round(time()), str(data_json["model_id"]), ) - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + # Publish trackersync message + if self.configuration["enable_multi_gtw_sync"]: + message = json.dumps( + { + "gatewayid": self.configuration["gateway_id"], + "trackerid": data_json["id"], + } + ) + self.publish( + message, + self.configuration["trackersync_topic"], + ) + + logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers) # Remove "track" if PUBLISH_ADVDATA is 0 if not self.configuration["publish_advdata"] and "track" in data_json: diff --git a/TheengsGateway/config.py b/TheengsGateway/config.py index 33d8db9..5c62d38 100644 --- a/TheengsGateway/config.py +++ b/TheengsGateway/config.py @@ -20,7 +20,7 @@ "port": 1883, "user": "", "pass": "", - "ble_scan_time": 5, + "ble_scan_time": 7, "ble_time_between_scans": 5, "publish_topic": "home/TheengsGateway/BTtoMQTT", "lwt_topic": "home/TheengsGateway/LWT", @@ -51,6 +51,8 @@ "ble": 1, "whitelist": [], "blacklist": [], + "enable_multi_gtw_sync": 1, + "trackersync_topic": "home/internal/trackersync", } @@ -268,6 +270,18 @@ def parse_args() -> argparse.Namespace: type=int, help="Enable (1) or disable (0) WebSocket (default: 0)", ) + parser.add_argument( + "-gs", + "--enable_multi_gtw_sync", + type=int, + help="Disable (0) or enable (1) to use tracker and closest control devices sync across Theengs Gateway gateways and OpenMQTTGateway (default: 1)", # noqa: E501 + ) + parser.add_argument( + "-tt", + "--trackersync_topic", + type=str, + help="Internal trackersync publish topic", + ) return parser.parse_args() diff --git a/TheengsGateway/discovery.py b/TheengsGateway/discovery.py index ab6c014..437f747 100644 --- a/TheengsGateway/discovery.py +++ b/TheengsGateway/discovery.py @@ -273,8 +273,21 @@ def copy_pub_device(self, device: dict) -> dict: self.discovered_trackers[device["id"]] = TnM( round(time()), device["model_id"] ) - logger.debug("Discovered Trackers: %s", self.discovered_trackers) + # Publish trackersync message + if self.configuration["enable_multi_gtw_sync"]: + message = json.dumps( + { + "gatewayid": self.configuration["gateway_id"], + "trackerid": device["id"], + } + ) + self.publish( + message, + self.configuration["trackersync_topic"], + ) + + logger.debug(" Discovered Trackers: %s", self.discovered_trackers) pub_device_copy = device.copy() # Remove "track" if PUBLISH_ADVDATA is 0 if not self.configuration["publish_advdata"] and "track" in pub_device_copy: