Skip to content

Commit

Permalink
[KS-420] Add rate-limiting to Dispatcher (#14239)
Browse files Browse the repository at this point in the history
* [KS-420] Add rate-limiting to Dispatcher

* Implements rate limiter on dispatcher

* Improves TOML config

* Updates toml config docs

* Fixes config_capabilities.go

* Adds changeset

* Updates txtar files

* Fixes tests + updates docs

* Renames interface methods

* Fixes CI

* Fixes CI

---------

Co-authored-by: vyzaldysanchez <[email protected]>
  • Loading branch information
bolekk and vyzaldysanchez authored Aug 29, 2024
1 parent 81733d9 commit 674eac3
Show file tree
Hide file tree
Showing 26 changed files with 435 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/calm-laws-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Implements rate limiter for capabilities dispatcher
49 changes: 32 additions & 17 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package remote

import (
"context"
"errors"
"fmt"
sync "sync"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

Expand All @@ -25,11 +27,13 @@ var (

// dispatcher en/decodes messages and routes traffic between peers and capabilities
type dispatcher struct {
cfg config.Dispatcher
peerWrapper p2ptypes.PeerWrapper
peer p2ptypes.Peer
peerID p2ptypes.PeerID
signer p2ptypes.Signer
registry core.CapabilitiesRegistry
rateLimiter *common.RateLimiter
receivers map[key]*receiver
mu sync.RWMutex
stopCh services.StopChan
Expand All @@ -44,17 +48,26 @@ type key struct {

var _ services.Service = &dispatcher{}

const supportedVersion = 1

func NewDispatcher(peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) *dispatcher {
func NewDispatcher(cfg config.Dispatcher, peerWrapper p2ptypes.PeerWrapper, signer p2ptypes.Signer, registry core.CapabilitiesRegistry, lggr logger.Logger) (*dispatcher, error) {
rl, err := common.NewRateLimiter(common.RateLimiterConfig{
GlobalRPS: cfg.RateLimit().GlobalRPS(),
GlobalBurst: cfg.RateLimit().GlobalBurst(),
PerSenderRPS: cfg.RateLimit().PerSenderRPS(),
PerSenderBurst: cfg.RateLimit().PerSenderBurst(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to create rate limiter")
}
return &dispatcher{
cfg: cfg,
peerWrapper: peerWrapper,
signer: signer,
registry: registry,
rateLimiter: rl,
receivers: make(map[key]*receiver),
stopCh: make(services.StopChan),
lggr: lggr.Named("Dispatcher"),
}
}, nil
}

func (d *dispatcher) Start(ctx context.Context) error {
Expand Down Expand Up @@ -85,14 +98,12 @@ var capReceiveChannelUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Help: "The usage of the receive channel for each capability, 0 indicates empty, 1 indicates full.",
}, []string{"capabilityId", "donId"})

const receiverBufferSize = 10000

type receiver struct {
cancel context.CancelFunc
ch chan *remotetypes.MessageBody
ch chan *types.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotetypes.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
Expand All @@ -101,7 +112,7 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec remotety
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId)
}

receiverCh := make(chan *remotetypes.MessageBody, receiverBufferSize)
receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize())

ctx, cancelCtx := d.stopCh.NewCtx()
d.wg.Add(1)
Expand Down Expand Up @@ -139,8 +150,8 @@ func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
msgBody.Version = supportedVersion
func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
msgBody.Version = uint32(d.cfg.SupportedVersion())
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
msgBody.Timestamp = time.Now().UnixMilli()
Expand All @@ -152,7 +163,7 @@ func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBo
if err != nil {
return err
}
msg := &remotetypes.Message{Signature: signature, Body: rawBody}
msg := &types.Message{Signature: signature, Body: rawBody}
rawMsg, err := proto.Marshal(msg)
if err != nil {
return err
Expand All @@ -168,6 +179,10 @@ func (d *dispatcher) receive() {
d.lggr.Info("stopped - exiting receive")
return
case msg := <-recvCh:
if !d.rateLimiter.Allow(msg.Sender.String()) {
d.lggr.Debugw("rate limit exceeded, dropping message", "sender", msg.Sender)
continue
}
body, err := ValidateMessage(msg, d.peerID)
if err != nil {
d.lggr.Debugw("received invalid message", "error", err)
Expand All @@ -184,7 +199,7 @@ func (d *dispatcher) receive() {
continue
}

receiverQueueUsage := float64(len(receiver.ch)) / receiverBufferSize
receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize())
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
Expand All @@ -195,7 +210,7 @@ func (d *dispatcher) receive() {
}
}

func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *remotetypes.MessageBody, errType types.Error) {
func (d *dispatcher) tryRespondWithError(peerID p2ptypes.PeerID, body *types.MessageBody, errType types.Error) {
if body == nil {
return
}
Expand Down
80 changes: 76 additions & 4 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
Expand All @@ -32,6 +33,47 @@ func (r *testReceiver) Receive(_ context.Context, msg *remotetypes.MessageBody)
r.ch <- msg
}

type testRateLimitConfig struct {
globalRPS float64
globalBurst int
rps float64
burst int
}

func (c testRateLimitConfig) GlobalRPS() float64 {
return c.globalRPS
}

func (c testRateLimitConfig) GlobalBurst() int {
return c.globalBurst
}

func (c testRateLimitConfig) PerSenderRPS() float64 {
return c.rps
}

func (c testRateLimitConfig) PerSenderBurst() int {
return c.burst
}

type testConfig struct {
supportedVersion int
receiverBufferSize int
rateLimit testRateLimitConfig
}

func (c testConfig) SupportedVersion() int {
return c.supportedVersion
}

func (c testConfig) ReceiverBufferSize() int {
return c.receiverBufferSize
}

func (c testConfig) RateLimit() config.DispatcherRateLimit {
return c.rateLimit
}

func TestDispatcher_CleanStartClose(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
Expand All @@ -44,7 +86,17 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
signer := mocks.NewSigner(t)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 800.0,
globalBurst: 100,
rps: 10.0,
burst: 50,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))
require.NoError(t, dispatcher.Close())
}
Expand All @@ -65,11 +117,21 @@ func TestDispatcher_Receive(t *testing.T) {
signer.On("Sign", mock.Anything).Return(nil, errors.New("not implemented"))
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 800.0,
globalBurst: 100,
rps: 10.0,
burst: 50,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))

rcv := newReceiver()
err := dispatcher.SetReceiver(capId1, donId1, rcv)
err = dispatcher.SetReceiver(capId1, donId1, rcv)
require.NoError(t, err)

// supported capability
Expand Down Expand Up @@ -113,7 +175,17 @@ func TestDispatcher_RespondWithError(t *testing.T) {
signer.On("Sign", mock.Anything).Return([]byte{}, nil)
registry := commonMocks.NewCapabilitiesRegistry(t)

dispatcher := remote.NewDispatcher(wrapper, signer, registry, lggr)
dispatcher, err := remote.NewDispatcher(testConfig{
supportedVersion: 1,
receiverBufferSize: 10000,
rateLimit: testRateLimitConfig{
globalRPS: 800.0,
globalBurst: 100,
rps: 10.0,
burst: 50,
},
}, wrapper, signer, registry, lggr)
require.NoError(t, err)
require.NoError(t, dispatcher.Start(ctx))

// unknown capability
Expand Down
1 change: 1 addition & 0 deletions core/config/capabilities_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ type CapabilitiesExternalRegistry interface {

type Capabilities interface {
Peering() P2P
Dispatcher() Dispatcher
ExternalRegistry() CapabilitiesExternalRegistry
}
14 changes: 14 additions & 0 deletions core/config/dispatcher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

type DispatcherRateLimit interface {
GlobalRPS() float64
GlobalBurst() int
PerSenderRPS() float64
PerSenderBurst() int
}

type Dispatcher interface {
SupportedVersion() int
ReceiverBufferSize() int
RateLimit() DispatcherRateLimit
}
16 changes: 16 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,22 @@ NetworkID = 'evm' # Default
# ChainID identifies the target chain id where the remote registry is located.
ChainID = '1' # Default

[Capabilities.Dispatcher]
# SupportedVersion is the version of the version of message schema.
SupportedVersion = 1 # Default
# ReceiverBufferSize is the size of the buffer for incoming messages.
ReceiverBufferSize = 10000 # Default

[Capabilities.Dispatcher.RateLimit]
# GlobalRPS is the global rate limit for the dispatcher.
GlobalRPS = 800 # Default
# GlobalBurst is the global burst limit for the dispatcher.
GlobalBurst = 1000 # Default
# PerSenderRPS is the per-sender rate limit for the dispatcher.
PerSenderRPS = 10 # Default
# PerSenderBurst is the per-sender burst limit for the dispatcher.
PerSenderBurst = 50 # Default

[Capabilities.Peering]
# IncomingMessageBufferSize is the per-remote number of incoming
# messages to buffer. Any additional messages received on top of those
Expand Down
42 changes: 42 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,14 +1438,56 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) {
}
}

type Dispatcher struct {
SupportedVersion *int
ReceiverBufferSize *int
RateLimit DispatcherRateLimit
}

func (d *Dispatcher) setFrom(f *Dispatcher) {
d.RateLimit.setFrom(&f.RateLimit)

if f.ReceiverBufferSize != nil {
d.ReceiverBufferSize = f.ReceiverBufferSize
}

if f.SupportedVersion != nil {
d.SupportedVersion = f.SupportedVersion
}
}

type DispatcherRateLimit struct {
GlobalRPS *float64
GlobalBurst *int
PerSenderRPS *float64
PerSenderBurst *int
}

func (drl *DispatcherRateLimit) setFrom(f *DispatcherRateLimit) {
if f.GlobalRPS != nil {
drl.GlobalRPS = f.GlobalRPS
}
if f.GlobalBurst != nil {
drl.GlobalBurst = f.GlobalBurst
}
if f.PerSenderRPS != nil {
drl.PerSenderRPS = f.PerSenderRPS
}
if f.PerSenderBurst != nil {
drl.PerSenderBurst = f.PerSenderBurst
}
}

type Capabilities struct {
Peering P2P `toml:",omitempty"`
Dispatcher Dispatcher `toml:",omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
}

func (c *Capabilities) setFrom(f *Capabilities) {
c.Peering.setFrom(&f.Peering)
c.ExternalRegistry.setFrom(&f.ExternalRegistry)
c.Dispatcher.setFrom(&f.Dispatcher)
}

type ThresholdKeyShareSecrets struct {
Expand Down
5 changes: 4 additions & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer
remoteDispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
remoteDispatcher, err := remote.NewDispatcher(cfg.Capabilities().Dispatcher(), externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
if err != nil {
return nil, fmt.Errorf("could not create dispatcher: %w", err)
}
dispatcher = remoteDispatcher
} else {
dispatcher = opts.CapabilitiesDispatcher
Expand Down
Loading

0 comments on commit 674eac3

Please sign in to comment.