Skip to content

Commit

Permalink
Merge branch 'release/3.2.x' into prepare-release/3.2.4
Browse files Browse the repository at this point in the history
  • Loading branch information
randmonkey authored Aug 20, 2024
2 parents cc03dd4 + 649ced2 commit 5669be0
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 59 deletions.
24 changes: 24 additions & 0 deletions internal/clients/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func intoTurnedPending(urls ...string) []adminapi.DiscoveredAdminAPI {
}

func TestAdminAPIClientsManager_OnNotifyClientsAreUpdatedAccordingly(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -240,6 +245,11 @@ func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) {
}

func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

t.Parallel()

readinessChecker := &mockReadinessChecker{}
Expand Down Expand Up @@ -330,6 +340,10 @@ func TestAdminAPIClientsManager_SubscribeToGatewayClientsChanges(t *testing.T) {
}

func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
)

readinessChecker := &mockReadinessChecker{}
readinessChecker.LetChecksReturn(clients.ReadinessCheckResult{ClientsTurnedReady: intoTurnedReady(testURL1)})
testClient, err := adminapi.NewTestClient(testURL1)
Expand Down Expand Up @@ -365,6 +379,11 @@ func TestAdminAPIClientsManager_ConcurrentNotify(t *testing.T) {
}

func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

testClient, err := adminapi.NewTestClient(testURL1)
require.NoError(t, err)

Expand Down Expand Up @@ -459,6 +478,11 @@ func TestAdminAPIClientsManager_GatewayClientsChanges(t *testing.T) {
}

func TestAdminAPIClientsManager_PeriodicReadinessReconciliation(t *testing.T) {
const (
testURL1 = "http://localhost:8001"
testURL2 = "http://localhost:8002"
)

testClient, err := adminapi.NewTestClient(testURL1)
require.NoError(t, err)

Expand Down
136 changes: 96 additions & 40 deletions internal/clients/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package clients
import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -14,7 +14,7 @@ import (
)

const (
readinessCheckTimeout = time.Second
readinessCheckTimeout = 5 * time.Second
)

// ReadinessCheckResult represents the result of a readiness check.
Expand Down Expand Up @@ -70,19 +70,55 @@ func (c DefaultReadinessChecker) CheckReadiness(
readyClients []AlreadyCreatedClient,
pendingClients []adminapi.DiscoveredAdminAPI,
) ReadinessCheckResult {
var (
turnedReadyCh = make(chan []*adminapi.Client)
turnedPendingCh = make(chan []adminapi.DiscoveredAdminAPI)
)

go func(ctx context.Context, pendingClients []adminapi.DiscoveredAdminAPI) {
turnedReadyCh <- c.checkPendingGatewayClients(ctx, pendingClients)
close(turnedReadyCh)
}(ctx, pendingClients)

go func(ctx context.Context, readyClients []AlreadyCreatedClient) {
turnedPendingCh <- c.checkAlreadyExistingClients(ctx, readyClients)
close(turnedPendingCh)
}(ctx, readyClients)

return ReadinessCheckResult{
ClientsTurnedReady: c.checkPendingGatewayClients(ctx, pendingClients),
ClientsTurnedPending: c.checkAlreadyExistingClients(ctx, readyClients),
ClientsTurnedReady: <-turnedReadyCh,
ClientsTurnedPending: <-turnedPendingCh,
}
}

// checkPendingGatewayClients checks if the pending clients are ready to be used and returns the ones that are.
func (c DefaultReadinessChecker) checkPendingGatewayClients(ctx context.Context, lastPending []adminapi.DiscoveredAdminAPI) (turnedReady []*adminapi.Client) {
var (
wg sync.WaitGroup
ch = make(chan *adminapi.Client)
)
for _, adminAPI := range lastPending {
if client := c.checkPendingClient(ctx, adminAPI); client != nil {
turnedReady = append(turnedReady, client)
}
wg.Add(1)
go func(adminAPI adminapi.DiscoveredAdminAPI) {
defer wg.Done()
if client := c.checkPendingClient(ctx, adminAPI); client != nil {
select {
case ch <- client:
case <-ctx.Done():
}
}
}(adminAPI)
}

go func() {
wg.Wait()
close(ch)
}()

for client := range ch {
turnedReady = append(turnedReady, client)
}

return turnedReady
}

Expand All @@ -93,72 +129,92 @@ func (c DefaultReadinessChecker) checkPendingClient(
ctx context.Context,
pendingClient adminapi.DiscoveredAdminAPI,
) (client *adminapi.Client) {
defer func() {
c.logger.V(logging.DebugLevel).
Info(fmt.Sprintf("Checking readiness of pending client for %q", pendingClient.Address),
"ok", client != nil,
)
}()

ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout)
defer cancel()

logger := c.logger.WithValues("address", pendingClient.Address)

client, err := c.factory.CreateAdminAPIClient(ctx, pendingClient)
if err != nil {
// Despite the error reason we still want to keep the client in the pending list to retry later.
c.logger.V(logging.DebugLevel).Info("Pending client is not ready yet",
logger.V(logging.DebugLevel).Info(
"Pending client is not ready yet",
"reason", err.Error(),
"address", pendingClient.Address,
)
return nil
}

logger.V(logging.DebugLevel).Info(
"Checked readiness of pending client",
"ok", client != nil,
)

return client
}

// checkAlreadyExistingClients checks if the already existing clients are still ready to be used and returns the ones
// that are not.
func (c DefaultReadinessChecker) checkAlreadyExistingClients(ctx context.Context, alreadyCreatedClients []AlreadyCreatedClient) (turnedPending []adminapi.DiscoveredAdminAPI) {
var (
wg sync.WaitGroup
pendingChan = make(chan adminapi.DiscoveredAdminAPI)
)

for _, client := range alreadyCreatedClients {
// For ready clients we check readiness by calling the Status endpoint.
if ready := c.checkAlreadyCreatedClient(ctx, client); !ready {
podRef, ok := client.PodReference()
if !ok {
// This should never happen, but if it does, we want to log it.
c.logger.Error(
errors.New("missing pod reference"),
"Failed to get PodReference for client",
"address", client.BaseRootURL(),
)
continue
wg.Add(1)
go func(client AlreadyCreatedClient) {
defer wg.Done()

// For ready clients we check readiness by calling the Status endpoint.
if ready := c.checkAlreadyCreatedClient(ctx, client); !ready {
podRef, ok := client.PodReference()
if !ok {
// This should never happen, but if it does, we want to log it.
c.logger.Error(
errors.New("missing pod reference"),
"Failed to get PodReference for client",
"address", client.BaseRootURL(),
)
return
}
select {
case <-ctx.Done():
case pendingChan <- adminapi.DiscoveredAdminAPI{
Address: client.BaseRootURL(),
PodRef: podRef,
}:
}
}
turnedPending = append(turnedPending, adminapi.DiscoveredAdminAPI{
Address: client.BaseRootURL(),
PodRef: podRef,
})
}
}(client)
}

go func() {
wg.Wait()
close(pendingChan)
}()

for pendingClient := range pendingChan {
turnedPending = append(turnedPending, pendingClient)
}

return turnedPending
}

func (c DefaultReadinessChecker) checkAlreadyCreatedClient(ctx context.Context, client AlreadyCreatedClient) (ready bool) {
defer func() {
c.logger.V(logging.DebugLevel).Info(
fmt.Sprintf("Checking readiness of already created client for %q", client.BaseRootURL()),
"ok", ready,
)
}()
logger := c.logger.WithValues("address", client.BaseRootURL())

ctx, cancel := context.WithTimeout(ctx, readinessCheckTimeout)
defer cancel()
if err := client.IsReady(ctx); err != nil {
// Despite the error reason we still want to keep the client in the pending list to retry later.
c.logger.V(logging.DebugLevel).Info(
logger.V(logging.DebugLevel).Info(
"Already created client is not ready, moving to pending",
"address", client.BaseRootURL(),
"reason", err.Error(),
)
return false
}

logger.V(logging.DebugLevel).Info("Already created client is ready")

return true
}
Loading

0 comments on commit 5669be0

Please sign in to comment.