Skip to content

Commit

Permalink
refactor: verify started state for doing filter operations
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 28, 2023
1 parent 1c4c3d0 commit 32e051e
Showing 1 changed file with 45 additions and 1 deletion.
46 changes: 45 additions & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"net/http"
"sync"
"sync/atomic"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -44,6 +45,7 @@ type WakuFilterLightNode struct {
log *zap.Logger
subscriptions *SubscriptionsMap
pm *peermanager.PeerManager
started atomic.Bool
}

type ContentFilter struct {
Expand All @@ -56,6 +58,8 @@ type WakuFilterPushResult struct {
PeerID peer.ID
}

var errNotStarted = errors.New("filter is not started")

// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
Expand All @@ -78,6 +82,10 @@ func (wf *WakuFilterLightNode) SetHost(h host.Host) {
}

func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
if !wf.started.CompareAndSwap(false, true) {
return nil // Already started
}

wf.wg.Wait() // Wait for any goroutines to stop

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -94,7 +102,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error {

// Stop unmounts the filter protocol
func (wf *WakuFilterLightNode) Stop() {
if wf.cancel == nil {
if !wf.started.CompareAndSwap(true, false) {
return
}

Expand Down Expand Up @@ -206,6 +214,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr

// Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
if !wf.isStarted() {
return nil, errNotStarted
}

if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
Expand Down Expand Up @@ -244,6 +256,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont

// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
if !wf.isStarted() {
return nil, errNotStarted
}

if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) {
return nil, errors.New("subscription does not exist")
}
Expand All @@ -263,6 +279,10 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscrib
}

func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
if !wf.isStarted() {
return errNotStarted
}

return wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
Expand All @@ -271,10 +291,18 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
}

func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
if !wf.isStarted() {
return errNotStarted
}

return wf.Ping(ctx, subscription.PeerID)
}

func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
if !wf.isStarted() {
return nil
}

wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()

Expand Down Expand Up @@ -324,6 +352,10 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if !wf.isStarted() {
return nil, errNotStarted
}

if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
Expand Down Expand Up @@ -401,6 +433,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if !wf.isStarted() {
return nil, errNotStarted
}

var contentTopics []string
for k := range sub.ContentTopics {
contentTopics = append(contentTopics, k)
Expand All @@ -413,6 +449,10 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if !wf.isStarted() {
return nil, errNotStarted
}

params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -466,3 +506,7 @@ func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...Filte

return resultChan, nil
}

func (wf *WakuFilterLightNode) isStarted() bool {
return wf.started.Load()
}

0 comments on commit 32e051e

Please sign in to comment.