Skip to content
This repository has been archived by the owner on Dec 22, 2024. It is now read-only.

Commit

Permalink
feat/intercom
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed May 30, 2024
1 parent e2ccd8c commit a667968
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
81 changes: 79 additions & 2 deletions hivemind_core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import dataclass, field
from enum import Enum, IntEnum
from typing import List, Dict, Optional
import pgpy

from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import Message
Expand All @@ -13,6 +14,7 @@

from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.serialization import decode_bitstring, get_bitstring
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.util import (
decrypt_bin,
encrypt_bin,
Expand Down Expand Up @@ -236,7 +238,7 @@ class HiveMindListenerProtocol:

require_crypto: bool = True # throw error if crypto key not available
handshake_enabled: bool = True # generate a key per session if not pre-shared

identity: Optional[NodeIdentity] = None
# below are optional callbacks to handle payloads
# receives the payload + HiveMindClient that sent it
escalate_callback = None # slave asked to escalate payload
Expand All @@ -246,7 +248,8 @@ class HiveMindListenerProtocol:
mycroft_bus_callback = None # slave asked to inject payload into mycroft bus
shared_bus_callback = None # passive sharing of slave device bus (info)

def bind(self, websocket, bus):
def bind(self, websocket, bus, identity):
self.identity = identity
websocket.protocol = self
self.internal_protocol = HiveMindListenerInternalProtocol(bus)
self.internal_protocol.register_bus_handlers()
Expand Down Expand Up @@ -367,6 +370,8 @@ def handle_message(self, message: HiveMessage, client: HiveMindClientConnection)
self.handle_broadcast_message(message, client)
elif message.msg_type == HiveMessageType.ESCALATE:
self.handle_escalate_message(message, client)
elif message.msg_type == HiveMessageType.INTERCOM:
self.handle_intercom_message(message, client)
elif message.msg_type == HiveMessageType.BINARY:
self.handle_binary_message(message, client)
else:
Expand Down Expand Up @@ -476,6 +481,16 @@ def handle_broadcast_message(
if self.broadcast_callback:
self.broadcast_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# broadcast message to other peers
payload = self._unpack_message(message, client)
for peer in self.clients:
Expand Down Expand Up @@ -514,6 +529,16 @@ def handle_propagate_message(
if self.propagate_callback:
self.propagate_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# propagate message to other peers
for peer in self.clients:
if peer == client.peer:
Expand Down Expand Up @@ -557,6 +582,16 @@ def handle_escalate_message(
if self.escalate_callback:
self.escalate_callback(payload)

if message.payload.msg_type == HiveMessageType.INTERCOM:
if self.handle_intercom_message(message.payload, client):
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.identity.site_id:
self.handle_bus_message(message.payload, client)

# send to other masters
message = Message(
"hive.send.upstream",
Expand All @@ -570,6 +605,48 @@ def handle_escalate_message(
bus = self.get_bus(client)
bus.emit(message)

def handle_intercom_message(
self, message: HiveMessage, client: HiveMindClientConnection
) -> bool:

# if the message targets us, send it to internal bus
k = message.target_public_key
if k and k != self.identity.public_key:
# not for us
return False

pload = message.payload
if isinstance(pload, dict) and "ciphertext" in pload:
try:
message_from_blob = pgpy.PGPMessage.from_blob(pload["ciphertext"])

with open(self.identity.private_key, "r") as f:
private_key = pgpy.PGPKey.from_blob(f.read())

decrypted: str = private_key.decrypt(message_from_blob)
message._payload = HiveMessage.deserialize(decrypted)
except:
if k:
LOG.error("failed to decrypt message!")
else:
LOG.debug("failed to decrypt message, not for us")
return False

if message.msg_type == HiveMessageType.BUS:
self.handle_bus_message(message, client)
return True
elif message.msg_type == HiveMessageType.PROPAGATE:
self.handle_propagate_message(message, client)
return True
elif message.msg_type == HiveMessageType.BROADCAST:
self.handle_broadcast_message(message, client)
return True
elif message.msg_type == HiveMessageType.ESCALATE:
self.handle_escalate_message(message, client)
return True

return False

# HiveMind mycroft bus messages - from slave -> master
def handle_inject_mycroft_msg(
self, message: Message, client: HiveMindClientConnection
Expand Down
2 changes: 1 addition & 1 deletion hivemind_core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def run(self):
loop = ioloop.IOLoop.current()

self.protocol = self._proto(loop=loop)
self.protocol.bind(self._ws_handler, self.bus)
self.protocol.bind(self._ws_handler, self.bus, self.identity)
self.status.bind(self.bus)
self.status.set_started()

Expand Down

0 comments on commit a667968

Please sign in to comment.