Skip to content

Commit

Permalink
feat: separate config status channels (#6349)
Browse files Browse the repository at this point in the history
Separate channels for gateway and Konnect config status updates.

---------

Co-authored-by: Travis Raines <[email protected]>
  • Loading branch information
randmonkey and rainest authored Jul 26, 2024
1 parent d63ce1d commit 205135b
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 112 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ Adding a new version? You'll need three changes:
`ReferenceGrant` in the namespace of the `KongPlugin` to grant permissions
to `KongCustomEntity` of referring to `KongPlugin`.
[#6289](https://github.com/Kong/kubernetes-ingress-controller/pull/6289)
- Konnect configuration updates are now handled separately from gateway
updates. This allows the controller to handle sync errors for the gateway and
Konnect speparately, and avoids one blocking the other.
[#6341](https://github.com/Kong/kubernetes-ingress-controller/pull/6341)
[#6349](https://github.com/Kong/kubernetes-ingress-controller/pull/6349)

### Fixed

Expand Down
89 changes: 61 additions & 28 deletions internal/clients/config_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,36 @@ const (
ConfigStatusUnknown ConfigStatus = "Unknown"
)

// CalculateConfigStatusInput aggregates the input to CalculateConfigStatus.
type CalculateConfigStatusInput struct {
// GatewayConfigApplyStatus stores the status of building Kong configuration and sending configuration to Kong gateways.
type GatewayConfigApplyStatus struct {
// TranslationFailuresOccurred is true means Translation of some of Kubernetes objects failed.
TranslationFailuresOccurred bool

// Any error occurred when syncing with Gateways.
GatewaysFailed bool
ApplyConfigFailed bool
}

// Any error occurred when syncing with Konnect,
KonnectFailed bool
// KonnectConfigUploadStatus stores the status of uploading configuration to Konnect.

// Translation of some of Kubernetes objects failed.
TranslationFailuresOccurred bool
type KonnectConfigUploadStatus struct {
Failed bool
}

// CalculateConfigStatus calculates a clients.ConfigStatus that sums up the configuration synchronisation result as
// a single enumerated value.
func CalculateConfigStatus(i CalculateConfigStatusInput) ConfigStatus {
func CalculateConfigStatus(g GatewayConfigApplyStatus, k KonnectConfigUploadStatus) ConfigStatus {
switch {
case !i.GatewaysFailed && !i.KonnectFailed && !i.TranslationFailuresOccurred:
case !g.ApplyConfigFailed && !g.TranslationFailuresOccurred && !k.Failed:
return ConfigStatusOK
case !i.GatewaysFailed && !i.KonnectFailed && i.TranslationFailuresOccurred:
case !g.ApplyConfigFailed && g.TranslationFailuresOccurred && !k.Failed:
return ConfigStatusTranslationErrorHappened
case i.GatewaysFailed && !i.KonnectFailed: // We don't care about translation failures if we can't apply to gateways.
case g.ApplyConfigFailed && !k.Failed: // We don't care about translation failures if we can't apply to gateways.
return ConfigStatusApplyFailed
case !i.GatewaysFailed && i.KonnectFailed && !i.TranslationFailuresOccurred:
case !g.ApplyConfigFailed && !g.TranslationFailuresOccurred && k.Failed:
return ConfigStatusOKKonnectApplyFailed
case !i.GatewaysFailed && i.KonnectFailed && i.TranslationFailuresOccurred:
case !g.ApplyConfigFailed && g.TranslationFailuresOccurred && k.Failed:
return ConfigStatusTranslationErrorHappenedKonnectApplyFailed
case i.GatewaysFailed && i.KonnectFailed: // We don't care about translation failures if we can't apply to gateways.
case g.ApplyConfigFailed && k.Failed: // We don't care about translation failures if we can't apply to gateways.
return ConfigStatusApplyFailedKonnectApplyFailed
}

Expand All @@ -56,53 +59,83 @@ func CalculateConfigStatus(i CalculateConfigStatusInput) ConfigStatus {
}

type ConfigStatusNotifier interface {
NotifyConfigStatus(context.Context, ConfigStatus)
NotifyGatewayConfigStatus(context.Context, GatewayConfigApplyStatus)
NotifyKonnectConfigStatus(context.Context, KonnectConfigUploadStatus)
}

type ConfigStatusSubscriber interface {
SubscribeConfigStatus() chan ConfigStatus
SubscribeGatewayConfigStatus() chan GatewayConfigApplyStatus
SubscribeKonnectConfigStatus() chan KonnectConfigUploadStatus
}

type NoOpConfigStatusNotifier struct{}

var _ ConfigStatusNotifier = NoOpConfigStatusNotifier{}

func (n NoOpConfigStatusNotifier) NotifyConfigStatus(_ context.Context, _ ConfigStatus) {
func (n NoOpConfigStatusNotifier) NotifyGatewayConfigStatus(_ context.Context, _ GatewayConfigApplyStatus) {
}

func (n NoOpConfigStatusNotifier) NotifyKonnectConfigStatus(_ context.Context, _ KonnectConfigUploadStatus) {
}

type ChannelConfigNotifier struct {
ch chan ConfigStatus
logger logr.Logger
gatewayStatusCh chan GatewayConfigApplyStatus
konnectStatusCh chan KonnectConfigUploadStatus
logger logr.Logger
}

var _ ConfigStatusNotifier = &ChannelConfigNotifier{}

func NewChannelConfigNotifier(logger logr.Logger) *ChannelConfigNotifier {
return &ChannelConfigNotifier{
ch: make(chan ConfigStatus),
logger: logger,
gatewayStatusCh: make(chan GatewayConfigApplyStatus),
konnectStatusCh: make(chan KonnectConfigUploadStatus),
logger: logger,
}
}

// NotifyConfigStatus sends the status in a separate goroutine. If the notification is not received in 1s, it's dropped.
func (n *ChannelConfigNotifier) NotifyConfigStatus(ctx context.Context, status ConfigStatus) {
// NotifyGatewayConfigStatus notifies status of sending configuration to Kong gateway(s).
func (n *ChannelConfigNotifier) NotifyGatewayConfigStatus(ctx context.Context, status GatewayConfigApplyStatus) {
const notifyTimeout = time.Second

go func() {
timeout := time.NewTimer(notifyTimeout)
defer timeout.Stop()

select {
case n.gatewayStatusCh <- status:
case <-ctx.Done():
n.logger.Info("Context done, not notifying gateway config status", "status", status)
case <-timeout.C:
n.logger.Info("Timed out notifying gateway config status", "status", status)
}
}()
}

// NotifyKonnectConfigStatus notifies status of sending configuration to Konnect.
func (n *ChannelConfigNotifier) NotifyKonnectConfigStatus(ctx context.Context, status KonnectConfigUploadStatus) {
const notifyTimeout = time.Second

go func() {
timeout := time.NewTimer(notifyTimeout)
defer timeout.Stop()

select {
case n.ch <- status:
case n.konnectStatusCh <- status:
case <-ctx.Done():
n.logger.Info("Context done, not notifying config status", "status", status)
n.logger.Info("Context done, not notifying Konnect config status", "status", status)
case <-timeout.C:
n.logger.Info("Timed out notifying config status", "status", status)
n.logger.Info("Timed out notifying Konnect config status", "status", status)
}
}()
}

func (n *ChannelConfigNotifier) SubscribeConfigStatus() chan ConfigStatus {
func (n *ChannelConfigNotifier) SubscribeGatewayConfigStatus() chan GatewayConfigApplyStatus {
// TODO: in case of multiple subscribers, we should use a fan-out pattern.
return n.gatewayStatusCh
}

func (n *ChannelConfigNotifier) SubscribeKonnectConfigStatus() chan KonnectConfigUploadStatus {
// TODO: in case of multiple subscribers, we should use a fan-out pattern.
return n.ch
return n.konnectStatusCh
}
14 changes: 8 additions & 6 deletions internal/clients/config_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ func TestChannelConfigNotifier(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

ch := n.SubscribeConfigStatus()
ch := n.SubscribeGatewayConfigStatus()

// Call NotifyConfigStatus 5 times to make sure that the method is non-blocking.
for i := 0; i < 5; i++ {
n.NotifyConfigStatus(ctx, clients.ConfigStatusOK)
n.NotifyGatewayConfigStatus(ctx, clients.GatewayConfigApplyStatus{})
}

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -90,11 +90,13 @@ func TestCalculateConfigStatus(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := clients.CalculateConfigStatus(clients.CalculateConfigStatusInput{
GatewaysFailed: tc.gatewayFailure,
KonnectFailed: tc.konnectFailure,
result := clients.CalculateConfigStatus(clients.GatewayConfigApplyStatus{
ApplyConfigFailed: tc.gatewayFailure,
TranslationFailuresOccurred: tc.translationFailures,
})
}, clients.KonnectConfigUploadStatus{
Failed: tc.konnectFailure,
},
)
require.Equal(t, tc.expectedConfigStatus, result)
})
}
Expand Down
31 changes: 7 additions & 24 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ type KongClient struct {
// It may be empty if the client is not running in a pod (e.g. in a unit test).
controllerPodReference mo.Option[k8stypes.NamespacedName]

// currentConfigStatus is the current status of the configuration synchronisation.
currentConfigStatus clients.ConfigStatus

// fallbackConfigGenerator is used to generate a fallback configuration in case of sync failures.
fallbackConfigGenerator FallbackConfigGenerator

Expand Down Expand Up @@ -490,13 +487,13 @@ func (c *KongClient) Update(ctx context.Context) error {

// Taking into account the results of syncing configuration with Gateways and Konnect, and potential translation
// failures, calculate the config status and update it.
c.updateConfigStatus(ctx, clients.CalculateConfigStatus(
clients.CalculateConfigStatusInput{
GatewaysFailed: gatewaysSyncErr != nil,
KonnectFailed: konnectSyncErr != nil,
TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0,
},
))
c.configStatusNotifier.NotifyGatewayConfigStatus(ctx, clients.GatewayConfigApplyStatus{
TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0,
ApplyConfigFailed: gatewaysSyncErr != nil,
})
c.configStatusNotifier.NotifyKonnectConfigStatus(ctx, clients.KonnectConfigUploadStatus{
Failed: konnectSyncErr != nil,
})

// In case of a failure in syncing configuration with Gateways, propagate the error.
if gatewaysSyncErr != nil {
Expand Down Expand Up @@ -1039,20 +1036,6 @@ func (c *KongClient) recordApplyConfigurationEvents(err error, rootURL string, i
c.eventRecorder.Event(pod, eventType, reason, message)
}

// updateConfigStatus updates the current config status and notifies about the change. It is a no-op if the status
// hasn't changed.
func (c *KongClient) updateConfigStatus(ctx context.Context, configStatus clients.ConfigStatus) {
if c.currentConfigStatus == configStatus {
// No change in config status, nothing to do.
c.logger.V(logging.DebugLevel).Info("No change in config status, not notifying")
return
}

c.logger.V(logging.DebugLevel).Info("Config status changed, notifying", "configStatus", configStatus)
c.currentConfigStatus = configStatus
c.configStatusNotifier.NotifyConfigStatus(ctx, configStatus)
}

func (c *KongClient) logFallbackCacheMetadata(metadata fallback.GeneratedCacheMetadata) {
log := c.logger.WithName("fallback-cache-generator")

Expand Down
48 changes: 39 additions & 9 deletions internal/dataplane/kong_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,10 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) {
}

type mockConfigStatusQueue struct {
notifications []clients.ConfigStatus
lock sync.RWMutex
gatewayConfigStatusNotifications []clients.GatewayConfigApplyStatus
konnectConfigStatusNotifications []clients.KonnectConfigUploadStatus
notifications []clients.ConfigStatus
lock sync.RWMutex
}

func newMockConfigStatusQueue() *mockConfigStatusQueue {
Expand All @@ -456,6 +458,34 @@ func (m *mockConfigStatusQueue) NotifyConfigStatus(_ context.Context, status cli
m.notifications = append(m.notifications, status)
}

func (m *mockConfigStatusQueue) NotifyGatewayConfigStatus(_ context.Context, status clients.GatewayConfigApplyStatus) {
m.lock.Lock()
defer m.lock.Unlock()
m.gatewayConfigStatusNotifications = append(m.gatewayConfigStatusNotifications, status)
}

func (m *mockConfigStatusQueue) NotifyKonnectConfigStatus(_ context.Context, status clients.KonnectConfigUploadStatus) {
m.lock.Lock()
defer m.lock.Unlock()
m.konnectConfigStatusNotifications = append(m.konnectConfigStatusNotifications, status)
}

func (m *mockConfigStatusQueue) GatewayConfigStatusNotifications() []clients.GatewayConfigApplyStatus {
m.lock.RLock()
defer m.lock.RUnlock()
copied := make([]clients.GatewayConfigApplyStatus, len(m.gatewayConfigStatusNotifications))
copy(copied, m.gatewayConfigStatusNotifications)
return copied
}

func (m *mockConfigStatusQueue) KonnectConfigStatusNotifications() []clients.KonnectConfigUploadStatus {
m.lock.RLock()
defer m.lock.RUnlock()
copied := make([]clients.KonnectConfigUploadStatus, len(m.konnectConfigStatusNotifications))
copy(copied, m.konnectConfigStatusNotifications)
return copied
}

func (m *mockConfigStatusQueue) Notifications() []clients.ConfigStatus {
m.lock.RLock()
defer m.lock.RUnlock()
Expand Down Expand Up @@ -604,13 +634,13 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) {
configBuilder.returnTranslationFailures(tc.translationFailures)

_ = kongClient.Update(ctx)
notifications := statusQueue.Notifications()
require.Len(t, notifications, 1)
require.Equal(t, tc.expectedStatus, notifications[0])

_ = kongClient.Update(ctx)
notifications = statusQueue.Notifications()
require.Len(t, notifications, 1, "no new notification should be sent if the status hasn't changed")
gatewayNotifications := statusQueue.GatewayConfigStatusNotifications()
konnectNotifications := statusQueue.KonnectConfigStatusNotifications()
require.Len(t, gatewayNotifications, 1)
require.Len(t, konnectNotifications, 1)
require.Equal(t, tc.expectedStatus, clients.CalculateConfigStatus(
gatewayNotifications[0], konnectNotifications[0],
))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/dataplane/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestSynchronizer(t *testing.T) {
assert.Equal(t, err.Error(), "server is already running")

t.Log("verifying that eventually the synchronizer reports as ready for a dbless dataplane")
assert.Eventually(t, func() bool { return sync.IsReady() }, testSynchronizerTick*2, testSynchronizerTick)
assert.Eventually(t, func() bool { return sync.IsReady() }, testSynchronizerTick*3, testSynchronizerTick)

t.Log("verifying that the dataplane eventually receieves several successful updates from the synchronizer")
assert.Eventually(t, func() bool {
Expand Down
37 changes: 27 additions & 10 deletions internal/konnect/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type NodeAgent struct {
refreshPeriod time.Duration
refreshTicker Ticker

gatewayConfigStatus clients.GatewayConfigApplyStatus
konnectConfigStatus clients.KonnectConfigUploadStatus
configStatus atomic.Value
configStatusSubscriber clients.ConfigStatusSubscriber

Expand Down Expand Up @@ -181,29 +183,44 @@ func sortNodesByLastPing(nodes []*nodes.NodeItem) {

// subscribeConfigStatus subscribes and updates KIC status on translating and applying configurations to kong gateway.
func (a *NodeAgent) subscribeConfigStatus(ctx context.Context) {
ch := a.configStatusSubscriber.SubscribeConfigStatus()
gatewayStatusCh := a.configStatusSubscriber.SubscribeGatewayConfigStatus()
konnectStatusCh := a.configStatusSubscriber.SubscribeKonnectConfigStatus()
chDone := ctx.Done()

for {
select {
case <-chDone:
a.logger.Info("Subscribe loop stopped", "message", ctx.Err().Error())
return
case configStatus := <-ch:
if configStatus == a.configStatus.Load() {
a.logger.V(logging.DebugLevel).Info("Config status not changed, skipping update")
continue
case gatewayConfigStatus := <-gatewayStatusCh:
if a.gatewayConfigStatus != gatewayConfigStatus {
a.logger.V(logging.DebugLevel).Info("Gateway config status changed")
a.gatewayConfigStatus = gatewayConfigStatus
a.maybeUpdateConfigStatus(ctx)
}

a.logger.V(logging.DebugLevel).Info("Config status changed, updating nodes")
a.configStatus.Store(configStatus)
if err := a.updateNodes(ctx); err != nil {
a.logger.Error(err, "Failed to update nodes after config status changed")
case konnectConfigStatus := <-konnectStatusCh:
if a.konnectConfigStatus != konnectConfigStatus {
a.logger.V(logging.DebugLevel).Info("Konnect config status changed")
a.konnectConfigStatus = konnectConfigStatus
a.maybeUpdateConfigStatus(ctx)
}
}
}
}

func (a *NodeAgent) maybeUpdateConfigStatus(ctx context.Context) {
configStatus := clients.CalculateConfigStatus(a.gatewayConfigStatus, a.konnectConfigStatus)
if configStatus == a.configStatus.Load() {
a.logger.V(logging.DebugLevel).Info("Config status not changed, skipping update")
return
}
a.logger.V(logging.DebugLevel).Info("Config status changed, updating nodes")
a.configStatus.Store(configStatus)
if err := a.updateNodes(ctx); err != nil {
a.logger.Error(err, "Failed to update nodes after config status changed")
}
}

func (a *NodeAgent) subscribeToGatewayClientsChanges(ctx context.Context) {
gatewayClientsChangedCh, changesAreExpected := a.gatewayClientsChangesNotifier.SubscribeToGatewayClientsChanges()
if !changesAreExpected {
Expand Down
Loading

0 comments on commit 205135b

Please sign in to comment.