-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.py
200 lines (176 loc) · 8.98 KB
/
listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# The MIT License (MIT)
# Copyright (c) 2023 Edgaras Janušauskas and Inovatorius MB (www.fildz.com)
################################################################################
# FILDZ CYBEROS EVENT LISTENER
#
# ESP-NOW event listener and firing of events or generators.
import uasyncio as asyncio
from uasyncio import Event
import fildz_cyberos as cyberos
import ustruct as struct
class Listener:
def __init__(self):
self._on_event = Event()
asyncio.create_task(self._event())
self._sender_mac = None # Event sender MAC address (e.g., b'\x9e\x9c\x1f\x00\x00\x00')
self._name = None # Event name (e.g., on_pair, on_ping)
self._args = list() # Event arguments (e.g., (0, 0, 'Hello World!'))
self._sender = None # Event sender name (e.g., BUTTON-02AD9A-WAY)
self._receiver = None # Event receiver name (e.g., DISPLAY-0F889A-ABW)
################################################################################
# Properties
#
@property
def sender_mac(self):
return self._sender_mac
@property
def sender(self):
return self._sender
@property
def receiver(self):
return self._receiver
@property
def name(self):
return self._name
@property
def args(self):
return self._args
################################################################################
# Events
#
@property
def on_event(self):
return self._on_event
################################################################################
# Tasks
#
# New event received.
async def _event(self):
async for sender, event in cyberos.espnow:
self._sender_mac = sender
try:
self._sender, self._receiver, self._name, self._args = self.decode(event)
except:
continue
# print('\nFROM:', self._sender)
# print('TO:', self._receiver)
# print('EVENT:', self._name)
# print('ARGS:', self._args)
self._on_event.set() # We have a new event, inform tasks.
# To whom event was sent?
if not len(self._receiver):
# Event was sent to all cyberwares, so it is a public event.
# Public events are sent to AP MAC address on default channel.
if self._name in cyberos.cyberwares[cyberos.network.ap_ssid]['events']:
if not cyberos.cyberwares[cyberos.network.ap_ssid]['events'][self._name].is_set():
# print('\nPUBLIC {} EVENT'.format(self._name))
cyberos.cyberwares[cyberos.network.ap_ssid]['events'][self._name].set()
elif self._receiver == cyberos.network.ap_ssid:
# Event was sent to our cyberware, so it is a private event.
# Private events are sent to STA MAC address on random channel (channel depends on cyberware config).
# The event we received can be from unpaired, in-pairing, or paired cyberware.
# Is the event received is from the paired or unpaired cyberware?
if self._sender in cyberos.cyberwares['subscribed']:
# Received an event from either paired-subscribed or unpaired-subscribed cyberware.
# If we have a mac address of the event sender, then we are paired with it.
if 'mac' in cyberos.cyberwares['subscribed'][self._sender]:
# We are paired with the event sender, but are we subscribed to its events?
if self._name in cyberos.cyberwares['subscribed'][self._sender]['events']:
# print('\nPAIRED {} {} SUBSCRIBED EVENT'.format(self._sender, self._name))
# Indeed we are subscribed to paired cyberware events.
# Now should we set the event or run a function?
if cyberos.cyberwares['subscribed'][self._sender] \
['events'][self._name].__class__.__name__ is 'Event':
if not cyberos.cyberwares['subscribed'][self._sender]['events'][self._name].is_set():
cyberos.cyberwares['subscribed'][self._sender]['events'][self._name].set()
elif cyberos.cyberwares['subscribed'][self._sender]['events'][
self._name].__class__.__name__ is 'generator':
await cyberos.cyberwares['subscribed'][self._sender]['events'][self._name]()
# else:
# We do not listen to paired cyberware events.
# print('\nPAIRED {} {} UNSUBSCRIBED EVENT'.format(self._sender, self._name))
# else:
# Event from unpaired device, but we are subscribed to its events.
# We do not execute any events from unpaired cyberware except for public events.
# print('\nUNPAIRED {} {} SUBSCRIBED EVENT'.format(self._sender, self._name))
else:
# Received an event from 'unpaired' or 'in-pairing' cyberware.
# We do not execute any events from 'unpaired' cyberware except if it is a public event
# like 'on_pairing' that is only available in 'in-pairing' cyberware.
# 'in-pairing' cyberware is a cyberware that is answering our pairing (on_pairing event) request.
if self._name in cyberos.cyberwares[cyberos.network.ap_ssid]['events']:
if not cyberos.cyberwares[cyberos.network.ap_ssid]['events'][self._name].is_set():
# print('\nCYBEROS {} EVENT'.format(self._name))
cyberos.cyberwares[cyberos.network.ap_ssid]['events'][self._name].set()
# else:
# print('\nUNPAIRED {} {} UNSUBSCRIBED EVENT'.format(self._sender, self._name))
# Event was sent to some other cyberware, resend it to all.
# else:
# print('\nRetransmitting...')
self._on_event.clear()
async def encode(self, event_name, args, cyberware=''):
a_len = len(cyberos.network.ap_ssid)
c_len = len(cyberware)
e_len = len(event_name)
n = len(args)
for arg in args:
n += len(arg)
offset = 1 + a_len + 1 + c_len + 1 + e_len
n += offset
data = bytearray(n)
buffer = memoryview(data)
struct.pack_into('b%isb%isb%is' % (a_len, c_len, e_len), buffer, 0,
a_len, cyberos.network.ap_ssid,
c_len, cyberware,
e_len, event_name)
for arg in args:
arg_len = len(arg)
struct.pack_into('b%is' % arg_len, buffer, offset, arg_len, arg)
offset += 1 + arg_len
return buffer
async def decode(self, event):
size = len(event)
event = memoryview(event)
offset = 0
args = list()
# Event sender, receiver, name.
for x in range(3):
arg_size = event[offset:offset + 1][0]
arg = event[offset + 1:offset + 1 + arg_size]
try:
arg = str(arg, 'utf8')
except UnicodeError:
arg = bytes(arg)
offset += 1 + len(arg)
yield arg
# Event args.
while True:
if offset >= size:
yield args
break
arg_size = event[offset:offset + 1][0]
arg = event[offset + 1:offset + 1 + arg_size]
try:
arg = str(arg, 'utf8')
except UnicodeError:
arg = bytes(arg)
args.append(arg)
offset += 1 + len(arg)
async def send(self, event_name, *args, cyberware='', sync=True):
if cyberware is '':
for cyberware in cyberos.cyberwares['subscribed']:
_event = await self.encode(event_name, '' if not len(args) else args, cyberware=cyberware)
await cyberos.espnow.asend(cyberos.cyberwares['subscribed'][cyberware]['mac'], _event, sync=sync)
else:
_event = await self.encode(event_name, '' if not len(args) else args, cyberware=cyberware)
await cyberos.espnow.asend(cyberos.cyberwares['subscribed'][cyberware]['mac'], _event, sync=sync)
async def push(self, cyberware_name, event_name, event):
if cyberware_name in cyberos.cyberwares['subscribed']:
cyberos.cyberwares['subscribed'][cyberware_name]['events'].update({event_name: event})
else:
cyberos.cyberwares['subscribed'].update({cyberware_name: {'events': {event_name: event}}})
async def pull(self, cyberware_name, event_name=''):
if event_name is '':
cyberos.cyberwares['subscribed'][cyberware_name]['events'].clear()
else:
cyberos.cyberwares['subscribed'][cyberware_name]['events'].pop(event_name)