-
Notifications
You must be signed in to change notification settings - Fork 1
/
dynon_serial_reader.py
84 lines (68 loc) · 2.21 KB
/
dynon_serial_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import serial
import logger
class DynonSerialReader(object):
    """
    Helper to read data from a Dynon serial connection.

    The reader owns a single serial port handle (``serial_reader``).
    The handle is ``None`` whenever the port is not open; ``read`` then
    attempts to reopen it, so a caller can simply keep polling ``read``.
    """

    def __init__(
        self,
        serial_port: str
    ):
        """
        Creates a new reader for the given serial port.

        An attempt to open the port is made immediately. A failed attempt
        is logged rather than raised and leaves ``serial_reader`` as None.

        Arguments:
            serial_port {str} -- The port to read from.
        """

        super().__init__()

        self.serial_port = serial_port
        self.serial_reader = None

        self.open_serial_connection()

    def _close_serial_connection(
        self
    ):
        """
        Closes the current serial handle (if any) and forgets it.

        Closing is best effort: the handle is being discarded anyway, so
        an error raised by close() is deliberately ignored.
        """

        port, self.serial_reader = self.serial_reader, None

        if port is None:
            return

        try:
            port.close()
        except Exception:
            # Nothing useful can be done with a port that fails to close.
            pass

    def open_serial_connection(
        self
    ):
        """
        Attempts to open the serial connection.

        Does nothing if a connection is already open. The port is opened
        at 115200 baud, 8 data bits, no parity, one stop bit, with a read
        timeout of 10 seconds. On failure the attempt is logged and
        ``serial_reader`` is left as None.
        """

        try:
            if self.serial_reader is None:
                logger.log(
                    'ATTEMPTING to open connection to {0}'.format(
                        self.serial_port))

                self.serial_reader = serial.Serial(
                    self.serial_port,
                    baudrate=115200,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=10)

                # Discard anything already buffered so that reading starts
                # from fresh input.
                # NOTE(review): pyserial 3.x documents flushInput() as the
                # deprecated name of reset_input_buffer(); kept as-is so
                # older pyserial installs keep working.
                self.serial_reader.flushInput()

                logger.log(
                    'OPENED serial connection to {0}'.format(
                        self.serial_port))
        except Exception:
            # The port may have been opened before the failure (e.g. the
            # flush raised), so close it instead of just dropping the
            # reference and leaking the handle.
            self._close_serial_connection()

            logger.log(
                'FAILED attempt to open serial connection to {0}'.format(self.serial_port))

    def read(
        self
    ) -> str:
        """
        Attempts to read one line from the serial port.

        If the port is not open, an attempt is made to reopen it and an
        empty string is returned for this call. If reading or decoding
        fails, the port is closed so that the next call reopens it.

        Returns:
            str -- The line read, decoded as ASCII. Returns an empty
                   string if nothing was read or the read failed.
        """

        try:
            if self.serial_reader is None:
                self.open_serial_connection()
                return ""

            serial_bytes = self.serial_reader.readline()

            # Nothing arrived (None or zero bytes).
            if not serial_bytes:
                return ""

            # Raises UnicodeDecodeError on non-ASCII bytes; that is
            # handled below like any other read failure.
            raw_read = str(serial_bytes, encoding='ascii')
            logger.log(raw_read)

            return raw_read
        except Exception:
            # Drop the connection so the next call reopens it, and honor
            # the documented contract by returning an empty string
            # instead of falling through and returning None.
            self._close_serial_connection()

            return ""