diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 6d6958220..71ca97118 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -59,6 +59,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore optsServer := make([]metamorph.ServerOption, 0) processorOpts := make([]metamorph.Option, 0) + callbackerOpts := make([]metamorph.CallbackerOption, 0) if arcConfig.IsTracingEnabled() { cleanup, err := tracing.Enable(logger, "metamorph", arcConfig.Tracing.DialAddr) @@ -69,6 +70,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore } optsServer = append(optsServer, metamorph.WithTracer(arcConfig.Tracing.KeyValueAttributes...)) + callbackerOpts = append(callbackerOpts, metamorph.WithCallbackerTracer(arcConfig.Tracing.KeyValueAttributes...)) } stopFn := func() { @@ -123,7 +125,8 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore stopFn() return nil, fmt.Errorf("failed to create callbacker client: %v", err) } - callbacker := metamorph.NewGrpcCallbacker(callbackerConn, procLogger) + + callbacker := metamorph.NewGrpcCallbacker(callbackerConn, procLogger, callbackerOpts...) processorOpts = append(processorOpts, metamorph.WithCacheExpiryTime(mtmConfig.ProcessorCacheExpiryTime), metamorph.WithProcessExpiredTxsInterval(mtmConfig.UnseenTransactionRebroadcastingInterval), diff --git a/internal/metamorph/grpc_callbacker.go b/internal/metamorph/grpc_callbacker.go index 84e8f8d3e..e86c2b5f7 100644 --- a/internal/metamorph/grpc_callbacker.go +++ b/internal/metamorph/grpc_callbacker.go @@ -4,34 +4,59 @@ import ( "context" "log/slog" + "go.opentelemetry.io/otel/attribute" + "github.com/bitcoin-sv/arc/internal/callbacker/callbacker_api" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/bitcoin-sv/arc/internal/tracing" ) type GrpcCallbacker struct { - cc callbacker_api.CallbackerAPIClient - l *slog.Logger + cc callbacker_api.CallbackerAPIClient + l *slog.Logger + tracingEnabled bool + tracingAttributes []attribute.KeyValue +} + +func WithCallbackerTracer(attr ...attribute.KeyValue) func(*GrpcCallbacker) { + return func(p *GrpcCallbacker) { + p.tracingEnabled = true + if len(attr) > 0 { + p.tracingAttributes = append(p.tracingAttributes, attr...) + } + } } -func NewGrpcCallbacker(api callbacker_api.CallbackerAPIClient, logger *slog.Logger) GrpcCallbacker { - return GrpcCallbacker{ +type CallbackerOption func(s *GrpcCallbacker) + +func NewGrpcCallbacker(api callbacker_api.CallbackerAPIClient, logger *slog.Logger, opts ...CallbackerOption) GrpcCallbacker { + c := GrpcCallbacker{ cc: api, l: logger, } + + for _, opt := range opts { + opt(&c) + } + + return c } -func (c GrpcCallbacker) SendCallback(tx *store.Data) { - if len(tx.Callbacks) == 0 { +func (c GrpcCallbacker) SendCallback(ctx context.Context, data *store.Data) { + ctx, span := tracing.StartTracing(ctx, "SendCallback", c.tracingEnabled, c.tracingAttributes...) + defer tracing.EndTracing(span) + + if len(data.Callbacks) == 0 { return } - in := toGrpcInput(tx) + in := toGrpcInput(data) if in == nil { return } - _, err := c.cc.SendCallback(context.Background(), in) + _, err := c.cc.SendCallback(ctx, in) if err != nil { c.l.Error("sending callback failed", slog.String("err", err.Error()), slog.Any("input", in)) } diff --git a/internal/metamorph/mocks/callback_sender_mock.go b/internal/metamorph/mocks/callback_sender_mock.go index 12c0c56c7..310ae98ba 100644 --- a/internal/metamorph/mocks/callback_sender_mock.go +++ b/internal/metamorph/mocks/callback_sender_mock.go @@ -4,6 +4,7 @@ package mocks import ( + "context" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/store" "sync" @@ -19,7 +20,7 @@ var _ metamorph.CallbackSender = &CallbackSenderMock{} // // // make and configure a mocked metamorph.CallbackSender // mockedCallbackSender := &CallbackSenderMock{ -// SendCallbackFunc: func(data *store.Data) { +// SendCallbackFunc: func(ctx context.Context, data *store.Data) { // panic("mock out the SendCallback method") // }, // } @@ -30,12 +31,14 @@ var _ metamorph.CallbackSender = &CallbackSenderMock{} // } type CallbackSenderMock struct { // SendCallbackFunc mocks the SendCallback method. - SendCallbackFunc func(data *store.Data) + SendCallbackFunc func(ctx context.Context, data *store.Data) // calls tracks calls to the methods. calls struct { // SendCallback holds details about calls to the SendCallback method. SendCallback []struct { + // Ctx is the ctx argument value. + Ctx context.Context // Data is the data argument value. Data *store.Data } @@ -44,19 +47,21 @@ type CallbackSenderMock struct { } // SendCallback calls SendCallbackFunc. -func (mock *CallbackSenderMock) SendCallback(data *store.Data) { +func (mock *CallbackSenderMock) SendCallback(ctx context.Context, data *store.Data) { if mock.SendCallbackFunc == nil { panic("CallbackSenderMock.SendCallbackFunc: method is nil but CallbackSender.SendCallback was just called") } callInfo := struct { + Ctx context.Context Data *store.Data }{ + Ctx: ctx, Data: data, } mock.lockSendCallback.Lock() mock.calls.SendCallback = append(mock.calls.SendCallback, callInfo) mock.lockSendCallback.Unlock() - mock.SendCallbackFunc(data) + mock.SendCallbackFunc(ctx, data) } // SendCallbackCalls gets all the calls that were made to SendCallback. @@ -64,9 +69,11 @@ func (mock *CallbackSenderMock) SendCallback(data *store.Data) { // // len(mockedCallbackSender.SendCallbackCalls()) func (mock *CallbackSenderMock) SendCallbackCalls() []struct { + Ctx context.Context Data *store.Data } { var calls []struct { + Ctx context.Context Data *store.Data } mock.lockSendCallback.RLock() diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 623a5c9c8..dbce47395 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -108,7 +108,7 @@ type Processor struct { type Option func(f *Processor) type CallbackSender interface { - SendCallback(data *store.Data) + SendCallback(ctx context.Context, data *store.Data) } func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *PeerTxMessage, opts ...Option) (*Processor, error) { @@ -298,10 +298,10 @@ func (p *Processor) StartProcessMinedCallbacks() { } func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) { - _, span := tracing.StartTracing(ctx, "updateMined", p.tracingEnabled, p.tracingAttributes...) + ctx, span := tracing.StartTracing(ctx, "updateMined", p.tracingEnabled, p.tracingAttributes...) defer tracing.EndTracing(span) - updatedData, err := p.store.UpdateMined(p.ctx, txsBlocks) + updatedData, err := p.store.UpdateMined(ctx, txsBlocks) if err != nil { p.logger.Error("failed to register transactions", slog.String("err", err.Error())) return @@ -309,7 +309,7 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr for _, data := range updatedData { if len(data.Callbacks) > 0 { - p.callbackSender.SendCallback(data) + p.callbackSender.SendCallback(ctx, data) } } } @@ -496,7 +496,7 @@ func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, } if sendCallback && len(data.Callbacks) > 0 { - p.callbackSender.SendCallback(data) + p.callbackSender.SendCallback(ctx, data) } } return nil @@ -739,11 +739,11 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques } func (p *Processor) ProcessTransactions(ctx context.Context, sReq []*store.Data) { - _, span := tracing.StartTracing(ctx, "ProcessTransactions", p.tracingEnabled, p.tracingAttributes...) + ctx, span := tracing.StartTracing(ctx, "ProcessTransactions", p.tracingEnabled, p.tracingAttributes...) defer tracing.EndTracing(span) // store in database - err := p.store.SetBulk(p.ctx, sReq) + err := p.store.SetBulk(ctx, sReq) if err != nil { p.logger.Error("Failed to bulk store txs", slog.Int("number", len(sReq)), slog.String("err", err.Error())) return diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index 39a939d77..ea4d65e7d 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -11,21 +11,21 @@ import ( "testing" "time" - "github.com/bitcoin-sv/arc/internal/cache" "github.com/coocood/freecache" + "github.com/libsv/go-p2p" + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" "github.com/bitcoin-sv/arc/internal/metamorph/store" storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks" "github.com/bitcoin-sv/arc/internal/testdata" - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/proto" ) func TestNewProcessor(t *testing.T) { @@ -495,7 +495,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} callbackSender := &mocks.CallbackSenderMock{ - SendCallbackFunc: func(_ *store.Data) { + SendCallbackFunc: func(_ context.Context, _ *store.Data) { callbackSent <- struct{}{} }, } @@ -871,7 +871,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} minedTxsChan := make(chan *blocktx_api.TransactionBlock, 5) callbackSender := &mocks.CallbackSenderMock{ - SendCallbackFunc: func(_ *store.Data) {}, + SendCallbackFunc: func(_ context.Context, _ *store.Data) {}, } cStore := cache.NewFreecacheStore(freecache.NewCache(baseCacheSize)) sut, err := metamorph.NewProcessor( diff --git a/internal/metamorph/server.go b/internal/metamorph/server.go index 3dc0e9a87..14efa528d 100644 --- a/internal/metamorph/server.go +++ b/internal/metamorph/server.go @@ -14,6 +14,7 @@ import ( "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/ordishs/go-bitcoin" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -218,7 +219,16 @@ func toStoreData(hash *chainhash.Hash, statusReceived metamorph_api.Status, req } func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph_api.Status, data *store.Data, timeoutSeconds int64, txID string) *metamorph_api.TransactionStatus { ctx, span := tracing.StartTracing(ctx, "processTransaction", s.tracingEnabled, s.tracingAttributes...) - defer tracing.EndTracing(span) + returnedStatus := &metamorph_api.TransactionStatus{ + Txid: txID, + Status: metamorph_api.Status_RECEIVED, + } + defer func() { + if span != nil { + span.SetAttributes(attribute.String("final-status", returnedStatus.Status.String()), attribute.String("TxID", returnedStatus.Txid), attribute.Bool("time-out", returnedStatus.TimedOut)) + } + tracing.EndTracing(span) + }() responseChannel := make(chan StatusAndError, 10) @@ -241,11 +251,6 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph waitForStatus = metamorph_api.Status_SEEN_ON_NETWORK } - returnedStatus := &metamorph_api.TransactionStatus{ - Txid: txID, - Status: metamorph_api.Status_RECEIVED, - } - // Return the status if it has greater or equal value if returnedStatus.GetStatus() >= waitForStatus { return returnedStatus @@ -260,6 +265,10 @@ func (s *Server) processTransaction(ctx context.Context, waitForStatus metamorph case res := <-responseChannel: returnedStatus.Status = res.Status + if span != nil { + span.AddEvent("status change", trace.WithAttributes(attribute.String("status", returnedStatus.Status.String()))) + } + if len(res.CompetingTxs) > 0 { returnedStatus.CompetingTxs = res.CompetingTxs } diff --git a/internal/metamorph/store/postgresql/postgres.go b/internal/metamorph/store/postgresql/postgres.go index 787bf70f1..f63aa3d25 100644 --- a/internal/metamorph/store/postgresql/postgres.go +++ b/internal/metamorph/store/postgresql/postgres.go @@ -584,6 +584,9 @@ func (p *PostgreSQL) GetSeenOnNetwork(ctx context.Context, since time.Time, unti } func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.UpdateStatus) ([]*store.Data, error) { + ctx, span := tracing.StartTracing(ctx, "UpdateStatusBulk", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(updates)))...) + defer tracing.EndTracing(span) + txHashes := make([][]byte, len(updates)) statuses := make([]metamorph_api.Status, len(updates)) rejectReasons := make([]string, len(updates)) @@ -673,6 +676,9 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat } func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.UpdateStatus) ([]*store.Data, error) { + ctx, span := tracing.StartTracing(ctx, "UpdateDoubleSpend", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(updates)))...) + defer tracing.EndTracing(span) + qBulk := ` UPDATE metamorph.transactions SET @@ -780,6 +786,9 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda } func (p *PostgreSQL) UpdateMined(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) ([]*store.Data, error) { + ctx, span := tracing.StartTracing(ctx, "UpdateMined", p.tracingEnabled, append(p.tracingAttributes, attribute.Int("updates", len(txsBlocks)))...) + defer tracing.EndTracing(span) + if txsBlocks == nil { return nil, nil } diff --git a/internal/tracing/init.go b/internal/tracing/init.go index ab4e43f34..e32358643 100644 --- a/internal/tracing/init.go +++ b/internal/tracing/init.go @@ -51,9 +51,6 @@ func Enable(logger *slog.Logger, serviceName string, tracingAddr string) (func() return nil, fmt.Errorf("failed to create trace provider: %v", err) } - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) - cleanup := func() { err = exporter.Shutdown(ctx) if err != nil { diff --git a/test/config/config.yaml b/test/config/config.yaml index 6ae343777..ae935e110 100644 --- a/test/config/config.yaml +++ b/test/config/config.yaml @@ -14,9 +14,6 @@ messageQueue: tracing: enabled: false dialAddr: http://arc-jaeger:4317 - attributes: - testKey1: testAttribute1 - testKey2: testAttribute2 peerRpc: password: bitcoin diff --git a/test/docker-compose.yaml b/test/docker-compose.yaml index b170d8454..d351aa099 100644 --- a/test/docker-compose.yaml +++ b/test/docker-compose.yaml @@ -170,7 +170,7 @@ services: - ./config/config.yaml:/service/config.yaml command: [ "./arc", "-blocktx=true", "-config=." ] environment: - - ARC_TRACING_DIALADDR + - ARC_TRACING_ENABLED depends_on: nats-1: condition: service_healthy @@ -199,7 +199,7 @@ services: - "8021" command: [ "./arc", "-callbacker=true", "-config=." ] environment: - - ARC_TRACING_DIALADDR + - ARC_TRACING_ENABLED volumes: - ./config/config.yaml:/service/config.yaml depends_on: @@ -217,7 +217,7 @@ services: - "8001" command: [ "./arc", "-metamorph=true", "-config=." ] environment: - - ARC_TRACING_DIALADDR + - ARC_TRACING_ENABLED volumes: - ./config/config.yaml:/service/config.yaml depends_on: @@ -227,6 +227,8 @@ services: condition: service_healthy arc-redis: condition: service_healthy + migrate-metamorph: + condition: service_completed_successfully healthcheck: test: ["CMD", "/bin/grpc_health_probe", "-addr=:8005", "-service=liveness", "-rpc-timeout=5s"] interval: 10s @@ -245,7 +247,7 @@ services: - "2112" command: [ "./arc", "-api=true", "-config=." ] environment: - - ARC_TRACING_DIALADDR + - ARC_TRACING_ENABLED volumes: - ./config/config.yaml:/service/config.yaml depends_on: diff --git a/test/submit_batch_test.go b/test/submit_batch_test.go index 260df5a1d..31523624b 100644 --- a/test/submit_batch_test.go +++ b/test/submit_batch_test.go @@ -2,16 +2,17 @@ package test import ( "fmt" - sdkTx "github.com/bitcoin-sv/go-sdk/transaction" - "github.com/bitcoin-sv/go-sdk/transaction/template/p2pkh" - "github.com/bitcoinsv/bsvutil" - - ec "github.com/bitcoin-sv/go-sdk/primitives/ec" "net/http" "testing" "time" + ec "github.com/bitcoin-sv/go-sdk/primitives/ec" "github.com/stretchr/testify/require" + + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" + "github.com/bitcoin-sv/go-sdk/transaction/template/p2pkh" + "github.com/bitcoinsv/bsvutil" + "github.com/stretchr/testify/assert" ) func TestBatchChainedTxs(t *testing.T) { @@ -22,7 +23,7 @@ func TestBatchChainedTxs(t *testing.T) { utxos := getUtxos(t, address) require.True(t, len(utxos) > 0, "No UTXOs available for the address") - txs, err := createTxChain(privateKey, utxos[0], 30) + txs, err := createTxChain(privateKey, utxos[0], 20) require.NoError(t, err) request := make([]TransactionRequest, len(txs)) @@ -37,8 +38,14 @@ func TestBatchChainedTxs(t *testing.T) { // Send POST request t.Logf("submitting batch of %d chained txs", len(txs)) resp := postRequest[TransactionResponseBatch](t, arcEndpointV1Txs, createPayload(t, request), nil, http.StatusOK) - for _, txResponse := range resp { - require.Equal(t, Status_SEEN_ON_NETWORK, txResponse.TxStatus) + hasFailed := false + for i, txResponse := range resp { + if !assert.Equal(t, Status_SEEN_ON_NETWORK, txResponse.TxStatus, fmt.Sprintf("index: %d", i)) { + hasFailed = true + } + } + if hasFailed { + t.FailNow() } time.Sleep(1 * time.Second) @@ -46,8 +53,13 @@ func TestBatchChainedTxs(t *testing.T) { // repeat request to ensure response remains the same t.Logf("re-submitting batch of %d chained txs", len(txs)) resp = postRequest[TransactionResponseBatch](t, arcEndpointV1Txs, createPayload(t, request), nil, http.StatusOK) - for _, txResponse := range resp { - require.Equal(t, Status_SEEN_ON_NETWORK, txResponse.TxStatus) + for i, txResponse := range resp { + if !assert.Equal(t, Status_SEEN_ON_NETWORK, txResponse.TxStatus, fmt.Sprintf("index: %d", i)) { + hasFailed = true + } + } + if hasFailed { + t.FailNow() } }) } diff --git a/test/submit_single_test.go b/test/submit_single_test.go index 99b72706d..cc7aaf7e7 100644 --- a/test/submit_single_test.go +++ b/test/submit_single_test.go @@ -239,8 +239,8 @@ func TestCallback(t *testing.T) { }, { name: "post transactions with multiple callbacks", - numberOfTxs: 8, - numberOfCallbackServers: 10, + numberOfTxs: 5, + numberOfCallbackServers: 3, }, } @@ -295,6 +295,7 @@ func TestCallback(t *testing.T) { // submit transactions for _, tx := range txs { for _, callbackSrv := range callbackServers { + time.Sleep(100 * time.Millisecond) testTxSubmission(t, callbackSrv.url, callbackSrv.token, false, tx) // This is to test the multiple submissions with the same callback URL and token // Expected behavior is that the callback should not be added to tx and the server should receive the callback only once