Commit

split history into local/remote
Johann Bahl committed Feb 5, 2024
1 parent 4cf25cd commit 6ab69ac
Showing 8 changed files with 72 additions and 38 deletions.
8 changes: 3 additions & 5 deletions src/backy/api.py
@@ -210,11 +210,9 @@ async def touch_backup(self, request: web.Request):
async def get_revs(self, request: web.Request) -> List[Revision]:
backup = await self.get_backup(request)
request["log"].info("get-revs", name=backup.name)
if request.query.get("only_clean", "") == "1":
revs = backup.clean_history
else:
revs = backup.history
return [r for r in revs if not r.server]
return backup.get_history(
local=True, clean=request.query.get("only_clean", "") == "1"
)

async def put_tags(self, request: web.Request):
json = await request.json()
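The refactored handler folds the clean and local filters into the new get_history call; note that only the literal query value "1" enables the clean filter. A small illustration of that parsing (a plain dict stands in for the aiohttp request.query mapping; this is a sketch, not code from the commit):

# Sketch: only_clean parsing as in get_revs above; a plain dict stands in
# for request.query. Only the literal string "1" enables the clean filter.
for query in ({"only_clean": "1"}, {"only_clean": "0"}, {"only_clean": "true"}, {}):
    only_clean = query.get("only_clean", "") == "1"
    print(query, "->", only_clean)  # True only for {"only_clean": "1"}
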
6 changes: 3 additions & 3 deletions src/backy/backends/chunked/__init__.py
@@ -56,7 +56,7 @@ def open(self, mode="rb"):
def purge(self):
self.log.debug("purge")
self.store.users = []
for revision in self.backup.history:
for revision in self.backup.local_history:
try:
self.store.users.append(
self.backup.backend_factory(revision, self.log).open()
@@ -73,7 +73,7 @@ def verify(self):
verified_chunks = set()

# Load verified chunks to avoid duplicate work
for revision in self.backup.clean_history:
for revision in self.backup.get_history(clean=True, local=True):
if revision.trust != Trust.VERIFIED:
continue
f = self.backup.backend_factory(revision, log).open()
@@ -137,7 +137,7 @@ def scrub(self, backup, type):
def scrub_light(self, backup):
errors = 0
self.log.info("scrub-light")
for revision in backup.history:
for revision in backup.local_history:
self.log.info("scrub-light-rev", revision_uuid=revision.uuid)
backend = backup.backend_factory(revision, self.log).open()
for hash in backend._mapping.values():
3 changes: 3 additions & 0 deletions src/backy/backends/chunked/tests/test_backend.py
@@ -37,6 +37,9 @@ def test_purge(simple_file_config, log):
f.write(b"asdf")
f.close()
r.materialize()
remote = Revision(b, log) # remote revision without local data
remote.server = "remote"
remote.materialize()
b.scan()
# Reassign as the scan will create a new reference
r = b.history[0]
42 changes: 32 additions & 10 deletions src/backy/backup.py
@@ -159,13 +159,13 @@ def __init__(self, path, log):
# Initialize our backend
self.backend_type = self.config["source"].get("backend", None)
if self.backend_type is None:
if not self.history:
if not self.local_history:
# Start fresh backups with our new default.
self.backend_type = "chunked"
else:
# Choose to continue existing backups with whatever format
# they are in.
self.backend_type = self.history[-1].backend_type
self.backend_type = self.local_history[-1].backend_type

self.schedule = Schedule()
self.schedule.configure(self.config["schedule"])
@@ -213,14 +213,34 @@ def clear_purge_pending(self):
if p.exists(path):
os.remove(path)

def get_history(
self, *, clean: bool = False, local: bool = False
) -> list[Revision]:
return [
rev
for rev in self.history
if (not clean or "duration" in rev.stats)
and (not local or not rev.server)
]

@property
def clean_history(self):
"""History without incomplete revisions."""
return [rev for rev in self.history if "duration" in rev.stats]
return self.get_history(clean=True)

@property
def local_history(self):
"""History without incomplete revisions."""
return self.get_history(local=True)

@property
def contains_distrusted(self):
return any((r == Trust.DISTRUSTED for r in self.clean_history))
return any(
(
r == Trust.DISTRUSTED
for r in self.get_history(clean=True, local=True)
)
)

def validate_tags(self, tags):
missing_tags = (
@@ -264,7 +284,7 @@ def prevent_remote_rev(self, revs: Optional[List[Revision]] = None):
@locked(target=".backup", mode="exclusive")
def _clean(self):
"""Clean-up incomplete revisions."""
for revision in self.history:
for revision in self.local_history:
if "duration" not in revision.stats:
self.log.warning(
"clean-incomplete", revision_uuid=revision.uuid
@@ -373,7 +393,7 @@ def backup(self, tags: set[str], force=False):
# moving along automatically. This could also be moved into the
# scheduler.
self.scan()
for revision in reversed(self.clean_history):
for revision in reversed(self.get_history(clean=True, local=True)):
if revision.trust == Trust.DISTRUSTED:
self.log.warning("inconsistent")
backend = self.backend_factory(revision, self.log)
@@ -398,7 +418,7 @@ def verify(self, revision: str):

@locked(target=".purge", mode="exclusive")
def purge(self):
backend = self.backend_factory(self.history[0], self.log)
backend = self.backend_factory(self.local_history[0], self.log)
backend.purge()
self.clear_purge_pending()

@@ -514,7 +534,9 @@ def upgrade(self):
while True:
self.scan()
to_upgrade = [
r for r in self.clean_history if r.backend_type == "cowfile"
r
for r in self.get_history(clean=True, local=True)
if r.backend_type == "cowfile"
]
if not to_upgrade:
break
@@ -660,7 +682,7 @@ def find_revisions(
elif token == "all":
return self.history[:]
elif token == "clean":
return self.clean_history[:]
return self.clean_history
elif token == "local":
return self.find_revisions("server:")
elif token == "remote":
@@ -776,7 +798,7 @@ def find(self, spec: str) -> Revision:
@locked(target=".backup", mode="exclusive")
async def push_metadata(self, peers, taskid: str):
grouped = defaultdict(list)
for r in self.history:
for r in self.clean_history:
if r.pending_changes:
grouped[r.server].append(r)
self.log.info(
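The heart of the change is the new Backup.get_history(*, clean=..., local=...) helper, to which clean_history and local_history now delegate. A self-contained sketch of the filter semantics, using a stand-in class instead of the real Revision/Backup (assumption: only the server and stats fields matter here, as in the diff above):

from dataclasses import dataclass, field


@dataclass
class FakeRevision:
    """Stand-in for Revision: only the fields the new filters inspect."""
    uuid: str
    server: str = ""  # empty string means the revision lives on this host
    stats: dict = field(default_factory=dict)


def get_history(history, *, clean=False, local=False):
    """Mirrors Backup.get_history from this commit: clean keeps only
    completed revisions (those with a recorded duration), local keeps only
    revisions not owned by a remote server."""
    return [
        rev
        for rev in history
        if (not clean or "duration" in rev.stats)
        and (not local or not rev.server)
    ]


history = [
    FakeRevision("a", stats={"duration": 3.5}),                    # local, complete
    FakeRevision("b"),                                             # local, incomplete
    FakeRevision("c", server="remote1", stats={"duration": 1.0}),  # remote, complete
]
assert [r.uuid for r in get_history(history, clean=True)] == ["a", "c"]
assert [r.uuid for r in get_history(history, local=True)] == ["a", "b"]
assert [r.uuid for r in get_history(history, clean=True, local=True)] == ["a"]
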
23 changes: 11 additions & 12 deletions src/backy/daemon.py
@@ -262,28 +262,27 @@ def status(self, filter_re: Optional[Pattern[str]] = None) -> List[dict]:
job.backup.scan()
manual_tags = set()
unsynced_revs = 0
if job.backup.clean_history:
last = job.backup.clean_history[-1]
for rev in job.backup.clean_history:
manual_tags |= filter_manual_tags(rev.tags)
if rev.pending_changes:
unsynced_revs += 1
else:
last = None
history = job.backup.clean_history
for rev in history:
manual_tags |= filter_manual_tags(rev.tags)
if rev.pending_changes:
unsynced_revs += 1
result.append(
dict(
job=job.name,
sla="OK" if job.sla else "TOO OLD",
sla_overdue=job.sla_overdue,
status=job.status,
last_time=last.timestamp if last else None,
last_time=history[-1].timestamp if history else None,
last_tags=(
",".join(job.schedule.sorted_tags(last.tags))
if last
",".join(job.schedule.sorted_tags(history[-1].tags))
if history
else None
),
last_duration=(
last.stats.get("duration", 0) if last else None
history[-1].stats.get("duration", 0)
if history
else None
),
next_time=job.next_time,
next_tags=(
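The status() refactor binds the clean history once and drops the duplicated else branch: the tag and sync accumulation simply runs over an empty list, and the last_* fields fall back to None via "history[-1] ... if history else None". A self-contained sketch of that pattern (a simple prefix check stands in for filter_manual_tags, and sorted() for schedule.sorted_tags):

from dataclasses import dataclass, field


@dataclass
class Rev:
    """Stand-in revision with just the fields status() reads."""
    timestamp: str
    tags: set = field(default_factory=set)
    stats: dict = field(default_factory=dict)
    pending_changes: bool = False


def summarize(history):
    """Condensed last-revision summary in the shape of the refactored status()."""
    manual_tags, unsynced_revs = set(), 0
    for rev in history:  # no-op when history is empty
        manual_tags |= {t for t in rev.tags if t.startswith("manual:")}
        if rev.pending_changes:
            unsynced_revs += 1
    return dict(
        last_time=history[-1].timestamp if history else None,
        last_tags=",".join(sorted(history[-1].tags)) if history else None,
        last_duration=history[-1].stats.get("duration", 0) if history else None,
        manual_tags=manual_tags,
        unsynced_revs=unsynced_revs,
    )


print(summarize([]))  # every last_* field is None
print(summarize([Rev("2015-09-02", {"daily", "manual:test"}, {"duration": 3.7}, True)]))
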
2 changes: 2 additions & 0 deletions src/backy/revision.py
@@ -170,6 +170,8 @@ def get_parent(self) -> Optional["Revision"]:
"""defaults to last rev if not in history"""
prev = None
for r in self.backup.history:
if r.server != self.server:
continue
if r.uuid == self.uuid:
break
prev = r
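get_parent now resolves the parent within the same server only, so a local revision never gets a remote parent and vice versa. A self-contained sketch of the lookup with a stand-in class (the assertions mirror the expectations in the updated test_load_revisions below):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Rev:
    """Stand-in revision: only uuid and server matter for parent lookup."""
    uuid: str
    server: str = ""  # "" means local


def get_parent(history: List[Rev], rev: Rev) -> Optional[Rev]:
    """Mirrors Revision.get_parent after this commit: the parent is the
    closest preceding revision in history that lives on the same server."""
    prev = None
    for r in history:
        if r.server != rev.server:
            continue  # never parent across servers
        if r.uuid == rev.uuid:
            break
        prev = r
    return prev


history = [Rev("123-0"), Rev("123-1", "remote1"), Rev("123-2", "remote1")]
assert get_parent(history, history[0]) is None
assert get_parent(history, history[1]) is None          # first remote1 revision
assert get_parent(history, history[2]).uuid == "123-1"  # parented within remote1
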
6 changes: 3 additions & 3 deletions src/backy/sources/ceph/source.py
@@ -158,9 +158,9 @@ def _delete_old_snapshots(self):
# revision - which is wrong: broken new revisions would always cause
# full backups instead of new deltas based on the most recent valid
# one.
if not self.always_full and self.revision.backup.history:
keep_snapshot_revision = self.revision.backup.history[-1]
keep_snapshot_revision = keep_snapshot_revision.uuid
# XXX this will break if multiple servers are active
if not self.always_full and self.revision.backup.local_history:
keep_snapshot_revision = self.revision.backup.local_history[-1].uuid
else:
keep_snapshot_revision = None
for snapshot in self.rbd.snap_ls(self._image_name):
20 changes: 15 additions & 5 deletions src/backy/tests/test_archive.py
@@ -23,7 +23,7 @@ def backup_with_revisions(backup, tmpdir):
timestamp: 2015-08-30 01:00:00+00:00
parent: 123-0
stats: {bytes_written: 1486880, duration: 3.7}
server: remote2
server: remote1
tags: [daily, weekly]
"""
)
@@ -58,10 +58,9 @@ def test_find_revision_empty(backup):
def test_load_revisions(backup_with_revisions):
a = backup_with_revisions
assert [x.uuid for x in a.history] == ["123-0", "123-1", "123-2"]
assert a.history[1].uuid == "123-1"
assert a.history[1].get_parent().uuid == "123-0"
assert a.history[2].get_parent().uuid == "123-1"
assert a.history[0].get_parent() is None
assert a.history[1].get_parent() is None
assert a.history[2].get_parent().uuid == "123-1"


def test_find_revisions(backup_with_revisions):
@@ -127,6 +126,7 @@ def test_find_revisions(backup_with_revisions):
]
assert a.find_revisions("server:aaaa") == []
assert a.find_revisions("server:remote1") == [
a.find("123-1"),
a.find("123-2"),
]
assert a.find_revisions("local") == [
@@ -173,8 +173,18 @@ def test_find_revision(backup_with_revisions):
assert a.find(" first( tag:monthly ) ").uuid == "123-0"


def test_clean_history_should_exclude_incomplete_revs(backup_with_revisions):
def test_get_history(backup_with_revisions):
assert 2 == len(backup_with_revisions.clean_history)
assert (
backup_with_revisions.clean_history
== backup_with_revisions.get_history(clean=True)
)
assert 1 == len(backup_with_revisions.local_history)
assert (
backup_with_revisions.local_history
== backup_with_revisions.get_history(local=True)
)
assert 1 == len(backup_with_revisions.get_history(clean=True, local=True))


def test_ignore_duplicates(backup_with_revisions, tmpdir):
