Skip to content

Commit

Permalink
first full pexpect version
Browse files Browse the repository at this point in the history
  • Loading branch information
gilesknap committed Mar 23, 2024
1 parent 7c35d5f commit 1252f95
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 80 deletions.
14 changes: 2 additions & 12 deletions src/rtems_proxy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import __version__
from .copy import copy_rtems
from .globals import GLOBALS
from .telnet import connect
from .telnet import ioc_connect

__all__ = ["main"]

Expand Down Expand Up @@ -66,17 +66,7 @@ def start(
)
if copy:
copy_rtems()
connect(GLOBALS.RTEMS_CONSOLE, reboot=reboot)

while True:
print(f"\n\nIOC {GLOBALS.IOC_NAME} disconnected. Reconnect or exit? [r/e]")
choice = ""
while choice not in ["r", "e"]:
choice = input()
if choice == "e":
break
connect(GLOBALS.RTEMS_CONSOLE)
sleep(10)
ioc_connect(GLOBALS.RTEMS_CONSOLE, reboot=reboot)


@cli.command()
Expand Down
224 changes: 156 additions & 68 deletions src/rtems_proxy/telnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,107 +8,195 @@
from .utils import run_command


class CurrentPrompt(Enum):
class CannotConnect(Exception):
pass


class RtemsState(Enum):
MOT = 0
IOC = 2
UNKNOWN = 3


class TelnetRTEMS:
MOT_PROMPT = "MVME5500> "
"""
A class for connecting to an RTEMS MVME5500 IOC over telnet.
properties:
_hostname: the hostname of the terminal server connected to the IOC
_port: the port of the terminal server connected to the IOC
_ioc_reboot: a flag to determine if the IOC should be rebooted
_child: the pexpect child object for the initial telnet session
"""

MOT_PROMPT = "MVME5500> $"
CONTINUE = "<SPC> to Continue"
REBOOTING = "(most recent call last):"
REBOOTED = "TCP Statistics"
IOC_STARTED = "iocRun: All initialization complete"
DBPF = "help dbpf"
DBPF_RESPONSE = "of record field."
IOC_CHECK = "\ntaskwdShow"
IOC_RESPONSE = "free nodes"
NO_CONNECTION = "Connection closed by foreign host"

def __init__(self, host_and_port: str, reboot: bool, pause: bool):
self.hostname, self.port = host_and_port.split(":")
self.reboot = reboot
self.pause = pause
self.running = True
self.terminated = False
self.command = f"telnet {self.hostname} {self.port}"
def __init__(self, host_and_port: str, ioc_reboot: bool):
self._hostname, self._port = host_and_port.split(":")
self._ioc_reboot = ioc_reboot
self._child = None

self.ioc_rebooted = False
self.command = f"telnet {self._hostname} {self._port}"

signal.signal(signal.SIGINT, self.terminate)
signal.signal(signal.SIGTERM, self.terminate)

def report(self, message):
"""
print a message that is noticeable amongst all the other output
"""
print(f"\n>>>> {message} <<<<\n")

def terminate(self, signum, frame):
print(">> Terminating <<")
"""
Allow the user to terminate the connection with ctrl-c while the
pexpect child is running (but not once interactive telnet is started)
"""
self.report("Terminating")
exit(0)

def check_prompt(self, child, retries=30) -> CurrentPrompt:
def connect(self):
"""
connect to an IOC over telnet using pexpect and determine if we are
at the bootloader or IOC shell. If we are at the bootloader, we will
reboot the IOC into the IOC shell, we will also reboot if the ioc_reboot
flag was set in the constructor.
"""
self._child = pexpect.spawn(
self.command,
encoding="utf-8",
logfile=sys.stdout,
echo=False,
codec_errors="ignore",
)
try:
# first check for connection refusal
self._child.expect(self.NO_CONNECTION, timeout=1)
except pexpect.exceptions.TIMEOUT:
# if we timeout looking for failed connection that is good
pass
else:
print(">> Cannot connect to remote IOC, connection in use? <<")
raise CannotConnect

def check_prompt(self, retries=5) -> RtemsState:
"""
Determine if we are currently seeing an IOC shell prompt or
bootloader. Because there is a possibility that we are in the middle
of a reboot, we will retry for one before giving up.
"""
while retries > 0:
try:
child.sendline()
child.expect(self.MOT_PROMPT, timeout=1)
# see if we are in the IOC shell
self._child.sendline(self.IOC_CHECK)
self._child.expect(self.IOC_RESPONSE, timeout=1)
except pexpect.exceptions.TIMEOUT:
child.sendline(self.DBPF)
try:
child.expect(self.DBPF_RESPONSE, timeout=1)
# see if we are in the bootloader
self._child.sendline()
self._child.expect(self.MOT_PROMPT, timeout=1)
except pexpect.exceptions.TIMEOUT:
sleep(2)
# current state unknown. wait and retry
sleep(15)
else:
print("\n>> Currently in IOC shell <<\n")
return CurrentPrompt.IOC
self.report("Currently in bootloader")
return RtemsState.MOT
else:
print("\n>>> Currently in bootloader <<\n")
return CurrentPrompt.MOT
print(">> Retrying get current status ... <<")
retries -= 1
self.report("Currently in IOC shell")
return RtemsState.IOC

print("\n>> Current state UNKNOWN <<\n")
raise RuntimeError("Current state of remote IOC unknown")
self.report("Retrying get current status")
retries -= 1

def get_epics_prompt(self, child):
current = self.check_prompt(child)
if current != CurrentPrompt.IOC:
self.report("Current state UNKNOWN")
raise CannotConnect("Current state of remote IOC unknown")

def reboot(self, into: RtemsState):
"""
Reboot the board from IOC shell or bootloader and choose appropriate
options to get to the state requested by the into argument.
"""
self.report(f"Rebooting into {into.name}")
current_state = self.check_prompt()
if current_state == RtemsState.MOT:
self._child.sendline("reset")
else:
self._child.sendline("exit")

self._child.expect(self.CONTINUE, timeout=10)
if into == RtemsState.MOT:
# send escape to get into the bootloader
self._child.sendline(chr(27))
else:
# send space to boot the IOC
self._child.send(" ")

def get_epics_prompt(self):
"""
Get to the IOC shell prompt, if the IOC is not already running, reboot
it into the IOC shell. If the IOC is running, do a reboot only if
requested (in order to pick up new binaries/startup/epics db)
"""
current = self.check_prompt()
if current != RtemsState.IOC:
sleep(0.2)
child.send("reset\n")
child.expect(self.CONTINUE, timeout=10)

child.send(" ")
child.expect(self.IOC_STARTED, timeout=50)
self._child.reboot(RtemsState.IOC)
self.ioc_rebooted = True
self._child.expect(self.IOC_STARTED, timeout=50)
else:
if self._ioc_reboot and not self.ioc_rebooted:
self.ioc_rebooted = True
self.reboot(RtemsState.IOC)
self._child.expect(self.IOC_STARTED, timeout=50)

self.report("press enter for IOC shell prompt")

def get_boot_prompt(self):
"""
Get to the bootloader prompt, if the IOC shell is running then exit
and send appropriate commands to get to the bootloader
"""
current = self.check_prompt()
if current != RtemsState.MOT:
# get out of the IOC and return to MOT
self.reboot(RtemsState.MOT)
self._child.expect(self.MOT_PROMPT, timeout=20)

print(">> press enter for IOC shell prompt <<")
self.report("press enter for bootloader prompt")

def get_boot_prompt(self, child):
current = self.check_prompt(child)
if current != CurrentPrompt.MOT:
# get out of the IOC and return to MOT
child.sendline("exit")
try:
child.expect(self.REBOOTING, timeout=1)
except UnicodeDecodeError:
pass # there are illegal chars during the reboot process
child.expect(self.CONTINUE, timeout=10)
def close(self):
if self._child:
self._child.close()
self._child = None

child.send(chr(27))
child.expect(self.MOT_PROMPT, timeout=20)
def __del__(self):
self.close()

print(">> press enter for bootloader prompt <<")

def ioc_connect(host_and_port: str, reboot: bool = False):
"""
Entrypoint to make a connection to an RTEMS IOC over telnet.
Once connected, enters an interactive user session with the IOC.
def connect(host_and_port: str, reboot: bool = False, pause: bool = False):
telnet = TelnetRTEMS(host_and_port, reboot, pause)
args:
host_and_port: 'hostname:port' of the IOC to connect to
reboot: reboot the IOC to pick up new binaries/startup/epics db
"""
telnet = TelnetRTEMS(host_and_port, reboot)

child = pexpect.spawn(
telnet.command, encoding="utf-8", logfile=sys.stdout, echo=False
)
try:
child.expect(telnet.NO_CONNECTION, timeout=1)
except pexpect.exceptions.TIMEOUT:
pass
telnet.connect()
telnet.get_epics_prompt()
except (CannotConnect, pexpect.exceptions.TIMEOUT):
print("\n\nNot Connected. Exiting...")
telnet.close()
else:
print(">> Cannot connect to remote IOC, connection in use? <<")
child.close()
return

telnet.get_epics_prompt(child)

# telnet.get_boot_prompt(child)

child.close()

run_command(telnet.command)
telnet.close()
run_command(telnet.command)

0 comments on commit 1252f95

Please sign in to comment.