Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer list #68

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ var (
Usage: "Scheme to use for storing ethereum state ('hash' or 'path')",
Category: flags.StateCategory,
}
PathDBSyncFlag = &cli.BoolFlag{
Name: "pathdb.sync",
Usage: "sync flush nodes cache to disk in path schema",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1838,6 +1844,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
cfg.TransactionHistory = ctx.Uint64(TxLookupLimitFlag.Name)
}
if ctx.IsSet(PathDBSyncFlag.Name) {
cfg.PathSyncFlush = true
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
26 changes: 16 additions & 10 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type CacheConfig struct {
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -163,6 +164,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SyncFlush: c.PathSyncFlush,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
Expand Down Expand Up @@ -372,6 +374,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)

Expand Down Expand Up @@ -1042,7 +1050,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
log.Error("Dangling trie nodes after full cleanup")
}
}
Expand Down Expand Up @@ -1465,8 +1473,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
_, nodes, _, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
Expand Down Expand Up @@ -1882,12 +1890,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read
trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read
blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockExecutionTimer.Update(ptime) // The time spent on EVM processing
blockValidationTimer.Update(vtime) // The time spent on block validation

// Write the block to the chain and get the status.
var (
Expand All @@ -1910,7 +1916,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockWriteTimer.UpdateSince(wstart)
blockInsertTimer.UpdateSince(start)

// Report the import stats before returning the various results
Expand All @@ -1921,8 +1927,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
3 changes: 2 additions & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand Down Expand Up @@ -73,6 +73,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
}
context = append(context, []interface{}{"triedirty", triebufNodes}...)
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/accessors_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package rawdb

import (
"encoding/binary"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

// ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled.
Expand Down Expand Up @@ -74,6 +76,10 @@ func DeleteSnapshotRoot(db ethdb.KeyValueWriter) {

// ReadAccountSnapshot retrieves the snapshot entry of an account trie leaf.
func ReadAccountSnapshot(db ethdb.KeyValueReader, hash common.Hash) []byte {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { rawdbGetAccountSnapNodeTimer.UpdateSince(start) }()
}
data, _ := db.Get(accountSnapshotKey(hash))
return data
}
Expand All @@ -94,6 +100,10 @@ func DeleteAccountSnapshot(db ethdb.KeyValueWriter, hash common.Hash) {

// ReadStorageSnapshot retrieves the snapshot entry of an storage trie leaf.
func ReadStorageSnapshot(db ethdb.KeyValueReader, accountHash, storageHash common.Hash) []byte {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { rawdbGetStorageSnapNodeTimer.UpdateSince(start) }()
}
data, _ := db.Get(storageSnapshotKey(accountHash, storageHash))
return data
}
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/accessors_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package rawdb
import (
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -68,6 +70,10 @@ func (h *hasher) release() {
// ReadAccountTrieNode retrieves the account trie node and the associated node
// hash with the specified node path.
func ReadAccountTrieNode(db ethdb.KeyValueReader, path []byte) ([]byte, common.Hash) {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { rawdbGetAccountTrieNodeTimer.UpdateSince(start) }()
}
data, err := db.Get(accountTrieNodeKey(path))
if err != nil {
return nil, common.Hash{}
Expand Down Expand Up @@ -116,6 +122,10 @@ func DeleteAccountTrieNode(db ethdb.KeyValueWriter, path []byte) {
// ReadStorageTrieNode retrieves the storage trie node and the associated node
// hash with the specified node path.
func ReadStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) ([]byte, common.Hash) {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { rawdbGetStorageTrieNodeTimer.UpdateSince(start) }()
}
data, err := db.Get(storageTrieNodeKey(accountHash, path))
if err != nil {
return nil, common.Hash{}
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rawdb

import "github.com/ethereum/go-ethereum/metrics"

var (
rawdbGetAccountTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/trienode/time", nil)
rawdbGetStorageTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/trienode/time", nil)
rawdbGetAccountSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/snapnode/time", nil)
rawdbGetStorageSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/snapnode/time", nil)
)
10 changes: 10 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -28,9 +29,14 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
)

var (
processTxTimer = metrics.NewRegisteredTimer("process/tx/time", nil)
)

// StateProcessor is a basic Processor, which takes care of transitioning
// state from one point to another.
//
Expand Down Expand Up @@ -88,6 +94,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
start := time.Now()
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
Expand All @@ -99,6 +106,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if metrics.EnabledExpensive {
processTxTimer.UpdateSince(start)
}
}
// Fail if Shanghai not enabled and len(withdrawals) is non-zero.
withdrawals := block.Withdrawals()
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateHistory: config.StateHistory,
StateScheme: scheme,
TrieCommitInterval: config.TrieCommitInterval,
PathSyncFlush: config.PathSyncFlush,
}
)
// Override the chain config with provided settings.
Expand Down
3 changes: 2 additions & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type Config struct {
// State scheme represents the scheme used to store ethereum states and trie
// nodes on top. It can be 'hash', 'path', or none which means use the scheme
// consistent with persistent state.
StateScheme string `toml:",omitempty"`
StateScheme string `toml:",omitempty"`
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
_, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
_, nodes, immutablenodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2, s3 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3 > defaultTracechainMemLimit
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func (db *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { ethdb.EthdbGetTimer.UpdateSince(start) }()
}
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
Expand All @@ -199,11 +203,19 @@ func (db *Database) Get(key []byte) ([]byte, error) {

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { ethdb.EthdbPutTimer.UpdateSince(start) }()
}
return db.db.Put(key, value, nil)
}

// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { ethdb.EthdbDeleteTimer.UpdateSince(start) }()
}
return db.db.Delete(key, nil)
}

Expand Down Expand Up @@ -414,6 +426,10 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
start := time.Now()
if metrics.EnabledExpensive {
defer func() { ethdb.EthdbBatchWriteTimer.UpdateSince(start) }()
}
return b.db.Write(b.b, nil)
}

Expand Down
10 changes: 10 additions & 0 deletions ethdb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ethdb

import "github.com/ethereum/go-ethereum/metrics"

var (
EthdbGetTimer = metrics.NewRegisteredTimer("ethdb/get/time", nil)
EthdbPutTimer = metrics.NewRegisteredTimer("ethdb/put/time", nil)
EthdbDeleteTimer = metrics.NewRegisteredTimer("ethdb/delete/time", nil)
EthdbBatchWriteTimer = metrics.NewRegisteredTimer("ethdb/batch/write/time", nil)
)
Loading
Loading