From 05a2a218e3b17272f2b66f63ab7c98fc7b5d27a2 Mon Sep 17 00:00:00 2001 From: Shivam Saraf Date: Sun, 6 Oct 2024 22:18:09 -0400 Subject: [PATCH] Fix flaky test --- service/matching/matcher_test.go | 43 ++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/service/matching/matcher_test.go b/service/matching/matcher_test.go index 89a2ddca3bb..a002a3f5700 100644 --- a/service/matching/matcher_test.go +++ b/service/matching/matcher_test.go @@ -267,24 +267,51 @@ func (t *MatcherTestSuite) TestRejectSyncMatchWhenBacklog() { func (t *MatcherTestSuite) TestForwardingWhenBacklogIsYoung() { historyTask := newInternalTaskForSyncMatch(randomTaskInfo().Data, nil) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) intruptC := make(chan struct{}) - // poll forwarding attempt happens when there is no backlog - t.client.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&matchingservice.PollWorkflowTaskQueueResponse{}, errMatchingHostThrottleTest) - go t.childMatcher.Poll(ctx, &pollMetadata{}) //nolint:errcheck - time.Sleep(time.Millisecond) + var wg sync.WaitGroup + wg.Add(1) - // task is not forwarded because there is a poller waiting - youngBacklogTask := newInternalTaskFromBacklog(randomTaskInfoWithAge(time.Second), nil) + t.client.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(arg0 context.Context, arg1 *matchingservice.PollWorkflowTaskQueueRequest, arg2 ...interface{}) { + wg.Done() + }, + ).Return(&matchingservice.PollWorkflowTaskQueueResponse{}, errMatchingHostThrottleTest) + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + // poll forwarding attempt happens when there is no backlog + _, err := t.childMatcher.Poll(ctx, &pollMetadata{}) + t.Assert().NoError(err) + cancel() + }() + // This ensures that the poll request has been forwarded to the parent partition before the offer is made. + // Without this, there is a chance that the request is matched on the child partition, which will fail the test by + // complaining about a missing PollWorkflowTaskQueue. + wg.Wait() + + // to ensure poller is now blocked locally + time.Sleep(2 * time.Millisecond) + + // task is not forwarded because there is a local poller waiting err := t.childMatcher.MustOffer(ctx, historyTask, intruptC) t.Nil(err) cancel() // young task is forwarded - t.client.EXPECT().AddWorkflowTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottleTest) + youngBacklogTask := newInternalTaskFromBacklog(randomTaskInfoWithAge(time.Second), nil) + + wg.Add(1) + t.client.EXPECT().AddWorkflowTask(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(arg0 context.Context, arg1 *matchingservice.AddWorkflowTaskRequest, arg2 ...interface{}) { + // Offer forwarding has occured + wg.Done() + }, + ).Return(&matchingservice.AddWorkflowTaskResponse{}, errMatchingHostThrottleTest) ctx, cancel = context.WithTimeout(context.Background(), time.Second) go t.childMatcher.MustOffer(ctx, youngBacklogTask, intruptC) //nolint:errcheck + wg.Wait() time.Sleep(time.Millisecond) cancel() }