Skip to content

Commit

Permalink
fix: conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Aug 3, 2024
2 parents d5b1464 + 69261bd commit 9831b5d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
22 changes: 11 additions & 11 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Client interface {
Close()
DcpConnect(useExpiryOpcode bool, useChangeStreams bool) error
DcpClose()
GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error)
GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error)
GetNumVBuckets() int
GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error
Expand Down Expand Up @@ -164,7 +164,7 @@ func CreateSecurityConfig(username string, password string, secureConnection boo

func CreateAgent(httpAddresses []string, bucketName string,
username string, password string, secureConnection bool, rootCAPath string,
maxQueueSize int, connectionBufferSize uint, connectionTimeout time.Duration,
maxQueueSize int, poolSize int, connectionBufferSize uint, connectionTimeout time.Duration,
) (*gocbcore.Agent, error) {
agent, err := gocbcore.CreateAgent(
&gocbcore.AgentConfig{
Expand All @@ -181,6 +181,7 @@ func CreateAgent(httpAddresses []string, bucketName string,
},
KVConfig: gocbcore.KVConfig{
MaxQueueSize: maxQueueSize,
PoolSize: poolSize,
ConnectionBufferSize: connectionBufferSize,
},
},
Expand Down Expand Up @@ -212,12 +213,12 @@ func CreateAgent(httpAddresses []string, bucketName string,
}

func (s *client) connect(bucketName string,
maxQueueSize int, connectionBufferSize uint, connectionTimeout time.Duration,
maxQueueSize int, poolSize int, connectionBufferSize uint, connectionTimeout time.Duration,
) (*gocbcore.Agent, error) {
return CreateAgent(
s.config.Hosts, bucketName, s.config.Username, s.config.Password,
s.config.SecureConnection, s.config.RootCAPath,
maxQueueSize, connectionBufferSize, connectionTimeout,
maxQueueSize, poolSize, connectionBufferSize, connectionTimeout,
)
}

Expand Down Expand Up @@ -263,8 +264,7 @@ func (s *client) Connect() error {
}
}

// when u set maxQueueSize to 0, gocbcore will be use default value
agent, err := s.connect(s.config.BucketName, 0, connectionBufferSize, connectionTimeout)
agent, err := s.connect(s.config.BucketName, 0, 0, connectionBufferSize, connectionTimeout)
if err != nil {
logger.Log.Error("error while connect to source bucket, err: %v", err)
return err
Expand All @@ -277,10 +277,10 @@ func (s *client) Connect() error {
if couchbaseMetadataConfig.Bucket == s.config.BucketName {
s.metaAgent = agent
} else {
// when u set maxQueueSize to 0, gocbcore will be use default value
metaAgent, err := s.connect(
couchbaseMetadataConfig.Bucket,
0,
0,
couchbaseMetadataConfig.ConnectionBufferSize,
couchbaseMetadataConfig.ConnectionTimeout,
)
Expand Down Expand Up @@ -427,7 +427,7 @@ func (s *client) DcpClose() {
logger.Log.Info("dcp connection closed %s", s.config.Hosts)
}

func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
func (s *client) GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error) {
snapshot, err := s.GetDcpAgentConfigSnapshot()
if err != nil {
return nil, err
Expand All @@ -442,7 +442,7 @@ func (s *client) GetVBucketSeqNos() (*wrapper.ConcurrentSwissMap[uint16, uint64]

seqNos := wrapper.CreateConcurrentSwissMap[uint16, uint64](1024)

hasCollectionSupport := s.dcpAgent.HasCollectionsSupport()
hasCollectionSupport := awareCollection && s.dcpAgent.HasCollectionsSupport()

cIds := s.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames)
collectionIDs := make([]uint32, 0, len(cIds))
Expand Down Expand Up @@ -564,15 +564,15 @@ func (s *client) openStreamWithRollback(vbID uint16,

failOverLogs, err := s.GetFailOverLogs(vbID)
if err != nil {
logger.Log.Error("error while get failOver logs, err: %v", err)
logger.Log.Error("error while get failOver logs when rollback, err: %v", err)
return err
}

var targetUUID gocbcore.VbUUID = 0

for i := len(failOverLogs) - 1; i >= 0; i-- {
log := failOverLogs[i]
if rollbackSeqNo > log.SeqNo {
if rollbackSeqNo >= log.SeqNo {
targetUUID = log.VbUUID
}
}
Expand Down
2 changes: 1 addition & 1 deletion metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
return
}

seqNoMap, err := s.client.GetVBucketSeqNos()
seqNoMap, err := s.client.GetVBucketSeqNos(true)

observers.Range(func(vbID uint16, observer couchbase.Observer) bool {
ch <- prometheus.MustNewConstMetric(
Expand Down
10 changes: 8 additions & 2 deletions stream/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
panic(err)
}

seqNoMap, err := s.client.GetVBucketSeqNos()
seqNoMap, err := s.client.GetVBucketSeqNos(false)
if err != nil {
logger.Log.Error("error while getting vBucket seqNos, err: %v", err)
panic(err)
Expand All @@ -144,12 +144,18 @@ func (s *checkpoint) Load() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset]
anyDirtyOffset = true
}

failOverLogs, err := s.client.GetFailOverLogs(vbID)
if err != nil {
logger.Log.Error("error while get failOver logs when initialize latest, err: %v", err)
panic(err)
}

offsets.Store(vbID, &models.Offset{
SnapshotMarker: &models.SnapshotMarker{
StartSeqNo: currentSeqNo,
EndSeqNo: currentSeqNo,
},
VbUUID: gocbcore.VbUUID(doc.Checkpoint.VbUUID),
VbUUID: failOverLogs[0].VbUUID,
SeqNo: currentSeqNo,
})

Expand Down

0 comments on commit 9831b5d

Please sign in to comment.