diff --git a/doc/user_guide/supported_formats/supported_formats.rst b/doc/user_guide/supported_formats/supported_formats.rst index 4e39b26b..b7dff940 100644 --- a/doc/user_guide/supported_formats/supported_formats.rst +++ b/doc/user_guide/supported_formats/supported_formats.rst @@ -56,7 +56,7 @@ +---------------------------------------------------------------------+-------------------------+--------+--------+--------+-------------+ | :ref:`Protochips logfile ` | csv & log | Yes | No | No | No | +---------------------------------------------------------------------+-------------------------+--------+--------+--------+-------------+ - | :ref:`Quantum Detector ` | mib | Yes | No | Yes | No | + | :ref:`Quantum Detector ` | mib | Yes | No | Yes | Yes | +---------------------------------------------------------------------+-------------------------+--------+--------+--------+-------------+ | :ref:`Renishaw ` | wdf | Yes | No | No | No | +---------------------------------------------------------------------+-------------------------+--------+--------+--------+-------------+ diff --git a/rsciio/quantumdetector/_api.py b/rsciio/quantumdetector/_api.py index 4f51c870..1f3c1097 100644 --- a/rsciio/quantumdetector/_api.py +++ b/rsciio/quantumdetector/_api.py @@ -29,12 +29,14 @@ from rsciio._docstrings import ( CHUNKS_READ_DOC, + DISTRIBUTED_DOC, FILENAME_DOC, LAZY_DOC, MMAP_DOC, NAVIGATION_SHAPE, RETURNS_DOC, ) +from rsciio.utils.distributed import memmap_distributed _logger = logging.getLogger(__name__) @@ -194,6 +196,7 @@ def load_mib_data( navigation_shape=None, first_frame=None, last_frame=None, + distributed=False, mib_prop=None, return_headers=False, print_info=False, @@ -210,6 +213,7 @@ def load_mib_data( %s %s %s + %s mib_prop : ``MIBProperties``, default=None The ``MIBProperties`` instance of the file. If None, it will be parsed from the file. @@ -302,15 +306,21 @@ def load_mib_data( # if it is read from TCPIP interface it needs to drop first 15 bytes which # describe the stream size. Also watch for the coma in front of the stream. if isinstance(mib_prop.path, str): - data = np.memmap( - mib_prop.path, - dtype=merlin_frame_dtype, + memmap_kwargs = dict( + filename=mib_prop.path, # take into account first_frame offset=mib_prop.offset + merlin_frame_dtype.itemsize * first_frame, # need to use np.prod(navigation_shape) to crop number line shape=np.prod(navigation_shape), - mode=mmap_mode, + dtype=merlin_frame_dtype, ) + if distributed: + data = memmap_distributed(chunks=chunks, key="data", **memmap_kwargs) + if not lazy: + data = data.compute() + # get_file_handle(data).close() + else: + data = np.memmap(mode=mmap_mode, **memmap_kwargs) elif isinstance(path, bytes): data = np.frombuffer( path, @@ -322,10 +332,11 @@ def load_mib_data( else: # pragma: no cover raise TypeError("`path` must be a str or a buffer.") - headers = data["header"] - data = data["data"] + if not distributed: + headers = data["header"] + data = data["data"] if not return_mmap: - if lazy: + if not distributed and lazy: if isinstance(chunks, tuple) and len(chunks) > 2: # Since the data is reshaped later on, we set only the # signal dimension chunks here @@ -344,6 +355,10 @@ def load_mib_data( data = data.rechunk(chunks) if return_headers: + if distributed: + raise ValueError( + "Retuning headers is not supported with `distributed=True`." + ) return data, headers else: return data @@ -356,6 +371,7 @@ def load_mib_data( MMAP_DOC, NAVIGATION_SHAPE, _FIRST_LAST_FRAME, + DISTRIBUTED_DOC, ) @@ -489,6 +505,7 @@ def file_reader( navigation_shape=None, first_frame=None, last_frame=None, + distributed=False, print_info=False, ): """ @@ -505,6 +522,7 @@ def file_reader( %s %s %s + %s print_info : bool Display information about the mib file. @@ -589,6 +607,7 @@ def file_reader( navigation_shape=navigation_shape, first_frame=first_frame, last_frame=last_frame, + distributed=distributed, mib_prop=mib_prop, print_info=print_info, return_mmap=False, @@ -653,5 +672,6 @@ def file_reader( MMAP_DOC, NAVIGATION_SHAPE, _FIRST_LAST_FRAME, + DISTRIBUTED_DOC, RETURNS_DOC, ) diff --git a/rsciio/tests/test_quantumdetector.py b/rsciio/tests/test_quantumdetector.py index c7bd18fa..4907bce7 100644 --- a/rsciio/tests/test_quantumdetector.py +++ b/rsciio/tests/test_quantumdetector.py @@ -394,3 +394,21 @@ def test_frames_in_acquisition_zero(): s = hs.load(f"{fname}.mib") assert s.axes_manager.navigation_shape == () + + +@pytest.mark.parametrize("lazy", (True, False)) +def test_distributed(lazy): + s = hs.load( + TEST_DATA_DIR_UNZIPPED / "001_4x2_6bit.mib", + distributed=False, + lazy=lazy, + ) + s2 = hs.load( + TEST_DATA_DIR_UNZIPPED / "001_4x2_6bit.mib", + distributed=True, + lazy=lazy, + ) + if lazy: + s.compute() + s2.compute() + np.testing.assert_array_equal(s.data, s2.data) diff --git a/rsciio/tests/utils/test_utils.py b/rsciio/tests/utils/test_utils.py index 97364db5..1c1cb904 100644 --- a/rsciio/tests/utils/test_utils.py +++ b/rsciio/tests/utils/test_utils.py @@ -384,7 +384,7 @@ def test_get_date_time_from_metadata(): @pytest.mark.parametrize( "shape", - ((10, 20, 30, 512, 512),(20, 30, 512, 512), (10, 512, 512), (512, 512)) + ((10, 20, 30, 512, 512), (20, 30, 512, 512), (10, 512, 512), (512, 512)) ) def test_get_chunk_slice(shape): chunk_arr, chunk = get_chunk_slice(shape=shape, chunks=-1) # 1 chunk diff --git a/rsciio/utils/distributed.py b/rsciio/utils/distributed.py index e5be58bd..f880a9fa 100644 --- a/rsciio/utils/distributed.py +++ b/rsciio/utils/distributed.py @@ -16,6 +16,7 @@ # You should have received a copy of the GNU General Public License # along with RosettaSciIO. If not, see . +import os import dask.array as da import numpy as np @@ -60,11 +61,7 @@ def get_chunk_slice( ) chunks_shape = tuple([len(c) for c in chunks]) slices = np.empty( - shape=chunks_shape - + ( - len(chunks_shape), - 2, - ), + shape=chunks_shape + (len(chunks_shape), 2), dtype=int, ) for ind in np.ndindex(chunks_shape): @@ -72,10 +69,11 @@ def get_chunk_slice( starts = [int(np.sum(chunk[:i])) for i, chunk in zip(ind, chunks)] stops = [s + c for s, c in zip(starts, current_chunk)] slices[ind] = [[start, stop] for start, stop in zip(starts, stops)] + return da.from_array(slices, chunks=(1,) * len(shape) + slices.shape[-2:]), chunks -def slice_memmap(slices, file, dtypes, shape, **kwargs): +def slice_memmap(slices, file, dtypes, shape, key=None, **kwargs): """ Slice a memory mapped file using a tuple of slices. @@ -96,6 +94,8 @@ def slice_memmap(slices, file, dtypes, shape, **kwargs): Data type of the data for :class:`numpy.memmap` function. shape : tuple Shape of the entire dataset. Passed to the :class:`numpy.memmap` function. + key : None, str + For structured dtype only. Specify the key of the structured dtype to use. **kwargs : dict Additional keyword arguments to pass to the :class:`numpy.memmap` function. @@ -104,31 +104,36 @@ def slice_memmap(slices, file, dtypes, shape, **kwargs): numpy.ndarray Array of the data from the memory mapped file sliced using the provided slice. """ - sl = np.squeeze(slices)[()] + slices_ = np.squeeze(slices)[()] data = np.memmap(file, dtypes, shape=shape, **kwargs) - slics = tuple([slice(s[0], s[1]) for s in sl]) - return data[slics] + if key is not None: + data = data[key] + slices_ = tuple([slice(s[0], s[1]) for s in slices_]) + return data[slices_] def memmap_distributed( - file, + filename, dtype, offset=0, shape=None, order="C", chunks="auto", block_size_limit=None, + key=None, ): """ - Drop in replacement for py:func:`numpy.memmap` allowing for distributed loading of data. + Drop in replacement for py:func:`numpy.memmap` allowing for distributed + loading of data. - This always loads the data using dask which can be beneficial in many cases, but - may not be ideal in others. The ``chunks`` and ``block_size_limit`` are for describing an ideal chunk shape and size - as defined using the :py:func:`dask.array.core.normalize_chunks` function. + This always loads the data using dask which can be beneficial in many + cases, but may not be ideal in others. The ``chunks`` and ``block_size_limit`` + are for describing an ideal chunk shape and size as defined using the + :func:`dask.array.core.normalize_chunks` function. Parameters ---------- - file : str + filename : str Path to the file. dtype : numpy.dtype Data type of the data for memmap function. @@ -142,25 +147,50 @@ def memmap_distributed( Chunk shape. The default is "auto". block_size_limit : int, optional Maximum size of a block in bytes. The default is None. + key : None, str + For structured dtype only. Specify the key of the structured dtype to use. Returns ------- dask.array.Array Dask array of the data from the memmaped file and with the specified chunks. + + Notes + ----- + Currently :func:`dask.array.map_blocks` does not allow for multiple outputs. + As a result, in case of structured dtype, the key of the structured dtype need + to be specified. + For example: with dtype = (("data", int, (128, 128)), ("sec", "