Skip to content

Commit

Permalink
feat: check Gateway pending clients readiness concurrently (#6357)
Browse files Browse the repository at this point in the history
Co-authored-by: Grzegorz Burzyński <[email protected]>
  • Loading branch information
pmalek and czeslavo authored Jul 30, 2024
1 parent 745900d commit b85dbb1
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 46 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ Adding a new version? You'll need three changes:
resource is attached to multiple foreign Kong entities.
[#6280](https://github.com/Kong/kubernetes-ingress-controller/pull/6280)

### Changed

- Check Kong Gateway readiness concurrently. This greatly reduces the time which
is required to check all Gateway instances readiness, especially when there's many
of them. Increased individual readiness check timeout from 1s to 5s.
[#6347](https://github.com/Kong/kubernetes-ingress-controller/pull/6347)
[#6357](https://github.com/Kong/kubernetes-ingress-controller/pull/6357)

## 3.2.3

> Release date: 2024-07-23
Expand Down
24 changes: 24 additions & 0 deletions internal/clients/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func intoTurnedPending(urls ...string) []adminapi.DiscoveredAdminAPI {
}

func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -240,6 +245,11 @@ func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) {
}

func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

t.Parallel()

readinessChecker := &mockReadinessChecker{}
Expand Down Expand Up @@ -330,6 +340,10 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
}

func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
)

readinessChecker := &mockReadinessChecker{}
readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)})
testClient, err := adminapi.NewTestClient(testURL1)
Expand Down Expand Up @@ -365,6 +379,11 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {
}

func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

testClient, err := adminapi.NewTestClient(testURL1)
require.NoError(t, err)

Expand Down Expand Up @@ -459,6 +478,11 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
}

func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

testClient, err := adminapi.NewTestClient(testURL1)
require.NoError(t, err)

Expand Down
91 changes: 64 additions & 27 deletions internal/clients/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package clients
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -15,7 +14,7 @@ import (
)

const (
readinessCheckTimeout = time.Second
readinessCheckTimeout = 5 * time.Second
)

// ReadinessCheckResult represents the result of a readiness check.
Expand Down Expand Up @@ -71,19 +70,55 @@ func (c DefaultReadinessChecker) CheckReadiness(
readyClients []AlreadyCreatedClient,
pendingClients []adminapi.DiscoveredAdminAPI,
) ReadinessCheckResult {
var (
turnedReadyCh = make(chan []*adminapi.Client)
turnedPendingCh = make(chan []adminapi.DiscoveredAdminAPI)
)

go func(ctx context.Context, pendingClients []adminapi.DiscoveredAdminAPI) {
turnedReadyCh <- c.checkPendingGatewayClients(ctx, pendingClients)
close(turnedReadyCh)
}(ctx, pendingClients)

go func(ctx context.Context, readyClients []AlreadyCreatedClient) {
turnedPendingCh <- c.checkAlreadyExistingClients(ctx, readyClients)
close(turnedPendingCh)
}(ctx, readyClients)

return ReadinessCheckResult{
ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients),
ClientsTurnedPending: c.checkAlreadyExistingClients(ctx, readyClients),
ClientsTurnedReady: <-turnedReadyCh,
ClientsTurnedPending: <-turnedPendingCh,
}
}

// checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are.
func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) {
var (
wg sync.WaitGroup
ch = make(chan *adminapi.Client)
)
for _, adminAPI := range lastPending {
if client := c.checkPendingClient(ctx, adminAPI); client != nil {
turnedReady = append(turnedReady, client)
}
wg.Add(1)
go func(adminAPI adminapi.DiscoveredAdminAPI) {
defer wg.Done()
if client := c.checkPendingClient(ctx, adminAPI); client != nil {
select {
case ch <- client:
case <-ctx.Done():
}
}
}(adminAPI)
}

go func() {
wg.Wait()
close(ch)
}()

for client := range ch {
turnedReady = append(turnedReady, client)
}

return turnedReady
}

Expand All @@ -94,33 +129,36 @@ func (c DefaultReadinessChecker) checkPendingClient(
ctx context.Context,
pendingClient adminapi.DiscoveredAdminAPI,
) (client *adminapi.Client) {
defer func() {
c.logger.V(logging.DebugLevel).
Info(fmt.Sprintf("Checking readiness of pending client for %q", pendingClient.Address),
"ok", client != nil,
)
}()

ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout)
defer cancel()

logger := c.logger.WithValues("address", pendingClient.Address)

client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient)
if err != nil {
// Despite the error reason we still want to keep the client in the pending list to retry later.
c.logger.V(logging.DebugLevel).Info("Pending client is not ready yet",
logger.V(logging.DebugLevel).Info(
"Pending client is not ready yet",
"reason", err.Error(),
"address", pendingClient.Address,
)
return nil
}

logger.V(logging.DebugLevel).Info(
"Checked readiness of pending client",
"ok", client != nil,
)

return client
}

// checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones
// that are not.
func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) {
var wg sync.WaitGroup
pendingChan := make(chan adminapi.DiscoveredAdminAPI, len(alreadyCreatedClients))
var (
wg sync.WaitGroup
pendingChan = make(chan adminapi.DiscoveredAdminAPI)
)

for _, client := range alreadyCreatedClients {
wg.Add(1)
Expand All @@ -139,9 +177,12 @@ func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context
)
return
}
pendingChan <- adminapi.DiscoveredAdminAPI{
select {
case <-ctx.Done():
case pendingChan <- adminapi.DiscoveredAdminAPI{
Address: client.BaseRootURL(),
PodRef: podRef,
}:
}
}
}(client)
Expand All @@ -160,24 +201,20 @@ func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context
}

func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) {
defer func() {
c.logger.V(logging.DebugLevel).Info(
fmt.Sprintf("Checking readiness of already created client for %q", client.BaseRootURL()),
"ok", ready,
)
}()
logger := c.logger.WithValues("address", client.BaseRootURL())

ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout)
defer cancel()
if err := client.IsReady(ctx); err != nil {
// Despite the error reason we still want to keep the client in the pending list to retry later.
c.logger.V(logging.DebugLevel).Info(
logger.V(logging.DebugLevel).Info(
"Already created client is not ready, moving to pending",
"address", client.BaseRootURL(),
"reason", err.Error(),
)
return false
}

logger.V(logging.DebugLevel).Info("Already created client is ready")

return true
}
Loading

0 comments on commit b85dbb1

Please sign in to comment.