diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index 645e776e..289e0c6c 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -1,6 +1,7 @@ package kube_events_manager import ( + "context" "sync" "time" @@ -31,7 +32,8 @@ type FactoryIndex struct { type Factory struct { shared dynamicinformer.DynamicSharedInformerFactory score uint64 - stopCh chan struct{} + ctx context.Context + cancel context.CancelFunc } type FactoryStore struct { @@ -45,15 +47,17 @@ func NewFactoryStore() *FactoryStore { } } -func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { +func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) { + cctx, cancel := context.WithCancel(ctx) c.data[index] = Factory{ shared: f, score: uint64(1), - stopCh: make(chan struct{}, 1), + ctx: cctx, + cancel: cancel, } } -func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory { +func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) Factory { f, ok := c.data[index] if ok { f.score++ @@ -76,15 +80,15 @@ func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory client, resyncPeriod, index.Namespace, tweakListOptions) factory.ForResource(index.GVR) - c.add(index, factory) + c.add(ctx, index, factory) return c.data[index] } -func (c *FactoryStore) Start(client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error { +func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error { c.mu.Lock() defer c.mu.Unlock() - factory := c.get(client, index) + factory := c.get(ctx, client, index) informer := factory.shared.ForResource(index.GVR).Informer() // Add error handler, ignore "already started" error. @@ -93,11 +97,11 @@ func (c *FactoryStore) Start(client dynamic.Interface, index FactoryIndex, handl informer.AddEventHandler(handler) if !informer.HasSynced() { - go informer.Run(factory.stopCh) + go informer.Run(factory.ctx.Done()) - if err := wait.PollImmediateUntil(DefaultSyncTime, func() (bool, error) { + if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return informer.HasSynced(), nil - }, factory.stopCh); err != nil { + }); err != nil { return err } } @@ -116,7 +120,7 @@ func (c *FactoryStore) Stop(index FactoryIndex) { f.score-- if f.score == 0 { - close(f.stopCh) + f.cancel() } delete(c.data, index) diff --git a/pkg/kube_events_manager/namespace_informer.go b/pkg/kube_events_manager/namespace_informer.go index e4984eb7..533e83de 100644 --- a/pkg/kube_events_manager/namespace_informer.go +++ b/pkg/kube_events_manager/namespace_informer.go @@ -125,17 +125,18 @@ func (ni *namespaceInformer) start() { log.Errorf("%s: Possible BUG!!! Start called before createSharedInformer, ShredInformer is nil", ni.Monitor.Metadata.DebugName) return } - stopCh := make(chan struct{}, 1) + cctx, cancel := context.WithCancel(ni.ctx) go func() { <-ni.ctx.Done() ni.stopped = true - close(stopCh) + cancel() }() - go ni.SharedInformer.Run(stopCh) - if err := wait.PollImmediateUntil(DefaultSyncTime, func() (bool, error) { + go ni.SharedInformer.Run(cctx.Done()) + + if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return ni.SharedInformer.HasSynced(), nil - }, stopCh); err != nil { + }); err != nil { ni.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ni.Monitor.Metadata.DebugName) } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index 86fae3d9..705ca54b 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -433,7 +433,7 @@ func (ei *resourceInformer) start() { // TODO: separate handler and informer errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage) - err := DefaultFactoryStore.Start(ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) + err := DefaultFactoryStore.Start(ei.ctx, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) if err != nil { ei.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ei.Monitor.Metadata.DebugName) return