Skip to content

Commit

Permalink
feat(ARCO-276): Add more traces & tracing events
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim authored Nov 5, 2024
1 parent dd704e1 commit 08e0587
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 56 deletions.
5 changes: 4 additions & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

optsServer := make([]metamorph.ServerOption, 0)
processorOpts := make([]metamorph.Option, 0)
callbackerOpts := make([]metamorph.CallbackerOption, 0)

if arcConfig.IsTracingEnabled() {
cleanup, err := tracing.Enable(logger, "metamorph", arcConfig.Tracing.DialAddr)
Expand All @@ -69,6 +70,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
}

optsServer = append(optsServer, metamorph.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
callbackerOpts = append(callbackerOpts, metamorph.WithCallbackerTracer(arcConfig.Tracing.KeyValueAttributes...))
}

stopFn := func() {
Expand Down Expand Up @@ -123,7 +125,8 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
stopFn()
return nil, fmt.Errorf("failed to create callbacker client: %v", err)
}
callbacker := metamorph.NewGrpcCallbacker(callbackerConn, procLogger)

callbacker := metamorph.NewGrpcCallbacker(callbackerConn, procLogger, callbackerOpts...)

processorOpts = append(processorOpts, metamorph.WithCacheExpiryTime(mtmConfig.ProcessorCacheExpiryTime),
metamorph.WithProcessExpiredTxsInterval(mtmConfig.UnseenTransactionRebroadcastingInterval),
Expand Down
41 changes: 33 additions & 8 deletions internal/metamorph/grpc_callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,59 @@ import (
"context"
"log/slog"

"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/callbacker/callbacker_api"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/internal/tracing"
)

type GrpcCallbacker struct {
cc callbacker_api.CallbackerAPIClient
l *slog.Logger
cc callbacker_api.CallbackerAPIClient
l *slog.Logger
tracingEnabled bool
tracingAttributes []attribute.KeyValue
}

func WithCallbackerTracer(attr ...attribute.KeyValue) func(*GrpcCallbacker) {
return func(p *GrpcCallbacker) {
p.tracingEnabled = true
if len(attr) > 0 {
p.tracingAttributes = append(p.tracingAttributes, attr...)
}
}
}

func NewGrpcCallbacker(api callbacker_api.CallbackerAPIClient, logger *slog.Logger) GrpcCallbacker {
return GrpcCallbacker{
type CallbackerOption func(s *GrpcCallbacker)

func NewGrpcCallbacker(api callbacker_api.CallbackerAPIClient, logger *slog.Logger, opts ...CallbackerOption) GrpcCallbacker {
c := GrpcCallbacker{
cc: api,
l: logger,
}

for _, opt := range opts {
opt(&c)
}

return c
}

func (c GrpcCallbacker) SendCallback(tx *store.Data) {
if len(tx.Callbacks) == 0 {
func (c GrpcCallbacker) SendCallback(ctx context.Context, data *store.Data) {
ctx, span := tracing.StartTracing(ctx, "SendCallback", c.tracingEnabled, c.tracingAttributes...)
defer tracing.EndTracing(span)

if len(data.Callbacks) == 0 {
return
}

in := toGrpcInput(tx)
in := toGrpcInput(data)
if in == nil {
return
}

_, err := c.cc.SendCallback(context.Background(), in)
_, err := c.cc.SendCallback(ctx, in)
if err != nil {
c.l.Error("sending callback failed", slog.String("err", err.Error()), slog.Any("input", in))
}
Expand Down
15 changes: 11 additions & 4 deletions internal/metamorph/mocks/callback_sender_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type Processor struct {
type Option func(f *Processor)

type CallbackSender interface {
SendCallback(data *store.Data)
SendCallback(ctx context.Context, data *store.Data)
}

func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *PeerTxMessage, opts ...Option) (*Processor, error) {
Expand Down Expand Up @@ -298,18 +298,18 @@ func (p *Processor) StartProcessMinedCallbacks() {
}

func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) {
_, span := tracing.StartTracing(ctx, "updateMined", p.tracingEnabled, p.tracingAttributes...)
ctx, span := tracing.StartTracing(ctx, "updateMined", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)

updatedData, err := p.store.UpdateMined(p.ctx, txsBlocks)
updatedData, err := p.store.UpdateMined(ctx, txsBlocks)
if err != nil {
p.logger.Error("failed to register transactions", slog.String("err", err.Error()))
return
}

for _, data := range updatedData {
if len(data.Callbacks) > 0 {
p.callbackSender.SendCallback(data)
p.callbackSender.SendCallback(ctx, data)
}
}
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates,
}

if sendCallback && len(data.Callbacks) > 0 {
p.callbackSender.SendCallback(data)
p.callbackSender.SendCallback(ctx, data)
}
}
return nil
Expand Down Expand Up @@ -739,11 +739,11 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques
}

func (p *Processor) ProcessTransactions(ctx context.Context, sReq []*store.Data) {
_, span := tracing.StartTracing(ctx, "ProcessTransactions", p.tracingEnabled, p.tracingAttributes...)
ctx, span := tracing.StartTracing(ctx, "ProcessTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)

// store in database
err := p.store.SetBulk(p.ctx, sReq)
err := p.store.SetBulk(ctx, sReq)
if err != nil {
p.logger.Error("Failed to bulk store txs", slog.Int("number", len(sReq)), slog.String("err", err.Error()))
return
Expand Down
16 changes: 8 additions & 8 deletions internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ import (
"testing"
"time"

"github.com/bitcoin-sv/arc/internal/cache"
"github.com/coocood/freecache"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/metamorph"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/mocks"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)

func TestNewProcessor(t *testing.T) {
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestStartSendStatusForTransaction(t *testing.T) {
pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}

callbackSender := &mocks.CallbackSenderMock{
SendCallbackFunc: func(_ *store.Data) {
SendCallbackFunc: func(_ context.Context, _ *store.Data) {
callbackSent <- struct{}{}
},
}
Expand Down Expand Up @@ -871,7 +871,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}
minedTxsChan := make(chan *blocktx_api.TransactionBlock, 5)
callbackSender := &mocks.CallbackSenderMock{
SendCallbackFunc: func(_ *store.Data) {},
SendCallbackFunc: func(_ context.Context, _ *store.Data) {},
}
cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize))
sut, err := metamorph.NewProcessor(
Expand Down
21 changes: 15 additions & 6 deletions internal/metamorph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/ordishs/go-bitcoin"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -218,7 +219,16 @@ func toStoreData(hash *chainhash.Hash, statusReceived metamorph_api.Status, req
}
func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph_api.Status, data *store.Data, timeoutSeconds int64, txID string) *metamorph_api.TransactionStatus {
ctx, span := tracing.StartTracing(ctx, "processTransaction", s.tracingEnabled, s.tracingAttributes...)
defer tracing.EndTracing(span)
returnedStatus := &metamorph_api.TransactionStatus{
Txid: txID,
Status: metamorph_api.Status_RECEIVED,
}
defer func() {
if span != nil {
span.SetAttributes(attribute.String("final-status", returnedStatus.Status.String()), attribute.String("TxID", returnedStatus.Txid), attribute.Bool("time-out", returnedStatus.TimedOut))
}
tracing.EndTracing(span)
}()

responseChannel := make(chan StatusAndError, 10)

Expand All @@ -241,11 +251,6 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
waitForStatus = metamorph_api.Status_SEEN_ON_NETWORK
}

returnedStatus := &metamorph_api.TransactionStatus{
Txid: txID,
Status: metamorph_api.Status_RECEIVED,
}

// Return the status if it has greater or equal value
if returnedStatus.GetStatus() >= waitForStatus {
return returnedStatus
Expand All @@ -260,6 +265,10 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph
case res := <-responseChannel:
returnedStatus.Status = res.Status

if span != nil {
span.AddEvent("status change", trace.WithAttributes(attribute.String("status", returnedStatus.Status.String())))
}

if len(res.CompetingTxs) > 0 {
returnedStatus.CompetingTxs = res.CompetingTxs
}
Expand Down
9 changes: 9 additions & 0 deletions internal/metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,9 @@ func (p *PostgreSQL) GetSeenOnNetwork(ctx context.Context, since time.Time, unti
}

func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.UpdateStatus) ([]*store.Data, error) {
ctx, span := tracing.StartTracing(ctx, "UpdateStatusBulk", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(updates)))...)
defer tracing.EndTracing(span)

txHashes := make([][]byte, len(updates))
statuses := make([]metamorph_api.Status, len(updates))
rejectReasons := make([]string, len(updates))
Expand Down Expand Up @@ -673,6 +676,9 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
}

func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.UpdateStatus) ([]*store.Data, error) {
ctx, span := tracing.StartTracing(ctx, "UpdateDoubleSpend", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(updates)))...)
defer tracing.EndTracing(span)

qBulk := `
UPDATE metamorph.transactions
SET
Expand Down Expand Up @@ -780,6 +786,9 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda
}

func (p *PostgreSQL) UpdateMined(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) ([]*store.Data, error) {
ctx, span := tracing.StartTracing(ctx, "UpdateMined", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(txsBlocks)))...)
defer tracing.EndTracing(span)

if txsBlocks == nil {
return nil, nil
}
Expand Down
3 changes: 0 additions & 3 deletions internal/tracing/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func Enable(logger *slog.Logger, serviceName string, tracingAddr string) (func()
return nil, fmt.Errorf("failed to create trace provider: %v", err)
}

otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

cleanup := func() {
err = exporter.Shutdown(ctx)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions test/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ messageQueue:
tracing:
enabled: false
dialAddr: http://arc-jaeger:4317
attributes:
testKey1: testAttribute1
testKey2: testAttribute2

peerRpc:
password: bitcoin
Expand Down
Loading

0 comments on commit 08e0587

Please sign in to comment.