From 69261bd422af33916e1176cd4abb1806be2e87c2 Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Sat, 3 Aug 2024 13:54:45 +0300 Subject: [PATCH] fix: corrupted checkpoint handling --- couchbase/client.go | 10 +++++----- metric/collector.go | 2 +- stream/checkpoint.go | 10 ++++++++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/couchbase/client.go b/couchbase/client.go index ec3ccd3..1ef370c 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -35,7 +35,7 @@ type Client interface { Close() DcpConnect(useExpiryOpcode bool, useChangeStreams bool) error DcpClose() - GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error) + GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) GetNumVBuckets() int GetFailoverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error) OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error @@ -417,7 +417,7 @@ func (s *client) DcpClose() { logger.Log.Info("dcp connection closed %s", s.config.Hosts) } -func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error) { +func (s *client) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) { snapshot, err := s.GetDcpAgentConfigSnapshot() if err != nil { return nil, err @@ -432,7 +432,7 @@ func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64] seqNos := wrapper.CreateConcurrentSwissMap[uint16, uint64](1024) - hasCollectionSupport := s.dcpAgent.HasCollectionsSupport() + hasCollectionSupport := awareCollection && s.dcpAgent.HasCollectionsSupport() cIds := s.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames) collectionIDs := make([]uint32, 0, len(cIds)) @@ -554,7 +554,7 @@ func (s *client) openStreamWithRollback(vbID uint16, failoverLogs, err := s.GetFailoverLogs(vbID) if err != nil { - logger.Log.Error("error while get failover logs, err: %v", err) + logger.Log.Error("error while get failover logs when rollback, err: %v", err) return err } @@ -562,7 +562,7 @@ func (s *client) openStreamWithRollback(vbID uint16, for i := len(failoverLogs) - 1; i >= 0; i-- { log := failoverLogs[i] - if rollbackSeqNo > log.SeqNo { + if rollbackSeqNo >= log.SeqNo { targetUUID = log.VbUUID } } diff --git a/metric/collector.go b/metric/collector.go index 1995977..ded78f4 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -61,7 +61,7 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { return } - seqNoMap, err := s.client.GetVBucketSeqNos() + seqNoMap, err := s.client.GetVBucketSeqNos(true) observer.GetPersistSeqNo().Range(func(vbID uint16, seqNo gocbcore.SeqNo) bool { ch <- prometheus.MustNewConstMetric( diff --git a/stream/checkpoint.go b/stream/checkpoint.go index c032a58..80fac18 100644 --- a/stream/checkpoint.go +++ b/stream/checkpoint.go @@ -123,7 +123,7 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset] panic(err) } - seqNoMap, err := s.client.GetVBucketSeqNos() + seqNoMap, err := s.client.GetVBucketSeqNos(false) if err != nil { logger.Log.Error("error while getting vBucket seqNos, err: %v", err) panic(err) @@ -144,12 +144,18 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset] anyDirtyOffset = true } + failOverLogs, err := s.client.GetFailoverLogs(vbID) + if err != nil { + logger.Log.Error("error while get failover logs when initialize latest, err: %v", err) + panic(err) + } + offsets.Store(vbID, &models.Offset{ SnapshotMarker: &models.SnapshotMarker{ StartSeqNo: currentSeqNo, EndSeqNo: currentSeqNo, }, - VbUUID: gocbcore.VbUUID(doc.Checkpoint.VbUUID), + VbUUID: failOverLogs[0].VbUUID, SeqNo: currentSeqNo, })