Skip to content

Commit

Permalink
feat: support event tracing
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Jul 13, 2023
1 parent 895dd83 commit 24da846
Show file tree
Hide file tree
Showing 30 changed files with 2,779 additions and 351 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/tracing"
"github.com/vanus-labs/vanus/pkg/cluster"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials/insecure"

// this project.
Expand Down Expand Up @@ -109,6 +110,7 @@ func Connect(endpoints []string) Client {
}
return &client{
Endpoints: endpoints,
tracer: tracing.NewTracer("client.client", trace.SpanKindClient),
}
}

Expand Down
7 changes: 1 addition & 6 deletions client/internal/vanus/eventbus/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package eventbus
import (
// standard libraries.
"context"

// third-party libraries.
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -46,9 +47,6 @@ type NameService struct {
}

func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbusID uint64) ([]*record.Eventlog, error) {
ctx, span := ns.tracer.Start(ctx, "LookupWritableLogs")
defer span.End()

req := &wrapperspb.UInt64Value{
Value: eventbusID,
}
Expand All @@ -62,9 +60,6 @@ func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbusID uint64
}

func (ns *NameService) LookupReadableLogs(ctx context.Context, eventbusID uint64) ([]*record.Eventlog, error) {
ctx, span := ns.tracer.Start(ctx, "LookupReadableLogs")
defer span.End()

req := &wrapperspb.UInt64Value{
Value: eventbusID,
}
Expand Down
6 changes: 0 additions & 6 deletions client/internal/vanus/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ type NameService struct {
}

func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) (*record.Segment, error) {
ctx, span := ns.tracer.Start(ctx, "LookupWritableSegment")
defer span.End()

req := &ctrlpb.GetAppendableSegmentRequest{
EventlogId: logID,
Limited: 1,
Expand All @@ -74,9 +71,6 @@ func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64)
}

func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64) ([]*record.Segment, error) {
ctx, span := ns.tracer.Start(ctx, "LookupReadableSegments")
defer span.End()

req := &ctrlpb.ListSegmentRequest{
EventlogId: logID,
StartOffset: 0,
Expand Down
4 changes: 1 addition & 3 deletions client/internal/vanus/net/rpc/bare/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ func (c *client) Get(ctx context.Context) (interface{}, error) {
if c.closed.Load() {
return nil, errors.ErrClosed
}
_ctx, span := c.tracer.Start(ctx, "Get")
defer span.End()

if client := c.cachedClient(); client != nil {
return client, nil
}
return c.refreshClient(_ctx, false)
return c.refreshClient(ctx, false)
}

func (c *client) cachedClient() interface{} {
Expand Down
13 changes: 2 additions & 11 deletions client/internal/vanus/store/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func (s *BlockStore) Close() {
func (s *BlockStore) Read(
ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32,
) (*cloudevents.CloudEventBatch, error) {
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

req := &segpb.ReadFromBlockRequest{
BlockId: block,
Offset: offset,
Expand All @@ -90,9 +87,6 @@ func (s *BlockStore) Read(
}

func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
ctx, span := s.tracer.Start(ctx, "LookupOffset")
defer span.End()

req := &segpb.LookupOffsetInBlockRequest{
BlockId: blockID,
Stime: t.UnixMilli(),
Expand All @@ -111,20 +105,17 @@ func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Ti
}

func (s *BlockStore) Append(ctx context.Context, block uint64, events *cloudevents.CloudEventBatch) ([]int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

req := &segpb.AppendToBlockRequest{
BlockId: block,
Events: events,
}

client, err := s.client.Get(_ctx)
client, err := s.client.Get(ctx)
if err != nil {
return nil, err
}

res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req)
res, err := client.(segpb.SegmentServerClient).AppendToBlock(ctx, req)
if err != nil {
return nil, err
}
Expand Down
36 changes: 6 additions & 30 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,10 @@ func NewEventbus(cfg *eb.Config) *eventbus {
break
}

ctx, span := bus.tracer.Start(context.Background(), "updateWritableLogsTask")
if bus.writableWatcher != nil {
bus.updateWritableLogs(ctx, re)
bus.updateWritableLogs(context.Background(), re)
}

bus.writableWatcher.Wakeup()
span.End()
}
}()
bus.writableWatcher.Start()
Expand All @@ -88,13 +85,10 @@ func NewEventbus(cfg *eb.Config) *eventbus {
break
}

ctx, span := bus.tracer.Start(context.Background(), "updateReadableLogsTask")
if bus.readableWatcher != nil {
bus.updateReadableLogs(ctx, re)
bus.updateReadableLogs(context.Background(), re)
}

bus.readableWatcher.Wakeup()
span.End()
}
}()
bus.readableWatcher.Start()
Expand Down Expand Up @@ -272,9 +266,6 @@ func (b *eventbus) isNeedUpdateWritableLogs(err error) bool {
}

func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) {
_, span := b.tracer.Start(ctx, "updateWritableLogs")
defer span.End()

if !b.isNeedUpdateWritableLogs(re.Err) {
return
}
Expand Down Expand Up @@ -333,10 +324,7 @@ func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Ev
}

func (b *eventbus) refreshWritableLogs(ctx context.Context) {
_ctx, span := b.tracer.Start(ctx, "refreshWritableLogs")
defer span.End()

_ = b.writableWatcher.Refresh(_ctx)
_ = b.writableWatcher.Refresh(ctx)
}

func (b *eventbus) getReadableState() error {
Expand Down Expand Up @@ -364,9 +352,6 @@ func (b *eventbus) isNeedUpdateReadableLogs(err error) bool {
}

func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) {
_, span := b.tracer.Start(ctx, "updateReadableLogs")
defer span.End()

if !b.isNeedUpdateReadableLogs(re.Err) {
return
}
Expand Down Expand Up @@ -425,10 +410,7 @@ func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Ev
}

func (b *eventbus) refreshReadableLogs(ctx context.Context) {
_ctx, span := b.tracer.Start(ctx, "refreshReadableLogs")
defer span.End()

_ = b.readableWatcher.Refresh(_ctx)
_ = b.readableWatcher.Refresh(ctx)
}

type busWriter struct {
Expand Down Expand Up @@ -479,15 +461,12 @@ func (w *busWriter) Bus() api.Eventbus {
}

func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) (eventlog.LogWriter, error) {
_ctx, span := w.tracer.Start(ctx, "pickWritableLog")
defer span.End()

l, err := opts.Policy.NextLog(ctx)
if err != nil {
return nil, err
}

lw := w.ebus.getWritableLog(_ctx, l.ID())
lw := w.ebus.getWritableLog(ctx, l.ID())
if lw == nil {
return nil, stderrors.New("can not pick writable log")
}
Expand Down Expand Up @@ -554,14 +533,11 @@ func (r *busReader) Bus() api.Eventbus {
}

func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) (eventlog.LogReader, error) {
_ctx, span := r.tracer.Start(ctx, "pickReadableLog")
defer span.End()

l, err := opts.Policy.NextLog(ctx)
if err != nil {
return nil, err
}
lr := r.ebus.getReadableLog(_ctx, l.ID())
lr := r.ebus.getReadableLog(ctx, l.ID())
if lr == nil {
return nil, stderrors.New("can not pick readable log")
}
Expand Down
11 changes: 3 additions & 8 deletions client/pkg/eventlog/eventlog_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ func NewEventlog(cfg *el.Config) Eventlog {
break
}

ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask")
if r != nil {
l.updateWritableSegment(ctx, r)
l.updateWritableSegment(context.Background(), r)
}

l.writableWatcher.Wakeup()
span.End()
}
}()
l.writableWatcher.Start()
Expand All @@ -84,13 +81,11 @@ func NewEventlog(cfg *el.Config) Eventlog {
Msg("eventlog quits readable watcher")
break
}
ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask")

if rs != nil {
l.updateReadableSegments(ctx, rs)
l.updateReadableSegments(context.Background(), rs)
}

l.readableWatcher.Wakeup()
span.End()
}
}()
l.readableWatcher.Start()
Expand Down
11 changes: 1 addition & 10 deletions client/pkg/eventlog/log_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) e
return nil
}

_, span := s.tracer.Start(ctx, "Update")
defer span.End()

switchBlock := func() bool {
if towrite {
if s.prefer.id != r.LeaderBlockID {
Expand All @@ -155,14 +152,11 @@ func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) e
}

func (s *segment) Append(ctx context.Context, event *cloudevents.CloudEventBatch) ([]int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

b := s.preferSegmentBlock()
if b == nil {
return nil, errors.ErrNotLeader
}
offs, err := b.Append(_ctx, event)
offs, err := b.Append(ctx, event)
if err != nil {
return nil, err
}
Expand All @@ -176,9 +170,6 @@ func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeo
if from < s.startOffset {
return nil, errors.ErrOffsetUnderflow
}
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

if eo := s.endOffset.Load(); eo >= 0 {
if from > eo {
return nil, errors.ErrOffsetOverflow
Expand Down
8 changes: 4 additions & 4 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func main() {
}

ctx := signal.SetupSignalContext()
ga := gateway.NewGateway(*cfg)
cfg.Observability.T.ServerName = "Vanus Gateway"
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetGatewayMetrics)

ga := gateway.NewGateway(*cfg)
if err = ga.Start(ctx); err != nil {
log.Error().Err(err).Msg("start gateway failed")
os.Exit(-1)
}

cfg.Observability.T.ServerName = "Vanus Gateway"
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetGatewayMetrics)
log.Info(ctx).Msg("Gateway has started")

select {
case <-ctx.Done():
log.Info(ctx).Msg("received system signal, preparing exit")
Expand Down
1 change: 1 addition & 0 deletions cmd/trigger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func main() {
os.Exit(-1)
}
ctx := signal.SetupSignalContext()
cfg.Observability.T.ServerName = "Vanus Trigger"
_ = observability.Initialize(ctx, cfg.Observability, metrics.GetTriggerMetrics)
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ require (
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
)

require github.com/vanus-labs/sdk/golang v0.4.7 // indirect

require (
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
Expand Down Expand Up @@ -127,4 +129,4 @@ replace (
github.com/vanus-labs/vanus/raft => ./raft
)

replace github.com/vanus-labs/sdk/golang => ./FORBIDDEN_DEPENDENCY
// replace github.com/vanus-labs/sdk/golang => ./FORBIDDEN_DEPENDENCY
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/vanus-labs/sdk/golang v0.4.7 h1:SIWuyguOX4t0Jve9U7g6A2A4knFjoIW+9UhMsji8jIY=
github.com/vanus-labs/sdk/golang v0.4.7/go.mod h1:QpmncLBj1i1rtZmqSsoZyWN4l6odMHpSZmbad4GBuQQ=
github.com/vigneshuvi/GoDateFormat v0.0.0-20210204121036-67364dc23c79 h1:37VzBuFO88QQnCEu+G41v9IqgJNBXR+4vR9vGwVqJ00=
github.com/vigneshuvi/GoDateFormat v0.0.0-20210204121036-67364dc23c79/go.mod h1:190gFTWxRNREiiPal7zWZlNrwFSpv3BxDmOfgYqoYCY=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
10 changes: 10 additions & 0 deletions internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package convert

import (
// standard libraries.
"encoding/base64"
"encoding/binary"
"errors"

// third-party libraries.
Expand Down Expand Up @@ -539,3 +541,11 @@ func ToPbTransformer(transformer *primitive.Transformer) *pb.Transformer {
Template: transformer.Template.Template,
}
}

func DecodeEventID(eid string) (uint64, uint64, error) {
bytes, err := base64.StdEncoding.DecodeString(eid)
if err != nil {
return 0, 0, err
}
return binary.BigEndian.Uint64(bytes[0:8]), binary.BigEndian.Uint64(bytes[8:16]), nil
}
Loading

0 comments on commit 24da846

Please sign in to comment.