diff --git a/src/dodal/beamline_specific_utils/i03.py b/src/dodal/beamline_specific_utils/i03.py index 455bab41ef..30db44c2a9 100644 --- a/src/dodal/beamline_specific_utils/i03.py +++ b/src/dodal/beamline_specific_utils/i03.py @@ -1,7 +1,5 @@ from dataclasses import dataclass -from dodal.devices.aperturescatterguard import SingleAperturePosition - I03_BEAM_HEIGHT_UM = 20 @@ -11,6 +9,5 @@ class BeamSize: y_um: float | None -def beam_size_from_aperture(position: SingleAperturePosition): - aperture_size = position.radius_microns +def beam_size_from_aperture(aperture_size: float | None): return BeamSize(aperture_size, I03_BEAM_HEIGHT_UM if aperture_size else None) diff --git a/src/dodal/beamlines/i03.py b/src/dodal/beamlines/i03.py index 00fecdaccf..807b3f98f2 100644 --- a/src/dodal/beamlines/i03.py +++ b/src/dodal/beamlines/i03.py @@ -9,9 +9,9 @@ from dodal.common.beamlines.beamline_utils import set_beamline as set_utils_beamline from dodal.common.udc_directory_provider import PandASubdirectoryProvider from dodal.devices.aperturescatterguard import ( + AperturePosition, ApertureScatterguard, load_positions_from_beamline_parameters, - load_tolerances_from_beamline_params, ) from dodal.devices.attenuator import Attenuator from dodal.devices.backlight import Backlight @@ -71,7 +71,7 @@ def aperture_scatterguard( wait=wait_for_connection, fake=fake_with_ophyd_sim, loaded_positions=load_positions_from_beamline_parameters(params), - tolerances=load_tolerances_from_beamline_params(params), + tolerances=AperturePosition.tolerances_from_gda_params(params), ) diff --git a/src/dodal/beamlines/i04.py b/src/dodal/beamlines/i04.py index a5e12522cc..71f4dbe212 100644 --- a/src/dodal/beamlines/i04.py +++ b/src/dodal/beamlines/i04.py @@ -2,9 +2,9 @@ from dodal.common.beamlines.beamline_utils import device_instantiation from dodal.common.beamlines.beamline_utils import set_beamline as set_utils_beamline from dodal.devices.aperturescatterguard import ( + AperturePosition, ApertureScatterguard, load_positions_from_beamline_parameters, - load_tolerances_from_beamline_params, ) from dodal.devices.attenuator import Attenuator from dodal.devices.backlight import Backlight @@ -236,7 +236,7 @@ def aperture_scatterguard( wait=wait_for_connection, fake=fake_with_ophyd_sim, loaded_positions=load_positions_from_beamline_parameters(params), - tolerances=load_tolerances_from_beamline_params(params), + tolerances=AperturePosition.tolerances_from_gda_params(params), ) diff --git a/src/dodal/common/beamlines/beamline_parameters.py b/src/dodal/common/beamlines/beamline_parameters.py index 5a7d2f4f18..d103a9a5de 100644 --- a/src/dodal/common/beamlines/beamline_parameters.py +++ b/src/dodal/common/beamlines/beamline_parameters.py @@ -15,6 +15,9 @@ class GDABeamlineParameters: params: dict[str, Any] + def __init__(self, params: dict[str, Any]): + self.params = params + def __repr__(self) -> str: return repr(self.params) @@ -23,7 +26,6 @@ def __getitem__(self, item: str): @classmethod def from_lines(cls, file_name: str, config_lines: list[str]): - ob = cls() config_lines_nocomments = [line.split("#", 1)[0] for line in config_lines] config_lines_sep_key_and_value = [ # XXX removes all whitespace instead of just trim @@ -46,8 +48,7 @@ def from_lines(cls, file_name: str, config_lines: list[str]): except Exception as e: LOGGER.warning(f"Unable to parse {file_name} line {i}: {e}") - ob.params = dict(config_pairs) - return ob + return cls(params=dict(config_pairs)) @classmethod def from_file(cls, path: str): diff --git a/src/dodal/devices/aperturescatterguard.py b/src/dodal/devices/aperturescatterguard.py index e85cf727dd..90597d532f 100644 --- a/src/dodal/devices/aperturescatterguard.py +++ b/src/dodal/devices/aperturescatterguard.py @@ -1,12 +1,16 @@ +from __future__ import annotations + import asyncio -from collections import namedtuple -from dataclasses import asdict, dataclass from enum import Enum -from bluesky.protocols import Movable, Reading -from event_model.documents.event_descriptor import DataKey -from ophyd_async.core import AsyncStatus, HintedSignal, SignalR, StandardReadable -from ophyd_async.core.soft_signal_backend import SoftConverter, SoftSignalBackend +from bluesky.protocols import Movable +from ophyd_async.core import ( + AsyncStatus, + HintedSignal, + StandardReadable, + soft_signal_rw, +) +from pydantic import BaseModel, Field from dodal.common.beamlines.beamline_parameters import GDABeamlineParameters from dodal.devices.aperture import Aperture @@ -17,100 +21,76 @@ class InvalidApertureMove(Exception): pass -ApertureFiveDimensionalLocation = namedtuple( - "ApertureFiveDimensionalLocation", - [ - "aperture_x", - "aperture_y", - "aperture_z", - "scatterguard_x", - "scatterguard_y", - ], -) - +class AperturePosition(BaseModel): + aperture_x: float + aperture_y: float + aperture_z: float + scatterguard_x: float + scatterguard_y: float + radius: float | None = Field(json_schema_extra={"units": "µm"}, default=None) + + @property + def values(self) -> tuple[float, float, float, float, float]: + return ( + self.aperture_x, + self.aperture_y, + self.aperture_z, + self.scatterguard_x, + self.scatterguard_y, + ) -@dataclass -class ApertureScatterguardTolerances: - ap_x: float - ap_y: float - ap_z: float - sg_x: float - sg_y: float + @staticmethod + def tolerances_from_gda_params( + params: GDABeamlineParameters, + ) -> AperturePosition: + return AperturePosition( + aperture_x=params["miniap_x_tolerance"], + aperture_y=params["miniap_y_tolerance"], + aperture_z=params["miniap_z_tolerance"], + scatterguard_x=params["sg_x_tolerance"], + scatterguard_y=params["sg_y_tolerance"], + ) + @staticmethod + def from_gda_params( + name: ApertureValue, + radius: float | None, + params: GDABeamlineParameters, + ) -> AperturePosition: + return AperturePosition( + aperture_x=params[f"miniap_x_{name.value}"], + aperture_y=params[f"miniap_y_{name.value}"], + aperture_z=params[f"miniap_z_{name.value}"], + scatterguard_x=params[f"sg_x_{name.value}"], + scatterguard_y=params[f"sg_y_{name.value}"], + radius=radius, + ) -@dataclass -class SingleAperturePosition: - name: str - GDA_name: str - radius_microns: float | None - location: ApertureFiveDimensionalLocation +class ApertureValue(str, Enum): + """Maps from a short usable name to the value name in the GDA Beamline parameters""" -# Use StrEnum once we stop python 3.10 support -class AperturePositionGDANames(str, Enum): - LARGE_APERTURE = "LARGE_APERTURE" - MEDIUM_APERTURE = "MEDIUM_APERTURE" - SMALL_APERTURE = "SMALL_APERTURE" ROBOT_LOAD = "ROBOT_LOAD" - - def __str__(self): - return str(self.value) - - -def position_from_params( - name: str, - GDA_name: AperturePositionGDANames, - radius_microns: float | None, - params: GDABeamlineParameters, -) -> SingleAperturePosition: - return SingleAperturePosition( - name, - GDA_name, - radius_microns, - ApertureFiveDimensionalLocation( - params[f"miniap_x_{GDA_name}"], - params[f"miniap_y_{GDA_name}"], - params[f"miniap_z_{GDA_name}"], - params[f"sg_x_{GDA_name}"], - params[f"sg_y_{GDA_name}"], - ), - ) - - -def load_tolerances_from_beamline_params( - params: GDABeamlineParameters, -) -> ApertureScatterguardTolerances: - return ApertureScatterguardTolerances( - ap_x=params["miniap_x_tolerance"], - ap_y=params["miniap_y_tolerance"], - ap_z=params["miniap_z_tolerance"], - sg_x=params["sg_x_tolerance"], - sg_y=params["sg_y_tolerance"], - ) - - -class AperturePosition(Enum): - ROBOT_LOAD = 0 - SMALL = 1 - MEDIUM = 2 - LARGE = 3 + SMALL = "SMALL_APERTURE" + MEDIUM = "MEDIUM_APERTURE" + LARGE = "LARGE_APERTURE" def load_positions_from_beamline_parameters( params: GDABeamlineParameters, -) -> dict[AperturePosition, SingleAperturePosition]: +) -> dict[ApertureValue, AperturePosition]: return { - AperturePosition.ROBOT_LOAD: position_from_params( - "Robot load", AperturePositionGDANames.ROBOT_LOAD, None, params + ApertureValue.ROBOT_LOAD: AperturePosition.from_gda_params( + ApertureValue.ROBOT_LOAD, None, params ), - AperturePosition.SMALL: position_from_params( - "Small", AperturePositionGDANames.SMALL_APERTURE, 20, params + ApertureValue.SMALL: AperturePosition.from_gda_params( + ApertureValue.SMALL, 20, params ), - AperturePosition.MEDIUM: position_from_params( - "Medium", AperturePositionGDANames.MEDIUM_APERTURE, 50, params + ApertureValue.MEDIUM: AperturePosition.from_gda_params( + ApertureValue.MEDIUM, 50, params ), - AperturePosition.LARGE: position_from_params( - "Large", AperturePositionGDANames.LARGE_APERTURE, 100, params + ApertureValue.LARGE: AperturePosition.from_gda_params( + ApertureValue.LARGE, 100, params ), } @@ -118,119 +98,86 @@ def load_positions_from_beamline_parameters( class ApertureScatterguard(StandardReadable, Movable): def __init__( self, - loaded_positions: dict[AperturePosition, SingleAperturePosition], - tolerances: ApertureScatterguardTolerances, + loaded_positions: dict[ApertureValue, AperturePosition], + tolerances: AperturePosition, prefix: str = "", name: str = "", ) -> None: - self._aperture = Aperture(prefix + "-MO-MAPT-01:") - self._scatterguard = Scatterguard(prefix + "-MO-SCAT-01:") + self.aperture = Aperture(prefix + "-MO-MAPT-01:") + self.scatterguard = Scatterguard(prefix + "-MO-SCAT-01:") + self.radius = soft_signal_rw(float, units="µm") self._loaded_positions = loaded_positions self._tolerances = tolerances - aperture_backend = SoftSignalBackend( - SingleAperturePosition, self._loaded_positions[AperturePosition.ROBOT_LOAD] + self.add_readables( + [ + self.aperture.x.user_readback, + self.aperture.y.user_readback, + self.aperture.z.user_readback, + self.scatterguard.x.user_readback, + self.scatterguard.y.user_readback, + self.radius, + ], ) - aperture_backend.converter = self.ApertureConverter() - self.selected_aperture = self.SelectedAperture(backend=aperture_backend) - self.add_readables([self.selected_aperture], wrapper=HintedSignal) - super().__init__(name) - - class ApertureConverter(SoftConverter): - # Ophyd-async #311 should add a default converter for dataclasses to do this - def reading( - self, value: SingleAperturePosition, timestamp: float, severity: int - ) -> Reading: - return Reading( - value=asdict(value), - timestamp=timestamp, - alarm_severity=-1 if severity > 2 else severity, - ) + with self.add_children_as_readables(HintedSignal): + self.selected_aperture = soft_signal_rw(ApertureValue) - class SelectedAperture(SignalR): - async def read(self, *args, **kwargs): - assert isinstance(self.parent, ApertureScatterguard) - assert self._backend - await self._backend.put(await self.parent.get_current_aperture_position()) - return {self.name: await self._backend.get_reading()} - - async def describe(self) -> dict[str, DataKey]: - return { - self._name: DataKey( - dtype="array", - shape=[ - -1, - ], # TODO describe properly - see https://github.com/DiamondLightSource/dodal/issues/253, - source=self._backend.source(self._name), # type: ignore - ) - } + super().__init__(name) def get_position_from_gda_aperture_name( - self, gda_aperture_name: AperturePositionGDANames - ) -> AperturePosition: - for aperture, detail in self._loaded_positions.items(): - if detail.GDA_name == gda_aperture_name: - return aperture - raise ValueError( - f"Tried to convert unknown aperture name {gda_aperture_name} to a SingleAperturePosition" - ) - - def get_gda_name_for_position(self, position: AperturePosition) -> str: - detailed_position = self._loaded_positions[position] - return detailed_position.GDA_name + self, gda_aperture_name: str + ) -> ApertureValue: + return ApertureValue(gda_aperture_name) @AsyncStatus.wrap - async def set(self, value: AperturePosition): + async def set(self, value: ApertureValue): position = self._loaded_positions[value] - await self._safe_move_within_datacollection_range(position.location) - - def _get_motor_list(self): - return [ - self._aperture.x, - self._aperture.y, - self._aperture.z, - self._scatterguard.x, - self._scatterguard.y, - ] + await self._safe_move_within_datacollection_range(position, value) @AsyncStatus.wrap - async def _set_raw_unsafe(self, positions: ApertureFiveDimensionalLocation): + async def _set_raw_unsafe(self, position: AperturePosition): """Accept the risks and move in an unsafe way. Collisions possible.""" + if position.radius is not None: + await self.radius.set(position.radius) - # unpacking the position - aperture_x, aperture_y, aperture_z, scatterguard_x, scatterguard_y = positions + aperture_x, aperture_y, aperture_z, scatterguard_x, scatterguard_y = ( + position.values + ) await asyncio.gather( - self._aperture.x.set(aperture_x), - self._aperture.y.set(aperture_y), - self._aperture.z.set(aperture_z), - self._scatterguard.x.set(scatterguard_x), - self._scatterguard.y.set(scatterguard_y), + self.aperture.x.set(aperture_x), + self.aperture.y.set(aperture_y), + self.aperture.z.set(aperture_z), + self.scatterguard.x.set(scatterguard_x), + self.scatterguard.y.set(scatterguard_y), ) + try: + value = await self.get_current_aperture_position() + self.selected_aperture.set(value) + except InvalidApertureMove: + self.selected_aperture.set(None) # type: ignore - async def get_current_aperture_position(self) -> SingleAperturePosition: + async def get_current_aperture_position(self) -> ApertureValue: """ Returns the current aperture position using readback values for SMALL, MEDIUM, LARGE. ROBOT_LOAD position defined when mini aperture y <= ROBOT_LOAD.location.aperture_y + tolerance. If no position is found then raises InvalidApertureMove. """ - current_ap_y = await self._aperture.y.user_readback.get_value(cached=False) - robot_load_ap_y = self._loaded_positions[ - AperturePosition.ROBOT_LOAD - ].location.aperture_y - if await self._aperture.large.get_value(cached=False) == 1: - return self._loaded_positions[AperturePosition.LARGE] - elif await self._aperture.medium.get_value(cached=False) == 1: - return self._loaded_positions[AperturePosition.MEDIUM] - elif await self._aperture.small.get_value(cached=False) == 1: - return self._loaded_positions[AperturePosition.SMALL] - elif current_ap_y <= robot_load_ap_y + self._tolerances.ap_y: - return self._loaded_positions[AperturePosition.ROBOT_LOAD] + current_ap_y = await self.aperture.y.user_readback.get_value(cached=False) + robot_load_ap_y = self._loaded_positions[ApertureValue.ROBOT_LOAD].aperture_y + if await self.aperture.large.get_value(cached=False) == 1: + return ApertureValue.LARGE + elif await self.aperture.medium.get_value(cached=False) == 1: + return ApertureValue.MEDIUM + elif await self.aperture.small.get_value(cached=False) == 1: + return ApertureValue.SMALL + elif current_ap_y <= robot_load_ap_y + self._tolerances.aperture_y: + return ApertureValue.ROBOT_LOAD raise InvalidApertureMove("Current aperture/scatterguard state unrecognised") async def _safe_move_within_datacollection_range( - self, pos: ApertureFiveDimensionalLocation + self, position: AperturePosition, value: ApertureValue ): """ Move the aperture and scatterguard combo safely to a new position. @@ -238,45 +185,50 @@ async def _safe_move_within_datacollection_range( for why this is required. """ assert self._loaded_positions is not None - # unpacking the position - aperture_x, aperture_y, aperture_z, scatterguard_x, scatterguard_y = pos - ap_z_in_position = await self._aperture.z.motor_done_move.get_value() + ap_z_in_position = await self.aperture.z.motor_done_move.get_value() if not ap_z_in_position: raise InvalidApertureMove( "ApertureScatterguard z is still moving. Wait for it to finish " "before triggering another move." ) - current_ap_z = await self._aperture.z.user_readback.get_value() - diff_on_z = abs(current_ap_z - aperture_z) - if diff_on_z > self._tolerances.ap_z: + current_ap_z = await self.aperture.z.user_readback.get_value() + diff_on_z = abs(current_ap_z - position.aperture_z) + if diff_on_z > self._tolerances.aperture_z: raise InvalidApertureMove( "ApertureScatterguard safe move is not yet defined for positions " "outside of LARGE, MEDIUM, SMALL, ROBOT_LOAD. " - f"Current aperture z ({current_ap_z}), outside of tolerance ({self._tolerances.ap_z}) from target ({aperture_z})." + f"Current aperture z ({current_ap_z}), outside of tolerance ({self._tolerances.aperture_z}) from target ({position.aperture_z})." ) - current_ap_y = await self._aperture.y.user_readback.get_value() + current_ap_y = await self.aperture.y.user_readback.get_value() + if position.radius is not None: + await self.radius.set(position.radius) + + aperture_x, aperture_y, aperture_z, scatterguard_x, scatterguard_y = ( + position.values + ) - if aperture_y > current_ap_y: + if position.aperture_y > current_ap_y: await asyncio.gather( - self._scatterguard.x.set(scatterguard_x), - self._scatterguard.y.set(scatterguard_y), + self.scatterguard.x.set(scatterguard_x), + self.scatterguard.y.set(scatterguard_y), ) await asyncio.gather( - self._aperture.x.set(aperture_x), - self._aperture.y.set(aperture_y), - self._aperture.z.set(aperture_z), + self.aperture.x.set(aperture_x), + self.aperture.y.set(aperture_y), + self.aperture.z.set(aperture_z), ) return await asyncio.gather( - self._aperture.x.set(aperture_x), - self._aperture.y.set(aperture_y), - self._aperture.z.set(aperture_z), + self.aperture.x.set(aperture_x), + self.aperture.y.set(aperture_y), + self.aperture.z.set(aperture_z), ) await asyncio.gather( - self._scatterguard.x.set(scatterguard_x), - self._scatterguard.y.set(scatterguard_y), + self.scatterguard.x.set(scatterguard_x), + self.scatterguard.y.set(scatterguard_y), ) + await self.selected_aperture.set(value) diff --git a/tests/devices/system_tests/test_aperturescatterguard_system.py b/tests/devices/system_tests/test_aperturescatterguard_system.py index e812c6d06f..f9adfe86b3 100644 --- a/tests/devices/system_tests/test_aperturescatterguard_system.py +++ b/tests/devices/system_tests/test_aperturescatterguard_system.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Any, cast - import bluesky.plan_stubs as bps import pytest from bluesky.callbacks import CallbackBase @@ -9,12 +7,13 @@ from event_model import Event from ophyd_async.core import DeviceCollector +from dodal.common.beamlines.beamline_parameters import GDABeamlineParameters from dodal.devices.aperturescatterguard import ( AperturePosition, ApertureScatterguard, + ApertureValue, InvalidApertureMove, load_positions_from_beamline_parameters, - load_tolerances_from_beamline_params, ) I03_BEAMLINE_PARAMETER_PATH = ( @@ -23,48 +22,11 @@ BEAMLINE_PARAMETER_KEYWORDS = ["FB", "FULL", "deadtime"] -class GDABeamlineParameters: - params: dict[str, Any] - - def __repr__(self) -> str: - return repr(self.params) - - def __getitem__(self, item: str): - return self.params[item] - - @classmethod - def from_file(cls, path: str): - ob = cls() - with open(path) as f: - config_lines = f.readlines() - config_lines_nocomments = [line.split("#", 1)[0] for line in config_lines] - config_lines_sep_key_and_value = [ - line.translate(str.maketrans("", "", " \n\t\r")).split("=") - for line in config_lines_nocomments - ] - config_pairs: list[tuple[str, Any]] = [ - cast(tuple[str, Any], param) - for param in config_lines_sep_key_and_value - if len(param) == 2 - ] - for i, (_, value) in enumerate(config_pairs): - if value == "Yes": - config_pairs[i] = (config_pairs[i][0], True) - elif value == "No": - config_pairs[i] = (config_pairs[i][0], False) - elif value in BEAMLINE_PARAMETER_KEYWORDS: - pass - else: - config_pairs[i] = (config_pairs[i][0], float(config_pairs[i][1])) - ob.params = dict(config_pairs) - return ob - - @pytest.fixture async def ap_sg(): params = GDABeamlineParameters.from_file(I03_BEAMLINE_PARAMETER_PATH) - positions = load_positions_from_beamline_parameters(params) # type:ignore - tolerances = load_tolerances_from_beamline_params(params) # type:ignore + positions = load_positions_from_beamline_parameters(params) + tolerances = AperturePosition.tolerances_from_gda_params(params) async with DeviceCollector(): ap_sg = ApertureScatterguard( @@ -79,25 +41,25 @@ async def ap_sg(): @pytest.fixture def move_to_large(ap_sg: ApertureScatterguard): assert ap_sg._loaded_positions is not None - yield from bps.abs_set(ap_sg, AperturePosition.LARGE) + yield from bps.abs_set(ap_sg, ApertureValue.LARGE) @pytest.fixture def move_to_medium(ap_sg: ApertureScatterguard): assert ap_sg._loaded_positions is not None - yield from bps.abs_set(ap_sg, AperturePosition.MEDIUM) + yield from bps.abs_set(ap_sg, ApertureValue.MEDIUM) @pytest.fixture def move_to_small(ap_sg: ApertureScatterguard): assert ap_sg._loaded_positions is not None - yield from bps.abs_set(ap_sg, AperturePosition.SMALL) + yield from bps.abs_set(ap_sg, ApertureValue.SMALL) @pytest.fixture def move_to_robotload(ap_sg: ApertureScatterguard): assert ap_sg._loaded_positions is not None - yield from bps.abs_set(ap_sg, AperturePosition.ROBOT_LOAD) + yield from bps.abs_set(ap_sg, ApertureValue.ROBOT_LOAD) @pytest.mark.s03 @@ -115,9 +77,9 @@ async def test_aperturescatterguard_move_in_plan( RE, ): assert ap_sg._loaded_positions is not None - large = ap_sg._loaded_positions[AperturePosition.LARGE] + large = ap_sg._loaded_positions[ApertureValue.LARGE] - await ap_sg._aperture.z.set(large.location[2]) + await ap_sg.aperture.z.set(large.aperture_z) RE(move_to_large) RE(move_to_medium) @@ -129,7 +91,7 @@ async def test_aperturescatterguard_move_in_plan( async def test_move_fails_when_not_in_good_starting_pos( ap_sg: ApertureScatterguard, move_to_large, RE ): - await ap_sg._aperture.z.set(0) + await ap_sg.aperture.z.set(0) with pytest.raises(InvalidApertureMove): RE(move_to_large) @@ -176,22 +138,22 @@ async def test_aperturescatterguard_moves_in_correct_order( cb = MonitorCallback() assert ap_sg._loaded_positions positions = { - "L": ap_sg._loaded_positions[AperturePosition.LARGE], - "M": ap_sg._loaded_positions[AperturePosition.MEDIUM], - "S": ap_sg._loaded_positions[AperturePosition.SMALL], - "R": ap_sg._loaded_positions[AperturePosition.ROBOT_LOAD], + "L": ap_sg._loaded_positions[ApertureValue.LARGE], + "M": ap_sg._loaded_positions[ApertureValue.MEDIUM], + "S": ap_sg._loaded_positions[ApertureValue.SMALL], + "R": ap_sg._loaded_positions[ApertureValue.ROBOT_LOAD], } pos1 = positions[pos_name_1] pos2 = positions[pos_name_2] RE = RunEngine({}) RE.subscribe(cb) - await ap_sg._aperture.z.set(pos1.location[2]) + await ap_sg.aperture.z.set(pos1.aperture_z) def monitor_and_moves(): yield from bps.open_run() - yield from bps.monitor(ap_sg._aperture.y.motor_done_move, name="ap_y") - yield from bps.monitor(ap_sg._scatterguard.y.motor_done_move, name="sg_y") + yield from bps.monitor(ap_sg.aperture.y.motor_done_move, name="ap_y") + yield from bps.monitor(ap_sg.scatterguard.y.motor_done_move, name="sg_y") yield from bps.mv(ap_sg, pos1) yield from bps.mv(ap_sg, pos2) yield from bps.close_run() diff --git a/tests/devices/unit_tests/test_aperture_scatterguard.py b/tests/devices/unit_tests/test_aperture_scatterguard.py index 26b614ecc6..3e51cbb415 100644 --- a/tests/devices/unit_tests/test_aperture_scatterguard.py +++ b/tests/devices/unit_tests/test_aperture_scatterguard.py @@ -1,6 +1,6 @@ from collections.abc import Sequence from contextlib import ExitStack -from dataclasses import asdict +from typing import Any from unittest.mock import ANY, MagicMock, call import bluesky.plan_stubs as bps @@ -12,16 +12,13 @@ set_mock_value, ) +from dodal.common.beamlines.beamline_parameters import GDABeamlineParameters from dodal.devices.aperturescatterguard import ( - ApertureFiveDimensionalLocation, AperturePosition, - AperturePositionGDANames, ApertureScatterguard, - ApertureScatterguardTolerances, + ApertureValue, InvalidApertureMove, - SingleAperturePosition, load_positions_from_beamline_parameters, - load_tolerances_from_beamline_params, ) from dodal.devices.util.test_utils import patch_motor @@ -29,61 +26,65 @@ @pytest.fixture -def aperture_positions() -> dict[AperturePosition, SingleAperturePosition]: +def aperture_positions() -> dict[ApertureValue, AperturePosition]: return load_positions_from_beamline_parameters( - { - "miniap_x_LARGE_APERTURE": 2.389, - "miniap_y_LARGE_APERTURE": 40.986, - "miniap_z_LARGE_APERTURE": 15.8, - "sg_x_LARGE_APERTURE": 5.25, - "sg_y_LARGE_APERTURE": 4.43, - "miniap_x_MEDIUM_APERTURE": 2.384, - "miniap_y_MEDIUM_APERTURE": 44.967, - "miniap_z_MEDIUM_APERTURE": 15.8, - "sg_x_MEDIUM_APERTURE": 5.285, - "sg_y_MEDIUM_APERTURE": 0.46, - "miniap_x_SMALL_APERTURE": 2.430, - "miniap_y_SMALL_APERTURE": 48.974, - "miniap_z_SMALL_APERTURE": 15.8, - "sg_x_SMALL_APERTURE": 5.3375, - "sg_y_SMALL_APERTURE": -3.55, - "miniap_x_ROBOT_LOAD": 2.386, - "miniap_y_ROBOT_LOAD": 31.40, - "miniap_z_ROBOT_LOAD": 15.8, - "sg_x_ROBOT_LOAD": 5.25, - "sg_y_ROBOT_LOAD": 4.43, - } # type:ignore + GDABeamlineParameters( + params={ + "miniap_x_LARGE_APERTURE": 2.389, + "miniap_y_LARGE_APERTURE": 40.986, + "miniap_z_LARGE_APERTURE": 15.8, + "sg_x_LARGE_APERTURE": 5.25, + "sg_y_LARGE_APERTURE": 4.43, + "miniap_x_MEDIUM_APERTURE": 2.384, + "miniap_y_MEDIUM_APERTURE": 44.967, + "miniap_z_MEDIUM_APERTURE": 15.8, + "sg_x_MEDIUM_APERTURE": 5.285, + "sg_y_MEDIUM_APERTURE": 0.46, + "miniap_x_SMALL_APERTURE": 2.430, + "miniap_y_SMALL_APERTURE": 48.974, + "miniap_z_SMALL_APERTURE": 15.8, + "sg_x_SMALL_APERTURE": 5.3375, + "sg_y_SMALL_APERTURE": -3.55, + "miniap_x_ROBOT_LOAD": 2.386, + "miniap_y_ROBOT_LOAD": 31.40, + "miniap_z_ROBOT_LOAD": 15.8, + "sg_x_ROBOT_LOAD": 5.25, + "sg_y_ROBOT_LOAD": 4.43, + } + ) ) @pytest.fixture def aperture_tolerances(): - return load_tolerances_from_beamline_params( - { - "miniap_x_tolerance": 0.004, - "miniap_y_tolerance": 0.1, - "miniap_z_tolerance": 0.1, - "sg_x_tolerance": 0.1, - "sg_y_tolerance": 0.1, - } # type:ignore + return AperturePosition.tolerances_from_gda_params( + GDABeamlineParameters( + { + "miniap_x_tolerance": 0.004, + "miniap_y_tolerance": 0.1, + "miniap_z_tolerance": 0.1, + "sg_x_tolerance": 0.1, + "sg_y_tolerance": 0.1, + } + ) ) def get_all_motors(ap_sg: ApertureScatterguard): return [ - ap_sg._aperture.x, - ap_sg._aperture.y, - ap_sg._aperture.z, - ap_sg._scatterguard.x, - ap_sg._scatterguard.y, + ap_sg.aperture.x, + ap_sg.aperture.y, + ap_sg.aperture.z, + ap_sg.scatterguard.x, + ap_sg.scatterguard.y, ] @pytest.fixture async def ap_sg_and_call_log( RE: RunEngine, - aperture_positions: dict[AperturePosition, SingleAperturePosition], - aperture_tolerances: ApertureScatterguardTolerances, + aperture_positions: dict[ApertureValue, AperturePosition], + aperture_tolerances: AperturePosition, ): call_log = MagicMock() async with DeviceCollector(mock=True): @@ -108,12 +109,12 @@ async def ap_sg(ap_sg_and_call_log: ApSgAndLog): @pytest.fixture async def aperture_in_medium_pos_w_call_log( ap_sg_and_call_log: ApSgAndLog, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): ap_sg, call_log = ap_sg_and_call_log - await ap_sg._set_raw_unsafe(aperture_positions[AperturePosition.MEDIUM].location) + await ap_sg._set_raw_unsafe(aperture_positions[ApertureValue.MEDIUM]) - set_mock_value(ap_sg._aperture.medium, 1) + set_mock_value(ap_sg.aperture.medium, 1) yield ap_sg, call_log @@ -125,13 +126,11 @@ async def aperture_in_medium_pos(aperture_in_medium_pos_w_call_log: ApSgAndLog): def _assert_patched_ap_sg_has_call( ap_sg: ApertureScatterguard, - position: ( - ApertureFiveDimensionalLocation | tuple[float, float, float, float, float] - ), + position: AperturePosition, ): for motor, pos in zip( get_all_motors(ap_sg), - position, + position.values, strict=False, ): get_mock_put(motor.user_setpoint).assert_called_with( @@ -139,6 +138,18 @@ def _assert_patched_ap_sg_has_call( ) +def _assert_position_in_reading( + reading: dict[str, Any], position: AperturePosition, device_name: str +): + if position.radius is not None: + assert reading[f"{device_name}-radius"]["value"] == position.radius + assert reading[f"{device_name}-aperture-x"]["value"] == position.aperture_x + assert reading[f"{device_name}-aperture-y"]["value"] == position.aperture_y + assert reading[f"{device_name}-aperture-z"]["value"] == position.aperture_z + assert reading[f"{device_name}-scatterguard-x"]["value"] == position.scatterguard_x + assert reading[f"{device_name}-scatterguard-y"]["value"] == position.scatterguard_y + + def _call_list(calls: Sequence[float]): return [call.setpoint(v, wait=True, timeout=ANY) for v in calls] @@ -148,7 +159,7 @@ async def test_aperture_scatterguard_select_bottom_moves_sg_down_then_assembly_u ): ap_sg, call_log = aperture_in_medium_pos_w_call_log - await ap_sg.set(AperturePosition.SMALL) + await ap_sg.set(ApertureValue.SMALL) call_log.assert_has_calls(_call_list((5.3375, -3.55, 2.43, 48.974, 15.8))) @@ -156,10 +167,17 @@ async def test_aperture_scatterguard_select_bottom_moves_sg_down_then_assembly_u async def test_aperture_unsafe_move( aperture_in_medium_pos: ApertureScatterguard, ): - (a, b, c, d, e) = (0.2, 3.4, 5.6, 7.8, 9.0) + pos = AperturePosition( + aperture_x=0.2, + aperture_y=3.4, + aperture_z=5.6, + scatterguard_x=7.8, + scatterguard_y=9.0, + radius=None, + ) ap_sg = aperture_in_medium_pos - await ap_sg._set_raw_unsafe((a, b, c, d, e)) # type: ignore - _assert_patched_ap_sg_has_call(ap_sg, (a, b, c, d, e)) + await ap_sg._set_raw_unsafe(pos) + _assert_patched_ap_sg_has_call(ap_sg, pos) async def test_aperture_scatterguard_select_top_moves_assembly_down_then_sg_up( @@ -167,112 +185,113 @@ async def test_aperture_scatterguard_select_top_moves_assembly_down_then_sg_up( ): ap_sg = aperture_in_medium_pos - await ap_sg.set(AperturePosition.LARGE) - - _assert_patched_ap_sg_has_call(ap_sg, (2.389, 40.986, 15.8, 5.25, 4.43)) + await ap_sg.set(ApertureValue.LARGE) + + _assert_patched_ap_sg_has_call( + ap_sg, + AperturePosition( + aperture_x=2.389, + aperture_y=40.986, + aperture_z=15.8, + scatterguard_x=5.25, + scatterguard_y=4.43, + radius=100, + ), + ) async def test_aperture_scatterguard_throws_error_if_outside_tolerance( ap_sg: ApertureScatterguard, ): - set_mock_value(ap_sg._aperture.z.deadband, 0.001) - set_mock_value(ap_sg._aperture.z.user_readback, 1) - set_mock_value(ap_sg._aperture.z.motor_done_move, 1) + set_mock_value(ap_sg.aperture.z.deadband, 0.001) + set_mock_value(ap_sg.aperture.z.user_readback, 1) + set_mock_value(ap_sg.aperture.z.motor_done_move, 1) with pytest.raises(InvalidApertureMove): - pos = ApertureFiveDimensionalLocation(0, 0, 1.1, 0, 0) - await ap_sg._safe_move_within_datacollection_range(pos) + pos = AperturePosition( + aperture_x=0, + aperture_y=0, + aperture_z=1.1, + scatterguard_x=0, + scatterguard_y=0, + ) + await ap_sg._safe_move_within_datacollection_range(pos, ApertureValue.LARGE) async def test_aperture_scatterguard_returns_status_if_within_tolerance( ap_sg: ApertureScatterguard, ): - set_mock_value(ap_sg._aperture.z.deadband, 0.001) - set_mock_value(ap_sg._aperture.z.user_readback, 1) - set_mock_value(ap_sg._aperture.z.motor_done_move, 1) + set_mock_value(ap_sg.aperture.z.deadband, 0.001) + set_mock_value(ap_sg.aperture.z.user_readback, 1) + set_mock_value(ap_sg.aperture.z.motor_done_move, 1) - pos = ApertureFiveDimensionalLocation(0, 0, 1, 0, 0) - await ap_sg._safe_move_within_datacollection_range(pos) + pos = AperturePosition( + aperture_x=0, aperture_y=0, aperture_z=1, scatterguard_x=0, scatterguard_y=0 + ) + await ap_sg._safe_move_within_datacollection_range(pos, ApertureValue.LARGE) -def set_underlying_motors( - ap_sg: ApertureScatterguard, position: ApertureFiveDimensionalLocation -): +def set_underlying_motors(ap_sg: ApertureScatterguard, position: AperturePosition): for motor, pos in zip( get_all_motors(ap_sg), - position, + position.values, strict=False, ): motor.set(pos) -async def test_aperture_positions_large( - ap_sg: ApertureScatterguard, aperture_positions -): - set_mock_value(ap_sg._aperture.large, 1) - assert ( - await ap_sg.get_current_aperture_position() - == aperture_positions[AperturePosition.LARGE] - ) +async def test_aperture_positions_large(ap_sg: ApertureScatterguard): + set_mock_value(ap_sg.aperture.large, 1) + assert await ap_sg.get_current_aperture_position() == ApertureValue.LARGE -async def test_aperture_positions_medium( - ap_sg: ApertureScatterguard, aperture_positions -): - set_mock_value(ap_sg._aperture.medium, 1) - assert ( - await ap_sg.get_current_aperture_position() - == aperture_positions[AperturePosition.MEDIUM] - ) +async def test_aperture_positions_medium(ap_sg: ApertureScatterguard): + set_mock_value(ap_sg.aperture.medium, 1) + assert await ap_sg.get_current_aperture_position() == ApertureValue.MEDIUM -async def test_aperture_positions_small( - ap_sg: ApertureScatterguard, aperture_positions -): - set_mock_value(ap_sg._aperture.small, 1) - assert ( - await ap_sg.get_current_aperture_position() - == aperture_positions[AperturePosition.SMALL] - ) +async def test_aperture_positions_small(ap_sg: ApertureScatterguard): + set_mock_value(ap_sg.aperture.small, 1) + assert await ap_sg.get_current_aperture_position() == ApertureValue.SMALL async def test_aperture_positions_robot_load( ap_sg: ApertureScatterguard, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - set_mock_value(ap_sg._aperture.large, 0) - set_mock_value(ap_sg._aperture.medium, 0) - set_mock_value(ap_sg._aperture.small, 0) - robot_load = aperture_positions[AperturePosition.ROBOT_LOAD] - await ap_sg._aperture.y.set(robot_load.location.aperture_y) - assert await ap_sg.get_current_aperture_position() == robot_load + set_mock_value(ap_sg.aperture.large, 0) + set_mock_value(ap_sg.aperture.medium, 0) + set_mock_value(ap_sg.aperture.small, 0) + robot_load = aperture_positions[ApertureValue.ROBOT_LOAD] + await ap_sg.aperture.y.set(robot_load.aperture_y) + assert await ap_sg.get_current_aperture_position() == ApertureValue.ROBOT_LOAD async def test_aperture_positions_robot_load_within_tolerance( ap_sg: ApertureScatterguard, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - robot_load = aperture_positions[AperturePosition.ROBOT_LOAD] - robot_load_ap_y = robot_load.location.aperture_y - tolerance = ap_sg._tolerances.ap_y - set_mock_value(ap_sg._aperture.large, 0) - set_mock_value(ap_sg._aperture.medium, 0) - set_mock_value(ap_sg._aperture.small, 0) - await ap_sg._aperture.y.set(robot_load_ap_y + tolerance) - assert await ap_sg.get_current_aperture_position() == robot_load + robot_load = aperture_positions[ApertureValue.ROBOT_LOAD] + robot_load_ap_y = robot_load.aperture_y + tolerance = ap_sg._tolerances.aperture_y + set_mock_value(ap_sg.aperture.large, 0) + set_mock_value(ap_sg.aperture.medium, 0) + set_mock_value(ap_sg.aperture.small, 0) + await ap_sg.aperture.y.set(robot_load_ap_y + tolerance) + assert await ap_sg.get_current_aperture_position() == ApertureValue.ROBOT_LOAD async def test_aperture_positions_robot_load_outside_tolerance( ap_sg: ApertureScatterguard, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - robot_load = aperture_positions[AperturePosition.ROBOT_LOAD] - robot_load_ap_y = robot_load.location.aperture_y - tolerance = ap_sg._tolerances.ap_y + 0.01 - set_mock_value(ap_sg._aperture.large, 0) - set_mock_value(ap_sg._aperture.medium, 0) - set_mock_value(ap_sg._aperture.small, 0) - await ap_sg._aperture.y.set(robot_load_ap_y + tolerance) + robot_load = aperture_positions[ApertureValue.ROBOT_LOAD] + robot_load_ap_y = robot_load.aperture_y + tolerance = ap_sg._tolerances.aperture_y + 0.01 + set_mock_value(ap_sg.aperture.large, 0) + set_mock_value(ap_sg.aperture.medium, 0) + set_mock_value(ap_sg.aperture.small, 0) + await ap_sg.aperture.y.set(robot_load_ap_y + tolerance) with pytest.raises(InvalidApertureMove): await ap_sg.get_current_aperture_position() @@ -280,46 +299,49 @@ async def test_aperture_positions_robot_load_outside_tolerance( async def test_aperture_positions_robot_load_unsafe( ap_sg: ApertureScatterguard, ): - set_mock_value(ap_sg._aperture.large, 0) - set_mock_value(ap_sg._aperture.medium, 0) - set_mock_value(ap_sg._aperture.small, 0) - await ap_sg._aperture.y.set(50.0) + set_mock_value(ap_sg.aperture.large, 0) + set_mock_value(ap_sg.aperture.medium, 0) + set_mock_value(ap_sg.aperture.small, 0) + await ap_sg.aperture.y.set(50.0) with pytest.raises(InvalidApertureMove): await ap_sg.get_current_aperture_position() async def test_given_aperture_not_set_through_device_but_motors_in_position_when_device_read_then_position_returned( aperture_in_medium_pos: ApertureScatterguard, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - selected_aperture = await aperture_in_medium_pos.read() - assert isinstance(selected_aperture, dict) - assert selected_aperture["test_ap_sg-selected_aperture"]["value"] == asdict( - aperture_positions[AperturePosition.MEDIUM] + reading = await aperture_in_medium_pos.read() + assert isinstance(reading, dict) + _assert_position_in_reading( + reading, + aperture_positions[ApertureValue.MEDIUM], + aperture_in_medium_pos.name, ) async def test_when_aperture_set_and_device_read_then_position_returned( aperture_in_medium_pos: ApertureScatterguard, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - await aperture_in_medium_pos.set(AperturePosition.MEDIUM) - selected_aperture = await aperture_in_medium_pos.read() - assert selected_aperture["test_ap_sg-selected_aperture"]["value"] == asdict( - aperture_positions[AperturePosition.MEDIUM] + await aperture_in_medium_pos.set(ApertureValue.MEDIUM) + reading = await aperture_in_medium_pos.read() + _assert_position_in_reading( + reading, + aperture_positions[ApertureValue.MEDIUM], + aperture_in_medium_pos.name, ) async def test_ap_sg_in_runengine( aperture_in_medium_pos: ApertureScatterguard, RE: RunEngine, - aperture_positions: dict[AperturePosition, SingleAperturePosition], + aperture_positions: dict[ApertureValue, AperturePosition], ): - ap_sg = aperture_in_medium_pos - ap = ap_sg._aperture - sg = ap_sg._scatterguard - test_loc = aperture_positions[AperturePosition.SMALL].location - RE(bps.abs_set(ap_sg, AperturePosition.SMALL, wait=True)) + ap = aperture_in_medium_pos.aperture + sg = aperture_in_medium_pos.scatterguard + test_loc = aperture_positions[ApertureValue.SMALL] + RE(bps.abs_set(aperture_in_medium_pos, ApertureValue.SMALL, wait=True)) assert await ap.x.user_readback.get_value() == test_loc.aperture_x assert await ap.y.user_readback.get_value() == test_loc.aperture_y assert await ap.z.user_readback.get_value() == test_loc.aperture_z @@ -338,32 +360,22 @@ def test_get_position_from_gda_aperture_name( ap_sg: ApertureScatterguard, ): assert ( - ap_sg.get_position_from_gda_aperture_name( - AperturePositionGDANames.LARGE_APERTURE - ) - == AperturePosition.LARGE + ap_sg.get_position_from_gda_aperture_name(ApertureValue.LARGE.value) + == ApertureValue.LARGE ) assert ( - ap_sg.get_position_from_gda_aperture_name( - AperturePositionGDANames.MEDIUM_APERTURE - ) - == AperturePosition.MEDIUM + ap_sg.get_position_from_gda_aperture_name(ApertureValue.MEDIUM.value) + == ApertureValue.MEDIUM ) assert ( - ap_sg.get_position_from_gda_aperture_name( - AperturePositionGDANames.SMALL_APERTURE - ) - == AperturePosition.SMALL + ap_sg.get_position_from_gda_aperture_name(ApertureValue.SMALL.value) + == ApertureValue.SMALL ) assert ( - ap_sg.get_position_from_gda_aperture_name(AperturePositionGDANames.ROBOT_LOAD) - == AperturePosition.ROBOT_LOAD + ap_sg.get_position_from_gda_aperture_name(ApertureValue.ROBOT_LOAD.value) + == ApertureValue.ROBOT_LOAD ) with pytest.raises(ValueError): ap_sg.get_position_from_gda_aperture_name( "VERY TINY APERTURE" # type: ignore ) - - -def test_ap_sg_returns_GDA_name_correctly(ap_sg: ApertureScatterguard): - assert ap_sg.get_gda_name_for_position(AperturePosition.SMALL) == "SMALL_APERTURE" diff --git a/tests/devices/unit_tests/util/test_beamline_specific_utils.py b/tests/devices/unit_tests/util/test_beamline_specific_utils.py index bbf0cac43b..13388734c6 100644 --- a/tests/devices/unit_tests/util/test_beamline_specific_utils.py +++ b/tests/devices/unit_tests/util/test_beamline_specific_utils.py @@ -4,10 +4,6 @@ I03_BEAM_HEIGHT_UM, beam_size_from_aperture, ) -from dodal.devices.aperturescatterguard import ( - ApertureFiveDimensionalLocation, - SingleAperturePosition, -) RADII_AND_SIZES = [ (None, (None, None)), @@ -17,14 +13,10 @@ ] -def get_single_ap(radius): - return SingleAperturePosition( - "", "", radius, ApertureFiveDimensionalLocation(0, 0, 0, 0, 0) - ) - - @pytest.mark.parametrize(["aperture_radius", "beam_size"], RADII_AND_SIZES) def test_beam_size_from_aperture(aperture_radius, beam_size): - beamsize = beam_size_from_aperture(get_single_ap(aperture_radius)) + beamsize = beam_size_from_aperture( + aperture_radius, + ) assert beamsize.x_um == beam_size[0] assert beamsize.y_um == beam_size[1]