Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Sep 18, 2024
1 parent 2dfbdba commit e94eaa0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 10 deletions.
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)

w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)

if params.storeFactory != nil {
w.storeFactory = params.storeFactory
Expand Down
14 changes: 14 additions & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

// Default UserAgent
Expand Down Expand Up @@ -94,6 +95,8 @@ type WakuNodeParameters struct {
enableStore bool
messageProvider legacy_store.MessageProvider

storeRateLimit rate.Limit

enableRendezvousPoint bool
rendezvousDB *rendezvous.DB

Expand Down Expand Up @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
WithWakuStoreRateLimit(8), // Value currently set in status.staging
}

// MultiAddresses return the list of multiaddresses configured in the node
Expand Down Expand Up @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
}
}

// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
// the store protocol from a storenode
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.storeRateLimit = value
return nil
}
}

// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider.
func WithWakuStore() WakuNodeOption {
Expand Down
19 changes: 10 additions & 9 deletions waku/v2/protocol/store/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const MaxPageSize = 100
// DefaultPageSize is the default number of waku messages per page
const DefaultPageSize = 20

const maxQueriesPerSecond = 8

const ok = uint32(200)

var (
Expand Down Expand Up @@ -68,19 +66,22 @@ func (e *StoreError) Error() string {

// WakuStore represents an instance of a store client
type WakuStore struct {
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
rateLimiters map[peer.ID]*rate.Limiter
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager

defaultRatelimit rate.Limit
rateLimiters map[peer.ID]*rate.Limiter
}

// NewWakuStore is used to instantiate a StoreV3 client
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
s := new(WakuStore)
s.log = log.Named("store-client")
s.timesource = timesource
s.pm = pm
s.defaultRatelimit = defaultRatelimit
s.rateLimiters = make(map[peer.ID]*rate.Limiter)

if pm != nil {
Expand Down Expand Up @@ -269,7 +270,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
if !params.skipRatelimit {
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1)
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
err := rateLimiter.Wait(ctx)
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func IncludeData(v bool) RequestOption {
}
}

// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
func SkipRateLimit() RequestOption {
return func(params *Parameters) error {
params.skipRatelimit = true
Expand Down

0 comments on commit e94eaa0

Please sign in to comment.