diff --git a/internal/adminapi/konnect.go b/internal/adminapi/konnect.go index 5ab899cea5..e5fae99620 100644 --- a/internal/adminapi/konnect.go +++ b/internal/adminapi/konnect.go @@ -114,3 +114,30 @@ func KonnectHTTPDoer() kong.Doer { return resp, nil } } + +// KonnectClientFactory is a factory to create KonnectClient instances. +type KonnectClientFactory struct { + konnectConfig KonnectConfig + logger logr.Logger +} + +// NewKonnectClientFactory creates a new KonnectClientFactory instance. +func NewKonnectClientFactory(konnectConfig KonnectConfig, logger logr.Logger) *KonnectClientFactory { + return &KonnectClientFactory{ + konnectConfig: konnectConfig, + logger: logger, + } +} + +// NewKonnectClient create a new KonnectClient instance, ensuring the connection to Konnect Admin API. +// Please note it may block for a few seconds while trying to connect to Konnect Admin API. +func (f *KonnectClientFactory) NewKonnectClient(ctx context.Context) (*KonnectClient, error) { + konnectAdminAPIClient, err := NewKongClientForKonnectControlPlane(f.konnectConfig) + if err != nil { + return nil, fmt.Errorf("failed creating Konnect Control Plane Admin API client: %w", err) + } + if err := EnsureKonnectConnection(ctx, konnectAdminAPIClient.AdminAPIClient(), f.logger); err != nil { + return nil, fmt.Errorf("failed to ensure connection to Konnect Admin API: %w", err) + } + return konnectAdminAPIClient, nil +} diff --git a/internal/clients/config_status.go b/internal/clients/config_status.go index 84bbe000d3..38a48dd3ea 100644 --- a/internal/clients/config_status.go +++ b/internal/clients/config_status.go @@ -32,6 +32,7 @@ type GatewayConfigApplyStatus struct { // KonnectConfigUploadStatus stores the status of uploading configuration to Konnect. type KonnectConfigUploadStatus struct { + // Failed indicates whether the config upload to Konnect failed. Failed bool } diff --git a/internal/clients/manager.go b/internal/clients/manager.go index f71283e0f1..9dd3fa8e0e 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -31,7 +31,6 @@ type ClientFactory interface { // AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that // we should configure. type AdminAPIClientsProvider interface { - KonnectClient() *adminapi.KonnectClient GatewayClients() []*adminapi.Client GatewayClientsToConfigure() []*adminapi.Client } @@ -75,10 +74,6 @@ type AdminAPIClientsManager struct { // readinessReconciliationTicker is used to run readiness reconciliation loop. readinessReconciliationTicker Ticker - // konnectClient represents a special-case of the data-plane which is Konnect cloud. - // This client is used to synchronise configuration with Konnect's Control Plane Admin API. - konnectClient *adminapi.KonnectClient - // lock prevents concurrent access to the manager's fields. lock sync.RWMutex @@ -177,20 +172,6 @@ func (c *AdminAPIClientsManager) Notify(discoveredAPIs []adminapi.DiscoveredAdmi } } -// SetKonnectClient sets a client that will be used to communicate with Konnect Control Plane Admin API. -// If called multiple times, it will override the client. -func (c *AdminAPIClientsManager) SetKonnectClient(client *adminapi.KonnectClient) { - c.lock.Lock() - defer c.lock.Unlock() - c.konnectClient = client -} - -func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient { - c.lock.RLock() - defer c.lock.RUnlock() - return c.konnectClient -} - // GatewayClients returns a copy of current client's slice. Konnect client won't be included. // This method can be used when some actions need to be performed only against Kong Gateway clients. func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client { diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 8657d17d8a..89d9f68aa4 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -207,12 +207,6 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { require.Len(t, m.GatewayClients(), 1, "expecting one initial client") require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client") require.Len(t, m.GatewayClientsToConfigure(), 1, "Expecting one initial client") - - konnectTestClient := &adminapi.KonnectClient{} - m.SetKonnectClient(konnectTestClient) - require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients") - require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount") - require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient") } func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) { diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 86db4be5ef..44483945ef 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -35,7 +35,6 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" - "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect" "github.com/kong/kubernetes-ingress-controller/v3/internal/logging" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" @@ -84,6 +83,11 @@ type FallbackConfigGenerator interface { ) (store.CacheStores, fallback.GeneratedCacheMetadata, error) } +// KonnectKongStateUpdater is an interface for updating the current state of configuration seen by Konnect. +type KonnectKongStateUpdater interface { + UpdateKongState(ctx context.Context, kongState *kongstate.KongState, isFallback bool) +} + // KongClient is a threadsafe high level API client for the Kong data-plane(s) // which parses Kubernetes object caches into Kong Admin configurations and // sends them as updates to the data-plane(s) (Kong Admin API). @@ -180,9 +184,9 @@ type KongClient struct { // lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways. lastValidCacheSnapshot *store.CacheStores - // konnectConfigSynchronizer receives latest successfully applied Kong configuration from KongClient - // and uploads it to Konnect. - konnectConfigSynchronizer *konnect.ConfigSynchronizer + // konnectKongStateUpdater is used to update the current state seen by Konnect that will be picked asynchronously + // by the Konnect config synchronization loop. + konnectKongStateUpdater KonnectKongStateUpdater } // NewKongClient provides a new KongClient object after connecting to the @@ -492,7 +496,7 @@ func (c *KongClient) Update(ctx context.Context) error { const isFallback = false shas, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig, isFallback) - // Taking into account the results of syncing configuration with Gateways and Konnect, and potential translation + // Taking into account the results of syncing configuration with Gateways and potential translation // failures, calculate the config status and update it. c.configStatusNotifier.NotifyGatewayConfigStatus(ctx, clients.GatewayConfigApplyStatus{ TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0, @@ -513,8 +517,8 @@ func (c *KongClient) Update(ctx context.Context) error { return gatewaysSyncErr } - // Send configuration to Konnect ONLY when successfully applied configuration to gateways. - c.maybeSendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig, isFallback) + // Send configuration to Konnect only when successfully applied configuration to gateways. + c.maybeUpdateKonnectKongState(ctx, parsingResult.KongState, isFallback) // Gateways were successfully synced with the current configuration, so we can update the last valid cache snapshot. c.maybePreserveTheLastValidConfigCache(cacheSnapshot) @@ -634,7 +638,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration( if gatewaysSyncErr != nil { return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr) } - c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback) + c.maybeUpdateKonnectKongState(ctx, fallbackParsingResult.KongState, isFallback) // Configuration was successfully recovered with the fallback configuration. Store the last valid configuration. c.maybePreserveTheLastValidConfigCache(fallbackCache) @@ -731,34 +735,16 @@ func (c *KongClient) sendOutToGatewayClients( return previousSHAs, nil } -// maybeSendOutToKonnectClient sends out the configuration to Konnect when KonnectClient is provided. -// It's a noop when Konnect integration is not enabled. -func (c *KongClient) maybeSendOutToKonnectClient( +// maybeUpdateKonnectKongState updates the KongState seen by Konnect if the konnectKongStateUpdater is set. +func (c *KongClient) maybeUpdateKonnectKongState( ctx context.Context, s *kongstate.KongState, - config sendconfig.Config, - _ bool, + isFallback bool, ) { - if c.konnectConfigSynchronizer == nil { + if c.konnectKongStateUpdater == nil { return } - konnectClient := c.clientsProvider.KonnectClient() - if konnectClient == nil { - return - } - - if config.SanitizeKonnectConfigDumps { - s = s.SanitizedCopy(util.DefaultUUIDGenerator{}) - } - - deckGenParams := deckgen.GenerateDeckContentParams{ - SelectorTags: config.FilterTags, - ExpressionRoutes: config.ExpressionRoutes, - PluginSchemas: konnectClient.PluginSchemaStore(), - AppendStubEntityWhenConfigEmpty: false, - } - targetContent := deckgen.ToDeckContent(ctx, c.logger, s, deckGenParams) - c.konnectConfigSynchronizer.SetTargetContent(targetContent) + c.konnectKongStateUpdater.UpdateKongState(ctx, s, isFallback) } func (c *KongClient) sendToClient( @@ -770,16 +756,11 @@ func (c *KongClient) sendToClient( ) (string, error) { logger := c.logger.WithValues("url", client.AdminAPIClient().BaseRootURL()) - // If the client is Konnect and the feature flag is turned on, - // we should sanitize the configuration before sending it out. - if client.IsKonnect() && config.SanitizeKonnectConfigDumps { - s = s.SanitizedCopy(util.DefaultUUIDGenerator{}) - } deckGenParams := deckgen.GenerateDeckContentParams{ SelectorTags: config.FilterTags, ExpressionRoutes: config.ExpressionRoutes, PluginSchemas: client.PluginSchemaStore(), - AppendStubEntityWhenConfigEmpty: !client.IsKonnect() && config.InMemory, + AppendStubEntityWhenConfigEmpty: config.InMemory, } targetContent := deckgen.ToDeckContent(ctx, logger, s, deckGenParams) customEntities := make(sendconfig.CustomEntitiesByType) @@ -846,7 +827,7 @@ func (c *KongClient) sendToClient( } // SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results. -// Currently it is used for uploading the node status to konnect control plane. +// Currently, it is used for uploading the node status to Konnect control plane. func (c *KongClient) SetConfigStatusNotifier(n clients.ConfigStatusNotifier) { c.lock.Lock() defer c.lock.Unlock() @@ -854,11 +835,11 @@ func (c *KongClient) SetConfigStatusNotifier(n clients.ConfigStatusNotifier) { c.configStatusNotifier = n } -func (c *KongClient) SetKonnectConfigSynchronizer(s *konnect.ConfigSynchronizer) { +// SetKonnectKongStateUpdater sets the KonnectKongStateUpdater which updates the KongState seen by Konnect. +func (c *KongClient) SetKonnectKongStateUpdater(u KonnectKongStateUpdater) { c.lock.Lock() defer c.lock.Unlock() - - c.konnectConfigSynchronizer = s + c.konnectKongStateUpdater = u } // ----------------------------------------------------------------------------- diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 1a887f1263..e76d32b291 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net/http" "slices" "strings" "sync" @@ -44,7 +43,6 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" - "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/versions" "github.com/kong/kubernetes-ingress-controller/v3/test/helpers" @@ -146,14 +144,9 @@ var ( // mockGatewayClientsProvider is a mock implementation of dataplane.AdminAPIClientsProvider. type mockGatewayClientsProvider struct { gatewayClients []*adminapi.Client - konnectClient *adminapi.KonnectClient dbMode dpconf.DBMode } -func (p *mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { - return p.konnectClient -} - func (p *mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { return p.gatewayClients } @@ -200,7 +193,6 @@ func (m *mockFallbackConfigGenerator) GenerateBackfillingBrokenObjects( func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClients = []*adminapi.Client{ mustSampleGatewayClient(t), mustSampleGatewayClient(t), @@ -210,22 +202,15 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes testCases := []struct { name string gatewayClients []*adminapi.Client - konnectClient *adminapi.KonnectClient + withKonnectUpdater bool errorOnUpdateForURLs []string expectError bool }{ { - name: "2 gateway clients and konnect with no errors", - gatewayClients: testGatewayClients, - konnectClient: testKonnectClient, - expectError: false, - }, - { - name: "2 gateway clients and konnect with error on konnect", - gatewayClients: testGatewayClients, - konnectClient: testKonnectClient, - errorOnUpdateForURLs: []string{testKonnectClient.BaseRootURL()}, - expectError: false, + name: "2 gateway clients and konnect with no errors", + gatewayClients: testGatewayClients, + withKonnectUpdater: true, + expectError: false, }, { name: "2 gateway clients with error on one of them", @@ -233,26 +218,6 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes errorOnUpdateForURLs: []string{testGatewayClients[0].BaseRootURL()}, expectError: true, }, - { - name: "2 gateway clients and konnect with error on one of gateways and konnect", - gatewayClients: testGatewayClients, - errorOnUpdateForURLs: []string{ - testGatewayClients[0].BaseRootURL(), - testKonnectClient.BaseRootURL(), - }, - expectError: true, - }, - { - name: "only konnect client with no error", - konnectClient: testKonnectClient, - expectError: false, - }, - { - name: "only konnect client with error on it", - konnectClient: testKonnectClient, - errorOnUpdateForURLs: []string{testKonnectClient.BaseRootURL()}, - expectError: false, - }, { name: "no clients at all", expectError: false, @@ -263,22 +228,24 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes t.Run(tc.name, func(t *testing.T) { clientsProvider := &mockGatewayClientsProvider{ gatewayClients: tc.gatewayClients, - konnectClient: tc.konnectClient, } updateStrategyResolver := mocks.NewUpdateStrategyResolver() for _, url := range tc.errorOnUpdateForURLs { updateStrategyResolver.ReturnErrorOnUpdate(url) } - // always return true for ConfigurationChanged to trigger an update configChangeDetector := mocks.ConfigurationChangeDetector{ + // Always return true for ConfigurationChanged to trigger an update. ConfigurationChanged: true, } configBuilder := newMockKongConfigBuilder() kongRawStateGetter := &mockKongLastValidConfigFetcher{} kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) - // Set Konnect client - if tc.konnectClient != nil { - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, clients.NoOpConfigStatusNotifier{}) + + // Set KonnectKongStateUpdater if requested. + var konnectUpdater *mocks.KonnectKongStateUpdater + if tc.withKonnectUpdater { + konnectUpdater = &mocks.KonnectKongStateUpdater{} + kongClient.SetKonnectKongStateUpdater(konnectUpdater) } err := kongClient.Update(ctx) @@ -294,9 +261,8 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes }) updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount(t, expectedURLsCalled) // Verify that Konnect client is called eventually. - if tc.konnectClient != nil { - // Should eventually get content in Konnect client if Konnect client enabled. - _ = updateStrategyResolver.EventuallyGetLastUpdatedContentForURL(t, tc.konnectClient.BaseRootURL(), testKonenctUploadWait, testKonnectUploadPeriod) + if tc.withKonnectUpdater { + require.Len(t, konnectUpdater.Calls(), 1, "expected Konnect updater to be called") } }) } @@ -308,7 +274,6 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { mustSampleGatewayClient(t), mustSampleGatewayClient(t), }, - konnectClient: mustSampleKonnectClient(t), } updateStrategyResolver := mocks.NewUpdateStrategyResolver() @@ -449,12 +414,10 @@ func (p *mockKongConfigBuilder) returnTranslationFailuresForAllButFirstCall(fail func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } configChangeDetector = mocks.ConfigurationChangeDetector{ConfigurationChanged: true} @@ -464,44 +427,33 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { testCases := []struct { name string gatewayFailuresCount int - konnectFailuresCount int translationFailures bool - expectedStatus clients.ConfigStatus + expectedNotification clients.GatewayConfigApplyStatus }{ { name: "success", translationFailures: false, - expectedStatus: clients.ConfigStatusOK, + expectedNotification: clients.GatewayConfigApplyStatus{ + TranslationFailuresOccurred: false, + ApplyConfigFailed: false, + }, }, { name: "gateway failure", gatewayFailuresCount: 2, translationFailures: false, - expectedStatus: clients.ConfigStatusApplyFailed, + expectedNotification: clients.GatewayConfigApplyStatus{ + TranslationFailuresOccurred: false, + ApplyConfigFailed: true, + }, }, { name: "translation failures", translationFailures: true, - expectedStatus: clients.ConfigStatusTranslationErrorHappened, - }, - { - name: "konnect failure", - konnectFailuresCount: 2, - translationFailures: false, - expectedStatus: clients.ConfigStatusOKKonnectApplyFailed, - }, - { - name: "both gateway and konnect failure", - gatewayFailuresCount: 2, - konnectFailuresCount: 2, - translationFailures: false, - expectedStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, - }, - { - name: "translation failures and konnect failure", - konnectFailuresCount: 2, - translationFailures: true, - expectedStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, + expectedNotification: clients.GatewayConfigApplyStatus{ + TranslationFailuresOccurred: true, + ApplyConfigFailed: false, + }, }, } @@ -514,34 +466,18 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, statusQueue) - // Set an initial content in Konnect syncer to avoid that failure on gateway update causing no target content saved in Konnect config syncer - // thus uploading config to Konnect is not triggered. - kongClient.konnectConfigSynchronizer.SetTargetContent(&file.Content{}) kongClient.SetConfigStatusNotifier(statusQueue) for range tc.gatewayFailuresCount { updateStrategyResolver.ReturnErrorOnUpdate(testGatewayClient.BaseRootURL()) } - for range tc.konnectFailuresCount { - updateStrategyResolver.ReturnErrorOnUpdate(testKonnectClient.BaseRootURL()) - } configBuilder.returnTranslationFailures(tc.translationFailures) _ = kongClient.Update(ctx) gatewayNotifications := statusQueue.GatewayConfigStatusNotifications() require.Len(t, gatewayNotifications, 1, "Should receive gateway configuration status right after update") - require.Eventually( - t, func() bool { - konnectNotifications := statusQueue.KonnectConfigStatusNotifications() - return len(konnectNotifications) > 0 - }, 10*testKonnectUploadPeriod, testKonnectUploadPeriod, - "Should receive Konnect config status in time", - ) - konnectNotifications := statusQueue.KonnectConfigStatusNotifications() - require.Equal(t, tc.expectedStatus, clients.CalculateConfigStatus( - gatewayNotifications[0], konnectNotifications[0], - )) + notification := gatewayNotifications[0] + assert.Equal(t, tc.expectedNotification, notification) }) } } @@ -784,12 +720,10 @@ func TestKongClient_KubernetesEvents(t *testing.T) { func TestKongClient_EmptyConfigUpdate(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } updateStrategyResolver = mocks.NewUpdateStrategyResolver() @@ -798,9 +732,11 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { kongRawStateGetter = &mockKongLastValidConfigFetcher{} kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, newMockConfigStatusQueue()) t.Run("dbless", func(t *testing.T) { + konnectKongStateUpdater := &mocks.KonnectKongStateUpdater{} + kongClient.SetKonnectKongStateUpdater(konnectKongStateUpdater) + kongClient.kongConfig.InMemory = true err := kongClient.Update(ctx) require.NoError(t, err) @@ -818,33 +754,26 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { }, }, "gateway content should have appended stub upstream") - var konnectContent sendconfig.ContentWithHash - require.Eventually(t, func() bool { - c, ok := updateStrategyResolver.LastUpdatedContentForURL(testKonnectClient.BaseRootURL()) - if ok { - konnectContent = c - } - return ok - }, testKonenctUploadWait, testKonnectUploadPeriod, "Konnect client should be updated in time") - require.True(t, deckgen.IsContentEmpty(konnectContent.Content), "konnect content should be empty") + require.Len(t, konnectKongStateUpdater.Calls(), 1) + konnectKongStateUpdaterCall := konnectKongStateUpdater.Calls()[0] + assert.Equal(t, &kongstate.KongState{}, konnectKongStateUpdaterCall.KongState) }) t.Run("db", func(t *testing.T) { + konnectKongStateUpdater := &mocks.KonnectKongStateUpdater{} + kongClient.SetKonnectKongStateUpdater(konnectKongStateUpdater) + kongClient.kongConfig.InMemory = false err := kongClient.Update(ctx) require.NoError(t, err) gwContent, ok := updateStrategyResolver.LastUpdatedContentForURL(testGatewayClient.BaseRootURL()) require.True(t, ok) - require.True(t, deckgen.IsContentEmpty(gwContent.Content), "konnect content should be empty") - - var konnectContent sendconfig.ContentWithHash - var konnectContentOK bool - require.Eventually(t, func() bool { - konnectContent, konnectContentOK = updateStrategyResolver.LastUpdatedContentForURL(testKonnectClient.BaseRootURL()) - return konnectContentOK - }, testKonenctUploadWait, testKonnectUploadPeriod, "Konnect client should be updated in time") - require.True(t, deckgen.IsContentEmpty(konnectContent.Content), "konnect content should be empty") + require.True(t, deckgen.IsContentEmpty(gwContent.Content), "gateway content should be empty") + + require.Len(t, konnectKongStateUpdater.Calls(), 1) + konnectKongStateUpdaterCall := konnectKongStateUpdater.Calls()[0] + assert.Equal(t, &kongstate.KongState{}, konnectKongStateUpdaterCall.KongState) }) } @@ -890,38 +819,6 @@ func setupTestKongClient( return kongClient } -const ( - testKonnectUploadPeriod = 20 * time.Millisecond - testKonenctUploadWait = 5 * testKonnectUploadPeriod -) - -func attachKonnectConfigSynchronizer( - ctx context.Context, - t *testing.T, - kc *KongClient, - updateStrategyResolver *mocks.UpdateStrategyResolver, - clientsProvider *mockGatewayClientsProvider, - configChangeDetector sendconfig.ConfigurationChangeDetector, - configStatusNotifier clients.ConfigStatusNotifier, -) { - config := sendconfig.Config{ - SanitizeKonnectConfigDumps: true, - } - konnectConfigSynchronizer := konnect.NewConfigSynchronizer( - logr.Discard(), - config, - testKonnectUploadPeriod, - clientsProvider.KonnectClient(), - updateStrategyResolver, - configChangeDetector, - configStatusNotifier, - mocks.MetricsRecorder{}, - ) - kc.SetKonnectConfigSynchronizer(konnectConfigSynchronizer) - err := konnectConfigSynchronizer.Start(ctx) - require.NoError(t, err) -} - func mustSampleGatewayClient(t *testing.T) *adminapi.Client { t.Helper() c, err := adminapi.NewTestClient(fmt.Sprintf("https://%s:8080", uuid.NewString())) @@ -929,16 +826,6 @@ func mustSampleGatewayClient(t *testing.T) *adminapi.Client { return c } -func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { - t.Helper() - - c, err := adminapi.NewKongAPIClient(fmt.Sprintf("https://%s.konghq.tech", uuid.NewString()), &http.Client{}) - require.NoError(t, err) - - rgID := uuid.NewString() - return adminapi.NewKonnectClient(c, rgID, false) -} - type mockKongLastValidConfigFetcher struct { kongRawState *utils.KongRawState lastKongState *kongstate.KongState @@ -1092,46 +979,6 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { } } -func TestKongClientUpdate_KonnectUpdatesAreSanitized(t *testing.T) { - ctx := context.Background() - clientsProvider := &mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{mustSampleGatewayClient(t)}, - konnectClient: mustSampleKonnectClient(t), - } - updateStrategyResolver := mocks.NewUpdateStrategyResolver() - configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} - configBuilder := newMockKongConfigBuilder() - configBuilder.kongState = &kongstate.KongState{ - Certificates: []kongstate.Certificate{ - { - Certificate: kong.Certificate{ - ID: kong.String("new_cert"), - Key: kong.String(`private-key-string`), // This should be redacted. - }, - }, - }, - } - - kongRawStateGetter := &mockKongLastValidConfigFetcher{} - kongClient := setupTestKongClient( - t, - updateStrategyResolver, - clientsProvider, - configChangeDetector, - configBuilder, - nil, - kongRawStateGetter, - ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, clients.NoOpConfigStatusNotifier{}) - require.NoError(t, kongClient.Update(ctx)) - - konnectContent := updateStrategyResolver.EventuallyGetLastUpdatedContentForURL(t, clientsProvider.konnectClient.BaseRootURL(), testKonenctUploadWait, testKonnectUploadPeriod) - require.Len(t, konnectContent.Content.Certificates, 1, "expected Konnect to have 1 certificate") - cert := konnectContent.Content.Certificates[0] - require.NotNil(t, cert.Key, "expected Konnect to have certificate key") - require.Equal(t, "{vault://redacted-value}", *cert.Key, "expected Konnect to have redacted certificate key") -} - func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { ctx := context.Background() configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} @@ -1168,10 +1015,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { configBuilder := newMockKongConfigBuilder() fallbackConfigGenerator := newMockFallbackConfigGenerator() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), @@ -1295,10 +1140,8 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } updateStrategyResolver := mocks.NewUpdateStrategyResolver() configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} @@ -1441,10 +1284,8 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } updateStrategyResolver := mocks.NewUpdateStrategyResolver() configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} @@ -1561,11 +1402,9 @@ func TestKongClient_LastValidCacheSnapshot(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - testKonnectClient := mustSampleKonnectClient(t) testGatewayClient := mustSampleGatewayClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } kongClient, err := NewKongClient( @@ -1620,7 +1459,6 @@ func TestKongClient_ConfigDumpSanitization(t *testing.T) { gatewayClients: []*adminapi.Client{ mustSampleGatewayClient(t), }, - konnectClient: mustSampleKonnectClient(t), } updateStrategyResolver := mocks.NewUpdateStrategyResolver() configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} diff --git a/internal/dataplane/sendconfig/dbmode.go b/internal/dataplane/sendconfig/dbmode.go index 0469895dfd..344ca37d27 100644 --- a/internal/dataplane/sendconfig/dbmode.go +++ b/internal/dataplane/sendconfig/dbmode.go @@ -38,24 +38,37 @@ type UpdateStrategyDBMode struct { resourceErrorLock *sync.Mutex } +// UpdateStrategyDBModeOpt is a functional option for UpdateStrategyDBMode. +type UpdateStrategyDBModeOpt func(*UpdateStrategyDBMode) + +// WithDiagnostic sets the diagnostic server to send diffs to. +func WithDiagnostic(diagnostic *diagnostics.ClientDiagnostic) UpdateStrategyDBModeOpt { + return func(s *UpdateStrategyDBMode) { + s.diagnostic = diagnostic + } +} + func NewUpdateStrategyDBMode( client *kong.Client, dumpConfig dump.Config, version semver.Version, concurrency int, - diagnostic *diagnostics.ClientDiagnostic, logger logr.Logger, + opts ...UpdateStrategyDBModeOpt, ) *UpdateStrategyDBMode { - return &UpdateStrategyDBMode{ + s := &UpdateStrategyDBMode{ client: client, dumpConfig: dumpConfig, version: version, concurrency: concurrency, - diagnostic: diagnostic, logger: logger, resourceErrors: []ResourceError{}, resourceErrorLock: &sync.Mutex{}, } + for _, opt := range opts { + opt(s) + } + return s } func NewUpdateStrategyDBModeKonnect( @@ -63,10 +76,9 @@ func NewUpdateStrategyDBModeKonnect( dumpConfig dump.Config, version semver.Version, concurrency int, - diagnostic *diagnostics.ClientDiagnostic, logger logr.Logger, ) *UpdateStrategyDBMode { - s := NewUpdateStrategyDBMode(client, dumpConfig, version, concurrency, diagnostic, logger) + s := NewUpdateStrategyDBMode(client, dumpConfig, version, concurrency, logger) s.isKonnect = true return s } diff --git a/internal/dataplane/sendconfig/strategy.go b/internal/dataplane/sendconfig/strategy.go index cfaca6addf..50ba90a992 100644 --- a/internal/dataplane/sendconfig/strategy.go +++ b/internal/dataplane/sendconfig/strategy.go @@ -108,20 +108,6 @@ func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy( }, r.config.Version, r.config.Concurrency, - // The DB mode update strategy is used for both DB mode gateways and Konnect-integrated controllers. In the - // Konnect case, we don't actually want to collect diffs, and don't actually provide a diagnostic when setting - // it up, so we only collect and send diffs if we're talking to a gateway. - // - // TODO maybe this is wrong? I'm not sure if we actually support (or if not, explicitly prohibit) - // configuring a controller to use both DB mode and talk to Konnect, or if we only support DB-less when using - // Konnect. If those are mutually exclusive, maybe we can just collect diffs for Konnect mode? If they're - // not mutually exclusive, trying to do diagnostics diff updates for both the updates would have both attempt - // to store diffs. This is... maybe okay. They should be identical, but that's a load-bearing "should": we know - // Konnect can sometimes differ in what it accepts versus the gateway, and we have some Konnect configuration - // (consumer exclude, sensitive value mask) where they're _definitely_ different. That same configuration could - // make the diff confusing even if it's DB mode only, since it doesn't reflect what we're sending to the gateway - // in some cases. - nil, r.logger, ) } @@ -136,8 +122,8 @@ func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy( }, r.config.Version, r.config.Concurrency, - diagnostic, r.logger, + WithDiagnostic(diagnostic), ) } diff --git a/internal/konnect/config_synchronizer.go b/internal/konnect/config_synchronizer.go index 1203cd5320..be65733c27 100644 --- a/internal/konnect/config_synchronizer.go +++ b/internal/konnect/config_synchronizer.go @@ -14,8 +14,12 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/logging" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" + "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) const ( @@ -25,106 +29,187 @@ const ( DefaultConfigUploadPeriod = 30 * time.Second ) -// ConfigSynchronizer runs a loop to upload the traslated Kong configuration to Konnect in the given period. +type ClientFactory interface { + NewKonnectClient(ctx context.Context) (*adminapi.KonnectClient, error) +} + +// ConfigSynchronizer runs a loop to upload the translated Kong configuration to Konnect periodically. type ConfigSynchronizer struct { - logger logr.Logger - syncTicker *time.Ticker - kongConfig sendconfig.Config - konnectClient *adminapi.KonnectClient + logger logr.Logger + kongConfig sendconfig.Config + konnectClientFactory ClientFactory + metricsRecorder metrics.Recorder updateStrategyResolver sendconfig.UpdateStrategyResolver configChangeDetector sendconfig.ConfigurationChangeDetector configStatusNotifier clients.ConfigStatusNotifier - targetContent *file.Content - lock sync.RWMutex + syncTicker *time.Ticker + + konnectAdminClient *adminapi.KonnectClient + konnectAdminClientLock sync.RWMutex + + // targetConfig is the latest configuration to be uploaded to Konnect. + targetConfig targetConfig + // configLock is used to prevent data + configLock sync.RWMutex +} + +type targetConfig struct { + // Content the configuration to be uploaded to Konnect. It represents the latest state of the configuration + // received from the KongClient. + Content *file.Content + + // IsFallback indicates whether the configuration is a fallback configuration. + IsFallback bool +} + +type ConfigSynchronizerParams struct { + Logger logr.Logger + KongConfig sendconfig.Config + ConfigUploadPeriod time.Duration + KonnectClientFactory ClientFactory + UpdateStrategyResolver sendconfig.UpdateStrategyResolver + ConfigChangeDetector sendconfig.ConfigurationChangeDetector + ConfigStatusNotifier clients.ConfigStatusNotifier + MetricsRecorder metrics.Recorder } -func NewConfigSynchronizer( - logger logr.Logger, - kongConfig sendconfig.Config, - configUploadPeriod time.Duration, - konnectClient *adminapi.KonnectClient, - updateStrategyResolver sendconfig.UpdateStrategyResolver, - configChangeDetector sendconfig.ConfigurationChangeDetector, - configStatusNotifier clients.ConfigStatusNotifier, - metricsRecorder metrics.Recorder, -) *ConfigSynchronizer { +func NewConfigSynchronizer(p ConfigSynchronizerParams) *ConfigSynchronizer { return &ConfigSynchronizer{ - logger: logger, - syncTicker: time.NewTicker(configUploadPeriod), - kongConfig: kongConfig, - konnectClient: konnectClient, - metricsRecorder: metricsRecorder, - updateStrategyResolver: updateStrategyResolver, - configChangeDetector: configChangeDetector, - configStatusNotifier: configStatusNotifier, + logger: p.Logger, + kongConfig: p.KongConfig, + syncTicker: time.NewTicker(p.ConfigUploadPeriod), + konnectClientFactory: p.KonnectClientFactory, + updateStrategyResolver: p.UpdateStrategyResolver, + configChangeDetector: p.ConfigChangeDetector, + configStatusNotifier: p.ConfigStatusNotifier, + metricsRecorder: p.MetricsRecorder, } } -var _ manager.Runnable = &ConfigSynchronizer{} +var _ manager.LeaderElectionRunnable = &ConfigSynchronizer{} // Start starts the loop to receive configuration and uplaod configuration to Konnect. func (s *ConfigSynchronizer) Start(ctx context.Context) error { - s.logger.Info("Started Konnect configuration synchronizer") - go s.runKonnectUpdateServer(ctx) + s.logger.Info("Starting Konnect configuration synchronizer") + + konnectAdminClient, err := s.konnectClientFactory.NewKonnectClient(ctx) + if err != nil { + s.logger.Error(err, "Failed to create Konnect client, skipping Konnect configuration synchronization") + + // We failed to set up Konnect client. We cannot proceed with running the synchronizer. + // As it's a manager runnable, we'll wait for the context to be done and return only then to not break the + // manager's start process. + <-ctx.Done() + return ctx.Err() + } + + // Set the Konnect client to be used to upload configuration and start the synchronizer main loop. + s.konnectAdminClientLock.Lock() + s.konnectAdminClient = konnectAdminClient + s.konnectAdminClientLock.Unlock() + s.logger.Info("Konnect client initialized, starting Konnect configuration synchronization") + s.run(ctx) + return nil } -// SetTargetContent stores the latest configuration in `file.Content` format. -// REVIEW: should we use channel to receive the configuration? -func (s *ConfigSynchronizer) SetTargetContent(targetContent *file.Content) { - s.lock.Lock() - defer s.lock.Unlock() - s.targetContent = targetContent +// NeedLeaderElection returns true to indicate that this runnable requires leader election. +// This is required to ensure that only one instance of the synchronizer is running at a time. +func (s *ConfigSynchronizer) NeedLeaderElection() bool { + return true } -// GetTargetContentCopy returns a copy of the latest configuration in `file.Content` format -// to prevent data race and long duration of occupying lock. -func (s *ConfigSynchronizer) GetTargetContentCopy() *file.Content { - s.lock.RLock() - defer s.lock.RUnlock() - return s.targetContent.DeepCopy() +// UpdateKongState updates the Kong state to be uploaded to Konnect asynchronously. It may not update the state if +// the Konnect client is not initialized yet. +func (s *ConfigSynchronizer) UpdateKongState( + ctx context.Context, + ks *kongstate.KongState, + isFallbackConfig bool, +) { + // Running the update in a goroutine to not block the caller (i.e. KongClient) as we want to make Konnect updates + // affect the critical path as little as possible. + go func() { + // Konnect client may not be initialized yet as that happens asynchronously after the synchronizer is started. + // UpdateKongState may be called before the initialization completes. + s.konnectAdminClientLock.RLock() + defer s.konnectAdminClientLock.RUnlock() + if s.konnectAdminClient == nil { + s.logger.Info("Konnect client not initialized yet, skipping Kong state update") + return + } + + // Sanitize the configuration dumps if configured to do so. + if s.kongConfig.SanitizeKonnectConfigDumps { + ks = ks.SanitizedCopy(util.DefaultUUIDGenerator{}) + } + + // Generate the deck content to be uploaded to Konnect. It may issue some API calls to Konnect to get additional + // information like plugin schemas. + deckGenParams := deckgen.GenerateDeckContentParams{ + SelectorTags: s.kongConfig.FilterTags, + ExpressionRoutes: s.kongConfig.ExpressionRoutes, + PluginSchemas: s.konnectAdminClient.PluginSchemaStore(), + } + targetContent := deckgen.ToDeckContent(ctx, s.logger, ks, deckGenParams) + + // Update the target configuration to be picked up by the synchronizer loop. + s.configLock.Lock() + defer s.configLock.Unlock() + s.targetConfig = targetConfig{ + Content: targetContent, + IsFallback: isFallbackConfig, + } + }() } -// runKonnectUpdateServer starts the loop to receive configuration and send configuration to Konenct. -func (s *ConfigSynchronizer) runKonnectUpdateServer(ctx context.Context) { +// run starts the loop uploading the current configuration to Konnect. +func (s *ConfigSynchronizer) run(ctx context.Context) { for { select { case <-ctx.Done(): - s.logger.Info("Context done: shutting down the Konnect update server") + s.logger.Info("Context done: shutting down the Konnect configuration synchronizer") s.syncTicker.Stop() case <-s.syncTicker.C: - s.logger.Info("Start uploading to Konnect") - client := s.konnectClient - if client == nil { - s.logger.Info("Konnect client not ready, skipping") - continue - } - // Copy target content to upload here because uploading full configuration to Konnect may cost too much time. - targetContent := s.GetTargetContentCopy() - if targetContent == nil { - s.logger.Info("No target content received, skipping") - continue - } - err := s.uploadConfig(ctx, client, targetContent) - if err != nil { - s.logger.Error(err, "failed to upload configuration to Konnect") - logKonnectErrors(s.logger, err) - } - s.configStatusNotifier.NotifyKonnectConfigStatus(ctx, clients.KonnectConfigUploadStatus{ - Failed: err != nil, - }) + s.handleConfigSynchronizationTick(ctx) } } } +func (s *ConfigSynchronizer) handleConfigSynchronizationTick(ctx context.Context) { + s.logger.V(logging.DebugLevel).Info("Start uploading configuration to Konnect") + + // Get the latest configuration copy to upload to Konnect. We don't want to hold the lock for a long time to prevent + // blocking the update of the configuration. + targetCfg := s.getTargetConfigCopy() + if targetCfg.Content == nil { + s.logger.Info("No configuration received yet, skipping Konnect configuration synchronization") + return + } + + // Upload the configuration to Konnect. + err := s.uploadConfig(ctx, s.konnectAdminClient, targetCfg) + if err != nil { + s.logger.Error(err, "Failed to upload configuration to Konnect") + logKonnectErrors(s.logger, err) + } + + // Notify the status of the configuration upload to the system reporting it. + s.configStatusNotifier.NotifyKonnectConfigStatus(ctx, clients.KonnectConfigUploadStatus{ + Failed: err != nil, + }) +} + // uploadConfig sends the given configuration to Konnect. -func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi.KonnectClient, targetContent *file.Content) error { - const isFallback = false +func (s *ConfigSynchronizer) uploadConfig( + ctx context.Context, + client *adminapi.KonnectClient, + targetCfg targetConfig, +) error { // Remove consumers in target content if consumer sync is disabled. if client.ConsumersSyncDisabled() { - targetContent.Consumers = []file.FConsumer{} + targetCfg.Content.Consumers = []file.FConsumer{} } newSHA, err := sendconfig.PerformUpdate( @@ -132,14 +217,14 @@ func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi. s.logger, client, s.kongConfig, - targetContent, + targetCfg.Content, // Konnect client does not upload custom entities. sendconfig.CustomEntitiesByType{}, s.metricsRecorder, s.updateStrategyResolver, s.configChangeDetector, nil, - isFallback, + targetCfg.IsFallback, ) if err != nil { return err @@ -148,10 +233,18 @@ func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi. return nil } +// getTargetConfigCopy returns a copy of the latest configuration in `file.Content` format +// to prevent data race and long duration of occupying lock. +func (s *ConfigSynchronizer) getTargetConfigCopy() targetConfig { + s.configLock.RLock() + defer s.configLock.RUnlock() + return targetConfig{ + s.targetConfig.Content.DeepCopy(), + s.targetConfig.IsFallback, + } +} + // logKonnectErrors logs details of each error response returned from Konnect API. -// TODO: This is copied from internal/dataplane package. -// Remove the definition in dataplane package after using separate loop to upload config to Konnect: -// https://github.com/Kong/kubernetes-ingress-controller/issues/6338 func logKonnectErrors(logger logr.Logger, err error) { if crudActionErrors := deckerrors.ExtractCRUDActionErrors(err); len(crudActionErrors) > 0 { for _, actionErr := range crudActionErrors { diff --git a/internal/konnect/config_synchronizer_test.go b/internal/konnect/config_synchronizer_test.go index b42566cd97..31aa70b98f 100644 --- a/internal/konnect/config_synchronizer_test.go +++ b/internal/konnect/config_synchronizer_test.go @@ -1,4 +1,4 @@ -package konnect +package konnect_test import ( "context" @@ -17,66 +17,44 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect" "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { t.Helper() - c, err := adminapi.NewKongAPIClient(fmt.Sprintf("https://%s.konghq.tech", uuid.NewString()), &http.Client{}) require.NoError(t, err) - rgID := uuid.NewString() return adminapi.NewKonnectClient(c, rgID, false) } -func TestConfigSynchronizer_GetTargetContentCopy(t *testing.T) { - content := &file.Content{ - FormatVersion: "3.0", - Services: []file.FService{ - { - Service: kong.Service{ - Name: kong.String("service1"), - Host: kong.String("example.com"), - }, - Routes: []*file.FRoute{ - { - Route: kong.Route{ - Name: kong.String("route1"), - Expression: kong.String("http.path == \"/foo\""), - }, - }, - }, - }, - }, - } - - s := &ConfigSynchronizer{} - s.SetTargetContent(content) - copiedContent := s.GetTargetContentCopy() - require.Equal(t, content, copiedContent, "Copied content should have values with same fields with original content") - require.NotSame(t, content, copiedContent, "Copied content should not point to the same object with the original content") -} - -func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { +func TestConfigSynchronizer_UpdatesKongConfigAccordingly(t *testing.T) { + log := logr.Discard() sendConfigPeriod := 10 * time.Millisecond testKonnectClient := mustSampleKonnectClient(t) resolver := mocks.NewUpdateStrategyResolver() - log := logr.Discard() - s := &ConfigSynchronizer{ - logger: logr.Discard(), - syncTicker: time.NewTicker(sendConfigPeriod), - konnectClient: testKonnectClient, - metricsRecorder: mocks.MetricsRecorder{}, - updateStrategyResolver: resolver, - configChangeDetector: sendconfig.NewDefaultConfigurationChangeDetector(log), - configStatusNotifier: clients.NoOpConfigStatusNotifier{}, - } + configStatusNotifier := clients.NewChannelConfigNotifier(log) + s := konnect.NewConfigSynchronizer( + konnect.ConfigSynchronizerParams{ + Logger: log, + ConfigUploadPeriod: sendConfigPeriod, + KonnectClientFactory: &mocks.KonnectClientFactory{Client: testKonnectClient}, + UpdateStrategyResolver: resolver, + ConfigChangeDetector: sendconfig.NewDefaultConfigurationChangeDetector(log), + ConfigStatusNotifier: configStatusNotifier, + MetricsRecorder: &mocks.MetricsRecorder{}, + }, + ) ctx, cancel := context.WithCancel(context.Background()) - err := s.Start(ctx) - require.NoError(t, err) + defer cancel() + go func() { + err := s.Start(ctx) + require.NoError(t, err) + }() t.Logf("Verifying that no URL are updated when no configuration received") require.Never(t, func() bool { @@ -84,7 +62,7 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { }, 10*sendConfigPeriod, sendConfigPeriod, "Should not update any URL when no configuration received") t.Logf("Verifying that the new config updated when received") - content := &file.Content{ + expectedContent := &file.Content{ FormatVersion: "3.0", Services: []file.FService{ { @@ -92,30 +70,34 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { Name: kong.String("service1"), Host: kong.String("example.com"), }, - Routes: []*file.FRoute{ - { - Route: kong.Route{ - Name: kong.String("route1"), - Expression: kong.String("http.path == \"/foo\""), - }, + }, + }, + } + kongState := func() *kongstate.KongState { + return &kongstate.KongState{ + Services: []kongstate.Service{ + { + Service: kong.Service{ + Name: kong.String("service1"), + Host: kong.String("example.com"), }, }, }, - }, + } } - s.SetTargetContent(content) + s.UpdateKongState(ctx, kongState(), false) require.EventuallyWithT(t, func(t *assert.CollectT) { urls := resolver.GetUpdateCalledForURLs() require.Len(t, urls, 1, "should update only one URL (Konnect)") url := urls[0] contentWithHash, ok := resolver.LastUpdatedContentForURL(url) require.True(t, ok, "should have last updated content for the URL") - require.Empty(t, cmp.Diff(content, contentWithHash.Content), "should send expected configuration") + require.Empty(t, cmp.Diff(expectedContent, contentWithHash.Content), "should send expected configuration") }, 10*sendConfigPeriod, sendConfigPeriod) t.Logf("Verifying that update is not called when config not changed") l := len(resolver.GetUpdateCalledForURLs()) - s.SetTargetContent(content) + s.UpdateKongState(ctx, kongState(), false) require.Never(t, func() bool { return len(resolver.GetUpdateCalledForURLs()) != l }, 10*sendConfigPeriod, sendConfigPeriod) @@ -123,16 +105,13 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { t.Logf("Verifying that new config are not sent after context cancelled") cancel() <-ctx.Done() - // modify content - newContent := content.DeepCopy() - newContent.Consumers = []file.FConsumer{ - { - Consumer: kong.Consumer{ - Username: kong.String("consumer-1"), - }, - }, - } - s.SetTargetContent(newContent) + + // Modify the Kong state and expected content and update it again. + state := kongState() + state.Services[0].Host = kong.String("example.org") + expectedContent.Services[0].Host = kong.String("example.org") + s.UpdateKongState(ctx, state, false) + // The latest updated content should always be the content in the previous update // because it should not update new content after context cancelled. require.Never(t, func() bool { @@ -149,6 +128,59 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { if !ok { return false } - return !(assert.ObjectsAreEqual(content, contentWithHash.Content)) + return assert.ObjectsAreEqual(expectedContent, contentWithHash.Content) }, 10*sendConfigPeriod, sendConfigPeriod, "Should not send new updates after context cancelled") } + +func TestConfigSynchronizer_ConfigIsSanitizedWhenConfiguredSo(t *testing.T) { + log := logr.Discard() + sendConfigPeriod := 10 * time.Millisecond + testKonnectClient := mustSampleKonnectClient(t) + resolver := mocks.NewUpdateStrategyResolver() + configStatusNotifier := clients.NewChannelConfigNotifier(log) + s := konnect.NewConfigSynchronizer( + konnect.ConfigSynchronizerParams{ + Logger: log, + KongConfig: sendconfig.Config{ + SanitizeKonnectConfigDumps: true, + }, + ConfigUploadPeriod: sendConfigPeriod, + KonnectClientFactory: &mocks.KonnectClientFactory{Client: testKonnectClient}, + UpdateStrategyResolver: resolver, + ConfigChangeDetector: sendconfig.NewDefaultConfigurationChangeDetector(log), + ConfigStatusNotifier: configStatusNotifier, + MetricsRecorder: &mocks.MetricsRecorder{}, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + err := s.Start(ctx) + require.NoError(t, err) + }() + + t.Log("Updating Kong state with sensitive information") + kongState := &kongstate.KongState{ + Certificates: []kongstate.Certificate{ + { + Certificate: kong.Certificate{ + ID: kong.String("new_cert"), + Key: kong.String(`private-key-string`), // This should be redacted. + }, + }, + }, + } + s.UpdateKongState(ctx, kongState, false) + + t.Log("Verifying that the sensitive information is redacted") + require.EventuallyWithT(t, func(t *assert.CollectT) { + konnectContent, ok := resolver.LastUpdatedContentForURL(testKonnectClient.BaseRootURL()) + require.True(t, ok, "should have last updated content for the URL") + + require.Len(t, konnectContent.Content.Certificates, 1, "expected 1 certificate") + cert := konnectContent.Content.Certificates[0] + require.NotNil(t, cert.Key, "expected certificate key") + require.Equal(t, "{vault://redacted-value}", *cert.Key, "expected redacted certificate key") + }, 10*sendConfigPeriod, sendConfigPeriod) +} diff --git a/internal/manager/run.go b/internal/manager/run.go index 79159c441c..a987df039d 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -13,6 +13,7 @@ import ( "github.com/avast/retry-go/v4" "github.com/blang/semver/v4" "github.com/go-logr/logr" + "github.com/google/uuid" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" @@ -272,42 +273,34 @@ func Run( instanceIDProvider := NewInstanceIDProvider() if c.Konnect.ConfigSynchronizationEnabled { - konnectNodesAPIClient, err := nodes.NewClient(c.Konnect) - if err != nil { - return fmt.Errorf("failed creating konnect client: %w", err) - } // In case of failures when building Konnect related objects, we're not returning errors as Konnect is not // considered critical feature, and it should not break the basic functionality of the controller. - // Run the Konnect Admin API client initialization in a separate goroutine to not block while ensuring - // connection. - go setupKonnectAdminAPIClientWithClientsMgr(ctx, c.Konnect, clientsManager, setupLog) - - // Set channel to send config status. + // Set up a config status notifier to be used by Konnect related components. Register it with the data plane + // client so it can send status updates to Konnect. configStatusNotifier := clients.NewChannelConfigNotifier(logger) dataplaneClient.SetConfigStatusNotifier(configStatusNotifier) - // Setup Konnect config synchronizer. - konnectConfigSynchronizer, err := setupKonnectConfigSynchronizer( + // Setup Konnect ConfigSynchronizer with manager. + konnectConfigSynchronizer, err := setupKonnectConfigSynchronizerWithMgr( ctx, mgr, - c.Konnect.UploadConfigPeriod, + c, kongConfig, - clientsManager, updateStrategyResolver, configStatusNotifier, metricsRecorder, ) if err != nil { setupLog.Error(err, "Failed to setup Konnect configuration synchronizer with manager, skipping") + } else { + dataplaneClient.SetKonnectKongStateUpdater(konnectConfigSynchronizer) } - dataplaneClient.SetKonnectConfigSynchronizer(konnectConfigSynchronizer) // Setup Konnect NodeAgent with manager. if err := setupKonnectNodeAgentWithMgr( c, mgr, - konnectNodesAPIClient, configStatusNotifier, clientsManager, setupLog, @@ -315,7 +308,6 @@ func Run( ); err != nil { setupLog.Error(err, "Failed to setup Konnect NodeAgent with manager, skipping") } - } // Setup and inject license getter. @@ -413,33 +405,24 @@ func waitForKubernetesAPIReadiness(ctx context.Context, logger logr.Logger, mgr } // setupKonnectNodeAgentWithMgr creates and adds Konnect NodeAgent as the manager's Runnable. -// Returns error if failed to create Konnect NodeAgent. func setupKonnectNodeAgentWithMgr( c *Config, mgr manager.Manager, - konnectNodeAPIClient *nodes.Client, configStatusSubscriber clients.ConfigStatusSubscriber, clientsManager *clients.AdminAPIClientsManager, logger logr.Logger, instanceIDProvider *InstanceIDProvider, ) error { - var hostname string - nn, err := util.GetPodNN() + konnectNodesAPIClient, err := nodes.NewClient(c.Konnect) if err != nil { - logger.Error(err, "Failed getting pod name and/or namespace, fallback to use hostname as node name in Konnect") - hostname, _ = os.Hostname() - } else { - hostname = nn.String() - logger.Info(fmt.Sprintf("Using %s as controller's node name in Konnect", hostname)) + return fmt.Errorf("failed creating konnect client: %w", err) } - version := metadata.Release - agent := konnect.NewNodeAgent( - hostname, - version, + resolveControllerHostnameForKonnect(logger), + metadata.Release, c.Konnect.RefreshNodePeriod, logger, - konnectNodeAPIClient, + konnectNodesAPIClient, configStatusSubscriber, konnect.NewGatewayClientGetter(logger, clientsManager), clientsManager, @@ -451,26 +434,22 @@ func setupKonnectNodeAgentWithMgr( return nil } -// setupKonnectAdminAPIClientWithClientsMgr initializes Konnect Admin API client and sets it to clientsManager. -// If it fails to initialize the client, it logs the error and returns. -func setupKonnectAdminAPIClientWithClientsMgr( - ctx context.Context, - config adminapi.KonnectConfig, - clientsManager *clients.AdminAPIClientsManager, - logger logr.Logger, -) { - konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectControlPlane(config) +// resolveControllerHostnameForKonnect resolves the hostname to be used by Konnect NodeAgent. It tries to get the pod +// name and namespace, and if it fails, it falls back to using the hostname. If that fails too, it generates a random +// UUID. +func resolveControllerHostnameForKonnect(logger logr.Logger) string { + nn, err := util.GetPodNN() if err != nil { - logger.Error(err, "Failed creating Konnect Control Plane Admin API client, skipping synchronisation") - return - } - if err := adminapi.EnsureKonnectConnection(ctx, konnectAdminAPIClient.AdminAPIClient(), logger); err != nil { - logger.Error(err, "Failed to ensure connection to Konnect Admin API, skipping synchronisation") - return + logger.Error(err, "Failed getting pod name and/or namespace, falling back to use hostname as node name in Konnect") + hostname, err := os.Hostname() + if err != nil { + logger.Error(err, "Failed getting hostname, falling back to random UUID as node name in Konnect") + return uuid.NewString() + } + return hostname } - - clientsManager.SetKonnectClient(konnectAdminAPIClient) - logger.Info("Initialized Konnect Admin API client") + logger.WithValues("hostname", nn.String()).Info("Resolved controller hostname for Konnect") + return nn.String() } type IsReady interface { diff --git a/internal/manager/setup.go b/internal/manager/setup.go index 08f26be560..45b3f31255 100644 --- a/internal/manager/setup.go +++ b/internal/manager/setup.go @@ -502,32 +502,33 @@ func setupLicenseGetter( return nil, nil } -// setupKonnectConfigSynchronizer sets up Konnect config sychronizer and adds it to the manager runnables. -func setupKonnectConfigSynchronizer( +// setupKonnectConfigSynchronizerWithMgr sets up Konnect config sychronizer and adds it to the manager runnables. +func setupKonnectConfigSynchronizerWithMgr( ctx context.Context, mgr manager.Manager, - configUploadPeriod time.Duration, + cfg *Config, kongConfig sendconfig.Config, - clientsProvider clients.AdminAPIClientsProvider, updateStrategyResolver sendconfig.UpdateStrategyResolver, configStatusNotifier clients.ConfigStatusNotifier, metricsRecorder metrics.Recorder, ) (*konnect.ConfigSynchronizer, error) { - logger := ctrl.LoggerFrom(ctx).WithName("konnect-config-synchronizer") s := konnect.NewConfigSynchronizer( - ctrl.LoggerFrom(ctx).WithName("konnect-config-synchronizer"), - kongConfig, - configUploadPeriod, - clientsProvider.KonnectClient(), - updateStrategyResolver, - sendconfig.NewDefaultConfigurationChangeDetector(logger), - configStatusNotifier, - metricsRecorder, + konnect.ConfigSynchronizerParams{ + Logger: ctrl.LoggerFrom(ctx).WithName("konnect-config-synchronizer"), + KongConfig: kongConfig, + ConfigUploadPeriod: cfg.Konnect.UploadConfigPeriod, + KonnectClientFactory: adminapi.NewKonnectClientFactory(cfg.Konnect, ctrl.LoggerFrom(ctx).WithName("konnect-client-factory")), + UpdateStrategyResolver: updateStrategyResolver, + ConfigChangeDetector: sendconfig.NewDefaultConfigurationChangeDetector( + ctrl.LoggerFrom(ctx).WithName("konnect-config-change-detector"), + ), + ConfigStatusNotifier: configStatusNotifier, + MetricsRecorder: metricsRecorder, + }, ) err := mgr.Add(s) if err != nil { - return nil, err + return nil, fmt.Errorf("could not add Konnect config synchronizer to manager: %w", err) } - return s, nil } diff --git a/test/kongintegration/dbmode_update_strategy_test.go b/test/kongintegration/dbmode_update_strategy_test.go index 79ba3f9318..5a0cdcf3ef 100644 --- a/test/kongintegration/dbmode_update_strategy_test.go +++ b/test/kongintegration/dbmode_update_strategy_test.go @@ -52,7 +52,6 @@ func TestUpdateStrategyDBMode(t *testing.T) { dump.Config{}, semver.MustParse("3.6.0"), 10, - nil, logger, ) diff --git a/test/kongintegration/kong_client_golden_tests_outputs_test.go b/test/kongintegration/kong_client_golden_tests_outputs_test.go index 4d35361d4b..2520866f4f 100644 --- a/test/kongintegration/kong_client_golden_tests_outputs_test.go +++ b/test/kongintegration/kong_client_golden_tests_outputs_test.go @@ -105,7 +105,7 @@ func TestKongClientGoldenTestsOutputs_Konnect(t *testing.T) { updateStrategy := sendconfig.NewUpdateStrategyDBModeKonnect(adminAPIClient.AdminAPIClient(), dump.Config{ SkipCACerts: true, KonnectControlPlane: cpID, - }, semver.MustParse("3.5.0"), 10, nil, logr.Discard()) + }, semver.MustParse("3.5.0"), 10, logr.Discard()) for _, goldenTestOutputPath := range allGoldenTestsOutputsPaths(t) { t.Run(goldenTestOutputPath, func(t *testing.T) { diff --git a/test/mocks/konnect_client_factory.go b/test/mocks/konnect_client_factory.go new file mode 100644 index 0000000000..bbf1ed3b04 --- /dev/null +++ b/test/mocks/konnect_client_factory.go @@ -0,0 +1,16 @@ +package mocks + +import ( + "context" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" +) + +// KonnectClientFactory is a mock implementation of konnect.ClientFactory. +type KonnectClientFactory struct { + Client *adminapi.KonnectClient +} + +func (f *KonnectClientFactory) NewKonnectClient(context.Context) (*adminapi.KonnectClient, error) { + return f.Client, nil +} diff --git a/test/mocks/konnect_kongstate_updater.go b/test/mocks/konnect_kongstate_updater.go new file mode 100644 index 0000000000..ac463c8fc4 --- /dev/null +++ b/test/mocks/konnect_kongstate_updater.go @@ -0,0 +1,28 @@ +package mocks + +import ( + "context" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" +) + +// KonnectKongStateUpdater is a mock implementation of dataplane.KonnectKongStateUpdater. +type KonnectKongStateUpdater struct { + calls []KonnectKongStateUpdaterCall +} + +type KonnectKongStateUpdaterCall struct { + KongState *kongstate.KongState + IsFallback bool +} + +func (k *KonnectKongStateUpdater) UpdateKongState(_ context.Context, kongState *kongstate.KongState, isFallback bool) { + k.calls = append(k.calls, KonnectKongStateUpdaterCall{ + KongState: kongState, + IsFallback: isFallback, + }) +} + +func (k *KonnectKongStateUpdater) Calls() []KonnectKongStateUpdaterCall { + return k.calls +}