Skip to content

Commit

Permalink
feat: detect corrupted checkpoint when checkpoint seqNo bigger than l…
Browse files Browse the repository at this point in the history
…atest
  • Loading branch information
erayarslan committed Jul 20, 2024
1 parent c06c48a commit f766145
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
2 changes: 1 addition & 1 deletion couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (so *observer) persistSeqNoChangedListener(persistSeqNo models.PersistSeqNo
so.persistSeqNo.Store(persistSeqNo.VbID, persistSeqNo.SeqNo)
}
} else {
logger.Log.Trace("persistSeqNo: %v on vbId: %v", persistSeqNo.SeqNo, persistSeqNo.VbID)
logger.Log.Trace("persistSeqNo: %v on vbID: %v", persistSeqNo.SeqNo, persistSeqNo.VbID)
}
}

Expand Down
6 changes: 3 additions & 3 deletions couchbase/rollback_mitigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *rollbackMitigation) observeVbID(
Deadline: time.Now().Add(time.Second * 5),
}, callback)
if err != nil {
logger.Log.Error("observeVBID error for vbId: %v, replica:%v, vbUUID: %v, err: %v", vbID, replica, vbUUID, err)
logger.Log.Error("observeVBID error for vbID: %v, replica:%v, vbUUID: %v, err: %v", vbID, replica, vbUUID, err)
callback(nil, err)
}
}
Expand Down Expand Up @@ -170,15 +170,15 @@ func (r *rollbackMitigation) markAbsentInstances() error { //nolint:unused
serverIndex, err := r.configSnapshot.VbucketToServer(vbID, uint32(idx))
if err != nil {
if errors.Is(err, gocbcore.ErrInvalidReplica) {
logger.Log.Debug("invalid replica of vbId: %v, replica: %v, err: %v", vbID, idx, err)
logger.Log.Debug("invalid replica of vbID: %v, replica: %v, err: %v", vbID, idx, err)
replica.SetAbsent()
} else {
outerError = err
return false
}
} else {
if serverIndex < 0 {
logger.Log.Debug("invalid server index of vbId: %v, replica: %v, serverIndex: %v", vbID, idx, serverIndex)
logger.Log.Debug("invalid server index of vbID: %v, replica: %v, serverIndex: %v", vbID, idx, serverIndex)
replica.SetAbsent()
}
}
Expand Down
24 changes: 18 additions & 6 deletions stream/checkpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"errors"
"sync"
"time"

Expand Down Expand Up @@ -109,6 +110,7 @@ func (s *checkpoint) Save() {
}
}

//nolint:funlen
func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset], *wrapper.ConcurrentSwissMap[uint16, bool], bool) {
s.loadLock.Lock()
defer s.loadLock.Unlock()
Expand All @@ -121,19 +123,19 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
panic(err)
}

seqNoMap, err := s.client.GetVBucketSeqNos()
if err != nil {
logger.Log.Error("error while getting vBucket seqNos, err: %v", err)
panic(err)
}

offsets := wrapper.CreateConcurrentSwissMap[uint16, *models.Offset](1024)
dirtyOffsets := wrapper.CreateConcurrentSwissMap[uint16, bool](1024)
anyDirtyOffset := false

if !exist && s.config.Checkpoint.AutoReset == CheckpointAutoResetTypeLatest {
logger.Log.Debug("no checkpoint found, auto reset checkpoint to latest")

seqNoMap, err := s.client.GetVBucketSeqNos()
if err != nil {
logger.Log.Error("error while getting vBucket seqNos, err: %v", err)
panic(err)
}

dump.Range(func(vbID uint16, doc *models.CheckpointDocument) bool {
currentSeqNo, _ := seqNoMap.Load(vbID)

Expand All @@ -158,6 +160,16 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
}

dump.Range(func(vbID uint16, doc *models.CheckpointDocument) bool {
latestSeqNo, _ := seqNoMap.Load(vbID)
if doc.Checkpoint.SeqNo > latestSeqNo {
err := errors.New("checkpoint seqNo bigger then vBucket latest seqNo")
logger.Log.Error(
"error while loading checkpoint, vbID: %v, checkpoint seqNo: %v, latest seqNo: %v, err: %v",
vbID, doc.Checkpoint.SeqNo, latestSeqNo, err,
)
panic(err)
}

offsets.Store(vbID, &models.Offset{
SnapshotMarker: &models.SnapshotMarker{
StartSeqNo: doc.Checkpoint.Snapshot.StartSeqNo,
Expand Down
8 changes: 4 additions & 4 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
s.offsets.Store(vbID, offset)
s.dirtyOffsets.Store(vbID, dirty)
} else {
logger.Log.Warn("vbId=%v not belong our vbId range", vbID)
logger.Log.Warn("vbID: %v not belong our vbId range", vbID)
}
}

Expand Down Expand Up @@ -164,14 +164,14 @@ func (s *stream) listenEnd() {
for endContext := range s.observer.ListenEnd() {
if !s.closeWithCancel && endContext.Err != nil {
if !errors.Is(endContext.Err, gocbcore.ErrDCPStreamClosed) {
logger.Log.Error("end stream vbId: %v got error: %v", endContext.Event.VbID, endContext.Err)
logger.Log.Error("end stream vbID: %v got error: %v", endContext.Event.VbID, endContext.Err)
} else {
logger.Log.Debug("end stream vbId: %v got error: %v", endContext.Event.VbID, endContext.Err)
logger.Log.Debug("end stream vbID: %v got error: %v", endContext.Event.VbID, endContext.Err)
}
}

if endContext.Err == nil {
logger.Log.Debug("end stream vbId: %v", endContext.Event.VbID)
logger.Log.Debug("end stream vbID: %v", endContext.Event.VbID)
}

if !s.closeWithCancel && endContext.Err != nil &&
Expand Down

0 comments on commit f766145

Please sign in to comment.