diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 040629f79e..e273ebbaac 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -284,6 +284,12 @@ var ( Usage: "Scheme to use for storing ethereum state ('hash' or 'path')", Category: flags.StateCategory, } + PathDBSyncFlag = &cli.BoolFlag{ + Name: "pathdb.sync", + Usage: "sync flush nodes cache to disk in path schema", + Value: false, + Category: flags.StateCategory, + } StateHistoryFlag = &cli.Uint64Flag{ Name: "history.state", Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)", @@ -1838,6 +1844,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions") cfg.TransactionHistory = ctx.Uint64(TxLookupLimitFlag.Name) } + if ctx.IsSet(PathDBSyncFlag.Name) { + cfg.PathSyncFlush = true + } if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") diff --git a/core/blockchain.go b/core/blockchain.go index d3a7766bec..c0d920de28 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -146,6 +146,7 @@ type CacheConfig struct { Preimages bool // Whether to store preimage of trie key to the disk StateHistory uint64 // Number of blocks from head whose state histories are reserved. StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top + PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk. SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. 
TODO(karalabe): This is a dirty hack for testing, nuke it @@ -163,6 +164,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config { } if c.StateScheme == rawdb.PathScheme { config.PathDB = &pathdb.Config{ + SyncFlush: c.PathSyncFlush, StateHistory: c.StateHistory, CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024, @@ -372,6 +374,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if bc.cacheConfig.SnapshotLimit > 0 { diskRoot = rawdb.ReadSnapshotRoot(bc.db) } + if bc.triedb.Scheme() == rawdb.PathScheme { + recoverable, _ := bc.triedb.Recoverable(diskRoot) + if !bc.HasState(diskRoot) && !recoverable { + diskRoot = bc.triedb.Head() + } + } if diskRoot != (common.Hash{}) { log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot) @@ -1042,7 +1050,7 @@ func (bc *BlockChain) Stop() { for !bc.triegc.Empty() { triedb.Dereference(bc.triegc.PopItem()) } - if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb + if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb log.Error("Dangling trie nodes after full cleanup") } } @@ -1465,8 +1473,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. 
} // If we exceeded our memory allowance, flush matured singleton nodes to disk var ( - _, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + _, nodes, _, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 ) if nodes > limit || imgs > 4*1024*1024 { bc.triedb.Cap(limit - ethdb.IdealBatchSize) @@ -1882,12 +1890,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) - triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing - trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read - blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing - blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation + blockExecutionTimer.Update(ptime) // The time spent on EVM processing + blockValidationTimer.Update(vtime) // The time spent on block validation // Write the block to the chain and get the status. 
var ( @@ -1910,7 +1916,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them - blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) + blockWriteTimer.UpdateSince(wstart) blockInsertTimer.UpdateSince(start) // Report the import stats before returning the various results @@ -1921,8 +1927,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if bc.snaps != nil { snapDiffItems, snapBufItems = bc.snaps.Size() } - trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead) + trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size() + stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 9bf662b6b7..83a4cea5dc 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. 
-func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) { +func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -73,6 +73,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn context = append(context, []interface{}{"triediffs", trieDiffNodes}...) } context = append(context, []interface{}{"triedirty", triebufNodes}...) + context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...) if st.queued > 0 { context = append(context, []interface{}{"queued", st.queued}...) diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go index 3c82b3f731..089eafc836 100644 --- a/core/rawdb/accessors_snapshot.go +++ b/core/rawdb/accessors_snapshot.go @@ -18,10 +18,12 @@ package rawdb import ( "encoding/binary" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) // ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled. @@ -74,6 +76,10 @@ func DeleteSnapshotRoot(db ethdb.KeyValueWriter) { // ReadAccountSnapshot retrieves the snapshot entry of an account trie leaf. func ReadAccountSnapshot(db ethdb.KeyValueReader, hash common.Hash) []byte { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { rawdbGetAccountSnapNodeTimer.UpdateSince(start) }() + } data, _ := db.Get(accountSnapshotKey(hash)) return data } @@ -94,6 +100,10 @@ func DeleteAccountSnapshot(db ethdb.KeyValueWriter, hash common.Hash) { // ReadStorageSnapshot retrieves the snapshot entry of an storage trie leaf. 
func ReadStorageSnapshot(db ethdb.KeyValueReader, accountHash, storageHash common.Hash) []byte { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { rawdbGetStorageSnapNodeTimer.UpdateSince(start) }() + } data, _ := db.Get(storageSnapshotKey(accountHash, storageHash)) return data } diff --git a/core/rawdb/accessors_trie.go b/core/rawdb/accessors_trie.go index 78f1a70b1c..3ca4fbbe51 100644 --- a/core/rawdb/accessors_trie.go +++ b/core/rawdb/accessors_trie.go @@ -19,11 +19,13 @@ package rawdb import ( "fmt" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "golang.org/x/crypto/sha3" ) @@ -68,6 +70,10 @@ func (h *hasher) release() { // ReadAccountTrieNode retrieves the account trie node and the associated node // hash with the specified node path. func ReadAccountTrieNode(db ethdb.KeyValueReader, path []byte) ([]byte, common.Hash) { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { rawdbGetAccountTrieNodeTimer.UpdateSince(start) }() + } data, err := db.Get(accountTrieNodeKey(path)) if err != nil { return nil, common.Hash{} @@ -116,6 +122,10 @@ func DeleteAccountTrieNode(db ethdb.KeyValueWriter, path []byte) { // ReadStorageTrieNode retrieves the storage trie node and the associated node // hash with the specified node path. 
func ReadStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) ([]byte, common.Hash) { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { rawdbGetStorageTrieNodeTimer.UpdateSince(start) }() + } data, err := db.Get(storageTrieNodeKey(accountHash, path)) if err != nil { return nil, common.Hash{} diff --git a/core/rawdb/metrics.go b/core/rawdb/metrics.go new file mode 100644 index 0000000000..b4c09f8689 --- /dev/null +++ b/core/rawdb/metrics.go @@ -0,0 +1,10 @@ +package rawdb + +import "github.com/ethereum/go-ethereum/metrics" + +var ( + rawdbGetAccountTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/trienode/time", nil) + rawdbGetStorageTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/trienode/time", nil) + rawdbGetAccountSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/snapnode/time", nil) + rawdbGetStorageSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/snapnode/time", nil) +) diff --git a/core/state_processor.go b/core/state_processor.go index 8b13aa0eb3..4afc4683b7 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -28,9 +29,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" ) +var ( + processTxTimer = metrics.NewRegisteredTimer("process/tx/time", nil) +) + // StateProcessor is a basic Processor, which takes care of transitioning // state from one point to another. 
// @@ -88,6 +94,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } // Iterate over and process the individual transactions for i, tx := range block.Transactions() { + start := time.Now() msg, err := TransactionToMessage(tx, signer, header.BaseFee) if err != nil { return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) @@ -99,6 +106,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } receipts = append(receipts, receipt) allLogs = append(allLogs, receipt.Logs...) + if metrics.EnabledExpensive { + processTxTimer.UpdateSince(start) + } } // Fail if Shanghai not enabled and len(withdrawals) is non-zero. withdrawals := block.Withdrawals() diff --git a/eth/backend.go b/eth/backend.go index c20bded0f1..5939fc3462 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -207,6 +207,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { StateHistory: config.StateHistory, StateScheme: scheme, TrieCommitInterval: config.TrieCommitInterval, + PathSyncFlush: config.PathSyncFlush, } ) // Override the chain config with provided settings. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 39fc3734f0..7f31961a9c 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -109,7 +109,8 @@ type Config struct { // State scheme represents the scheme used to store ethereum states and trie // nodes on top. It can be 'hash', 'path', or none which means use the scheme // consistent with persistent state. - StateScheme string `toml:",omitempty"` + StateScheme string `toml:",omitempty"` + PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top // RequiredBlocks is a set of block number -> hash mappings which must be in the // canonical chain of all remote peers. 
Setting the option makes geth verify the diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 1d64d3ad16..b006e034fd 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -28,6 +28,7 @@ func (c Config) MarshalTOML() (interface{}, error) { TransactionHistory uint64 `toml:",omitempty"` StateHistory uint64 `toml:",omitempty"` StateScheme string `toml:",omitempty"` + PathSyncFlush bool `toml:",omitempty"` RequiredBlocks map[uint64]common.Hash `toml:"-"` LightServ int `toml:",omitempty"` LightIngress int `toml:",omitempty"` @@ -78,6 +79,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TransactionHistory = c.TransactionHistory enc.StateHistory = c.StateHistory enc.StateScheme = c.StateScheme + enc.PathSyncFlush = c.PathSyncFlush enc.RequiredBlocks = c.RequiredBlocks enc.LightServ = c.LightServ enc.LightIngress = c.LightIngress @@ -132,6 +134,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { TransactionHistory *uint64 `toml:",omitempty"` StateHistory *uint64 `toml:",omitempty"` StateScheme *string `toml:",omitempty"` + PathSyncFlush *bool `toml:",omitempty"` RequiredBlocks map[uint64]common.Hash `toml:"-"` LightServ *int `toml:",omitempty"` LightIngress *int `toml:",omitempty"` @@ -207,6 +210,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.StateScheme != nil { c.StateScheme = *dec.StateScheme } + if dec.PathSyncFlush != nil { + c.PathSyncFlush = *dec.PathSyncFlush + } if dec.RequiredBlocks != nil { c.RequiredBlocks = dec.RequiredBlocks } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 19128d12b1..497b38dedb 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u parent = root } if report { - _, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb - log.Info("Historical state 
regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs) + _, nodes, immutablenodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb + log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs) } return statedb, func() { triedb.Dereference(block.Root()) }, nil } diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 7e14f69395..14d45fc831 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -369,8 +369,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed // if the relevant state is available in disk. var preferDisk bool if statedb != nil { - s1, s2, s3 := statedb.Database().TrieDB().Size() - preferDisk = s1+s2+s3 > defaultTracechainMemLimit + s1, s2, s3, s4 := statedb.Database().TrieDB().Size() + preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit } statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk) if err != nil { diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index e58efbddbe..b9a729caa8 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -190,6 +190,10 @@ func (db *Database) Has(key []byte) (bool, error) { // Get retrieves the given key if it's present in the key-value store. func (db *Database) Get(key []byte) ([]byte, error) { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbGetTimer.UpdateSince(start) }() + } dat, err := db.db.Get(key, nil) if err != nil { return nil, err @@ -199,11 +203,19 @@ func (db *Database) Get(key []byte) ([]byte, error) { // Put inserts the given value into the key-value store. 
func (db *Database) Put(key []byte, value []byte) error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbPutTimer.UpdateSince(start) }() + } return db.db.Put(key, value, nil) } // Delete removes the key from the key-value store. func (db *Database) Delete(key []byte) error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbDeleteTimer.UpdateSince(start) }() + } return db.db.Delete(key, nil) } @@ -414,6 +426,10 @@ func (b *batch) ValueSize() int { // Write flushes any accumulated data to disk. func (b *batch) Write() error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbBatchWriteTimer.UpdateSince(start) }() + } return b.db.Write(b.b, nil) } diff --git a/ethdb/metrics.go b/ethdb/metrics.go new file mode 100644 index 0000000000..39133a7243 --- /dev/null +++ b/ethdb/metrics.go @@ -0,0 +1,10 @@ +package ethdb + +import "github.com/ethereum/go-ethereum/metrics" + +var ( + EthdbGetTimer = metrics.NewRegisteredTimer("ethdb/get/time", nil) + EthdbPutTimer = metrics.NewRegisteredTimer("ethdb/put/time", nil) + EthdbDeleteTimer = metrics.NewRegisteredTimer("ethdb/delete/time", nil) + EthdbBatchWriteTimer = metrics.NewRegisteredTimer("ethdb/batch/write/time", nil) +) diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index ae0039f41e..aa9a3f2fc9 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -299,6 +299,10 @@ func (d *Database) Has(key []byte) (bool, error) { // Get retrieves the given key if it's present in the key-value store. func (d *Database) Get(key []byte) ([]byte, error) { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbGetTimer.UpdateSince(start) }() + } d.quitLock.RLock() defer d.quitLock.RUnlock() if d.closed { @@ -316,6 +320,10 @@ func (d *Database) Get(key []byte) ([]byte, error) { // Put inserts the given value into the key-value store. 
func (d *Database) Put(key []byte, value []byte) error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbPutTimer.UpdateSince(start) }() + } d.quitLock.RLock() defer d.quitLock.RUnlock() if d.closed { @@ -326,6 +334,10 @@ func (d *Database) Put(key []byte, value []byte) error { // Delete removes the key from the key-value store. func (d *Database) Delete(key []byte) error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbDeleteTimer.UpdateSince(start) }() + } d.quitLock.RLock() defer d.quitLock.RUnlock() if d.closed { @@ -482,6 +494,10 @@ func (d *Database) meter(refresh time.Duration, namespace string) { nonLevel0CompCount = int64(d.nonLevel0Comp.Load()) level0CompCount = int64(d.level0Comp.Load()) ) + fmt.Printf("loop print db stats db_metrics=\n%v\n", stats) + d.log.Info("loop print db stats", "comp_time", compTime, "write_delay_count", writeDelayCount, "write_delay_time", + writeDelayTime, "non_level0_comp_count", nonLevel0CompCount, "level0_comp_count", level0CompCount) + writeDelayTimes[i%2] = writeDelayTime writeDelayCounts[i%2] = writeDelayCount compTimes[i%2] = compTime @@ -580,6 +596,10 @@ func (b *batch) ValueSize() int { // Write flushes any accumulated data to disk. 
func (b *batch) Write() error { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { ethdb.EthdbBatchWriteTimer.UpdateSince(start) }() + } b.db.quitLock.RLock() defer b.db.quitLock.RUnlock() if b.db.closed { diff --git a/go.mod b/go.mod index 0b7be66d06..b20617eb1c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/ethereum/go-ethereum -go 1.20 +go 1.21 + +//toolchain go1.22.0 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 diff --git a/go.sum b/go.sum index ada56d58cf..aee9afc2d4 100644 --- a/go.sum +++ b/go.sum @@ -51,9 +51,11 @@ github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuI github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 h1:8q4SaHjFsClSvuVne0ID/5Ka8u3fcIHyqkLjcFpNRHQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0/go.mod h1:bjGvMhVMb+EEm3VRNQawDMUyMMjo+S5ewNjflkep/0Q= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0/go.mod h1:c+Lifp3EDEamAkPVzMooRNOK6CZjNSdEnf1A7jsI9u4= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 h1:gggzg0SUMs6SQbEw+3LoSsYf9YMjkupeAnHMX8O9mmY= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0/go.mod h1:+6KLcKIVgxoBDMqMO/Nvy7bZ9a0nbU3I1DtFQK3YvB4= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= @@ -68,10 +70,13 @@ github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN 
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= +github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= @@ -189,6 +194,7 @@ github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/btcutil v1.1.2 h1:XLMbX8JQEiwMcYft2EGi8zPUkoa0abKIU6/BJSRsjzQ= +github.com/btcsuite/btcd/btcutil v1.1.2/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod 
h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -234,6 +240,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.9.1 h1:yFVvsI0VxmRShfawbt/laCIDy/mtTqqnvoNgiy5bEV8= github.com/cockroachdb/errors v1.9.1/go.mod h1:2sxOtL2WIc096WSZqZ5h8fa17rdDq9HZOZLBCor4mBk= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= @@ -248,6 +255,7 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1: github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/cometbft/cometbft-db v0.7.0 h1:uBjbrBx4QzU0zOEnU8KxoDl18dMNgDh+zZRUE0ucsbo= +github.com/cometbft/cometbft-db v0.7.0/go.mod h1:yiKJIm2WKrt6x8Cyxtq9YTEcIMPcEe4XPxhgX59Fzf0= github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= @@ -263,6 +271,7 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7 github.com/coreos/go-systemd 
v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU= +github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y= github.com/cosmos/gogoproto v1.4.11 h1:LZcMHrx4FjUgrqQSWeaGC1v/TeuVFqSLa43CC6aWR2g= github.com/cosmos/gogoproto v1.4.11/go.mod h1:/g39Mh8m17X8Q/GDEs5zYTSNaNnInBSohtaxzQnYq1Y= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= @@ -274,6 +283,7 @@ github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80/go.mod h1:gzbV github.com/crate-crypto/go-kzg-4844 v0.7.0 h1:C0vgZRk4q4EZ/JgPfzuSoxdCq3C3mOZMBShovmncxvA= github.com/crate-crypto/go-kzg-4844 v0.7.0/go.mod h1:1kMhvPgI0Ky3yIa+9lFySEBUBXkYxeOi8ZF1sYioxhc= github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM= +github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= @@ -292,6 +302,7 @@ github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14y github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 
h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= @@ -303,6 +314,7 @@ github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlN github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= +github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.4-0.20210318174700-74754f61e018/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= @@ -317,6 +329,7 @@ github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwu github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo= github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= @@ -410,6 +423,7 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclK github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= github.com/go-errors/errors v1.0.1/go.mod 
h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -522,6 +536,7 @@ github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38/go.mod h1:B4C85q github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= +github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.1.1-0.20171103154506-982329095285/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -552,6 +567,7 @@ github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/orderedcode v0.0.1 h1:UzfcAexk9Vhv8+9pNOgRu41f16lHq725vPwnSeiG/Us= +github.com/google/orderedcode v0.0.1/go.mod h1:iVyU4/qPKHY5h/wSd6rZZCDcLJNxiWO6dvsYES2Sb20= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod 
h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= @@ -607,6 +623,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.0.1/go.mod h1:oVMjMN64nzEcepv1kdZKg github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= +github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= @@ -619,6 +636,7 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-hclog v1.2.0 h1:La19f8d7WIlm4ogzNHB0JGqs5AUDAZ2UfCY4sJXcJdM= +github.com/hashicorp/go-hclog v1.2.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= @@ -740,6 +758,7 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmhodges/levigo 
v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U= +github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d/go.mod h1:fS54ONkjDV71zS9CDx3V9K21gJg7byKSvI4ajuWFNJw= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -823,6 +842,7 @@ github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= +github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtTX406BpdQMqw= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= @@ -1005,6 +1025,7 @@ github.com/mimoo/StrobeGo v0.0.0-20210601165009-122bf33a46e0/go.mod h1:43+3pMjjK github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= @@ -1132,6 +1153,7 @@ 
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je4 github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= +github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/openconfig/gnmi v0.0.0-20190823184014-89b2bf29312c/go.mod h1:t+O9It+LKzfOAhKTT5O0ehDix+MTqbtT0T9t+7zzOvc= github.com/openconfig/reference v0.0.0-20190727015836-8dfd928c9696/go.mod h1:ym2A+zigScwkSEb/cVQB0/ZMpU3rqiH6X7WRRsxgOGw= @@ -1170,6 +1192,7 @@ github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1182,6 +1205,7 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod 
h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= @@ -1453,6 +1477,7 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -1961,7 +1986,9 @@ google.golang.org/genproto v0.0.0-20201019141844-1ed22bb0c154/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210426193834-eac7f76ac494/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= +google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 h1:W18sezcAYs+3tDZX4F80yctqa12jcP1PUS2gQu1zTPU= +google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97/go.mod h1:iargEX0SFPm3xcfMI0d1domjg0ZF4Aa0p2awqyxhvF0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I= 
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc= google.golang.org/grpc v1.2.1-0.20170921194603-d4b75ebd4f9f/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= diff --git a/trie/database.go b/trie/database.go index 1e59f0908f..256db6d6ec 100644 --- a/trie/database.go +++ b/trie/database.go @@ -18,8 +18,10 @@ package trie import ( "errors" + "strings" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/triedb/hashdb" @@ -57,7 +59,7 @@ type backend interface { // // For hash scheme, there is no differentiation between diff layer nodes // and dirty disk layer nodes, so both are merged into the second return. - Size() (common.StorageSize, common.StorageSize) + Size() (common.StorageSize, common.StorageSize, common.StorageSize) // Update performs a state transition by committing dirty nodes contained // in the given set in order to update state from the specified parent to @@ -89,8 +91,22 @@ type Database struct { // the legacy hash-based scheme is used by default. func NewDatabase(diskdb ethdb.Database, config *Config) *Database { // Sanitize the config and use the default one if it's not specified. 
+ dbScheme := rawdb.ReadStateScheme(diskdb) if config == nil { - config = HashDefaults + if dbScheme == rawdb.PathScheme { + config = &Config{ + PathDB: pathdb.Defaults, + } + } else { + config = HashDefaults + } + } + if config.PathDB == nil && config.HashDB == nil { + if dbScheme == rawdb.PathScheme { + config.PathDB = pathdb.Defaults + } else { + config.HashDB = hashdb.Defaults + } } var preimages *preimageStore if config.Preimages { @@ -101,12 +117,30 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database { diskdb: diskdb, preimages: preimages, } - if config.HashDB != nil && config.PathDB != nil { - log.Crit("Both 'hash' and 'path' mode are configured") - } - if config.PathDB != nil { + /* + * 1. First, initialize db according to the user config + * 2. Second, initialize the db according to the scheme already used by db + * 3. Last, use the default scheme, namely hash scheme + */ + if config.HashDB != nil { + if rawdb.ReadStateScheme(diskdb) == rawdb.PathScheme { + log.Warn("incompatible state scheme", "old", rawdb.PathScheme, "new", rawdb.HashScheme) + } + db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{}) + } else if config.PathDB != nil { + if rawdb.ReadStateScheme(diskdb) == rawdb.HashScheme { + log.Warn("incompatible state scheme", "old", rawdb.HashScheme, "new", rawdb.PathScheme) + } + db.backend = pathdb.New(diskdb, config.PathDB) + } else if strings.Compare(dbScheme, rawdb.PathScheme) == 0 { + if config.PathDB == nil { + config.PathDB = pathdb.Defaults + } db.backend = pathdb.New(diskdb, config.PathDB) } else { + if config.HashDB == nil { + config.HashDB = hashdb.Defaults + } db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{}) } return db @@ -151,16 +185,16 @@ func (db *Database) Commit(root common.Hash, report bool) error { // Size returns the storage size of diff layer nodes above the persistent disk // layer, the dirty nodes buffered within the disk layer, and the size of cached // preimages. 
-func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) { +func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) { var ( - diffs, nodes common.StorageSize - preimages common.StorageSize + diffs, nodes, immutablenodes common.StorageSize + preimages common.StorageSize ) - diffs, nodes = db.backend.Size() + diffs, nodes, immutablenodes = db.backend.Size() if db.preimages != nil { preimages = db.preimages.size() } - return diffs, nodes, preimages + return diffs, nodes, immutablenodes, preimages } // Initialized returns an indicator if the state data is already initialized @@ -318,3 +352,14 @@ func (db *Database) SetBufferSize(size int) error { } return pdb.SetBufferSize(size) } + +// Head return the top non-fork difflayer/disklayer root hash for rewinding. +// It's only supported by path-based database and will return empty hash for +// others. +func (db *Database) Head() common.Hash { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return common.Hash{} + } + return pdb.Head() +} diff --git a/trie/trie.go b/trie/trie.go index 07467ac69c..7e531054f0 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -21,13 +21,21 @@ import ( "bytes" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/trie/trienode" ) +var ( + trieGetTimer = metrics.NewRegisteredTimer("trie/get/time", nil) + trieReaderGetTimer = metrics.NewRegisteredTimer("trie/reader/get/time", nil) + trieReaderTotalTimer = metrics.NewRegisteredTimer("trie/reader/total/time", nil) +) + // Trie is a Merkle Patricia Trie. Use New to create a trie that sits on // top of a database. Whenever trie performs a commit operation, the generated // nodes will be gathered and returned in a set. 
Once the trie is committed, @@ -145,6 +153,10 @@ func (t *Trie) Get(key []byte) ([]byte, error) { if t.committed { return nil, ErrCommitted } + start := time.Now() + if metrics.EnabledExpensive { + defer func() { trieGetTimer.UpdateSince(start) }() + } value, newroot, didResolve, err := t.get(t.root, keybytesToHex(key), 0) if err == nil && didResolve { t.root = newroot @@ -177,7 +189,11 @@ func (t *Trie) get(origNode node, key []byte, pos int) (value []byte, newnode no } return value, n, didResolve, err case hashNode: + start := time.Now() child, err := t.resolveAndTrack(n, key[:pos]) + if metrics.EnabledExpensive { + trieReaderGetTimer.UpdateSince(start) + } if err != nil { return nil, n, true, err } @@ -585,6 +601,10 @@ func (t *Trie) resolve(n node, prefix []byte) (node, error) { // node's original value. The rlp-encoded blob is preferred to be loaded from // database because it's easy to decode node while complex to encode node to blob. func (t *Trie) resolveAndTrack(n hashNode, prefix []byte) (node, error) { + start := time.Now() + if metrics.EnabledExpensive { + defer func() { trieReaderTotalTimer.UpdateSince(start) }() + } blob, err := t.reader.node(prefix, common.BytesToHash(n)) if err != nil { return nil, err diff --git a/trie/triedb/hashdb/database.go b/trie/triedb/hashdb/database.go index 764ab24ec8..1f49491d01 100644 --- a/trie/triedb/hashdb/database.go +++ b/trie/triedb/hashdb/database.go @@ -627,7 +627,7 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n // // The first return will always be 0, representing the memory stored in unbounded // diff layers above the dirty cache. This is only available in pathdb. 
-func (db *Database) Size() (common.StorageSize, common.StorageSize) { +func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) { db.lock.RLock() defer db.lock.RUnlock() @@ -635,7 +635,7 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) { // the total memory consumption, the maintenance metadata is also needed to be // counted. var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize) - return 0, db.dirtiesSize + db.childrenSize + metadataSize + return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0 } // Close closes the trie database and releases all held resources. diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go new file mode 100644 index 0000000000..55d3b943b5 --- /dev/null +++ b/trie/triedb/pathdb/asyncnodebuffer.go @@ -0,0 +1,482 @@ +package pathdb + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/fastcache" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie/trienode" +) + +var _ trienodebuffer = &asyncnodebuffer{} + +// asyncnodebuffer implement trienodebuffer interface, and async the nodecache +// to disk. +type asyncnodebuffer struct { + mux sync.RWMutex + current *nodecache + background *nodecache + isFlushing atomic.Bool + stopFlushing atomic.Bool +} + +// newAsyncNodeBuffer initializes the async node buffer with the provided nodes. 
+func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer { + if nodes == nil { + nodes = make(map[common.Hash]map[string]*trienode.Node) + } + var size uint64 + for _, subset := range nodes { + for path, n := range subset { + size += uint64(len(n.Blob) + len(path)) + } + } + + return &asyncnodebuffer{ + current: newNodeCache(uint64(limit), size, nodes, layers), + background: newNodeCache(uint64(limit), 0, make(map[common.Hash]map[string]*trienode.Node), 0), + } +} + +// node retrieves the trie node with given node info. +func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) { + a.mux.RLock() + defer a.mux.RUnlock() + + node, err := a.current.node(owner, path, hash) + if err != nil { + return nil, err + } + if node == nil { + return a.background.node(owner, path, hash) + } + return node, nil +} + +// commit merges the dirty nodes into the nodebuffer. This operation won't take +// the ownership of the nodes map which belongs to the bottom-most diff layer. +// It will just hold the node references from the given map which are safe to +// copy. +func (a *asyncnodebuffer) commit(block uint64, nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer { + a.mux.Lock() + defer a.mux.Unlock() + + err := a.current.commit(block, nodes, true) + if err != nil { + log.Crit("[BUG] failed to commit nodes to asyncnodebuffer", "error", err) + } + return a +} + +// revert is the reverse operation of commit. It also merges the provided nodes +// into the nodebuffer, the difference is that the provided node set should +// revert the changes made by the last state transition. 
+func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { + a.mux.Lock() + defer a.mux.Unlock() + + var err error + a.current, err = a.current.merge(a.background) + if err != nil { + log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err) + } + a.background.reset() + return a.current.revert(db, nodes) +} + +// setSize is unsupported in asyncnodebuffer, due to the double buffer, blocking will occur. +func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { + return errors.New("not supported") +} + +// reset cleans up the disk cache. +func (a *asyncnodebuffer) reset() { + a.mux.Lock() + defer a.mux.Unlock() + + a.current.reset() + a.background.reset() +} + +// empty returns an indicator if nodebuffer contains any state transition inside. +func (a *asyncnodebuffer) empty() bool { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.empty() && a.background.empty() +} + +// flush persists the in-memory dirty trie node into the disk if the configured +// memory threshold is reached. Note, all data must be written atomically. 
+func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { + a.mux.Lock() + defer a.mux.Unlock() + + if a.stopFlushing.Load() { + return nil + } + + if force { + for { + if atomic.LoadUint64(&a.background.immutable) == 1 { + time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second) + log.Info("waiting background memory table flushed into disk for forcing flush node buffer") + continue + } + atomic.StoreUint64(&a.current.immutable, 1) + return a.current.flush(db, clean, id, true) + } + } + + if a.current.size < a.current.limit { + return nil + } + + // background flush doing + if atomic.LoadUint64(&a.background.immutable) == 1 { + return nil + } + + atomic.StoreUint64(&a.current.immutable, 1) + a.current, a.background = a.background, a.current + + a.isFlushing.Store(true) + go func(persistID uint64) { + defer a.isFlushing.Store(false) + for { + err := a.background.flush(db, clean, persistID, true) + if err == nil { + a.background.reset() + log.Debug("succeed to flush background nodecache to disk", "state_id", persistID) + return + } + log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err) + } + }(id) + return nil +} + +// waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. +func (a *asyncnodebuffer) waitAndStopFlushing() { + a.stopFlushing.Store(true) + for a.isFlushing.Load() { + time.Sleep(time.Second) + log.Warn("waiting background memory table flushed into disk") + } +} + +// getAllNodes return all the trie nodes are cached in trienodebuffer. +func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node { + a.mux.Lock() + defer a.mux.Unlock() + + cached, err := a.current.merge(a.background) + if err != nil { + log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err) + } + return cached.nodes +} + +// getLayers return the size of cached difflayers. 
+func (a *asyncnodebuffer) getLayers() uint64 { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.layers + a.background.layers +} + +// getSize return the trienodebuffer used size. +func (a *asyncnodebuffer) getSize() (uint64, uint64) { + a.mux.RLock() + defer a.mux.RUnlock() + + return a.current.size, a.background.size +} + +// setClean set fastcache to trienodebuffer for cache the trie nodes, +// used for nodebufferlist. +func (a *asyncnodebuffer) setClean(clean *fastcache.Cache) { + return +} + +type nodecache struct { + block uint64 + layers uint64 // The number of diff layers aggregated inside + size uint64 // The size of aggregated writes + limit uint64 // The maximum memory allowance in bytes + nodes map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path + immutable uint64 // The flag equal 1, flush nodes to disk background + + pre *nodecache + next *nodecache +} + +func newNodeCache(limit, size uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodecache { + return &nodecache{ + layers: layers, + size: size, + limit: limit, + nodes: nodes, + immutable: 0, + } +} + +func (nc *nodecache) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) { + subset, ok := nc.nodes[owner] + if !ok { + return nil, nil + } + n, ok := subset[string(path)] + if !ok { + return nil, nil + } + if n.Hash != hash { + dirtyFalseMeter.Mark(1) + log.Error("Unexpected trie node in async node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash) + return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob) + } + return n, nil +} + +func (nc *nodecache) commit(block uint64, nodes map[common.Hash]map[string]*trienode.Node, checkStatus bool) error { + if checkStatus && atomic.LoadUint64(&nc.immutable) == 1 { + return errWriteImmutable + } + + if nc.block != 0 && nc.block >= block { + log.Crit("block number out of order", "pre_block", nc.block, "capping_block", 
block) + } + nc.block = block + var ( + delta int64 + overwrite int64 + overwriteSize int64 + ) + for owner, subset := range nodes { + current, exist := nc.nodes[owner] + if !exist { + // Allocate a new map for the subset instead of claiming it directly + // from the passed map to avoid potential concurrent map read/write. + // The nodes belong to original diff layer are still accessible even + // after merging, thus the ownership of nodes map should still belong + // to original layer and any mutation on it should be prevented. + current = make(map[string]*trienode.Node) + for path, n := range subset { + current[path] = n + delta += int64(len(n.Blob) + len(path)) + } + nc.nodes[owner] = current + continue + } + for path, n := range subset { + if orig, exist := current[path]; !exist { + delta += int64(len(n.Blob) + len(path)) + } else { + delta += int64(len(n.Blob) - len(orig.Blob)) + overwrite++ + overwriteSize += int64(len(orig.Blob) + len(path)) + } + current[path] = n + } + nc.nodes[owner] = current + } + nc.updateSize(delta) + nc.layers++ + frontBufferDeltaMeter.Mark(delta) + gcNodesMeter.Mark(overwrite) + gcBytesMeter.Mark(overwriteSize) + return nil +} + +func (nc *nodecache) updateSize(delta int64) { + size := int64(nc.size) + delta + if size >= 0 { + nc.size = uint64(size) + return + } + s := nc.size + nc.size = 0 + log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(s), "delta", common.StorageSize(delta)) +} + +func (nc *nodecache) reset() { + atomic.StoreUint64(&nc.immutable, 0) + nc.layers = 0 + nc.size = 0 + nc.block = 0 + nc.pre = nil + nc.next = nil + nc.nodes = make(map[common.Hash]map[string]*trienode.Node) +} + +func (nc *nodecache) empty() bool { + return nc.layers == 0 +} + +func (nc *nodecache) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, checkStatus bool) error { + if checkStatus && atomic.LoadUint64(&nc.immutable) != 1 { + return errFlushMutable + } + + // Ensure the target state id is aligned with the 
internal counter. + head := rawdb.ReadPersistentStateID(db) + if head+nc.layers != id { + return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", nc.layers, head, id) + } + var ( + start = time.Now() + batch = db.NewBatchWithSize(int(float64(nc.size) * DefaultBatchRedundancyRate)) + ) + nodes := writeNodes(batch, nc.nodes, clean) + rawdb.WritePersistentStateID(batch, id) + + // Flush all mutations in a single batch + size := batch.ValueSize() + if err := batch.Write(); err != nil { + return err + } + backCommitlayerNumberMeter.Update(int64(nc.layers)) + commitBytesMeter.Mark(int64(size)) + commitNodesMeter.Mark(int64(nodes)) + commitTimeTimer.UpdateSince(start) + log.Debug("Persisted pathdb nodes", "nodes", len(nc.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + return nil +} + +func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) { + if nc == nil && nc1 == nil { + return nil, nil + } + if nc == nil || nc.empty() { + res := copyNodeCache(nc1) + atomic.StoreUint64(&res.immutable, 0) + return res, nil + } + if nc1 == nil || nc1.empty() { + res := copyNodeCache(nc) + atomic.StoreUint64(&res.immutable, 0) + return res, nil + } + if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) { + return nil, errIncompatibleMerge + } + + var ( + immutable *nodecache + mutable *nodecache + res = &nodecache{} + ) + if atomic.LoadUint64(&nc.immutable) == 1 { + immutable = nc + mutable = nc1 + } else { + immutable = nc1 + mutable = nc + } + res.size = immutable.size + mutable.size + res.layers = immutable.layers + mutable.layers + res.limit = immutable.limit + res.nodes = make(map[common.Hash]map[string]*trienode.Node) + for acc, subTree := range immutable.nodes { + if _, ok := res.nodes[acc]; !ok { + res.nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + res.nodes[acc][path] = node + } + } + + for acc, 
subTree := range mutable.nodes { + if _, ok := res.nodes[acc]; !ok { + res.nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + res.nodes[acc][path] = node + } + } + return res, nil +} + +func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { + if atomic.LoadUint64(&nc.immutable) == 1 { + return errRevertImmutable + } + + // Short circuit if no embedded state transition to revert. + if nc.layers == 0 { + return errStateUnrecoverable + } + nc.layers-- + + // Reset the entire buffer if only a single transition left. + if nc.layers == 0 { + nc.reset() + return nil + } + var delta int64 + for owner, subset := range nodes { + current, ok := nc.nodes[owner] + if !ok { + panic(fmt.Sprintf("non-existent subset (%x)", owner)) + } + for path, n := range subset { + orig, ok := current[path] + if !ok { + // There is a special case in MPT that one child is removed from + // a fullNode which only has two children, and then a new child + // with different position is immediately inserted into the fullNode. + // In this case, the clean child of the fullNode will also be + // marked as dirty because of node collapse and expansion. + // + // In case of database rollback, don't panic if this "clean" + // node occurs which is not present in buffer. + var nhash common.Hash + if owner == (common.Hash{}) { + _, nhash = rawdb.ReadAccountTrieNode(db, []byte(path)) + } else { + _, nhash = rawdb.ReadStorageTrieNode(db, owner, []byte(path)) + } + // Ignore the clean node in the case described above. 
+ if nhash == n.Hash { + continue + } + panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex())) + } + current[path] = n + delta += int64(len(n.Blob)) - int64(len(orig.Blob)) + } + } + nc.updateSize(delta) + return nil +} + +func copyNodeCache(n *nodecache) *nodecache { + if n == nil { + return nil + } + nc := &nodecache{ + layers: n.layers, + size: n.size, + limit: n.limit, + immutable: atomic.LoadUint64(&n.immutable), + nodes: make(map[common.Hash]map[string]*trienode.Node), + } + for acc, subTree := range n.nodes { + if _, ok := nc.nodes[acc]; !ok { + nc.nodes[acc] = make(map[string]*trienode.Node) + } + for path, node := range subTree { + nc.nodes[acc][path] = node + } + } + return nc +} diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index dc64414e9b..94d0f13f24 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -52,6 +52,14 @@ const ( // Do not increase the buffer size arbitrarily, otherwise the system // pause time will increase when the database writes happen. DefaultBufferSize = 64 * 1024 * 1024 + + // DefaultBackgroundFlushInterval defines the default the wait interval + // that background node cache flush disk. + DefaultBackgroundFlushInterval = 3 + + // DefaultBatchRedundancyRate defines the batch size, compatible write + // size calculation is inaccurate + DefaultBatchRedundancyRate = 1.1 ) // layer is the interface implemented by all state layers which includes some @@ -86,6 +94,7 @@ type layer interface { // Config contains the settings for database. 
type Config struct { + SyncFlush bool // Flag of trienodebuffer sync flush cache to disk StateHistory uint64 // Number of recent blocks to maintain state history for CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes @@ -186,7 +195,7 @@ func New(diskdb ethdb.Database, config *Config) *Database { log.Crit("Failed to disable database", "err", err) // impossible to happen } } - log.Warn("Path-based state scheme is an experimental feature") + log.Warn("Path-based state scheme is an experimental feature", "sync", db.config.SyncFlush) return db } @@ -303,7 +312,10 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0))) + nb := NewTrieNodeBuffer(db.diskdb, db.config.SyncFlush, db.bufferSize, nil, 0) + dl := newDiskLayer(root, 0, db, nil, nb) + nb.setClean(dl.cleans) + db.tree.reset(dl) // Re-enable the database as the final step. db.waitSync = false @@ -410,16 +422,16 @@ func (db *Database) Close() error { // Size returns the current storage size of the memory cache in front of the // persistent database layer. 
-func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize) { +func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, immutableNodes common.StorageSize) { db.tree.forEach(func(layer layer) { if diff, ok := layer.(*diffLayer); ok { diffs += common.StorageSize(diff.memory) } if disk, ok := layer.(*diskLayer); ok { - nodes += disk.size() + nodes, immutableNodes = disk.size() } }) - return diffs, nodes + return diffs, nodes, immutableNodes } // Initialized returns an indicator if the state data is already @@ -452,6 +464,13 @@ func (db *Database) Scheme() string { return rawdb.PathScheme } +// Head return the top non-fork difflayer/disklayer root hash for rewinding. +func (db *Database) Head() common.Hash { + db.lock.Lock() + defer db.lock.Unlock() + return db.tree.front() +} + // modifyAllowed returns the indicator if mutation is allowed. This function // assumes the db.lock is already held. func (db *Database) modifyAllowed() error { diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index d3b6419cc5..ebf69c81d4 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -25,25 +25,86 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" "golang.org/x/crypto/sha3" ) +// trienodebuffer is a collection of modified trie nodes to aggregate the disk +// write. The content of the trienodebuffer must be checked before diving into +// disk (since it basically is not-yet-written data). +type trienodebuffer interface { + // node retrieves the trie node with given node info. + node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) + + // commit merges the dirty nodes into the trienodebuffer. 
This operation won't take + // the ownership of the nodes map which belongs to the bottom-most diff layer. + // It will just hold the node references from the given map which are safe to + // copy. + commit(block uint64, nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer + + // revert is the reverse operation of commit. It also merges the provided nodes + // into the trienodebuffer, the difference is that the provided node set should + // revert the changes made by the last state transition. + revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error + + // flush persists the in-memory dirty trie node into the disk if the configured + // memory threshold is reached. Note, all data must be written atomically. + flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error + + // setSize sets the buffer size to the provided number, and invokes a flush + // operation if the current memory usage exceeds the new limit. + setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error + + // reset cleans up the disk cache. + reset() + + // empty returns an indicator if trienodebuffer contains any state transition inside. + empty() bool + + // getSize return the trienodebuffer used size. + getSize() (uint64, uint64) + + // getAllNodes return all the trie nodes are cached in trienodebuffer. + getAllNodes() map[common.Hash]map[string]*trienode.Node + + // getLayers return the size of cached difflayers. + getLayers() uint64 + + // waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. + waitAndStopFlushing() + + // setClean set fastcache to trienodebuffer for cache the trie nodes, used for nodebufferlist. 
+ setClean(clean *fastcache.Cache) +} + +func NewTrieNodeBuffer(db ethdb.Database, sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer { + if sync { + log.Info("new sync node buffer", "limit", common.StorageSize(limit), "layers", layers) + return newNodeBuffer(limit, nodes, layers) + } + //log.Info("new async node buffer", "limit", common.StorageSize(limit), "layers", layers) + //return newAsyncNodeBuffer(limit, nodes, layers) + + log.Info("new node buffer list", "limit", common.StorageSize(limit), "layers", layers) + return newNodeBufferList(db, uint64(limit), nodes, layers) +} + // diskLayer is a low level persistent layer built on top of a key-value store. type diskLayer struct { root common.Hash // Immutable, root hash to which this layer was made for id uint64 // Immutable, corresponding state id db *Database // Path-based trie database cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs - buffer *nodebuffer // Node buffer to aggregate writes + buffer trienodebuffer // Node buffer to aggregate writes stale bool // Signals that the layer became stale (state progressed) lock sync.RWMutex // Lock used to protect stale flag } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer { +func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer { // Initialize a clean cache if the memory allowance is not zero // or reuse the provided cache if it is not nil (inherited from // the original disk layer). @@ -198,7 +259,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // diff layer, and flush the content in disk layer if there are too // many nodes cached. The clean cache is inherited from the original // disk layer for reusing. 
- ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes)) + ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.block, bottom.nodes)) err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force) if err != nil { return nil, err @@ -266,14 +327,15 @@ func (dl *diskLayer) setBufferSize(size int) error { } // size returns the approximate size of cached nodes in the disk layer. -func (dl *diskLayer) size() common.StorageSize { +func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) { dl.lock.RLock() defer dl.lock.RUnlock() if dl.stale { - return 0 + return 0, 0 } - return common.StorageSize(dl.buffer.size) + dirtyNodes, dirtyimmutableNodes := dl.buffer.getSize() + return common.StorageSize(dirtyNodes), common.StorageSize(dirtyimmutableNodes) } // resetCache releases the memory held by clean cache to prevent memory leak. diff --git a/trie/triedb/pathdb/errors.go b/trie/triedb/pathdb/errors.go index 78ee4459fe..46b986f884 100644 --- a/trie/triedb/pathdb/errors.go +++ b/trie/triedb/pathdb/errors.go @@ -49,6 +49,20 @@ var ( // errUnexpectedNode is returned if the requested node with specified path is // not hash matched with expectation. errUnexpectedNode = errors.New("unexpected node") + + // errWriteImmutable is returned if write to background immutable nodecache + // under asyncnodebuffer + errWriteImmutable = errors.New("write immutable nodecache") + + // errFlushMutable is returned if flush the background mutable nodecache + // to disk, under asyncnodebuffer + errFlushMutable = errors.New("flush mutable nodecache") + + // errIncompatibleMerge is returned when merge node cache occurs error. 
+ errIncompatibleMerge = errors.New("incompatible nodecache merge") + + // errRevertImmutable is returned if revert the background immutable nodecache + errRevertImmutable = errors.New("revert immutable nodecache") ) func newUnexpectedNodeError(loc string, expHash common.Hash, gotHash common.Hash, owner common.Hash, path []byte, blob []byte) error { diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index ac770763e3..1dadb433fe 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -130,7 +130,10 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0)) + nb := NewTrieNodeBuffer(db.diskdb, db.config.SyncFlush, db.bufferSize, nil, 0) + dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb) + nb.setClean(dl.cleans) + return dl } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -170,7 +173,9 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { nodes[entry.Owner] = subset } // Calculate the internal state transitions by id difference. 
- base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored)) + nb := NewTrieNodeBuffer(db.diskdb, db.config.SyncFlush, db.bufferSize, nodes, id-stored) + base := newDiskLayer(root, id, db, nil, nb) + nb.setClean(base.cleans) return base, nil } @@ -260,8 +265,9 @@ func (dl *diskLayer) journal(w io.Writer) error { return err } // Step three, write all unwritten nodes into the journal - nodes := make([]journalNodes, 0, len(dl.buffer.nodes)) - for owner, subset := range dl.buffer.nodes { + bufferNodes := dl.buffer.getAllNodes() + nodes := make([]journalNodes, 0, len(bufferNodes)) + for owner, subset := range bufferNodes { entry := journalNodes{Owner: owner} for path, node := range subset { entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) @@ -271,7 +277,7 @@ func (dl *diskLayer) journal(w io.Writer) error { if err := rlp.Encode(w, nodes); err != nil { return err } - log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(dl.buffer.nodes)) + log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes)) return nil } @@ -337,6 +343,10 @@ func (dl *diffLayer) journal(w io.Writer) error { // flattening everything down (bad for reorgs). And this function will mark the // database as read-only to prevent all following mutation to disk. func (db *Database) Journal(root common.Hash) error { + // Run the journaling + db.lock.Lock() + defer db.lock.Unlock() + // Retrieve the head layer to journal from. 
l := db.tree.get(root) if l == nil { @@ -344,16 +354,14 @@ func (db *Database) Journal(root common.Hash) error { } disk := db.tree.bottom() if l, ok := l.(*diffLayer); ok { - log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers) + log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.getLayers()) } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) - log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) + log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.getLayers()) } start := time.Now() - // Run the journaling - db.lock.Lock() - defer db.lock.Unlock() - + // wait and stop the flush trienodebuffer, for asyncnodebuffer need fixed diskroot + disk.buffer.waitAndStopFlushing() // Short circuit if the database is in read only mode. if db.readOnly { return errDatabaseReadOnly diff --git a/trie/triedb/pathdb/layertree.go b/trie/triedb/pathdb/layertree.go index d314779910..9d495741b1 100644 --- a/trie/triedb/pathdb/layertree.go +++ b/trie/triedb/pathdb/layertree.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" ) @@ -190,6 +191,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { remove(root) } } + layerNumberMeter.Update(int64(len(tree.layers))) return nil } @@ -212,3 +214,46 @@ func (tree *layerTree) bottom() *diskLayer { } return current.(*diskLayer) } + +// front return the top non-fork difflayer/disklayer root hash for rewinding. 
+func (tree *layerTree) front() common.Hash { + tree.lock.RLock() + defer tree.lock.RUnlock() + + chain := make(map[common.Hash][]common.Hash) + var base common.Hash + for _, layer := range tree.layers { + switch dl := layer.(type) { + case *diskLayer: + if dl.stale { + log.Info("pathdb top disklayer is stale") + return base + } + base = dl.rootHash() + case *diffLayer: + if _, ok := chain[dl.parentLayer().rootHash()]; !ok { + chain[dl.parentLayer().rootHash()] = make([]common.Hash, 0) + } + chain[dl.parentLayer().rootHash()] = append(chain[dl.parentLayer().rootHash()], dl.rootHash()) + default: + log.Crit("unsupported layer type") + } + } + if (base == common.Hash{}) { + log.Info("pathdb top difflayer is empty") + return base + } + parent := base + for { + children, ok := chain[parent] + if !ok { + log.Info("pathdb top difflayer", "root", parent) + return parent + } + if len(children) != 1 { + log.Info("pathdb top difflayer is forked", "common ancestor root", parent) + return parent + } + parent = children[0] + } +} diff --git a/trie/triedb/pathdb/metrics.go b/trie/triedb/pathdb/metrics.go index 9e2b1dcbf5..1f79b9e17d 100644 --- a/trie/triedb/pathdb/metrics.go +++ b/trie/triedb/pathdb/metrics.go @@ -47,4 +47,17 @@ var ( historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil) historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) + + layerNumberMeter = metrics.NewRegisteredGauge("pathdb/layer/number", nil) + frontBufferDeltaMeter = metrics.NewRegisteredMeter("pathdb/front/buffer/delta", nil) + backCommitlayerNumberMeter = metrics.NewRegisteredGauge("pathdb/back/commit/layer/number", nil) + + // only for node buffer list + nodeBufferListSizeMeter = metrics.NewRegisteredMeter("pathdb/nodebufferlist/size", nil) + baseNodeBufferSizeMeter = metrics.NewRegisteredMeter("pathdb/basenodebuffer/size", nil) + nodeBufferCountMeter = 
metrics.NewRegisteredMeter("pathdb/nodebufferlist/count", nil) + nodeBufferLayerMeter = metrics.NewRegisteredMeter("pathdb/nodebufferlist/layer", nil) + baseNodeBufferLayerMeter = metrics.NewRegisteredMeter("pathdb/basenodebuffer/layer", nil) + nodeBufferPersistID = metrics.NewRegisteredMeter("pathdb/nodebufferlist/persistid", nil) + nodeBufferLastBlock = metrics.NewRegisteredMeter("pathdb/nodebufferlist/lastblock", nil) ) diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go index 4a7d328b9a..9bd54a2794 100644 --- a/trie/triedb/pathdb/nodebuffer.go +++ b/trie/triedb/pathdb/nodebuffer.go @@ -29,6 +29,8 @@ import ( "github.com/ethereum/go-ethereum/trie/trienode" ) +var _ trienodebuffer = &nodebuffer{} + // nodebuffer is a collection of modified trie nodes to aggregate the disk // write. The content of the nodebuffer must be checked before diving into // disk (since it basically is not-yet-written data). @@ -80,7 +82,7 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr // the ownership of the nodes map which belongs to the bottom-most diff layer. // It will just hold the node references from the given map which are safe to // copy. -func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer { +func (b *nodebuffer) commit(block uint64, nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer { var ( delta int64 overwrite int64 @@ -273,3 +275,27 @@ func cacheKey(owner common.Hash, path []byte) []byte { } return append(owner.Bytes(), path...) } + +// getSize return the nodebuffer used size. +func (b *nodebuffer) getSize() (uint64, uint64) { + return b.size, 0 +} + +// getAllNodes return all the trie nodes are cached in nodebuffer. +func (b *nodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node { + return b.nodes +} + +// getLayers return the size of cached difflayers. 
+func (b *nodebuffer) getLayers() uint64 { + return b.layers +} + +// waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. +func (b *nodebuffer) waitAndStopFlushing() {} + +// setClean set fastcache to trienodebuffer for cache the trie nodes, +// used for nodebufferlist. +func (b *nodebuffer) setClean(clean *fastcache.Cache) { + return +} diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go new file mode 100644 index 0000000000..12505b08ed --- /dev/null +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -0,0 +1,507 @@ +package pathdb + +import ( + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/trie/trienode" +) + +const ( + // mergeBlockInterval defines the interval of block in one node cache. + mergeBlockInterval = 1800 + + // mergeNodeCacheInterval defines the interval to collect nodes to flush disk. + mergeNodeCacheInterval = 3 + + // reserveNodeCacheNumber defines the reserve number of node cache in node list. + reserveNodeCacheNumber = 3 +) + +var _ trienodebuffer = &nodebufferlist{} + +var nodeCachePool = sync.Pool{ + New: func() interface{} { + return new(nodecache) + }, +} + +// nodebufferlist implements the trienodebuffer interface, It is designed to meet +// the withdraw proof function of opBNB at the storage layer while taking into +// account high performance. It is a nodecache based queue that stores +// mergeBlockInterval compressed block difflayers per nodecache. It also has one +// base nodecache that collects the list's trie nodes to write disk. 
+type nodebufferlist struct { + db ethdb.Database + clean *fastcache.Cache + limit uint64 + block uint64 + + head *nodecache + tail *nodecache + + mux sync.RWMutex + basemMux sync.RWMutex + base *nodecache + size uint64 + count uint64 + persistID uint64 + layers uint64 + + isFlushing atomic.Bool + stopFlushing atomic.Bool + stopCh chan struct{} +} + +// newNodeBufferList initializes the node buffer list with the provided nodes +func newNodeBufferList( + db ethdb.Database, + limit uint64, + nodes map[common.Hash]map[string]*trienode.Node, + layers uint64) *nodebufferlist { + if nodes == nil { + nodes = make(map[common.Hash]map[string]*trienode.Node) + } + var size uint64 + for _, subset := range nodes { + for path, n := range subset { + size += uint64(len(n.Blob) + len(path)) + } + } + nc := newNodeCache(limit, size, nodes, layers) + ele := newNodeCache(limit, 0, make(map[common.Hash]map[string]*trienode.Node), 0) + nf := &nodebufferlist{ + db: db, + limit: limit, + base: nc, + head: ele, + tail: ele, + count: 1, + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + } + go nf.loop() + return nf +} + +// node retrieves the trie node with given node info. 
+func (nf *nodebufferlist) node(owner common.Hash, path []byte, hash common.Hash) (node *trienode.Node, err error) { + nf.mux.RLock() + find := func(nc *nodecache) bool { + subset, ok := nc.nodes[owner] + if !ok { + return true + } + n, ok := subset[string(path)] + if !ok { + return true + } + if n.Hash != hash { + log.Error("Unexpected trie node in node buffer list", "owner", owner, "path", path, "expect", hash, "got", n.Hash) + err = newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob) + return false + } + node = n + return false + } + nf.traverse(find) + if err != nil { + nf.mux.RUnlock() + return nil, err + } + if node != nil { + nf.mux.RUnlock() + return node, nil + } + nf.mux.RUnlock() + + nf.basemMux.RLock() + node, err = nf.base.node(owner, path, hash) + nf.basemMux.RUnlock() + return node, err +} + +// commit merges the dirty nodes into the trienodebuffer. This operation won't take +// the ownership of the nodes map which belongs to the bottom-most diff layer. +// It will just hold the node references from the given map which are safe to +// copy. +func (nf *nodebufferlist) commit(block uint64, nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer { + nf.mux.Lock() + defer nf.mux.Unlock() + + oldSize := nf.head.size + err := nf.head.commit(block, nodes, false) + if err != nil { + log.Crit("failed to commit nodes to node buffer list", "error", err) + } + + nf.block = block + nf.size = nf.size + nf.head.size - oldSize + nf.layers++ + + nodeBufferLastBlock.Mark(int64(nf.block)) + nodeBufferListSizeMeter.Mark(int64(nf.size)) + nodeBufferLayerMeter.Mark(int64(nf.layers)) + + if block != 0 && block%mergeBlockInterval == 0 { + nc := nodeCachePool.Get().(*nodecache) + nc.reset() + nf.pushFront(nc) + } + return nf +} + +// revert is the reverse operation of commit. It also merges the provided nodes +// into the trienodebuffer, the difference is that the provided node set should +// revert the changes made by the last state transition. 
+func (nf *nodebufferlist) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error { + // hang user read/write and background write, + nf.mux.Lock() + nf.basemMux.Lock() + defer nf.mux.Unlock() + defer nf.basemMux.Unlock() + + merge := func(buffer *nodecache) bool { + if err := nf.base.commit(buffer.block, buffer.nodes, false); err != nil { + log.Crit("failed to commit nodes to base node buffer", "error", err) + } + baseNodeBufferSizeMeter.Mark(int64(nf.base.size)) + baseNodeBufferLayerMeter.Mark(int64(nf.base.layers)) + + del := nf.popBack() + nf.base.layers-- + nf.base.layers += del.layers + nodeCachePool.Put(del) + return true + } + nf.traverseReverse(merge) + nc := nodeCachePool.Get().(*nodecache) + nc.reset() + nf.head = nc + nf.tail = nc + + return nf.base.revert(nf.db, nodes) +} + +// flush persists the in-memory dirty trie node into the disk if the configured +// memory threshold is reached. Note, all data must be written atomically. +func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error { + if nf.clean == nil { + nf.clean = clean + } + if !force { + return nil + } + + // hang user read/write and background write + nf.mux.Lock() + nf.basemMux.Lock() + defer nf.mux.Unlock() + defer nf.basemMux.Unlock() + + nf.stopFlushing.Store(true) + defer nf.stopFlushing.Store(false) + for { + if nf.isFlushing.Swap(true) { + time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second) + log.Info("waiting base node cache flushed into disk for forcing flush node buffer") + continue + } else { + break + } + } + + commitFunc := func(buffer *nodecache) bool { + if err := nf.base.commit(buffer.block, buffer.nodes, false); err != nil { + log.Crit("failed to commit nodes to base node buffer", "error", err) + } + del := nf.popBack() + nf.base.layers-- + nf.base.layers += del.layers + + baseNodeBufferSizeMeter.Mark(int64(nf.base.size)) + 
baseNodeBufferLayerMeter.Mark(int64(nf.base.layers)) + + nodeCachePool.Put(del) + return true + } + nf.traverseReverse(commitFunc) + persistID := nf.persistID + nf.base.layers + err := nf.base.flush(nf.db, nf.clean, persistID, false) + if err != nil { + log.Crit("failed to flush base node buffer to disk", "error", err) + } + nf.isFlushing.Store(false) + nf.base.reset() + nf.persistID = persistID + + baseNodeBufferSizeMeter.Mark(int64(nf.base.size)) + baseNodeBufferLayerMeter.Mark(int64(nf.base.layers)) + nodeBufferPersistID.Mark(int64(nf.persistID)) + return nil +} + +// setSize sets the buffer size to the provided number, and invokes a flush +// operation if the current memory usage exceeds the new limit. +func (nf *nodebufferlist) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error { + return errors.New("not supported") +} + +// reset cleans up the disk cache. +func (nf *nodebufferlist) reset() { + nf.mux.Lock() + defer nf.mux.Unlock() + + for { + nc := nf.popBack() + if nc == nil { + break + } + nodeCachePool.Put(nc) + } + buffer := nodeCachePool.Get().(*nodecache) + buffer.reset() + nf.head = buffer + nf.tail = buffer + nf.base.reset() + + baseNodeBufferSizeMeter.Mark(int64(nf.base.size)) + baseNodeBufferLayerMeter.Mark(int64(nf.base.layers)) +} + +// empty returns an indicator if trienodebuffer contains any state transition inside +func (nf *nodebufferlist) empty() bool { + return nf.getLayers() == 0 +} + +// getSize return the trienodebuffer used size. +func (nf *nodebufferlist) getSize() (uint64, uint64) { + // no lock, the return vals are used to log, not strictly correct + return nf.size, nf.base.size +} + +// getAllNodes return all the trie nodes are cached in trienodebuffer. 
+func (nf *nodebufferlist) getAllNodes() map[common.Hash]map[string]*trienode.Node { + nf.mux.Lock() + nf.basemMux.Lock() + defer nf.mux.Unlock() + defer nf.basemMux.Unlock() + + nc := nodeCachePool.Get().(*nodecache) + nc.reset() + if err := nc.commit(nf.base.block, nf.base.nodes, false); err != nil { + log.Crit("failed to commit nodes to node buffer", "error", err) + } + merge := func(buffer *nodecache) bool { + if err := nc.commit(buffer.block, buffer.nodes, false); err != nil { + log.Crit("failed to commit nodes to node buffer", "error", err) + } + return true + } + nf.traverseReverse(merge) + return nc.nodes +} + +// getLayers return the size of cached difflayers. +func (nf *nodebufferlist) getLayers() uint64 { + nf.mux.RLock() + defer nf.mux.RUnlock() + + return nf.layers + nf.base.layers +} + +// waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. +func (nf *nodebufferlist) waitAndStopFlushing() { + close(nf.stopCh) + nf.stopFlushing.Store(true) + for nf.isFlushing.Load() { + time.Sleep(time.Second) + log.Warn("waiting background node buffer flushed into disk") + } +} + +// setClean sets fastcache to trienodebuffer for cache the trie nodes, used for nodebufferlist. +func (nf *nodebufferlist) setClean(clean *fastcache.Cache) { + nf.mux.Lock() + defer nf.mux.Unlock() + nf.clean = clean +} + +// pushFront push cache to the nodebufferlist head. +func (nf *nodebufferlist) pushFront(cache *nodecache) { + if cache == nil { + return + } + cache.pre = nil + cache.next = nf.head + nf.head.pre = cache + nf.head = cache + + nf.size += cache.size + nf.layers += cache.layers + nf.count++ + + nodeBufferListSizeMeter.Mark(int64(nf.size)) + nodeBufferCountMeter.Mark(int64(nf.count)) + nodeBufferLayerMeter.Mark(int64(nf.layers)) + return +} + +// pop the nodebufferlist tail element. 
+func (nf *nodebufferlist) popBack() *nodecache {
+	if nf.tail == nil {
+		return nil
+	}
+	tag := nf.tail
+	nf.tail = nf.tail.pre
+	if nf.tail != nil {
+		nf.tail.next = nil
+	}
+
+	if nf.size < tag.size {
+		log.Warn("node buffer list size underflow", "old", nf.size, "dealt", tag.size)
+		nf.size = tag.size // clamp so the unsigned subtraction below yields zero instead of wrapping
+	}
+	nf.size -= tag.size
+	if nf.layers < tag.layers {
+		log.Warn("node buffer list layers underflow", "old", nf.layers, "dealt", tag.layers)
+		nf.layers = tag.layers // clamp so the unsigned subtraction below yields zero instead of wrapping
+	}
+	nf.layers -= tag.layers
+	if nf.count == 0 {
+		log.Warn("node buffer list count underflow")
+	} else {
+		nf.count--
+	}
+
+	nodeBufferListSizeMeter.Mark(int64(nf.size))
+	nodeBufferCountMeter.Mark(int64(nf.count))
+	nodeBufferLayerMeter.Mark(int64(nf.layers))
+	return tag
+}
+
+// traverse iterates the nodebufferlist and call the cb.
+func (nf *nodebufferlist) traverse(cb func(*nodecache) bool) {
+	cursor := nf.head
+	for {
+		if cursor == nil {
+			return
+		}
+		next := cursor.next
+		if !cb(cursor) {
+			break
+		}
+		cursor = next
+	}
+	return
+}
+
+// traverseReverse iterates the nodebufferlist in reverse and call the cb.
+func (nf *nodebufferlist) traverseReverse(cb func(*nodecache) bool) {
+	cursor := nf.tail
+	for {
+		if cursor == nil {
+			return
+		}
+		pre := cursor.pre
+		if !cb(cursor) {
+			break
+		}
+		cursor = pre
+	}
+	return
+}
+
+// diffToBase calls traverseReverse and merges the nodecache's nodes to
+// base node buffer, if up to limit size and flush to disk. It is called
+// periodically in the background
+func (nf *nodebufferlist) diffToBase() {
+	commitFunc := func(buffer *nodecache) bool {
+		if nf.base.size >= nf.limit {
+			log.Debug("base node buffer need write disk immediately")
+			return false
+		}
+		if nf.count < reserveNodeCacheNumber {
+			log.Debug("node buffer list less, waiting more difflayers are committed")
+			return false
+		}
+		if buffer.block%mergeBlockInterval != 0 {
+			log.Crit("committed block number misaligned", "block", buffer.block)
+		}
+
+		nf.basemMux.Lock()
+		err := nf.base.commit(buffer.block, buffer.nodes, false)
+		nf.basemMux.Unlock()
+		if err != nil {
+			log.Info("failed to commit nodes to base node buffer", "error", err)
+		}
+		del := nf.popBack()
+		nf.base.layers--
+		nf.base.layers += del.layers
+		nodeCachePool.Put(del)
+
+		baseNodeBufferSizeMeter.Mark(int64(nf.base.size))
+		baseNodeBufferLayerMeter.Mark(int64(nf.base.layers))
+
+		nf.report()
+		return true
+	}
+	nf.traverseReverse(commitFunc)
+}
+
+// backgroundFlush flush base node buffer to disk.
+func (nf *nodebufferlist) backgroundFlush() {
+	nf.basemMux.RLock()
+	persistID := nf.persistID + nf.base.layers
+	nf.basemMux.RUnlock()
+	err := nf.base.flush(nf.db, nf.clean, persistID, false)
+	if err != nil {
+		log.Crit("failed to flush base node buffer to disk", "error", err)
+	}
+	nf.basemMux.Lock()
+	nf.base.reset()
+	nf.persistID = persistID
+	nf.basemMux.Unlock()
+	baseNodeBufferSizeMeter.Mark(int64(nf.base.size))
+	baseNodeBufferLayerMeter.Mark(int64(nf.base.layers))
+	nodeBufferPersistID.Mark(int64(nf.persistID))
+}
+
+// loop runs the background task, collects the nodes for writing to disk.
+func (nf *nodebufferlist) loop() {
+	mergeTicker := time.NewTicker(time.Second * mergeNodeCacheInterval)
+	for {
+		select {
+		case <-nf.stopCh:
+			return
+		case <-mergeTicker.C:
+			nf.diffToBase()
+			if nf.base.size > nf.limit {
+				// skip the disk flush while a force flush or journaling is in progress
+				if nf.stopFlushing.Load() {
+					continue
+				}
+				// another background flush is still running; retry on the next tick
+				if nf.isFlushing.Swap(true) {
+					continue
+				}
+				nf.backgroundFlush()
+				nf.isFlushing.Swap(false)
+			}
+		}
+	}
+}
+
+// report logs the nodebufferlist info for monitor.
+func (nf *nodebufferlist) report() {
+	log.Info("node buffer list info", "block_number", nf.block, "layers", nf.layers,
+		"node_buffer_count", nf.count, "persist_id", nf.persistID, "list_size", common.StorageSize(nf.size),
+		"base_buffer_size", common.StorageSize(nf.base.size), "base_buffer_layer", nf.base.layers)
+}