Skip to content

Commit

Permalink
fix: check subscription when relay publish message (#1212)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaichaosun authored Aug 31, 2024
1 parent 690849c commit 99d2477
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 11 deletions.
9 changes: 8 additions & 1 deletion cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rest

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/utils"
)
Expand All @@ -22,8 +24,13 @@ func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNod
node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay())

_, err := node1.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)
_, err = node2.Relay().Subscribe(context.Background(), protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)

node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
err = node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
require.NoError(t, err)
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
require.NoError(t, err)
Expand Down
9 changes: 7 additions & 2 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
Expand All @@ -34,8 +35,9 @@ func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {

func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()
testTopic := "test"

_ = makeRelayService(t, router)
r := makeRelayService(t, router)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Expand All @@ -44,8 +46,11 @@ func TestPostV1Message(t *testing.T) {
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)

_, err = r.node.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
require.NoError(t, err)

rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes))
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/"+testTopic, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/api/publish/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func TestNewSenderWithRelay(t *testing.T) {
err := relayNode.Start(context.Background())
require.Nil(t, err)
defer relayNode.Stop()

_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger())
require.Nil(t, err)
require.NotNil(t, sender)
Expand All @@ -72,6 +75,9 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) {
err := relayNode.Start(context.Background())
require.Nil(t, err)
defer relayNode.Stop()

_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger())

check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func Test500(t *testing.T) {

sub1, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)
sub2, err := wakuNode1.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
sub2, err := wakuNode2.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
require.NoError(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
pubSubTopic3 := protocol.NewStaticShardingPubsubTopic(testClusterID, uint16(321))
pubSubTopic3Str := pubSubTopic3.String()
_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic(pubSubTopic3Str))
require.NoError(t, err)
require.Error(t, err)

time.Sleep(100 * time.Millisecond)

Expand Down
14 changes: 10 additions & 4 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -213,13 +215,17 @@ func (s *FilterTestSuite) TestStaticSharding() {
// Test positive case for static shard pubsub topic - message gets received
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, ""})

// Test two negative cases for static shard pubsub topic - message times out
s.waitForTimeout(&WakuMsg{testTopics[0], s.TestContentTopic, ""})
// Test two negative cases for static shard pubsub topic
msg := &WakuMsg{testTopics[0], s.TestContentTopic, ""}
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().Error(err)

s.waitForTimeout(&WakuMsg{testTopics[1], s.TestContentTopic, ""})
msg = &WakuMsg{testTopics[1], s.TestContentTopic, ""}
_, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(msg.ContentTopic, utils.GetUnixEpoch(), msg.Payload), relay.WithPubSubTopic(msg.PubSubTopic))
s.Require().Error(err)

// Cleanup
_, err := s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
_, err = s.LightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.TestTopic,
ContentTopics: protocol.NewContentTopicSet(s.TestContentTopic),
})
Expand Down
6 changes: 4 additions & 2 deletions waku/v2/protocol/lightpush/waku_lightpush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ func TestWakuLightPushCornerCases(t *testing.T) {

// Test corner case with default pubSub topic
_, err = client.Publish(ctx, msg2, WithDefaultPubsubTopic(), WithPeer(host2.ID()))
require.NoError(t, err)
require.Error(t, err)
require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error())

// Test situation when cancel func is nil
lightPushNode2.cancel = nil
Expand Down Expand Up @@ -405,6 +406,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {

// Check that msg2 publish finished without message delivery for unconfigured topic
_, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID()))
require.NoError(t, err)
require.Error(t, err)
require.Equal(t, "lightpush errorCould not publish message: cannot publish to unsubscribed topic", err.Error())
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub3.Ch)
}
8 changes: 8 additions & 0 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,20 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
if err != nil {
return pb.MessageHash{}, err
}
_, err = w.subscribeToPubsubTopic(params.pubsubTopic)
if err != nil {
return pb.MessageHash{}, err
}
}

if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("not enough peers to publish")
}

if !w.IsSubscribed(params.pubsubTopic) {
return pb.MessageHash{}, errors.New("cannot publish to unsubscribed topic")
}

w.topicsMutex.Lock()
defer w.topicsMutex.Unlock()

Expand Down
54 changes: 54 additions & 0 deletions waku/v2/protocol/relay/waku_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,60 @@ func TestWakuRelay(t *testing.T) {
<-ctx.Done()
}

func TestWakuRelayUnsubscribedTopic(t *testing.T) {
testTopic := defaultTestPubSubTopic
anotherTopic := "/waku/2/go/relay/another-topic"

port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcaster := NewBroadcaster(10)
relay := NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)

err = bcaster.Start(context.Background())
require.NoError(t, err)
defer relay.Stop()

subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))

require.NoError(t, err)

require.Equal(t, relay.IsSubscribed(testTopic), true)
require.Equal(t, relay.IsSubscribed(anotherTopic), false)

topics := relay.Topics()
require.Equal(t, 1, len(topics))
require.Equal(t, testTopic, topics[0])

ctx, cancel := context.WithCancel(context.Background())
bytesToSend := []byte{1}
go func() {
defer cancel()
env := <-subs[0].Ch
if env != nil {
t.Log("received msg", logging.Hash(env.Hash()))
}
}()

msg := &pb.WakuMessage{
Payload: bytesToSend,
ContentTopic: "test",
}
_, err = relay.Publish(context.Background(), msg, WithPubSubTopic(anotherTopic))
require.Error(t, err)

time.Sleep(2 * time.Second)

err = relay.Unsubscribe(ctx, protocol.NewContentFilter(testTopic))
require.NoError(t, err)
<-ctx.Done()
}

func createRelayNode(t *testing.T) (host.Host, *WakuRelay) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
Expand Down

0 comments on commit 99d2477

Please sign in to comment.