Skip to content

Commit

Permalink
Add saving capabilities to the base IOC
Browse files Browse the repository at this point in the history
  • Loading branch information
mrakitin committed Feb 14, 2024
1 parent 09a08ca commit eb36115
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
64 changes: 57 additions & 7 deletions src/srx_caproto_iocs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@

import textwrap
import threading
import time as ttime
import uuid
from enum import Enum
from enum import Enum, auto
from pathlib import Path

import numpy as np
from caproto import ChannelType
from caproto.ioc_examples.mini_beamline import no_reentry
from caproto.server import PVGroup, pvproperty, run, template_arg_parser
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO

from .utils import now
from .utils import now, save_hdf5


class AcqStatuses(Enum):
"""Enum class for acquisition statuses."""

IDLE = "Done"
ACQUIRING = "Count"
IDLE = auto()
ACQUIRING = auto()


class StageStates(Enum):
Expand Down Expand Up @@ -60,11 +63,22 @@ class CaprotoSaveIOC(PVGroup):
value=StageStates.UNSTAGED.value,
enum_strings=[x.value for x in StageStates],
dtype=ChannelType.ENUM,
doc="Stage/unstage the detector",
doc="Stage/unstage the device",
)

def __init__(self, *args, **kwargs):
acquire = pvproperty(
value=AcqStatuses.IDLE.value,
enum_strings=[x.value for x in AcqStatuses],
dtype=ChannelType.ENUM,
doc="Acquire signal to save a dataset.",
)

def __init__(self, *args, update_rate=10.0, **kwargs):
super().__init__(*args, **kwargs)

self._update_rate = update_rate
self._update_period = 1.0 / update_rate

self._request_queue = None
self._response_queue = None

Expand Down Expand Up @@ -128,6 +142,42 @@ async def stage(self, instance, value):

return False

def _get_current_dataset(self):
return np.random.random((10, 20))

@acquire.putter
@no_reentry
async def acquire(self, instance, value):
"""The acquire method to perform an individual acquisition of a data point."""
if value != AcqStatuses.ACQUIRING.value:
return False

if (
instance.value in [True, AcqStatuses.ACQUIRING.value]
and value == AcqStatuses.ACQUIRING.value
):
print(
f"The device is already acquiring. Please wait until the '{AcqStatuses.IDLE.value}' status."
)
return True

# Delegate saving the resulting data to a blocking callback in a thread.
payload = {
"filename": self.full_file_path.value,
"data": self._get_current_dataset(),
"uid": str(uuid.uuid4()),
"timestamp": ttime.time(),
}

await self._request_queue.async_put(payload)
response = await self._response_queue.async_get()

if response["success"]:
# Increment the counter only on a successful saving of the file.
await self.frame_num.write(self.frame_num.value + 1)

return False

@staticmethod
def saver(request_queue, response_queue):
"""The saver callback for threading-based queueing."""
Expand All @@ -136,7 +186,7 @@ def saver(request_queue, response_queue):
filename = received["filename"]
data = received["data"]
try:
# save_hdf5(fname=filename, data=data)
save_hdf5(fname=filename, data=data)
print(f"{now()}: saved {data.shape} data into:\n {filename}")

success = True
Expand Down
36 changes: 36 additions & 0 deletions src/srx_caproto_iocs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,46 @@

import datetime

import h5py
import numpy as np


def now(as_object=False):
"""A helper function to return ISO 8601 formatted datetime string."""
_now = datetime.datetime.now()
if as_object:
return _now
return _now.isoformat()


def save_hdf5(
fname,
data,
group_name="/entry",
group_path="data/data",
dtype="float32",
mode="x",
update_existing=False,
):
"""The function to export the data to an HDF5 file."""
h5file_desc = h5py.File(fname, mode, libver="latest")
frame_shape = data.shape
if not update_existing:
group = h5file_desc.create_group(group_name)
dataset = group.create_dataset(
"data/data",
data=np.full(fill_value=np.nan, shape=(1, *frame_shape)),
maxshape=(None, *frame_shape),
chunks=(1, *frame_shape),
dtype=dtype,
)
frame_num = 0
else:
dataset = h5file_desc[f"{group_name}/{group_path}"]
frame_num = dataset.shape[0]

h5file_desc.swmr_mode = True

dataset.resize((frame_num + 1, *frame_shape))
dataset[frame_num, :, :] = data
dataset.flush()

0 comments on commit eb36115

Please sign in to comment.