diff --git a/alpenhorn/io/lustrehsm.py b/alpenhorn/io/lustrehsm.py index ffe959407..488ffc6e9 100644 --- a/alpenhorn/io/lustrehsm.py +++ b/alpenhorn/io/lustrehsm.py @@ -15,6 +15,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, IO +import time import logging import pathlib import peewee as pw @@ -23,7 +24,7 @@ from ..archive import ArchiveFileCopy from ..querywalker import QueryWalker from ..task import Task -from ..util import pretty_bytes +from ..util import pretty_bytes, pretty_deltat from .base import BaseGroupIO, BaseNodeRemote from .lustrequota import LustreQuotaNodeIO @@ -94,6 +95,14 @@ def __init__( # QueryWalker for the HSM state check self._release_qw = None + # Tracks files we're in the process of retrieving, so we can avoid + # waiting for the same file twice. The elements in this set are + # `ArchiveFile.id`s + self._restoring = set() + + # For informational purposes. Keys are elements in self._restoring. + self._restore_start = dict() + # For idle-time HSM state updates self._nrelease = config.get("release_check_count", 100) if self._nrelease < 1: @@ -101,6 +110,65 @@ def __init__( f"io_config key 'release_check_count' non-positive (={self._nrelease})" ) + def _restore_wait(self, copy: ArchiveFileCopy) -> bool | None: + """Attempt to restore a file from HSM. + + The caller should call this method repeatedly while it returns `True`, + indicating the file has not yet been restored, with a suitable wait + between calls. + + Parameters + ---------- + copy : ArchiveFileCopy + The file copy to restore + + Returns + ------- + result, one of: + * True: file is not restored. Caller should wait and try again. + * False: file is restored. Caller may now access the restored file. + * None: an error occurred. Caller should stop trying to access the file. + """ + + # What's the current situation? + state = self._lfs.hsm_state(copy.path) + + if state == self._lfs.HSM_MISSING: + log.warning(f"Unable to restore {copy.path}: missing.") + self._restore_start.pop(copy.file.id, None) + self._restoring.discard(copy.file.id) + return None + + if state != self._lfs.HSM_RELEASED: + # i.e. file is restored or unarchived, so we're done. + if copy.file.id not in self._restoring: + log.debug(f"Already restored: {copy.path}") + else: + deltat = time.monotonic() - self._restore_start[copy.file.id] + log.info( + f"{copy.file.path} restored on node {self.node.name}" + f"after {pretty_deltat(deltat)}" + ) + self._restore_start.pop(copy.file.id, None) + self._restoring.discard(copy.file.id) + return False + + # If we got here, copy is released. + + # Add this copy to the list of copies we're waiting on. Other tasks + # can use this to see if the file they're interested in is already + # "in use". + if copy.file.id not in self._restoring: + self._restoring.add(copy.file.id) + self._restore_start[copy.file.id] = time.monotonic() + + # Restore it. We deliberately do this every time, to hedge against + # our initial request being forgotten/ignored by HSM. + self._lfs.hsm_restore(copy.path) + + # Tell the caller to wait + return True + def release_files(self) -> None: """Release files from the HSM disk to keep the headroom clear. @@ -275,19 +343,15 @@ def check(self, copy: ArchiveFileCopy) -> None: the file copy to auto-verify """ - def _async( - task: Task, node: UpdateableNode, lfs: LFS, copy: ArchiveFileCopy - ) -> None: + def _async(task: Task, node_io: LustreHSMNodeIO, copy: ArchiveFileCopy) -> None: """Verify a file (with restore and release, if necessary). Parameters ---------- task : Task The task instance containing this async. - node : UpdateableNode + node_io : LustreHSMNodeIO The node we're running on - lfs : LFS - The lfs(1) wrapper copy : ArchiveFileCopy The file copy to check """ @@ -306,42 +370,39 @@ def _async( return # Trigger restore, if necessary - if lfs.hsm_released(copy.path): - lfs.hsm_restore(copy.path) - - # While the file is not restored, yield to wait for later - while lfs.hsm_released(copy.path): - # Has someone else come by while we've been waiting and - # checked the file? - if copy.has_file != "M": - log.debug( - f"File copy {copy.path} no longer needs " - f"check (has_file={copy.has_file})" - ) - - # Assume whatever did the other check has re-released - # the file if necessary - return - - # Wait for a bit - yield 60 + restore_wait = node_io._restore_wait(copy) + while restore_wait: + # Wait for a bit + yield 60 + + # Now check again + restore_wait = node_io._restore_wait(copy) + + # If the restore failed, bail. + if restore_wait is None: + log.debug(f"Aborting check of {copy.path} after failed restore.") + return # Do the check by inlining the Default-I/O function from ._default_asyncs import check_async - check_async(task, node, copy) + check_async(task, node_io.node, copy) # Release the file if the DB says it should be if not ArchiveFileCopy.get(id=copy.id).ready: - lfs.hsm_release(copy.path) - - Task( - func=_async, - queue=self._queue, - key=self.node.name, - args=(self.node, self._lfs, copy), - name=f"Check file {copy.file.path} on node {self.node.name}", - ) + node_io._lfs.hsm_release(copy.path) + + # Only do this if another task isn't already restoring this file. + if copy.file.id not in self._restoring: + Task( + func=_async, + queue=self._queue, + key=self.node.name, + args=(self, copy), + name=f"Check file {copy.file.path} on node {self.node.name}", + ) + else: + log.debug(f"Skipping check of {copy.path}: restore in progress.") def check_active(self) -> bool: """Check that this is an active node. @@ -399,7 +460,7 @@ def open(self, path: os.PathLike | str, binary: bool = True) -> IO: ValueError: `path` was absolute. OSError: - The file is not recalled. + The file is not restored. """ if pathlib.PurePath(path).is_absolute(): raise ValueError("path must be relative to node.root") @@ -414,6 +475,8 @@ def open(self, path: os.PathLike | str, binary: bool = True) -> IO: def ready_path(self, path: os.PathLike) -> bool: """Recall the specified path so it can be read. + This should only be used on paths not already managed by alpenhorn. + Parameters ---------- path : path-like @@ -443,18 +506,51 @@ def ready_pull(self, req: ArchiveFileCopyRequest) -> None: the copy request to ready. We are the source node (i.e. `req.node_from == self.node`). """ - ready = self.ready_path(req.file.path) - # Update DB based on HSM state, if necessary - copy = ArchiveFileCopy.get(file=req.file, node=self.node) + # A small async to restore-and-wait for the file + def _async(task: Task, node_io: LustreHSMNodeIO, file_: ArchiveFile): + """Restore `file_` on `node` and wait for completion.""" - if copy.ready != ready: - copy.ready = ready - copy.last_update = datetime.utcnow() - copy.save() - log.info( - f"File copy {req.file.path} on node {self.node.name} now " - + ("restored" if ready else "released") + try: + copy = ArchiveFileCopy.get(node=node_io.node, file=file_) + except pw.DoesNotExist: + return + + restore_wait = node_io._restore_wait(copy) + while restore_wait: + # Wait for a bit + yield 60 + + # Now check again + restore_wait = node_io._restore_wait(copy) + + # Update copy based on result. restore_wait==False means the + # file was successfully restored. On error, we assume the + # file is not ready (i.e. not restored). + ready = True if restore_wait is False else False + + if copy.ready != ready: + copy.ready = ready + copy.last_update = datetime.utcnow() + copy.save() + log.info( + f"File copy {file_.path} on node {node_io.node.name} now " + + ("restored" if ready else "released") + ) + + # No need to do this more than once + if req.file.id not in self._restoring: + Task( + func=_async, + queue=self._queue, + key=self.node.name, + args=(self, req.file), + name=f"Ready file {req.file.path} on node {self.node.name}", + ) + else: + log.debug( + f"Skipping ready of {req.file.path} " + f"on node {self.node.name}: restore in progress." ) def release_bytes(self, size: int) -> None: diff --git a/tests/io/test_lustrehsmnode.py b/tests/io/test_lustrehsmnode.py index 10e07c9d9..9ded3fa33 100644 --- a/tests/io/test_lustrehsmnode.py +++ b/tests/io/test_lustrehsmnode.py @@ -279,6 +279,17 @@ def test_check_released(xfs, queue, mock_lfs, node): task() queue.task_done(key) + # Task is now deferred + assert queue.deferred_size == 1 + assert queue.qsize == 0 + + # Calling check again doesn't add another task + node.io.check(copy) + assert queue.qsize == 0 + + # Don't wait for the deferral to expire, just run the task again + task() + async_mock.assert_called_once() # File has been re-released @@ -312,6 +323,12 @@ def test_check_ready_released(xfs, queue, mock_lfs, node): task() queue.task_done(key) + # Task is now deferred + assert queue.deferred_size == 1 + + # Don't wait for the deferral to expire, just run the task again + task() + async_mock.assert_called_once() # File has been restored @@ -349,7 +366,7 @@ def test_ready_path(mock_lfs, node): "/node/simpleacq/file1": "restored", } ) -def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest): +def test_ready_pull_restored(mock_lfs, node, queue, archivefilecopyrequest): """Test LustreHSMNodeIO.ready_pull on a restored file that isn't ready.""" before = datetime.datetime.utcnow().replace(microsecond=0) @@ -363,6 +380,14 @@ def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest): node.io.ready_pull(afcr) + # Task in queue + assert queue.qsize == 1 + + # Run task + task, key = queue.get() + task() + queue.task_done(key) + # File is ready assert ArchiveFileCopy.get(id=1).ready assert ArchiveFileCopy.get(id=1).last_update >= before @@ -377,7 +402,7 @@ def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest): "/node/simpleacq/file1": "released", } ) -def test_ready_pull_released(mock_lfs, node, archivefilecopyrequest): +def test_ready_pull_released(mock_lfs, node, queue, archivefilecopyrequest): """Test LustreHSMNodeIO.ready on a released file that isn't ready.""" copy = ArchiveFileCopy.get(id=1) @@ -387,8 +412,27 @@ def test_ready_pull_released(mock_lfs, node, archivefilecopyrequest): node.io.ready_pull(afcr) - # File is not ready (because restore hasn't been verified) - assert not ArchiveFileCopy.get(id=1).ready + # Task in queue + assert queue.qsize == 1 + + # Run task + task, key = queue.get() + task() + queue.task_done(key) + + # Task is now deferred + assert queue.deferred_size == 1 + assert queue.qsize == 0 + + # Calling ready_pull again doesn't add another task + node.io.ready_pull(afcr) + assert queue.qsize == 0 + + # Don't wait for the deferral to expire, just run the task again + task() + + # File is now ready + assert ArchiveFileCopy.get(id=1).ready # File is restored lfs = mock_lfs("")