Skip to content

Commit

Permalink
Merge branch 'master' into ServiceSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 authored Aug 30, 2023
2 parents 63b7be5 + 4c52149 commit fb7be03
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 134 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Thank you for considering to help out with the source code! We welcome contribut
If you'd like to contribute to go-waku, please fork, fix, commit and send a pull request. If you wish to submit more complex changes though, please check up with the core devs first to ensure those changes are in line with the general philosophy of the project and/or get some early feedback which can make both your efforts much lighter as well as our review and merge procedures quick and simple.

To build and test this repository, you need:
- [Go](https://golang.org/) (version 1.17 or later)
- [Go](https://golang.org/) (version 1.19 or 1.20)
- [protoc](https://grpc.io/docs/protoc-installation/)
- [protoc-gen-go](https://protobuf.dev/getting-started/gotutorial/#compiling-protocol-buffers)

Expand Down
81 changes: 66 additions & 15 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ type PeerConnector interface {
}

type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
peerChannel *peerChannel

peerConnector PeerConnector
peerCh chan peermanager.PeerData
NAT nat.Interface

log *zap.Logger
Expand Down Expand Up @@ -134,6 +134,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
peerChannel: &peerChannel{},
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
Expand Down Expand Up @@ -194,6 +195,50 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h
}

type peerChannel struct {
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}

func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}

func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}

func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}

func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false

}
return true
}

// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state
Expand All @@ -205,8 +250,8 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel

d.peerCh = make(chan peermanager.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)
d.peerChannel.Start(ctx)
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())

err := d.listen(ctx)
if err != nil {
Expand Down Expand Up @@ -249,7 +294,13 @@ func (d *DiscoveryV5) Stop() {

d.wg.Wait()

close(d.peerCh)
defer func() {
if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting")
}
}()

d.peerChannel.Stop()
}

/*
Expand Down Expand Up @@ -433,10 +484,10 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n,
}

select {
case d.peerCh <- peer:
case <-ctx.Done():
return nil
if d.peerChannel.Publish(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
}

return nil
Expand Down
5 changes: 1 addition & 4 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,17 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {

//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
maxOutPeers := int(w.peermanager.OutRelayPeersTarget)

// Setup peer connection strategy
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
w.peermanager.SetPeerConnector(w.peerConnector)

if w.opts.enableDiscV5 {
err := w.mountDiscV5()
Expand Down Expand Up @@ -400,7 +398,6 @@ func (w *WakuNode) Start(ctx context.Context) error {

w.peerConnector.SetHost(host)
w.peermanager.SetHost(host)
w.peerConnector.SetPeerManager(w.peermanager)
err = w.peerConnector.Start(ctx)
if err != nil {
return err
Expand Down
Loading

0 comments on commit fb7be03

Please sign in to comment.