diff --git a/pyproject.toml b/pyproject.toml index 7f105afc1..6d0e0324b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,7 +139,7 @@ commands = [tool.ruff] -src = ["src", "tests"] +src = ["src", "tests", "system_tests"] line-length = 88 lint.select = [ "C4", # flake8-comprehensions - https://beta.ruff.rs/docs/rules/#flake8-comprehensions-c4 diff --git a/src/ophyd_async/core/_signal.py b/src/ophyd_async/core/_signal.py index df748c4b9..503c1f9ca 100644 --- a/src/ophyd_async/core/_signal.py +++ b/src/ophyd_async/core/_signal.py @@ -547,7 +547,7 @@ async def wait_for_value( async def set_and_wait_for_other_value( - set_signal: SignalRW[T], + set_signal: SignalW[T], set_value: T, read_signal: SignalR[S], read_value: S, diff --git a/src/ophyd_async/epics/eiger/__init__.py b/src/ophyd_async/epics/eiger/__init__.py new file mode 100644 index 000000000..bf9368859 --- /dev/null +++ b/src/ophyd_async/epics/eiger/__init__.py @@ -0,0 +1,5 @@ +from ._eiger import EigerDetector, EigerTriggerInfo +from ._eiger_controller import EigerController +from ._eiger_io import EigerDriverIO + +__all__ = ["EigerDetector", "EigerController", "EigerDriverIO", "EigerTriggerInfo"] diff --git a/src/ophyd_async/epics/eiger/_eiger.py b/src/ophyd_async/epics/eiger/_eiger.py new file mode 100644 index 000000000..e7d60786e --- /dev/null +++ b/src/ophyd_async/epics/eiger/_eiger.py @@ -0,0 +1,43 @@ +from pydantic import Field + +from ophyd_async.core import AsyncStatus, PathProvider, StandardDetector +from ophyd_async.core._detector import TriggerInfo + +from ._eiger_controller import EigerController +from ._eiger_io import EigerDriverIO +from ._odin_io import Odin, OdinWriter + + +class EigerTriggerInfo(TriggerInfo): + energy_ev: float = Field(gt=0) + + +class EigerDetector(StandardDetector): + """ + Ophyd-async implementation of an Eiger Detector. + """ + + _controller: EigerController + _writer: Odin + + def __init__( + self, + prefix: str, + path_provider: PathProvider, + drv_suffix="-EA-EIGER-01:", + hdf_suffix="-EA-ODIN-01:", + name="", + ): + self.drv = EigerDriverIO(prefix + drv_suffix) + self.odin = Odin(prefix + hdf_suffix + "FP:") + + super().__init__( + EigerController(self.drv), + OdinWriter(path_provider, lambda: self.name, self.odin), + name=name, + ) + + @AsyncStatus.wrap + async def prepare(self, value: EigerTriggerInfo) -> None: + await self._controller.set_energy(value.energy_ev) + await super().prepare(value) diff --git a/src/ophyd_async/epics/eiger/_eiger_controller.py b/src/ophyd_async/epics/eiger/_eiger_controller.py new file mode 100644 index 000000000..fa37bbeba --- /dev/null +++ b/src/ophyd_async/epics/eiger/_eiger_controller.py @@ -0,0 +1,66 @@ +import asyncio +from typing import Optional + +from ophyd_async.core import ( + DEFAULT_TIMEOUT, + AsyncStatus, + DetectorControl, + DetectorTrigger, + set_and_wait_for_other_value, +) + +from ._eiger_io import EigerDriverIO, EigerTriggerMode + +EIGER_TRIGGER_MODE_MAP = { + DetectorTrigger.internal: EigerTriggerMode.internal, + DetectorTrigger.constant_gate: EigerTriggerMode.gate, + DetectorTrigger.variable_gate: EigerTriggerMode.gate, + DetectorTrigger.edge_trigger: EigerTriggerMode.edge, +} + + +class EigerController(DetectorControl): + def __init__( + self, + driver: EigerDriverIO, + ) -> None: + self._drv = driver + + def get_deadtime(self, exposure: float) -> float: + # See https://media.dectris.com/filer_public/30/14/3014704e-5f3b-43ba-8ccf-8ef720e60d2a/240202_usermanual_eiger2.pdf + return 0.0001 + + async def set_energy(self, energy: float, tolerance: float = 0.1): + """Changing photon energy takes some time so only do so if the current energy is + outside the tolerance.""" + current_energy = await self._drv.photon_energy.get_value() + if abs(current_energy - energy) > tolerance: + await self._drv.photon_energy.set(energy) + + @AsyncStatus.wrap + async def arm( + self, + num: int, + trigger: DetectorTrigger = DetectorTrigger.internal, + exposure: Optional[float] = None, + ): + coros = [ + self._drv.trigger_mode.set(EIGER_TRIGGER_MODE_MAP[trigger].value), + self._drv.num_images.set(num), + ] + if exposure is not None: + coros.extend( + [ + self._drv.acquire_time.set(exposure), + self._drv.acquire_period.set(exposure), + ] + ) + await asyncio.gather(*coros) + + # TODO: Detector state should be an enum see https://github.com/DiamondLightSource/eiger-fastcs/issues/43 + await set_and_wait_for_other_value( + self._drv.arm, 1, self._drv.state, "ready", timeout=DEFAULT_TIMEOUT + ) + + async def disarm(self): + await self._drv.disarm.set(1) diff --git a/src/ophyd_async/epics/eiger/_eiger_io.py b/src/ophyd_async/epics/eiger/_eiger_io.py new file mode 100644 index 000000000..1df672592 --- /dev/null +++ b/src/ophyd_async/epics/eiger/_eiger_io.py @@ -0,0 +1,42 @@ +from enum import Enum + +from ophyd_async.core import Device +from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw_rbv, epics_signal_w + + +class EigerTriggerMode(str, Enum): + internal = "ints" + edge = "exts" + gate = "exte" + + +class EigerDriverIO(Device): + def __init__(self, prefix: str, name: str = "") -> None: + self.bit_depth = epics_signal_r(int, f"{prefix}BitDepthReadout") + self.stale_parameters = epics_signal_r(bool, f"{prefix}StaleParameters") + self.state = epics_signal_r(str, f"{prefix}DetectorState") + self.roi_mode = epics_signal_rw_rbv(str, f"{prefix}RoiMode") + + self.acquire_time = epics_signal_rw_rbv(float, f"{prefix}CountTime") + self.acquire_period = epics_signal_rw_rbv(float, f"{prefix}FrameTime") + + self.num_images = epics_signal_rw_rbv(int, f"{prefix}Nimages") + self.num_triggers = epics_signal_rw_rbv(int, f"{prefix}Ntrigger") + + # TODO: Should be EigerTriggerMode enum, see https://github.com/DiamondLightSource/eiger-fastcs/issues/43 + self.trigger_mode = epics_signal_rw_rbv(str, f"{prefix}TriggerMode") + + self.arm = epics_signal_w(int, f"{prefix}Arm") + self.disarm = epics_signal_w(int, f"{prefix}Disarm") + self.abort = epics_signal_w(int, f"{prefix}Abort") + + self.beam_centre_x = epics_signal_rw_rbv(float, f"{prefix}BeamCenterX") + self.beam_centre_y = epics_signal_rw_rbv(float, f"{prefix}BeamCenterY") + + self.det_distance = epics_signal_rw_rbv(float, f"{prefix}DetectorDistance") + self.omega_start = epics_signal_rw_rbv(float, f"{prefix}OmegaStart") + self.omega_increment = epics_signal_rw_rbv(float, f"{prefix}OmegaIncrement") + + self.photon_energy = epics_signal_rw_rbv(float, f"{prefix}PhotonEnergy") + + super().__init__(name) diff --git a/src/ophyd_async/epics/eiger/_odin_io.py b/src/ophyd_async/epics/eiger/_odin_io.py new file mode 100644 index 000000000..0d1b3516d --- /dev/null +++ b/src/ophyd_async/epics/eiger/_odin_io.py @@ -0,0 +1,125 @@ +import asyncio +from enum import Enum +from typing import AsyncGenerator, AsyncIterator, Dict + +from bluesky.protocols import StreamAsset +from event_model.documents.event_descriptor import DataKey + +from ophyd_async.core import ( + DEFAULT_TIMEOUT, + DetectorWriter, + Device, + DeviceVector, + NameProvider, + PathProvider, + observe_value, + set_and_wait_for_value, +) +from ophyd_async.epics.signal import ( + epics_signal_r, + epics_signal_rw, + epics_signal_rw_rbv, +) + + +class Writing(str, Enum): + ON = "ON" + OFF = "OFF" + + +class OdinNode(Device): + def __init__(self, prefix: str, name: str = "") -> None: + self.writing = epics_signal_r(Writing, f"{prefix}HDF:Writing") + self.connected = epics_signal_r(bool, f"{prefix}Connected") + + super().__init__(name) + + +class Odin(Device): + def __init__(self, prefix: str, name: str = "") -> None: + self.nodes = DeviceVector({i: OdinNode(f"{prefix}FP{i}:") for i in range(4)}) + + self.capture = epics_signal_rw( + Writing, f"{prefix}Writing", f"{prefix}ConfigHdfWrite" + ) + self.num_captured = epics_signal_r(int, f"{prefix}FramesWritten") + self.num_to_capture = epics_signal_rw_rbv(int, f"{prefix}ConfigHdfFrames") + + self.start_timeout = epics_signal_rw_rbv(int, f"{prefix}TimeoutTimerPeriod") + + self.image_height = epics_signal_rw_rbv(int, f"{prefix}DatasetDataDims0") + self.image_width = epics_signal_rw_rbv(int, f"{prefix}DatasetDataDims1") + + self.num_row_chunks = epics_signal_rw_rbv(int, f"{prefix}DatasetDataChunks1") + self.num_col_chunks = epics_signal_rw_rbv(int, f"{prefix}DatasetDataChunks2") + + self.file_path = epics_signal_rw_rbv(str, f"{prefix}ConfigHdfFilePath") + self.file_name = epics_signal_rw_rbv(str, f"{prefix}ConfigHdfFilePrefix") + + self.acquisition_id = epics_signal_rw_rbv( + str, f"{prefix}ConfigHdfAcquisitionId" + ) + + self.data_type = epics_signal_rw_rbv(str, f"{prefix}DatasetDataDatatype") + + super().__init__(name) + + +class OdinWriter(DetectorWriter): + def __init__( + self, + path_provider: PathProvider, + name_provider: NameProvider, + odin_driver: Odin, + ) -> None: + self._drv = odin_driver + self._path_provider = path_provider + self._name_provider = name_provider + super().__init__() + + async def open(self, multiplier: int = 1) -> Dict[str, DataKey]: + info = self._path_provider(device_name=self._name_provider()) + + await asyncio.gather( + self._drv.file_path.set(str(info.directory_path)), + self._drv.file_name.set(info.filename), + self._drv.data_type.set( + "uint16" + ), # TODO: Get from eiger https://github.com/bluesky/ophyd-async/issues/529 + self._drv.num_to_capture.set(0), + ) + + await self._drv.capture.set(Writing.ON) + + return await self._describe() + + async def _describe(self) -> Dict[str, DataKey]: + data_shape = await asyncio.gather( + self._drv.image_height.get_value(), self._drv.image_width.get_value() + ) + + return { + "data": DataKey( + source=self._drv.file_name.source, + shape=data_shape, + dtype="array", + dtype_numpy=" AsyncGenerator[int, None]: + async for num_captured in observe_value(self._drv.num_captured, timeout): + yield num_captured + + async def get_indices_written(self) -> int: + return await self._drv.num_captured.get_value() + + def collect_stream_docs(self, indices_written: int) -> AsyncIterator[StreamAsset]: + # TODO: Correctly return stream https://github.com/bluesky/ophyd-async/issues/530 + raise NotImplementedError() + + async def close(self) -> None: + await set_and_wait_for_value(self._drv.capture, Writing.OFF) diff --git a/system_tests/epics/eiger/README.md b/system_tests/epics/eiger/README.md new file mode 100644 index 000000000..8080a5339 --- /dev/null +++ b/system_tests/epics/eiger/README.md @@ -0,0 +1,8 @@ +This system test runs against the eiger tickit sim. To run it: + +0. Ensure you have disabled SELinux (https://dev-portal.diamond.ac.uk/guide/containers/tutorials/podman/#enable-use-of-vscode-features) +1. Run `podman run --rm -it -v /dev/shm:/dev/shm -v /tmp:/tmp --net=host ghcr.io/diamondlightsource/eiger-detector-runtime:1.16.0beta5` this will bring up the simulator itself. +2. In a separate terminal load a python environment with `ophyd-async` in it +3. `cd system_tests/epics/eiger` and `./start_iocs_and_run_tests.sh` + + diff --git a/system_tests/epics/eiger/start_iocs_and_run_tests.sh b/system_tests/epics/eiger/start_iocs_and_run_tests.sh new file mode 100755 index 000000000..ae9cdcec7 --- /dev/null +++ b/system_tests/epics/eiger/start_iocs_and_run_tests.sh @@ -0,0 +1,25 @@ + +host=$(hostname | tr -cd '[:digit:]') +export eiger_ioc=EIGER-$host +export odin_ioc=ODIN-$host + +mkdir /tmp/opi + +if command -v docker &> /dev/null; then + DOCKER_COMMAND=docker +else + DOCKER_COMMAND=podman +fi + +echo "Using $DOCKER_COMMAND" + +$DOCKER_COMMAND run --rm --name=$eiger_ioc -dt --net=host -v /tmp/opi/:/epics/opi ghcr.io/diamondlightsource/eiger-fastcs:0.1.0beta5 ioc $eiger_ioc + +$DOCKER_COMMAND run --rm --name=$odin_ioc -dt --net=host -v /tmp/opi/:/epics/opi ghcr.io/diamondlightsource/odin-fastcs:0.2.0beta2 ioc $odin_ioc + +sleep 1 + +pytest . + +podman kill $eiger_ioc +podman kill $odin_ioc diff --git a/system_tests/epics/eiger/test_eiger_system.py b/system_tests/epics/eiger/test_eiger_system.py new file mode 100644 index 000000000..8d20c1a70 --- /dev/null +++ b/system_tests/epics/eiger/test_eiger_system.py @@ -0,0 +1,92 @@ +import asyncio +import os +from pathlib import Path + +import h5py +import pytest +from bluesky.run_engine import RunEngine + +from ophyd_async.core import ( + DetectorTrigger, + Device, + DeviceCollector, + StaticPathProvider, +) +from ophyd_async.epics.eiger import EigerDetector, EigerTriggerInfo +from ophyd_async.epics.signal import epics_signal_rw + +SAVE_PATH = "/tmp" + + +class SetupDevice(Device): + """Holds PVs that we would either expect to be initially set and + never change or to be externally set in prod.""" + + def __init__(self, eiger_prefix: str, odin_prefix: str) -> None: + self.trigger = epics_signal_rw(int, f"{eiger_prefix}Trigger") + self.header_detail = epics_signal_rw(str, f"{eiger_prefix}HeaderDetail") + self.compression = epics_signal_rw(str, f"{odin_prefix}DatasetDataCompression") + self.frames_per_block = epics_signal_rw( + int, f"{odin_prefix}ProcessFramesPerBlock" + ) + self.blocks_per_file = epics_signal_rw( + int, f"{odin_prefix}ProcessBlocksPerFile" + ) + super().__init__("") + + +@pytest.fixture +def ioc_prefixes(): + return os.environ["eiger_ioc"] + ":", os.environ["odin_ioc"] + ":" + + +@pytest.fixture +def RE(): + return RunEngine() + + +@pytest.fixture +async def setup_device(RE, ioc_prefixes): + async with DeviceCollector(): + device = SetupDevice(ioc_prefixes[0], ioc_prefixes[1] + "FP:") + await asyncio.gather( + device.header_detail.set("all"), + device.compression.set("BSLZ4"), + device.frames_per_block.set(1000), + device.blocks_per_file.set(1), + ) + + return device + + +@pytest.fixture +async def test_eiger(RE, ioc_prefixes) -> EigerDetector: + provider = StaticPathProvider(lambda: "test_eiger", Path(SAVE_PATH)) + async with DeviceCollector(): + test_eiger = EigerDetector("", provider, ioc_prefixes[0], ioc_prefixes[1]) + + return test_eiger + + +async def test_trigger_saves_file(test_eiger: EigerDetector, setup_device: SetupDevice): + single_shot = EigerTriggerInfo( + frame_timeout=None, + number=1, + trigger=DetectorTrigger.internal, + deadtime=None, + livetime=None, + energy_ev=10000, + ) + + await test_eiger.stage() + await test_eiger.prepare(single_shot) + # Need to work out what the hold up is in prepare so we cant do this straight away. + # File path propogation? + await asyncio.sleep(0.5) + await setup_device.trigger.set(1) + await asyncio.sleep(0.5) # Need to work out when it's actually finished writing + await test_eiger.unstage() + + with h5py.File(SAVE_PATH + "/test_eiger_000001.h5") as f: + assert "data" in f.keys() + assert len(f["data"]) == 1 diff --git a/tests/epics/eiger/test_eiger_controller.py b/tests/epics/eiger/test_eiger_controller.py new file mode 100644 index 000000000..70f307816 --- /dev/null +++ b/tests/epics/eiger/test_eiger_controller.py @@ -0,0 +1,114 @@ +from unittest.mock import ANY, patch + +from pytest import fixture, raises + +from ophyd_async.core import ( + DeviceCollector, + callback_on_mock_put, + get_mock_put, + set_mock_value, +) +from ophyd_async.epics.eiger._eiger_controller import EigerController +from ophyd_async.epics.eiger._eiger_io import EigerDriverIO + +DriverAndController = tuple[EigerDriverIO, EigerController] + + +@fixture +def eiger_driver_and_controller_no_arm(RE) -> DriverAndController: + with DeviceCollector(mock=True): + driver = EigerDriverIO("") + controller = EigerController(driver) + + return driver, controller + + +@fixture +def eiger_driver_and_controller( + eiger_driver_and_controller_no_arm: DriverAndController, +) -> DriverAndController: + driver, controller = eiger_driver_and_controller_no_arm + + def become_ready_on_arm(*args, **kwargs): + if args[0] == 1: + set_mock_value(driver.state, "ready") + + callback_on_mock_put(driver.arm, become_ready_on_arm) + + return driver, controller + + +async def test_when_arm_with_exposure_then_time_and_period_set( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + test_exposure = 0.002 + await controller.arm(10, exposure=test_exposure) + assert (await driver.acquire_period.get_value()) == test_exposure + assert (await driver.acquire_time.get_value()) == test_exposure + + +async def test_when_arm_with_no_exposure_then_arm_set_correctly( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + await controller.arm(10, exposure=None) + get_mock_put(driver.arm).assert_called_once_with(1, wait=ANY, timeout=ANY) + + +async def test_when_arm_with_number_of_images_then_number_of_images_set_correctly( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + test_number_of_images = 40 + await controller.arm(test_number_of_images, exposure=None) + get_mock_put(driver.num_images).assert_called_once_with( + test_number_of_images, wait=ANY, timeout=ANY + ) + + +@patch("ophyd_async.epics.eiger._eiger_controller.DEFAULT_TIMEOUT", 0.1) +async def test_given_detector_fails_to_go_ready_when_arm_called_then_fails( + eiger_driver_and_controller_no_arm: DriverAndController, +): + driver, controller = eiger_driver_and_controller_no_arm + with raises(TimeoutError): + await controller.arm(10) + + +async def test_when_disarm_called_on_controller_then_disarm_called_on_driver( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + await controller.disarm() + get_mock_put(driver.disarm).assert_called_once_with(1, wait=ANY, timeout=ANY) + + +async def test_when_get_deadtime_called_then_returns_expected_deadtime( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + assert controller.get_deadtime(0) == 0.0001 + + +async def test_given_energy_within_tolerance_when_photon_energy_set_then_pv_unchanged( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + initial_energy = 10 + set_mock_value(driver.photon_energy, initial_energy) + await controller.set_energy(10.002) + get_mock_put(driver.photon_energy).assert_not_called() + assert (await driver.photon_energy.get_value()) == initial_energy + + +async def test_given_energy_outside_tolerance_when_photon_energy_set_then_pv_changed( + eiger_driver_and_controller: DriverAndController, +): + driver, controller = eiger_driver_and_controller + initial_energy = 10 + new_energy = 15 + set_mock_value(driver.photon_energy, initial_energy) + await controller.set_energy(new_energy) + get_mock_put(driver.photon_energy).assert_called_once() + assert (await driver.photon_energy.get_value()) == new_energy diff --git a/tests/epics/eiger/test_eiger_detector.py b/tests/epics/eiger/test_eiger_detector.py new file mode 100644 index 000000000..33cfef805 --- /dev/null +++ b/tests/epics/eiger/test_eiger_detector.py @@ -0,0 +1,38 @@ +from unittest.mock import ANY, AsyncMock, MagicMock + +import pytest + +from ophyd_async.core import DetectorTrigger, DeviceCollector, get_mock_put +from ophyd_async.epics.eiger import EigerDetector, EigerTriggerInfo + + +@pytest.fixture +def detector(RE): + with DeviceCollector(mock=True): + detector = EigerDetector("BL03I", MagicMock()) + return detector + + +def test_when_detector_initialised_then_driver_and_odin_have_expected_prefixes( + detector, +): + assert "BL03I-EA-EIGER-01:" in detector.drv.arm.source + assert "BL03I-EA-ODIN-01:FP:" in detector.odin.acquisition_id.source + + +async def test_when_prepared_with_energy_then_energy_set_on_detector(detector): + detector.controller.arm = AsyncMock() + await detector.prepare( + EigerTriggerInfo( + frame_timeout=None, + number=1, + trigger=DetectorTrigger.internal, + deadtime=None, + livetime=None, + energy_ev=10000, + ) + ) + + get_mock_put(detector.drv.photon_energy).assert_called_once_with( + 10000, wait=ANY, timeout=ANY + ) diff --git a/tests/epics/eiger/test_odin_io.py b/tests/epics/eiger/test_odin_io.py new file mode 100644 index 000000000..1ef9c5944 --- /dev/null +++ b/tests/epics/eiger/test_odin_io.py @@ -0,0 +1,75 @@ +from pathlib import Path +from unittest.mock import ANY, MagicMock + +import pytest + +from ophyd_async.core import DeviceCollector, get_mock_put, set_mock_value +from ophyd_async.epics.eiger._odin_io import Odin, OdinWriter, Writing + +OdinDriverAndWriter = tuple[Odin, OdinWriter] + + +@pytest.fixture +def odin_driver_and_writer(RE) -> OdinDriverAndWriter: + with DeviceCollector(mock=True): + driver = Odin("") + writer = OdinWriter(MagicMock(), lambda: "odin", driver) + return driver, writer + + +async def test_when_open_called_then_file_correctly_set( + odin_driver_and_writer: OdinDriverAndWriter, +): + driver, writer = odin_driver_and_writer + path_info = writer._path_provider.return_value + expected_path = "/tmp" + expected_filename = "filename.h5" + path_info.directory_path = Path(expected_path) + path_info.filename = expected_filename + + await writer.open() + + get_mock_put(driver.file_path).assert_called_once_with( + expected_path, wait=ANY, timeout=ANY + ) + get_mock_put(driver.file_name).assert_called_once_with( + expected_filename, wait=ANY, timeout=ANY + ) + + +async def test_when_open_called_then_all_expected_signals_set( + odin_driver_and_writer: OdinDriverAndWriter, +): + driver, writer = odin_driver_and_writer + await writer.open() + + get_mock_put(driver.data_type).assert_called_once_with( + "uint16", wait=ANY, timeout=ANY + ) + get_mock_put(driver.num_to_capture).assert_called_once_with( + 0, wait=ANY, timeout=ANY + ) + + get_mock_put(driver.capture).assert_called_once_with( + Writing.ON, wait=ANY, timeout=ANY + ) + + +async def test_given_data_shape_set_when_open_called_then_describe_has_correct_shape( + odin_driver_and_writer: OdinDriverAndWriter, +): + driver, writer = odin_driver_and_writer + set_mock_value(driver.image_width, 1024) + set_mock_value(driver.image_height, 768) + description = await writer.open() + assert description["data"]["shape"] == [768, 1024] + + +async def test_when_closed_then_data_capture_turned_off( + odin_driver_and_writer: OdinDriverAndWriter, +): + driver, writer = odin_driver_and_writer + await writer.close() + get_mock_put(driver.capture).assert_called_once_with( + Writing.OFF, wait=ANY, timeout=ANY + )