diff --git a/pkg/runtime/cache/account.go b/pkg/runtime/cache/account.go index 5795bab..22a9c8e 100644 --- a/pkg/runtime/cache/account.go +++ b/pkg/runtime/cache/account.go @@ -34,25 +34,13 @@ const ( // make the changes accordingly. type AccountCache struct { sync.RWMutex - - log logr.Logger - - // ConfigMap informer - informer k8scache.SharedInformer + log logr.Logger roleARNs map[string]string } -// NewAccountCache makes a new AccountCache from a client.Interface -// and a logr.Logger -func NewAccountCache(clientset kubernetes.Interface, log logr.Logger) *AccountCache { - sharedInformer := informersv1.NewConfigMapInformer( - clientset, - currentNamespace, - informerResyncPeriod, - k8scache.Indexers{}, - ) +// NewAccountCache instanciate a new AccountCache. +func NewAccountCache(log logr.Logger) *AccountCache { return &AccountCache{ - informer: sharedInformer, log: log.WithName("cache.account"), roleARNs: make(map[string]string), } @@ -65,10 +53,15 @@ func resourceMatchACKRoleAccountsConfigMap(raw interface{}) bool { return ok && object.ObjectMeta.Name == ACKRoleAccountMap } -// Run adds the default event handler functions to the SharedInformer and -// runs the informer to begin processing items. -func (c *AccountCache) Run(stopCh <-chan struct{}) { - c.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ +// Run instantiate a new SharedInformer for ConfigMaps and runs it to begin processing items. +func (c *AccountCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) { + informer := informersv1.NewConfigMapInformer( + clientSet, + currentNamespace, + informerResyncPeriod, + k8scache.Indexers{}, + ) + informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if resourceMatchACKRoleAccountsConfigMap(obj) { cm := obj.(*corev1.ConfigMap) @@ -95,7 +88,7 @@ func (c *AccountCache) Run(stopCh <-chan struct{}) { } }, }) - go c.informer.Run(stopCh) + go informer.Run(stopCh) } // GetAccountRoleARN queries the AWS accountID associated Role ARN diff --git a/pkg/runtime/cache/account_test.go b/pkg/runtime/cache/account_test.go index 3043662..948c7c9 100644 --- a/pkg/runtime/cache/account_test.go +++ b/pkg/runtime/cache/account_test.go @@ -61,12 +61,12 @@ func TestAccountCache(t *testing.T) { fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions)) // initlizing account cache - accountCache := ackrtcache.NewAccountCache(k8sClient, fakeLogger) + accountCache := ackrtcache.NewAccountCache(fakeLogger) stopCh := make(chan struct{}) - accountCache.Run(stopCh) + accountCache.Run(k8sClient, stopCh) // Test create events - k8sClient.CoreV1().ConfigMaps(testNamespace).Create( + _, err := k8sClient.CoreV1().ConfigMaps(testNamespace).Create( context.Background(), &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -76,6 +76,7 @@ func TestAccountCache(t *testing.T) { }, metav1.CreateOptions{}, ) + require.Nil(t, err) time.Sleep(time.Second) diff --git a/pkg/runtime/cache/cache.go b/pkg/runtime/cache/cache.go index 360f65c..5b5b0d4 100644 --- a/pkg/runtime/cache/cache.go +++ b/pkg/runtime/cache/cache.go @@ -56,23 +56,22 @@ type Caches struct { Namespaces *NamespaceCache } -// New creates a new Caches object from a kubernetes.Interface and -// a logr.Logger -func New(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) Caches { +// New instantiate a new Caches object. +func New(log logr.Logger) Caches { return Caches{ - Accounts: NewAccountCache(clientset, log), - Namespaces: NewNamespaceCache(clientset, log, watchNamespace), + Accounts: NewAccountCache(log), + Namespaces: NewNamespaceCache(log), } } // Run runs all the owned caches -func (c Caches) Run() { +func (c Caches) Run(clientSet kubernetes.Interface) { stopCh := make(chan struct{}) if c.Accounts != nil { - c.Accounts.Run(stopCh) + c.Accounts.Run(clientSet, stopCh) } if c.Namespaces != nil { - c.Namespaces.Run(stopCh) + c.Namespaces.Run(clientSet, stopCh) } c.stopCh = stopCh } diff --git a/pkg/runtime/cache/namespace.go b/pkg/runtime/cache/namespace.go index 459d6f6..44454d2 100644 --- a/pkg/runtime/cache/namespace.go +++ b/pkg/runtime/cache/namespace.go @@ -63,74 +63,60 @@ func (n *namespaceInfo) getEndpointURL() string { // annotations, and caching those related to the ACK controller. type NamespaceCache struct { sync.RWMutex - log logr.Logger - // Provide a namespace specifically to listen to. - // Provide empty string to listen to all namespaces except kube-system and kube-public. - watchNamespace string - - // Namespace informer - informer k8scache.SharedInformer // namespaceInfos maps namespaces names to their known namespaceInfo namespaceInfos map[string]*namespaceInfo } -// NewNamespaceCache makes a new NamespaceCache from a -// kubernetes.Interface and a logr.Logger -func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) *NamespaceCache { - sharedInformer := informersv1.NewNamespaceInformer( - clientset, - informerResyncPeriod, - k8scache.Indexers{}, - ) +// NewNamespaceCache instanciate a new NamespaceCache. +func NewNamespaceCache(log logr.Logger) *NamespaceCache { return &NamespaceCache{ - informer: sharedInformer, log: log.WithName("cache.namespace"), - watchNamespace: watchNamespace, namespaceInfos: make(map[string]*namespaceInfo), } } -// Check if the provided namespace should be listened to or not -func isWatchNamespace(raw interface{}, watchNamespace string) bool { +// isIgnoredNamespace returns true if an object is of type corev1.Namespace +// and its metadata name is one of 'ack-system', 'kube-system' or 'kube-public' +func isIgnoredNamespace(raw interface{}) bool { object, ok := raw.(*corev1.Namespace) - if !ok { - return false - } - - if watchNamespace != "" { - return watchNamespace == object.ObjectMeta.Name - } - return object.ObjectMeta.Name != "kube-system" && object.ObjectMeta.Name != "kube-public" + return ok && + (object.ObjectMeta.Name == "ack-system" || + object.ObjectMeta.Name == "kube-system" || + object.ObjectMeta.Name == "kube-public") } -// Run adds event handler functions to the SharedInformer and -// runs the informer to begin processing items. -func (c *NamespaceCache) Run(stopCh <-chan struct{}) { - c.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ +// Run instantiate a new shared informer for namespaces and runs it to begin processing items. +func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) { + informer := informersv1.NewNamespaceInformer( + clientSet, + informerResyncPeriod, + k8scache.Indexers{}, + ) + informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - if isWatchNamespace(obj, c.watchNamespace) { + if !isIgnoredNamespace(obj) { ns := obj.(*corev1.Namespace) c.setNamespaceInfoFromK8sObject(ns) c.log.V(1).Info("created namespace", "name", ns.ObjectMeta.Name) } }, UpdateFunc: func(orig, desired interface{}) { - if isWatchNamespace(desired, c.watchNamespace) { + if !isIgnoredNamespace(desired) { ns := desired.(*corev1.Namespace) c.setNamespaceInfoFromK8sObject(ns) c.log.V(1).Info("updated namespace", "name", ns.ObjectMeta.Name) } }, DeleteFunc: func(obj interface{}) { - if isWatchNamespace(obj, c.watchNamespace) { + if !isIgnoredNamespace(obj) { ns := obj.(*corev1.Namespace) c.deleteNamespaceInfo(ns.ObjectMeta.Name) c.log.V(1).Info("deleted namespace", "name", ns.ObjectMeta.Name) } }, }) - go c.informer.Run(stopCh) + go informer.Run(stopCh) } // GetDefaultRegion returns the default region if it it exists diff --git a/pkg/runtime/cache/namespace_test.go b/pkg/runtime/cache/namespace_test.go index f48334b..8fe5cc1 100644 --- a/pkg/runtime/cache/namespace_test.go +++ b/pkg/runtime/cache/namespace_test.go @@ -49,13 +49,13 @@ func TestNamespaceCache(t *testing.T) { fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions)) // initlizing account cache - namespaceCache := ackrtcache.NewNamespaceCache(k8sClient, fakeLogger, "") + namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger) stopCh := make(chan struct{}) - namespaceCache.Run(stopCh) + namespaceCache.Run(k8sClient, stopCh) // Test create events - k8sClient.CoreV1().Namespaces().Create( + _, err := k8sClient.CoreV1().Namespaces().Create( context.Background(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -69,6 +69,7 @@ func TestNamespaceCache(t *testing.T) { }, metav1.CreateOptions{}, ) + require.Nil(t, err) time.Sleep(time.Second) @@ -85,7 +86,7 @@ func TestNamespaceCache(t *testing.T) { require.Equal(t, "https://amazon-service.region.amazonaws.com", endpointURL) // Test update events - k8sClient.CoreV1().Namespaces().Update( + _, err = k8sClient.CoreV1().Namespaces().Update( context.Background(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ @@ -99,6 +100,7 @@ func TestNamespaceCache(t *testing.T) { }, metav1.UpdateOptions{}, ) + require.Nil(t, err) time.Sleep(time.Second) @@ -115,11 +117,12 @@ func TestNamespaceCache(t *testing.T) { require.Equal(t, "https://amazon-other-service.region.amazonaws.com", endpointURL) // Test delete events - k8sClient.CoreV1().Namespaces().Delete( + err = k8sClient.CoreV1().Namespaces().Delete( context.Background(), "production", metav1.DeleteOptions{}, ) + require.Nil(t, err) time.Sleep(time.Second) diff --git a/pkg/runtime/service_controller.go b/pkg/runtime/service_controller.go index 61f3b6c..284981a 100644 --- a/pkg/runtime/service_controller.go +++ b/pkg/runtime/service_controller.go @@ -133,13 +133,15 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg c.metaLock.Lock() defer c.metaLock.Unlock() - clusterConfig := mgr.GetConfig() - clientset, err := kubernetes.NewForConfig(clusterConfig) - if err != nil { - return err + cache := ackrtcache.New(c.log) + if cfg.WatchNamespace == "" { + clusterConfig := mgr.GetConfig() + clientSet, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + return err + } + cache.Run(clientSet) } - cache := ackrtcache.New(clientset, c.log, cfg.WatchNamespace) - cache.Run() for _, rmf := range c.rmFactories { rec := NewReconciler(c, rmf, c.log, cfg, c.metrics, cache)