Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARCO-182: send callbacks to the same url one by one #563

Merged
merged 9 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ All notable changes to this project will be documented in this file. The format

## [Unreleased]

### Changed
- Callbacks are sent one by one to the same URL. In the previous implementation, each callback request created a new goroutine to send the callback, which could result in a potential DDoS of the callback receiver. The new approach sends callbacks to the same receiver in a serial manner. Note that URLs are not locked by the `callbacker` instance, so serial sends occur only within a single instance. In other words, the level of parallelism is determined by the number of `callbacker` instances.

## [1.3.0] - 2024-08-21

### Changed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ install_gen:
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]
go install github.com/oapi-codegen/oapi-codegen/v2/cmd/[email protected]
go install github.com/matryer/moq@v0.3.4
go install github.com/matryer/moq@v0.4.0

.PHONY: docs
docs:
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ARC is a transaction processor for Bitcoin that keeps track of the life cycle of
- [ZMQ](#zmq)
- [BlockTx](#blocktx)
- [BlockTx stores](#blocktx-stores)
- [Callbacker](#callbacker)
- [Message Queue](#message-queue)
- [K8s-Watcher](#k8s-watcher)
- [Broadcaster-cli](#broadcaster-cli)
Expand Down Expand Up @@ -92,6 +93,9 @@ where options are:
-k8s-watcher=<true|false>
whether to start k8s-watcher (default=true)

-callbacker=<true|false>
whether to start callbacker (default=true)

-config=/location
directory to look for config.yaml (default='')

Expand Down Expand Up @@ -248,6 +252,17 @@ Metamorph publishes new transactions to the message queue and BlockTx subscribes

![Message Queue](./doc/message_queue.png)

### Callbacker

Callbacker is a microservice that sends callbacks to a specified URL.

Callbacker is designed to be horizontally scalable, with each instance operating independently. As a result, they do not communicate with each other and remain unaware of each other's existence.

You can run callbacker like this:

```shell
go run main.go -callbacker=true
```

## K8s-Watcher

Expand Down
26 changes: 16 additions & 10 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,40 @@ import (
"google.golang.org/grpc/reflection"
)

func StartCallbacker(logger *slog.Logger, config *config.ArcConfig) (func(), error) {
func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), error) {
logger = logger.With(slog.String("service", "callbacker"))
logger.Info("Starting")

callbackSrv, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger)
config := appConfig.Callbacker

callbackSender, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger)
if err != nil {
return nil, fmt.Errorf("callbacker failed: %v", err)
}

srvOpts := []callbacker.ServerOption{
callbacker.WithLogger(logger.With(slog.String("module", "callbacker-server"))),
}
server := callbacker.NewServer(callbackSrv, srvOpts...)
err = server.Serve(config.Callbacker.ListenAddr, config.GrpcMessageSize, config.PrometheusEndpoint)
callbackDispatcher := callbacker.NewCallbackDispatcher(callbackSender, config.Pause)

server := callbacker.NewServer(callbackDispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
return nil, fmt.Errorf("GRPCServer failed: %v", err)
}

healthServer, err := StartHealthServerCallbacker(server, config.Callbacker.Health, logger)
healthServer, err := StartHealthServerCallbacker(server, config.Health, logger)
if err != nil {
return nil, fmt.Errorf("failed to start health server: %v", err)
}

stopFn := func() {
logger.Info("Shutting down callbacker")

server.Shutdown()
callbackSrv.GracefulStop()
// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. dispatcher - ensure all already accepted callbacks are proccessed
// 3. sender - finally, stop the sender as there are no callbacks left to send.
server.GracefulStop()
callbackDispatcher.GracefulStop()
callbackSender.GracefulStop()

healthServer.Stop()

Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,5 @@ type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
}
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,6 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Pause: 0,
}
}
3 changes: 2 additions & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,5 @@ callbacker:
listenAddr: localhost:8021 # address space for callbacker to listen on. Can be for example localhost:8021 or :8021 for listening on all addresses
dialAddr: localhost:8021 # address for other services to dial callbacker service
health:
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
pause: 0s # pause between sending next callback to the same receiver
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,16 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -400,8 +400,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -423,22 +423,22 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
123 changes: 123 additions & 0 deletions internal/callbacker/callbacker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/callbacker/callbacker_mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package callbacker

// from callbacker.go
//go:generate moq -out ./callbacker_mock.go ./ CallbackerI
Loading
Loading