diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 32ea3940b5..7666411d2e 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -26,18 +26,44 @@ import ( "github.com/ethereum/go-ethereum/log" ) +type UnlimitedCapChannel[T any] struct { + SendSide chan T + ReceiveSide chan T +} + +func NewUnlimitedCapChannel[T any](size int) *UnlimitedCapChannel[T] { + if size == 0 { + panic("trying to create UnlimitedCapChannel with 0 initial size, need size to be atleast 1") + } + u := &UnlimitedCapChannel[T]{ + SendSide: make(chan T), + ReceiveSide: make(chan T, size), + } + go func(uc *UnlimitedCapChannel[T]) { + for { + if len(uc.ReceiveSide) == cap(u.ReceiveSide) { + tmp := make(chan T, cap(u.ReceiveSide)*2) + for len(u.ReceiveSide) > 0 { + tmp <- <-u.ReceiveSide + } + u.ReceiveSide = tmp + } + u.ReceiveSide <- <-uc.SendSide + } + }(u) + return u +} + type api struct { sim *SimulatedBeacon } func (a *api) loop() { - var ( - // Arbitrum: we need to make newTxs a buffered channel because by the current design of simulated beacon - // it would deadlock with this cycle a.sim.Commit() -> txpool.Sync() -> subpools reset -> update feeds (newTxs is one of the receivers) - // Note: capacity of this channel should be the worst-case estimate of number of transactions all arriving simultaneously to the pool - newTxs = make(chan core.NewTxsEvent, 15) - sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) - ) + // Arbitrum: we make a channel with dynamic capacity (UnlimitedCapChannel[core.NewTxsEvent]) and subscribe to tx-events on the SendSide + // and read events on the ReceiveSide because by the current design of simulated beacon + // it would deadlock with this cycle a.sim.Commit() -> txpool.Sync() -> subpools reset -> update feeds (newTxs is one of the receivers) + uc := NewUnlimitedCapChannel[core.NewTxsEvent](5) + sub := a.sim.eth.TxPool().SubscribeTransactions(uc.SendSide, true) defer sub.Unsubscribe() for { @@ -49,7 +75,7 @@ func (a *api) loop() { if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } - case <-newTxs: + case <-uc.ReceiveSide: a.sim.Commit() } } diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index b6d37134ae..295c1f68c4 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -140,8 +140,11 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { } } -func TestSimulatedBeaconAPIDeadlocksInExtremeConditions(t *testing.T) { - txs := make(map[common.Hash]types.Transaction) +// Tests that zero-period dev mode can handle a lot of simultaneous +// transactions/withdrawals +func TestOnDemandSpam(t *testing.T) { + var withdrawals []types.Withdrawal + txs := make(map[common.Hash]*types.Transaction) var ( // testKey is a private key to use for funding a tester account. @@ -158,23 +161,29 @@ func TestSimulatedBeaconAPIDeadlocksInExtremeConditions(t *testing.T) { _ = mock defer node.Close() - // simulated beacon api - mockApi := &api{mock} - go mockApi.loop() + api := &api{mock} + go api.loop() chainHeadCh := make(chan core.ChainHeadEvent, 10) subscription := ethService.BlockChain().SubscribeChainHeadEvent(chainHeadCh) defer subscription.Unsubscribe() - // generate a bunch of transactions to overload simulated beacon api - // current capacity of core.NewTxsEvent channel is 15, we send 30 txs + // generate some withdrawals + for i := 0; i < 0; i++ { + withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) + if err := mock.withdrawals.add(&withdrawals[i]); err != nil { + t.Fatal("addWithdrawal failed", err) + } + } + + // generate a bunch of transactions signer := types.NewEIP155Signer(ethService.BlockChain().Config().ChainID) - for i := 0; i < 30; i++ { + for i := 0; i < 20000; i++ { tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee), nil), signer, testKey) if err != nil { t.Fatalf("error signing transaction, err=%v", err) } - txs[tx.Hash()] = *tx + txs[tx.Hash()] = tx if err := ethService.APIBackend.SendTx(context.Background(), tx); err != nil { t.Fatal("SendTx failed", err) @@ -182,21 +191,25 @@ func TestSimulatedBeaconAPIDeadlocksInExtremeConditions(t *testing.T) { } includedTxs := make(map[common.Hash]struct{}) + var includedWithdrawals []uint64 - timer := time.NewTimer(12 * time.Second) + timer := time.NewTimer(20 * time.Second) for { select { case evt := <-chainHeadCh: for _, includedTx := range evt.Block.Transactions() { includedTxs[includedTx.Hash()] = struct{}{} } + for _, includedWithdrawal := range evt.Block.Withdrawals() { + includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index) + } // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 - if len(includedTxs) == len(txs) { - t.Fatal("all txs were included, the simulated beacon api did not deadlock") + if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) { + return } case <-timer.C: - return + t.Fatal("timed out without including all withdrawals/txs") } } }