diff --git a/core/blockchain.go b/core/blockchain.go index 79d97d9d0d..2ccc178de8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1039,15 +1039,15 @@ func (bc *BlockChain) Stop() { triedb := bc.triedb for _, offset := range []uint64{0, 1, bc.cacheConfig.TriesInMemory - 1, math.MaxUint64} { - if number := bc.CurrentBlock().Number.Uint64(); number > offset { + if number := bc.CurrentBlock().Number.Uint64(); number > offset || offset == math.MaxUint64 { var recent *types.Block - if offset == math.MaxUint { + if offset == math.MaxUint64 && !bc.triegc.Empty() { _, latest := bc.triegc.Peek() recent = bc.GetBlockByNumber(uint64(-latest)) } else { recent = bc.GetBlockByNumber(number - offset) } - if recent.Root() == (common.Hash{}) { + if recent == nil || recent.Root() == (common.Hash{}) { continue } @@ -1451,7 +1451,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If MaxNumberOfBlocksToSkipStateSaving or MaxAmountOfGasToSkipStateSaving is not zero, then flushing of some blocks will be skipped: // * at most MaxNumberOfBlocksToSkipStateSaving block state commits will be skipped // * sum of gas used in skipped blocks will be at most MaxAmountOfGasToSkipStateSaving - if bc.cacheConfig.TrieDirtyDisabled { + archiveNode := bc.cacheConfig.TrieDirtyDisabled + if archiveNode { var maySkipCommiting, blockLimitReached, gasLimitReached bool if bc.cacheConfig.MaxNumberOfBlocksToSkipStateSaving != 0 { maySkipCommiting = true @@ -1474,10 +1475,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.amountOfGasInBlocksToSkipStateSaving = bc.cacheConfig.MaxAmountOfGasToSkipStateSaving return bc.triedb.Commit(root, false) } - return nil + // we are skipping saving the trie to diskdb, so we need to keep the trie in memory and garbage collect it later } - // Full but not archive node, do proper garbage collection + // Full node or archive node that's not keeping all states, do proper garbage collection bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive bc.triegc.Push(trieGcEntry{root, block.Header().Time}, -int64(block.NumberU64())) @@ -1510,7 +1511,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } flushInterval := time.Duration(bc.flushInterval.Load()) // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > flushInterval && prevEntry != nil { + // In case of archive node that skips some trie commits we don't flush tries here + if bc.gcproc > flushInterval && prevEntry != nil && !archiveNode { // If the header is missing (canonical chain behind), we're reorging a low // diff sidechain. Suspend committing until this operation is completed. header := bc.GetHeaderByNumber(prevNum) diff --git a/trie/triedb/hashdb/database.go b/trie/triedb/hashdb/database.go index 8ebcff9f1f..da1339e07f 100644 --- a/trie/triedb/hashdb/database.go +++ b/trie/triedb/hashdb/database.go @@ -329,20 +329,25 @@ func (db *Database) Cap(limit common.StorageSize) error { // outside code doesn't see an inconsistent state (referenced data removed from // memory cache during commit but not yet in persistent storage). This is ensured // by only uncaching existing data when the database write finalizes. - nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now() + start := time.Now() batch := db.diskdb.NewBatch() + db.lock.RLock() + nodes, storage := len(db.dirties), db.dirtiesSize // db.dirtiesSize only contains the useful data in the cache, but when reporting // the total memory consumption, the maintenance metadata is also needed to be // counted. size := db.dirtiesSize + common.StorageSize(len(db.dirties)*cachedNodeSize) size += db.childrenSize + db.lock.RUnlock() // Keep committing nodes from the flush-list until we're below allowance oldest := db.oldest for size > limit && oldest != (common.Hash{}) { // Fetch the oldest referenced node and push into the batch + db.lock.RLock() node := db.dirties[oldest] + db.lock.RUnlock() rawdb.WriteLegacyTrieNode(batch, oldest, node.node) // If we exceeded the ideal batch size, commit and reset @@ -418,7 +423,9 @@ func (db *Database) Commit(node common.Hash, report bool) error { batch := db.diskdb.NewBatch() // Move the trie itself into the batch, flushing if enough data is accumulated + db.lock.RLock() nodes, storage := len(db.dirties), db.dirtiesSize + db.lock.RUnlock() uncacher := &cleaner{db} if err := db.commit(node, batch, uncacher); err != nil { @@ -460,7 +467,9 @@ func (db *Database) Commit(node common.Hash, report bool) error { // commit is the private locked version of Commit. func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error { // If the node does not exist, it's a previously committed node + db.lock.RLock() node, ok := db.dirties[hash] + db.lock.RUnlock() if !ok { return nil }