Skip to content

Commit

Permalink
fix: i dont know how can explain this
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 30, 2023
1 parent fe6d739 commit b5daccc
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
4 changes: 0 additions & 4 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ type Dcp struct {
Debug bool `yaml:"debug"`
}

func (c *Dcp) IsCollectionModeEnabled() bool {
return !(c.ScopeName == DefaultScopeName && len(c.CollectionNames) == 1 && c.CollectionNames[0] == DefaultCollectionName)
}

func (c *Dcp) IsCouchbaseMetadata() bool {
return c.Metadata.Type == MetadataTypeCouchbase
}
Expand Down
19 changes: 10 additions & 9 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,14 @@ func (s *client) DcpConnect() error {
BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize),
UseExpiryOpcode: true,
},
IoConfig: gocbcore.IoConfig{
UseCollections: true,
},
KVConfig: gocbcore.KVConfig{
ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)),
},
}

if s.config.IsCollectionModeEnabled() {
agentConfig.IoConfig = gocbcore.IoConfig{
UseCollections: true,
}
}

client, err := gocbcore.CreateDcpAgent(
agentConfig,
fmt.Sprintf("%s_%s", s.config.Dcp.Group.Name, uuid.New().String()),
Expand Down Expand Up @@ -483,7 +480,9 @@ func (s *client) OpenStream(

openStreamOptions := gocbcore.OpenStreamOptions{}

if collectionIDs != nil && s.dcpAgent.HasCollectionsSupport() {
if s.dcpAgent.HasCollectionsSupport() {
openStreamOptions.ManifestOptions = &gocbcore.OpenStreamManifestOptions{ManifestUID: 0}

options := &gocbcore.OpenStreamFilterOptions{
CollectionIDs: []uint32{},
}
Expand All @@ -492,7 +491,9 @@ func (s *client) OpenStream(
options.CollectionIDs = append(options.CollectionIDs, id)
}

openStreamOptions.FilterOptions = options
if len(options.CollectionIDs) > 0 {
openStreamOptions.FilterOptions = options
}
}

ch := make(chan error)
Expand Down Expand Up @@ -589,7 +590,7 @@ func (s *client) getCollectionID(scopeName string, collectionName string) (uint3
func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string {
collectionIDs := map[uint32]string{}

if s.config.IsCollectionModeEnabled() {
if s.dcpAgent.HasCollectionsSupport() {
for _, collectionName := range collectionNames {
collectionID, err := s.getCollectionID(scopeName, collectionName)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ func BenchmarkDcp(b *testing.B) {
counter := 0
finish := make(chan struct{}, 1)

c.ApplyDefaults()

dcp, err := NewDcp(c, func(ctx *models.ListenerContext) {
if _, ok := ctx.Event.(models.DcpMutation); ok {
if counter == 0 {
Expand Down
4 changes: 3 additions & 1 deletion stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ func (s *stream) listenEnd() {
logger.Log.Debug("end stream vbId: %v", endContext.Event.VbID)
}

if !s.closeWithCancel && endContext.Err != nil && errors.Is(endContext.Err, gocbcore.ErrSocketClosed) {
if !s.closeWithCancel && endContext.Err != nil &&
(errors.Is(endContext.Err, gocbcore.ErrSocketClosed) ||
errors.Is(endContext.Err, gocbcore.ErrDCPBackfillFailed)) {
s.reopenStream(endContext.Event.VbID)
} else {
s.activeStreams--
Expand Down

0 comments on commit b5daccc

Please sign in to comment.