Skip to content

Commit

Permalink
use aiofiles
Browse files Browse the repository at this point in the history
  • Loading branch information
Johann Bahl committed Nov 16, 2023
1 parent fec1db2 commit d1a4c51
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 31 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ repos:
- types-PyYAML==5.4.0
- types-setuptools
- types-tzlocal==4.2
- types-aiofiles==23.2
repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.0.1'
26 changes: 25 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ tzlocal = "^5.0"
colorama = "^0.4.6"
aiohttp = "^3.8.4"
rich = "^13.3.2"
aiofiles = "^23.2.1"
aioshutil = "^1.3"

[tool.poetry.dev-dependencies]
pre-commit = "^3.3.3"
Expand Down
2 changes: 1 addition & 1 deletion src/backy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def run_job(self, request: web.Request):

async def list_backups(self, request: web.Request) -> List[str]:
request["log"].info("list-backups")
return self.daemon.find_dead_backups()
return await self.daemon.find_dead_backups()

async def get_backup(self, request: web.Request) -> Backup:
name = request.match_info.get("backup_name", None)
Expand Down
35 changes: 16 additions & 19 deletions src/backy/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
import fcntl
import os
import os.path as p
import shutil
import signal
import sys
import time
from typing import IO, List, Optional, Pattern

import aiofiles.os as aos
import aioshutil
import yaml
from structlog.stdlib import BoundLogger

from .api import BackyAPI
from .revision import filter_manual_tags
from .schedule import Schedule
from .scheduler import Job
from .utils import has_recent_changes
from .utils import has_recent_changes, is_dir_no_symlink

daemon: "BackyDaemon"

Expand Down Expand Up @@ -298,37 +299,33 @@ def status(self, filter_re: Optional[Pattern[str]] = None) -> List[dict]:
return result

async def purge_old_files(self):
# `stat` and other file system access things are _not_
# properly async, we might want to spawn those off into a separate
# thread.
while True:
try:
self.log.info("purge-scanning")
for candidate in os.scandir(self.base_dir):
if not candidate.is_dir(follow_symlinks=False):
for candidate in await aos.scandir(self.base_dir):
if not await is_dir_no_symlink(candidate.path):
continue
self.log.debug("purge-candidate", candidate=candidate.path)
reference_time = time.time() - 3 * 31 * 24 * 60 * 60
if not has_recent_changes(candidate, reference_time):
if not await has_recent_changes(
candidate.path, reference_time
):
self.log.info("purging", candidate=candidate.path)
shutil.rmtree(candidate)
await aioshutil.rmtree(candidate)
self.log.info("purge-finished")
except Exception:
self.log.exception("purge")
await asyncio.sleep(24 * 60 * 60)

async def purge_pending_backups(self):
# `stat` and other file system access things are _not_
# properly async, we might want to spawn those off into a separate
# thread.
while True:
try:
self.log.info("purge-pending-scanning")
for candidate in os.scandir(self.base_dir):
for candidate in await aos.scandir(self.base_dir):
if (
not candidate.is_dir(follow_symlinks=False)
or candidate.name in self.jobs # will get purged anyway
or not p.exists(
candidate.name in self.jobs # will get purged anyway
or not await is_dir_no_symlink(candidate.path)
or not await aos.path.exists(
p.join(candidate.path, ".purge_pending")
)
):
Expand All @@ -340,12 +337,12 @@ async def purge_pending_backups(self):
self.log.exception("purge-pending")
await asyncio.sleep(24 * 60 * 60)

def find_dead_backups(self) -> List[str]:
async def find_dead_backups(self) -> List[str]:
self.log.debug("scanning-backups")
return [
b.name
for b in os.scandir(self.base_dir)
if b.is_dir(follow_symlinks=False) and b.name not in self.jobs
for b in await aos.scandir(self.base_dir)
if await is_dir_no_symlink(b.path) and b.name not in self.jobs
]


Expand Down
10 changes: 8 additions & 2 deletions src/backy/tests/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


@pytest.fixture
async def daemon(tmpdir, event_loop, log):
async def daemon(tmpdir, event_loop, monkeypatch, log):
daemon = BackyDaemon(str(tmpdir / "config"), log)
base_dir = str(tmpdir)
source = str(tmpdir / "test01.source")
Expand Down Expand Up @@ -59,7 +59,13 @@ async def daemon(tmpdir, event_loop, log):

os.mkdir(tmpdir / "dead01")

daemon.start(event_loop)
async def null_coroutine():
return

with monkeypatch.context() as m:
m.setattr(daemon, "purge_old_files", null_coroutine)
m.setattr(daemon, "purge_pending_backups", null_coroutine)
daemon.start(event_loop)
yield daemon
daemon.terminate()

Expand Down
21 changes: 13 additions & 8 deletions src/backy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import IO, Callable
from zoneinfo import ZoneInfo

import aiofiles.os as aos
import humanize
import structlog
import tzlocal
Expand Down Expand Up @@ -428,27 +429,31 @@ def min_date():
return datetime.datetime.min.replace(tzinfo=ZoneInfo("UTC"))


def has_recent_changes(entry, reference_time):
async def is_dir_no_symlink(p):
return await aos.path.isdir(p) and not await aos.path.islink(p)


async def has_recent_changes(path: str, reference_time: float) -> bool:
# This is not efficient on a first look as we may stat things twice, but it
# makes the recursion easier to read and the VFS will be caching this
# anyway.
# However, I want to perform a breadth-first analysis as the theory is that
# higher levels will propagate changed mtimes do to new/deleted files
# instead of just modified files in our case and looking at stats when
# traversing a directory level is faster than going depth first.
st = entry.stat(follow_symlinks=False)
st = await aos.stat(path, follow_symlinks=False)
if st.st_mtime >= reference_time:
return True
if not entry.is_dir(follow_symlinks=False):
if not await is_dir_no_symlink(path):
return False
candidates = list(os.scandir(entry.path))
# First pass: stat all direct entries
for candidate in candidates:
if candidate.stat(follow_symlinks=False).st_mtime >= reference_time:
for candidate in await aos.scandir(path):
st = await aos.stat(candidate.path, follow_symlinks=False)
if st.st_mtime >= reference_time:
return True
# Second pass: start traversing
for candidate in os.scandir(entry.path):
if has_recent_changes(candidate, reference_time):
for candidate in await aos.scandir(path):
if await has_recent_changes(candidate.path, reference_time):
return True
return False

Expand Down

0 comments on commit d1a4c51

Please sign in to comment.