Skip to content

Commit

Permalink
remove backoff and rely simply on channel for flow control (#117)
Browse files (browse the repository at this point in the history)
  • Loading branch information
stlava authored Sep 1, 2023
1 parent 3612dc4 commit e966ade
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 69 deletions.
2 changes: 1 addition & 1 deletion app/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func New(shutdownHandler shutdown.ShutdownHandler,
txnsWritten := make(chan *ordered_map.OrderedMap, clientBufferSize*5)

// Initialize in reverse order
replicationClient := client.New(shutdownHandler, statsChan, connManager, clientBufferSize)
replicationClient := client.New(shutdownHandler, statsChan, connManager, clientBufferSize, client.DefaultProgressFreq)

filterInstance := filter.New(
shutdownHandler,
Expand Down
65 changes: 20 additions & 45 deletions replication/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ var (
logger = logrus.New()
log = logger.WithField("package", "client")
logProgressInterval = int64(30 * time.Second)

// Settings for exponential sleep time. This prevents spinning when
// there is a backlog.
initialSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
DefaultProgressFreq = 10 * time.Second
)

func init() {
Expand All @@ -62,6 +58,7 @@ type Replicator struct {
outputChan chan *replication.WalMessage
progressLastSent int64
stoppedChan chan struct{}
progressFreq time.Duration

// handlePrimaryKeepaliveMessage
lastClientHeartbeatRequestTime time.Time
Expand All @@ -80,7 +77,8 @@ type Replicator struct {
func New(shutdownHandler shutdown.ShutdownHandler,
statsChan chan stats.Stat,
connManager conn.ManagerInterface,
clientBufferSize int) Replicator {
clientBufferSize int,
progressFreq time.Duration) Replicator {

return Replicator{
shutdownHandler: shutdownHandler,
Expand All @@ -90,6 +88,7 @@ func New(shutdownHandler shutdown.ShutdownHandler,
outputChan: make(chan *replication.WalMessage, clientBufferSize),
progressLastSent: int64(0),
stoppedChan: make(chan struct{}),
progressFreq: progressFreq,

// handlePrimaryKeepaliveMessage
lastClientHeartbeatRequestTime: time.Now(),
Expand Down Expand Up @@ -271,7 +270,7 @@ func (c *Replicator) Start(progressChan <-chan uint64) {
c.overallProgress = uint64(pkm.ServerWALEnd)

// Ticker to force update of progress
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(c.progressFreq)
defer ticker.Stop()

// Loop reading replication messages
Expand Down Expand Up @@ -350,7 +349,7 @@ func (c *Replicator) Start(progressChan <-chan uint64) {
case pglogrepl.PrimaryKeepaliveMessageByteID:
err = c.handlePrimaryKeepaliveMessage(t.Data[1:])
case pglogrepl.XLogDataByteID:
err = c.handleXLogData(t.Data[1:])
err = c.handleXLogData(t.Data[1:], ticker)
default:
continue
}
Expand Down Expand Up @@ -468,7 +467,7 @@ func (c *Replicator) handlePrimaryKeepaliveMessage(data []byte) error {
return nil
}

func (c *Replicator) handleXLogData(data []byte) error {
func (c *Replicator) handleXLogData(data []byte, progressTicker *time.Ticker) error {
xld, err := pglogrepl.ParseXLogData(data)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -545,48 +544,24 @@ func (c *Replicator) handleXLogData(data []byte) error {
wal.Pr.Transaction = c.transaction
wal.TimeBasedKey = c.timeBasedKey

// Attempt to write to channel. If full then send a keepalive and attempt
// to write to channel again.
var sleepTotal time.Duration
err = func() error {
var curSleep = initialSleep

for {
select {
case c.outputChan <- wal:
// Break out to read the next message
return nil
default:
// Send keepalive if channel is full
if err := c.handleProgress(true); err != nil {
// propagate error
return err
}

if curSleep > maxSleep {
curSleep = initialSleep
}
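// Block until the message is written to the output channel. Each time the
// progress ticker fires while we are still blocked, call handleProgress(true)
// and return its error, if any.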
WriteLoop:
for {
select {
case c.outputChan <- wal:

sleepTotal += curSleep
// Break out to read the next message
break WriteLoop

// Sleep here to prevent spinning.
time.Sleep(curSleep)
curSleep = curSleep * 2
case <-progressTicker.C:
if errP := c.handleProgress(true); errP != nil {
// propagate error
return errP
}
}
}()

if err != nil {
return err
}

now := time.Now().UnixNano()
c.statsChan <- stats.NewStatCount("replication", "received", 1, now)

if sleepTotal > 0 {
c.statsChan <- stats.NewStatHistogram("replication", "blocked", sleepTotal.Milliseconds(), now, "ms")
}

c.statsChan <- stats.NewStatCount("replication", "received", 1, time.Now().UnixNano())
return nil
}

Expand Down
50 changes: 27 additions & 23 deletions replication/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func getBasicTestSetup(test *testing.T) (*gomock.Controller, Replicator, chan ui
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 2)
replicator := New(sh, statsChan, mockManager, 2, DefaultProgressFreq)

// Setup return
mockManager.EXPECT().GetConnWithStartLsn(gomock.Any(), uint64(0)).Return(mockConn, nil).Times(1)
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestWalMessage(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

// Setup return
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestReplicationError(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

// Setup return
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestMsgNotCopyData(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)

// Setup return
serverWalEnd := uint64(111)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestNilMessage(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

// Setup return
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestStartValidMessage(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand Down Expand Up @@ -400,7 +400,7 @@ func TestTxnsMessage(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand Down Expand Up @@ -485,7 +485,7 @@ func TestNoCommit(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand Down Expand Up @@ -556,7 +556,7 @@ func TestDupWalStart(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand Down Expand Up @@ -621,7 +621,7 @@ func TestStartInvalidWalString(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()

// walData.WalString:
Expand Down Expand Up @@ -673,7 +673,7 @@ func TestStartNilWalMessage(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand Down Expand Up @@ -753,7 +753,7 @@ func TestStartWithSendStandbyStatus(t *testing.T) {
expectChan := make(chan interface{}, 100)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

mockConn.EXPECT().ReceiveMessage(gomock.Any()).Return(getPrimaryKeepaliveMessage(uint64(10)), nil).Times(1)
Expand Down Expand Up @@ -805,7 +805,7 @@ func TestClosedProgressChan(t *testing.T) {

sh := shutdown.NewShutdownHandler()

replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
out := replicator.GetOutputChan()

go replicator.Start(progChan)
Expand Down Expand Up @@ -853,7 +853,7 @@ func TestStartOutputChannelFull(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 1)
replicator := New(sh, statsChan, mockManager, 1, 70*time.Millisecond)
out := replicator.GetOutputChan()
stoppedChan := replicator.GetStoppedChan()

Expand All @@ -879,7 +879,7 @@ func TestStartOutputChannelFull(t *testing.T) {
go replicator.Start(progChan)

// Wait for replicator to run
time.Sleep(5 * time.Millisecond)
time.Sleep(1 * time.Second)

select {
case _, ok := <-out:
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestDeadlineExceededTwice(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

// Setup return
Expand Down Expand Up @@ -953,7 +953,7 @@ func TestWithMultipleProgress(t *testing.T) {

sh := shutdown.NewShutdownHandler()

replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)
stoppedChan := replicator.GetStoppedChan()

// CommitWalStart from server
Expand Down Expand Up @@ -1035,7 +1035,7 @@ func TestDeadlineExceeded(t *testing.T) {
mockManager.EXPECT().Close().Times(1)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)

go replicator.Start(progChan)

Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestGetConnectionError(t *testing.T) {
mockManager.EXPECT().Close().Times(1)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)

go replicator.Start(progChan)

Expand Down Expand Up @@ -1170,7 +1170,7 @@ func TestProgressChanClosed(t *testing.T) {
statsChan := make(chan stats.Stat, 1000)

sh := shutdown.NewShutdownHandler()
replicator := New(sh, statsChan, mockManager, 10)
replicator := New(sh, statsChan, mockManager, 10, DefaultProgressFreq)

mockManager.EXPECT().GetConnWithStartLsn(gomock.Any(), gomock.Any()).Return(mockConn, nil).MinTimes(1)

Expand Down Expand Up @@ -1335,6 +1335,7 @@ func TestSendKeepaliveChanFull(t *testing.T) {
// Setup mock
mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t)
sh := replicator.shutdownHandler
replicator.progressFreq = 70 * time.Millisecond
stoppedChan := replicator.GetStoppedChan()
defer mockCtrl.Finish()
mockManager.EXPECT().GetConnWithStartLsn(gomock.Any(), gomock.Any()).Return(mockConn, nil).Times(4)
Expand Down Expand Up @@ -1366,7 +1367,7 @@ func TestSendKeepaliveChanFull(t *testing.T) {

// Wait for expect
select {
case <-time.After(100 * time.Millisecond):
case <-time.After(1 * time.Second):
assert.Fail(t, "did not pass test in time")
case <-expectChan:
// pass
Expand All @@ -1381,6 +1382,8 @@ func TestSendKeepaliveChanFullError(t *testing.T) {
mockCtrl, replicator, progChan, mockManager, mockConn := getBasicTestSetup(t)
defer mockCtrl.Finish()

replicator.progressFreq = 70 * time.Millisecond

expectChan := make(chan interface{}, 1)

mockManager.EXPECT().GetConnWithStartLsn(gomock.Any(), gomock.Any()).Return(mockConn, nil).Times(3)
Expand Down Expand Up @@ -1551,6 +1554,7 @@ func TestRecoveryFailed(t *testing.T) {
defer mockCtrl.Finish()
sh := replicator.shutdownHandler
stoppedChan := replicator.GetStoppedChan()
replicator.progressFreq = 70 * time.Millisecond

mockManager.EXPECT().GetConnWithStartLsn(gomock.Any(), gomock.Any()).Return(mockConn, nil).Times(1)

Expand All @@ -1565,17 +1569,17 @@ func TestRecoveryFailed(t *testing.T) {
}).Times(1)
err := errors.New("expected error")
mockManager.EXPECT().GetConn(gomock.Any()).Return(nil, err).Times(1)
mockManager.EXPECT().Close().Times(1)

// Run
go replicator.Start(progChan)
time.Sleep(5 * time.Millisecond)

// Wait for closing cleanup
mockManager.EXPECT().Close().Times(1)
sh.CancelFunc()

// Add a little delay to ensure shutdown ran
var timeout = time.NewTimer(100 * time.Millisecond)
var timeout = time.NewTimer(1 * time.Second)

// Check to see if shutdown closed output channel
select {
Expand Down

0 comments on commit e966ade

Please sign in to comment.