diff --git a/strax/context.py b/strax/context.py
index 5dfa502d..3fe7c475 100644
--- a/strax/context.py
+++ b/strax/context.py
@@ -199,7 +199,7 @@ def __init__(
            applied to plugins
        :param register: plugin class or list of plugin classes to register
        :param register_all: module for which all plugin classes defined in it
-           will be registered.
+           will be registered.
        :param processors: A mapping of processor names to classes to use for
           data processing.
        Any additional kwargs are considered Context-specific options; see
@@ -2260,15 +2260,32 @@ def _check_copy_to_frontend_kwargs(
         """Simple kwargs checks for copy_to_frontend."""
         if not self.is_stored(run_id, target):
             raise strax.DataNotAvailable(f"Cannot copy {run_id} {target} since it does not exist")
-        if len(strax.to_str_tuple(target)) > 1:
-            raise ValueError("copy_to_frontend only works for a single target at the time")
-        if target_frontend_id is not None and target_frontend_id >= len(self.storage):
-            raise ValueError(
-                f"Cannot select {target_frontend_id}-th frontend as "
-                f"we only have {len(self.storage)} frontends!"
-            )
         if rechunk and rechunk_to_mb == strax.DEFAULT_CHUNK_SIZE_MB:
             self.log.warning("No <rechunk_to_mb> specified!")
+        # Reuse some code
+        self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)
+
+    def _get_target_sf(self, run_id, target, target_frontend_id):
+        """Get the target storage frontends for copy_to_frontend and merge_per_chunk_storage."""
+        if target_frontend_id is None:
+            target_sf = self.storage
+        elif len(self.storage) > target_frontend_id:
+            target_sf = [self.storage[target_frontend_id]]
+
+        # Keep frontends that:
+        # 1. don't already have the data; and
+        # 2. take the data; and
+        # 3. are not readonly
+        target_sf = [
+            t_sf
+            for t_sf in target_sf
+            if (
+                not self._is_stored_in_sf(run_id, target, t_sf)
+                and t_sf._we_take(target)
+                and t_sf.readonly is False
+            )
+        ]
+        return target_sf
 
     def copy_to_frontend(
         self,
@@ -2295,27 +2312,12 @@
         self._check_copy_to_frontend_kwargs(
             run_id, target, target_frontend_id, rechunk, rechunk_to_mb
         )
-        if target_frontend_id is None:
-            target_sf = self.storage
-        elif len(self.storage) > target_frontend_id:
-            target_sf = [self.storage[target_frontend_id]]
 
         # Figure out which of the frontends has the data. Raise error when none
         source_sf = self.get_source_sf(run_id, target, should_exist=True)[0]
 
-        # Keep frontends that:
-        # 1. don't already have the data; and
-        # 2. take the data; and
-        # 3. are not readonly
-        target_sf = [
-            t_sf
-            for t_sf in target_sf
-            if (
-                not self._is_stored_in_sf(run_id, target, t_sf)
-                and t_sf._we_take(target)
-                and t_sf.readonly is False
-            )
-        ]
+        # Get the target storage frontends
+        target_sf = self._get_target_sf(run_id, target, target_frontend_id)
 
         self.log.info(f"Copy data from {source_sf} to {target_sf}")
         if not len(target_sf):
@@ -2372,6 +2374,15 @@
                     "do you have two storage frontends writing to the same place?"
                 )
 
+    def _check_merge_per_chunk_storage_kwargs(self, run_id, target, target_frontend_id) -> None:
+        if len(strax.to_str_tuple(target)) > 1:
+            raise ValueError("copy_to_frontend only works for a single target at a time")
+        if target_frontend_id is not None and target_frontend_id >= len(self.storage):
+            raise ValueError(
+                f"Cannot select {target_frontend_id}-th frontend as "
+                f"we only have {len(self.storage)} frontends!"
+            )
+
     def merge_per_chunk_storage(
         self,
         run_id: str,
@@ -2379,12 +2390,15 @@
         per_chunked_dependency: str,
         rechunk=True,
         chunk_number_group: ty.Optional[ty.List[ty.List[int]]] = None,
+        target_frontend_id: ty.Optional[int] = None,
     ):
         """Merge the per-chunked data from the per-chunked dependency into the target storage."""
 
         if self.is_stored(run_id, target):
             raise ValueError(f"Data {target} for {run_id} already exists.")
 
+        self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)
+
         if chunk_number_group is not None:
             combined_chunk_numbers = list(itertools.chain(*chunk_number_group))
             if len(combined_chunk_numbers) != len(set(combined_chunk_numbers)):
@@ -2404,17 +2418,20 @@
 
         # Usually we want to save in the same storage frontend
         # Here we assume that the target is stored chunk by chunk of the dependency
-        target_sf = source_sf = self.get_source_sf(
+        source_sf = self.get_source_sf(
             run_id,
             target,
             chunk_number={per_chunked_dependency: chunk_number},
             should_exist=True,
         )[0]
 
+        # Get the target storage frontends
+        target_sf = self._get_target_sf(run_id, target, target_frontend_id)
+
         def wrapped_loader():
             """Wrapped loader for changing the target_size_mb."""
             for chunk_number in chunk_number_group:
-                # Mostly copied from self.copy_to_frontend
+                # Mostly adapted from self.copy_to_frontend
                 # Get the info from the source backend (s_be) that we need to fill
                 # the target backend (t_be) with
                 data_key = self.key_for(
@@ -2436,17 +2453,18 @@ def wrapped_loader():
                 except StopIteration:
                     continue
 
-        # Fill the target buffer
         data_key = self.key_for(run_id, target, chunk_number=_chunk_number)
-        t_be_str, t_be_key = target_sf.find(data_key, write=True)
-        target_be = target_sf._get_backend(t_be_str)
         target_plugin = self.__get_plugin(run_id, target, chunk_number=_chunk_number)
         target_md = target_plugin.metadata(run_id, target)
         # Copied from StorageBackend.saver
         if "dtype" in target_md:
             target_md["dtype"] = target_md["dtype"].descr.__repr__()
-        saver = target_be._saver(t_be_key, target_md)
-        saver.save_from(wrapped_loader(), rechunk=rechunk)
+        for t_sf in target_sf:
+            # Fill the target buffer
+            t_be_str, t_be_key = t_sf.find(data_key, write=True)
+            target_be = t_sf._get_backend(t_be_str)
+            saver = target_be._saver(t_be_key, target_md)
+            saver.save_from(wrapped_loader(), rechunk=rechunk)
 
     def get_source(
         self,
diff --git a/strax/io.py b/strax/io.py
index 1e73fa6a..908e8fdd 100644
--- a/strax/io.py
+++ b/strax/io.py
@@ -11,6 +11,7 @@
 from ast import literal_eval
 
 import strax
+from strax import RUN_METADATA_PATTERN
 
 export, __all__ = strax.exporter()
 
@@ -102,7 +103,7 @@ def _compress_blosc(data):
 @export
 def dry_load_files(dirname, chunk_number=None):
     prefix = strax.storage.files.dirname_to_prefix(dirname)
-    metadata_json = f"{prefix}-metadata.json"
+    metadata_json = RUN_METADATA_PATTERN % prefix
     md_path = os.path.join(dirname, metadata_json)
 
     with open(md_path, mode="r") as f:
diff --git a/strax/storage/files.py b/strax/storage/files.py
index 42853bf4..6b827534 100644
--- a/strax/storage/files.py
+++ b/strax/storage/files.py
@@ -10,6 +10,7 @@
 from .common import StorageFrontend
 
 export, __all__ = strax.exporter()
+__all__.extend(["RUN_METADATA_PATTERN"])
 
 RUN_METADATA_PATTERN = "%s-metadata.json"
 
@@ -230,7 +231,7 @@ def __init__(
 
     def _get_metadata(self, dirname):
         prefix = dirname_to_prefix(dirname)
-        metadata_json = f"{prefix}-metadata.json"
+        metadata_json = RUN_METADATA_PATTERN % prefix
         md_path = osp.join(dirname, metadata_json)
 
         if not osp.exists(md_path):
@@ -300,7 +301,7 @@ def __init__(self, dirname, metadata, **kwargs):
         self.dirname = dirname
         self.tempdirname = dirname + "_temp"
         self.prefix = dirname_to_prefix(dirname)
-        self.metadata_json = f"{self.prefix}-metadata.json"
+        self.metadata_json = RUN_METADATA_PATTERN % self.prefix
 
         if os.path.exists(dirname):
             print(f"Removing data in {dirname} to overwrite")
diff --git a/strax/storage/zipfiles.py b/strax/storage/zipfiles.py
index 096db6b0..50260492 100644
--- a/strax/storage/zipfiles.py
+++ b/strax/storage/zipfiles.py
@@ -42,7 +42,7 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
             try:
                 dirname = str(key)
                 prefix = strax.dirname_to_prefix(dirname)
-                zp.getinfo(f"{dirname}/{prefix}-metadata.json")
+                zp.getinfo(f"{dirname}/{strax.RUN_METADATA_PATTERN % prefix}")
                 return bk
             except KeyError:
                 pass
@@ -111,7 +111,7 @@ def _get_metadata(self, zipn_and_dirn):
         zipn, dirn = zipn_and_dirn
         with zipfile.ZipFile(zipn) as zp:
             prefix = strax.dirname_to_prefix(dirn)
-            with zp.open(f"{dirn}/{prefix}-metadata.json") as f:
+            with zp.open(f"{dirn}/{strax.RUN_METADATA_PATTERN % prefix}") as f:
                 return json.loads(f.read())
 
     def saver(self, *args, **kwargs):
diff --git a/tests/test_core.py b/tests/test_core.py
index 605275ab..ca3063eb 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -5,6 +5,7 @@
 import os.path as osp
 
 import pytest
+from strax import RUN_METADATA_PATTERN
 from strax.testutils import *
 
 processing_conditions = pytest.mark.parametrize(
@@ -89,7 +90,10 @@ def test_filestore(allow_multiprocess, max_workers, processor):
     # The first dir contains peaks.
     # It should have one data chunk (rechunk is on) and a metadata file
     prefix = strax.dirname_to_prefix(data_dirs[0])
-    assert sorted(os.listdir(data_dirs[0])) == [f"{prefix}-000000", f"{prefix}-metadata.json"]
+    assert sorted(os.listdir(data_dirs[0])) == [
+        f"{prefix}-000000",
+        RUN_METADATA_PATTERN % prefix,
+    ]
 
     # Check metadata got written correctly.
     metadata = mystrax.get_metadata(run_id, "peaks")
@@ -99,7 +103,7 @@
     assert len(metadata["chunks"]) == 1
 
     # Check data gets loaded from cache, not rebuilt
-    md_filename = osp.join(data_dirs[0], f"{prefix}-metadata.json")
+    md_filename = osp.join(data_dirs[0], RUN_METADATA_PATTERN % prefix)
     mtime_before = osp.getmtime(md_filename)
     peaks_2 = mystrax.get_array(run_id=run_id, targets="peaks")
     np.testing.assert_array_equal(peaks_1, peaks_2)
diff --git a/tests/test_saving.py b/tests/test_saving.py
index 4fd11784..39c255fd 100644
--- a/tests/test_saving.py
+++ b/tests/test_saving.py
@@ -4,6 +4,8 @@
 import os
 import tempfile
 
+from strax import RUN_METADATA_PATTERN
+
 
 class TestPerRunDefaults(unittest.TestCase):
     """Test the saving behavior of the context."""
@@ -72,7 +74,7 @@ def test_raise_corruption(self):
 
         # copied from FileSytemBackend (maybe abstractify the method separately?)
         prefix = strax.dirname_to_prefix(data_path)
-        metadata_json = f"{prefix}-metadata.json"
+        metadata_json = RUN_METADATA_PATTERN % prefix
         md_path = os.path.join(data_path, metadata_json)
 
         assert os.path.exists(md_path)
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 1d187512..29e6cdf2 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -187,7 +187,7 @@ def test_check_chunk_n(self):
         prefix = strax.storage.files.dirname_to_prefix(backend_key)
         md = st_new.get_metadata(self.run_id, self.target)
         md["chunks"][0]["n"] += 1
-        md_path = os.path.join(backend_key, f"{prefix}-metadata.json")
+        md_path = os.path.join(backend_key, strax.RUN_METADATA_PATTERN % prefix)
         with open(md_path, "w") as file:
             json.dump(md, file, indent=4)
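For reviewers, a minimal usage sketch of the new `target_frontend_id` keyword (illustration only, not part of the patch): the context setup, storage paths, run id, and chunk grouping below are placeholder assumptions, and the call only succeeds once `peaks` has already been saved chunk-by-chunk against its `records` dependency.

```python
import strax
from strax.testutils import Records, Peaks  # toy plugins from strax's test utilities (assumed setup)

# Hypothetical context with two storage frontends; the paths are placeholders.
st = strax.Context(
    storage=[
        strax.DataDirectory("./scratch"),  # frontend 0: holds the per-chunk pieces
        strax.DataDirectory("./archive"),  # frontend 1: where the merged copy should go
    ],
    register=[Records, Peaks],
)

# Before this patch, merge_per_chunk_storage always wrote the merged result back
# through the frontend holding the per-chunk source data. Now target_frontend_id=None
# considers every writable frontend that does not yet have the data, while an explicit
# index restricts the merge to that single frontend.
st.merge_per_chunk_storage(
    run_id="0",                            # placeholder run id
    target="peaks",
    per_chunked_dependency="records",
    chunk_number_group=[[0, 1], [2, 3]],   # placeholder chunk grouping
    target_frontend_id=1,                  # write the merged "peaks" only to ./archive
)
```

The same `_get_target_sf` filtering is now shared with `copy_to_frontend`, so both methods skip frontends that already hold the data, refuse the data type, or are readonly.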