From 22f948a9e00d9220be0166352b6ee1ce7bb8c626 Mon Sep 17 00:00:00 2001 From: Lucas PASCAL Date: Wed, 5 Jul 2023 16:18:34 +0200 Subject: [PATCH] [clean] Splitting current 'Display' into a Display (managing the interface) and a Notifier (dispatching event from IOObjet or QtObject to the Display) --- .gitignore | 1 + speculos/api/api.py | 74 ++++++++++-------------------- speculos/main.py | 16 +++---- speculos/mcu/apdu.py | 4 +- speculos/mcu/button_tcp.py | 12 ++--- speculos/mcu/display.py | 71 ++++++++++++++++++----------- speculos/mcu/finger_tcp.py | 12 ++--- speculos/mcu/headless.py | 40 +++++++++------- speculos/mcu/screen.py | 91 +++++++++++++++++++------------------ speculos/mcu/screen_text.py | 30 +++++++----- speculos/mcu/seproxyhal.py | 64 +++++++++++++------------- speculos/mcu/vnc.py | 8 ++-- 12 files changed, 214 insertions(+), 209 deletions(-) diff --git a/.gitignore b/.gitignore index b41886a9..63f9ca1e 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ __version__.py *.pyc dist/ *egg-info +.coverage diff --git a/speculos/api/api.py b/speculos/api/api.py index da735826..f8491189 100644 --- a/speculos/api/api.py +++ b/speculos/api/api.py @@ -1,11 +1,11 @@ import socket import threading import pkg_resources -from typing import Dict +from typing import Any, Dict from flask import Flask from flask_restful import Api -from speculos.mcu.display import Display, IODevice +from speculos.mcu.display import DisplayNotifier, IODevice from speculos.mcu.readerror import ReadError from speculos.mcu.seproxyhal import SeProxyHal from speculos.observer import BroadcastInterface @@ -35,7 +35,7 @@ def __init__(self, api_port: int) -> None: def file(self): return self.sock - def can_read(self, s: int, screen: Display) -> None: + def can_read(self, s: int, screen: DisplayNotifier) -> None: assert s == self.fileno # Being able to read from the socket only happens when the API server exited. raise ReadError("API server exited") @@ -51,7 +51,7 @@ def _run(self) -> None: self._notify_exit.close() def start_server_thread(self, - screen_, + screen_: DisplayNotifier, seph_: SeProxyHal, automation_server: BroadcastInterface) -> None: wrapper = ApiWrapper(screen_, seph_, automation_server) @@ -61,61 +61,33 @@ def start_server_thread(self, class ApiWrapper: - def __init__(self, screen, seph: SeProxyHal, automation_server: BroadcastInterface): - self._screen = screen - self._seph = seph - self._set_app() + def __init__(self, screen: DisplayNotifier, seph: SeProxyHal, automation_server: BroadcastInterface): - self._api = Api(self.app) - self._api.add_resource(APDU, "/apdu", - resource_class_kwargs=self._seph_kwargs) - self._api.add_resource(Automation, "/automation", - resource_class_kwargs=self._seph_kwargs) - self._api.add_resource(Button, "/button/left", "/button/right", "/button/both", - resource_class_kwargs=self._seph_kwargs) - self._api.add_resource(Events, "/events", - resource_class_kwargs={**self._app_kwargs, - "automation_server": automation_server}) - self._api.add_resource(Finger, "/finger", - resource_class_kwargs=self._seph_kwargs) - self._api.add_resource(Screenshot, "/screenshot", - resource_class_kwargs=self._screen_kwargs) - self._api.add_resource(Swagger, "/swagger/", - resource_class_kwargs=self._app_kwargs) - self._api.add_resource(WebInterface, "/", - resource_class_kwargs=self._app_kwargs) - self._api.add_resource(Ticker, "/ticker/", - resource_class_kwargs=self._seph_kwargs) - - def _set_app(self): static_folder = pkg_resources.resource_filename(__name__, "/static") self._app = Flask(__name__, static_url_path="", static_folder=static_folder) self._app.env = "development" - @property - def _screen_kwargs(self): - return {"screen": self.screen} + screen_kwargs = {"screen": screen} + seph_kwargs = {"seph": seph} + app_kwargs = {"app": self._app} + event_kwargs: Dict[str, Any] = {**app_kwargs, "automation_server": automation_server} - @property - def _app_kwargs(self) -> Dict[str, Flask]: - return {"app": self.app} + self._api = Api(self.app) - @property - def _seph_kwargs(self) -> Dict[str, SeProxyHal]: - return {"seph": self.seph} + self._api.add_resource(APDU, "/apdu", resource_class_kwargs=seph_kwargs) + self._api.add_resource(Automation, "/automation", resource_class_kwargs=seph_kwargs) + self._api.add_resource(Button, + "/button/left", + "/button/right", + "/button/both", + resource_class_kwargs=seph_kwargs) + self._api.add_resource(Events, "/events", resource_class_kwargs=event_kwargs) + self._api.add_resource(Finger, "/finger", resource_class_kwargs=seph_kwargs) + self._api.add_resource(Screenshot, "/screenshot", resource_class_kwargs=screen_kwargs) + self._api.add_resource(Swagger, "/swagger/", resource_class_kwargs=app_kwargs) + self._api.add_resource(WebInterface, "/", resource_class_kwargs=app_kwargs) + self._api.add_resource(Ticker, "/ticker/", resource_class_kwargs=seph_kwargs) @property def app(self) -> Flask: return self._app - - @property - def api(self) -> Api: - return self._api - - @property - def screen(self): - return self._screen - - @property - def seph(self) -> SeProxyHal: - return self._seph diff --git a/speculos/main.py b/speculos/main.py index 77ee4c5c..5bffb38f 100644 --- a/speculos/main.py +++ b/speculos/main.py @@ -414,15 +414,13 @@ def main(prog=None) -> int: logger.error("--vnc-password can only be used with --vnc-port") sys.exit(1) - Screen: Type[display.Display] + ScreenNotifier: Type[display.DisplayNotifier] if args.display == 'text': - from .mcu.screen_text import TextScreen as Screen + from .mcu.screen_text import TextScreenNotifier as ScreenNotifier elif args.display == 'headless': - from .mcu.headless import Headless as Screen + from .mcu.headless import HeadlessNotifier as ScreenNotifier else: - # TODO: QtScreen does not derive from display.Display so the `Screen` typing is wrong - # There is still work to be done to clarify what the `Screen` interface should be here - from .mcu.screen import QtScreen as Screen + from .mcu.screen import QtScreenNotifier as ScreenNotifier if args.sdk and args.apiLevel: logger.error("Either SDK version or api level should be specified") @@ -518,13 +516,13 @@ def main(prog=None) -> int: args.keymap, zoom, x, y, args.force_full_ocr, args.disable_tesseract) server_args = ServerArgs(apdu, apirun, button, finger, seph, vnc) - screen = Screen(display_args, server_args) + screen_notifier = ScreenNotifier(display_args, server_args) if apirun is not None: assert automation_server is not None - apirun.start_server_thread(screen, seph, automation_server) + apirun.start_server_thread(screen_notifier, seph, automation_server) - screen.run() + screen_notifier.run() s2.close() _, status = os.waitpid(qemu_pid, 0) diff --git a/speculos/mcu/apdu.py b/speculos/mcu/apdu.py index 4ace5f13..d35dbf22 100644 --- a/speculos/mcu/apdu.py +++ b/speculos/mcu/apdu.py @@ -12,7 +12,7 @@ import socket from typing import Optional -from .display import IODevice, Display +from .display import IODevice, DisplayNotifier class ApduServer(IODevice): @@ -27,7 +27,7 @@ def __init__(self, host: str = '127.0.0.1', port: int = 9999, hid: bool = False) def file(self): return self.socket - def can_read(self, s: int, screen: Display): + def can_read(self, s: int, screen: DisplayNotifier): assert self.fileno == s c, _ = self.file.accept() self.client = ApduClient(c) diff --git a/speculos/mcu/button_tcp.py b/speculos/mcu/button_tcp.py index dd5c6515..c554cb08 100644 --- a/speculos/mcu/button_tcp.py +++ b/speculos/mcu/button_tcp.py @@ -14,7 +14,7 @@ import socket import time -from .display import Display, IODevice +from .display import DisplayNotifier, IODevice class FakeButtonClient(IODevice): @@ -33,14 +33,14 @@ def __init__(self, sock: socket.socket): def file(self): return self.socket - def _close(self, screen: Display): + def _cleanup(self, screen: DisplayNotifier): screen.remove_notifier(self.fileno) self.logger.debug("connection closed with fake button client") - def can_read(self, s: int, screen: Display): + def can_read(self, s: int, screen: DisplayNotifier): packet = self.file.recv(1) if packet == b'': - self._close(screen) + self._cleanup(screen) return for c in packet: @@ -48,7 +48,7 @@ def can_read(self, s: int, screen: Display): if c in self.actions.keys(): key, pressed = self.actions[c] self.logger.debug(f"button {key} release: {pressed}") - screen.seph.handle_button(key, pressed) + screen.display.seph.handle_button(key, pressed) time.sleep(0.1) else: self.logger.debug(f"ignoring byte {c!r}") @@ -66,7 +66,7 @@ def __init__(self, port: int): def file(self): return self.socket - def can_read(self, s: int, screen: Display): + def can_read(self, s: int, screen: DisplayNotifier): c, addr = self.file.accept() self.logger.debug("New client from %s", addr) client = FakeButtonClient(c) diff --git a/speculos/mcu/display.py b/speculos/mcu/display.py index d5ad01f0..49cca372 100644 --- a/speculos/mcu/display.py +++ b/speculos/mcu/display.py @@ -22,7 +22,7 @@ def fileno(self) -> int: return self.file.fileno() @abstractmethod - def can_read(self, s: int, screen: Display) -> None: + def can_read(self, s: int, screen: DisplayNotifier) -> None: pass @@ -170,34 +170,33 @@ def take_screenshot(self) -> Tuple[Tuple[int, int], bytes]: class Display(ABC): - def __init__(self, display: DisplayArgs, server: ServerArgs) -> None: - self.notifiers: Dict[int, Any] = {} - self._server = server - self._display = display + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + self._server_args = server_args + self._display_args = display_args @property def apdu(self) -> Any: # ApduServer: - return self._server.apdu + return self._server_args.apdu @property def seph(self) -> Any: # SeProxyHal: - return self._server.seph + return self._server_args.seph @property def model(self) -> str: - return self._display.model + return self._display_args.model @property def force_full_ocr(self) -> bool: - return self._display.force_full_ocr + return self._display_args.force_full_ocr @property def disable_tesseract(self) -> bool: - return self._display.disable_tesseract + return self._display_args.disable_tesseract @property def rendering(self): - return self._display.rendering + return self._display_args.rendering @property @abstractmethod @@ -216,24 +215,44 @@ def display_raw_status(self, data: bytes): def screen_update(self) -> bool: pass - @abstractmethod - def run(self) -> None: - pass + def forward_to_app(self, packet: bytes): + self.seph.to_app(packet) + + def forward_to_apdu_client(self, packet): + self.apdu.forward_to_client(packet) + + +class DisplayNotifier(ABC): + + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + # TODO: this should be Dict[int, IODevice], but in QtScreen, it is + # a QSocketNotifier, which has a completely different interface + # and is not used in the same way in the mcu/screen.py module. + self.notifiers: Dict[int, Any] = {} + self._server_args = server_args + self._display_args = display_args + self._display: Display + self.__init_notifiers() + + def _set_display_class(self, display_class: type): + self._display = display_class(self._display_args, self._server_args) + + @property + def display(self) -> Display: + return self._display - def add_notifier(self, klass: IODevice): - assert klass.fileno not in self.notifiers - self.notifiers[klass.fileno] = klass + def add_notifier(self, device: IODevice): + assert device.fileno not in self.notifiers + self.notifiers[device.fileno] = device def remove_notifier(self, fd: int): self.notifiers.pop(fd) - def _init_notifiers(self, args: ServerArgs) -> None: - for klass in args._asdict().values(): - if klass: - self.add_notifier(klass) - - def forward_to_app(self, packet: bytes): - self.seph.to_app(packet) + def __init_notifiers(self) -> None: + for device in self._server_args._asdict().values(): + if device: + self.add_notifier(device) - def forward_to_apdu_client(self, packet): - self.apdu.forward_to_client(packet) + @abstractmethod + def run(self) -> None: + pass diff --git a/speculos/mcu/finger_tcp.py b/speculos/mcu/finger_tcp.py index 6e130e15..5a0ae4b7 100644 --- a/speculos/mcu/finger_tcp.py +++ b/speculos/mcu/finger_tcp.py @@ -13,7 +13,7 @@ import socket from typing import List -from .display import Display, IODevice +from .display import DisplayNotifier, IODevice class FakeFingerClient(IODevice): @@ -25,14 +25,14 @@ def __init__(self, sock: socket.socket): def file(self): return self.socket - def _close(self, screen: Display): + def _cleanup(self, screen: DisplayNotifier): screen.remove_notifier(self.socket.fileno()) self.logger.debug("connection closed with fake button client") - def can_read(self, sock: int, screen: Display): + def can_read(self, sock: int, screen: DisplayNotifier): packet = self.socket.recv(100) if packet == b'': - self._close(screen) + self._cleanup(screen) return _actions: List[str] = packet.decode("ascii").split(',') @@ -43,7 +43,7 @@ def can_read(self, sock: int, screen: Display): y = int(action[1]) pressed = int(action[2]) self.logger.debug(f"touch event on ({x},{y}) coordinates, {'pressed' if pressed else 'release'}") - screen.seph.handle_finger(x, y, pressed) + screen.display.seph.handle_finger(x, y, pressed) class FakeFinger(IODevice): @@ -58,7 +58,7 @@ def __init__(self, port: int): def file(self): return self.socket - def can_read(self, s: int, screen: Display) -> None: + def can_read(self, s: int, screen: DisplayNotifier) -> None: c, addr = self.file.accept() self.logger.debug("New client from %s", addr) client = FakeFingerClient(c) diff --git a/speculos/mcu/headless.py b/speculos/mcu/headless.py index 92040b73..6795fbac 100644 --- a/speculos/mcu/headless.py +++ b/speculos/mcu/headless.py @@ -4,7 +4,7 @@ from speculos.observer import TextEvent from . import bagl from . import nbgl -from .display import Display, FrameBuffer, GraphicLibrary, MODELS +from .display import Display, DisplayNotifier, FrameBuffer, GraphicLibrary, MODELS from .readerror import ReadError from .struct import DisplayArgs, ServerArgs from .vnc import VNC @@ -13,7 +13,6 @@ class Headless(Display): def __init__(self, display: DisplayArgs, server: ServerArgs) -> None: super().__init__(display, server) - self._init_notifiers(server) self.m = HeadlessPaintWidget(self.model, server.vnc) self._gl: GraphicLibrary @@ -43,21 +42,6 @@ def screen_update(self) -> bool: assert isinstance(self.gl, bagl.Bagl) return self.gl.refresh() - def run(self) -> None: - while True: - _rlist = self.notifiers.keys() - if not _rlist: - break - - rlist, _, _ = select.select(_rlist, [], []) - try: - for fd in rlist: - self.notifiers[fd].can_read(fd, self) - - # This exception occur when can_read have no more data available - except ReadError: - break - class HeadlessPaintWidget(FrameBuffer): def __init__(self, model: str, vnc: Optional[VNC] = None): @@ -79,3 +63,25 @@ def _redraw(self) -> None: if self.vnc: self.vnc.redraw(self.pixels) self.update_screenshot() + + +class HeadlessNotifier(DisplayNotifier): + + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + super().__init__(display_args, server_args) + self._set_display_class(Headless) + + def run(self) -> None: + while True: + _rlist = self.notifiers.keys() + if not _rlist: + break + + rlist, _, _ = select.select(_rlist, [], []) + try: + for fd in rlist: + self.notifiers[fd].can_read(fd, self) + + # This exception occur when can_read have no more data available + except ReadError: + break diff --git a/speculos/mcu/screen.py b/speculos/mcu/screen.py index 5d175826..899bb03a 100644 --- a/speculos/mcu/screen.py +++ b/speculos/mcu/screen.py @@ -1,3 +1,4 @@ +from functools import partial from PyQt5.QtWidgets import QApplication, QWidget, QMainWindow from PyQt5.QtGui import QPainter, QColor, QPixmap from PyQt5.QtGui import QIcon, QKeyEvent, QMouseEvent @@ -8,7 +9,7 @@ from speculos.observer import TextEvent from . import bagl from . import nbgl -from .display import COLORS, Display, FrameBuffer, GraphicLibrary, IODevice +from .display import COLORS, Display, DisplayNotifier, FrameBuffer, GraphicLibrary, IODevice from .readerror import ReadError from .struct import DisplayArgs, MODELS, ServerArgs from .vnc import VNC @@ -138,12 +139,12 @@ def __init__(self, qt_app: QApplication, display: DisplayArgs, server: ServerArg self.widget = PaintWidget(self, display.model, display.pixel_size, server.vnc) self.widget.move(self.box_position_x * display.pixel_size, self.box_position_y * display.pixel_size) self.widget.resize(self._width * display.pixel_size, self._height * display.pixel_size) - - self._screen = Screen(self, display, server) - self.setWindowIcon(QIcon('mcu/icon.png')) - self.show() + self._screen: Screen + + def set_screen(self, screen: Screen) -> None: + self._screen = screen def screen_update(self) -> bool: return self._screen.screen_update() @@ -197,44 +198,23 @@ def closeEvent(self, event: QEvent): class Screen(Display): def __init__(self, app: App, display: DisplayArgs, server: ServerArgs) -> None: - self.app = app super().__init__(display, server) - self._init_notifiers(server) + self.app: App self._gl: GraphicLibrary - if display.model != "stax": - self._gl = bagl.Bagl(app.widget, MODELS[display.model].screen_size) + + def set_app(self, app: App) -> None: + self.app = app + model = self._display_args.model + if model != "stax": + self._gl = bagl.Bagl(app.widget, MODELS[model].screen_size) else: - self._gl = nbgl.NBGL(app.widget, MODELS[display.model].screen_size, - display.force_full_ocr) + self._gl = nbgl.NBGL(app.widget, MODELS[model].screen_size, + self._display_args.force_full_ocr) @property def gl(self): return self._gl - def klass_can_read(self, klass: IODevice, s: int) -> None: - try: - klass.can_read(s, self) - - # This exception occur when can_read have no more data available - except ReadError: - self.app.close() - - def add_notifier(self, klass: IODevice) -> None: - n = QSocketNotifier(voidptr(klass.fileno), QSocketNotifier.Read, self.app) - n.activated.connect(lambda s: self.klass_can_read(klass, s)) - assert klass.fileno not in self.notifiers - self.notifiers[klass.fileno] = n - - def enable_notifier(self, fd: int, enabled: bool = True) -> None: - n = self.notifiers[fd] - n.setEnabled(enabled) - - def remove_notifier(self, fd: int) -> None: - # just in case - self.enable_notifier(fd, False) - n = self.notifiers.pop(fd) - n.disconnect() - def _key_event(self, event: QKeyEvent, pressed) -> None: key = Qt.Key(event.key()) if key in [Qt.Key_Left, Qt.Key_Right]: @@ -263,16 +243,39 @@ def display_raw_status(self, data: bytes) -> None: def screen_update(self) -> bool: return self.gl.refresh() - def run(self) -> None: - pass +class QtScreenNotifier(DisplayNotifier): + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + super().__init__(display_args, server_args) + self._set_display_class(Screen) + self._qapp = QApplication([]) + self._app_widget = App(self._qapp, display_args, server_args) + assert isinstance(self.display, Screen) + self.display.set_app(self._app_widget) + + def _can_read(self, device: IODevice, s: int) -> None: + try: + device.can_read(s, self) + # This exception occur when can_read have no more data available + except ReadError: + self._app_widget.close() + + def add_notifier(self, device: IODevice) -> None: + n = QSocketNotifier(voidptr(device.fileno), QSocketNotifier.Read, self._qapp) + n.activated.connect(lambda s: self._can_read(device, s)) + assert device.fileno not in self.notifiers + self.notifiers[device.fileno] = n -class QtScreen: - def __init__(self, display: DisplayArgs, server: ServerArgs) -> None: - self.app = QApplication([]) - self.app_widget = App(self.app, display, server) - self.m = self.app_widget.widget + def enable_notifier(self, fd: int, enabled: bool = True) -> None: + n = self.notifiers[fd] + n.setEnabled(enabled) + + def remove_notifier(self, fd: int) -> None: + # just in case + self.enable_notifier(fd, False) + n = self.notifiers.pop(fd) + n.disconnect() def run(self): - self.app.exec_() - self.app.quit() + self._qapp.exec_() + self._qapp.quit() diff --git a/speculos/mcu/screen_text.py b/speculos/mcu/screen_text.py index 06505ed1..f4144455 100644 --- a/speculos/mcu/screen_text.py +++ b/speculos/mcu/screen_text.py @@ -7,7 +7,7 @@ from typing import Any, List from . import bagl -from .display import Display, FrameBuffer, MODELS +from .display import Display, DisplayNotifier, FrameBuffer, MODELS from .readerror import ReadError from .struct import DisplayArgs, ServerArgs @@ -100,20 +100,18 @@ def _redraw(self): class TextScreen(Display): - def __init__(self, display: DisplayArgs, server: ServerArgs) -> None: - super().__init__(display, server) + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + super().__init__(display_args, server_args) - self.width, self.height = MODELS[display.model].screen_size - self.m = TextWidget(self, display.model) - if display.model != "stax": - self._gl = bagl.Bagl(self.m, MODELS[display.model].screen_size) + self.width, self.height = MODELS[display_args.model].screen_size + self.m = TextWidget(self, display_args.model) + if display_args.model != "stax": + self._gl = bagl.Bagl(self.m, MODELS[display_args.model].screen_size) else: raise NotImplementedError("This display can not emulate NBGL OS yet") - self._init_notifiers(server) - - if display.keymap is not None: - self.ARROW_KEYS = list(map(ord, display.keymap)) + if display_args.keymap is not None: + self.ARROW_KEYS = list(map(ord, display_args.keymap)) else: self.ARROW_KEYS = [curses.KEY_LEFT, curses.KEY_RIGHT, curses.KEY_DOWN] @@ -150,6 +148,13 @@ def get_keypress(self) -> bool: else: return True + +class TextScreenNotifier(DisplayNotifier): + + def __init__(self, display_args: DisplayArgs, server_args: ServerArgs) -> None: + super().__init__(display_args, server_args) + self._set_display_class(TextScreen) + def run(self) -> None: while True: rlist: List[Any] = list(self.notifiers.keys()) @@ -160,7 +165,8 @@ def run(self) -> None: rlist, _, _ = select.select(rlist, [], []) if sys.stdin in rlist: rlist.remove(sys.stdin) - if not self.get_keypress(): + assert isinstance(self.display, TextScreen) + if not self.display.get_keypress(): break try: for fd in rlist: diff --git a/speculos/mcu/seproxyhal.py b/speculos/mcu/seproxyhal.py index 062a1fdd..b99f5778 100644 --- a/speculos/mcu/seproxyhal.py +++ b/speculos/mcu/seproxyhal.py @@ -10,7 +10,7 @@ from speculos.observer import BroadcastInterface, TextEvent from . import usb from .automation import Automation -from .display import Display, IODevice +from .display import DisplayNotifier, IODevice from .nbgl import NBGL from .ocr import OCR from .readerror import ReadError @@ -298,11 +298,11 @@ def apply_automation(self): self.apply_automation_helper(event) self.events = [] - def _close(self, _: int, screen: Display): - screen.remove_notifier(self.fileno) + def _cleanup(self, notifier: DisplayNotifier): + notifier.remove_notifier(self.fileno) self.file.close() - def can_read(self, s: int, screen: Display): + def can_read(self, s: int, screen: DisplayNotifier): ''' Handle packet sent by the app. @@ -312,7 +312,7 @@ def can_read(self, s: int, screen: Display): tag, size, data = self.socket_helper.read_packet() if data is None: - self._close(s, screen) + self._cleanup(screen) raise ReadError("fd closed") self.logger.debug(f"received (tag: {tag:#04x}, size: {size:#04x}): {data!r}") @@ -322,19 +322,19 @@ def can_read(self, s: int, screen: Display): if self.refreshed: self.refreshed = False - if not screen.disable_tesseract: + if not screen.display.disable_tesseract: # Run the OCR - screen.gl.update_screenshot() - screen_size, image_data = screen.gl.take_screenshot() + screen.display.gl.update_screenshot() + screen_size, image_data = screen.display.gl.take_screenshot() self.ocr.analyze_image(screen_size, image_data) # Publish the new screenshot, we'll upload its associated events shortly - screen.gl.update_public_screenshot() + screen.display.gl.update_public_screenshot() - if screen.model != "stax" and screen.screen_update(): - if screen.model in ["nanox", "nanosp"]: + if screen.display.model != "stax" and screen.display.screen_update(): + if screen.display.model in ["nanox", "nanosp"]: self.events += self.ocr.get_events() - elif screen.model == "stax": + elif screen.display.model == "stax": self.events += self.ocr.get_events() # Apply automation rules after having received a GENERAL_STATUS_LAST_COMMAND tag. It allows the @@ -353,8 +353,8 @@ def can_read(self, s: int, screen: Display): SephTag.DBG_SCREEN_DISPLAY_STATUS, SephTag.BAGL_DRAW_RECT]: self.logger.debug(f"DISPLAY_STATUS {data!r}") - if screen.model not in ["nanox", "nanosp"] or tag == SephTag.BAGL_DRAW_RECT: - events = screen.display_status(data) + if screen.display.model not in ["nanox", "nanosp"] or tag == SephTag.BAGL_DRAW_RECT: + events = screen.display.display_status(data) if events: self.events += events if tag != SephTag.BAGL_DRAW_RECT: @@ -362,13 +362,13 @@ def can_read(self, s: int, screen: Display): elif tag in [SephTag.SCREEN_DISPLAY_RAW_STATUS, SephTag.BAGL_DRAW_BITMAP]: self.logger.debug("SephTag.SCREEN_DISPLAY_RAW_STATUS") - screen.display_raw_status(data) - if screen.model in ["nanox", "nanosp"]: + screen.display.display_raw_status(data) + if screen.display.model in ["nanox", "nanosp"]: self.ocr.analyze_bitmap(data) if tag != SephTag.BAGL_DRAW_BITMAP: self.socket_helper.send_packet(SephTag.DISPLAY_PROCESSED_EVENT) - if screen.rendering == RENDER_METHOD.PROGRESSIVE: - screen.screen_update() + if screen.display.rendering == RENDER_METHOD.PROGRESSIVE: + screen.display.screen_update() elif tag == SephTag.PRINTF_STATUS or tag == SephTag.PRINTC_STATUS: for b in [chr(b) for b in data]: @@ -377,11 +377,11 @@ def can_read(self, s: int, screen: Display): self.printf_queue = '' else: self.printf_queue += b - if screen.model in ["blue"]: + if screen.display.model in ["blue"]: self.socket_helper.send_packet(SephTag.DISPLAY_PROCESSED_EVENT) elif tag == SephTag.RAPDU: - screen.forward_to_apdu_client(data) + screen.display.forward_to_apdu_client(data) for c in self.apdu_callbacks: c(data) @@ -393,7 +393,7 @@ def can_read(self, s: int, screen: Display): if data: for c in self.apdu_callbacks: c(data) - screen.forward_to_apdu_client(data) + screen.display.forward_to_apdu_client(data) elif tag == SephTag.MCU: pass @@ -406,7 +406,7 @@ def can_read(self, s: int, screen: Display): elif tag == SephTag.SE_POWER_OFF: self.logger.warn("received tag SE_POWER_OFF, exiting") - self._close(s, screen) + self._cleanup(screen) raise ReadError("SE_POWER_OFF") elif tag == SephTag.REQUEST_STATUS: @@ -417,28 +417,28 @@ def can_read(self, s: int, screen: Display): pass elif tag == SephTag.NBGL_DRAW_RECT: - assert isinstance(screen.gl, NBGL) - screen.gl.hal_draw_rect(data) + assert isinstance(screen.display.gl, NBGL) + screen.display.gl.hal_draw_rect(data) elif tag == SephTag.NBGL_REFRESH: - assert isinstance(screen.gl, NBGL) - screen.gl.refresh(data) + assert isinstance(screen.display.gl, NBGL) + screen.display.gl.refresh(data) # Stax only # We have refreshed the screen, remember it for the next time we have SephTag.GENERAL_STATUS # then we'll perform a new OCR and make public the resulting screenshot / OCR analysis self.refreshed = True elif tag == SephTag.NBGL_DRAW_LINE: - assert isinstance(screen.gl, NBGL) - screen.gl.hal_draw_line(data) + assert isinstance(screen.display.gl, NBGL) + screen.display.gl.hal_draw_line(data) elif tag == SephTag.NBGL_DRAW_IMAGE: - assert isinstance(screen.gl, NBGL) - screen.gl.hal_draw_image(data) + assert isinstance(screen.display.gl, NBGL) + screen.display.gl.hal_draw_image(data) elif tag == SephTag.NBGL_DRAW_IMAGE_FILE: - assert isinstance(screen.gl, NBGL) - screen.gl.hal_draw_image_file(data) + assert isinstance(screen.display.gl, NBGL) + screen.display.gl.hal_draw_image_file(data) else: self.logger.error(f"unknown tag: {tag:#x}") diff --git a/speculos/mcu/vnc.py b/speculos/mcu/vnc.py index 7ead3305..bb117c7d 100644 --- a/speculos/mcu/vnc.py +++ b/speculos/mcu/vnc.py @@ -11,7 +11,7 @@ import sys from typing import IO, Optional, Tuple -from .display import Display, IODevice +from .display import DisplayNotifier, IODevice class VNC(IODevice): @@ -69,7 +69,7 @@ def redraw(self, pixels): self.p.stdin.write(buf) self.p.stdin.flush() - def can_read(self, s: int, screen: Display): + def can_read(self, s: int, screen: DisplayNotifier): '''Process a new keyboard or mouse event message from the VNC server.''' assert s == self.fileno @@ -87,11 +87,11 @@ def can_read(self, s: int, screen: Display): x = int.from_bytes(data[0:2], 'little') y = int.from_bytes(data[2:4], 'little') pressed = (data[4] != 0x00) - screen.seph.handle_finger(x, y, pressed) + screen.display.seph.handle_finger(x, y, pressed) elif data[4] in [0x10, 0x11]: # keyboard button = data[0] pressed = (data[4] == 0x11) - screen.seph.handle_button(button, pressed) + screen.display.seph.handle_button(button, pressed) else: self.logger.error("invalid message from the VNC server")