Skip to content

Commit

Permalink
fix: rollback mitigation for loop wait
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Apr 2, 2024
1 parent 588e704 commit 62515ff
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
2 changes: 1 addition & 1 deletion couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (so *observer) persistSeqNoChangedListener(persistSeqNo models.PersistSeqNo
so.persistSeqNo.Store(persistSeqNo.VbID, persistSeqNo.SeqNo)
}
} else {
logger.Log.Debug("persistSeqNo: %v on vbId: %v", persistSeqNo.SeqNo, persistSeqNo.VbID)
logger.Log.Trace("persistSeqNo: %v on vbId: %v", persistSeqNo.SeqNo, persistSeqNo.VbID)
}
}

Expand Down
20 changes: 12 additions & 8 deletions couchbase/rollback_mitigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (r *rollbackMitigation) getMinSeqNo(vbID uint16) gocbcore.SeqNo { //nolint:
}

if vbUUID != replica.vbUUID {
logger.Log.Debug("vbUUID mismatch %v != %v for %v index of %v", vbUUID, replica.vbUUID, idx, len(replicas))
logger.Log.Trace("vbUUID mismatch %v != %v for %v index of %v", vbUUID, replica.vbUUID, idx, len(replicas))
return 0
}

Expand Down Expand Up @@ -205,24 +205,25 @@ func (r *rollbackMitigation) startObserve(groupID int) {
r.persistedSeqNos.Range(func(vbID uint16, replicas []*vbUUIDAndSeqNo) bool {
for idx, replica := range replicas {
if replica.IsAbsent() {
wg.Done()
continue
}

if r.closed || r.activeGroupID != groupID {
logger.Log.Debug("closed(%v) or groupID(%v!=%v) changed on startObserve", r.closed, r.activeGroupID, groupID)
return false
wg.Done()
} else {
vbUUID, _ := r.vbUUIDMap.Load(vbID)
r.observe(vbID, idx, groupID, vbUUID, wg)
}

vbUUID, _ := r.vbUUIDMap.Load(vbID)
r.observe(vbID, idx, groupID, vbUUID, wg)
}

return true
})

wg.Wait()
case <-r.observeCloseCh:
logger.Log.Debug("observe close triggered")
logger.Log.Debug("observe close trigger received")
r.observeCloseDoneCh <- struct{}{}
return
}
Expand All @@ -245,7 +246,7 @@ func (r *rollbackMitigation) loadVbUUID(vbID uint16) error {
)
}

logger.Log.Debug(
logger.Log.Trace(
"observing vbID: %v, vbUUID: %v, failoverInfo: %v",
vbID, failoverLogs[0].VbUUID, strings.Join(failoverInfos, ", "),
)
Expand Down Expand Up @@ -276,9 +277,10 @@ func (r *rollbackMitigation) reconfigure() {

if r.observeTimer != nil {
r.observeTimer.Stop()
logger.Log.Debug("observe close triggered from reconfigure")
r.observeCloseCh <- struct{}{}
<-r.observeCloseDoneCh
logger.Log.Debug("observe close done")
logger.Log.Debug("observe close done from reconfigure")
}

r.activeGroupID++
Expand Down Expand Up @@ -419,8 +421,10 @@ func (r *rollbackMitigation) Stop() {

if r.observeTimer != nil {
r.observeTimer.Stop()
logger.Log.Debug("observe close triggered from stop")
r.observeCloseCh <- struct{}{}
<-r.observeCloseDoneCh
logger.Log.Debug("observe close done from stop")
}

logger.Log.Info("rollback mitigation stopped")
Expand Down

0 comments on commit 62515ff

Please sign in to comment.