Coordinate backups for the same job between servers
The server with the largest number of local revisions is the leader.
If the leader is offline, another server will take over.

This assumes that all servers share the same rng and thus schedule
the backup at the same time. If this is not the case, more backups
than necessary may be created (but never fewer).
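
A minimal sketch of this election rule, under the stated assumptions (all names here are illustrative, not part of the backy API): each server compares its count of local revisions against the counts reported by its peers and only acts as leader if it holds the maximum, with ties and offline peers resolved deterministically so every server reaches the same answer.

```python
from typing import Optional


def choose_leader(local_revs: dict[str, Optional[int]]) -> str:
    """Pick the server with the most local revisions.

    Maps server name -> local revision count, or None if the server
    is offline. Sorting first makes the tiebreak deterministic, so
    every server that runs this arrives at the same leader.
    """
    reachable = {name: n for name, n in local_revs.items() if n is not None}
    # Highest revision count wins; the server name breaks ties.
    return max(sorted(reachable), key=lambda name: reachable[name])


# Server "b" holds the most revisions, "c" is offline:
assert choose_leader({"a": 10, "b": 12, "c": None}) == "b"
# If "b" goes offline as well, "a" takes over:
assert choose_leader({"a": 10, "b": None, "c": None}) == "a"
```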
Johann Bahl committed Apr 2, 2024
1 parent 190749a commit bb12f85
Showing 11 changed files with 683 additions and 101 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -32,6 +32,6 @@ repos:
- types-PyYAML==5.4.0
- types-setuptools
- types-tzlocal==4.2
- types-aiofiles==23.2
- types-aiofiles==23.2.0.20240311
repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.0.1'
3 changes: 3 additions & 0 deletions changelog.d/20240402_125207_jb_issue_30_server_migration.rst
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.
- Coordinate backups for the same job between servers
51 changes: 27 additions & 24 deletions src/backy/api.py
@@ -2,7 +2,7 @@
import re
from json import JSONEncoder
from pathlib import Path
from typing import Any, List, Tuple
from typing import TYPE_CHECKING, Any, List, Tuple

from aiohttp import hdrs, web
from aiohttp.web_exceptions import (
@@ -18,12 +18,14 @@
from aiohttp.web_runner import AppRunner, TCPSite
from structlog.stdlib import BoundLogger

import backy.daemon
from backy.backup import Backup
from backy.revision import Revision
from backy.scheduler import Job
from backy.utils import generate_taskid

if TYPE_CHECKING:
from backy.daemon import BackyDaemon


class BackyJSONEncoder(JSONEncoder):
def default(self, o: Any) -> Any:
@@ -36,7 +38,7 @@ def default(self, o: Any) -> Any:


class BackyAPI:
daemon: "backy.daemon.BackyDaemon"
daemon: "BackyDaemon"
sites: dict[Tuple[str, int], TCPSite]
runner: AppRunner
tokens: dict
@@ -144,7 +146,9 @@ async def to_json(self, request: web.Request, handler):
else:
return web.json_response(resp, dumps=BackyJSONEncoder().encode)

async def get_status(self, request: web.Request) -> List[dict]:
async def get_status(
self, request: web.Request
) -> List["BackyDaemon.StatusDict"]:
filter = request.query.get("filter", None)
request["log"].info("get-status", filter=filter)
if filter:
@@ -160,7 +164,7 @@ async def get_jobs(self, request: web.Request) -> List[Job]:
return list(self.daemon.jobs.values())

async def get_job(self, request: web.Request) -> Job:
name = request.match_info.get("job_name", None)
name = request.match_info.get("job_name")
request["log"].info("get-job", name=name)
try:
return self.daemon.jobs[name]
@@ -176,40 +180,38 @@ async def run_job(self, request: web.Request):

async def list_backups(self, request: web.Request) -> List[str]:
request["log"].info("list-backups")
return await self.daemon.find_dead_backups()
return list(self.daemon.dead_backups.keys())

async def get_backup(self, request: web.Request) -> Backup:
name = request.match_info.get("backup_name", None)
async def get_backup(
self, request: web.Request, allow_active: bool
) -> Backup:
name = request.match_info.get("backup_name")
request["log"].info("get-backups", name=name)
if name in self.daemon.dead_backups:
return self.daemon.dead_backups[name]
if name in self.daemon.jobs:
if allow_active:
return self.daemon.jobs[name].backup
request["log"].info("get-backups-forbidden", name=name)
raise HTTPForbidden()
try:
path = Path(self.daemon.base_dir).joinpath(name).resolve()
if (
not path.exists()
or Path(self.daemon.base_dir).resolve() not in path.parents
):
raise FileNotFoundError
return Backup(path, request["log"])
except FileNotFoundError:
request["log"].info("get-backups-not-found", name=name)
raise HTTPNotFound()
request["log"].info("get-backups-not-found", name=name)
raise HTTPNotFound()

async def run_purge(self, request: web.Request):
backup = await self.get_backup(request)
backup = await self.get_backup(request, False)
request["log"].info("run-purge", name=backup.name)
backup.set_purge_pending()
raise HTTPAccepted()

async def touch_backup(self, request: web.Request):
backup = await self.get_backup(request)
backup = await self.get_backup(request, True)
request["log"].info("touch-backup", name=backup.name)
backup.touch()

async def get_revs(self, request: web.Request) -> List[Revision]:
backup = await self.get_backup(request)
backup = await self.get_backup(request, True)
request["log"].info("get-revs", name=backup.name)
backup.scan()
return backup.get_history(
local=True, clean=request.query.get("only_clean", "") == "1"
)
@@ -223,8 +225,8 @@ async def put_tags(self, request: web.Request):
request["log"].info("put-tags-bad-request")
raise HTTPBadRequest()
autoremove = request.query.get("autoremove", "") == "1"
spec = request.match_info.get("rev_spec", None)
backup = await self.get_backup(request)
spec = request.match_info.get("rev_spec")
backup = await self.get_backup(request, False)
request["log"].info(
"put-tags",
name=backup.name,
Expand All @@ -233,6 +235,7 @@ async def put_tags(self, request: web.Request):
spec=spec,
autoremove=autoremove,
)
backup.scan()
try:
if not backup.tags(
"set",
19 changes: 12 additions & 7 deletions src/backy/client.py
@@ -2,7 +2,7 @@
import re
import sys
from asyncio import get_running_loop
from typing import List
from typing import TYPE_CHECKING, Dict, Iterator, List

import aiohttp
import humanize
@@ -16,32 +16,35 @@
from backy.revision import Revision
from backy.utils import format_datetime_local

if TYPE_CHECKING:
from backy.daemon import BackyDaemon


class APIClientManager:
connector: TCPConnector
peers: dict
peers: dict[str, dict]
clients: dict[str, "APIClient"]
taskid: str
log: BoundLogger

def __init__(self, peers, taskid, log):
def __init__(self, peers: Dict[str, dict], taskid: str, log: BoundLogger):
self.connector = TCPConnector()
self.peers = peers
self.clients = dict()
self.taskid = taskid
self.log = log.bind(subsystem="APIClientManager")

def __getitem__(self, name):
def __getitem__(self, name: str) -> "APIClient":
if name and name not in self.clients:
self.clients[name] = APIClient.from_conf(
name, self.peers[name], self.taskid, self.log, self.connector
)
return self.clients[name]

def __iter__(self):
def __iter__(self) -> Iterator[str]:
return iter(self.peers)

async def close(self):
async def close(self) -> None:
for c in self.clients.values():
await c.close()
await self.connector.close()
@@ -89,7 +92,9 @@ def from_conf(cls, server_name, conf, *args, **kwargs):
**kwargs,
)

async def fetch_status(self, filter: str = ""):
async def fetch_status(
self, filter: str = ""
) -> List["BackyDaemon.StatusDict"]:
async with self.session.get(
"/v1/status", params={"filter": filter}
) as response:
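Tying the new pieces together: a hedged sketch of how a server could decide whether it is the leader for a job. It uses names that do appear in this diff (APIClientManager iteration, fetch_status, the local_revs status field), but the glue function itself and its parameters are illustrative, not the actual backy implementation.

```python
async def am_i_leader(manager, my_name: str, job_name: str, my_revs: int) -> bool:
    # Collect local revision counts from all reachable peers; unreachable
    # peers are skipped, which is how takeover happens when the leader is
    # offline. (Assumption: the filter argument is a job-name regex.)
    counts = {my_name: my_revs}
    for peer in manager:  # APIClientManager iterates peer names
        try:
            statuses = await manager[peer].fetch_status(f"^{job_name}$")
            counts[peer] = sum(s["local_revs"] for s in statuses)
        except Exception:
            continue
    # Deterministic winner: highest count, server name as tiebreak.
    return my_name == max(sorted(counts), key=lambda name: counts[name])
```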
6 changes: 6 additions & 0 deletions src/backy/conftest.py
@@ -1,5 +1,6 @@
import datetime
import os
import random
import shutil
from unittest import mock
from zoneinfo import ZoneInfo
@@ -70,6 +71,11 @@ class Clock(object):
return clock


@pytest.fixture
def seed_random(monkeypatch):
random.seed(0)


@pytest.fixture
def schedule():
schedule = backy.schedule.Schedule()
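The seed_random fixture above pins down the shared-rng assumption from the commit message: schedulers seeded identically derive the same pseudo-random jitter and therefore agree on when a backup runs. A toy illustration of that property (function and parameter names are made up for the example, not backy's scheduler):

```python
import random


def jittered_start(base_epoch: int, interval: int, seed: int = 0) -> int:
    # Every server seeds the same rng, so each computes identical
    # jitter and schedules the backup for the same moment.
    rng = random.Random(seed)
    return base_epoch + rng.randrange(interval)


# Two "servers" with the same seed agree on the schedule:
assert jittered_start(1_700_000_000, 3600) == jittered_start(1_700_000_000, 3600)
```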
53 changes: 42 additions & 11 deletions src/backy/daemon.py
@@ -1,18 +1,20 @@
import asyncio
import datetime
import fcntl
import os
import os.path as p
import signal
import sys
import time
from typing import IO, List, Optional, Pattern
from typing import IO, List, Optional, Pattern, TypedDict

import aiofiles.os as aos
import aioshutil
import yaml
from structlog.stdlib import BoundLogger

from .api import BackyAPI
from .backup import Backup
from .revision import filter_manual_tags
from .schedule import Schedule
from .scheduler import Job
@@ -35,6 +37,7 @@ class BackyDaemon(object):
config: dict
schedules: dict[str, Schedule]
jobs: dict[str, Job]
dead_backups: dict[str, Backup]

backup_semaphores: dict[str, asyncio.BoundedSemaphore]
log: BoundLogger
@@ -50,6 +53,7 @@ def __init__(self, config_file, log):
self.schedules = {}
self.backup_semaphores = {}
self.jobs = {}
self.dead_backups = {}
self._lock = None
self.reload_api = asyncio.Event()
self.api_addrs = ["::1", "127.0.0.1"]
@@ -126,6 +130,21 @@ def _apply_config(self):
del self.jobs[name]
self.log.info("deleted-job", job_name=name)

self.dead_backups.clear()
for b in os.scandir(self.base_dir):
if b.name in self.jobs or not b.is_dir(follow_symlinks=False):
continue
try:
self.dead_backups[b.name] = Backup(
p.join(self.base_dir, b.name),
self.log.bind(job_name=b.name),
)
self.log.info("found-backup", job_name=b.name)
except Exception:
self.log.info(
"invalid-backup", job_name=b.name, exc_style="short"
)

if (
not self.backup_semaphores
or self.backup_semaphores["slow"]._bound_value != self.worker_limit
@@ -253,9 +272,26 @@ async def shutdown_loop(self):
self.log.info("stopping-loop")
self.loop.stop()

def status(self, filter_re: Optional[Pattern[str]] = None) -> List[dict]:
class StatusDict(TypedDict):
job: str
sla: str
sla_overdue: int
status: str
last_time: Optional[datetime.datetime]
last_tags: Optional[str]
last_duration: Optional[float]
next_time: Optional[datetime.datetime]
next_tags: Optional[str]
manual_tags: str
quarantine_reports: int
unsynced_revs: int
local_revs: int

def status(
self, filter_re: Optional[Pattern[str]] = None
) -> List[StatusDict]:
"""Collects status information for all jobs."""
result = []
result: List["BackyDaemon.StatusDict"] = []
for job in list(self.jobs.values()):
if filter_re and not filter_re.search(job.name):
continue
@@ -293,6 +329,9 @@ def status(self, filter_re: Optional[Pattern[str]] = None) -> List[dict]:
manual_tags=", ".join(manual_tags),
quarantine_reports=len(job.backup.quarantine.report_ids),
unsynced_revs=unsynced_revs,
local_revs=len(
job.backup.get_history(clean=True, local=True)
),
)
)
return result
@@ -336,14 +375,6 @@ async def purge_pending_backups(self):
self.log.exception("purge-pending")
await asyncio.sleep(24 * 60 * 60)

async def find_dead_backups(self) -> List[str]:
self.log.debug("scanning-backups")
return [
b.name
for b in await aos.scandir(self.base_dir)
if await is_dir_no_symlink(b.path) and b.name not in self.jobs
]


def main(config_file, log: BoundLogger): # pragma: no cover
global daemon