Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(_default_asyncs): Re-check DB before transfer search #217

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions alpenhorn/io/_default_asyncs.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,33 @@ def group_search_async(
The request we're fulfilling.
"""

# Before doing anything re-check the DB for something
# in this group. The situation may have changed while this
# task was queued.
state = groupio.group.filecopy_state(req.file)
if state == "Y" or state == "M":
log.info(
"Cancelling pull request for "
f"{req.file.acq.name}/{req.file.name}: "
f"file already in group {groupio.group.name}."
)
req.cancelled = 1
req.save()
return

# Check whether an actual file exists on the target
log.debug(f"req={req}")
node = groupio.exists(req.file.path)
if node is not None:
# file on disk: create/update the ArchiveFileCopy
# to force a check next pass
log.warning(
f"Skipping pull request for "
"Skipping pull request for "
f"{req.file.acq.name}/{req.file.name}: "
f"file already on disk in group {groupio.group.name}."
)
log.info(
f"Requesting check of "
"Requesting check of "
f"{req.file.acq.name}/{req.file.name} on node "
f"{node.name}."
)
Expand Down
36 changes: 33 additions & 3 deletions tests/io/test_defaultgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import pytest
from unittest.mock import MagicMock, patch

from alpenhorn.db.archive import ArchiveFileCopy
from alpenhorn.db import ArchiveFileCopy, ArchiveFileCopyRequest
from alpenhorn.io._default_asyncs import group_search_async
from alpenhorn.server.update import UpdateableNode, UpdateableGroup


@pytest.fixture
def groupnode(xfs, queue, storagegroup, storagenode):
def groupnode(xfs, dbtables, queue, storagegroup, storagenode):
"""Fixture setting up a default test group.

Returns both the group and the node."""
Expand Down Expand Up @@ -112,8 +112,38 @@ def test_group_search_dispatch(groupnode, simplerequest, queue):
mock.assert_called_once_with(simplerequest)


def test_group_search_in_db(
groupnode, simplefile, archivefilecopy, archivefilecopyrequest, queue, xfs
):
"""Test group_search_async with record already in db."""

group, node = groupnode

mock = MagicMock()
group.io.pull_force = mock

# Create a file on the dest
xfs.create_file(f"{node.db.root}/{simplefile.path}")

# Create a file copy record
archivefilecopy(file=simplefile, node=node.db, has_file="Y")

# Create a copy request for the file.
# Source here doesn't matter
afcr = archivefilecopyrequest(file=simplefile, node_from=node.db, group_to=group.db)

# Run the async. First argument is Task
group_search_async(None, group.io, afcr)

# Check dispatch
mock.assert_not_called()

# Verify that the request has been cancelled
assert ArchiveFileCopyRequest.get(id=afcr.id).cancelled == 1


def test_group_search_existing(
groupnode, simplefile, archivefilecopyrequest, queue, dbtables, xfs
groupnode, simplefile, archivefilecopyrequest, queue, xfs
):
"""Test group_search_async with existing file."""

Expand Down