Skip to content

Commit

Permalink
change the readinessProbe, adjust discoverer, align tests
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Jul 18, 2023
1 parent 28c350b commit bfa24d1
Show file tree
Hide file tree
Showing 16 changed files with 246 additions and 123 deletions.
2 changes: 1 addition & 1 deletion config/variants/multi-gw/base/gateway_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ spec:
failureThreshold: 3
readinessProbe:
httpGet:
path: /status
path: /status/ready
port: 8100
scheme: HTTP
initialDelaySeconds: 5
Expand Down
2 changes: 1 addition & 1 deletion deploy/single/all-in-one-dbless-enterprise.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/single/all-in-one-dbless-konnect-enterprise.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/single/all-in-one-dbless-konnect.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/single/all-in-one-dbless.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/adminapi/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (d *Discoverer) AdminAPIsFromEndpointSlice(
}

for _, e := range endpoints.Endpoints {
if e.Conditions.Ready == nil || !*e.Conditions.Ready {
if e.Conditions.Terminating != nil && *e.Conditions.Terminating {
continue
}

Expand Down
27 changes: 17 additions & 10 deletions internal/adminapi/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) {
dnsStrategy: cfgtypes.ServiceScopedPodDNSStrategy,
},
{
name: "not ready endpoints are not returned",
name: "not ready endpoints are returned",
endpoints: discoveryv1.EndpointSlice{
ObjectMeta: endpointsSliceObjectMeta,
AddressType: discoveryv1.AddressTypeIPv4,
Expand All @@ -183,12 +183,18 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) {
},
Ports: builder.NewEndpointPort(8444).WithName("admin").IntoSlice(),
},
portNames: sets.New("admin"),
want: sets.New[DiscoveredAdminAPI](),
portNames: sets.New("admin"),
want: sets.New[DiscoveredAdminAPI](
DiscoveredAdminAPI{
Address: "https://10.0.0.1:8444",
PodRef: k8stypes.NamespacedName{
Name: "pod-1", Namespace: namespaceName,
},
}),
dnsStrategy: cfgtypes.IPDNSStrategy,
},
{
name: "not ready and terminating endpoints are not returned",
name: "ready and terminating endpoints are not returned",
endpoints: discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: uuid.NewString(),
Expand All @@ -199,7 +205,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) {
{
Addresses: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(false),
Ready: lo.ToPtr(true),
Terminating: lo.ToPtr(true),
},
TargetRef: testPodReference(namespaceName, "pod-1"),
Expand Down Expand Up @@ -237,7 +243,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) {
Addresses: []string{"10.0.2.1"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(false),
Terminating: lo.ToPtr(false),
Terminating: lo.ToPtr(true),
},
TargetRef: testPodReference(namespaceName, "pod-3"),
},
Expand Down Expand Up @@ -289,7 +295,7 @@ func TestDiscoverer_AddressesFromEndpointSlice(t *testing.T) {
Addresses: []string{"10.0.2.1"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(false),
Terminating: lo.ToPtr(false),
Terminating: lo.ToPtr(true),
},
TargetRef: testPodReference(namespaceName, "pod-3"),
},
Expand Down Expand Up @@ -551,7 +557,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) {
Addresses: []string{"8.0.0.1"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(false),
Terminating: lo.ToPtr(false),
Terminating: lo.ToPtr(true),
},
TargetRef: testPodReference(namespaceName, "pod-3"),
},
Expand Down Expand Up @@ -637,7 +643,7 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) {
dnsStrategy: cfgtypes.IPDNSStrategy,
},
{
name: "not Ready Endpoints are not matched",
name: "terminating Endpoints are not matched",
service: k8stypes.NamespacedName{
Namespace: namespaceName,
Name: serviceName,
Expand All @@ -652,7 +658,8 @@ func TestDiscoverer_GetAdminAPIsForService(t *testing.T) {
{
Addresses: []string{"7.0.0.1"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(false),
Ready: lo.ToPtr(false),
Terminating: lo.ToPtr(true),
},
TargetRef: testPodReference(namespaceName, "pod-1"),
},
Expand Down
5 changes: 2 additions & 3 deletions internal/clients/config_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"testing"
"time"

"github.com/go-logr/logr/testr"
"github.com/go-logr/logr"
"github.com/stretchr/testify/require"

"github.com/kong/kubernetes-ingress-controller/v2/internal/clients"
)

func TestChannelConfigNotifier(t *testing.T) {
logger := testr.New(t)
n := clients.NewChannelConfigNotifier(logger)
n := clients.NewChannelConfigNotifier(logr.Discard())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down
67 changes: 31 additions & 36 deletions internal/clients/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"sync"
"time"

"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"

"github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/clock"
)

// DefaultReadinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop.
// It's the same as the default interval of the readiness probe.
// It's the same as the default interval of a Kubernetes container's readiness probe.
const DefaultReadinessReconciliationInterval = 10 * time.Second

// ClientFactory is responsible for creating Admin API clients.
type ClientFactory interface {
CreateAdminAPIClient(ctx context.Context, address adminapi.DiscoveredAdminAPI) (*adminapi.Client, error)
}
Expand All @@ -29,15 +30,17 @@ type AdminAPIClientsProvider interface {
GatewayClients() []*adminapi.Client
}

// Ticker is an interface that allows to control a ticker.
type Ticker interface {
Stop()
Channel() <-chan time.Time
Reset(d time.Duration)
}

// AdminAPIClientsManager keeps track of current Admin API clients of Gateways that we should configure.
// In particular, it can be notified about the clients' list update with use of Notify method, and queried
// for the latest slice of those with use of Clients method.
// In particular, it can be notified about the discovered clients' list with use of Notify method, and queried
// for the latest slice of ready to be configured clients with use of GatewayClients method. It also runs periodic
// readiness reconciliation loop which is responsible for checking readiness of the clients.
type AdminAPIClientsManager struct {
// discoveredAdminAPIsNotifyChan is used for notifications that contain Admin API
// endpoints list that should be used for configuring the dataplane.
Expand All @@ -47,7 +50,7 @@ type AdminAPIClientsManager struct {
ctx context.Context
onceNotifyLoopRunning sync.Once
running chan struct{}
isNotifyLoopRunning bool
isRunning bool

// readyGatewayClients represent all Kong Gateway data-planes that are ready to be configured.
readyGatewayClients map[string]*adminapi.Client
Expand All @@ -59,9 +62,6 @@ type AdminAPIClientsManager struct {
// readinessChecker is used to check readiness of the clients.
readinessChecker ReadinessChecker

// readinessReconciliationInterval is the interval at which the manager will run readiness reconciliation loop.
readinessReconciliationInterval time.Duration

// readinessReconciliationTicker is used to run readiness reconciliation loop.
readinessReconciliationTicker Ticker

Expand Down Expand Up @@ -99,15 +99,14 @@ func NewAdminAPIClientsManager(
return c.BaseRootURL(), c
})
c := &AdminAPIClientsManager{
readyGatewayClients: readyClients,
pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI),
readinessChecker: readinessChecker,
readinessReconciliationTicker: clock.NewTicker(),
readinessReconciliationInterval: DefaultReadinessReconciliationInterval,
discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI),
ctx: ctx,
running: make(chan struct{}),
logger: logger,
readyGatewayClients: readyClients,
pendingGatewayClients: make(map[string]adminapi.DiscoveredAdminAPI),
readinessChecker: readinessChecker,
readinessReconciliationTicker: clock.NewTicker(),
discoveredAdminAPIsNotifyChan: make(chan []adminapi.DiscoveredAdminAPI),
ctx: ctx,
running: make(chan struct{}),
logger: logger,
}

for _, opt := range opts {
Expand All @@ -123,13 +122,14 @@ func (c *AdminAPIClientsManager) Running() chan struct{} {
}

// Run runs a goroutine that will dynamically ingest new addresses of Kong Admin API endpoints.
// It should only be called when Gateway Discovery is enabled.
func (c *AdminAPIClientsManager) Run() {
c.onceNotifyLoopRunning.Do(func() {
go c.gatewayClientsReconciliationLoop()

c.lock.Lock()
defer c.lock.Unlock()
c.isNotifyLoopRunning = true
c.isRunning = true
})
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru
}

// No notify loop running, there will be no updates, let's propagate that to the caller.
if !c.isNotifyLoopRunning {
if !c.isRunning {
return nil, false
}

Expand All @@ -202,28 +202,18 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru
return ch, true
}

// gatewayClientsReconciliationLoop is an inner loop listening on notifyChan which are received via
// Notify() calls. Each time it receives on notifyChan tt will take the provided
// list of addresses and update the internally held list of clients such that:
// - the internal list of kong clients contains only the provided addresses
// - if a client for a provided address already exists it's not recreated again
// (hence no external calls are made to check the provided endpoint if there
// exists a client already using it)
// - client that do not exist in the provided address list are removed if they
// are present in the current state
//
// This function will acquire the internal lock to prevent the modification of
// internal clients list.
// gatewayClientsReconciliationLoop is an inner loop listening on:
// - discoveredAdminAPIsNotifyChan - triggered on every Notify() call.
// - readinessReconciliationTicker - triggered on every readinessReconciliationTicker tick.
func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() {
close(c.running)
c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval)
c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval)
defer c.readinessReconciliationTicker.Stop()

close(c.running)
for {
select {
case <-c.ctx.Done():
c.logger.Infof("closing AdminAPIClientsManager: %s", c.ctx.Err())
close(c.discoveredAdminAPIsNotifyChan)
c.closeGatewayClientsSubscribers()
return
case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan:
Expand All @@ -234,6 +224,9 @@ func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() {
}
}

// onDiscoveredAdminAPIsNotification is called when a new notification about Admin API addresses change is received.
// It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has
// changed.
func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) {
c.lock.Lock()
defer c.lock.Unlock()
Expand All @@ -246,6 +239,8 @@ func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdm
}
}

// onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness
// of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed.
func (c *AdminAPIClientsManager) onReadinessReconciliationTick() {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -275,7 +270,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi
return true
}

// Make sure all discovered clients that are not on ready list are in the pending list.
// Make sure all discovered clients that are not in the ready list are in the pending list.
for _, d := range discoveredAdminAPIs {
if _, ok := c.readyGatewayClients[d.Address]; !ok {
c.pendingGatewayClients[d.Address] = d
Expand Down Expand Up @@ -316,7 +311,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi
func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
// Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification).
// It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications.
defer c.readinessReconciliationTicker.Reset(c.readinessReconciliationInterval)
defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval)

readinessCheckResult := c.readinessChecker.CheckReadiness(
c.ctx,
Expand Down
Loading

0 comments on commit bfa24d1

Please sign in to comment.