preparation work for lock testing
- changed the upload/download format to save one server round trip
- added sector/device characteristics to prepare for recovery testing
- added an md5 page-centric dump to make corruption visible while debugging (see the sketch below)
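A minimal sketch (not part of this commit) of how the md5 page-centric dump could be used during debugging: snapshot the per-page checksums before and after a test run and report which pages changed. The S3VFS constructor arguments and the vfs calls in the commented-out lines are assumptions for illustration.

import sqlite_s3vfs

def changed_pages(before, after):
    # compare two _storage_diagnostic() reports of the form {file: [md5 per page]}
    diff = {}
    for name in sorted(set(before) | set(after)):
        old, new = before.get(name, []), after.get(name, [])
        length = max(len(old), len(new))
        pages = [i for i in range(length)
                 if (old[i] if i < len(old) else None) != (new[i] if i < len(new) else None)]
        if pages:
            diff[name] = pages
    return diff

# vfs = sqlite_s3vfs.S3VFS(bucket=..., block_size=4096)            # assumed constructor
# before = vfs._storage_diagnostic(page_size=4096, block_size=4096)
# ... run the scenario under test ...
# after = vfs._storage_diagnostic(page_size=4096, block_size=4096)
# print(changed_pages(before, after))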
LorenzoBoccaccia committed Apr 30, 2024
1 parent 213dd1a commit 06f077c
Showing 1 changed file with 76 additions and 21 deletions.
97 changes: 76 additions & 21 deletions sqlite_s3vfs.py
@@ -1,5 +1,9 @@
import uuid
import io
import apsw
import hashlib
from math import log2
from boto3.s3.transfer import TransferConfig


class S3VFS(apsw.VFS):
@@ -91,7 +95,27 @@ def block_bytes_iter():
for block, block_bytes in enumerate(block_bytes_iter()):
self._bucket.Object(f'{key_prefix}/{block:010d}').put(Body=block_bytes)


    def _list_files(self):
        files = set()
        for obj in self._bucket.objects.filter(Prefix=''):
            # rebuild the file key prefix from the first two path components of
            # the object key; the set deduplicates across the per-block objects
            first, second = obj.key.split('/')[:2]
            files.add(f'{first}/{second}')
        return list(files)

    def _storage_diagnostic(self, page_size, block_size):
        # md5 page-centric dump: one digest per page of each stored file, so
        # corruption shows up as a changed checksum in debug output
        report = {}
        for file in self._list_files():
            report[file] = []
            serialized = self.serialize_fileobj(file).read()
            # split the serialized bytes into page_size chunks and hash each one
            chunks = [serialized[i:i + page_size] for i in range(0, len(serialized), page_size)]
            for chunk in chunks:
                report[file].append(hashlib.md5(chunk).hexdigest())
        return report

class S3VFSFile:
def __init__(self, name, flags, bucket, block_size):
self._key_prefix = \
@@ -109,28 +133,59 @@ def _blocks(self, offset, amount):
amount -= consume
offset += consume

def _block_object(self, block):
return self._bucket.Object(f'{self._key_prefix}/{block:010d}')

    def _get_block_bytes(self, block):
        try:
            with io.BytesIO() as f:
                # fetch the whole block object via a managed transfer; threads
                # are disabled to keep the transfer simple and deterministic
                self._bucket.download_fileobj(f'{self._key_prefix}/{block:010d}', f,
                                              Config=TransferConfig(use_threads=False))
                return f.getvalue()
        except Exception:
            # a missing or unreadable block reads back as empty
            return b''

    def _put_block_bytes(self, block, block_bytes):
        with io.BytesIO(block_bytes) as f:
            self._bucket.upload_fileobj(f, f'{self._key_prefix}/{block:010d}',
                                        Config=TransferConfig(use_threads=False))

    def xRead(self, amount, offset):
        def _read():
            for block, start, consume in self._blocks(offset, amount):
                block_bytes = self._get_block_bytes(block)
                yield block_bytes[start:start+consume]

        return b"".join(_read())

    def xSectorSize(self):
        # report the block size as the sector size, i.e. the largest chunk that
        # is written atomically, but only for powers of two larger than 512
        if self._block_size > 512 and log2(self._block_size) % 1 == 0:
            return self._block_size
        # otherwise return 0 and let SQLite fall back to its default sector size
        return 0


    def xDeviceCharacteristics(self):
        # appending and sequential writes are safe on this storage layout
        caps = apsw.mapping_device_characteristics
        flags = caps['SQLITE_IOCAP_SAFE_APPEND'] | caps['SQLITE_IOCAP_SEQUENTIAL']

        # advertise the matching atomic-write flag for block sizes from 512 bytes to 64 KiB
        atomic_caps = {
            512: 'SQLITE_IOCAP_ATOMIC512',
            1024: 'SQLITE_IOCAP_ATOMIC1K',
            2048: 'SQLITE_IOCAP_ATOMIC2K',
            4096: 'SQLITE_IOCAP_ATOMIC4K',
            8192: 'SQLITE_IOCAP_ATOMIC8K',
            16384: 'SQLITE_IOCAP_ATOMIC16K',
            32768: 'SQLITE_IOCAP_ATOMIC32K',
            65536: 'SQLITE_IOCAP_ATOMIC64K',
        }
        if self._block_size in atomic_caps:
            flags |= caps[atomic_caps[self._block_size]]

        # writes go over the network, so they are never truly atomic, but the file
        # size is derived from the stored objects and SQLite can work with that
        # guarantee for recovery purposes
        return flags

def xFileControl(self, *args):
return False

@@ -179,11 +234,11 @@ def xWrite(self, data, offset):
        data_first_block = offset // self._block_size
        lock_page_block = lock_page_offset // self._block_size
        for block in range(data_first_block - 1, lock_page_block - 1, -1):
            # walk back towards the lock page, padding any short block to a full
            # block; stop at the first block that is already full size
            original_block_bytes = self._get_block_bytes(block)
            if len(original_block_bytes) == self._block_size:
                break
            self._put_block_bytes(block, original_block_bytes + bytes(
                self._block_size - len(original_block_bytes)
            ))

        data_offset = 0
@@ -192,13 +247,13 @@ def xWrite(self, data, offset):
            data_to_write = data[data_offset:data_offset+write]

            if start != 0 or len(data_to_write) != self._block_size:
                # partial block write: read the current block, pad it up to the
                # write start if needed, and splice the new data into it
                original_block_bytes = self._get_block_bytes(block)
                original_block_bytes = original_block_bytes + bytes(max(start - len(original_block_bytes), 0))

                data_to_write = \
                    original_block_bytes[0:start] + \
                    data_to_write + \
                    original_block_bytes[start+write:]

            data_offset += write
            self._put_block_bytes(block, data_to_write)
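
A minimal usage sketch (not part of the diff), showing the intent behind the sector size and device characteristics: keep the SQLite page size equal to the block size so xSectorSize() reports the block size and xDeviceCharacteristics() advertises the matching atomic-write flag. The bucket name, the S3VFS constructor signature and the vfs.name registration follow upstream sqlite-s3vfs usage and are assumptions here.

import apsw
import boto3
import sqlite_s3vfs

bucket = boto3.resource('s3').Bucket('my-test-bucket')    # hypothetical bucket
vfs = sqlite_s3vfs.S3VFS(bucket=bucket, block_size=4096)  # assumed constructor; power of two > 512

connection = apsw.Connection('/demo/test.db', vfs=vfs.name)  # assumed: the VFS registers itself under vfs.name
cursor = connection.cursor()
cursor.execute('PRAGMA page_size = 4096')                 # align SQLite pages with S3 blocks
cursor.execute('CREATE TABLE IF NOT EXISTS t (x)')
cursor.execute('INSERT INTO t VALUES (1)')
connection.close()

# per-page md5 report for every stored file, useful when checking recovery scenarios
print(vfs._storage_diagnostic(page_size=4096, block_size=4096))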
