Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pressure jump fast adc based on area detector #745

Draft
wants to merge 2 commits into
base: 658-pressure-jump-cell
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/dodal/beamlines/p38.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dodal.devices.i22.fswitch import FSwitch
from dodal.devices.linkam3 import Linkam3
from dodal.devices.pressure_jump_cell import PressureJumpCell
from dodal.devices.areadetector import PressureJumpCellDetector
from dodal.devices.slits import Slits
from dodal.devices.tetramm import TetrammDetector
from dodal.devices.undulator import Undulator
Expand Down Expand Up @@ -333,3 +334,18 @@ def high_pressure_xray_cell(
cell_prefix="-HPXC-01:",
adc_prefix="-ADC",
)

def high_pressure_xray_cell_adc(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> PressureJumpCellDetector:
return device_instantiation(
PressureJumpCellDetector,
"high_pressure_xray_cell_adc",
"-EA-HPXC-01:",
wait_for_connection,
fake_with_ophyd_sim,
adc_suffix="TRIG:",
drv_suffix="DET:",
hdf_suffix="FILE:",
path_provider=get_path_provider(),
)
2 changes: 2 additions & 0 deletions src/dodal/devices/areadetector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from .adaravis import AdAravisDetector
from .adsim import AdSimDetector
from .adutils import Hdf5Writer, SynchronisedAdDriverBase
from .pressurejumpcell import PressureJumpCellDetector

__all__ = [
"AdSimDetector",
"SynchronisedAdDriverBase",
"Hdf5Writer",
"AdAravisDetector",
"PressureJumpCellDetector",
]
48 changes: 48 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import get_args

from bluesky.protocols import HasHints, Hints

from ophyd_async.core import PathProvider, StandardDetector
from ophyd_async.epics import adcore

from .pressurejumpcell_controller import PressureJumpCellController
from .pressurejumpcell_io import PressureJumpCellDriverIO, PressureJumpCellAdcIO


class PressureJumpCellDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of a Pressure Jump Cell ADC Detector for fast pressure jumps.
The detector may be configured for an external trigger on the TTL Trig input.
"""

_controller: PressureJumpCellController
_writer: adcore.ADHDFWriter

def __init__(
self,
prefix: str,
path_provider: PathProvider,
drv_suffix="cam1:",
adc_suffix="TRIG",
hdf_suffix="HDF1:",
name="",
):
self.drv = PressureJumpCellDriverIO(prefix + drv_suffix)
self.adc = PressureJumpCellAdcIO(prefix + adc_suffix)
self.hdf = adcore.NDFileHDFIO(prefix + hdf_suffix)

super().__init__(
PressureJumpCellController(self.drv, self.adc),
adcore.ADHDFWriter(
self.hdf,
path_provider,
lambda: self.name,
adcore.ADBaseDatasetDescriber(self.drv),
),
config_sigs=(self.drv.acquire_time,),
name=name,
)

@property
def hints(self) -> Hints:
return self._writer.hints
82 changes: 82 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
from typing import Optional, Tuple

from ophyd_async.core import (
AsyncStatus,
DetectorControl,
DetectorTrigger,
set_and_wait_for_value,
)
from ophyd_async.epics import adcore

from .pressurejumpcell_io import (
PressureJumpCellDriverIO,
PressureJumpCellAdcIO,
PressureJumpCellTriggerMode,
PressureJumpCellAdcTriggerMode,
)


#TODO Find out what the deadtime should be and if it can be retrieved from the device
_HIGHEST_POSSIBLE_DEADTIME = 1e-3


class PressureJumpCellController(DetectorControl):

def __init__(self, driver: PressureJumpCellDriverIO, adc: PressureJumpCellAdcIO) -> None:
self._drv = driver
self._adc = adc

def get_deadtime(self, exposure: float | None) -> float:
return _HIGHEST_POSSIBLE_DEADTIME

async def arm(
self,
num: int = 0,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
if num == 0:
image_mode = adcore.ImageMode.continuous
else:
image_mode = adcore.ImageMode.multiple
if exposure is not None:
await self._drv.acquire_time.set(exposure)

trigger_mode, adc_trigger_mode = self._get_trigger_info(trigger)

# trigger mode must be set first and on it's own!
await self._drv.trigger_mode.set(trigger_mode)
await self._adc.adc_trigger_mode.set(adc_trigger_mode)

await asyncio.gather(
self._drv.num_images.set(num),
self._drv.image_mode.set(image_mode),
)

status = await set_and_wait_for_value(self._drv.acquire, True)
return status

def _get_trigger_info(
self, trigger: DetectorTrigger
) -> Tuple[PressureJumpCellTriggerMode, PressureJumpCellAdcTriggerMode]:
supported_trigger_types = (
DetectorTrigger.edge_trigger,
DetectorTrigger.internal,
)
if trigger not in supported_trigger_types:
raise ValueError(
f"{self.__class__.__name__} only supports the following trigger "
f"types: {supported_trigger_types} but was asked to "
f"use {trigger}"
)
if trigger == DetectorTrigger.internal:
return (PressureJumpCellTriggerMode.internal, PressureJumpCellAdcTriggerMode.continuous)
else:
return (PressureJumpCellTriggerMode.external, PressureJumpCellAdcTriggerMode.single)

async def disarm(self):
await asyncio.gather(
adcore.stop_busy_record(self._drv.acquire, False, timeout=1),
adcore.stop_busy_record(self._adc.acquire, False, timeout=1)
)
29 changes: 29 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from enum import Enum

from ophyd_async.core import SubsetEnum
from ophyd_async.epics import adcore
from ophyd_async.epics.signal import epics_signal_rw_rbv


class PressureJumpCellTriggerMode(str, Enum):
internal = "Internal"
external = "Exernal"

class PressureJumpCellAdcTriggerMode(str, Enum):
single = "Single"
multiple = "Multiple"
continuous = "Continuous"

class PressureJumpCellDriverIO(adcore.ADBaseIO):
def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = epics_signal_rw_rbv(
PressureJumpCellTriggerMode, prefix + "TriggerMode"
)
super().__init__(prefix, name=name)

class PressureJumpCellAdcIO(adcore.ADBaseIO):
def __init__(self, prefix: str, name: str = "") -> None:
self.adc_trigger_mode = epics_signal_rw_rbv(
PressureJumpCellAdcTriggerMode, prefix + "TriggerMode"
)
super().__init__(prefix, name=name)
Loading