diff --git a/README.md b/README.md index f624c31..b548d2c 100644 --- a/README.md +++ b/README.md @@ -23,42 +23,42 @@ This repository contains go implementation of a Couchbase Database Change Protoc package main import ( - "github.com/Trendyol/go-dcp" - "github.com/Trendyol/go-dcp/logger" - "github.com/Trendyol/go-dcp/models" + "github.com/Trendyol/go-dcp" + "github.com/Trendyol/go-dcp/logger" + "github.com/Trendyol/go-dcp/models" ) func listener(ctx *models.ListenerContext) { - switch event := ctx.Event.(type) { - case models.DcpMutation: - logger.Log.Info( - "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v", - event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(), - ) - case models.DcpDeletion: - logger.Log.Info( - "deleted(vb=%v,eventTime=%v) | id: %v", - event.VbID, event.EventTime, string(event.Key), - ) - case models.DcpExpiration: - logger.Log.Info( - "expired(vb=%v,eventTime=%v) | id: %v", - event.VbID, event.EventTime, string(event.Key), - ) - } - - ctx.Ack() + switch event := ctx.Event.(type) { + case models.DcpMutation: + logger.Log.Info( + "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v", + event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(), + ) + case models.DcpDeletion: + logger.Log.Info( + "deleted(vb=%v,eventTime=%v) | id: %v", + event.VbID, event.EventTime, string(event.Key), + ) + case models.DcpExpiration: + logger.Log.Info( + "expired(vb=%v,eventTime=%v) | id: %v", + event.VbID, event.EventTime, string(event.Key), + ) + } + + ctx.Ack() } func main() { - connector, err := dcp.NewDcp("config.yml", listener) - if err != nil { - panic(err) - } + connector, err := dcp.NewDcp("config.yml", listener) + if err != nil { + panic(err) + } - defer connector.Close() + defer connector.Close() - connector.Start() + connector.Start() } ``` @@ -154,6 +154,7 @@ You can adjust the average window time for the metrics by specifying the value o | cbgo_seq_no_current | The current sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | | cbgo_start_seq_no_current | The starting sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | | cbgo_end_seq_no_current | The ending sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | +| cbgo_persist_seq_no_current | The persist sequence number on a specific vBucket | vbId: ID of the vBucket | Gauge | | cbgo_lag_current | The current lag on a specific vBucket | vbId: ID of the vBucket | Gauge | | cbgo_process_latency_ms_current | The average process latency in milliseconds for the last metric.averageWindowSec | N/A | Gauge | | cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter | diff --git a/couchbase/observer.go b/couchbase/observer.go index eef3d15..cd48166 100644 --- a/couchbase/observer.go +++ b/couchbase/observer.go @@ -30,6 +30,7 @@ type Observer interface { OSOSnapshot(snapshot models.DcpOSOSnapshot) SeqNoAdvanced(advanced gocbcore.DcpSeqNoAdvanced) GetMetrics() *wrapper.ConcurrentSwissMap[uint16, *ObserverMetric] + GetPersistSeqNo() *wrapper.ConcurrentSwissMap[uint16, gocbcore.SeqNo] Listen() models.ListenerCh Close() CloseEnd() @@ -87,6 +88,8 @@ func (so *observer) persistSeqNoChangedListener(event interface{}) { if persistSeqNo.SeqNo > currentPersistSeqNo { so.persistSeqNo.Store(persistSeqNo.VbID, persistSeqNo.SeqNo) } + } else { + logger.Log.Debug("persistSeqNo: %v on vbId: %v", persistSeqNo.SeqNo, persistSeqNo.VbID) } } @@ -365,6 +368,10 @@ func (so *observer) GetMetrics() *wrapper.ConcurrentSwissMap[uint16, *ObserverMe return so.metrics } +func (so *observer) GetPersistSeqNo() *wrapper.ConcurrentSwissMap[uint16, gocbcore.SeqNo] { + return so.persistSeqNo +} + func (so *observer) Listen() models.ListenerCh { return so.listenerCh } diff --git a/couchbase/rollback_mitigation.go b/couchbase/rollback_mitigation.go index 96ed3f6..1a47c95 100644 --- a/couchbase/rollback_mitigation.go +++ b/couchbase/rollback_mitigation.go @@ -139,6 +139,7 @@ func (r *rollbackMitigation) getMinSeqNo(vbID uint16) gocbcore.SeqNo { //nolint: } if vbUUID != replica.vbUUID { + logger.Log.Debug("vbUUID mismatch %v != %v", vbID, replica.vbUUID) return 0 } @@ -256,14 +257,17 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU if err != nil { if errors.Is(err, gocbcore.ErrTemporaryFailure) { - // skip + logger.Log.Error("error while observe: %v", err) return } else { panic(err) } } - replicas, _ := r.persistedSeqNos.Load(vbID) + replicas, ok := r.persistedSeqNos.Load(vbID) + if !ok { + logger.Log.Error("replicas of vbID: %v not found", vbID) + } if len(replicas) > replica { replicas[replica].SetSeqNo(result.PersistSeqNo) @@ -273,6 +277,8 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU VbID: vbID, SeqNo: r.getMinSeqNo(vbID), }) + } else { + logger.Log.Error("replica: %v not found", replica) } }) } diff --git a/metric/collector.go b/metric/collector.go index 09451c9..47b2741 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -1,6 +1,7 @@ package metric import ( + "github.com/couchbase/gocbcore/v10" "strconv" "github.com/Trendyol/go-dcp/models" @@ -24,6 +25,7 @@ type metricCollector struct { currentSeqNo *prometheus.Desc startSeqNo *prometheus.Desc endSeqNo *prometheus.Desc + persistSeqNo *prometheus.Desc processLatency *prometheus.Desc dcpLatency *prometheus.Desc @@ -55,6 +57,17 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { seqNoMap, err := s.client.GetVBucketSeqNos() + observer.GetPersistSeqNo().Range(func(vbID uint16, seqNo gocbcore.SeqNo) bool { + ch <- prometheus.MustNewConstMetric( + s.persistSeqNo, + prometheus.CounterValue, + float64(seqNo), + strconv.Itoa(int(vbID)), + ) + + return true + }) + observer.GetMetrics().Range(func(vbID uint16, metric *couchbase.ObserverMetric) bool { ch <- prometheus.MustNewConstMetric( s.mutation, @@ -254,6 +267,12 @@ func NewMetricCollector(client couchbase.Client, stream stream.Stream, vBucketDi []string{"vbId"}, nil, ), + persistSeqNo: prometheus.NewDesc( + prometheus.BuildFQName(helpers.Name, "persist_seq_no", "current"), + "Persist seq no", + []string{"vbId"}, + nil, + ), lag: prometheus.NewDesc( prometheus.BuildFQName(helpers.Name, "lag", "current"), "Lag",