Skip to content

Commit

Permalink
Optionally run the Namespace and ConfigMap caches (#42)
Browse files Browse the repository at this point in the history
Issue N/A

Description of changes:

- Run the caches only when `--watch-namespace` is not empty.
- Remove the "watch namespace" filter from the `NamespaceCache` event handlers.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
  • Loading branch information
a-hilaly authored Aug 10, 2021
1 parent 9a99433 commit c5631b6
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 77 deletions.
33 changes: 13 additions & 20 deletions pkg/runtime/cache/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,13 @@ const (
// make the changes accordingly.
type AccountCache struct {
sync.RWMutex

log logr.Logger

// ConfigMap informer
informer k8scache.SharedInformer
log logr.Logger
roleARNs map[string]string
}

// NewAccountCache makes a new AccountCache from a client.Interface
// and a logr.Logger
func NewAccountCache(clientset kubernetes.Interface, log logr.Logger) *AccountCache {
sharedInformer := informersv1.NewConfigMapInformer(
clientset,
currentNamespace,
informerResyncPeriod,
k8scache.Indexers{},
)
// NewAccountCache instanciate a new AccountCache.
func NewAccountCache(log logr.Logger) *AccountCache {
return &AccountCache{
informer: sharedInformer,
log: log.WithName("cache.account"),
roleARNs: make(map[string]string),
}
Expand All @@ -65,10 +53,15 @@ func resourceMatchACKRoleAccountsConfigMap(raw interface{}) bool {
return ok && object.ObjectMeta.Name == ACKRoleAccountMap
}

// Run adds the default event handler functions to the SharedInformer and
// runs the informer to begin processing items.
func (c *AccountCache) Run(stopCh <-chan struct{}) {
c.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
// Run instantiate a new SharedInformer for ConfigMaps and runs it to begin processing items.
func (c *AccountCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) {
informer := informersv1.NewConfigMapInformer(
clientSet,
currentNamespace,
informerResyncPeriod,
k8scache.Indexers{},
)
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if resourceMatchACKRoleAccountsConfigMap(obj) {
cm := obj.(*corev1.ConfigMap)
Expand All @@ -95,7 +88,7 @@ func (c *AccountCache) Run(stopCh <-chan struct{}) {
}
},
})
go c.informer.Run(stopCh)
go informer.Run(stopCh)
}

// GetAccountRoleARN queries the AWS accountID associated Role ARN
Expand Down
7 changes: 4 additions & 3 deletions pkg/runtime/cache/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func TestAccountCache(t *testing.T) {
fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions))

// initlizing account cache
accountCache := ackrtcache.NewAccountCache(k8sClient, fakeLogger)
accountCache := ackrtcache.NewAccountCache(fakeLogger)
stopCh := make(chan struct{})
accountCache.Run(stopCh)
accountCache.Run(k8sClient, stopCh)

// Test create events
k8sClient.CoreV1().ConfigMaps(testNamespace).Create(
_, err := k8sClient.CoreV1().ConfigMaps(testNamespace).Create(
context.Background(),
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -76,6 +76,7 @@ func TestAccountCache(t *testing.T) {
},
metav1.CreateOptions{},
)
require.Nil(t, err)

time.Sleep(time.Second)

Expand Down
15 changes: 7 additions & 8 deletions pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,22 @@ type Caches struct {
Namespaces *NamespaceCache
}

// New creates a new Caches object from a kubernetes.Interface and
// a logr.Logger
func New(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) Caches {
// New instantiate a new Caches object.
func New(log logr.Logger) Caches {
return Caches{
Accounts: NewAccountCache(clientset, log),
Namespaces: NewNamespaceCache(clientset, log, watchNamespace),
Accounts: NewAccountCache(log),
Namespaces: NewNamespaceCache(log),
}
}

// Run runs all the owned caches
func (c Caches) Run() {
func (c Caches) Run(clientSet kubernetes.Interface) {
stopCh := make(chan struct{})
if c.Accounts != nil {
c.Accounts.Run(stopCh)
c.Accounts.Run(clientSet, stopCh)
}
if c.Namespaces != nil {
c.Namespaces.Run(stopCh)
c.Namespaces.Run(clientSet, stopCh)
}
c.stopCh = stopCh
}
Expand Down
56 changes: 21 additions & 35 deletions pkg/runtime/cache/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,74 +63,60 @@ func (n *namespaceInfo) getEndpointURL() string {
// annotations, and caching those related to the ACK controller.
type NamespaceCache struct {
sync.RWMutex

log logr.Logger
// Provide a namespace specifically to listen to.
// Provide empty string to listen to all namespaces except kube-system and kube-public.
watchNamespace string

// Namespace informer
informer k8scache.SharedInformer
// namespaceInfos maps namespaces names to their known namespaceInfo
namespaceInfos map[string]*namespaceInfo
}

// NewNamespaceCache makes a new NamespaceCache from a
// kubernetes.Interface and a logr.Logger
func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger, watchNamespace string) *NamespaceCache {
sharedInformer := informersv1.NewNamespaceInformer(
clientset,
informerResyncPeriod,
k8scache.Indexers{},
)
// NewNamespaceCache instanciate a new NamespaceCache.
func NewNamespaceCache(log logr.Logger) *NamespaceCache {
return &NamespaceCache{
informer: sharedInformer,
log: log.WithName("cache.namespace"),
watchNamespace: watchNamespace,
namespaceInfos: make(map[string]*namespaceInfo),
}
}

// Check if the provided namespace should be listened to or not
func isWatchNamespace(raw interface{}, watchNamespace string) bool {
// isIgnoredNamespace returns true if an object is of type corev1.Namespace
// and its metadata name is one of 'ack-system', 'kube-system' or 'kube-public'
func isIgnoredNamespace(raw interface{}) bool {
object, ok := raw.(*corev1.Namespace)
if !ok {
return false
}

if watchNamespace != "" {
return watchNamespace == object.ObjectMeta.Name
}
return object.ObjectMeta.Name != "kube-system" && object.ObjectMeta.Name != "kube-public"
return ok &&
(object.ObjectMeta.Name == "ack-system" ||
object.ObjectMeta.Name == "kube-system" ||
object.ObjectMeta.Name == "kube-public")
}

// Run adds event handler functions to the SharedInformer and
// runs the informer to begin processing items.
func (c *NamespaceCache) Run(stopCh <-chan struct{}) {
c.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
// Run instantiate a new shared informer for namespaces and runs it to begin processing items.
func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{}) {
informer := informersv1.NewNamespaceInformer(
clientSet,
informerResyncPeriod,
k8scache.Indexers{},
)
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if isWatchNamespace(obj, c.watchNamespace) {
if !isIgnoredNamespace(obj) {
ns := obj.(*corev1.Namespace)
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("created namespace", "name", ns.ObjectMeta.Name)
}
},
UpdateFunc: func(orig, desired interface{}) {
if isWatchNamespace(desired, c.watchNamespace) {
if !isIgnoredNamespace(desired) {
ns := desired.(*corev1.Namespace)
c.setNamespaceInfoFromK8sObject(ns)
c.log.V(1).Info("updated namespace", "name", ns.ObjectMeta.Name)
}
},
DeleteFunc: func(obj interface{}) {
if isWatchNamespace(obj, c.watchNamespace) {
if !isIgnoredNamespace(obj) {
ns := obj.(*corev1.Namespace)
c.deleteNamespaceInfo(ns.ObjectMeta.Name)
c.log.V(1).Info("deleted namespace", "name", ns.ObjectMeta.Name)
}
},
})
go c.informer.Run(stopCh)
go informer.Run(stopCh)
}

// GetDefaultRegion returns the default region if it it exists
Expand Down
13 changes: 8 additions & 5 deletions pkg/runtime/cache/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func TestNamespaceCache(t *testing.T) {
fakeLogger := ctrlrtzap.New(ctrlrtzap.UseFlagOptions(&zapOptions))

// initlizing account cache
namespaceCache := ackrtcache.NewNamespaceCache(k8sClient, fakeLogger, "")
namespaceCache := ackrtcache.NewNamespaceCache(fakeLogger)
stopCh := make(chan struct{})

namespaceCache.Run(stopCh)
namespaceCache.Run(k8sClient, stopCh)

// Test create events
k8sClient.CoreV1().Namespaces().Create(
_, err := k8sClient.CoreV1().Namespaces().Create(
context.Background(),
&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -69,6 +69,7 @@ func TestNamespaceCache(t *testing.T) {
},
metav1.CreateOptions{},
)
require.Nil(t, err)

time.Sleep(time.Second)

Expand All @@ -85,7 +86,7 @@ func TestNamespaceCache(t *testing.T) {
require.Equal(t, "https://amazon-service.region.amazonaws.com", endpointURL)

// Test update events
k8sClient.CoreV1().Namespaces().Update(
_, err = k8sClient.CoreV1().Namespaces().Update(
context.Background(),
&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -99,6 +100,7 @@ func TestNamespaceCache(t *testing.T) {
},
metav1.UpdateOptions{},
)
require.Nil(t, err)

time.Sleep(time.Second)

Expand All @@ -115,11 +117,12 @@ func TestNamespaceCache(t *testing.T) {
require.Equal(t, "https://amazon-other-service.region.amazonaws.com", endpointURL)

// Test delete events
k8sClient.CoreV1().Namespaces().Delete(
err = k8sClient.CoreV1().Namespaces().Delete(
context.Background(),
"production",
metav1.DeleteOptions{},
)
require.Nil(t, err)

time.Sleep(time.Second)

Expand Down
14 changes: 8 additions & 6 deletions pkg/runtime/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,15 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg
c.metaLock.Lock()
defer c.metaLock.Unlock()

clusterConfig := mgr.GetConfig()
clientset, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return err
cache := ackrtcache.New(c.log)
if cfg.WatchNamespace == "" {
clusterConfig := mgr.GetConfig()
clientSet, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return err
}
cache.Run(clientSet)
}
cache := ackrtcache.New(clientset, c.log, cfg.WatchNamespace)
cache.Run()

for _, rmf := range c.rmFactories {
rec := NewReconciler(c, rmf, c.log, cfg, c.metrics, cache)
Expand Down

0 comments on commit c5631b6

Please sign in to comment.