Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DemoStorage: Add support for deleteObject (IExternalGC) #341

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@
5.7.1 (unreleased)
==================

- Introduce a new ``loadBeforeEx`` interface that complements ``loadBefore``:
``loadBeforeEx`` is simpler, provides better information for object delete
records and can be more efficiently implemented by many storages.
``loadBeforeEx`` is used (and required) to fix a ``DemoStorage`` data corruption
in the presence of object delete records.
See `issue 318 <https://github.com/zopefoundation/ZODB/issues/318>`_
and `PR 323 <https://github.com/zopefoundation/ZODB/pull/323>`_
for details.


5.7.0 (2022-03-17)
==================
Expand Down
9 changes: 8 additions & 1 deletion src/ZODB/BaseStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class BaseStorage(UndoLogCompatible):
If it stores multiple revisions, it should implement
loadSerial()
loadBefore()
loadBeforeEx()

Each storage will have two locks that are accessed via lock
acquire and release methods bound to the instance. (Yuck.)
Expand Down Expand Up @@ -269,7 +270,13 @@ def loadSerial(self, oid, serial):

def loadBefore(self, oid, tid):
"""Return most recent revision of oid before tid committed."""
return None
raise NotImplementedError

def loadBeforeEx(self, oid, tid):
"""Return most recent revision of oid before tid committed.
(see IStorageLoadBeforeEx).
"""
raise NotImplementedError

def copyTransactionsFrom(self, other, verbose=0):
"""Copy transactions from another storage.
Expand Down
2 changes: 1 addition & 1 deletion src/ZODB/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def open(self, transaction_manager=None, at=None, before=None):
- `before`: like `at`, but opens the readonly state before the
tid or datetime.
"""
# `at` is normalized to `before`, since we use storage.loadBefore
# `at` is normalized to `before`, since we use storage.loadBeforeEx
# as the underlying implementation of both.
before = getTID(at, before)
if (before is not None and
Expand Down
122 changes: 83 additions & 39 deletions src/ZODB/DemoStorage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) Zope Corporation and Contributors.
Expand Down Expand Up @@ -33,7 +34,7 @@
import zope.interface

from .ConflictResolution import ConflictResolvingStorage
from .utils import load_current, maxtid
from .utils import load_current, maxtid, p64, u64


@zope.interface.implementer(
Expand Down Expand Up @@ -107,11 +108,16 @@ def __init__(self, name=None, base=None, changes=None,
if close_changes_on_close is None:
close_changes_on_close = False
else:
self._temporary_changes = False
if ZODB.interfaces.IBlobStorage.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
if close_changes_on_close is None:
close_changes_on_close = True

if ZODB.interfaces.IExternalGC.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IExternalGC)
self.deleteObject = self._storeDelete

self.changes = changes
self.close_changes_on_close = close_changes_on_close

Expand Down Expand Up @@ -217,45 +223,44 @@ def __len__(self):
# still want load for old clients (e.g. zeo servers)
load = load_current

def loadBefore(self, oid, tid):
try:
result = self.changes.loadBefore(oid, tid)
except ZODB.POSException.POSKeyError:
# The oid isn't in the changes, so defer to base
return self.base.loadBefore(oid, tid)

if result is None:
# The oid *was* in the changes, but there aren't any
# earlier records. Maybe there are in the base.
try:
result = self.base.loadBefore(oid, tid)
except ZODB.POSException.POSKeyError:
# The oid isn't in the base, so None will be the right result
pass
def loadBeforeEx(self, oid, before):
data, serial = ZODB.utils.loadBeforeEx(self.changes, oid, before)
if (data is not None) or (serial != ZODB.utils.z64):
# object is present in changes either as data or deletion record.
return data, serial

# object is not present in changes at all - use base
return ZODB.utils.loadBeforeEx(self.base, oid, before)

def loadBefore(self, oid, before):
data, serial = self.loadBeforeEx(oid, before)

# find out next_serial.
# it is ok to use dumb/slow implementation since loadBefore should not
# be used and is provided only for backward compatibility.
next_serial = maxtid
while 1:
_, s = self.loadBeforeEx(oid, next_serial)
assert s >= serial
if s == serial:
# found - next_serial is serial of the next data record
break
next_serial = s

if next_serial == maxtid:
next_serial = None

# next_serial found -> return/raise what loadBefore users expect
if data is None:
if next_serial is None:
# object was never created
raise ZODB.POSException.POSKeyError(oid)
else:
if result and not result[-1]:
# The oid is current in the base. We need to find
# the end tid in the base by fining the first tid
# in the changes. Unfortunately, there isn't an
# api for this, so we have to walk back using
# loadBefore.

if tid == maxtid:
# Special case: we were looking for the
# current value. We won't find anything in
# changes, so we're done.
return result

end_tid = maxtid
t = self.changes.loadBefore(oid, end_tid)
while t:
end_tid = t[1]
t = self.changes.loadBefore(oid, end_tid)
result = result[:2] + (
end_tid if end_tid != maxtid else None,
)

return result
# object was deleted
return None

# regular data record
return data, serial, next_serial

def loadBlob(self, oid, serial):
try:
Expand Down Expand Up @@ -365,6 +370,45 @@ def store(self, oid, serial, data, version, transaction):
else:
self.changes.store(oid, serial, data, '', transaction)

# _storeDelete serves deleteObject when .changes implements IExternalGC.
def _storeDelete(self, oid, oldserial, transaction):
if transaction is not self._transaction:
raise ZODB.POSException.StorageTransactionError(self, transaction)

# oldserial ∈ changes -> changes.deleteObject
baseHead = self.base.lastTransaction()
if oldserial > baseHead:
self.changes.deleteObject(oid, oldserial, transaction)
return

# oldserial ∈ base -> find out it is indeed the latest there and then
# call changes.deleteObject(oldserial=z64)

changesHead = self.changes.lastTransaction()
_, serial = ZODB.utils.loadBeforeEx(self.changes, oid,
p64(u64(changesHead) + 1))
if serial != ZODB.utils.z64:
# object has data or deletion record in changes
raise ZODB.POSException.ConflictError(oid=oid,
serials=(serial, oldserial))

_, serial = ZODB.utils.loadBeforeEx(self.base, oid,
p64(u64(baseHead) + 1))
if serial != oldserial:
raise ZODB.POSException.ConflictError(oid=oid,
serials=(serial, oldserial))

# object has no data/deletion record in changes and its latest revision
# in base == oldserial. -> changes.deleteObject(oldserial=z64) +
# correct oldserial back on conflict.
try:
self.changes.deleteObject(oid, ZODB.utils.z64, transaction)
except ZODB.POSException.ConflictError as e:
assert len(e.serials) == 2
assert e.serials[1] == ZODB.utils.z64
e.serials = (e.serials[0], oldserial)
raise

def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
assert version == '', "versions aren't supported"
Expand Down
40 changes: 37 additions & 3 deletions src/ZODB/FileStorage/FileStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable
from ZODB.interfaces import IStorageLoadBeforeEx
from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError
Expand Down Expand Up @@ -144,6 +145,7 @@ def __init__(self, afile):
IStorageCurrentRecordIteration,
IExternalGC,
IStorage,
IStorageLoadBeforeEx,
)
class FileStorage(
FileStorageFormatter,
Expand Down Expand Up @@ -572,6 +574,37 @@ def loadSerial(self, oid, serial):
else:
return self._loadBack_impl(oid, h.back)[0]

def loadBeforeEx(self, oid, before):
"""loadBeforeEx implements IStorageLoadBeforeEx."""
with self._files.get() as _file:
try:
pos = self._lookup_pos(oid)
except POSKeyError:
# object does not exist
return None, z64

while 1:
h = self._read_data_header(pos, oid, _file)
if h.tid < before:
break
pos = h.prev
if not pos:
# object not yet created as of <before
return None, z64

# h is the most recent DataHeader with .tid < before
if h.plen:
# regular data record
return _file.read(h.plen), h.tid
elif h.back:
# backpointer
data, _, _, _ = self._loadBack_impl(oid, h.back,
fail=False, _file=_file)
return data, h.tid
else:
# deletion
return None, h.tid

def loadBefore(self, oid, tid):
with self._files.get() as _file:
pos = self._lookup_pos(oid)
Expand Down Expand Up @@ -637,9 +670,10 @@ def deleteObject(self, oid, oldserial, transaction):
with self._lock:
old = self._index_get(oid, 0)
if not old:
raise POSKeyError(oid)
h = self._read_data_header(old, oid)
committed_tid = h.tid
committed_tid = z64
else:
h = self._read_data_header(old, oid)
committed_tid = h.tid

if oldserial != committed_tid:
raise ConflictError(
Expand Down
17 changes: 17 additions & 0 deletions src/ZODB/MappingStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@zope.interface.implementer(
ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageLoadBeforeEx,
)
class MappingStorage(object):
"""In-memory storage implementation
Expand Down Expand Up @@ -149,6 +150,22 @@ def __len__(self):

load = ZODB.utils.load_current

# ZODB.interfaces.IStorageLoadBeforeEx
@ZODB.utils.locked(opened)
def loadBeforeEx(self, oid, before):
z64 = ZODB.utils.z64
tid_data = self._data.get(oid)
if not tid_data:
return None, z64
if before == z64:
return None, z64
at = ZODB.utils.p64(ZODB.utils.u64(before)-1)
tids_at = tid_data.keys(None, at)
if not tids_at:
return None, z64
serial = tids_at[-1]
return tid_data[serial], serial

# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def loadBefore(self, oid, tid):
Expand Down
6 changes: 2 additions & 4 deletions src/ZODB/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,9 @@ def undo(self, serial_id, transaction):
for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision
# of this blob object.
load_result = self.loadBefore(oid, serial_id)

if load_result is None:
_, serial_before = utils.loadBeforeEx(self, oid, serial_id)

if serial_before == utils.z64:
# There was no previous revision of this blob
# object. The blob was created in the transaction
# represented by serial_id. We copy the blob data
Expand All @@ -889,7 +888,6 @@ def undo(self, serial_id, transaction):
# transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction
# in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
with open(orig_fn, "rb") as orig:
Expand Down
4 changes: 2 additions & 2 deletions src/ZODB/historical_connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ development continues on a "development" head.

A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``.
that supports ``loadBeforeEx`` or ``loadBefore``.

We'll begin our example with a fairly standard set up. We

Expand Down Expand Up @@ -139,7 +139,7 @@ root.
0

In fact, ``at`` arguments are translated into ``before`` values because the
underlying mechanism is a storage's loadBefore method. When you look at a
underlying mechanism is a storage's loadBeforeEx method. When you look at a
connection's ``before`` attribute, it is normalized into a ``before`` serial,
no matter what you pass into ``db.open``.

Expand Down
23 changes: 23 additions & 0 deletions src/ZODB/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ def loadBefore(oid, tid):
- The transaction id of the following revision, if any, or None.

If the object id isn't in the storage, then POSKeyError is raised.

See also: IStorageLoadBeforeEx.loadBeforeEx .
"""

def loadSerial(oid, serial):
Expand Down Expand Up @@ -888,6 +890,27 @@ def prefetch(oids, tid):
"""


class IStorageLoadBeforeEx(Interface):

def loadBeforeEx(oid, before): # -> (data, serial)
"""Load object data as observed before given database state.

loadBeforeEx returns data for object with given object ID as observed
by most recent database transaction with ID < before. Two values are
returned:

- The data record,
- The transaction ID of the data record.

If the object does not exist, or is deleted as of requested database
state, loadBeforeEx returns data=None, and serial indicates transaction
ID of the most recent deletion done in transaction with ID < before, or
null tid if there is no such deletion.

Note: no POSKeyError is raised even if object id is not in the storage.
"""


class IMultiCommitStorage(IStorage):
"""A multi-commit storage can commit multiple transactions at once.

Expand Down
Loading