Preparation work for corruption/recovery testing #27

Open
Wants to merge 1 commit into base: main
97 changes: 76 additions & 21 deletions sqlite_s3vfs.py
@@ -1,5 +1,9 @@
 import uuid
+import io
 import apsw
+import hashlib
+from math import log2
+from boto3.s3.transfer import TransferConfig


 class S3VFS(apsw.VFS):
@@ -91,7 +95,27 @@ def block_bytes_iter():
         for block, block_bytes in enumerate(block_bytes_iter()):
             self._bucket.Object(f'{key_prefix}/{block:010d}').put(Body=block_bytes)

+    def _list_files(self):
+        files = set()
+        for obj in self._bucket.objects.filter(Prefix=''):
+            # object keys look like '<file key prefix>/<block number>': keep the
+            # first two path components, which identify the file
+            key_parts = obj.key.split('/')
+            files.add(key_parts[0] + '/' + key_parts[1])
+        return list(files)
+
+    def _storage_diagnostic(self, page_size, block_size):
+        report = {}
+        for file in self._list_files():
+            report[file] = []
+            serialized = self.serialize_fileobj(file).read()
+            # split the serialized database into page_size-length chunks and
+            # record the md5 of each one
+            chunks = [serialized[i:i + page_size] for i in range(0, len(serialized), page_size)]
+            for chunk in chunks:
+                md5sum = hashlib.md5(chunk).hexdigest()
+                report[file].append(md5sum)
+        return report
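
The report maps each file key prefix to the md5 of every page_size-length chunk of its serialized form. A rough usage sketch (not part of this change: the s3vfs instance, the bucket it is attached to, and the 4096-byte page size are assumptions for illustration):

# Hypothetical usage of _storage_diagnostic; s3vfs is an assumed S3VFS
# instance and 4096 is assumed to match the database's page size.
report = s3vfs._storage_diagnostic(page_size=4096, block_size=4096)
for file_key, page_md5s in report.items():
    # file_key is one of the file key prefixes returned by _list_files();
    # page_md5s[i] is the md5 hex digest of page i of the serialized database
    print(file_key, len(page_md5s), page_md5s[:3])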

 class S3VFSFile:
     def __init__(self, name, flags, bucket, block_size):
         self._key_prefix = \
@@ -109,28 +133,59 @@ def _blocks(self, offset, amount):
             amount -= consume
             offset += consume

-    def _block_object(self, block):
-        return self._bucket.Object(f'{self._key_prefix}/{block:010d}')
-
-    def _block_bytes(self, block):
-        try:
-            block_bytes = self._block_object(block).get()["Body"].read()
-        except self._bucket.meta.client.exceptions.NoSuchKey as e:
-            block_bytes = b''
-
-        return block_bytes
+    def _get_block_bytes(self, block):
+        try:
+            with io.BytesIO() as f:
+                self._bucket.download_fileobj(f'{self._key_prefix}/{block:010d}', f, Config=TransferConfig(use_threads=False))
+                return f.getvalue()
+        except Exception:
+            # treat a block that has never been written as empty
+            return b''
+
+    def _put_get_block_bytes(self, block, bytes):
+        with io.BytesIO(bytes) as f:
+            self._bucket.upload_fileobj(f, f'{self._key_prefix}/{block:010d}', Config=TransferConfig(use_threads=False))
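
Both helpers pass TransferConfig(use_threads=False), presumably so the boto3 transfer manager does not spawn worker threads inside SQLite's synchronous VFS callbacks. A minimal round-trip sketch (illustrative only; it assumes file is an S3VFSFile already constructed against a test bucket):

# Hypothetical check of the new block helpers; file is an assumed,
# already-constructed S3VFSFile backed by a (possibly mocked) bucket.
payload = b'corrupt-me' + bytes(22)
file._put_get_block_bytes(0, payload)
assert file._get_block_bytes(0) == payload

# A block that was never written reads back as b'', because the broad
# except Exception above swallows the missing-key error from S3.
assert file._get_block_bytes(9999) == b''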

     def xRead(self, amount, offset):
         def _read():
             for block, start, consume in self._blocks(offset, amount):
-                block_bytes = self._block_bytes(block)
+                block_bytes = self._get_block_bytes(block)
                 yield block_bytes[start:start+consume]

         return b"".join(_read())

+    def xSectorSize(self):
+        # the sector size is the largest chunk SQLite assumes can be written
+        # atomically, but SQLite can only align on powers of two greater than 512
+        if self._block_size > 512 and log2(self._block_size) % 1 == 0:
+            return self._block_size
+        # otherwise we give up and accept unaligned writes
+        return 0
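
For reference, the log2(...) % 1 == 0 test above passes only for exact powers of two, and the strict > 512 comparison means a 512-byte block size still reports 0. A quick check (my example, not part of this change):

# Power-of-two check used by xSectorSize: only exact powers of two strictly
# greater than 512 report a sector size; everything else falls back to 0.
from math import log2

for candidate in (512, 1024, 4096, 6144, 65536):
    reported = candidate if candidate > 512 and log2(candidate) % 1 == 0 else 0
    print(candidate, reported)
# 512 -> 0, 1024 -> 1024, 4096 -> 4096, 6144 -> 0, 65536 -> 65536

Returning 0 effectively leaves SQLite on its default 512-byte sector size.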


+    def xDeviceCharacteristics(self):
+        # we can safely append and write sequentially
+        flags = apsw.mapping_device_characteristics['SQLITE_IOCAP_SAFE_APPEND'] | apsw.mapping_device_characteristics['SQLITE_IOCAP_SEQUENTIAL']
+        # set the atomic-write flag matching the block size, from 512 bytes to 64k
+        if self._block_size == 512:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC512']
+        if self._block_size == 1024:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC1K']
+        if self._block_size == 2048:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC2K']
+        if self._block_size == 4096:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC4K']
+        if self._block_size == 8192:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC8K']
+        if self._block_size == 16384:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC16K']
+        if self._block_size == 32768:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC32K']
+        if self._block_size == 65536:
+            flags = flags | apsw.mapping_device_characteristics['SQLITE_IOCAP_ATOMIC64K']
+
+        # our writes go over the network so they are never truly atomic, but the file
+        # size is calculated from the stored objects, and SQLite can work with that
+        # guarantee for recovery purposes
+        return flags
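
The chain of if statements selects the single SQLITE_IOCAP_ATOMICnnn flag matching the block size. The same selection could be expressed as a lookup table; a sketch with equivalent behaviour (my illustration, not a change proposed here):

# Equivalent flag selection via a lookup table; the SQLITE_IOCAP_* names are
# the same apsw constants used above.
import apsw

ATOMIC_FLAG_FOR_BLOCK_SIZE = {
    512: 'SQLITE_IOCAP_ATOMIC512',
    1024: 'SQLITE_IOCAP_ATOMIC1K',
    2048: 'SQLITE_IOCAP_ATOMIC2K',
    4096: 'SQLITE_IOCAP_ATOMIC4K',
    8192: 'SQLITE_IOCAP_ATOMIC8K',
    16384: 'SQLITE_IOCAP_ATOMIC16K',
    32768: 'SQLITE_IOCAP_ATOMIC32K',
    65536: 'SQLITE_IOCAP_ATOMIC64K',
}

def device_characteristics(block_size):
    flags = (
        apsw.mapping_device_characteristics['SQLITE_IOCAP_SAFE_APPEND']
        | apsw.mapping_device_characteristics['SQLITE_IOCAP_SEQUENTIAL']
    )
    if block_size in ATOMIC_FLAG_FOR_BLOCK_SIZE:
        flags |= apsw.mapping_device_characteristics[ATOMIC_FLAG_FOR_BLOCK_SIZE[block_size]]
    return flags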

     def xFileControl(self, *args):
         return False

@@ -179,11 +234,11 @@ def xWrite(self, data, offset):
             data_first_block = offset // self._block_size
             lock_page_block = lock_page_offset // self._block_size
             for block in range(data_first_block - 1, lock_page_block - 1, -1):
-                original_block_bytes = self._block_bytes(block)
-                if len(original_block_bytes) == self._block_size:
+                original_get_block_bytes = self._get_block_bytes(block)
+                if len(original_get_block_bytes) == self._block_size:
                     break
-                self._block_object(block).put(Body=original_block_bytes + bytes(
-                    self._block_size - len(original_block_bytes)
+                self._put_get_block_bytes(block, original_get_block_bytes + bytes(
+                    self._block_size - len(original_get_block_bytes)
                 ))

         data_offset = 0
@@ -192,13 +247,13 @@ def xWrite(self, data, offset):
             data_to_write = data[data_offset:data_offset+write]

             if start != 0 or len(data_to_write) != self._block_size:
-                original_block_bytes = self._block_bytes(block)
-                original_block_bytes = original_block_bytes + bytes(max(start - len(original_block_bytes), 0))
+                original_get_block_bytes = self._get_block_bytes(block)
+                original_get_block_bytes = original_get_block_bytes + bytes(max(start - len(original_get_block_bytes), 0))

                 data_to_write = \
-                    original_block_bytes[0:start] + \
+                    original_get_block_bytes[0:start] + \
                     data_to_write + \
-                    original_block_bytes[start+write:]
+                    original_get_block_bytes[start+write:]

             data_offset += write
-            self._block_object(block).put(Body=data_to_write)
+            self._put_get_block_bytes(block, data_to_write)
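
Putting the new pieces together, one shape the corruption/recovery tests could take (entirely illustrative; the s3vfs instance, the 4096-byte page size and the helper below are assumptions, not part of this change) is to snapshot the per-page md5s, damage one stored block directly in the bucket, and diff the two reports:

# Hypothetical corruption-injection sketch built on _storage_diagnostic and
# _list_files; s3vfs is an assumed S3VFS instance with at least one database.
def corrupt_first_block(bucket, file_key):
    key = f'{file_key}/{0:010d}'
    original = bucket.Object(key).get()['Body'].read()
    # flip the first byte so at least one page's md5 changes
    bucket.Object(key).put(Body=bytes([original[0] ^ 0xFF]) + original[1:])

baseline = s3vfs._storage_diagnostic(page_size=4096, block_size=4096)

file_key = s3vfs._list_files()[0]
corrupt_first_block(s3vfs._bucket, file_key)

damaged = s3vfs._storage_diagnostic(page_size=4096, block_size=4096)
changed_pages = [
    page
    for page, (before, after) in enumerate(zip(baseline[file_key], damaged[file_key]))
    if before != after
]
print(file_key, changed_pages)  # e.g. [0] if only the first page was hit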