-
Notifications
You must be signed in to change notification settings - Fork 207
/
user.py
315 lines (280 loc) · 12.5 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/env python2.7
from __future__ import print_function
from yapsy.PluginManager import PluginManager
import argparse # new in Python2.7
import atexit
import logging
import string
import sys
import threading
import time
# Configure the root logger so that, by default, only ERROR and above are reported.
logging.basicConfig(level=logging.ERROR)
# Shared yapsy plugin manager. It is only instantiated here; the plugin
# search paths are set and the plugins collected later in the main section.
manager = PluginManager()
def build_parser():
    """Build the command-line parser of the OpenBCI 'user' script.

    Returns:
        argparse.ArgumentParser: parser exposing the board, port, filtering,
        daisy/AUX, plugin and logging options. Defaults: board "cyton",
        port "AUTO", baud 115200, filtering enabled, daisy and log disabled.
    """
    parser = argparse.ArgumentParser(description="OpenBCI 'user'")
    parser.add_argument('--board', default="cyton",
                        help="Choose between [cyton] and [ganglion] boards.")
    parser.add_argument('-l', '--list', action='store_true',
                        help="List available plugins.")
    parser.add_argument('-i', '--info', metavar='PLUGIN',
                        help="Show more information about a plugin.")
    parser.add_argument('-p', '--port',
                        help="For Cyton, port to connect to OpenBCI Dongle " +
                             "( ex /dev/ttyUSB0 or /dev/tty.usbserial-* ). " +
                             "For Ganglion, MAC address of the board. For both, AUTO to attempt auto-detection.")
    parser.set_defaults(port="AUTO")
    # baud rate is not currently used
    parser.add_argument('-b', '--baud', default=115200, type=int,
                        help="Baud rate (not currently used)")
    parser.add_argument('--no-filtering', dest='filtering',
                        action='store_false',
                        help="Disable notch filtering")
    parser.set_defaults(filtering=True)
    parser.add_argument('-d', '--daisy', dest='daisy',
                        action='store_true',
                        help="Force daisy mode (cyton board)")
    parser.add_argument('-x', '--aux', dest='aux',
                        action='store_true',
                        help="Enable accelerometer/AUX data (ganglion board)")
    # first argument: plugin name, then parameters for plugin
    parser.add_argument('-a', '--add', metavar=('PLUGIN', 'PARAM'),
                        action='append', nargs='+',
                        help="Select which plugins to activate and set parameters.")
    parser.add_argument('--log', dest='log', action='store_true',
                        help="Log program")
    parser.add_argument('--plugins-path', dest='plugins_path', nargs='+',
                        help="Additional path(s) to look for plugins")
    parser.set_defaults(daisy=False, log=False)
    return parser


def parse_lapse(command):
    """Extract the value that follows a 'T:' or 't:' marker in a command.

    The upper-case marker takes precedence when both are present. Uses
    ``str.find`` because the ``string.find`` module function does not exist
    in Python 3.

    Args:
        command (str): command text, without the leading '/'.

    Returns:
        int: the integer following the marker, or -1 when no marker is present.

    Raises:
        ValueError: if the text after the marker is not an integer.
    """
    for marker in ("T:", "t:"):
        position = command.find(marker)
        if position != -1:
            return int(command[position + len(marker):])
    return -1


def start_streaming_thread(board, callbacks, lapse):
    """Run ``board.start_streaming(callbacks, lapse)`` in a daemon thread.

    Streaming runs in its own thread so the prompt can keep accepting
    commands; the thread is a daemon so it stops when the program exits.

    Args:
        board: object exposing ``start_streaming``.
        callbacks: value handed to ``start_streaming``; None means that no
            plugin was loaded, in which case nothing is started.
        lapse (int): second argument handed to ``start_streaming``.

    Returns:
        threading.Thread or None: the started thread, or None when
        ``callbacks`` is None (a "No function loaded" notice is printed).
    """
    if callbacks is None:
        print("No function loaded")
        return None
    stream_thread = threading.Thread(target=board.start_streaming, args=(callbacks, lapse))
    stream_thread.daemon = True  # will stop on exit
    stream_thread.start()
    return stream_thread


if __name__ == '__main__':
    print("------------user.py-------------")
    parser = build_parser()
    args = parser.parse_args()

    if not args.add:
        print("WARNING: no plugin selected, you will only be able to communicate with the board. "
              "You should select at least one plugin with '--add [plugin_name]'. "
              "Use '--list' to show available plugins or '--info [plugin_name]' to get more information.")

    # Import only the API module matching the selected board.
    if args.board == "cyton":
        print("Board type: OpenBCI Cyton (v3 API)")
        import openbci.cyton as bci
    elif args.board == "ganglion":
        print("Board type: OpenBCI Ganglion")
        import openbci.ganglion as bci
    else:
        raise ValueError("Board type %r was not recognized. Known are 'cyton' and 'ganglion'" % args.board)

    # Check AUTO port selection, a "None" parameter for the board API
    if "AUTO" == args.port.upper():
        print("Will try do auto-detect board's port. Set it manually with '--port' if it goes wrong.")
        args.port = None
    else:
        print("Port: ", args.port)

    plugins_paths = ["openbci/plugins"]
    if args.plugins_path:
        plugins_paths += args.plugins_path
    manager.setPluginPlaces(plugins_paths)
    manager.collectPlugins()

    # Print list of available plugins and exit
    if args.list:
        print("Available plugins:")
        for plugin in manager.getAllPlugins():
            print("\t- " + plugin.name)
        sys.exit()

    # User wants more info about a plugin...
    if args.info:
        plugin = manager.getPluginByName(args.info)
        if plugin is None:
            # eg: if an import fail inside a plugin, yapsy skip it
            print("Error: [ " + args.info +
                  " ] not found or could not be loaded. Check name and requirements.")
        else:
            print(plugin.description)
            plugin.plugin_object.show_help()
        sys.exit()

    print("\n------------SETTINGS-------------")
    print("Notch filtering:" + str(args.filtering))

    # Logging
    if args.log:
        print("Logging Enabled: " + str(args.log))
        # basicConfig() does nothing when the root logger already has
        # handlers (e.g. from an earlier basicConfig call), so drop them
        # first; otherwise the log file would never be created.
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
        logging.basicConfig(filename="OBCI.log", format='%(asctime)s - %(levelname)s : %(message)s',
                            level=logging.DEBUG)
        logging.getLogger('yapsy').setLevel(logging.DEBUG)
        logging.info('---------LOG START-------------')
        logging.info(args)
    else:
        print("user.py: Logging Disabled.")

    print("\n-------INSTANTIATING BOARD-------")
    if args.board == "cyton":
        board = bci.OpenBCICyton(port=args.port,
                                 baud=args.baud,
                                 daisy=args.daisy,
                                 filter_data=args.filtering,
                                 scaled_output=True,
                                 log=args.log)
    elif args.board == "ganglion":
        board = bci.OpenBCIGanglion(port=args.port,
                                    filter_data=args.filtering,
                                    scaled_output=True,
                                    log=args.log,
                                    aux=args.aux)

    # Info about effective number of channels and sampling rate
    if board.daisy:
        print("Force daisy mode:")
    else:
        print("No daisy:")
    print(board.getNbEEGChannels(), "EEG channels and", board.getNbAUXChannels(), "AUX channels at",
          board.getSampleRate(), "Hz.")

    print("\n------------PLUGINS--------------")
    # Loop round the plugins and print their names.
    print("Found plugins:")
    for plugin in manager.getAllPlugins():
        print("[ " + plugin.name + " ]")
    print("\n")

    # Fetch plugins, try to activate them, add to the list if OK
    plug_list = []
    callback_list = []
    if args.add:
        for plug_candidate in args.add:
            # first value: plugin name, then optional arguments
            plug_name = plug_candidate[0]
            plug_args = plug_candidate[1:]
            # Try to find name
            plug = manager.getPluginByName(plug_name)
            if plug is None:
                # eg: if an import fail inside a plugin, yapsy skip it
                print("Error: [ " + plug_name + " ] not found or could not be loaded. Check name and requirements.")
                continue
            print("\nActivating [ " + plug_name + " ] plugin...")
            activated = plug.plugin_object.pre_activate(plug_args, sample_rate=board.getSampleRate(),
                                                        eeg_channels=board.getNbEEGChannels(),
                                                        aux_channels=board.getNbAUXChannels(),
                                                        imp_channels=board.getNbImpChannels())
            if not activated:
                print("Error while activating [ " + plug_name + " ], check output for more info.")
            else:
                print("Plugin [ " + plug_name + "] added to the list")
                plug_list.append(plug.plugin_object)
                callback_list.append(plug.plugin_object)

    # Callbacks handed to the board when streaming starts; None when no
    # plugin could be activated.
    fun = callback_list if plug_list else None

    def cleanUp():
        """Disconnect the board and deactivate every activated plugin (run at exit)."""
        board.disconnect()
        print("Deactivating Plugins...")
        for plug in plug_list:
            plug.deactivate()
        print("User.py exiting...")

    atexit.register(cleanUp)

    print("--------------INFO---------------")
    print("User serial interface enabled...\n"
          "View command map at http://docs.openbci.com.\n"
          "Type /start to run (/startimp for impedance \n"
          "checking, if supported) -- and /stop\n"
          "before issuing new commands afterwards.\n"
          "Type /exit to exit. \n"
          "Board outputs are automatically printed as: \n"
          "% <tab> message\n"
          "$$$ signals end of message")

    print("\n-------------BEGIN---------------")
    # Init board state
    # s: stop board streaming; v: soft reset of the 32-bit board (no effect with 8bit board)
    s = 'sv'
    # Tell the board to enable or not daisy module
    if board.daisy:
        s = s + 'C'
    else:
        s = s + 'c'
    # d: Channels settings back to default
    s = s + 'd'

    while s != "/exit":
        # Send char and wait for registers to set
        if not s:
            pass
        elif "help" in s:
            print("View command map at: "
                  "http://docs.openbci.com/software/01-OpenBCI_SDK.\n"
                  "For user interface: read README or view "
                  "https://github.com/OpenBCI/OpenBCI_Python")
        elif board.streaming and s != "/stop":
            print("Error: the board is currently streaming data, please type '/stop' before issuing new commands.")
        else:
            # read silently incoming packet if set (used when stream is stopped)
            flush = False

            if '/' == s[0]:
                # Script-level command: interpreted here, not sent to the board.
                s = s[1:]
                # A duration marker on its own already counts as a recognized command.
                rec = "T:" in s or "t:" in s
                lapse = parse_lapse(s)

                if 'startimp' in s:
                    if board.getBoardType() == "cyton":
                        print("Impedance checking not supported on cyton.")
                    else:
                        board.setImpedance(True)
                        boardThread = start_streaming_thread(board, fun, lapse)
                    # The command itself was understood, even when the board
                    # cannot honour it.
                    rec = True
                elif "start" in s:
                    board.setImpedance(False)
                    boardThread = start_streaming_thread(board, fun, lapse)
                    rec = True
                elif 'test' in s:
                    test = int(s[s.find("test") + 4:])
                    board.test_signal(test)
                    rec = True
                elif 'stop' in s:
                    board.stop()
                    rec = True
                    flush = True

                if not rec:
                    print("Command not recognized...")
            else:
                # Raw board command: send it one character at a time, pausing
                # after each one.
                for c in s:
                    if sys.hexversion > 0x03000000:
                        board.ser_write(bytes(c, 'utf-8'))
                    else:
                        board.ser_write(bytes(c))
                    time.sleep(0.100)

            line = ''
            time.sleep(0.1)  # Wait to see if the board has anything to report
            # The Cyton nicely return incoming packets -- here supposedly messages
            # whereas the Ganglion prints incoming ASCII message by itself
            if board.getBoardType() == "cyton":
                while board.ser_inWaiting():
                    # we're supposed to get UTF8 text, but the board might behave otherwise
                    received = board.ser_read().decode('utf-8', errors='replace')
                    line += received
                    time.sleep(0.001)
                    if (received == '\n') and not flush:
                        print('%\t' + line[:-1])
                        line = ''
            elif board.getBoardType() == "ganglion":
                while board.ser_inWaiting():
                    board.waitForNotifications(0.001)

            if not flush:
                print(line)

        # Take user input (raw_input only exists on Python 2)
        if sys.hexversion > 0x03000000:
            s = input('--> ')
        else:
            s = raw_input('--> ')