fix(_default_asyncs): Re-check DB before transfer search (#217)
When the task queue is very deep, it's possible that multiple requests
for the same pull can get queued. So the group search shouldn't be
surprised if the file already exists when it starts.

Closes #216
ketiltrout authored Nov 8, 2024
1 parent 546a464 commit 72da85d
Showing 2 changed files with 49 additions and 5 deletions.
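To make the scenario in the commit message concrete, here is a minimal, self-contained sketch of the re-check-then-cancel pattern. Apart from filecopy_state, cancelled and the "Y"/"M" states, which appear in the diff below, all names here are illustrative stand-ins, not alpenhorn API: two copy requests for the same file get queued, the first performs the pull, and the second should notice the copy is already recorded in the database and cancel itself rather than repeat the group search.

# Illustrative sketch only; FakeGroup and FakeRequest are stand-ins, not alpenhorn classes.
class FakeGroup:
    """Stands in for the destination group's view of the data index."""

    def __init__(self):
        self.present = set()

    def filecopy_state(self, file):
        # "Y" = copy present, "N" = absent (the real method can also return "M")
        return "Y" if file in self.present else "N"


class FakeRequest:
    """Stands in for an ArchiveFileCopyRequest row."""

    def __init__(self, file):
        self.file = file
        self.cancelled = 0

    def save(self):
        pass  # the real request persists this change to the database


def group_search(req, group):
    """Re-check the DB before doing any transfer work, as the fix does."""
    if group.filecopy_state(req.file) in ("Y", "M"):
        req.cancelled = 1
        req.save()
        return "cancelled"
    group.present.add(req.file)  # stand-in for the actual search-and-pull
    return "pulled"


group = FakeGroup()
first, second = FakeRequest("acq/file"), FakeRequest("acq/file")
assert group_search(first, group) == "pulled"
# The duplicate request, queued behind the first, finds the copy already in the DB:
assert group_search(second, group) == "cancelled"
assert second.cancelled == 1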
alpenhorn/io/_default_asyncs.py: 16 additions & 2 deletions
@@ -315,19 +315,33 @@ def group_search_async(
         The request we're fulfilling.
     """
 
+    # Before doing anything re-check the DB for something
+    # in this group. The situation may have changed while this
+    # task was queued.
+    state = groupio.group.filecopy_state(req.file)
+    if state == "Y" or state == "M":
+        log.info(
+            "Cancelling pull request for "
+            f"{req.file.acq.name}/{req.file.name}: "
+            f"file already in group {groupio.group.name}."
+        )
+        req.cancelled = 1
+        req.save()
+        return
+
     # Check whether an actual file exists on the target
     log.debug(f"req={req}")
     node = groupio.exists(req.file.path)
     if node is not None:
         # file on disk: create/update the ArchiveFileCopy
         # to force a check next pass
         log.warning(
-            f"Skipping pull request for "
+            "Skipping pull request for "
            f"{req.file.acq.name}/{req.file.name}: "
            f"file already on disk in group {groupio.group.name}."
         )
         log.info(
-            f"Requesting check of "
+            "Requesting check of "
            f"{req.file.acq.name}/{req.file.name} on node "
            f"{node.name}."
         )
tests/io/test_defaultgroup.py: 33 additions & 3 deletions
@@ -3,13 +3,13 @@
 import pytest
 from unittest.mock import MagicMock, patch
 
-from alpenhorn.db.archive import ArchiveFileCopy
+from alpenhorn.db import ArchiveFileCopy, ArchiveFileCopyRequest
 from alpenhorn.io._default_asyncs import group_search_async
 from alpenhorn.server.update import UpdateableNode, UpdateableGroup
 
 
 @pytest.fixture
-def groupnode(xfs, queue, storagegroup, storagenode):
+def groupnode(xfs, dbtables, queue, storagegroup, storagenode):
     """Fixture setting up a default test group.
     Returns both the group and the node."""
@@ -112,8 +112,38 @@ def test_group_search_dispatch(groupnode, simplerequest, queue):
     mock.assert_called_once_with(simplerequest)
 
 
+def test_group_search_in_db(
+    groupnode, simplefile, archivefilecopy, archivefilecopyrequest, queue, xfs
+):
+    """Test group_search_async with record already in db."""
+
+    group, node = groupnode
+
+    mock = MagicMock()
+    group.io.pull_force = mock
+
+    # Create a file on the dest
+    xfs.create_file(f"{node.db.root}/{simplefile.path}")
+
+    # Create a file copy record
+    archivefilecopy(file=simplefile, node=node.db, has_file="Y")
+
+    # Create a copy request for the file.
+    # Source here doesn't matter
+    afcr = archivefilecopyrequest(file=simplefile, node_from=node.db, group_to=group.db)
+
+    # Run the async. First argument is Task
+    group_search_async(None, group.io, afcr)
+
+    # Check dispatch
+    mock.assert_not_called()
+
+    # Verify that the request has been cancelled
+    assert ArchiveFileCopyRequest.get(id=afcr.id).cancelled == 1
+
+
 def test_group_search_existing(
-    groupnode, simplefile, archivefilecopyrequest, queue, dbtables, xfs
+    groupnode, simplefile, archivefilecopyrequest, queue, xfs
 ):
     """Test group_search_async with existing file."""
 
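For reference, the new test can be run on its own with pytest tests/io/test_defaultgroup.py::test_group_search_in_db, assuming the suite's usual fixtures (xfs, dbtables and the archive-record factories used above) are available from its conftest.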
