Select targeted software frontend in a clever way
dachengx committed Sep 24, 2024
1 parent 0cfe543 commit f556d63
Showing 7 changed files with 67 additions and 41 deletions.
82 changes: 50 additions & 32 deletions strax/context.py
@@ -199,7 +199,7 @@ def __init__(
applied to plugins
:param register: plugin class or list of plugin classes to register
:param register_all: module for which all plugin classes defined in it
will be registered.
:param processors: A mapping of processor names to classes to use for
data processing.
Any additional kwargs are considered Context-specific options; see
@@ -2260,15 +2260,32 @@ def _check_copy_to_frontend_kwargs(
"""Simple kwargs checks for copy_to_frontend."""
if not self.is_stored(run_id, target):
raise strax.DataNotAvailable(f"Cannot copy {run_id} {target} since it does not exist")
if len(strax.to_str_tuple(target)) > 1:
raise ValueError("copy_to_frontend only works for a single target at the time")
if target_frontend_id is not None and target_frontend_id >= len(self.storage):
raise ValueError(
f"Cannot select {target_frontend_id}-th frontend as "
f"we only have {len(self.storage)} frontends!"
)
if rechunk and rechunk_to_mb == strax.DEFAULT_CHUNK_SIZE_MB:
self.log.warning("No <rechunk_to_mb> specified!")
# Reuse some code
self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)

def _get_target_sf(self, run_id, target, target_frontend_id):
"""Get the target storage frontends for copy_to_frontend and merge_per_chunk_storage."""
if target_frontend_id is None:
target_sf = self.storage
elif len(self.storage) > target_frontend_id:
target_sf = [self.storage[target_frontend_id]]

# Keep frontends that:
# 1. don't already have the data; and
# 2. take the data; and
# 3. are not readonly
target_sf = [
t_sf
for t_sf in target_sf
if (
not self._is_stored_in_sf(run_id, target, t_sf)
and t_sf._we_take(target)
and t_sf.readonly is False
)
]
return target_sf

def copy_to_frontend(
self,
@@ -2295,27 +2312,12 @@ def copy_to_frontend(
self._check_copy_to_frontend_kwargs(
run_id, target, target_frontend_id, rechunk, rechunk_to_mb
)
if target_frontend_id is None:
target_sf = self.storage
elif len(self.storage) > target_frontend_id:
target_sf = [self.storage[target_frontend_id]]

# Figure out which of the frontends has the data. Raise error when none
source_sf = self.get_source_sf(run_id, target, should_exist=True)[0]

# Keep frontends that:
# 1. don't already have the data; and
# 2. take the data; and
# 3. are not readonly
target_sf = [
t_sf
for t_sf in target_sf
if (
not self._is_stored_in_sf(run_id, target, t_sf)
and t_sf._we_take(target)
and t_sf.readonly is False
)
]
# Get the target storage frontends
target_sf = self._get_target_sf(run_id, target, target_frontend_id)
self.log.info(f"Copy data from {source_sf} to {target_sf}")

if not len(target_sf):
Expand Down Expand Up @@ -2372,19 +2374,31 @@ def wrapped_loader():
"do you have two storage frontends writing to the same place?"
)

def _check_merge_per_chunk_storage_kwargs(self, run_id, target, target_frontend_id) -> None:
if len(strax.to_str_tuple(target)) > 1:
raise ValueError("copy_to_frontend only works for a single target at the time")
if target_frontend_id is not None and target_frontend_id >= len(self.storage):
raise ValueError(
f"Cannot select {target_frontend_id}-th frontend as "
f"we only have {len(self.storage)} frontends!"
)

def merge_per_chunk_storage(
self,
run_id: str,
target: str,
per_chunked_dependency: str,
rechunk=True,
chunk_number_group: ty.Optional[ty.List[ty.List[int]]] = None,
target_frontend_id: ty.Optional[int] = None,
):
"""Merge the per-chunked data from the per-chunked dependency into the target storage."""

if self.is_stored(run_id, target):
raise ValueError(f"Data {target} for {run_id} already exists.")

self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)

if chunk_number_group is not None:
combined_chunk_numbers = list(itertools.chain(*chunk_number_group))
if len(combined_chunk_numbers) != len(set(combined_chunk_numbers)):
@@ -2404,17 +2418,20 @@ def merge_per_chunk_storage(

# Usually we want to save in the same storage frontend
# Here we assume that the target is stored chunk by chunk of the dependency
target_sf = source_sf = self.get_source_sf(
source_sf = self.get_source_sf(
run_id,
target,
chunk_number={per_chunked_dependency: chunk_number},
should_exist=True,
)[0]

# Get the target storage frontends
target_sf = self._get_target_sf(run_id, target, target_frontend_id)

def wrapped_loader():
"""Wrapped loader for changing the target_size_mb."""
for chunk_number in chunk_number_group:
# Mostly copied from self.copy_to_frontend
# Mostly revised from self.copy_to_frontend
# Get the info from the source backend (s_be) that we need to fill
# the target backend (t_be) with
data_key = self.key_for(
@@ -2436,17 +2453,18 @@ def wrapped_loader():
except StopIteration:
continue

# Fill the target buffer
data_key = self.key_for(run_id, target, chunk_number=_chunk_number)
t_be_str, t_be_key = target_sf.find(data_key, write=True)
target_be = target_sf._get_backend(t_be_str)
target_plugin = self.__get_plugin(run_id, target, chunk_number=_chunk_number)
target_md = target_plugin.metadata(run_id, target)
# Copied from StorageBackend.saver
if "dtype" in target_md:
target_md["dtype"] = target_md["dtype"].descr.__repr__()
saver = target_be._saver(t_be_key, target_md)
saver.save_from(wrapped_loader(), rechunk=rechunk)
for t_sf in target_sf:
# Fill the target buffer
t_be_str, t_be_key = t_sf.find(data_key, write=True)
target_be = t_sf._get_backend(t_be_str)
saver = target_be._saver(t_be_key, target_md)
saver.save_from(wrapped_loader(), rechunk=rechunk)

def get_source(
self,
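To make the new flow concrete, here is a minimal usage sketch (not part of this commit): the Context setup, paths, run id, and chunk numbers are invented, the Records and Peaks toy plugins are assumed to come from strax.testutils, and copy_to_frontend's keyword names are taken from _check_copy_to_frontend_kwargs above. The two calls are independent examples: the copy requires "peaks" to be stored already, while the merge requires the fully merged target to not exist yet.

import strax
from strax.testutils import Records, Peaks  # assumed toy plugins from strax's own tests

st = strax.Context(
    storage=[
        strax.DataDirectory("./live_data"),  # frontend 0
        strax.DataDirectory("./archive"),    # frontend 1
    ],
    register=[Records, Peaks],
)

# Copy already-stored data into frontend 1 only; _get_target_sf drops any
# frontend that already holds the data, is readonly, or does not take "peaks".
st.copy_to_frontend("0", "peaks", target_frontend_id=1, rechunk=True)

# Merge "peaks" pieces that were stored chunk-by-chunk of "records" into
# single entries, written to the frontends picked by the same helper.
st.merge_per_chunk_storage(
    "0",
    "peaks",
    "records",
    chunk_number_group=[[0, 1], [2, 3]],
    target_frontend_id=1,
)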
3 changes: 2 additions & 1 deletion strax/io.py
@@ -11,6 +11,7 @@
from ast import literal_eval

import strax
from strax import RUN_METADATA_PATTERN

export, __all__ = strax.exporter()

@@ -102,7 +103,7 @@ def _compress_blosc(data):
@export
def dry_load_files(dirname, chunk_number=None):
prefix = strax.storage.files.dirname_to_prefix(dirname)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = os.path.join(dirname, metadata_json)

with open(md_path, mode="r") as f:
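As a quick, hedged illustration of the helper touched above (not from the commit; the directory name follows the usual "<run_id>-<data_type>-<lineage_hash>" layout and is invented):

import strax

# dry_load_files reads "<prefix>-metadata.json" inside the data directory;
# without chunk_number it loads all chunks, with chunk_number just one.
data = strax.dry_load_files("./strax_data/0-peaks-2ccf4b1b7a")
chunk0 = strax.dry_load_files("./strax_data/0-peaks-2ccf4b1b7a", chunk_number=0)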
5 changes: 3 additions & 2 deletions strax/storage/files.py
@@ -10,6 +10,7 @@
from .common import StorageFrontend

export, __all__ = strax.exporter()
__all__.extend(["RUN_METADATA_PATTERN"])

RUN_METADATA_PATTERN = "%s-metadata.json"

@@ -230,7 +231,7 @@ def __init__(

def _get_metadata(self, dirname):
prefix = dirname_to_prefix(dirname)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = osp.join(dirname, metadata_json)

if not osp.exists(md_path):
@@ -300,7 +301,7 @@ def __init__(self, dirname, metadata, **kwargs):
self.dirname = dirname
self.tempdirname = dirname + "_temp"
self.prefix = dirname_to_prefix(dirname)
self.metadata_json = f"{self.prefix}-metadata.json"
self.metadata_json = RUN_METADATA_PATTERN % self.prefix

if os.path.exists(dirname):
print(f"Removing data in {dirname} to overwrite")
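The edits above only centralize the metadata file-name convention behind RUN_METADATA_PATTERN; a quick sanity check (the prefix value is invented) shows the pattern and the old f-string agree:

from strax import RUN_METADATA_PATTERN

prefix = "0-peaks-2ccf4b1b7a"  # hypothetical "<run_id>-<data_type>-<lineage_hash>" prefix
assert RUN_METADATA_PATTERN % prefix == f"{prefix}-metadata.json"  # "0-peaks-2ccf4b1b7a-metadata.json"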
4 changes: 2 additions & 2 deletions strax/storage/zipfiles.py
@@ -42,7 +42,7 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
try:
dirname = str(key)
prefix = strax.dirname_to_prefix(dirname)
zp.getinfo(f"{dirname}/{prefix}-metadata.json")
zp.getinfo(f"{dirname}/{RUN_METADATA_PATTERN % prefix}")
return bk
except KeyError:
pass
@@ -111,7 +111,7 @@ def _get_metadata(self, zipn_and_dirn):
zipn, dirn = zipn_and_dirn
with zipfile.ZipFile(zipn) as zp:
prefix = strax.dirname_to_prefix(dirn)
with zp.open(f"{dirn}/{prefix}-metadata.json") as f:
with zp.open(f"{dirn}/{RUN_METADATA_PATTERN % prefix}") as f:
return json.loads(f.read())

def saver(self, *args, **kwargs):
8 changes: 6 additions & 2 deletions tests/test_core.py
@@ -5,6 +5,7 @@
import os.path as osp
import pytest

from strax import RUN_METADATA_PATTERN
from strax.testutils import *

processing_conditions = pytest.mark.parametrize(
@@ -89,7 +90,10 @@ def test_filestore(allow_multiprocess, max_workers, processor):
# The first dir contains peaks.
# It should have one data chunk (rechunk is on) and a metadata file
prefix = strax.dirname_to_prefix(data_dirs[0])
assert sorted(os.listdir(data_dirs[0])) == [f"{prefix}-000000", f"{prefix}-metadata.json"]
assert sorted(os.listdir(data_dirs[0])) == [
f"{prefix}-000000",
RUN_METADATA_PATTERN % prefix,
]

# Check metadata got written correctly.
metadata = mystrax.get_metadata(run_id, "peaks")
@@ -99,7 +103,7 @@
assert len(metadata["chunks"]) == 1

# Check data gets loaded from cache, not rebuilt
md_filename = osp.join(data_dirs[0], f"{prefix}-metadata.json")
md_filename = osp.join(data_dirs[0], RUN_METADATA_PATTERN % prefix)
mtime_before = osp.getmtime(md_filename)
peaks_2 = mystrax.get_array(run_id=run_id, targets="peaks")
np.testing.assert_array_equal(peaks_1, peaks_2)
4 changes: 3 additions & 1 deletion tests/test_saving.py
@@ -4,6 +4,8 @@
import os
import tempfile

from strax import RUN_METADATA_PATTERN


class TestPerRunDefaults(unittest.TestCase):
"""Test the saving behavior of the context."""
@@ -72,7 +74,7 @@ def test_raise_corruption(self):

# copied from FileSytemBackend (maybe abstractify the method separately?)
prefix = strax.dirname_to_prefix(data_path)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = os.path.join(data_path, metadata_json)
assert os.path.exists(md_path)

2 changes: 1 addition & 1 deletion tests/test_storage.py
@@ -187,7 +187,7 @@ def test_check_chunk_n(self):
prefix = strax.storage.files.dirname_to_prefix(backend_key)
md = st_new.get_metadata(self.run_id, self.target)
md["chunks"][0]["n"] += 1
md_path = os.path.join(backend_key, f"{prefix}-metadata.json")
md_path = os.path.join(backend_key, strax.RUN_METADATA_PATTERN % prefix)
with open(md_path, "w") as file:
json.dump(md, file, indent=4)

