diff --git a/p2p/node/pubsubManager/gossipsub_test.go b/p2p/node/pubsubManager/gossipsub_test.go index 012416666a..a93b14dcd2 100644 --- a/p2p/node/pubsubManager/gossipsub_test.go +++ b/p2p/node/pubsubManager/gossipsub_test.go @@ -143,7 +143,7 @@ func TestMultipleRequests(t *testing.T) { n := 100 // Use a context with timeout to avoid hanging indefinitely - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() mockHost, mockPeerStore, privKey, peerID := setup(t) @@ -174,7 +174,7 @@ func TestMultipleRequests(t *testing.T) { return pubsub.ValidationAccept } - // SUBSCRIBE + // SUBSCRIBE to all topics for i, topic := range topics { err := ps.SubscribeAndRegisterValidator(common.Location{0, 0}, topic, validatorFunc) require.NoError(t, err, "Failed to subscribe to topic %d", topic) @@ -183,7 +183,7 @@ func TestMultipleRequests(t *testing.T) { } } - // BROADCAST + // Create a buffered channel large enough to hold all sent messages testCh := make(chan interface{}, n*len(topics)) ps.SetReceiveHandler(func(receivedFrom peer.ID, msgId string, msgTopic string, data interface{}, location common.Location) { select { @@ -194,8 +194,10 @@ func TestMultipleRequests(t *testing.T) { }) var messages []interface{} - var wg sync.WaitGroup + var broadcastWg sync.WaitGroup + var receiveWg sync.WaitGroup + // BROADCAST messages concurrently for i := 0; i < n; i++ { newWo := types.CopyWorkObject(wo) newWo.WorkObjectHeader().SetNonce(types.EncodeNonce(uint64(i))) @@ -211,23 +213,25 @@ func TestMultipleRequests(t *testing.T) { } messages = append(messages, msg) - wg.Add(1) + broadcastWg.Add(1) + // Broadcast each message in its own goroutine go func(msg interface{}) { - defer wg.Done() - err = ps.Broadcast(common.Location{0, 0}, msg) + defer broadcastWg.Done() + err := ps.Broadcast(common.Location{0, 0}, msg) require.NoError(t, err, "Failed to broadcast message") }(msg) } } - // VERIFY + // VERIFY receiving concurrently var mu sync.Mutex receivedMessages := make([]interface{}, 0, n*len(topics)) + for i := 0; i < (n * len(topics)); i++ { - wg.Add(1) + receiveWg.Add(1) go func(j int) { - defer wg.Done() + defer receiveWg.Done() select { case receivedMessage := <-testCh: mu.Lock() @@ -235,14 +239,18 @@ func TestMultipleRequests(t *testing.T) { mu.Unlock() case <-ctx.Done(): t.Error("context done before receive message at index: ", j) + require.True(t, false, "Unable to receive one of the messages") } }(i) } - wg.Wait() + // Wait for all broadcasts to complete + broadcastWg.Wait() + // Wait for all receiving to complete + receiveWg.Wait() // Ensure all broadcasted messages were received - require.Len(t, receivedMessages, len(messages), "The number of received messages does not match the number of broadcasted messages. expected: %d, got: %d", len(messages), len(receivedMessages)) + require.Len(t, receivedMessages, len(messages), "The number of received messages does not match the number of broadcasted messages. sent: %d, received: %d", len(receivedMessages), len(messages)) ps.Stop() if len(ps.GetTopics()) != 0 {