diff --git a/.vscode/launch.json b/.vscode/launch.json index f65cb376..214937c9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,18 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "name": "Eiger", + "type": "python", + "request": "launch", + "module": "tickit", + "justMyCode": false, + "console": "integratedTerminal", + "args": [ + "all", + "examples/configs/eiger/eiger.yaml" + ] + }, { "name": "Debug Unit Test", "type": "python", diff --git a/pyproject.toml b/pyproject.toml index 096aa30d..f3b54956 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,11 @@ classifiers = [ ] description = "Devices for tickit, an event-based device simulation framework" dependencies = [ - "tickit<0.2.3", + "tickit==0.2.3", "typing_extensions", "softioc", + "pydantic>1", + "apischema" ] dynamic = ["version"] license.file = "LICENSE" diff --git a/src/tickit_devices/eiger/__init__.py b/src/tickit_devices/eiger/__init__.py index 38b99fc8..6bb33489 100644 --- a/src/tickit_devices/eiger/__init__.py +++ b/src/tickit_devices/eiger/__init__.py @@ -22,6 +22,6 @@ def __call__(self) -> Component: # noqa: D102 device=EigerDevice(), adapters=[ EigerRESTAdapter(host=self.host, port=self.port), - EigerZMQAdapter(zmq_host=self.zmq_host, zmq_port=self.zmq_port), + EigerZMQAdapter(host=self.zmq_host, port=self.zmq_port), ], ) diff --git a/src/tickit_devices/eiger/data/dummy_image.py b/src/tickit_devices/eiger/data/dummy_image.py index bff90699..aedd554e 100644 --- a/src/tickit_devices/eiger/data/dummy_image.py +++ b/src/tickit_devices/eiger/data/dummy_image.py @@ -1,6 +1,5 @@ from dataclasses import dataclass -from pathlib import Path -from typing import List +from typing import List, Tuple @dataclass @@ -12,9 +11,10 @@ class Image: dtype: str data: bytes encoding: str + shape: Tuple[int, int] @classmethod - def create_dummy_image(cls, index: int) -> "Image": + def create_dummy_image(cls, index: int, shape: Tuple[int, int]) -> "Image": """Returns an Image object wrapping the dummy blob using the metadata provided. Args: @@ -27,7 +27,7 @@ def create_dummy_image(cls, index: int) -> "Image": hsh = str(hash(data)) dtype = "uint16" encoding = "bs16-lz4<" - return Image(index, hsh, dtype, data, encoding) + return Image(index, hsh, dtype, data, encoding, shape) _DUMMY_IMAGE_BLOBS: List[bytes] = [] @@ -44,8 +44,7 @@ def dummy_image_blob() -> bytes: """ if not _DUMMY_IMAGE_BLOBS: with open( - Path(Path(__file__).parent.parent, "resources", "frame_sample"), - "rb", + "src/tickit_devices/eiger/resources/frame_sample", "rb" ) as frame_file: _DUMMY_IMAGE_BLOBS.append(frame_file.read()) return _DUMMY_IMAGE_BLOBS[0] diff --git a/src/tickit_devices/eiger/data/schema.py b/src/tickit_devices/eiger/data/schema.py new file mode 100644 index 00000000..de9f20d5 --- /dev/null +++ b/src/tickit_devices/eiger/data/schema.py @@ -0,0 +1,73 @@ +from typing import Any, Dict, Iterable, Tuple, Union + +from pydantic.v1 import BaseModel +from zmq import Frame + +Json = Dict[str, Any] +Sendable = Union[bytes, Frame, memoryview] +MultipartMessage = Iterable[Sendable] + +DEFAULT_HEADER_TYPE = "dheader-1.0" + + +class AcquisitionSeriesHeader(BaseModel): + """Sent before a series of images (and associated headers).""" + + header_detail: str + series: int + htype: str = DEFAULT_HEADER_TYPE + + +class AcquisitionSeriesFooter(BaseModel): + """Sent at the end of a series of images (and associated headers).""" + + series: int + htype: str = "dseries_end-1.0" + + +class AcquisitionDetailsHeader(BaseModel): + """Describes an additional dataset sent at the beginning of a series. + + Used when header_detail is set to all in AcquisitionSeriesHeader. + """ + + htype: str = DEFAULT_HEADER_TYPE + shape: Tuple[int, int] + type: str + + +class ImageHeader(BaseModel): + """Sent before a detector image blob. + + Metadata about the acquisition operation. + """ + + frame: int + hash: str + series: int + htype: str = "dimage-1.0" + + +class ImageCharacteristicsHeader(BaseModel): + """Sent before a detector image blob. + + Metadata about the image. + """ + + encoding: str + shape: Tuple[int, int] + size: int + type: str + htype: str = "dimage_d-1.0" + + +class ImageConfigHeader(BaseModel): + """Sent before a detector image blob. + + Describes the metrics on the image acquisition. + """ + + real_time: float + start_time: float + stop_time: float + htype: str = "dconfig-1.0" diff --git a/src/tickit_devices/eiger/eiger.py b/src/tickit_devices/eiger/eiger.py index 93c70fa0..e783f3e2 100644 --- a/src/tickit_devices/eiger/eiger.py +++ b/src/tickit_devices/eiger/eiger.py @@ -1,51 +1,68 @@ +import asyncio import logging -from dataclasses import fields -from typing import TypedDict +from queue import Queue +from typing import Optional -from apischema import serialize from tickit.core.device import Device, DeviceUpdate from tickit.core.typedefs import SimTime +from typing_extensions import TypedDict from tickit_devices.eiger.data.dummy_image import Image -from tickit_devices.eiger.eiger_schema import AccessMode, Value from tickit_devices.eiger.eiger_settings import EigerSettings from tickit_devices.eiger.filewriter.filewriter_config import FileWriterConfig from tickit_devices.eiger.filewriter.filewriter_status import FileWriterStatus from tickit_devices.eiger.monitor.monitor_config import MonitorConfig from tickit_devices.eiger.monitor.monitor_status import MonitorStatus -from tickit_devices.eiger.stream.stream_config import StreamConfig -from tickit_devices.eiger.stream.stream_status import StreamStatus +from tickit_devices.eiger.stream.eiger_stream import EigerStream from .eiger_status import EigerStatus, State -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger("Eiger") class EigerDevice(Device): - """A device class for the Eiger detector.""" + """Simulation logic for the Eiger detector. + + The simulation acquires frames based on the commands called. It supports + the following state transitions: + + NA -> IDLE + IDLE -> READY + READY -> IDLE + READY -> ACQUIRING + ACQUIRING -> READY + ACQUIRING -> IDLE + """ settings: EigerSettings status: EigerStatus + stream: EigerStream + + _num_frames_left: int + _data_queue: Queue #: An empty typed mapping of input values - Inputs: TypedDict = TypedDict("Inputs", {"flux": float}) + Inputs: TypedDict = TypedDict("Inputs", {"trigger": bool}, total=False) #: A typed mapping containing the 'value' output value Outputs: TypedDict = TypedDict("Outputs", {}) def __init__( self, + settings: Optional[EigerSettings] = None, + status: Optional[EigerStatus] = None, + stream: Optional[EigerStream] = None, ) -> None: - """An Eiger device constructor. + """Construct a new eiger. - An Eiger device constructor which configures the default settings and various - states of the device. + Args: + settings: Eiger settings. Defaults to None. + status: Starting status. Defaults to None. + stream: Data stream handler. Defaults to None. """ - self.settings = EigerSettings() - self.status = EigerStatus() + self.settings = settings or EigerSettings() + self.status = status or EigerStatus() - self.stream_status = StreamStatus() - self.stream_config = StreamConfig() - self.stream_callback_period = SimTime(int(1e9)) + self.stream = stream or EigerStream(callback_period=SimTime(int(1e9))) self.filewriter_status: FileWriterStatus = FileWriterStatus() self.filewriter_config: FileWriterConfig = FileWriterConfig() @@ -55,142 +72,145 @@ def __init__( self.monitor_config: MonitorConfig = MonitorConfig() self.monitor_callback_period = SimTime(int(1e9)) - async def initialize(self) -> None: - """Function to initialise the Eiger.""" - self._set_state(State.IDLE) + self._num_frames_left: int = 0 + self._total_frames: int = 0 + self._data_queue: Queue = Queue() + self._series_id: int = 0 - async def arm(self) -> None: - """Function to arm the Eiger.""" - self._set_state(State.READY) + self._finished_aquisition: Optional[asyncio.Event] = None - header_detail = self.stream_config["header_detail"]["value"] + @property + def finished_aquisition(self) -> asyncio.Event: + """Event that is set when an acqusition series is complete. - json = { - "htype": "dheader-1.0", - "series": "", - "header_detail": header_detail, - } - if header_detail != "none": - config_json = {} - disallowed_configs = ["flatfield", "pixelmask" "countrate_correction_table"] - for field_ in fields(self.settings): - if field_.name not in disallowed_configs: - config_json[field_.name] = vars(self.settings)[field_.name] + Property ensures the event is created. + """ + if self._finished_aquisition is None: + self._finished_aquisition = asyncio.Event() - LOGGER.debug(json) - LOGGER.debug(config_json) + return self._finished_aquisition - async def disarm(self) -> None: - """Function to disarm the Eiger.""" + async def initialize(self) -> None: + """Initialize the detector. + + Required for all subsequent operations. + """ self._set_state(State.IDLE) - json = {"htype": "dseries_end-1.0", "series": ""} + async def arm(self) -> None: + """Arm the detector. - LOGGER.debug(json) + Required for triggering. + """ + self._series_id += 1 + self.stream.begin_series(self.settings, self._series_id) + self._num_frames_left = self.settings.nimages + self._set_state(State.READY) - async def trigger(self) -> str: - """Function to trigger the Eiger. + async def disarm(self) -> None: + """Disarm the detector. - If the detector is in an external trigger mode, this is disabled as - this software command interface only works for internal triggers. + Intended for use when armed. See state diagram in class docstring. """ + self._set_state(State.IDLE) + self.stream.end_series(self._series_id) + + async def trigger(self) -> None: + """Trigger the detector. + + If the detector is in INTS mode, it will begin acquiring frames the + next time update() is called. If it is in EXTS mode, this call will + be ignored and acquisition will start based on the parameter to + update(). + INTE and EXTE mode are currently not supported. + """ + LOGGER.info("Trigger requested") trigger_mode = self.settings.trigger_mode - state = self.status.state - - if state == State.READY and trigger_mode == "ints": - self._set_state(State.ACQUIRE) - - for idx in range(0, self.settings.nimages): - aquired = Image.create_dummy_image(idx) - - header_json = { - "htype": "dimage-1.0", - "series": "", - "frame": aquired.index, - "hash": aquired.hash, - } - - json2 = { - "htype": "dimage_d-1.0", - "shape": "[x,y,(z)]", - "type": aquired.dtype, - "encoding": aquired.encoding, - "size": len(aquired.data), - } - - json3 = { - "htype": "dconfig-1.0", - "start_time": "", - "stop_time": "", - "real_time": "", - } - - LOGGER.debug(header_json) - LOGGER.debug(json2) - LOGGER.debug(json3) - - return "Aquiring Data from Eiger..." + + if self._is_in_state(State.READY) and trigger_mode == "ints": + self._begin_acqusition_mode() else: - return ( - f"Ignoring trigger, state={self.status.state}," + LOGGER.info( + f"Ignoring trigger, state={self.get_state()}," f"trigger_mode={trigger_mode}" ) async def cancel(self) -> None: - """Function to stop the data acquisition. + """Cancel acquisition. - Function to stop the data acquisition, but only after the next - image is finished. + The detector will stop acquiring frames after the next full frame is taken, + it will then return to a READY state as though it has just been armed. """ self._set_state(State.READY) - - header_json = {"htype": "dseries_end-1.0", "series": ""} - - LOGGER.debug(header_json) + self.stream.end_series(self._series_id) async def abort(self) -> None: - """Function to abort the current task on the Eiger.""" - self._set_state(State.IDLE) + """Abort acquisition. - header_json = {"htype": "dseries_end-1.0", "series": ""} - - LOGGER.debug(header_json) + The detector will immediately stop acquiring frames and disarm itself. + """ + self._set_state(State.IDLE) + self.stream.end_series(self._series_id) def update(self, time: SimTime, inputs: Inputs) -> DeviceUpdate[Outputs]: - """Generic update function to update the values of the ExampleHTTPDevice. + """Update the detector. - Args: - time (SimTime): The simulation time in nanoseconds. - inputs (Inputs): A TypedDict of the inputs to the ExampleHTTPDevice. + Depending on the detector's current state, will begin, continue or + clean up an acquisition series. - Returns: - DeviceUpdate[Outputs]: - The produced update event which contains the value of the device - variables. + Args: + time: The current simulation time (in nanoseconds). + inputs: A mapping of device inputs and their values. """ - current_flux = inputs["flux"] - - intensity_scale = (current_flux / 100) * 100 - LOGGER.debug(f"Relative beam intensity: {intensity_scale}") + if self._is_in_state(State.ACQUIRE): + if self._num_frames_left > 0: + self._acquire_frame() + + return DeviceUpdate( + self.Outputs(), SimTime(time + int(self.settings.frame_time * 1e9)) + ) + else: + self.finished_aquisition.set() + + LOGGER.debug("Ending Series...") + self._set_state(State.IDLE) + self.stream.end_series(self._series_id) + if inputs.get("trigger", False): + self._begin_acqusition_mode() + # Should have another update immediately to begin acquisition + return DeviceUpdate(self.Outputs(), SimTime(time)) return DeviceUpdate(self.Outputs(), None) - def get_state(self): # TODO: Add return type hint - """Returns the current state of the Eiger. + def _begin_acqusition_mode(self) -> None: + self._set_state(State.ACQUIRE) + LOGGER.info("Now in acquiring mode") + self.finished_aquisition.clear() + + def _acquire_frame(self) -> None: + frame_id = self.settings.nimages - self._num_frames_left + LOGGER.debug(f"Frame id {frame_id}") + + shape = ( + self.settings.x_pixels_in_detector, + self.settings.y_pixels_in_detector, + ) + image = Image.create_dummy_image(frame_id, shape) + self.stream.insert_image(image, self._series_id) + self._num_frames_left -= 1 + LOGGER.debug(f"Frames left: {self._num_frames_left}") + + def get_state(self) -> State: + """Get the eiger's current state Returns: - State: The state of the Eiger. + State: The state the detector is in. + See state diagram in class docstring. """ - val = self.status.state - allowed = [s.value for s in State] - return serialize( - Value( - val, - AccessMode.STRING, # type: ignore - access_mode=AccessMode.READ_ONLY, - allowed_values=allowed, - ) - ) + return self.status.state - def _set_state(self, state: State): + def _set_state(self, state: State) -> None: self.status.state = state + + def _is_in_state(self, state: State) -> bool: + return self.get_state() is state diff --git a/src/tickit_devices/eiger/eiger_adapters.py b/src/tickit_devices/eiger/eiger_adapters.py index 3a7f0faa..6e7c085c 100644 --- a/src/tickit_devices/eiger/eiger_adapters.py +++ b/src/tickit_devices/eiger/eiger_adapters.py @@ -1,32 +1,30 @@ -import json import logging from aiohttp import web from apischema import serialize -from tickit.adapters.httpadapter import HTTPAdapter -from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint -from tickit.adapters.zmqadapter import ZeroMQAdapter +from tickit.adapters.httpadapter import HttpAdapter +from tickit.adapters.interpreters.endpoints.http_endpoint import HttpEndpoint +from tickit.adapters.zeromq.push_adapter import ZeroMqPushAdapter from tickit_devices.eiger.eiger import EigerDevice -from tickit_devices.eiger.eiger_schema import AccessMode, SequenceComplete, Value +from tickit_devices.eiger.eiger_schema import SequenceComplete, Value, construct_value from tickit_devices.eiger.eiger_status import State -from tickit_devices.eiger.filewriter.eiger_filewriter import EigerFileWriterAdapter -from tickit_devices.eiger.monitor.eiger_monitor import EigerMonitorAdapter -from tickit_devices.eiger.stream.eiger_stream import EigerStreamAdapter -DETECTOR_API = "detector/api/1.8.0" +API_VERSION = "1.8.0" +DETECTOR_API = f"detector/api/{API_VERSION}" +STREAM_API = f"stream/api/{API_VERSION}" +MONITOR_API = "monitor/api/1.8.0" +FILEWRITER_API = "filewriter/api/1.8.0" -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger("EigerAdapter") -class EigerRESTAdapter( - HTTPAdapter, EigerStreamAdapter, EigerMonitorAdapter, EigerFileWriterAdapter -): +class EigerRESTAdapter(HttpAdapter): """An Eiger adapter which parses the commands sent to the HTTP server.""" - device: EigerDevice # type: ignore + device: EigerDevice - @HTTPEndpoint.get(f"/{DETECTOR_API}" + "/config/{parameter_name}") + @HttpEndpoint.get(f"/{DETECTOR_API}" + "/config/{parameter_name}") async def get_config(self, request: web.Request) -> web.Response: """A HTTP Endpoint for requesting configuration variables from the Eiger. @@ -40,29 +38,14 @@ async def get_config(self, request: web.Request) -> web.Response: param = request.match_info["parameter_name"] if hasattr(self.device.settings, param): - attr = self.device.settings[param] - - data = serialize( - Value( - attr["value"], - attr["metadata"]["value_type"].value, - access_mode=( - attr["metadata"]["access_mode"].value # type: ignore - if hasattr(attr["metadata"], "access_mode") - else AccessMode.READ_ONLY - ), - ) - ) + data = construct_value(self.device.settings, param) + else: - data = serialize( - Value("None", "string", access_mode=AccessMode.NONE) # type: ignore - ) + data = serialize(Value("None", "string", access_mode="None")) return web.json_response(data) - @HTTPEndpoint.put( - f"/{DETECTOR_API}" + "/config/{parameter_name}", include_json=True - ) + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/config/{parameter_name}") async def put_config(self, request: web.Request) -> web.Response: """A HTTP Endpoint for setting configuration variables for the Eiger. @@ -76,28 +59,28 @@ async def put_config(self, request: web.Request) -> web.Response: """ param = request.match_info["parameter_name"] - response = json.loads(await request.json()) + response = await request.json() - if self.device.get_state()["value"] != State.IDLE.value: # type: ignore + if self.device.get_state() is not State.IDLE: LOGGER.warning("Eiger not initialized or is currently running.") - return web.json_response(serialize(SequenceComplete(7))) + return web.json_response(serialize([])) elif ( hasattr(self.device.settings, param) - and self.device.get_state()["value"] == State.IDLE.value # type: ignore + and self.device.get_state() is State.IDLE ): attr = response["value"] - LOGGER.debug(f"Changing to {attr} for {param}") + LOGGER.debug(f"Changing to {str(attr)} for {str(param)}") self.device.settings[param] = attr LOGGER.debug("Set " + str(param) + " to " + str(attr)) - return web.json_response(serialize(SequenceComplete(8))) + return web.json_response(serialize([param])) else: LOGGER.debug("Eiger has no config variable: " + str(param)) - return web.json_response(serialize(SequenceComplete(9))) + return web.json_response(serialize([])) - @HTTPEndpoint.get(f"/{DETECTOR_API}" + "/status/{status_param}") + @HttpEndpoint.get(f"/{DETECTOR_API}" + "/status/{status_param}") async def get_status(self, request: web.Request) -> web.Response: """A HTTP Endpoint for requesting the status of the Eiger. @@ -111,15 +94,14 @@ async def get_status(self, request: web.Request) -> web.Response: param = request.match_info["status_param"] if hasattr(self.device.status, param): - attr = self.device.status[param] - else: - attr = "None" + data = construct_value(self.device.status, param) - data = serialize({"value": attr}) + else: + data = serialize(Value("None", "string", access_mode="None")) return web.json_response(data) - @HTTPEndpoint.get(f"/{DETECTOR_API}" + "/status/board_000/{status_param}") + @HttpEndpoint.get(f"/{DETECTOR_API}" + "/status/board_000/{status_param}") async def get_board_000_status(self, request: web.Request) -> web.Response: """A HTTP Endpoint for requesting the status of the Eiger. @@ -130,18 +112,9 @@ async def get_board_000_status(self, request: web.Request) -> web.Response: web.Response: The response object returned given the result of the HTTP request. """ - param = request.match_info["status_param"] - - if hasattr(self.device.status, param): - attr = self.device.status[param] - else: - attr = "None" - - data = serialize({"value": attr}) + return await self.get_status(request) - return web.json_response(data) - - @HTTPEndpoint.get(f"/{DETECTOR_API}" + "/status/builder/{status_param}") + @HttpEndpoint.get(f"/{DETECTOR_API}" + "/status/builder/{status_param}") async def get_builder_status(self, request: web.Request) -> web.Response: """A HTTP Endpoint for requesting the status of the Eiger. @@ -152,18 +125,9 @@ async def get_builder_status(self, request: web.Request) -> web.Response: web.Response: The response object returned given the result of the HTTP request. """ - param = request.match_info["status_param"] - - if hasattr(self.device.status, param): - attr = self.device.status[param] - else: - attr = "None" - - data = serialize({"value": attr}) + return await self.get_status(request) - return web.json_response(data) - - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/initialize") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/initialize", interrupt=True) async def initialize_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'initialize' command of the Eiger. @@ -179,7 +143,7 @@ async def initialize_eiger(self, request: web.Request) -> web.Response: LOGGER.debug("Initializing Eiger...") return web.json_response(serialize(SequenceComplete(1))) - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/arm") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/arm", interrupt=True) async def arm_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'arm' command of the Eiger. @@ -195,7 +159,7 @@ async def arm_eiger(self, request: web.Request) -> web.Response: LOGGER.debug("Arming Eiger...") return web.json_response(serialize(SequenceComplete(2))) - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/disarm") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/disarm", interrupt=True) async def disarm_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'disarm' command of the Eiger. @@ -211,7 +175,7 @@ async def disarm_eiger(self, request: web.Request) -> web.Response: LOGGER.debug("Disarming Eiger...") return web.json_response(serialize(SequenceComplete(3))) - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/trigger") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/trigger", interrupt=False) async def trigger_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'trigger' command of the Eiger. @@ -222,13 +186,15 @@ async def trigger_eiger(self, request: web.Request) -> web.Response: web.Response: The response object returned given the result of the HTTP request. """ - trigger_message = await self.device.trigger() - self.device._set_state(State.IDLE) + LOGGER.debug("Triggering Eiger") + await self.device.trigger() + + await self.raise_interrupt() + await self.device.finished_aquisition.wait() - LOGGER.debug(trigger_message) return web.json_response(serialize(SequenceComplete(4))) - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/cancel") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/cancel", interrupt=True) async def cancel_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'cancel' command of the Eiger. @@ -244,7 +210,7 @@ async def cancel_eiger(self, request: web.Request) -> web.Response: LOGGER.debug("Cancelling Eiger...") return web.json_response(serialize(SequenceComplete(5))) - @HTTPEndpoint.put(f"/{DETECTOR_API}" + "/command/abort") + @HttpEndpoint.put(f"/{DETECTOR_API}" + "/command/abort", interrupt=True) async def abort_eiger(self, request: web.Request) -> web.Response: """A HTTP Endpoint for the 'abort' command of the Eiger. @@ -260,8 +226,202 @@ async def abort_eiger(self, request: web.Request) -> web.Response: LOGGER.debug("Aborting Eiger...") return web.json_response(serialize(SequenceComplete(6))) + @HttpEndpoint.get(f"/{STREAM_API}" + "/status/{param}") + async def get_stream_status(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting status values from the Stream. -class EigerZMQAdapter(ZeroMQAdapter): + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.stream.status, param) + + return web.json_response(data) + + @HttpEndpoint.get(f"/{STREAM_API}" + "/config/{param}") + async def get_stream_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting config values from the Stream. + + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.stream.config, param) + + return web.json_response(data) + + @HttpEndpoint.put(f"/{STREAM_API}" + "/config/{param}") + async def put_stream_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for setting config values for the Stream. + + Args: + request (web.Request): The request object that takes the given parameter + and value. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + response = await request.json() + + if hasattr(self.device.stream.config, param): + attr = response["value"] + + LOGGER.debug(f"Changing to {attr} for {param}") + + self.device.stream.config[param] = attr + + LOGGER.debug("Set " + str(param) + " to " + str(attr)) + return web.json_response(serialize([param])) + else: + LOGGER.debug("Eiger has no config variable: " + str(param)) + return web.json_response(serialize([])) + + @HttpEndpoint.get(f"/{MONITOR_API}" + "/config/{param}") + async def get_monitor_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting config values from the Monitor. + + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.monitor_config, param) + + return web.json_response(data) + + @HttpEndpoint.put(f"/{MONITOR_API}" + "/config/{param}") + async def put_monitor_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for setting config values for the Monitor. + + Args: + request (web.Request): The request object that takes the given parameter + and value. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + response = await request.json() + + if hasattr(self.device.monitor_config, param): + attr = response["value"] + + LOGGER.debug(f"Changing to {attr} for {param}") + + self.device.monitor_config[param] = attr + + LOGGER.debug("Set " + str(param) + " to " + str(attr)) + return web.json_response(serialize([param])) + else: + LOGGER.debug("Eiger has no config variable: " + str(param)) + return web.json_response(serialize([])) + + @HttpEndpoint.get(f"/{MONITOR_API}" + "/status/{param}") + async def get_monitor_status(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting status values from the Monitor. + + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.monitor_status, param) + + return web.json_response(data) + + @HttpEndpoint.get(f"/{FILEWRITER_API}" + "/config/{param}") + async def get_filewriter_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting config values from the Filewriter. + + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.filewriter_config, param) + + return web.json_response(data) + + @HttpEndpoint.put(f"/{FILEWRITER_API}" + "/config/{param}") + async def put_filewriter_config(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for setting config values for the Filewriter. + + Args: + request (web.Request): The request object that takes the given parameter + and value. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + response = await request.json() + + if hasattr(self.device.filewriter_config, param): + attr = response["value"] + + LOGGER.debug(f"Changing to {attr} for {param}") + + self.device.filewriter_config[param] = attr + + LOGGER.debug("Set " + str(param) + " to " + str(attr)) + return web.json_response(serialize([param])) + else: + LOGGER.debug("Eiger has no config variable: " + str(param)) + return web.json_response(serialize([])) + + @HttpEndpoint.get(f"/{FILEWRITER_API}" + "/status/{param}") + async def get_filewriter_status(self, request: web.Request) -> web.Response: + """A HTTP Endpoint for requesting status values from the Filewriter. + + Args: + request (web.Request): The request object that takes the given parameter. + + Returns: + web.Response: The response object returned given the result of the HTTP + request. + """ + param = request.match_info["param"] + + data = construct_value(self.device.filewriter_status, param) + + return web.json_response(data) + + +class EigerZMQAdapter(ZeroMqPushAdapter): """An Eiger adapter which parses the data to send along a ZeroMQStream.""" device: EigerDevice + + def after_update(self) -> None: + """Updates IOC values immediately following a device update.""" + buffered_data = self.device.stream.consume_data() + self.send_message_sequence_soon([list(buffered_data)]) diff --git a/src/tickit_devices/eiger/eiger_schema.py b/src/tickit_devices/eiger/eiger_schema.py index 557d87aa..1e59833a 100644 --- a/src/tickit_devices/eiger/eiger_schema.py +++ b/src/tickit_devices/eiger/eiger_schema.py @@ -1,10 +1,18 @@ +import logging from dataclasses import dataclass, field from enum import Enum from functools import partial from typing import Any, Generic, List, Mapping, Optional, TypeVar +from apischema import serialized +from apischema.fields import with_fields_set +from apischema.metadata import skip +from apischema.serialization import serialize + T = TypeVar("T") +LOGGER = logging.getLogger(__name__) + def field_config(**kwargs) -> Mapping[str, Any]: """Helper function to create a typesafe dictionary. @@ -27,75 +35,123 @@ class AccessMode(Enum): READ_ONLY: str = "r" WRITE_ONLY: str = "w" READ_WRITE: str = "rw" + + +class ValueType(Enum): + """Possible value types for field metadata.""" + FLOAT: str = "float" INT: str = "int" UINT: str = "uint" STRING: str = "string" - LIST_STR: str = "string[]" + STR_LIST: str = "string[]" BOOL: str = "bool" FLOAT_GRID: str = "float[][]" UINT_GRID: str = "uint[][]" DATE: str = "date" + DATETIME: str = "datetime" NONE: str = "none" + STATE: str = "State" # # Shortcuts to creating dataclass field metadata # rw_float: partial = partial( - field_config, value_type=AccessMode.FLOAT, access_mode=AccessMode.READ_WRITE + field_config, value_type=ValueType.FLOAT, access_mode=AccessMode.READ_WRITE ) ro_float: partial = partial( - field_config, value_type=AccessMode.FLOAT, access_mode=AccessMode.READ_ONLY + field_config, value_type=ValueType.FLOAT, access_mode=AccessMode.READ_ONLY ) rw_int: partial = partial( - field_config, value_type=AccessMode.INT, access_mode=AccessMode.READ_WRITE + field_config, value_type=ValueType.INT, access_mode=AccessMode.READ_WRITE ) ro_int: partial = partial( - field_config, value_type=AccessMode.INT, access_mode=AccessMode.READ_ONLY + field_config, value_type=ValueType.INT, access_mode=AccessMode.READ_ONLY ) rw_uint: partial = partial( - field_config, value_type=AccessMode.UINT, access_mode=AccessMode.READ_WRITE + field_config, value_type=ValueType.UINT, access_mode=AccessMode.READ_WRITE ) rw_str: partial = partial( - field_config, value_type=AccessMode.STRING, access_mode=AccessMode.READ_WRITE + field_config, value_type=ValueType.STRING, access_mode=AccessMode.READ_WRITE ) ro_str: partial = partial( - field_config, value_type=AccessMode.STRING, access_mode=AccessMode.READ_ONLY + field_config, value_type=ValueType.STRING, access_mode=AccessMode.READ_ONLY ) rw_bool: partial = partial( - field_config, value_type=AccessMode.BOOL, access_mode=AccessMode.READ_WRITE + field_config, value_type=ValueType.BOOL, access_mode=AccessMode.READ_WRITE ) rw_float_grid: partial = partial( field_config, - value_type=AccessMode.FLOAT_GRID, + value_type=ValueType.FLOAT_GRID, access_mode=AccessMode.READ_WRITE, ) rw_uint_grid: partial = partial( field_config, - value_type=AccessMode.UINT_GRID, + value_type=ValueType.UINT_GRID, access_mode=AccessMode.READ_WRITE, ) ro_date: partial = partial( - field_config, value_type=AccessMode.DATE, access_mode=AccessMode.READ_ONLY + field_config, value_type=ValueType.DATE, access_mode=AccessMode.READ_ONLY +) +rw_datetime: partial = partial( + field_config, value_type=ValueType.DATETIME, access_mode=AccessMode.READ_WRITE +) +rw_state: partial = partial( + field_config, value_type=ValueType.STATE, access_mode=AccessMode.READ_WRITE +) +ro_str_list: partial = partial( + field_config, value_type=ValueType.STR_LIST, access_mode=AccessMode.READ_ONLY ) +@with_fields_set @dataclass class Value(Generic[T]): """Schema for a value to be returned by the API. Most fields are optional.""" value: T value_type: str - access_mode: Optional[AccessMode] = None + access_mode: Optional[str] = None unit: Optional[str] = None min: Optional[T] = None max: Optional[T] = None allowed_values: Optional[List[str]] = None +def construct_value(obj, param): # noqa: D103 + value = obj[param]["value"] + meta = obj[param]["metadata"] + + if "allowed_values" in meta: + data = serialize( + Value( + value, + meta["value_type"].value, + access_mode=meta["access_mode"].value, + allowed_values=meta["allowed_values"], + ) + ) + + else: + data = serialize( + Value( + value, + meta["value_type"].value, + access_mode=meta["access_mode"].value, + ) + ) + + return data + + @dataclass class SequenceComplete: """Schema for confirmation returned by operations that do not return values.""" - sequence_id: int = field(default=1, metadata=ro_int()) + _sequence_id: int = field(default=1, metadata=skip, init=True, repr=False) + + @serialized("sequence id") # type: ignore + @property + def sequence_id(self) -> int: # noqa: D102 + return self._sequence_id diff --git a/src/tickit_devices/eiger/eiger_settings.py b/src/tickit_devices/eiger/eiger_settings.py index 7e199917..fec60af9 100644 --- a/src/tickit_devices/eiger/eiger_settings.py +++ b/src/tickit_devices/eiger/eiger_settings.py @@ -1,18 +1,22 @@ +import logging from dataclasses import dataclass, field, fields from enum import Enum -from typing import Any, List +from typing import Any, List, Mapping from .eiger_schema import ( - AccessMode, - field_config, ro_float, ro_str, rw_bool, rw_float, + rw_float_grid, rw_int, rw_str, + rw_uint_grid, ) +LOGGER = logging.getLogger(__name__) + + FRAME_WIDTH: int = 4148 FRAME_HEIGHT: int = 4362 @@ -77,11 +81,10 @@ class EigerSettings: detector_number: str = field(default="EIGERSIM001", metadata=ro_str()) detector_readout_time: float = field(default=0.01, metadata=rw_float()) element: str = field( - default="Co", metadata=rw_str(allowed_values=[e.name for e in KA_Energy]) + default="Co", metadata=rw_str(allowed_values=["", *(e.name for e in KA_Energy)]) ) flatfield: List[List[float]] = field( - default_factory=lambda: [[]], - metadata=field_config(value_type=AccessMode.FLOAT_GRID), + default_factory=lambda: [[]], metadata=rw_float_grid() ) flatfield_correction_applied: bool = field(default=True, metadata=rw_bool()) frame_time: float = field(default=0.12, metadata=rw_float()) @@ -96,8 +99,7 @@ class EigerSettings: phi_start: float = field(default=0.0, metadata=rw_float()) photon_energy: float = field(default=6930.32, metadata=rw_float()) pixel_mask: List[List[int]] = field( - default_factory=lambda: [[]], - metadata=field_config(value_type=AccessMode.UINT_GRID), + default_factory=lambda: [[]], metadata=rw_uint_grid() ) pixel_mask_applied: bool = field(default=False, metadata=rw_bool()) roi_mode: str = field( @@ -112,7 +114,7 @@ class EigerSettings: ) two_theta_increment: float = field(default=0.0, metadata=rw_float()) two_theta_start: float = field(default=0.0, metadata=rw_float()) - wavelength: float = field(default=1e-9, metadata=rw_float()) + wavelength: float = field(default=1.0, metadata=rw_float()) x_pixel_size: float = field(default=0.01, metadata=ro_float()) x_pixels_in_detector: int = field(default=FRAME_WIDTH, metadata=rw_int()) y_pixel_size: float = field(default=0.01, metadata=ro_float()) @@ -130,6 +132,41 @@ def __getitem__(self, key: str) -> Any: # noqa: D105 def __setitem__(self, key: str, value: Any) -> None: # noqa: D105 self.__dict__[key] = value + self._check_dependencies(key, value) + + def _check_dependencies(self, key, value): if key == "element": self.photon_energy = getattr(KA_Energy, value).value - self.threshold_energy = 0.5 * self.photon_energy + self.wavelength = (1240 / self.photon_energy) / 10 # to convert to Angstrom + self._calc_threshold_energy() + + elif key == "photon_energy": + self.element = "" + + hc = 1240 + self.wavelength = (hc / self.photon_energy) / 10 # to convert to Angstrom + + self._calc_threshold_energy() + + elif key == "wavelength": + self.element = "" + + hc = 1240 + self.photon_energy = hc / (self.wavelength * 10) # to convert from Angstrom + + self._calc_threshold_energy() + + elif key == "count_time": + self.frame_time = self.count_time + self.detector_readout_time + + def _calc_threshold_energy(self): + self.threshold_energy = 0.5 * self.photon_energy + + LOGGER.warning("Flatfield not recalculated.") + + def filtered(self, exclude_fields: List[str]) -> Mapping[str, Any]: + return { + fld.name: vars(self)[fld.name] + for fld in fields(self) + if fld not in exclude_fields + } diff --git a/src/tickit_devices/eiger/eiger_status.py b/src/tickit_devices/eiger/eiger_status.py index 8209ccbf..74a96ec4 100644 --- a/src/tickit_devices/eiger/eiger_status.py +++ b/src/tickit_devices/eiger/eiger_status.py @@ -3,6 +3,8 @@ from enum import Enum from typing import Any, List +from .eiger_schema import ro_str_list, rw_datetime, rw_float, rw_state + class State(Enum): """Possible states of the Eiger detector.""" @@ -21,15 +23,21 @@ class State(Enum): class EigerStatus: """Stores the status parameters of the Eiger detector.""" - state: State = field(default=State.NA) - errors: List[str] = field(default_factory=list) - th0_temp: float = field(default=24.5) - th0_humidity: float = field(default=0.2) - time: datetime = field(default=datetime.now()) - dcu_buffer_free: float = field(default=0.5) + state: State = field( + default=State.NA, + metadata=rw_state(allowed_values=[state.value for state in State]), + ) + errors: List[str] = field(default_factory=list, metadata=ro_str_list()) + th0_temp: float = field(default=24.5, metadata=rw_float()) + th0_humidity: float = field(default=0.2, metadata=rw_float()) + time: datetime = field(default=datetime.now(), metadata=rw_datetime()) + dcu_buffer_free: float = field(default=0.5, metadata=rw_float()) def __getitem__(self, key: str) -> Any: # noqa: D105 f = {} for field_ in fields(self): - f[field_.name] = vars(self)[field_.name] + f[field_.name] = { + "value": vars(self)[field_.name], + "metadata": field_.metadata, + } return f[key] diff --git a/src/tickit_devices/eiger/filewriter/eiger_filewriter.py b/src/tickit_devices/eiger/filewriter/eiger_filewriter.py deleted file mode 100644 index 3f447f99..00000000 --- a/src/tickit_devices/eiger/filewriter/eiger_filewriter.py +++ /dev/null @@ -1,78 +0,0 @@ -import logging -from typing import TypedDict - -from aiohttp import web -from apischema import serialize -from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint -from tickit.core.typedefs import SimTime - -from tickit_devices.eiger.eiger_schema import Value -from tickit_devices.eiger.filewriter.filewriter_config import FileWriterConfig -from tickit_devices.eiger.filewriter.filewriter_status import FileWriterStatus - -LOGGER = logging.getLogger(__name__) - -FILEWRITER_API = "filewriter/api/1.8.0" - - -class EigerFileWriter: - """Simulation of an Eiger FileWriter.""" - - #: An empty typed mapping of input values - Inputs: TypedDict = TypedDict("Inputs", {}) - #: A typed mapping containing the 'value' output value - Outputs: TypedDict = TypedDict("Outputs", {}) - - def __init__(self) -> None: - """An Eiger FileWriter constructor.""" - self.filewriter_status: FileWriterStatus = FileWriterStatus() - self.filewriter_config: FileWriterConfig = FileWriterConfig() - self.filewriter_callback_period = SimTime(int(1e9)) - - -class EigerFileWriterAdapter: - """An adapter for the FileWriter.""" - - device: EigerFileWriter - - @HTTPEndpoint.get(f"/{FILEWRITER_API}" + "/config/{param}") - async def get_filewriter_config(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting config values from the Filewriter. - - Args: - request (web.Request): The request object that takes the given parameter. - - Returns: - web.Response: The response object returned given the result of the HTTP - request. - """ - param = request.match_info["param"] - val = self.device.filewriter_config[param]["value"] - meta = self.device.filewriter_config[param]["metadata"] - - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) - ) - - return web.json_response(data) - - @HTTPEndpoint.get(f"/{FILEWRITER_API}" + "/status/{param}") - async def get_filewriter_status(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting status values from the Filewriter. - - Args: - request (web.Request): The request object that takes the given parameter. - - Returns: - web.Response: The response object returned given the result of the HTTP - request. - """ - param = request.match_info["param"] - val = self.device.filewriter_status[param]["value"] - meta = self.device.filewriter_status[param]["metadata"] - - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) - ) - - return web.json_response(data) diff --git a/src/tickit_devices/eiger/filewriter/filewriter_config.py b/src/tickit_devices/eiger/filewriter/filewriter_config.py index 990184f7..b4b47bfb 100644 --- a/src/tickit_devices/eiger/filewriter/filewriter_config.py +++ b/src/tickit_devices/eiger/filewriter/filewriter_config.py @@ -24,3 +24,6 @@ def __getitem__(self, key: str) -> Any: # noqa: D105 "metadata": field_.metadata, } return f[key] + + def __setitem__(self, key: str, value: Any) -> None: # noqa: D105 + self.__dict__[key] = value diff --git a/src/tickit_devices/eiger/filewriter/filewriter_status.py b/src/tickit_devices/eiger/filewriter/filewriter_status.py index 6659da5b..0610df59 100644 --- a/src/tickit_devices/eiger/filewriter/filewriter_status.py +++ b/src/tickit_devices/eiger/filewriter/filewriter_status.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field, fields from typing import Any, List -from tickit_devices.eiger.eiger_schema import AccessMode, ro_str +from tickit_devices.eiger.eiger_schema import ro_str, ro_str_list @dataclass @@ -9,18 +9,8 @@ class FileWriterStatus: """Eiger filewriter status taken from the API spec.""" state: str = field(default="ready", metadata=ro_str()) - error: List[str] = field( - default_factory=lambda: [], - metadata=dict( - value=[], value_type=AccessMode.LIST_STR, access_mode=AccessMode.READ_ONLY - ), - ) - files: List[str] = field( - default_factory=lambda: [], - metadata=dict( - value=[], value_type=AccessMode.LIST_STR, access_mode=AccessMode.READ_ONLY - ), - ) + error: List[str] = field(default_factory=lambda: [], metadata=ro_str_list()) + files: List[str] = field(default_factory=lambda: [], metadata=ro_str_list()) def __getitem__(self, key: str) -> Any: # noqa: D105 f = {} diff --git a/src/tickit_devices/eiger/monitor/eiger_monitor.py b/src/tickit_devices/eiger/monitor/eiger_monitor.py deleted file mode 100644 index 1217b111..00000000 --- a/src/tickit_devices/eiger/monitor/eiger_monitor.py +++ /dev/null @@ -1,78 +0,0 @@ -import logging -from typing import TypedDict - -from aiohttp import web -from apischema import serialize -from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint -from tickit.core.typedefs import SimTime - -from tickit_devices.eiger.eiger_schema import Value -from tickit_devices.eiger.monitor.monitor_config import MonitorConfig -from tickit_devices.eiger.monitor.monitor_status import MonitorStatus - -LOGGER = logging.getLogger(__name__) - -MONITOR_API = "monitor/api/1.8.0" - - -class EigerMonitor: - """Simulation of an Eiger Monitor.""" - - #: An empty typed mapping of input values - Inputs: TypedDict = TypedDict("Inputs", {}) - #: A typed mapping containing the 'value' output value - Outputs: TypedDict = TypedDict("Outputs", {}) - - def __init__(self) -> None: - """An Eiger Monitor constructor.""" - self.monitor_status: MonitorStatus = MonitorStatus() - self.monitor_config: MonitorConfig = MonitorConfig() - self.monitor_callback_period = SimTime(int(1e9)) - - -class EigerMonitorAdapter: - """An adapter for the Monitor.""" - - device: EigerMonitor - - @HTTPEndpoint.get(f"/{MONITOR_API}" + "/config/{param}") - async def get_monitor_config(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting config values from the Monitor. - - Args: - request (web.Request): The request object that takes the given parameter. - - Returns: - web.Response: The response object returned given the result of the HTTP - request. - """ - param = request.match_info["param"] - val = self.device.monitor_config[param]["value"] - meta = self.device.monitor_config[param]["metadata"] - - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) - ) - - return web.json_response(data) - - @HTTPEndpoint.get(f"/{MONITOR_API}" + "/status/{param}") - async def get_monitor_status(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting status values from the Monitor. - - Args: - request (web.Request): The request object that takes the given parameter. - - Returns: - web.Response: The response object returned given the result of the HTTP - request. - """ - param = request.match_info["param"] - val = self.device.monitor_status[param]["value"] - meta = self.device.monitor_status[param]["metadata"] - - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) - ) - - return web.json_response(data) diff --git a/src/tickit_devices/eiger/monitor/monitor_config.py b/src/tickit_devices/eiger/monitor/monitor_config.py index afaf5f15..3b4bd0ed 100644 --- a/src/tickit_devices/eiger/monitor/monitor_config.py +++ b/src/tickit_devices/eiger/monitor/monitor_config.py @@ -9,7 +9,7 @@ class MonitorConfig: """Eiger monitor configuration taken from the API spec.""" mode: str = field( - default="enabled", metadata=rw_str(allowed_values=["disabled", "enabled"]) + default="enabled", metadata=rw_str(allowed_values=["enabled", "disabled"]) ) buffer_size: int = field(default=512, metadata=rw_int()) @@ -21,3 +21,6 @@ def __getitem__(self, key: str) -> Any: # noqa: D105 "metadata": field_.metadata, } return f[key] + + def __setitem__(self, key: str, value: Any) -> None: # noqa: D105 + self.__dict__[key] = value diff --git a/src/tickit_devices/eiger/monitor/monitor_status.py b/src/tickit_devices/eiger/monitor/monitor_status.py index c5141168..18b1894b 100644 --- a/src/tickit_devices/eiger/monitor/monitor_status.py +++ b/src/tickit_devices/eiger/monitor/monitor_status.py @@ -1,19 +1,14 @@ from dataclasses import dataclass, field, fields from typing import Any, List -from tickit_devices.eiger.eiger_schema import AccessMode +from tickit_devices.eiger.eiger_schema import ro_str_list @dataclass class MonitorStatus: """Eiger monitor status taken from the API spec.""" - error: List[str] = field( - default_factory=lambda: [], - metadata=dict( - value=[], value_type=AccessMode.LIST_STR, access_mode=AccessMode.READ_ONLY - ), - ) + error: List[str] = field(default_factory=lambda: [], metadata=ro_str_list()) def __getitem__(self, key: str) -> Any: # noqa: D105 f = {} diff --git a/src/tickit_devices/eiger/stream/eiger_stream.py b/src/tickit_devices/eiger/stream/eiger_stream.py index 3b1b22f5..acdc477d 100644 --- a/src/tickit_devices/eiger/stream/eiger_stream.py +++ b/src/tickit_devices/eiger/stream/eiger_stream.py @@ -1,25 +1,38 @@ import logging -from typing import TypedDict +from queue import Queue +from typing import Any, Iterable, Mapping, TypedDict, Union -from aiohttp import web -from apischema import serialize -from tickit.adapters.interpreters.endpoints.http_endpoint import HTTPEndpoint +from pydantic.v1 import BaseModel from tickit.core.typedefs import SimTime - -from tickit_devices.eiger.eiger_schema import Value +from typing_extensions import TypedDict + +from tickit_devices.eiger.data.dummy_image import Image +from tickit_devices.eiger.data.schema import ( + AcquisitionDetailsHeader, + AcquisitionSeriesFooter, + AcquisitionSeriesHeader, + ImageCharacteristicsHeader, + ImageConfigHeader, + ImageHeader, +) +from tickit_devices.eiger.eiger_settings import EigerSettings from tickit_devices.eiger.stream.stream_config import StreamConfig from tickit_devices.eiger.stream.stream_status import StreamStatus LOGGER = logging.getLogger(__name__) -STREAM_API = "stream/api/1.8.0" + + +_Message = Union[BaseModel, Mapping[str, Any], bytes] class EigerStream: """Simulation of an Eiger stream.""" - stream_status: StreamStatus - stream_config: StreamConfig - stream_callback_period: SimTime + status: StreamStatus + config: StreamConfig + callback_period: SimTime + + _message_buffer: Queue[_Message] #: An empty typed mapping of input values Inputs: TypedDict = TypedDict("Inputs", {}) @@ -28,54 +41,110 @@ class EigerStream: def __init__(self, callback_period: int = int(1e9)) -> None: """An Eiger Stream constructor.""" - self.stream_status = StreamStatus() - self.stream_config = StreamConfig() - self.stream_callback_period = SimTime(callback_period) - + self.status = StreamStatus() + self.config = StreamConfig() + self.callback_period = SimTime(callback_period) -class EigerStreamAdapter: - """An adapter for the Stream.""" + self._message_buffer = Queue() - device: EigerStream - - @HTTPEndpoint.get(f"/{STREAM_API}" + "/status/{param}") - async def get_stream_status(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting status values from the Stream. + def begin_series(self, settings: EigerSettings, series_id: int) -> None: + """Send the headers marking the beginning of the acquisition series. Args: - request (web.Request): The request object that takes the given parameter. - - Returns: - web.Response: The response object returned given the result of the HTTP - request. + settings: Current detector configuration, a snapshot may be sent with the + headers. + series_id: ID for the acquisition series. """ - param = request.match_info["param"] - val = self.device.stream_status[param]["value"] - meta = self.device.stream_status[param]["metadata"] + header_detail = self.config.header_detail + header = AcquisitionSeriesHeader( + header_detail=header_detail, + series=series_id, + ) + self._buffer(header) + + if header_detail != "none": + config_header = settings.filtered( + ["flatfield", "pixelmask" "countrate_correction_table"] + ) + self._buffer(config_header) + + if header_detail == "all": + x = settings.x_pixels_in_detector + y = settings.y_pixels_in_detector + + flatfield_header = AcquisitionDetailsHeader( + htype="flatfield-1.0", + shape=(x, y), + type="float32", + ) + self._buffer(flatfield_header) + flatfield_data_blob = {"blob": "blob"} + self._buffer(flatfield_data_blob) + + pixel_mask_header = AcquisitionDetailsHeader( + htype="dpixelmask-1.0", + shape=(x, y), + type="uint32", + ) + self._buffer(pixel_mask_header) + pixel_mask_data_blob = {"blob": "blob"} + self._buffer(pixel_mask_data_blob) + + countrate_table_header = AcquisitionDetailsHeader( + htype="dcountrate_table-1.0", + shape=(x, y), + type="float32", + ) + self._buffer(countrate_table_header) + countrate_table_data_blob = {"blob": "blob"} + self._buffer(countrate_table_data_blob) + + def insert_image(self, image: Image, series_id: int) -> None: + """Send headers and an data blob for a single image. - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) + Args: + image: The image with associated metadata + series_id: ID for the acquisition series. + """ + header = ImageHeader( + frame=image.index, + hash=image.hash, + series=series_id, + ) + characteristics_header = ImageCharacteristicsHeader( + encoding=image.encoding, + shape=image.shape, + size=len(image.data), + type=image.dtype, + ) + config_header = ImageConfigHeader( + real_time=0.0, + start_time=0.0, + stop_time=0.0, ) - return web.json_response(data) + self._buffer(header) + self._buffer(characteristics_header) + self._buffer(image.data) + self._buffer(config_header) - @HTTPEndpoint.get(f"/{STREAM_API}" + "/config/{param}") - async def get_stream_config(self, request: web.Request) -> web.Response: - """A HTTP Endpoint for requesting config values from the Stream. + def end_series(self, series_id: int) -> None: + """Send footer marking the end of an acquisition series. Args: - request (web.Request): The request object that takes the given parameter. + series_id: ID of the series to end. + """ + footer = AcquisitionSeriesFooter(series=series_id) + self._buffer(footer) + + def consume_data(self) -> Iterable[_Message]: + """Consume all headers and data buffered by other methods. Returns: - web.Response: The response object returned given the result of the HTTP - request. + Iterable[_Message]: Iterable of headers and data """ - param = request.match_info["param"] - val = self.device.stream_config[param]["value"] - meta = self.device.stream_config[param]["metadata"] - - data = serialize( - Value(val, meta["value_type"].value, access_mode=meta["access_mode"]) - ) + while not self._message_buffer.empty(): + yield self._message_buffer.get() - return web.json_response(data) + def _buffer(self, message: _Message) -> None: + self._message_buffer.put_nowait(message) diff --git a/src/tickit_devices/eiger/stream/stream_config.py b/src/tickit_devices/eiger/stream/stream_config.py index 9bd390e7..0581f871 100644 --- a/src/tickit_devices/eiger/stream/stream_config.py +++ b/src/tickit_devices/eiger/stream/stream_config.py @@ -12,7 +12,7 @@ class StreamConfig: default="enabled", metadata=rw_str(allowed_values=["disabled", "enabled"]) ) header_detail: str = field( - default="basic", metadata=rw_str(allowed_values=["all", "basic", "none"]) + default="basic", metadata=rw_str(allowed_values=["none", "basic", "all"]) ) header_appendix: str = field(default="", metadata=rw_str()) image_appendix: str = field(default="", metadata=rw_str()) @@ -25,3 +25,6 @@ def __getitem__(self, key: str) -> Any: # noqa: D105 "metadata": field_.metadata, } return f[key] + + def __setitem__(self, key: str, value: Any) -> None: # noqa: D105 + self.__dict__[key] = value diff --git a/src/tickit_devices/eiger/stream/stream_status.py b/src/tickit_devices/eiger/stream/stream_status.py index e82a1a1c..f81e4935 100644 --- a/src/tickit_devices/eiger/stream/stream_status.py +++ b/src/tickit_devices/eiger/stream/stream_status.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field, fields from typing import Any, List -from tickit_devices.eiger.eiger_schema import AccessMode, ro_int, ro_str +from tickit_devices.eiger.eiger_schema import ro_int, ro_str, ro_str_list @dataclass @@ -9,12 +9,7 @@ class StreamStatus: """Eiger stream status taken from the API spec.""" state: str = field(default="ready", metadata=ro_str()) - error: List[str] = field( - default_factory=lambda: [], - metadata=dict( - value=[], value_type=AccessMode.LIST_STR, access_mode=AccessMode.READ_ONLY - ), - ) + error: List[str] = field(default_factory=lambda: [], metadata=ro_str_list()) dropped: int = field(default=0, metadata=ro_int()) def __getitem__(self, key: str) -> Any: # noqa: D105 diff --git a/src/tickit_devices/synchrotron/synchrotron_current.py b/src/tickit_devices/synchrotron/synchrotron_current.py index eddd62ff..2b15c3a0 100644 --- a/src/tickit_devices/synchrotron/synchrotron_current.py +++ b/src/tickit_devices/synchrotron/synchrotron_current.py @@ -180,6 +180,6 @@ def __call__(self) -> Component: # noqa: D102 SynchrotronCurrentTCPAdapter( TcpServer(self.host, self.port, self.format) ), - SynchrotronCurrentEpicsAdapter(self.db_file, self.ioc_name), + SynchrotronCurrentEpicsAdapter(self.ioc_name, self.db_file), ], ) diff --git a/src/tickit_devices/synchrotron/synchrotron_machine.py b/src/tickit_devices/synchrotron/synchrotron_machine.py index 56a1290e..06ffd7ac 100644 --- a/src/tickit_devices/synchrotron/synchrotron_machine.py +++ b/src/tickit_devices/synchrotron/synchrotron_machine.py @@ -210,6 +210,6 @@ def __call__(self) -> Component: # noqa: D102 SynchrotronMachineStatusTCPAdapter( TcpServer(self.host, self.port, self.format) ), - SynchrotronMachineStatusEpicsAdapter(self.db_file, self.ioc_name), + SynchrotronMachineStatusEpicsAdapter(self.ioc_name, self.db_file), ], ) diff --git a/src/tickit_devices/synchrotron/synchrotron_topup.py b/src/tickit_devices/synchrotron/synchrotron_topup.py index 7ea29f60..01643edb 100644 --- a/src/tickit_devices/synchrotron/synchrotron_topup.py +++ b/src/tickit_devices/synchrotron/synchrotron_topup.py @@ -231,6 +231,6 @@ def __call__(self) -> Component: # noqa: D102 SynchrotronTopUpTCPAdapter( TcpServer(self.host, self.port, self.format) ), - SynchrotronTopUpEpicsAdapter(self.db_file, self.ioc_name), + SynchrotronTopUpEpicsAdapter(self.ioc_name, self.db_file), ], ) diff --git a/tests/conftest.py b/tests/conftest.py index 1742f64e..170b52bc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,7 +25,8 @@ def tickit_process(request): ) # Wait for IOC to be up while True: - if "complete" in proc.stdout.readline(): + line = proc.stdout.readline() + if "complete" in line: break yield proc proc.send_signal(signal.SIGINT) diff --git a/tests/eiger/test_eiger.py b/tests/eiger/test_eiger.py index 55a17092..c4e3eea2 100644 --- a/tests/eiger/test_eiger.py +++ b/tests/eiger/test_eiger.py @@ -1,95 +1,244 @@ +import itertools +from unittest.mock import ANY + import pytest +from mock import MagicMock, Mock +from tickit.core.typedefs import SimTime from tickit_devices.eiger.eiger import EigerDevice from tickit_devices.eiger.eiger_status import State +from tickit_devices.eiger.stream.eiger_stream import EigerStream + + +@pytest.fixture +def mock_stream() -> EigerStream: + return MagicMock(EigerStream) @pytest.fixture -def eiger() -> EigerDevice: - return EigerDevice() +def eiger(mock_stream: EigerStream) -> EigerDevice: + return EigerDevice(stream=mock_stream) -def test_eiger_constructor(): - EigerDevice() +def test_starting_state_is_na(eiger: EigerDevice): + assert_in_state(eiger, State.NA) @pytest.mark.asyncio -async def test_eiger_initialize(eiger: EigerDevice): +async def test_initialize(eiger: EigerDevice): await eiger.initialize() - - assert State.IDLE.value == eiger.get_state()["value"] + assert_in_state(eiger, State.IDLE) @pytest.mark.asyncio -async def test_eiger_arm(eiger: EigerDevice): +async def test_arm(eiger: EigerDevice): + await eiger.initialize() await eiger.arm() - - assert State.READY.value == eiger.get_state()["value"] + assert_in_state(eiger, State.READY) @pytest.mark.asyncio -async def test_eiger_disarm(eiger: EigerDevice): +async def test_disarm(eiger: EigerDevice): + await eiger.initialize() + await eiger.arm() await eiger.disarm() + assert_in_state(eiger, State.IDLE) - assert State.IDLE.value == eiger.get_state()["value"] + +@pytest.mark.asyncio +async def test_trigger_in_ints_mode_sets_acquire(eiger: EigerDevice): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + await eiger.arm() + assert_in_state(eiger, State.READY) + await eiger.trigger() + assert_in_state(eiger, State.ACQUIRE) @pytest.mark.asyncio -async def test_eiger_trigger_ints_and_ready(eiger: EigerDevice): - eiger._set_state(State.READY) +async def test_trigger_in_ints_mode_while_not_armed_is_ignored( + eiger: EigerDevice, +): + await eiger.initialize() eiger.settings.trigger_mode = "ints" + await eiger.trigger() + assert_in_state(eiger, State.IDLE) - message = await eiger.trigger() - assert State.ACQUIRE.value == eiger.get_state()["value"] - assert "Aquiring Data from Eiger..." == message +@pytest.mark.asyncio +async def test_trigger_in_exts_mode_is_ignored(eiger: EigerDevice): + await eiger.initialize() + eiger.settings.trigger_mode = "exts" + await eiger.arm() + assert_in_state(eiger, State.READY) + await eiger.trigger() + eiger.update(SimTime(0.0), {}) + assert_in_state(eiger, State.READY) @pytest.mark.asyncio -async def test_eiger_trigger_not_ints_and_ready(eiger: EigerDevice): - eiger._set_state(State.READY) +async def test_update_in_exts_mode_is_ignored(eiger: EigerDevice): + await eiger.initialize() + eiger.settings.trigger_mode = "exts" + await eiger.arm() + assert_in_state(eiger, State.READY) + eiger.update(SimTime(0.0), {}) + assert_in_state(eiger, State.READY) - message = await eiger.trigger() - assert State.READY.value == eiger.get_state()["value"] - assert ( - f"Ignoring trigger, state={eiger.status.state}," - f"trigger_mode={eiger.settings.trigger_mode}" == message - ) +@pytest.mark.asyncio +async def test_trigger_in_exts_mode_while_not_armed_is_ignored( + eiger: EigerDevice, +): + await eiger.initialize() + eiger.settings.trigger_mode = "exts" + await eiger.trigger() + assert_in_state(eiger, State.IDLE) @pytest.mark.asyncio -async def test_eiger_trigger_not_ints_and_not_ready(eiger: EigerDevice): - eiger._set_state(State.IDLE) +async def test_cancel(eiger: EigerDevice): + await eiger.cancel() + assert_in_state(eiger, State.READY) - message = await eiger.trigger() - assert State.READY.value != eiger.get_state()["value"] - assert ( - f"Ignoring trigger, state={eiger.status.state}," - f"trigger_mode={eiger.settings.trigger_mode}" == message - ) +@pytest.mark.asyncio +async def test_abort(eiger: EigerDevice): + await eiger.abort() + assert_in_state(eiger, State.IDLE) + + +@pytest.mark.asyncio +async def test_armed_eiger_starts_series(eiger: EigerDevice, mock_stream: Mock): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + await eiger.arm() + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) @pytest.mark.asyncio -async def test_eiger_cancel(eiger: EigerDevice): +async def test_disarmed_eiger_starts_and_ends_series( + eiger: EigerDevice, mock_stream: Mock +): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + await eiger.arm() + await eiger.disarm() + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) + mock_stream.end_series.assert_called_once_with(1) + + +@pytest.mark.asyncio +async def test_cancelled_eiger_starts_and_ends_series( + eiger: EigerDevice, mock_stream: Mock +): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + await eiger.arm() await eiger.cancel() + mock_stream.begin_series.assert_called_once_with(eiger.settings, 1) + mock_stream.end_series.assert_called_once_with(1) - assert State.READY.value == eiger.get_state()["value"] + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "num_frames,num_series", list(itertools.product([0, 1, 2, 10], [1, 2, 3])) +) +async def test_acquire_frames_in_ints_mode( + eiger: EigerDevice, + mock_stream: Mock, + num_frames: int, + num_series: int, +): + for series in range(1, num_series + 1): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + eiger.settings.nimages = num_frames + await eiger.arm() + await eiger.trigger() + + # Extra update cleans up state + for i in range(num_frames): + update = eiger.update(SimTime(i), {}) + assert update.call_at == SimTime(i + int(0.12 * 1e9)) + + update = eiger.update(SimTime(0.0), {}) + assert update.call_at is None + + mock_stream.begin_series.assert_called_with(eiger.settings, series) + assert mock_stream.begin_series.call_count == series + if num_frames > 0: + mock_stream.insert_image.assert_called_with(ANY, series) + assert mock_stream.insert_image.call_count == series * num_frames + mock_stream.end_series.assert_called_with(series) + assert mock_stream.end_series.call_count == series + + assert_in_state(eiger, State.IDLE) @pytest.mark.asyncio -async def test_eiger_abort(eiger: EigerDevice): - await eiger.abort() +@pytest.mark.parametrize( + "num_frames,num_series", list(itertools.product([0, 1, 2, 10], [1, 2, 3])) +) +async def test_acquire_frames_in_exts_mode( + eiger: EigerDevice, + mock_stream: Mock, + num_frames: int, + num_series: int, +): + for series in range(1, num_series + 1): + await eiger.initialize() + eiger.settings.trigger_mode = "exts" + eiger.settings.nimages = num_frames + await eiger.arm() + + # Trigger detector + update = eiger.update(SimTime(0.0), {"trigger": True}) + assert update.call_at == 0.0 + + # Extra update cleans up state + for i in range(num_frames): + update = eiger.update(SimTime(i), {}) + assert update.call_at == SimTime(i + int(0.12 * 1e9)) + + update = eiger.update(SimTime(0.0), {}) + assert update.call_at is None + + mock_stream.begin_series.assert_called_with(eiger.settings, series) + assert mock_stream.begin_series.call_count == series + if num_frames > 0: + mock_stream.insert_image.assert_called_with(ANY, series) + assert mock_stream.insert_image.call_count == series * num_frames + mock_stream.end_series.assert_called_with(series) + assert mock_stream.end_series.call_count == series + + assert_in_state(eiger, State.IDLE) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("num_series", [1, 2, 3]) +async def test_abort_mid_acquisition( + eiger: EigerDevice, + mock_stream: Mock, + num_series: int, +): + for series in range(1, num_series + 1): + await eiger.initialize() + eiger.settings.trigger_mode = "ints" + eiger.settings.nimages = 3 + await eiger.arm() + await eiger.trigger() - assert State.IDLE.value == eiger.get_state()["value"] + eiger.update(SimTime(0.0), {}) + eiger.update(SimTime(0.0), {}) + await eiger.abort() -def test_eiger_get_state(eiger: EigerDevice): - assert State.NA.value == eiger.get_state()["value"] + eiger.update(SimTime(0.0), {}) + mock_stream.end_series.assert_called_with(series) + assert_in_state(eiger, State.IDLE) -def test_eiger_set_state(eiger: EigerDevice): - eiger._set_state(State.IDLE) - assert State.IDLE.value == eiger.get_state()["value"] +def assert_in_state(eiger: EigerDevice, state: State) -> None: + assert state is eiger.get_state() diff --git a/tests/eiger/test_eiger_adapters.py b/tests/eiger/test_eiger_adapters.py deleted file mode 100644 index b4bc961d..00000000 --- a/tests/eiger/test_eiger_adapters.py +++ /dev/null @@ -1,157 +0,0 @@ -import json - -import aiohttp -import pytest -from aiohttp import web -from mock import MagicMock, Mock -from mock.mock import create_autospec - -from tickit_devices.eiger.eiger import EigerDevice -from tickit_devices.eiger.eiger_adapters import EigerRESTAdapter -from tickit_devices.eiger.eiger_settings import EigerSettings -from tickit_devices.eiger.eiger_status import EigerStatus, State - - -@pytest.fixture -def mock_status() -> MagicMock: - status = create_autospec(EigerStatus, instance=True) - status.state = State.NA - return status - - -@pytest.fixture -def mock_settings() -> MagicMock: - settings = create_autospec(EigerSettings, instance=True) - settings.count_time = { - "value": 0.1, - "metadata": {"value_type": Mock(value="int"), "access_mode": Mock(value="rw")}, - } - return settings - - -@pytest.fixture -def mock_eiger(mock_status: MagicMock, mock_settings: MagicMock) -> MagicMock: - mock_eiger = create_autospec(EigerDevice, instance=True) - mock_eiger.status = mock_status - mock_eiger.settings = mock_settings - return mock_eiger - - -@pytest.fixture -def raise_interrupt(): - async def raise_interrupt(): - return False - - return Mock(raise_interrupt) - - -@pytest.fixture -def eiger_adapter(mock_eiger: MagicMock) -> EigerRESTAdapter: - return EigerRESTAdapter("0.0.0.0", 8081) - - -def test_eiger_adapter_contructor(): - EigerRESTAdapter(mock_eiger, raise_interrupt) - - -@pytest.fixture() -def mock_request(): - mock_request = MagicMock(web.Request) - return mock_request - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "tickit_task", ["examples/configs/eiger/eiger.yaml"], indirect=True -) -async def test_eiger_system(tickit_task): - commands = { - "initialize": {"sequence_id": 1}, - "disarm": {"sequence_id": 3}, - "cancel": {"sequence_id": 5}, - "abort": {"sequence_id": 6}, - } - - url = "http://0.0.0.0:8081/detector/api/1.8.0/" - headers = {"content-type": "application/json"} - - filewriter_url = "http://0.0.0.0:8081/filewriter/api/1.8.0/" - monitor_url = "http://0.0.0.0:8081/monitor/api/1.8.0/" - stream_url = "http://0.0.0.0:8081/stream/api/1.8.0/" - - async def get_status(status, expected): - async with session.get(url + f"status/{status}") as resp: - assert expected == json.loads(str(await resp.text()))["value"] - - async with aiohttp.ClientSession() as session: - await get_status(status="state", expected="na") - - # Test setting config var before Eiger set up - data = '{"value": "test"}' - async with session.put( - url + "config/element", headers=headers, json=data - ) as resp: - assert json.loads(str(await resp.text())) == {"sequence_id": 7} - - # Test each command - for key, value in commands.items(): - async with session.put(url + f"command/{key}") as resp: - assert value == json.loads(str(await resp.text())) - - await get_status(status="doesnt_exist", expected="None") - - await get_status(status="board_000/th0_temp", expected=24.5) - - await get_status(status="board_000/doesnt_exist", expected="None") - - await get_status(status="builder/dcu_buffer_free", expected=0.5) - - await get_status(status="builder/doesnt_exist", expected="None") - - # Test Eiger in IDLE state - await get_status(status="state", expected="idle") - - async with session.get(url + "config/doesnt_exist") as resp: - assert json.loads(str(await resp.text()))["value"] == "None" - - data = '{"value": "test"}' - async with session.put( - url + "config/doesnt_exist", headers=headers, json=data - ) as resp: - assert json.loads(str(await resp.text())) == {"sequence_id": 9} - - async with session.get(url + "config/element") as resp: - assert json.loads(str(await resp.text()))["value"] == "Co" - - data = '{"value": "Li"}' - async with session.put( - url + "config/element", headers=headers, json=data - ) as resp: - assert json.loads(str(await resp.text())) == {"sequence_id": 8} - - async with session.get(url + "config/photon_energy") as resp: - assert json.loads(str(await resp.text()))["value"] == 54.3 - - async with session.get(filewriter_url + "config/mode") as resp: - assert "enabled" == json.loads(str(await resp.text()))["value"] - - async with session.get(filewriter_url + "status/state") as resp: - assert "ready" == json.loads(str(await resp.text()))["value"] - - async with session.get(monitor_url + "config/mode") as resp: - assert "enabled" == json.loads(str(await resp.text()))["value"] - - async with session.get(monitor_url + "status/error") as resp: - assert [] == json.loads(str(await resp.text()))["value"] - - async with session.get(stream_url + "config/mode") as resp: - assert "enabled" == json.loads(str(await resp.text()))["value"] - - async with session.get(stream_url + "status/state") as resp: - assert "ready" == json.loads(str(await resp.text()))["value"] - - async with session.put(url + "command/arm") as resp: - assert {"sequence_id": 2} == json.loads(str(await resp.text())) - - async with session.put(url + "command/trigger") as resp: - assert {"sequence_id": 4} == json.loads(str(await resp.text())) diff --git a/tests/eiger/test_eiger_filewriter.py b/tests/eiger/test_eiger_filewriter.py deleted file mode 100644 index e704e7a3..00000000 --- a/tests/eiger/test_eiger_filewriter.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -from tickit_devices.eiger.filewriter.eiger_filewriter import EigerFileWriter - - -@pytest.fixture -def filewriter() -> EigerFileWriter: - return EigerFileWriter() - - -def test_eiger_filewriter_constructor(): - EigerFileWriter() diff --git a/tests/eiger/test_eiger_monitor.py b/tests/eiger/test_eiger_monitor.py deleted file mode 100644 index cec0c242..00000000 --- a/tests/eiger/test_eiger_monitor.py +++ /dev/null @@ -1,12 +0,0 @@ -import pytest - -from tickit_devices.eiger.monitor.eiger_monitor import EigerMonitor - - -@pytest.fixture -def filewriter() -> EigerMonitor: - return EigerMonitor() - - -def test_eiger_monitor_constructor(): - EigerMonitor() diff --git a/tests/eiger/test_eiger_settings.py b/tests/eiger/test_eiger_settings.py index d6ba97b5..ad6fcfa1 100644 --- a/tests/eiger/test_eiger_settings.py +++ b/tests/eiger/test_eiger_settings.py @@ -29,3 +29,33 @@ def test_eiger_settings_set_element(eiger_settings): assert "Li" == eiger_settings.element assert KA_Energy["Li"].value == eiger_settings.photon_energy + assert (1240 / eiger_settings.photon_energy) / 10 == eiger_settings.wavelength + assert 0.5 * eiger_settings.photon_energy == eiger_settings.threshold_energy + + +def test_eiger_settings_set_photon_energy(eiger_settings): + eiger_settings["photon_energy"] = 1000.0 + + assert 1000.0 == eiger_settings.photon_energy + assert "" == eiger_settings.element + assert (1240 / eiger_settings.photon_energy) / 10 == eiger_settings.wavelength + assert 0.5 * eiger_settings.photon_energy == eiger_settings.threshold_energy + + +def test_eiger_settings_set_wavelength(eiger_settings): + eiger_settings["wavelength"] = 1.24 + + assert 1.24 == eiger_settings.wavelength + assert "" == eiger_settings.element + assert 1240 / (eiger_settings.wavelength * 10) == eiger_settings.photon_energy + assert 0.5 * eiger_settings.photon_energy == eiger_settings.threshold_energy + + +def test_eiger_settings_set_count_time(eiger_settings): + eiger_settings["count_time"] = 0.2 + + assert 0.2 == eiger_settings.count_time + assert ( + eiger_settings.count_time + eiger_settings.detector_readout_time + == eiger_settings.frame_time + ) diff --git a/tests/eiger/test_eiger_status.py b/tests/eiger/test_eiger_status.py index 3feef53e..956b94f7 100644 --- a/tests/eiger/test_eiger_status.py +++ b/tests/eiger/test_eiger_status.py @@ -15,4 +15,4 @@ def test_eiger_status_constructor(): def test_eiger_status_getitem(eiger_status): - assert 24.5 == eiger_status["th0_temp"] + assert 24.5 == eiger_status["th0_temp"]["value"] diff --git a/tests/eiger/test_eiger_stream.py b/tests/eiger/test_eiger_stream.py index 81fe6107..b0513047 100644 --- a/tests/eiger/test_eiger_stream.py +++ b/tests/eiger/test_eiger_stream.py @@ -1,12 +1,146 @@ +from typing import Any, List, Mapping, Union + import pytest +from pydantic.v1 import BaseModel +from tickit_devices.eiger.data.dummy_image import Image +from tickit_devices.eiger.data.schema import ( + AcquisitionDetailsHeader, + AcquisitionSeriesFooter, + AcquisitionSeriesHeader, + ImageCharacteristicsHeader, + ImageConfigHeader, + ImageHeader, +) +from tickit_devices.eiger.eiger_settings import EigerSettings from tickit_devices.eiger.stream.eiger_stream import EigerStream @pytest.fixture -def filewriter() -> EigerStream: +def stream() -> EigerStream: return EigerStream() -def test_eiger_stream_constructor(): - EigerStream() +TEST_SERIES_ID = 1 + +MINIMAL_HEADER = [ + AcquisitionSeriesHeader( + header_detail="none", + series=TEST_SERIES_ID, + ) +] + +EIGER_SETTINGS_HEADER = EigerSettings().filtered( + ["flatfield", "pixelmask" "countrate_correction_table"] +) +X_SIZE = EIGER_SETTINGS_HEADER["x_pixels_in_detector"] +Y_SIZE = EIGER_SETTINGS_HEADER["y_pixels_in_detector"] + +BASIC_HEADERS = [ + AcquisitionSeriesHeader( + header_detail="basic", + series=TEST_SERIES_ID, + ), + EIGER_SETTINGS_HEADER, +] + +ALL_HEADERS = [ + AcquisitionSeriesHeader( + header_detail="all", + series=TEST_SERIES_ID, + ), + EIGER_SETTINGS_HEADER, + AcquisitionDetailsHeader( + htype="flatfield-1.0", + shape=(X_SIZE, Y_SIZE), + type="float32", + ), + {"blob": "blob"}, + AcquisitionDetailsHeader( + htype="dpixelmask-1.0", + shape=(X_SIZE, Y_SIZE), + type="uint32", + ), + {"blob": "blob"}, + AcquisitionDetailsHeader( + htype="dcountrate_table-1.0", + shape=(X_SIZE, Y_SIZE), + type="float32", + ), + {"blob": "blob"}, +] + + +END_SERIES_FOOTER = [AcquisitionSeriesFooter(series=TEST_SERIES_ID)] + + +@pytest.mark.parametrize( + "header_detail,expected_headers", + [ + ("none", MINIMAL_HEADER), + ("basic", BASIC_HEADERS), + ("all", ALL_HEADERS), + ], +) +def test_begin_series_produces_correct_headers( + stream: EigerStream, + header_detail: str, + expected_headers: List[Union[BaseModel, bytes, Mapping[str, Any]]], +) -> None: + settings = EigerSettings() + stream.config.header_detail = header_detail + stream.begin_series(settings, TEST_SERIES_ID) + blobs = list(stream.consume_data()) + assert blobs == expected_headers + + +@pytest.mark.parametrize("number_of_times", [1, 2]) +def test_insert_image_produces_correct_headers_and_blobs( + stream: EigerStream, number_of_times: int +) -> None: + for i in range(number_of_times): + image = Image.create_dummy_image(i, (X_SIZE, Y_SIZE)) + stream.insert_image(image, TEST_SERIES_ID) + blobs = list(stream.consume_data()) + assert blobs == expected_image_blobs(image) + + +def test_end_series_produces_correct_headers( + stream: EigerStream, +) -> None: + stream.end_series(TEST_SERIES_ID) + blobs = list(stream.consume_data()) + assert blobs == END_SERIES_FOOTER + + +def test_data_buffered(stream: EigerStream) -> None: + settings = EigerSettings() + stream.config.header_detail = "all" + stream.begin_series(settings, TEST_SERIES_ID) + image = Image.create_dummy_image(0, (X_SIZE, Y_SIZE)) + stream.insert_image(image, TEST_SERIES_ID) + stream.end_series(TEST_SERIES_ID) + blobs = list(stream.consume_data()) + assert blobs == ALL_HEADERS + expected_image_blobs(image) + END_SERIES_FOOTER + + +def expected_image_blobs(image: Image) -> List[Union[bytes, BaseModel]]: + return [ + ImageHeader( + frame=image.index, + hash=image.hash, + series=TEST_SERIES_ID, + ), + ImageCharacteristicsHeader( + encoding=image.encoding, + shape=image.shape, + size=len(image.data), + type=image.dtype, + ), + image.data, + ImageConfigHeader( + real_time=0.0, + start_time=0.0, + stop_time=0.0, + ), + ] diff --git a/tests/eiger/test_eiger_system.py b/tests/eiger/test_eiger_system.py new file mode 100644 index 00000000..5233ad29 --- /dev/null +++ b/tests/eiger/test_eiger_system.py @@ -0,0 +1,205 @@ +import aiohttp +import pytest + +DETECTOR_URL = "http://localhost:8081/detector/api/1.8.0/" +FILE_WRITER_URL = "http://localhost:8081/filewriter/api/1.8.0/" +MONITOR_URL = "http://localhost:8081/monitor/api/1.8.0/" +STREAM_URL = "http://localhost:8081/stream/api/1.8.0/" +REQUEST_TIMEOUT = 1.0 + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "tickit_task", ["examples/configs/eiger/eiger.yaml"], indirect=True +) +async def test_eiger_system(tickit_task): + commands = { + "initialize": {"sequence id": 1}, + "disarm": {"sequence id": 3}, + "cancel": {"sequence id": 5}, + "abort": {"sequence id": 6}, + } + + headers = {"content-type": "application/json"} + + async def get_status(status, expected): + async with session.get( + DETECTOR_URL + f"status/{status}", + timeout=REQUEST_TIMEOUT, + ) as response: + assert expected == (await response.json())["value"] + + async with aiohttp.ClientSession() as session: + await get_status(status="state", expected="na") + + # Test setting config var before Eiger set up + async with session.put( + DETECTOR_URL + "config/element", + headers=headers, + json={"value": "test"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert (await response.json()) == [] + + # Test each command + for key, value in commands.items(): + async with session.put( + DETECTOR_URL + f"command/{key}", + timeout=REQUEST_TIMEOUT, + ) as response: + assert value == (await response.json()) + + # Check status + await get_status(status="doesnt_exist", expected="None") + await get_status(status="board_000/th0_temp", expected=24.5) + await get_status(status="board_000/doesnt_exist", expected="None") + await get_status(status="builder/dcu_buffer_free", expected=0.5) + await get_status(status="builder/doesnt_exist", expected="None") + + # Test Eiger in IDLE state + await get_status(status="state", expected="idle") + + # Test settings/getting config + async with session.get( + DETECTOR_URL + "config/doesnt_exist", + timeout=REQUEST_TIMEOUT, + ) as response: + assert (await response.json())["value"] == "None" + + async with session.put( + DETECTOR_URL + "config/doesnt_exist", + headers=headers, + json={"value": "test"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert (await response.json()) == [] + + async with session.get( + DETECTOR_URL + "config/element", + timeout=REQUEST_TIMEOUT, + ) as response: + assert (await response.json())["value"] == "Co" + + async with session.put( + DETECTOR_URL + "config/element", + headers=headers, + json={"value": "Li"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert (await response.json()) == ["element"] + + async with session.get( + DETECTOR_URL + "config/photon_energy", + timeout=REQUEST_TIMEOUT, + ) as response: + assert 54.3 == (await response.json())["value"] + + async with session.get( + FILE_WRITER_URL + "config/mode", + timeout=REQUEST_TIMEOUT, + ) as response: + assert "enabled" == (await response.json())["value"] + + async with session.put( + FILE_WRITER_URL + "config/mode", + headers=headers, + json={"value": "enabled"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert ["mode"] == (await response.json()) + + async with session.put( + FILE_WRITER_URL + "config/test", + headers=headers, + json={"value": "test"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert [] == (await response.json()) + + # Test filewriter, monitor and stream endpoints + async with session.get( + FILE_WRITER_URL + "status/state", + timeout=REQUEST_TIMEOUT, + ) as response: + assert "ready" == (await response.json())["value"] + + async with session.get( + MONITOR_URL + "config/mode", + timeout=REQUEST_TIMEOUT, + ) as response: + assert "enabled" == (await response.json())["value"] + + async with session.put( + MONITOR_URL + "config/mode", + headers=headers, + json={"value": "enabled"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert ["mode"] == (await response.json()) + + async with session.put( + MONITOR_URL + "config/test", + headers=headers, + json={"value": "test"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert [] == (await response.json()) + + async with session.get( + MONITOR_URL + "status/error", + timeout=REQUEST_TIMEOUT, + ) as response: + assert [] == (await response.json())["value"] + + async with session.get( + STREAM_URL + "config/mode", + timeout=REQUEST_TIMEOUT, + ) as response: + assert "enabled" == (await response.json())["value"] + + async with session.put( + STREAM_URL + "config/mode", + headers=headers, + json={"value": "enabled"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert ["mode"] == (await response.json()) + + async with session.put( + STREAM_URL + "config/test", + headers=headers, + json={"value": "test"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert [] == (await response.json()) + + # Test acquisition in ints mode + async with session.put( + DETECTOR_URL + "config/trigger_mode", + headers=headers, + json={"value": "ints"}, + timeout=REQUEST_TIMEOUT, + ) as response: + assert ["trigger_mode"] == (await response.json()) + + async with session.get( + STREAM_URL + "status/state", + timeout=REQUEST_TIMEOUT, + ) as response: + assert "ready" == (await response.json())["value"] + + assert get_status(status="state", expected="idle") + + async with session.put( + DETECTOR_URL + "command/arm", + timeout=REQUEST_TIMEOUT, + ) as response: + assert {"sequence id": 2} == (await response.json()) + + assert get_status(status="state", expected="ready") + + async with session.put( + DETECTOR_URL + "command/trigger", + timeout=REQUEST_TIMEOUT, + ) as response: + assert {"sequence id": 4} == (await response.json())