From e86cac8071f11f2ab365e07cd662ac22bdeb5421 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 28 Aug 2023 15:19:02 -0400 Subject: [PATCH 1/4] refactor(filter): unsubscribe waitgroup and async --- waku/v2/protocol/filter/client.go | 79 +++++++++++++++++++----------- waku/v2/protocol/filter/options.go | 19 +++++++ 2 files changed, 70 insertions(+), 28 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f2787269c..28721b185 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -341,17 +341,33 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, err } - localWg := sync.WaitGroup{} resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - var peersUnsubscribed []peer.ID for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - peersUnsubscribed = append(peersUnsubscribed, peerID) - localWg.Add(1) + + subscriptions, ok := wf.subscriptions.items[peerID] + if !ok || subscriptions == nil { + continue + } + + wf.cleanupSubscriptions(peerID, contentFilter) + if len(subscriptions.subscriptionsPerTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } + + if params.wg != nil { + params.wg.Add(1) + } + go func(peerID peer.ID) { - defer localWg.Done() + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, @@ -367,22 +383,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co } } - wf.cleanupSubscriptions(peerID, contentFilter) - - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } }(peerID) } - localWg.Wait() - close(resultChan) - for _, peerID := range peersUnsubscribed { - if wf.subscriptions != nil && wf.subscriptions.items != nil && wf.subscriptions.items[peerID] != nil && len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 { - delete(wf.subscriptions.items, peerID) - } + if params.wg != nil { + params.wg.Wait() } + return resultChan, nil } @@ -408,19 +421,26 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte wf.subscriptions.Lock() defer wf.subscriptions.Unlock() - localWg := sync.WaitGroup{} resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - var peersUnsubscribed []peer.ID for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - peersUnsubscribed = append(peersUnsubscribed, peerID) - localWg.Add(1) + delete(wf.subscriptions.items, peerID) + + if params.wg != nil { + params.wg.Add(1) + } + go func(peerID peer.ID) { - defer localWg.Done() + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, @@ -429,17 +449,20 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } }(peerID) } - localWg.Wait() - close(resultChan) - for _, peerID := range peersUnsubscribed { - delete(wf.subscriptions.items, peerID) + if params.wg != nil { + params.wg.Wait() } + + close(resultChan) + return resultChan, nil } diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 188638b9e..b58043c7b 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -2,6 +2,7 @@ package filter import ( "context" + "sync" "time" "github.com/libp2p/go-libp2p/core/host" @@ -26,6 +27,7 @@ type ( selectedPeer peer.ID requestID []byte log *zap.Logger + wg *sync.WaitGroup } FilterParameters struct { @@ -135,9 +137,26 @@ func AutomaticRequestId() FilterUnsubscribeOption { } } +// WithWaitGroup allos specigying a waitgroup to wait until all +// unsubscribe requests are complete before the function is complete +func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.wg = wg + } +} + +// Async is used to fire and forget an unsubscription, and don't +// care about the results of it +func Async() FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.wg = nil + } +} + func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { return []FilterUnsubscribeOption{ AutomaticRequestId(), + WithWaitGroup(&sync.WaitGroup{}), } } From 9e0287749d206bea9a95a59fcff6441f39c5c2d7 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 28 Aug 2023 15:26:55 -0400 Subject: [PATCH 2/4] refactor: verify started state for doing filter operations --- waku/v2/protocol/filter/client.go | 48 +++++++++++++++++++++++++- waku/v2/protocol/filter/filter_test.go | 43 +++++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 28721b185..5b1fdd742 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -8,6 +8,7 @@ import ( "math" "net/http" "sync" + "sync/atomic" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -44,6 +45,7 @@ type WakuFilterLightNode struct { log *zap.Logger subscriptions *SubscriptionsMap pm *peermanager.PeerManager + started atomic.Bool } type ContentFilter struct { @@ -56,6 +58,8 @@ type WakuFilterPushResult struct { PeerID peer.ID } +var errNotStarted = errors.New("filter is not started") + // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil @@ -78,6 +82,10 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) { } func (wf *WakuFilterLightNode) Start(ctx context.Context) error { + if !wf.started.CompareAndSwap(false, true) { + return nil // Already started + } + wf.wg.Wait() // Wait for any goroutines to stop ctx, cancel := context.WithCancel(ctx) @@ -94,7 +102,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error { // Stop unmounts the filter protocol func (wf *WakuFilterLightNode) Stop() { - if wf.cancel == nil { + if !wf.started.CompareAndSwap(true, false) { return } @@ -206,6 +214,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr // Subscribe setups a subscription to receive messages that match a specific content filter func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { + if !wf.isStarted() { + return nil, errNotStarted + } + if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -244,6 +256,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { + if !wf.isStarted() { + return nil, errNotStarted + } + if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) { return nil, errors.New("subscription does not exist") } @@ -263,6 +279,10 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscrib } func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { + if !wf.isStarted() { + return errNotStarted + } + return wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID}, @@ -271,10 +291,18 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { } func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { + if !wf.isStarted() { + return errNotStarted + } + return wf.Ping(ctx, subscription.PeerID) } func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { + if !wf.isStarted() { + return nil + } + wf.subscriptions.RLock() defer wf.subscriptions.RUnlock() @@ -324,6 +352,10 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { + if !wf.isStarted() { + return nil, errNotStarted + } + if contentFilter.Topic == "" { return nil, errors.New("topic is required") } @@ -396,11 +428,17 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co params.wg.Wait() } + close(resultChan) + return resultChan, nil } // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { + if !wf.isStarted() { + return nil, errNotStarted + } + var contentTopics []string for k := range sub.ContentTopics { contentTopics = append(contentTopics, k) @@ -413,6 +451,10 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { + if !wf.isStarted() { + return nil, errNotStarted + } + params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err @@ -466,3 +508,7 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte return resultChan, nil } + +func (wf *WakuFilterLightNode) isStarted() bool { + return wf.started.Load() +} diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 5e676badb..e0b5dc672 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -333,3 +333,46 @@ func (s *FilterTestSuite) TestMultipleMessages() { }, s.subDetails.C) } + +func (s *FilterTestSuite) TestRunningGuard() { + s.lightNode.Stop() + + contentFilter := ContentFilter{ + Topic: "test", + ContentTopics: []string{"test"}, + } + + _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + + s.Require().ErrorIs(err, errNotStarted) + + err = s.lightNode.Start(s.ctx) + s.Require().NoError(err) + + _, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + + s.Require().NoError(err) +} + +func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { + contentFilter := ContentFilter{ + Topic: "test", + ContentTopics: []string{"test"}, + } + + _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + ch, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, Async()) + _, open := <-ch + s.Require().NoError(err) + s.Require().False(open) + + _, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + wg := sync.WaitGroup{} + _, err = s.lightNode.Unsubscribe(s.ctx, contentFilter, WithWaitGroup(&wg)) + wg.Wait() + s.Require().NoError(err) +} From df7e55c6620b166e40ac6a1a16b743771c8ce019 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 29 Aug 2023 11:11:21 -0400 Subject: [PATCH 3/4] fix: code review items --- waku/v2/protocol/filter/client.go | 90 +++++++++++++++++++------- waku/v2/protocol/filter/filter_test.go | 42 ++++++++++-- waku/v2/protocol/filter/options.go | 4 +- 3 files changed, 107 insertions(+), 29 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 5b1fdd742..2c01af338 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -8,7 +8,6 @@ import ( "math" "net/http" "sync" - "sync/atomic" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -35,6 +34,9 @@ var ( ) type WakuFilterLightNode struct { + sync.RWMutex + started bool + cancel context.CancelFunc ctx context.Context h host.Host @@ -45,7 +47,6 @@ type WakuFilterLightNode struct { log *zap.Logger subscriptions *SubscriptionsMap pm *peermanager.PeerManager - started atomic.Bool } type ContentFilter struct { @@ -58,7 +59,8 @@ type WakuFilterPushResult struct { PeerID peer.ID } -var errNotStarted = errors.New("filter is not started") +var errNotStarted = errors.New("not started") +var errAlreadyStarted = errors.New("already started") // NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options // Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. @@ -82,8 +84,11 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) { } func (wf *WakuFilterLightNode) Start(ctx context.Context) error { - if !wf.started.CompareAndSwap(false, true) { - return nil // Already started + wf.Lock() + defer wf.Unlock() + + if wf.started { + return errAlreadyStarted } wf.wg.Wait() // Wait for any goroutines to stop @@ -92,6 +97,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error { wf.cancel = cancel wf.ctx = ctx wf.subscriptions = NewSubscriptionMap(wf.log) + wf.started = true wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx)) @@ -102,7 +108,10 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error { // Stop unmounts the filter protocol func (wf *WakuFilterLightNode) Stop() { - if !wf.started.CompareAndSwap(true, false) { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return } @@ -110,10 +119,23 @@ func (wf *WakuFilterLightNode) Stop() { wf.h.RemoveStreamHandler(FilterPushID_v20beta1) - _, _ = wf.UnsubscribeAll(wf.ctx) + res, err := wf.unsubscribeAll(wf.ctx) + if err != nil { + wf.log.Warn("unsubscribing from full nodes", zap.Error(err)) + } + + for r := range res { + if r.Err != nil { + wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID)) + } + + } wf.subscriptions.Clear() + wf.started = false + wf.cancel = nil + wf.wg.Wait() } @@ -214,7 +236,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr // Subscribe setups a subscription to receive messages that match a specific content filter func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return nil, errNotStarted } @@ -256,7 +281,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return nil, errNotStarted } @@ -279,7 +307,10 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscrib } func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return errNotStarted } @@ -291,7 +322,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { } func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return errNotStarted } @@ -299,7 +333,10 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip } func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return nil } @@ -352,7 +389,10 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return nil, errNotStarted } @@ -435,7 +475,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - if !wf.isStarted() { + wf.Lock() + defer wf.Unlock() + + if !wf.started { return nil, errNotStarted } @@ -449,12 +492,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.PubsubTopic, ContentTopics: contentTopics}, opts...) } -// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - if !wf.isStarted() { - return nil, errNotStarted - } - +func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err @@ -509,6 +547,14 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte return resultChan, nil } -func (wf *WakuFilterLightNode) isStarted() bool { - return wf.started.Load() +// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { + wf.Lock() + defer wf.Unlock() + + if !wf.started { + return nil, errNotStarted + } + + return wf.unsubscribeAll(ctx, opts...) } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index e0b5dc672..2d6853710 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -3,6 +3,7 @@ package filter import ( "context" "crypto/rand" + "errors" "net/http" "sync" "testing" @@ -67,7 +68,7 @@ func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay. return relay, sub, host, broadcaster } -func (s *FilterTestSuite) makeWakuFilterLightNode() *WakuFilterLightNode { +func (s *FilterTestSuite) makeWakuFilterLightNode(start bool) *WakuFilterLightNode { port, err := tests.FindFreePort(s.T(), "", 5) s.Require().NoError(err) @@ -79,8 +80,10 @@ func (s *FilterTestSuite) makeWakuFilterLightNode() *WakuFilterLightNode { filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.log) filterPush.SetHost(host) s.lightNodeHost = host - err = filterPush.Start(context.Background()) - s.Require().NoError(err) + if start { + err = filterPush.Start(context.Background()) + s.Require().NoError(err) + } return filterPush } @@ -178,7 +181,7 @@ func (s *FilterTestSuite) SetupTest() { s.testTopic = "/waku/2/go/filter/test" s.testContentTopic = "TopicA" - s.lightNode = s.makeWakuFilterLightNode() + s.lightNode = s.makeWakuFilterLightNode(true) s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) @@ -363,7 +366,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) - ch, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, Async()) + ch, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait()) _, open := <-ch s.Require().NoError(err) s.Require().False(open) @@ -376,3 +379,32 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { wg.Wait() s.Require().NoError(err) } + +func (s *FilterTestSuite) TestStartStop() { + var wg sync.WaitGroup + wg.Add(2) + s.lightNode = s.makeWakuFilterLightNode(false) + + stopNode := func() { + for i := 0; i < 100000; i++ { + s.lightNode.Stop() + } + wg.Done() + } + + startNode := func() { + for i := 0; i < 100; i++ { + err := s.lightNode.Start(context.Background()) + if errors.Is(err, errAlreadyStarted) { + continue + } + s.Require().NoError(err) + } + wg.Done() + } + + go startNode() + go stopNode() + + wg.Wait() +} diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index b58043c7b..249db284c 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -145,9 +145,9 @@ func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption { } } -// Async is used to fire and forget an unsubscription, and don't +// DontWait is used to fire and forget an unsubscription, and don't // care about the results of it -func Async() FilterUnsubscribeOption { +func DontWait() FilterUnsubscribeOption { return func(params *FilterUnsubscribeParameters) { params.wg = nil } From b1acd53f17288f9d82edc0ee84fd043590334308 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 31 Aug 2023 16:37:07 -0400 Subject: [PATCH 4/4] fix: code review items pt 2 --- waku/v2/protocol/filter/client.go | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 2c01af338..d9448965e 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -236,8 +236,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr // Subscribe setups a subscription to receive messages that match a specific content filter func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil, errNotStarted @@ -281,8 +281,8 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil, errNotStarted @@ -307,8 +307,8 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscrib } func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return errNotStarted @@ -322,8 +322,8 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { } func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return errNotStarted @@ -333,8 +333,8 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip } func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil @@ -389,8 +389,8 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil, errNotStarted @@ -475,8 +475,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil, errNotStarted @@ -549,8 +549,8 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { - wf.Lock() - defer wf.Unlock() + wf.RLock() + defer wf.RUnlock() if !wf.started { return nil, errNotStarted