Commit 6715f93: expire on remote server

Johann Bahl committed Jun 26, 2023
1 parent 18b75f1
Showing 16 changed files with 946 additions and 91 deletions.
5 changes: 5 additions & 0 deletions changelog.d/20230626_005856_jb_issue_30_server_migration.rst
@@ -0,0 +1,5 @@
- Support backup job migration across servers

- Add `tags {set, add, remove}` subcommand

- Add `expire` subcommand
80 changes: 80 additions & 0 deletions src/backy/api.py
@@ -55,6 +55,16 @@ def __init__(self, daemon, log):
web.get("/v1/jobs", self.get_jobs),
# web.get("/v1/jobs/{job_name}", self.get_job),
web.post("/v1/jobs/{job_name}/run", self.run_job),
web.get("/v1/backups", self.list_backups),
# web.get("/v1/backups/{backup_name}", self.get_backup),
web.post("/v1/backups/{backup_name}/purge", self.run_purge),
web.post("/v1/backups/{backup_name}/touch", self.touch_backup),
web.get("/v1/backups/{backup_name}/revs", self.get_revs),
# web.get("/v1/backups/{backup_name}/revs/{rev_spec}", self.get_rev),
web.put(
"/v1/backups/{backup_name}/revs/{rev_spec}/tags",
self.put_tags,
),
]
)
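
These routes extend the daemon's HTTP API with per-backup operations used for server migration. A minimal client sketch against the new endpoints; the base URL, port, and backup name are placeholders, not values from this diff:

```python
import asyncio

import aiohttp


async def show_remote_state(base_url: str, backup_name: str) -> None:
    async with aiohttp.ClientSession(base_url=base_url) as session:
        # GET /v1/backups lists backups that are not active jobs
        # on the remote daemon.
        async with session.get("/v1/backups") as resp:
            resp.raise_for_status()
            print("backups:", await resp.json())
        # only_clean=1 restricts the listing to finished revisions
        # (see the get_revs handler below).
        async with session.get(
            f"/v1/backups/{backup_name}/revs", params={"only_clean": "1"}
        ) as resp:
            resp.raise_for_status()
            print("clean revisions:", await resp.json())


asyncio.run(show_remote_state("http://localhost:6023", "vm-test01"))
```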

@@ -154,3 +164,73 @@ async def run_job(self, request: web.Request):
j = await self.get_job(request)
j.run_immediately.set()
raise HTTPAccepted()

async def list_backups(self, request: web.Request):
backups = self.daemon.find_all_backups()
return [b for b in backups if b not in self.daemon.jobs]

async def get_backup(self, request: web.Request) -> Backup:
name = request.match_info.get("backup_name", None)
if not name:
raise HTTPNotFound()
if name in self.daemon.jobs:
raise HTTPForbidden()
try:
return Backup(p.join(self.daemon.base_dir, name), request["log"])
except FileNotFoundError:
raise HTTPNotFound()

async def run_purge(self, request: web.Request):
backup = await self.get_backup(request)
backup.set_purge_pending()
raise HTTPAccepted()

async def touch_backup(self, request: web.Request):
backup = await self.get_backup(request)
backup.touch()

async def get_revs(self, request: web.Request):
backup = await self.get_backup(request)
if request.query.get("only_clean", "") == "1":
revs = backup.clean_history
else:
revs = backup.history
return [r for r in revs if not r.location]

async def get_rev(self, request: web.Request) -> Revision:
spec = request.match_info.get("rev_spec", None)
backup = await self.get_backup(request)
try:
rev = backup.find(spec)
if rev.location:
raise HTTPNotFound()
return rev
except KeyError:
raise HTTPNotFound()

async def put_tags(self, request: web.Request):
json = await request.json()
if "old_tags" not in json:
raise HTTPPreconditionRequired()
old_tags = set(json["old_tags"])
if "new_tags" not in json:
raise HTTPBadRequest()
new_tags = set(json["new_tags"])

autoremove = request.query.get("autoremove", "") == "1"
spec = request.match_info.get("rev_spec", None)
backup = await self.get_backup(request)
try:
if not backup.tags(
"set",
spec,
new_tags,
old_tags,
autoremove=autoremove,
force=True,
):
raise HTTPPreconditionFailed()
except KeyError:
raise HTTPNotFound()
except BlockingIOError:
raise HTTPServiceUnavailable()
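
put_tags turns the tag update into a compare-and-swap: a missing old_tags yields 428 (Precondition Required), a mismatch between old_tags and the revision's current tags yields 412 (Precondition Failed), and a held lock yields 503. A hedged client-side sketch; the host and revision spec are placeholders:

```python
import asyncio

import aiohttp


async def set_remote_tags(base_url, backup_name, rev_spec, old_tags, new_tags):
    async with aiohttp.ClientSession(base_url=base_url) as session:
        async with session.put(
            f"/v1/backups/{backup_name}/revs/{rev_spec}/tags",
            # autoremove=1: drop the revision entirely if new_tags is empty
            params={"autoremove": "1"},
            json={"old_tags": list(old_tags), "new_tags": list(new_tags)},
        ) as resp:
            resp.raise_for_status()  # 412 here means "retry with fresh tags"


asyncio.run(
    set_remote_tags(
        "http://localhost:6023", "vm-test01", "REV_UUID",
        {"daily"}, {"daily", "manual:keep"},
    )
)
```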
190 changes: 175 additions & 15 deletions src/backy/backup.py
@@ -1,18 +1,26 @@
import asyncio
import fcntl
import glob
import os
import os.path as p
import time
-from typing import IO, Type
+from typing import IO, Literal, Optional, Type

import yaml
from aiohttp import ClientConnectorError, ClientError, ClientResponseError
from aiohttp.web_exceptions import (
HTTPForbidden,
HTTPNotFound,
HTTPPreconditionFailed,
)
from structlog.stdlib import BoundLogger

from backy.utils import min_date

from .backends import BackyBackend
from .backends.chunked import ChunkedFileBackend
from .backends.cowfile import COWFileBackend
from .client import APIClient, APIClientManager
from .nbd.server import Server
from .revision import Revision, Trust, filter_schedule_tags
from .schedule import Schedule
@@ -54,10 +62,10 @@ def locked_function(self, *args, **kw):
except BlockingIOError:
self.log.warning(
"lock-no-exclusive",
_fmt_msg="Failed to get exclusive lock for '{function}'. Continuing.",
_fmt_msg="Failed to get exclusive lock for '{function}'.",
function=f.__name__,
)
-                return
+                raise
else:
try:
return f(self, *args, **kw)
@@ -147,6 +155,13 @@ def __init__(self, path, log):
"Unsupported backend_type '{}'".format(self.backend_type)
)

@property
def name(self) -> str:
return p.basename(self.path)

def to_dict(self):
return self.config

def scan(self):
self.history = []
self._by_uuid = {}
@@ -161,11 +176,35 @@ def scan(self):
# The history is stored oldest first, newest last.
self.history.sort(key=lambda r: r.timestamp)

def touch(self):
os.utime(self.path, None)

def set_purge_pending(self):
open(p.join(self.path, ".purge_pending"), "w").close()

def remove_purge_pending(self):
path = p.join(self.path, ".purge_pending")
if p.exists(path):
os.remove(path)

@property
def clean_history(self):
"""History without incomplete revisions."""
return [rev for rev in self.history if "duration" in rev.stats]

def validate_tags(self, tags):
missing_tags = (
filter_schedule_tags(tags) - self.schedule.schedule.keys()
)
if missing_tags:
self.log.error(
"unknown-tags",
_fmt_msg="The following tags are missing from the schedule: {unknown_tags}\n"
"Check the config file, add the `manual:` prefix or disable tag validation (-f)",
unknown_tags=", ".join(missing_tags),
)
raise RuntimeError("Unknown tags")
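
validate_tags factors the schedule check out of backup() so the new tags subcommand can reuse it; judging by the error text, filter_schedule_tags exempts `manual:`-prefixed tags from validation. A small usage sketch, assuming a schedule that defines `daily` but not `hourly` (path and logger are placeholders):

```python
import structlog

from backy.backup import Backup

backup = Backup("/srv/backy/vm-test01", structlog.stdlib.get_logger())

backup.validate_tags({"daily"})               # passes: tag is in the schedule
backup.validate_tags({"manual:pre-upgrade"})  # passes: manual tags are filtered out
backup.validate_tags({"hourly"})              # logs "unknown-tags", raises RuntimeError
```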

#################
# Making backups

@@ -184,21 +223,47 @@ def forget_revision(self, revision):
r = self.find(revision)
r.remove()

@locked(target=".backup", mode="exclusive")
def expire(self):
self.schedule.expire(self)

@locked(target=".backup", mode="exclusive")
def tags(
self,
action: Literal["set", "add", "remove"],
revision: str,
tags: set[str],
expect: Optional[set[str]] = None,
autoremove: bool = False,
force=False,
) -> bool:
self.scan()
r = self.find(revision)
if not force:
self.validate_tags(tags)
if expect is not None and expect != r.tags:
self.log.info("tags-expectation-failed")
return False
match action:
case "set":
r.tags = tags
case "add":
r.tags |= tags
case "remove":
r.tags -= tags
case _:
raise ValueError(f"invalid action '{action}'")
if not r.tags and autoremove:
r.remove()
else:
r.materialize()
return True
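
The expect parameter gives tags() optimistic-locking semantics: if the revision's tags changed since the caller read them, nothing is written and False is returned (which put_tags above maps to 412). A usage sketch with placeholder revision spec and tags:

```python
import structlog

from backy.backup import Backup

backup = Backup("/srv/backy/vm-test01", structlog.stdlib.get_logger())

ok = backup.tags(
    "add",
    "REV_UUID",          # placeholder revision spec
    {"manual:keep"},
    expect={"daily"},    # abort if the tags changed concurrently
    autoremove=False,    # only relevant when a revision ends up untagged
    force=False,         # keep schedule validation enabled
)
if not ok:
    print("tags changed concurrently; re-read the revision and retry")
```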

@locked(target=".backup", mode="exclusive")
@locked(target=".purge", mode="shared")
def backup(self, tags, force=False):
if not force:
-            missing_tags = (
-                filter_schedule_tags(tags) - self.schedule.schedule.keys()
-            )
-            if missing_tags:
-                self.log.error(
-                    "unknown-tags",
-                    _fmt_msg="The following tags are missing from the schedule: {unknown_tags}\n"
-                    "Check the config file, add the `manual:` prefix or disable tag validation (-f)",
-                    unknown_tags=", ".join(missing_tags),
-                )
-                raise RuntimeError("Unknown tags")
+            self.validate_tags(tags)

start = time.time()

@@ -281,6 +346,7 @@ def verify(self, revision=None):
def purge(self):
backend = self.backend_factory(self.history[0], self.log)
backend.purge()
self.remove_purge_pending()

#################
# Restoring
@@ -470,7 +536,7 @@ def find_by_uuid(self, spec):
except KeyError:
raise IndexError()

-    def find(self, spec):
+    def find(self, spec) -> Revision:
"""Flexible revision search.
Locates a revision by relative number, by tag, or by uuid.
@@ -486,3 +552,97 @@ def find(self, spec):
pass
self.log.warning("find-rev-not-found", spec=spec)
raise KeyError(spec)
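
find() now declares its Revision return type and keeps the contract that unknown specs raise KeyError, which get_rev above maps to 404. A thin wrapper illustrating that contract:

```python
from typing import Optional

from backy.backup import Backup
from backy.revision import Revision


def find_or_none(backup: Backup, spec) -> Optional[Revision]:
    # spec may be a relative number, a tag, or a uuid (per the docstring).
    try:
        return backup.find(spec)
    except KeyError:
        return None
```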

###################
# Syncing Revisions

@locked(target=".backup", mode="exclusive")
async def push_metadata(self, peers):
self.log.info("push-metadata-start")
async with APIClientManager(peers, self.log) as apis:
peers_with_removals = set()
for r in self.history:
if not r.pending_changes:
continue

self.log.debug(
"push-metadata-updating-tags",
server=r.location,
rev_uuid=r.uuid,
old_tags=r.orig_tags,
new_tags=r.tags,
)
try:
await apis[r.location].put_tags(r, autoremove=True)
if not r.tags:
r.remove(force=True)
peers_with_removals.add(r.location)
except (ClientError, KeyError) as e:
if (
isinstance(e, ClientResponseError)
and e.status == HTTPPreconditionFailed.status_code
):
self.log.warning(
"push-metadata-unexpected-server-state",
expected_tags=r.orig_tags,
)
elif isinstance(e, ClientConnectorError):
self.log.debug("pull-metadata-connection-error")
else:
self.log.exception("push-metadata-error")
r.remove(force=True)

for s in peers_with_removals:
self.log.debug("push-metadata-purging-remote", server=s)
try:
await apis[s].run_purge(self.name)
except ClientError:
self.log.warning(
"push-metadata-remote-purge-error",
exc_info=True,
)
continue

@locked(target=".backup", mode="exclusive")
async def pull_metadata(self, peers: dict):
self.log.info("pull-metadata-start")
async with APIClientManager(peers, self.log) as apis:
await asyncio.gather(
*[self._pull_metadata(apis[server]) for server in apis]
)

async def _pull_metadata(self, api: APIClient):
log = self.log.bind(server=api.server_name)
try:
await api.touch_backup(self.name)
remote_revs = await api.get_revs(self)
except ClientError as e:
if isinstance(e, ClientResponseError) and e.status in (
HTTPNotFound.status_code,
HTTPForbidden.status_code,
):
log.debug("pull-metadata-not-found")
elif isinstance(e, ClientConnectorError):
log.debug("pull-metadata-connection-error")
else:
log.warning("pull-metadata-error", exc_info=True)
return
log.debug(
"pull-metadata-found-matching-server",
revs=len(remote_revs),
)

matching_uuids = {
r.uuid for r in self.history if r.location == api.server_name
}
remote_uuids = {r.uuid for r in remote_revs}
for uuid in matching_uuids - remote_uuids:
log.warning("pull-metadata-removing-unknown-rev", rev_uuid=uuid)
self.find_by_uuid(uuid).remove(force=True)

for r in remote_revs:
r.materialize()
log.debug(
"pull-metadata-updated-rev",
rev_uuid=r.uuid,
)
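
Taken together, pull_metadata adopts revision metadata for backups whose data lives on a peer (removing local records the peer no longer knows), while push_metadata publishes local tag changes back and asks peers to purge revisions that became untagged. A minimal sketch of one sync round; the ordering and the empty peers dict are assumptions, not taken from this diff:

```python
import asyncio

import structlog

from backy.backup import Backup

peers: dict = {}  # server name -> peer API configuration (deployment-specific)


async def sync_metadata(backup: Backup) -> None:
    await backup.pull_metadata(peers)  # adopt remote revision metadata
    await backup.push_metadata(peers)  # publish tag changes, trigger purges


asyncio.run(
    sync_metadata(Backup("/srv/backy/vm-test01", structlog.stdlib.get_logger()))
)
```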
(Diffs for the remaining 13 changed files are not shown.)
