Skip to content

Commit

Permalink
feat: persist seq no metric added
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Oct 27, 2023
1 parent 43996cb commit 7aec114
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 30 deletions.
57 changes: 29 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,42 @@ This repository contains go implementation of a Couchbase Database Change Protoc
package main

import (
"github.com/Trendyol/go-dcp"
"github.com/Trendyol/go-dcp/logger"
"github.com/Trendyol/go-dcp/models"
"github.com/Trendyol/go-dcp"
"github.com/Trendyol/go-dcp/logger"
"github.com/Trendyol/go-dcp/models"
)

func listener(ctx *models.ListenerContext) {
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Info(
"mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
)
case models.DcpDeletion:
logger.Log.Info(
"deleted(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
case models.DcpExpiration:
logger.Log.Info(
"expired(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
}

ctx.Ack()
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Info(
"mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
)
case models.DcpDeletion:
logger.Log.Info(
"deleted(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
case models.DcpExpiration:
logger.Log.Info(
"expired(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
}

ctx.Ack()
}

func main() {
connector, err := dcp.NewDcp("config.yml", listener)
if err != nil {
panic(err)
}
connector, err := dcp.NewDcp("config.yml", listener)
if err != nil {
panic(err)
}

defer connector.Close()
defer connector.Close()

connector.Start()
connector.Start()
}
```

Expand Down Expand Up @@ -154,6 +154,7 @@ You can adjust the average window time for the metrics by specifying the value o
| cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge |
| cbgo_process_latency_ms_current | The average process latency in milliseconds for the last metric.averageWindowSec | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
Expand Down
7 changes: 7 additions & 0 deletions couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Observer interface {
OSOSnapshot(snapshot models.DcpOSOSnapshot)
SeqNoAdvanced(advanced gocbcore.DcpSeqNoAdvanced)
GetMetrics() *wrapper.ConcurrentSwissMap[uint16, *ObserverMetric]
GetPersistSeqNo() *wrapper.ConcurrentSwissMap[uint16, gocbcore.SeqNo]
Listen() models.ListenerCh
Close()
CloseEnd()
Expand Down Expand Up @@ -87,6 +88,8 @@ func (so *observer) persistSeqNoChangedListener(event interface{}) {
if persistSeqNo.SeqNo > currentPersistSeqNo {
so.persistSeqNo.Store(persistSeqNo.VbID, persistSeqNo.SeqNo)
}
} else {
logger.Log.Debug("persistSeqNo: %v on vbId: %v", persistSeqNo.SeqNo, persistSeqNo.VbID)
}
}

Expand Down Expand Up @@ -365,6 +368,10 @@ func (so *observer) GetMetrics() *wrapper.ConcurrentSwissMap[uint16, *ObserverMe
return so.metrics
}

func (so *observer) GetPersistSeqNo() *wrapper.ConcurrentSwissMap[uint16, gocbcore.SeqNo] {
return so.persistSeqNo
}

func (so *observer) Listen() models.ListenerCh {
return so.listenerCh
}
Expand Down
10 changes: 8 additions & 2 deletions couchbase/rollback_mitigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (r *rollbackMitigation) getMinSeqNo(vbID uint16) gocbcore.SeqNo { //nolint:
}

if vbUUID != replica.vbUUID {
logger.Log.Debug("vbUUID mismatch %v != %v", vbID, replica.vbUUID)
return 0
}

Expand Down Expand Up @@ -256,14 +257,17 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU

if err != nil {
if errors.Is(err, gocbcore.ErrTemporaryFailure) {
// skip
logger.Log.Error("error while observe: %v", err)
return
} else {
panic(err)
}
}

replicas, _ := r.persistedSeqNos.Load(vbID)
replicas, ok := r.persistedSeqNos.Load(vbID)
if !ok {
logger.Log.Error("replicas of vbID: %v not found", vbID)
}

if len(replicas) > replica {
replicas[replica].SetSeqNo(result.PersistSeqNo)
Expand All @@ -273,6 +277,8 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU
VbID: vbID,
SeqNo: r.getMinSeqNo(vbID),
})
} else {
logger.Log.Error("replica: %v not found", replica)
}
})
}
Expand Down
19 changes: 19 additions & 0 deletions metric/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metric

import (

Check failure on line 3 in metric/collector.go

View workflow job for this annotation

GitHub Actions / build

File is not `gofumpt`-ed (gofumpt)
"github.com/couchbase/gocbcore/v10"
"strconv"

Check failure on line 5 in metric/collector.go

View workflow job for this annotation

GitHub Actions / build

File is not `gofumpt`-ed (gofumpt)

"github.com/Trendyol/go-dcp/models"
Expand All @@ -24,6 +25,7 @@ type metricCollector struct {
currentSeqNo *prometheus.Desc
startSeqNo *prometheus.Desc
endSeqNo *prometheus.Desc
persistSeqNo *prometheus.Desc

processLatency *prometheus.Desc
dcpLatency *prometheus.Desc
Expand Down Expand Up @@ -55,6 +57,17 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {

seqNoMap, err := s.client.GetVBucketSeqNos()

observer.GetPersistSeqNo().Range(func(vbID uint16, seqNo gocbcore.SeqNo) bool {
ch <- prometheus.MustNewConstMetric(
s.persistSeqNo,
prometheus.CounterValue,
float64(seqNo),
strconv.Itoa(int(vbID)),
)

return true
})

observer.GetMetrics().Range(func(vbID uint16, metric *couchbase.ObserverMetric) bool {
ch <- prometheus.MustNewConstMetric(
s.mutation,
Expand Down Expand Up @@ -254,6 +267,12 @@ func NewMetricCollector(client couchbase.Client, stream stream.Stream, vBucketDi
[]string{"vbId"},
nil,
),
persistSeqNo: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "persist_seq_no", "current"),
"Persist seq no",
[]string{"vbId"},
nil,
),
lag: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "lag", "current"),
"Lag",
Expand Down

0 comments on commit 7aec114

Please sign in to comment.