Skip to content

Commit

Permalink
[clean] Splitting current 'Display' into a Display (managing the inte…
Browse files Browse the repository at this point in the history
…rface) and a Notifier (dispatching event from IOObjet or QtObject to the Display)
  • Loading branch information
lpascal-ledger committed Jul 5, 2023
1 parent 9089ee7 commit aabb9c8
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 210 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v3
- run: pip install mypy types-requests types-setuptools PyQt5-stubs
- name: Mypy type checking
run: mypy speculos || echo 0
run: mypy speculos

bandit:
name: Security checking
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ __version__.py
*.pyc
dist/
*egg-info
.coverage
74 changes: 23 additions & 51 deletions speculos/api/api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import socket
import threading
import pkg_resources
from typing import Dict
from typing import Any, Dict
from flask import Flask
from flask_restful import Api

from speculos.mcu.display import Display, IODevice
from speculos.mcu.display import DisplayNotifier, IODevice
from speculos.mcu.readerror import ReadError
from speculos.mcu.seproxyhal import SeProxyHal
from speculos.observer import BroadcastInterface
Expand Down Expand Up @@ -35,7 +35,7 @@ def __init__(self, api_port: int) -> None:
def file(self):
return self.sock

def can_read(self, s: int, screen: Display) -> None:
def can_read(self, s: int, screen: DisplayNotifier) -> None:
assert s == self.fileno
# Being able to read from the socket only happens when the API server exited.
raise ReadError("API server exited")
Expand All @@ -51,7 +51,7 @@ def _run(self) -> None:
self._notify_exit.close()

def start_server_thread(self,
screen_,
screen_: DisplayNotifier,
seph_: SeProxyHal,
automation_server: BroadcastInterface) -> None:
wrapper = ApiWrapper(screen_, seph_, automation_server)
Expand All @@ -61,61 +61,33 @@ def start_server_thread(self,


class ApiWrapper:
def __init__(self, screen, seph: SeProxyHal, automation_server: BroadcastInterface):
self._screen = screen
self._seph = seph
self._set_app()
def __init__(self, screen: DisplayNotifier, seph: SeProxyHal, automation_server: BroadcastInterface):

self._api = Api(self.app)
self._api.add_resource(APDU, "/apdu",
resource_class_kwargs=self._seph_kwargs)
self._api.add_resource(Automation, "/automation",
resource_class_kwargs=self._seph_kwargs)
self._api.add_resource(Button, "/button/left", "/button/right", "/button/both",
resource_class_kwargs=self._seph_kwargs)
self._api.add_resource(Events, "/events",
resource_class_kwargs={**self._app_kwargs,
"automation_server": automation_server})
self._api.add_resource(Finger, "/finger",
resource_class_kwargs=self._seph_kwargs)
self._api.add_resource(Screenshot, "/screenshot",
resource_class_kwargs=self._screen_kwargs)
self._api.add_resource(Swagger, "/swagger/",
resource_class_kwargs=self._app_kwargs)
self._api.add_resource(WebInterface, "/",
resource_class_kwargs=self._app_kwargs)
self._api.add_resource(Ticker, "/ticker/",
resource_class_kwargs=self._seph_kwargs)

def _set_app(self):
static_folder = pkg_resources.resource_filename(__name__, "/static")
self._app = Flask(__name__, static_url_path="", static_folder=static_folder)
self._app.env = "development"

@property
def _screen_kwargs(self):
return {"screen": self.screen}
screen_kwargs = {"screen": screen}
seph_kwargs = {"seph": seph}
app_kwargs = {"app": self._app}
event_kwargs: Dict[str, Any] = {**app_kwargs, "automation_server": automation_server}

@property
def _app_kwargs(self) -> Dict[str, Flask]:
return {"app": self.app}
self._api = Api(self.app)

@property
def _seph_kwargs(self) -> Dict[str, SeProxyHal]:
return {"seph": self.seph}
self._api.add_resource(APDU, "/apdu", resource_class_kwargs=seph_kwargs)
self._api.add_resource(Automation, "/automation", resource_class_kwargs=seph_kwargs)
self._api.add_resource(Button,
"/button/left",
"/button/right",
"/button/both",
resource_class_kwargs=seph_kwargs)
self._api.add_resource(Events, "/events", resource_class_kwargs=event_kwargs)
self._api.add_resource(Finger, "/finger", resource_class_kwargs=seph_kwargs)
self._api.add_resource(Screenshot, "/screenshot", resource_class_kwargs=screen_kwargs)
self._api.add_resource(Swagger, "/swagger/", resource_class_kwargs=app_kwargs)
self._api.add_resource(WebInterface, "/", resource_class_kwargs=app_kwargs)
self._api.add_resource(Ticker, "/ticker/", resource_class_kwargs=seph_kwargs)

@property
def app(self) -> Flask:
return self._app

@property
def api(self) -> Api:
return self._api

@property
def screen(self):
return self._screen

@property
def seph(self) -> SeProxyHal:
return self._seph
16 changes: 7 additions & 9 deletions speculos/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,13 @@ def main(prog=None) -> int:
logger.error("--vnc-password can only be used with --vnc-port")
sys.exit(1)

Screen: Type[display.Display]
ScreenNotifier: Type[display.DisplayNotifier]
if args.display == 'text':
from .mcu.screen_text import TextScreen as Screen
from .mcu.screen_text import TextScreenNotifier as ScreenNotifier
elif args.display == 'headless':
from .mcu.headless import Headless as Screen
from .mcu.headless import HeadlessNotifier as ScreenNotifier
else:
# TODO: QtScreen does not derive from display.Display so the `Screen` typing is wrong
# There is still work to be done to clarify what the `Screen` interface should be here
from .mcu.screen import QtScreen as Screen
from .mcu.screen import QtScreenNotifier as ScreenNotifier

if args.sdk and args.apiLevel:
logger.error("Either SDK version or api level should be specified")
Expand Down Expand Up @@ -518,13 +516,13 @@ def main(prog=None) -> int:
args.keymap, zoom, x, y, args.force_full_ocr,
args.disable_tesseract)
server_args = ServerArgs(apdu, apirun, button, finger, seph, vnc)
screen = Screen(display_args, server_args)
screen_notifier = ScreenNotifier(display_args, server_args)

if apirun is not None:
assert automation_server is not None
apirun.start_server_thread(screen, seph, automation_server)
apirun.start_server_thread(screen_notifier, seph, automation_server)

screen.run()
screen_notifier.run()

s2.close()
_, status = os.waitpid(qemu_pid, 0)
Expand Down
4 changes: 2 additions & 2 deletions speculos/mcu/apdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import socket
from typing import Optional

from .display import IODevice, Display
from .display import IODevice, DisplayNotifier


class ApduServer(IODevice):
Expand All @@ -27,7 +27,7 @@ def __init__(self, host: str = '127.0.0.1', port: int = 9999, hid: bool = False)
def file(self):
return self.socket

def can_read(self, s: int, screen: Display):
def can_read(self, s: int, screen: DisplayNotifier):
assert self.fileno == s
c, _ = self.file.accept()
self.client = ApduClient(c)
Expand Down
12 changes: 6 additions & 6 deletions speculos/mcu/button_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import socket
import time

from .display import Display, IODevice
from .display import DisplayNotifier, IODevice


class FakeButtonClient(IODevice):
Expand All @@ -33,22 +33,22 @@ def __init__(self, sock: socket.socket):
def file(self):
return self.socket

def _close(self, screen: Display):
def _cleanup(self, screen: DisplayNotifier):
screen.remove_notifier(self.fileno)
self.logger.debug("connection closed with fake button client")

def can_read(self, s: int, screen: Display):
def can_read(self, s: int, screen: DisplayNotifier):
packet = self.file.recv(1)
if packet == b'':
self._close(screen)
self._cleanup(screen)
return

for c in packet:
c = chr(c)
if c in self.actions.keys():
key, pressed = self.actions[c]
self.logger.debug(f"button {key} release: {pressed}")
screen.seph.handle_button(key, pressed)
screen.display.seph.handle_button(key, pressed)
time.sleep(0.1)
else:
self.logger.debug(f"ignoring byte {c!r}")
Expand All @@ -66,7 +66,7 @@ def __init__(self, port: int):
def file(self):
return self.socket

def can_read(self, s: int, screen: Display):
def can_read(self, s: int, screen: DisplayNotifier):
c, addr = self.file.accept()
self.logger.debug("New client from %s", addr)
client = FakeButtonClient(c)
Expand Down
71 changes: 45 additions & 26 deletions speculos/mcu/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def fileno(self) -> int:
return self.file.fileno()

@abstractmethod
def can_read(self, s: int, screen: Display) -> None:
def can_read(self, s: int, screen: DisplayNotifier) -> None:
pass


Expand Down Expand Up @@ -170,34 +170,33 @@ def take_screenshot(self) -> Tuple[Tuple[int, int], bytes]:


class Display(ABC):
def __init__(self, display: DisplayArgs, server: ServerArgs) -> None:
self.notifiers: Dict[int, Any] = {}
self._server = server
self._display = display
def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None:
self._server_args = server_args
self._display_args = display_args

@property
def apdu(self) -> Any: # ApduServer:
return self._server.apdu
return self._server_args.apdu

@property
def seph(self) -> Any: # SeProxyHal:
return self._server.seph
return self._server_args.seph

@property
def model(self) -> str:
return self._display.model
return self._display_args.model

@property
def force_full_ocr(self) -> bool:
return self._display.force_full_ocr
return self._display_args.force_full_ocr

@property
def disable_tesseract(self) -> bool:
return self._display.disable_tesseract
return self._display_args.disable_tesseract

@property
def rendering(self):
return self._display.rendering
return self._display_args.rendering

@property
@abstractmethod
Expand All @@ -216,24 +215,44 @@ def display_raw_status(self, data: bytes):
def screen_update(self) -> bool:
pass

@abstractmethod
def run(self) -> None:
pass
def forward_to_app(self, packet: bytes):
self.seph.to_app(packet)

def forward_to_apdu_client(self, packet):
self.apdu.forward_to_client(packet)


class DisplayNotifier(ABC):

def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None:
# TODO: this should be Dict[int, IODevice], but in QtScreen, it is
# a QSocketNotifier, which has a completely different interface
# and is not used in the same way in the mcu/screen.py module.
self.notifiers: Dict[int, Any] = {}
self._server_args = server_args
self._display_args = display_args
self._display: Display
self.__init_notifiers()

def _set_display_class(self, display_class: type):
self._display = display_class(self._display_args, self._server_args)

@property
def display(self) -> Display:
return self._display

def add_notifier(self, klass: IODevice):
assert klass.fileno not in self.notifiers
self.notifiers[klass.fileno] = klass
def add_notifier(self, device: IODevice):
assert device.fileno not in self.notifiers
self.notifiers[device.fileno] = device

def remove_notifier(self, fd: int):
self.notifiers.pop(fd)

def _init_notifiers(self, args: ServerArgs) -> None:
for klass in args._asdict().values():
if klass:
self.add_notifier(klass)

def forward_to_app(self, packet: bytes):
self.seph.to_app(packet)
def __init_notifiers(self) -> None:
for device in self._server_args._asdict().values():
if device:
self.add_notifier(device)

def forward_to_apdu_client(self, packet):
self.apdu.forward_to_client(packet)
@abstractmethod
def run(self) -> None:
pass
12 changes: 6 additions & 6 deletions speculos/mcu/finger_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import socket
from typing import List

from .display import Display, IODevice
from .display import DisplayNotifier, IODevice


class FakeFingerClient(IODevice):
Expand All @@ -25,14 +25,14 @@ def __init__(self, sock: socket.socket):
def file(self):
return self.socket

def _close(self, screen: Display):
def _cleanup(self, screen: DisplayNotifier):
screen.remove_notifier(self.socket.fileno())
self.logger.debug("connection closed with fake button client")

def can_read(self, sock: int, screen: Display):
def can_read(self, sock: int, screen: DisplayNotifier):
packet = self.socket.recv(100)
if packet == b'':
self._close(screen)
self._cleanup(screen)
return

_actions: List[str] = packet.decode("ascii").split(',')
Expand All @@ -43,7 +43,7 @@ def can_read(self, sock: int, screen: Display):
y = int(action[1])
pressed = int(action[2])
self.logger.debug(f"touch event on ({x},{y}) coordinates, {'pressed' if pressed else 'release'}")
screen.seph.handle_finger(x, y, pressed)
screen.display.seph.handle_finger(x, y, pressed)


class FakeFinger(IODevice):
Expand All @@ -58,7 +58,7 @@ def __init__(self, port: int):
def file(self):
return self.socket

def can_read(self, s: int, screen: Display) -> None:
def can_read(self, s: int, screen: DisplayNotifier) -> None:
c, addr = self.file.accept()
self.logger.debug("New client from %s", addr)
client = FakeFingerClient(c)
Expand Down
Loading

0 comments on commit aabb9c8

Please sign in to comment.