Skip to content

Commit

Permalink
Merge pull request #291 from OffchainLabs/backport-hashdb-locking
Browse files Browse the repository at this point in the history
backport upstream hashdb locking changes
  • Loading branch information
tsahee authored Mar 7, 2024
2 parents 086bfcc + 9a83389 commit a104baf
Showing 1 changed file with 25 additions and 53 deletions.
78 changes: 25 additions & 53 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ var Defaults = &Config{
// Database is an intermediate write layer between the trie data structures and
// the disk database. The aim is to accumulate trie writes in-memory and only
// periodically flush a couple tries to disk, garbage collecting the remainder.
//
// Note, the trie Database is **not** thread safe in its mutations, but it **is**
// thread safe in providing individual, independent node access. The rationale
// behind this split design is to provide read access to RPC handlers and sync
// servers even while the trie is executing expensive garbage collection.
type Database struct {
diskdb ethdb.Database // Persistent storage for matured trie nodes
resolver ChildResolver // The handler to resolve children of nodes
Expand All @@ -113,7 +108,7 @@ type Database struct {
// cachedNode is all the information we know about a single cached trie node
// in the memory database write layer.
type cachedNode struct {
node []byte // Encoded node blob
node []byte // Encoded node blob, immutable
parents uint32 // Number of live nodes referencing this one
external map[common.Hash]struct{} // The set of external children
flushPrev common.Hash // Previous node in the flush-list
Expand Down Expand Up @@ -152,9 +147,9 @@ func New(diskdb ethdb.Database, config *Config, resolver ChildResolver) *Databas
}
}

// insert inserts a simplified trie node into the memory database.
// All nodes inserted by this function will be reference tracked
// and in theory should only used for **trie nodes** insertion.
// insert inserts a trie node into the memory database. All nodes inserted by
// this function will be reference tracked. This function assumes the lock is
// already held.
func (db *Database) insert(hash common.Hash, node []byte) {
// If the node's already cached, skip
if _, ok := db.dirties[hash]; ok {
Expand Down Expand Up @@ -183,9 +178,9 @@ func (db *Database) insert(hash common.Hash, node []byte) {
db.dirtiesSize += common.StorageSize(common.HashLength + len(node))
}

// Node retrieves an encoded cached trie node from memory. If it cannot be found
// node retrieves an encoded cached trie node from memory. If it cannot be found
// cached, the method queries the persistent database for the content.
func (db *Database) Node(hash common.Hash) ([]byte, error) {
func (db *Database) node(hash common.Hash) ([]byte, error) {
// It doesn't make sense to retrieve the metaroot
if hash == (common.Hash{}) {
return nil, errors.New("not found")
Expand All @@ -198,11 +193,14 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) {
return enc, nil
}
}
// Retrieve the node from the dirty cache if available
// Retrieve the node from the dirty cache if available.
db.lock.RLock()
dirty := db.dirties[hash]
db.lock.RUnlock()

// Return the cached node if it's found in the dirty set.
// The dirty.node field is immutable and safe to read it
// even without lock guard.
if dirty != nil {
memcacheDirtyHitMeter.Mark(1)
memcacheDirtyReadMeter.Mark(int64(len(dirty.node)))
Expand All @@ -223,18 +221,9 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) {
return nil, errors.New("not found")
}

// Nodes retrieves the hashes of all the nodes cached within the memory database.
// This method is extremely expensive and should only be used to validate internal
// states in test code.
func (db *Database) Nodes() []common.Hash {
db.lock.RLock()
defer db.lock.RUnlock()

var hashes = make([]common.Hash, 0, len(db.dirties))
for hash := range db.dirties {
hashes = append(hashes, hash)
}
return hashes
// arbitrum: exposing hashdb.Database.Node for triedb.Database.Node currently used by arbitrum.RecordingKV.Get
func (db *Database) Node(hash common.Hash) ([]byte, error) {
return db.node(hash)
}

// Reference adds a new reference from a parent node to a child node.
Expand Down Expand Up @@ -344,33 +333,28 @@ func (db *Database) dereference(hash common.Hash) {

// Cap iteratively flushes old but still referenced trie nodes until the total
// memory usage goes below the given threshold.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Cap(limit common.StorageSize) error {
db.lock.Lock()
defer db.lock.Unlock()

// Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from
// memory cache during commit but not yet in persistent storage). This is ensured
// by only uncaching existing data when the database write finalizes.
start := time.Now()
batch := db.diskdb.NewBatch()
db.lock.RLock()
nodes, storage := len(db.dirties), db.dirtiesSize
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
size := db.dirtiesSize + common.StorageSize(len(db.dirties)*cachedNodeSize)
size += db.childrenSize
db.lock.RUnlock()

// Keep committing nodes from the flush-list until we're below allowance
oldest := db.oldest
for size > limit && oldest != (common.Hash{}) {
// Fetch the oldest referenced node and push into the batch
db.lock.RLock()
node := db.dirties[oldest]
db.lock.RUnlock()
rawdb.WriteLegacyTrieNode(batch, oldest, node.node)

// If we exceeded the ideal batch size, commit and reset
Expand All @@ -396,9 +380,6 @@ func (db *Database) Cap(limit common.StorageSize) error {
return err
}
// Write successful, clear out the flushed data
db.lock.Lock()
defer db.lock.Unlock()

for db.oldest != oldest {
node := db.dirties[db.oldest]
delete(db.dirties, db.oldest)
Expand Down Expand Up @@ -429,14 +410,13 @@ func (db *Database) Cap(limit common.StorageSize) error {
// Commit iterates over all the children of a particular node, writes them out
// to disk, forcefully tearing down all references in both directions. As a side
// effect, all pre-images accumulated up to this point are also written.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Commit(node common.Hash, report bool) error {
if node == (common.Hash{}) {
// There's no data to commit in this node
return nil
}
db.lock.Lock()
defer db.lock.Unlock()

// Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from
Expand All @@ -446,9 +426,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
batch := db.diskdb.NewBatch()

// Move the trie itself into the batch, flushing if enough data is accumulated
db.lock.RLock()
nodes, storage := len(db.dirties), db.dirtiesSize
db.lock.RUnlock()

uncacher := &cleaner{db}
if err := db.commit(node, batch, uncacher); err != nil {
Expand All @@ -461,8 +439,6 @@ func (db *Database) Commit(node common.Hash, report bool) error {
return err
}
// Uncache any leftovers in the last batch
db.lock.Lock()
defer db.lock.Unlock()
if err := batch.Replay(uncacher); err != nil {
return err
}
Expand Down Expand Up @@ -490,9 +466,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
// commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
// If the node does not exist, it's a previously committed node
db.lock.RLock()
node, ok := db.dirties[hash]
db.lock.RUnlock()
if !ok {
return nil
}
Expand All @@ -513,13 +487,11 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
if err := batch.Write(); err != nil {
return err
}
db.lock.Lock()
err := batch.Replay(uncacher)
batch.Reset()
db.lock.Unlock()
if err != nil {
return err
}
batch.Reset()
}
return nil
}
Expand Down Expand Up @@ -588,7 +560,7 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, nodes *trienode.MergedNodeSet, states *triestate.Set) error {
// Ensure the parent state is present and signal a warning if not.
if parent != types.EmptyRootHash {
if blob, _ := db.Node(parent); len(blob) == 0 {
if blob, _ := db.node(parent); len(blob) == 0 {
log.Error("parent state is not present")
}
}
Expand Down Expand Up @@ -669,7 +641,7 @@ func (db *Database) Scheme() string {
// Reader retrieves a node reader belonging to the given state root.
// An error will be returned if the requested state is not available.
func (db *Database) Reader(root common.Hash) (*reader, error) {
if _, err := db.Node(root); err != nil {
if _, err := db.node(root); err != nil {
return nil, fmt.Errorf("state %#x is not available, %v", root, err)
}
return &reader{db: db}, nil
Expand All @@ -680,9 +652,9 @@ type reader struct {
db *Database
}

// Node retrieves the trie node with the given node hash.
// No error will be returned if the node is not found.
// Node retrieves the trie node with the given node hash. No error will be
// returned if the node is not found.
func (reader *reader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
blob, _ := reader.db.Node(hash)
blob, _ := reader.db.node(hash)
return blob, nil
}

0 comments on commit a104baf

Please sign in to comment.