Skip to content

Commit

Permalink
feat: add option to specify preferred peers for filter
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 24, 2024
1 parent 3546adf commit d39200f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
18 changes: 17 additions & 1 deletion waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct {
batchInterval time.Duration
multiplexChannelBuffer int
preferredPeers peer.IDSlice
}

type SubscribeOptions func(*subscribeParameters)
Expand All @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
}
}

func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
Expand Down Expand Up @@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
isExcludedPeer := false
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
if p == px {
isExcludedPeer = true
break
}
}
if !isExcludedPeer {
options = append(options, filter.WithPeer(p))
}
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))
Expand Down
11 changes: 10 additions & 1 deletion waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package filter

import (
"context"
"math/rand"
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
Expand Down Expand Up @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024}
params := subscribeParameters{300 * time.Second, 1024, nil}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
Expand Down
18 changes: 16 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
reqPeerCount := params.maxPeers - len(params.selectedPeers)
for _, p := range params.selectedPeers {
if params.peersToExclude == nil {
params.peersToExclude = make(peermanager.PeerSet)
}
//exclude peers that are preferredpeers so that they don't get selected again.
if _, ok := params.peersToExclude[p]; !ok {
params.peersToExclude[p] = struct{}{}
}
}

if params.pm != nil && reqPeerCount > 0 {

wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
Expand All @@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
Expand Down

0 comments on commit d39200f

Please sign in to comment.