Skip to content

Commit

Permalink
chore: pass in chan
Browse files Browse the repository at this point in the history
  • Loading branch information
kaichaosun committed Aug 8, 2024
1 parent 602107d commit 4f027dd
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
14 changes: 7 additions & 7 deletions waku/v2/api/publish/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type MessageSentCheck struct {
messageIDs map[string]map[common.Hash]uint32
messageIDsMu sync.RWMutex
storePeerID peer.ID
MessageStoredChan chan common.Hash
MessageExpiredChan chan common.Hash
messageStoredChan chan common.Hash
messageExpiredChan chan common.Hash
ctx context.Context
store *store.WakuStore
timesource timesource.Timesource
Expand All @@ -43,12 +43,12 @@ type MessageSentCheck struct {
}

// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck {
func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
return &MessageSentCheck{
messageIDs: make(map[string]map[common.Hash]uint32),
messageIDsMu: sync.RWMutex{},
MessageStoredChan: make(chan common.Hash, 1000),
MessageExpiredChan: make(chan common.Hash, 1000),
messageStoredChan: msgStoredChan,
messageExpiredChan: msgExpiredChan,
ctx: ctx,
store: store,
timesource: timesource,
Expand Down Expand Up @@ -232,12 +232,12 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c

if found {
ackHashes = append(ackHashes, hash)
m.MessageStoredChan <- hash
m.messageStoredChan <- hash
}

if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid {
missedHashes = append(missedHashes, hash)
m.MessageExpiredChan <- hash
m.messageExpiredChan <- hash
}
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/publish/message_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestAddAndDelete(t *testing.T) {
ctx := context.TODO()
messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil)
messageSentCheck := NewMessageSentCheck(ctx, nil, nil, nil, nil, nil)

messageSentCheck.Add("topic", [32]byte{1}, 1)
messageSentCheck.Add("topic", [32]byte{2}, 2)
Expand Down
6 changes: 6 additions & 0 deletions waku/v2/api/publish/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func (ms *MessageSender) Send(env *protocol.Envelope) error {
return nil
}

func (ms *MessageSender) Start() {
if ms.messageSentCheck != nil {
go ms.messageSentCheck.Start()
}
}

func (ms *MessageSender) PublishMethod() PublishMethod {
return ms.publishMethod
}
Expand Down

0 comments on commit 4f027dd

Please sign in to comment.