Skip to content

Commit

Permalink
feat: Tracing implementation
Browse files Browse the repository at this point in the history
* Implemented primitive tracing features
* Tidied up the project modules

Refs: #78
  • Loading branch information
Emre Uygun authored and emrygun committed Jan 11, 2025
1 parent 0bef4d3 commit 94ea7ae
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 16 deletions.
19 changes: 18 additions & 1 deletion couchbase/observer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package couchbase

import (
"context"
"github.com/Trendyol/go-dcp/tracing"
"reflect"
"time"

"github.com/asaskevich/EventBus"
Expand Down Expand Up @@ -63,6 +66,7 @@ type observer struct {
currentSnapshot *models.SnapshotMarker
collectionIDs map[uint32]string
metrics *ObserverMetric
tracer *tracing.TracerComponent
listener func(args models.ListenerArgs)
endListener func(context models.DcpStreamEndContext)
vbUUID gocbcore.VbUUID
Expand Down Expand Up @@ -146,7 +150,18 @@ func (so *observer) sendOrSkip(args models.ListenerArgs) {
return
}

so.listener(args)
opTrace := so.tracer.StartOpTelemeteryHandler(
"go-dcp-observer",
reflect.TypeOf(args.Event).Name(),
tracing.RequestSpanContext{RefCtx: context.Background(), Value: args.Event},
tracing.NewObserverLabels(so.vbID, so.collectionIDs),
)

tracingContextAwareListenerArgs := models.ListenerArgs{Event: args.Event, TraceContext: opTrace.RootContext()}

so.listener(tracingContextAwareListenerArgs)

opTrace.Finish()
}

func (so *observer) SnapshotMarker(event models.DcpSnapshotMarker) {
Expand Down Expand Up @@ -453,10 +468,12 @@ func NewObserver(
endListener func(context models.DcpStreamEndContext),
collectionIDs map[uint32]string,
bus EventBus.Bus,
tc *tracing.TracerComponent,
) Observer {
observer := &observer{
vbID: vbID,
metrics: &ObserverMetric{},
tracer: tc,
collectionIDs: collectionIDs,
listener: listener,
endListener: endListener,
Expand Down
4 changes: 4 additions & 0 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"github.com/Trendyol/go-dcp/tracing"
"os"
"os/signal"
"reflect"
Expand Down Expand Up @@ -113,9 +114,12 @@ func (s *dcp) Start() {

s.vBucketDiscovery = stream.NewVBucketDiscovery(s.client, s.config, vBuckets, s.bus)

tc := tracing.NewTracerComponent()

s.stream = stream.NewStream(
s.client, s.metadata, s.config, s.version, s.bucketInfo, s.vBucketDiscovery,
s.listener, s.client.GetCollectionIDs(s.config.ScopeName, s.config.CollectionNames), s.stopCh, s.bus, s.eventHandler,
tc,
)

if s.config.LeaderElection.Enabled {
Expand Down
100 changes: 98 additions & 2 deletions dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dcp
import (
"context"
"fmt"
"github.com/couchbase/gocbcore/v10"
"github.com/testcontainers/testcontainers-go"
"os"
"strconv"
"strings"
Expand All @@ -21,8 +23,6 @@ import (
"github.com/Trendyol/go-dcp/config"

"github.com/Trendyol/go-dcp/couchbase"
"github.com/couchbase/gocbcore/v10"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

Expand Down Expand Up @@ -253,6 +253,89 @@ func test(t *testing.T, version string) {
logger.Log.Info("mock data stream finished with totalSize=%v", counter.Load())
}

func testWithTraces(t *testing.T, version string) {
chunkSize := 4
bulkSize := 1024
iteration := 512
mockDataSize := iteration * bulkSize * chunkSize
totalNotify := 10
notifySize := mockDataSize / totalNotify

c := getConfig()
c.ApplyDefaults()

ctx := context.Background()

container, err := setupContainer(c, ctx, version)
if err != nil {
t.Fatal(err)
}

var counter atomic.Int32
finish := make(chan struct{}, 1)

dcp, err := NewDcp(c, func(ctx *models.ListenerContext) {
if _, ok := ctx.Event.(models.DcpMutation); ok {
ctx.Ack()

// Traces
lt1 := ctx.ListenerTracerComponent.InitializeListenerTrace("test1", map[string]interface{}{})
time.Sleep(time.Second * 1)

lt11 := ctx.ListenerTracerComponent.CreateListenerTrace(lt1, "test1-1", map[string]interface{}{})
time.Sleep(time.Second * 1)
lt11.Finish()

lt12 := ctx.ListenerTracerComponent.CreateListenerTrace(lt1, "test1-2", map[string]interface{}{
"test1-2": "This is a test metadata",
})

time.Sleep(time.Second * 1)
lt121 := ctx.ListenerTracerComponent.CreateListenerTrace(lt12, "test1-2-1", map[string]interface{}{})
time.Sleep(time.Millisecond * 100)
lt121.Finish()

time.Sleep(time.Millisecond * 300)
lt12.Finish()

lt1.Finish()

val := int(counter.Add(1))

if val%notifySize == 0 {
logger.Log.Info("%v/%v processed", val/notifySize, totalNotify)
}

if val == mockDataSize {
finish <- struct{}{}
}

}
})
if err != nil {
t.Fatal(err)
}

go func() {
<-dcp.WaitUntilReady()
insertDataToContainer(c, t, iteration, chunkSize, bulkSize)
}()

go func() {
<-finish
dcp.Close()
}()

dcp.Start()

err = container.Terminate(ctx)
if err != nil {
t.Fatal(err)
}

logger.Log.Info("mock data stream finished with totalSize=%v", counter.Load())
}

func TestDcp(t *testing.T) {
version := os.Getenv("CB_VERSION")

Expand All @@ -265,6 +348,19 @@ func TestDcp(t *testing.T) {
})
}

func TestDcpWithTraces(t *testing.T) {
t.Skip()
version := os.Getenv("CB_VERSION")

if version == "" {
t.Skip("Skipping test")
}

t.Run(version, func(t *testing.T) {
testWithTraces(t, version)
})
}

func TestNewDcpConfigWithEnvVariables(t *testing.T) {
os.Setenv("DCP_USERNAME", "envUser")
os.Setenv("DCP_PASSWORD", "envPass")
Expand Down
1 change: 1 addition & 0 deletions example/grafana/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

14 changes: 10 additions & 4 deletions models/listeners.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package models

import (
"github.com/Trendyol/go-dcp/tracing"
)

type ListenerContext struct {
Commit func()
Event interface{}
Ack func()
Commit func()
Event interface{}
Ack func()
ListenerTracerComponent tracing.ListenerTracerComponent
}

type ListenerArgs struct {
Event interface{}
Event interface{}
TraceContext tracing.RequestSpanContext
}

type DcpStreamEndContext struct {
Expand Down
23 changes: 14 additions & 9 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stream
import (
"errors"
"fmt"
"github.com/Trendyol/go-dcp/tracing"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -54,20 +55,21 @@ type stream struct {
vBucketDiscovery VBucketDiscovery
bus EventBus.Bus
eventHandler models.EventHandler
bucketInfo *couchbase.BucketInfo
observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer]
config *config.Dcp
metric *Metric
rebalanceTimer *time.Timer
vbIDRange *models.VbIDRange
dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool]
stopCh chan struct{}
listener models.Listener
config *config.Dcp
bucketInfo *couchbase.BucketInfo
finishStreamWithEndEventCh chan struct{}
finishStreamWithCloseCh chan struct{}
offsets *wrapper.ConcurrentSwissMap[uint16, *models.Offset]
metric *Metric
observers *wrapper.ConcurrentSwissMap[uint16, couchbase.Observer]
collectionIDs map[uint32]string
streamEndNotSupportedData *streamEndNotSupportedData
tracerComponent *tracing.TracerComponent
rebalanceLock sync.Mutex
activeStreams atomic.Int32
streamFinishedWithCloseCh bool
Expand Down Expand Up @@ -105,7 +107,7 @@ func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
}
}

func (s *stream) waitAndForward(payload interface{}, offset *models.Offset, vbID uint16, eventTime time.Time) {
func (s *stream) waitAndForward(payload interface{}, spanCtx tracing.RequestSpanContext, offset *models.Offset, vbID uint16, eventTime time.Time) {
if helpers.IsMetadata(payload) {
s.setOffset(vbID, offset, false)
return
Expand All @@ -120,6 +122,7 @@ func (s *stream) waitAndForward(payload interface{}, offset *models.Offset, vbID
s.setOffset(vbID, offset, true)
s.anyDirtyOffset = true
},
ListenerTracerComponent: s.tracerComponent.NewListenerTracerComponent(spanCtx),
}

start := time.Now()
Expand All @@ -132,11 +135,11 @@ func (s *stream) waitAndForward(payload interface{}, offset *models.Offset, vbID
func (s *stream) listen(args models.ListenerArgs) {
switch v := args.Event.(type) {
case models.DcpMutation:
s.waitAndForward(v, v.Offset, v.VbID, v.EventTime)
s.waitAndForward(v, args.TraceContext, v.Offset, v.VbID, v.EventTime)
case models.DcpDeletion:
s.waitAndForward(v, v.Offset, v.VbID, v.EventTime)
s.waitAndForward(v, args.TraceContext, v.Offset, v.VbID, v.EventTime)
case models.DcpExpiration:
s.waitAndForward(v, v.Offset, v.VbID, v.EventTime)
s.waitAndForward(v, args.TraceContext, v.Offset, v.VbID, v.EventTime)
case models.DcpSeqNoAdvanced:
s.setOffset(v.VbID, v.Offset, true)
case models.DcpCollectionCreation:
Expand Down Expand Up @@ -240,7 +243,7 @@ func (s *stream) Open() {
for _, vbID := range vbIDs {
s.observers.Store(
vbID,
couchbase.NewObserver(s.config, vbID, s.listen, s.listenEnd, s.collectionIDs, s.bus),
couchbase.NewObserver(s.config, vbID, s.listen, s.listenEnd, s.collectionIDs, s.bus, s.tracerComponent),
)
}

Expand Down Expand Up @@ -457,6 +460,7 @@ func NewStream(client couchbase.Client,
stopCh chan struct{},
bus EventBus.Bus,
eventHandler models.EventHandler,
tc *tracing.TracerComponent,
) Stream {
stream := &stream{
client: client,
Expand All @@ -472,6 +476,7 @@ func NewStream(client couchbase.Client,
bus: bus,
eventHandler: eventHandler,
metric: &Metric{},
tracerComponent: tc,
}

if version.Lower(couchbase.SrvVer550) {
Expand Down
Loading

0 comments on commit 94ea7ae

Please sign in to comment.