diff --git a/dcp.go b/dcp.go index 1002832..4b38607 100644 --- a/dcp.go +++ b/dcp.go @@ -166,6 +166,8 @@ func (s *dcp) Start() { logger.Log.Debug("cancel channel triggered") s.closeWithCancel = true } + + s.close() } func (s *dcp) GetClient() couchbase.Client { @@ -177,6 +179,10 @@ func (s *dcp) WaitUntilReady() chan struct{} { } func (s *dcp) Close() { + s.cancelCh <- syscall.SIGTERM +} + +func (s *dcp) close() { if !s.config.HealthCheck.Disabled { s.healthCheck.Stop() } diff --git a/logger/logger.go b/logger/logger.go index 3c3c40c..8265817 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -106,6 +106,7 @@ func (l *goCbCoreLogger) Log(level gocbcore.LogLevel, _ int, format string, v .. if level > l.level { return nil } - Log.Log(coreToDcp[level], format, v...) + msg := fmt.Sprintf(format, v...) + Log.Log(coreToDcp[level], "gocbcore - %s", msg) return nil } diff --git a/stream/stream.go b/stream/stream.go index 266e03d..352c336 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -54,22 +54,22 @@ type stream struct { vBucketDiscovery VBucketDiscovery bus EventBus.Bus eventHandler models.EventHandler - config *config.Dcp - metric *Metric + bucketInfo *couchbase.BucketInfo + observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer] rebalanceTimer *time.Timer vbIDRange *models.VbIDRange dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool] stopCh chan struct{} listener models.Listener - version *couchbase.Version - bucketInfo *couchbase.BucketInfo + config *config.Dcp finishStreamWithEndEventCh chan struct{} finishStreamWithCloseCh chan struct{} offsets *wrapper.ConcurrentSwissMap[uint16, *models.Offset] - observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer] + metric *Metric collectionIDs map[uint32]string - activeStreams atomic.Int32 + streamEndNotSupportedData *streamEndNotSupportedData rebalanceLock sync.Mutex + activeStreams atomic.Int32 streamFinishedWithCloseCh bool streamFinishedWithEndEventCh bool anyDirtyOffset bool @@ -78,6 +78,11 @@ type stream struct { open bool } +type streamEndNotSupportedData struct { + queue chan struct{} + ending bool +} + func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) { if s.vbIDRange.In(vbID) { if current, ok := s.offsets.Load(vbID); ok && current.SeqNo > offset.SeqNo { @@ -173,6 +178,10 @@ func (s *stream) reopenStream(vbID uint16) { } func (s *stream) listenEnd(endContext models.DcpStreamEndContext) { + if s.streamEndNotSupportedData != nil && s.streamEndNotSupportedData.ending { + <-s.streamEndNotSupportedData.queue + } + if !s.closeWithCancel && endContext.Err != nil { if !errors.Is(endContext.Err, gocbcore.ErrDCPStreamClosed) { logger.Log.Error("end stream vbID: %v got error: %v", endContext.Event.VbID, endContext.Err) @@ -330,28 +339,39 @@ func (s *stream) openAllStreams(vbIDs []uint16) { openWg.Wait() } -func (s *stream) closeAllStreams(internal bool) { - var wg sync.WaitGroup - wg.Add(s.offsets.Count()) - - s.offsets.Range(func(vbID uint16, _ *models.Offset) bool { - go func(vbID uint16) { - defer wg.Done() - if internal { - // todo: this is not a good way to close stream - observer, _ := s.observers.Load(vbID) - observer.End(models.DcpStreamEnd{VbID: vbID}, nil) - } else { - err := s.client.CloseStream(vbID) - if err != nil { +func (s *stream) closeAllStreams() { + // We need to do this without async when couchbase version below v5.5.0. + // Because "gocbcore - memdopmap.go - FindOpenStream" is not thread safe. + // BTW We cannot use ConcurrentSwissMap either. You know it's concurrent :/ + if s.streamEndNotSupportedData != nil { + s.streamEndNotSupportedData.ending = true + for vbID := s.vbIDRange.Start; vbID <= s.vbIDRange.End; vbID++ { + s.streamEndNotSupportedData.queue <- struct{}{} + if err := s.client.CloseStream(vbID); err != nil { + logger.Log.Error( + "cannot close stream on (stream end not supporting) mode, vbID: %d, err: %v", + vbID, err, + ) + } + } + s.streamEndNotSupportedData.ending = false + } else { + var wg sync.WaitGroup + wg.Add(s.offsets.Count()) + s.offsets.Range(func(vbID uint16, _ *models.Offset) bool { + go func(vbID uint16) { + if err := s.client.CloseStream(vbID); err != nil { logger.Log.Error("cannot close stream, vbID: %d, err: %v", vbID, err) } - } - }(vbID) - return true - }) - wg.Wait() + wg.Done() + }(vbID) + + return true + }) + + wg.Wait() + } } func (s *stream) wait() { @@ -385,8 +405,7 @@ func (s *stream) Close(closeWithCancel bool) { s.checkpoint.StopSchedule() } - disableStreamEndByClient := s.version.Lower(couchbase.SrvVer550) - s.closeAllStreams(disableStreamEndByClient) + s.closeAllStreams() s.observers.Range(func(_ uint16, observer couchbase.Observer) bool { observer.CloseEnd() @@ -439,12 +458,11 @@ func NewStream(client couchbase.Client, bus EventBus.Bus, eventHandler models.EventHandler, ) Stream { - return &stream{ + stream := &stream{ client: client, metadata: metadata, listener: listener, config: config, - version: version, bucketInfo: bucketInfo, vBucketDiscovery: vBucketDiscovery, collectionIDs: collectionIDs, @@ -455,4 +473,13 @@ func NewStream(client couchbase.Client, eventHandler: eventHandler, metric: &Metric{}, } + + if version.Lower(couchbase.SrvVer550) { + stream.streamEndNotSupportedData = &streamEndNotSupportedData{ + ending: false, + queue: make(chan struct{}, 1), + } + } + + return stream }