fix(db): Deal with 3.12+ naive UTC datetimes #208

Merged: 1 commit, Nov 4, 2024
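For context, a minimal sketch (not part of the PR; assumes Python 3.12+ and peewee >= 3.17.1, per the pyproject.toml change below) of the deprecation this change works around and the replacement it adopts; variable names are illustrative only:

import datetime
import peewee as pw

# On Python 3.12+, the naive-UTC classmethods emit DeprecationWarning:
old_now = datetime.datetime.utcnow()
old_epoch = datetime.datetime.utcfromtimestamp(0)

# The helpers this PR switches to come from peewee and still return naive
# datetimes in UTC, so existing database rows keep comparing and sorting
# the same way as before:
new_now = pw.utcnow()
new_epoch = pw.utcfromtimestamp(0)

assert old_epoch == new_epoch  # both are naive 1970-01-01 00:00:00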
4 changes: 4 additions & 0 deletions alpenhorn/db/__init__.py
@@ -45,3 +45,7 @@

# Prototypes
from ._base import EnumField, base_model

# Naive-UTC stuff courtesy peewee. These were originally in datetime
# but were deprecated in 3.12 as too confusing.
from peewee import utcnow as utcnow, utcfromtimestamp as utcfromtimestamp
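The redundant-looking "utcnow as utcnow" form is the usual way to mark an intentional re-export for type checkers, so the rest of the package can treat these helpers as part of alpenhorn.db's public interface. A short usage sketch (values shown are illustrative only):

from alpenhorn.db import utcnow, utcfromtimestamp

stamp = utcnow()               # naive datetime in UTC, e.g. datetime(2024, 11, 4, 17, 23, 5, 123456)
epoch = utcfromtimestamp(0.0)  # naive datetime(1970, 1, 1, 0, 0)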
2 changes: 1 addition & 1 deletion alpenhorn/db/acquisition.py
@@ -49,7 +49,7 @@ class ArchiveFile(base_model):
md5sum = pw.CharField(null=True, max_length=32)
# Note: default here is the now method itself (i.e. "now", not "now()").
# Will be evaluated by peewee at row-creation time.
registered = pw.DateTimeField(default=datetime.datetime.utcnow)
registered = pw.DateTimeField(default=pw.utcnow)

class Meta:
# (acq,name) is unique
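As the comment above notes, the default is the callable itself. Passing pw.utcnow (rather than pw.utcnow()) means peewee invokes it per row at insert time instead of freezing one timestamp at class-definition time. A brief sketch of the distinction, assuming peewee >= 3.17.1 (the Example model is hypothetical):

import peewee as pw

class Example(pw.Model):
    # Wrong: pw.utcnow() would run once when the class is defined, so every
    # row would share the same "registered" value.
    # registered = pw.DateTimeField(default=pw.utcnow())

    # Right: the callable is stored and evaluated for each new row.
    registered = pw.DateTimeField(default=pw.utcnow)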
4 changes: 2 additions & 2 deletions alpenhorn/db/archive.py
@@ -48,7 +48,7 @@ class ArchiveFileCopy(base_model):
wants_file = EnumField(["Y", "M", "N"], default="Y")
ready = pw.BooleanField(default=False)
size_b = pw.BigIntegerField(null=True)
last_update = pw.DateTimeField(default=datetime.datetime.utcnow)
last_update = pw.DateTimeField(default=pw.utcnow)

@property
def path(self) -> pathlib.Path:
@@ -91,7 +91,7 @@ class ArchiveFileCopyRequest(base_model):
node_from = pw.ForeignKeyField(StorageNode, backref="requests_from")
completed = pw.BooleanField(default=False)
cancelled = pw.BooleanField(default=False)
timestamp = pw.DateTimeField(default=datetime.datetime.utcnow, null=True)
timestamp = pw.DateTimeField(default=pw.utcnow, null=True)
transfer_started = pw.DateTimeField(null=True)
transfer_completed = pw.DateTimeField(null=True)

2 changes: 1 addition & 1 deletion alpenhorn/db/storage.py
@@ -367,7 +367,7 @@ def update_avail_gb(self, new_avail: int | None) -> None:
self.avail_gb = None
else:
self.avail_gb = new_avail / 2**30
self.avail_gb_last_checked = datetime.datetime.utcnow()
self.avail_gb_last_checked = pw.utcnow()

# Update the DB with the free space but don't clobber changes made
# manually to the database
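A hypothetical usage sketch of the method changed above (node stands for a StorageNode row; the figures are illustrative): update_avail_gb() takes a byte count, stores it as GiB, and stamps the check time with the naive-UTC helper.

def report_free_space(node) -> None:
    """Illustrative only: node is assumed to be a StorageNode instance."""
    node.update_avail_gb(20 * 2**30)   # 20 GiB of free space, reported in bytes
    assert node.avail_gb == 20.0       # stored as GiB (bytes / 2**30)
    # node.avail_gb_last_checked now holds a naive-UTC stamp from pw.utcnow()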
9 changes: 4 additions & 5 deletions alpenhorn/io/_default_asyncs.py
@@ -8,10 +8,9 @@
import shutil
import logging
import pathlib
from datetime import datetime

from . import ioutil
from ..db import ArchiveFileCopy, ArchiveFileCopyRequest
from ..db import ArchiveFileCopy, ArchiveFileCopyRequest, utcnow
from ..server.update import RemoteNode

if TYPE_CHECKING:
@@ -224,7 +223,7 @@ def check_async(task: Task, io: BaseNodeIO, copy: ArchiveFileCopy) -> None:
log.info(
f"Updating file copy #{copy.id} for file {copyname} on node {io.node.name}."
)
copy.last_update = datetime.utcnow()
copy.last_update = utcnow()
copy.save()


@@ -288,7 +287,7 @@ def delete_async(

# Update the DB
ArchiveFileCopy.update(
has_file="N", wants_file="N", last_update=datetime.utcnow()
has_file="N", wants_file="N", last_update=utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()


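The query-level UPDATE used above (and in most of the following hunks) stamps last_update with the module-level utcnow helper. A self-contained sketch of the same pattern against an in-memory SQLite database, using a simplified stand-in for the ArchiveFileCopy model:

import peewee as pw

db = pw.SqliteDatabase(":memory:")

class FileCopy(pw.Model):
    has_file = pw.CharField(default="Y")
    wants_file = pw.CharField(default="Y")
    last_update = pw.DateTimeField(default=pw.utcnow)

    class Meta:
        database = db

db.create_tables([FileCopy])
copy = FileCopy.create()

# Mirror delete_async: mark the copy removed and refresh its timestamp
# in a single UPDATE statement.
FileCopy.update(
    has_file="N", wants_file="N", last_update=pw.utcnow()
).where(FileCopy.id == copy.id).execute()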
@@ -341,7 +340,7 @@ def group_search_async(
has_file="M",
wants_file="Y",
ready=False,
last_update=datetime.utcnow(),
last_update=utcnow(),
)
.where(
ArchiveFileCopy.file == req.file,
17 changes: 9 additions & 8 deletions alpenhorn/io/ioutil.py
@@ -8,7 +8,6 @@
import errno
import pathlib
import peewee as pw
from datetime import datetime
from tempfile import TemporaryDirectory

from .. import db
@@ -17,6 +16,8 @@
ArchiveFileCopyRequest,
StorageNode,
StorageTransferAction,
utcfromtimestamp,
utcnow,
)
from ..scheduler import threadlocal
from ..common import config, util
@@ -369,7 +370,7 @@ def post_add(node: StorageNode, file_: ArchiveFile) -> None:
StorageTransferAction.autoclean == True, # noqa: E712
):
count = (
ArchiveFileCopy.update(wants_file="N", last_update=datetime.utcnow())
ArchiveFileCopy.update(wants_file="N", last_update=utcnow())
.where(
ArchiveFileCopy.file == file_,
ArchiveFileCopy.node == edge.node_from,
@@ -427,7 +428,7 @@ def copy_request_done(
if check_src:
# If the copy didn't work, then the remote file may be corrupted.
log.error(f"Copy failed: {stderr}; Marking source file suspect.")
ArchiveFileCopy.update(has_file="M", last_update=datetime.utcnow()).where(
ArchiveFileCopy.update(has_file="M", last_update=utcnow()).where(
ArchiveFileCopy.file == req.file,
ArchiveFileCopy.node == req.node_from,
).execute()
@@ -448,7 +449,7 @@ def copy_request_done(
f"MD5 mismatch on node {io.node.name}; "
f"Marking source file {req.file.name} on node {req.node_from} suspect."
)
ArchiveFileCopy.update(has_file="M", last_update=datetime.utcnow()).where(
ArchiveFileCopy.update(has_file="M", last_update=utcnow()).where(
ArchiveFileCopy.file == req.file,
ArchiveFileCopy.node == req.node_from,
).execute()
@@ -474,24 +475,24 @@ def copy_request_done(
wants_file="Y",
ready=True,
size_b=size,
last_update=datetime.utcnow(),
last_update=utcnow(),
).execute()
except pw.IntegrityError:
ArchiveFileCopy.update(
has_file="Y",
wants_file="Y",
ready=True,
size_b=size,
last_update=datetime.utcnow(),
last_update=utcnow(),
).where(
ArchiveFileCopy.file == req.file, ArchiveFileCopy.node == io.node
).execute()

# Mark AFCR as completed
ArchiveFileCopyRequest.update(
completed=True,
transfer_started=datetime.utcfromtimestamp(start_time),
transfer_completed=datetime.utcfromtimestamp(end_time),
transfer_started=utcfromtimestamp(start_time),
transfer_completed=utcfromtimestamp(end_time),
).where(ArchiveFileCopyRequest.id == req.id).execute()

# Run post-add actions, if any
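The try/except pw.IntegrityError sequence above is a create-or-update: attempt to insert the copy row, and if a row for that (file, node) pair already exists, fall back to updating it in place. A reduced sketch of the same control flow (FileCopy, file, and node are hypothetical stand-ins, not the PR's code):

import peewee as pw
from alpenhorn.db import utcnow

def record_copy(FileCopy, file, node, size_b: int) -> None:
    """Insert or refresh a file-copy row with a fresh naive-UTC timestamp."""
    fields = dict(
        has_file="Y", wants_file="Y", ready=True,
        size_b=size_b, last_update=utcnow(),
    )
    try:
        FileCopy.create(file=file, node=node, **fields)
    except pw.IntegrityError:
        # The (file, node) row already exists: update it in place.
        FileCopy.update(**fields).where(
            FileCopy.file == file, FileCopy.node == node
        ).execute()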
31 changes: 15 additions & 16 deletions alpenhorn/io/lustrehsm.py
@@ -19,10 +19,9 @@
import logging
import pathlib
import peewee as pw
from datetime import datetime

from ..common.util import pretty_bytes, pretty_deltat
from ..db import ArchiveFileCopy
from ..db import ArchiveFileCopy, utcnow
from ..scheduler import Task
from ..server.querywalker import QueryWalker
from .base import BaseNodeRemote
@@ -249,9 +248,9 @@ def _async(task, node, lfs, headroom_needed):
)
lfs.hsm_release(copy.path)
# Update copy record immediately
ArchiveFileCopy.update(
ready=False, last_update=datetime.utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()
ArchiveFileCopy.update(ready=False, last_update=utcnow()).where(
ArchiveFileCopy.id == copy.id
).execute()
total_files += 1
total_bytes += copy.file.size_b
if total_bytes >= headroom_needed:
@@ -337,20 +336,20 @@ def _async(task, node, lfs, copies):
f"File copy {copy.file.path} on node {node.name} is missing!"
)
ArchiveFileCopy.update(
has_file="N", ready=False, last_update=datetime.utcnow()
has_file="N", ready=False, last_update=utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()
elif state == lfs.HSM_RELEASED:
if copy.ready:
log.info(f"Updating file copy {copy.file.path}: ready -> False")
ArchiveFileCopy.update(
ready=False, last_update=datetime.utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()
ArchiveFileCopy.update(ready=False, last_update=utcnow()).where(
ArchiveFileCopy.id == copy.id
).execute()
else: # i.e. RESTORED or UNARCHIVED
if not copy.ready:
log.info(f"Updating file copy {copy.file.path}: ready -> True")
ArchiveFileCopy.update(
ready=True, last_update=datetime.utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()
ArchiveFileCopy.update(ready=True, last_update=utcnow()).where(
ArchiveFileCopy.id == copy.id
).execute()

# Copies get checked in an async
Task(
@@ -405,9 +404,9 @@ def _async(task: Task, node_io: LustreHSMNodeIO, copy: ArchiveFileCopy) -> None:
"File copy missing during check: "
f"{copy.path}. Updating database."
)
ArchiveFileCopy.update(
has_file="N", last_update=datetime.utcnow()
).where(ArchiveFileCopy.id == copy.id).execute()
ArchiveFileCopy.update(has_file="N", last_update=utcnow()).where(
ArchiveFileCopy.id == copy.id
).execute()
return

# Trigger restore, if necessary
@@ -572,7 +571,7 @@ def _async(task: Task, node_io: LustreHSMNodeIO, file_: ArchiveFile):

if copy.ready != ready:
copy.ready = ready
copy.last_update = datetime.utcnow()
copy.last_update = utcnow()
copy.save()
log.info(
f"File copy {file_.path} on node {node_io.node.name} now "
7 changes: 3 additions & 4 deletions alpenhorn/server/auto_import.py
@@ -6,12 +6,11 @@
import logging
import pathlib
import peewee as pw
from datetime import datetime
from watchdog.events import FileSystemEventHandler

from .. import db
from ..common import config, extensions
from ..db import ArchiveAcq, ArchiveFile, ArchiveFileCopy
from ..db import ArchiveAcq, ArchiveFile, ArchiveFileCopy, utcnow
from ..io import ioutil
from ..scheduler import Task

@@ -163,7 +162,7 @@ def _import_file(task: Task, node: StorageNode, path: pathlib.PurePath) -> None:
copy.has_file = "M"
copy.wants_file = "Y"
copy.ready = True
copy.last_update = datetime.utcnow()
copy.last_update = utcnow()
copy.save()
log.warning(
f'Imported file "{path}" formerly present on node {node.name}! Marking suspect.'
@@ -177,7 +176,7 @@ def _import_file(task: Task, node: StorageNode, path: pathlib.PurePath) -> None:
wants_file="Y",
ready=True,
size_b=node.io.filesize(path, actual=True),
last_update=datetime.utcnow(),
last_update=utcnow(),
)
log.info(f'Registered file copy "{path}" on node "{node.name}".')

11 changes: 8 additions & 3 deletions alpenhorn/server/update.py
@@ -8,11 +8,16 @@
import time
import logging
import peewee as pw
from datetime import datetime

from ..common import config, util
from ..common.extensions import io_module
from ..db import ArchiveFileCopy, ArchiveFileCopyRequest, StorageNode, StorageGroup
from ..db import (
ArchiveFileCopy,
ArchiveFileCopyRequest,
StorageNode,
StorageGroup,
utcnow,
)
from ..scheduler import global_abort, WorkerPool, EmptyPool
from . import auto_import
from .querywalker import QueryWalker
@@ -337,7 +342,7 @@ def run_auto_verify(self) -> None:

# Mark file as needing check
copy.has_file = "M"
copy.last_update = datetime.utcnow()
copy.last_update = utcnow()
copy.save()

def update_idle(self) -> None:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ license = {file = "LICENSE"}
dependencies = [
"Click >= 6.0",
"concurrent-log-handler",
"peewee >= 3.16",
"peewee >= 3.17.1",
"PyYAML",
"tabulate",
"watchdog"
5 changes: 2 additions & 3 deletions tests/db/test_acquisition.py
@@ -2,7 +2,6 @@

import pytest
import pathlib
import datetime
import peewee as pw

from alpenhorn.db.acquisition import ArchiveAcq, ArchiveFile
@@ -39,9 +38,9 @@ def test_acq_model(archiveacq):

def test_file_model(archiveacq, archivefile):
acq1 = archiveacq(name="acq1")
before = datetime.datetime.utcnow().replace(microsecond=0)
before = pw.utcnow().replace(microsecond=0)
archivefile(name="min", acq=acq1)
after = datetime.datetime.utcnow()
after = pw.utcnow()
archivefile(
name="max",
acq=acq1,
6 changes: 2 additions & 4 deletions tests/db/test_archive.py
@@ -79,11 +79,9 @@ def test_archivefilecopyrequest_model(
"""Test ArchiveFileCopyRequest model"""
minnode = storagenode(name="min", group=simplegroup)
maxnode = storagenode(name="max", group=simplegroup)
before = (datetime.datetime.utcnow() - datetime.timedelta(seconds=1)).replace(
microsecond=0
)
before = (pw.utcnow() - datetime.timedelta(seconds=1)).replace(microsecond=0)
archivefilecopyrequest(file=simplefile, node_from=minnode, group_to=simplegroup)
after = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
after = pw.utcnow() + datetime.timedelta(seconds=1)
archivefilecopyrequest(
file=simplefile,
node_from=maxnode,
5 changes: 2 additions & 3 deletions tests/db/test_storage.py
@@ -2,7 +2,6 @@

import pytest
import pathlib
import datetime
import peewee as pw

from alpenhorn.db.storage import StorageGroup, StorageNode, StorageTransferAction
@@ -355,11 +354,11 @@ def test_update_avail_gb(simplenode):
assert simplenode.avail_gb is None

# Test a number
before = datetime.datetime.utcnow()
before = pw.utcnow()
simplenode.update_avail_gb(10000)
# Now the value is set
node = StorageNode.get(id=simplenode.id)
after = datetime.datetime.utcnow()
after = pw.utcnow()

assert node.avail_gb == 10000.0 / 2.0**30
assert node.avail_gb_last_checked >= before
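The test changes all keep the same bracketing idiom: capture a "before" and "after" naive-UTC timestamp around the operation and assert the stored value lands between them, padding or truncating to tolerate sub-second rounding by the database backend. A minimal sketch of the idiom, assuming peewee >= 3.17.1:

import datetime
import peewee as pw

before = pw.utcnow().replace(microsecond=0)         # backend may drop microseconds
stored = pw.utcnow()                                # stands in for the value read back from the DB
after = pw.utcnow() + datetime.timedelta(seconds=1)

assert before <= stored <= after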
4 changes: 2 additions & 2 deletions tests/io/test_ioutil.py
@@ -345,7 +345,7 @@ def test_autoclean(
):
"""Test post_add running autoclean."""

before = datetime.datetime.utcnow() - datetime.timedelta(seconds=2)
before = pw.utcnow() - datetime.timedelta(seconds=2)

destnode = storagenode(name="dest", group=simplegroup)

@@ -370,7 +370,7 @@ def test_autoclean_state(
):
"""post_add autoclean only deletes copies with has_file=='Y'."""

then = datetime.datetime.utcnow() - datetime.timedelta(seconds=200)
then = pw.utcnow() - datetime.timedelta(seconds=200)

srcnode = storagenode(name="src", group=simplegroup)
storagetransferaction(node_from=srcnode, group_to=simplenode.group, autoclean=True)