From 1ffb9aa0904e8ccd4fc89a5884b8b335d1f5ea31 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 15:05:54 +0800 Subject: [PATCH 01/10] chore: refactor sender api --- waku/v2/api/publish/message_sender.go | 85 +++++++++++++++++++++++++++ waku/v2/api/publish/rate_limiting.go | 10 ++++ 2 files changed, 95 insertions(+) create mode 100644 waku/v2/api/publish/message_sender.go diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go new file mode 100644 index 000000000..ff640dc01 --- /dev/null +++ b/waku/v2/api/publish/message_sender.go @@ -0,0 +1,85 @@ +package publish + +import ( + "context" + "errors" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "go.uber.org/zap" +) + +const peersToPublishForLightpush = 2 + +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay +) + +type MessageSender struct { + ctx context.Context + publishMethod PublishMethod + lightPush *lightpush.WakuLightPush + relay *relay.WakuRelay + messageSentCheck *MessageSentCheck + rateLimiter *PublishRateLimiter + logger *zap.Logger +} + +func NewMessageSender(ctx context.Context, publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) *MessageSender { + return &MessageSender{ + ctx: ctx, + publishMethod: publishMethod, + lightPush: lightPush, + relay: relay, + logger: logger, + } +} + +func (ms *MessageSender) WithMessageSentCheck(messageSentCheck *MessageSentCheck) *MessageSender { + ms.messageSentCheck = messageSentCheck + return ms +} + +func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender { + ms.rateLimiter = rateLimiter + return ms +} + +func (ms *MessageSender) Send(env *protocol.Envelope) error { + logger := ms.logger.With(zap.Stringer("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Int64("timestamp", env.Message().GetTimestamp())) + if ms.rateLimiter != nil { + if err := ms.rateLimiter.Check(ms.ctx, logger); err != nil { + return err + } + } + + switch ms.publishMethod { + case LightPush: + if ms.lightPush == nil { + return errors.New("lightpush is not available") + } + logger.Info("publishing message via lightpush") + _, err := ms.lightPush.Publish(ms.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) + return err + case Relay: + if ms.relay == nil { + return errors.New("relay is not available") + } + peerCnt := len(ms.relay.PubSub().ListPeers(env.PubsubTopic())) + logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) + _, err := ms.relay.Publish(ms.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) + return err + } + + if ms.messageSentCheck != nil { + ms.messageSentCheck.Add(env.PubsubTopic(), common.BytesToHash(env.Hash().Bytes()), uint32(env.Message().GetTimestamp()/int64(time.Second))) + } + + return nil +} diff --git a/waku/v2/api/publish/rate_limiting.go b/waku/v2/api/publish/rate_limiting.go index 4322413b3..87c0f427c 100644 --- a/waku/v2/api/publish/rate_limiting.go +++ b/waku/v2/api/publish/rate_limiting.go @@ -35,3 +35,13 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu return publishFn(envelope, logger) } } + +func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { + if err := p.limiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("could not send message (limiter)", zap.Error(err)) + } + return err + } + return nil +} From 2d8a5427d75fb95105a04aeb99d3ee6bb18a927c Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 15:48:38 +0800 Subject: [PATCH 02/10] chore: more --- waku/v2/api/publish/message_sender.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index ff640dc01..801d388b7 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -21,6 +21,17 @@ const ( Relay ) +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} + type MessageSender struct { ctx context.Context publishMethod PublishMethod @@ -83,3 +94,7 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { return nil } + +func (ms *MessageSender) PublishMethod() PublishMethod { + return ms.publishMethod +} From 85605b261ecb2a68587884999f18fcb7ed2a05fb Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 16:04:32 +0800 Subject: [PATCH 03/10] chore: refactor ephemeral of evenlope --- waku/v2/api/publish/message_sender.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 801d388b7..50224b1ee 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -88,7 +88,8 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { return err } - if ms.messageSentCheck != nil { + ephemeral := env.Message().Ephemeral + if ms.messageSentCheck != nil && (ephemeral == nil || !*ephemeral) { ms.messageSentCheck.Add(env.PubsubTopic(), common.BytesToHash(env.Hash().Bytes()), uint32(env.Message().GetTimestamp()/int64(time.Second))) } From 5075fa10ac7114f46cc626a5506bac0dd10254f4 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 16:51:29 +0800 Subject: [PATCH 04/10] chore: use default rate limiter --- waku/v2/api/publish/message_sender.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 50224b1ee..f1af5f291 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -10,9 +10,12 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" + "golang.org/x/time/rate" ) -const peersToPublishForLightpush = 2 +const DefaultPeersToPublishForLightpush = 2 +const DefaultPublishingLimiterRate = rate.Limit(2) +const DefaultPublishingLimitBurst = 4 type PublishMethod int @@ -48,6 +51,7 @@ func NewMessageSender(ctx context.Context, publishMethod PublishMethod, lightPus publishMethod: publishMethod, lightPush: lightPush, relay: relay, + rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), logger: logger, } } @@ -76,7 +80,7 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { return errors.New("lightpush is not available") } logger.Info("publishing message via lightpush") - _, err := ms.lightPush.Publish(ms.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) + _, err := ms.lightPush.Publish(ms.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush)) return err case Relay: if ms.relay == nil { From 9ba9962fdd18a0ec85884b35264349d89c69ee35 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 16:58:27 +0800 Subject: [PATCH 05/10] chore: confirm delivered --- waku/v2/api/publish/message_sender.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index f1af5f291..08a1ddc42 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -103,3 +104,15 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { func (ms *MessageSender) PublishMethod() PublishMethod { return ms.publishMethod } + +func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.DeleteByMessageIDs(messageIDs) + } +} + +func (ms *MessageSender) SetStorePeerID(peerID peer.ID) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.SetStorePeerID(peerID) + } +} From 7450a79713ad09cedb797edbe2b631d0b3c6e665 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 17:15:54 +0800 Subject: [PATCH 06/10] chore: pass in chan --- waku/v2/api/publish/message_check.go | 14 +++++++------- waku/v2/api/publish/message_check_test.go | 2 +- waku/v2/api/publish/message_sender.go | 6 ++++++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index a7b16a571..47f7c5408 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -30,8 +30,8 @@ type MessageSentCheck struct { messageIDs map[string]map[common.Hash]uint32 messageIDsMu sync.RWMutex storePeerID peer.ID - MessageStoredChan chan common.Hash - MessageExpiredChan chan common.Hash + messageStoredChan chan common.Hash + messageExpiredChan chan common.Hash ctx context.Context store *store.WakuStore timesource timesource.Timesource @@ -43,12 +43,12 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, - MessageStoredChan: make(chan common.Hash, 1000), - MessageExpiredChan: make(chan common.Hash, 1000), + messageStoredChan: msgStoredChan, + messageExpiredChan: msgExpiredChan, ctx: ctx, store: store, timesource: timesource, @@ -232,12 +232,12 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c if found { ackHashes = append(ackHashes, hash) - m.MessageStoredChan <- hash + m.messageStoredChan <- hash } if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid { missedHashes = append(missedHashes, hash) - m.MessageExpiredChan <- hash + m.messageExpiredChan <- hash } } diff --git a/waku/v2/api/publish/message_check_test.go b/waku/v2/api/publish/message_check_test.go index 129472589..ef53f4d36 100644 --- a/waku/v2/api/publish/message_check_test.go +++ b/waku/v2/api/publish/message_check_test.go @@ -10,7 +10,7 @@ import ( func TestAddAndDelete(t *testing.T) { ctx := context.TODO() - messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil) + messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil) messageSentCheck.Add("topic", [32]byte{1}, 1) messageSentCheck.Add("topic", [32]byte{2}, 2) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 08a1ddc42..a6bff6c66 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -101,6 +101,12 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { return nil } +func (ms *MessageSender) Start() { + if ms.messageSentCheck != nil { + go ms.messageSentCheck.Start() + } +} + func (ms *MessageSender) PublishMethod() PublishMethod { return ms.publishMethod } From 3a354ddba8ce74fa50bb0fec4770ffaf37ee7644 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 9 Aug 2024 12:09:13 +0800 Subject: [PATCH 07/10] chore: fix comments --- waku/v2/api/publish/message_sender.go | 3 +-- waku/v2/api/publish/rate_limiting.go | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index a6bff6c66..cce2c542a 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -93,8 +93,7 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error { return err } - ephemeral := env.Message().Ephemeral - if ms.messageSentCheck != nil && (ephemeral == nil || !*ephemeral) { + if ms.messageSentCheck != nil && !env.Message().GetEphemeral() { ms.messageSentCheck.Add(env.PubsubTopic(), common.BytesToHash(env.Hash().Bytes()), uint32(env.Message().GetTimestamp()/int64(time.Second))) } diff --git a/waku/v2/api/publish/rate_limiting.go b/waku/v2/api/publish/rate_limiting.go index 87c0f427c..a0bddcbdb 100644 --- a/waku/v2/api/publish/rate_limiting.go +++ b/waku/v2/api/publish/rate_limiting.go @@ -26,10 +26,7 @@ func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter { // ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { return func(envelope *protocol.Envelope, logger *zap.Logger) error { - if err := p.limiter.Wait(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("could not send message (limiter)", zap.Error(err)) - } + if err := p.Check(ctx, logger); err != nil { return err } return publishFn(envelope, logger) From 48de691a9729d7bdea274d2fc71a1fd70a818130 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 9 Aug 2024 13:33:41 +0800 Subject: [PATCH 08/10] chore: refactor send with request struct --- waku/v2/api/publish/message_sender.go | 72 +++++++++++++++++++++------ 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index cce2c542a..63c8065dc 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -23,6 +23,7 @@ type PublishMethod int const ( LightPush PublishMethod = iota Relay + DefaultMethod ) func (pm PublishMethod) String() string { @@ -37,7 +38,6 @@ func (pm PublishMethod) String() string { } type MessageSender struct { - ctx context.Context publishMethod PublishMethod lightPush *lightpush.WakuLightPush relay *relay.WakuRelay @@ -46,9 +46,27 @@ type MessageSender struct { logger *zap.Logger } -func NewMessageSender(ctx context.Context, publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) *MessageSender { - return &MessageSender{ +type Request struct { + ctx context.Context + envelope *protocol.Envelope + publishMethod PublishMethod +} + +func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request { + return &Request{ ctx: ctx, + envelope: envelope, + publishMethod: DefaultMethod, + } +} + +func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request { + r.publishMethod = publishMethod + return r +} + +func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) *MessageSender { + return &MessageSender{ publishMethod: publishMethod, lightPush: lightPush, relay: relay, @@ -67,34 +85,60 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess return ms } -func (ms *MessageSender) Send(env *protocol.Envelope) error { - logger := ms.logger.With(zap.Stringer("envelopeHash", env.Hash()), zap.String("pubsubTopic", env.PubsubTopic()), zap.String("contentTopic", env.Message().ContentTopic), zap.Int64("timestamp", env.Message().GetTimestamp())) +func (ms *MessageSender) Send(req Request) error { + logger := ms.logger.With( + zap.Stringer("envelopeHash", req.envelope.Hash()), + zap.String("pubsubTopic", req.envelope.PubsubTopic()), + zap.String("contentTopic", req.envelope.Message().ContentTopic), + zap.Int64("timestamp", req.envelope.Message().GetTimestamp()), + ) + if ms.rateLimiter != nil { - if err := ms.rateLimiter.Check(ms.ctx, logger); err != nil { + if err := ms.rateLimiter.Check(req.ctx, logger); err != nil { return err } } - switch ms.publishMethod { + publishMethod := req.publishMethod + if publishMethod == DefaultMethod { + publishMethod = ms.publishMethod + } + + switch publishMethod { case LightPush: if ms.lightPush == nil { return errors.New("lightpush is not available") } logger.Info("publishing message via lightpush") - _, err := ms.lightPush.Publish(ms.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush)) - return err + _, err := ms.lightPush.Publish( + req.ctx, + req.envelope.Message(), + lightpush.WithPubSubTopic(req.envelope.PubsubTopic()), + lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush), + ) + if err != nil { + return err + } case Relay: if ms.relay == nil { return errors.New("relay is not available") } - peerCnt := len(ms.relay.PubSub().ListPeers(env.PubsubTopic())) + peerCnt := len(ms.relay.PubSub().ListPeers(req.envelope.PubsubTopic())) logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := ms.relay.Publish(ms.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) - return err + _, err := ms.relay.Publish(req.ctx, req.envelope.Message(), relay.WithPubSubTopic(req.envelope.PubsubTopic())) + if err != nil { + return err + } + default: + return errors.New("unknown publish method") } - if ms.messageSentCheck != nil && !env.Message().GetEphemeral() { - ms.messageSentCheck.Add(env.PubsubTopic(), common.BytesToHash(env.Hash().Bytes()), uint32(env.Message().GetTimestamp()/int64(time.Second))) + if ms.messageSentCheck != nil && !req.envelope.Message().GetEphemeral() { + ms.messageSentCheck.Add( + req.envelope.PubsubTopic(), + common.BytesToHash(req.envelope.Hash().Bytes()), + uint32(req.envelope.Message().GetTimestamp()/int64(time.Second)), + ) } return nil From bba6f33788c4b69a0d707448f5b80d4e5e53e382 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 9 Aug 2024 15:00:39 +0800 Subject: [PATCH 09/10] chore: mock sent check interface --- waku/v2/api/publish/message_check.go | 7 ++ waku/v2/api/publish/message_sender.go | 19 ++-- waku/v2/api/publish/message_sender_test.go | 121 +++++++++++++++++++++ 3 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 waku/v2/api/publish/message_sender_test.go diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index 47f7c5408..a60a8d912 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -23,6 +23,13 @@ const DefaultMessageExpiredPerid = 10 // in seconds type MessageSentCheckOption func(*MessageSentCheck) error +type ISentCheck interface { + Start() + Add(topic string, messageID common.Hash, sentTime uint32) + DeleteByMessageIDs(messageIDs []common.Hash) + SetStorePeerID(peerID peer.ID) +} + // MessageSentCheck tracks the outgoing messages and check against store node // if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query // if the message keeps missing after `messageExpiredPerid`, the message id will be expired diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 63c8065dc..479d894ad 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -23,7 +23,7 @@ type PublishMethod int const ( LightPush PublishMethod = iota Relay - DefaultMethod + UnknownMethod ) func (pm PublishMethod) String() string { @@ -41,7 +41,7 @@ type MessageSender struct { publishMethod PublishMethod lightPush *lightpush.WakuLightPush relay *relay.WakuRelay - messageSentCheck *MessageSentCheck + messageSentCheck ISentCheck rateLimiter *PublishRateLimiter logger *zap.Logger } @@ -56,7 +56,7 @@ func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request { return &Request{ ctx: ctx, envelope: envelope, - publishMethod: DefaultMethod, + publishMethod: UnknownMethod, } } @@ -65,17 +65,20 @@ func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request { return r } -func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) *MessageSender { +func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) (*MessageSender, error) { + if publishMethod == UnknownMethod { + return nil, errors.New("publish method is required") + } return &MessageSender{ publishMethod: publishMethod, lightPush: lightPush, relay: relay, rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), logger: logger, - } + }, nil } -func (ms *MessageSender) WithMessageSentCheck(messageSentCheck *MessageSentCheck) *MessageSender { +func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender { ms.messageSentCheck = messageSentCheck return ms } @@ -85,7 +88,7 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess return ms } -func (ms *MessageSender) Send(req Request) error { +func (ms *MessageSender) Send(req *Request) error { logger := ms.logger.With( zap.Stringer("envelopeHash", req.envelope.Hash()), zap.String("pubsubTopic", req.envelope.PubsubTopic()), @@ -100,7 +103,7 @@ func (ms *MessageSender) Send(req Request) error { } publishMethod := req.publishMethod - if publishMethod == DefaultMethod { + if publishMethod == UnknownMethod { publishMethod = ms.publishMethod } diff --git a/waku/v2/api/publish/message_sender_test.go b/waku/v2/api/publish/message_sender_test.go new file mode 100644 index 000000000..6684951c8 --- /dev/null +++ b/waku/v2/api/publish/message_sender_test.go @@ -0,0 +1,121 @@ +package publish + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type MockMessageSentCheck struct { + Messages map[string]map[common.Hash]uint32 +} + +func (m *MockMessageSentCheck) Add(topic string, messageID common.Hash, time uint32) { + if m.Messages[topic] == nil { + m.Messages[topic] = make(map[common.Hash]uint32) + } + m.Messages[topic][messageID] = time +} + +func (m *MockMessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) { +} + +func (m *MockMessageSentCheck) SetStorePeerID(peerID peer.ID) { +} + +func (m *MockMessageSentCheck) Start() { +} + +func TestNewSenderWithUnknownMethod(t *testing.T) { + sender, err := NewMessageSender(UnknownMethod, nil, nil, nil) + require.NotNil(t, err) + require.Nil(t, sender) +} + +func TestNewSenderWithRelay(t *testing.T) { + _, relayNode := createRelayNode(t) + relayNode.Start(context.Background()) + defer relayNode.Stop() + sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger()) + require.Nil(t, err) + require.NotNil(t, sender) + require.Nil(t, sender.messageSentCheck) + require.Equal(t, Relay, sender.publishMethod) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: utils.GetUnixEpoch(), + ContentTopic: "test-content-topic", + } + envelope := protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "test-pubsub-topic") + req := NewRequest(context.TODO(), envelope) + err = sender.Send(req) + require.Nil(t, err) +} + +func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) { + _, relayNode := createRelayNode(t) + relayNode.Start(context.Background()) + defer relayNode.Stop() + sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger()) + + check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)} + sender.WithMessageSentCheck(check) + require.Nil(t, err) + require.NotNil(t, sender) + require.NotNil(t, sender.messageSentCheck) + require.Equal(t, Relay, sender.publishMethod) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: utils.GetUnixEpoch(), + ContentTopic: "test-content-topic", + } + envelope := protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "test-pubsub-topic") + req := NewRequest(context.TODO(), envelope) + + require.Equal(t, 0, len(check.Messages)) + + err = sender.Send(req) + require.Nil(t, err) + require.Equal(t, 1, len(check.Messages)) + require.Equal( + t, + uint32(msg.GetTimestamp()/int64(time.Second)), + check.Messages["test-pubsub-topic"][common.BytesToHash(envelope.Hash().Bytes())], + ) +} + +func TestNewSenderWithLightPush(t *testing.T) { + sender, err := NewMessageSender(LightPush, nil, nil, nil) + require.Nil(t, err) + require.NotNil(t, sender) + require.Equal(t, LightPush, sender.publishMethod) +} + +func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) { + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + bcaster := relay.NewBroadcaster(10) + relay := relay.NewWakuRelay(bcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + relay.SetHost(host) + err = bcaster.Start(context.Background()) + require.NoError(t, err) + + return host, relay +} From 5ae71957bb311e7f422c0baeb1978e412b9f65f0 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 9 Aug 2024 15:11:37 +0800 Subject: [PATCH 10/10] chore: fix lint --- waku/v2/api/publish/message_sender_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/waku/v2/api/publish/message_sender_test.go b/waku/v2/api/publish/message_sender_test.go index 6684951c8..d6945c8c4 100644 --- a/waku/v2/api/publish/message_sender_test.go +++ b/waku/v2/api/publish/message_sender_test.go @@ -47,7 +47,8 @@ func TestNewSenderWithUnknownMethod(t *testing.T) { func TestNewSenderWithRelay(t *testing.T) { _, relayNode := createRelayNode(t) - relayNode.Start(context.Background()) + err := relayNode.Start(context.Background()) + require.Nil(t, err) defer relayNode.Stop() sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger()) require.Nil(t, err) @@ -68,7 +69,8 @@ func TestNewSenderWithRelay(t *testing.T) { func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) { _, relayNode := createRelayNode(t) - relayNode.Start(context.Background()) + err := relayNode.Start(context.Background()) + require.Nil(t, err) defer relayNode.Stop() sender, err := NewMessageSender(Relay, nil, relayNode, utils.Logger())