Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config change] Improve BlocksReExecutor implementation #2714

Merged
merged 11 commits into from
Nov 7, 2024
177 changes: 130 additions & 47 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,31 @@ import (
"math/rand"
"runtime"
"strings"
"sync"

"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
)

type Config struct {
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
BlocksPerThread uint64 `koanf:"blocks-per-thread"`
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
MinBlocksPerThread uint64 `koanf:"min-blocks-per-thread"`
TrieCleanLimit int `koanf:"trie-clean-limit"`
}

func (c *Config) Validate() error {
Expand All @@ -48,10 +55,11 @@ var DefaultConfig = Config{
}

var TestConfig = Config{
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
BlocksPerThread: 10,
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
MinBlocksPerThread: 10,
TrieCleanLimit: 600,
}

func ConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -60,22 +68,28 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".start-block", DefaultConfig.StartBlock, "first block number of the block range for re-execution")
f.Uint64(prefix+".end-block", DefaultConfig.EndBlock, "last block number of the block range for re-execution")
f.Int(prefix+".room", DefaultConfig.Room, "number of threads to parallelize blocks re-execution")
f.Uint64(prefix+".blocks-per-thread", DefaultConfig.BlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Uint64(prefix+".min-blocks-per-thread", DefaultConfig.MinBlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Int(prefix+".trie-clean-limit", DefaultConfig.TrieCleanLimit, "memory allowance (MB) to use for caching trie nodes in memory")
}

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
config *Config
db state.Database
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
minBlocksPerThread uint64
mutex sync.Mutex
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrChan chan error) (*BlocksReExecutor, error) {
if blockchain.TrieDB().Scheme() == rawdb.PathScheme {
return nil, errors.New("blocksReExecutor not supported on pathdb")
}
start := c.StartBlock
end := c.EndBlock
chainStart := blockchain.Config().ArbitrumChainParams.GenesisBlockNum
Expand All @@ -92,13 +106,13 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
minBlocksPerThread := uint64(10000)
if c.MinBlocksPerThread != 0 {
minBlocksPerThread = c.MinBlocksPerThread
}
if c.Mode == "random" && end != start {
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
// Reexecute a range of 10000 or (non-zero) c.MinBlocksPerThread number of blocks between start to end picked randomly
rng := minBlocksPerThread
if rng > end-start {
rng = end - start
}
Expand All @@ -111,32 +125,46 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
// Divide work equally among available threads when MinBlocksPerThread is zero
if c.MinBlocksPerThread == 0 {
// #nosec G115
work := (end - start) / uint64(c.Room)
work := (end - start) / uint64(c.Room*2)
if work > 0 {
blocksPerThread = work
minBlocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
},
hashConfig := *hashdb.Defaults
hashConfig.CleanCacheSize = c.TrieCleanLimit * 1024 * 1024
trieConfig := triedb.Config{
Preimages: false,
HashDB: &hashConfig,
}
blocksReExecutor := &BlocksReExecutor{
config: c,
db: state.NewDatabaseWithConfig(ethDb, &trieConfig),
blockchain: blockchain,
currentBlock: end,
startBlock: start,
minBlocksPerThread: minBlocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
}
blocksReExecutor.stateFor = func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
blocksReExecutor.mutex.Lock()
defer blocksReExecutor.mutex.Unlock()
sdb, err := state.New(header.Root, blocksReExecutor.db, nil)
if err == nil {
_ = blocksReExecutor.db.TrieDB().Reference(header.Root, common.Hash{}) // Will be dereferenced later in advanceStateUpToBlock
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return sdb, func() { blocksReExecutor.dereferenceRoot(header.Root) }, nil
}
return sdb, arbitrum.NoopStateRelease, err
}
return blocksReExecutor, nil
}

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
// LaunchBlocksReExecution launches a thread that applies the blocks in the range [currentBlock-s.minBlocksPerThread, currentBlock] on top of the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.minBlocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
Expand All @@ -145,12 +173,10 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
return s.startBlock
}
// NoOp
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
if err != nil {
log.Info("Starting reexecution of blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
if err := s.advanceStateUpToBlock(ctx, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, release); err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor errored advancing state from block %d to block %d, err: %w", start, currentBlock, err)
} else {
log.Info("Successfully reexecuted blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
Expand Down Expand Up @@ -199,3 +225,60 @@ func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
// StopAndWait forwards to the StopAndWait method of the embedded StopWaiter.
func (s *BlocksReExecutor) StopAndWait() {
s.StopWaiter.StopAndWait()
}

// dereferenceRoot drops one reference to root in the re-executor's trie database.
// It takes s.mutex for the duration of the call, so it must not be invoked by a
// caller that already holds that mutex.
func (s *BlocksReExecutor) dereferenceRoot(root common.Hash) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	trieDB := s.db.TrieDB()
	// The error return is deliberately discarded.
	// NOTE(review): presumably it can only report an unsupported trie backend - confirm.
	_ = trieDB.Dereference(root)
}

func (s *BlocksReExecutor) commitStateAndVerify(statedb *state.StateDB, expected common.Hash, blockNumber uint64) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
result, err := statedb.Commit(blockNumber, true)
if err != nil {
return nil, arbitrum.NoopStateRelease, err
}
if result != expected {
return nil, arbitrum.NoopStateRelease, fmt.Errorf("bad root hash expected: %v got: %v", expected, result)
}
sdb, err := state.New(result, s.db, nil)
if err == nil {
_ = s.db.TrieDB().Reference(result, common.Hash{})
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return sdb, func() { s.dereferenceRoot(result) }, nil
}
return sdb, arbitrum.NoopStateRelease, err
}

// advanceStateUpToBlock advances state one block at a time, starting with the block
// after lastAvailableHeader, until the block numbered targetHeader.Number has been
// processed. Each step goes through arbitrum.AdvanceStateByBlock and is then
// committed and checked against the block's root by commitStateAndVerify.
//
// lastRelease is the release function for the state passed in. After every
// successful commit the previous release function is called and replaced by the
// one for the newly committed state; whichever one is current when the method
// returns is called on the way out.
//
// It returns nil once the target block has been reached and its hash matches
// targetHeader, the context's error if ctx is cancelled between blocks, or the
// first error hit while advancing or committing.
func (s *BlocksReExecutor) advanceStateUpToBlock(ctx context.Context, state *state.StateDB, targetHeader *types.Header, lastAvailableHeader *types.Header, lastRelease arbitrum.StateReleaseFunc) error {
	target := targetHeader.Number.Uint64()
	number := lastAvailableHeader.Number.Uint64() + 1
	parentHash := lastAvailableHeader.Hash()
	// Wrapped in a closure so the release function that is current at return time is the one invoked.
	defer func() { lastRelease() }()

	for ; ctx.Err() == nil; number++ {
		advanced, block, err := arbitrum.AdvanceStateByBlock(ctx, s.blockchain, state, number, parentHash, nil)
		if err != nil {
			return err
		}
		parentHash = block.Hash()

		committed, release, err := s.commitStateAndVerify(advanced, block.Root(), block.NumberU64())
		if err != nil {
			return fmt.Errorf("failed committing state for block %d : %w", number, err)
		}
		state = committed
		// Hand over: drop the reference held for the previous state, keep the new one.
		lastRelease()
		lastRelease = release

		if number < target {
			continue
		}
		if block.Hash() != targetHeader.Hash() {
			return fmt.Errorf("blockHash doesn't match when recreating number: %d expected: %v got: %v", number, targetHeader.Hash(), block.Hash())
		}
		return nil
	}
	return ctx.Err()
}
6 changes: 5 additions & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,11 @@ func mainImpl() int {

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor, err = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb, fatalErrChan)
if err != nil {
log.Error("error initializing blocksReExecutor", "err", err)
return 1
}
if nodeConfig.Init.ThenQuit {
if err := gethexec.PopulateStylusTargetCache(&nodeConfig.Execution.StylusTarget); err != nil {
log.Error("error populating stylus target cache", "err", err)
Expand Down
32 changes: 17 additions & 15 deletions execution/gethexec/block_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,21 @@ func NewBlockRecorder(config *BlockRecorderConfig, execEngine *ExecutionEngine,
return recorder
}

func stateLogFunc(targetHeader, header *types.Header, hasState bool) {
if targetHeader == nil || header == nil {
return
}
gap := targetHeader.Number.Int64() - header.Number.Int64()
step := int64(500)
stage := "computing state"
if !hasState {
step = 3000
stage = "looking for full block"
}
if (gap >= step) && (gap%step == 0) {
log.Info("Setting up validation", "stage", stage, "current", header.Number, "target", targetHeader.Number)
func stateLogFunc(targetHeader *types.Header) arbitrum.StateBuildingLogFunction {
return func(header *types.Header, hasState bool) {
if targetHeader == nil || header == nil {
return
}
gap := targetHeader.Number.Int64() - header.Number.Int64()
step := int64(500)
stage := "computing state"
if !hasState {
step = 3000
stage = "looking for full block"
}
if (gap >= step) && (gap%step == 0) {
log.Info("Setting up validation", "stage", stage, "current", header.Number, "target", targetHeader.Number)
}
}
}

Expand All @@ -109,7 +111,7 @@ func (r *BlockRecorder) RecordBlockCreation(
}
}

recordingdb, chaincontext, recordingKV, err := r.recordingDatabase.PrepareRecording(ctx, prevHeader, stateLogFunc)
recordingdb, chaincontext, recordingKV, err := r.recordingDatabase.PrepareRecording(ctx, prevHeader, stateLogFunc(prevHeader))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -321,7 +323,7 @@ func (r *BlockRecorder) PrepareForRecord(ctx context.Context, start, end arbutil
log.Warn("prepareblocks asked for non-found block", "hdrNum", hdrNum)
break
}
_, err := r.recordingDatabase.GetOrRecreateState(ctx, header, stateLogFunc)
_, err := r.recordingDatabase.GetOrRecreateState(ctx, header, stateLogFunc(header))
if err != nil {
log.Warn("prepareblocks failed to get state for block", "hdrNum", hdrNum, "err", err)
break
Expand Down
2 changes: 1 addition & 1 deletion go-ethereum
8 changes: 6 additions & 2 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor"
)

Expand All @@ -13,6 +14,7 @@ func TestBlocksReExecutorModes(t *testing.T) {
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, false)
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
cleanup := builder.Build(t)
defer cleanup()

Expand All @@ -37,7 +39,8 @@ func TestBlocksReExecutorModes(t *testing.T) {

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull, err := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorFull.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand All @@ -49,7 +52,8 @@ func TestBlocksReExecutorModes(t *testing.T) {
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom, err := blocksreexecutor.New(c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorRandom.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand Down
Loading