From b5daccc2c60a3d5c6dc00d93f2ba90a95478bf24 Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Thu, 30 Nov 2023 22:48:33 +0300 Subject: [PATCH] fix: i dont know how can explain this --- config/dcp.go | 4 ---- couchbase/client.go | 19 ++++++++++--------- dcp_test.go | 2 ++ stream/stream.go | 4 +++- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/config/dcp.go b/config/dcp.go index a0069f8..e014aca 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -133,10 +133,6 @@ type Dcp struct { Debug bool `yaml:"debug"` } -func (c *Dcp) IsCollectionModeEnabled() bool { - return !(c.ScopeName == DefaultScopeName && len(c.CollectionNames) == 1 && c.CollectionNames[0] == DefaultCollectionName) -} - func (c *Dcp) IsCouchbaseMetadata() bool { return c.Metadata.Type == MetadataTypeCouchbase } diff --git a/couchbase/client.go b/couchbase/client.go index 6d15efc..18c96d5 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -292,17 +292,14 @@ func (s *client) DcpConnect() error { BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize), UseExpiryOpcode: true, }, + IoConfig: gocbcore.IoConfig{ + UseCollections: true, + }, KVConfig: gocbcore.KVConfig{ ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)), }, } - if s.config.IsCollectionModeEnabled() { - agentConfig.IoConfig = gocbcore.IoConfig{ - UseCollections: true, - } - } - client, err := gocbcore.CreateDcpAgent( agentConfig, fmt.Sprintf("%s_%s", s.config.Dcp.Group.Name, uuid.New().String()), @@ -483,7 +480,9 @@ func (s *client) OpenStream( openStreamOptions := gocbcore.OpenStreamOptions{} - if collectionIDs != nil && s.dcpAgent.HasCollectionsSupport() { + if s.dcpAgent.HasCollectionsSupport() { + openStreamOptions.ManifestOptions = &gocbcore.OpenStreamManifestOptions{ManifestUID: 0} + options := &gocbcore.OpenStreamFilterOptions{ CollectionIDs: []uint32{}, } @@ -492,7 +491,9 @@ func (s *client) OpenStream( options.CollectionIDs = append(options.CollectionIDs, id) } - openStreamOptions.FilterOptions = options + if len(options.CollectionIDs) > 0 { + openStreamOptions.FilterOptions = options + } } ch := make(chan error) @@ -589,7 +590,7 @@ func (s *client) getCollectionID(scopeName string, collectionName string) (uint3 func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string { collectionIDs := map[uint32]string{} - if s.config.IsCollectionModeEnabled() { + if s.dcpAgent.HasCollectionsSupport() { for _, collectionName := range collectionNames { collectionID, err := s.getCollectionID(scopeName, collectionName) if err != nil { diff --git a/dcp_test.go b/dcp_test.go index 9e3dc80..ed0fc39 100644 --- a/dcp_test.go +++ b/dcp_test.go @@ -157,6 +157,8 @@ func BenchmarkDcp(b *testing.B) { counter := 0 finish := make(chan struct{}, 1) + c.ApplyDefaults() + dcp, err := NewDcp(c, func(ctx *models.ListenerContext) { if _, ok := ctx.Event.(models.DcpMutation); ok { if counter == 0 { diff --git a/stream/stream.go b/stream/stream.go index d4bd7cd..21c9469 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -142,7 +142,9 @@ func (s *stream) listenEnd() { logger.Log.Debug("end stream vbId: %v", endContext.Event.VbID) } - if !s.closeWithCancel && endContext.Err != nil && errors.Is(endContext.Err, gocbcore.ErrSocketClosed) { + if !s.closeWithCancel && endContext.Err != nil && + (errors.Is(endContext.Err, gocbcore.ErrSocketClosed) || + errors.Is(endContext.Err, gocbcore.ErrDCPBackfillFailed)) { s.reopenStream(endContext.Event.VbID) } else { s.activeStreams--