diff --git a/alpenhorn/db/storage.py b/alpenhorn/db/storage.py
index 022783196..cabcfeb31 100644
--- a/alpenhorn/db/storage.py
+++ b/alpenhorn/db/storage.py
@@ -367,18 +367,19 @@ def update_avail_gb(self, new_avail: int | None) -> None:
         """
         # The value in the database is in GiB (2**30 bytes)
         if new_avail is None:
-            self.avail_gb = None
+            # Warn unless we never knew the free space
+            if self.avail_gb is not None:
+                log.warning(f'Unable to determine available space for "{self.name}".')
         else:
             self.avail_gb = new_avail / 2**30
+
+        # Record check time, even if we failed to update
         self.avail_gb_last_checked = pw.utcnow()
 
         # Update the DB with the free space but don't clobber changes made
-        # manually to the database
+        # manually to the database.
         self.save(only=[StorageNode.avail_gb, StorageNode.avail_gb_last_checked])
 
-        if new_avail is None:
-            log.info(f'Unable to determine available space for "{self.name}".')
-
 
 class StorageTransferAction(base_model):
     """Storage transfer rules for the archive.
diff --git a/alpenhorn/io/lfs.py b/alpenhorn/io/lfs.py
index bd105ff42..c63f08864 100644
--- a/alpenhorn/io/lfs.py
+++ b/alpenhorn/io/lfs.py
@@ -18,6 +18,11 @@
     * `UNARCHIVED`: file exists on disk, but not on external storage
     * `RESTORED`: file exists on both disk and external storage
     * `RELEASED`: file exists in external storage only
+* `lfs hsm_action`
+    retrieves the current action. We use this to detect one additional
+    `HSMState`:
+    * `RESTORING`: file exists in external storage only, but HSM
+        is in the process of restoring it
 * `lfs hsm_restore`
     requests the state change `RELEASED -> RESTORED`
 * `lfs hsm_release`
@@ -55,6 +60,9 @@ class HSMState(Enum):
         is the state of newly created files until they are archived.
 
     HSMState.RELEASED:
         The file is in external storage but not on disk.
+    HSMState.RESTORING:
+        The file is in external storage only, and HSM is in the process of
+        bringing it back to disk.
     HSMState.RESTORED:
         The file is both in external storage and on disk.
@@ -62,14 +70,18 @@ class HSMState(Enum):
     our control, but once the file has been archived, it moves from state
     UNARCHIVED to state RESTORED.
 
-    A `lfs.hsm_restore()` changes a file's state from RELEASED to RESTORED.
-    A `lfs.hsm_release()` changes a file's state from RESTORED to RELEASED.
+    A `lfs.hsm_restore()` requests a file's state change from RELEASED to RESTORED.
+    After this call, the file's state changes to RESTORING until the restore is
+    complete.
+
+    A `lfs.hsm_release()` requests a file's state change from RESTORED to RELEASED.
     """
 
     MISSING = 0
     UNARCHIVED = 1
     RESTORED = 2
-    RELEASED = 3
+    RESTORING = 3
+    RELEASED = 4
 
 
 class LFS:
@@ -98,6 +110,7 @@ class LFS:
     HSM_MISSING = HSMState.MISSING
     HSM_UNARCHIVED = HSMState.UNARCHIVED
     HSM_RESTORED = HSMState.RESTORED
+    HSM_RESTORING = HSMState.RESTORING
     HSM_RELEASED = HSMState.RELEASED
 
     def __init__(
@@ -114,7 +127,7 @@ def __init__(
         if self._lfs is None:
             raise RuntimeError("lfs command not found.")
 
-    def run_lfs(self, *args: str, timeout: float | None = None) -> str | False | None:
+    def run_lfs(self, *args: str) -> dict:
         """Run the lfs command with the `args` provided.
 
         Parameters
@@ -122,37 +135,55 @@ def run_lfs(self, *args: str, timeout: float | None = None) -> str | False | Non
         *args : strings
             The list of command-line arguments to pass to the lfs command.
 
-        timeout : float, optional
-            If not None, stop waiting for the command after `timeout` seconds
-
-        Retunrs
+        Returns
         -------
-        output : str or False or None
-            If the command succeeded, returns standard output of
-            the command.  If the command failed or timed out,
-            returns False (failed) or None (timed out) and logs
-            the failure.
+        result : dict
+            A dict is returned with the following keys:
+
+            - missing : bool
+                True if the file was missing; False otherwise
+            - timeout : bool
+                True if the command timed out; False otherwise
+            - failed : bool
+                True if the command failed; False otherwise
+            - output : str or None
+                If the command succeeded (i.e. all three booleans
+                are false), this has the standard output of
+                the command. Otherwise, this is None.
 
         """
+        result = {"missing": False, "timeout": False, "failed": False, "output": None}
+
         # Stringify args
         args = [str(arg) for arg in args]
 
-        ret, stdout, stderr = util.run_command([self._lfs] + args, timeout=timeout)
+        ret, stdout, stderr = util.run_command([self._lfs] + args, timeout=60)
+
+        # Timeout
+        if ret is None:
+            log.warning("LFS command timed out: " + " ".join(args))
+            result["timeout"] = True
+            return result
+
+        if ret == 0:
+            # Success, return output
+            result["output"] = stdout
+            return result
 
-        # Failure or timeout
-        if ret is None or ret != 0:
-            if ret is None:
-                result = "timed out"
-            else:
-                result = f"failed (ret={ret})"
-            ret = False
-            log.warning(f"LFS command {result}: " + " ".join(args))
-            if stderr:
-                log.debug(f"LFS stderr: {stderr}")
-            if stdout:
-                log.debug(f"LFS stdout: {stdout}")
-            return ret
+        # Failure, look for a "No such file" remark in stderr
+        if stderr and "No such file or directory" in stderr:
+            log.debug("LFS missing file: " + " ".join(args))
+            result["missing"] = True
+        else:
+            # Otherwise, report failure
+            log.warning("LFS command failed: " + " ".join(args))
+            result["failed"] = True
 
-        return stdout
+        if stderr:
+            log.debug(f"LFS stderr: {stderr}")
+        if stdout:
+            log.debug(f"LFS stdout: {stdout}")
+        return result
 
     def quota_remaining(self, path: str | os.PathLike) -> int | None:
         """Retrieve the remaining quota for `path`.
@@ -200,7 +231,7 @@ def quota_remaining(self, path: str | os.PathLike) -> int | None:
         # Stringify path
         path = str(path)
 
-        stdout = self.run_lfs("quota", "-q", "-g", self._quota_group, path)
+        stdout = self.run_lfs("quota", "-q", "-g", self._quota_group, path)["output"]
         if stdout is None:
             return None  # Command returned error
 
@@ -246,6 +277,35 @@ def quota_remaining(self, path: str | os.PathLike) -> int | None:
         # lfs quota reports values in kiByte blocks
         return (quota_limit - quota) * 2**10
 
+    def hsm_restoring(self, path: os.PathLike | str) -> bool | None:
+        """Is HSM processing a restore request?
+
+        Returns True if HSM is currently working on a restore
+        request for `path`, and False otherwise.
+
+        Runs `lfs hsm_action` to check the current HSM action
+        of the file.
+
+        Parameters
+        ----------
+        path : path-like
+            The path to determine the state for.
+
+        Returns
+        -------
+        restoring : bool or None
+            True if a RESTORE is in progress. False otherwise.
+            None if the command failed.
+        """
+
+        # Stringify path
+        path = str(path)
+
+        stdout = self.run_lfs("hsm_action", path)["output"]
+        if stdout is None:
+            return None  # Command returned error
+        return "RESTORE" in stdout
+
     def hsm_state(self, path: os.PathLike | str) -> HSMState:
         """Returns the HSM state of path.
 
@@ -261,16 +321,16 @@ def hsm_state(self, path: os.PathLike | str) -> HSMState:
             failed.
 
            If `path` doesn't exist, this will be `HSMState.MISSING`.
""" - # No need to check with HSM if the path isn't present - if not pathlib.Path(path).exists(): - return HSMState.MISSING - # Stringify path path = str(path) - stdout = self.run_lfs("hsm_state", path) - if stdout is False: + result = self.run_lfs("hsm_state", path) + if result["failed"] or result["timeout"]: return None # Command returned error + if result["missing"]: + return HSMState.MISSING + + stdout = result["output"] # The output of hsm_state looks like this: # @@ -301,18 +361,29 @@ def hsm_state(self, path: os.PathLike | str) -> HSMState: # See llapi_hsm_state_get(3) for full details about these. if "archived" not in stdout: return HSMState.UNARCHIVED - if "released" in stdout: - return HSMState.RELEASED - return HSMState.RESTORED + if "released" not in stdout: + return HSMState.RESTORED + + # File is released, so now run `hsm_action` to see if a restore is in + # progress + if self.hsm_restoring(path): + return HSMState.RESTORING + + return HSMState.RELEASED def hsm_archived(self, path: os.PathLike) -> bool: """Is `path` archived by HSM?""" state = self.hsm_state(path) - return state == HSMState.RESTORED or state == HSMState.RELEASED + return ( + state == HSMState.RESTORED + or state == HSMState.RESTORING + or state == HSMState.RELEASED + ) def hsm_released(self, path: os.PathLike) -> bool: """Is `path` released to external storage?""" - return self.hsm_state(path) == HSMState.RELEASED + state = self.hsm_state(path) + return state == HSMState.RELEASED or state == HSMState.RESTORING def hsm_restore(self, path: os.PathLike) -> bool | None: """Trigger restore of `path` from external storage. @@ -343,9 +414,11 @@ def hsm_restore(self, path: os.PathLike) -> bool | None: if state != HSMState.RELEASED: return True - result = self.run_lfs("hsm_restore", path, timeout=60) - if result is None or result is False: - return result + result = self.run_lfs("hsm_restore", path) + if result["missing"]: + return False + if result["timeout"] or result["failed"]: + return None return True def hsm_release(self, path: os.PathLike) -> bool: diff --git a/alpenhorn/io/lustrehsm.py b/alpenhorn/io/lustrehsm.py index 5e527f132..c07b5b48c 100644 --- a/alpenhorn/io/lustrehsm.py +++ b/alpenhorn/io/lustrehsm.py @@ -38,10 +38,6 @@ log = logging.getLogger(__name__) -# Retry delays (in seconds) after a successful or timed-out hsm_restore request -RESTORE_TIMEOUT_RETRY = 1 * 3600 # 1 hour -RESTORE_SUCCESS_RETRY = 4 * 3600 # 4 hours - class LustreHSMNodeRemote(BaseNodeRemote): """LustreHSMNodeRemote: information about a LustreHSM remote node.""" @@ -96,17 +92,13 @@ def __init__( self._headroom = config["headroom"] * 2**10 # convert from kiB # QueryWalker for the HSM state check - self._release_qw = None + self._statecheck_qw = None # Tracks files we're in the process of retrieving, so we can avoid # waiting for the same file twice. The elements in this set are # `ArchiveFile.id`s self._restoring = set() - # The time.monotonic value after which we should try to restore again. - # Keys are elements in self._restoring. - self._restore_retry = dict() - # For informational purposes. Keys are elements in self._restoring. self._restore_start = dict() @@ -140,14 +132,28 @@ def _restore_wait(self, copy: ArchiveFileCopy) -> bool | None: # What's the current situation? 
        state = self._lfs.hsm_state(copy.path)
+        if state is None:
+            log.warning(f"Unable to restore {copy.path}: state check failed.")
+            self._restore_start.pop(copy.file.id, None)
+            self._restoring.discard(copy.file.id)
+            return None
+
         if state == self._lfs.HSM_MISSING:
             log.warning(f"Unable to restore {copy.path}: missing.")
             self._restore_start.pop(copy.file.id, None)
-            self._restore_retry.pop(copy.file.id, None)
             self._restoring.discard(copy.file.id)
             return None
 
-        if state != self._lfs.HSM_RELEASED:
+        if state == self._lfs.HSM_RESTORING:
+            if copy.file.id not in self._restoring:
+                self._restoring.add(copy.file.id)
+                self._restore_start[copy.file.id] = time.monotonic()
+
+            log.debug(f"Restore in progress: {copy.path}")
+
+            # Tell the caller to wait
+            return True
+        elif state != self._lfs.HSM_RELEASED:
             # i.e. file is restored or unarchived, so we're done.
             if copy.file.id not in self._restoring:
                 log.debug(f"Already restored: {copy.path}")
@@ -157,54 +163,30 @@ def _restore_wait(self, copy: ArchiveFileCopy) -> bool | None:
                     f"{copy.file.path} restored on node {self.node.name}"
                     f"after {pretty_deltat(deltat)}"
                 )
-                self._restore_start.pop(copy.file.id, None)
-                self._restore_retry.pop(copy.file.id, None)
+                del self._restore_start[copy.file.id]
             self._restoring.discard(copy.file.id)
             return False
 
         # If we got here, copy is released.
+        if copy.file.id not in self._restoring:
+            self._restoring.add(copy.file.id)
+            self._restore_start[copy.file.id] = time.monotonic()
 
-        # Have we hit the retry time for an in-progress restore?
-        retry_restore = (
-            copy.file.id in self._restoring
-            and self._restore_retry[copy.file.id] <= time.monotonic()
-        )
-
-        if retry_restore or copy.file.id not in self._restoring:
-            # Add this copy to the list of copies we're waiting on. Other tasks
-            # can use this to see if the file they're interested in is already
-            # "in use".
-            if not retry_restore:
-                self._restoring.add(copy.file.id)
-                self._restore_start[copy.file.id] = time.monotonic()
-
-            # Try to restore it.
-            result = self._lfs.hsm_restore(copy.path)
-            log.warning(f"restore result: {result}")
+        # Try to restore it.
+        result = self._lfs.hsm_restore(copy.path)
 
-            if result is False:
-                # Reqeust failed. Abandon the restore attempt entirely,
-                # in case it was deleted from the node.
-                self._restore_retry.pop(copy.file.id, None)
-                self._restore_start.pop(copy.file.id, None)
-                self._restoring.discard(copy.file.id)
+        if result is False:
+            log.warning(f"Restore request failed: {copy.path}")
+            # Request failed. Abandon the restore attempt entirely,
+            # in case it was deleted from the node.
+            del self._restore_start[copy.file.id]
+            self._restoring.discard(copy.file.id)
 
-                # Report failure
-                return None
-            elif result is None:
-                # Request timeout. This seems to happen when the file
-                # is already being restored, but let's try again in a
-                # little while, just to be safe.
-                self._restore_retry[copy.file.id] = (
-                    time.monotonic() + RESTORE_TIMEOUT_RETRY
-                )
-            else:
-                # Request success; restore should be in-progress, but
-                # we'll still schedule a retry for some time in the future
-                # to guard against HSM forgetting/abandonning our request
-                self._restore_retry[copy.file.id] = (
-                    time.monotonic() + RESTORE_SUCCESS_RETRY
-                )
+            # Report failure
+            return None
+        elif result is None:
+            # Might have worked. Caller should check again in a bit.
+            log.warning(f"Restore request timeout: {copy.path}")
 
         # Tell the caller to wait
         return True
@@ -217,7 +199,13 @@ def release_files(self) -> None:
         to happen (i.e. the node is currently idle).
 
""" - headroom_needed = self._headroom - self._lfs.quota_remaining(self.node.root) + # Can't do anything if we don't know how much free space we have + size_gib = self.node.avail_gb + if size_gib is None: + log.debug("Skipping release_files: free space unknown.") + return + + headroom_needed = self._headroom - int(size_gib * 2**30) # Nothing to do if headroom_needed <= 0: @@ -238,8 +226,8 @@ def _async(task, node, lfs, headroom_needed): ) .order_by(ArchiveFileCopy.last_update) ): - # Skip unarchived files - if not lfs.hsm_archived(copy.path): + # The only files we can release are ones that are fully restored + if lfs.hsm_state(copy.path) != lfs.HSM_RESTORED: continue log.debug( @@ -308,9 +296,9 @@ def idle_update(self, newly_idle) -> None: super().idle_update(newly_idle) # Check the query walker. Initialised if necessary. - if self._release_qw is None: + if self._statecheck_qw is None: try: - self._release_qw = QueryWalker( + self._statecheck_qw = QueryWalker( ArchiveFileCopy, ArchiveFileCopy.node == self.node, ArchiveFileCopy.has_file == "Y", @@ -320,17 +308,21 @@ def idle_update(self, newly_idle) -> None: # Try to get a bunch of copies to check try: - copies = self._release_qw.get(self._nrelease) + copies = self._statecheck_qw.get(self._nrelease) except pw.DoesNotExist: # Not sure why all the file copies have gone away, but given there's # nothing on the node now, can't hurt to re-init the QW in this case - self._release_qw = None + self._statecheck_qw = None return def _async(task, node, lfs, copies): for copy in copies: state = lfs.hsm_state(copy.path) - if state == lfs.HSM_MISSING: + if state is None: + log.warning( + f"Unable to determine state for {copy.file.path} on node {node.name}." + ) + elif state == lfs.HSM_MISSING: # File is unexpectedly gone. log.warning( f"File copy {copy.file.path} on node {node.name} is missing!" @@ -338,7 +330,7 @@ def _async(task, node, lfs, copies): ArchiveFileCopy.update( has_file="N", ready=False, last_update=utcnow() ).where(ArchiveFileCopy.id == copy.id).execute() - elif state == lfs.HSM_RELEASED: + elif state == lfs.HSM_RELEASED or state == lfs.HSM_RESTORING: if copy.ready: log.info(f"Updating file copy {copy.file.path}: ready -> False") ArchiveFileCopy.update(ready=False, last_update=utcnow()).where( @@ -452,6 +444,19 @@ def check_active(self) -> bool: """ return self.node.active + def exists(self, path: pathlib.PurePath) -> bool: + """Does `path` exist? + + Checks whether `lfs hsm_state` returns ENOENT. + + Parameters + ---------- + path : pathlib.PurePath + path relative to `node.root` + """ + full_path = pathlib.PurePath(self.node.root).joinpath(path) + return self._lfs.hsm_state(full_path) != self._lfs.HSM_MISSING + def filesize(self, path: pathlib.Path, actual: bool = False) -> int: """Return size in bytes of the file given by `path`. 
@@ -508,7 +513,10 @@ def open(self, path: os.PathLike | str, binary: bool = True) -> IO:
         # Make abs path
         p = pathlib.Path(self.node.root, path)
 
-        if self._lfs.hsm_released(p):
+        state = self._lfs.hsm_state(p)
+        if state is None:
+            raise OSError(f"Can't get state for {path}.")
+        if state != self._lfs.HSM_RESTORED and state != self._lfs.HSM_UNARCHIVED:
             raise OSError(f"{path} is not restored.")
 
         return open(p, mode="rb" if binary else "rt")
@@ -559,7 +567,7 @@ def _async(task: Task, node_io: LustreHSMNodeIO, file_: ArchiveFile):
             restore_wait = node_io._restore_wait(copy)
             while restore_wait:
                 # Wait for a bit
-                yield 60
+                yield 600
 
                 # Now check again
                 restore_wait = node_io._restore_wait(copy)
diff --git a/alpenhorn/server/update.py b/alpenhorn/server/update.py
index 27df4d081..ed2c9a1e4 100644
--- a/alpenhorn/server/update.py
+++ b/alpenhorn/server/update.py
@@ -301,8 +301,10 @@ def update_free_space(self) -> None:
 
         self.db.update_avail_gb(bytes_avail)
 
-        if bytes_avail is not None:
-            log.info(f"Node {self.name}: {util.pretty_bytes(bytes_avail)} available.")
+        if self.db.avail_gb is not None:
+            log.info(
+                f"Node {self.name}: {util.pretty_bytes(self.db.avail_gb * 2**30)} available."
+            )
 
     def run_auto_verify(self) -> None:
         """Run auto-verification on this node.
diff --git a/tests/conftest.py b/tests/conftest.py
index 8dc99ed7a..0af8d8861 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -55,7 +55,7 @@ def pytest_configure(config):
         "lfs_hsm_restore_result(result): "
         "used on tests which mock alpenhorn.io.lfs.LFS "
         "to indicate the result of the hsm_restore call. result "
-        "may be 'fail', 'timeout', or 'wait'",
+        "may be 'fail', 'timeout', 'wait', or 'restore'",
     )
     config.addinivalue_line(
         "markers",
@@ -242,6 +242,8 @@ def _mocked_lfs_hsm_state(self, path):
             return HSMState.UNARCHIVED
         if state == "restored":
             return HSMState.RESTORED
+        if state == "restoring":
+            return HSMState.RESTORING
         if state == "released":
             return HSMState.RELEASED
 
@@ -250,6 +252,9 @@ def _mocked_lfs_hsm_state(self, path):
     def _mocked_lfs_hsm_restore(self, path):
         nonlocal request, lfs_hsm_state
 
+        # de-pathlib-ify
+        path = str(path)
+
         marker = request.node.get_closest_marker("lfs_hsm_restore_result")
         if marker:
             if marker.args[0] == "fail":
@@ -260,16 +265,21 @@ def _mocked_lfs_hsm_restore(self, path):
                 return None
             if marker.args[0] == "wait":
                 # Return true (successful request)
-                # without changing state to "restored"
+                # without full restore
+                lfs_hsm_state[path] = "restoring"
+                return True
+            if marker.args[0] == "restore":
+                # Return true (successful request)
+                # with full restore
+                lfs_hsm_state[path] = "restored"
                 return True
-
-        # de-pathlib-ify
-        path = str(path)
 
         state = lfs_hsm_state.get(path, "missing")
         if state == "missing":
             return False
         if state == "released":
+            lfs_hsm_state[path] = "restoring"
+        elif state == "restoring":
             lfs_hsm_state[path] = "restored"
 
         return True
@@ -280,12 +290,14 @@ def _mocked_lfs_hsm_release(self, path):
         path = str(path)
 
         state = lfs_hsm_state.get(path, "missing")
-        if state == "missing" or state == "unarchived":
-            return False
+        if state == "released":
+            return True
         if state == "restored":
             lfs_hsm_state[path] = "released"
+            return True
 
-        return True
+        # Missing, unarchived, or restoring
+        return False
 
     marker = request.node.get_closest_marker("lfs_quota_remaining")
     if marker is None:
diff --git a/tests/db/test_storage.py b/tests/db/test_storage.py
index 5c7582863..bfc1aa060 100644
--- a/tests/db/test_storage.py
+++ b/tests/db/test_storage.py
@@ -368,13 +368,22 @@ def test_update_avail_gb(simplenode):
     node = StorageNode.get(id=simplenode.id)
     after = pw.utcnow()
 
-    assert node.avail_gb == 10000.0 / 2.0**30
+    avail = node.avail_gb
+    assert avail == 10000.0 / 2.0**30
     assert node.avail_gb_last_checked >= before
     assert node.avail_gb_last_checked <= after
 
-    # Test None
+    # Reset time
+    StorageNode.update(avail_gb_last_checked=0).where(
+        StorageNode.id == simplenode.id
+    ).execute()
+
+    # Test None -- it shouldn't change the value, but the
+    # check time should still update
     simplenode.update_avail_gb(None)
-    assert StorageNode.get(id=simplenode.id).avail_gb is None
+    node = StorageNode.get(id=simplenode.id)
+    assert node.avail_gb == avail
+    assert node.avail_gb_last_checked >= after
 
 
 def test_edge_model(storagetransferaction, storagenode, storagegroup):
diff --git a/tests/io/test_lfs.py b/tests/io/test_lfs.py
index 9a54c3faf..689781a24 100644
--- a/tests/io/test_lfs.py
+++ b/tests/io/test_lfs.py
@@ -4,6 +4,7 @@
 import pathlib
 
 from alpenhorn.io.lfs import LFS
+from alpenhorn.db import StorageNode
 
 
 @pytest.mark.run_command_result(0, "lfs_out", "lfs_err")
@@ -12,11 +13,16 @@ def test_run_lfs_success(have_lfs, mock_run_command):
 
     lfs = LFS(None)
 
-    assert lfs.run_lfs("arg1", "arg2") == "lfs_out"
+    assert lfs.run_lfs("arg1", "arg2") == {
+        "failed": False,
+        "missing": False,
+        "output": "lfs_out",
+        "timeout": False,
+    }
     assert mock_run_command() == {
         "cmd": ["LFS", "arg1", "arg2"],
         "kwargs": dict(),
-        "timeout": None,
+        "timeout": 60,
     }
 
 
@@ -26,11 +32,11 @@ def test_run_lfs_stringify(have_lfs, mock_run_command):
 
     lfs = LFS(None)
 
-    assert lfs.run_lfs(pathlib.Path("path"), 2) == "lfs_out"
+    assert lfs.run_lfs(pathlib.Path("path"), 2)["output"] == "lfs_out"
     assert mock_run_command() == {
         "cmd": ["LFS", "path", "2"],
         "kwargs": dict(),
-        "timeout": None,
+        "timeout": 60,
     }
 
 
@@ -40,11 +46,56 @@ def test_run_lfs_fail(have_lfs, mock_run_command):
 
     lfs = LFS(None)
 
-    assert lfs.run_lfs("arg1", "arg2") is False
+    assert lfs.run_lfs("arg1", "arg2") == {
+        "failed": True,
+        "missing": False,
+        "output": None,
+        "timeout": False,
+    }
+    assert mock_run_command() == {
+        "cmd": ["LFS", "arg1", "arg2"],
+        "kwargs": dict(),
+        "timeout": 60,
+    }
+
+
+@pytest.mark.run_command_result(None, "lfs_out", "lfs_err")
+def test_run_lfs_timeout(have_lfs, mock_run_command):
+    """Test timed out invocation of lfs.run_lfs."""
+
+    lfs = LFS(None)
+
+    assert lfs.run_lfs("arg1", "arg2") == {
+        "failed": False,
+        "missing": False,
+        "output": None,
+        "timeout": True,
+    }
+    assert mock_run_command() == {
+        "cmd": ["LFS", "arg1", "arg2"],
+        "kwargs": dict(),
+        "timeout": 60,
+    }
+
+
+@pytest.mark.run_command_result(
+    2, "lfs_out", "Something didn't work: No such file or directory"
+)
+def test_run_lfs_missing(have_lfs, mock_run_command):
+    """Test ENOENT from lfs.run_lfs."""
+
+    lfs = LFS(None)
+
+    assert lfs.run_lfs("arg1", "arg2") == {
+        "failed": False,
+        "missing": True,
+        "output": None,
+        "timeout": False,
+    }
     assert mock_run_command() == {
         "cmd": ["LFS", "arg1", "arg2"],
         "kwargs": dict(),
-        "timeout": None,
+        "timeout": 60,
     }
 
 
@@ -143,7 +194,9 @@ def test_quota_fixed(have_lfs, mock_run_command):
     assert lfs.quota_remaining("/path") == (2500 - 1234) * 2**10
 
 
-@pytest.mark.run_command_result(1, None, None)
+@pytest.mark.run_command_result(
+    2, "", "Something didn't work: No such file or directory"
+)
 def test_hsm_state_missing(xfs, have_lfs, mock_run_command):
     """Test hsm_state on a missing file."""
 
diff --git a/tests/io/test_lustrehsmgroup.py b/tests/io/test_lustrehsmgroup.py
index 0eab04502..b22095cf2 100644
--- a/tests/io/test_lustrehsmgroup.py
+++ b/tests/io/test_lustrehsmgroup.py
@@ -131,6 +131,13 @@ def test_idle(queue, group):
     assert group.idle is True
 
 
+@pytest.mark.lfs_hsm_state(
+    {
+        "/hsm/test/one": "restored",
+        "/hsm/test/two": "released",
+        "/hsm/test/three": "missing",
+    }
+)
 def test_exists(xfs, group):
     """Test TransportGroupIO.exists()."""
     group, hsm, smallfile = group
diff --git a/tests/io/test_lustrehsmnode.py b/tests/io/test_lustrehsmnode.py
index babed8534..268627db7 100644
--- a/tests/io/test_lustrehsmnode.py
+++ b/tests/io/test_lustrehsmnode.py
@@ -21,7 +21,7 @@ def node(
     simplenode.io_class = "LustreHSM"
 
     simplenode.io_config = (
-        '{"quota_group": "qgroup", "headroom": 10250, "release_check_count": 6}'
+        '{"quota_group": "qgroup", "headroom": 10250, "release_check_count": 7}'
     )
 
     # Some files
@@ -32,10 +32,11 @@ def node(
         archivefile(name="file4", acq=simpleacq, size_b=800000),
         archivefile(name="file5", acq=simpleacq, size_b=50000),
         archivefile(name="file6", acq=simpleacq, size_b=300000),
+        archivefile(name="file7", acq=simpleacq, size_b=200000),
     ]
 
     # Some copies
-    last_updates = [3, 1, 5, 6, 2, 4]
+    last_updates = [3, 1, 6, 7, 2, 4, 5]
     for num, file in enumerate(files):
         archivefilecopy(file=file, node=simplenode, has_file="Y", size_b=10, ready=True)
         # We need to do it this way to set last_update
@@ -68,17 +69,18 @@ def test_init_bad_release_count(simplenode, have_lfs):
         UpdateableNode(None, simplenode)
 
 
-@pytest.mark.lfs_quota_remaining(20000000)
 def test_release_files_okay(queue, node):
     """Test running release_files when we're under headroom"""
 
+    node.db.avail_gb = 20000000.0 / 2**30
+    node.db.save()
+
     node.io.release_files()
 
     # Shouldn't be anything in the queue
     assert queue.qsize == 0
 
 
-@pytest.mark.lfs_quota_remaining(10000000)
 @pytest.mark.lfs_hsm_state(
     {
         "/node/simpleacq/file1": "restored",
@@ -87,11 +89,18 @@ def test_release_files_okay(queue, node):
         "/node/simpleacq/file4": "restored",
         "/node/simpleacq/file5": "restored",
         "/node/simpleacq/file6": "unarchived",
+        "/node/simpleacq/file7": "restoring",
     }
 )
 def test_release_files(queue, mock_lfs, node):
     """Test running release_files."""
 
+    node.db.avail_gb = 10000000.0 / 2**30
+    node.db.save()
+
+    # File7 is not ready
+    ArchiveFileCopy.update(ready=False).where(ArchiveFileCopy.id == 7).execute()
+
     node.io.release_files()
 
     before = pw.utcnow().replace(microsecond=0)
@@ -110,7 +119,8 @@ def test_release_files(queue, mock_lfs, node):
     # - file5: 50 kB [ last_update = 2 ]
     # - file1: 100 kB [ last_update = 3 ]
     # - file6 is skipped because it's not archived [last_update = 4]
-    # - file3: 400 kB [ last_update = 5 ]
+    # - file7 is skipped because it's being restored [last_update = 5]
+    # - file3: 400 kB [ last_update = 6 ]
     # file4 remains restored
     assert not ArchiveFileCopy.get(id=1).ready
     assert ArchiveFileCopy.get(id=1).last_update >= before
@@ -126,7 +136,8 @@ def test_release_files(queue, mock_lfs, node):
     assert not ArchiveFileCopy.get(id=5).ready
     assert ArchiveFileCopy.get(id=5).last_update >= before
 
-    assert ArchiveFileCopy.get(id=6).ready
+    assert ArchiveFileCopy.get(id=6).last_update == 4
+    assert ArchiveFileCopy.get(id=7).last_update == 5
 
     # Check hsm_relase was actually called
     lfs = mock_lfs("")
@@ -136,12 +147,15 @@ def test_release_files(queue, mock_lfs, node):
     assert lfs.hsm_state("/node/simpleacq/file4") == lfs.HSM_RESTORED
     assert lfs.hsm_state("/node/simpleacq/file5") == lfs.HSM_RELEASED
     assert lfs.hsm_state("/node/simpleacq/file6") == lfs.HSM_UNARCHIVED
+    assert lfs.hsm_state("/node/simpleacq/file7") == lfs.HSM_RESTORING
 
 
-@pytest.mark.lfs_quota_remaining(10000000)
 def test_before_update(queue, node):
     """Test LustreHSMNodeIO.before_update()"""
 
+    node.db.avail_gb = 10000000.0 / 2**30
+    node.db.save()
+
     # When not idle, the release_files task is not run
     node.io.before_update(idle=False)
 
@@ -164,36 +178,48 @@ def test_filesize(xfs, node):
     {
         "/node/dir/file1": "released",
         "/node/dir/file2": "restored",
+        "/node/dir/file3": "restoring",
     }
 )
 def test_open_binary(xfs, node):
     """Test binary LustreHSMNodeIO.open()"""
 
-    xfs.create_file("/node/dir/file2", contents="file contents")
+    xfs.create_file("/node/dir/file1", contents="file1 contents")
+    xfs.create_file("/node/dir/file2", contents="file2 contents")
+    xfs.create_file("/node/dir/file3", contents="file3 contents")
 
     with pytest.raises(OSError):
         node.io.open("dir/file1", binary=True)
 
     with node.io.open("dir/file2", binary=True) as f:
-        assert f.read() == b"file contents"
+        assert f.read() == b"file2 contents"
+
+    with pytest.raises(OSError):
+        node.io.open("dir/file3", binary=True)
 
 
 @pytest.mark.lfs_hsm_state(
     {
         "/node/dir/file1": "released",
         "/node/dir/file2": "restored",
+        "/node/dir/file3": "restoring",
     }
 )
 def test_open_text(xfs, node):
     """Test text LustreHSMNodeIO.open()"""
 
-    xfs.create_file("/node/dir/file2", contents="file contents")
+    xfs.create_file("/node/dir/file1", contents="file1 contents")
+    xfs.create_file("/node/dir/file2", contents="file2 contents")
+    xfs.create_file("/node/dir/file3", contents="file3 contents")
 
     with pytest.raises(OSError):
         node.io.open("dir/file1", binary=False)
 
     with node.io.open("dir/file2", binary=False) as f:
-        assert f.read() == "file contents"
+        assert f.read() == "file2 contents"
+
+    with pytest.raises(OSError):
+        node.io.open("dir/file3", binary=False)
 
 
 def test_check_missing(queue, node):
@@ -258,6 +284,7 @@ def test_check_ready_restored(xfs, queue, node, mock_lfs):
         "/node/simpleacq/file1": "released",
     }
 )
+@pytest.mark.lfs_hsm_restore_result("restore")
 def test_check_released(xfs, queue, mock_lfs, node):
     """Test check on a non-ready, released file."""
 
@@ -301,6 +328,7 @@ def test_check_released(xfs, queue, mock_lfs, node):
         "/node/simpleacq/file1": "released",
     }
 )
+@pytest.mark.lfs_hsm_restore_result("restore")
 def test_check_ready_released(xfs, queue, mock_lfs, node):
     """Test check on a ready, released file."""
 
@@ -341,6 +369,7 @@ def test_check_ready_released(xfs, queue, mock_lfs, node):
         "/node/simpleacq/file2": "released",
         "/node/simpleacq/file3": "unarchived",
         "/node/simpleacq/file4": "missing",
+        "/node/simpleacq/file5": "restoring",
     }
 )
 def test_ready_path(mock_lfs, node):
@@ -351,13 +380,15 @@ def test_ready_path(mock_lfs, node):
     assert not node.io.ready_path("/node/simpleacq/file2")
     assert node.io.ready_path("/node/simpleacq/file3")
     assert not node.io.ready_path("/node/simpleacq/file4")
+    assert not node.io.ready_path("/node/simpleacq/file5")
 
     # But now released file is recalled.
lfs = mock_lfs("") assert lfs.hsm_state("/node/simpleacq/file1") == lfs.HSM_RESTORED - assert lfs.hsm_state("/node/simpleacq/file2") == lfs.HSM_RESTORED + assert lfs.hsm_state("/node/simpleacq/file2") == lfs.HSM_RESTORING assert lfs.hsm_state("/node/simpleacq/file3") == lfs.HSM_UNARCHIVED assert lfs.hsm_state("/node/simpleacq/file4") == lfs.HSM_MISSING + assert lfs.hsm_state("/node/simpleacq/file5") == lfs.HSM_RESTORING @pytest.mark.lfs_hsm_state( @@ -396,6 +427,41 @@ def test_ready_pull_restored(mock_lfs, node, queue, archivefilecopyrequest): assert lfs.hsm_state(copy.path) == lfs.HSM_RESTORED +@pytest.mark.lfs_hsm_state( + { + "/node/simpleacq/file1": "restoring", + } +) +def test_ready_pull_restoring(mock_lfs, node, queue, archivefilecopyrequest): + """Test LustreHSMNodeIO.ready_pull on file already being restored.""" + + before = pw.utcnow().replace(microsecond=0) + + copy = ArchiveFileCopy.get(id=1) + copy.ready = False + copy.save() + afcr = archivefilecopyrequest( + file=copy.file, node_from=node.db, group_to=node.db.group + ) + + node.io.ready_pull(afcr) + + # Task in queue + assert queue.qsize == 1 + + # Run task + task, key = queue.get() + task() + queue.task_done(key) + + # File is still not ready + assert not ArchiveFileCopy.get(id=1).ready + + # File is still being restored + lfs = mock_lfs("") + assert lfs.hsm_state(copy.path) == lfs.HSM_RESTORING + + @pytest.mark.lfs_hsm_state( { "/node/simpleacq/file1": "released", @@ -430,12 +496,9 @@ def test_ready_pull_released(mock_lfs, node, queue, archivefilecopyrequest): # Don't wait for the deferral to expire, just run the task again task() - # File is now ready - assert ArchiveFileCopy.get(id=1).ready - - # File is restored + # File is being restored lfs = mock_lfs("") - assert lfs.hsm_state(copy.path) == lfs.HSM_RESTORED + assert lfs.hsm_state(copy.path) == lfs.HSM_RESTORING def test_idle_update_empty(queue, mock_lfs, node): @@ -447,7 +510,7 @@ def test_idle_update_empty(queue, mock_lfs, node): node.io.idle_update(False) # QW has not been initialised - assert node.io._release_qw is None + assert node.io._statecheck_qw is None # No item in queue assert queue.qsize == 0 @@ -459,6 +522,7 @@ def test_idle_update_empty(queue, mock_lfs, node): "/node/simpleacq/file2": "restored", "/node/simpleacq/file3": "unarchived", "/node/simpleacq/file4": "missing", + "/node/simpleacq/file5": "restoring", } ) def test_idle_update_ready(xfs, queue, mock_lfs, node): @@ -469,7 +533,7 @@ def test_idle_update_ready(xfs, queue, mock_lfs, node): node.io.idle_update(False) # QW has been initialised - assert node.io._release_qw is not None + assert node.io._statecheck_qw is not None # Item in queue assert queue.qsize == 1 @@ -492,6 +556,9 @@ def test_idle_update_ready(xfs, queue, mock_lfs, node): assert ArchiveFileCopy.get(id=4).last_update >= before assert ArchiveFileCopy.get(id=4).has_file == "N" + # Copy five is not ready (being restored) + assert not ArchiveFileCopy.get(id=5).ready + @pytest.mark.lfs_hsm_state( { @@ -499,6 +566,7 @@ def test_idle_update_ready(xfs, queue, mock_lfs, node): "/node/simpleacq/file2": "restored", "/node/simpleacq/file3": "unarchived", "/node/simpleacq/file4": "missing", + "/node/simpleacq/file5": "restoring", } ) def test_idle_update_not_ready(xfs, queue, mock_lfs, node): @@ -512,7 +580,7 @@ def test_idle_update_not_ready(xfs, queue, mock_lfs, node): node.io.idle_update(False) # QW has been initialised - assert node.io._release_qw is not None + assert node.io._statecheck_qw is not None # Item in queue assert queue.qsize == 1 
@@ -536,6 +604,9 @@ def test_idle_update_not_ready(xfs, queue, mock_lfs, node):
     assert ArchiveFileCopy.get(id=4).last_update >= before
     assert ArchiveFileCopy.get(id=4).has_file == "N"
 
+    # Copy five is not ready (being restored)
+    assert not ArchiveFileCopy.get(id=5).ready
+
 
 @pytest.mark.lfs_hsm_state({"/node/simpleacq/file1": "released"})
 @pytest.mark.lfs_hsm_restore_result("wait")
@@ -567,7 +638,6 @@ def test_hsm_restore_twice(xfs, queue, mock_lfs, node):
     # Check the internal bookkeeping
     assert copy.id in node.io._restoring
     assert copy.id in node.io._restore_start
-    assert copy.id in node.io._restore_retry
 
     # Try to add another task
     node.io.check(copy)
@@ -609,7 +679,6 @@ def test_hsm_restore_timeout(xfs, queue, mock_lfs, node):
     # Check the internal bookkeeping
     assert copy.id in node.io._restoring
     assert copy.id in node.io._restore_start
-    assert copy.id in node.io._restore_retry
 
 
 @pytest.mark.lfs_hsm_state({"/node/simpleacq/file1": "released"})
@@ -642,4 +711,3 @@ def test_hsm_restore_fail(xfs, queue, mock_lfs, node):
     # Check the internal bookkeeping
     assert copy.id not in node.io._restoring
     assert copy.id not in node.io._restore_start
-    assert copy.id not in node.io._restore_retry
diff --git a/tests/server/test_auto_import.py b/tests/server/test_auto_import.py
index 2b7310f2a..0481fd2c7 100644
--- a/tests/server/test_auto_import.py
+++ b/tests/server/test_auto_import.py
@@ -9,6 +9,7 @@
 
 from alpenhorn.db.archive import ArchiveFileCopy
 from alpenhorn.db.acquisition import ArchiveAcq, ArchiveFile
+from alpenhorn.io.lfs import HSMState
 from alpenhorn.server import auto_import
 from alpenhorn.server.update import UpdateableNode
 
@@ -54,8 +55,8 @@ def test_import_file_not_ready(dbtables, queue, simplenode, mock_lfs):
     # _import_file is a generator function, so it needs to be interated to run.
     assert next(auto_import._import_file(None, unode, pathlib.PurePath("acq/file"))) > 0
 
-    # File has been restored
-    assert not mock_lfs("").hsm_released("/node/acq/file")
+    # File is being restored
+    assert mock_lfs("").hsm_state("/node/acq/file") == HSMState.RESTORING
 
 
 def test_import_file_no_ext(dbtables, unode):
diff --git a/tests/server/test_service.py b/tests/server/test_service.py
index 567ea1bfa..573a065d7 100644
--- a/tests/server/test_service.py
+++ b/tests/server/test_service.py
@@ -109,6 +109,7 @@ def e2e_db(xfs, clidb_noinit, hostname):
         root="/nl1",
         host=hostname,
         active=True,
+        avail_gb=2000.0 / 2**30,
         # This is mostly ignored
         io_config='{"quota_group": "qgroup", "headroom": 10}',
     )
@@ -128,6 +129,7 @@ def e2e_db(xfs, clidb_noinit, hostname):
         host=hostname,
         active=True,
         io_config='{"quota_group": "qgroup", "headroom": 1, "release_check_count": 1}',
+        avail_gb=2000.0 / 2**30,
     )
     StorageNode.create(name="sf2", group=nlgrp, root="/sf2", host=hostname, active=True)
     xfs.create_file("/sf2/ALPENHORN_NODE", contents="sf2")
@@ -305,7 +307,6 @@ def e2e_config(xfs, hostname, clidb_uri):
     xfs.create_file("/etc/alpenhorn/alpenhorn.conf", contents=yaml.dump(config))
 
 
-@pytest.mark.lfs_quota_remaining(2000)
 @pytest.mark.lfs_hsm_state(
     {
         "/nl2/acq1/correct.me": "restored",
@@ -323,7 +324,7 @@ def test_cli(e2e_db, e2e_config, mock_lfs, mock_rsync, loop_once):
     # Check HSM
     lfs = mock_lfs(quota_group="qgroup")
     assert lfs.hsm_state("/nl2/acq1/correct.me") == lfs.HSM_RESTORED
-    assert lfs.hsm_state("/nl1/acq1/restore.me") == lfs.HSM_RESTORED
+    assert lfs.hsm_state("/nl1/acq1/restore.me") == lfs.HSM_RESTORING
     assert lfs.hsm_state("/nl1/acq1/release.me") == lfs.HSM_RELEASED
 
     # Check results
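
Reviewer note: a minimal sketch of the new `run_lfs()` return convention used throughout this diff. The call site below is hypothetical; `lfs` is an `LFS` instance and `HSMState` comes from `alpenhorn.io.lfs`:

    # Hypothetical call site (illustration only, not part of the patch).
    result = lfs.run_lfs("hsm_state", "/mnt/lustre/acq/file")
    if result["timeout"] or result["failed"]:
        state = None  # command error: state unknown (hsm_state() also returns None here)
    elif result["missing"]:
        state = HSMState.MISSING  # stderr contained "No such file or directory"
    else:
        stdout = result["output"]  # success: all three flags are False; stdout is a str

At most one of the three booleans is set on any call, and `output` is non-None only when all three are False, so branching in this priority order is safe.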