Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

triedb/pathdb: introduce lookup structure to optimize state access #30971

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,13 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
StateHistory: c.StateHistory,
TrieCleanSize: c.TrieCleanLimit * 1024 * 1024,
StateCleanSize: c.SnapshotLimit * 1024 * 1024,

// TODO(rjl493456442): The write buffer represents the memory limit used
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
Expand Down Expand Up @@ -349,11 +354,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
// Head state is missing, before the state recovery, find out the disk
// layer point of snapshot(if it's enabled). Make sure the rewound point
// is lower than disk layer.
//
// Note it's unnecessary in path mode which always keep trie data and
// state data consistent.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
Expand Down Expand Up @@ -426,15 +434,39 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
bc.setupSnapshot()

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
Expand All @@ -451,23 +483,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}

// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
if compat.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compat.RewindToTime)
} else {
bc.SetHead(compat.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}

// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}

// empty returns an indicator whether the blockchain is empty.
Expand Down
34 changes: 23 additions & 11 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down Expand Up @@ -1952,8 +1952,10 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}

// Insert block B3 and commit the state into disk
Expand Down Expand Up @@ -1997,15 +1999,23 @@ func testIssue23496(t *testing.T, scheme string) {
}
expHead := uint64(1)
if scheme == rawdb.PathScheme {
expHead = uint64(2)
// The pathdb database makes sure that snapshot and trie are consistent,
// so only the last block is reverted in case of a crash.
expHead = uint64(3)
}
if head := chain.CurrentBlock(); head.Number.Uint64() != expHead {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead)
}

// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
if scheme == rawdb.PathScheme {
// Reinsert B4
if _, err := chain.InsertChain(blocks[3:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
} else {
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
}
if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4)
Expand All @@ -2016,7 +2026,9 @@ func testIssue23496(t *testing.T, scheme string) {
if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4))
}
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
if scheme == rawdb.HashScheme {
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
}
}
}
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
Expand Down
22 changes: 14 additions & 8 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
Expand Down Expand Up @@ -149,13 +149,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}

// Check the snapshot, ensure it's integrated
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}

Expand Down Expand Up @@ -565,12 +569,14 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
//
// Expected head header : C8
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : C4
// Expected head block : G (Hash mode), C6 (Hash mode)
// Expected snapshot disk : C4 (Hash mode)
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
if scheme == rawdb.PathScheme {
expHead = uint64(4)
// The pathdb database makes sure that snapshot and trie are consistent,
// so only the last two blocks are reverted in case of a crash.
expHead = uint64(6)
}
test := &crashSnapshotTest{
snapshotTestBasic{
Expand Down
25 changes: 13 additions & 12 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,26 +175,27 @@ func NewDatabaseForTesting() *CachingDB {
func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
var readers []StateReader

// Set up the state snapshot reader if available. This feature
// is optional and may be partially useful if it's not fully
// generated.
if db.snap != nil {
// If standalone state snapshot is available (hash scheme),
// then construct the legacy snap reader.
// Configure the state reader using the standalone snapshot in hash mode.
// This reader offers improved performance but is optional and only
// partially useful if the snapshot is not fully generated.
if db.TrieDB().Scheme() == rawdb.HashScheme && db.snap != nil {
snap := db.snap.Snapshot(stateRoot)
if snap != nil {
readers = append(readers, newFlatReader(snap))
}
} else {
// If standalone state snapshot is not available, try to construct
// the state reader with database.
}
// Configure the state reader using the path database in path mode.
// This reader offers improved performance but is optional and only
// partially useful if the snapshot data in path database is not
// fully generated.
if db.TrieDB().Scheme() == rawdb.PathScheme {
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, newFlatReader(reader)) // state reader is optional
readers = append(readers, newFlatReader(reader))
}
}
// Set up the trie reader, which is expected to always be available
// as the gatekeeper unless the state is corrupted.
// Configure the trie reader, which is expected to be available as the
// gatekeeper unless the state is corrupted.
tr, err := newTrieReader(stateRoot, db.triedb, db.pointCache)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ func newHelper(scheme string) *testHelper {
diskdb := rawdb.NewMemoryDatabase()
config := &triedb.Config{}
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}
Expand Down
3 changes: 2 additions & 1 deletion core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,8 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
)
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {
Expand Down
3 changes: 2 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -175,7 +176,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
}
// If snap sync is requested but snapshots are disabled, fail loudly
if h.snapSync.Load() && config.Chain.Snapshots() == nil {
if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)
Expand Down
Loading