From bfbc50eb22f30a1a59c85d9843f6d52d121c60a2 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Fri, 25 Aug 2023 14:42:10 +0400 Subject: [PATCH 1/9] fix: missed passing protocols to addPeer (#682) --- waku/v2/peermanager/peer_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 77bdb98c9..71cc25865 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -260,7 +260,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol } //Add to the peer-store - err = pm.addPeer(info.ID, info.Addrs, origin) + err = pm.addPeer(info.ID, info.Addrs, origin, protocols...) if err != nil { return "", err } From 8b73eb8ae3070358a7aeb385f2382e9072ccf447 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Fri, 25 Aug 2023 19:36:06 +0400 Subject: [PATCH 2/9] refactor(WakuPeerStore): nit origin (#685) --- waku/v2/peerstore/waku_peer_store.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index e62364bdd..7f5dc88e2 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -41,10 +41,10 @@ type WakuPeerstoreImpl struct { // WakuPeerstore is an interface for implementing WakuPeerStore type WakuPeerstore interface { SetOrigin(p peer.ID, origin Origin) error - Origin(p peer.ID, origin Origin) (Origin, error) + Origin(p peer.ID) (Origin, error) PeersByOrigin(origin Origin) peer.IDSlice SetENR(p peer.ID, enr *enode.Node) error - ENR(p peer.ID, origin Origin) (*enode.Node, error) + ENR(p peer.ID) (*enode.Node, error) AddConnFailure(p peer.AddrInfo) ResetConnFailures(p peer.AddrInfo) ConnFailures(p peer.AddrInfo) int @@ -69,7 +69,7 @@ func (ps *WakuPeerstoreImpl) SetOrigin(p peer.ID, origin Origin) error { } // Origin fetches the origin for a specific peer. 
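+// It returns Unknown together with an error if no origin has been recorded for the peer.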
-func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) { +func (ps *WakuPeerstoreImpl) Origin(p peer.ID) (Origin, error) { result, err := ps.peerStore.Get(p, peerOrigin) if err != nil { return Unknown, err @@ -79,11 +79,11 @@ func (ps *WakuPeerstoreImpl) Origin(p peer.ID, origin Origin) (Origin, error) { } // PeersByOrigin returns the list of peers for a specific origin -func (ps *WakuPeerstoreImpl) PeersByOrigin(origin Origin) peer.IDSlice { +func (ps *WakuPeerstoreImpl) PeersByOrigin(expectedOrigin Origin) peer.IDSlice { var result peer.IDSlice for _, p := range ps.Peers() { - _, err := ps.Origin(p, origin) - if err == nil { + actualOrigin, err := ps.Origin(p) + if err == nil && actualOrigin == expectedOrigin { result = append(result, p) } } @@ -96,7 +96,7 @@ func (ps *WakuPeerstoreImpl) SetENR(p peer.ID, enr *enode.Node) error { } // ENR fetches the ENR record for a peer -func (ps *WakuPeerstoreImpl) ENR(p peer.ID, origin Origin) (*enode.Node, error) { +func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) { result, err := ps.peerStore.Get(p, peerENR) if err != nil { return nil, err From 041dc4070a1d8e63543bb0b71349c1b769564db9 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 25 Aug 2023 11:40:24 -0400 Subject: [PATCH 3/9] fix(filterV2): requestID and log request type --- waku/v2/protocol/filter/client.go | 5 +++-- waku/v2/protocol/filter/server.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 7a4943a57..6f59c655c 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -254,6 +254,7 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { params := new(FilterUnsubscribeParameters) params.log = wf.log + opts = append(DefaultUnsubscribeOptions(), opts...) 
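+	// defaults are applied first so that any options supplied by the caller can override them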
for _, opt := range opts { opt(params) } @@ -353,7 +354,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co defer localWg.Done() err := wf.request( ctx, - &FilterSubscribeParameters{selectedPeer: peerID}, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, pb.FilterSubscribeRequest_UNSUBSCRIBE, contentFilter) if err != nil { @@ -422,7 +423,7 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte defer localWg.Done() err := wf.request( ctx, - &FilterSubscribeParameters{selectedPeer: peerID}, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, ContentFilter{}) if err != nil { diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index c4c7cfb68..60e5d10ad 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -118,7 +118,7 @@ func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stre wf.metrics.RecordRequest(subscribeRequest.FilterSubscribeType.String(), time.Since(start)) - logger.Info("received request") + logger.Info("received request", zap.String("requestType", subscribeRequest.FilterSubscribeType.String())) } } @@ -298,7 +298,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e } else { wf.metrics.RecordError(writeResponseFailure) } - logger.Error("pushing messages to peer", zap.Error(err)) + logger.Error("pushing messages to peer", logging.HexBytes("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err)) wf.subscriptions.FlagAsFailure(peerID) return nil } From 09eb8ed19b40b409de1081d2011b2551905e40ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 28 Aug 2023 01:45:26 -0400 Subject: [PATCH 4/9] fix(discv5): threadsafe peerCh (#687) --- waku/v2/discv5/discover.go | 81 +++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 6ea17f728..0dee3dee8 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -34,16 +34,16 @@ type PeerConnector interface { } type DiscoveryV5 struct { - params *discV5Parameters - host host.Host - config discover.Config - udpAddr *net.UDPAddr - listener *discover.UDPv5 - localnode *enode.LocalNode - metrics Metrics + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + metrics Metrics + peerChannel *peerChannel peerConnector PeerConnector - peerCh chan peermanager.PeerData NAT nat.Interface log *zap.Logger @@ -134,6 +134,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn peerConnector: peerConnector, NAT: NAT, wg: &sync.WaitGroup{}, + peerChannel: &peerChannel{}, localnode: localnode, metrics: newMetrics(reg), config: discover.Config{ @@ -194,6 +195,50 @@ func (d *DiscoveryV5) SetHost(h host.Host) { d.host = h } +type peerChannel struct { + mutex sync.Mutex + channel chan peermanager.PeerData + started bool + ctx context.Context +} + +func (p *peerChannel) Start(ctx context.Context) { + p.mutex.Lock() + defer p.mutex.Unlock() + p.started = true + p.ctx = ctx + p.channel = make(chan peermanager.PeerData) +} + +func (p *peerChannel) Stop() { + p.mutex.Lock() + defer p.mutex.Unlock() + if !p.started { + return + } + p.started = 
false
+	close(p.channel)
+}
+
+func (p *peerChannel) Subscribe() chan peermanager.PeerData {
+	return p.channel
+}
+
+func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+	if !p.started {
+		return false
+	}
+	select {
+	case p.channel <- peer:
+	case <-p.ctx.Done():
+		return false
+
+	}
+	return true
+}
+
 // only works if the discovery v5 hasn't been started yet.
 func (d *DiscoveryV5) Start(ctx context.Context) error {
 	// compare and swap sets the discovery v5 to `started` state
@@ -205,8 +250,8 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
 	ctx, cancel := context.WithCancel(ctx)
 	d.cancel = cancel
 
-	d.peerCh = make(chan peermanager.PeerData)
-	d.peerConnector.Subscribe(ctx, d.peerCh)
+	d.peerChannel.Start(ctx)
+	d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
 
 	err := d.listen(ctx)
 	if err != nil {
@@ -249,7 +294,13 @@ func (d *DiscoveryV5) Stop() {
 
 	d.wg.Wait()
 
-	close(d.peerCh)
+	defer func() {
+		if r := recover(); r != nil {
+			d.log.Info("recovering from panic and quitting")
+		}
+	}()
+
+	d.peerChannel.Stop()
 }
 
 /*
@@ -433,10 +484,10 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
 			ENR:      n,
 		}
 
-		select {
-		case d.peerCh <- peer:
-		case <-ctx.Done():
-			return nil
+		if d.peerChannel.Publish(peer) {
+			d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
+		} else {
+			d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
 		}
 
 		return nil

From 467d1b2ca54eeab8328908591f0864031df58961 Mon Sep 17 00:00:00 2001
From: harsh jain
Date: Mon, 28 Aug 2023 10:47:48 +0400
Subject: [PATCH 5/9] refactor: peerConnector (#665)

* refactor: peerConnector

* fix: code climate and don't waitOn subscriptions PeerData

* fix: check in peerConnector is on outRelay connections

* fix: bug introduced in peerConnector
---
 waku/v2/node/wakunode2.go                     |   5 +-
 waku/v2/peermanager/peer_connector.go         | 128 ++++++------------
 waku/v2/peermanager/peer_manager.go           |   8 +-
 waku/v2/peermanager/test/peer_manager_test.go |   9 +-
 4 files changed, 48 insertions(+), 102 deletions(-)

diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index 5297e28c1..b21c31938 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -253,7 +253,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {

 	//Initialize peer manager.
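+	// the peer manager and the peer connection strategy are wired together inside NewPeerConnectionStrategy via SetPeerConnector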
 	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
-	maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
 
 	// Setup peer connection strategy
 	cacheSize := 600
@@ -261,11 +260,10 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	minBackoff, maxBackoff := time.Minute, time.Hour
 	bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
-	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, maxOutPeers, discoveryConnectTimeout, bkf, w.log)
+	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log)
 	if err != nil {
 		w.log.Error("creating peer connection strategy", zap.Error(err))
 	}
-	w.peermanager.SetPeerConnector(w.peerConnector)
 
 	if w.opts.enableDiscV5 {
 		err := w.mountDiscV5()
@@ -400,7 +398,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
 	w.peerConnector.SetHost(host)
 	w.peermanager.SetHost(host)
-	w.peerConnector.SetPeerManager(w.peermanager)
 	err = w.peerConnector.Start(ctx)
 	if err != nil {
 		return err
diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go
index 460f6cd45..fa1e2a87c 100644
--- a/waku/v2/peermanager/peer_connector.go
+++ b/waku/v2/peermanager/peer_connector.go
@@ -17,6 +17,8 @@ import (
 	"github.com/waku-org/go-waku/logging"
 	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
 
+	"sync/atomic"
+
 	"go.uber.org/zap"
 
 	lru "github.com/hashicorp/golang-lru"
@@ -39,14 +41,10 @@ type PeerConnectionStrategy struct {
 	pm     *PeerManager
 	cancel context.CancelFunc
 
-	paused       bool
-	workerCtx    context.Context
-	workerCancel context.CancelFunc
+	paused atomic.Bool
 
 	wg          sync.WaitGroup
-	maxOutPeers int
 	dialTimeout time.Duration
-	peerCh      chan PeerData
 	dialCh      chan peer.AddrInfo
 
 	subscriptions []<-chan PeerData
@@ -62,7 +60,7 @@ type PeerConnectionStrategy struct {
 // dialTimeout is how long we attempt to connect to a peer before giving up
 // minPeers is the minimum number of peers that the node should have
 // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
-func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
+func NewPeerConnectionStrategy(cacheSize int, pm *PeerManager,
 	dialTimeout time.Duration, backoff backoff.BackoffFactory,
 	logger *zap.Logger) (*PeerConnectionStrategy, error) {
 
@@ -70,15 +68,16 @@ func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
 	if err != nil {
 		return nil, err
 	}
-
-	return &PeerConnectionStrategy{
+	pc := &PeerConnectionStrategy{
 		cache:       cache,
 		wg:          sync.WaitGroup{},
-		maxOutPeers: maxOutPeers,
 		dialTimeout: dialTimeout,
+		pm:          pm,
 		backoff:     backoff,
 		logger:      logger.Named("discovery-connector"),
-	}, nil
+	}
+	pm.SetPeerConnector(pc)
+	return pc, nil
 }
 
 type connCacheData struct {
@@ -101,18 +100,31 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
 
 func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-chan PeerData) {
 	for {
+		// return if the context was cancelled, even while the peerConnector is paused.
 		select {
 		case <-ctx.Done():
 			return
-		case p := <-ch:
+		default:
+		}
+		// context still active: consume peers from the channel unless the connector is paused.
+		if !c.isPaused() {
 			select {
 			case <-ctx.Done():
 				return
-			case c.peerCh <- p:
+			case p, ok := <-ch:
+				if !ok {
+					return
+				}
+				c.pm.AddDiscoveredPeer(p)
+				c.publishWork(ctx, p.AddrInfo)
+			case <-time.After(1 * time.Second):
+				// this timeout prevents the goroutine from blocking forever when no peer arrives
+				break
 			}
+		} else {
+			time.Sleep(1 * time.Second) // sleep while the peerConnector is paused.
}
 	}
-
 }
 
 // SetHost sets the host to be able to mount or consume a protocol
 func (c *PeerConnectionStrategy) SetHost(h host.Host) {
 	c.host = h
 }
 
-// SetPeerManager sets the peermanager in order to utilize add peer
-func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
-	c.pm = pm
-}
-
 // Start attempts to connect to the peers passed in by peerCh.
 // Will not connect to peers if they are within the backoff period.
 func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
@@ -134,12 +141,10 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
 	ctx, cancel := context.WithCancel(ctx)
 	c.cancel = cancel
-	c.peerCh = make(chan PeerData)
 	c.dialCh = make(chan peer.AddrInfo)
 
-	c.wg.Add(3)
+	c.wg.Add(2)
 	go c.shouldDialPeers(ctx)
-	go c.workPublisher(ctx)
 	go c.dialPeers(ctx)
 
 	c.consumeSubscriptions(ctx)
@@ -154,19 +159,14 @@ func (c *PeerConnectionStrategy) Stop() {
 	}
 
 	c.cancel()
+	c.cancel = nil
 	c.wg.Wait()
 
-	close(c.peerCh)
 	close(c.dialCh)
-
-	c.subscriptions = nil
-	c.cancel = nil
 }
 
 func (c *PeerConnectionStrategy) isPaused() bool {
-	c.RLock()
-	defer c.RUnlock()
-	return c.paused
+	return c.paused.Load()
 }
 
 func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
@@ -174,38 +174,18 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
 
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
-
-	c.Lock()
-	c.workerCtx, c.workerCancel = context.WithCancel(ctx)
-	c.Unlock()
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			isPaused := c.isPaused()
-			_, outRelayPeers, err := c.pm.GroupPeersByDirection()
-			if err != nil {
-				c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
-				continue
-			}
-			numPeers := outRelayPeers.Len()
-			if numPeers >= c.maxOutPeers && !isPaused {
-				c.Lock()
-				c.paused = true
-				c.workerCancel()
-				c.Unlock()
-			} else if numPeers < c.maxOutPeers && isPaused {
-				c.Lock()
-				c.paused = false
-				c.workerCtx, c.workerCancel = context.WithCancel(ctx)
-				c.Unlock()
-			}
+			_, outRelayPeers := c.pm.getRelayPeers()
+			c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause when the number of outbound relay peers reaches or exceeds the target
 		}
 	}
 }
 
+// Subscribe may be called before the peerConnector has started; in that case the subscriptions are stored in the subscriptions array and consumed once Start is called.
 func (c *PeerConnectionStrategy) consumeSubscriptions(ctx context.Context) {
 	for _, subs := range c.subscriptions {
 		c.wg.Add(1)
 		go func(s <-chan PeerData) {
 			defer c.wg.Done()
 			c.consumeSubscription(ctx, s)
 		}(subs)
 	}
+	c.subscriptions = nil
 }
 
 func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
 	select {
 	case c.dialCh <- p:
 	case <-ctx.Done():
 		return
 	}
 }
 
-func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
-	defer c.wg.Done()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			isPaused := c.isPaused()
-			if !isPaused {
-				select {
-				case <-ctx.Done():
-					return
-				case p := <-c.peerCh:
-					c.pm.AddDiscoveredPeer(p)
-					c.publishWork(ctx, p.AddrInfo)
-				case <-time.After(1 * time.Second):
-					// This timeout is to not lock the goroutine
-					break
-				}
-			} else {
-				// Check if paused again
-				time.Sleep(1 * time.Second)
-			}
-		}
-	}
-}
-
 const maxActiveDials = 5
 
+// c.cache is thread-safe on its own
+// the mutex is only needed because canDialPeer may be queried concurrently for the same peer
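+// a peer can be dialed when it has no cache entry yet or when its backoff deadline has passed;
+// each permitted dial schedules the next allowed attempt using the backoff strategy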
func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool { c.mux.Lock() + defer c.mux.Unlock() val, ok := c.cache.Get(pi.ID) var cachedPeer *connCacheData if ok { tv := val.(*connCacheData) now := time.Now() if now.Before(tv.nextTry) { - c.mux.Unlock() return false } @@ -272,14 +227,13 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool { cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay()) c.cache.Add(pi.ID, cachedPeer) } - c.mux.Unlock() return true } func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { defer c.wg.Done() - maxGoRoutines := c.maxOutPeers + maxGoRoutines := c.pm.OutRelayPeersTarget if maxGoRoutines > maxActiveDials { maxGoRoutines = maxActiveDials } @@ -301,9 +255,7 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { if c.canDialPeer(pi) { sem <- struct{}{} c.wg.Add(1) - go c.dialPeer(pi, sem) - } else { - continue + go c.dialPeer(ctx, pi, sem) } case <-ctx.Done(): return @@ -311,11 +263,9 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) { } } -func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { +func (c *PeerConnectionStrategy) dialPeer(ctx context.Context, pi peer.AddrInfo, sem chan struct{}) { defer c.wg.Done() - c.RLock() - ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout) - c.RUnlock() + ctx, cancel := context.WithTimeout(ctx, c.dialTimeout) defer cancel() err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 71cc25865..e0607bd0d 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -147,7 +147,7 @@ func (pm *PeerManager) connectToRelayPeers() { inRelayPeers, outRelayPeers := pm.getRelayPeers() if inRelayPeers.Len() > 0 && inRelayPeers.Len() > pm.InRelayPeersTarget { - pm.pruneInRelayConns(inRelayPeers, outRelayPeers) + pm.pruneInRelayConns(inRelayPeers) } if outRelayPeers.Len() > pm.OutRelayPeersTarget { @@ -191,7 +191,7 @@ func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) { return } -func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { +func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { //Start disconnecting peers, based on what? //For now, just disconnect most recently connected peers @@ -256,7 +256,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol //Add Service peers to serviceSlots. for _, proto := range protocols { - pm.AddPeerToServiceSlot(proto, info.ID, origin) + pm.AddPeerToServiceSlot(proto, info.ID) } //Add to the peer-store @@ -286,7 +286,7 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) { // AddPeerToServiceSlot adds a peerID to serviceSlot. // Adding to peerStore is expected to be already done by caller. // If relay proto is passed, it is not added to serviceSlot. 
-func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, origin wps.Origin) { +func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { if proto == WakuRelayIDv200 { pm.logger.Warn("Cannot add Relay peer to service peer slots") return diff --git a/waku/v2/peermanager/test/peer_manager_test.go b/waku/v2/peermanager/test/peer_manager_test.go index bd8c71b45..cd40e4766 100644 --- a/waku/v2/peermanager/test/peer_manager_test.go +++ b/waku/v2/peermanager/test/peer_manager_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/peermanager" - wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -45,7 +44,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerId, h2.ID()) //Test addition and selection from service-slot - pm.AddPeerToServiceSlot(protocol, h2.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol, h2.ID()) peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) require.NoError(t, err) @@ -58,14 +57,14 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerId, h2.ID()) h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol, h3.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol, h3.ID()) h4, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) defer h4.Close() h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol1, h4.ID(), wps.Static) + pm.AddPeerToServiceSlot(protocol1, h4.ID()) //Test peer selection from first added peer to serviceSlot peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) @@ -91,7 +90,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, utils.ErrNoPeersAvailable) //Test peer selection for relay protocol from peer store h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID(), wps.Static) + pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID()) _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) require.Error(t, err, utils.ErrNoPeersAvailable) From 67d0b77147e412073aadc3f512a6d9a5d5315098 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 28 Aug 2023 14:05:59 +0530 Subject: [PATCH 6/9] chore: fix frequent logging from poll (#688) --- waku/v2/peermanager/peer_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index e0607bd0d..bb198db39 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -126,7 +126,7 @@ func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers if err != nil { return } - pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), + pm.logger.Debug("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len())) //Need to filter peers to check if they support relay @@ -136,8 +136,6 @@ func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers if outPeers.Len() != 0 { outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200) } - pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), - zap.Int("outRelayPeers", outRelayPeers.Len())) 
 	return
 }
 
 func (pm *PeerManager) connectToRelayPeers() {
 
 	//Check for out peer connections and connect to more peers.
 	inRelayPeers, outRelayPeers := pm.getRelayPeers()
+	pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()),
+		zap.Int("outRelayPeers", outRelayPeers.Len()))
 
 	if inRelayPeers.Len() > 0 && inRelayPeers.Len() > pm.InRelayPeersTarget {
 		pm.pruneInRelayConns(inRelayPeers)

From 6463dbeb709f5e5353a405f1117ae4d12c7f7c39 Mon Sep 17 00:00:00 2001
From: Iuri Matias
Date: Mon, 28 Aug 2023 10:46:18 -0400
Subject: [PATCH 7/9] chore: update Readme to reflect required go version

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 9adbc90d5..a619cc248 100644
--- a/README.md
+++ b/README.md
@@ -106,7 +106,7 @@ Thank you for considering to help out with the source code! We welcome contribut
 If you'd like to contribute to go-waku, please fork, fix, commit and send a pull request. If you wish to submit more complex changes though, please check up with the core devs first to ensure those changes are in line with the general philosophy of the project and/or get some early feedback which can make both your efforts much lighter as well as our review and merge procedures quick and simple.
 
 To build and test this repository, you need:
-  - [Go](https://golang.org/) (version 1.17 or later)
+  - [Go](https://golang.org/) (version 1.19 or 1.20)
   - [protoc](https://grpc.io/docs/protoc-installation/)
   - [protoc-gen-go](https://protobuf.dev/getting-started/gotutorial/#compiling-protocol-buffers)

From 8ad08d6b04813735fdbb60818d37f629c6be8953 Mon Sep 17 00:00:00 2001
From: Andrea Maria Piana
Date: Tue, 29 Aug 2023 12:53:39 +0100
Subject: [PATCH 8/9] Check nils when deleting subscription (#691)

---
 waku/v2/protocol/filter/client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go
index 6f59c655c..f2787269c 100644
--- a/waku/v2/protocol/filter/client.go
+++ b/waku/v2/protocol/filter/client.go
@@ -379,7 +379,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
 	localWg.Wait()
 	close(resultChan)
 	for _, peerID := range peersUnsubscribed {
-		if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
+		if wf.subscriptions != nil && wf.subscriptions.items != nil && wf.subscriptions.items[peerID] != nil && len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
 			delete(wf.subscriptions.items, peerID)
 		}
 	}

From 4c52149facd6e3b21ca606fde549cdefe44e7d9b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?rich=CE=9Brd?=
Date: Wed, 30 Aug 2023 01:18:06 -0400
Subject: [PATCH 9/9] chore: log successful message pushes (#694)

Co-authored-by: Prem Chaitanya Prathi
---
 waku/v2/protocol/filter/server.go | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go
index 60e5d10ad..b47c8db35 100644
--- a/waku/v2/protocol/filter/server.go
+++ b/waku/v2/protocol/filter/server.go
@@ -267,7 +267,12 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
 }
 
 func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
-	logger := wf.log.With(logging.HostID("peer", peerID))
+	logger := wf.log.With(
+		logging.HostID("peer", peerID),
+		logging.HexBytes("envelopeHash", env.Hash()),
+		zap.String("pubsubTopic", env.PubsubTopic()),
+		zap.String("contentTopic",
env.Message().ContentTopic),
+	)
 
 	messagePush := &pb.MessagePushV2{
 		PubsubTopic: env.PubsubTopic(),
@@ -298,12 +303,15 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
 		} else {
 			wf.metrics.RecordError(writeResponseFailure)
 		}
-		logger.Error("pushing messages to peer", logging.HexBytes("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
+		logger.Error("pushing messages to peer", zap.Error(err))
 		wf.subscriptions.FlagAsFailure(peerID)
 		return nil
 	}
 
 	wf.subscriptions.FlagAsSuccess(peerID)
+
+	logger.Info("message pushed successfully") // TODO: remove or change to debug once dogfooding of filter is complete
+
 	return nil
 }
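
Note on PATCH 4/9 (fix(discv5): threadsafe peerCh): the program below is a minimal, self-contained Go sketch of the publish/stop pattern that patch introduces in waku/v2/discv5/discover.go. It is illustrative only and not part of the series — the peerChannel name is kept for readability, but the peermanager.PeerData payload is replaced with a plain int and logging is omitted.

package main

import (
	"context"
	"fmt"
	"sync"
)

// peerChannel mirrors the pattern from the patch: Publish holds the same
// mutex that Stop takes, so the channel can never be closed while a send
// is in flight, and the stored context unblocks a publisher that has no
// consumer left.
type peerChannel struct {
	mutex   sync.Mutex
	channel chan int // stand-in for chan peermanager.PeerData
	started bool
	ctx     context.Context
}

func (p *peerChannel) Start(ctx context.Context) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.started = true
	p.ctx = ctx
	p.channel = make(chan int)
}

func (p *peerChannel) Stop() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if !p.started {
		return
	}
	p.started = false
	close(p.channel)
}

func (p *peerChannel) Publish(v int) bool {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if !p.started {
		return false // Stop already ran; sending now would panic on a closed channel
	}
	select {
	case p.channel <- v:
		return true
	case <-p.ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	pc := &peerChannel{}
	pc.Start(ctx)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range pc.channel { // exits once Stop closes the channel
			fmt.Println("consumed", v)
		}
	}()

	fmt.Println("published:", pc.Publish(1)) // true: a consumer is running
	cancel()                                 // lets any blocked publisher bail out
	pc.Stop()                                // safe: no Publish can be mid-send
	<-done
	fmt.Println("published after stop:", pc.Publish(2)) // false
}

The design point is the same one the patched code relies on: because Publish and Stop share a single mutex, a send on a closed channel cannot originate from this channel, and once the connector is stopped or its context cancelled, Publish degrades to a clean false instead of blocking or panicking.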