Skip to content

Commit

Permalink
fix(LustreHSM): Move ready_pull I/O out of main thread (#187)
Browse files Browse the repository at this point in the history
This is the second part of the fix for #183 which fixes how the
`ready_pull` function works.  All I/O for `ready_pull` is now
in a task and out of the main thread.

In the previous PR I added some slap-dash "race condition insurance"
which I've removed here and replaced with a more robust system to
restore-and-wait for a file which provides a way to track files
already being waited on for restore.  (Which was otherwise impossible
to do because yielded tasks are essentially hidden from the system
while they're deferred.)

Now `check`s and `ready_pull`s won't be re-queued if they're already
waiting for a file to be restored.

Closes #183
  • Loading branch information
ketiltrout authored Jul 3, 2024
1 parent dac1d4f commit 4670ec2
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 51 deletions.
190 changes: 143 additions & 47 deletions alpenhorn/io/lustrehsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, IO

import time
import logging
import pathlib
import peewee as pw
Expand All @@ -23,7 +24,7 @@
from ..archive import ArchiveFileCopy
from ..querywalker import QueryWalker
from ..task import Task
from ..util import pretty_bytes
from ..util import pretty_bytes, pretty_deltat
from .base import BaseGroupIO, BaseNodeRemote
from .lustrequota import LustreQuotaNodeIO

Expand Down Expand Up @@ -94,13 +95,80 @@ def __init__(
# QueryWalker for the HSM state check
self._release_qw = None

# Tracks files we're in the process of retrieving, so we can avoid
# waiting for the same file twice. The elements in this set are
# `ArchiveFile.id`s
self._restoring = set()

# For informational purposes. Keys are elements in self._restoring.
self._restore_start = dict()

# For idle-time HSM state updates
self._nrelease = config.get("release_check_count", 100)
if self._nrelease < 1:
raise ValueError(
f"io_config key 'release_check_count' non-positive (={self._nrelease})"
)

def _restore_wait(self, copy: ArchiveFileCopy) -> bool | None:
"""Attempt to restore a file from HSM.
The caller should call this method repeatedly while it returns `True`,
indicating the file has not yet been restored, with a suitable wait
between calls.
Parameters
----------
copy : ArchiveFileCopy
The file copy to restore
Returns
-------
result, one of:
* True: file is not restored. Caller should wait and try again.
* False: file is restored. Caller may now access the restored file.
* None: an error occurred. Caller should stop trying to access the file.
"""

# What's the current situation?
state = self._lfs.hsm_state(copy.path)

if state == self._lfs.HSM_MISSING:
log.warning(f"Unable to restore {copy.path}: missing.")
self._restore_start.pop(copy.file.id, None)
self._restoring.discard(copy.file.id)
return None

if state != self._lfs.HSM_RELEASED:
# i.e. file is restored or unarchived, so we're done.
if copy.file.id not in self._restoring:
log.debug(f"Already restored: {copy.path}")
else:
deltat = time.monotonic() - self._restore_start[copy.file.id]
log.info(
f"{copy.file.path} restored on node {self.node.name}"
f"after {pretty_deltat(deltat)}"
)
self._restore_start.pop(copy.file.id, None)
self._restoring.discard(copy.file.id)
return False

# If we got here, copy is released.

# Add this copy to the list of copies we're waiting on. Other tasks
# can use this to see if the file they're interested in is already
# "in use".
if copy.file.id not in self._restoring:
self._restoring.add(copy.file.id)
self._restore_start[copy.file.id] = time.monotonic()

# Restore it. We deliberately do this every time, to hedge against
# our initial request being forgotten/ignored by HSM.
self._lfs.hsm_restore(copy.path)

# Tell the caller to wait
return True

def release_files(self) -> None:
"""Release files from the HSM disk to keep the headroom clear.
Expand Down Expand Up @@ -275,19 +343,15 @@ def check(self, copy: ArchiveFileCopy) -> None:
the file copy to auto-verify
"""

def _async(
task: Task, node: UpdateableNode, lfs: LFS, copy: ArchiveFileCopy
) -> None:
def _async(task: Task, node_io: LustreHSMNodeIO, copy: ArchiveFileCopy) -> None:
"""Verify a file (with restore and release, if necessary).
Parameters
----------
task : Task
The task instance containing this async.
node : UpdateableNode
node_io : LustreHSMNodeIO
The node we're running on
lfs : LFS
The lfs(1) wrapper
copy : ArchiveFileCopy
The file copy to check
"""
Expand All @@ -306,42 +370,39 @@ def _async(
return

# Trigger restore, if necessary
if lfs.hsm_released(copy.path):
lfs.hsm_restore(copy.path)

# While the file is not restored, yield to wait for later
while lfs.hsm_released(copy.path):
# Has someone else come by while we've been waiting and
# checked the file?
if copy.has_file != "M":
log.debug(
f"File copy {copy.path} no longer needs "
f"check (has_file={copy.has_file})"
)

# Assume whatever did the other check has re-released
# the file if necessary
return

# Wait for a bit
yield 60
restore_wait = node_io._restore_wait(copy)
while restore_wait:
# Wait for a bit
yield 60

# Now check again
restore_wait = node_io._restore_wait(copy)

# If the restore failed, bail.
if restore_wait is None:
log.debug(f"Aborting check of {copy.path} after failed restore.")
return

# Do the check by inlining the Default-I/O function
from ._default_asyncs import check_async

check_async(task, node, copy)
check_async(task, node_io.node, copy)

# Release the file if the DB says it should be
if not ArchiveFileCopy.get(id=copy.id).ready:
lfs.hsm_release(copy.path)

Task(
func=_async,
queue=self._queue,
key=self.node.name,
args=(self.node, self._lfs, copy),
name=f"Check file {copy.file.path} on node {self.node.name}",
)
node_io._lfs.hsm_release(copy.path)

# Only do this if another task isn't already restoring this file.
if copy.file.id not in self._restoring:
Task(
func=_async,
queue=self._queue,
key=self.node.name,
args=(self, copy),
name=f"Check file {copy.file.path} on node {self.node.name}",
)
else:
log.debug(f"Skipping check of {copy.path}: restore in progress.")

def check_active(self) -> bool:
"""Check that this is an active node.
Expand Down Expand Up @@ -399,7 +460,7 @@ def open(self, path: os.PathLike | str, binary: bool = True) -> IO:
ValueError:
`path` was absolute.
OSError:
The file is not recalled.
The file is not restored.
"""
if pathlib.PurePath(path).is_absolute():
raise ValueError("path must be relative to node.root")
Expand All @@ -414,6 +475,8 @@ def open(self, path: os.PathLike | str, binary: bool = True) -> IO:
def ready_path(self, path: os.PathLike) -> bool:
"""Recall the specified path so it can be read.
This should only be used on paths not already managed by alpenhorn.
Parameters
----------
path : path-like
Expand Down Expand Up @@ -443,18 +506,51 @@ def ready_pull(self, req: ArchiveFileCopyRequest) -> None:
the copy request to ready. We are the source node (i.e.
`req.node_from == self.node`).
"""
ready = self.ready_path(req.file.path)

# Update DB based on HSM state, if necessary
copy = ArchiveFileCopy.get(file=req.file, node=self.node)
# A small async to restore-and-wait for the file
def _async(task: Task, node_io: LustreHSMNodeIO, file_: ArchiveFile):
"""Restore `file_` on `node` and wait for completion."""

if copy.ready != ready:
copy.ready = ready
copy.last_update = datetime.utcnow()
copy.save()
log.info(
f"File copy {req.file.path} on node {self.node.name} now "
+ ("restored" if ready else "released")
try:
copy = ArchiveFileCopy.get(node=node_io.node, file=file_)
except pw.DoesNotExist:
return

restore_wait = node_io._restore_wait(copy)
while restore_wait:
# Wait for a bit
yield 60

# Now check again
restore_wait = node_io._restore_wait(copy)

# Update copy based on result. restore_wait==False means the
# file was successfully restored. On error, we assume the
# file is not ready (i.e. not restored).
ready = True if restore_wait is False else False

if copy.ready != ready:
copy.ready = ready
copy.last_update = datetime.utcnow()
copy.save()
log.info(
f"File copy {file_.path} on node {node_io.node.name} now "
+ ("restored" if ready else "released")
)

# No need to do this more than once
if req.file.id not in self._restoring:
Task(
func=_async,
queue=self._queue,
key=self.node.name,
args=(self, req.file),
name=f"Ready file {req.file.path} on node {self.node.name}",
)
else:
log.debug(
f"Skipping ready of {req.file.path} "
f"on node {self.node.name}: restore in progress."
)

def release_bytes(self, size: int) -> None:
Expand Down
52 changes: 48 additions & 4 deletions tests/io/test_lustrehsmnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,17 @@ def test_check_released(xfs, queue, mock_lfs, node):
task()
queue.task_done(key)

# Task is now deferred
assert queue.deferred_size == 1
assert queue.qsize == 0

# Calling check again doesn't add another task
node.io.check(copy)
assert queue.qsize == 0

# Don't wait for the deferral to expire, just run the task again
task()

async_mock.assert_called_once()

# File has been re-released
Expand Down Expand Up @@ -312,6 +323,12 @@ def test_check_ready_released(xfs, queue, mock_lfs, node):
task()
queue.task_done(key)

# Task is now deferred
assert queue.deferred_size == 1

# Don't wait for the deferral to expire, just run the task again
task()

async_mock.assert_called_once()

# File has been restored
Expand Down Expand Up @@ -349,7 +366,7 @@ def test_ready_path(mock_lfs, node):
"/node/simpleacq/file1": "restored",
}
)
def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest):
def test_ready_pull_restored(mock_lfs, node, queue, archivefilecopyrequest):
"""Test LustreHSMNodeIO.ready_pull on a restored file that isn't ready."""

before = datetime.datetime.utcnow().replace(microsecond=0)
Expand All @@ -363,6 +380,14 @@ def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest):

node.io.ready_pull(afcr)

# Task in queue
assert queue.qsize == 1

# Run task
task, key = queue.get()
task()
queue.task_done(key)

# File is ready
assert ArchiveFileCopy.get(id=1).ready
assert ArchiveFileCopy.get(id=1).last_update >= before
Expand All @@ -377,7 +402,7 @@ def test_ready_pull_restored(mock_lfs, node, archivefilecopyrequest):
"/node/simpleacq/file1": "released",
}
)
def test_ready_pull_released(mock_lfs, node, archivefilecopyrequest):
def test_ready_pull_released(mock_lfs, node, queue, archivefilecopyrequest):
"""Test LustreHSMNodeIO.ready on a released file that isn't ready."""

copy = ArchiveFileCopy.get(id=1)
Expand All @@ -387,8 +412,27 @@ def test_ready_pull_released(mock_lfs, node, archivefilecopyrequest):

node.io.ready_pull(afcr)

# File is not ready (because restore hasn't been verified)
assert not ArchiveFileCopy.get(id=1).ready
# Task in queue
assert queue.qsize == 1

# Run task
task, key = queue.get()
task()
queue.task_done(key)

# Task is now deferred
assert queue.deferred_size == 1
assert queue.qsize == 0

# Calling ready_pull again doesn't add another task
node.io.ready_pull(afcr)
assert queue.qsize == 0

# Don't wait for the deferral to expire, just run the task again
task()

# File is now ready
assert ArchiveFileCopy.get(id=1).ready

# File is restored
lfs = mock_lfs("")
Expand Down

0 comments on commit 4670ec2

Please sign in to comment.