fix(lfs): make HSM interactions more robust (#210)
## All `lfs` calls need a timeout
It seems pretty clear now that every `lfs` invocation should have a
timeout to guard against Lustre locking up all our workers. So, now use
a 1-minute timeout by default in `run_lfs`.
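
As a rough illustration of the timeout idea (not alpenhorn's actual `util.run_command`, whose implementation is not shown here), a subprocess call can be abandoned after a fixed time. The helper name `run_with_timeout` is hypothetical; it only mirrors the convention visible in the diff, where a return code of `None` signals a timeout.

```python
import subprocess
import sys


def run_with_timeout(argv, timeout=60.0):
    """Run argv and return (returncode, stdout, stderr).

    returncode is None if the command did not finish within `timeout`
    seconds. Hypothetical stand-in for alpenhorn's util.run_command.
    """
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # The child is killed by subprocess.run before this is raised
        return None, "", ""
    return proc.returncode, proc.stdout, proc.stderr


if __name__ == "__main__":
    # The Python interpreter stands in for a slow `lfs` call here
    ret, out, _ = run_with_timeout(
        [sys.executable, "-c", "import time; time.sleep(5)"], timeout=0.5
    )
    print("timed out" if ret is None else f"ret={ret}")
```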

Then, handle both `lfs quota` and `lfs hsm_state` not returning
successfully. In `StorageNode.update_avail_gb`, passing `None` now means
don't update `avail_gb` (but do still update the last check time). If
the current value of `avail_gb` is not null/None, a warning is issued.
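
The intended behaviour can be sketched without a database. `NodeSketch` below is a hypothetical stand-in for `StorageNode` (the real model is a peewee model and also saves to the DB); it only shows which fields change when `None` is passed.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import datetime, timezone

log = logging.getLogger(__name__)


@dataclass
class NodeSketch:
    """Hypothetical, DB-free stand-in for StorageNode."""

    name: str
    avail_gb: float | None = None
    avail_gb_last_checked: datetime | None = None

    def update_avail_gb(self, new_avail: int | None) -> None:
        if new_avail is None:
            # Keep the old value; warn unless we never knew the free space
            if self.avail_gb is not None:
                log.warning(
                    f'Unable to determine available space for "{self.name}".'
                )
        else:
            # new_avail is in bytes; the stored value is in GiB
            self.avail_gb = new_avail / 2**30

        # Record the check time even if the value was not updated
        self.avail_gb_last_checked = datetime.now(timezone.utc)
```

With this sketch, calling `update_avail_gb(None)` on a node whose `avail_gb` is `5.0` leaves the value at `5.0` and only refreshes the check time.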

For `hsm_state` failing, various places in `LustreHSM` where it was used
now handle not being able to determine the state.

Also removed an unnecessary `lfs quota` invocation from the idle update,
`LustreHSMNodeIO.release_files()`.

## Using `hsm_action` to probe HSM restores in progress
I was reading through the Lustre source code last night to see if I
could figure out what's up with `hsm_restore` timing out when files are
in the process of being restored.

I didn't figure that out because I discovered, instead, the `hsm_action`
command which tells you what HSM is currently doing to a file (e.g. is
it working on restoring it?).

So, there's another `HSMState`: `RESTORING` for files which HSM is in
the process of restoring, and alpenhorn will now use that to track
progress during restores. This removes the need for `_restore_retry`
(because alpenhorn no longer needs to guess as to whether a restore is
happening or not).
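
The resulting decision logic can be sketched as a pure function over the text returned by the two commands. The function name `classify` is hypothetical, and the sample strings in the usage note are illustrative fragments, not verbatim `lfs` output; the substring checks follow the ones visible in the diff.

```python
from __future__ import annotations

from enum import Enum


class HSMState(Enum):
    MISSING = 0
    UNARCHIVED = 1
    RESTORED = 2
    RESTORING = 3
    RELEASED = 4


def classify(state_output: str, action_output: str | None) -> HSMState:
    """Map `lfs hsm_state` and `lfs hsm_action` text to an HSMState.

    `action_output` is None when hsm_action could not be run; in that
    case a released file is simply reported as RELEASED.
    """
    if "archived" not in state_output:
        return HSMState.UNARCHIVED
    if "released" not in state_output:
        return HSMState.RESTORED

    # File is released: check whether a restore is in progress
    if action_output is not None and "RESTORE" in action_output:
        return HSMState.RESTORING
    return HSMState.RELEASED
```

For example, `classify("released exists archived", "RESTORE running")` gives `HSMState.RESTORING`, while the same state text with no restore action gives `HSMState.RELEASED`.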

Closes #198 (because those constants are no longer present).

## Removing stats from LustreHSM
The idea here is: using `stat()` to check for file existence, while
avoiding making an external `lfs` call, causes trouble when `/nearline`
is acting up because the `stat()` will get a worker stuck in IO-wait,
while the `lfs` call can be abandoned (by subprocess timeout), meaning
the workers don't get stuck.

So, I've removed the stat from the top of `lfs.hsm_state`. I
originally had it there because I thought that a cheap `stat` would save
us from an expensive `lfs` call. I've modified `run_lfs` to detect and
report missing files instead.

I've also re-implemented `LustreHSMNodeIO.exists` to use `lfs` instead
of stat-ting the filesystem.
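
A minimal sketch of the missing-file detection, assuming the result-dict shape described in the `run_lfs` docstring in this commit. Both helper names (`summarize`, `exists_from`) are hypothetical, and how `LustreHSMNodeIO.exists` actually treats failures and timeouts is not shown in this excerpt; returning `None` for "unknown" is an assumption.

```python
from __future__ import annotations


def summarize(ret: int | None, stdout: str, stderr: str) -> dict:
    """Build a run_lfs-style result dict from a finished command."""
    result = {"missing": False, "timeout": False, "failed": False, "output": None}

    if ret is None:
        # Command was abandoned after the timeout
        result["timeout"] = True
    elif ret == 0:
        result["output"] = stdout
    elif stderr and "No such file or directory" in stderr:
        # Failure caused by a missing file
        result["missing"] = True
    else:
        result["failed"] = True
    return result


def exists_from(result: dict) -> bool | None:
    """Derive existence from a result dict (assumed policy).

    False if the file is missing, None if the command failed or timed
    out (existence unknown), True otherwise.
    """
    if result["missing"]:
        return False
    if result["timeout"] or result["failed"]:
        return None
    return True
```

The point of the design is that no `stat()` is needed: a missing file shows up as a failed `lfs` call whose standard error names the problem, and a hung filesystem shows up as a timeout rather than a stuck worker.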
ketiltrout authored Nov 8, 2024
1 parent 790ec51 commit 546a464
Showing 12 changed files with 412 additions and 157 deletions.
11 changes: 6 additions & 5 deletions alpenhorn/db/storage.py
@@ -367,18 +367,19 @@ def update_avail_gb(self, new_avail: int | None) -> None:
"""
# The value in the database is in GiB (2**30 bytes)
if new_avail is None:
self.avail_gb = None
# Warn unless we never knew the free space
if self.avail_gb is not None:
log.warning(f'Unable to determine available space for "{self.name}".')
else:
self.avail_gb = new_avail / 2**30

# Record check time, even if we failed to update
self.avail_gb_last_checked = pw.utcnow()

# Update the DB with the free space but don't clobber changes made
# manually to the database
# manually to the database.
self.save(only=[StorageNode.avail_gb, StorageNode.avail_gb_last_checked])

if new_avail is None:
log.info(f'Unable to determine available space for "{self.name}".')


class StorageTransferAction(base_model):
"""Storage transfer rules for the archive.
170 changes: 128 additions & 42 deletions alpenhorn/io/lfs.py
@@ -18,6 +18,11 @@
* `UNARCHIVED`: file exists on disk, but not on external storage
* `RESTORED`: file exists on both disk and external storage
* `RELEASED`: file exists in external storage only
* `lfs hsm_action`
retrieves the current action. We use this to detect this
`HSMState`:
* `RESTORING`: file exists in external storage only, but HSM
is in the process of restoring it
* `lfs hsm_restore`
requests the state change `RELEASED -> RESTORED`
* `lfs hsm_release`
@@ -55,21 +60,28 @@ class HSMState(Enum):
is the state of newly created files until they are archived.
HSMState.RELEASED:
The file is in external storage but not on disk.
HSMState.RESTORING:
The file is in external storage only, and HSM is in the process of
bringing it back to disk
HSMState.RESTORED:
The file is both in external storage and on disk.
A new file starts off in state UNARCHIVED. Initial archiving is beyond
our control, but once the file has been archived, it moves from state
UNARCHIVED to state RESTORED.
A `lfs.hsm_restore()` changes a file's state from RELEASED to RESTORED.
A `lfs.hsm_release()` changes a file's state from RESTORED to RELEASED.
A `lfs.hsm_restore()` requests a file's state change from RELEASED to RESTORED.
After this call, the file's state changes to RESTORING until the restore is
complete.
A `lfs.hsm_release()` requests a file's state change from RESTORED to RELEASED.
"""

MISSING = 0
UNARCHIVED = 1
RESTORED = 2
RELEASED = 3
RESTORING = 3
RELEASED = 4


class LFS:
@@ -92,12 +104,16 @@ class LFS:
path : string, optional
If not None, the search path to use to look for the lfs(1)
command. If None, the "PATH" environmental variable is used.
timeout : int, optional
Timeout, in seconds, before abandoning a lfs(1) invocation.
Defaults to 60 seconds if not given.
"""

# Conveniences for clients
HSM_MISSING = HSMState.MISSING
HSM_UNARCHIVED = HSMState.UNARCHIVED
HSM_RESTORED = HSMState.RESTORED
HSM_RESTORING = HSMState.RESTORING
HSM_RELEASED = HSMState.RELEASED

def __init__(
@@ -106,53 +122,81 @@ def __init__(
fixed_quota: int | None = None,
lfs: str = "lfs",
path: str | None = None,
timeout: int | None = None,
) -> None:
self._quota_group = quota_group
self._fixed_quota = fixed_quota

if timeout is None:
self._timeout = 60
else:
if timeout <= 0:
raise ValueError("timeout must be positive.")
self._timeout = timeout

self._lfs = shutil.which(lfs, path=path)
if self._lfs is None:
raise RuntimeError("lfs command not found.")

def run_lfs(self, *args: str, timeout: float | None = None) -> str | False | None:
def run_lfs(self, *args: str) -> dict:
"""Run the lfs command with the `args` provided.
Parameters
----------
*args : strings
The list of command-line arguments to pass to the
lfs command.
timeout : float, optional
If not None, stop waiting for the command after `timeout` seconds
Retunrs
Returns
-------
output : str or False or None
If the command succeeded, returns standard output of
the command. If the command failed or timed out,
returns False (failed) or None (timed out) and logs
the failure.
result : dict
A dict is returned with the following keys:
- missing : bool
True if the file was missing; False otherwise
- timeout : bool
True if the command timed out; False otherwise
- failed : bool
True if the command failed; False otherwise
- output : str or None
If the command succeeded (i.e. all three booleans
are false), this has the standard output of
the command. Otherwise, this is None.
"""
result = {"missing": False, "timeout": False, "failed": False, "output": None}

# Stringify args
args = [str(arg) for arg in args]

ret, stdout, stderr = util.run_command([self._lfs] + args, timeout=timeout)
ret, stdout, stderr = util.run_command(
[self._lfs] + args, timeout=self._timeout
)

# Timeout
if ret is None:
log.warning(f"LFS command timed out: " + " ".join(args))
result["timeout"] = True
return result

if ret == 0:
# Success, return output
result["output"] = stdout
return result

# Failure or timeout
if ret is None or ret != 0:
if ret is None:
result = "timed out"
else:
result = f"failed (ret={ret})"
ret = False
log.warning(f"LFS command {result}: " + " ".join(args))
if stderr:
log.debug(f"LFS stderr: {stderr}")
if stdout:
log.debug(f"LFS stdout: {stdout}")
return ret
# Failure, look for a "No such file" remark in stderr
if stderr and "No such file or directory" in stderr:
log.debug(f"LFS missing file: " + " ".join(args))
result["missing"] = True
else:
# Otherwise, report failure
log.warning(f"LFS command failed: " + " ".join(args))
result["failed"] = True

return stdout
if stderr:
log.debug(f"LFS stderr: {stderr}")
if stdout:
log.debug(f"LFS stdout: {stdout}")
return result

def quota_remaining(self, path: str | os.PathLike) -> int | None:
"""Retrieve the remaining quota for `path`.
@@ -200,7 +244,7 @@ def quota_remaining(self, path: str | os.PathLike) -> int | None:
# Stringify path
path = str(path)

stdout = self.run_lfs("quota", "-q", "-g", self._quota_group, path)
stdout = self.run_lfs("quota", "-q", "-g", self._quota_group, path)["output"]
if stdout is None:
return None # Command returned error

@@ -246,6 +290,35 @@ def quota_remaining(self, path: str | os.PathLike) -> int | None:
# lfs quota reports values in kiByte blocks
return (quota_limit - quota) * 2**10

def hsm_restoring(self, path: os.PathLike | str) -> bool | None:
"""Is HSM processing a restore request?
Returns True if HSM is currently working on a restore
request for `path`, and False otherwise.
Runs `lfs hsm_action` to check the current HSM action
for the file.
Parameters
----------
path : path-like
The path to determine the state for.
Returns
-------
restoring : bool or None
True if a RESTORE is in progress. False otherwise.
None if the command failed.
"""

# Stringify path
path = str(path)

stdout = self.run_lfs("hsm_action", path)["output"]
if stdout is None:
return None # Command returned error
return "RESTORE" in stdout

def hsm_state(self, path: os.PathLike | str) -> HSMState:
"""Returns the HSM state of path.
@@ -261,16 +334,16 @@ def hsm_state(self, path: os.PathLike | str) -> HSMState:
failed. If `path` doesn't exist, this will be `HSMState.MISSING`.
"""

# No need to check with HSM if the path isn't present
if not pathlib.Path(path).exists():
return HSMState.MISSING

# Stringify path
path = str(path)

stdout = self.run_lfs("hsm_state", path)
if stdout is False:
result = self.run_lfs("hsm_state", path)
if result["failed"] or result["timeout"]:
return None # Command returned error
if result["missing"]:
return HSMState.MISSING

stdout = result["output"]

# The output of hsm_state looks like this:
#
@@ -301,18 +374,29 @@ def hsm_state(self, path: os.PathLike | str) -> HSMState:
# See llapi_hsm_state_get(3) for full details about these.
if "archived" not in stdout:
return HSMState.UNARCHIVED
if "released" in stdout:
return HSMState.RELEASED
return HSMState.RESTORED
if "released" not in stdout:
return HSMState.RESTORED

# File is released, so now run `hsm_action` to see if a restore is in
# progress
if self.hsm_restoring(path):
return HSMState.RESTORING

return HSMState.RELEASED

def hsm_archived(self, path: os.PathLike) -> bool:
"""Is `path` archived by HSM?"""
state = self.hsm_state(path)
return state == HSMState.RESTORED or state == HSMState.RELEASED
return (
state == HSMState.RESTORED
or state == HSMState.RESTORING
or state == HSMState.RELEASED
)

def hsm_released(self, path: os.PathLike) -> bool:
"""Is `path` released to external storage?"""
return self.hsm_state(path) == HSMState.RELEASED
state = self.hsm_state(path)
return state == HSMState.RELEASED or state == HSMState.RESTORING

def hsm_restore(self, path: os.PathLike) -> bool | None:
"""Trigger restore of `path` from external storage.
@@ -343,9 +427,11 @@ def hsm_restore(self, path: os.PathLike) -> bool | None:
if state != HSMState.RELEASED:
return True

result = self.run_lfs("hsm_restore", path, timeout=60)
if result is None or result is False:
return result
result = self.run_lfs("hsm_restore", path)
if result["missing"]:
return False
if result["timeout"] or result["failed"]:
return None
return True

def hsm_release(self, path: os.PathLike) -> bool: