forked from ghostboy3/EyeTalk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
97 lines (76 loc) · 3.84 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
''' Demonstrates how to subscribe to and handle data from gaze and event streams '''
import time
import adhawkapi
import adhawkapi.frontend
class FrontendData:
''' BLE Frontend '''
def __init__(self):
# Instantiate an API object
# TODO: Update the device name to match your device
self._api = adhawkapi.frontend.FrontendApi(ble_device_name='ADHAWK MINDLINK-282')
# Tell the api that we wish to receive eye tracking data stream
# with self._handle_et_data as the handler
self._api.register_stream_handler(adhawkapi.PacketType.EYETRACKING_STREAM, self._handle_et_data)
# Tell the api that we wish to tap into the EVENTS stream
# with self._handle_events as the handler
self._api.register_stream_handler(adhawkapi.PacketType.EVENTS, self._handle_events)
# Start the api and set its connection callback to self._handle_tracker_connect/disconnect.
# When the api detects a connection to a MindLink, this function will be run.
self._api.start(tracker_connect_cb=self._handle_tracker_connect,
tracker_disconnect_cb=self._handle_tracker_disconnect)
def shutdown(self):
'''Shutdown the api and terminate the bluetooth connection'''
self._api.shutdown()
@staticmethod
def _handle_et_data(et_data: adhawkapi.EyeTrackingStreamData):
''' Handles the latest et data '''
if et_data.gaze is not None:
xvec, yvec, zvec, vergence = et_data.gaze
print(f'Gaze={xvec:.2f},y={yvec:.2f},z={zvec:.2f},vergence={vergence:.2f}')
if et_data.eye_center is not None:
if et_data.eye_mask == adhawkapi.EyeMask.BINOCULAR:
rxvec, ryvec, rzvec, lxvec, lyvec, lzvec = et_data.eye_center
print(f'Eye center: Left=(x={lxvec:.2f},y={lyvec:.2f},z={lzvec:.2f}) '
f'Right=(x={rxvec:.2f},y={ryvec:.2f},z={rzvec:.2f})')
if et_data.pupil_diameter is not None:
if et_data.eye_mask == adhawkapi.EyeMask.BINOCULAR:
rdiameter, ldiameter = et_data.pupil_diameter
print(f'Pupil diameter: Left={ldiameter:.2f} Right={rdiameter:.2f}')
if et_data.imu_quaternion is not None:
if et_data.eye_mask == adhawkapi.EyeMask.BINOCULAR:
x, y, z, w = et_data.imu_quaternion
print(f'IMU: x={x:.2f},y={y:.2f},z={z:.2f},w={w:.2f}')
@staticmethod
def _handle_events(event_type, timestamp, *args):
if event_type == adhawkapi.Events.BLINK:
duration = args[0]
print(f'Got blink: {timestamp} {duration}')
if event_type == adhawkapi.Events.EYE_CLOSED:
eye_idx = args[0]
print(f'Eye Close: {timestamp} {eye_idx}')
if event_type == adhawkapi.Events.EYE_OPENED:
eye_idx = args[0]
print(f'Eye Open: {timestamp} {eye_idx}')
def _handle_tracker_connect(self):
print("Tracker connected")
self._api.set_et_stream_rate(60, callback=lambda *args: None)
self._api.set_et_stream_control([
adhawkapi.EyeTrackingStreamTypes.GAZE,
adhawkapi.EyeTrackingStreamTypes.EYE_CENTER,
adhawkapi.EyeTrackingStreamTypes.PUPIL_DIAMETER,
adhawkapi.EyeTrackingStreamTypes.IMU_QUATERNION,
], True, callback=lambda *args: None)
self._api.set_event_control(adhawkapi.EventControlBit.BLINK, 1, callback=lambda *args: None)
self._api.set_event_control(adhawkapi.EventControlBit.EYE_CLOSE_OPEN, 1, callback=lambda *args: None)
def _handle_tracker_disconnect(self):
print("Tracker disconnected")
def main():
''' App entrypoint '''
frontend = FrontendData()
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
frontend.shutdown()
if __name__ == '__main__':
main()