Skip to content

Commit

Permalink
Remove delay when not stopping on reshard
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 10, 2024
1 parent cd3b408 commit 784a84b
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,22 +708,28 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}
if je != nil {
// We're going to be ending the tablet stream, so we ensure a reasonable
// minimum amount of time is alloted for clients to Recv the journal event
// before the stream's context is cancelled (which would cause the grpc
// SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
// event before the stream ends then they'll have to resume from the last
// ShardGtid they received before the journal event.
endTimer := time.NewTimer(stopOnReshardDelay)
defer endTimer.Stop()
var endTimer *time.Timer
if vs.stopOnReshard {
// We're going to be ending the tablet stream, along with the VStream, so
// we ensure a reasonable minimum amount of time is alloted for clients
// to Recv the journal event before the VStream's context is cancelled
// (which would cause the grpc SendMsg or RecvMsg to fail). If the client
// doesn't Recv the journal event before the VStream ends then they'll
// have to resume from the last ShardGtid they received before the
// journal event.
endTimer = time.NewTimer(stopOnReshardDelay)
defer endTimer.Stop()
}
// Wait until all other participants converge and then return EOF after
// the minimum delay has passed.
// any minimum delay has passed.
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
case <-journalDone:
<-endTimer.C
if endTimer != nil {
<-endTimer.C
}
return io.EOF
}
}
Expand Down

0 comments on commit 784a84b

Please sign in to comment.