test(statesync): fix flaky TestReactor_Backfill (#773)
* fix(statesync): invalid error handling in statesync backfill

* test(statesync): fix flaky TestReactor_Backfill
lklimek authored Apr 4, 2024
1 parent 53ec39b commit e4c934f
Showing 3 changed files with 53 additions and 26 deletions.
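
For orientation before the diffs: the reactor.go fix reorders the checks after a light-block request. The old code inspected the returned block before looking at the error; the new code classifies the error first, treats context.DeadlineExceeded as a soft failure (the slow peer stays in rotation), and only then checks whether the peer had the block at all. Below is a minimal, runnable sketch of that error-first pattern, with illustrative names rather than the reactor's actual API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchBlock stands in for dispatcher.LightBlock: it is deliberately too slow
// to answer within the caller's timeout. Names here are illustrative.
func fetchBlock(ctx context.Context) (*string, error) {
	select {
	case <-time.After(time.Second):
		b := "light block"
		return &b, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	// request the light block with a timeout, as the fixed code does
	subCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	lb, err := fetchBlock(subCtx)
	cancel()

	// classify the error BEFORE touching lb: this ordering is the fix
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		// soft failure: the peer was merely slow, so retry the height
		// elsewhere but keep the peer in rotation
		fmt.Println("timeout: retry height, keep peer")
	case err != nil:
		// hard failure: log and retry without re-adding the peer
		fmt.Println("error fetching light block:", err)
	case lb == nil:
		// only meaningful once we know err is nil
		fmt.Println("peer didn't have the block: retry with another peer")
	default:
		fmt.Println("got:", *lb)
	}
}
```

The same ordering appears in the reactor.go diff below, where r.peers.Append(peer) moves after the error handling instead of running before it.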
1 change: 1 addition & 0 deletions internal/statesync/block_queue.go
@@ -119,6 +119,7 @@ func (q *blockQueue) nextHeight() <-chan int64 {
return ch
}

// we check initialHeight instead of startHeight, as we also need to address the startTime, which we don't have here
if q.terminal == nil && q.fetchHeight >= q.initialHeight {
// return and decrement the fetch height
ch <- q.fetchHeight
45 changes: 25 additions & 20 deletions internal/statesync/reactor.go
@@ -489,38 +489,42 @@ func (r *Reactor) backfill(
return
}
r.logger.Debug("fetching next block", "height", height, "peer", peer)
lb, err := func() (*types.LightBlock, error) {
subCtx, reqCancel := context.WithTimeout(ctxWithCancel, lightBlockResponseTimeout)
defer reqCancel()
// request the light block with a timeout
return r.dispatcher.LightBlock(subCtx, height, peer)
}()
if lb == nil {
r.logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height)
queue.retry(height)
// As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
// have any prior ones, thus we remove it from the peer list.
continue
}
// once the peer has returned a value, add it back to the peer list to be used again
r.peers.Append(peer)
if errors.Is(err, context.Canceled) {
return
}
// request the light block with a timeout
subCtx, subCtxCancel := context.WithTimeout(ctxWithCancel, lightBlockResponseTimeout)
lb, err := r.dispatcher.LightBlock(subCtx, height, peer)
subCtxCancel()

if err != nil {
queue.retry(height)
if errors.Is(err, errNoConnectedPeers) {
r.logger.Info("backfill: no connected peers to fetch light blocks from; sleeping...",
"sleepTime", sleepTime)
time.Sleep(sleepTime)
} else {
} else if errors.Is(err, context.DeadlineExceeded) {
// we don't punish the peer as it might just have not responded in time
r.logger.Info("backfill: error with fetching light block",
// In future, we might want to consider a backoff strategy
r.logger.Debug("backfill: peer didn't respond on time",
"height", height, "peer", peer, "error", err)
r.peers.Append(peer)
} else {
r.logger.Info("backfill: error fetching light block",
"height", height,
"error", err)
}
continue
}
if lb == nil {
r.logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height, "peers_outstanding", r.peers.Len())
queue.retry(height)
// As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
// have any prior ones, thus we remove it from the peer list.
continue
}
// once the peer has returned a value, add it back to the peer list to be used again
r.peers.Append(peer)
if errors.Is(err, context.Canceled) {
return
}

// run a validate basic. This checks the validator set and commit
// hashes line up
@@ -534,6 +538,7 @@ func (r *Reactor) backfill(
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}); serr != nil {
r.logger.Error("backfill: failed to send block error", "error", serr)
return
}
continue
33 changes: 27 additions & 6 deletions internal/statesync/reactor_test.go
@@ -261,7 +261,7 @@ func TestReactor_Sync(t *testing.T) {

appHash := []byte{1, 2, 3}

go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0, 0)
go graduallyAddPeers(ctx, t, rts.peerUpdateCh, closeCh, 1*time.Second)
go handleSnapshotRequests(ctx, t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
{
@@ -549,7 +549,7 @@ func TestReactor_BlockProviders(t *testing.T) {
defer close(closeCh)

chain := buildLightBlockChain(ctx, t, 1, 10, time.Now(), rts.privVal)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0, 0)

peers := rts.reactor.peers.All()
require.Len(t, peers, 2)
@@ -608,7 +608,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
defer close(closeCh)

chain := buildLightBlockChain(ctx, t, 1, 10, time.Now(), rts.privVal)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0, 0)
go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)

rts.reactor.cfg.UseP2P = true
@@ -689,7 +689,7 @@ func TestReactor_Backfill(t *testing.T) {

for _, tc := range testCases {
t.Run(fmt.Sprintf("failure rate: %d", tc.failureRate), func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
@@ -728,7 +728,7 @@

closeCh := make(chan struct{})
defer close(closeCh)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, tc.failureRate)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, tc.failureRate, uint64(stopHeight))

err := rts.reactor.backfill(
ctx,
@@ -775,14 +775,29 @@ func retryUntil(ctx context.Context, t *testing.T, fn func() bool, timeout time.
}
}

// handleLightBlockRequests handles light block requests and responds with the appropriate light block
// based on the height of the request. It also simulates failures according to the failure rate.
// The function returns when the context is done.
// # Arguments
// * `ctx` - the context
// * `t` - the testing.T instance
// * `chain` - the light block chain
// * `receiving` - the channel on which requests are received
// * `sending` - the channel on which responses are sent
// * `close` - closing this channel stops the function
// * `failureRate` - the number of simulated failures per 10 requests
// * `stopHeight` - minimum height for which to respond; below this height, the function will not respond to requests,
// causing timeouts. Use 0 to disable this mechanism.
func handleLightBlockRequests(
ctx context.Context,
t *testing.T,
chain map[int64]*types.LightBlock,
receiving chan p2p.Envelope,
sending chan p2p.Envelope,
close chan struct{},
failureRate int) {
failureRate int,
stopHeight uint64,
) {
requests := 0
errorCount := 0
for {
@@ -791,6 +806,12 @@ func handleLightBlockRequests(
return
case envelope := <-receiving:
if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
if msg.Height < stopHeight {
// this causes a timeout; needed for backfill tests
// to ensure heights below stopHeight are not processed
// before all heights at or above stopHeight are processed
continue
}
if requests%10 >= failureRate {
lb, err := chain[int64(msg.Height)].ToProto()
require.NoError(t, err)
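
The de-flaking side of the change is the stopHeight gate added to handleLightBlockRequests: requests below the threshold simply receive no reply, so the requester's lightBlockResponseTimeout fires and the backfill timeout path is exercised deterministically. A self-contained sketch of that drop-below-threshold responder, with illustrative types and channel names rather than the test's actual ones:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type request struct{ height uint64 }

// serveBlocks sketches the stopHeight gate: requests below stopHeight are
// silently dropped, so the requester hits its own timeout instead of
// receiving an answer.
func serveBlocks(ctx context.Context, in <-chan request, out chan<- uint64, stopHeight uint64) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-in:
			if req.height < stopHeight {
				continue // no response: the caller's timeout fires instead
			}
			out <- req.height
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	in := make(chan request)
	out := make(chan uint64)
	go serveBlocks(ctx, in, out, 5)

	in <- request{height: 7}
	fmt.Println("served:", <-out) // 7 is at or above stopHeight

	in <- request{height: 3} // dropped; the requester must time out
	select {
	case h := <-out:
		fmt.Println("served:", h)
	case <-time.After(200 * time.Millisecond):
		fmt.Println("request below stopHeight timed out, as intended")
	}
}
```

Passing 0 as the threshold disables the gate entirely, which is why the unchanged call sites in the test file simply gain a trailing 0 argument.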
