Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Oct 2, 2024
1 parent 9013442 commit 0959473
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions p2p/node/pubsubManager/gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestMultipleRequests(t *testing.T) {
n := 100

// Use a context with timeout to avoid hanging indefinitely
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

mockHost, mockPeerStore, privKey, peerID := setup(t)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestMultipleRequests(t *testing.T) {
return pubsub.ValidationAccept
}

// SUBSCRIBE
// SUBSCRIBE to all topics
for i, topic := range topics {
err := ps.SubscribeAndRegisterValidator(common.Location{0, 0}, topic, validatorFunc)
require.NoError(t, err, "Failed to subscribe to topic %d", topic)
Expand All @@ -183,7 +183,7 @@ func TestMultipleRequests(t *testing.T) {
}
}

// BROADCAST
// Create a buffered channel large enough to hold all sent messages
testCh := make(chan interface{}, n*len(topics))
ps.SetReceiveHandler(func(receivedFrom peer.ID, msgId string, msgTopic string, data interface{}, location common.Location) {
select {
Expand All @@ -194,8 +194,10 @@ func TestMultipleRequests(t *testing.T) {
})

var messages []interface{}
var wg sync.WaitGroup
var broadcastWg sync.WaitGroup
var receiveWg sync.WaitGroup

// BROADCAST messages concurrently
for i := 0; i < n; i++ {
newWo := types.CopyWorkObject(wo)
newWo.WorkObjectHeader().SetNonce(types.EncodeNonce(uint64(i)))
Expand All @@ -211,38 +213,44 @@ func TestMultipleRequests(t *testing.T) {
}

messages = append(messages, msg)
wg.Add(1)
broadcastWg.Add(1)

// Broadcast each message in its own goroutine
go func(msg interface{}) {
defer wg.Done()
err = ps.Broadcast(common.Location{0, 0}, msg)
defer broadcastWg.Done()
err := ps.Broadcast(common.Location{0, 0}, msg)
require.NoError(t, err, "Failed to broadcast message")
}(msg)
}
}

// VERIFY
// VERIFY receiving concurrently
var mu sync.Mutex
receivedMessages := make([]interface{}, 0, n*len(topics))

for i := 0; i < (n * len(topics)); i++ {
wg.Add(1)
receiveWg.Add(1)
go func(j int) {
defer wg.Done()
defer receiveWg.Done()
select {
case receivedMessage := <-testCh:
mu.Lock()
receivedMessages = append(receivedMessages, receivedMessage)
mu.Unlock()
case <-ctx.Done():
t.Error("context done before receive message at index: ", j)
require.True(t, false, "Unable to receive one of the messages")
}
}(i)
}

wg.Wait()
// Wait for all broadcasts to complete
broadcastWg.Wait()
// Wait for all receiving to complete
receiveWg.Wait()

// Ensure all broadcasted messages were received
require.Len(t, receivedMessages, len(messages), "The number of received messages does not match the number of broadcasted messages. expected: %d, got: %d", len(messages), len(receivedMessages))
require.Len(t, receivedMessages, len(messages), "The number of received messages does not match the number of broadcasted messages. sent: %d, received: %d", len(receivedMessages), len(messages))

ps.Stop()
if len(ps.GetTopics()) != 0 {
Expand Down

0 comments on commit 0959473

Please sign in to comment.