Skip to content

Commit

Permalink
Merge branch 'master' into feat/autosharding-config
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Aug 31, 2023
2 parents d69f8f9 + 229fb7a commit 30c988f
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 205 deletions.
11 changes: 11 additions & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
plugins:
golint:
enabled: true
gofmt:
enabled: true
govet:
enabled: true
# golangci-lint:
#enabled: true
exclude_patterns:
- "."
47 changes: 0 additions & 47 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"

"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
Expand Down Expand Up @@ -140,47 +137,3 @@ func RandomHex(n int) (string, error) {
}
return hex.EncodeToString(bytes), nil
}

type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan peermanager.PeerData
}

func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan peermanager.PeerData, 10),
}

return result
}

func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) {
go func() {
for p := range ch {
t.Lock()
t.peerMap[p.AddrInfo.ID] = struct{}{}
t.Unlock()
}
}()
}

func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool {
t.RLock()
defer t.RUnlock()
_, ok := t.peerMap[p]
return ok
}

func (t *TestPeerDiscoverer) PeerCount() int {
t.RLock()
defer t.RUnlock()
return len(t.peerMap)
}

func (t *TestPeerDiscoverer) Clear() {
t.Lock()
defer t.Unlock()
t.peerMap = make(map[peer.ID]struct{})
}
7 changes: 4 additions & 3 deletions waku/v2/discv5/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"

"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestDiscV5(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn1 := tests.NewTestPeerDiscoverer()
peerconn1 := peermanager.NewTestPeerDiscoverer()
d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)
Expand All @@ -120,7 +121,7 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn2 := tests.NewTestPeerDiscoverer()
peerconn2 := peermanager.NewTestPeerDiscoverer()
d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err)
d2.SetHost(host2)
Expand All @@ -132,7 +133,7 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l3, err := newLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
peerconn3 := tests.NewTestPeerDiscoverer()
peerconn3 := peermanager.NewTestPeerDiscoverer()
d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err)
d3.SetHost(host3)
Expand Down
9 changes: 1 addition & 8 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
Expand Down Expand Up @@ -254,13 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)

// Setup peer connection strategy
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
Expand Down
15 changes: 5 additions & 10 deletions waku/v2/peermanager/connection_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason
return true, 0
}

// NotifyDisconnect is called when a connection disconnects.
func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) {
ip, err := manet.ToIP(addr)
if err != nil {
Expand Down Expand Up @@ -111,16 +112,10 @@ func (c *ConnectionGater) validateInboundConn(addr multiaddr.Multiaddr) bool {
c.Lock()
defer c.Unlock()

currConnections, ok := c.limiter[ip.String()]
if !ok {
c.limiter[ip.String()] = 1
return true
} else {
if currConnections+1 > maxConnsPerIP {
return false
}

c.limiter[ip.String()]++
if currConnections := c.limiter[ip.String()]; currConnections+1 > maxConnsPerIP {
return false
}

c.limiter[ip.String()]++
return true
}
56 changes: 56 additions & 0 deletions waku/v2/peermanager/mock_peer_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package peermanager

import (
"context"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
)

// TestPeerDiscoverer is mock peer discoverer for testing
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
}

// NewTestPeerDiscoverer is a constructor for TestPeerDiscoverer
func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
}

return result
}

// Subscribe is for subscribing to peer discoverer
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) {
go func() {
for p := range ch {
t.Lock()
t.peerMap[p.AddrInfo.ID] = struct{}{}
t.Unlock()
}
}()
}

// HasPeer is for checking if a peer is present in peer discoverer
func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool {
t.RLock()
defer t.RUnlock()
_, ok := t.peerMap[p]
return ok
}

// PeerCount is for getting the number of peers in peer discoverer
func (t *TestPeerDiscoverer) PeerCount() int {
t.RLock()
defer t.RUnlock()
return len(t.peerMap)
}

// Clear is for clearing the peer discoverer
func (t *TestPeerDiscoverer) Clear() {
t.Lock()
defer t.Unlock()
t.peerMap = make(map[peer.ID]struct{})
}
22 changes: 15 additions & 7 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package peermanager
import (
"context"
"errors"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -53,27 +54,34 @@ type PeerConnectionStrategy struct {
logger *zap.Logger
}

// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func getBackOff() backoff.BackoffFactory {
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
return bkf
}

// NewPeerConnectionStrategy creates a utility to connect to peers,
// but only if we have not recently tried connecting to them already.
//
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(cacheSize int, pm *PeerManager,
dialTimeout time.Duration, backoff backoff.BackoffFactory,
logger *zap.Logger) (*PeerConnectionStrategy, error) {

func NewPeerConnectionStrategy(pm *PeerManager,
dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) {
// cacheSize is the size of a TwoQueueCache
cacheSize := 600
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
}
//
pc := &PeerConnectionStrategy{
cache: cache,
wg: sync.WaitGroup{},
dialTimeout: dialTimeout,
pm: pm,
backoff: backoff,
backoff: getBackOff(),
logger: logger.Named("discovery-connector"),
}
pm.SetPeerConnector(pc)
Expand Down
8 changes: 4 additions & 4 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"go.uber.org/zap"
)

// TODO: Move all the protocol IDs to a common location.
// WakuRelayIDv200 is protocol ID for Waku v2 relay protocol
// TODO: Move all the protocol IDs to a common location.
const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0")

// PeerManager applies various controls and manage connections towards peers.
Expand Down Expand Up @@ -258,7 +258,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol

//Add Service peers to serviceSlots.
for _, proto := range protocols {
pm.AddPeerToServiceSlot(proto, info.ID)
pm.addPeerToServiceSlot(proto, info.ID)
}

//Add to the peer-store
Expand All @@ -279,10 +279,10 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) {
pm.serviceSlots.removePeer(peerID)
}

// AddPeerToServiceSlot adds a peerID to serviceSlot.
// addPeerToServiceSlot adds a peerID to serviceSlot.
// Adding to peerStore is expected to be already done by caller.
// If relay proto is passed, it is not added to serviceSlot.
func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
if proto == WakuRelayIDv200 {
pm.logger.Warn("Cannot add Relay peer to service peer slots")
return
Expand Down
Loading

0 comments on commit 30c988f

Please sign in to comment.