Skip to content

Commit

Permalink
fix: waitGroup, deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
vpavlin committed Aug 2, 2024
1 parent 4355c4a commit 5f14adc
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 98 deletions.
12 changes: 6 additions & 6 deletions examples/chat2-reliable/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
incomingBuffer: make([]*pb.Message, 0),
messageHistory: make([]*pb.Message, 0),
mutex: sync.Mutex{},
//C: make(chan *protocol.Envelope, 10),
}

chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
}

connWg := sync.WaitGroup{}
connWg.Add(3)
connWg.Add(2)

chat.wg.Add(7) // Added 2 more goroutines for periodic tasks
go chat.parseInput()
Expand Down Expand Up @@ -158,11 +159,11 @@ func (c *Chat) receiveMessages() {
}

msg, err := decodeMessage(c.options.ContentTopic, value.Message())
if err == nil {
c.processReceivedMessage(msg)
} else {
if err != nil {
fmt.Printf("Error decoding message: %v\n", err)
continue
}
c.processReceivedMessage(msg)
}
}
}
Expand Down Expand Up @@ -396,7 +397,7 @@ func decodeMessage(contentTopic string, wakumsg *wpb.WakuMessage) (*pb.Message,
func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
defer c.wg.Done()

connectionWg.Wait() // Wait until node connection operations are done
connectionWg.Wait() // Wait until node connection operations are

if !c.options.Store.Enable {
return
Expand Down Expand Up @@ -427,7 +428,6 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) {
store.WithAutomaticRequestID(),
storeOpt,
store.WithPaging(false, 100))

if err != nil {
c.ui.ErrorMessage(fmt.Errorf("could not query storenode: %w", err))
} else {
Expand Down
105 changes: 13 additions & 92 deletions examples/chat2-reliable/reliability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/bits-and-blooms/bloom/v3"
Expand All @@ -29,10 +28,11 @@ const (
)

func (c *Chat) initReliabilityProtocol() {
c.wg.Add(5)
c.wg.Add(4)
c.setupMessageRequestHandler()

go c.periodicBufferSweep()
go c.periodicSyncMessage()
go c.setupMessageRequestHandler()
go c.startBloomFilterCleaner()
go c.startEagerPushMechanism()
}
Expand All @@ -59,63 +59,6 @@ type UnacknowledgedMessage struct {
ResendAttempts int
}

type TimestampedMessageID struct {
ID string
Timestamp time.Time
}

type RollingBloomFilter struct {
filter *bloom.BloomFilter
window time.Duration
messages []TimestampedMessageID
mutex sync.Mutex
}

func NewRollingBloomFilter() *RollingBloomFilter {
return &RollingBloomFilter{
filter: bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate),
window: bloomFilterWindow,
messages: make([]TimestampedMessageID, 0),
}
}

func (rbf *RollingBloomFilter) Add(messageID string) {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

rbf.filter.Add([]byte(messageID))
rbf.messages = append(rbf.messages, TimestampedMessageID{
ID: messageID,
Timestamp: time.Now(),
})
}

func (rbf *RollingBloomFilter) Test(messageID string) bool {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

return rbf.filter.Test([]byte(messageID))
}

func (rbf *RollingBloomFilter) Clean() {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

cutoff := time.Now().Add(-rbf.window)
newMessages := make([]TimestampedMessageID, 0)
newFilter := bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate)

for _, msg := range rbf.messages {
if msg.Timestamp.After(cutoff) {
newMessages = append(newMessages, msg)
newFilter.Add([]byte(msg.ID))
}
}

rbf.messages = newMessages
rbf.filter = newFilter
}

func (c *Chat) startBloomFilterCleaner() {
defer c.wg.Done()

Expand Down Expand Up @@ -170,10 +113,10 @@ func (c *Chat) processReceivedMessage(msg *pb.Message) {

func (c *Chat) processBufferedMessages() {
c.mutex.Lock()
defer c.mutex.Unlock()

processed := make(map[string]bool)
remainingBuffer := make([]*pb.Message, 0, len(c.incomingBuffer))
processedBuffer := make([]*pb.Message, 0)

for _, msg := range c.incomingBuffer {
if processed[msg.MessageId] {
Expand All @@ -183,19 +126,22 @@ func (c *Chat) processBufferedMessages() {
missingDeps := c.checkCausalDependencies(msg)
if len(missingDeps) == 0 {
// Release the lock while processing the message
c.mutex.Unlock()
if msg.Content != "" {
c.ui.ChatMessage(int64(c.getLamportTimestamp()), msg.SenderId, msg.Content)
}

c.addToMessageHistory(msg)
c.mutex.Lock()
processedBuffer = append(processedBuffer, msg)
} else {
remainingBuffer = append(remainingBuffer, msg)
}
}

c.incomingBuffer = remainingBuffer
c.mutex.Unlock()

for _, msg := range processedBuffer {
c.addToMessageHistory(msg)
}
}

func (c *Chat) reviewAckStatus(msg *pb.Message) {
Expand Down Expand Up @@ -330,7 +276,7 @@ func (c *Chat) checkUnacknowledgedMessages() {
unackMsg := c.outgoingBuffer[i]
if now.Sub(unackMsg.SendTime) > ackTimeout {
if unackMsg.ResendAttempts < maxResendAttempts {
c.resendMessage(unackMsg.Message)
c.resendMessage(unackMsg.Message, unackMsg.ResendAttempts)
c.outgoingBuffer[i].ResendAttempts++
c.outgoingBuffer[i].SendTime = now
} else {
Expand All @@ -343,9 +289,9 @@ func (c *Chat) checkUnacknowledgedMessages() {
}
}

func (c *Chat) resendMessage(msg *pb.Message) {
func (c *Chat) resendMessage(msg *pb.Message, resendAttempts int) {
go func() {
delay := resendBaseDelay * time.Duration(1<<uint(c.getResendAttempts(msg.MessageId)))
delay := resendBaseDelay * time.Duration(1<<uint(resendAttempts))
if delay > maxResendDelay {
delay = maxResendDelay
}
Expand All @@ -361,17 +307,6 @@ func (c *Chat) resendMessage(msg *pb.Message) {
}()
}

func (c *Chat) getResendAttempts(messageId string) int {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, unackMsg := range c.outgoingBuffer {
if unackMsg.Message.MessageId == messageId {
return unackMsg.ResendAttempts
}
}
return 0
}

func (c *Chat) periodicSyncMessage() {
defer c.wg.Done()

Expand Down Expand Up @@ -442,17 +377,3 @@ func (c *Chat) getLamportTimestamp() int32 {
defer c.mutex.Unlock()
return c.lamportTimestamp
}

// MarshalBinary implements the encoding.BinaryMarshaler interface for RollingBloomFilter
func (rbf *RollingBloomFilter) MarshalBinary() ([]byte, error) {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()
return rbf.filter.MarshalBinary()
}

// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface for RollingBloomFilter
func (rbf *RollingBloomFilter) UnmarshalBinary(data []byte) error {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()
return rbf.filter.UnmarshalBinary(data)
}
79 changes: 79 additions & 0 deletions examples/chat2-reliable/rolling_bloom_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"sync"
"time"

"github.com/bits-and-blooms/bloom/v3"
)

type TimestampedMessageID struct {
ID string
Timestamp time.Time
}

type RollingBloomFilter struct {
filter *bloom.BloomFilter
window time.Duration
messages []TimestampedMessageID
mutex sync.Mutex
}

func NewRollingBloomFilter() *RollingBloomFilter {
return &RollingBloomFilter{
filter: bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate),
window: bloomFilterWindow,
messages: make([]TimestampedMessageID, 0),
}
}

func (rbf *RollingBloomFilter) Add(messageID string) {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

rbf.filter.Add([]byte(messageID))
rbf.messages = append(rbf.messages, TimestampedMessageID{
ID: messageID,
Timestamp: time.Now(),
})
}

func (rbf *RollingBloomFilter) Test(messageID string) bool {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

return rbf.filter.Test([]byte(messageID))
}

func (rbf *RollingBloomFilter) Clean() {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()

cutoff := time.Now().Add(-rbf.window)
newMessages := make([]TimestampedMessageID, 0)
newFilter := bloom.NewWithEstimates(bloomFilterSize, bloomFilterFPRate)

for _, msg := range rbf.messages {
if msg.Timestamp.After(cutoff) {
newMessages = append(newMessages, msg)
newFilter.Add([]byte(msg.ID))
}
}

rbf.messages = newMessages
rbf.filter = newFilter
}

// MarshalBinary implements the encoding.BinaryMarshaler interface for RollingBloomFilter
func (rbf *RollingBloomFilter) MarshalBinary() ([]byte, error) {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()
return rbf.filter.MarshalBinary()
}

// UnmarshalBinary implements the encoding.BinaryUnmarshaler interface for RollingBloomFilter
func (rbf *RollingBloomFilter) UnmarshalBinary(data []byte) error {
rbf.mutex.Lock()
defer rbf.mutex.Unlock()
return rbf.filter.UnmarshalBinary(data)
}

0 comments on commit 5f14adc

Please sign in to comment.