Skip to content

Commit

Permalink
Device Tracker sync across gateways (#265)
Browse files Browse the repository at this point in the history
Device Tracker sync across gateways - OpenMQTTGateway & Theengs Gateway
  • Loading branch information
DigiH authored Dec 8, 2024
1 parent 9188b12 commit 7d9da99
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 27 deletions.
7 changes: 7 additions & 0 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import sys
import uuid
from pathlib import Path

from .ble_gateway import run
Expand Down Expand Up @@ -48,6 +49,12 @@ def main() -> None:
if configuration["discovery_topic"].endswith("/sensor"):
configuration["discovery_topic"] = configuration["discovery_topic"][:-7]

# Get the MAC address of the gateway.
mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:]
configuration["gateway_id"] = ":".join(
[mac_address[i : i + 2] for i in range(0, 12, 2)]
).upper()

if not configuration["host"]:
sys.exit("MQTT host is not specified")

Expand Down
95 changes: 70 additions & 25 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def on_connect(
retain=True,
)
self.subscribe(self.configuration["subscribe_topic"])
if self.configuration["enable_multi_gtw_sync"]:
self.subscribe(self.configuration["trackersync_topic"])
else:
logger.error(
"Failed to connect to MQTT broker %s:%d reason code: %s",
Expand Down Expand Up @@ -227,28 +229,52 @@ def subscribe(self, sub_topic: str) -> None:
"""Subscribe to MQTT topic <sub_topic>."""

def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return
# Evaluate trackersync messages
if (
msg.topic == self.configuration["trackersync_topic"]
and self.configuration["enable_multi_gtw_sync"]
):
msg_json = json.loads(msg.payload)
logger.debug("trackersync message: %s", msg_json)

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
if (
msg_json["gatewayid"] != self.configuration["gateway_id"]
and msg_json["trackerid"] in self.discovered_trackers
and self.discovered_trackers[msg_json["trackerid"]].time != 0
):
self.discovered_trackers[msg_json["trackerid"]].time = 0
logger.debug(
"Tracker %s disassociated by gateway %s",
msg_json["trackerid"],
msg_json["gatewayid"],
)

logger.debug(
"[DIS] Discovered Trackers: %s", self.discovered_trackers
)
else:
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
return
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
)
return

self.decode_advertisement(msg_json)
self.decode_advertisement(msg_json)

self.client.subscribe(sub_topic)
self.client.on_message = on_message
Expand Down Expand Up @@ -370,14 +396,11 @@ def check_tracker_timeout(self) -> None:
if (
round(time()) - time_model.time >= self.configuration["tracker_timeout"]
and time_model.time != 0
and (
self.configuration["discovery"]
or self.configuration["general_presence"]
)
):
if (
time_model.model_id in ("APPLEWATCH", "APPLEDEVICE")
and not self.configuration["discovery"]
and self.configuration["general_presence"]
):
message = json.dumps(
{"id": address, "presence": "absent", "unlocked": False}
Expand All @@ -391,9 +414,12 @@ def check_tracker_timeout(self) -> None:
+ "/"
+ address.replace(":", ""),
)

time_model.time = 0
self.discovered_trackers[address] = time_model
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

logger.info("Tracker %s timed out", address)
logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers)

async def ble_scan_loop(self) -> None:
"""Scan for BLE devices."""
Expand Down Expand Up @@ -440,6 +466,10 @@ async def ble_scan_loop(self) -> None:
"Sent %s messages to MQTT",
self.published_messages,
)

# Check tracker timeouts
self.check_tracker_timeout()

await asyncio.sleep(
self.configuration["ble_time_between_scans"],
)
Expand Down Expand Up @@ -611,11 +641,26 @@ def publish_json(
+ "/"
+ get_address(data_json).replace(":", ""),
)

# Update tracker last received time
self.discovered_trackers[str(data_json["id"])] = TnM(
round(time()),
str(data_json["model_id"]),
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)
# Publish trackersync message
if self.configuration["enable_multi_gtw_sync"]:
message = json.dumps(
{
"gatewayid": self.configuration["gateway_id"],
"trackerid": data_json["id"],
}
)
self.publish(
message,
self.configuration["trackersync_topic"],
)

logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers)

# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in data_json:
Expand Down
16 changes: 15 additions & 1 deletion TheengsGateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"port": 1883,
"user": "",
"pass": "",
"ble_scan_time": 5,
"ble_scan_time": 7,
"ble_time_between_scans": 5,
"publish_topic": "home/TheengsGateway/BTtoMQTT",
"lwt_topic": "home/TheengsGateway/LWT",
Expand Down Expand Up @@ -51,6 +51,8 @@
"ble": 1,
"whitelist": [],
"blacklist": [],
"enable_multi_gtw_sync": 1,
"trackersync_topic": "home/internal/trackersync",
}


Expand Down Expand Up @@ -268,6 +270,18 @@ def parse_args() -> argparse.Namespace:
type=int,
help="Enable (1) or disable (0) WebSocket (default: 0)",
)
parser.add_argument(
"-gs",
"--enable_multi_gtw_sync",
type=int,
help="Disable (0) or enable (1) to use tracker and closest control devices sync across Theengs Gateway gateways and OpenMQTTGateway (default: 1)", # noqa: E501
)
parser.add_argument(
"-tt",
"--trackersync_topic",
type=str,
help="Internal trackersync publish topic",
)
return parser.parse_args()


Expand Down
15 changes: 14 additions & 1 deletion TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,21 @@ def copy_pub_device(self, device: dict) -> dict:
self.discovered_trackers[device["id"]] = TnM(
round(time()), device["model_id"]
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

# Publish trackersync message
if self.configuration["enable_multi_gtw_sync"]:
message = json.dumps(
{
"gatewayid": self.configuration["gateway_id"],
"trackerid": device["id"],
}
)
self.publish(
message,
self.configuration["trackersync_topic"],
)

logger.debug(" Discovered Trackers: %s", self.discovered_trackers)
pub_device_copy = device.copy()
# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in pub_device_copy:
Expand Down

0 comments on commit 7d9da99

Please sign in to comment.