Skip to content

Commit

Permalink
Fix deadlock in sim-beacon-api impl using channel with unlimited capa…
Browse files Browse the repository at this point in the history
…city
  • Loading branch information
ganeshvanahalli committed May 7, 2024
1 parent da4c975 commit cf65344
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 21 deletions.
42 changes: 34 additions & 8 deletions eth/catalyst/simulated_beacon_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,44 @@ import (
"github.com/ethereum/go-ethereum/log"
)

type UnlimitedCapChannel[T any] struct {
SendSide chan T
ReceiveSide chan T
}

func NewUnlimitedCapChannel[T any](size int) *UnlimitedCapChannel[T] {
if size == 0 {
panic("trying to create UnlimitedCapChannel with 0 initial size, need size to be atleast 1")
}
u := &UnlimitedCapChannel[T]{
SendSide: make(chan T),
ReceiveSide: make(chan T, size),
}
go func(uc *UnlimitedCapChannel[T]) {
for {
if len(uc.ReceiveSide) == cap(u.ReceiveSide) {
tmp := make(chan T, cap(u.ReceiveSide)*2)
for len(u.ReceiveSide) > 0 {
tmp <- <-u.ReceiveSide
}
u.ReceiveSide = tmp
}
u.ReceiveSide <- <-uc.SendSide
}
}(u)
return u
}

type api struct {
sim *SimulatedBeacon
}

func (a *api) loop() {
var (
// Arbitrum: we need to make newTxs a buffered channel because by the current design of simulated beacon
// it would deadlock with this cycle a.sim.Commit() -> txpool.Sync() -> subpools reset -> update feeds (newTxs is one of the receivers)
// Note: capacity of this channel should be the worst-case estimate of number of transactions all arriving simultaneously to the pool
newTxs = make(chan core.NewTxsEvent, 15)
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
)
// Arbitrum: we make a channel with dynamic capacity (UnlimitedCapChannel[core.NewTxsEvent]) and subscribe to tx-events on the SendSide
// and read events on the ReceiveSide because by the current design of simulated beacon
// it would deadlock with this cycle a.sim.Commit() -> txpool.Sync() -> subpools reset -> update feeds (newTxs is one of the receivers)
uc := NewUnlimitedCapChannel[core.NewTxsEvent](5)
sub := a.sim.eth.TxPool().SubscribeTransactions(uc.SendSide, true)
defer sub.Unsubscribe()

for {
Expand All @@ -49,7 +75,7 @@ func (a *api) loop() {
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
case <-newTxs:
case <-uc.ReceiveSide:
a.sim.Commit()
}
}
Expand Down
39 changes: 26 additions & 13 deletions eth/catalyst/simulated_beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
}
}

func TestSimulatedBeaconAPIDeadlocksInExtremeConditions(t *testing.T) {
txs := make(map[common.Hash]types.Transaction)
// Tests that zero-period dev mode can handle a lot of simultaneous
// transactions/withdrawals
func TestOnDemandSpam(t *testing.T) {
var withdrawals []types.Withdrawal
txs := make(map[common.Hash]*types.Transaction)

var (
// testKey is a private key to use for funding a tester account.
Expand All @@ -158,45 +161,55 @@ func TestSimulatedBeaconAPIDeadlocksInExtremeConditions(t *testing.T) {
_ = mock
defer node.Close()

// simulated beacon api
mockApi := &api{mock}
go mockApi.loop()
api := &api{mock}
go api.loop()

chainHeadCh := make(chan core.ChainHeadEvent, 10)
subscription := ethService.BlockChain().SubscribeChainHeadEvent(chainHeadCh)
defer subscription.Unsubscribe()

// generate a bunch of transactions to overload simulated beacon api
// current capacity of core.NewTxsEvent channel is 15, we send 30 txs
// generate some withdrawals
for i := 0; i < 0; i++ {
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
t.Fatal("addWithdrawal failed", err)
}
}

// generate a bunch of transactions
signer := types.NewEIP155Signer(ethService.BlockChain().Config().ChainID)
for i := 0; i < 30; i++ {
for i := 0; i < 20000; i++ {
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee), nil), signer, testKey)
if err != nil {
t.Fatalf("error signing transaction, err=%v", err)
}
txs[tx.Hash()] = *tx
txs[tx.Hash()] = tx

if err := ethService.APIBackend.SendTx(context.Background(), tx); err != nil {
t.Fatal("SendTx failed", err)
}
}

includedTxs := make(map[common.Hash]struct{})
var includedWithdrawals []uint64

timer := time.NewTimer(12 * time.Second)
timer := time.NewTimer(20 * time.Second)
for {
select {
case evt := <-chainHeadCh:
for _, includedTx := range evt.Block.Transactions() {
includedTxs[includedTx.Hash()] = struct{}{}
}
for _, includedWithdrawal := range evt.Block.Withdrawals() {
includedWithdrawals = append(includedWithdrawals, includedWithdrawal.Index)
}

// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) {
t.Fatal("all txs were included, the simulated beacon api did not deadlock")
if len(includedTxs) == len(txs) && len(includedWithdrawals) == len(withdrawals) {
return
}
case <-timer.C:
return
t.Fatal("timed out without including all withdrawals/txs")
}
}
}

0 comments on commit cf65344

Please sign in to comment.