From 52affe38a583306a991600b89fbe1fb96ba12c6e Mon Sep 17 00:00:00 2001
From: ketiltrout
Date: Fri, 8 Nov 2024 12:04:54 -0800
Subject: [PATCH] fix(_default_asyncs): Re-check DB before transfer search

When the task queue is very deep, it's possible that multiple requests
for the same pull can get queued. So the group search shouldn't be
surprised if the file already exists when it starts.

Closes #216
---
 alpenhorn/io/_default_asyncs.py | 18 +++++++++++++++--
 tests/io/test_defaultgroup.py   | 36 ++++++++++++++++++++++++++++++---
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/alpenhorn/io/_default_asyncs.py b/alpenhorn/io/_default_asyncs.py
index 344471e47..64371eb08 100644
--- a/alpenhorn/io/_default_asyncs.py
+++ b/alpenhorn/io/_default_asyncs.py
@@ -315,6 +315,20 @@ def group_search_async(
         The request we're fulfilling.
     """
 
+    # Before doing anything re-check the DB for something
+    # in this group. The situation may have changed while this
+    # task was queued.
+    state = groupio.group.filecopy_state(req.file)
+    if state == "Y" or state == "M":
+        log.info(
+            "Cancelling pull request for "
+            f"{req.file.acq.name}/{req.file.name}: "
+            f"file already in group {groupio.group.name}."
+        )
+        req.cancelled = 1
+        req.save()
+        return
+
     # Check whether an actual file exists on the target
     log.debug(f"req={req}")
     node = groupio.exists(req.file.path)
@@ -322,12 +336,12 @@ def group_search_async(
         # file on disk: create/update the ArchiveFileCopy
         # to force a check next pass
         log.warning(
-            f"Skipping pull request for "
+            "Skipping pull request for "
             f"{req.file.acq.name}/{req.file.name}: "
             f"file already on disk in group {groupio.group.name}."
         )
         log.info(
-            f"Requesting check of "
+            "Requesting check of "
             f"{req.file.acq.name}/{req.file.name} on node "
             f"{node.name}."
         )
diff --git a/tests/io/test_defaultgroup.py b/tests/io/test_defaultgroup.py
index e1a17f9ba..e937d6c46 100644
--- a/tests/io/test_defaultgroup.py
+++ b/tests/io/test_defaultgroup.py
@@ -3,13 +3,13 @@
 import pytest
 from unittest.mock import MagicMock, patch
 
-from alpenhorn.db.archive import ArchiveFileCopy
+from alpenhorn.db import ArchiveFileCopy, ArchiveFileCopyRequest
 from alpenhorn.io._default_asyncs import group_search_async
 from alpenhorn.server.update import UpdateableNode, UpdateableGroup
 
 
 @pytest.fixture
-def groupnode(xfs, queue, storagegroup, storagenode):
+def groupnode(xfs, dbtables, queue, storagegroup, storagenode):
     """Fixture setting up a default test group.
 
     Returns both the group and the node."""
@@ -112,8 +112,38 @@ def test_group_search_dispatch(groupnode, simplerequest, queue):
     mock.assert_called_once_with(simplerequest)
 
 
+def test_group_search_in_db(
+    groupnode, simplefile, archivefilecopy, archivefilecopyrequest, queue, xfs
+):
+    """Test group_search_async with record already in db."""
+
+    group, node = groupnode
+
+    mock = MagicMock()
+    group.io.pull_force = mock
+
+    # Create a file on the dest
+    xfs.create_file(f"{node.db.root}/{simplefile.path}")
+
+    # Create a file copy record
+    archivefilecopy(file=simplefile, node=node.db, has_file="Y")
+
+    # Create a copy request for the file.
+    # Source here doesn't matter
+    afcr = archivefilecopyrequest(file=simplefile, node_from=node.db, group_to=group.db)
+
+    # Run the async. First argument is Task
+    group_search_async(None, group.io, afcr)
+
+    # Check dispatch
+    mock.assert_not_called()
+
+    # Verify that the request has been cancelled
+    assert ArchiveFileCopyRequest.get(id=afcr.id).cancelled == 1
+
+
 def test_group_search_existing(
-    groupnode, simplefile, archivefilecopyrequest, queue, dbtables, xfs
+    groupnode, simplefile, archivefilecopyrequest, queue, xfs
 ):
     """Test group_search_async with existing file."""
 