Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[config change] Improve BlocksReExecutor implementation #2714

Merged
merged 11 commits into from
Nov 7, 2024
153 changes: 109 additions & 44 deletions blocks_reexecutor/blocks_reexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,27 @@ import (
"strings"

"github.com/ethereum/go-ethereum/arbitrum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/stopwaiter"
flag "github.com/spf13/pflag"
)

type Config struct {
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
BlocksPerThread uint64 `koanf:"blocks-per-thread"`
Enable bool `koanf:"enable"`
Mode string `koanf:"mode"`
StartBlock uint64 `koanf:"start-block"`
EndBlock uint64 `koanf:"end-block"`
Room int `koanf:"room"`
MinBlocksPerThread uint64 `koanf:"min-blocks-per-thread"`
}

func (c *Config) Validate() error {
Expand All @@ -48,10 +53,10 @@ var DefaultConfig = Config{
}

var TestConfig = Config{
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
BlocksPerThread: 10,
Enable: true,
Mode: "full",
Room: runtime.NumCPU(),
MinBlocksPerThread: 10,
}

func ConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -60,22 +65,26 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Uint64(prefix+".start-block", DefaultConfig.StartBlock, "first block number of the block range for re-execution")
f.Uint64(prefix+".end-block", DefaultConfig.EndBlock, "last block number of the block range for re-execution")
f.Int(prefix+".room", DefaultConfig.Room, "number of threads to parallelize blocks re-execution")
f.Uint64(prefix+".blocks-per-thread", DefaultConfig.BlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
f.Uint64(prefix+".min-blocks-per-thread", DefaultConfig.MinBlocksPerThread, "minimum number of blocks to execute per thread. When mode is random this acts as the size of random block range sample")
}

type BlocksReExecutor struct {
stopwaiter.StopWaiter
config *Config
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
blocksPerThread uint64
config *Config
db state.Database
blockchain *core.BlockChain
stateFor arbitrum.StateForHeaderFunction
done chan struct{}
fatalErrChan chan error
startBlock uint64
currentBlock uint64
minBlocksPerThread uint64
}

func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *BlocksReExecutor {
func New(c *Config, blockchain *core.BlockChain, ethDb ethdb.Database, fatalErrChan chan error) (*BlocksReExecutor, error) {
if blockchain.TrieDB().Scheme() == rawdb.PathScheme {
return nil, errors.New("blocksReExecutor not supported on pathdb")
}
start := c.StartBlock
end := c.EndBlock
chainStart := blockchain.Config().ArbitrumChainParams.GenesisBlockNum
Expand All @@ -92,13 +101,13 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
log.Warn("invalid state reexecutor's end block number, resetting to latest", "end", end, "latest", chainEnd)
end = chainEnd
}
blocksPerThread := uint64(10000)
if c.BlocksPerThread != 0 {
blocksPerThread = c.BlocksPerThread
minBlocksPerThread := uint64(10000)
if c.MinBlocksPerThread != 0 {
minBlocksPerThread = c.MinBlocksPerThread
}
if c.Mode == "random" && end != start {
// Reexecute a range of 10000 or (non-zero) c.BlocksPerThread number of blocks between start to end picked randomly
rng := blocksPerThread
// Reexecute a range of 10000 or (non-zero) c.MinBlocksPerThread number of blocks between start to end picked randomly
rng := minBlocksPerThread
if rng > end-start {
rng = end - start
}
Expand All @@ -111,32 +120,41 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
if start > 0 && start != chainStart {
start--
}
// Divide work equally among available threads when BlocksPerThread is zero
if c.BlocksPerThread == 0 {
// Divide work equally among available threads when MinBlocksPerThread is zero
if c.MinBlocksPerThread == 0 {
// #nosec G115
work := (end - start) / uint64(c.Room)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if work > 0 {
blocksPerThread = work
minBlocksPerThread = work
}
}
return &BlocksReExecutor{
config: c,
blockchain: blockchain,
currentBlock: end,
startBlock: start,
blocksPerThread: blocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
state, err := blockchain.StateAt(header.Root)
return state, arbitrum.NoopStateRelease, err
},
trieConfig := triedb.Config{
Preimages: false,
HashDB: hashdb.Defaults,
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
blocksReExecutor := &BlocksReExecutor{
config: c,
db: state.NewDatabaseWithConfig(ethDb, &trieConfig),
blockchain: blockchain,
currentBlock: end,
startBlock: start,
minBlocksPerThread: minBlocksPerThread,
done: make(chan struct{}, c.Room),
fatalErrChan: fatalErrChan,
}
blocksReExecutor.stateFor = func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
sdb, err := state.NewDeterministic(header.Root, blocksReExecutor.db)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
_ = blocksReExecutor.db.TrieDB().Reference(header.Root, common.Hash{}) // Will be dereferenced later in advanceStateUpToBlock
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
}
return sdb, arbitrum.NoopStateRelease, err
}
return blocksReExecutor, nil
}

// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.BlocksPerThread, currentBlock] to the last available valid state
// LaunchBlocksReExecution launches the thread to apply blocks of range [currentBlock-s.config.MinBlocksPerThread, currentBlock] to the last available valid state
func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentBlock uint64) uint64 {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
start := arbmath.SaturatingUSub(currentBlock, s.blocksPerThread)
start := arbmath.SaturatingUSub(currentBlock, s.minBlocksPerThread)
if start < s.startBlock {
start = s.startBlock
}
Expand All @@ -149,8 +167,8 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
defer release()
start = startHeader.Number.Uint64()
s.LaunchThread(func(ctx context.Context) {
_, err := arbitrum.AdvanceStateUpToBlock(ctx, s.blockchain, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader, nil)
if err != nil {
log.Info("Starting reexecution of blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
if err := s.advanceStateUpToBlock(ctx, startState, s.blockchain.GetHeaderByNumber(currentBlock), startHeader); err != nil {
s.fatalErrChan <- fmt.Errorf("blocksReExecutor errored advancing state from block %d to block %d, err: %w", start, currentBlock, err)
} else {
log.Info("Successfully reexecuted blocks against historic state", "stateAt", start, "startBlock", start+1, "endBlock", currentBlock)
Expand Down Expand Up @@ -199,3 +217,50 @@ func (s *BlocksReExecutor) Start(ctx context.Context, done chan struct{}) {
func (s *BlocksReExecutor) StopAndWait() {
s.StopWaiter.StopAndWait()
}

func (s *BlocksReExecutor) commitStateAndVerify(statedb *state.StateDB, expected common.Hash, blockNumber uint64) (*state.StateDB, error) {
result, err := statedb.Commit(blockNumber, true)
if err != nil {
return nil, err
}
if result != expected {
return nil, fmt.Errorf("bad root hash expected: %v got: %v", expected, result)
}
_ = s.db.TrieDB().Reference(result, common.Hash{})
return state.New(result, statedb.Database(), nil)
}

func (s *BlocksReExecutor) advanceStateUpToBlock(ctx context.Context, state *state.StateDB, targetHeader *types.Header, lastAvailableHeader *types.Header) error {
targetBlockNumber := targetHeader.Number.Uint64()
blockToRecreate := lastAvailableHeader.Number.Uint64() + 1
prevHash := lastAvailableHeader.Hash()
lastRoot := lastAvailableHeader.Root
defer func() {
if (lastRoot != common.Hash{}) {
_ = s.db.TrieDB().Dereference(lastRoot)
}
}()
var block *types.Block
var err error
for ctx.Err() == nil {
state, block, err = arbitrum.AdvanceStateByBlock(ctx, s.blockchain, state, targetHeader, blockToRecreate, prevHash, nil)
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
prevHash = block.Hash()
state, err = s.commitStateAndVerify(state, block.Root(), block.NumberU64())
if err != nil {
return fmt.Errorf("failed committing state for block %d : %w", blockToRecreate, err)
}
_ = s.db.TrieDB().Dereference(lastRoot)
lastRoot = block.Root()
if blockToRecreate >= targetBlockNumber {
if block.Hash() != targetHeader.Hash() {
return fmt.Errorf("blockHash doesn't match when recreating number: %d expected: %v got: %v", blockToRecreate, targetHeader.Hash(), block.Hash())
}
return nil
}
blockToRecreate++
}
return ctx.Err()
}
6 changes: 5 additions & 1 deletion cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,11 @@ func mainImpl() int {

var blocksReExecutor *blocksreexecutor.BlocksReExecutor
if nodeConfig.BlocksReExecutor.Enable && l2BlockChain != nil {
blocksReExecutor = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, fatalErrChan)
blocksReExecutor, err = blocksreexecutor.New(&nodeConfig.BlocksReExecutor, l2BlockChain, chainDb, fatalErrChan)
if err != nil {
log.Error("error initializing blocksReExecutor", "err", err)
return 1
}
if nodeConfig.Init.ThenQuit {
if err := gethexec.PopulateStylusTargetCache(&nodeConfig.Execution.StylusTarget); err != nil {
log.Error("error populating stylus target cache", "err", err)
Expand Down
8 changes: 6 additions & 2 deletions system_tests/blocks_reexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
blocksreexecutor "github.com/offchainlabs/nitro/blocks_reexecutor"
)

Expand All @@ -13,6 +14,7 @@ func TestBlocksReExecutorModes(t *testing.T) {
defer cancel()

builder := NewNodeBuilder(ctx).DefaultConfig(t, false)
builder.execConfig.Caching.StateScheme = rawdb.HashScheme
cleanup := builder.Build(t)
defer cleanup()

Expand All @@ -37,7 +39,8 @@ func TestBlocksReExecutorModes(t *testing.T) {

// Reexecute blocks at mode full
success := make(chan struct{})
executorFull := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, feedErrChan)
executorFull, err := blocksreexecutor.New(&blocksreexecutor.TestConfig, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorFull.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand All @@ -49,7 +52,8 @@ func TestBlocksReExecutorModes(t *testing.T) {
success = make(chan struct{})
c := &blocksreexecutor.TestConfig
c.Mode = "random"
executorRandom := blocksreexecutor.New(c, blockchain, feedErrChan)
executorRandom, err := blocksreexecutor.New(c, blockchain, builder.L2.ExecNode.ChainDB, feedErrChan)
Require(t, err)
executorRandom.Start(ctx, success)
select {
case err := <-feedErrChan:
Expand Down
Loading