Skip to content

Commit

Permalink
Merge pull request #31 from MeshAddicts/protobuf
Browse files Browse the repository at this point in the history
MQTT Protobuf Support
  • Loading branch information
kevinelliott authored Jul 10, 2024
2 parents 2961b52 + 8e6f43c commit 6681539
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 48 deletions.
File renamed without changes.
11 changes: 9 additions & 2 deletions mosquitto/config/mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ allow_anonymous true
#acl_file /mosquitto/config/mosquitto.acl

# Bridge to main MQTT broker
connection bridge-meshtastic-public
connection bridge-meshtastic-public-receive
address mqtt.meshtastic.org:1883
topic # both 0 msh/ msh/US/CA/sacvalley/
topic US/+/+/2/json/# in 0 msh/ msh/
remote_username meshdev
remote_password large4cats
try_private false

# connection bridge-meshtastic-public-send
# address mqtt.meshtastic.org:1883
# topic # out 0 msh/ msh/CA/US/sacvalley/
# remote_username meshdev
# remote_password large4cats
# try_private false
278 changes: 235 additions & 43 deletions mqtt.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
#!/usr/bin/env python3

import base64
import datetime
import json
import traceback
from zoneinfo import ZoneInfo
from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2
import paho.mqtt.client as mqtt_client
from google.protobuf.json_format import MessageToJson
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

from encoders import _JSONDecoder
import _meshtastic
from models.node import Node
import utils

key = "AQ=="
key_hash = "1PG7OiApB1nwvP+rz05pAQ==" # AQ==
key_bytes = base64.b64decode(key_hash.encode('ascii'))

class MQTT:
def __init__(self, config, data):
Expand All @@ -32,31 +43,174 @@ def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
self.data.mqtt_connect_time = datetime.datetime.now(ZoneInfo(self.config['server']['timezone']))
print("Connected to MQTT broker at %s:%d (as client_id %s)" % (self.host, self.port, self.client_id))
self.subscribe(self.config['broker']['topic'])
if 'topic' in self.config['broker'] and self.config['broker']['topic'] is not None and isinstance(self.config['broker']['topic'], str):
self.subscribe(self.config['broker']['topic'])
if 'topics' in self.config['broker'] and self.config['broker']['topics'] is not None and isinstance(self.config['broker']['topics'], list):
for topic in self.config['broker']['topics']:
self.subscribe(topic)

else:
print("Failed to connect, error: %s\n" % rc)

def on_message(client, userdata, msg, properties=None):
try:
decoded = msg.payload.decode("utf-8")
j = json.loads(decoded, cls=_JSONDecoder)
self.handle_log(msg)
if j['type'] == "neighborinfo":
self.handle_neighborinfo(j)
if j['type'] == "nodeinfo":
self.handle_nodeinfo(j)
if j['type'] == "position":
self.handle_position(j)
if j['type'] == "telemetry":
self.handle_telemetry(j)
if j['type'] == "text":
self.handle_text(j)
if j['type'] == "traceroute":
self.handle_traceroute(j)
self.prune_expired_nodes()
except Exception as e:
print(e)
traceback.print_exc()
if '/2/e/' in msg.topic or '/2/map/' in msg.topic:
print(f"Received a protobuf message: {msg.topic} {msg.payload}")
is_encrypted = False
mp = mesh_pb2.MeshPacket()
outs = {}

try:
se = mqtt_pb2.ServiceEnvelope()
se.ParseFromString(msg.payload)
mp = se.packet
outs = json.loads(MessageToJson(mp, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
print(f"Decoded protobuf message: {outs}")
except Exception as e:
# print(f"*** ParseFromString: {str(e)}")
pass

if mp.HasField("encrypted") and not mp.HasField("decoded"):
try:
nonce_packet_id = getattr(mp, "id").to_bytes(8, "little")
nonce_from_node = getattr(mp, "from").to_bytes(8, "little")
nonce = nonce_packet_id + nonce_from_node
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_bytes = decryptor.update(getattr(mp, "encrypted")) + decryptor.finalize()
data = mesh_pb2.Data()
data.ParseFromString(decrypted_bytes)
mp.decoded.CopyFrom(data)
outs = json.loads(MessageToJson(mp, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
except Exception as e:
print(f"*** Decryption failed: {str(e)}")
return
is_encrypted = True

outs['rssi'] = mp.rx_rssi
outs['snr'] = mp.rx_snr
outs['timestamp'] = mp.rx_time

# TODO: Need to handle this
# self.handle_log(mp.decoded)

if mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP:
text = mp.decoded.payload.decode("utf-8")
payload = { "text": text }
outs["type"] = "text"
outs["payload"] = payload
print(f"Decoded protobuf message: text: {outs}")
self.handle_text(outs)

elif mp.decoded.portnum == portnums_pb2.MAP_REPORT_APP:
report = mesh_pb2.Position()
report.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(report, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True, always_print_fields_with_no_presence=True))
outs["type"] = "mapreport"
outs["payload"] = out
print(f"Decoded protobuf message: mapreport: {outs}")
# self.handle_mapreport(outs)

elif mp.decoded.portnum == portnums_pb2.NEIGHBORINFO_APP:
info = mesh_pb2.NeighborInfo()
info.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(info, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True, always_print_fields_with_no_presence=True))
outs["type"] = "neighborinfo"
outs["payload"] = out
print(f"Decoded protobuf message: neighborinfo: {outs}")
self.handle_neighborinfo(outs)

elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP:
info = mesh_pb2.User()
info.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(info, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
out["id"] = out['id'].replace('!', '')
outs["type"] = "nodeinfo"
outs["payload"] = out
print(f"Decoded protobuf message: nodeinfo: {outs}")
self.handle_nodeinfo(outs)

elif mp.decoded.portnum == portnums_pb2.ROUTING_APP:
data = mesh_pb2.Routing()
data.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(data, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
outs["type"] = "routing"
outs["payload"] = out
print(f"Decoded protobuf message: routing: {outs}")
# self.handle_routing(outs)

elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP:
route = mesh_pb2.RouteDiscovery()
route.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(route, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True, always_print_fields_with_no_presence=True))
if 'route' in out:
route = []
for r in out['route']:
id = utils.convert_node_id_from_int_to_hex(r)
route.append(id)
outs["route"] = route
outs["type"] = "traceroute"
outs["payload"] = out
print(f"Decoded protobuf message: traceroute: {outs}")
self.handle_traceroute(outs)

elif mp.decoded.portnum == portnums_pb2.POSITION_APP:
pos = mesh_pb2.Position()
pos.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(pos, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
outs["type"] = "position"
outs["payload"] = out
print(f"Decoded protobuf message: position: {outs}")
self.handle_position(outs)

elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP:
env = telemetry_pb2.Telemetry()
env.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(env, preserving_proto_field_name=True, ensure_ascii=False, indent=2, sort_keys=True, use_integers_for_enums=True))
if 'rx_time' in outs:
out['timestamp'] = datetime.datetime.fromtimestamp(outs['rx_time'] / 1000).astimezone(ZoneInfo(self.config['server']['timezone']))
outs["type"] = "telemetry"
if 'device_metrics' in out:
outs["payload"] = out['device_metrics']
if 'environment_metrics' in out:
outs["payload"] = out['environment_metrics']
print(f"Decoded protobuf message: telemetry: {outs}")
self.handle_telemetry(outs)

else:
print(f"Received an unknown protobuf message: {mp}")

elif '/2/json/' in msg.topic:
print(f"Received a JSON message: {msg.topic} {msg.payload}")
try:
decoded = msg.payload.decode("utf-8")
j = json.loads(decoded, cls=_JSONDecoder)
self.handle_log(msg)
if j['type'] == "neighborinfo":
self.handle_neighborinfo(j)
if j['type'] == "nodeinfo":
self.handle_nodeinfo(j)
if j['type'] == "position":
self.handle_position(j)
if j['type'] == "telemetry":
self.handle_telemetry(j)
if j['type'] == "text":
self.handle_text(j)
if j['type'] == "traceroute":
if 'route' in j['payload']:
route = []
for r in j['payload']['route']:
node = self.data.find_node_by_longname(r)
if node is not None:
id = node['id']
else:
id = None
route.append(id)
j['route'] = route
self.handle_traceroute(j)
self.prune_expired_nodes()
except Exception as e:
print(e)
traceback.print_exc()

client = mqtt_client.Client(client_id=self.client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
Expand Down Expand Up @@ -94,8 +248,9 @@ def handle_log(self, msg):

def handle_neighborinfo(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')

id = msg['from']
Expand All @@ -113,32 +268,52 @@ def handle_neighborinfo(self, msg):

def handle_nodeinfo(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')

id = msg['payload']['id']
if id in self.data.nodes:
node = self.data.nodes[id]
node['hardware'] = msg['payload']['hardware']
node['longname'] = msg['payload']['longname']
node['shortname'] = msg['payload']['shortname']
if 'hardware' in msg['payload']:
node['hardware'] = msg['payload']['hardware']
elif 'hw_model' in msg['payload']:
node['hardware'] = msg['payload']['hw_model']
if 'longname' in msg['payload']:
node['longname'] = msg['payload']['longname']
elif 'long_name' in msg['payload']:
node['longname'] = msg['payload']['long_name']
if 'shortname' in msg['payload']:
node['shortname'] = msg['payload']['shortname']
elif 'short_name' in msg['payload']:
node['shortname'] = msg['payload']['short_name']
self.data.update_node(id, node)
print(f"Node {id} updated")
else:
node = Node.default_node(id)
node['hardware'] = msg['payload']['hardware']
node['longname'] = msg['payload']['longname']
node['shortname'] = msg['payload']['shortname']
if 'hardware' in msg['payload']:
node['hardware'] = msg['payload']['hardware']
elif 'hw_model' in msg['payload']:
node['hardware'] = msg['payload']['hw_model']
if 'longname' in msg['payload']:
node['longname'] = msg['payload']['longname']
elif 'long_name' in msg['payload']:
node['longname'] = msg['payload']['long_name']
if 'shortname' in msg['payload']:
node['shortname'] = msg['payload']['shortname']
elif 'short_name' in msg['payload']:
node['shortname'] = msg['payload']['short_name']
self.data.update_node(id, node)
print(f"Node {id} added")
self.sort_nodes_by_shortname()
self.data.save()

def handle_position(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')

id = msg['from']
Expand All @@ -156,8 +331,9 @@ def handle_position(self, msg):

def handle_telemetry(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')

id = msg['from']
Expand All @@ -183,13 +359,19 @@ def handle_telemetry(self, msg):

def handle_text(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')

self.data.chat['channels'][str(msg['channel'])]['messages'].insert(0, {
if str(msg['channel']) not in self.data.chat['channels']:
self.data.chat['channels'][str(msg['channel'])] = {
'name': f'Channel {msg["channel"]}',
'messages': []
}

chat = {
'id': msg['id'],
'sender': msg['sender'],
'from': msg['from'],
'to': msg['to'],
'channel': str(msg['channel']),
Expand All @@ -198,18 +380,28 @@ def handle_text(self, msg):
'hops_away': msg['hops_away'] if 'hops_away' in msg else None,
'rssi': msg['rssi'] if 'rssi' in msg else None,
'snr': msg['snr'] if 'snr' in msg else None,
})
}
if 'sender' in msg:
chat['sender'] = msg['sender']
self.data.chat['channels'][str(msg['channel'])]['messages'].insert(0, chat)
self.data.save()

def handle_traceroute(self, msg):
msg['from'] = f'{msg["from"]:x}'
msg['to'] = f'{msg["to"]:x}'
if msg['sender'] and isinstance(msg['sender'], str):
if 'to' in msg:
msg['to'] = f'{msg["to"]:x}'
if 'sender' in msg and msg['sender'] and isinstance(msg['sender'], str):
msg['sender'] = msg['sender'].replace('!', '')
msg['route'] = msg['payload']['route']
msg['route_ids'] = []
for r in msg['route']:
node = self.data.find_node_by_longname(r)
if isinstance(r, str):
node = self.data.find_node_by_longname(r)
elif isinstance(r, int):
node = self.data.find_node_by_hex_id(r)
else:
node = None

if node:
msg['route_ids'].append(node['id'])
else:
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ python-dotenv
fastapi
discord.py
requests
meshtastic
cryptography
Loading

0 comments on commit 6681539

Please sign in to comment.