From 649ced2ff732e449da3f5ceae62cd8cc3b03a55a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Mon, 19 Aug 2024 14:59:10 +0200 Subject: [PATCH] feat: check Gateway pending clients readiness concurrently (#6357) (#6424) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Grzegorz Burzyński (cherry picked from commit b85dbb1fc36e7f24423447a91c1233144d7d6271) Co-authored-by: Patryk Małek --- internal/clients/manager_test.go | 24 +++++ internal/clients/readiness.go | 136 ++++++++++++++++++++--------- internal/clients/readiness_test.go | 96 ++++++++++++++++---- 3 files changed, 197 insertions(+), 59 deletions(-) diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 6c466f3e19..8657d17d8a 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -86,6 +86,11 @@ func intoTurnedPending(urls ...string) []adminapi.DiscoveredAdminAPI { } func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" + ) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -240,6 +245,11 @@ func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) { } func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" + ) + t.Parallel() readinessChecker := &mockReadinessChecker{} @@ -330,6 +340,10 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) { } func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + ) + readinessChecker := &mockReadinessChecker{} readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)}) testClient, err := adminapi.NewTestClient(testURL1) @@ -365,6 +379,11 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) { } func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" + ) + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) @@ -459,6 +478,11 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) { } func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" + ) + testClient, err := adminapi.NewTestClient(testURL1) require.NoError(t, err) diff --git a/internal/clients/readiness.go b/internal/clients/readiness.go index 6283b224c9..19ad26fad5 100644 --- a/internal/clients/readiness.go +++ b/internal/clients/readiness.go @@ -3,7 +3,7 @@ package clients import ( "context" "errors" - "fmt" + "sync" "time" "github.com/go-logr/logr" @@ -14,7 +14,7 @@ import ( ) const ( - readinessCheckTimeout = time.Second + readinessCheckTimeout = 5 * time.Second ) // ReadinessCheckResult represents the result of a readiness check. @@ -70,19 +70,55 @@ func (c DefaultReadinessChecker) CheckReadiness( readyClients []AlreadyCreatedClient, pendingClients []adminapi.DiscoveredAdminAPI, ) ReadinessCheckResult { + var ( + turnedReadyCh = make(chan []*adminapi.Client) + turnedPendingCh = make(chan []adminapi.DiscoveredAdminAPI) + ) + + go func(ctx context.Context, pendingClients []adminapi.DiscoveredAdminAPI) { + turnedReadyCh <- c.checkPendingGatewayClients(ctx, pendingClients) + close(turnedReadyCh) + }(ctx, pendingClients) + + go func(ctx context.Context, readyClients []AlreadyCreatedClient) { + turnedPendingCh <- c.checkAlreadyExistingClients(ctx, readyClients) + close(turnedPendingCh) + }(ctx, readyClients) + return ReadinessCheckResult{ - ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients), - ClientsTurnedPending: c.checkAlreadyExistingClients(ctx, readyClients), + ClientsTurnedReady: <-turnedReadyCh, + ClientsTurnedPending: <-turnedPendingCh, } } // checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are. func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) { + var ( + wg sync.WaitGroup + ch = make(chan *adminapi.Client) + ) for _, adminAPI := range lastPending { - if client := c.checkPendingClient(ctx, adminAPI); client != nil { - turnedReady = append(turnedReady, client) - } + wg.Add(1) + go func(adminAPI adminapi.DiscoveredAdminAPI) { + defer wg.Done() + if client := c.checkPendingClient(ctx, adminAPI); client != nil { + select { + case ch <- client: + case <-ctx.Done(): + } + } + }(adminAPI) + } + + go func() { + wg.Wait() + close(ch) + }() + + for client := range ch { + turnedReady = append(turnedReady, client) } + return turnedReady } @@ -93,72 +129,92 @@ func (c DefaultReadinessChecker) checkPendingClient( ctx context.Context, pendingClient adminapi.DiscoveredAdminAPI, ) (client *adminapi.Client) { - defer func() { - c.logger.V(logging.DebugLevel). - Info(fmt.Sprintf("Checking readiness of pending client for %q", pendingClient.Address), - "ok", client != nil, - ) - }() - ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) defer cancel() + + logger := c.logger.WithValues("address", pendingClient.Address) + client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient) if err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.V(logging.DebugLevel).Info("Pending client is not ready yet", + logger.V(logging.DebugLevel).Info( + "Pending client is not ready yet", "reason", err.Error(), - "address", pendingClient.Address, ) return nil } + logger.V(logging.DebugLevel).Info( + "Checked readiness of pending client", + "ok", client != nil, + ) + return client } // checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones // that are not. func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) { + var ( + wg sync.WaitGroup + pendingChan = make(chan adminapi.DiscoveredAdminAPI) + ) + for _, client := range alreadyCreatedClients { - // For ready clients we check readiness by calling the Status endpoint. - if ready := c.checkAlreadyCreatedClient(ctx, client); !ready { - podRef, ok := client.PodReference() - if !ok { - // This should never happen, but if it does, we want to log it. - c.logger.Error( - errors.New("missing pod reference"), - "Failed to get PodReference for client", - "address", client.BaseRootURL(), - ) - continue + wg.Add(1) + go func(client AlreadyCreatedClient) { + defer wg.Done() + + // For ready clients we check readiness by calling the Status endpoint. + if ready := c.checkAlreadyCreatedClient(ctx, client); !ready { + podRef, ok := client.PodReference() + if !ok { + // This should never happen, but if it does, we want to log it. + c.logger.Error( + errors.New("missing pod reference"), + "Failed to get PodReference for client", + "address", client.BaseRootURL(), + ) + return + } + select { + case <-ctx.Done(): + case pendingChan <- adminapi.DiscoveredAdminAPI{ + Address: client.BaseRootURL(), + PodRef: podRef, + }: + } } - turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{ - Address: client.BaseRootURL(), - PodRef: podRef, - }) - } + }(client) } + + go func() { + wg.Wait() + close(pendingChan) + }() + + for pendingClient := range pendingChan { + turnedPending = append(turnedPending, pendingClient) + } + return turnedPending } func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) { - defer func() { - c.logger.V(logging.DebugLevel).Info( - fmt.Sprintf("Checking readiness of already created client for %q", client.BaseRootURL()), - "ok", ready, - ) - }() + logger := c.logger.WithValues("address", client.BaseRootURL()) ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout) defer cancel() if err := client.IsReady(ctx); err != nil { // Despite the error reason we still want to keep the client in the pending list to retry later. - c.logger.V(logging.DebugLevel).Info( + logger.V(logging.DebugLevel).Info( "Already created client is not ready, moving to pending", - "address", client.BaseRootURL(), "reason", err.Error(), ) return false } + logger.V(logging.DebugLevel).Info("Already created client is ready") + return true } diff --git a/internal/clients/readiness_test.go b/internal/clients/readiness_test.go index 07472c0b40..910da8a9ab 100644 --- a/internal/clients/readiness_test.go +++ b/internal/clients/readiness_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/go-logr/logr" @@ -15,34 +16,27 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" ) -const ( - testURL1 = "http://localhost:8001" - testURL2 = "http://localhost:8002" -) - -var testPodRef = k8stypes.NamespacedName{ - Namespace: "default", - Name: "mock", -} - type mockClientFactory struct { ready map[string]bool // Maps address to readiness. - callsCount map[string]int // Maps address to number of CreateAdminAPIClient calls. + lock sync.RWMutex + callsCount map[string]int // Maps address to number of CreateAdminAPIClient calls. t *testing.T } -func newMockClientFactory(t *testing.T, ready map[string]bool) mockClientFactory { - return mockClientFactory{ +func newMockClientFactory(t *testing.T, ready map[string]bool) *mockClientFactory { + return &mockClientFactory{ ready: ready, callsCount: map[string]int{}, t: t, } } -func (cf mockClientFactory) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { +func (cf *mockClientFactory) CreateAdminAPIClient(_ context.Context, adminAPI adminapi.DiscoveredAdminAPI) (*adminapi.Client, error) { address := adminAPI.Address + cf.lock.Lock() cf.callsCount[address]++ + cf.lock.Unlock() ready, ok := cf.ready[address] if !ok { @@ -55,9 +49,16 @@ func (cf mockClientFactory) CreateAdminAPIClient(_ context.Context, adminAPI adm return adminapi.NewTestClient(address) } +func (cf *mockClientFactory) CallsForAddress(address string) int { + cf.lock.RLock() + defer cf.lock.RUnlock() + return cf.callsCount[address] +} + type mockAlreadyCreatedClient struct { url string isReady bool + podRef k8stypes.NamespacedName } func (m mockAlreadyCreatedClient) IsReady(context.Context) error { @@ -68,7 +69,7 @@ func (m mockAlreadyCreatedClient) IsReady(context.Context) error { } func (m mockAlreadyCreatedClient) PodReference() (k8stypes.NamespacedName, bool) { - return testPodRef, true + return m.podRef, true } func (m mockAlreadyCreatedClient) BaseRootURL() string { @@ -76,6 +77,18 @@ func (m mockAlreadyCreatedClient) BaseRootURL() string { } func TestDefaultReadinessChecker(t *testing.T) { + const ( + testURL1 = "http://localhost:8001" + testURL2 = "http://localhost:8002" + testURL3 = "http://localhost:8003" + testURL4 = "http://localhost:8004" + ) + + testPodRef := k8stypes.NamespacedName{ + Namespace: "default", + Name: "mock", + } + testCases := []struct { name string @@ -90,6 +103,7 @@ func TestDefaultReadinessChecker(t *testing.T) { name: "ready turning pending", alreadyCreatedClients: []clients.AlreadyCreatedClient{ mockAlreadyCreatedClient{ + podRef: testPodRef, url: testURL1, isReady: false, }, @@ -113,9 +127,15 @@ func TestDefaultReadinessChecker(t *testing.T) { name: "ready turning pending, pending turning ready at once", alreadyCreatedClients: []clients.AlreadyCreatedClient{ mockAlreadyCreatedClient{ + podRef: testPodRef, url: testURL1, isReady: false, }, + mockAlreadyCreatedClient{ + podRef: testPodRef, + url: testURL3, + isReady: true, + }, }, pendingClients: []adminapi.DiscoveredAdminAPI{ { @@ -126,13 +146,18 @@ func TestDefaultReadinessChecker(t *testing.T) { pendingClientsReadiness: map[string]bool{ testURL2: true, }, - expectedTurnedReady: []string{testURL2}, - expectedTurnedPending: []string{testURL1}, + expectedTurnedReady: []string{ + testURL2, + }, + expectedTurnedPending: []string{ + testURL1, + }, }, { name: "no changes", alreadyCreatedClients: []clients.AlreadyCreatedClient{ mockAlreadyCreatedClient{ + podRef: testPodRef, url: testURL1, isReady: true, }, @@ -158,10 +183,12 @@ func TestDefaultReadinessChecker(t *testing.T) { name: "multiple ready, one turning pending", alreadyCreatedClients: []clients.AlreadyCreatedClient{ mockAlreadyCreatedClient{ + podRef: testPodRef, url: testURL1, isReady: true, }, mockAlreadyCreatedClient{ + podRef: testPodRef, url: testURL2, isReady: false, // This one will turn pending. }, @@ -191,6 +218,37 @@ func TestDefaultReadinessChecker(t *testing.T) { testURL2, }, }, + { + name: "multiple pending, two turning ready", + pendingClients: []adminapi.DiscoveredAdminAPI{ + { + Address: testURL1, + PodRef: testPodRef, + }, + { + Address: testURL2, + PodRef: testPodRef, + }, + { + Address: testURL3, + PodRef: testPodRef, + }, + { + Address: testURL4, + PodRef: testPodRef, + }, + }, + pendingClientsReadiness: map[string]bool{ + testURL1: false, + testURL2: true, // This one will turn ready. + testURL3: false, + testURL4: true, // This one will turn ready. + }, + expectedTurnedReady: []string{ + testURL2, + testURL4, + }, + }, } for _, tc := range testCases { @@ -207,12 +265,12 @@ func TestDefaultReadinessChecker(t *testing.T) { // For every pending client turning ready we expect exactly one call to CreateAdminAPIClient. for _, url := range tc.pendingClients { - require.Equal(t, 1, factory.callsCount[url.Address]) + require.Equal(t, 1, factory.CallsForAddress(url.Address)) } // For every already created client we expect NO calls to CreateAdminAPIClient. for _, url := range tc.alreadyCreatedClients { - require.Zero(t, factory.callsCount[url.BaseRootURL()]) + require.Zero(t, factory.CallsForAddress(url.BaseRootURL())) } }) }