Merge pull request #61 from xian-network/devnet
Fixes improper HDF5 usage: keys are now parsed into a per-contract filename and an HDF5 group path.
duelingbenjos authored Sep 12, 2024 · 2 parents 014c5c9 + e9a1c22 · commit ea0d831
Showing 2 changed files with 105 additions and 44 deletions.
97 changes: 68 additions & 29 deletions src/contracting/storage/driver.py
@@ -44,14 +44,21 @@ def __build_directories(self):
    self.run_state.mkdir(exist_ok=True, parents=True)

def __parse_key(self, key):
-    try:
-        filename, variable = key.split(constants.INDEX_SEPARATOR, 1)
-        variable = variable.replace(constants.DELIMITER, constants.HDF5_GROUP_SEPARATOR)
-    except ValueError:
-        filename = "__misc"
-        variable = key.replace(constants.DELIMITER, constants.HDF5_GROUP_SEPARATOR)
+    # Split the key at the first INDEX_SEPARATOR; keys without one yield a single part
+    parts = key.split(constants.INDEX_SEPARATOR, 1)
+
+    # The first part is the filename (e.g. only 'currency' from 'currency.balances')
+    filename = parts[0].split(constants.DELIMITER, 1)[0]
+
+    # The rest (after the first separator) becomes the group and attribute inside the HDF5 file
+    if len(parts) > 1:
+        variable = parts[1].replace(constants.DELIMITER, constants.HDF5_GROUP_SEPARATOR)
+    else:
+        variable = parts[0].replace(constants.DELIMITER, constants.HDF5_GROUP_SEPARATOR)

    return filename, variable


def __filename_to_path(self, filename):
    if filename.startswith("__"):
        return str(self.run_state.joinpath(filename))
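To make the new behavior concrete, here is a standalone sketch of the parsing logic, simplified by assuming INDEX_SEPARATOR and DELIMITER are the same character ('.') and HDF5_GROUP_SEPARATOR is '/'; the real values live in contracting.storage.constants and may differ:

# Standalone sketch of __parse_key; separator values below are assumptions, not taken from constants.py.
INDEX_SEPARATOR = "."       # assumed: splits the contract name from the variable path
HDF5_GROUP_SEPARATOR = "/"  # assumed: nesting separator inside the HDF5 file

def parse_key(key):
    parts = key.split(INDEX_SEPARATOR, 1)
    filename = parts[0]
    rest = parts[1] if len(parts) > 1 else parts[0]
    return filename, rest.replace(INDEX_SEPARATOR, HDF5_GROUP_SEPARATOR)

# 'currency' becomes the HDF5 filename; everything after the first '.' becomes the group path.
print(parse_key("currency.balances:alice"))  # ('currency', 'balances:alice')

The net effect of the commit is that one HDF5 file holds all of a contract's state, instead of the raw key doubling as both filename and group.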
@@ -66,13 +73,19 @@ def is_file(self, filename):
    return file_path.is_file()

def get(self, key: str, save: bool = True):
+    """
+    Get a value from the cache, pending reads, or disk. If save is True,
+    the value will be saved to pending_reads.
+    """
+    # find() handles the lookup, including key parsing for the disk fallback
    value = self.find(key)
    if save and self.pending_reads.get(key) is None:
        self.pending_reads[key] = value
    if value is not None:
        rt.deduct_read(*encode_kv(key, value))
    return value


def set(self, key, value):
    rt.deduct_write(*encode_kv(key, value))
    if self.pending_reads.get(key) is None:
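Besides the lookup itself, get() snapshots the first value seen per key into pending_reads and charges a read cost via rt.deduct_read. A toy model of that pattern (the length-based cost here is a stand-in for the real stamp accounting done by rt.deduct_read and encode_kv):

# Toy model of the driver's read path; the cost function is a stand-in, not the real stamp math.
class ToyReads:
    def __init__(self, backing):
        self.backing = backing   # stands in for find()
        self.pending_reads = {}  # first-seen value per key
        self.cost = 0

    def get(self, key, save=True):
        value = self.backing.get(key)
        if save and self.pending_reads.get(key) is None:
            self.pending_reads[key] = value
        if value is not None:
            self.cost += len(key) + len(str(value))  # stand-in cost model
        return value

reads = ToyReads({"currency.balances:alice": 100})
reads.get("currency.balances:alice")
print(reads.pending_reads, reads.cost)  # {'currency.balances:alice': 100} 26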
@@ -82,17 +95,26 @@ def set(self, key, value):
        self.pending_writes[key] = value

def find(self, key: str):
+    """
+    Find the value for a given key. If not found in cache or pending writes,
+    it will look it up from the disk.
+    """
    if self.bypass_cache:
-        value = hdf5.get_value_from_disk(self.__filename_to_path(key), key)
+        # Parse the key to get the filename and group
+        filename, variable = self.__parse_key(key)
+        value = hdf5.get_value_from_disk(self.__filename_to_path(filename), variable)
        return value

    value = self.pending_writes.get(key)
    if value is None:
        value = self.cache.get(key)
    if value is None:
-        value = hdf5.get_value_from_disk(self.__filename_to_path(key), key)
+        # Parse the key to get the filename and group for disk lookup
+        filename, variable = self.__parse_key(key)
+        value = hdf5.get_value_from_disk(self.__filename_to_path(filename), variable)
    return value


def __get_keys_from_file(self, filename):
    return hdf5.get_groups(self.__filename_to_path(filename))
@@ -135,37 +157,42 @@ def iter_from_disk(self, prefix="", length=0):
    return keys if length == 0 else keys[:length]

def value_from_disk(self, key):
+    """
+    Retrieve a value from the disk based on the parsed key.
+    """
+    # Parse the key to get the filename and group
    filename, variable = self.__parse_key(key)
    return hdf5.get_value_from_disk(self.__filename_to_path(filename), variable)

def items(self, prefix=""):
    """
-    Get all existing items with a given prefix
+    Get all existing items with a given prefix.
    """

-    # Get all the items in the cache currently
    _items = {}
    keys = set()

+    # Collect pending writes with matching prefix
    for k, v in self.pending_writes.items():
        if k.startswith(prefix) and v is not None:
            _items[k] = v
            keys.add(k)

+    # Collect cache items with matching prefix
    for k, v in self.cache.items():
        if k.startswith(prefix) and v is not None:
            _items[k] = v
            keys.add(k)

-    # Get remaining keys from disk
+    # Collect keys from the disk
    db_keys = set(self.iter_from_disk(prefix=prefix))

-    # Subtract the already gotten keys
+    # Subtract already collected keys and add missing ones from disk
    for k in db_keys - keys:
        _items[k] = self.get(k)  # Cache get will add the keys to the cache
    return _items


def keys(self, prefix=""):
    return list(self.items(prefix).keys())

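items() merges the three tiers by prefix: in-memory entries are taken first, then any disk keys not already seen are pulled in through get(), which also warms the cache. With hypothetical data the merge behaves like this:

# Hypothetical data illustrating the items() merge; disk only contributes unseen keys.
pending_writes = {"currency.balances:alice": 99}
cache = {"currency.balances:bob": 10}
disk = {"currency.balances:alice": 1, "currency.balances:carol": 5}

merged, seen = {}, set()
for layer in (pending_writes, cache):
    for k, v in layer.items():
        if k.startswith("currency.") and v is not None:
            merged[k] = v
            seen.add(k)
for k in set(disk) - seen:  # alice drops out here: memory wins over disk
    if k.startswith("currency."):
        merged[k] = disk[k]

print(merged)  # alice -> 99 (pending), bob -> 10 (cache), carol -> 5 (disk)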
@@ -179,14 +206,22 @@ def make_key(self, contract, variable, args=[]):
        return HASH_DEPTH_DELIMITER.join((contract_variable, *[str(arg) for arg in args]))
    return contract_variable

-def get_var(self, contract, variable, arguments=[], mark=True):
-    key = self.make_key(contract, variable, arguments)
-    return self.get(key)
-
def set_var(self, contract, variable, arguments=[], value=None, mark=True):
+    """
+    Set a variable in a contract.
+    """
+    # Construct the key and set the value
    key = self.make_key(contract, variable, arguments)
    self.set(key, value)

+def get_var(self, contract, variable, arguments=[], mark=True):
+    """
+    Get a variable from a contract.
+    """
+    # Construct the key and get the value
+    key = self.make_key(contract, variable, arguments)
+    return self.get(key)

def get_owner(self, name):
    owner = self.get_var(name, OWNER_KEY)
    if owner == "":
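make_key is the inverse of __parse_key: it joins contract and variable, then appends any hash arguments with HASH_DEPTH_DELIMITER. The contract_variable construction sits above this hunk, so joining with '.' is an assumption consistent with __parse_key, as are both delimiter values:

DELIMITER = "."             # assumed
HASH_DEPTH_DELIMITER = ":"  # assumed

def make_key(contract, variable, args=[]):
    contract_variable = DELIMITER.join((contract, variable))  # assumed construction
    if args:
        return HASH_DEPTH_DELIMITER.join((contract_variable, *[str(arg) for arg in args]))
    return contract_variable

print(make_key("currency", "balances", ["alice"]))  # currency.balances:alice
print(make_key("currency", "total_supply"))         # currency.total_supply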
@@ -241,6 +276,10 @@ def get_contract_files(self):
    return sorted(os.listdir(self.contract_state))

def delete_key_from_disk(self, key):
+    """
+    Delete a key from the disk by parsing the filename and group from the key.
+    """
+    # Parse the key to get the filename and group
    filename, variable = self.__parse_key(key)
    if len(filename) < constants.FILENAME_LEN_MAX:
        hdf5.delete_key_from_disk(self.__filename_to_path(filename), variable)
@@ -301,18 +340,21 @@ def commit(self):
    Save the current state to disk and clear the L1 and L2 caches.
    """
    for k, v in self.pending_writes.items():
+        # Parse the key before applying to HDF5
+        filename, variable = self.__parse_key(k)
        if v is None:
-            hdf5.delete_key_from_disk(self.__filename_to_path(k), k)
+            hdf5.delete_key_from_disk(self.__filename_to_path(filename), variable)
        else:
-            hdf5.set_value_to_disk(self.__filename_to_path(k), k, v, None)
+            hdf5.set_value_to_disk(self.__filename_to_path(filename), variable, v, None)

    self.cache.clear()
    self.pending_writes.clear()
    self.pending_reads.clear()


def hard_apply(self, nanos):
    """
-    Save the current state to disk and L1 cache and clear the L2 cache
+    Save the current state to disk and L1 cache and clear the L2 cache.
    """

    deltas = {}
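The commit() change is the heart of this PR: every pending write is parsed into (filename, variable) so each contract's state lands in its own HDF5 file rather than one file per raw key. A toy walk-through with a dict standing in for the HDF5 layer, reusing the parse_key sketch from the __parse_key hunk above:

# Toy commit loop; hdf5_files maps filename -> {group: value}, standing in for real HDF5 files.
def toy_commit(pending_writes, hdf5_files):
    for k, v in pending_writes.items():
        filename, variable = parse_key(k)  # sketch defined earlier, assumed separators
        groups = hdf5_files.setdefault(filename, {})
        if v is None:
            groups.pop(variable, None)     # a pending None is a delete
        else:
            groups[variable] = v
    pending_writes.clear()

files = {}
toy_commit({"currency.balances:alice": 99, "currency.balances:bob": None}, files)
print(files)  # {'currency': {'balances:alice': 99}} under the assumed separators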
@@ -328,26 +370,23 @@ def hard_apply(self, nanos):
    self.pending_reads = {}
    self.pending_writes.clear()

    # See if the HCL even exists
    if self.pending_deltas.get(nanos) is None:
        return

-    # Run through the sorted HCLs from oldest to newest, applying each one until the committed HCL is reached
+    # Run through the sorted HCLs from oldest to newest applying each one
    to_delete = []
    for _nanos, _deltas in sorted(self.pending_deltas.items()):
        # Run through all state changes, taking the second value, which is the post delta
        for key, delta in _deltas["writes"].items():
-            hdf5.set_value_to_disk(self.__filename_to_path(key), key, delta[1], nanos)
+            # Parse the key before applying to HDF5
+            filename, variable = self.__parse_key(key)
+            hdf5.set_value_to_disk(self.__filename_to_path(filename), variable, delta[1], nanos)

        # Add the key (timestamp) to the list of deltas to delete
        to_delete.append(_nanos)
        if _nanos == nanos:
            break

    # Remove the deltas from the set
    [self.pending_deltas.pop(key) for key in to_delete]


def get_all_contract_state(self):
    """
    Queries the disk storage and returns a dictionary with all the state from the contract storage directory.
    """
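From a caller's perspective the pieces tie together as below. The Driver class name and its default construction are assumptions (only the methods are visible in this diff); the module path follows the file path in this PR:

# Hypothetical session; Driver name and constructor assumed, methods as defined in this file.
from contracting.storage.driver import Driver

driver = Driver()
driver.set_var("currency", "balances", ["alice"], value=100)  # buffered in pending_writes
print(driver.get_var("currency", "balances", ["alice"]))      # 100, served before any disk write
driver.commit()  # parses each key and writes to the per-contract HDF5 file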
52 changes: 37 additions & 15 deletions src/contracting/storage/hdf5.py
@@ -40,34 +40,52 @@ def get_groups(file_path):
    return list(f.keys())


+def set(file_path, group_name, value, blocknum, timeout=20):
+    """
+    Set the value and blocknum attributes in the HDF5 file for the given group.
+    """
+    # Acquire a file lock to prevent concurrent writes
+    lock = get_file_lock(file_path if isinstance(file_path, str) else file_path.filename)
+    if lock.acquire(timeout=timeout):
+        try:
+            with h5py.File(file_path, 'a') as f:
+                # Write value and blocknum to the group attributes
+                write_attr(f, group_name, ATTR_VALUE, value, timeout)
+                write_attr(f, group_name, ATTR_BLOCK, blocknum, timeout)
+        finally:
+            # Always release the lock after the operation
+            lock.release()
+    else:
+        raise TimeoutError("Lock acquisition timed out")


def write_attr(file_or_path, group_name, attr_name, value, timeout=20):
-    # Attempt to acquire lock with a timeout to prevent deadlock
+    """
+    Write an attribute to a group inside an HDF5 file.
+    """
+    # Open the file if given a path; otherwise use the already-open handle
    if isinstance(file_or_path, str):
        with h5py.File(file_or_path, 'a') as f:
            _write_attr_to_file(f, group_name, attr_name, value, timeout)
    else:
        _write_attr_to_file(file_or_path, group_name, attr_name, value, timeout)


def _write_attr_to_file(file, group_name, attr_name, value, timeout):
+    """
+    Internal method to write the attribute to the group.
+    """
+    # Ensure the group exists, or create it if necessary
    grp = file.require_group(group_name)

+    # Write or update the attribute in the group
    if attr_name in grp.attrs:
        del grp.attrs[attr_name]
-    if value:
+    if value is not None:
        grp.attrs[attr_name] = value


-def set(file_path, group_name, value, blocknum, timeout=20):
-    lock = get_file_lock(file_path if isinstance(file_path, str) else file_path.filename)
-    if lock.acquire(timeout=timeout):
-        try:
-            with h5py.File(file_path, 'a') as f:
-                write_attr(f, group_name, ATTR_VALUE, value, timeout)
-                write_attr(f, group_name, ATTR_BLOCK, blocknum, timeout)
-        finally:
-            lock.release()
-    else:
-        raise TimeoutError("Lock acquisition timed out")


def delete(file_path, group_name, timeout=20):
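The locked write in set() can be reproduced with the third-party filelock package; get_file_lock is project-internal, so treating it as a filelock.FileLock around a sidecar .lock file is an assumption, as are the literal attribute names used here:

import h5py
from filelock import FileLock, Timeout

# Sketch of the lock-around-write pattern; the ".lock" sidecar and attribute names are assumptions.
def set_attrs(file_path, group_name, value, blocknum, timeout=20):
    lock = FileLock(file_path + ".lock")
    try:
        with lock.acquire(timeout=timeout):
            with h5py.File(file_path, "a") as f:
                grp = f.require_group(group_name)  # create the group if missing
                for name, val in (("value", value), ("block", blocknum)):
                    if name in grp.attrs:
                        del grp.attrs[name]         # replace rather than update in place
                    if val is not None:
                        grp.attrs[name] = val
    except Timeout:
        raise TimeoutError("Lock acquisition timed out")

set_attrs("currency.hdf5", "balances:alice", "100", 7)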
@@ -87,7 +105,11 @@ def delete(file_path, group_name, timeout=20):


def set_value_to_disk(file_path, group_name, value, block_num=None, timeout=20):
+    """
+    Save value to disk with optional block number.
+    """
    encoded_value = encode(value) if value is not None else None

    set(file_path, group_name, encoded_value, block_num if block_num is not None else -1, timeout)
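Finally, set_value_to_disk encodes the value before storage, preserves None as a deletion marker, and stores a missing block number as -1. A self-contained round-trip sketch with json standing in for contracting's encoder and a dict standing in for the HDF5 attributes:

import json

def encode(value):  # stand-in for contracting's encoder
    return json.dumps(value)

def decode(blob):
    return json.loads(blob) if blob is not None else None

def set_value_to_disk(store, group_name, value, block_num=None):
    encoded_value = encode(value) if value is not None else None
    store[group_name] = (encoded_value, block_num if block_num is not None else -1)

def get_value_from_disk(store, group_name):  # assumed to mirror the writer
    blob, _block = store.get(group_name, (None, None))
    return decode(blob)

store = {}
set_value_to_disk(store, "balances:alice", 100, block_num=7)
print(get_value_from_disk(store, "balances:alice"))  # 100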
