Skip to content

Commit

Permalink
Working saving example of multiple frames with ophyd
Browse files Browse the repository at this point in the history
  • Loading branch information
mrakitin committed Feb 15, 2024
1 parent d78df6f commit f18d187
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 41 deletions.
66 changes: 59 additions & 7 deletions src/srx_caproto_iocs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from enum import Enum
from pathlib import Path

import numpy as np
import skimage.data
from caproto import ChannelType
from caproto.ioc_examples.mini_beamline import no_reentry
from caproto.server import PVGroup, pvproperty, run, template_arg_parser
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd.status import SubscriptionStatus

from .utils import now, save_hdf5

Expand Down Expand Up @@ -142,14 +143,18 @@ async def stage(self, instance, value):

return False

def _get_current_dataset(self):
return np.random.random((10, 20))
def _get_current_dataset(self, frame):
dataset = skimage.data.cells3d().sum(axis=1)
return dataset[frame, ...]

@acquire.putter
@no_reentry
async def acquire(self, instance, value):
"""The acquire method to perform an individual acquisition of a data point."""
if value != AcqStatuses.ACQUIRING.value:
if (
value != AcqStatuses.ACQUIRING.value
# or self.stage.value not in [True, StageStates.STAGED.value]
):
return False

if (
Expand All @@ -161,12 +166,16 @@ async def acquire(self, instance, value):
)
return True

await self.acquire.write(AcqStatuses.ACQUIRING.value)

# Delegate saving the resulting data to a blocking callback in a thread.
payload = {
"filename": self.full_file_path.value,
"data": self._get_current_dataset(),
"data": self._get_current_dataset(frame=self.frame_num.value),
"uid": str(uuid.uuid4()),
"timestamp": ttime.time(),
"frame_number": self.frame_num.value,
"update_existing": self.frame_num.value > 0,
}

await self._request_queue.async_put(payload)
Expand All @@ -176,6 +185,8 @@ async def acquire(self, instance, value):
# Increment the counter only on a successful saving of the file.
await self.frame_num.write(self.frame_num.value + 1)

# await self.acquire.write(AcqStatuses.IDLE.value)

return False

@staticmethod
Expand All @@ -185,9 +196,15 @@ def saver(request_queue, response_queue):
received = request_queue.get()
filename = received["filename"]
data = received["data"]
frame_number = received["frame_number"]
update_existing = received["update_existing"]
try:
save_hdf5(fname=filename, data=data)
print(f"{now()}: saved {data.shape} data into:\n {filename}")
save_hdf5(
fname=filename, data=data, mode="a", update_existing=update_existing
)
print(
f"{now()}: saved {frame_number=} {data.shape} data into:\n {filename}"
)

success = True
error_message = ""
Expand All @@ -213,6 +230,41 @@ class OphydDeviceWithCaprotoIOC(Device):
full_file_path = Cpt(EpicsSignalRO, "full_file_path", string=True)
frame_num = Cpt(EpicsSignal, "frame_num")
ioc_stage = Cpt(EpicsSignal, "stage", string=True)
acquire = Cpt(EpicsSignal, "acquire", string=True)

def set(self, command):
"""The set method with values for staging and acquiring."""

print(f"{now()}: {command = }")
if command in [StageStates.STAGED.value, "stage"]:
expected_old_value = StageStates.UNSTAGED.value
expected_new_value = StageStates.STAGED.value
obj = self.ioc_stage
cmd = StageStates.STAGED.value

if command in [StageStates.UNSTAGED.value, "unstage"]:
expected_old_value = StageStates.STAGED.value
expected_new_value = StageStates.UNSTAGED.value
obj = self.ioc_stage
cmd = StageStates.UNSTAGED.value

if command in [AcqStatuses.ACQUIRING.value, "acquire"]:
expected_old_value = AcqStatuses.ACQUIRING.value
expected_new_value = AcqStatuses.IDLE.value
obj = self.acquire
cmd = AcqStatuses.ACQUIRING.value

def cb(value, old_value, **kwargs):
# pylint: disable=unused-argument
print(f"{now()}: {old_value} -> {value}")
if value == expected_new_value and old_value == expected_old_value:
return True
return False

st = SubscriptionStatus(obj, callback=cb, run=False)
print(f"{now()}: {cmd = }")
obj.put(cmd)
return st


def check_args(parser_, split_args_):
Expand Down
54 changes: 32 additions & 22 deletions src/srx_caproto_iocs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,35 @@ def save_hdf5(
mode="x",
update_existing=False,
):
"""The function to export the data to an HDF5 file."""
h5file_desc = h5py.File(fname, mode, libver="latest")
frame_shape = data.shape
if not update_existing:
group = h5file_desc.create_group(group_name)
dataset = group.create_dataset(
"data/data",
data=np.full(fill_value=np.nan, shape=(1, *frame_shape)),
maxshape=(None, *frame_shape),
chunks=(1, *frame_shape),
dtype=dtype,
)
frame_num = 0
else:
dataset = h5file_desc[f"{group_name}/{group_path}"]
frame_num = dataset.shape[0]

h5file_desc.swmr_mode = True

dataset.resize((frame_num + 1, *frame_shape))
dataset[frame_num, :, :] = data
dataset.flush()
"""The function to export the data to an HDF5 file.
Check https://docs.h5py.org/en/stable/high/file.html#opening-creating-files for modes:
r Readonly, file must exist (default)
r+ Read/write, file must exist
w Create file, truncate if exists
w- or x Create file, fail if exists
a Read/write if exists, create otherwise
"""
with h5py.File(fname, mode, libver="latest") as h5file_desc:
frame_shape = data.shape
if not update_existing:
group = h5file_desc.create_group(group_name)
dataset = group.create_dataset(
group_path,
data=np.full(fill_value=np.nan, shape=(1, *frame_shape)),
maxshape=(None, *frame_shape),
chunks=(1, *frame_shape),
dtype=dtype,
)
frame_num = 0
else:
dataset = h5file_desc[f"{group_name}/{group_path}"]
frame_num = dataset.shape[0]

# https://docs.h5py.org/en/stable/swmr.html
h5file_desc.swmr_mode = True

dataset.resize((frame_num + 1, *frame_shape))
dataset[frame_num, :, :] = data
dataset.flush()
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def base_ophyd_device():


@pytest.fixture(scope="session")
def base_caproto_ioc(wait=3):
def base_caproto_ioc(wait=5):
first_three = ".".join(socket.gethostbyname(socket.gethostname()).split(".")[:3])
broadcast = f"{first_three}.255"

Expand Down
33 changes: 22 additions & 11 deletions tests/test_base_ophyd.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from __future__ import annotations

import tempfile
import time as ttime
from pathlib import Path

import h5py
import pytest
from ophyd.status import SubscriptionStatus

from srx_caproto_iocs.utils import now


@pytest.mark.cloud_friendly()
@pytest.mark.parametrize("date_template", ["%Y/%m/", "%Y/%m/%d", "mydir/%Y/%m/%d"])
def test_base_ophyd_templates(base_caproto_ioc, base_ophyd_device, date_template):
# @pytest.mark.parametrize("date_template", ["%Y/%m/", "%Y/%m/%d", "mydir/%Y/%m/%d"])
@pytest.mark.parametrize("date_template", ["%Y/%m/"])
def test_base_ophyd_templates(
base_caproto_ioc, base_ophyd_device, date_template, num_frames=50
):
with tempfile.TemporaryDirectory(prefix="/tmp/") as tmpdirname:
date = now(as_object=True)
write_dir_root = Path(tmpdirname)
Expand All @@ -25,16 +29,23 @@ def test_base_ophyd_templates(base_caproto_ioc, base_ophyd_device, date_template
dev.write_dir.put(dir_template)
dev.file_name.put(file_template)

def cb(value, old_value, **kwargs):
if value == "staged" and old_value == "unstaged":
return True
return False

st = SubscriptionStatus(dev.ioc_stage, callback=cb, run=False)
dev.ioc_stage.put("staged")
st.wait()
dev.set("stage").wait()

full_file_path = dev.full_file_path.get()
print(f"{full_file_path = }")

for i in range(num_frames):
print(f"Collecting frame {i}...")
dev.set("acquire").wait()
ttime.sleep(0.1)

dev.set("unstage").wait()

assert full_file_path, "The returned 'full_file_path' did not change."
assert Path(full_file_path).is_file(), f"No such file '{full_file_path}'"

with h5py.File(full_file_path, "r", swmr=True) as f:
dataset = f["/entry/data/data"]
assert dataset.shape == (num_frames, 256, 256)

ttime.sleep(1.0)

0 comments on commit f18d187

Please sign in to comment.