This repository has been archived by the owner on Jan 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathfake_serial_device.py
executable file
·103 lines (73 loc) · 2.91 KB
/
fake_serial_device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
import pty
import os
import fcntl
import termios
from av_device import AV_Device
class Fake_SerialDevice(AV_Device):
"""Create a local serial-port-like device that can be used to
impersonate real devices connected to a serial port.
This is useful for testing programs communicating with a serial
device when the serial device is not available.
"""
Description = "Fake serial port device"
def __init__(self, av_loop, name):
AV_Device.__init__(self, av_loop, name)
self.master, self.slave = pty.openpty()
self._client_name = os.ttyname(self.slave)
# Close the slave descriptor. It will be reopened by the client
os.close(self.slave)
# Make the master descriptor non-blocking.
fl = fcntl.fcntl(self.master, fcntl.F_GETFL)
fcntl.fcntl(self.master, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# Backup old term settings and setup new settings
self.term = termios.tcgetattr(self.master)
newterm = termios.tcgetattr(self.master)
newterm[3] = newterm[3] & ~termios.ECHO # lflags
termios.tcsetattr(self.master, termios.TCSAFLUSH, newterm)
self.av_loop.add_handler(
self.master, self.handle_io, self.av_loop.READ)
def __del__(self):
# Close the remaining descriptor
termios.tcsetattr(self.master, termios.TCSAFLUSH, self.term)
os.close(self.master)
def fileno(self):
return self.master
def client_name(self):
return self._client_name
def handle_io(self, fd, events):
assert fd == self.master
if events & self.av_loop.READ:
self.handle_read()
if events & self.av_loop.WRITE:
self.handle_write()
if events & self.av_loop.ERROR:
# Ignore HUP and EIO, etc. FIXME: Is this safe?
pass
events &= ~(
self.av_loop.READ | self.av_loop.WRITE | self.av_loop.ERROR)
if events:
self.debug("Unhandled events: %u" % (events))
def handle_read(self):
"""Attempt to read data from the PTY.
This method should probably be overridden in subclasses.
"""
print(os.read(self.master, 64 * 1024))
def handle_write(self):
"""Must be overridden in subclasses that poll for writes."""
raise NotImplementedError
def main(args):
import argparse
from tornado.ioloop import IOLoop
from av_loop import AV_Loop
parser = argparse.ArgumentParser(description=Fake_SerialDevice.Description)
Fake_SerialDevice.register_args("fake", parser)
IOLoop.configure(AV_Loop, parsed_args=vars(parser.parse_args(args)))
mainloop = IOLoop.instance()
fake = Fake_SerialDevice(mainloop, "fake")
print("%s is listening on %s" % (
fake.Description, fake.client_name()))
return mainloop.run()
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv[1:]))