Skip to content

Commit

Permalink
fix: fast fail when couchbase connection unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 30, 2024
1 parent 991c107 commit 588e704
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 76 deletions.
4 changes: 2 additions & 2 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func (c *Dcp) applyDefaultRollbackMitigation() {

func (c *Dcp) applyDefaultCheckpoint() {
if c.Checkpoint.Interval == 0 {
c.Checkpoint.Interval = 20 * time.Second
c.Checkpoint.Interval = 30 * time.Second
}

if c.Checkpoint.Timeout == 0 {
c.Checkpoint.Timeout = 60 * time.Second
c.Checkpoint.Timeout = 10 * time.Second
}

if c.Checkpoint.Type == "" {
Expand Down
4 changes: 2 additions & 2 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func TestDcpApplyDefaultCheckpoint(t *testing.T) {
c := &Dcp{}
c.applyDefaultCheckpoint()

if c.Checkpoint.Interval != 20*time.Second {
if c.Checkpoint.Interval != 30*time.Second {
t.Errorf("Checkpoint.Interval is not set to expected value")
}

if c.Checkpoint.Timeout != 60*time.Second {
if c.Checkpoint.Timeout != 10*time.Second {
t.Errorf("Checkpoint.Timeout is not set to expected value")
}

Expand Down
10 changes: 4 additions & 6 deletions couchbase/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type HealthCheck interface {
Start(ch chan struct{})
Start()
Stop()
}

Expand All @@ -18,16 +18,14 @@ type healthCheck struct {
client Client
}

func (h *healthCheck) Start(ch chan struct{}) {
func (h *healthCheck) Start() {
h.ticker = time.NewTicker(h.config.Interval)

go func() {
for range h.ticker.C {
if _, err := h.client.Ping(); err != nil {
logger.Log.Error("health check failed: %v", err)
h.ticker.Stop()
ch <- struct{}{}
break
logger.Log.Error("error while health check: %v", err)
panic(err)
}
}
}()
Expand Down
74 changes: 35 additions & 39 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,26 @@ type Dcp interface {
}

type dcp struct {
bus EventBus.Bus
stream stream.Stream
api api.API
leaderElection stream.LeaderElection
vBucketDiscovery stream.VBucketDiscovery
serviceDiscovery servicediscovery.ServiceDiscovery
metadata metadata.Metadata
eventHandler models.EventHandler
client couchbase.Client
apiShutdown chan struct{}
healCheckFailedCh chan struct{}
config *config.Dcp
version *couchbase.Version
bucketInfo *couchbase.BucketInfo
healthCheck couchbase.HealthCheck
listener models.Listener
readyCh chan struct{}
cancelCh chan os.Signal
stopCh chan struct{}
metricCollectors []prometheus.Collector
closeWithCancel bool
bus EventBus.Bus
stream stream.Stream
api api.API
leaderElection stream.LeaderElection
vBucketDiscovery stream.VBucketDiscovery
serviceDiscovery servicediscovery.ServiceDiscovery
metadata metadata.Metadata
eventHandler models.EventHandler
client couchbase.Client
apiShutdown chan struct{}
config *config.Dcp
version *couchbase.Version
bucketInfo *couchbase.BucketInfo
healthCheck couchbase.HealthCheck
listener models.Listener
readyCh chan struct{}
cancelCh chan os.Signal
stopCh chan struct{}
metricCollectors []prometheus.Collector
closeWithCancel bool
}

func (s *dcp) SetMetadata(metadata metadata.Metadata) {
Expand Down Expand Up @@ -149,7 +148,7 @@ func (s *dcp) Start() {

if !s.config.HealthCheck.Disabled {
s.healthCheck = couchbase.NewHealthCheck(&s.config.HealthCheck, s.client)
s.healthCheck.Start(s.healCheckFailedCh)
s.healthCheck.Start()
}

logger.Log.Info("dcp stream started")
Expand All @@ -158,12 +157,10 @@ func (s *dcp) Start() {

select {
case <-s.stopCh:
logger.Log.Info("stop channel triggered")
logger.Log.Debug("stop channel triggered")
case <-s.cancelCh:
logger.Log.Info("cancel channel triggered")
logger.Log.Debug("cancel channel triggered")
s.closeWithCancel = true
case <-s.healCheckFailedCh:
logger.Log.Info("health check channel triggered")
}
}

Expand Down Expand Up @@ -266,19 +263,18 @@ func newDcp(config *config.Dcp, listener models.Listener) (Dcp, error) {
}

return &dcp{
client: client,
listener: listener,
config: config,
version: version,
bucketInfo: bucketInfo,
apiShutdown: make(chan struct{}, 1),
cancelCh: make(chan os.Signal, 1),
stopCh: make(chan struct{}, 1),
healCheckFailedCh: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
metricCollectors: []prometheus.Collector{},
eventHandler: models.DefaultEventHandler,
bus: EventBus.New(),
client: client,
listener: listener,
config: config,
version: version,
bucketInfo: bucketInfo,
apiShutdown: make(chan struct{}, 1),
cancelCh: make(chan os.Signal, 1),
stopCh: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
metricCollectors: []prometheus.Collector{},
eventHandler: models.DefaultEventHandler,
bus: EventBus.New(),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/gocbcore/v10 v10.3.2 // indirect
github.com/couchbase/gocbcore/v10 v10.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZdaCyB6Is=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA=
github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE=
github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY=
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ retract (
require (
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef
github.com/couchbase/gocbcore/v10 v10.3.2
github.com/couchbase/gocbcore/v10 v10.4.0
github.com/gofiber/fiber/v2 v2.52.2
github.com/google/uuid v1.6.0
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZd
github.com/containerd/containerd v1.7.13/go.mod h1:zT3up6yTRfEUa6+GsITYIJNgSVL9NQ4x4h1RPzk0Wu4=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA=
github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE=
github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY=
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
Expand Down
16 changes: 8 additions & 8 deletions metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,20 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
strconv.Itoa(int(vbID)),
)

var lag float64

seqNo, _ := seqNoMap.Load(vbID)

if seqNo > offset.SeqNo {
lag = float64(seqNo - offset.SeqNo)
}

if err != nil {
ch <- prometheus.NewInvalidMetric(
s.lag,
err,
)
} else {
var lag float64

seqNo, _ := seqNoMap.Load(vbID)

if seqNo > offset.SeqNo {
lag = float64(seqNo - offset.SeqNo)
}

totalLag += lag

ch <- prometheus.MustNewConstMetric(
Expand Down
16 changes: 6 additions & 10 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,15 @@ func (s *stream) reopenStream(vbID uint16) {

retry--
if retry == 0 {
s.decreaseStream()
break
logger.Log.Error("error while re-open stream, vbID: %d, err: give up after few retry", innerVbID)
panic(err)
}

time.Sleep(time.Second * 5)
}
}(vbID)
}

func (s *stream) decreaseStream() {
s.activeStreams--
if s.activeStreams == 0 && !s.streamFinishedWithCloseCh {
s.finishStreamWithEndEventCh <- struct{}{}
}
}

func (s *stream) listenEnd() {
for endContext := range s.observer.ListenEnd() {
if !s.closeWithCancel && endContext.Err != nil {
Expand All @@ -189,7 +182,10 @@ func (s *stream) listenEnd() {
errors.Is(endContext.Err, gocbcore.ErrDCPStreamDisconnected)) {
s.reopenStream(endContext.Event.VbID)
} else {
s.decreaseStream()
s.activeStreams--
if s.activeStreams == 0 && !s.streamFinishedWithCloseCh {
s.finishStreamWithEndEventCh <- struct{}{}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/gocbcore/v10 v10.3.2 // indirect
github.com/couchbase/gocbcore/v10 v10.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/containerd v1.7.13 h1:wPYKIeGMN8vaggSKuV1X0wZulpMz4CrgEsZdaCyB6Is=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/couchbase/gocbcore/v10 v10.3.2 h1:Z1jIJSj2tCqQhd+Uwa4gKqHbCVJ8Mizw6zxX6XsAjPA=
github.com/couchbase/gocbcore/v10 v10.3.2/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE=
github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1 h1:H7OK4q4WsDxqNIB/Ba8BQBXBHFilZnyItHrLr3qmsKA=
github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY=
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
Expand Down

0 comments on commit 588e704

Please sign in to comment.