diff --git a/config/dcp.go b/config/dcp.go index 5678338..8fc3771 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -375,11 +375,11 @@ func (c *Dcp) applyDefaultRollbackMitigation() { func (c *Dcp) applyDefaultCheckpoint() { if c.Checkpoint.Interval == 0 { - c.Checkpoint.Interval = 20 * time.Second + c.Checkpoint.Interval = 30 * time.Second } if c.Checkpoint.Timeout == 0 { - c.Checkpoint.Timeout = 60 * time.Second + c.Checkpoint.Timeout = 10 * time.Second } if c.Checkpoint.Type == "" { diff --git a/config/dcp_test.go b/config/dcp_test.go index 94b6316..b594854 100644 --- a/config/dcp_test.go +++ b/config/dcp_test.go @@ -156,11 +156,11 @@ func TestDcpApplyDefaultCheckpoint(t *testing.T) { c := &Dcp{} c.applyDefaultCheckpoint() - if c.Checkpoint.Interval != 20*time.Second { + if c.Checkpoint.Interval != 30*time.Second { t.Errorf("Checkpoint.Interval is not set to expected value") } - if c.Checkpoint.Timeout != 60*time.Second { + if c.Checkpoint.Timeout != 10*time.Second { t.Errorf("Checkpoint.Timeout is not set to expected value") } diff --git a/couchbase/healthcheck.go b/couchbase/healthcheck.go index f869160..33a3a28 100644 --- a/couchbase/healthcheck.go +++ b/couchbase/healthcheck.go @@ -8,7 +8,7 @@ import ( ) type HealthCheck interface { - Start(ch chan struct{}) + Start() Stop() } @@ -18,16 +18,14 @@ type healthCheck struct { client Client } -func (h *healthCheck) Start(ch chan struct{}) { +func (h *healthCheck) Start() { h.ticker = time.NewTicker(h.config.Interval) go func() { for range h.ticker.C { if _, err := h.client.Ping(); err != nil { - logger.Log.Error("health check failed: %v", err) - h.ticker.Stop() - ch <- struct{}{} - break + logger.Log.Error("error while health check: %v", err) + panic(err) } } }() diff --git a/dcp.go b/dcp.go index 761c846..359f782 100644 --- a/dcp.go +++ b/dcp.go @@ -46,27 +46,26 @@ type Dcp interface { } type dcp struct { - bus EventBus.Bus - stream stream.Stream - api api.API - leaderElection stream.LeaderElection - vBucketDiscovery stream.VBucketDiscovery - serviceDiscovery servicediscovery.ServiceDiscovery - metadata metadata.Metadata - eventHandler models.EventHandler - client couchbase.Client - apiShutdown chan struct{} - healCheckFailedCh chan struct{} - config *config.Dcp - version *couchbase.Version - bucketInfo *couchbase.BucketInfo - healthCheck couchbase.HealthCheck - listener models.Listener - readyCh chan struct{} - cancelCh chan os.Signal - stopCh chan struct{} - metricCollectors []prometheus.Collector - closeWithCancel bool + bus EventBus.Bus + stream stream.Stream + api api.API + leaderElection stream.LeaderElection + vBucketDiscovery stream.VBucketDiscovery + serviceDiscovery servicediscovery.ServiceDiscovery + metadata metadata.Metadata + eventHandler models.EventHandler + client couchbase.Client + apiShutdown chan struct{} + config *config.Dcp + version *couchbase.Version + bucketInfo *couchbase.BucketInfo + healthCheck couchbase.HealthCheck + listener models.Listener + readyCh chan struct{} + cancelCh chan os.Signal + stopCh chan struct{} + metricCollectors []prometheus.Collector + closeWithCancel bool } func (s *dcp) SetMetadata(metadata metadata.Metadata) { @@ -149,7 +148,7 @@ func (s *dcp) Start() { if !s.config.HealthCheck.Disabled { s.healthCheck = couchbase.NewHealthCheck(&s.config.HealthCheck, s.client) - s.healthCheck.Start(s.healCheckFailedCh) + s.healthCheck.Start() } logger.Log.Info("dcp stream started") @@ -158,12 +157,10 @@ func (s *dcp) Start() { select { case <-s.stopCh: - logger.Log.Info("stop channel triggered") + logger.Log.Debug("stop channel triggered") case <-s.cancelCh: - logger.Log.Info("cancel channel triggered") + logger.Log.Debug("cancel channel triggered") s.closeWithCancel = true - case <-s.healCheckFailedCh: - logger.Log.Info("health check channel triggered") } } @@ -266,19 +263,18 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) { } return &dcp{ - client: client, - listener: listener, - config: config, - version: version, - bucketInfo: bucketInfo, - apiShutdown: make(chan struct{}, 1), - cancelCh: make(chan os.Signal, 1), - stopCh: make(chan struct{}, 1), - healCheckFailedCh: make(chan struct{}, 1), - readyCh: make(chan struct{}, 1), - metricCollectors: []prometheus.Collector{}, - eventHandler: models.DefaultEventHandler, - bus: EventBus.New(), + client: client, + listener: listener, + config: config, + version: version, + bucketInfo: bucketInfo, + apiShutdown: make(chan struct{}, 1), + cancelCh: make(chan os.Signal, 1), + stopCh: make(chan struct{}, 1), + readyCh: make(chan struct{}, 1), + metricCollectors: []prometheus.Collector{}, + eventHandler: models.DefaultEventHandler, + bus: EventBus.New(), }, nil } diff --git a/example/go.mod b/example/go.mod index d6c439f..902333a 100644 --- a/example/go.mod +++ b/example/go.mod @@ -12,7 +12,7 @@ require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/couchbase/gocbcore/v10 v10.3.2 // indirect + github.com/couchbase/gocbcore/v10 v10.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.3 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/example/go.sum b/example/go.sum index 89100e5..a87a119 100644 --- a/example/go.sum +++ b/example/go.sum @@ -15,8 +15,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZdaCyB6Is= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= -github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA= -github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= +github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE= +github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= diff --git a/go.mod b/go.mod index 2a9a5d0..719e55d 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ retract ( require ( github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef - github.com/couchbase/gocbcore/v10 v10.3.2 + github.com/couchbase/gocbcore/v10 v10.4.0 github.com/gofiber/fiber/v2 v2.52.2 github.com/google/uuid v1.6.0 github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index b1492f9..bc48a44 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,8 @@ github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZd github.com/containerd/containerd v1.7.13/go.mod h1:zT3up6yTRfEUa6+GsITYIJNgSVL9NQ4x4h1RPzk0Wu4= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= -github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA= -github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= +github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE= +github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= diff --git a/metric/collector.go b/metric/collector.go index 47f0c3d..1995977 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -146,20 +146,20 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { strconv.Itoa(int(vbID)), ) - var lag float64 - - seqNo, _ := seqNoMap.Load(vbID) - - if seqNo > offset.SeqNo { - lag = float64(seqNo - offset.SeqNo) - } - if err != nil { ch <- prometheus.NewInvalidMetric( s.lag, err, ) } else { + var lag float64 + + seqNo, _ := seqNoMap.Load(vbID) + + if seqNo > offset.SeqNo { + lag = float64(seqNo - offset.SeqNo) + } + totalLag += lag ch <- prometheus.MustNewConstMetric( diff --git a/stream/stream.go b/stream/stream.go index d201ba1..3f744cb 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -151,8 +151,8 @@ func (s *stream) reopenStream(vbID uint16) { retry-- if retry == 0 { - s.decreaseStream() - break + logger.Log.Error("error while re-open stream, vbID: %d, err: give up after few retry", innerVbID) + panic(err) } time.Sleep(time.Second * 5) @@ -160,13 +160,6 @@ func (s *stream) reopenStream(vbID uint16) { }(vbID) } -func (s *stream) decreaseStream() { - s.activeStreams-- - if s.activeStreams == 0 && !s.streamFinishedWithCloseCh { - s.finishStreamWithEndEventCh <- struct{}{} - } -} - func (s *stream) listenEnd() { for endContext := range s.observer.ListenEnd() { if !s.closeWithCancel && endContext.Err != nil { @@ -189,7 +182,10 @@ func (s *stream) listenEnd() { errors.Is(endContext.Err, gocbcore.ErrDCPStreamDisconnected)) { s.reopenStream(endContext.Event.VbID) } else { - s.decreaseStream() + s.activeStreams-- + if s.activeStreams == 0 && !s.streamFinishedWithCloseCh { + s.finishStreamWithEndEventCh <- struct{}{} + } } } } diff --git a/test/integration/go.mod b/test/integration/go.mod index 5dd2033..59f86e5 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -12,7 +12,7 @@ require ( github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/couchbase/gocbcore/v10 v10.3.2 // indirect + github.com/couchbase/gocbcore/v10 v10.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.3 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 89100e5..a87a119 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -15,8 +15,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZdaCyB6Is= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= -github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA= -github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= +github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE= +github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA= github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=