Skip to content

Commit

Permalink
fix: corrupted checkpoint handling
Browse files: browse the repository at this point in the history
  • Loading branch information
erayarslan committed Aug 3, 2024
1 parent f766145 commit 69261bd
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
10 changes: 5 additions & 5 deletions couchbase/client.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client interface {
Close()
DcpConnect(useExpiryOpcode bool, useChangeStreams bool) error
DcpClose()
GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error)
GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error)
GetNumVBuckets() int
GetFailoverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error
Expand Down Expand Up @@ -417,7 +417,7 @@ func (s *client) DcpClose() {
logger.Log.Info("dcp connection closed %s", s.config.Hosts)
}

func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
func (s *client) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
snapshot, err := s.GetDcpAgentConfigSnapshot()
if err != nil {
return nil, err
Expand All @@ -432,7 +432,7 @@ func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64]

seqNos := wrapper.CreateConcurrentSwissMap[uint16, uint64](1024)

hasCollectionSupport := s.dcpAgent.HasCollectionsSupport()
hasCollectionSupport := awareCollection && s.dcpAgent.HasCollectionsSupport()

cIds := s.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames)
collectionIDs := make([]uint32, 0, len(cIds))
Expand Down Expand Up @@ -554,15 +554,15 @@ func (s *client) openStreamWithRollback(vbID uint16,

failoverLogs, err := s.GetFailoverLogs(vbID)
if err != nil {
logger.Log.Error("error while get failover logs, err: %v", err)
logger.Log.Error("error while get failover logs when rollback, err: %v", err)
return err
}

var targetUUID gocbcore.VbUUID = 0

for i := len(failoverLogs) - 1; i >= 0; i-- {
log := failoverLogs[i]
if rollbackSeqNo > log.SeqNo {
if rollbackSeqNo >= log.SeqNo {
targetUUID = log.VbUUID
}
}
Expand Down
2 changes: 1 addition & 1 deletion metric/collector.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
return
}

seqNoMap, err := s.client.GetVBucketSeqNos()
seqNoMap, err := s.client.GetVBucketSeqNos(true)

observer.GetPersistSeqNo().Range(func(vbID uint16, seqNo gocbcore.SeqNo) bool {
ch <- prometheus.MustNewConstMetric(
Expand Down
10 changes: 8 additions & 2 deletions stream/checkpoint.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
panic(err)
}

seqNoMap, err := s.client.GetVBucketSeqNos()
seqNoMap, err := s.client.GetVBucketSeqNos(false)
if err != nil {
logger.Log.Error("error while getting vBucket seqNos, err: %v", err)
panic(err)
Expand All @@ -144,12 +144,18 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
anyDirtyOffset = true
}

failOverLogs, err := s.client.GetFailoverLogs(vbID)
if err != nil {
logger.Log.Error("error while get failover logs when initialize latest, err: %v", err)
panic(err)
}

offsets.Store(vbID, &models.Offset{
SnapshotMarker: &models.SnapshotMarker{
StartSeqNo: currentSeqNo,
EndSeqNo: currentSeqNo,
},
VbUUID: gocbcore.VbUUID(doc.Checkpoint.VbUUID),
VbUUID: failOverLogs[0].VbUUID,
SeqNo: currentSeqNo,
})

Expand Down

0 comments on commit 69261bd

Please sign in to comment.