diff --git a/blocks_reexecutor/blocks_reexecutor.go b/blocks_reexecutor/blocks_reexecutor.go
index 42bd1428dc..bb6de00cad 100644
--- a/blocks_reexecutor/blocks_reexecutor.go
+++ b/blocks_reexecutor/blocks_reexecutor.go
@@ -68,7 +68,7 @@ type BlocksReExecutor struct {
     stopwaiter.StopWaiter
     config       *Config
     blockchain   *core.BlockChain
-    stateFor     func(header *types.Header) (*state.StateDB, error)
+    stateFor     arbitrum.StateForHeaderFunction
     done         chan struct{}
     fatalErrChan chan error
     startBlock   uint64
@@ -110,7 +110,10 @@ func New(c *Config, blockchain *core.BlockChain, fatalErrChan chan error) *Block
         startBlock:   start,
         done:         make(chan struct{}, c.Room),
         fatalErrChan: fatalErrChan,
-        stateFor:     func(header *types.Header) (*state.StateDB, error) { return blockchain.StateAt(header.Root) },
+        stateFor: func(header *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
+            state, err := blockchain.StateAt(header.Root)
+            return state, arbitrum.NoopStateRelease, err
+        },
     }
 }

@@ -120,7 +123,9 @@ func (s *BlocksReExecutor) LaunchBlocksReExecution(ctx context.Context, currentB
     if start < s.startBlock {
         start = s.startBlock
     }
-    startState, startHeader, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
+    // we don't use state release pattern here
+    // TODO do we want to use release pattern here?
+    startState, startHeader, _, err := arbitrum.FindLastAvailableState(ctx, s.blockchain, s.stateFor, s.blockchain.GetHeaderByNumber(start), nil, -1)
     if err != nil {
         s.fatalErrChan <- fmt.Errorf("blocksReExecutor failed to get last available state while searching for state at %d, err: %w", start, err)
         return s.startBlock
diff --git a/go-ethereum b/go-ethereum
index 657dcf6626..088149d73d 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit 657dcf66263e940e86f9e89325c5100899d5ab58
+Subproject commit 088149d73d7b39c844050e63f8a9c988ed8bdb2d
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index 0dda408aaa..e0f4a05126 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -35,6 +35,7 @@ import (
     "github.com/ethereum/go-ethereum/accounts/abi"
     "github.com/ethereum/go-ethereum/accounts/abi/bind"
     "github.com/ethereum/go-ethereum/accounts/keystore"
+    "github.com/ethereum/go-ethereum/arbitrum"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core"
     "github.com/ethereum/go-ethereum/core/types"
@@ -183,6 +184,13 @@ func (b *NodeBuilder) DefaultConfig(t *testing.T, withL1 bool) *NodeBuilder {
 }

 func (b *NodeBuilder) Build(t *testing.T) func() {
+    if b.execConfig.RPC.MaxRecreateStateDepth == arbitrum.UninitializedMaxRecreateStateDepth {
+        if b.execConfig.Caching.Archive {
+            b.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultArchiveNodeMaxRecreateStateDepth
+        } else {
+            b.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultNonArchiveNodeMaxRecreateStateDepth
+        }
+    }
     if b.withL1 {
         l1, l2 := NewTestClient(b.ctx), NewTestClient(b.ctx)
         b.L2Info, l2.ConsensusNode, l2.Client, l2.Stack, b.L1Info, l1.L1Backend, l1.Client, l1.Stack =
@@ -229,6 +237,13 @@ func (b *NodeBuilder) Build2ndNode(t *testing.T, params *SecondNodeParams) (*Tes
     if params.execConfig == nil {
         params.execConfig = b.execConfig
     }
+    if params.execConfig.RPC.MaxRecreateStateDepth == arbitrum.UninitializedMaxRecreateStateDepth {
+        if params.execConfig.Caching.Archive {
+            params.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultArchiveNodeMaxRecreateStateDepth
+        } else {
+            params.execConfig.RPC.MaxRecreateStateDepth = arbitrum.DefaultNonArchiveNodeMaxRecreateStateDepth
+        }
+    }
     l2 := NewTestClient(b.ctx)
     l2.Client, l2.ConsensusNode =
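Aside (illustration, not part of the patch): a minimal sketch of how a caller consumes the arbitrum.StateForHeaderFunction and the release callback introduced in blocks_reexecutor.go above. It assumes the imports already used in that file and that arbitrum.StateReleaseFunc is a no-argument callback, which is how the discarded third return value of FindLastAvailableState is treated in the hunk above; the wrapper name reexecuteFromLastAvailableState is made up for the example.

func reexecuteFromLastAvailableState(ctx context.Context, blockchain *core.BlockChain, header *types.Header) error {
    // Wrap BlockChain.StateAt in the new function type; NoopStateRelease is used because
    // StateAt does not hand out a reference that needs explicit releasing.
    var stateFor arbitrum.StateForHeaderFunction = func(h *types.Header) (*state.StateDB, arbitrum.StateReleaseFunc, error) {
        statedb, err := blockchain.StateAt(h.Root)
        return statedb, arbitrum.NoopStateRelease, err
    }
    // FindLastAvailableState now returns a release callback as its third value.
    startState, startHeader, release, err := arbitrum.FindLastAvailableState(ctx, blockchain, stateFor, header, nil, -1)
    if err != nil {
        return err
    }
    // Keep the returned state referenced for as long as it is used, then release it.
    // The re-executor above opts out of this pattern and simply discards the callback.
    defer release()
    _ = startState
    _ = startHeader
    return nil
}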
diff --git a/system_tests/recreatestate_rpc_test.go b/system_tests/recreatestate_rpc_test.go
index 1973587ecb..777ed17961 100644
--- a/system_tests/recreatestate_rpc_test.go
+++ b/system_tests/recreatestate_rpc_test.go
@@ -2,31 +2,30 @@ package arbtest

 import (
     "context"
+    "encoding/binary"
     "errors"
+    "fmt"
     "math/big"
     "strings"
+    "sync"
     "testing"
+    "time"

     "github.com/ethereum/go-ethereum/arbitrum"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core"
     "github.com/ethereum/go-ethereum/core/rawdb"
     "github.com/ethereum/go-ethereum/core/types"
-    "github.com/ethereum/go-ethereum/ethclient"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/params"
+    "github.com/ethereum/go-ethereum/rpc"
     "github.com/ethereum/go-ethereum/trie"
     "github.com/offchainlabs/nitro/arbnode"
     "github.com/offchainlabs/nitro/execution/gethexec"
     "github.com/offchainlabs/nitro/util"
 )

-func prepareNodeWithHistory(t *testing.T, ctx context.Context, execConfig *gethexec.Config, txCount uint64) (node *arbnode.Node, executionNode *gethexec.ExecutionNode, l2client *ethclient.Client, cancel func()) {
-    t.Helper()
-    builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
-    builder.execConfig = execConfig
-    cleanup := builder.Build(t)
-    builder.L2Info.GenerateAccount("User2")
+func makeSomeTransfers(t *testing.T, ctx context.Context, builder *NodeBuilder, txCount uint64) {
     var txs []*types.Transaction
     for i := uint64(0); i < txCount; i++ {
         tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
@@ -38,8 +37,16 @@ func prepareNodeWithHistory(t *testing.T, ctx context.Context, execConfig *gethe
         _, err := builder.L2.EnsureTxSucceeded(tx)
         Require(t, err)
     }
+}

-    return builder.L2.ConsensusNode, builder.L2.ExecNode, builder.L2.Client, cleanup
+func prepareNodeWithHistory(t *testing.T, ctx context.Context, execConfig *gethexec.Config, txCount uint64) (*NodeBuilder, func()) {
+    t.Helper()
+    builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
+    builder.execConfig = execConfig
+    cleanup := builder.Build(t)
+    builder.L2Info.GenerateAccount("User2")
+    makeSomeTransfers(t, ctx, builder, txCount)
+    return builder, cleanup
 }

 func fillHeaderCache(t *testing.T, bc *core.BlockChain, from, to uint64) {
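Aside (illustration, not part of the patch): a short usage sketch of the refactored helpers above. prepareNodeWithHistory now returns the whole NodeBuilder plus its cleanup function, so later test code can reach the execution node and client through builder.L2 and extend the chain with makeSomeTransfers. The function name exampleRecentHistoryUsage is made up for the example and would live in the same arbtest package.

func exampleRecentHistoryUsage(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    execConfig := gethexec.ConfigDefaultTest()
    // funds the "User2" account and produces 32 transfer blocks
    builder, cleanup := prepareNodeWithHistory(t, ctx, execConfig, 32)
    defer cleanup()
    // the builder is returned directly, so the test can keep extending the history
    makeSomeTransfers(t, ctx, builder, 4)
    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
    _, _ = execNode, l2client
}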
@@ -89,17 +96,19 @@ func removeStatesFromDb(t *testing.T, bc *core.BlockChain, db ethdb.Database, fr

 func TestRecreateStateForRPCNoDepthLimit(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
+    execConfig.Caching.SnapshotCache = 0 // disable snapshots
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, 32)
+    execConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 32)
     defer cancelNode()
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -123,17 +132,18 @@ func TestRecreateStateForRPCBigEnoughDepthLimit(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
     depthGasLimit := int64(256 * util.NormalizeL2GasForL1GasInitial(800_000, params.GWei))
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = depthGasLimit
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = depthGasLimit
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, 32)
+    execConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 32)
     defer cancelNode()
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -157,17 +167,18 @@ func TestRecreateStateForRPCDepthLimitExceeded(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = int64(200)
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = int64(200)
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, 32)
+    execConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 32)
     defer cancelNode()
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -191,17 +202,18 @@ func TestRecreateStateForRPCMissingBlockParent(t *testing.T) {
     var headerCacheLimit uint64 = 512
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, headerCacheLimit+5)
+    execConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, headerCacheLimit+5)
     defer cancelNode()
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -236,16 +248,17 @@ func TestRecreateStateForRPCBeyondGenesis(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, 32)
+    execConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 32)
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     defer cancelNode()
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -271,17 +284,18 @@ func TestRecreateStateForRPCBlockNotFoundWhileRecreating(t *testing.T) {
     var blockCacheLimit uint64 = 256
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    nodeConfig := gethexec.ConfigDefaultTest()
-    nodeConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
-    nodeConfig.Sequencer.MaxBlockSpeed = 0
-    nodeConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
-    nodeConfig.Caching.Archive = true
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.RPC.MaxRecreateStateDepth = arbitrum.InfiniteMaxRecreateStateDepth
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    execConfig.Caching.Archive = true
     // disable trie/Database.cleans cache, so as states removed from ChainDb won't be cached there
-    nodeConfig.Caching.TrieCleanCache = 0
+    execConfig.Caching.TrieCleanCache = 0
-    nodeConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
-    nodeConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
-    _, execNode, l2client, cancelNode := prepareNodeWithHistory(t, ctx, nodeConfig, blockCacheLimit+4)
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 0
+    execConfig.Caching.MaxAmountOfGasToSkipStateSaving = 0
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, blockCacheLimit+4)
+    execNode, l2client := builder.L2.ExecNode, builder.L2.Client
     defer cancelNode()
     bc := execNode.Backend.ArbInterface().BlockChain()
     db := execNode.Backend.ChainDb()
@@ -306,7 +320,7 @@ func TestRecreateStateForRPCBlockNotFoundWhileRecreating(t *testing.T) {
         hash := rawdb.ReadCanonicalHash(db, lastBlock)
         Fatal(t, "Didn't fail to get balance at block:", lastBlock, " with hash:", hash, ", lastBlock:", lastBlock)
     }
-    if !strings.Contains(err.Error(), "block not found while recreating") {
+    if !strings.Contains(err.Error(), fmt.Sprintf("block #%d not found", blockBodyToRemove)) {
         Fatal(t, "Failed with unexpected error: \"", err, "\", at block:", lastBlock, "lastBlock:", lastBlock)
     }
 }
@@ -358,9 +372,13 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig
             Fatal(t, "internal test error - tx got included in unexpected block number, have:", have, "want:", want)
         }
     }
+    bc := execNode.Backend.ArbInterface().BlockChain()
     genesis := uint64(0)
-    lastBlock, err := client.BlockNumber(ctx)
-    Require(t, err)
+    currentHeader := bc.CurrentBlock()
+    if currentHeader == nil {
+        Fatal(t, "missing current block")
+    }
+    lastBlock := currentHeader.Number.Uint64()
     if want := genesis + uint64(txCount); lastBlock < want {
         Fatal(t, "internal test error - not enough blocks produced during preparation, want:", want, "have:", lastBlock)
     }
@@ -381,7 +399,7 @@
     Require(t, node.Start(ctx))
     client = ClientForStack(t, stack)
     defer node.StopAndWait()
-    bc := execNode.Backend.ArbInterface().BlockChain()
+    bc = execNode.Backend.ArbInterface().BlockChain()
     gas := skipGas
     blocks := skipBlocks
     for i := genesis + 1; i <= genesis+uint64(txCount); i++ {
@@ -391,8 +409,8 @@
             continue
         }
         gas += block.GasUsed()
-        blocks++
         _, err := bc.StateAt(block.Root())
+        blocks++
         if (skipBlocks == 0 && skipGas == 0) || (skipBlocks != 0 && blocks > skipBlocks) || (skipGas != 0 && gas > skipGas) {
             if err != nil {
                 t.Log("blocks:", blocks, "skipBlocks:", skipBlocks, "gas:", gas, "skipGas:", skipGas)
@@ -401,13 +419,17 @@
             gas = 0
             blocks = 0
         } else {
+            if int(i) >= int(lastBlock)-int(cacheConfig.BlockCount) {
+                // skipping nonexistence check - the state might have been saved on node shutdown
+                continue
+            }
             if err == nil {
                 t.Log("blocks:", blocks, "skipBlocks:", skipBlocks, "gas:", gas, "skipGas:", skipGas)
                 Fatal(t, "state shouldn't be available, root:", block.Root(), "blockNumber:", i, "blockHash", block.Hash())
             }
             expectedErr := &trie.MissingNodeError{}
             if !errors.As(err, &expectedErr) {
-                Fatal(t, "getting state failed with unexpected error, root:", block.Root(), "blockNumber:", i, "blockHash", block.Hash())
+                Fatal(t, "getting state failed with unexpected error, root:", block.Root(), "blockNumber:", i, "blockHash:", block.Hash(), "err:", err)
             }
         }
     }
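Aside (illustration, not part of the patch): the state-availability loop above boils down to a single predicate. The sketch below mirrors only the test's condition (not the node's state-saving implementation); the helper name expectStateKept is made up for the example.

// expectStateKept reports whether the test expects bc.StateAt to succeed for a block,
// given the blocks and gas accumulated since the last kept state and the two skip thresholds.
func expectStateKept(blocksSinceKept, gasSinceKept, skipBlocks, skipGas uint64) bool {
    if skipBlocks == 0 && skipGas == 0 {
        return true // skipping disabled: every block's state should be retrievable
    }
    return (skipBlocks != 0 && blocksSinceKept > skipBlocks) ||
        (skipGas != 0 && gasSinceKept > skipGas)
}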
@@ -429,7 +451,10 @@ func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig
 func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
     cacheConfig := gethexec.DefaultCachingConfig
     cacheConfig.Archive = true
-    //// test defaults
+    cacheConfig.SnapshotCache = 0 // disable snapshots
+    cacheConfig.BlockAge = 0      // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
+
+    // test defaults
     testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)

     cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 127
@@ -444,8 +469,10 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
     cacheConfig.MaxAmountOfGasToSkipStateSaving = 15 * 1000 * 1000
     testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)

-    // one test block ~ 925000 gas
-    testBlockGas := uint64(925000)
+    // lower number of blocks in triegc below 100 blocks, to be able to check for nonexistence in testSkippingSavingStateAndRecreatingAfterRestart (it doesn't check last BlockCount blocks as some of them may be persisted on node shutdown)
+    cacheConfig.BlockCount = 16
+
+    testBlockGas := uint64(925000) // one test block ~ 925000 gas
     skipBlockValues := []uint64{0, 1, 2, 3, 5, 21, 51, 100, 101}
     var skipGasValues []uint64
     for _, i := range skipBlockValues {
@@ -459,3 +486,206 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
         }
     }
 }
+
+func TestGettingStateForRPCFullNode(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.Caching.SnapshotCache = 0 // disable snapshots
+    execConfig.Caching.BlockAge = 0      // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
+    execNode, _ := builder.L2.ExecNode, builder.L2.Client
+    defer cancelNode()
+    bc := execNode.Backend.ArbInterface().BlockChain()
+    api := execNode.Backend.APIBackend()
+
+    header := bc.CurrentBlock()
+    if header == nil {
+        Fatal(t, "failed to get current block header")
+    }
+    state, _, err := api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
+    Require(t, err)
+    addr := builder.L2Info.GetAddress("User2")
+    exists := state.Exist(addr)
+    err = state.Error()
+    Require(t, err)
+    if !exists {
+        Fatal(t, "User2 address does not exist in the state")
+    }
+    // Get the state again to avoid caching
+    state, _, err = api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
+    Require(t, err)
+
+    blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
+    makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)
+
+    exists = state.Exist(addr)
+    err = state.Error()
+    Require(t, err)
+    if !exists {
+        Fatal(t, "User2 address does not exist in the state")
+    }
+}
+
+func TestGettingStateForRPCHybridArchiveNode(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    execConfig := gethexec.ConfigDefaultTest()
+    execConfig.Caching.Archive = true
+    execConfig.Caching.MaxNumberOfBlocksToSkipStateSaving = 128
+    execConfig.Caching.BlockCount = 128
+    execConfig.Caching.SnapshotCache = 0 // disable snapshots
+    execConfig.Caching.BlockAge = 0      // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
+    execConfig.Sequencer.MaxBlockSpeed = 0
+    execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
+    builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
+    execNode, _ := builder.L2.ExecNode, builder.L2.Client
+    defer cancelNode()
+    bc := execNode.Backend.ArbInterface().BlockChain()
+    api := execNode.Backend.APIBackend()
+
+    header := bc.CurrentBlock()
+    if header == nil {
+        Fatal(t, "failed to get current block header")
+    }
+    state, _, err := api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
+    Require(t, err)
+    addr := builder.L2Info.GetAddress("User2")
+    exists := state.Exist(addr)
+    err = state.Error()
+    Require(t, err)
+    if !exists {
+        Fatal(t, "User2 address does not exist in the state")
+    }
+    // Get the state again to avoid caching
+    state, _, err = api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
+    Require(t, err)
+
+    blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
+    makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)
+
+    exists = state.Exist(addr)
+    err = state.Error()
+    Require(t, err)
+    if !exists {
+        Fatal(t, "User2 address does not exist in the state")
+    }
+}
+
+func TestStateAndHeaderForRecentBlock(t *testing.T) {
+    threads := 32
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
+    builder.execConfig.Caching.Archive = true
+    builder.execConfig.RPC.MaxRecreateStateDepth = 0
+    cleanup := builder.Build(t)
+    defer cleanup()
+    builder.L2Info.GenerateAccount("User2")
+
+    errors := make(chan error, threads+1)
+    senderDone := make(chan struct{})
+    go func() {
+        defer close(senderDone)
+        for ctx.Err() == nil {
+            tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, new(big.Int).Lsh(big.NewInt(1), 128), nil)
+            err := builder.L2.Client.SendTransaction(ctx, tx)
+            if ctx.Err() != nil {
+                return
+            }
+            if err != nil {
+                errors <- err
+                return
+            }
+            _, err = builder.L2.EnsureTxSucceeded(tx)
+            if ctx.Err() != nil {
+                return
+            }
+            if err != nil {
+                errors <- err
+                return
+            }
+            time.Sleep(10 * time.Millisecond)
+        }
+    }()
+    api := builder.L2.ExecNode.Backend.APIBackend()
+    db := builder.L2.ExecNode.Backend.ChainDb()
+    i := 1
+    var mtx sync.RWMutex
+    var wgCallers sync.WaitGroup
+    for j := 0; j < threads && ctx.Err() == nil; j++ {
+        wgCallers.Add(1)
+        go func() {
+            defer wgCallers.Done()
+            mtx.RLock()
+            blockNumber := i
+            mtx.RUnlock()
+            for blockNumber < 300 && ctx.Err() == nil {
+                prefix := make([]byte, 8)
+                binary.BigEndian.PutUint64(prefix, uint64(blockNumber))
+                prefix = append([]byte("b"), prefix...)
+                it := db.NewIterator(prefix, nil)
+                defer it.Release()
+                if it.Next() {
+                    key := it.Key()
+                    if len(key) != len(prefix)+common.HashLength {
+                        Fatal(t, "Wrong key length, have:", len(key), "want:", len(prefix)+common.HashLength)
+                    }
+                    blockHash := common.BytesToHash(key[len(prefix):])
+                    start := time.Now()
+                    for ctx.Err() == nil {
+                        _, _, err := api.StateAndHeaderByNumberOrHash(ctx, rpc.BlockNumberOrHash{BlockHash: &blockHash})
+                        if err == nil {
+                            mtx.Lock()
+                            if blockNumber == i {
+                                i++
+                            }
+                            mtx.Unlock()
+                            break
+                        }
+                        if ctx.Err() != nil {
+                            return
+                        }
+                        if !strings.Contains(err.Error(), "ahead of current block") {
+                            errors <- err
+                            return
+                        }
+                        if time.Since(start) > 5*time.Second {
+                            errors <- fmt.Errorf("timeout - failed to get state for more then 5 seconds, block: %d, err: %w", blockNumber, err)
+                            return
+                        }
+                    }
+                }
+                it.Release()
+                mtx.RLock()
+                blockNumber = i
+                mtx.RUnlock()
+            }
+        }()
+    }
+    callersDone := make(chan struct{})
+    go func() {
+        wgCallers.Wait()
+        close(callersDone)
+    }()
+
+    select {
+    case <-callersDone:
+        cancel()
+    case <-senderDone:
+        cancel()
+    case err := <-errors:
+        t.Error(err)
+        cancel()
+    }
+    <-callersDone
+    <-senderDone
+    close(errors)
+    for err := range errors {
+        if err != nil {
+            t.Error(err)
+        }
+    }
+}
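Aside (illustration, not part of the patch): TestStateAndHeaderForRecentBlock above discovers canonical block hashes by iterating ChainDb with a block-body key prefix. The sketch below spells out that prefix; it assumes go-ethereum's rawdb block-body schema of "b" followed by the 8-byte big-endian block number and the 32-byte block hash, which is also what the key-length check in the test relies on. The helper name blockBodyIterPrefix is made up for the example.

// blockBodyIterPrefix builds the ChainDb iterator prefix used by the test above;
// a matching key is this prefix followed by the block hash (common.HashLength bytes).
func blockBodyIterPrefix(blockNumber uint64) []byte {
    enc := make([]byte, 8)
    binary.BigEndian.PutUint64(enc, blockNumber)
    return append([]byte("b"), enc...)
}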