diff --git a/CHANGELOG.md b/CHANGELOG.md index 19a05d1be..7e9191917 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,9 @@ All notable changes to this project will be documented in this file. The format ## [Unreleased] +### Changed +- Callbacks are sent one by one to the same URL. In the previous implementation, each callback request created a new goroutine to send the callback, which could result in a potential DDoS of the callback receiver. The new approach sends callbacks to the same receiver in a serial manner. Note that URLs are not locked by the `callbacker` instance, so serial sends occur only within a single instance. In other words, the level of parallelism is determined by the number of `callbacker` instances. + ## [1.3.0] - 2024-08-21 ### Changed diff --git a/Makefile b/Makefile index fbf337a2a..e7e4a344f 100644 --- a/Makefile +++ b/Makefile @@ -113,7 +113,7 @@ install_gen: go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.5.1 go install github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@v2.3.0 - go install github.com/matryer/moq@v0.3.4 + go install github.com/matryer/moq@v0.4.0 .PHONY: docs docs: diff --git a/README.md b/README.md index baed6a40a..354eda198 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ ARC is a transaction processor for Bitcoin that keeps track of the life cycle of - [ZMQ](#zmq) - [BlockTx](#blocktx) - [BlockTx stores](#blocktx-stores) + - [Callbacker](#callbacker) - [Message Queue](#message-queue) - [K8s-Watcher](#k8s-watcher) - [Broadcaster-cli](#broadcaster-cli) @@ -92,6 +93,9 @@ where options are: -k8s-watcher= whether to start k8s-watcher (default=true) + -callbacker= + whether to start callbacker (default=true) + -config=/location directory to look for config.yaml (default='') @@ -248,6 +252,17 @@ Metamorph publishes new transactions to the message queue and BlockTx subscribes ![Message Queue](./doc/message_queue.png) +### Callbacker + +Callbacker is a microservice that sends callbacks to a specified URL. + +Callbacker is designed to be horizontally scalable, with each instance operating independently. As a result, they do not communicate with each other and remain unaware of each other's existence. + +You can run callbacker like this: + +```shell +go run main.go -callbacker=true +``` ## K8s-Watcher diff --git a/cmd/arc/services/callbacker.go b/cmd/arc/services/callbacker.go index 06e2ae57a..99ac984c2 100644 --- a/cmd/arc/services/callbacker.go +++ b/cmd/arc/services/callbacker.go @@ -14,25 +14,26 @@ import ( "google.golang.org/grpc/reflection" ) -func StartCallbacker(logger *slog.Logger, config *config.ArcConfig) (func(), error) { +func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), error) { logger = logger.With(slog.String("service", "callbacker")) logger.Info("Starting") - callbackSrv, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger) + config := appConfig.Callbacker + + callbackSender, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger) if err != nil { return nil, fmt.Errorf("callbacker failed: %v", err) } - srvOpts := []callbacker.ServerOption{ - callbacker.WithLogger(logger.With(slog.String("module", "callbacker-server"))), - } - server := callbacker.NewServer(callbackSrv, srvOpts...) - err = server.Serve(config.Callbacker.ListenAddr, config.GrpcMessageSize, config.PrometheusEndpoint) + callbackDispatcher := callbacker.NewCallbackDispatcher(callbackSender, config.Pause) + + server := callbacker.NewServer(callbackDispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server")))) + err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint) if err != nil { return nil, fmt.Errorf("GRPCServer failed: %v", err) } - healthServer, err := StartHealthServerCallbacker(server, config.Callbacker.Health, logger) + healthServer, err := StartHealthServerCallbacker(server, config.Health, logger) if err != nil { return nil, fmt.Errorf("failed to start health server: %v", err) } @@ -40,8 +41,13 @@ func StartCallbacker(logger *slog.Logger, config *config.ArcConfig) (func(), err stopFn := func() { logger.Info("Shutting down callbacker") - server.Shutdown() - callbackSrv.GracefulStop() + // dispose of dependencies in the correct order: + // 1. server - ensure no new callbacks will be received + // 2. dispatcher - ensure all already accepted callbacks are proccessed + // 3. sender - finally, stop the sender as there are no callbacks left to send. + server.GracefulStop() + callbackDispatcher.GracefulStop() + callbackSender.GracefulStop() healthServer.Stop() diff --git a/config/config.go b/config/config.go index b559799e5..0130c414b 100644 --- a/config/config.go +++ b/config/config.go @@ -126,4 +126,5 @@ type CallbackerConfig struct { ListenAddr string `mapstructure:"listenAddr"` DialAddr string `mapstructure:"dialAddr"` Health *HealthConfig `mapstructure:"health"` + Pause time.Duration `mapstructure:"pause"` } diff --git a/config/defaults.go b/config/defaults.go index cb0362c49..68ba52454 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -170,5 +170,6 @@ func getCallbackerConfig() *CallbackerConfig { Health: &HealthConfig{ SeverDialAddr: "localhost:8025", }, + Pause: 0, } } diff --git a/config/example_config.yaml b/config/example_config.yaml index ac02c2a67..00d05734e 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -124,4 +124,5 @@ callbacker: listenAddr: localhost:8021 # address space for callbacker to listen on. Can be for example localhost:8021 or :8021 for listening on all addresses dialAddr: localhost:8021 # address for other services to dial callbacker service health: - serverDialAddr: localhost:8025 # address at which the grpc health server is exposed \ No newline at end of file + serverDialAddr: localhost:8025 # address at which the grpc health server is exposed + pause: 0s # pause between sending next callback to the same receiver \ No newline at end of file diff --git a/go.mod b/go.mod index 580050fa6..793687ad3 100644 --- a/go.mod +++ b/go.mod @@ -144,15 +144,16 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.25.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/net v0.27.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect - golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.6.0 // indirect + golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/go.sum b/go.sum index da414072e..074a2a039 100644 --- a/go.sum +++ b/go.sum @@ -389,8 +389,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -400,8 +400,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -423,22 +423,22 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/callbacker/callbacker_mock.go b/internal/callbacker/callbacker_mock.go new file mode 100644 index 000000000..a189c8bb0 --- /dev/null +++ b/internal/callbacker/callbacker_mock.go @@ -0,0 +1,123 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package callbacker + +import ( + "sync" +) + +// Ensure, that CallbackerIMock does implement CallbackerI. +// If this is not the case, regenerate this file with moq. +var _ CallbackerI = &CallbackerIMock{} + +// CallbackerIMock is a mock implementation of CallbackerI. +// +// func TestSomethingThatUsesCallbackerI(t *testing.T) { +// +// // make and configure a mocked CallbackerI +// mockedCallbackerI := &CallbackerIMock{ +// HealthFunc: func() error { +// panic("mock out the Health method") +// }, +// SendFunc: func(url string, token string, callback *Callback) { +// panic("mock out the Send method") +// }, +// } +// +// // use mockedCallbackerI in code that requires CallbackerI +// // and then make assertions. +// +// } +type CallbackerIMock struct { + // HealthFunc mocks the Health method. + HealthFunc func() error + + // SendFunc mocks the Send method. + SendFunc func(url string, token string, callback *Callback) + + // calls tracks calls to the methods. + calls struct { + // Health holds details about calls to the Health method. + Health []struct { + } + // Send holds details about calls to the Send method. + Send []struct { + // URL is the url argument value. + URL string + // Token is the token argument value. + Token string + // Callback is the callback argument value. + Callback *Callback + } + } + lockHealth sync.RWMutex + lockSend sync.RWMutex +} + +// Health calls HealthFunc. +func (mock *CallbackerIMock) Health() error { + if mock.HealthFunc == nil { + panic("CallbackerIMock.HealthFunc: method is nil but CallbackerI.Health was just called") + } + callInfo := struct { + }{} + mock.lockHealth.Lock() + mock.calls.Health = append(mock.calls.Health, callInfo) + mock.lockHealth.Unlock() + return mock.HealthFunc() +} + +// HealthCalls gets all the calls that were made to Health. +// Check the length with: +// +// len(mockedCallbackerI.HealthCalls()) +func (mock *CallbackerIMock) HealthCalls() []struct { +} { + var calls []struct { + } + mock.lockHealth.RLock() + calls = mock.calls.Health + mock.lockHealth.RUnlock() + return calls +} + +// Send calls SendFunc. +func (mock *CallbackerIMock) Send(url string, token string, callback *Callback) { + if mock.SendFunc == nil { + panic("CallbackerIMock.SendFunc: method is nil but CallbackerI.Send was just called") + } + callInfo := struct { + URL string + Token string + Callback *Callback + }{ + URL: url, + Token: token, + Callback: callback, + } + mock.lockSend.Lock() + mock.calls.Send = append(mock.calls.Send, callInfo) + mock.lockSend.Unlock() + mock.SendFunc(url, token, callback) +} + +// SendCalls gets all the calls that were made to Send. +// Check the length with: +// +// len(mockedCallbackerI.SendCalls()) +func (mock *CallbackerIMock) SendCalls() []struct { + URL string + Token string + Callback *Callback +} { + var calls []struct { + URL string + Token string + Callback *Callback + } + mock.lockSend.RLock() + calls = mock.calls.Send + mock.lockSend.RUnlock() + return calls +} diff --git a/internal/callbacker/callbacker_mocks.go b/internal/callbacker/callbacker_mocks.go new file mode 100644 index 000000000..8179a7912 --- /dev/null +++ b/internal/callbacker/callbacker_mocks.go @@ -0,0 +1,4 @@ +package callbacker + +// from callbacker.go +//go:generate moq -out ./callbacker_mock.go ./ CallbackerI diff --git a/internal/callbacker/dispatcher.go b/internal/callbacker/dispatcher.go new file mode 100644 index 000000000..c33cbec36 --- /dev/null +++ b/internal/callbacker/dispatcher.go @@ -0,0 +1,108 @@ +package callbacker + +import ( + "sync" + "time" +) + +type CallbackDispatcher struct { + c CallbackerI + managers map[string]*sendManager + mu sync.Mutex + + sleep time.Duration +} + +func NewCallbackDispatcher(callbacker CallbackerI, sleepDuration time.Duration) *CallbackDispatcher { + return &CallbackDispatcher{ + c: callbacker, + sleep: sleepDuration, + managers: make(map[string]*sendManager), + } +} + +func (d *CallbackDispatcher) Send(url, token string, dto *Callback) { + d.dispatch(url, token, dto) +} + +func (d *CallbackDispatcher) Health() error { + return d.c.Health() +} + +func (d *CallbackDispatcher) GracefulStop() { + d.mu.Lock() + defer d.mu.Unlock() + + for _, m := range d.managers { + m.GracefulStop() + } +} + +func (d *CallbackDispatcher) dispatch(url, token string, dto *Callback) { + d.mu.Lock() + m, ok := d.managers[url] + + if !ok { + m = runNewSendManager(url, d.c, d.sleep) + d.managers[url] = m + } + d.mu.Unlock() + + m.Add(token, dto) +} + +type sendManager struct { + url string + c CallbackerI + + wg sync.WaitGroup + ch chan *callbackEntry + + sleep time.Duration +} + +type callbackEntry struct { + token string + data *Callback +} + +func runNewSendManager(u string, c CallbackerI, s time.Duration) *sendManager { + m := &sendManager{ + url: u, + c: c, + sleep: s, + + ch: make(chan *callbackEntry), + } + + m.run() + return m +} + +func (m *sendManager) Add(token string, dto *Callback) { + m.wg.Add(1) + go func() { + m.ch <- &callbackEntry{token: token, data: dto} + }() +} + +func (m *sendManager) GracefulStop() { + m.wg.Wait() // wait for all accepted callbacks to be consumed + close(m.ch) // signal the `run` goroutine to exit +} + +func (m *sendManager) run() { + go func() { + for { + callback, ok := <-m.ch + if !ok { + return // exit the goroutine when channel is closed + } + + m.c.Send(m.url, callback.token, callback.data) + m.wg.Done() + + time.Sleep(m.sleep) + } + }() +} diff --git a/internal/callbacker/dispatcher_test.go b/internal/callbacker/dispatcher_test.go new file mode 100644 index 000000000..c2df8ca69 --- /dev/null +++ b/internal/callbacker/dispatcher_test.go @@ -0,0 +1,131 @@ +package callbacker + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_CallbackDispatcher(t *testing.T) { + tcs := []struct { + name string + sendInterval time.Duration + numOfReceivers int + numOfSendPerReceiver int + stopDispatcher bool + }{ + { + name: "send", + sendInterval: 0, + numOfReceivers: 20, + numOfSendPerReceiver: 1000, + }, + { + name: "process callbacks on stopping", + sendInterval: time.Nanosecond, // set interval to give time to call stop function + numOfReceivers: 100, + numOfSendPerReceiver: 200, + stopDispatcher: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + // given + cMq := &CallbackerIMock{ + SendFunc: func(url, token string, callback *Callback) {}, + } + + sut := NewCallbackDispatcher(cMq, tc.sendInterval) + + var receivers []string + for i := range tc.numOfReceivers { + receivers = append(receivers, fmt.Sprintf("url_%d", i)) + } + + // when + // send callbacks to receiver + wg := &sync.WaitGroup{} + for range tc.numOfSendPerReceiver { + wg.Add(1) + go func() { + for _, url := range receivers { + sut.Send(url, "", nil) + } + wg.Done() + }() + } + wg.Wait() + + if tc.stopDispatcher { + sut.GracefulStop() + } else { + // give a chance to process + time.Sleep(50 * time.Millisecond) + } + + // then + require.Equal(t, tc.numOfReceivers, len(sut.managers)) + require.Equal(t, tc.numOfReceivers*tc.numOfSendPerReceiver, len(cMq.SendCalls())) + }) + } +} + +func Test_sendManager(t *testing.T) { + tcs := []struct { + name string + sendInterval time.Duration + numOfSends int + stopManager bool + }{ + { + name: "send callbacks when run", + sendInterval: 0, + numOfSends: 100, + }, + { + name: "send callbacks on stopping", + sendInterval: time.Millisecond, // set interval to give time to call stop function + numOfSends: 10, + stopManager: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + // given + cMq := &CallbackerIMock{ + SendFunc: func(url, token string, callback *Callback) {}, + } + + sut := &sendManager{ + url: "", + c: cMq, + sleep: tc.sendInterval, + + ch: make(chan *callbackEntry), + } + + // add callbacks before starting the manager to queue them + for range tc.numOfSends { + sut.Add("", nil) + } + + // when + sut.run() + + if tc.stopManager { + sut.GracefulStop() + } else { + // give a chance to process + time.Sleep(5 * time.Millisecond) + } + + // then + require.Len(t, cMq.SendCalls(), tc.numOfSends) + }) + } +} diff --git a/internal/callbacker/sender.go b/internal/callbacker/sender.go index 573cdca7b..72bcf9553 100644 --- a/internal/callbacker/sender.go +++ b/internal/callbacker/sender.go @@ -23,7 +23,6 @@ type HttpClient interface { type CallbackSender struct { httpClient HttpClient - wg sync.WaitGroup mu sync.Mutex disposed bool stats *stats @@ -48,7 +47,7 @@ func NewSender(httpClient HttpClient, logger *slog.Logger) (*CallbackSender, err callbacker := &CallbackSender{ httpClient: httpClient, stats: stats, - logger: logger.With(slog.String("module", "callbacker")), + logger: logger.With(slog.String("module", "sender")), } return callbacker, nil @@ -59,12 +58,11 @@ func (p *CallbackSender) GracefulStop() { defer p.mu.Unlock() if p.disposed { - p.logger.Info("Callbacker is already stopped") + p.logger.Info("Sender is already stopped") return } - p.logger.Info("Stopping callbacker") - p.wg.Wait() + p.logger.Info("Stopping Sender") unregisterStats( p.stats.callbackSeenOnNetworkCount, @@ -76,7 +74,7 @@ func (p *CallbackSender) GracefulStop() { ) p.disposed = true - p.logger.Info("Stopped Callbacker") + p.logger.Info("Stopped Sender") } func (p *CallbackSender) Health() error { @@ -91,24 +89,20 @@ func (p *CallbackSender) Health() error { } func (p *CallbackSender) Send(url, token string, dto *Callback) { - p.wg.Add(1) - go func() { - defer p.wg.Done() - - ok := p.sendCallbackWithRetries(url, token, dto) - - if ok { - p.updateSuccessStats(dto.TxStatus) - } else { - p.logger.Warn("Couldn't send transaction callback after retries", - slog.String("url", url), - slog.String("token", token), - slog.String("hash", dto.Txid), - slog.Int("retries", retries)) - - p.stats.callbackFailedCount.Inc() - } - }() + ok := p.sendCallbackWithRetries(url, token, dto) + + if ok { + p.updateSuccessStats(dto.TxStatus) + return + } + + p.logger.Warn("Couldn't send transaction callback after retries", + slog.String("url", url), + slog.String("token", token), + slog.String("hash", dto.Txid), + slog.Int("retries", retries)) + + p.stats.callbackFailedCount.Inc() } func (p *CallbackSender) sendCallbackWithRetries(url, token string, dto *Callback) bool { diff --git a/internal/callbacker/server.go b/internal/callbacker/server.go index f2a98f1fa..9f0121f6b 100644 --- a/internal/callbacker/server.go +++ b/internal/callbacker/server.go @@ -36,7 +36,7 @@ func WithLogger(logger *slog.Logger) func(*Server) { func NewServer(callbacker CallbackerI, opts ...ServerOption) *Server { server := &Server{ callbacker: callbacker, - logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})).With(slog.String("service", "callbacker")), + logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})).With(slog.String("module", "server")), } for _, opt := range opts { @@ -80,7 +80,7 @@ func (s *Server) Serve(address string, grpcMessageSize int, prometheusEndpoint s return nil } -func (s *Server) Shutdown() { +func (s *Server) GracefulStop() { s.logger.Info("Shutting down") s.grpcServer.GracefulStop() diff --git a/pkg/api/handler/internal/TxFinder/tx_finder_test.go b/pkg/api/handler/internal/TxFinder/tx_finder_test.go index dd8645c4c..28e2eb9d1 100644 --- a/pkg/api/handler/internal/TxFinder/tx_finder_test.go +++ b/pkg/api/handler/internal/TxFinder/tx_finder_test.go @@ -139,7 +139,7 @@ func transactionHandler(_ *testing.T, thResponse func() ([]*metamorph.Transactio return &mq } -func peerRpcConfig(t *testing.T) *config.PeerRpcConfig { +func peerRpcConfig(_ *testing.T) *config.PeerRpcConfig { return &config.PeerRpcConfig{} } diff --git a/test/submit_single_test.go b/test/submit_single_test.go index 1ea3c7517..f7f392e6f 100644 --- a/test/submit_single_test.go +++ b/test/submit_single_test.go @@ -4,13 +4,14 @@ import ( "encoding/hex" "encoding/json" "fmt" - sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "net/http" "os" "strconv" "testing" "time" + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" + "github.com/libsv/go-bc" "github.com/stretchr/testify/require" ) @@ -212,91 +213,142 @@ func TestCallback(t *testing.T) { tt := []struct { name string + numberOfTxs int numberOfCallbackServers int attemptMultipleSubmissions bool }{ { name: "post transaction with one callback", + numberOfTxs: 1, numberOfCallbackServers: 1, }, { name: "post transaction with multiple callbacks", - numberOfCallbackServers: 2, + numberOfTxs: 1, + numberOfCallbackServers: 10, }, { name: "post transaction with one callback - multiple submissions", + numberOfTxs: 1, numberOfCallbackServers: 1, attemptMultipleSubmissions: true, }, + { + name: "post transactions with one callback", + numberOfTxs: 10, + numberOfCallbackServers: 1, + }, + { + name: "post transactions with multiple callback", + numberOfTxs: 10, + numberOfCallbackServers: 10, + }, + } + + type callbackServer struct { + url, token string + responseChan chan *TransactionResponse + errChan chan error } for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - address, privateKey := fundNewWallet(t) + // given + + // setup callback servers + const callbacksNumber = 2 // cannot be greater than 5 + + callbackServers := make([]*callbackServer, 0, tc.numberOfCallbackServers) + + for range tc.numberOfCallbackServers { + callbackReceivedChan, callbackErrChan, calbackResponseFn := prepareCallback(t, callbacksNumber) + callbackUrl, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, calbackResponseFn) + defer shutdown() + + callbackServers = append(callbackServers, &callbackServer{ + url: callbackUrl, + token: token, + responseChan: callbackReceivedChan, + errChan: callbackErrChan, + }) + } + + // create transactions + address, privateKey := getNewWalletAddress(t) + for i := range tc.numberOfTxs { + sendToAddress(t, address, float64(10+i)) + } + generate(t, 1) utxos := getUtxos(t, address) - require.True(t, len(utxos) > 0, "No UTXOs available for the address") + require.True(t, len(utxos) >= tc.numberOfTxs, "Insufficient UTXOs available for the address") - tx, err := createTx(privateKey, address, utxos[0]) - require.NoError(t, err) + txs := make([]*sdkTx.Transaction, 0, tc.numberOfTxs) + for i := range tc.numberOfTxs { + tx, err := createTx(privateKey, address, utxos[i]) + require.NoError(t, err) - const callbackNumbers = 2 // cannot be greater than 5 - callbackReceivedChannels := make([]chan *TransactionResponse, tc.numberOfCallbackServers) - callbackErrChannels := make([]chan error, tc.numberOfCallbackServers) - callbackShutdowns := make([]func(), tc.numberOfCallbackServers) + txs = append(txs, tx) + } - for i := 0; i < tc.numberOfCallbackServers; i++ { - callbackReceivedChan, callbackErrChan, calbackResponseFn := prepareCallback(t, callbackNumbers) - callbackUrl, token, shutdown := startCallbackSrv(t, callbackReceivedChan, callbackErrChan, calbackResponseFn) - callbackReceivedChannels[i] = callbackReceivedChan - callbackErrChannels[i] = callbackErrChan - callbackShutdowns[i] = shutdown - - testTxSubmission(t, callbackUrl, token, tx) - // This is to test the multiple submissions with the same callback URL and token - // Expected behavior is that the callback should not be added to tx and the server should receive the callback only once - if tc.attemptMultipleSubmissions { - testTxSubmission(t, callbackUrl, token, tx) + // when + + // submit transactions + for _, tx := range txs { + for _, callbackSrv := range callbackServers { + testTxSubmission(t, callbackSrv.url, callbackSrv.token, tx) + // This is to test the multiple submissions with the same callback URL and token + // Expected behavior is that the callback should not be added to tx and the server should receive the callback only once + if tc.attemptMultipleSubmissions { + testTxSubmission(t, callbackSrv.url, callbackSrv.token, tx) + } } + } - defer func() { - for _, shutdown := range callbackShutdowns { - shutdown() - } - }() + // mine trasactions + generate(t, 2) - generate(t, 10) + // then - statusUrl := fmt.Sprintf("%s/%s", arcEndpointV1Tx, tx.TxID()) - statusResp := getRequest[TransactionResponse](t, statusUrl) + // verify callbacks were received correctly + for i, srv := range callbackServers { + t.Logf("listen callbacks on server %s", srv.url) - require.NotNil(t, statusResp.MerklePath) - _, err = bc.NewBUMPFromStr(*statusResp.MerklePath) - require.NoError(t, err) + expectedTxsCallbacks := make(map[string]int) // key: txID, value: number of received callbacks + for _, tx := range txs { + expectedTxsCallbacks[tx.TxID()] = 0 + } - for i := 0; i < tc.numberOfCallbackServers; i++ { - t.Logf("callback %d", i) - for j := 0; j < callbackNumbers; j++ { - t.Logf("callback iteration %d", j) - callbackTimeout := time.After(time.Second * time.Duration(j+1) * 2 * 5) + expectedCallbacksNumber := callbacksNumber * tc.numberOfTxs + for j := 0; j < expectedCallbacksNumber; j++ { + callbackTimeout := time.After(30 * time.Second) select { - case callback := <-callbackReceivedChannels[i]: + case callback := <-srv.responseChan: require.NotNil(t, callback) - t.Logf("Callback %d iteration %d result: %s", i, j, callback.TxStatus) - require.Equal(t, statusResp.Txid, callback.Txid) - require.Equal(t, *statusResp.BlockHeight, *callback.BlockHeight) - require.Equal(t, *statusResp.BlockHash, *callback.BlockHash) + t.Logf("callback server %d iteration %d, txid: %s result: %s", i, j, callback.Txid, callback.TxStatus) + + visitNumber, expectedTx := expectedTxsCallbacks[callback.Txid] + require.True(t, expectedTx) + visitNumber++ + expectedTxsCallbacks[callback.Txid] = visitNumber + + if visitNumber == callbacksNumber { + delete(expectedTxsCallbacks, callback.Txid) // remove after receiving expected callbacks + } + require.Equal(t, Status_MINED, callback.TxStatus) - case err := <-callbackErrChannels[i]: - t.Fatalf("callback %d received - failed to parse callback %v", i, err) + case err := <-srv.errChan: + t.Fatalf("callback server %d received - failed to parse %d callback %v", i, j, err) case <-callbackTimeout: - t.Fatalf("callback %d not received - timeout", i) + t.Fatalf("callback server %d not received %d callback - timeout", i, j) } } + + require.Empty(t, expectedTxsCallbacks) // ensure all expected callbacks were received } }) } diff --git a/test/utils.go b/test/utils.go index 900e5e4cb..17a1685c5 100644 --- a/test/utils.go +++ b/test/utils.go @@ -12,6 +12,7 @@ import ( "math/rand" "net/http" "os" + "sync" "testing" "time" @@ -389,7 +390,7 @@ func startCallbackSrv(t *testing.T, receivedChan chan *TransactionResponse, errC srv := &http.Server{Addr: ":9000"} shutdownFn = func() { - t.Log("shutting down callback listener") + t.Logf("shutting down callback listener %s", callbackUrl) close(receivedChan) close(errChan) @@ -400,7 +401,7 @@ func startCallbackSrv(t *testing.T, receivedChan chan *TransactionResponse, errC } go func(server *http.Server) { - t.Log("starting callback server") + t.Logf("starting callback server %s", callbackUrl) err := server.ListenAndServe() if err != nil { return @@ -453,22 +454,24 @@ func testTxSubmission(t *testing.T, callbackUrl string, token string, tx *sdkTx. } func prepareCallback(t *testing.T, callbackNumbers int) (chan *TransactionResponse, chan error, callbackResponseFn) { - callbackReceivedChan := make(chan *TransactionResponse, callbackNumbers) // do not block callback server responses - callbackErrChan := make(chan error, callbackNumbers) - callbackIteration := 0 + callbackReceivedChan := make(chan *TransactionResponse, 100) // do not block callback server responses + callbackErrChan := make(chan error, 100) - calbackResponseFn := func(w http.ResponseWriter, rc chan *TransactionResponse, ec chan error, status *TransactionResponse) { - callbackIteration++ + responseVisitMap := make(map[string]int) + mu := &sync.Mutex{} - // Let ARC send the callback few times. Respond with success on the last one. + calbackResponseFn := func(w http.ResponseWriter, rc chan *TransactionResponse, ec chan error, status *TransactionResponse) { + mu.Lock() + callbackNumber := responseVisitMap[status.Txid] + callbackNumber++ + responseVisitMap[status.Txid] = callbackNumber + mu.Unlock() + // Let ARC send the same callback few times. Respond with success on the last one. respondWithSuccess := false - if callbackIteration < callbackNumbers { - t.Logf("%d callback received, responding bad request", callbackIteration) + if callbackNumber < callbackNumbers { respondWithSuccess = false } else { - t.Logf("callback interation: %v", callbackIteration) - t.Logf("%d callback received, responding success", callbackIteration) respondWithSuccess = true }