Skip to content

Commit

Permalink
chore: don't use KonnectClient in ClientsProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Dec 23, 2024
1 parent d258100 commit a9d2248
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 493 deletions.
27 changes: 27 additions & 0 deletions internal/adminapi/konnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,30 @@ func KonnectHTTPDoer() kong.Doer {
return resp, nil
}
}

// KonnectClientFactory is a factory to create KonnectClient instances.
type KonnectClientFactory struct {
konnectConfig KonnectConfig
logger logr.Logger
}

// NewKonnectClientFactory creates a new KonnectClientFactory instance.
func NewKonnectClientFactory(konnectConfig KonnectConfig, logger logr.Logger) *KonnectClientFactory {
return &KonnectClientFactory{
konnectConfig: konnectConfig,
logger: logger,
}
}

// NewKonnectClient create a new KonnectClient instance, ensuring the connection to Konnect Admin API.
// Please note it may block for a few seconds while trying to connect to Konnect Admin API.
func (f *KonnectClientFactory) NewKonnectClient(ctx context.Context) (*KonnectClient, error) {
konnectAdminAPIClient, err := NewKongClientForKonnectControlPlane(f.konnectConfig)
if err != nil {
return nil, fmt.Errorf("failed creating Konnect Control Plane Admin API client: %w", err)
}
if err := EnsureKonnectConnection(ctx, konnectAdminAPIClient.AdminAPIClient(), f.logger); err != nil {
return nil, fmt.Errorf("failed to ensure connection to Konnect Admin API: %w", err)
}
return konnectAdminAPIClient, nil
}
1 change: 1 addition & 0 deletions internal/clients/config_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type GatewayConfigApplyStatus struct {

// KonnectConfigUploadStatus stores the status of uploading configuration to Konnect.
type KonnectConfigUploadStatus struct {
// Failed indicates whether the config upload to Konnect failed.
Failed bool
}

Expand Down
19 changes: 0 additions & 19 deletions internal/clients/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type ClientFactory interface {
// AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that
// we should configure.
type AdminAPIClientsProvider interface {
KonnectClient() *adminapi.KonnectClient
GatewayClients() []*adminapi.Client
GatewayClientsToConfigure() []*adminapi.Client
}
Expand Down Expand Up @@ -75,10 +74,6 @@ type AdminAPIClientsManager struct {
// readinessReconciliationTicker is used to run readiness reconciliation loop.
readinessReconciliationTicker Ticker

// konnectClient represents a special-case of the data-plane which is Konnect cloud.
// This client is used to synchronise configuration with Konnect's Control Plane Admin API.
konnectClient *adminapi.KonnectClient

// lock prevents concurrent access to the manager's fields.
lock sync.RWMutex

Expand Down Expand Up @@ -177,20 +172,6 @@ func (c *AdminAPIClientsManager) Notify(discoveredAPIs []adminapi.DiscoveredAdmi
}
}

// SetKonnectClient sets a client that will be used to communicate with Konnect Control Plane Admin API.
// If called multiple times, it will override the client.
func (c *AdminAPIClientsManager) SetKonnectClient(client *adminapi.KonnectClient) {
c.lock.Lock()
defer c.lock.Unlock()
c.konnectClient = client
}

func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient {
c.lock.RLock()
defer c.lock.RUnlock()
return c.konnectClient
}

// GatewayClients returns a copy of current client's slice. Konnect client won't be included.
// This method can be used when some actions need to be performed only against Kong Gateway clients.
func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client {
Expand Down
6 changes: 0 additions & 6 deletions internal/clients/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) {
require.Len(t, m.GatewayClients(), 1, "expecting one initial client")
require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client")
require.Len(t, m.GatewayClientsToConfigure(), 1, "Expecting one initial client")

konnectTestClient := &adminapi.KonnectClient{}
m.SetKonnectClient(konnectTestClient)
require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients")
require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount")
require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient")
}

func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) {
Expand Down
63 changes: 22 additions & 41 deletions internal/dataplane/kong_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator"
"github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics"
"github.com/kong/kubernetes-ingress-controller/v3/internal/konnect"
"github.com/kong/kubernetes-ingress-controller/v3/internal/logging"
"github.com/kong/kubernetes-ingress-controller/v3/internal/metrics"
"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
Expand Down Expand Up @@ -84,6 +83,11 @@ type FallbackConfigGenerator interface {
) (store.CacheStores, fallback.GeneratedCacheMetadata, error)
}

// KonnectKongStateUpdater is an interface for updating the current state of configuration seen by Konnect.
type KonnectKongStateUpdater interface {
UpdateKongState(ctx context.Context, kongState *kongstate.KongState, isFallback bool)
}

// KongClient is a threadsafe high level API client for the Kong data-plane(s)
// which parses Kubernetes object caches into Kong Admin configurations and
// sends them as updates to the data-plane(s) (Kong Admin API).
Expand Down Expand Up @@ -180,9 +184,9 @@ type KongClient struct {
// lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways.
lastValidCacheSnapshot *store.CacheStores

// konnectConfigSynchronizer receives latest successfully applied Kong configuration from KongClient
// and uploads it to Konnect.
konnectConfigSynchronizer *konnect.ConfigSynchronizer
// konnectKongStateUpdater is used to update the current state seen by Konnect that will be picked asynchronously
// by the Konnect config synchronization loop.
konnectKongStateUpdater KonnectKongStateUpdater
}

// NewKongClient provides a new KongClient object after connecting to the
Expand Down Expand Up @@ -492,7 +496,7 @@ func (c *KongClient) Update(ctx context.Context) error {
const isFallback = false
shas, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig, isFallback)

// Taking into account the results of syncing configuration with Gateways and Konnect, and potential translation
// Taking into account the results of syncing configuration with Gateways and potential translation
// failures, calculate the config status and update it.
c.configStatusNotifier.NotifyGatewayConfigStatus(ctx, clients.GatewayConfigApplyStatus{
TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0,
Expand All @@ -513,8 +517,8 @@ func (c *KongClient) Update(ctx context.Context) error {
return gatewaysSyncErr
}

// Send configuration to Konnect ONLY when successfully applied configuration to gateways.
c.maybeSendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig, isFallback)
// Send configuration to Konnect only when successfully applied configuration to gateways.
c.maybeUpdateKonnectKongState(ctx, parsingResult.KongState, isFallback)
// Gateways were successfully synced with the current configuration, so we can update the last valid cache snapshot.
c.maybePreserveTheLastValidConfigCache(cacheSnapshot)

Expand Down Expand Up @@ -634,7 +638,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration(
if gatewaysSyncErr != nil {
return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr)
}
c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback)
c.maybeUpdateKonnectKongState(ctx, fallbackParsingResult.KongState, isFallback)

// Configuration was successfully recovered with the fallback configuration. Store the last valid configuration.
c.maybePreserveTheLastValidConfigCache(fallbackCache)
Expand Down Expand Up @@ -731,34 +735,16 @@ func (c *KongClient) sendOutToGatewayClients(
return previousSHAs, nil
}

// maybeSendOutToKonnectClient sends out the configuration to Konnect when KonnectClient is provided.
// It's a noop when Konnect integration is not enabled.
func (c *KongClient) maybeSendOutToKonnectClient(
// maybeUpdateKonnectKongState updates the KongState seen by Konnect if the konnectKongStateUpdater is set.
func (c *KongClient) maybeUpdateKonnectKongState(
ctx context.Context,
s *kongstate.KongState,
config sendconfig.Config,
_ bool,
isFallback bool,
) {
if c.konnectConfigSynchronizer == nil {
if c.konnectKongStateUpdater == nil {
return
}
konnectClient := c.clientsProvider.KonnectClient()
if konnectClient == nil {
return
}

if config.SanitizeKonnectConfigDumps {
s = s.SanitizedCopy(util.DefaultUUIDGenerator{})
}

deckGenParams := deckgen.GenerateDeckContentParams{
SelectorTags: config.FilterTags,
ExpressionRoutes: config.ExpressionRoutes,
PluginSchemas: konnectClient.PluginSchemaStore(),
AppendStubEntityWhenConfigEmpty: false,
}
targetContent := deckgen.ToDeckContent(ctx, c.logger, s, deckGenParams)
c.konnectConfigSynchronizer.SetTargetContent(targetContent)
c.konnectKongStateUpdater.UpdateKongState(ctx, s, isFallback)
}

func (c *KongClient) sendToClient(
Expand All @@ -770,16 +756,11 @@ func (c *KongClient) sendToClient(
) (string, error) {
logger := c.logger.WithValues("url", client.AdminAPIClient().BaseRootURL())

// If the client is Konnect and the feature flag is turned on,
// we should sanitize the configuration before sending it out.
if client.IsKonnect() && config.SanitizeKonnectConfigDumps {
s = s.SanitizedCopy(util.DefaultUUIDGenerator{})
}
deckGenParams := deckgen.GenerateDeckContentParams{
SelectorTags: config.FilterTags,
ExpressionRoutes: config.ExpressionRoutes,
PluginSchemas: client.PluginSchemaStore(),
AppendStubEntityWhenConfigEmpty: !client.IsKonnect() && config.InMemory,
AppendStubEntityWhenConfigEmpty: config.InMemory,
}
targetContent := deckgen.ToDeckContent(ctx, logger, s, deckGenParams)
customEntities := make(sendconfig.CustomEntitiesByType)
Expand Down Expand Up @@ -846,19 +827,19 @@ func (c *KongClient) sendToClient(
}

// SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results.
// Currently it is used for uploading the node status to konnect control plane.
// Currently, it is used for uploading the node status to Konnect control plane.
func (c *KongClient) SetConfigStatusNotifier(n clients.ConfigStatusNotifier) {
c.lock.Lock()
defer c.lock.Unlock()

c.configStatusNotifier = n
}

func (c *KongClient) SetKonnectConfigSynchronizer(s *konnect.ConfigSynchronizer) {
// SetKonnectKongStateUpdater sets the KonnectKongStateUpdater which updates the KongState seen by Konnect.
func (c *KongClient) SetKonnectKongStateUpdater(u KonnectKongStateUpdater) {
c.lock.Lock()
defer c.lock.Unlock()

c.konnectConfigSynchronizer = s
c.konnectKongStateUpdater = u
}

// -----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit a9d2248

Please sign in to comment.