Skip to content

Commit

Permalink
Device Tracker sync across gateways
Browse files Browse the repository at this point in the history
Device Tracker sync across gateways - OpenMQTTGateway & Theengs Gateway
  • Loading branch information
DigiH committed Oct 29, 2024
1 parent 5756332 commit 656d53c
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 33 deletions.
7 changes: 7 additions & 0 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import sys
import uuid
from pathlib import Path

from .ble_gateway import run
Expand Down Expand Up @@ -48,6 +49,12 @@ def main() -> None:
if configuration["discovery_topic"].endswith("/sensor"):
configuration["discovery_topic"] = configuration["discovery_topic"][:-7]

# Get the MAC address of the gateway.
mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:]
configuration["gateway_id"] = ":".join(
[mac_address[i : i + 2] for i in range(0, 12, 2)]
).upper()

if not configuration["host"]:
sys.exit("MQTT host is not specified")

Expand Down
96 changes: 71 additions & 25 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"cipher",
"cont",
"ctr",
"ctrl",
"encr",
"manufacturerdata",
"mic",
Expand Down Expand Up @@ -151,6 +152,8 @@ def on_connect(
retain=True,
)
self.subscribe(self.configuration["subscribe_topic"])
if self.configuration["enable_multi_gtw_sync"]:
self.subscribe("home/internal/trackersync")
else:
logger.error(
"Failed to connect to MQTT broker %s:%d reason code: %s",
Expand Down Expand Up @@ -226,28 +229,52 @@ def subscribe(self, sub_topic: str) -> None:
"""Subscribe to MQTT topic <sub_topic>."""

def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return
# Evaluate trackersync messages
if (
msg.topic == "home/internal/trackersync"
and self.configuration["enable_multi_gtw_sync"]
):
msg_json = json.loads(msg.payload)
logger.info("trackersync message: %s", msg_json)

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
if (
msg_json["gatewayid"] != self.configuration["gateway_id"]
and msg_json["trackerid"] in self.discovered_trackers
and self.discovered_trackers[msg_json["trackerid"]].time != 0
):
self.discovered_trackers[msg_json["trackerid"]].time = 0
logger.debug(
"Tracker %s disassociated by gateway %s",
msg_json["trackerid"],
msg_json["gatewayid"],
)

logger.info(
"[DIS] Discovered Trackers: %s", self.discovered_trackers
)
else:
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
return
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
)
return

self.decode_advertisement(msg_json)
self.decode_advertisement(msg_json)

self.client.subscribe(sub_topic)
self.client.on_message = on_message
Expand Down Expand Up @@ -369,14 +396,11 @@ def check_tracker_timeout(self) -> None:
if (
round(time()) - time_model.time >= self.configuration["tracker_timeout"]
and time_model.time != 0
and (
self.configuration["discovery"]
or self.configuration["general_presence"]
)
):
if (
time_model.model_id in ("APPLEWATCH", "APPLEDEVICE")
and not self.configuration["discovery"]
and self.configuration["general_presence"]
):
message = json.dumps(
{"id": address, "presence": "absent", "unlocked": False}
Expand All @@ -390,9 +414,12 @@ def check_tracker_timeout(self) -> None:
+ "/"
+ address.replace(":", ""),
)

time_model.time = 0
self.discovered_trackers[address] = time_model
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

logger.info("Tracker %s timed out", address)
logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers)

async def ble_scan_loop(self) -> None:
"""Scan for BLE devices."""
Expand Down Expand Up @@ -439,6 +466,10 @@ async def ble_scan_loop(self) -> None:
"Sent %s messages to MQTT",
self.published_messages,
)

# Check tracker timeouts
self.check_tracker_timeout()

await asyncio.sleep(
self.configuration["ble_time_between_scans"],
)
Expand Down Expand Up @@ -610,11 +641,26 @@ def publish_json(
+ "/"
+ get_address(data_json).replace(":", ""),
)

# Update tracker last received time
self.discovered_trackers[str(data_json["id"])] = TnM(
round(time()),
str(data_json["model_id"]),
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)
# Publish trackersync message
if self.configuration["enable_multi_gtw_sync"]:
message = json.dumps(
{
"gatewayid": self.configuration["gateway_id"],
"trackerid": data_json["id"],
}
)
self.publish(
message,
"home/internal/trackersync",
)

logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers)

# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in data_json:
Expand Down
9 changes: 8 additions & 1 deletion TheengsGateway/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"port": 1883,
"user": "",
"pass": "",
"ble_scan_time": 5,
"ble_scan_time": 7,
"ble_time_between_scans": 5,
"publish_topic": "home/TheengsGateway/BTtoMQTT",
"lwt_topic": "home/TheengsGateway/LWT",
Expand Down Expand Up @@ -51,6 +51,7 @@
"ble": 1,
"whitelist": [],
"blacklist": [],
"enable_multi_gtw_sync": 1,
}


Expand Down Expand Up @@ -268,6 +269,12 @@ def parse_args() -> argparse.Namespace:
type=int,
help="Enable (1) or disable (0) WebSocket (default: 0)",
)
parser.add_argument(
"-gs",
"--enable_multi_gtw_sync",
type=int,
help="Disable (0) or enable (1) to use tracker and closest control devices sync across Theengs Gateway gateways and OpenMQTTGateway (default: 1)", # noqa: E501
)
return parser.parse_args()


Expand Down
27 changes: 20 additions & 7 deletions TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ def publish_device_info(self, pub_device) -> None: # noqa: ANN001
device["name"] = pub_device["model_id"] + "-" + k
device["uniq_id"] = pub_device_uuid + "-" + k
if k == "unlocked":
device[
"val_tpl"
] = "{% if value_json.get('unlocked') is true -%}True{%- else -%}False{%- endif %}" # noqa: E501
device["val_tpl"] = (
"{% if value_json.get('unlocked') is true -%}True{%- else -%}False{%- endif %}" # noqa: E501
)
elif self.configuration["hass_discovery"]:
device["val_tpl"] = "{{ value_json." + k + " | is_defined }}"
else:
Expand Down Expand Up @@ -257,9 +257,9 @@ def publish_device_tracker(
tracker["stat_t"] = state_topic
tracker["name"] = pub_device["model_id"] + "-tracker" # type: ignore[assignment,operator]
tracker["uniq_id"] = pub_device_uuid + "-tracker"
tracker[
"val_tpl"
] = "{% if value_json.get('rssi') -%}home{%- else -%}not_home{%- endif %}"
tracker["val_tpl"] = (
"{% if value_json.get('rssi') -%}home{%- else -%}not_home{%- endif %}"
)
tracker["source_type"] = "bluetooth_le"
tracker["device"] = hadevice # type: ignore[assignment]

Expand All @@ -272,8 +272,21 @@ def copy_pub_device(self, device: dict) -> dict:
self.discovered_trackers[device["id"]] = TnM(
round(time()), device["model_id"]
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

# Publish trackersync message
if self.configuration["enable_multi_gtw_sync"]:
message = json.dumps(
{
"gatewayid": self.configuration["gateway_id"],
"trackerid": device["id"],
}
)
self.publish(
message,
"home/internal/trackersync",
)

logger.debug(" Discovered Trackers: %s", self.discovered_trackers)
pub_device_copy = device.copy()
# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in pub_device_copy:
Expand Down

0 comments on commit 656d53c

Please sign in to comment.